The Kafka table engine can be used to read data from and write data to Apache Kafka and other Kafka API-compatible brokers (e.g., Redpanda, Amazon MSK).
Kafka to ClickHouse
If you're on ClickHouse Cloud, we recommend using ClickPipes instead. ClickPipes natively supports private network connections, independent scaling of ingestion and cluster resources, and comprehensive monitoring for streaming Kafka data into ClickHouse.
Overview
Initially, we focus on the most common use case: using the Kafka table engine to insert data into ClickHouse from Kafka.

The Kafka table engine allows ClickHouse to read from a Kafka topic directly. Whilst useful for viewing messages on a topic, the engine by design only permits one-time retrieval, i.e. when a query is issued to the table, it consumes data from the queue and increases the consumer offset before returning results to the caller. Data can't, in effect, be re-read without resetting these offsets. To persist the data read through the table engine, we need a means of capturing it and inserting it into another table. Trigger-based materialized views natively provide this functionality. A materialized view initiates a read on the table engine, receiving batches of rows. The TO clause determines the destination of the data - typically a table of the MergeTree family. This process is visualized below.

Steps
If you have data populated on a target topic, you can adapt the following for use with your dataset. Alternatively, a sample GitHub dataset is provided here. This dataset is used in the examples below and uses a reduced schema and a subset of the rows (specifically, we limit ourselves to GitHub events concerning the ClickHouse repository), compared to the full dataset available here, for brevity. This is still sufficient for most of the queries published with the dataset to work.

The following step is required if you're connecting to a secure Kafka instance. These settings can't be passed through the SQL DDL commands and must be configured in the ClickHouse config.xml. We assume you're connecting to a SASL-secured instance, as this is the simplest method when interacting with Confluent Cloud. You'll also need the broker address and credentials, which form the Kafka engine settings to use in this tutorial.
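As a sketch, the relevant config.xml fragment might look like the following, assuming SASL over SSL with the PLAIN mechanism (the username and password are placeholders):

```xml
<clickhouse>
    <!-- librdkafka settings, with dots replaced by underscores -->
    <kafka>
        <security_protocol>sasl_ssl</security_protocol>
        <sasl_mechanisms>PLAIN</sasl_mechanisms>
        <sasl_username>username</sasl_username>
        <sasl_password>password</sasl_password>
    </kafka>
</clickhouse>
```

These settings apply to all Kafka engine tables on the server; restart ClickHouse for them to take effect.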
Create the Kafka topic github with 5 partitions by running the following command:
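The exact command depends on your Kafka distribution; with the standard Kafka CLI tools it might look like this (the broker address is a placeholder):

```bash
kafka-topics.sh --create \
  --bootstrap-server localhost:9092 \
  --topic github \
  --partitions 5 \
  --replication-factor 1
```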
Next, create the Kafka table engine. We use JSONEachRow as the format for consuming JSON from a Kafka topic. The values github and clickhouse represent the topic name and consumer group name, respectively. The topics can actually be a list of values.
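A minimal sketch of the engine DDL, assuming the reduced GitHub event schema used in this tutorial and a placeholder broker address:

```sql
CREATE TABLE github_queue
(
    actor_login String,
    event_type  String,
    created_at  DateTime
)
ENGINE = Kafka
SETTINGS kafka_broker_list = 'host:9092',  -- placeholder broker address
         kafka_topic_list  = 'github',
         kafka_group_name  = 'clickhouse',
         kafka_format      = 'JSONEachRow';
```

The full dataset uses a wider schema; extend the column list as required.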
A simple SELECT on github_queue should read some rows. Note that this will move the consumer offsets forward, preventing these rows from being re-read without a reset. Note the limit and the required setting stream_like_engine_allow_direct_select.
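For example, the setting must be enabled to select directly from a streaming engine:

```sql
SELECT *
FROM github_queue
LIMIT 10
SETTINGS stream_like_engine_allow_direct_select = 1;
```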
The materialized view will connect the two previously created tables, reading data from the Kafka table engine and inserting it into the target MergeTree table. A number of data transformations could be performed here; we will do a simple read and insert. The use of * assumes column names are identical (case sensitive).
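Assuming a MergeTree target table named github with matching columns, the view can be sketched as:

```sql
-- Reads blocks from the Kafka engine table and writes them to the target table
CREATE MATERIALIZED VIEW github_mv TO github AS
SELECT *
FROM github_queue;
```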
Common operations
To stop message consumption, you can detach the Kafka engine table:
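For example, using the engine table created earlier:

```sql
DETACH TABLE github_queue;

-- Consumption resumes when the table is re-attached:
ATTACH TABLE github_queue;
```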
A complete listing of virtual columns can be found here.
To update our table with the virtual columns, we’ll need to drop the materialized view, re-attach the Kafka engine table, and re-create the materialized view.
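A sketch of these steps, assuming the github_mv view and github target table from earlier, with the target table extended to hold the _topic and _partition virtual columns:

```sql
DROP VIEW github_mv;
DETACH TABLE github_queue;

-- Add columns to the target table to receive the virtual column values
ALTER TABLE github
    ADD COLUMN topic String,
    ADD COLUMN partition UInt64;

ATTACH TABLE github_queue;

-- Recreate the view, mapping the virtual columns to the new columns
CREATE MATERIALIZED VIEW github_mv TO github AS
SELECT *, _topic AS topic, _partition AS partition
FROM github_queue;

-- Once new messages are consumed, querying the target table
-- shows the topic and partition of each row:
SELECT actor_login, event_type, created_at, topic, partition
FROM github
LIMIT 10;
```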
| actor_login | event_type | created_at | topic | partition |
|---|---|---|---|---|
| IgorMinar | CommitCommentEvent | 2011-02-12 02:22:00 | github | 0 |
| queeup | CommitCommentEvent | 2011-02-12 02:23:23 | github | 0 |
| IgorMinar | CommitCommentEvent | 2011-02-12 02:23:24 | github | 0 |
| IgorMinar | CommitCommentEvent | 2011-02-12 02:24:50 | github | 0 |
| IgorMinar | CommitCommentEvent | 2011-02-12 02:25:20 | github | 0 |
| dapi | CommitCommentEvent | 2011-02-12 06:18:36 | github | 0 |
| sourcerebels | CommitCommentEvent | 2011-02-12 06:34:10 | github | 0 |
| jamierumbelow | CommitCommentEvent | 2011-02-12 12:21:40 | github | 0 |
| jpn | CommitCommentEvent | 2011-02-12 12:24:31 | github | 0 |
| Oxonium | CommitCommentEvent | 2011-02-12 12:31:28 | github | 0 |
- Treat the message field as a string. Functions can be used in the materialized view statement to perform cleansing and casting if required. This shouldn't represent a production solution but might assist in one-off ingestion.
- If you're consuming JSON from a topic, using the JSONEachRow format, use the setting input_format_skip_unknown_fields. When writing data, by default, ClickHouse throws an exception if the input data contains columns that don't exist in the target table. However, if this option is enabled, these excess columns will be ignored. Again, this isn't a production-level solution and might confuse others.
- Consider the setting kafka_skip_broken_messages. This requires the user to specify the level of tolerance per block for malformed messages - considered in the context of kafka_max_block_size. If this tolerance is exceeded (measured in absolute messages), the usual exception behaviour will revert, and other messages will be skipped.
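A sketch of how the last two settings might be applied to the engine DDL from earlier (the values are illustrative, not recommendations):

```sql
CREATE TABLE github_queue
(
    actor_login String,
    event_type  String,
    created_at  DateTime
)
ENGINE = Kafka
SETTINGS kafka_broker_list = 'host:9092',  -- placeholder broker address
         kafka_topic_list  = 'github',
         kafka_group_name  = 'clickhouse',
         kafka_format      = 'JSONEachRow',
         -- tolerate up to 10 malformed messages per block
         kafka_skip_broken_messages = 10,
         -- ignore JSON fields not present in the schema
         input_format_skip_unknown_fields = 1;
```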
ClickHouse to Kafka
Although a rarer use case, ClickHouse data can also be persisted in Kafka. For example, we will insert rows manually into a Kafka table engine. This data will be read by the same Kafka engine, whose materialized view will place the data into a MergeTree table. Finally, we demonstrate the application of materialized views in inserts to Kafka, reading from existing source tables.

Steps
Our initial objective is best illustrated with an example. We assume you have the tables and views created under the steps for Kafka to ClickHouse and that the topic has been fully consumed. First, confirm the count of the target table. Then create a topic github_out or equivalent, and ensure a Kafka table engine github_out_queue points to this topic.
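A sketch of the outbound engine table, mirroring the inbound one but pointing at the new topic (broker address and group name are placeholders):

```sql
CREATE TABLE github_out_queue
(
    actor_login String,
    event_type  String,
    created_at  DateTime
)
ENGINE = Kafka
SETTINGS kafka_broker_list = 'host:9092',
         kafka_topic_list  = 'github_out',
         kafka_group_name  = 'clickhouse_out',
         kafka_format      = 'JSONEachRow';
```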
Create a materialized view github_out_mv to point at the GitHub table, inserting rows into the above engine when it triggers. Additions to the GitHub table will, as a result, be pushed to our new Kafka topic.
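Assuming the github source table has the same columns as the outbound engine table, the view can be sketched as:

```sql
-- Each insert into github is also written to the github_out topic
CREATE MATERIALIZED VIEW github_out_mv TO github_out_queue AS
SELECT *
FROM github;
```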
Consuming from the github_out topic, e.g. with a console consumer, should confirm delivery of the messages.
Clusters and performance
Working with ClickHouse Clusters
Through Kafka consumer groups, multiple ClickHouse instances can potentially read from the same topic. Each consumer will be assigned to a topic partition in a 1:1 mapping. When scaling ClickHouse consumption using the Kafka table engine, consider that the total number of consumers within a cluster can't exceed the number of partitions on the topic. Therefore, ensure partitioning is appropriately configured for the topic in advance.

Multiple ClickHouse instances can all be configured to read from a topic using the same consumer group id - specified during Kafka table engine creation. Each instance will therefore read from one or more partitions, inserting segments into its local target table. The target tables can, in turn, be configured to use a ReplicatedMergeTree to handle duplication of the data. This approach allows Kafka reads to be scaled with the ClickHouse cluster, provided there are sufficient Kafka partitions.

Tuning performance
Consider the following when looking to increase Kafka engine table throughput:

- Performance will vary depending on the message size, format, and target table types. 100k rows/sec on a single table engine should be considered obtainable. By default, messages are read in blocks, controlled by the parameter kafka_max_block_size. By default, this is set to max_insert_block_size, which defaults to 1,048,576. Unless messages are extremely large, this should nearly always be increased. Values between 500k and 1M aren't uncommon. Test and evaluate the effect on throughput.
- The number of consumers for a table engine can be increased using kafka_num_consumers. However, by default, inserts will be linearized in a single thread unless kafka_thread_per_consumer is changed from its default value of 0. Set this to 1 to ensure flushes are performed in parallel. Note that creating a Kafka engine table with N consumers (and kafka_thread_per_consumer=1) is logically equivalent to creating N Kafka engines, each with a materialized view and kafka_thread_per_consumer=0.
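A sketch of an engine configured for parallel consumption (the consumer count must not exceed the topic's partition count - here the 5 partitions created earlier):

```sql
CREATE TABLE github_queue
(
    actor_login String,
    event_type  String,
    created_at  DateTime
)
ENGINE = Kafka
SETTINGS kafka_broker_list = 'host:9092',  -- placeholder broker address
         kafka_topic_list  = 'github',
         kafka_group_name  = 'clickhouse',
         kafka_format      = 'JSONEachRow',
         -- one consumer per partition, up to the partition count
         kafka_num_consumers = 4,
         -- flush each consumer's blocks in its own thread
         kafka_thread_per_consumer = 1;
```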
- Increasing consumers isn't a free operation. Each consumer maintains its own buffers and threads, increasing the overhead on the server. Be conscious of the overhead of consumers and, if possible, scale linearly across your cluster first.
- If the throughput of Kafka messages is variable and delays are acceptable, consider increasing the stream_flush_interval_ms to ensure larger blocks are flushed.
- background_message_broker_schedule_pool_size sets the number of threads performing background tasks. These threads are used for Kafka streaming. This setting is applied at ClickHouse server start, can't be changed in a user session, and defaults to 16. If you see timeouts in the logs, it may be appropriate to increase this.
- For communication with Kafka, the librdkafka library is used, which itself creates threads. Large numbers of Kafka tables, or consumers, can thus result in large numbers of context switches. Either distribute this load across the cluster, replicating only the target tables if possible, or consider using a single table engine to read from multiple topics - a list of values is supported. Multiple materialized views can then read from a single table, each filtering to the data from a specific topic.
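A sketch of this multi-topic pattern, filtering on the _topic virtual column (table and topic names are illustrative):

```sql
CREATE TABLE events_queue
(
    message String
)
ENGINE = Kafka
SETTINGS kafka_broker_list = 'host:9092',        -- placeholder broker address
         kafka_topic_list  = 'topic_a,topic_b',  -- one engine, two topics
         kafka_group_name  = 'clickhouse',
         kafka_format      = 'JSONAsString';

-- Each view routes messages from one topic to its own target table
CREATE MATERIALIZED VIEW topic_a_mv TO table_a AS
SELECT message FROM events_queue WHERE _topic = 'topic_a';

CREATE MATERIALIZED VIEW topic_b_mv TO table_b AS
SELECT message FROM events_queue WHERE _topic = 'topic_b';
```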
Additional settings
Aside from the settings discussed above, the following may be of interest:

- kafka_max_wait_ms - the wait time in milliseconds for reading messages from Kafka before retry. Set at a user profile level and defaults to 5000.