Set Dataflow pipeline options
This page explains how to set pipeline options for your Dataflow jobs. These pipeline options configure how and where your pipeline runs and which resources it uses.
Pipeline execution is separate from your Apache Beam program's execution. The Apache Beam program that you've written constructs a pipeline for deferred execution. This means that the program generates a series of steps that any supported Apache Beam runner can execute. Compatible runners include the Dataflow runner on Google Cloud and the direct runner that executes the pipeline directly in a local environment.
You can pass parameters into a Dataflow job at runtime. For additional information about setting pipeline options at runtime, see Configuring pipeline options.
Note: If you're creating a classic template that accepts pipeline options, you must use the ValueProvider interface in your pipeline code. For more information, see Use runtime parameters in your pipeline code.

Use pipeline options with Apache Beam SDKs
You can use the following SDKs to set pipeline options for Dataflow jobs:
- Apache Beam SDK for Python
- Apache Beam SDK for Java
- Apache Beam SDK for Go
To use the SDKs, you set the pipeline runner and other execution parameters by using the Apache Beam SDK class PipelineOptions.
There are two methods for specifying pipeline options:
- Set pipeline options programmatically by supplying a list of pipeline options.
- Set pipeline options directly on the command line when you run your pipeline code.
Set pipeline options programmatically
You can set pipeline options programmatically by creating and modifying a PipelineOptions object.
Java
Construct a PipelineOptions object using the method PipelineOptionsFactory.fromArgs.
For an example, see the Launch on Dataflow section on this page.
Python
Create a PipelineOptions object.
For an example, see the Launch on Dataflow section on this page.
Go
Setting pipeline options programmatically using PipelineOptions is not supported in the Apache Beam SDK for Go. Use Go command-line arguments.
For an example, see the Launch on Dataflow section on this page.
Set pipeline options on the command line
You can set pipeline options by using command-line arguments.
Java
The following example syntax is from the WordCount pipeline in the Java tutorial.
mvn -Pdataflow-runner compile exec:java \
  -Dexec.mainClass=org.apache.beam.examples.WordCount \
  -Dexec.args="--project=PROJECT_ID \
  --gcpTempLocation=gs://BUCKET_NAME/temp/ \
  --output=gs://BUCKET_NAME/output \
  --runner=DataflowRunner \
  --region=REGION"

Replace the following:
- PROJECT_ID: your Google Cloud project ID
- BUCKET_NAME: the name of your Cloud Storage bucket
- REGION: a Dataflow region, for example us-central1
Python
The following example syntax is from the WordCount pipeline in the Python tutorial.
python -m apache_beam.examples.wordcount \
  --region DATAFLOW_REGION \
  --input gs://dataflow-samples/shakespeare/kinglear.txt \
  --output gs://STORAGE_BUCKET/results/outputs \
  --runner DataflowRunner \
  --project PROJECT_ID \
  --temp_location gs://STORAGE_BUCKET/tmp/

Replace the following:
- DATAFLOW_REGION: the region where you want to deploy the Dataflow job, for example europe-west1. The --region flag overrides the default region that is set in the metadata server, your local client, or environment variables.
- STORAGE_BUCKET: the Cloud Storage bucket name
- PROJECT_ID: the Google Cloud project ID
Go
The following example syntax is from the WordCount pipeline in the Go tutorial.
go run wordcount.go --input gs://dataflow-samples/shakespeare/kinglear.txt \
  --output gs://BUCKET_NAME/results/outputs \
  --runner dataflow \
  --project PROJECT_ID \
  --region DATAFLOW_REGION \
  --staging_location gs://BUCKET_NAME/binaries/

Replace the following:
- BUCKET_NAME: the Cloud Storage bucket name
- PROJECT_ID: the Google Cloud project ID
- DATAFLOW_REGION: the region where you want to deploy the Dataflow job, for example europe-west1. The --region flag overrides the default region that is set in the metadata server, your local client, or environment variables.
Set experimental pipeline options
In the Java, Python, and Go SDKs, the experiments pipeline option enables experimental or pre-GA Dataflow features.
Set programmatically
To set the experiments option programmatically, use the following syntax.
Java
In your PipelineOptions object, include the experiments option by using the following syntax. This example sets the boot disk size to 80 GB with the experiment flag.
options.setExperiments(Arrays.asList("streaming_boot_disk_size_gb=80"));

For an example that shows how to create the PipelineOptions object, see the Launch on Dataflow section on this page.
Python
In your PipelineOptions object, include the experiments option by using the following syntax. This example sets the boot disk size to 80 GB with the experiment flag.
beam_options = PipelineOptions(
    beam_args,
    experiments=['streaming_boot_disk_size_gb=80'])

For an example that shows how to create the PipelineOptions object, see the Launch on Dataflow section on this page.
Go
Setting pipeline options programmatically using PipelineOptions is not supported in the Apache Beam SDK for Go. Use Go command-line arguments.
Set on the command line
To set the experiments option on the command line, use the following syntax.
Java
This example sets the boot disk size to 80 GB with the experiment flag.
--experiments=streaming_boot_disk_size_gb=80

Python
This example sets the boot disk size to 80 GB with the experiment flag.
--experiments=streaming_boot_disk_size_gb=80

Go
This example sets the boot disk size to 80 GB with the experiment flag.
--experiments=streaming_boot_disk_size_gb=80

Set in a template
To enable an experimental feature when running a Dataflow template, use the --additional-experiments flag.
Classic template
gcloud dataflow jobs run JOB_NAME --additional-experiments=EXPERIMENT[,...]

Flex template
gcloud dataflow flex-template run JOB_NAME --additional-experiments=EXPERIMENT[,...]

Access the pipeline options object
When you create your Pipeline object in your Apache Beam program, pass PipelineOptions. When the Dataflow service runs your pipeline, it sends a copy of the PipelineOptions to each worker.
Java
Access PipelineOptions inside any ParDo transform's DoFn instance by using the method ProcessContext.getPipelineOptions.
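As a minimal sketch, the following hypothetical DoFn reads an option at processing time; MyOptions stands for a custom options interface like the one defined in the Create custom pipeline options section later on this page.

static class AppendOptionFn extends DoFn<String, String> {
  @ProcessElement
  public void processElement(ProcessContext c) {
    // The Dataflow service sends a copy of the PipelineOptions to each worker,
    // so the options are available while elements are processed.
    // MyOptions is a hypothetical custom options interface.
    MyOptions options = c.getPipelineOptions().as(MyOptions.class);
    c.output(c.element() + "," + options.getMyCustomOption());
  }
}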
Python
This feature is not supported in the Apache Beam SDK for Python.
Go
Access pipeline options using beam.PipelineOptions.
Launch on Dataflow
Run your job on managed Google Cloud resources by using the Dataflow runner service. Running your pipeline with Dataflow creates a Dataflow job, which uses Compute Engine and Cloud Storage resources in your Google Cloud project. For information about Dataflow permissions, see Dataflow security and permissions.
Dataflow jobs use Cloud Storage to store temporary files during pipeline execution. To avoid being billed for unnecessary storage costs, turn off the soft delete feature on buckets that your Dataflow jobs use for temporary storage. For more information, see Disable soft delete.
Note: Typing Ctrl+C from the command line does not cancel your job. The Dataflow service is still running the job on Google Cloud. To cancel the job, use the Dataflow monitoring interface or the Dataflow command-line interface.

Set required options
To run your pipeline using Dataflow, set the following pipeline options:
Java
- project: the ID of your Google Cloud project.
- runner: the pipeline runner that executes your pipeline. For Google Cloud execution, this must be DataflowRunner.
- gcpTempLocation: a Cloud Storage path for Dataflow to stage most temporary files. The specified bucket must already exist. If you don't specify gcpTempLocation, then Dataflow uses the value of the tempLocation option. If you don't specify either of these options, then Dataflow creates a new Cloud Storage bucket.
Python
- project: your Google Cloud project ID.
- region: the region for your Dataflow job.
- runner: the pipeline runner that executes your pipeline. For Google Cloud execution, this must be DataflowRunner.
- temp_location: a Cloud Storage path for Dataflow to stage temporary job files created during the execution of the pipeline.
Go
- project: your Google Cloud project ID.
- region: the region for your Dataflow job.
- runner: the pipeline runner that executes your pipeline. For Google Cloud execution, this must be dataflow.
- staging_location: a Cloud Storage path for Dataflow to stage temporary job files created during the execution of the pipeline.
Set pipeline options programmatically
The following example code shows how to construct a pipeline by programmatically setting the runner and other required options to execute the pipeline using Dataflow.
Java
// Create and set your PipelineOptions.
DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);

// For cloud execution, set the Google Cloud project, staging location,
// and set DataflowRunner.
options.setProject("my-project-id");
options.setStagingLocation("gs://my-bucket/binaries");
options.setRunner(DataflowRunner.class);

// Create the Pipeline with the specified options.
Pipeline p = Pipeline.create(options);

Python
import argparse

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

parser = argparse.ArgumentParser()
# parser.add_argument('--my-arg', help='description')
args, beam_args = parser.parse_known_args()

# Create and set your PipelineOptions.
# For Cloud execution, specify DataflowRunner and set the Cloud Platform
# project, job name, temporary files location, and region.
# For more information about regions, check:
# https://cloud.google.com/dataflow/docs/concepts/regional-endpoints
beam_options = PipelineOptions(
    beam_args,
    runner='DataflowRunner',
    project='my-project-id',
    job_name='unique-job-name',
    temp_location='gs://my-bucket/temp',
    region='us-central1')
# Note: Repeatable options like dataflow_service_options or experiments must
# be specified as a list of string(s).
# e.g. dataflow_service_options=['enable_prime']

# Create the Pipeline with the specified options.
with beam.Pipeline(options=beam_options) as pipeline:
  pass  # build your pipeline here.

Go
The Apache Beam SDK for Go uses Go command-line arguments. Use flag.Set() to set flag values.
// Use the Go flag package to parse custom options.
flag.Parse()

// Set the required options programmatically.
// For Cloud execution, specify the Dataflow runner, Google Cloud
// project ID, region, and staging location.
// For more information about regions, see
// https://cloud.google.com/dataflow/docs/concepts/regional-endpoints
flag.Set("runner", "dataflow")
flag.Set("project", "my-project-id")
flag.Set("region", "us-central1")
flag.Set("staging_location", "gs://my-bucket/binaries")

beam.Init()

// Create the Pipeline.
p := beam.NewPipeline()
s := p.Root()

After you've constructed your pipeline, specify all the pipeline reads, transforms, and writes, and run the pipeline.
Use pipeline options from the command line
The following example shows how to use pipeline options that are specified on the command line. This example doesn't set the pipeline options programmatically.
Java
// Set your PipelineOptions to the specified command-line options.
MyOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().as(MyOptions.class);

// Create the Pipeline with the specified options.
Pipeline p = Pipeline.create(options);

Python
Use the Python argparse module to parse command-line options.
# Use Python argparse module to parse custom arguments
import argparse

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

# For more details on how to use argparse, take a look at:
#   https://docs.python.org/3/library/argparse.html
parser = argparse.ArgumentParser()
parser.add_argument(
    '--input-file',
    default='gs://dataflow-samples/shakespeare/kinglear.txt',
    help='The file path for the input text to process.')
parser.add_argument(
    '--output-path', required=True, help='The path prefix for output files.')
args, beam_args = parser.parse_known_args()

# Create the Pipeline with remaining arguments.
beam_options = PipelineOptions(beam_args)
with beam.Pipeline(options=beam_options) as pipeline:
  lines = (
      pipeline
      | 'Read files' >> beam.io.ReadFromText(args.input_file)
      | 'Write files' >> beam.io.WriteToText(args.output_path))

Go
Use the Go flag package to parse command-line options. You must parse the options before you call beam.Init(). In this example, output is a command-line option.
// Define configuration options
var (
  output = flag.String("output", "", "Output file (required).")
)

// Parse options before beam.Init()
flag.Parse()
beam.Init()

// Input validation must be done after beam.Init()
if *output == "" {
  log.Fatal("No output provided!")
}

p := beam.NewPipeline()

After you construct your pipeline, specify all the pipeline reads, transforms, and writes, and then run the pipeline.
Control execution modes
When an Apache Beam program runs a pipeline on a service such as Dataflow, the program can either run the pipeline asynchronously or block until pipeline completion. You can change this behavior by using the following guidance.
Java
When an Apache Beam Java program runs a pipeline on a service such as Dataflow, it is typically executed asynchronously. To run a pipeline and wait until the job completes, set DataflowRunner as the pipeline runner and explicitly call pipeline.run().waitUntilFinish().
When you use DataflowRunner and call waitUntilFinish() on the PipelineResult object returned from pipeline.run(), the pipeline executes on Google Cloud, but the local code waits for the cloud job to finish and return the final DataflowPipelineJob object. While the job runs, the Dataflow service prints job status updates and console messages.
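As a minimal sketch, assuming pipeline was created with DataflowRunner set as the runner, blocking execution looks like the following:

// Run the pipeline on Dataflow and block until the job reaches a terminal state.
PipelineResult result = pipeline.run();
PipelineResult.State state = result.waitUntilFinish();
System.out.println("Job finished with state: " + state);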
Python
When an Apache Beam Python program runs a pipeline on a service such as Dataflow, it is typically executed asynchronously. To block until pipeline completion, use the wait_until_finish() method of the PipelineResult object, returned from the run() method of the runner.
Go
When an Apache Beam Go program runs a pipeline on Dataflow, it is synchronous by default and blocks until pipeline completion. If you don't want to block, there are two options:
Start the job in a Go routine.
go func() {
  pr, err := beamx.Run(ctx, p)
  if err != nil {
    // Handle the error.
  }
  // Send beam.PipelineResult into a channel.
  results <- pr
}()
// Do other operations while the pipeline runs.

Use the --async command-line flag, which is in the jobopts package.
To view execution details, monitor progress, and verify job completion status, use the Dataflow monitoring interface or the Dataflow command-line interface.
Use streaming sources
Java
If your pipeline reads from an unbounded data source, such as Pub/Sub, the pipeline automatically executes in streaming mode.
Python
If your pipeline uses an unbounded data source, such as Pub/Sub, you must set the streaming option to true.
Go
If your pipeline reads from an unbounded data source, such as Pub/Sub, the pipeline automatically executes in streaming mode.
Streaming jobs use a Compute Engine machine type of n1-standard-2 or higher by default.
Launch locally
Instead of running your pipeline on managed cloud resources, you can choose to execute your pipeline locally. Local execution has certain advantages for testing, debugging, or running your pipeline over small data sets. For example, local execution removes the dependency on the remote Dataflow service and associated Google Cloud project.
When you use local execution, you must run your pipeline with datasets small enough to fit in local memory. You can create a small in-memory dataset using a Create transform, or you can use a Read transform to work with small local or remote files. Local execution typically provides a faster and easier way to perform testing and debugging with fewer external dependencies, but is limited by the memory that is available in your local environment.
The following example code shows how to construct a pipeline that executes inyour local environment.
Java
// Create and set our Pipeline Options.
PipelineOptions options = PipelineOptionsFactory.create();

// Create the Pipeline with the specified options.
Pipeline p = Pipeline.create(options);

DirectRunner is already the default. However, you do need to either explicitly include DirectRunner as a dependency or add it to the classpath.

Python
import argparse

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

parser = argparse.ArgumentParser()
# parser.add_argument('--my-arg')
args, beam_args = parser.parse_known_args()

# Create and set your Pipeline Options.
beam_options = PipelineOptions(beam_args)
args = beam_options.view_as(MyOptions)

with beam.Pipeline(options=beam_options) as pipeline:
  lines = (
      pipeline
      | beam.io.ReadFromText(args.input)
      | beam.io.WriteToText(args.output))

DirectRunner is already the default.

Go
// Parse options before beam.Init()
flag.Parse()
beam.Init()

p := beam.NewPipeline()

direct is already the default. After you've constructed your pipeline, run it.
Create custom pipeline options
You can add your own custom options in addition to the standard PipelineOptions. Apache Beam's command line can also parse custom options using command-line arguments specified in the same format.
Java
To add your own options, define an interface with getter and setter methods for each option, as in the following example:
public interface MyOptions extends PipelineOptions {
  String getMyCustomOption();
  void setMyCustomOption(String myCustomOption);
}

Python
To add your own options, use the add_argument() method (which behaves exactly like Python's standard argparse module), as in the following example:
from apache_beam.options.pipeline_options import PipelineOptions

class MyOptions(PipelineOptions):
  @classmethod
  def _add_argparse_args(cls, parser):
    parser.add_argument('--input')
    parser.add_argument('--output')

Go
To add your own options, use the Go flag package as shown in the following example:
var (
  input  = flag.String("input", "", "")
  output = flag.String("output", "", "")
)

You can also specify a description, which appears when a user passes --help as a command-line argument, and a default value.
Java
You set the description and default value using annotations, as follows:
public interface MyOptions extends PipelineOptions {
  @Description("My custom command line argument.")
  @Default.String("DEFAULT")
  String getMyCustomOption();
  void setMyCustomOption(String myCustomOption);
}

We recommend that you register your interface with PipelineOptionsFactory and then pass the interface when creating the PipelineOptions object. When you register your interface with PipelineOptionsFactory, --help can find your custom options interface and add it to the output of the --help command. PipelineOptionsFactory also validates that your custom options are compatible with all other registered options.
The following example code shows how to register your custom options interface with PipelineOptionsFactory:
PipelineOptionsFactory.register(MyOptions.class);
MyOptions options = PipelineOptionsFactory.fromArgs(args)
    .withValidation()
    .as(MyOptions.class);

Now your pipeline can accept --myCustomOption=value as a command-line argument.
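For illustration only, the parsed value can then be used while constructing the pipeline. Treating myCustomOption as a text file path here is an assumption, not part of the original sample:

// Build the pipeline with the registered custom options.
Pipeline p = Pipeline.create(options);
// Hypothetical use of the custom option as an input path.
p.apply("ReadInput", TextIO.read().from(options.getMyCustomOption()));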
Python
You set the description and default value as follows:
from apache_beam.options.pipeline_options import PipelineOptions

class MyOptions(PipelineOptions):
  @classmethod
  def _add_argparse_args(cls, parser):
    parser.add_argument(
        '--input',
        default='gs://dataflow-samples/shakespeare/kinglear.txt',
        help='The file path for the input text to process.')
    parser.add_argument(
        '--output', required=True, help='The path prefix for output files.')
You set the description and default value as follows:
var (
  input  = flag.String("input", "gs://MY_STORAGE_BUCKET/input", "Input for the pipeline")
  output = flag.String("output", "gs://MY_STORAGE_BUCKET/output", "Output for the pipeline")
)