Work with Dataflow data pipelines
Overview
You can use Dataflow data pipelines for the following tasks:
- Create recurrent job schedules.
- Understand where resources are spent over multiple job executions.
- Define and manage data freshness objectives.
- Drill down into individual pipeline stages to fix and optimize your pipelines.
For API documentation, see the Data Pipelines reference.
Features
- Create a recurring batch pipeline to run a batch job on a schedule.
- Create a recurring incremental batch pipeline to run a batch job against the latest version of input data.
- Use the pipeline summary scorecard to view the aggregated capacity usage and resource consumption of a pipeline.
- View the data freshness of a streaming pipeline. This metric, which evolves over time, can be tied to an alert that notifies you when freshness falls below a specified objective.
- Use pipeline metric graphs to compare batch pipeline jobs and find anomalies.
Limitations
Regional availability: You can create data pipelines in available Cloud Scheduler regions.
Quota:
- Default number of pipelines per project: 500
- Default number of pipelines per organization: 2500
The organization-level quota is disabled by default. You can opt in to organization-level quotas, and if you do so, each organization can have at most 2500 pipelines by default.
Labels: You can't use user-defined labels to label Dataflow data pipelines. However, when you use the additionalUserLabels field, those values are passed through to your Dataflow job. For more information about how labels apply to individual Dataflow jobs, see Pipeline options.
Types of data pipelines
Dataflow has two data pipeline types, streaming and batch. Both types of pipeline run jobs that are defined in Dataflow templates.
- Streaming data pipeline
- A streaming data pipeline runs a Dataflow streaming job immediately after it's created.
- Batch data pipeline
A batch data pipeline runs a Dataflow batch job on a user-defined schedule. The batch pipeline input filename can be parameterized to allow for incremental batch pipeline processing.
Note: Every Dataflow batch job name created by a batch data pipeline uses the following naming pattern: <pipeline_id>-mp--<timestamp>-<random int>. The value of timestamp has seconds granularity. The string -mp-- is reserved to indicate that this Dataflow batch job is created by a batch data pipeline. The pipeline_id is truncated after 27 characters.
Incremental batch pipelines
You can use datetime placeholders to specify an incremental input file format for a batch pipeline.
- Placeholders for year, month, date, hour, minute, and second can be used, and must follow the strftime() format. Placeholders are preceded by the percentage symbol (%).
- Parameter formatting is not verified during pipeline creation.
- Example: If you specify "gs://bucket/Y" as the parameterized input path, it's evaluated as "gs://bucket/Y", because "Y" without a preceding "%" does not map to the strftime() format.
At each scheduled batch pipeline execution time, the placeholder portion of the input path is evaluated to the current (or time-shifted) datetime. Date values are evaluated using the current date in the time zone of the scheduled job. If the evaluated path matches the path of an input file, the file is picked up for processing by the batch pipeline at the scheduled time.
- Example: A batch pipeline is scheduled to repeat at the start of each hour PST. If you parameterize the input path as gs://bucket-name/%Y-%m-%d/prefix-%H_%M.csv, on April 15, 2021, 6 PM PST, the input path is evaluated to gs://bucket-name/2021-04-15/prefix-18_00.csv.
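To see how the placeholders resolve, you can evaluate the same pattern locally with the Unix date command. This is only an illustration; the pipeline performs the evaluation itself, using the time zone of the scheduled job.

# Evaluate the parameterized path for the current date and time.
# With a system time of 6 PM on April 15, 2021, this prints
# gs://bucket-name/2021-04-15/prefix-18_00.csv
date "+gs://bucket-name/%Y-%m-%d/prefix-%H_%M.csv"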
Use time shift parameters
You can use + or - minute or hour time shift parameters. To support matching an input path with an evaluated datetime that is shifted before or after the current datetime of the pipeline schedule, enclose these parameters in curly braces. Use the format {[+|-][0-9]+[m|h]}. The batch pipeline continues to repeat at its scheduled time, but the input path is evaluated with the specified time offset.
- Example: A batch pipeline is scheduled to repeat at the start of each hour PST. If you parameterize the input path as gs://bucket-name/%Y-%m-%d/prefix-%H_%M.csv{-2h}, on April 15, 2021, 6 PM PST, the input path is evaluated to gs://bucket-name/2021-04-15/prefix-16_00.csv.
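You can preview the effect of a time shift the same way by evaluating the pattern against an offset time with GNU date. Again, this is only an illustration; the service applies the offset from the curly-brace suffix itself.

# Evaluate the path two hours before the run time, mirroring the {-2h} suffix.
# At 6 PM on April 15, 2021, this prints
# gs://bucket-name/2021-04-15/prefix-16_00.csv
date -d "2 hours ago" "+gs://bucket-name/%Y-%m-%d/prefix-%H_%M.csv"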
Data pipeline roles
For Dataflow data pipeline operations to succeed, you need the necessary IAM roles, as follows:
You need the appropriate role to perform operations:
- Datapipelines.admin: Can perform all data pipeline operations
- Datapipelines.viewer: Can view data pipelines and jobs
- Datapipelines.invoker: Can invoke a data pipeline job run (this role can be enabled using the API)
The service account used by Cloud Scheduler needs to have the roles/iam.serviceAccountUser role, whether the service account is user-specified or the default Compute Engine service account. For more information, see Data pipeline roles.
You need to be able to act as the service account used by Cloud Scheduler and Dataflow by being granted the roles/iam.serviceAccountUser role on that account. If you don't select a service account for Cloud Scheduler and Dataflow, the default Compute Engine service account is used.
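For example, the required bindings can be granted with the gcloud CLI. The following commands are a sketch with placeholder values (PROJECT_ID, PROJECT_NUMBER, and the user email are hypothetical), and the roles/datapipelines.admin role name is an assumption; verify it against the roles listed above.

# Allow a user to act as the default Compute Engine service account,
# which Cloud Scheduler and Dataflow use if no service account is selected.
gcloud iam service-accounts add-iam-policy-binding \
    PROJECT_NUMBER-compute@developer.gserviceaccount.com \
    --project=PROJECT_ID \
    --member="user:you@example.com" \
    --role="roles/iam.serviceAccountUser"

# Grant a data pipelines role on the project.
gcloud projects add-iam-policy-binding PROJECT_ID \
    --member="user:you@example.com" \
    --role="roles/datapipelines.admin"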
If you receive a NOT_FOUND error status from Cloud Scheduler with the log entry type.googleapis.com/google.cloud.scheduler.logging.AttemptFinished, it often indicates missing permissions on the service accounts for either Cloud Scheduler or Dataflow. Verify and correct these permissions to resolve data pipeline issues.
Create a data pipeline
You can create a Dataflow data pipeline in two ways: by importing a job, or by creating a data pipeline.
The data pipelines setup page: When you first access the Dataflow pipelines feature in the Google Cloud console, a setup page opens. Enable the listed APIs to create data pipelines.
Import a job
You can import a Dataflow batch or streaming job that is based on a classic or flex template and make it a data pipeline.
In the Google Cloud console, go to the Dataflow Jobs page.
Select a completed job, then on the Job Details page, select +Import as a pipeline.
On the Create pipeline from template page, the parameters are populated with the options of the imported job.
For a batch job, in the Schedule your pipeline section, provide a recurrence schedule. Providing an email account address for the Cloud Scheduler, which is used to schedule batch runs, is optional. If it's not specified, the default Compute Engine service account is used.
Create a data pipeline
In the Google Cloud console, go to the Dataflow Data pipelines page.
Select +Create data pipeline.
On the Create pipeline from template page, provide a pipeline name, and fill in the other template selection and parameter fields.
For a batch job, in the Schedule your pipeline section, provide a recurrence schedule. Providing an email account address for the Cloud Scheduler, which is used to schedule batch runs, is optional. If a value is not specified, the default Compute Engine service account is used.
Create a batch data pipeline
To create this sample batch data pipeline, you must have access to the following resources in your project:
- A Cloud Storage bucket to store input and output files
- A BigQuery dataset in which to create a table.
This example pipeline uses the Cloud Storage Text to BigQuery batch pipeline template. This template reads files in CSV format from Cloud Storage, runs a transform, then inserts values into a BigQuery table with three columns.
Create the following files on your local drive:
A bq_three_column_table.json file that contains the following schema of the destination BigQuery table:

{
  "BigQuery Schema": [
    {"name": "col1", "type": "STRING"},
    {"name": "col2", "type": "STRING"},
    {"name": "col3", "type": "INT64"}
  ]
}

A split_csv_3cols.js JavaScript file, which implements a simple transformation on the input data before insertion into BigQuery:

function transform(line) {
  var values = line.split(',');
  var obj = new Object();
  obj.col1 = values[0];
  obj.col2 = values[1];
  obj.col3 = values[2];
  var jsonString = JSON.stringify(obj);
  return jsonString;
}

A file01.csv CSV file with several records that are inserted into the BigQuery table:

b8e5087a,74,27531
7a52c051,4a,25846
672de80f,cd,76981
111b92bf,2e,104653
ff658424,f0,149364
e6c17c75,84,38840
833f5a69,8f,76892
d8c833ff,7d,201386
7d3da7fb,d5,81919
3836d29b,70,181524
ca66e6e5,d7,172076
c8475eb6,03,247282
558294df,f3,155392
737b82a8,c7,235523
82c8f5dc,35,468039
57ab17f9,5e,480350
cbcdaf84,bd,354127
52b55391,eb,423078
825b8863,62,88160
26f16d4f,fd,397783
Use the gcloud storage cp command to copy the files to folders in a Cloud Storage bucket in your project, as follows:

Copy bq_three_column_table.json and split_csv_3cols.js to gs://BUCKET_ID/text_to_bigquery/:

gcloud storage cp bq_three_column_table.json gs://BUCKET_ID/text_to_bigquery/
gcloud storage cp split_csv_3cols.js gs://BUCKET_ID/text_to_bigquery/

Copy file01.csv to gs://BUCKET_ID/inputs/:

gcloud storage cp file01.csv gs://BUCKET_ID/inputs/
In the Google Cloud console, go to the Cloud Storage Buckets page.
To create a tmp folder in your Cloud Storage bucket, select your bucket name to open the Bucket details page, then click Create folder.
In the Google Cloud console, go to the Dataflow Data pipelines page.
Select Create data pipeline. Enter or select the following items on the Create pipeline from template page:
- For Pipeline name, enter text_to_bq_batch_data_pipeline.
- For Regional endpoint, select a Compute Engine region. The source and destination regions must match. Therefore, your Cloud Storage bucket and BigQuery table must be in the same region.
- For Dataflow template, in Process Data in Bulk (batch), select Text Files on Cloud Storage to BigQuery.
Note: Don't select the streaming pipeline with the same name in Process Data Continuously (stream).
- For Schedule your pipeline, select a schedule, such as Hourly at minute 25, in your time zone. You can edit the schedule after you submit the pipeline. Providing an email account address for the Cloud Scheduler, which is used to schedule batch runs, is optional. If it's not specified, the default Compute Engine service account is used.
- In Required parameters, enter the following:
  - For JavaScript UDF path in Cloud Storage: gs://BUCKET_ID/text_to_bigquery/split_csv_3cols.js
  - For JSON path: BUCKET_ID/text_to_bigquery/bq_three_column_table.json
  - For JavaScript UDF name: transform
  - For BigQuery output table: PROJECT_ID:DATASET_ID.three_column_table
  - For Cloud Storage input path: BUCKET_ID/inputs/file01.csv
  - For Temporary BigQuery directory: BUCKET_ID/tmp
  - For Temporary location: BUCKET_ID/tmp
Click Create pipeline.
Confirm pipeline and template information and view current and previous history from the Pipeline details page.

You can edit the data pipeline schedule from the Pipeline info panel on the Pipeline details page.

You can also run a batch pipeline on demand using the Run button in the Dataflow Pipelines console.
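To verify the template and parameter values outside of the pipeline, you can also launch a one-off Dataflow job from the classic template with the gcloud CLI. The following command is a sketch: the template location and parameter names (javascriptTextTransformGcsPath, JSONPath, and so on) are assumptions based on the Cloud Storage Text to BigQuery classic template, so check them against the template documentation before running it.

# Run the classic Text Files on Cloud Storage to BigQuery template once,
# using the same values entered in the console. Replace BUCKET_ID,
# PROJECT_ID, DATASET_ID, and REGION with your own values.
gcloud dataflow jobs run text-to-bq-test-run \
    --region=REGION \
    --gcs-location=gs://dataflow-templates/latest/GCS_Text_to_BigQuery \
    --parameters=\
javascriptTextTransformGcsPath=gs://BUCKET_ID/text_to_bigquery/split_csv_3cols.js,\
JSONPath=gs://BUCKET_ID/text_to_bigquery/bq_three_column_table.json,\
javascriptTextTransformFunctionName=transform,\
outputTable=PROJECT_ID:DATASET_ID.three_column_table,\
inputFilePath=gs://BUCKET_ID/inputs/file01.csv,\
bigQueryLoadingTemporaryDirectory=gs://BUCKET_ID/tmp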
Create a sample streaming data pipeline
You can create a sample streaming data pipeline by following the sample batch pipeline instructions, with the following differences:
- For Pipeline schedule, don't specify a schedule for a streaming data pipeline. The Dataflow streaming job is started immediately.
- For Dataflow template, in Process Data Continuously (stream), select Text Files on Cloud Storage to BigQuery.
- For Worker machine type, the pipeline processes the initial set of files matching the gs://BUCKET_ID/inputs/file01.csv pattern and any additional files matching this pattern that you upload to the inputs/ folder. If the size of the CSV files exceeds several GB, to avoid possible out-of-memory errors, select a machine type with higher memory than the default n1-standard-4 machine type, such as n1-highmem-8.
Troubleshooting
This section shows you how to resolve issues with Dataflow datapipelines.
Data pipeline job fails to launch
When you use data pipelines to create a recurring job schedule, your Dataflow job might not launch, and a 503 status error appears in the Cloud Scheduler log files.
This issue occurs when Dataflow is temporarily unable to run the job.
To work around this issue, configure Cloud Scheduler to retry the job. Because the issue is temporary, when the job is retried, it might succeed. For more information about setting retry values in Cloud Scheduler, see Create a job.
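For example, retry settings can be added to an existing Cloud Scheduler job with the gcloud CLI. This is a sketch with hypothetical job name and location values; the scheduler job that triggers a data pipeline is created for you, so confirm its name and region in the Cloud Scheduler console first.

# Configure the Cloud Scheduler job that triggers the pipeline to retry
# failed attempts. SCHEDULER_JOB_NAME and REGION are placeholders.
gcloud scheduler jobs update http SCHEDULER_JOB_NAME \
    --location=REGION \
    --max-retry-attempts=3 \
    --min-backoff=1m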
Investigate pipeline objectives violations
The following sections describe how to investigate pipelines that don't meetperformance objectives.
Recurring batch pipelines
For an initial analysis of the health of your pipeline, on the Pipeline info page in the Google Cloud console, use the Individual job status and Thread time per step graphs. These graphs are located in the pipeline status panel.
Example investigation:
You have a recurring batch pipeline that runs every hour at 3 minutes past the hour. Each job normally runs for approximately 9 minutes. You have an objective for all jobs to complete in less than 10 minutes.
The job status graph shows that a job ran for more than 10 minutes.
In the Update/Execution history table, find the job that ran during the hour of interest. Click through to the Dataflow job details page. On that page, find the longer-running stage, and then look in the logs for possible errors to determine the cause of the delay.
Streaming pipelines
For an initial analysis of the health of your pipeline, on the Pipeline Details page, in the Pipeline info tab, use the data freshness graph. This graph is located in the pipeline status panel.
Example investigation:
You have a streaming pipeline that normally produces an output with a data freshness of 20 seconds.
You set an objective of having a 30-second data freshness guarantee. When you review the data freshness graph, you notice that between 9 and 10 AM, data freshness jumped to almost 40 seconds.

Switch to the Pipeline metrics tab, then view the CPU Utilization and Memory Utilization graphs for further analysis.
Error: Pipeline ID already exists within the project
If you try to create a new pipeline with a name that already exists in your project, you receive this error message: Pipeline Id already exist within the project. To avoid this issue, always choose unique names for your pipelines.