Use custom containers with C++ libraries
In this tutorial, you create a pipeline that uses custom containers with C++ libraries to run a Dataflow HPC highly parallel workflow. Use this tutorial to learn how to use Dataflow and Apache Beam to run grid computing applications that require data to be distributed to functions running on many cores.
The tutorial demonstrates how to run the pipeline first by using the Direct Runner and then by using the Dataflow Runner. By running the pipeline locally, you can test the pipeline before deploying it.
This example uses Cython bindings and functions from the GMP library. Regardless of the library or binding tool that you use, you can apply the same principles to your pipeline.
The example code is available on GitHub.
Objectives
Create a pipeline that uses custom containers with C++ libraries.
Build a Docker container image using a Dockerfile.
Package the code and dependencies into a Docker container.
Run the pipeline locally to test it.
Run the pipeline in a distributed environment.
Costs
In this document, you use the following billable components of Google Cloud Platform:
- Artifact Registry
- Cloud Build
- Cloud Storage
- Compute Engine
- Dataflow
To generate a cost estimate based on your projected usage, use the pricing calculator.
When you finish the tasks that are described in this document, you can avoid continued billing by deleting the resources that you created. For more information, see Clean up.
Before you begin
- Sign in to your Google Cloud Platform account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
Install the Google Cloud CLI.
If you're using an external identity provider (IdP), you must first sign in to the gcloud CLI with your federated identity.
To initialize the gcloud CLI, run the following command:
gcloud init
Create or select a Google Cloud project.
Note: If you don't plan to keep the resources that you create in this procedure, create a project instead of selecting an existing project. After you finish these steps, you can delete the project, removing all resources associated with the project.
Roles required to select or create a project
- Select a project: Selecting a project doesn't require a specific IAM role. You can select any project that you've been granted a role on.
- Create a project: To create a project, you need the Project Creator role (roles/resourcemanager.projectCreator), which contains the resourcemanager.projects.create permission. Learn how to grant roles.
Create a Google Cloud project:
gcloud projects create PROJECT_ID
Replace PROJECT_ID with a name for the Google Cloud project you are creating.
Select the Google Cloud project that you created:
gcloud config set project PROJECT_ID
Replace PROJECT_ID with your Google Cloud project name.
Verify that billing is enabled for your Google Cloud project.
Enable the Cloud Storage, Cloud Storage JSON, Compute Engine, Dataflow, Resource Manager, Artifact Registry, and Cloud Build APIs:
Roles required to enable APIs
To enable APIs, you need the Service Usage Admin IAM role (roles/serviceusage.serviceUsageAdmin), which contains the serviceusage.services.enable permission. Learn how to grant roles.
gcloud services enable compute.googleapis.com dataflow.googleapis.com storage_component storage_api cloudresourcemanager.googleapis.com artifactregistry.googleapis.com cloudbuild.googleapis.com
Create local authentication credentials for your user account:
gcloud auth application-default login
If an authentication error is returned, and you are using an external identity provider (IdP), confirm that you have signed in to the gcloud CLI with your federated identity.
Grant roles to your user account. Run the following command once for each of the following IAM roles: roles/iam.serviceAccountUser
gcloud projects add-iam-policy-binding PROJECT_ID --member="user:USER_IDENTIFIER" --role=ROLE
Replace the following:
- PROJECT_ID: Your project ID.
- USER_IDENTIFIER: The identifier for your user account. For example, myemail@example.com.
- ROLE: The IAM role that you grant to your user account.
Create a user-managed worker service account for your new pipeline and grant the necessary roles to the service account.
To create the service account, run the gcloud iam service-accounts create command:
gcloud iam service-accounts create parallelpipeline \
    --description="Highly parallel pipeline worker service account" \
    --display-name="Highly parallel data pipeline access"
Grant roles to the service account. Run the following command once for each of the following IAM roles:
- roles/dataflow.admin
- roles/dataflow.worker
- roles/storage.objectAdmin
- roles/artifactregistry.reader
gcloud projects add-iam-policy-binding PROJECT_ID --member="serviceAccount:parallelpipeline@PROJECT_ID.iam.gserviceaccount.com" --role=SERVICE_ACCOUNT_ROLE
Replace SERVICE_ACCOUNT_ROLE with each individual role.
Grant your Google Account a role that lets you create access tokens for the service account:
gcloud iam service-accounts add-iam-policy-binding parallelpipeline@PROJECT_ID.iam.gserviceaccount.com --member="user:EMAIL_ADDRESS" --role=roles/iam.serviceAccountTokenCreator
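Because the role-granting command is run once per role, it can help to generate the command lines rather than type them. The following is a small sketch, not part of the sample repository: the helper name role_binding_commands and the project ID my-project are hypothetical, and the script only prints the commands so that you can review them before running anything.

```python
import shlex

# Roles that this tutorial lists for the worker service account.
WORKER_ROLES = [
    "roles/dataflow.admin",
    "roles/dataflow.worker",
    "roles/storage.objectAdmin",
    "roles/artifactregistry.reader",
]


def role_binding_commands(project_id, account="parallelpipeline"):
    """Return one add-iam-policy-binding command line per worker role."""
    member = f"serviceAccount:{account}@{project_id}.iam.gserviceaccount.com"
    return [
        shlex.join([
            "gcloud", "projects", "add-iam-policy-binding", project_id,
            f"--member={member}",
            f"--role={role}",
        ])
        for role in WORKER_ROLES
    ]


if __name__ == "__main__":
    # "my-project" is a placeholder; substitute your own PROJECT_ID.
    for command in role_binding_commands("my-project"):
        print(command)
```

Printing instead of executing keeps the sketch free of side effects; the same list can be reused later with remove-iam-policy-binding during cleanup.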
Download the code sample and change directories
Download the code sample and then change directories. The code samples in the GitHub repository provide all the code that you need to run this pipeline. When you are ready to build your own pipeline, you can use this sample code as a template.
Clone the beam-cpp-example repository.
Use the git clone command to clone the GitHub repository:
git clone https://github.com/GoogleCloudPlatform/dataflow-sample-applications.git
Switch to the application directory:
cd dataflow-sample-applications/beam-cpp-example
Pipeline code
You can customize the pipeline code from this tutorial. This pipeline completes the following tasks:
- Dynamically produces all integers in an input range.
- Runs the integers through a C++ function and filters bad values.
- Writes the bad values to a side channel.
- Counts the occurrence of each stopping time and normalizes the results.
- Prints the output, formatting and writing the results to a text file.
- Creates a PCollection with a single element.
- Processes the single element with a map function and passes the frequency PCollection as a side input.
- Processes the PCollection and produces a single output.
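To make the tasks above concrete, the following is a minimal pure-Python sketch of the computation, without Apache Beam. It assumes that collatz.total_stopping_time returns the number of Collatz steps needed to reach 1; the sample implements that function in C++ with GMP through Cython, so this version is only a stand-in. The function stopping_time_frequencies is a hypothetical name that mirrors the counting and normalizing tasks.

```python
from collections import Counter


def total_stopping_time(n):
    """Count Collatz steps from n down to 1 (pure-Python stand-in)."""
    if n < 1:
        raise ValueError("n must be a positive integer")
    steps = 0
    while n != 1:
        # Halve even values; map odd values to 3n + 1.
        n = n // 2 if n % 2 == 0 else 3 * n + 1
        steps += 1
    return steps


def stopping_time_frequencies(start, stop):
    """Count each stopping time over [start, stop] and normalize by the total."""
    counts = Counter(total_stopping_time(i) for i in range(start, stop + 1))
    total = stop - start + 1
    return {time: count / total for time, count in sorted(counts.items())}


if __name__ == "__main__":
    print(stopping_time_frequencies(1, 4))
```

In the pipeline, the same aggregation is distributed across workers, and values that raise an exception are routed to a separate output instead of stopping the job.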
The starter file looks like the following:
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import argparse
import logging
import os
import sys


def run(argv):
  # Import here to avoid __main__ session pickling issues.
  import io
  import itertools
  import matplotlib.pyplot as plt
  import collatz

  import apache_beam as beam
  from apache_beam.io import restriction_trackers
  from apache_beam.options.pipeline_options import PipelineOptions

  class RangeSdf(beam.DoFn, beam.RestrictionProvider):
    """An SDF producing all the integers in the input range.

    This is preferable to beam.Create(range(...)) as it produces the integers
    dynamically rather than materializing them up front.  It is an SDF to do
    so with perfect dynamic sharding.
    """
    def initial_restriction(self, desired_range):
      start, stop = desired_range
      return restriction_trackers.OffsetRange(start, stop)

    def restriction_size(self, _, restriction):
      return restriction.size()

    def create_tracker(self, restriction):
      return restriction_trackers.OffsetRestrictionTracker(restriction)

    def process(self, _, active_range=beam.DoFn.RestrictionParam()):
      for i in itertools.count(active_range.current_restriction().start):
        if active_range.try_claim(i):
          yield i
        else:
          break

  class GenerateIntegers(beam.PTransform):
    def __init__(self, start, stop):
      self._start = start
      self._stop = stop

    def expand(self, p):
      return (
          p
          | beam.Create([(self._start, self._stop + 1)])
          | beam.ParDo(RangeSdf()))

  parser = argparse.ArgumentParser()
  parser.add_argument('--start', dest='start', type=int, default=1)
  parser.add_argument('--stop', dest='stop', type=int, default=10000)
  parser.add_argument('--output', default='./out.png')

  known_args, pipeline_args = parser.parse_known_args(argv)
  # Store this as a local to avoid capturing the full known_args.
  output_path = known_args.output

  with beam.Pipeline(options=PipelineOptions(pipeline_args)) as p:

    # Generate the integers from start to stop (inclusive).
    integers = p | GenerateIntegers(known_args.start, known_args.stop)

    # Run them through our C++ function, filtering bad records.
    # Requires apache beam 2.34 or later.
    stopping_times, bad_values = (
        integers
        | beam.Map(collatz.total_stopping_time).with_exception_handling(
            use_subprocess=True))

    # Write the bad values to a side channel.
    bad_values | 'WriteBadValues' >> beam.io.WriteToText(
        os.path.splitext(output_path)[0] + '-bad.txt')

    # Count the occurrence of each stopping time and normalize.
    total = known_args.stop - known_args.start + 1
    frequencies = (
        stopping_times
        | 'Aggregate' >> (beam.Map(lambda x: (x, 1)) | beam.CombinePerKey(sum))
        | 'Normalize' >> beam.MapTuple(lambda x, count: (x, count / total)))

    if known_args.stop <= 10:
      # Print out the results for debugging.
      frequencies | beam.Map(print)
    else:
      # Format and write them to a text file.
      (
          frequencies
          | 'Format' >> beam.MapTuple(lambda count, freq: f'{count},{freq}')
          | beam.io.WriteToText(os.path.splitext(output_path)[0] + '.txt'))

      # Define some helper functions.
      def make_scatter_plot(xy):
        x, y = zip(*xy)
        plt.plot(x, y, '.')
        png_bytes = io.BytesIO()
        plt.savefig(png_bytes, format='png')
        png_bytes.seek(0)
        return png_bytes.read()

      def write_to_path(path, content):
        """Most Beam IOs write multiple elements to some kind of a container
        file (e.g. strings to lines of a text file, avro records to an avro
        file, etc.)  This function writes each element to its own file, given
        by path.
        """
        # Write to a temporary path and then rename for fault tolerance.
        tmp_path = path + '.tmp'
        fs = beam.io.filesystems.FileSystems.get_filesystem(path)
        with fs.create(tmp_path) as fout:
          fout.write(content)
        fs.rename([tmp_path], [path])

      (
          p
          # Create a PCollection with a single element.
          | 'CreateSingleton' >> beam.Create([None])
          # Process the single element with a Map function, passing the frequency
          # PCollection as a side input.
          # This will cause the normally distributed frequency PCollection to be
          # colocated and processed as a single unit, producing a single output.
          | 'MakePlot' >> beam.Map(
              lambda _, data: make_scatter_plot(data),
              data=beam.pvalue.AsList(frequencies))
          # Pair this with the desired filename.
          | 'PairWithFilename' >> beam.Map(lambda content: (output_path, content))
          # And actually write it out, using MapTuple to split the tuple into args.
          | 'WriteToOutput' >> beam.MapTuple(write_to_path))


if __name__ == '__main__':
  logging.getLogger().setLevel(logging.INFO)
  run(sys.argv)
Set up your development environment
Use the Apache Beam SDK for Python.
Install the GMP library:
apt-get install libgmp3-dev
To install the dependencies, use the requirements.txt file.
pip install -r requirements.txt
To build the Python bindings, run the following command.
python setup.py build_ext --inplace
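After the build step, a quick check that the extension module can be found saves a failed pipeline run later. The following sketch uses only the Python standard library. The helper name extension_is_built is hypothetical, and the check assumes that you run it from the directory where build_ext --inplace placed the compiled collatz module.

```python
import importlib.util


def extension_is_built(module_name="collatz"):
    """Return True if the named module can be found on the import path."""
    return importlib.util.find_spec(module_name) is not None


if __name__ == "__main__":
    if extension_is_built():
        print("collatz extension found")
    else:
        print("collatz extension not found; rerun the build_ext step")
```

The check only locates the module; it does not import it, so a module that is found can still fail to load if the GMP shared library is missing.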
You can customize the requirements.txt file from this tutorial. The starter file includes the following dependencies:
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
apache-beam[gcp]==2.46.0
cython==0.29.24
pyparsing==2.4.2
matplotlib==3.4.3
Run the pipeline locally
Running the pipeline locally is useful for testing. By running the pipeline locally, you can confirm that the pipeline runs and behaves as expected before you deploy the pipeline to a distributed environment.
You can run the pipeline locally by using the following command. This command outputs an image named out.png.
python pipeline.py
Create the Google Cloud Platform resources
This section explains how to create the following resources:
- A Cloud Storage bucket to use as a temporary storage location and an output location.
- A Docker container to package the pipeline code and dependencies.
Create a Cloud Storage bucket
Begin by creating a Cloud Storage bucket using Google Cloud CLI. This bucket is used as a temporary storage location by the Dataflow pipeline.
To create the bucket, use the gcloud storage buckets create command:
gcloud storage buckets create gs://BUCKET_NAME --location=LOCATION
Replace the following:
- BUCKET_NAME: a name for your Cloud Storage bucket that meets the bucket naming requirements. Cloud Storage bucket names must be globally unique.
- LOCATION: the location for the bucket.
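Before running the command, you can screen a candidate bucket name locally. The following sketch checks only a simplified subset of the bucket naming requirements: lowercase letters, digits, dashes, underscores, and dots; a letter or digit at each end; 3 to 63 characters; and no "goog" prefix. It does not cover the longer limit for dotted names, the other restricted words, or global uniqueness, and the helper name is hypothetical.

```python
import re

# Simplified pattern: starts and ends with a lowercase letter or digit,
# with 1 to 61 allowed characters in between (3 to 63 characters overall).
_BUCKET_NAME = re.compile(r"[a-z0-9][a-z0-9._-]{1,61}[a-z0-9]")


def looks_like_valid_bucket_name(name):
    """Return True if name passes this simplified subset of the naming rules."""
    if name.startswith("goog"):
        return False
    return _BUCKET_NAME.fullmatch(name) is not None


if __name__ == "__main__":
    # These candidate names are illustrative only.
    for candidate in ["my-dataflow-bucket", "My-Bucket", "ab"]:
        print(candidate, looks_like_valid_bucket_name(candidate))
```

A name that passes this check can still be rejected by Cloud Storage, for example when another project already uses it.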
Create and build a container image
You can customize the Dockerfile from this tutorial.
Note: You might need to update the Python version in the Dockerfile when you run the pipeline in a distributed environment. When you use a custom container image, the Python interpreter minor version in your image must match the version used at pipeline construction time.
The starter file looks like the following:
FROM apache/beam_python3.9_sdk:2.46.0

# Install a C++ library.
RUN apt-get update
RUN apt-get install -y libgmp3-dev

# Install Python dependencies.
COPY requirements.txt requirements.txt
RUN pip install -r requirements.txt

# Install the code and some python bindings.
COPY pipeline.py pipeline.py
COPY collatz.pyx collatz.pyx
COPY setup.py setup.py
RUN python setup.py install
This Dockerfile contains the FROM, COPY, and RUN commands, which you can read about in the Dockerfile reference.
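The version-matching requirement in the note can be checked before you build the image. The following sketch parses the Python version out of a base image name of the form used in this Dockerfile and compares it with the local interpreter. Both helper names are hypothetical, and the regular expression assumes the beam_pythonX.Y_sdk naming pattern shown in the FROM line.

```python
import re
import sys


def image_python_version(image):
    """Extract (major, minor) from a name such as apache/beam_python3.9_sdk:2.46.0."""
    match = re.search(r"beam_python(\d+)\.(\d+)_sdk", image)
    if match is None:
        raise ValueError(f"no Python version found in image name: {image!r}")
    return int(match.group(1)), int(match.group(2))


def matches_local_interpreter(image, local=None):
    """Compare the image's Python version with the interpreter running this code."""
    local = tuple(sys.version_info[:2]) if local is None else tuple(local)
    return image_python_version(image) == local


if __name__ == "__main__":
    base_image = "apache/beam_python3.9_sdk:2.46.0"
    print(image_python_version(base_image), matches_local_interpreter(base_image))
```

If the comparison is false, either change the FROM line to the image for your local minor version or launch the pipeline from an interpreter that matches the image.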
To upload artifacts, create an Artifact Registry repository. Each repository can contain artifacts for a single supported format.
All repository content is encrypted using either Google-owned and Google-managed encryption keys or customer-managed encryption keys. Artifact Registry uses Google-owned and Google-managed encryption keys by default and no configuration is required for this option.
You must have at least Artifact Registry Writer access to the repository.
Run the following command to create a new repository. The command uses the --async flag and returns immediately, without waiting for the operation in progress to complete.
gcloud artifacts repositories create REPOSITORY \
    --repository-format=docker \
    --location=LOCATION \
    --async
Replace REPOSITORY with a name for your repository. For each repository location in a project, repository names must be unique.
Create the Dockerfile.
For packages to be part of the Apache Beam container, you must specify them as part of the requirements.txt file. Ensure that you don't specify apache-beam as part of the requirements.txt file. The Apache Beam container already has apache-beam.
Note: Dependencies installed with the Dockerfile are only available when launching the pipeline. Job dependencies must be included in the requirements file. As in the example, Apache Beam does not need to be in this file, and it is encouraged to keep it separate for faster launches.
Before you can push or pull images, configure Docker to authenticate requests for Artifact Registry. To set up authentication to Docker repositories, run the following command:
gcloud auth configure-docker LOCATION-docker.pkg.dev
The command updates your Docker configuration. You can now connect with Artifact Registry in your Google Cloud project to push images.
Build the Docker image using your Dockerfile with Cloud Build.
Update the path in the following command to match the Dockerfile that you created. This command builds the image and pushes it to your Artifact Registry repository.
gcloud builds submit --tag LOCATION-docker.pkg.dev/PROJECT_ID/REPOSITORY/dataflow/cpp_beam_container:latest .
Package the code and dependencies in a Docker container
To run this pipeline in a distributed environment, package the code and dependencies into a Docker container.
docker build . -t cpp_beam_container
After you package the code and dependencies, you can run the pipeline locally to test it.
python pipeline.py \
    --runner=PortableRunner \
    --job_endpoint=embed \
    --environment_type=DOCKER \
    --environment_config="docker.io/library/cpp_beam_container"
This command writes the output inside the Docker image. To view the output, run the pipeline with the --output flag, and write the output to a Cloud Storage bucket. For example, run the following command.
python pipeline.py \
    --runner=PortableRunner \
    --job_endpoint=embed \
    --environment_type=DOCKER \
    --environment_config="docker.io/library/cpp_beam_container" \
    --output=gs://BUCKET_NAME/out.png
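The --output value determines more than the image location: the pipeline code derives its text outputs from the same path by replacing the extension. The following sketch reproduces that derivation with os.path.splitext. The helper name derived_output_paths and the dictionary keys are hypothetical.

```python
import os.path


def derived_output_paths(output_path):
    """Return the paths that the pipeline derives from its --output value."""
    stem = os.path.splitext(output_path)[0]
    return {
        "plot": output_path,
        "frequencies": stem + ".txt",
        "bad_values": stem + "-bad.txt",
    }


if __name__ == "__main__":
    for name, path in derived_output_paths("gs://BUCKET_NAME/out.png").items():
        print(name, path)
```

The two text paths are passed to WriteToText as file name prefixes, so the files that appear in the bucket also carry a shard suffix after the prefix.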
Run the pipeline
You can now run the Apache Beam pipeline in Dataflow by referring to the file with the pipeline code and passing the parameters required by the pipeline.
In your shell or terminal, run the pipeline with the Dataflow Runner.
python pipeline.py \
    --runner=DataflowRunner \
    --project=PROJECT_ID \
    --region=REGION \
    --temp_location=gs://BUCKET_NAME/tmp \
    --sdk_container_image="LOCATION-docker.pkg.dev/PROJECT_ID/REPOSITORY/dataflow/cpp_beam_container:latest" \
    --experiment=use_runner_v2 \
    --output=gs://BUCKET_NAME/out.png
After you execute the command to run the pipeline, Dataflow returns a Job ID with the job status Queued. It might take several minutes before the job status reaches Running and you can access the job graph.
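Several of the flags above repeat the same placeholders, so a typo in one of them is easy to miss. The following sketch assembles the flag list from the individual values. The function name dataflow_args and the example values are hypothetical; the flags themselves are the ones shown in the command above.

```python
def dataflow_args(project_id, region, bucket, location, repository):
    """Build the pipeline.py flags for a Dataflow run from the tutorial's placeholders."""
    image = (
        f"{location}-docker.pkg.dev/{project_id}/{repository}"
        "/dataflow/cpp_beam_container:latest"
    )
    return [
        "--runner=DataflowRunner",
        f"--project={project_id}",
        f"--region={region}",
        f"--temp_location=gs://{bucket}/tmp",
        f"--sdk_container_image={image}",
        "--experiment=use_runner_v2",
        f"--output=gs://{bucket}/out.png",
    ]


if __name__ == "__main__":
    # All values here are placeholders; substitute your own.
    args = dataflow_args(
        "my-project", "us-central1", "my-bucket", "us-central1", "my-repo")
    print("python pipeline.py " + " ".join(args))
```

One way to use the list is to pass it to subprocess.run together with sys.executable and "pipeline.py", which avoids shell quoting issues.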
View your results
View data written to your Cloud Storage bucket. Use the gcloud storage ls command to list the contents at the top level of your bucket:
gcloud storage ls gs://BUCKET_NAME
If successful, the command returns a message similar to:
gs://BUCKET_NAME/out.png
Clean up
To avoid incurring charges to your Google Cloud account for the resources used in this tutorial, either delete the project that contains the resources, or keep the project and delete the individual resources.
Delete the project
The easiest way to eliminate billing is to delete the Google Cloud project that you created for the tutorial.
Delete the individual resources
If you want to reuse the project, then delete the resources that you created for the tutorial.
Clean up Google Cloud Platform project resources
Delete the Artifact Registry repository.
gcloud artifacts repositories delete REPOSITORY \
    --location=LOCATION --async
Delete the Cloud Storage bucket and its objects. This bucket alone does not incur any charges.
gcloud storage rm gs://BUCKET_NAME --recursive
Revoke credentials
Revoke the roles that you granted to the user-managed worker service account. Run the following command once for each of the following IAM roles:
- roles/dataflow.admin
- roles/dataflow.worker
- roles/storage.objectAdmin
- roles/artifactregistry.reader
gcloud projects remove-iam-policy-binding PROJECT_ID \
    --member=serviceAccount:parallelpipeline@PROJECT_ID.iam.gserviceaccount.com \
    --role=SERVICE_ACCOUNT_ROLE
Optional: Revoke the authentication credentials that you created, and delete the local credential file.
gcloud auth application-default revoke
Optional: Revoke credentials from the gcloud CLI.
gcloud auth revoke
What's next
- View the sample application on GitHub.
- Use custom containers in Dataflow.
- Learn more about using container environments with Apache Beam.
- Explore reference architectures, diagrams, and best practices about Google Cloud. Take a look at our Cloud Architecture Center.
Except as otherwise noted, the content of this page is licensed under the Creative Commons Attribution 4.0 License, and code samples are licensed under the Apache 2.0 License. For details, see the Google Developers Site Policies. Java is a registered trademark of Oracle and/or its affiliates.
Last updated 2026-02-19 UTC.