Kafka Connect

Introduction

A Kafka Connect plugin for transferring data between XTDB nodes and Kafka.

The XTDB source connector will publish transacations on a node to a Kafka topic, and the sink connector can receive transactions from a Kafka topic and submit them to a node.

Table 1. Currently supported data formats
Data format Sink/Source

JSON

Both

Avro

Sink

Transit

Source

EDN

Both

To get started with the connector, there are two separate guides (depending on whether you are using a full Confluent Platform installation, or a basic Kafka installation):

Confluent Platform Quickstart

Installing the connector

Use confluent-hub install juxt/kafka-connect-crux:1.23.3 to download and install the connector from Confluent hub. The downloaded connector is then placed within your confluent install’s 'share/confluent-hub-components' folder.

The connector can be used as either a source or a sink. In either case, there should be an associated XTDB node to communicate with.

Creating the XTDB node

To use our connector, you must first have a XTDB node connected to Kafka. To do this, we start by adding the following dependencies to a project:

com.xtdb/xtdb-core {:mvn/version "1.23.3"}
com.xtdb/xtdb-kafka {:mvn/version "1.23.3"}
com.xtdb/xtdb-http-server {:mvn/version "1.23.3"}
com.xtdb/xtdb-rocksdb {:mvn/version "1.23.3"}

Ensure first that you have a running Kafka broker to connect to. We import the dependencies into a file or REPL, then create our Kafka connected 'node' with an associated http server for the connector to communicate with:

(require '[xtdb.api :as xt]
         '[xtdb.http-server :as srv])
(import (xtdb.api IXtdb))

(def ^xtdb.api.IXtdb node
  (crux/start-node {:crux.node/topology '[xtdb.kafka/topology crux.http-server/module]
                    :crux.kafka/bootstrap-servers "localhost:9092"
                    :xtdb.http-server/port 3000}))

Sink Connector

Run the following command within the base of the Confluent folder, to create a worker which connects to the 'connect-test' topic, ready to send messages to the node. This also makes use of connect-file-source, checking for changes in a file called 'test.txt':

./bin/connect-standalone etc/kafka/connect-standalone.properties share/confluent-hub-components/juxt-kafka-connect-crux/etc/local-crux-sink.properties etc/kafka/connect-file-source.properties

Run the following within your Confluent directory, to add a line of JSON to 'test.txt':

echo '{"xt/id": "415c45c9-7cbe-4660-801b-dab9edc60c84", "value": "baz"}' >> test.txt

Now, verify that this was transacted within your REPL:

(xt/entity (xt/db node) "415c45c9-7cbe-4660-801b-dab9edc60c84")
==>
{:xt/id #uuid "415c45c9-7cbe-4660-801b-dab9edc60c84", :value "baz"}

Source Connector

Run the following command within the base of the Confluent folder, to create a worker connects to the 'connect-test' topic, ready to receive messages from the node. This also makes use of 'connect-file-sink', outputting transactions to your node within 'test.sink.txt':

./bin/connect-standalone etc/kafka/connect-standalone.properties share/confluent-hub-components/juxt-kafka-connect-crux/etc/local-crux-source.properties etc/kafka/connect-file-sink.properties

Within your REPL, transact an element into XTDB:

(xt/submit-tx node [[::xt/put {:xt/id #uuid "415c45c9-7cbe-4660-801b-dab9edc60c82", :value "baz-source"}]])

Check the contents of 'test.sink.txt' using the command below, and you should see that the transactions were outputted to the 'connect-test' topic:

tail test.sink.txt
==>
[[::xt/put {:xt/id #uuid "415c45c9-7cbe-4660-801b-dab9edc60c82", :value "baz-source"} #inst "2019-09-19T12:31:21.342-00:00"]]

Kafka Quickstart

Installing the connector

Download the connector from Confluent hub, then unzip the downloaded folder:

unzip juxt-kafka-connect-xtdb-1.23.3.zip

Navigate into the base of the Kafka folder, then run the following commands:

cp $CONNECTOR_PATH/lib/*-standalone.jar $KAFKA_HOME/libs
cp $CONNECTOR_PATH/etc/*.properties $KAFKA_HOME/config

The connector can be used as either a source or a sink. In either case, there should be an associated XTDB node to communicate with.

Creating the XTDB node

To use our connector, you must first have a XTDB node connected to Kafka. To do this, we start by adding the following dependencies to a project:

com.xtdb/xtdb-core {:mvn/version "1.23.3"}
com.xtdb/xtdb-kafka {:mvn/version "1.23.3"}
com.xtdb/xtdb-http-server {:mvn/version "1.23.3"}
com.xtdb/xtdb-rocksdb {:mvn/version "1.23.3"}

Ensure first that you have a running Kafka broker to connect to. We import the dependencies into a file or REPL, then create our Kafka connected 'node' with an associated http server for the connector to communicate with:

(require '[xtdb.api :as xt]
         '[xtdb.http-server :as srv])
(import (xtdb.api IXtdb))

(def ^xtdb.api.IXtdb node
  (crux/start-node {:crux.node/topology '[xtdb.kafka/topology crux.http-server/module]
                    :crux.kafka/bootstrap-servers "localhost:9092"
                    :xtdb.http-server/port 3000}))

Sink Connector

Run the following command within the base of the Kafka folder, to create a worker which connects to the 'connect-test' topic, ready to send messages to the node. This also makes use of connect-file-source, checking for changes in a file called 'test.txt':

./bin/connect-standalone.sh config/connect-standalone.properties config/local-xtdb-sink.properties config/connect-file-source.properties

Run the following within your Kafka directory, to add a line of JSON to 'test.txt':

echo '{"xt/id": "415c45c9-7cbe-4660-801b-dab9edc60c84", "value": "baz"}' >> test.txt

Now, verify that this was transacted within your REPL:

(xt/entity (xt/db node) "415c45c9-7cbe-4660-801b-dab9edc60c84")
==>
{:xt/id #uuid "415c45c9-7cbe-4660-801b-dab9edc60c84", :value "baz"}

Source Connector

Run the following command within the base of the Kafka folder, to create a worker connects to the 'connect-test' topic, ready to receive messages from the node. This also makes use of 'connect-file-sink', outputting transactions to your node within 'test.sink.txt':

./bin/connect-standalone.sh config/connect-standalone.properties config/local-xtdb-source.properties config/connect-file-sink.properties

Within your REPL, transact an element into XTDB:

(xt/submit-tx node [[::xt/put {:xt/id #uuid "415c45c9-7cbe-4660-801b-dab9edc60c82", :value "baz-source"}]])

Check the contents of 'test.sink.txt' using the command below, and you should see that the transactions were outputted to the 'connect-test' topic:

tail test.sink.txt
==>
[[::xt/put {:xt/id #uuid "415c45c9-7cbe-4660-801b-dab9edc60c82", :value "baz-source"} #inst "2019-09-19T12:31:21.342-00:00"]]

Source Configuration

url
  • Destination URL of XTDB HTTP end point

  • Type: String

  • Importance: High

  • Default: "http://localhost:3000"

topic
  • The Kafka topic to publish data to

  • Type: String

  • Importance: High

  • Default: "connect-test"

format
  • Format to send data out as: edn, json or transit

  • Type: String

  • Importance: Low

  • Default: "edn"

mode
  • Mode to use: tx or doc

  • Type: String

  • Importance: Low

  • Default: "tx"

batch.size
  • The maximum number of records the Source task can read from XTDB at one time.

  • Type: Int

  • Importance: LOW

  • Default: 2000

Sink Configuration

url
  • Destination URL of XTDB HTTP end point

  • Type: String

  • Importance: High

  • Default: "http://localhost:3000"

id.key
  • Record key to use as :xt/id

  • Type: String

  • Importance: Low

  • Default: "xt/id"