Stream changes with Dataflow
The Bigtable Beam connector lets you use Dataflow toread Bigtable data change records without needing to track orprocesspartitionchanges in yourcode, because the connector handles that logic for you.
This document describes how to configure and use the Bigtable Beam connector to read a change stream using a Dataflow pipeline.Before you read this document, you should read theOverview of changestreams and be familiar withDataflow.
Alternatives to building your own pipeline
If you don't want to build your own Dataflowpipeline, then you can use one of the following options.
You can use a Google-provided Dataflow template.
You can also use the code samples from the Bigtable tutorial orquickstart as a starting point for your code.
Make sure that the code that you generate usesgoogle cloud libraries-bom version 26.14.0 or later.
Connector details
The Bigtable Beam connector method,BigtableIO.readChangeStream, lets you read a stream of datachange records (ChangeStreamMutation) that you can process. The Bigtable Beam connector is acomponent of theApache Beam GitHubrepository. For a description of the connector code, see the comments atBigtableIO.java.
You must use the connector with Beam version 2.48.0 or later. CheckApache Beamruntime support to make sure thatyou're using a supported version of Java. Then you can deploy a pipeline thatuses the connector toDataflow, which handles the provisioning and managementof resources and assists with the scalability and reliability of stream dataprocessing.
For more information on the Apache Beam programming model, see theBeam documentation.
Grouping data without event times
Data change records streamed using the Bigtable Beam connector aren't compatible with Dataflowfunctions that depend on event times.
As explained inReplication andwatermarks, alow watermark might not advance if replication for the partition hasn't caughtup to the rest of the instance. When a low watermark stops advancing, it cancause the change stream to stall.
To prevent the stream from stalling, the Bigtable Beam connector outputs all data with an output timestamp ofzero. The zero timestamp makes Dataflow consider all the datachange records to belate data.As a result, Dataflow features that depend on event times aren'tcompatible with Bigtable change streams. Specifically, you can'tusewindowing functions,event-time triggers,orevent-time timers.
Instead, you can useGlobalWindows with non-event time triggers to group this late data into panes, as demonstratedin theexample from the tutorial. For details on triggers and panes, seeTriggers in the Beam programming guide.
Autoscaling
The connector supportsDataflow autoscaling,which is enabled by default when usingRunner v2 (required). The Dataflow autoscaling algorithm takes into accountthe estimated change stream backlog, which can be monitored on theDataflow monitoring page in theBacklog section. Use the--maxNumWorkers flag when deploying ajob to cap the number of workers.
To manually scale your pipeline instead of using autoscaling, seeManually scaling a streaming pipeline.
Limitations
Note the following limitations before using the Bigtable Beam connector with Dataflow.
Dataflow Runner V2
The connector can only be executed usingDataflow Runner v2.To enable this, specify--experiments=use_runner_v2 in your command-linearguments. Running with Runner v1 causes your pipeline to fail with thefollowing exception:
java.lang.UnsupportedOperationException: BundleFinalizer unsupported by non-portable DataflowSnapshots
The connector does not supportDataflow snapshots.
Duplicates
The Bigtable Beam connector streams changes for each row key and eachcluster in commit timestamp order but, because it sometimes restarts fromearlier times in the stream, it can produce duplicates.
Pipeline restarts
If a Dataflow pipeline has stopped for a long time, data changerecords can fall behind the retention boundary. When the pipeline is resumed,Bigtable fails the pipeline so that you can start a new pipelinewith a new request start time that is within the retention period.Bigtable does this, instead of silently advancing the request timeof the original pipeline, to prevent the unintended dropping of data changerecords with timestamps that fall outside of the specified retention period.
Before you begin
Before you use the connector, complete the following prerequisites.
Set up authentication
To use the Java samples on this page in a local development environment, install and initialize the gcloud CLI, and then set up Application Default Credentials with your user credentials.
Install the Google Cloud CLI.
If you're using an external identity provider (IdP), you must first sign in to the gcloud CLI with your federated identity.
If you're using a local shell, then create local authentication credentials for your user account:
gcloudauthapplication-defaultlogin
You don't need to do this if you're using Cloud Shell.
If an authentication error is returned, and you are using an external identity provider (IdP), confirm that you have signed in to the gcloud CLI with your federated identity.
For more information, see Set up authentication for a local development environment.
For information about setting up authentication for a production environment, see Set up Application Default Credentials for code running on Google Cloud.
Enable a change stream
You mustenable a change stream on a table before you can read it. You can alsocreate a new table with change streams enabled.
Change stream metadata table
When you stream changes with Dataflow, theBigtable Beam connector creates a metadata table that is named__change_stream_md_table by default. The change stream metadata table managesthe operational state of the connector and stores metadata about data changerecords.
By default, the connector creates the table in the same instance as the tablethat is being streamed. To ensure that the table works correctly, the app profile for themetadata table must use single-cluster routing and have single-rowtransactions enabled.
For more information about streaming changes from Bigtable withthe Bigtable Beam connector, see theBigtableIOdocumentation.
Required roles
To get the permissions that you need to read a Bigtable changestream using Dataflow, ask your administrator to grant you thefollowing IAM roles.
To read the changes from Bigtable, you need this role:
- Bigtable Administrator(roles/bigtable.admin)on the Bigtable instance that contains the table you plan tostream changes from
To run the Dataflow job, you need these roles:
- Dataflow Developer(
roles/dataflow.developer)on the project containing your Cloud resources - Dataflow Worker(roles/dataflow.worker)on the project containing your Cloud resources
- Storage Object Admin(roles/storage.objectAdmin)on the Cloud Storage buckets that you plan to use
For more information about granting roles, seeManage access.
You might also be able to get the required permissions throughcustom roles or otherpredefined roles.
Add the Bigtable Beam connector as a dependency
Add code similar to the following dependency to your Maven pom.xml file. Theversion must be 2.48.0 or later.
<dependencies><dependency><groupId>org.apache.beam</groupId><artifactId>beam-sdks-java-io-google-cloud-platform</artifactId><version>VERSION</version></dependency></dependencies>Read the change stream
To build a Dataflow pipeline to read your data change records,you configure the connector and then add transforms and sinks. Then you use theconnector to readChangeStreamMutation objects in a Beam pipeline.
The code samples in this section, written in Java, demonstrate how to build apipeline and use it to convert key-value pairs into a string. Each pair consistsof a row key and aChangeStreamMutation object. The pipeline converts eachobject's entries to a comma-separated string.
Build the pipeline
This Java code sample demonstrates how to build the pipeline:
BigtableOptionsoptions=PipelineOptionsFactory.fromArgs(args).withValidation().as(BigtableOptions.class);Pipelinep=Pipeline.create(options);finalInstantstartTime=Instant.now();p.apply("Read Change Stream",BigtableIO.readChangeStream().withProjectId(options.getBigtableProjectId()).withInstanceId(options.getBigtableInstanceId()).withTableId(options.getBigtableTableId()).withAppProfileId(options.getBigtableAppProfile()).withStartTime(startTime)).apply("Flatten Mutation Entries",FlatMapElements.into(TypeDescriptors.strings()).via(ChangeStreamsHelloWorld::mutationEntriesToString)).apply("Print mutations",ParDo.of(newDoFn<String,Void>(){// a DoFn as an anonymous inner class instance@ProcessElementpublicvoidprocessElement(@ElementStringmutation){System.out.println("Change captured: "+mutation);}}));p.run();Process the data change records
This sample demonstrates how to loop through all the entries in a data change recordfor a row and call a convert-to-string method based on entry type.
For a list of entry types that a data change record can contain, seeWhat's in a data change record.
staticList<String>mutationEntriesToString(KV<ByteString,ChangeStreamMutation>mutationPair){List<String>mutations=newArrayList<>();StringrowKey=mutationPair.getKey().toStringUtf8();ChangeStreamMutationmutation=mutationPair.getValue();MutationTypemutationType=mutation.getType();for(Entryentry:mutation.getEntries()){if(entryinstanceofSetCell){mutations.add(setCellToString(rowKey,mutationType,(SetCell)entry));}elseif(entryinstanceofDeleteCells){mutations.add(deleteCellsToString(rowKey,mutationType,(DeleteCells)entry));}elseif(entryinstanceofDeleteFamily){// Note: DeleteRow mutations are mapped into one DeleteFamily per-familymutations.add(deleteFamilyToString(rowKey,mutationType,(DeleteFamily)entry));}else{thrownewRuntimeException("Entry type not supported.");}}returnmutations;}In this sample awrite entry is converted:
privatestaticStringsetCellToString(StringrowKey,MutationTypemutationType,SetCellsetCell){List<String>mutationParts=Arrays.asList(rowKey,mutationType.name(),"SetCell",setCell.getFamilyName(),setCell.getQualifier().toStringUtf8(),setCell.getValue().toStringUtf8());returnString.join(",",mutationParts);}In this sample adeletion of cells entry is converted:
privatestaticStringdeleteCellsToString(StringrowKey,MutationTypemutationType,DeleteCellsdeleteCells){StringtimestampRange=deleteCells.getTimestampRange().getStart()+"-"+deleteCells.getTimestampRange().getEnd();List<String>mutationParts=Arrays.asList(rowKey,mutationType.name(),"DeleteCells",deleteCells.getFamilyName(),deleteCells.getQualifier().toStringUtf8(),timestampRange);returnString.join(",",mutationParts);}In this sample, adeletion of a column family entry is converted:
privatestaticStringdeleteFamilyToString(StringrowKey,MutationTypemutationType,DeleteFamilydeleteFamily){List<String>mutationParts=Arrays.asList(rowKey,mutationType.name(),"DeleteFamily",deleteFamily.getFamilyName());returnString.join(",",mutationParts);}Monitor
The following resources in the Google Cloud console let you monitor yourGoogle Cloud resources while you run a Dataflow pipeline toread a Bigtable change stream:
In particular, check the following metrics:
- On the Bigtable system insights page, check the followingmetrics:
- CPU utilization by change streams data in the metric
cpu_load_by_app_profile_by_method_by_table. Shows the change stream'simpact on your cluster's CPU usage. - Change stream storage utilization (bytes)(
change_stream_log_used_bytes).
- CPU utilization by change streams data in the metric
On the Dataflow monitoring page, checkdata freshness.This metric shows the difference between the current time and thewatermark, which is approximately two minutes, with occasional spikes that area minute or two longer. Data freshness doesn'tindicate if data change records are being processed slowly. To ensure the continuous health and performance of your criticalapplications, monitor the Dataflow data freshness metric and take the following actions:
- If the data freshness metric is consistently higher than the threshold,your pipeline might be under-resourced. We recommend that you add moreDataflow workers.
- If the Dataflow workers arewell provisioned but the data freshness has been increasing or isconsistently high, contactGoogle Cloud Support.
The Dataflow
processing_delay_from_commit_timestamp_MEANmetric can tell you the mean processing time of data change records over thelifetime of the job.
The Bigtableserver/latencies metric is not useful when you'remonitoring a Dataflow pipeline that is reading aBigtable change stream, because it reflects the streaming requestduration, not the data change record processing latency. High latency in achange stream doesn't mean the requests are being processed slowly; it meansthe connection was open for that long.
What's next
- Learn how to write from Dataflow to Cloud Storage.
- View the full list of monitoring metrics provided by Bigtable.
- Use monitoring to explore Dataflow metrics.
Except as otherwise noted, the content of this page is licensed under theCreative Commons Attribution 4.0 License, and code samples are licensed under theApache 2.0 License. For details, see theGoogle Developers Site Policies. Java is a registered trademark of Oracle and/or its affiliates.
Last updated 2025-12-15 UTC.