Kafka Connect Bigtable sink connector
Sink connectors are plugins for the Kafka Connect framework that you can use to stream data from Kafka directly to other systems for storage and processing. The Kafka Connect Bigtable sink is a dedicated connector designed to stream data into Bigtable in real time with as little latency as possible.
This page describes the connector's features and limitations. It also provides example usage for advanced scenarios with Single Message Transforms (SMTs) and automated table creation. For installation instructions and full reference documentation, see the Kafka Connect Bigtable Sink Connector repository.
Features
The Bigtable sink connector subscribes to your Kafka topics, reads messages received on these topics, and then writes the data to Bigtable tables. The following sections provide a high-level overview of each feature. For usage details, see the Configuration section of this document.
Key mapping, SMTs, and converters
To write data to a Bigtable table, you need to provide a unique row key, column family, and column name for each operation. This information is inferred from the fields in Kafka messages. You can construct all the required identifiers with settings such as row.key.definition, row.key.delimiter, or default.column.family.
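For example, a minimal sketch of these key-mapping settings might look like the following. The field names user and order.id and the contact_details family name are illustrative assumptions borrowed from the example later on this page, not values required by the connector.

# Build the row key from the "user" and "order.id" fields, joined with "#"
row.key.definition=user,order.id
row.key.delimiter=#
# Write root-level primitive fields to this default column family
default.column.family=contact_details

With these settings, a message containing user=user123 and order.id=order123 would produce the row key user123#order123.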
Automatic table creation
You can use the auto.create.tables and auto.create.column.families settings to automatically create destination tables and column families if they don't exist in your Bigtable destination. This flexibility comes at a certain performance cost, so we generally recommend that you first create the tables where you want to stream data.
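If you accept that trade-off, a minimal sketch of the relevant settings looks like this:

# Create missing destination tables and column families on demand
auto.create.tables=true
auto.create.column.families=true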
Write modes and deleting rows
When you write to a table, you can completely overwrite the data if a row already exists, or choose to abandon the operation, with the insert.mode setting. You can use this setting in conjunction with DLQ error handling to achieve the at-least-once delivery guarantee.

To issue DELETE commands, configure the value.null.mode property. You can use it to delete full rows, column families, or individual columns.
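A hedged sketch of these two settings follows. The upsert value is grounded in the idempotent-writes example later on this page; the delete value for value.null.mode is an assumption, so check the repository's properties reference for the accepted values.

# Overwrite existing rows when a message for the same row key arrives again
insert.mode=upsert
# Treat null message values as delete operations (assumed value; see the reference)
value.null.mode=delete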
Dead Letter Queue
Configure the errors.deadletterqueue.topic.name property and set errors.tolerance=all to post messages that fail to process to your DLQ topic.
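A minimal sketch, assuming a hypothetical DLQ topic named shopping_dlq and a single-broker development cluster:

# Keep the connector running on errors and route failed messages to the DLQ topic
errors.tolerance=all
errors.deadletterqueue.topic.name=shopping_dlq
# Standard Kafka Connect setting; use a higher replication factor in production
errors.deadletterqueue.topic.replication.factor=1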
Compatibility with the Confluent Platform Bigtable Sink Connector
The Bigtable Kafka Connect sink connector from Google Cloud offers full parity with the self-managed Confluent Platform Bigtable Sink Connector. You can use your existing configuration file for the Confluent Platform connector by adjusting the connector.class setting to connector.class=com.google.cloud.kafka.connect.bigtable.BigtableSinkConnector.
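In other words, the only line that needs to change in an existing configuration file is the connector class:

# Point the existing Confluent Platform configuration at the Google Cloud connector class;
# the remaining properties can stay as they are.
connector.class=com.google.cloud.kafka.connect.bigtable.BigtableSinkConnector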
Limitations
The following limitations apply:
The Kafka Connect Bigtable sink connector is currently supported only for Kafka clusters where you are able to independently install connectors (self-managed or on-premises Kafka clusters). This connector isn't currently supported for Google Cloud Managed Service for Apache Kafka.
This connector can create column families and columns from field names up to two nesting levels, as illustrated by the example after this list:

- Root-level structs are transformed into column families. Fields in those structs become column names.
- Root-level primitive values are by default saved to a column family that uses the Kafka topic as its name. Columns in that family have names equal to the field names. You can modify this behavior by using the default.column.family and default.column.qualifier settings.
- Structs nested deeper than two levels are converted to JSON and saved in their parent column.
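As an illustrative sketch with hypothetical field names, a message value such as {"user": "user123", "order": {"id": "order123", "details": {"discount": 0.2}}} would map order to a column family with columns id and details, write the root-level primitive user to the default column family, and serialize the nested details struct to JSON inside its parent column.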
Installation
To install this connector, you follow the standard installation steps: build the project with Maven, copy the .jar files to your Kafka Connect plugin directory, and create the configuration file. For step-by-step instructions, see the Running the connector section in the repository.
Configuration
To configure Kafka Connect connectors, you need to write configuration files. The Bigtable Kafka Connect sink connector from Google Cloud supports all basic Kafka connector properties, as well as some extra fields tailored for working with Bigtable tables.
The following sections provide detailed examples for more advanced use cases, but they don't describe all available settings. For basic usage examples and the full properties reference, see the Kafka Connect Bigtable Sink Connector repository.
Example: flexible row key and column family creation
- Sample scenario
Your incoming Kafka messages contain details for shopping orders with user identifiers. You want to write each order to a row with two column families: one for user details and one for order details.
- Source Kafka message format
You format Kafka messages posted to the topic with the JsonConverter to achieve the following structure:

{
  "user": "user123",
  "phone": "800-555-0199",
  "email": "business@example.com",
  "order": {
    "id": "order123",
    "items": ["itemUUID1", "itemUUID2"],
    "discount": 0.2
  }
}
- Expected Bigtable row
You want to write each message as a Bigtable row with the following structure:
Row key: user123#order123
Column family contact_details: name=user123, phone=800-555-0199, email=business@example.com
Column family order_details: orderId=order123, items=["itemUUID1", "itemUUID2"], discount=0.2
- Connector configuration
- To achieve the expected result, you write the following configuration file:
# Settings such as latency configuration or DLQ identifiers omitted for brevity.
# Refer to the GitHub repository for the full settings reference.

# Connector name, class, Bigtable and Google Cloud identifiers
name=BigtableSinkConnector
connector.class=com.google.cloud.kafka.connect.bigtable.BigtableSinkConnector
gcp.bigtable.project.id=my_project_id
gcp.bigtable.instance.id=my_bigtable_instance_id

# Use JsonConverter to format Kafka messages as JSON
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter

# Name of the topic where shopping details are posted
topics=shopping_topic

# Settings for row key creation
row.key.definition=user,order.id
row.key.delimiter=#

# All user identifiers are root-level fields.
# Use the default column family to aggregate them into a single family.
default.column.family=contact_details

# Use an SMT to rename the "order" field to "order_details" for the new column family
transforms=renameOrderField
transforms.renameOrderField.type=org.apache.kafka.connect.transforms.ReplaceField$Key
transforms.renameOrderField.renames=order:order_details
The results of using this file are as follows:

- row.key.definition=user,order.id is a comma-separated list of the fields that you want to use to construct the row key. Each entry is concatenated with the character set in the row.key.delimiter setting.

  When you use row.key.definition, all your messages need to use the same schema. If you need to process messages with different structures into different columns or column families, we recommend that you create separate connector instances. For more information, see the Example: write messages to multiple tables section of this document.

- Bigtable column family names are based on the names of non-null root-level structs. As such:

  - Values for the contact details are root-level primitive data types, so you aggregate them into a default column family with the default.column.family=contact_details setting.
  - Order details are already wrapped in the order object, but you want to use order_details as the column family name. To achieve that, you use the ReplaceField SMT and rename the field.
Example: automatic table creation and idempotent writes
- Sample scenario
Your incoming Kafka messages contain details for shopping orders. Customers can edit their baskets before fulfillment, so you expect to receive follow-up messages with changed orders that you need to save as updates in the same row. You also can't guarantee that the destination table exists at write time, so you want the connector to automatically create the table if it doesn't exist.
- Connector configuration
- To achieve the expected result, you write the following configuration file:
# Settings such as latency configuration or DLQ identifiers omitted for brevity.
# Refer to the GitHub repository for the full settings reference.
# Settings for row key creation are also omitted.
# Refer to the Example: flexible row key and column family creation section.

# Connector name, class, Bigtable and Google Cloud identifiers
name=BigtableSinkConnector
connector.class=com.google.cloud.kafka.connect.bigtable.BigtableSinkConnector
gcp.bigtable.project.id=my_project_id
gcp.bigtable.instance.id=my_bigtable_instance_id

# Use JsonConverter to format Kafka messages as JSON
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter

# Name of the topic where shopping details are posted
topics=shopping_topic

# Automatically create destination tables if they don't exist
auto.create.tables=true

# UPSERT causes subsequent writes to overwrite existing rows.
# This way you can update the same order when customers change the contents
# of their baskets.
insert.mode=upsert
Example: write messages to multiple tables
- Sample scenario
Your incoming Kafka messages contain details for shopping orders from different fulfillment channels. These messages are posted to different topics, and you want to use the same configuration file to write them to separate tables.
- Connector configuration
You can write your messages to multiple tables, but if you use a single configuration file for your setup, then each message must use the same schema. If you need to process messages from different topics into distinct columns or families, we recommend that you create separate connector instances.
To achieve the expected result, you write the following configuration file:
# Settings such as latency configuration or DLQ identifiers omitted for brevity.
# Refer to the GitHub repository for the full settings reference.
# Settings for row key creation are also omitted.
# Refer to the Example: flexible row key and column family creation section.

# Connector name, class, Bigtable and Google Cloud identifiers
name=BigtableSinkConnector
connector.class=com.google.cloud.kafka.connect.bigtable.BigtableSinkConnector
gcp.bigtable.project.id=my_project_id
gcp.bigtable.instance.id=my_bigtable_instance_id

# Use JsonConverter to format Kafka messages as JSON
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter

# Names of the topics where shopping details are posted
topics=shopping_topic_store1,shopping_topic_store2

# Use a dynamic table name based on the Kafka topic name.
table.name.format=orders_${topic}
In this approach, you use the table.name.format=orders_${topic} property to dynamically refer to each Kafka topic name. When you configure multiple topic names with the topics=shopping_topic_store1,shopping_topic_store2 setting, each message is written to a separate table:

- Messages from the shopping_topic_store1 topic are written to the orders_shopping_topic_store1 table.
- Messages from the shopping_topic_store2 topic are written to the orders_shopping_topic_store2 table.