Build change streams connections to Kafka Stay organized with collections Save and categorize content based on your preferences.
This page explains how to use the Kafka connector to consumeand forward Spannerchange streams data.
Core concepts
The following describes core concepts of the Kafka connector.
Debezium
Debezium is an open source project that providesa low latency data streaming platform for change data capture.
Kafka connector
The Kafka connector provides an abstraction over the Spanner API topublish Spanner change streams to Kafka. With this connector,you don't have to manage the change streams partition lifecycle,which is necessary when you use the Spanner API directly.
The Kafka connector produces a change event for every data change record mod and sendschange event records downstream into a separate Kafka topic for each change stream-tracked table.Adata change record mod represents a single modification (insert, update, or delete) that was captured. A single data change record can contain more than one mod.
Kafka connector output
The Kafka connector forwards change streams records directlyinto a separate Kafka topic. The output topic name should beconnector_name.table_name.If the topic doesn't exist, the Kafka connector automatically creates a topic under that name.
You can also configuretopic routing transformations to re-route records into topics that you specify. If you want to use topic routing, disable thelow watermark functionality.
Record ordering
Records are ordered by commit timestamp per primary key inthe Kafka topics. Records belonging to different primary keys do not haveordering guarantees. Records with the same primary key are stored in thesame Kafka topic partition. If you want to process whole transactions, you canalso use thedata change record'sserver_transaction_id andnumber_of_records_in_transaction fields toassemble a Spanner transaction.
Change events
The Kafka connector generates a data change event for eachINSERT,UPDATE,andDELETE operation. Each event contains a key and values for the changed row.
You can useKafka Connect converters to produce data change events inProtobuf,AVRO,JSON, orJSON Schemaless formats. If you use aKafka Connect converter that produces schemas, the event containsseparate schemas for the key and values. Otherwise, the event only containsthe key and values.
The schema for the key never changes. The schema for the values is anamalgamation of all the columns the change stream has tracked since theconnector start time.
If you configure the connector to produce JSON events, the output change eventcontains five fields:
The first
schemafield specifies a Kafka Connect schema that describes the Spanner key schema.The first
payloadfield has the structure described by the previousschemafield and contains the key for the row that was changed.The second
schemafield specifies the Kafka Connect schema that describes the schema for the changed row.The second
payloadfield has the structure described by the previousschemafield and it contains the actual data for the row that was changed.The
sourcefield is a mandatory field that describes the source metadata for the event.
The following is an example of a data change event:
{ // The schema for the Spanner key. "schema": { "type": "struct", "name": "customers.Key", "optional": false, "fields": [ { "type": "int64", "optional": "false" "field": "false" } ] }, // The value of the Spanner key. "payload": { "id": "1" }, // The schema for the payload, which contains the before and after values // of the changed row. The schema for the payload contains all the // columns that the change stream has tracked since the connector start // time. "schema": { "type": "struct", "fields": [ { // The schema for the before values of the changed row. "type": "struct", "fields": [ { "type": "int32", "optional": false, "field": "id" }, { "type": "string", "optional": true, "field": "first_name" } ], "optional": true, "name": "customers.Value", "field": "before" }, { // The schema for the after values of the changed row. "type": "struct", "fields": [ { "type": "int32", "optional": false, "field": "id" }, { "type": "string", "optional": false, "field": "first_name" } ], "optional": true, "name": "customers.Value", "field": "after" }, { // The schema for the source metadata for the event. "type": "struct", "fields": [ { "type": "string", "optional": false, "field": "version" }, { "type": "string", "optional": false, "field": "connector" }, { "type": "string", "optional": false, "field": "name" }, { "type": "int64", "optional": false, "field": "ts_ms" }, { "type": "boolean", "optional": true, "default": false, "field": "snapshot" }, { "type": "string", "optional": false, "field": "db" }, { "type": "string", "optional": false, "field": "sequence" }, { "type": "string", "optional": false, "field": "project_id" }, { "type": "string", "optional": false, "field": "instance_id" }, { "type": "string", "optional": false, "field": "database_id" }, { "type": "string", "optional": false, "field": "change_stream_name" }, { "type": "string", "optional": true, "field": "table" } { "type": "string", "optional": true, "field": "server_transaction_id" } { "type": "int64", "optional": true, "field": "low_watermark" } { "type": "int64", "optional": true, "field": "read_at_timestamp" } { "type": "int64", "optional": true, "field": "number_of_records_in_transaction" } { "type": "string", "optional": true, "field": "transaction_tag" } { "type": "boolean", "optional": true, "field": "system_transaction" } { "type": "string", "optional": true, "field": "value_capture_type" } { "type": "string", "optional": true, "field": "partition_token" } { "type": "int32", "optional": true, "field": "mod_number" } { "type": "boolean", "optional": true, "field": "is_last_record_in_transaction_in_partition" } { "type": "int64", "optional": true, "field": "number_of_partitions_in_transaction" } ], "optional": false, "name": "io.debezium.connector.spanner.Source", "field": "source" }, ] { "type": "string", "optional": false, "field": "op" }, { "type": "int64", "optional": true, "field": "ts_ms" } ], "optional": false, "name": "connector_name.customers.Envelope" }, "payload": { // The values of the row before the event. "before": null, // The values of the row after the event. "after": { "id": 1, "first_name": "Anne", } }, // The source metadata. "source": { "version": "{debezium-version}", "connector": "spanner", "name": "spanner_connector", "ts_ms": 1670955531785, "snapshot": "false", "db": "database", "sequence": "1", "project_id": "project", "instance_id": "instance", "database_id": "database", "change_stream_name": "change_stream", "table": "customers", "server_transaction_id": "transaction_id", "low_watermark": 1670955471635, "read_at_timestamp": 1670955531791, "number_records_in_transaction": 2, "transaction_tag": "", "system_transaction": false, "value_capture_type": "OLD_AND_NEW_VALUES", "partition_token": "partition_token", "mod_number": 0, "is_last_record_in_transaction_in_partition": true, "number_of_partitions_in_transaction": 1 }, "op": "c", "ts_ms": 1559033904863 //}Low watermark
The low watermark describes the time T at which the Kafka connector is guaranteed to have streamed out and published to a Kafka topic all events with timestamp < T.
You can enable the low watermark in the Kafka connector using thegcp.spanner.low-watermark.enabled parameter. This parameter is disabledby default. If the low watermark is enabled, thelow_watermark field in the change stream datachange record is populated with the Kafka connector's current low watermarktimestamp.
If there are no records being produced, the Kafka connector sends periodicwatermark "heartbeats" to the Kafka output topics detected by the connector.
These watermark heartbeats are records that are empty except for thelow_watermark field. You can then use the low watermark to perform time-based aggregations.For example, you can use the low watermark to order events by committimestamp across primary keys.
Metadata topics
The Kafka connector, as well as theKafka Connect framework, creates severalmetadata topics to store connector-related information. It is not advisable tomodify either the configuration or the content of these metadata topics.
The following are the metadata topics:
_consumer_offsets: A topic automatically created by Kafka. Stores consumer offsets for consumers created in the Kafka connector._kafka-connect-offsets: A topic automatically created by Kafka Connect. Stores the connector offsets._sync_topic_spanner_connector_connectorname: A topic automatically created by the connector. Stores metadata regarding change stream partitions._rebalancing_topic_spanner_connector_connectorname: A topic automatically created by the connector. Used to determine connector task aliveness._debezium-heartbeat.connectorname: A topic used to process Spanner change stream heartbeats.
Kafka connector runtime
The following describes the Kafka connector runtime.
Scalability
The Kafka connector is horizontally scalable and runs on one or more tasks spread out among multiple Kafka Connect workers.
Message delivery guarantees
The Kafka connector supports at-least-once delivery guarantee.
Fault tolerance
The Kafka connector is tolerant of failures. As the Kafka connector reads changes andproduces events, it records the last commit timestamp processed for each changestream partition. If the Kafka connector stops for any reason (includingcommunication failures, network problems, or software failures), upon restartthe Kafka connector continues streaming records where it last left off.
The Kafka connector reads the information schema at the Kafka connector's starttimestamp to retrieve schema information. By default, Spanner cannotread the information schema at read timestamps before theversion retention period,which defaults to one hour. If you want to start the connector from earlier thanone hour into the past, you must increase the database's version retentionperiod.
Set up the Kafka connector
Create a change stream
For details on how to create a change stream, seeCreate a change stream. To continue with the next steps, a Spanner instance with a change stream configured is required.
Note that if you want both changed and unchanged columns to be returned on eachdata change event, use the value capture typeNEW_ROW. For more information, seevalue capture type.
Install the Kafka connector JAR
WithZookeeper,Kafka, andKafka Connect installed, the remaining tasks to deploy a Kafka connector are to downloadtheconnector's plug-in archive, extract the JAR files into your Kafka Connect environment, and add thedirectory with the JAR files toKafka Connect'splugin.path.You then need to restart your Kafka Connect process to pick up the new JAR files.
If you are working with immutable containers, you can pull images fromDebezium's Container images forZookeeper, Kafka and Kafka Connect. The Kafka Connect image has theSpanner connector pre-installed.
For more information on how to install Debezium-based Kafka connector JARs, seeInstalling Debezium.
Configure the Kafka connector
The following is an example of the configuration for a Kafka connectorthat connects to a change stream calledchangeStreamAll in thedatabaseusers in instancetest-instance and projecttest-project.
"name":"spanner-connector","config":{"connector.class":"io.debezium.connector.spanner.SpannerConnector","gcp.spanner.project.id":"test-project","gcp.spanner.instance.id":"test-instance","gcp.spanner.database.id":"users","gcp.spanner.change.stream":"changeStreamAll","gcp.spanner.credentials.json":"{"client_id": user@example.com}","gcp.spanner.database.role":"cdc-role","tasks.max":"10"}
This configuration contains the following:
The name of the connector when registered with a Kafka Connect service.
The name of this Spanner connector class.
The Project ID.
The Spanner Instance ID.
The Spanner Database ID.
The change stream name.
The JSON object for the service account key.
(Optional) The Spanner database role to use.
The maximum number of tasks.
For a complete list of connector properties, seeKafka connector configuration properties.
Add the connector configuration to Kafka Connect
To start running a Spanner connector:
Create a configuration for the Spanner connector.
Use theKafka Connect REST API to add thatconnector configuration to your Kafka Connect cluster.
You can send this configuration with aPOST command to a running Kafka Connectservice. By default, the Kafka Connect service runs on port8083. The service records the configuration and start the connector task that connects to the Spanner database and streams change event records toKafka topics.
The following is an examplePOST command:
POST /connectors HTTP/1.1Host: http://localhost:8083Accept: application/json{ "name": "spanner-connector" "config": { "connector.class": "io.debezium.connector.spanner.SpannerConnector", "gcp.spanner.project.id": "test-project", "gcp.spanner.instance.id": "test-instance", "gcp.spanner.database.id": "users", "gcp.spanner.change.stream": "changeStreamAll", "gcp.spanner.credentials.json": "{\"client_id\": \"XXXX\".... }", "heartbeat.interval.ms": "100", "tasks.max": "10" }}Example successful response:
HTTP/1.1 201 CreatedContent-Type: application/json{ "name": "spanner-connector", "config": { "connector.class": "io.debezium.connector.spanner.SpannerConnector", "gcp.spanner.project.id": "test-project", "gcp.spanner.instance.id": "test-instance", "gcp.spanner.database.id": "users", "gcp.spanner.change.stream": "changeStreamAll", "gcp.spanner.credentials.json": "{\"client_id\": \"XXXX\".... }", "heartbeat.interval.ms": "100", "tasks.max": "10" }, "tasks": [ { "connector": "spanner-connector", "task": 1 }, { "connector": "spanner-connector", "task": 2 }, { "connector": "spanner-connector", "task": 3 } ]}Update the Kafka connector configuration
To update the connector configuration, send aPUT command to the runningKafka Connect service with the same connector name.
Assume that we have a connector running with the configuration fromthe previous section. The following is an examplePUT command:
PUT /connectors/spanner-connector/config HTTP/1.1Host: http://localhost:8083Accept: application/json{ "connector.class": "io.debezium.connector.spanner.SpannerConnector", "gcp.spanner.project.id": "test-project", "gcp.spanner.instance.id": "test-instance", "gcp.spanner.database.id": "users", "gcp.spanner.change.stream": "changeStreamAll", "gcp.spanner.credentials.json": "{\"client_id\": \"XXXX\".... }", "heartbeat.interval.ms": "100", "tasks.max": "10"}Example successful response:
HTTP/1.1 200 OKContent-Type: application/json{ "connector.class": "io.debezium.connector.spanner.SpannerConnector", "tasks.max": "10", "gcp.spanner.project.id": "test-project", "gcp.spanner.instance.id": "test-instance", "gcp.spanner.database.id": "users", "gcp.spanner.change.stream": "changeStreamAll", "gcp.spanner.credentials.json": "{\"client_id\": \"XXXX\".... }", "heartbeat.interval.ms": "100", "tasks.max": "10"}Stop the Kafka connector
To stop the connector, send aDELETE command to the runningKafka Connect service with the same connector name.
Assume that we have a connector running with the configuration fromthe previous section. The following is an exampleDELETE command:
DELETE /connectors/spanner-connector HTTP/1.1Host: http://localhost:8083
Example successful response:
HTTP/1.1 204 No Content
Monitor the Kafka connector
In addition to standard Kafka Connect andDebezium metrics, the Kafka connector exports its own metrics:
MilliSecondsLowWatermark: The current low watermark of the connector task in milliseconds. Thelow watermark describes the time T at which the connector is guaranteed to havestreamed out all events with timestamp < TMilliSecondsLowWatermarkLag: The lag of the low watermark behind the current time in milliseconds.streamed out all events with timestamp < TLatencyLowWatermark<Variant>MilliSeconds: The lag of the low watermark behind the current timein milliseconds. P50, P95, P99, Average, Min, and Max variants are provided.LatencySpanner<Variant>MilliSeconds: The Spanner-commit-timestamp-to-connector-read latency.P50, P95, P99, Average, Min, Max variants are provided.LatencyReadToEmit<Variant>MilliSeconds: The Spanner-read-timestamp-to-connector-emit latency.P50, P95, P99, Average, Min, and Max variants are provided.LatencyCommitToEmit<Variant>tMilliSeconds: The Spanner-commit-timestamp-to-connector-emit latency.P50, P95, P99, Average, Min, and Max variants are provided.LatencyCommitToPublish<Variant>MilliSeconds: The Spanner-commit-timestamp-to Kafka-publish-timestamp latency. P50, P95, P99, Average, Min, Max variants are provided.NumberOfChangeStreamPartitionsDetected: The total number of partitionsdetected by the current connector task.NumberOfChangeStreamQueriesIssued: The total number of change stream queriesissued by the current task.NumberOfActiveChangeStreamQueries: The active number of change streamqueries detected by the current connector task.SpannerEventQueueCapacity: The total capacity ofStreamEventQueue, a queue that stores elements received from change stream queries.SpannerEventQueueCapacity: The remainingStreamEventQueuecapacity.TaskStateChangeEventQueueCapacity: The total capacity ofTaskStateChangeEventQueue, a queue that stores events happening in the connector.RemainingTaskStateChangeEventQueueCapacity: The remainingTaskStateChangeEventQueuecapacity.NumberOfActiveChangeStreamQueries: The active number of change streamqueries detected by the current connector task.
Kafka connector configuration properties
The following are required configuration properties for the connector:
name: Unique name for the connector. Attempting to register again with the same name causes failure. This property is required by all Kafka Connect connectors.connector.class: The name of the Java class for the connector. Always use a value ofio.debezium.connector.spanner.SpannerConnectorfor the Kafka connector.tasks.max: The maximum number of tasks that should be created for this connector.gcp.spanner.project.id: The project IDgcp.spanner.instance.id: The Spanner instance IDgcp.spanner.database.id: The Spanner database IDgcp.spanner.change.stream: The Spanner change stream namegcp.spanner.credentials.json: The service account key JSON object.gcp.spanner.credentials.path: The file path to the service account key JSON object. Required if the above field isn't provided.gcp.spanner.database.role: The Spanner database role touse. This is required only when the change stream is secured withfine-grained access control. The database role must have theSELECTprivilege on thechange stream and theEXECUTEprivilege on the change stream's readfunction. For more information, seeFine-grained access control for changestreams.
The following advanced configuration properties have defaults that work in mostsituations and therefore rarely need to be specified in the connector's configuration:
gcp.spanner.low-watermark.enabled: Indicates whether the low watermark is enabled for the connector. Default is false.gcp.spanner.low-watermark.update-period.ms: The interval at which the low watermark is updated. Default is 1000 ms.heartbeat.interval.ms: The Spanner heartbeat interval. The default is 300000 (five minutes).gcp.spanner.start.time: The connector start time. Defaults to the current time.gcp.spanner.end.time: The connector end time. Defaults to infinity.tables.exclude.list: The tables to exclude change events for. Defaults to empty.tables.include.list: The tables to include change events for. If not populated, all tables are included. Defaults to empty.gcp.spanner.stream.event.queue.capacity: The Spanner event queue capacity. Defaults to 10000.connector.spanner.task.state.change.event.queue.capacity: The task state change event queue capacity. Defaults to 1000.connector.spanner.max.missed.heartbeats: The maximum number of missed heartbeats for a change stream query before an exception is thrown. Defaults to 10.scaler.monitor.enabled: Indicates whether task autoscaling is enabled. Defaults to false.tasks.desired.partitions: The preferred number of change streams partitions per task. This parameter is needed for task autoscaling. Defaults to 2.tasks.min: The minimum number of tasks. This parameter is needed for task autoscaling. Defaults to 1.connector.spanner.sync.topic: The name for the sync topic, an internal connector topic used to store communication between tasks. Defaults to_sync_topic_spanner_connector_connectornameif the user did not provide a name.connector.spanner.sync.poll.duration: The poll duration for the sync topic. Defaults to 500 ms.connector.spanner.sync.request.timeout.ms: The timeout for requests to the sync topic. Defaults to 5000 ms.connector.spanner.sync.delivery.timeout.ms: The timeout for publishing to the sync topic. Defaults to 15000 ms.connector.spanner.sync.commit.offsets.interval.ms: The interval at which offsets are committed for the sync topic. Defaults to 60000 ms.connector.spanner.sync.publisher.wait.timeout: The interval at which messages are published to the sync topic. Defaults to 5 ms.connector.spanner.rebalancing.topic: The name of the rebalancing topic. The rebalancing topic is an internal connector topic used to determine task aliveness. Defaults to_rebalancing_topic_spanner_connector_connectornameif the user did not provide a name.connector.spanner.rebalancing.poll.duration: The poll duration for the rebalancing topic. Defaults to 5000 ms.connector.spanner.rebalancing.commit.offsets.timeout: The timeout for committing offsets for the rebalancing topic. Defaults to 5000 ms.connector.spanner.rebalancing.commit.offsets.interval.ms: The interval at which offsets are committed for the sync topic. Defaults to 60000 ms.connector.spanner.rebalancing.task.waiting.timeout: The duration of time a task waits before processing a rebalancing event. Defaults to 1000 ms.
For an even more detailed list of the configurable connector properties, see theGitHub repository.
Limitations
The connector does not support streamingsnapshot events.
If watermarking is enabled in the connector, you cannot configureDebezium topicrouting transformations.
Except as otherwise noted, the content of this page is licensed under theCreative Commons Attribution 4.0 License, and code samples are licensed under theApache 2.0 License. For details, see theGoogle Developers Site Policies. Java is a registered trademark of Oracle and/or its affiliates.
Last updated 2025-12-17 UTC.