Kafka

When running XTDB at scale, we recommend using multiple XTDB nodes connected via a Kafka cluster. Multiple nodes provide availability, and Kafka itself provides strong fault-tolerance guarantees. Kafka can be used for XTDB’s transaction log and document store components.

Each XTDB node must be assigned a unique groupId for correct operation.

Local Cluster Mode

The Kafka document store requires a local copy of the documents for random access. This copy can be kept in a KV store such as RocksDB or LMDB.

For this reason, unless you want to keep both transactions and documents on Kafka (e.g. for high write throughput, or for architectural simplicity), we now recommend a different document store implementation - JDBC or S3, for example.

(The Kafka transaction log does not have this requirement.)

Project Dependencies

  • deps.edn

com.xtdb/xtdb-kafka {:mvn/version "1.24.3"}

  • pom.xml

<dependency>
    <groupId>com.xtdb</groupId>
    <artifactId>xtdb-kafka</artifactId>
    <version>1.24.3</version>
</dependency>

Example configuration

Kafka as a Transaction Log

  • JSON

{
  "xtdb/tx-log": {
    "xtdb/module": "xtdb.kafka/->tx-log",
    "kafka-config": {
      "bootstrap-servers": "localhost:9092",
      ...
    },

    "tx-topic-opts": {
      "topic-name": "crux-transaction-log",
      ...
    },

    "poll-wait-duration": "PT1S"
  },

  ...
}

  • Clojure

{:xtdb/tx-log {:xtdb/module 'xtdb.kafka/->tx-log
               :kafka-config {:bootstrap-servers "localhost:9092"}
               :tx-topic-opts {:topic-name "crux-transaction-log"}
               :poll-wait-duration (Duration/ofSeconds 1)}
 ...}

  • EDN

{:xtdb/tx-log {:xtdb/module xtdb.kafka/->tx-log
               :kafka-config {:bootstrap-servers "localhost:9092"}
               :tx-topic-opts {:topic-name "crux-transaction-log"}
               :poll-wait-duration "PT1S"}
 ...}

Kafka as a Document Store

  • JSON

{
  "xtdb/document-store": {
    "xtdb/module": "xtdb.kafka/->document-store",
    "kafka-config": {
      "bootstrap-servers": "localhost:9092",
      ...
    },
    "doc-topic-opts": {
      "topic-name": "crux-docs",
      ...
    },
    "local-document-store": {
      "kv-store": {
        "xtdb/module": "xtdb.rocksdb/->kv-store",
        "db-dir": "/tmp/rocksdb"
      }
    },
    "poll-wait-duration": "PT1S"
  },

  ...
}

  • Clojure

{:xtdb/document-store {:xtdb/module 'xtdb.kafka/->document-store
                       :kafka-config {:bootstrap-servers "localhost:9092"
                                      ...}
                       :doc-topic-opts {:topic-name "crux-docs"
                                        ...}
                       :local-document-store {:kv-store {:xtdb/module 'xtdb.rocksdb/->kv-store
                                                         :db-dir (io/file "/tmp/rocksdb")}}
                       :poll-wait-duration (Duration/ofSeconds 1)}
 ...}

  • EDN

{:xtdb/document-store {:xtdb/module xtdb.kafka/->document-store
                       :kafka-config {:bootstrap-servers "localhost:9092"
                                      ...}
                       :doc-topic-opts {:topic-name "crux-docs"
                                        ...}
                       :local-document-store {:kv-store {:xtdb/module xtdb.rocksdb/->kv-store
                                                         :db-dir "/tmp/rocksdb"}}
                       :poll-wait-duration "PT1S"}
 ...}

If you do not want the local node to index transactions, you can use the xtdb.kafka/->submit-only-document-store module.
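A configuration using the submit-only module might look like the sketch below. It is an illustration rather than a reference setup: it assumes a broker at localhost:9092, pairs the submit-only document store with a Kafka transaction log, leaves the topic options at their defaults, and the var name submit-only-node-config is made up for the example.

```clojure
;; Sketch only: the broker address is an assumption, and topic options
;; are left at the module defaults.
(def submit-only-node-config
  {:kafka-config {:xtdb/module 'xtdb.kafka/->kafka-config
                  :bootstrap-servers "localhost:9092"}
   :xtdb/tx-log {:xtdb/module 'xtdb.kafka/->tx-log
                 :kafka-config :kafka-config}
   ;; Documents are written to Kafka, but no local copy is kept.
   :xtdb/document-store {:xtdb/module 'xtdb.kafka/->submit-only-document-store
                         :kafka-config :kafka-config}})
```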

Sharing the local KV store

You can safely use the same KV storage to hold both the local document store and the main query indices, which can potentially simplify related configurations (e.g. checkpointing) and improve resource utilization.

  • JSON

{
  "local-rocksdb": {
    "xtdb/module": "xtdb.rocksdb/->kv-store",
    "db-dir": "/tmp/rocksdb"
  },

  "xtdb/document-store": {
    ...
    "local-document-store": {
      "kv-store": "local-rocksdb"
    }
  },

  "xtdb/index-store": {
    "kv-store": "local-rocksdb"
  },

  ...
}

  • Clojure

{...
 :local-rocksdb {:xtdb/module 'xtdb.rocksdb/->kv-store
                 :db-dir (io/file "/tmp/rocksdb")}
 :xtdb/document-store {...
                       :local-document-store {:kv-store :local-rocksdb}}
 :xtdb/index-store {:kv-store :local-rocksdb}}

  • EDN

{...
 :local-rocksdb {:xtdb/module xtdb.rocksdb/->kv-store
                 :db-dir "/tmp/rocksdb"}
 :xtdb/document-store {...
                       :local-document-store {:kv-store :local-rocksdb}}
 :xtdb/index-store {:kv-store :local-rocksdb}}

Sharing connection config between the transaction log and the document store

If you’re using Kafka for both the transaction log and the document store, you can share connection config between them:

  • JSON

{
  "kafka-config": {
    "xtdb/module": "xtdb.kafka/->kafka-config",
    "bootstrap-servers": "localhost:9092",
    ...
  },

  "xtdb/tx-log": {
    "xtdb/module": "xtdb.kafka/->tx-log",
    "kafka-config": "kafka-config",
    ...
  },

  "xtdb/document-store": {
    "xtdb/module": "xtdb.kafka/->document-store",
    "kafka-config": "kafka-config",
    ...
  }
}

  • Clojure

{:kafka-config {:xtdb/module 'xtdb.kafka/->kafka-config
                :bootstrap-servers "localhost:9092"
                ...}
 :xtdb/tx-log {:xtdb/module 'xtdb.kafka/->tx-log
               :kafka-config :kafka-config
               ...}
 :xtdb/document-store {:xtdb/module 'xtdb.kafka/->document-store
                       :kafka-config :kafka-config
                       ...}}

  • EDN

{:kafka-config {:xtdb/module xtdb.kafka/->kafka-config
                :bootstrap-servers "localhost:9092"
                ...}
 :xtdb/tx-log {:xtdb/module xtdb.kafka/->tx-log
               :kafka-config :kafka-config
               ...}
 :xtdb/document-store {:xtdb/module xtdb.kafka/->document-store
                       :kafka-config :kafka-config
                       ...}}
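To show how a shared connection config might be used end to end, the sketch below starts a node with both components on Kafka, submits one document, and reads it back. It assumes a broker is reachable at localhost:9092 and that xtdb-core and xtdb-kafka are on the classpath. The namespace name and the example document are made up, and the topic options, local document store and index store are left at their defaults.

```clojure
(ns kafka-node-example
  (:require [xtdb.api :as xt]))

;; Assumption: a Kafka broker is reachable at localhost:9092.
(def node-config
  {:kafka-config {:xtdb/module 'xtdb.kafka/->kafka-config
                  :bootstrap-servers "localhost:9092"}
   ;; Both components refer to the shared :kafka-config entry above.
   :xtdb/tx-log {:xtdb/module 'xtdb.kafka/->tx-log
                 :kafka-config :kafka-config}
   :xtdb/document-store {:xtdb/module 'xtdb.kafka/->document-store
                         :kafka-config :kafka-config}})

(defn -main [& _]
  ;; The node is Closeable, so with-open shuts it down on exit.
  (with-open [node (xt/start-node node-config)]
    ;; Hypothetical document, used only for this illustration.
    (xt/submit-tx node [[::xt/put {:xt/id :hello, :greeting "world"}]])
    ;; Block until the node has indexed the submitted transaction.
    (xt/sync node)
    (println (xt/entity (xt/db node) :hello))))
```

Because both components point at the same :kafka-config key, broker addresses and connection properties only need to be changed in one place.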

Parameters

Connection config (xtdb.kafka/->kafka-config)

  • bootstrap-servers (string, default "localhost:9092"): host:port address (or comma-separated list of addresses) of the Kafka brokers to connect to

  • properties-file (string/File/Path): Kafka connection properties file, supplied directly to Kafka

  • properties-map (map): Kafka connection properties map, supplied directly to Kafka
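As an illustration of the connection parameters above, the fragment below passes extra client properties through properties-map. It is a sketch under assumptions: the broker hostnames and truststore path are placeholders, the var name is made up, and it assumes the map uses standard Kafka client property names as string keys with string values.

```clojure
;; Sketch only: hostnames and file path are placeholders.
(def secured-kafka-config
  {:kafka-config {:xtdb/module 'xtdb.kafka/->kafka-config
                  :bootstrap-servers "broker-1.example.com:9093,broker-2.example.com:9093"
                  ;; Assumed format: Kafka client property names as strings.
                  :properties-map {"security.protocol" "SSL"
                                   "ssl.truststore.location" "/etc/kafka/client.truststore.jks"}}})
```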

Topic options (xtdb.kafka/->topic-opts)

  • topic-name (string, required; defaults to "tx-topic" for tx-log, "doc-topic" for document-store)

  • num-partitions (int, default 1)

  • replication-factor (int, default 1): number of Kafka brokers each partition is replicated to, i.e. the level of durability for Kafka

  • create-topics? (boolean, default true): whether to create topics if they do not exist

  • topic-config (map): any further topic config to pass directly to Kafka

Transaction log (xtdb.kafka/->tx-log)

  • kafka-config (connection config)

  • tx-topic-opts (topic options)

  • poll-wait-duration (string/Duration, default 1 second, "PT1S"): time to wait on each Kafka poll.

Note

poll-wait-duration specifies how long open-tx-log waits for a record to appear on the transaction topic. If no record arrives within that time (e.g. because of latency to the Kafka broker), the call returns an empty cursor (i.e. one on which .hasNext is false). For this reason, the Kafka implementation of open-tx-log also accepts a map (in place of the with-ops? boolean) with the following optional keys:

 {:with-ops? true
  :kafka/poll-wait-duration (Duration/ofMillis 2000)}
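A call using this options map might look like the sketch below. The namespace and the read-tx-log helper are hypothetical names introduced for the example, and node is assumed to be an already-started XTDB node configured with the Kafka transaction log.

```clojure
(ns tx-log-example
  (:require [xtdb.api :as xt])
  (:import (java.time Duration)))

(defn read-tx-log
  "Returns a vector of the transactions after `after-tx-id` (nil reads from
  the start), using a two-second poll wait instead of the configured default.
  Hypothetical helper; `node` must use the Kafka tx-log."
  [node after-tx-id]
  ;; The cursor holds resources, so it is closed via with-open and the
  ;; results are fully realised (vec) before it closes.
  (with-open [cursor (xt/open-tx-log node after-tx-id
                                     {:with-ops? true
                                      :kafka/poll-wait-duration (Duration/ofMillis 2000)})]
    (vec (iterator-seq cursor))))
```

An empty result here can still mean that no record arrived within the poll wait, rather than that the log is empty, so callers may want to retry or raise the duration further.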

Document store (xtdb.kafka/->document-store)

  • kafka-config (connection config)

  • doc-topic-opts (topic options)

  • local-document-store (document store, default local in-memory kv-store)

  • poll-wait-duration (string/Duration, default 1 second, "PT1S"): time to wait on each Kafka poll.

Submit-only document store (xtdb.kafka/->submit-only-document-store)

  • kafka-config (connection config)

  • doc-topic-opts (topic options)