Create a Dataflow pipeline using Python
This document shows you how to use the Apache Beam SDK for Python to build a program that defines a pipeline. Then, you run the pipeline by using a direct local runner or a cloud-based runner such as Dataflow. For an introduction to the WordCount pipeline, see the How to use WordCount in Apache Beam video.
To follow step-by-step guidance for this task directly in the Google Cloud console, click Guide me.
Before you begin
- Sign in to your Google Cloud Platform account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
Install the Google Cloud CLI.
If you're using an external identity provider (IdP), you must first sign in to the gcloud CLI with your federated identity.
To initialize the gcloud CLI, run the following command:
gcloud init
Create or select a Google Cloud project.
Note: If you don't plan to keep the resources that you create in this procedure, create a project instead of selecting an existing project. After you finish these steps, you can delete the project, removing all resources associated with the project.
Roles required to select or create a project
- Select a project: Selecting a project doesn't require a specific IAM role—you can select any project that you've been granted a role on.
- Create a project: To create a project, you need the Project Creator role (roles/resourcemanager.projectCreator), which contains the resourcemanager.projects.create permission. Learn how to grant roles.
Create a Google Cloud project:
gcloud projects create PROJECT_ID
Replace PROJECT_ID with a name for the Google Cloud project you are creating.
Select the Google Cloud project that you created:
gcloud config set project PROJECT_ID
Replace PROJECT_ID with your Google Cloud project name.
Verify that billing is enabled for your Google Cloud project.
Enable the Dataflow, Compute Engine, Cloud Logging, Cloud Storage, Google Cloud Storage JSON, BigQuery, Cloud Pub/Sub, Cloud Datastore, and Cloud Resource Manager APIs:
Roles required to enable APIs
To enable APIs, you need the Service Usage Admin IAM role (roles/serviceusage.serviceUsageAdmin), which contains the serviceusage.services.enable permission. Learn how to grant roles.
gcloud services enable dataflow compute_component logging storage_component storage_api bigquery pubsub datastore.googleapis.com cloudresourcemanager.googleapis.com
Create local authentication credentials for your user account:
gcloud auth application-default login
If an authentication error is returned, and you are using an external identity provider (IdP), confirm that you have signed in to the gcloud CLI with your federated identity.
Grant roles to your user account. Run the following command once for each of the following IAM roles:
roles/iam.supportUser, roles/datastream.admin, roles/monitoring.metricsScopesViewer, roles/cloudaicompanion.settingsAdmin
gcloud projects add-iam-policy-binding PROJECT_ID --member="user:USER_IDENTIFIER" --role=ROLE
Replace the following:
PROJECT_ID: Your project ID.
USER_IDENTIFIER: The identifier for your user account. For example, myemail@example.com.
ROLE: The IAM role that you grant to your user account.
Grant roles to your Compute Engine default service account. Run the following command once for each of the following IAM roles:
roles/dataflow.admin
roles/dataflow.worker
roles/storage.objectAdmin
gcloud projects add-iam-policy-binding PROJECT_ID --member="serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com" --role=SERVICE_ACCOUNT_ROLE
- Replace PROJECT_ID with your project ID.
- Replace PROJECT_NUMBER with your project number. To find your project number, see Identify projects or use the gcloud projects describe command.
- Replace SERVICE_ACCOUNT_ROLE with each individual role.
- Create a Cloud Storage bucket and configure it as follows:
- Set the storage class to Standard.
- Set the storage location to the following: US (United States).
- Replace BUCKET_NAME with a unique bucket name. Don't include sensitive information in the bucket name because the bucket namespace is global and publicly visible.
gcloud storage buckets create gs://BUCKET_NAME --default-storage-class STANDARD --location US
- Copy the Google Cloud project ID and the Cloud Storage bucket name. You need these values later in this document.
Set up your environment
Dataflow no longer supports pipelines that use Python 2. For more information, see the Python 2 support on Google Cloud Platform page. For details about supported versions, see Apache Beam runtime support.
In this section, use the command prompt to set up an isolated Python virtual environment to run your pipeline project by using venv. This process lets you isolate the dependencies of one project from the dependencies of other projects.
If you don't have a command prompt readily available, you can use Cloud Shell. Cloud Shell already has the package manager for Python 3 installed, so you can skip to creating a virtual environment.
To install Python and then create a virtual environment, follow these steps:
- Check that you have Python 3 and pip running in your system:
python --version
python -m pip --version
- If required, install Python 3 and then set up a Python virtual environment: follow the instructions provided in the Installing Python and Setting up venv sections of the Setting up a Python development environment page.
After you complete the quickstart, you can deactivate the virtual environment by running deactivate.
Get the Apache Beam SDK
The Apache Beam SDK is an open source programming model for data pipelines. You define a pipeline with an Apache Beam program and then choose a runner, such as Dataflow, to run your pipeline.
To download and install the Apache Beam SDK, follow these steps:
- Verify that you are in the Python virtual environment that you created in the preceding section. Ensure that the prompt starts with <env_name>, where env_name is the name of the virtual environment.
- Install the latest version of the Apache Beam SDK for Python:
pip install apache-beam[gcp]
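To confirm the installation and see the basic shape of a Beam program, you can run a minimal sketch such as the following. It is not part of the quickstart; the element values and step labels are illustrative, and with no runner specified Beam uses the local direct runner.
import apache_beam as beam

# Minimal illustrative pipeline: create in-memory elements, transform them,
# and print each result. Without a --runner option, Beam uses the local DirectRunner.
with beam.Pipeline() as pipeline:
    (
        pipeline
        | 'Create' >> beam.Create(['hello', 'beam'])
        | 'Uppercase' >> beam.Map(str.upper)
        | 'Print' >> beam.Map(print)
    )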
Run the pipeline locally
To see how a pipeline runs locally, use a ready-made Python module for the wordcount example that is included with the apache_beam package.
The wordcount pipeline example does the following:
- Takes a text file as input. This text file is located in a Cloud Storage bucket with the resource name gs://dataflow-samples/shakespeare/kinglear.txt.
- Parses each line into words.
- Performs a frequency count on the tokenized words.
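Before running it, it can help to see those three stages expressed as Beam transforms. The following is a simplified sketch only, not the packaged example: the step labels and the splitting regex are illustrative, and the real wordcount module adds option parsing and a dedicated DoFn.
import re
import apache_beam as beam

# Simplified sketch of the wordcount stages (illustrative labels and regex).
with beam.Pipeline() as pipeline:
    (
        pipeline
        | 'Read' >> beam.io.ReadFromText('gs://dataflow-samples/shakespeare/kinglear.txt')
        | 'Split' >> beam.FlatMap(lambda line: re.findall(r"[A-Za-z']+", line))
        | 'PairWithOne' >> beam.Map(lambda word: (word, 1))
        | 'GroupAndSum' >> beam.CombinePerKey(sum)
        | 'Format' >> beam.MapTuple(lambda word, count: f'{word}: {count}')
        | 'Write' >> beam.io.WriteToText('outputs')
    )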
To run the wordcount pipeline locally, follow these steps:
- From your local terminal, run the wordcount example:
python -m apache_beam.examples.wordcount \
  --output outputs
- View the output of the pipeline:
more outputs*
- To exit, press q.
View the wordcount.py source code on Apache Beam GitHub.
Run the pipeline on the Dataflow service
In this section, run the wordcount example pipeline from the apache_beam package on the Dataflow service. This example specifies DataflowRunner as the parameter for --runner.
- Run the pipeline:
python -m apache_beam.examples.wordcount \
  --region DATAFLOW_REGION \
  --input gs://dataflow-samples/shakespeare/kinglear.txt \
  --output gs://BUCKET_NAME/results/outputs \
  --runner DataflowRunner \
  --project PROJECT_ID \
  --temp_location gs://BUCKET_NAME/tmp/
Replace the following:
DATAFLOW_REGION: the region where you want to deploy the Dataflow job, for example, europe-west1. The --region flag overrides the default region that is set in the metadata server, your local client, or environment variables.
BUCKET_NAME: the Cloud Storage bucket name that you copied earlier
PROJECT_ID: the Google Cloud project ID that you copied earlier
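If you later write your own pipeline rather than invoking the packaged module, the same flags can be set in code through PipelineOptions. The following is a minimal sketch under that assumption; the placeholder values (PROJECT_ID, DATAFLOW_REGION, BUCKET_NAME) and the line-counting transform are illustrative.
import apache_beam as beam
from apache_beam.options.pipeline_options import (
    GoogleCloudOptions,
    PipelineOptions,
    StandardOptions,
)

# Illustrative sketch: set the same values as the command-line flags in code.
options = PipelineOptions()
options.view_as(StandardOptions).runner = 'DataflowRunner'
gcp = options.view_as(GoogleCloudOptions)
gcp.project = 'PROJECT_ID'
gcp.region = 'DATAFLOW_REGION'
gcp.temp_location = 'gs://BUCKET_NAME/tmp/'

with beam.Pipeline(options=options) as pipeline:
    (
        pipeline
        | 'Read' >> beam.io.ReadFromText('gs://dataflow-samples/shakespeare/kinglear.txt')
        | 'CountLines' >> beam.combiners.Count.Globally()
        | 'Write' >> beam.io.WriteToText('gs://BUCKET_NAME/results/outputs')
    )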
Note: To specify a worker service account, use the --service_account_email pipeline option. User-managed worker service accounts are recommended for production workloads. If you don't specify a worker service account when you create a job, Dataflow uses the Compute Engine default service account.
View your results
When you run a pipeline using Dataflow, your results are stored in a Cloud Storage bucket. In this section, verify that the pipeline is running by using either the Google Cloud console or the local terminal.
Google Cloud console
To view your results in Google Cloud console, follow these steps:
- In the Google Cloud console, go to the Dataflow Jobs page.
The Jobs page displays details of your wordcount job, including a status of Running at first, and then Succeeded.
- Go to the Cloud Storage Buckets page.
From the list of buckets in your project, click the storage bucket that you created earlier.
In the wordcount directory, the output files that your job created are displayed.
Local terminal
View the results from your terminal or by using Cloud Shell.
- To list the output files, use the gcloud storage ls command:
gcloud storage ls gs://BUCKET_NAME/results/outputs* --long
- To view the results in the output files, use the gcloud storage cat command:
gcloud storage cat gs://BUCKET_NAME/results/outputs*
Replace BUCKET_NAME with the name of the Cloud Storage bucket used in the pipeline program.
Modify the pipeline code
The wordcount pipeline in the previous examples distinguishes between uppercase and lowercase words. The following steps show how to modify the pipeline so that the wordcount pipeline is not case-sensitive.
- On your local machine, download the latest copy of the wordcount code from the Apache Beam GitHub repository.
- From the local terminal, run the pipeline:
python wordcount.py --output outputs
- View the results:
more outputs*
- To exit, press q.
- In an editor of your choice, open the wordcount.py file.
- Inside the run function, examine the pipeline steps:
counts = (
    lines
    | 'Split' >> (beam.ParDo(WordExtractingDoFn()).with_output_types(str))
    | 'PairWithOne' >> beam.Map(lambda x: (x, 1))
    | 'GroupAndSum' >> beam.CombinePerKey(sum))
After Split, the lines are split into words as strings.
- To lowercase the strings, modify the line after Split:
counts = (
    lines
    | 'Split' >> (beam.ParDo(WordExtractingDoFn()).with_output_types(str))
    | 'lowercase' >> beam.Map(str.lower)
    | 'PairWithOne' >> beam.Map(lambda x: (x, 1))
    | 'GroupAndSum' >> beam.CombinePerKey(sum))
This modification maps the str.lower function onto every word. This line is equivalent to beam.Map(lambda word: str.lower(word)). A small standalone check of this transform appears after these steps.
- Save the file and run the modified wordcount job:
python wordcount.py --output outputs
- View the results of the modified pipeline:
more outputs*
- To exit, press q.
- Run the modified pipeline on the Dataflow service:
python wordcount.py \
  --region DATAFLOW_REGION \
  --input gs://dataflow-samples/shakespeare/kinglear.txt \
  --output gs://BUCKET_NAME/results/outputs \
  --runner DataflowRunner \
  --project PROJECT_ID \
  --temp_location gs://BUCKET_NAME/tmp/
Replace the following:
DATAFLOW_REGION: the region where you want to deploy the Dataflow job
BUCKET_NAME: your Cloud Storage bucket name
PROJECT_ID: your Google Cloud project ID
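As mentioned in the steps above, here is a small optional check, separate from wordcount.py, that exercises the lowercasing transform on its own with the local runner. The input words and step labels are illustrative.
import apache_beam as beam
from apache_beam.testing.util import assert_that, equal_to

# Optional check: beam.Map(str.lower) lowercases every element in the PCollection.
with beam.Pipeline() as pipeline:
    words = pipeline | 'Create' >> beam.Create(['KING', 'Lear', 'king'])
    lowered = words | 'lowercase' >> beam.Map(str.lower)
    assert_that(lowered, equal_to(['king', 'lear', 'king']))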
Clean up
To avoid incurring charges to your Google Cloud account for the resources used on this page, delete the Google Cloud project with the resources.
Note: If you followed this quickstart in a new project, then you can delete the project.
- Delete the bucket:
Important: Your bucket must be empty before you can delete it.
gcloud storage buckets delete gs://BUCKET_NAME
If you keep your project, revoke the roles that you granted to the Compute Engine default service account. Run the following command once for each of the following IAM roles:
roles/dataflow.admin
roles/dataflow.worker
roles/storage.objectAdmin
gcloud projects remove-iam-policy-binding PROJECT_ID \
  --member=serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com \
  --role=SERVICE_ACCOUNT_ROLE
Optional: Revoke the authentication credentials that you created, and delete the local credential file.
gcloud auth application-default revoke
Optional: Revoke credentials from the gcloud CLI.
gcloud auth revoke
What's next