Kafka connector

You can set up a Kafka connector to consume data from a Kafka topic and store it in Tinybird by creating a .connection and a .datasource file. Use the tb datasource create --kafka command for a guided process.

Set up the connector

To set up the Kafka connector, follow these steps.

1. Create a Kafka connection

You can create a Kafka connection in Tinybird either by using the CLI or by manually creating a connection file.

Option 1: Use the CLI

Run the following command to create a connection:

tb connection create kafka

You will be prompted to enter:

  1. A name for your connection.
  2. The bootstrap server.
  3. The Kafka key.
  4. The Kafka secret.

If you need to add KAFKA_SCHEMA_REGISTRY_URL or any other Kafka .connection setting, edit the .connection file manually.

Option 2: Manually create a connection file

Create a .connection file with the required credentials stored in secrets. For example:

kafka_sample.connection
TYPE kafka
KAFKA_BOOTSTRAP_SERVERS bootstrap_servers:port
KAFKA_SECURITY_PROTOCOL SASL_SSL
KAFKA_SASL_MECHANISM PLAIN
KAFKA_KEY {{ tb_secret("KAFKA_KEY", "key") }}
KAFKA_SECRET {{ tb_secret("KAFKA_SECRET", "secret") }}

For a complete list of Kafka connection settings, see Kafka .connection settings.

Set the values of the secrets using tb secret:

tb [--cloud] secret set KAFKA_KEY mykey

Secrets are only replaced in your resources when you deploy. If you change a secret, you need to deploy for the changes to take effect.
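
For example, assuming the KAFKA_KEY and KAFKA_SECRET secrets used in the connection file above, a minimal workflow could look like this:

# Set the secrets in Tinybird Local
tb secret set KAFKA_KEY mykey
tb secret set KAFKA_SECRET mysecret

# Set them in Tinybird Cloud and redeploy so the new values take effect
tb --cloud secret set KAFKA_KEY mykey
tb --cloud secret set KAFKA_SECRET mysecret
tb --cloud deploy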

2. Create a Kafka data source

Create a .datasource file using tb datasource create --kafka or manually.

Define the data source schema as you would for any non-Kafka data source, and specify the required Kafka settings. The value of KAFKA_CONNECTION_NAME must match the name of the .connection file you created in the previous step.

The default .datasource file stores the whole message in a column called data. You can then use JSONExtract functions to access the message fields, either at query time or using materialized views.

kafka_default.datasource
SCHEMA >
    `data` String `json:$`

KAFKA_CONNECTION_NAME kafka_connection # The name of the .connection file
KAFKA_TOPIC topic_name
KAFKA_GROUP_ID {{ tb_secret("KAFKA_GROUP_ID") }}
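
For example, with the kafka_default data source above, a query could pull individual fields out of the data column at query time using ClickHouse JSONExtract functions. This is only a sketch: the session_id and action field names are illustrative, not part of the original example.

SELECT
    JSONExtractString(data, 'session_id') AS session_id,
    JSONExtractString(data, 'action') AS action,
    count() AS events
FROM kafka_default
GROUP BY session_id, action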

Alternatively, use JSONPath syntax to extract the message fields into separate columns at ingest time.

kafka_sample.datasource
SCHEMA >
   `timestamp` DateTime(3) `json:$.timestamp`,
   `session_id` String `json:$.session_id`,
   `action` LowCardinality(String) `json:$.action`,
   `version` LowCardinality(String) `json:$.version`,
   `payload` String `json:$.payload`,
   `data` String `json:$`

KAFKA_CONNECTION_NAME kafka_sample # The name of the .connection file
KAFKA_TOPIC test_topic
KAFKA_GROUP_ID {{ tb_secret("KAFKA_GROUP_ID") }}

In addition to the columns specified in SCHEMA, Kafka data sources have additional columns that store metadata of the messages ingested. See Kafka meta columns for more information.

For a complete list of Kafka data source settings, see Kafka .datasource settings.

Use different consumer group values for KAFKA_GROUP_ID in different environments to isolate consumers and their committed offsets.
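
Because KAFKA_GROUP_ID is read from a secret in the examples above, one way to do this is to give each environment its own value. The group names below are only illustrative:

# Tinybird Local
tb secret set KAFKA_GROUP_ID my_group_id_local

# Tinybird Cloud
tb --cloud secret set KAFKA_GROUP_ID my_group_id_cloud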

3. Connectivity check

After defining your Kafka data source and connection, validate the setup by running a deploy check:

tb --cloud deploy --check

This will check that the Kafka broker is reachable and that Tinybird can connect to it with the provided credentials. Remember to set any secrets used by the connection.

Compatibility

The connector is compatible with Apache Kafka and works with any protocol-compatible implementation or vendor. The following have been tried and tested:

  • Apache Kafka
  • Confluent Platform and Confluent Cloud
  • Redpanda
  • AWS MSK
  • Azure Event Hubs for Apache Kafka
  • Estuary

Kafka .datasource settings

Instruction | Required | Description
KAFKA_CONNECTION_NAME | Yes | Name of the configured Kafka connection in Tinybird. It must match the name of the connection file (without the extension).
KAFKA_TOPIC | Yes | Name of the Kafka topic to consume from.
KAFKA_GROUP_ID | Yes | Consumer group ID to use when consuming from Kafka.
KAFKA_AUTO_OFFSET_RESET | No | Offset to use when no previous offset can be found, for example when creating a new consumer. Supported values are latest and earliest. Default: latest.
KAFKA_STORE_HEADERS | No | Adds a __headers Map(String, String) column to the data source and stores the Kafka headers in it for later processing. Default: False.
KAFKA_STORE_RAW_VALUE | No | Stores the raw message in its entirety in the __value column. Default: False.
KAFKA_KEY_FORMAT | No | Format of the message key. Valid values are avro, json_with_schema, and json_without_schema. Using avro or json_with_schema requires KAFKA_SCHEMA_REGISTRY_URL to be set in the connection file used by the data source.
KAFKA_VALUE_FORMAT | No | Format of the message value. Valid values are avro, json_with_schema, and json_without_schema. Using avro or json_with_schema requires KAFKA_SCHEMA_REGISTRY_URL to be set in the connection file used by the data source.
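
As an illustrative sketch, a data source that reads the topic from the beginning, keeps the Kafka headers, and stores the raw message could combine the optional settings like this. The connection and topic names reuse the earlier example and are placeholders:

SCHEMA >
    `data` String `json:$`

KAFKA_CONNECTION_NAME kafka_sample
KAFKA_TOPIC test_topic
KAFKA_GROUP_ID {{ tb_secret("KAFKA_GROUP_ID") }}
KAFKA_AUTO_OFFSET_RESET earliest
KAFKA_STORE_HEADERS True
KAFKA_STORE_RAW_VALUE True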

Kafka .connection settings

Instruction | Required | Description
KAFKA_BOOTSTRAP_SERVERS | Yes | Comma-separated list of one or more Kafka brokers, including port numbers.
KAFKA_KEY | Yes | Key used to authenticate with Kafka. Sometimes called Key, Client Key, or Username, depending on the Kafka distribution.
KAFKA_SECRET | Yes | Secret used to authenticate with Kafka. Sometimes called Secret, Secret Key, or Password, depending on the Kafka distribution.
KAFKA_SECURITY_PROTOCOL | No | Security protocol for the connection. Accepted values are PLAINTEXT and SASL_SSL. Default: SASL_SSL.
KAFKA_SASL_MECHANISM | No | SASL mechanism to use for authentication. Supported values are PLAIN, SCRAM-SHA-256, and SCRAM-SHA-512. Default: PLAIN.
KAFKA_SCHEMA_REGISTRY_URL | No | URL of the Kafka schema registry. Used for avro and json_with_schema deserialization of keys and values. If Basic Auth is required, include it in the URL, as in https://user:password@registry_url.
KAFKA_SSL_CA_PEM | No | Content of the CA certificate in PEM format for SSL connections.
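
For instance, a connection that authenticates with SCRAM and reads Avro-encoded values through a schema registry might look like the following sketch. The broker addresses and registry URL are placeholders:

TYPE kafka
KAFKA_BOOTSTRAP_SERVERS broker-1:9092,broker-2:9092
KAFKA_SECURITY_PROTOCOL SASL_SSL
KAFKA_SASL_MECHANISM SCRAM-SHA-256
KAFKA_KEY {{ tb_secret("KAFKA_KEY", "key") }}
KAFKA_SECRET {{ tb_secret("KAFKA_SECRET", "secret") }}
KAFKA_SCHEMA_REGISTRY_URL https://registry.example.com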

Kafka connector in the local environment

You can use the Kafka connector in the Tinybird Local container to consume messages from a local Kafka server or a Kafka server in the cloud.

Local Kafka server with Docker Compose

When using a local Kafka server, ensure the Tinybird Local container can access it. If you are running Kafka using Docker, Docker Compose is the best option to set up both Kafka and Tinybird Local in the same network. Here's an example using apache/kafka:

networks:
  kafka_network:
    driver: bridge

volumes:
  kafka-data:

services:

  tinybird-local:
    image: tinybirdco/tinybird-local:latest
    container_name: tinybird-local
    platform: linux/amd64
    ports:
      - "7181:7181"
    networks:
      - kafka_network

  kafka:
    image: apache/kafka:latest
    hostname: broker
    container_name: broker
    ports:
      - 9092:9092
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_PROCESS_ROLES: "broker,controller"
      KAFKA_CONTROLLER_QUORUM_VOTERS: "1@broker:29093"
      KAFKA_CONTROLLER_LISTENER_NAMES: "CONTROLLER"
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT,CONTROLLER:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:29092,PLAINTEXT_HOST://0.0.0.0:9092,CONTROLLER://0.0.0.0:29093
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true"
    volumes:
      - kafka-data:/var/lib/kafka/data
    networks:
      - kafka_network

Network configuration

The key points about the network configuration:

  1. The example uses a bridge network (kafka_network) to enable communication between containers.
  2. The Kafka service exposes ports for both internal container communication and external access.
  3. Tinybird Local connects to Kafka using the internal network address.
  4. The bootstrap servers address in your Kafka connection should match the KAFKA_ADVERTISED_LISTENERS value in the docker-compose.yml file (for example, kafka:29092).

Create the Kafka connection and data source

The following examples use default values in the tb_secret() function, which are suitable for this local setup. When deploying to Tinybird Cloud, you'll set these secrets in the Cloud environment instead.

Connection file /connections/kafka_conn.connection

/connections/kafka_conn.connection
TYPE kafka
KAFKA_BOOTSTRAP_SERVERS {{ tb_secret("KAFKA_PROD_SERVER", "kafka:29092") }}
KAFKA_SECURITY_PROTOCOL {{ tb_secret("KAFKA_PROD_SECURITY_PROTOCOL", "PLAINTEXT") }}
KAFKA_SASL_MECHANISM {{ tb_secret("KAFKA_PROD_SASL_MECHANISM", "PLAIN") }}
KAFKA_KEY {{ tb_secret("KAFKA_KEY", "key") }}
KAFKA_SECRET {{ tb_secret("KAFKA_SECRET", "secret") }}

A schemaless Kafka data source file /datasources/kafka_ds.datasource

datasources/kafka_ds.datasource
SCHEMA >
    `data` String `json:$`

KAFKA_CONNECTION_NAME kafka_conn
KAFKA_TOPIC sample-topic
KAFKA_GROUP_ID my_group_id

Usage example

1. Start the Docker containers

docker compose up

2. Create the sample-topic

docker exec -it broker /opt/kafka/bin/kafka-topics.sh --create --topic sample-topic --bootstrap-server localhost:9092

# Created topic sample-topic.

3. Deploy the project

tb deploy
# Running against Tinybird Local

# * Changes to be deployed:
# ------------------------------------------------------------------------
# | status | name       | type       | path                              |
# ------------------------------------------------------------------------
# | new    | kafka_ds   | datasource | datasources/kafka_ds.datasource   |
# | new    | kafka_conn | connection | connections/kafka_conn.connection |
# ------------------------------------------------------------------------
# * No changes in tokens to be deployed

# Deployment URL: http://cloud.tinybird.co/local/7181/None/deployments/1

# * Deployment submitted
# » Waiting for deployment to be ready...
# ✓ Deployment is ready
# » Removing old deployment
# ✓ Old deployment removed
# » Waiting for deployment to be promoted...
# ✓ Deployment #1 is live!
# A deployment with no data is useless. Learn how to ingest at https://www.tinybird.co/docs/forward/get-data-in

4. Send data to the topic and query it

echo '{"data": "test"}' | docker exec -i broker /opt/kafka/bin/kafka-console-producer.sh --topic sample-topic --bootstrap-server localhost:9092

tb sql "select * from kafka_ds"
# Running against Tinybird Local
#   data               __value   __topic                  __partition   __offset   __timestamp           __key   
#   String             String    LowCardinality(String)         Int16      Int64   DateTime              String  
# ───────────────────────────────────────────────────────────────────────────────────────────────────────────────
#   {"data": "test"}             sample-topic                       0          1   2025-06-20 12:17:49     

Docker Compose troubleshooting

If you encounter connection issues:

  1. Ensure all containers are running: docker compose ps
  2. Check container logs: docker compose logs kafka
  3. Ensure the bootstrap servers address in your connection file matches the KAFKA_ADVERTISED_LISTENERS value in your docker-compose.yml file.

Kafka meta columns

When you connect a data source to Kafka, the following columns are added to store metadata from Kafka messages:

Name | Type | Description
__value | String | A String representing the entire unparsed value of the Kafka message. Only populated if KAFKA_STORE_RAW_VALUE is set to True.
__topic | LowCardinality(String) | The topic that the message was read from.
__partition | Int16 | The partition that the message was read from.
__offset | Int64 | The offset of the message.
__timestamp | DateTime | The timestamp of the message.
__key | String | The key of the message.

Optionally, when KAFKA_STORE_HEADERS is set to True, the following column is added and populated:

Name | Type | Description
__headers | Map(String, String) | Kafka headers of the message.

When you iterate your Kafka data source, you might need to use the meta columns in the FORWARD_QUERY. Tinybird suggests a valid forward query that you can tweak to get the desired values for each column.

Kafka logs

You can find global logs in the datasources_ops_log Service Data Source. Filter by datasource_id to select the correct datasource, and by event_type='append-kafka'.

For example, to select all Kafka-related logs from the last day, run the following query:

SELECT *
FROM tinybird.datasources_ops_log
WHERE datasource_id = 't_1234'
  AND event_type = 'append-kafka'
  AND timestamp > now() - INTERVAL 1 day
ORDER BY timestamp DESC

If you can't find logs in datasources_ops_log, the kafka_ops_log Service Data Source contains more detailed logs. Filter by datasource_id to select the correct datasource, and use msg_type to select the desired log level (info, warning, or error).

SELECT *
FROM tinybird.kafka_ops_log
WHERE datasource_id = 't_1234'
  AND timestamp > now() - interval 1 day
  AND msg_type IN ['info', 'warning', 'error']

Troubleshooting

Each combination of KAFKA_TOPIC and KAFKA_GROUP_ID can only be used in one data source; otherwise, the offsets committed by the consumers of the different data sources clash.

If you connect a data source to Kafka using a KAFKA_TOPIC and KAFKA_GROUP_ID that were previously used by another data source in your workspace, the data source only receives data from the last committed offset, even if KAFKA_AUTO_OFFSET_RESET is set to earliest.

To prevent these issues, always use unique KAFKA_GROUP_IDs when testing Kafka data sources.

See Kafka logs to learn how to diagnose any other issues.

Compressed messages

Tinybird can consume from Kafka topics where Kafka compression is turned on; decompressing the message is a standard function of the Kafka consumer. However, if you compressed the message before passing it through the Kafka producer, Tinybird can't do post-consumer processing to decompress the message.

For example, if you compressed a JSON message with gzip and produced it to a Kafka topic as a bytes message, it would be ingested by Tinybird as bytes. If you produced a JSON message to a Kafka topic with the Kafka producer setting compression.type=gzip, it would be stored in Kafka as compressed bytes but decoded on ingestion and arrive in Tinybird as JSON.
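
To illustrate the supported case with the local Docker setup from earlier, the console producer can enable Kafka compression through a producer property; the topic and broker match that setup:

# Producer-side Kafka compression; the consumer decompresses it transparently
echo '{"data": "compressed"}' | docker exec -i broker \
  /opt/kafka/bin/kafka-console-producer.sh \
  --topic sample-topic --bootstrap-server localhost:9092 \
  --producer-property compression.type=gzip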

Connecting an existing data source to Kafka

You can connect an existing, default data source to Kafka.

Create the Kafka .connection file if it does not exist, add the desired Kafka settings to the .datasource file, and add a FORWARD_QUERY to provide default values for the Kafka meta columns.
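
As a rough sketch, an existing data source with a timestamp and a value column could end up looking like this after connecting it to Kafka. The exact FORWARD_QUERY Tinybird suggests for your schema may differ, so adjust the default values as needed:

SCHEMA >
    `timestamp` DateTime `json:$.timestamp`,
    `value` String `json:$.value`

KAFKA_CONNECTION_NAME kafka_sample
KAFKA_TOPIC test_topic
KAFKA_GROUP_ID {{ tb_secret("KAFKA_GROUP_ID") }}

FORWARD_QUERY >
    SELECT
        timestamp,
        value,
        '' AS __value,
        toLowCardinality('') AS __topic,
        toInt16(0) AS __partition,
        toInt64(0) AS __offset,
        now() AS __timestamp,
        '' AS __key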

Disconnecting a data source from Kafka

To disconnect a data source from Kafka, remove the Kafka settings from the .datasource file.

If you want to keep any of the Kafka meta columns, add them to the schema with a default value and adjust the FORWARD_QUERY accordingly.
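
For example, a sketch of keeping only the __timestamp meta column after removing the Kafka settings; the DEFAULT expression shown is just one possible choice:

SCHEMA >
    `data` String `json:$`,
    `__timestamp` DateTime DEFAULT now()

FORWARD_QUERY >
    SELECT data, __timestamp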
