Best practices for highly parallel workflows
This page provides guidance about best practices to follow when building and running Dataflow HPC highly parallel workflows, including how to use external code in your pipelines, how to run the pipeline, and how to manage error handling.
Note: When discussing native code for Dataflow, this article concentrates on the Java SDK for Apache Beam. You can also use the Python SDK. Many of the discussion points are relevant for both. Example code for this guide is available in the Apache Beam GitHub repository. For an example that uses the Python SDK, see the Use custom containers with C++ libraries tutorial.
Include external code in your pipeline
A key differentiator for highly parallel pipelines is that they use C++ code within the DoFn rather than one of the standard Apache Beam SDK languages. For Java pipelines, to make it easier to use C++ libraries in the pipeline, it is recommended that you use external procedure calls. This section describes the general approach used for running external (C++) code in Java pipelines.
An Apache Beam pipeline definition has several key components:
- PCollections are immutable collections of homogeneous elements.
- PTransforms are used to define the transformations to a PCollection that generates another PCollection.
- The pipeline is the construct that allows you, through code, to declare the interactions between PTransforms and PCollections. The pipeline is represented as a directed acyclic graph (DAG).
When you use code from a language that is not one of the standard Apache Beam SDK languages, place the code in a DoFn, which is applied within a PTransform, and use one of the standard SDK languages to define the pipeline itself. We recommend using the Apache Beam Python SDK to define the pipeline, because the Python SDK has a utility class that makes the use of other code simpler. You can, however, use the other Apache Beam SDKs.
You can use the code to conduct quick experiments without requiring a full build. For a production system, you typically create your own binaries, which gives you the freedom to tune the process to your needs.
The following diagram illustrates the two usages of pipeline data:
- Data is used to drive the process.
- Data is acquired during processing and joined to the driver data.
On this page, primary data (from the source) is referred to as driving data, and secondary data (from the processing phase) is referred to as joining data.
In a finance use case, the driving data might be a few hundred thousand trades. Each trade needs to be processed in conjunction with market data. In that case, the market data is the joining data. In a media use case, the driving data might be image files that require processing but don't need other data sources, and therefore don't use joining data.
Size considerations for driving data
If the size of the driving data element is in the low-megabyte range, treat it with the normal Apache Beam paradigm of creating a PCollection object from the source and sending the object to the Apache Beam transforms for processing.
If the size of the driving data element is in the high megabytes or in the gigabytes, as is typical for media, you can put the driving data into Cloud Storage. Then, in the starting PCollection object, reference the storage URI, so that only a URI reference to that data is used.
Size considerations for joining data
If the joining data is a few hundred megabytes or less, use a side input to get this data to the Apache Beam transforms. The side input sends the data packet to every worker that needs it.
If the joining data is in the gigabyte or terabyte range, use either Bigtable or Cloud Storage to merge the joining data to the driving data, depending on the nature of the data. Bigtable is ideal for finance scenarios where market data is often accessed as key-value lookups from Bigtable. For more information about designing your Bigtable schema, including recommendations for working with time-series data, see the Bigtable documentation.
Run the external code
You can run external code in Apache Beam in many ways.
- Create a process that's called from a DoFn object inside a Dataflow transform.
- Use JNI with the Java SDK.
  Note: JNI provides a method for passing data into and out of the external library. Because JNI is very efficient, this approach is ideal if the C++ library is well established and stable. However, C++ code that is prone to crashing can generate segfaults. Trying to catch these errors is problematic, because any failure can also take down the calling process, resulting in a Dataflow retry of that bundle of work.
- Create a subprocess directly from the DoFn object. Although this approach is not the most efficient, it's robust and simple to implement. Because of the potential issues with using JNI, this page demonstrates using a subprocess call.
As you design your workflow, consider the complete end-to-end pipeline. Any inefficiencies in the way the process is run are offset by the fact that the data movement from the source all the way to the sink is accomplished with a single pipeline. If you compare this approach to others, look at the end-to-end times of the pipeline as well as end-to-end costs.
Pull the binaries into the hosts
When you use a native Apache Beam language, the Apache Beam SDK automatically moves all required code to the workers. However, when you make a call to external code, you need to move the code manually.
To move the code, do the following. The example demonstrates the steps for the Apache Beam Java SDK.
- Store the compiled external code, along with versioning information, in Cloud Storage.
- In the @Setup method, create a synchronized block to check whether the code file is available on the local resource. Rather than implementing a physical check, you can confirm availability using a static variable when the first thread finishes.
- If the file isn't available, use the Cloud Storage client library to pull the file from the Cloud Storage bucket to the local worker. A recommended approach is to use the Apache Beam FileSystems class for this task.
- After the file is moved, confirm that the execute bit is set on the code file.
- In a production system, check the hash of the binaries to ensure that the file has been copied correctly.
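The steps above could be sketched roughly as follows. This is a minimal illustration in Python using only the standard library, not the guide's example code. The names `stage_binary`, `sha256_of`, and `_staged_paths` are hypothetical, and a plain local file copy stands in for the download from Cloud Storage.

```python
import hashlib
import os
import shutil
import stat
import tempfile
import threading

_stage_lock = threading.Lock()
_staged_paths = set()  # Module-level state shared by all threads in the process.


def sha256_of(path):
    """Return the hex SHA-256 digest of the file at `path`."""
    digest = hashlib.sha256()
    with open(path, "rb") as f:
        for chunk in iter(lambda: f.read(1 << 20), b""):
            digest.update(chunk)
    return digest.hexdigest()


def stage_binary(source_path, local_path, expected_sha256):
    """Copy the binary once per process, mark it executable, and verify its hash."""
    with _stage_lock:
        if local_path in _staged_paths:
            # An earlier thread already staged the file; skip the physical check.
            return local_path
        # Assumption: a local copy stands in for the Cloud Storage download.
        shutil.copyfile(source_path, local_path)
        # Set the execute bit for the owner.
        mode = os.stat(local_path).st_mode
        os.chmod(local_path, mode | stat.S_IXUSR)
        # Confirm that the file was copied correctly.
        if sha256_of(local_path) != expected_sha256:
            os.remove(local_path)
            raise IOError("hash mismatch for " + local_path)
        _staged_paths.add(local_path)
        return local_path


if __name__ == "__main__":
    work_dir = tempfile.mkdtemp()
    source = os.path.join(work_dir, "source.bin")
    with open(source, "wb") as f:
        f.write(b"#!/bin/sh\necho hello\n")
    local = stage_binary(source, os.path.join(work_dir, "local.bin"), sha256_of(source))
    print("staged:", local, "executable:", os.access(local, os.X_OK))
```

In a pipeline, a function like this would be called from the setup hook of the DoFn, so that the lock is only contended when a worker starts up rather than on every element.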
Using the Apache Beam filesToStage function is also an option, but it removes some of the advantages of the runner's ability to automatically package and move your Java code. In addition, because the call to the subprocess needs an absolute file location, you need to use code to determine the class path and therefore the location of the file moved by filesToStage. We do not recommend this approach.
Run the external binaries
Before you can run external code, you need to build a wrapper for it. Write this wrapper in the same language as the external code (for example, C++) or as a shell script. The wrapper lets you pass file handles and implement optimizations as described in the Design processing for small CPU cycles section on this page. Your wrapper does not need to be sophisticated. The following snippet shows an outline of a wrapper in C++.
```cpp
#include <fstream>
#include <iostream>
#include <string>

int main(int argc, char* argv[]) {
  // Expect the return file location and the data to process.
  if (argc < 3) {
    std::cerr << "Required return file and data to process" << '\n';
    return 1;
  }

  std::string returnFile = argv[1];
  std::string word = argv[2];

  // Write the data to the return file.
  std::ofstream myfile;
  myfile.open(returnFile);
  myfile << word;
  myfile.close();
  return 0;
}
```
This code reads two parameters from the argument list. The first parameter is the location of the return file where the data is pushed. The second parameter is the data that the code echoes to the user. In real-world implementations, this code would do more than echo "Hello, world"!
After you write the wrapper code, run the external code by doing the following:
- Transmit the data to the external code binaries.
- Run the binaries, catch any errors, and log errors and results.
- Handle the logging information.
- Capture data from the completed processing.
Transmit the data to the binaries
To start the process of running the library, transmit data to the C++ code. This step is where you can take advantage of Dataflow integration with other Google Cloud Platform tools. A tool like Bigtable can deal with very large datasets and handle low-latency and high-concurrency access, which allows thousands of cores to simultaneously access the dataset. In addition, Bigtable can preprocess data, allowing data shaping, enrichment, and filtering. All of this work can be done in Apache Beam transforms before you run the external code.
For a production system, the recommended path is to use a protocol buffer to encapsulate the input data. You can convert the input data to bytes and base64 encode it before passing it to the external library. The two ways to pass this data to the external library are as follows:
- Small input data. For small data that doesn't exceed the system's maximum length for a command argument, pass the argument in position 2 of the process being built with java.lang.ProcessBuilder.
- Large input data. For larger data sizes, create a file whose name includes a UUID in order to contain the data required by the process.
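As a rough illustration of the two options, the following Python sketch base64 encodes the serialized input and then either places it in argument position 2 or writes it to a file whose name includes a UUID. The function name `build_command` and the `MAX_ARG_BYTES` threshold are assumptions made for the example. The real limit on argument length depends on the operating system.

```python
import base64
import os
import tempfile
import uuid

# Hypothetical threshold; the actual maximum argument length is system dependent.
MAX_ARG_BYTES = 32 * 1024


def build_command(binary, return_file, payload, work_dir=None):
    """Return the argument list for the external process, given serialized input bytes."""
    encoded = base64.b64encode(payload).decode("ascii")
    if len(encoded) <= MAX_ARG_BYTES:
        # Small input: pass the encoded data itself in argument position 2.
        return [binary, return_file, encoded]
    # Large input: write the data to a uniquely named file and pass its path instead.
    work_dir = work_dir or tempfile.gettempdir()
    input_file = os.path.join(work_dir, "input-" + str(uuid.uuid4()) + ".b64")
    with open(input_file, "w") as f:
        f.write(encoded)
    return [binary, return_file, input_file]
```

A real wrapper also needs some way to tell whether position 2 holds the data or a file path, for example an extra flag. The sketch leaves that out.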
Run the C++ code, catching errors, and logging
Capturing and handling error information is a critical part of your pipeline. The resources used by the Dataflow runner are ephemeral, and it's often difficult to inspect worker log files. You must make sure that you capture and push all useful information to Dataflow runner logging, and that you store the logging data in one or more Cloud Storage buckets.
The recommended approach is to redirect stdout and stderr to files, which allows you to avoid any out-of-memory considerations. For example, in the Dataflow runner that calls the C++ code, you could include lines like the following:
Java
```java
import java.lang.ProcessBuilder.Redirect;
...
processbuilder.redirectError(Redirect.appendTo(errfile));
processbuilder.redirectOutput(Redirect.appendTo(outFile));
```
Python
```python
# Requires Apache Beam 2.34 or later.
stopping_times, bad_values = (
    integers
    | beam.Map(collatz.total_stopping_time).with_exception_handling(
        use_subprocess=True))

# Write the bad values to a side channel.
bad_values | 'WriteBadValues' >> beam.io.WriteToText(
    os.path.splitext(output_path)[0] + '-bad.txt')
```
Handle logging information
Many use cases involve processing millions of elements. Successful processing generates logs with little or no value, so you must make a business decision about retaining the log data. For example, consider these alternatives to retaining all log data:
- If information contained in logs from successful element processing isn't valuable, don't keep it.
- Create logic that samples the log data, such as keeping only one in every 10,000 log entries. If the processing is homogeneous, such as when many iterations of the code generate essentially identical log data, this approach provides an effective balance between retaining log data and optimizing processing.
For failure conditions, the amount of data dumped to logs might be large. An effective strategy for handling large quantities of error log data is to read the first few lines of the log entry and push just those lines to Cloud Logging. You can load the rest of the log file into Cloud Storage buckets. This approach allows you to look at the first lines of the error logs later and then, if needed, refer to Cloud Storage for the whole file.
Checking the size of the log file is also useful. If the file size is zero, you can safely ignore it or record a simple log message that the file had no data.
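One possible shape for this logic is sketched below in Python. The function name `summarize_log` and the default of five lines are assumptions for illustration. Pushing the returned lines to Cloud Logging and uploading the full file to Cloud Storage are left out.

```python
import itertools
import os
import tempfile


def summarize_log(path, max_lines=5):
    """Return the first lines of a log file, or None if the file is empty."""
    if os.path.getsize(path) == 0:
        # Nothing was written; the caller can ignore the file or note that it had no data.
        return None
    with open(path, "r", errors="replace") as f:
        # Read only the head of the file so that a very large log is never loaded fully.
        return [line.rstrip("\n") for line in itertools.islice(f, max_lines)]


if __name__ == "__main__":
    with tempfile.NamedTemporaryFile("w", suffix=".err", delete=False) as log:
        log.write("".join("error detail %d\n" % i for i in range(100)))
    print(summarize_log(log.name, max_lines=3))
```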
Capture data from completed processing
It's not recommended that you use stdout to pass the result of the computation back to the DoFn function. Other code that your C++ code calls, and even your own code, might send messages to stdout as well, polluting the stdout stream that otherwise contains logging data. Instead, it's a better practice to make a change to the C++ wrapper code to allow the code to accept a parameter indicating where to create the file that stores the value. Ideally, this file should be stored in a language-neutral way using protocol buffers, which allows the C++ code to pass an object back to the Java or Python code. The DoFn object can read the result directly from the file and pass the result information on to its own output call.
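The return-file pattern might look something like the following Python sketch. It is an illustration under stated assumptions rather than the guide's implementation: `run_and_read_result` is a hypothetical helper, and `FAKE_WRAPPER` is a small Python one-liner standing in for the compiled C++ wrapper, writing its second argument to the file named by its first. The result is read as plain text here instead of as a protocol buffer.

```python
import os
import subprocess
import sys
import tempfile
import uuid

# Stand-in for the compiled wrapper: writes argv[2] to the file named by argv[1].
FAKE_WRAPPER = [
    sys.executable, "-c",
    "import sys, pathlib; pathlib.Path(sys.argv[1]).write_text(sys.argv[2])",
]


def run_and_read_result(command, data, work_dir):
    """Run the wrapper with a unique return file and return that file's contents."""
    return_file = os.path.join(work_dir, "result-" + str(uuid.uuid4()))
    # Keep stdout and stderr in their own files so they carry only logging data.
    with open(return_file + ".out", "ab") as out, open(return_file + ".err", "ab") as err:
        completed = subprocess.run(command + [return_file, data], stdout=out, stderr=err)
    if completed.returncode != 0:
        raise RuntimeError("wrapper exited with code %d" % completed.returncode)
    # The result comes back through the file, not through stdout.
    with open(return_file, "r") as f:
        return f.read()


if __name__ == "__main__":
    print(run_and_read_result(FAKE_WRAPPER, "Hello, world", tempfile.mkdtemp()))
```

Because the helper has no dependency on the pipeline, it can also be exercised on its own in a unit test.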
Experience has shown the importance of running unit tests dealing with the process itself. It's important to implement a unit test that runs the process independently of the Dataflow pipeline. Debugging the library can be done much more efficiently if it is standalone and doesn't have to run the whole pipeline.
Design processing for small CPU cycles
Calling a subprocess has overhead. Depending on your workload, you might need to do extra work to reduce the administrative overhead of starting and shutting down the process relative to the work being done.
In the media use case, the size of the driving data element might be in the high megabytes or in the gigabytes. As a result, processing for each data element can take many minutes. In that case, the cost of calling the subprocess is insignificant compared to the overall processing time. The best approach in this situation is to have a single element start its own process.
However, in other use cases, such as finance, processing requires very small units of CPU time (tens of milliseconds). In that case, the overhead of calling the subprocess is disproportionately large. A solution to this issue is to make use of Apache Beam's GroupByKey transform to create batches of between 50 and 100 elements to be fed into the process. For example, you can follow these steps:
- In a DoFn function, create a key-value pair. If you are processing financial trades, you can use the trade number as the key. If you don't have a unique number to use as the key, you can generate a checksum from the data and use a modulo function to create partitions of 50 elements.
- Send the key to a GroupByKey.create function, which returns a KV<key,Iterable<data>> collection that contains the 50 elements that you can then send to the process.
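The checksum-and-modulo keying can be illustrated outside of a pipeline. In the following Python sketch, `partition_key` and `batch_elements` are hypothetical names, CRC32 is one arbitrary choice of checksum, and a dictionary stands in for the grouping that GroupByKey would perform. The sketch derives the partition count from the input length, which a real pipeline would instead estimate from the expected volume.

```python
import collections
import zlib


def partition_key(data, num_partitions):
    """Derive a stable partition key from a checksum of the element bytes."""
    return zlib.crc32(data) % num_partitions


def batch_elements(elements, target_batch_size=50):
    """Group elements by partition key, similar to the per-key output of GroupByKey."""
    # Ceiling division: enough partitions for roughly target_batch_size elements each.
    num_partitions = max(1, -(-len(elements) // target_batch_size))
    batches = collections.defaultdict(list)
    for element in elements:
        batches[partition_key(element, num_partitions)].append(element)
    return dict(batches)


if __name__ == "__main__":
    trades = [("trade-%d" % i).encode() for i in range(1000)]
    for key, batch in sorted(batch_elements(trades).items()):
        print(key, len(batch))
```

Because the key comes from a checksum, batch sizes are only approximately equal to the target. Each batch is then handed to a single subprocess call, which spreads the startup cost over many elements.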
Limit worker parallelism
When you work with a language that's natively supported in the Dataflow runner, you never need to think about what's happening to the worker. Dataflow has many processes that oversee flow control and threads in batch or stream mode.
However, if you're using an external language like C++, be aware that you're doing something a little out of the ordinary by starting subprocesses. In batch mode, the Dataflow runner uses a small ratio of working threads to CPUs compared to streaming mode. It's recommended, especially in streaming mode, that you create a semaphore within your class to more directly control an individual worker's parallelism.
For example, with media processing, you might not want hundreds of transcoding elements to be processed in parallel by a single worker. In cases like those, you can create a utility class that provides permits to the DoFn function for the work being carried out. Using this class allows you to take direct control of the worker threads within your pipeline.
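A permit-providing utility class of this kind might be sketched as follows in Python. `PermitPool` and `fake_transcode` are hypothetical names, and the `max_active` counter exists only so that the example can show that the limit holds.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor


class PermitPool:
    """Hands out a fixed number of permits shared by all threads in one worker process."""

    def __init__(self, permits):
        self._semaphore = threading.BoundedSemaphore(permits)
        self._lock = threading.Lock()
        self._active = 0
        self.max_active = 0  # High-water mark, kept only for demonstration.

    def run(self, work, *args):
        """Block until a permit is free, then run work(*args) and release the permit."""
        with self._semaphore:
            with self._lock:
                self._active += 1
                self.max_active = max(self.max_active, self._active)
            try:
                return work(*args)
            finally:
                with self._lock:
                    self._active -= 1


def fake_transcode(item):
    # Stand-in for an expensive external process such as a transcode.
    time.sleep(0.01)
    return item * 2


if __name__ == "__main__":
    pool = PermitPool(permits=2)
    with ThreadPoolExecutor(max_workers=8) as executor:
        results = list(executor.map(lambda i: pool.run(fake_transcode, i), range(16)))
    print(results, "peak concurrency:", pool.max_active)
```

In a pipeline, one such pool would be created per worker process and each element would call `run` around its subprocess invocation, so that the number of concurrent external processes stays bounded regardless of how many threads the runner schedules.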
Use high-capacity data sinks in Google Cloud Platform
After the data has been processed, it's sent to a data sink. The sink needs to be able to handle the volume of results that are created by your grid processing solution.
The following diagram shows some of the sinks available in Google Cloud Platform when Dataflow is running a grid workload.
Bigtable, BigQuery, and Pub/Sub can all deal with very large streams of data. For example, each Bigtable node can handle 10,000 inserts per second of up to 1 KB in size with easy horizontal scalability. As a result, a 100-node Bigtable cluster can absorb 1,000,000 messages per second generated by the Dataflow grid.
Manage segfaults
When you use C++ code within a pipeline, you need to decide how to manage segfaults, because they have non-local ramifications if not dealt with correctly. The Dataflow runner creates processes as needed in Java, Python, or Go, and then assigns work to the processes in the form of bundles.
If the call to the C++ code is done using tightly coupled tools, such as JNI or Cython, and the C++ process segfaults, the calling process and Java Virtual Machine (JVM) also crash. In this scenario, bad data points aren't catchable. To make bad data points catchable, use a looser coupling, which branches away bad data and allows the pipeline to continue. However, with mature C++ code that is fully tested against all data variations, you can use mechanisms like Cython.
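The effect of looser coupling can be demonstrated with a small Python sketch in which each element runs in its own subprocess. The names `process_elements`, `GOOD`, and `CRASH` are hypothetical. The `CRASH` command is a stand-in for a faulty external binary: it asks a child Python interpreter to read from a null pointer, which terminates the child abnormally while leaving the parent unaffected.

```python
import subprocess
import sys

# Stand-ins for the external binary: one exits cleanly, one reads from a null pointer.
GOOD = [sys.executable, "-c", "print('ok')"]
CRASH = [sys.executable, "-c", "import ctypes; ctypes.string_at(0)"]


def process_elements(commands):
    """Run each command in its own process and separate successes from failures."""
    succeeded, failed = [], []
    for index, command in enumerate(commands):
        completed = subprocess.run(
            command, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
        # Any nonzero return code, including death by signal, marks the element as bad.
        if completed.returncode == 0:
            succeeded.append(index)
        else:
            failed.append(index)
    return succeeded, failed


if __name__ == "__main__":
    ok, bad = process_elements([GOOD, CRASH, GOOD])
    print("succeeded:", ok, "failed:", bad)
```

The failed indexes play the role of the bad data points that are branched away, for example to a separate output, while the remaining elements continue through the pipeline.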
What's next
- Follow the tutorial to create a pipeline that uses custom containers with C++ libraries.
- View the example code for this page in the Apache Beam GitHub repository.
- Learn more about building pipelines with Apache Beam.
Except as otherwise noted, the content of this page is licensed under the Creative Commons Attribution 4.0 License, and code samples are licensed under the Apache 2.0 License. For details, see the Google Developers Site Policies. Java is a registered trademark of Oracle and/or its affiliates.
Last updated 2026-02-19 UTC.