Stream changes with Dataflow

The Bigtable Beam connector lets you use Dataflow to read Bigtable data change records without needing to track or process partition changes in your code, because the connector handles that logic for you.

This document describes how to configure and use the Bigtable Beam connector to read a change stream using a Dataflow pipeline. Before you read this document, you should read the Overview of change streams and be familiar with Dataflow.

Alternatives to building your own pipeline

If you don't want to build your own Dataflow pipeline, then you can use one of the following options.

You can use a Google-provided Dataflow template.

You can also use the code samples from the Bigtable tutorial or quickstart as a starting point for your code.

Make sure that the code that you generate uses google cloud libraries-bom version 26.14.0 or later.

Connector details

The Bigtable Beam connector method, BigtableIO.readChangeStream, lets you read a stream of data change records (ChangeStreamMutation) that you can process. The Bigtable Beam connector is a component of the Apache Beam GitHub repository. For a description of the connector code, see the comments at BigtableIO.java.

You must use the connector with Beam version 2.48.0 or later. Check Apache Beam runtime support to make sure that you're using a supported version of Java. Then you can deploy a pipeline that uses the connector to Dataflow, which handles the provisioning and management of resources and assists with the scalability and reliability of stream data processing.

For more information on the Apache Beam programming model, see the Beam documentation.

Grouping data without event times

Data change records streamed using the Bigtable Beam connector aren't compatible with Dataflow functions that depend on event times.

As explained in Replication and watermarks, a low watermark might not advance if replication for the partition hasn't caught up to the rest of the instance. When a low watermark stops advancing, it can cause the change stream to stall.

To prevent the stream from stalling, the Bigtable Beam connector outputs all data with an output timestamp of zero. The zero timestamp makes Dataflow consider all the data change records to be late data. As a result, Dataflow features that depend on event times aren't compatible with Bigtable change streams. Specifically, you can't use windowing functions, event-time triggers, or event-time timers.

Instead, you can use GlobalWindows with non-event time triggers to group this late data into panes, as demonstrated in the example from the tutorial. For details on triggers and panes, see Triggers in the Beam programming guide.
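
To make the idea of grouping by arrival time concrete, the following plain-Java sketch models a processing-time trigger on a single global window: elements are buffered, and a pane is emitted once a fixed delay has passed, by wall-clock time, since the first element of that pane arrived. This is an illustrative model only and does not use the Beam API. The class name ProcessingTimePanes and its add method are hypothetical, and unlike a real trigger, which fires on a timer, this model fires lazily when the next element arrives.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical model of a processing-time trigger on a single global window. */
class ProcessingTimePanes<T> {
  private final Duration delay;
  private final List<T> buffer = new ArrayList<>();
  private Instant firstArrival; // arrival time of the first element in the open pane

  ProcessingTimePanes(Duration delay) {
    this.delay = delay;
  }

  /**
   * Adds an element that arrived at the given wall-clock time. Event timestamps
   * are deliberately ignored, because the connector sets them all to zero.
   * Returns the pane that fired, or an empty list if no pane fired.
   */
  List<T> add(T element, Instant arrival) {
    List<T> fired = new ArrayList<>();
    if (firstArrival != null && !arrival.isBefore(firstArrival.plus(delay))) {
      // The delay has elapsed since the pane opened: emit it and start a new one.
      fired.addAll(buffer);
      buffer.clear();
      firstArrival = null;
    }
    if (firstArrival == null) {
      firstArrival = arrival;
    }
    buffer.add(element);
    return fired;
  }

  public static void main(String[] args) {
    ProcessingTimePanes<String> panes = new ProcessingTimePanes<>(Duration.ofSeconds(10));
    Instant t0 = Instant.parse("2025-01-01T00:00:00Z");
    System.out.println(panes.add("a", t0));                 // prints []
    System.out.println(panes.add("b", t0.plusSeconds(4)));  // prints []
    System.out.println(panes.add("c", t0.plusSeconds(12))); // prints [a, b]
  }
}
```

In an actual pipeline, the buffering and firing are handled by the runner once you apply GlobalWindows with a non-event-time trigger, so no hand-written buffer like this one is needed.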

Autoscaling

The connector supports Dataflow autoscaling, which is enabled by default when using Runner v2 (required). The Dataflow autoscaling algorithm takes into account the estimated change stream backlog, which can be monitored on the Dataflow monitoring page in the Backlog section. Use the --maxNumWorkers flag when deploying a job to cap the number of workers.

To manually scale your pipeline instead of using autoscaling, see Manually scaling a streaming pipeline.

Limitations

Note the following limitations before using the Bigtable Beam connector with Dataflow.

Dataflow Runner V2

The connector can only be executed using Dataflow Runner v2. To enable this, specify --experiments=use_runner_v2 in your command-line arguments. Running with Runner v1 causes your pipeline to fail with the following exception:

java.lang.UnsupportedOperationException: BundleFinalizer unsupported by non-portable Dataflow
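
Because the failure only appears once the job runs, it can help to check the command-line arguments before submitting. The following is a minimal sketch of such a pre-flight check in plain Java. The class RunnerV2Check and the method requestsRunnerV2 are hypothetical names, and the sketch assumes that experiments are passed as --experiments=VALUE, optionally as a comma-separated list.

```java
/** Hypothetical pre-flight check for the Runner v2 experiment flag. */
class RunnerV2Check {
  private static final String PREFIX = "--experiments=";
  private static final String RUNNER_V2 = "use_runner_v2";

  /** Returns true if any --experiments argument lists use_runner_v2. */
  static boolean requestsRunnerV2(String[] args) {
    for (String arg : args) {
      if (!arg.startsWith(PREFIX)) {
        continue;
      }
      // Assumption: several experiments can share one flag, separated by commas.
      for (String value : arg.substring(PREFIX.length()).split(",")) {
        if (value.trim().equals(RUNNER_V2)) {
          return true;
        }
      }
    }
    return false;
  }

  public static void main(String[] args) {
    if (!requestsRunnerV2(args)) {
      System.err.println("Missing --experiments=use_runner_v2; the connector requires Runner v2.");
    }
  }
}
```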

Snapshots

The connector does not support Dataflow snapshots.

Duplicates

The Bigtable Beam connector streams changes for each row key and each cluster in commit timestamp order but, because it sometimes restarts from earlier times in the stream, it can produce duplicates.
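
If downstream processing is not idempotent, one possible way to tolerate duplicates is to remember recently seen records and skip repeats. The sketch below is a plain-Java illustration under stated assumptions, not part of the connector. The class ChangeRecordDeduper is hypothetical, and the choice of row key, cluster ID, commit timestamp, and tie-breaker as the record identity is an assumption made for this example. The cache is bounded and held in memory, so it only catches duplicates that arrive close together on the same worker.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical bounded, in-memory filter for recently seen change records. */
class ChangeRecordDeduper {
  private final Map<List<Object>, Boolean> seen;

  ChangeRecordDeduper(final int capacity) {
    // Insertion-ordered map that evicts the oldest identity once capacity is exceeded.
    this.seen =
        new LinkedHashMap<List<Object>, Boolean>() {
          @Override
          protected boolean removeEldestEntry(Map.Entry<List<Object>, Boolean> eldest) {
            return size() > capacity;
          }
        };
  }

  /**
   * Returns true the first time an identity is seen and false for a repeat.
   * Assumption: these four fields together identify one data change record.
   */
  boolean firstTime(String rowKey, String clusterId, long commitTimestampMicros, int tieBreaker) {
    List<Object> id = Arrays.<Object>asList(rowKey, clusterId, commitTimestampMicros, tieBreaker);
    return seen.put(id, Boolean.TRUE) == null;
  }

  public static void main(String[] args) {
    ChangeRecordDeduper deduper = new ChangeRecordDeduper(1000);
    System.out.println(deduper.firstTime("row1", "cluster-a", 100L, 0)); // prints true
    System.out.println(deduper.firstTime("row1", "cluster-a", 100L, 0)); // prints false
  }
}
```

Making the sink write idempotent, so that replaying a record has no additional effect, avoids the need for a cache altogether and is usually the more robust design.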

Pipeline restarts

If a Dataflow pipeline has stopped for a long time, data change records can fall behind the retention boundary. When the pipeline is resumed, Bigtable fails the pipeline so that you can start a new pipeline with a new request start time that is within the retention period. Bigtable does this, instead of silently advancing the request time of the original pipeline, to prevent the unintended dropping of data change records with timestamps that fall outside of the specified retention period.
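
When you start the replacement pipeline, the new start time has to fall inside the retention period. The following sketch shows one way to check a candidate start time and to compute the earliest start time that still leaves a safety margin. It is plain Java with hypothetical names (ChangeStreamStartTime, isWithinRetention, earliestSafeStart), and the retention period and margin are parameters that you supply from your own table configuration.

```java
import java.time.Duration;
import java.time.Instant;

/** Hypothetical helpers for picking a start time inside the retention period. */
class ChangeStreamStartTime {

  /** Returns true if startTime is not older than (now - retention). */
  static boolean isWithinRetention(Instant startTime, Instant now, Duration retention) {
    Instant boundary = now.minus(retention);
    return !startTime.isBefore(boundary);
  }

  /** Returns the oldest start time that stays a margin away from the retention boundary. */
  static Instant earliestSafeStart(Instant now, Duration retention, Duration margin) {
    return now.minus(retention).plus(margin);
  }

  public static void main(String[] args) {
    Instant now = Instant.parse("2025-01-08T00:00:00Z");
    Duration retention = Duration.ofDays(7); // example value; use your table's setting
    Instant candidate = now.minus(Duration.ofDays(8));
    System.out.println(isWithinRetention(candidate, now, retention)); // prints false
    System.out.println(earliestSafeStart(now, retention, Duration.ofHours(1)));
    // prints 2025-01-01T01:00:00Z
  }
}
```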

Before you begin

Before you use the connector, complete the following prerequisites.

Set up authentication

To use the Java samples on this page in a local development environment, install and initialize the gcloud CLI, and then set up Application Default Credentials with your user credentials.

    Install the Google Cloud CLI.

    If you're using an external identity provider (IdP), you must first sign in to the gcloud CLI with your federated identity.

    If you're using a local shell, then create local authentication credentials for your user account:

    gcloud auth application-default login

    You don't need to do this if you're using Cloud Shell.

    If an authentication error is returned, and you are using an external identity provider (IdP), confirm that you have signed in to the gcloud CLI with your federated identity.

For more information, see Set up authentication for a local development environment.

For information about setting up authentication for a production environment, see Set up Application Default Credentials for code running on Google Cloud.

Enable a change stream

You must enable a change stream on a table before you can read it. You can also create a new table with change streams enabled.

Change stream metadata table

When you stream changes with Dataflow, the Bigtable Beam connector creates a metadata table that is named __change_stream_md_table by default. The change stream metadata table manages the operational state of the connector and stores metadata about data change records.

By default, the connector creates the table in the same instance as the table that is being streamed. To ensure that the table works correctly, the app profile for the metadata table must use single-cluster routing and have single-row transactions enabled.

For more information about streaming changes from Bigtable with the Bigtable Beam connector, see the BigtableIO documentation.

Required roles

To get the permissions that you need to read a Bigtable change stream using Dataflow, ask your administrator to grant you the following IAM roles.

To read the changes from Bigtable, you need this role:

  • Bigtable Administrator (roles/bigtable.admin) on the Bigtable instance that contains the table you plan to stream changes from

To run the Dataflow job, you need these roles:

For more information about granting roles, see Manage access.

You might also be able to get the required permissions through custom roles or other predefined roles.

Add the Bigtable Beam connector as a dependency

Add code similar to the following dependency to your Maven pom.xml file. The version must be 2.48.0 or later.

<dependencies>
  <dependency>
    <groupId>org.apache.beam</groupId>
    <artifactId>beam-sdks-java-io-google-cloud-platform</artifactId>
    <version>VERSION</version>
  </dependency>
</dependencies>

Read the change stream

To build a Dataflow pipeline to read your data change records, you configure the connector and then add transforms and sinks. Then you use the connector to read ChangeStreamMutation objects in a Beam pipeline.

The code samples in this section, written in Java, demonstrate how to build a pipeline and use it to convert key-value pairs into a string. Each pair consists of a row key and a ChangeStreamMutation object. The pipeline converts each object's entries to a comma-separated string.

Build the pipeline

This Java code sample demonstrates how to build the pipeline:

BigtableOptions options =
    PipelineOptionsFactory.fromArgs(args).withValidation().as(BigtableOptions.class);
Pipeline p = Pipeline.create(options);

final Instant startTime = Instant.now();

p.apply(
        "Read Change Stream",
        BigtableIO.readChangeStream()
            .withProjectId(options.getBigtableProjectId())
            .withInstanceId(options.getBigtableInstanceId())
            .withTableId(options.getBigtableTableId())
            .withAppProfileId(options.getBigtableAppProfile())
            .withStartTime(startTime))
    .apply(
        "Flatten Mutation Entries",
        FlatMapElements.into(TypeDescriptors.strings())
            .via(ChangeStreamsHelloWorld::mutationEntriesToString))
    .apply(
        "Print mutations",
        ParDo.of(
            new DoFn<String, Void>() { // a DoFn as an anonymous inner class instance
              @ProcessElement
              public void processElement(@Element String mutation) {
                System.out.println("Change captured: " + mutation);
              }
            }));
p.run();

Process the data change records

This sample demonstrates how to loop through all the entries in a data change record for a row and call a convert-to-string method based on entry type.

For a list of entry types that a data change record can contain, see What's in a data change record.

static List<String> mutationEntriesToString(KV<ByteString, ChangeStreamMutation> mutationPair) {
  List<String> mutations = new ArrayList<>();
  String rowKey = mutationPair.getKey().toStringUtf8();
  ChangeStreamMutation mutation = mutationPair.getValue();
  MutationType mutationType = mutation.getType();
  for (Entry entry : mutation.getEntries()) {
    if (entry instanceof SetCell) {
      mutations.add(setCellToString(rowKey, mutationType, (SetCell) entry));
    } else if (entry instanceof DeleteCells) {
      mutations.add(deleteCellsToString(rowKey, mutationType, (DeleteCells) entry));
    } else if (entry instanceof DeleteFamily) {
      // Note: DeleteRow mutations are mapped into one DeleteFamily per-family
      mutations.add(deleteFamilyToString(rowKey, mutationType, (DeleteFamily) entry));
    } else {
      throw new RuntimeException("Entry type not supported.");
    }
  }
  return mutations;
}

In this sample, a write entry is converted:

private static String setCellToString(String rowKey, MutationType mutationType, SetCell setCell) {
  List<String> mutationParts =
      Arrays.asList(
          rowKey,
          mutationType.name(),
          "SetCell",
          setCell.getFamilyName(),
          setCell.getQualifier().toStringUtf8(),
          setCell.getValue().toStringUtf8());
  return String.join(",", mutationParts);
}

In this sample, a deletion of cells entry is converted:

private static String deleteCellsToString(
    String rowKey, MutationType mutationType, DeleteCells deleteCells) {
  String timestampRange =
      deleteCells.getTimestampRange().getStart() + "-" + deleteCells.getTimestampRange().getEnd();
  List<String> mutationParts =
      Arrays.asList(
          rowKey,
          mutationType.name(),
          "DeleteCells",
          deleteCells.getFamilyName(),
          deleteCells.getQualifier().toStringUtf8(),
          timestampRange);
  return String.join(",", mutationParts);
}

In this sample, a deletion of a column family entry is converted:

private static String deleteFamilyToString(
    String rowKey, MutationType mutationType, DeleteFamily deleteFamily) {
  List<String> mutationParts =
      Arrays.asList(rowKey, mutationType.name(), "DeleteFamily", deleteFamily.getFamilyName());
  return String.join(",", mutationParts);
}

Monitor

The following resources in the Google Cloud console let you monitor your Google Cloud resources while you run a Dataflow pipeline to read a Bigtable change stream:

In particular, check the following metrics:

  • On the Bigtable system insights page, check the following metrics:
    • CPU utilization by change streams data in the metric cpu_load_by_app_profile_by_method_by_table. Shows the change stream's impact on your cluster's CPU usage.
    • Change stream storage utilization (bytes) (change_stream_log_used_bytes).
  • On the Dataflow monitoring page, check data freshness. This metric shows the difference between the current time and the watermark, which is approximately two minutes, with occasional spikes that are a minute or two longer. Data freshness doesn't indicate if data change records are being processed slowly. To ensure the continuous health and performance of your critical applications, monitor the Dataflow data freshness metric and take the following actions:

    • If the data freshness metric is consistently higher than the threshold, your pipeline might be under-resourced. We recommend that you add more Dataflow workers.
    • If the Dataflow workers are well provisioned but the data freshness has been increasing or is consistently high, contact Google Cloud Support.
  • The Dataflow processing_delay_from_commit_timestamp_MEAN metric can tell you the mean processing time of data change records over the lifetime of the job.
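
What counts as "consistently higher than the threshold" is left to you to define. As one possible interpretation, the sketch below flags a series of data freshness samples when a chosen fraction of them exceeds a chosen threshold, so that occasional spikes don't trigger an alert. It is plain Java with hypothetical names (DataFreshnessCheck, consistentlyAbove). The threshold and fraction are assumptions that you would tune, and the samples would come from whatever monitoring export you use.

```java
import java.time.Duration;
import java.util.Arrays;
import java.util.List;

/** Hypothetical check for sustained high data freshness. */
class DataFreshnessCheck {

  /** Returns true if at least minFraction of the samples exceed the threshold. */
  static boolean consistentlyAbove(
      List<Duration> samples, Duration threshold, double minFraction) {
    if (samples.isEmpty()) {
      return false; // no data, so nothing to flag
    }
    long above = samples.stream().filter(s -> s.compareTo(threshold) > 0).count();
    return (double) above / samples.size() >= minFraction;
  }

  public static void main(String[] args) {
    Duration threshold = Duration.ofMinutes(5); // example value, not a recommendation
    List<Duration> samples =
        Arrays.asList(
            Duration.ofMinutes(6),
            Duration.ofMinutes(7),
            Duration.ofMinutes(2),
            Duration.ofMinutes(8));
    // 3 of 4 samples exceed 5 minutes, which meets the 0.75 fraction.
    System.out.println(consistentlyAbove(samples, threshold, 0.75)); // prints true
  }
}
```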

The Bigtable server/latencies metric is not useful when you're monitoring a Dataflow pipeline that is reading a Bigtable change stream, because it reflects the streaming request duration, not the data change record processing latency. High latency in a change stream doesn't mean the requests are being processed slowly; it means the connection was open for that long.

Except as otherwise noted, the content of this page is licensed under the Creative Commons Attribution 4.0 License, and code samples are licensed under the Apache 2.0 License. For details, see the Google Developers Site Policies. Java is a registered trademark of Oracle and/or its affiliates.

Last updated 2025-12-15 UTC.