Process a Bigtable change stream

This tutorial shows how to deploy a data pipeline to Dataflow for a real-time stream of database changes sourced from a Bigtable table's change stream. The output of the pipeline is written to a series of files on Cloud Storage.

An example dataset for a music listening application is provided. In this tutorial, you track songs that are listened to and then rank the top five over a period of time.

This tutorial is intended for technical users familiar with writing code and deploying data pipelines to Google Cloud.

Objectives

This tutorial shows you how to do the following:

  • Create a Bigtable table with a change stream enabled.
  • Deploy a pipeline on Dataflow that transforms and outputs the change stream.
  • View the results of your data pipeline.

Costs

In this document, you use the following billable components of Google Cloud:

  • Bigtable
  • Dataflow
  • Cloud Storage

To generate a cost estimate based on your projected usage, use the pricing calculator.

New Google Cloud users might be eligible for a free trial.

When you finish the tasks that are described in this document, you can avoid continued billing by deleting the resources that you created. For more information, see Clean up.

Before you begin

    Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.

    Install the Google Cloud CLI. After installation, initialize the Google Cloud CLI by running the following command:

    gcloud init

    If you're using an external identity provider (IdP), you must first sign in to the gcloud CLI with your federated identity.

    Create or select a Google Cloud project.

    Roles required to select or create a project

    • Select a project: Selecting a project doesn't require a specific IAM role—you can select any project that you've been granted a role on.
    • Create a project: To create a project, you need the Project Creator role (roles/resourcemanager.projectCreator), which contains the resourcemanager.projects.create permission. Learn how to grant roles.
    Note: If you don't plan to keep the resources that you create in this procedure, create a project instead of selecting an existing project. After you finish these steps, you can delete the project, removing all resources associated with the project.
    • Create a Google Cloud project:

      gcloud projects create PROJECT_ID

      Replace PROJECT_ID with a name for the Google Cloud project you are creating.

    • Select the Google Cloud project that you created:

      gcloud config set project PROJECT_ID

      Replace PROJECT_ID with your Google Cloud project name.

    Verify that billing is enabled for your Google Cloud project.

    Enable the Dataflow, Cloud Bigtable API, Cloud Bigtable Admin API, and Cloud Storage APIs:

    Roles required to enable APIs

    To enable APIs, you need the Service Usage Admin IAM role (roles/serviceusage.serviceUsageAdmin), which contains the serviceusage.services.enable permission. Learn how to grant roles.

    gcloud services enable dataflow.googleapis.com bigtable.googleapis.com bigtableadmin.googleapis.com storage.googleapis.com

    Update and install the cbt CLI.

    gcloud components update
    gcloud components install cbt
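
    Optionally, you can verify that the cbt CLI is installed and can reach your project by listing your Bigtable instances (an extra check, not part of the setup steps):

    cbt -project=PROJECT_ID listinstances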

Prepare the environment

Get the code

Clone the repository that contains the sample code. If you previously downloaded this repository, pull to get the latest version.

git clone https://github.com/GoogleCloudPlatform/java-docs-samples.git
cd java-docs-samples/bigtable/beam/change-streams

Create a bucket

  • Create a Cloud Storage bucket:
    gcloud storage buckets create gs://BUCKET_NAME
    Replace BUCKET_NAME with a bucket name that meets the bucket naming requirements.

Create a Bigtable instance

You can use an existing instance for this tutorial or create an instance with the default configurations in a region near you.

Create a table

The sample application tracks the songs that users listen to and stores the listen events in Bigtable. Create a table with a change stream enabled that has one column family (cf) and one column (song) and uses user IDs for row keys.

Create the table.

gcloud bigtable instances tables create song-rank \
--column-families=cf --change-stream-retention-period=7d \
--instance=BIGTABLE_INSTANCE_ID --project=PROJECT_ID

Replace the following:

  • PROJECT_ID: the ID of the project that you are using
  • BIGTABLE_INSTANCE_ID: the ID of the instance to contain the new table
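
For reference, a single listen event that matches this schema could be written with the cbt CLI as follows (the row key and song name here are illustrative; the Stream writes section later imports a full dataset for you):

cbt -instance=BIGTABLE_INSTANCE_ID -project=PROJECT_ID set song-rank user-1 cf:song="Ode to Joy"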

Start the pipeline

This pipeline transforms the change stream by doing the following:

1. Reads the change stream
2. Gets the song name
3. Groups the song listen events into N-second windows (sketched after this list)
4. Counts the top five songs
5. Outputs the results
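
The windowing in step 3 isn't shown in the snippets later in this tutorial. In Beam Java it generally looks like the following sketch (the 10-second window length is illustrative; the sample source defines the actual value):

// Sketch: group song names into fixed windows (the length shown is illustrative).
.apply("Window listen events",
    Window.<String>into(FixedWindows.of(Duration.standardSeconds(10))))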

Run the pipeline.

mvn compile exec:java -Dexec.mainClass=SongRank \
"-Dexec.args=--project=PROJECT_ID --bigtableProjectId=PROJECT_ID \
--bigtableInstanceId=BIGTABLE_INSTANCE_ID --bigtableTableId=song-rank \
--outputLocation=gs://BUCKET_NAME/ \
--runner=dataflow --region=BIGTABLE_REGION --experiments=use_runner_v2"

Replace BIGTABLE_REGION with the ID of the region that your Bigtable instance is in, such as us-east5.
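
After the job is submitted, you can optionally confirm from the command line that it's running before switching to the console (an extra check, not part of the original steps):

gcloud dataflow jobs list --region=BIGTABLE_REGION --status=active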

Understand the pipeline

The following snippets of code from the pipeline can help you understand the code you are running.

Reading the change stream

The code in this sample configures the source stream with the parameters for the specific Bigtable instance and table.

// Read the change stream for the configured instance, table, and app profile.
p.apply(
    "Stream from Bigtable",
    BigtableIO.readChangeStream()
        .withProjectId(options.getBigtableProjectId())
        .withInstanceId(options.getBigtableInstanceId())
        .withTableId(options.getBigtableTableId())
        .withAppProfileId(options.getBigtableAppProfile()))

Getting the song name

When a song is listened to, the song name is written to the column family cf and column qualifier song, so the code extracts the value from the change stream mutation and outputs it to the next step of the pipeline.

private static class ExtractSongName extends DoFn<KV<ByteString, ChangeStreamMutation>, String> {

  @DoFn.ProcessElement
  public void processElement(ProcessContext c) {
    // Each change stream record can contain several mutation entries.
    for (Entry e :
        Objects.requireNonNull(Objects.requireNonNull(c.element()).getValue()).getEntries()) {
      if (e instanceof SetCell) {
        SetCell setCell = (SetCell) e;
        // Only emit values written to cf:song, which hold the song names.
        if ("cf".equals(setCell.getFamilyName())
            && "song".equals(setCell.getQualifier().toStringUtf8())) {
          c.output(setCell.getValue().toStringUtf8());
        }
      }
    }
  }
}
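
In the pipeline, this DoFn is applied as an ordinary ParDo step after the change stream source, along these lines (the step name here is illustrative; see the sample source for the exact wiring):

.apply("Extract song name", ParDo.of(new ExtractSongName()))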

Counting the top five songs

You can use the built-in Beam functions Count and Top.of to get the top five songs in the current window.

.apply(Count.perElement())
.apply("Top songs", Top.of(5, new SongComparator()).withoutDefaults())

Outputting the results

This pipeline writes the results to standard out as well as files. For the files, it windows the writes into groups of 10 elements or one-minute segments.

.apply("Print", ParDo.of(new PrintFn()))
.apply(
    "Collect at least 10 elements or 1 minute of elements",
    Window.<String>into(new GlobalWindows())
        .triggering(
            Repeatedly.forever(
                AfterFirst.of(
                    AfterPane.elementCountAtLeast(10),
                    AfterProcessingTime.pastFirstElementInPane()
                        .plusDelayOf(Duration.standardMinutes(1)))))
        .discardingFiredPanes())
.apply(
    "Output top songs",
    TextIO.write()
        .to(options.getOutputLocation() + "song-charts/")
        .withSuffix(".txt")
        .withNumShards(1)
        .withWindowedWrites());

View the pipeline

1. In the Google Cloud console, go to the Dataflow page.

  Go to Dataflow

2. Click the job with a name that begins with song-rank.

3. At the bottom of the screen, click Show to open the logs panel.

4. Click Worker logs to monitor the output logs of the change stream.

  Note: The log might take a few minutes to appear while the job initializes.

Stream writes

Use the cbt CLI to write a number of song listens for various users to the song-rank table. This is designed to write over a few minutes to simulate song listens streaming in over time.

cbt -instance=BIGTABLE_INSTANCE_ID -project=PROJECT_ID import \
song-rank song-rank-data.csv column-family=cf batch-size=1

View the output

Read the output on Cloud Storage to see the most popular songs.

gcloud storage cat gs://BUCKET_NAME/song-charts/GlobalWindow-pane-0-00000-of-00001.txt

Note: The file might take a few minutes to appear while the pipeline processes the data. View the Dataflow worker logs to monitor the status.
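
If that file isn't there yet or the shard name differs, you can list everything the pipeline has written under the output prefix:

gcloud storage ls gs://BUCKET_NAME/song-charts/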

Example output:

2023-07-06T19:53:38.232Z [KV{The Wheels on the Bus, 199}, KV{Twinkle, Twinkle, Little Star, 199}, KV{Ode to Joy , 192}, KV{Row, Row, Row Your Boat, 186}, KV{Take Me Out to the Ball Game, 182}]
2023-07-06T19:53:49.536Z [KV{Old MacDonald Had a Farm, 20}, KV{Take Me Out to the Ball Game, 18}, KV{Für Elise, 17}, KV{Ode to Joy , 15}, KV{Mary Had a Little Lamb, 12}]
2023-07-06T19:53:50.425Z [KV{Twinkle, Twinkle, Little Star, 20}, KV{The Wheels on the Bus, 17}, KV{Row, Row, Row Your Boat, 13}, KV{Happy Birthday to You, 12}, KV{Over the Rainbow, 9}]

Clean up

To avoid incurring charges to your Google Cloud account for the resources used in this tutorial, either delete the project that contains the resources, or keep the project and delete the individual resources.

Delete the project

Caution: Deleting a project has the following effects:

  • Everything in the project is deleted. If you used an existing project for the tasks in this document, when you delete it, you also delete any other work you've done in the project.
  • Custom project IDs are lost. When you created this project, you might have created a custom project ID that you want to use in the future. To preserve the URLs that use the project ID, such as an appspot.com URL, delete selected resources inside the project instead of deleting the whole project.

If you plan to explore multiple architectures, tutorials, or quickstarts, reusing projects can help you avoid exceeding project quota limits.

Delete a Google Cloud project:

gcloud projects delete PROJECT_ID

Delete individual resources

1. Delete the bucket and files.

  gcloud storage rm --recursive gs://BUCKET_NAME/

2. Disable the change stream on the table.

  gcloud bigtable instances tables update song-rank --instance=BIGTABLE_INSTANCE_ID \
  --clear-change-stream-retention-period

3. Delete the table song-rank.

  cbt -instance=BIGTABLE_INSTANCE_ID -project=PROJECT_ID deletetable song-rank

4. Stop the change stream pipeline.

  1. List the jobs to get the job ID.

    gcloud dataflow jobs list --region=BIGTABLE_REGION

  2. Cancel the job.

    gcloud dataflow jobs cancel JOB_ID --region=BIGTABLE_REGION

    Replace JOB_ID with the job ID displayed by the previous command.
