Create a Dataflow pipeline using Python

This document shows you how to use the Apache Beam SDK for Python to build a program that defines a pipeline. Then, you run the pipeline by using a direct local runner or a cloud-based runner such as Dataflow. For an introduction to the WordCount pipeline, see the How to use WordCount in Apache Beam video.


To follow step-by-step guidance for this task directly in the Google Cloud console, click Guide me:

Guide me


Before you begin

  1. Sign in to your Google Cloud Platform account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
  2. Install the Google Cloud CLI.

  3. If you're using an external identity provider (IdP), you must first sign in to the gcloud CLI with your federated identity.

  4. To initialize the gcloud CLI, run the following command:

    gcloud init
  5. Create or select a Google Cloud project.

    Roles required to select or create a project

    • Select a project: Selecting a project doesn't require a specific IAM role—you can select any project that you've been granted a role on.
    • Create a project: To create a project, you need the Project Creator role (roles/resourcemanager.projectCreator), which contains the resourcemanager.projects.create permission. Learn how to grant roles.
    Note: If you don't plan to keep the resources that you create in this procedure, create a project instead of selecting an existing project. After you finish these steps, you can delete the project, removing all resources associated with the project.
    • Create a Google Cloud project:

      gcloud projects create PROJECT_ID

      Replace PROJECT_ID with a name for the Google Cloud project you are creating.

    • Select the Google Cloud project that you created:

      gcloud config set project PROJECT_ID

      Replace PROJECT_ID with your Google Cloud project name.

  6. Verify that billing is enabled for your Google Cloud project.

  7. Enable the Dataflow, Compute Engine, Cloud Logging, Cloud Storage, Google Cloud Storage JSON, BigQuery, Cloud Pub/Sub, Cloud Datastore, and Cloud Resource Manager APIs:

    Roles required to enable APIs

    To enable APIs, you need the Service Usage Admin IAM role (roles/serviceusage.serviceUsageAdmin), which contains the serviceusage.services.enable permission. Learn how to grant roles.

    gcloud services enable dataflow compute_component logging storage_component storage_api bigquery pubsub datastore.googleapis.com cloudresourcemanager.googleapis.com
  8. Create local authentication credentials for your user account:

    gcloud auth application-default login

    If an authentication error is returned, and you are using an external identity provider (IdP), confirm that you have signed in to the gcloud CLI with your federated identity.

  9. Grant roles to your user account. Run the following command once for each of the following IAM roles: roles/iam.supportUser, roles/datastream.admin, roles/monitoring.metricsScopesViewer, roles/cloudaicompanion.settingsAdmin

    gcloud projects add-iam-policy-binding PROJECT_ID --member="user:USER_IDENTIFIER" --role=ROLE

    Replace the following:

    • PROJECT_ID: Your project ID.
    • USER_IDENTIFIER: The identifier for your user account. For example, myemail@example.com.
    • ROLE: The IAM role that you grant to your user account.
  10. Grant roles to your Compute Engine default service account. Run the following command once for each of the following IAM roles:

    • roles/dataflow.admin
    • roles/dataflow.worker
    • roles/storage.objectAdmin
    gcloud projects add-iam-policy-binding PROJECT_ID --member="serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com" --role=SERVICE_ACCOUNT_ROLE
    • Replace PROJECT_ID with your project ID.
    • Replace PROJECT_NUMBER with your project number. To find your project number, see Identify projects or use the gcloud projects describe command.
    • Replace SERVICE_ACCOUNT_ROLE with each individual role.
  11. Create a Cloud Storage bucket and configure it as follows:
    • Set the storage class to Standard.
    • Set the storage location to US (United States).
    • Replace BUCKET_NAME with a unique bucket name. Don't include sensitive information in the bucket name because the bucket namespace is global and publicly visible.
    gcloud storage buckets create gs://BUCKET_NAME --default-storage-class STANDARD --location US
  12. Copy the Google Cloud project ID and the Cloud Storage bucket name. You need these values later in this document. An optional Python check of your local credentials and default project is sketched after this list.
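If you want to confirm from Python that your Application Default Credentials and default project are set up, the following optional sketch (not part of the official setup steps) uses the google-auth library, which is installed with the Apache Beam GCP extras:

    # Optional sanity check: load Application Default Credentials and
    # print the project they resolve to.
    import google.auth

    credentials, project_id = google.auth.default()
    print(f"Credentials loaded; default project: {project_id}")

If this raises DefaultCredentialsError, rerun gcloud auth application-default login.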

Set up your environment

Dataflow no longer supports pipelines using Python 2. For more information, see the Python 2 support on Google Cloud Platform page. For details about supported versions, see Apache Beam runtime support.

In this section, use the command prompt to set up an isolated Python virtual environment to run your pipeline project by using venv. This process lets you isolate the dependencies of one project from the dependencies of other projects.

If you don't have a command prompt readily available, you can use Cloud Shell. Cloud Shell already has the package manager for Python 3 installed, so you can skip to creating a virtual environment.

To install Python and then create a virtual environment, follow these steps:

  1. Check that you have Python 3 and pip running in your system:
    python --version
    python -m pip --version
  2. If required, install Python 3 and then set up a Python virtual environment: follow the instructions provided in the Installing Python and Setting up venv sections of the Setting up a Python development environment page.

After you complete the quickstart, you can deactivate the virtual environment by running deactivate.

Get the Apache Beam SDK

The Apache Beam SDK is an open source programming model for data pipelines. You define a pipeline with an Apache Beam program and then choose a runner, such as Dataflow, to run your pipeline.
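As a minimal illustration of that model (not part of the quickstart steps), the sketch below defines a tiny pipeline and selects the runner through pipeline options; switching the runner value is what moves the same program from local execution to Dataflow. The element values are placeholders.

    import apache_beam as beam
    from apache_beam.options.pipeline_options import PipelineOptions

    # DirectRunner executes the pipeline on your machine. Setting the value to
    # "DataflowRunner" (plus project, region, and temp_location options) would
    # send the same pipeline to the Dataflow service instead.
    options = PipelineOptions(runner="DirectRunner")

    with beam.Pipeline(options=options) as pipeline:
        (
            pipeline
            | "Create" >> beam.Create(["hello", "world"])
            | "Uppercase" >> beam.Map(str.upper)
            | "Print" >> beam.Map(print)
        )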

To download and install the Apache Beam SDK, follow these steps:

  1. Verify that you are in the Python virtual environment that you created in the preceding section. Ensure that the prompt starts with <env_name>, where env_name is the name of the virtual environment.
  2. Install the latest version of the Apache Beam SDK for Python:
    pip install apache-beam[gcp]

Run the pipeline locally

To see how a pipeline runs locally, use a ready-made Python module for the wordcount example that is included with the apache_beam package.

The wordcount pipeline example does the following (a short sketch of these stages follows this list):

  1. Takes a text file as input.

    This text file is located in a Cloud Storage bucket with the resource name gs://dataflow-samples/shakespeare/kinglear.txt.

  2. Parses each line into words.
  3. Performs a frequency count on the tokenized words.
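The following is an illustrative reduction of those three stages, not the actual wordcount.py source (which is linked at the end of this section). Reading the sample file from Cloud Storage requires the apache-beam[gcp] extras and the credentials that you set up earlier.

    import re

    import apache_beam as beam

    with beam.Pipeline() as pipeline:  # uses the local DirectRunner by default
        (
            pipeline
            # 1. Take a text file as input.
            | "Read" >> beam.io.ReadFromText(
                "gs://dataflow-samples/shakespeare/kinglear.txt")
            # 2. Parse each line into words.
            | "Split" >> beam.FlatMap(lambda line: re.findall(r"[A-Za-z']+", line))
            # 3. Perform a frequency count on the tokenized words.
            | "Count" >> beam.combiners.Count.PerElement()
            | "Format" >> beam.MapTuple(lambda word, count: f"{word}: {count}")
            | "Write" >> beam.io.WriteToText("outputs")
        )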

To run the wordcount pipeline locally, follow these steps:

  1. From your local terminal, run the wordcount example:
    python -m apache_beam.examples.wordcount \
      --output outputs
  2. View the output of the pipeline:
    more outputs*
  3. To exit, pressq.
Running the pipeline locally lets you test and debug your Apache Beam program. You can view the wordcount.py source code on Apache Beam GitHub.
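Local execution also makes it easy to unit test transforms before you submit a job. The following hedged sketch uses the Beam testing utilities on made-up input; it is not part of the quickstart.

    import apache_beam as beam
    from apache_beam.testing.test_pipeline import TestPipeline
    from apache_beam.testing.util import assert_that, equal_to

    # Runs entirely on the local runner and raises if the expected
    # output doesn't match.
    with TestPipeline() as pipeline:
        counts = (
            pipeline
            | beam.Create(["the cat", "the hat"])
            | beam.FlatMap(str.split)
            | beam.combiners.Count.PerElement()
        )
        assert_that(counts, equal_to([("the", 2), ("cat", 1), ("hat", 1)]))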

Run the pipeline on the Dataflow service

In this section, run the wordcount example pipeline from the apache_beam package on the Dataflow service. This example specifies DataflowRunner as the parameter for --runner.
  • Run the pipeline:
    python -m apache_beam.examples.wordcount \
      --region DATAFLOW_REGION \
      --input gs://dataflow-samples/shakespeare/kinglear.txt \
      --output gs://BUCKET_NAME/results/outputs \
      --runner DataflowRunner \
      --project PROJECT_ID \
      --temp_location gs://BUCKET_NAME/tmp/

    Replace the following:

    • DATAFLOW_REGION: the region where you want to deploy the Dataflow job, for example, europe-west1

      The--region flag overrides the default region that is set in the metadata server, your local client, or environment variables.

    • BUCKET_NAME: the Cloud Storage bucket name that you copied earlier
    • PROJECT_ID: the Google Cloud project ID that you copied earlier
Note: To specify a user-managed worker service account, include the --service_account_email pipeline option. User-managed worker service accounts are recommended for production workloads. If you don't specify a worker service account when you create a job, Dataflow uses the Compute Engine default service account.
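The same flags can also be set in code rather than on the command line. The following is an illustrative sketch, with placeholder values, of building equivalent options with PipelineOptions; it is not required for this quickstart.

    import apache_beam as beam
    from apache_beam.options.pipeline_options import (
        GoogleCloudOptions,
        PipelineOptions,
        StandardOptions,
    )

    options = PipelineOptions()
    options.view_as(StandardOptions).runner = "DataflowRunner"

    gcp_options = options.view_as(GoogleCloudOptions)
    gcp_options.project = "PROJECT_ID"                    # placeholder
    gcp_options.region = "DATAFLOW_REGION"                # placeholder
    gcp_options.temp_location = "gs://BUCKET_NAME/tmp/"   # placeholder
    # Optional: user-managed worker service account (recommended for production).
    # gcp_options.service_account_email = "NAME@PROJECT_ID.iam.gserviceaccount.com"

    with beam.Pipeline(options=options) as pipeline:
        ...  # build the same transforms as in the wordcount example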

View your results

When you run a pipeline using Dataflow, your results are stored in a Cloud Storage bucket. In this section, verify that the pipeline is running by using either the Google Cloud console or the local terminal.

Google Cloud console

To view your results in Google Cloud console, follow these steps:

  1. In the Google Cloud console, go to the Dataflow Jobs page.

    Go to Jobs

    The Jobs page displays details of your wordcount job, including a status of Running at first, and then Succeeded.

  2. Go to the Cloud Storage Buckets page.

    Go to Buckets

  3. From the list of buckets in your project, click the storage bucket that you created earlier.

    In the results directory, the output files that your job created are displayed.

Local terminal

View the results from your terminal or by using Cloud Shell.

  1. To list the output files, use the gcloud storage ls command:
    gcloud storage ls gs://BUCKET_NAME/results/outputs* --long

    Replace BUCKET_NAME with the name of the Cloud Storage bucket used in the pipeline program.

  2. To view the results in the output files, use the gcloud storage cat command (a Python alternative is sketched after these steps):
    gcloud storage cat gs://BUCKET_NAME/results/outputs*
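If you prefer to inspect the output from Python instead of the gcloud CLI, the following sketch lists and prints the output objects with the google-cloud-storage client library. This library is an assumption here; it is not installed by the quickstart (pip install google-cloud-storage), and BUCKET_NAME is a placeholder.

    from google.cloud import storage

    client = storage.Client()
    # List every object whose name starts with results/outputs in the bucket.
    for blob in client.list_blobs("BUCKET_NAME", prefix="results/outputs"):
        print(blob.name)
        print(blob.download_as_text())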

Modify the pipeline code

The wordcount pipeline in the previous examples distinguishes between uppercase and lowercase words. The following steps show how to modify the pipeline so that the wordcount pipeline is not case-sensitive.
  1. On your local machine, download the latest copy of the wordcount code from the Apache Beam GitHub repository.
  2. From the local terminal, run the pipeline:
    python wordcount.py --output outputs
  3. View the results:
    more outputs*
  4. To exit, pressq.
  5. In an editor of your choice, open thewordcount.py file.
  6. Inside the run function, examine the pipeline steps:
    counts = (
        lines
        | 'Split' >> (beam.ParDo(WordExtractingDoFn()).with_output_types(str))
        | 'PairWithOne' >> beam.Map(lambda x: (x, 1))
        | 'GroupAndSum' >> beam.CombinePerKey(sum))

    After Split, the lines are split into words as strings.

  7. To lowercase the strings, modify the line after Split:
    counts = (
        lines
        | 'Split' >> (beam.ParDo(WordExtractingDoFn()).with_output_types(str))
        | 'lowercase' >> beam.Map(str.lower)
        | 'PairWithOne' >> beam.Map(lambda x: (x, 1))
        | 'GroupAndSum' >> beam.CombinePerKey(sum))
    This modification maps the str.lower function onto every word. This line is equivalent to beam.Map(lambda word: str.lower(word)). A short local check of this change on in-memory data is sketched after these steps.
  8. Save the file and run the modified wordcount job:
    python wordcount.py --output outputs
  9. View the results of the modified pipeline:
    more outputs*
  10. To exit, pressq.
  11. Run the modified pipeline on the Dataflow service:
    python wordcount.py \
      --region DATAFLOW_REGION \
      --input gs://dataflow-samples/shakespeare/kinglear.txt \
      --output gs://BUCKET_NAME/results/outputs \
      --runner DataflowRunner \
      --project PROJECT_ID \
      --temp_location gs://BUCKET_NAME/tmp/

    Replace the following:

    • DATAFLOW_REGION: the region where you want to deploy the Dataflow job
    • BUCKET_NAME: your Cloud Storage bucket name
    • PROJECT_ID: your Google Cloud project ID
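As an optional, hedged check (not part of the original steps), the following sketch applies the same lowercase mapping to a few in-memory lines on the local runner, so you can confirm that differently cased words are counted together. WordExtractingDoFn lives in wordcount.py, so a plain split stands in for it here.

    import apache_beam as beam

    with beam.Pipeline() as pipeline:  # local DirectRunner
        (
            pipeline
            | beam.Create(["The cat", "the hat"])
            | 'Split' >> beam.FlatMap(str.split)
            | 'lowercase' >> beam.Map(str.lower)
            | 'PairWithOne' >> beam.Map(lambda x: (x, 1))
            | 'GroupAndSum' >> beam.CombinePerKey(sum)
            # Expected output, in any order: ('the', 2), ('cat', 1), ('hat', 1)
            | beam.Map(print)
        )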

Clean up

To avoid incurring charges to your Google Cloud account for the resources used on this page, delete the Google Cloud project with the resources.

Note: If you followed this quickstart in a new project, then you can delete the project.
  1. Delete the bucket:
    gcloud storage buckets delete gs://BUCKET_NAME
    Important: Your bucket must be empty before you can delete it.
  2. If you keep your project, revoke the roles that you granted to the Compute Engine default service account. Run the following command once for each of the following IAM roles:

    gcloud projects remove-iam-policy-binding PROJECT_ID \
      --member=serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com \
      --role=SERVICE_ACCOUNT_ROLE
  3. Optional: Revoke the authentication credentials that you created, and delete the local credential file.

    gcloud auth application-default revoke
  4. Optional: Revoke credentials from the gcloud CLI.

    gcloud auth revoke

What's next
