Use custom containers with C++ libraries

In this tutorial, you create a pipeline that uses custom containers with C++ libraries to run a Dataflow HPC highly parallel workflow. Use this tutorial to learn how to use Dataflow and Apache Beam to run grid computing applications that require data to be distributed to functions running on many cores.

The tutorial demonstrates how to run the pipeline first by using the Direct Runner and then by using the Dataflow Runner. By running the pipeline locally, you can test the pipeline before deploying it.

This example uses Cython bindings and functions from the GMP library. Regardless of the library or binding tool that you use, you can apply the same principles to your pipeline.

The example code is available on GitHub.

Objectives

  • Create a pipeline that uses custom containers with C++ libraries.

  • Build a Docker container image using a Dockerfile.

  • Package the code and dependencies into a Docker container.

  • Run the pipeline locally to test it.

  • Run the pipeline in a distributed environment.

Costs

In this document, you use the following billable components of Google Cloud Platform:

  • Artifact Registry
  • Cloud Build
  • Cloud Storage
  • Compute Engine
  • Dataflow

To generate a cost estimate based on your projected usage, use the pricing calculator.

New Google Cloud users might be eligible for a free trial.

When you finish the tasks that are described in this document, you can avoid continued billing by deleting the resources that you created. For more information, see Clean up.

Before you begin

  1. Sign in to your Google Cloud Platform account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
  2. Install the Google Cloud CLI.

  3. If you're using an external identity provider (IdP), you must first sign in to the gcloud CLI with your federated identity.

  4. To initialize the gcloud CLI, run the following command:

    gcloud init
  5. Create or select a Google Cloud project.

    Roles required to select or create a project

    • Select a project: Selecting a project doesn't require a specific IAM role—you can select any project that you've been granted a role on.
    • Create a project: To create a project, you need the Project Creator role (roles/resourcemanager.projectCreator), which contains the resourcemanager.projects.create permission. Learn how to grant roles.
    Note: If you don't plan to keep the resources that you create in this procedure, create a project instead of selecting an existing project. After you finish these steps, you can delete the project, removing all resources associated with the project.
    • Create a Google Cloud project:

      gcloud projects create PROJECT_ID

      Replace PROJECT_ID with a name for the Google Cloud project you are creating.

    • Select the Google Cloud project that you created:

      gcloud config set project PROJECT_ID

      Replace PROJECT_ID with your Google Cloud project name.

  6. Verify that billing is enabled for your Google Cloud project.

  7. Enable the Cloud Storage, Cloud Storage JSON, Compute Engine, Dataflow, Resource Manager, Artifact Registry, and Cloud Build APIs:

    Roles required to enable APIs

    To enable APIs, you need the Service Usage Admin IAM role (roles/serviceusage.serviceUsageAdmin), which contains the serviceusage.services.enable permission. Learn how to grant roles.

    gcloud services enable compute.googleapis.com dataflow.googleapis.com storage_component storage_api cloudresourcemanager.googleapis.com artifactregistry.googleapis.com cloudbuild.googleapis.com
  8. Create local authentication credentials for your user account:

    gcloud auth application-default login

    If an authentication error is returned, and you are using an external identity provider (IdP), confirm that you have signed in to the gcloud CLI with your federated identity.

  9. Grant roles to your user account. Run the following command once for each of the following IAM roles: roles/iam.serviceAccountUser

    gcloud projects add-iam-policy-binding PROJECT_ID --member="user:USER_IDENTIFIER" --role=ROLE

    Replace the following:

    • PROJECT_ID: Your project ID.
    • USER_IDENTIFIER: The identifier for your user account. For example, myemail@example.com.
    • ROLE: The IAM role that you grant to your user account.
  10. Create a user-managed worker service account for your new pipeline and grant the necessary roles to the service account.

    1. To create the service account, run the gcloud iam service-accounts create command:

      gcloud iam service-accounts create parallelpipeline \
        --description="Highly parallel pipeline worker service account" \
        --display-name="Highly parallel data pipeline access"
    2. Grant roles to the service account. Run the following command once for each of the following IAM roles:

      • roles/dataflow.admin
      • roles/dataflow.worker
      • roles/storage.objectAdmin
      • roles/artifactregistry.reader
      gcloud projects add-iam-policy-binding PROJECT_ID --member="serviceAccount:parallelpipeline@PROJECT_ID.iam.gserviceaccount.com" --role=SERVICE_ACCOUNT_ROLE

      Replace SERVICE_ACCOUNT_ROLE with each individual role.

    3. Grant your Google Account a role that lets you create access tokens for the service account:

      gcloud iam service-accounts add-iam-policy-binding parallelpipeline@PROJECT_ID.iam.gserviceaccount.com --member="user:EMAIL_ADDRESS" --role=roles/iam.serviceAccountTokenCreator

Download the code sample and change directories

Download the code sample and then change directories. The code samples in the GitHub repository provide all the code that you need to run this pipeline. When you are ready to build your own pipeline, you can use this sample code as a template.

Clone the beam-cpp-example repository.

  1. Use the git clone command to clone the GitHub repository:

    git clone https://github.com/GoogleCloudPlatform/dataflow-sample-applications.git
  2. Switch to the application directory:

    cd dataflow-sample-applications/beam-cpp-example

Pipeline code

You can customize the pipeline code from this tutorial. This pipeline completes the following tasks:

  • Dynamically produces all integers in an input range.
  • Runs the integers through a C++ function and filters bad values.
  • Writes the bad values to a side channel.
  • Counts the occurrence of each stopping time and normalizes the results.
  • Prints the output, formatting and writing the results to a text file.
  • Creates a PCollection with a single element.
  • Processes the single element with a map function and passes the frequency PCollection as a side input.
  • Processes the PCollection and produces a single output.

The starter file looks like the following:

#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import argparse
import logging
import os
import sys


def run(argv):
  # Import here to avoid __main__ session pickling issues.
  import io
  import itertools

  import matplotlib.pyplot as plt
  import collatz

  import apache_beam as beam
  from apache_beam.io import restriction_trackers
  from apache_beam.options.pipeline_options import PipelineOptions

  class RangeSdf(beam.DoFn, beam.RestrictionProvider):
    """An SDF producing all the integers in the input range.

    This is preferable to beam.Create(range(...)) as it produces the integers
    dynamically rather than materializing them up front.  It is an SDF to do
    so with perfect dynamic sharding.
    """
    def initial_restriction(self, desired_range):
      start, stop = desired_range
      return restriction_trackers.OffsetRange(start, stop)

    def restriction_size(self, _, restriction):
      return restriction.size()

    def create_tracker(self, restriction):
      return restriction_trackers.OffsetRestrictionTracker(restriction)

    def process(self, _, active_range=beam.DoFn.RestrictionParam()):
      for i in itertools.count(active_range.current_restriction().start):
        if active_range.try_claim(i):
          yield i
        else:
          break

  class GenerateIntegers(beam.PTransform):
    def __init__(self, start, stop):
      self._start = start
      self._stop = stop

    def expand(self, p):
      return (
          p
          | beam.Create([(self._start, self._stop + 1)])
          | beam.ParDo(RangeSdf()))

  parser = argparse.ArgumentParser()
  parser.add_argument('--start', dest='start', type=int, default=1)
  parser.add_argument('--stop', dest='stop', type=int, default=10000)
  parser.add_argument('--output', default='./out.png')

  known_args, pipeline_args = parser.parse_known_args(argv)
  # Store this as a local to avoid capturing the full known_args.
  output_path = known_args.output

  with beam.Pipeline(options=PipelineOptions(pipeline_args)) as p:

    # Generate the integers from start to stop (inclusive).
    integers = p | GenerateIntegers(known_args.start, known_args.stop)

    # Run them through our C++ function, filtering bad records.
    # Requires apache beam 2.34 or later.
    stopping_times, bad_values = (
        integers
        | beam.Map(collatz.total_stopping_time).with_exception_handling(
            use_subprocess=True))

    # Write the bad values to a side channel.
    bad_values | 'WriteBadValues' >> beam.io.WriteToText(
        os.path.splitext(output_path)[0] + '-bad.txt')

    # Count the occurrence of each stopping time and normalize.
    total = known_args.stop - known_args.start + 1
    frequencies = (
        stopping_times
        | 'Aggregate' >> (beam.Map(lambda x: (x, 1)) | beam.CombinePerKey(sum))
        | 'Normalize' >> beam.MapTuple(lambda x, count: (x, count / total)))

    if known_args.stop <= 10:
      # Print out the results for debugging.
      frequencies | beam.Map(print)
    else:
      # Format and write them to a text file.
      (
          frequencies
          | 'Format' >> beam.MapTuple(lambda count, freq: f'{count},{freq}')
          | beam.io.WriteToText(os.path.splitext(output_path)[0] + '.txt'))

    # Define some helper functions.
    def make_scatter_plot(xy):
      x, y = zip(*xy)
      plt.plot(x, y, '.')
      png_bytes = io.BytesIO()
      plt.savefig(png_bytes, format='png')
      png_bytes.seek(0)
      return png_bytes.read()

    def write_to_path(path, content):
      """Most Beam IOs write multiple elements to some kind of a container
      file (e.g. strings to lines of a text file, avro records to an avro
      file, etc.)  This function writes each element to its own file, given
      by path.
      """
      # Write to a temporary path and to a rename for fault tolerence.
      tmp_path = path + '.tmp'
      fs = beam.io.filesystems.FileSystems.get_filesystem(path)
      with fs.create(tmp_path) as fout:
        fout.write(content)
      fs.rename([tmp_path], [path])

    (
        p
        # Create a PCollection with a single element.
        | 'CreateSingleton' >> beam.Create([None])
        # Process the single element with a Map function, passing the
        # frequency PCollection as a side input.
        # This will cause the normally distributed frequency PCollection to be
        # colocated and processed as a single unit, producing a single output.
        | 'MakePlot' >> beam.Map(
            lambda _, data: make_scatter_plot(data),
            data=beam.pvalue.AsList(frequencies))
        # Pair this with the desired filename.
        | 'PairWithFilename' >> beam.Map(lambda content: (output_path, content))
        # And actually write it out, using MapTuple to split the tuple into args.
        | 'WriteToOutput' >> beam.MapTuple(write_to_path))


if __name__ == '__main__':
  logging.getLogger().setLevel(logging.INFO)
  run(sys.argv)
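The C++ work happens in the collatz module that this pipeline imports. In the repository, that module is a Cython extension over the GMP library, compiled by the setup.py build step in the next section. If you only want to trace the pipeline logic before the extension is built, a pure-Python stand-in with the same function name and signature is enough for small local runs. The following sketch is illustrative only and is not the repository's implementation; the file name collatz_stub.py is hypothetical.

# collatz_stub.py - illustrative pure-Python stand-in for the compiled
# collatz extension (hypothetical file, not part of the repository).
# The real module wraps GMP through Cython; this stub only mirrors the
# total_stopping_time signature so you can step through the pipeline logic.

def total_stopping_time(n):
  """Number of Collatz steps needed for n to reach 1."""
  if n < 1:
    raise ValueError('n must be a positive integer')
  steps = 0
  while n > 1:
    n = 3 * n + 1 if n % 2 else n // 2
    steps += 1
  return steps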

Set up your development environment

  1. Use the Apache Beam SDK for Python.

  2. Install the GMP library:

    apt-get install libgmp3-dev
  3. To install the dependencies, use the requirements.txt file.

    pip install -r requirements.txt
  4. To build the Python bindings, run the following command.

    python setup.py build_ext --inplace
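
    This step relies on the repository's setup.py, which compiles collatz.pyx into an importable extension module. The following is a minimal sketch of that kind of build script, assuming setuptools and Cython and linking against the GMP library installed in step 2; the repository's actual setup.py may differ.

    # setup.py - illustrative sketch only; the repository's setup.py may differ.
    # Compiles the collatz.pyx Cython module and links it against the GMP
    # library installed with `apt-get install libgmp3-dev`.
    from setuptools import Extension, setup
    from Cython.Build import cythonize

    extensions = [
        Extension(
            'collatz',
            sources=['collatz.pyx'],
            libraries=['gmp'],  # link against the system GMP library
        ),
    ]

    setup(
        name='beam-cpp-example',
        ext_modules=cythonize(extensions),
    )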

You can customize the requirements.txt file from this tutorial. The starter file includes the following dependencies:

#
#    Licensed to the Apache Software Foundation (ASF) under one or more
#    contributor license agreements.  See the NOTICE file distributed with
#    this work for additional information regarding copyright ownership.
#    The ASF licenses this file to You under the Apache License, Version 2.0
#    (the "License"); you may not use this file except in compliance with
#    the License.  You may obtain a copy of the License at
#
#       http://www.apache.org/licenses/LICENSE-2.0
#
#    Unless required by applicable law or agreed to in writing, software
#    distributed under the License is distributed on an "AS IS" BASIS,
#    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#    See the License for the specific language governing permissions and
#    limitations under the License.
#

apache-beam[gcp]==2.46.0
cython==0.29.24
pyparsing==2.4.2
matplotlib==3.4.3

Run the pipeline locally

Running the pipeline locally is useful for testing. By running the pipeline locally, you can confirm that the pipeline runs and behaves as expected before you deploy the pipeline to a distributed environment.

You can run the pipeline locally by using the following command. This command outputs an image named out.png.

python pipeline.py
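
You can also do a quick sanity check of the compiled collatz extension from the same directory before running the full pipeline. The expected values in the comment assume the standard definition of total stopping time; adjust them if the binding counts steps differently.

# Quick, optional sanity check of the compiled collatz extension.
# Standard Collatz total stopping times are 0 for 1, 8 for 6, and 111 for 27.
import collatz

for n in (1, 6, 27):
  print(n, collatz.total_stopping_time(n))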

Create the Google Cloud Platform resources

This section explains how to create the following resources:

  • A Cloud Storage bucket to use as a temporary storage location and an output location.
  • A Docker container to package the pipeline code and dependencies.

Create a Cloud Storage bucket

Begin by creating a Cloud Storage bucket using Google Cloud CLI. This bucket is used as a temporary storage location by the Dataflow pipeline.

To create the bucket, use the gcloud storage buckets create command:

gcloud storage buckets create gs://BUCKET_NAME --location=LOCATION

Replace the following:

  • BUCKET_NAME: a name for your Cloud Storage bucket that meets the bucket naming requirements. Cloud Storage bucket names must be globally unique.
  • LOCATION: the location for the bucket.

Create and build a container image

You can customize the Dockerfile from this tutorial. The starter file looks like the following:

Note: You might need to update the Python version in the Dockerfile when you run the pipeline in a distributed environment. When you use a custom container image, the Python interpreter minor version in your image must match the version used at pipeline construction time.
FROM apache/beam_python3.9_sdk:2.46.0

# Install a C++ library.
RUN apt-get update
RUN apt-get install -y libgmp3-dev

# Install Python dependencies.
COPY requirements.txt requirements.txt
RUN pip install -r requirements.txt

# Install the code and some python bindings.
COPY pipeline.py pipeline.py
COPY collatz.pyx collatz.pyx
COPY setup.py setup.py
RUN python setup.py install

This Dockerfile contains the FROM, COPY, and RUN commands, which you can read about in the Dockerfile reference.
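
Because the Python interpreter minor version in the container must match the version that constructs the pipeline, it can help to check your local interpreter before choosing the beam_pythonX.Y_sdk base image. A small check, run on the machine where you launch the pipeline:

# Print the interpreter version used at pipeline construction time. Its
# major.minor value must match the apache/beam_pythonX.Y_sdk base image in
# the Dockerfile (3.9 in the starter file above).
import sys

print(f'{sys.version_info.major}.{sys.version_info.minor}')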

  1. To upload artifacts, create an Artifact Registry repository. Each repository can contain artifacts for a single supported format.

    All repository content is encrypted using either Google-owned and Google-managed encryption keys or customer-managed encryption keys. Artifact Registry uses Google-owned and Google-managed encryption keys by default, and no configuration is required for this option.

    You must have at least Artifact Registry Writer access to the repository.

    Run the following command to create a new repository. The command uses the --async flag and returns immediately, without waiting for the operation in progress to complete.

    gcloud artifacts repositories create REPOSITORY \
      --repository-format=docker \
      --location=LOCATION \
      --async

    Replace REPOSITORY with a name for your repository. For each repository location in a project, repository names must be unique.

  2. Create the Dockerfile.

    For packages to be part of the Apache Beam container, you must specify them as part of the requirements.txt file. Ensure that you don't specify apache-beam as part of the requirements.txt file. The Apache Beam container already has apache-beam.

    Note: Dependencies installed with the Dockerfile are only available when launching the pipeline. Job dependencies must be included in the requirements file. As in the example, Apache Beam does not need to be in this file, and it is encouraged to keep it separate for faster launches.
  3. Before you can push or pull images, configure Docker to authenticate requests for Artifact Registry. To set up authentication to Docker repositories, run the following command:

    gcloud auth configure-docker LOCATION-docker.pkg.dev

    The command updates your Docker configuration. You can now connect with Artifact Registry in your Google Cloud project to push images.

  4. Build the Docker image using your Dockerfile with Cloud Build.

    Update the path in the following command to match the Dockerfile that you created. This command builds the image and pushes it to your Artifact Registry repository.

    gcloud builds submit --tag LOCATION-docker.pkg.dev/PROJECT_ID/REPOSITORY/dataflow/cpp_beam_container:latest .

Package the code and dependencies in a Docker container

  1. To run this pipeline in a distributed environment, package the code and dependencies into a Docker container.

    docker build . -t cpp_beam_container
  2. After you package the code and dependencies, you can run the pipeline locally to test it.

    python pipeline.py \
      --runner=PortableRunner \
      --job_endpoint=embed \
      --environment_type=DOCKER \
      --environment_config="docker.io/library/cpp_beam_container"

    This command writes the output inside the Docker image. To view the output, run the pipeline with the --output flag and write the output to a Cloud Storage bucket. For example, run the following command.

    python pipeline.py \
      --runner=PortableRunner \
      --job_endpoint=embed \
      --environment_type=DOCKER \
      --environment_config="docker.io/library/cpp_beam_container" \
      --output=gs://BUCKET_NAME/out.png

Run the pipeline

You can now run the Apache Beam pipeline in Dataflow by referring to the file with the pipeline code and passing the parameters required by the pipeline.

In your shell or terminal, run the pipeline with the Dataflow Runner.

python pipeline.py \
  --runner=DataflowRunner \
  --project=PROJECT_ID \
  --region=REGION \
  --temp_location=gs://BUCKET_NAME/tmp \
  --sdk_container_image="LOCATION-docker.pkg.dev/PROJECT_ID/REPOSITORY/dataflow/cpp_beam_container:latest" \
  --experiment=use_runner_v2 \
  --output=gs://BUCKET_NAME/out.png
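
If you prefer to launch the job from Python rather than the shell (for example, from a notebook), you can pass the same flags to run() directly, because pipeline.py forwards unrecognized arguments to PipelineOptions. This is a sketch with the placeholders left in place; replace them as in the command above.

# Launch the same Dataflow job from Python. pipeline.py must be importable
# from the current directory, and the placeholders must be replaced with
# your own values.
import pipeline

pipeline.run([
    '--runner=DataflowRunner',
    '--project=PROJECT_ID',
    '--region=REGION',
    '--temp_location=gs://BUCKET_NAME/tmp',
    '--sdk_container_image='
    'LOCATION-docker.pkg.dev/PROJECT_ID/REPOSITORY/dataflow/cpp_beam_container:latest',
    '--experiment=use_runner_v2',
    '--output=gs://BUCKET_NAME/out.png',
])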

After you execute the command to run the pipeline, Dataflow returns a Job ID with the job status Queued. It might take several minutes before the job status reaches Running and you can access the job graph.

View your results

View data written to your Cloud Storage bucket. Use the gcloud storage ls command to list the contents at the top level of your bucket:

gcloud storage ls gs://BUCKET_NAME

If successful, the command returns a message similar to:

gs://BUCKET_NAME/out.png
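
To download the rendered plot for viewing, you can use any Cloud Storage client. One option that needs no extra installation is the Beam filesystem API, which is already available with apache-beam[gcp]; replace BUCKET_NAME as above.

# Copy the plot from Cloud Storage to the local working directory by using
# the Beam filesystem API that ships with apache-beam[gcp].
from apache_beam.io.filesystems import FileSystems

with FileSystems.open('gs://BUCKET_NAME/out.png') as src, \
    open('out.png', 'wb') as dst:
  dst.write(src.read())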

Clean up

To avoid incurring charges to your Google Cloud account for the resources used in this tutorial, either delete the project that contains the resources, or keep the project and delete the individual resources.

Delete the project

The easiest way to eliminate billing is to delete the Google Cloud project that you created for the tutorial.

    Caution: Deleting a project has the following effects:
    • Everything in the project is deleted. If you used an existing project for the tasks in this document, when you delete it, you also delete any other work you've done in the project.
    • Custom project IDs are lost. When you created this project, you might have created a custom project ID that you want to use in the future. To preserve the URLs that use the project ID, such as an appspot.com URL, delete selected resources inside the project instead of deleting the whole project.

    If you plan to explore multiple architectures, tutorials, or quickstarts, reusing projects can help you avoid exceeding project quota limits.

    Delete a Google Cloud project:

    gcloud projects delete PROJECT_ID

Delete the individual resources

If you want to reuse the project, then delete the resources that you created for the tutorial.

Clean up Google Cloud Platform project resources

  1. Delete the Artifact Registry repository.

    gcloud artifacts repositories delete REPOSITORY \
      --location=LOCATION --async
  2. Delete the Cloud Storage bucket and its objects. This bucket alone does not incur any charges.

    gcloud storage rm gs://BUCKET_NAME --recursive

Revoke credentials

  1. Revoke the roles that you granted to the user-managed worker service account. Run the following command once for each of the following IAM roles: roles/dataflow.admin, roles/dataflow.worker, roles/storage.objectAdmin, and roles/artifactregistry.reader.

    gcloud projects remove-iam-policy-binding PROJECT_ID \
      --member=serviceAccount:parallelpipeline@PROJECT_ID.iam.gserviceaccount.com \
      --role=SERVICE_ACCOUNT_ROLE
  2. Optional: Revoke the authentication credentials that you created, and delete the local credential file.

    gcloud auth application-default revoke
  3. Optional: Revoke credentials from the gcloud CLI.

    gcloud auth revoke
