Change stream partitions, records, and queries
This page describes change streams in Spanner for GoogleSQL-dialect databases and PostgreSQL-dialect databases, including:
- The split-based partitioning model
- The format and content of change stream records
- The low-level syntax used to query those records
- An example of the query workflow
You use the Spanner API to query change streams directly. Applications that instead use Dataflow to read change stream data don't need to work directly with the data model described here.
For a broader introductory guide to change streams, see Change streams overview.
Change stream partitions
When a change occurs on a table that is watched by a change stream, Spanner writes a corresponding change stream record in the database, synchronously in the same transaction as the data change. This means that if the transaction succeeds, Spanner has also successfully captured and persisted the change. Internally, Spanner co-locates the change stream record and the data change so that they are processed by the same server to minimize write overhead.
As part of the DML to a particular split, Spanner appends the write to the corresponding change stream data split in the same transaction. Because of this colocation, change streams don't add extra coordination across serving resources, which minimizes the transaction commit overhead.
Spanner scales by dynamically splitting and merging data based on database load and size, and distributing splits across serving resources.
To let change stream writes and reads scale, Spanner splits and merges the internal change stream storage along with the database data, automatically avoiding hotspots. To support reading change stream records in near real time as database writes scale, the Spanner API is designed for a change stream to be queried concurrently using change stream partitions. Change stream partitions map to change stream data splits that contain the change stream records. A change stream's partitions change dynamically over time and are correlated to how Spanner dynamically splits and merges the database data.
A change stream partition contains records for an immutable key range for a specific time range. Any change stream partition can split into one or more change stream partitions, or be merged with other change stream partitions. When these split or merge events happen, child partitions are created to capture the changes for their respective immutable key ranges for the next time range. In addition to data change records, a change stream query returns child partition records to notify readers of new change stream partitions that need to be queried, as well as heartbeat records to indicate forward progress when no writes have occurred recently.
When querying a particular change stream partition, the change records are returned in commit timestamp order. Each change record is returned exactly once. Across change stream partitions, change record ordering is not guaranteed. Change records for a particular primary key are returned on only one partition for a particular time range.
Due to the parent-child partition lineage, to process changes for a particular key in commit timestamp order, records returned from child partitions should be processed only after records from all parent partitions have been processed.
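The ordering rule above can be enforced with a small gate that a reader consults each time a parent partition finishes. This is a minimal sketch, not part of any Spanner client library: the class ChildPartitionGate and its parentFinished method are hypothetical, and the sketch assumes the reader passes in the child token and the parent_partition_tokens from each child partition record it receives. It returns true exactly once per child token, for the last parent to finish, so only one query is started for each child partition.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical helper (not a Spanner API): a child partition is queried only
// after every one of its parent partitions has been fully processed.
final class ChildPartitionGate {
  // Child token -> parent tokens that have not finished yet.
  private final Map<String, Set<String>> pendingParents = new HashMap<>();
  // Child tokens that have already been handed out for querying.
  private final Set<String> scheduled = new HashSet<>();

  /**
   * Called when finishedParent has returned all of its records, including a
   * child partition record naming childToken with the given parentTokens.
   * Returns true only for the call that completes the last parent, which
   * means the caller should now start the query on childToken.
   */
  synchronized boolean parentFinished(
      String finishedParent, String childToken, List<String> parentTokens) {
    if (scheduled.contains(childToken)) {
      return false; // Another parent already scheduled this child.
    }
    Set<String> pending =
        pendingParents.computeIfAbsent(childToken, k -> new HashSet<>(parentTokens));
    pending.remove(finishedParent);
    if (!pending.isEmpty()) {
      return false; // At least one parent is still being read.
    }
    pendingParents.remove(childToken);
    scheduled.add(childToken);
    return true;
  }
}
```

Child partitions returned by the very first query list a NULL parent token. The sketch tolerates that (HashSet accepts a null element) as long as the parent list is built with Arrays.asList, because List.of rejects null elements.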
Change stream read functions and query syntax
GoogleSQL
To query change streams, use the ExecuteStreamingSql API. Spanner automatically creates a special read function along with the change stream. The read function provides access to the change stream's records. The read function naming convention is READ_change_stream_name.
Assuming a change stream SingersNameStream exists in the database, the query syntax for GoogleSQL is the following:
```sql
SELECT ChangeRecord
FROM READ_SingersNameStream (
  start_timestamp,
  end_timestamp,
  partition_token,
  heartbeat_milliseconds,
  read_options
)
```
The read function accepts the following arguments:
| Argument name | Type | Required? | Description |
|---|---|---|---|
start_timestamp | TIMESTAMP | Required | Specifies that records with commit_timestamp greater than or equal to start_timestamp should be returned. The value must be within the change stream retention period, should be less than or equal to the current time, and must be greater than or equal to the timestamp of the change stream's creation. |
end_timestamp | TIMESTAMP | Optional (Default: NULL) | Specifies that records with a commit_timestamp less than or equal to end_timestamp should be returned. The value must be within the change stream retention period and greater than or equal to the start_timestamp. The query finishes either after returning all ChangeRecords up to the end_timestamp or when you terminate the connection. If end_timestamp is set to NULL or isn't specified, the query continues execution until all ChangeRecords are returned or until you terminate the connection. |
partition_token | STRING | Optional (Default: NULL) | Specifies which change stream partition to query, based on the content of child partition records. If NULL or not specified, the reader is querying the change stream for the first time and has not obtained any specific partition tokens to query from. |
heartbeat_milliseconds | INT64 | Required | Determines how frequently a heartbeat ChangeRecord is returned when no transactions are committed in this partition. The value must be between 1,000 (one second) and 300,000 (five minutes). |
read_options | ARRAY | Optional (Default: NULL) | Reserved for future use. The only permitted value is NULL. |
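Some of the argument constraints in this table can be checked on the client before a query is sent. The following is a minimal sketch under stated assumptions: the class ChangeStreamQueryArgs and its validate method are hypothetical, java.time.Instant stands in for the client library's Timestamp type, and only the heartbeat range and the start/end ordering are checked. The retention period and the change stream's creation time are left to Spanner.

```java
import java.time.Instant;

// Hypothetical client-side check of the documented read function arguments.
// Retention-period and creation-time limits are enforced by Spanner and are
// not checked here.
final class ChangeStreamQueryArgs {
  static final long MIN_HEARTBEAT_MILLIS = 1_000;   // one second
  static final long MAX_HEARTBEAT_MILLIS = 300_000; // five minutes

  /** Throws IllegalArgumentException if the arguments violate the documented limits. */
  static void validate(Instant startTimestamp, Instant endTimestamp, long heartbeatMillis) {
    if (startTimestamp == null) {
      throw new IllegalArgumentException("start_timestamp is required");
    }
    // A null end_timestamp is allowed: the query keeps reading.
    if (endTimestamp != null && endTimestamp.isBefore(startTimestamp)) {
      throw new IllegalArgumentException("end_timestamp must be >= start_timestamp");
    }
    if (heartbeatMillis < MIN_HEARTBEAT_MILLIS || heartbeatMillis > MAX_HEARTBEAT_MILLIS) {
      throw new IllegalArgumentException(
          "heartbeat_milliseconds must be between 1,000 and 300,000, got " + heartbeatMillis);
    }
  }
}
```

Failing fast on the client gives a clearer error than a rejected query, but the server remains the authority on every constraint.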
We recommend making a helper method for building the text of the read function query and binding parameters to it, as shown in the following example.
Java
```java
import com.google.cloud.Timestamp;
import com.google.cloud.spanner.Statement;

// The following members belong inside your own reader class.
private static final String SINGERS_NAME_STREAM_QUERY_TEMPLATE =
    "SELECT ChangeRecord FROM READ_SingersNameStream"
        + "("
        + " start_timestamp => @startTimestamp,"
        + " end_timestamp => @endTimestamp,"
        + " partition_token => @partitionToken,"
        + " heartbeat_milliseconds => @heartbeatMillis"
        + ")";

// Helper method to conveniently create change stream query texts and
// bind parameters.
public static Statement getChangeStreamQuery(
    String partitionToken,
    Timestamp startTimestamp,
    Timestamp endTimestamp,
    long heartbeatMillis) {
  return Statement.newBuilder(SINGERS_NAME_STREAM_QUERY_TEMPLATE)
      .bind("startTimestamp").to(startTimestamp)
      .bind("endTimestamp").to(endTimestamp)
      .bind("partitionToken").to(partitionToken)
      .bind("heartbeatMillis").to(heartbeatMillis)
      .build();
}
```
PostgreSQL
To query change streams, use the ExecuteStreamingSql API. Spanner automatically creates a special read function along with the change stream. The read function provides access to the change stream's records. The read function naming convention is spanner.read_json_change_stream_name.
Assuming a change stream SingersNameStream exists in the database, the query syntax for PostgreSQL is the following:
```sql
SELECT *
FROM "spanner"."read_json_SingersNameStream" (
  start_timestamp,
  end_timestamp,
  partition_token,
  heartbeat_milliseconds,
  null
)
```
The read function accepts the following arguments:
| Argument name | Type | Required? | Description |
|---|---|---|---|
start_timestamp | timestamp with time zone | Required | Specifies that change records with commit_timestamp greater than or equal to start_timestamp should be returned. The value must be within the change stream retention period, should be less than or equal to the current time, and must be greater than or equal to the timestamp of the change stream's creation. |
end_timestamp | timestamp with time zone | Optional (Default: NULL) | Specifies that change records with commit_timestamp less than or equal to end_timestamp should be returned. The value must be within the change stream retention period and greater than or equal to the start_timestamp. The query finishes either after returning all change records up to the end_timestamp or when you terminate the connection. If NULL, the query continues execution until all change records are returned or until you terminate the connection. |
partition_token | text | Optional (Default: NULL) | Specifies which change stream partition to query, based on the content of child partition records. If NULL or not specified, the reader is querying the change stream for the first time and has not obtained any specific partition tokens to query from. |
heartbeat_milliseconds | bigint | Required | Determines how frequently a heartbeat ChangeRecord is returned when no transactions are committed in this partition. The value must be between 1,000 (one second) and 300,000 (five minutes). |
null | null | Required | Reserved for future use. |
We recommend making a helper method for building the text of the read function query and binding parameters to it, as shown in the following example.
Java
```java
import com.google.cloud.Timestamp;
import com.google.cloud.spanner.Statement;

// The following members belong inside your own reader class.
private static final String SINGERS_NAME_STREAM_QUERY_TEMPLATE =
    "SELECT * FROM \"spanner\".\"read_json_SingersNameStream\""
        + "($1, $2, $3, $4, null)";

// Helper method to conveniently create change stream query texts and
// bind parameters. The PostgreSQL positional parameters $1, $2, ... are
// bound by the names p1, p2, ...
public static Statement getChangeStreamQuery(
    String partitionToken,
    Timestamp startTimestamp,
    Timestamp endTimestamp,
    long heartbeatMillis) {
  return Statement.newBuilder(SINGERS_NAME_STREAM_QUERY_TEMPLATE)
      .bind("p1").to(startTimestamp)
      .bind("p2").to(endTimestamp)
      .bind("p3").to(partitionToken)
      .bind("p4").to(heartbeatMillis)
      .build();
}
```
Change streams record format
GoogleSQL
The change streams read function returns a single ChangeRecord column of type ARRAY<STRUCT<...>>. In each row, this array always contains a single element.
The array elements have the following type:
```
STRUCT <
  data_change_record ARRAY<STRUCT<...>>,
  heartbeat_record ARRAY<STRUCT<...>>,
  child_partitions_record ARRAY<STRUCT<...>>
>
```
There are three fields in this STRUCT: data_change_record, heartbeat_record, and child_partitions_record, each of type ARRAY<STRUCT<...>>. In any row that the change stream read function returns, only one of these three fields contains a value; the other two are empty or NULL. These array fields contain at most one element.
The following sections examine each of these three record types.
PostgreSQL
The change streams read function returns a single ChangeRecord column of type JSON with the following structure:
```json
{
  "data_change_record" : {},
  "heartbeat_record" : {},
  "child_partitions_record" : {}
}
```
There are three possible keys in this object: data_change_record, heartbeat_record, and child_partitions_record. The value of each key is of type JSON. In any row that the change stream read function returns, only one of these three keys exists.
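Because exactly one key is present in each row, a reader can dispatch on which key it finds. This is a minimal sketch under stated assumptions: ChangeRecordKind and ChangeRecordClassifier are hypothetical names, and the sketch assumes the ChangeRecord JSON has already been parsed into a Map by a JSON library of your choice, since the Java standard library has no JSON parser.

```java
import java.util.Map;

// The three kinds of rows a change stream query can return.
enum ChangeRecordKind { DATA_CHANGE, HEARTBEAT, CHILD_PARTITIONS }

// Hypothetical helper, not part of any Spanner client: classifies a
// PostgreSQL-dialect ChangeRecord that was already parsed into a Map.
final class ChangeRecordClassifier {
  static ChangeRecordKind classify(Map<String, ?> changeRecord) {
    ChangeRecordKind kind = null;
    int keysPresent = 0;
    if (changeRecord.containsKey("data_change_record")) {
      kind = ChangeRecordKind.DATA_CHANGE;
      keysPresent++;
    }
    if (changeRecord.containsKey("heartbeat_record")) {
      kind = ChangeRecordKind.HEARTBEAT;
      keysPresent++;
    }
    if (changeRecord.containsKey("child_partitions_record")) {
      kind = ChangeRecordKind.CHILD_PARTITIONS;
      keysPresent++;
    }
    // Each row is documented to carry exactly one of the three keys.
    if (keysPresent != 1) {
      throw new IllegalArgumentException(
          "expected exactly one record key, found " + keysPresent);
    }
    return kind;
  }
}
```

For GoogleSQL-dialect databases the same idea applies, except that the check is which of the three ARRAY fields is non-empty rather than which key exists.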
The following sections examine each of these three record types.
Data change records
A data change record contains a set of changes to a table with the same modification type (insert, update, or delete) committed at the same commit timestamp in one change stream partition for the same transaction. Multiple data change records can be returned for the same transaction across multiple change stream partitions.
All data change records have commit_timestamp, server_transaction_id, and record_sequence fields, which together determine the record's order in the change stream. These three fields are sufficient to derive the ordering of changes and provide external consistency.
Multiple transactions can have the same commit timestamp if they touch non-overlapping data. The server_transaction_id field lets you distinguish which set of changes (potentially across change stream partitions) were issued within the same transaction. Pairing it with the record_sequence and number_of_records_in_transaction fields also lets you buffer and order all the records from a particular transaction.
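The buffering described above can be sketched as follows. This is a minimal sketch, not a Spanner API: DataChangeRecord here is a hypothetical, simplified view that keeps only the fields needed for grouping, TransactionBuffer is a hypothetical class, and the sketch assumes each record is delivered once and that record_sequence values have a fixed width, as in the examples on this page, so string order matches numeric order.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

// Hypothetical, simplified view of a data change record that keeps only the
// fields used for grouping and ordering.
final class DataChangeRecord {
  final String serverTransactionId;
  final String recordSequence;
  final long numberOfRecordsInTransaction;

  DataChangeRecord(String serverTransactionId, String recordSequence,
      long numberOfRecordsInTransaction) {
    this.serverTransactionId = serverTransactionId;
    this.recordSequence = recordSequence;
    this.numberOfRecordsInTransaction = numberOfRecordsInTransaction;
  }
}

// Buffers data change records per server_transaction_id, possibly from
// several partitions, and releases a transaction once all
// number_of_records_in_transaction records have arrived.
final class TransactionBuffer {
  private final Map<String, List<DataChangeRecord>> pending = new HashMap<>();

  synchronized Optional<List<DataChangeRecord>> add(DataChangeRecord record) {
    List<DataChangeRecord> records =
        pending.computeIfAbsent(record.serverTransactionId, k -> new ArrayList<>());
    records.add(record);
    if (records.size() < record.numberOfRecordsInTransaction) {
      return Optional.empty(); // Still waiting for records from other partitions.
    }
    pending.remove(record.serverTransactionId);
    // Assumes fixed-width record_sequence strings, so string order is numeric order.
    records.sort(Comparator.comparing((DataChangeRecord r) -> r.recordSequence));
    return Optional.of(records);
  }
}
```

Fed the two account-transfer records shown later on this page, in either order, the buffer releases both records together, sorted as 00000000 then 00000001.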
The fields of a data change record include the following:
GoogleSQL
| Field | Type | Description |
|---|---|---|
commit_timestamp | TIMESTAMP | Indicates the timestamp at which the change was committed. |
record_sequence | STRING | Indicates the sequence number for the record within the transaction. Sequence numbers are unique and monotonically increasing (but not necessarily contiguous) within a transaction. Sort the records for the same server_transaction_id by record_sequence to reconstruct the ordering of the changes within the transaction. Spanner might optimize this ordering for better performance, and it might not always match the original ordering that you provide. |
server_transaction_id | STRING | Provides a globally unique string that represents the transaction in which the change was committed. The value should only be used in the context of processing change stream records and is not correlated with the transaction ID in Spanner's API. |
is_last_record_in_transaction_in_partition | BOOL | Indicates whether this is the last record for a transaction in the current partition. |
table_name | STRING | Name of the table affected by the change. |
value_capture_type | STRING | Describes the value capture type that was specified in the change stream configuration when this change was captured. The value capture type can be one of the following: OLD_AND_NEW_VALUES, NEW_ROW, NEW_VALUES, or NEW_ROW_AND_OLD_VALUES. By default, it is OLD_AND_NEW_VALUES. |
column_types | [ { "name": "STRING", "type": { "code": "STRING" }, "is_primary_key": BOOLEAN, "ordinal_position": NUMBER }, ...] | Indicates the name of the column, the column type, whether it is a primary key, and the position of the column as defined in the schema (ordinal_position). The first column of a table in the schema would have an ordinal position of 1. The column type may be nested for array columns. The format matches the type structure described in the Spanner API reference. |
mods | [ { "keys": {"STRING" : "STRING"}, "new_values": { "STRING" : "VALUE-TYPE", [...] }, "old_values": { "STRING" : "VALUE-TYPE", [...] } }, [...]] | Describes the changes that were made, including the primary key values, the old values, and the new values of the changed or tracked columns. The availability and content of the old and new values depend on the configured value_capture_type. The new_values and old_values fields only contain the non-key columns. |
mod_type | STRING | Describes the type of change. One of INSERT, UPDATE, or DELETE. |
number_of_records_in_transaction | INT64 | Indicates the number of data change records that are part of this transaction across all change stream partitions. |
number_of_partitions_in_transaction | INT64 | Indicates the number of partitions that return data change records for this transaction. |
transaction_tag | STRING | Indicates the transaction tag associated with this transaction. |
is_system_transaction | BOOL | Indicates whether the transaction is a system transaction. |
PostgreSQL
| Field | Type | Description |
|---|---|---|
commit_timestamp | STRING | Indicates the timestamp at which the change was committed. |
record_sequence | STRING | Indicates the sequence number for the record within the transaction. Sequence numbers are unique and monotonically increasing (but not necessarily contiguous) within a transaction. Sort the records for the same server_transaction_id by record_sequence to reconstruct the ordering of the changes within the transaction. |
server_transaction_id | STRING | Provides a globally unique string that represents the transaction in which the change was committed. The value should only be used in the context of processing change stream records and is not correlated with the transaction ID in Spanner's API. |
is_last_record_in_transaction_in_partition | BOOLEAN | Indicates whether this is the last record for a transaction in the current partition. |
table_name | STRING | Indicates the name of the table affected by the change. |
value_capture_type | STRING | Describes the value capture type that was specified in the change stream configuration when this change was captured. The value capture type can be one of the following: OLD_AND_NEW_VALUES, NEW_ROW, NEW_VALUES, or NEW_ROW_AND_OLD_VALUES. By default, it is OLD_AND_NEW_VALUES. |
column_types | [ { "name": "STRING", "type": { "code": "STRING" }, "is_primary_key": BOOLEAN, "ordinal_position": NUMBER }, ...] | Indicates the name of the column, the column type, whether it's a primary key, and the position of the column as defined in the schema (ordinal_position). The first column of a table in the schema would have an ordinal position of 1. The column type may be nested for array columns. The format matches the type structure described in the Spanner API reference. |
mods | [ { "keys": {"STRING" : "STRING"}, "new_values": { "STRING" : "VALUE-TYPE", [...] }, "old_values": { "STRING" : "VALUE-TYPE", [...] } }, [...]] | Describes the changes that were made, including the primary key values, the old values, and the new values of the changed or tracked columns. The availability and content of the old and new values depend on the configured value_capture_type. The new_values and old_values fields only contain the non-key columns. |
mod_type | STRING | Describes the type of change. One of INSERT, UPDATE, or DELETE. |
number_of_records_in_transaction | INT64 | Indicates the number of data change records that are part of this transaction across all change stream partitions. |
number_of_partitions_in_transaction | NUMBER | Indicates the number of partitions that return data change records for this transaction. |
transaction_tag | STRING | Indicates the transaction tag associated with this transaction. |
is_system_transaction | BOOLEAN | Indicates whether the transaction is a system transaction. |
Example data change record
A pair of example data change records follows. They describe a single transaction in which money is transferred between two accounts. The two accounts are in separate change stream partitions.
```
"data_change_record": {
  "commit_timestamp": "2022-09-27T12:30:00.123456Z",
  // record_sequence is unique and monotonically increasing within a
  // transaction, across all partitions.
  "record_sequence": "00000000",
  "server_transaction_id": "6329047911",
  "is_last_record_in_transaction_in_partition": true,

  "table_name": "AccountBalance",
  "column_types": [
    {"name": "AccountId", "type": {"code": "STRING"}, "is_primary_key": true, "ordinal_position": 1},
    {"name": "LastUpdate", "type": {"code": "TIMESTAMP"}, "is_primary_key": false, "ordinal_position": 2},
    {"name": "Balance", "type": {"code": "INT64"}, "is_primary_key": false, "ordinal_position": 3}
  ],
  "mods": [
    {
      "keys": {"AccountId": "Id1"},
      "new_values": {"LastUpdate": "2022-09-27T12:30:00.123456Z", "Balance": 1000},
      "old_values": {"LastUpdate": "2022-09-26T11:28:00.189413Z", "Balance": 1500}
    }
  ],
  "mod_type": "UPDATE", // options are INSERT, UPDATE, DELETE
  "value_capture_type": "OLD_AND_NEW_VALUES",
  "number_of_records_in_transaction": 2,
  "number_of_partitions_in_transaction": 2,
  "transaction_tag": "app=banking,env=prod,action=update",
  "is_system_transaction": false
}

"data_change_record": {
  "commit_timestamp": "2022-09-27T12:30:00.123456Z",
  "record_sequence": "00000001",
  "server_transaction_id": "6329047911",
  "is_last_record_in_transaction_in_partition": true,

  "table_name": "AccountBalance",
  "column_types": [
    {"name": "AccountId", "type": {"code": "STRING"}, "is_primary_key": true, "ordinal_position": 1},
    {"name": "LastUpdate", "type": {"code": "TIMESTAMP"}, "is_primary_key": false, "ordinal_position": 2},
    {"name": "Balance", "type": {"code": "INT64"}, "is_primary_key": false, "ordinal_position": 3}
  ],
  "mods": [
    {
      "keys": {"AccountId": "Id2"},
      "new_values": {"LastUpdate": "2022-09-27T12:30:00.123456Z", "Balance": 2000},
      "old_values": {"LastUpdate": "2022-01-20T11:25:00.199915Z", "Balance": 1500}
    },
    ...
  ],
  "mod_type": "UPDATE", // options are INSERT, UPDATE, DELETE
  "value_capture_type": "OLD_AND_NEW_VALUES",
  "number_of_records_in_transaction": 2,
  "number_of_partitions_in_transaction": 2,
  "transaction_tag": "app=banking,env=prod,action=update",
  "is_system_transaction": false
}
```
The following data change record is an example of a record with the value capture type NEW_VALUES. Only new values are populated. Only the LastUpdate column was modified, so only that column was returned.
```
"data_change_record": {
  "commit_timestamp": "2022-09-27T12:30:00.123456Z",
  // record_sequence is unique and monotonically increasing within a
  // transaction, across all partitions.
  "record_sequence": "00000000",
  "server_transaction_id": "6329047911",
  "is_last_record_in_transaction_in_partition": true,
  "table_name": "AccountBalance",
  "column_types": [
    {"name": "AccountId", "type": {"code": "STRING"}, "is_primary_key": true, "ordinal_position": 1},
    {"name": "LastUpdate", "type": {"code": "TIMESTAMP"}, "is_primary_key": false, "ordinal_position": 2}
  ],
  "mods": [
    {
      "keys": {"AccountId": "Id1"},
      "new_values": {"LastUpdate": "2022-09-27T12:30:00.123456Z"},
      "old_values": {}
    }
  ],
  "mod_type": "UPDATE", // options are INSERT, UPDATE, DELETE
  "value_capture_type": "NEW_VALUES",
  "number_of_records_in_transaction": 1,
  "number_of_partitions_in_transaction": 1,
  "transaction_tag": "app=banking,env=prod,action=update",
  "is_system_transaction": false
}
```
The following data change record is an example of a record with the value capture type NEW_ROW. Only the LastUpdate column was modified, but all tracked columns are returned.
```
"data_change_record": {
  "commit_timestamp": "2022-09-27T12:30:00.123456Z",
  // record_sequence is unique and monotonically increasing within a
  // transaction, across all partitions.
  "record_sequence": "00000000",
  "server_transaction_id": "6329047911",
  "is_last_record_in_transaction_in_partition": true,
  "table_name": "AccountBalance",
  "column_types": [
    {"name": "AccountId", "type": {"code": "STRING"}, "is_primary_key": true, "ordinal_position": 1},
    {"name": "LastUpdate", "type": {"code": "TIMESTAMP"}, "is_primary_key": false, "ordinal_position": 2},
    {"name": "Balance", "type": {"code": "INT64"}, "is_primary_key": false, "ordinal_position": 3}
  ],
  "mods": [
    {
      "keys": {"AccountId": "Id1"},
      "new_values": {"LastUpdate": "2022-09-27T12:30:00.123456Z", "Balance": 1000},
      "old_values": {}
    }
  ],
  "mod_type": "UPDATE", // options are INSERT, UPDATE, DELETE
  "value_capture_type": "NEW_ROW",
  "number_of_records_in_transaction": 1,
  "number_of_partitions_in_transaction": 1,
  "transaction_tag": "app=banking,env=prod,action=update",
  "is_system_transaction": false
}
```
The following data change record is an example of a record with the value capture type NEW_ROW_AND_OLD_VALUES. Only the LastUpdate column was modified, but all tracked columns are returned. This value capture type captures both the new value and the old value of LastUpdate.
```
"data_change_record": {
  "commit_timestamp": "2022-09-27T12:30:00.123456Z",
  // record_sequence is unique and monotonically increasing within a
  // transaction, across all partitions.
  "record_sequence": "00000000",
  "server_transaction_id": "6329047911",
  "is_last_record_in_transaction_in_partition": true,
  "table_name": "AccountBalance",
  "column_types": [
    {"name": "AccountId", "type": {"code": "STRING"}, "is_primary_key": true, "ordinal_position": 1},
    {"name": "LastUpdate", "type": {"code": "TIMESTAMP"}, "is_primary_key": false, "ordinal_position": 2},
    {"name": "Balance", "type": {"code": "INT64"}, "is_primary_key": false, "ordinal_position": 3}
  ],
  "mods": [
    {
      "keys": {"AccountId": "Id1"},
      "new_values": {"LastUpdate": "2022-09-27T12:30:00.123456Z", "Balance": 1000},
      "old_values": {"LastUpdate": "2022-09-26T11:28:00.189413Z"}
    }
  ],
  "mod_type": "UPDATE", // options are INSERT, UPDATE, DELETE
  "value_capture_type": "NEW_ROW_AND_OLD_VALUES",
  "number_of_records_in_transaction": 1,
  "number_of_partitions_in_transaction": 1,
  "transaction_tag": "app=banking,env=prod,action=update",
  "is_system_transaction": false
}
```
Heartbeat records
When a heartbeat record is returned, it indicates that all changes with commit_timestamp less than or equal to the heartbeat record's timestamp have been returned, and future data records in this partition must have higher commit timestamps than that returned by the heartbeat record. Heartbeat records are returned when there are no data changes written to a partition. When there are data changes written to the partition, data_change_record.commit_timestamp can be used instead of heartbeat_record.timestamp to tell that the reader is making forward progress in reading the partition.
You can use heartbeat records returned on partitions to synchronize readers across all partitions. Once all readers have received either a heartbeat greater than or equal to some timestamp A, or data or child partition records greater than or equal to timestamp A, the readers know they have received all records committed at or before timestamp A and can start processing the buffered records, for example by sorting the cross-partition records by timestamp and grouping them by server_transaction_id.
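The synchronization rule above amounts to tracking a low watermark: the minimum, across all active partitions, of the latest timestamp each reader has seen. This is a minimal sketch, assuming a hypothetical PartitionWatermarks class that your readers update with every heartbeat_record.timestamp, data_change_record.commit_timestamp, and child_partitions_record.start_timestamp they receive, with java.time.Instant standing in for the record timestamps. It also assumes child partitions are registered before their parent is marked finished; otherwise the watermark could run ahead of partitions that haven't started yet.

```java
import java.time.Instant;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical helper: per active partition token, the latest timestamp its
// reader has observed (null until the reader reports anything).
final class PartitionWatermarks {
  private final Map<String, Instant> latest = new HashMap<>();

  /** Starts tracking a partition; register children before finishing their parent. */
  synchronized void register(String partitionToken) {
    latest.putIfAbsent(partitionToken, null);
  }

  /** Records a heartbeat, commit, or child partition start timestamp. */
  synchronized void observe(String partitionToken, Instant timestamp) {
    // merge() replaces a null value and otherwise keeps the later timestamp.
    latest.merge(partitionToken, timestamp, (a, b) -> a.isAfter(b) ? a : b);
  }

  /** Stops tracking a partition whose query has returned all its records. */
  synchronized void finish(String partitionToken) {
    latest.remove(partitionToken);
  }

  /** Timestamp A up to which every active reader has made progress, if known. */
  synchronized Optional<Instant> watermark() {
    Instant min = null;
    for (Instant t : latest.values()) {
      if (t == null) {
        return Optional.empty(); // Some partition has not reported progress yet.
      }
      if (min == null || t.isBefore(min)) {
        min = t;
      }
    }
    return Optional.ofNullable(min);
  }
}
```

Buffered records with a commit_timestamp at or before the current watermark can then be sorted and grouped by server_transaction_id, as described above.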
A heartbeat record contains only one field:
GoogleSQL
| Field | Type | Description |
|---|---|---|
timestamp | TIMESTAMP | Indicates the heartbeat record's timestamp. |
PostgreSQL
| Field | Type | Description |
|---|---|---|
timestamp | STRING | Indicates the heartbeat record's timestamp. |
Example heartbeat record
An example heartbeat record, communicating that all records with timestamps less than or equal to this record's timestamp have been returned:
```
heartbeat_record: {
  "timestamp": "2022-09-27T12:35:00.312486Z"
}
```
Child partition records
Child partition records return information about child partitions: their partition tokens, the tokens of their parent partitions, and the start_timestamp that represents the earliest timestamp that the child partitions contain change records for. Records whose commit timestamps are immediately prior to the child_partitions_record.start_timestamp are returned in the current partition. After returning all the child partition records for this partition, the query returns with a success status, indicating that all records have been returned for this partition.
The fields of a child partition record include the following:
GoogleSQL
| Field | Type | Description |
|---|---|---|
start_timestamp | TIMESTAMP | Indicates that the data change records returned from child partitions in this child partition record have a commit timestamp greater than or equal to start_timestamp. When querying a child partition, the query should specify the child partition token and a start_timestamp greater than or equal to child_partitions_record.start_timestamp. All child partition records returned by a partition have the same start_timestamp, and the timestamp always falls between the query's specified start_timestamp and end_timestamp. |
record_sequence | STRING | Indicates a monotonically increasing sequence number that can be used to define the ordering of the child partition records when there are multiple child partition records returned with the same start_timestamp in a particular partition. The partition token, start_timestamp, and record_sequence uniquely identify a child partition record. |
child_partitions | [ { "token" : "STRING", "parent_partition_tokens" : ["STRING"] }] | Returns a set of child partitions and their associated information. This includes the partition token string used to identify the child partition in queries, as well as the tokens of its parent partitions. |
PostgreSQL
| Field | Type | Description |
|---|---|---|
start_timestamp | STRING | Indicates that the data change records returned from child partitions in this child partition record have a commit timestamp greater than or equal to start_timestamp. When querying a child partition, the query should specify the child partition token and a start_timestamp greater than or equal to child_partitions_record.start_timestamp. All child partition records returned by a partition have the same start_timestamp, and the timestamp always falls between the query's specified start_timestamp and end_timestamp. |
record_sequence | STRING | Indicates a monotonically increasing sequence number that can be used to define the ordering of the child partition records when there are multiple child partition records returned with the same start_timestamp in a particular partition. The partition token, start_timestamp, and record_sequence uniquely identify a child partition record. |
child_partitions | [ { "token": "STRING", "parent_partition_tokens": ["STRING"] }, [...]] | Returns an array of child partitions and their associated information. This includes the partition token string used to identify the child partition in queries, as well as the tokens of its parent partitions. |
Example child partition record
The following is an example of a child partition record:
```
child_partitions_record: {
  "start_timestamp": "2022-09-27T12:40:00.562986Z",
  "record_sequence": "00000001",
  "child_partitions": [
    {
      "token": "child_token_1",
      // To make sure changes for a key are processed in timestamp
      // order, wait until the records returned from all parents
      // have been processed.
      "parent_partition_tokens": ["parent_token_1", "parent_token_2"]
    }
  ]
}
```
Change streams query workflow
Run change stream queries using the ExecuteStreamingSql API, with a single-use read-only transaction and a strong timestamp bound. The change stream read function lets you specify the start_timestamp and end_timestamp for the time range of interest. All change records within the retention period are accessible using the strong read-only timestamp bound.
All other TransactionOptions are invalid for change stream queries. In addition, if TransactionOptions.read_only.return_read_timestamp is set to true, a special value of kint64max - 1 is returned in the Transaction message that describes the transaction, instead of a valid read timestamp. This special value should be discarded and not used for any subsequent queries.
Each change stream query can return any number of rows, each containing either a data change record, a heartbeat record, or a child partitions record. There is no need to set a deadline for the request.
Example change stream query workflow
The streaming query workflow begins with issuing the very first change stream query with partition_token set to NULL. The query needs to specify the read function for the change stream, the start and end timestamps of interest, and the heartbeat interval. When the end_timestamp is NULL, the query keeps returning data changes until the partition ends.
GoogleSQL
```sql
SELECT ChangeRecord FROM READ_SingersNameStream (
  start_timestamp => "2022-05-01T09:00:00Z",
  end_timestamp => NULL,
  partition_token => NULL,
  heartbeat_milliseconds => 10000
);
```
PostgreSQL
```sql
SELECT *
FROM "spanner"."read_json_SingersNameStream" (
  '2022-05-01T09:00:00Z',
  NULL,
  NULL,
  10000,
  NULL
);
```
Process data records from this query until all child partition records are returned. In the following example, two child partition records and three partition tokens are returned, then the query terminates. Child partition records from a specific query always share the same start_timestamp.
```
child_partitions_record: {
  "record_type": "child_partitions",
  "start_timestamp": "2022-05-01T09:00:01Z",
  "record_sequence": "1000012389",
  "child_partitions": [
    {
      "token": "child_token_1",
      // Note parent tokens are null for child partitions returned
      // from the initial change stream queries.
      "parent_partition_tokens": [NULL]
    },
    {
      "token": "child_token_2",
      "parent_partition_tokens": [NULL]
    }
  ]
}

child_partitions_record: {
  "record_type": "child_partitions",
  "start_timestamp": "2022-05-01T09:00:01Z",
  "record_sequence": "1000012390",
  "child_partitions": [
    {
      "token": "child_token_3",
      "parent_partition_tokens": [NULL]
    }
  ]
}
```
To process changes after 2022-05-01T09:00:01Z, create three new queries and run them in parallel. Used together, the three queries return data changes for the same key range that their parent covers. Always set the start_timestamp to the start_timestamp in the same child partition record, and use the same end_timestamp and heartbeat interval to process the records consistently across all queries.
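The parameter rule in the preceding paragraph can be sketched as deriving each child query from its parent query. This is a minimal sketch, not a Spanner API: PartitionQuery and ChildQueries are hypothetical names, java.time.Instant stands in for the record timestamps, and a null endTimestamp means the query runs until the partition ends. Each resulting object could then be passed to a helper like getChangeStreamQuery shown earlier on this page.

```java
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;

// Hypothetical description of one read function call; not a Spanner API type.
final class PartitionQuery {
  final String partitionToken;  // null only for the very first query
  final Instant startTimestamp;
  final Instant endTimestamp;   // null means "until the partition ends"
  final long heartbeatMillis;

  PartitionQuery(String partitionToken, Instant startTimestamp,
      Instant endTimestamp, long heartbeatMillis) {
    this.partitionToken = partitionToken;
    this.startTimestamp = startTimestamp;
    this.endTimestamp = endTimestamp;
    this.heartbeatMillis = heartbeatMillis;
  }
}

final class ChildQueries {
  /**
   * Builds one query per child token. start_timestamp comes from the child
   * partition record; end_timestamp and the heartbeat interval are inherited
   * unchanged from the parent query.
   */
  static List<PartitionQuery> fromChildPartitionsRecord(
      PartitionQuery parent, Instant childStartTimestamp, List<String> childTokens) {
    List<PartitionQuery> queries = new ArrayList<>();
    for (String token : childTokens) {
      queries.add(new PartitionQuery(
          token, childStartTimestamp, parent.endTimestamp, parent.heartbeatMillis));
    }
    return queries;
  }
}
```

In the example above, the tokens from both child partition records (child_token_1, child_token_2, and child_token_3) share the start_timestamp 2022-05-01T09:00:01Z, so they can be collected into one list and passed in a single call.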
GoogleSQL
```sql
SELECT ChangeRecord FROM READ_SingersNameStream (
  start_timestamp => "2022-05-01T09:00:01Z",
  end_timestamp => NULL,
  partition_token => "child_token_1",
  heartbeat_milliseconds => 10000
);

SELECT ChangeRecord FROM READ_SingersNameStream (
  start_timestamp => "2022-05-01T09:00:01Z",
  end_timestamp => NULL,
  partition_token => "child_token_2",
  heartbeat_milliseconds => 10000
);

SELECT ChangeRecord FROM READ_SingersNameStream (
  start_timestamp => "2022-05-01T09:00:01Z",
  end_timestamp => NULL,
  partition_token => "child_token_3",
  heartbeat_milliseconds => 10000
);
```
PostgreSQL
```sql
SELECT * FROM "spanner"."read_json_SingersNameStream" (
  '2022-05-01T09:00:01Z', NULL, 'child_token_1', 10000, NULL
);

SELECT * FROM "spanner"."read_json_SingersNameStream" (
  '2022-05-01T09:00:01Z', NULL, 'child_token_2', 10000, NULL
);

SELECT * FROM "spanner"."read_json_SingersNameStream" (
  '2022-05-01T09:00:01Z', NULL, 'child_token_3', 10000, NULL
);
```
The query on child_token_2 finishes after returning another child partition record. This record indicates that a new partition covers changes for both child_token_2 and child_token_3 starting at 2022-05-01T09:30:15Z. The exact same record is returned by the query on child_token_3, because both are the parent partitions of the new child_token_4. To ensure strictly ordered processing of data records for a particular key, the query on child_token_4 must start after all the parents have finished. In this case, the parents are child_token_2 and child_token_3. Create only one query for each child partition token. The query workflow design should appoint one parent to wait and schedule the query on child_token_4.
```
child_partitions_record: {
  "record_type": "child_partitions",
  "start_timestamp": "2022-05-01T09:30:15Z",
  "record_sequence": "1000012389",
  "child_partitions": [
    {
      "token": "child_token_4",
      "parent_partition_tokens": ["child_token_2", "child_token_3"]
    }
  ]
}
```
GoogleSQL
```sql
SELECT ChangeRecord FROM READ_SingersNameStream (
  start_timestamp => "2022-05-01T09:30:15Z",
  end_timestamp => NULL,
  partition_token => "child_token_4",
  heartbeat_milliseconds => 10000
);
```
PostgreSQL
```sql
SELECT * FROM "spanner"."read_json_SingersNameStream" (
  '2022-05-01T09:30:15Z', NULL, 'child_token_4', 10000, NULL
);
```
Find examples of handling and parsing change stream records in the Apache Beam SpannerIO Dataflow connector on GitHub.
Except as otherwise noted, the content of this page is licensed under the Creative Commons Attribution 4.0 License, and code samples are licensed under the Apache 2.0 License. For details, see the Google Developers Site Policies. Java is a registered trademark of Oracle and/or its affiliates.
Last updated 2025-12-17 UTC.