Kafka Connect Bigtable sink connector


Sink connectors are plugins for the Kafka Connect framework that you can use to stream data from Kafka directly to other systems for storage and processing. The Kafka Connect Bigtable sink is a dedicated connector designed to stream data into Bigtable in real time with as little latency as possible.

This page describes the connector's features and limitations. It also provides example usage for advanced scenarios with Single Message Transforms (SMTs) and automated table creation. For installation instructions and full reference documentation, see the Kafka Connect Bigtable Sink Connector repository.

Features

The Bigtable sink connector subscribes to your Kafka topics, reads messages received on these topics, and then writes the data to Bigtable tables. The following sections provide a high-level overview of each feature. For usage details, see the Configuration section of this document.

Key mapping, SMTs, and converters

To write data to a Bigtable table, you need to provide a unique row key, column family, and column name for each operation. This information is inferred from the fields in Kafka messages. You can construct all the required identifiers with settings like row.key.definition, row.key.delimiter, or default.column.family.
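
For instance, a minimal sketch of these settings could look like the following. The property names come from this page; the field names and delimiter are placeholder assumptions for illustration.

# Build the row key by concatenating two message fields (hypothetical names).
row.key.definition=customer_id,event_id
# Character used to join the row key fields.
row.key.delimiter=#
# Column family used for root-level primitive values (see the Limitations section).
default.column.family=my_default_family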

Automatic table creation

You can use the auto.create.tables and auto.create.column.families settings to automatically create destination tables and column families if they don't exist in your Bigtable destination. This flexibility comes at a certain performance cost, so we generally recommend that you first create the tables where you want to stream data.
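
As a rough sketch, enabling automatic creation of both resource types looks like this; whether you also need auto.create.column.families depends on your schema.

# Create missing destination tables and column families at write time.
# Expect some extra latency while the resources are created.
auto.create.tables=true
auto.create.column.families=true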

Write modes and deleting rows

When you write to a table, you can completely overwrite the data if a row already exists, or choose to abandon the operation with the insert.mode setting. You can use this setting in conjunction with DLQ error handling to achieve the at-least-once delivery guarantee.

To issue DELETE commands, configure the value.null.mode property. You can use it for deleting full rows, column families, or individual columns.
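
A brief sketch of both properties follows. The upsert value is taken from the automatic table creation example later on this page; the accepted values for value.null.mode aren't listed here, so check the repository reference before enabling deletions.

# upsert overwrites a row that already exists; the alternative mode abandons
# the write instead, which pairs well with DLQ error handling.
insert.mode=upsert
# Controls whether a null Kafka message value deletes the full row, a column
# family, or a single column. See the repository reference for accepted values.
# value.null.mode=...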

Dead Letter Queue

Configure the errors.deadletterqueue.topic.name property and set errors.tolerance=all to post messages that fail to process to your DLQ topic.
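
A minimal sketch, assuming a pre-created DLQ topic named my_dlq_topic (the topic name and replication factor are placeholders):

# Keep the connector running when a record fails to convert or write.
errors.tolerance=all
# Route the failing records to this Kafka topic instead.
errors.deadletterqueue.topic.name=my_dlq_topic
errors.deadletterqueue.topic.replication.factor=1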

Compatibility with the Confluent Platform Bigtable Sink Connector

The Bigtable Kafka Connect sink connector from Google Cloud offers full parity with the self-managed Confluent Platform Bigtable Sink Connector. You can use your existing configuration file for the Confluent Platform connector by adjusting the connector.class setting to connector.class=com.google.cloud.kafka.connect.bigtable.BigtableSinkConnector.
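
For example, an existing Confluent Platform configuration file only needs its class name swapped; the remaining properties stay as they are (a sketch, with the rest of the file elided):

# Point the existing configuration at the Google Cloud implementation.
connector.class=com.google.cloud.kafka.connect.bigtable.BigtableSinkConnector
# ...all other properties from the Confluent Platform configuration remain unchanged.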

Limitations

The following limitations apply:

  • The Kafka Connect Bigtable sink connector is currently only supported for Kafka clusters where you are able to independently install connectors (self-managed or on-premises Kafka clusters). This connector isn't currently supported for Google Cloud Managed Service for Apache Kafka.

  • This connector can create column families and columns from field names that are nested up to two levels deep:

    • Structs nested deeper than two levels are converted to JSON and saved in their parent column.
    • Root-level structs are transformed into column families. Fields in those structs become column names.
    • Root-level primitive values are by default saved to a column family that uses the Kafka topic as its name. Columns in that family have names equal to the field names. You can modify this behavior by using the default.column.family and default.column.qualifier settings, as shown in the sketch after this list.
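
The following sketch shows the override mentioned in the last item. The values are hypothetical; without them, the connector uses the topic name as the column family and the field names as the column names.

# Store root-level primitive values in this column family instead of a family
# named after the Kafka topic (hypothetical value).
default.column.family=my_column_family
# Store them in this column instead of columns named after the fields
# (hypothetical value).
default.column.qualifier=my_column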

Installation

To install this connector, you follow standard installation steps: build the project with Maven, copy the .jar files to your Kafka Connect plugin directory, and create the configuration file. For step-by-step instructions, see the Running the connector section in the repository.

Configuration

To configure Kafka Connect connectors, you need to write configuration files. The Bigtable Kafka Connect sink connector from Google Cloud supports all basic Kafka connector properties, as well as some extra fields tailored for working with Bigtable tables.

The following sections provide detailed examples for more advanced use cases, but they don't describe all available settings. For basic usage examples and the full properties reference, see the Kafka Connect Bigtable Sink Connector repository.

Example: flexible row key and column family creation

Sample scenario

Your incoming Kafka messages contain details for shopping orders with user identifiers. You want to write each order to a row with two column families: one for user details, one for order details.

Source Kafka message format

You format Kafka messages posted to the topic with the JsonConverter to achieve the following structure:

{"user":"user123","phone":"800‑555‑0199","email":"business@example.com","order":{id:"order123",items:["itemUUID1","itemUUID2"],discount:0.2}}
Expected Bigtable row

You want to write each message as a Bigtable row with the following structure:

Row key: user123#order123

Column family contact_details:
  • name: user123
  • phone: 800-555-0199
  • email: business@example.com

Column family order_details:
  • orderId: order123
  • items: ["itemUUID1", "itemUUID2"]
  • discount: 0.2
Connector configuration
To achieve the expected result, you write the following configuration file:
# Settings such as latency configuration or DLQ identifiers omitted for brevity.
# Refer to the GitHub repository for the full settings reference.

# Connector name, class, Bigtable and Google Cloud identifiers
name=BigtableSinkConnector
connector.class=com.google.cloud.kafka.connect.bigtable.BigtableSinkConnector
gcp.bigtable.project.id=my_project_id
gcp.bigtable.instance.id=my_bigtable_instance_id

# Use JsonConverter to format Kafka messages as JSON
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter

# Name of the topic where shopping details are posted
topics=shopping_topic

# Settings for row key creation
row.key.definition=user,order.id
row.key.delimiter=#

# All user identifiers are root-level fields.
# Use the default column family to aggregate them into a single family.
default.column.family=contact_details

# Use an SMT to rename the "order" field in the message value to "order_details",
# which becomes the column family name. The rename targets the value (not the key),
# so row.key.definition can still reference order.id.
transforms=renameOrderField
transforms.renameOrderField.type=org.apache.kafka.connect.transforms.ReplaceField$Value
transforms.renameOrderField.renames=order:order_details
The results of using this file are as follows:
  • row.key.definition=user,order.id is a comma-separated list of the fields you want to use to construct the row key. Each entry is concatenated with the character set in the row.key.delimiter setting.

    When you use row.key.definition, all your messages need to use the same schema. If you need to process messages with different structures into different columns or column families, we recommend that you create separate connector instances. For more information, see the Example: write messages to multiple tables section of this document.

  • Bigtable column family names are based on the names of non-null root-level structs. As such:

    • Values for the contact details are root-level primitive data types, so you aggregate them into a default column family with the default.column.family=contact_details setting.
    • Order details are already wrapped in the order object, but you want to use order_details as the column family name. To achieve that, you use the ReplaceField SMT and rename the field.

Example: automatic table creation and idempotent writes

Sample scenario

Your incoming Kafka messages contain details for shopping orders. Customers can edit their baskets before fulfillment, so you expect to receive follow-up messages with changed orders that you need to save as updates in the same row. You also can't guarantee that the destination table exists at write time, so you want the connector to automatically create the table if it doesn't exist.

Connector configuration
To achieve the expected result, you write the following configuration file:
# Settings such as latency configuration or DLQ identifiers omitted for brevity.
# Refer to the GitHub repository for the full settings reference.
# Settings for row key creation are also omitted.
# Refer to the Example: flexible row key and column family creation section.

# Connector name, class, Bigtable and Google Cloud identifiers
name=BigtableSinkConnector
connector.class=com.google.cloud.kafka.connect.bigtable.BigtableSinkConnector
gcp.bigtable.project.id=my_project_id
gcp.bigtable.instance.id=my_bigtable_instance_id

# Use JsonConverter to format Kafka messages as JSON
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter

# Name of the topic where shopping details are posted
topics=shopping_topic

# Automatically create destination tables if they don't exist
auto.create.tables=true

# UPSERT causes subsequent writes to overwrite existing rows.
# This way you can update the same order when customers change the contents
# of their baskets.
insert.mode=upsert

Example: write messages to multiple tables

Sample scenario

Your incoming Kafka messages contain details for shopping orders from differentfulfillment channels. These messages are posted to different topics, and youwant to use the same configuration file to write them to separate tables.

Connector configuration

You can write your messages to multiple tables, but if you use a single configuration file for your setup, then each message must use the same schema. If you need to process messages from different topics into distinct columns or families, we recommend that you create separate connector instances.

To achieve the expected result, you write the following configuration file:

# Settings such as latency configuration or DLQ identifiers omitted for brevity.
# Refer to the GitHub repository for the full settings reference.
# Settings for row key creation are also omitted.
# Refer to the Example: flexible row key and column family creation section.

# Connector name, class, Bigtable and Google Cloud identifiers
name=BigtableSinkConnector
connector.class=com.google.cloud.kafka.connect.bigtable.BigtableSinkConnector
gcp.bigtable.project.id=my_project_id
gcp.bigtable.instance.id=my_bigtable_instance_id

# Use JsonConverter to format Kafka messages as JSON
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter

# Names of the topics where shopping details are posted
topics=shopping_topic_store1,shopping_topic_store2

# Use a dynamic table name based on the Kafka topic name.
table.name.format=orders_${topic}

In this approach, you use the table.name.format=orders_${topic} property to dynamically refer to each Kafka topic name. When you configure multiple topic names with the topics=shopping_topic_store1,shopping_topic_store2 setting, each message is written to a separate table:

  • Messages from the shopping_topic_store1 topic are written to the orders_shopping_topic_store1 table.
  • Messages from the shopping_topic_store2 topic are written to the orders_shopping_topic_store2 table.
