Stream messages from Pub/Sub by using Dataflow and Cloud Storage

Dataflow is a fully-managed service for transforming and enriching data in stream (real-time) and batch modes with equal reliability and expressiveness. It provides a simplified pipeline development environment using the Apache Beam SDK, which has a rich set of windowing and session analysis primitives as well as an ecosystem of source and sink connectors. This quickstart shows you how to use Dataflow to:

  • Read messages published to a Pub/Sub topic
  • Window (or group) the messages by timestamp
  • Write the messages to Cloud Storage
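 
Before walking through the setup, here is a minimal sketch of the pipeline shape this quickstart builds, written with the Apache Beam Python SDK. It is an illustration only: the topic path is a placeholder, and the final step logs elements instead of writing files (the complete, runnable Java and Python samples appear later on this page). It assumes the apache-beam[gcp] package is installed.

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.transforms.window import FixedWindows

options = PipelineOptions(streaming=True)

with beam.Pipeline(options=options) as pipeline:
    (
        pipeline
        # 1) Read messages published to a Pub/Sub topic (placeholder path).
        | "Read from Pub/Sub" >> beam.io.ReadFromPubSub(topic="projects/PROJECT_ID/topics/TOPIC_ID")
        # 2) Window (group) the messages into fixed one-minute intervals.
        | "Window into fixed intervals" >> beam.WindowInto(FixedWindows(60))
        # 3) The full samples write each window to Cloud Storage; this sketch
        #    simply logs the elements to keep the example minimal.
        | "Log" >> beam.Map(print)
    )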

This quickstart introduces you to using Dataflow in Java and Python. SQL is also supported. This quickstart is also offered as a Google Cloud Skills Boost tutorial, which offers temporary credentials to get you started.

You can also start by using UI-based Dataflow templates if you do not intend to do custom data processing.

Note: Before you start this quickstart, make sure you're using a version of Python or a version of Java that's supported by Dataflow.
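 
For the Python path, a quick check of your interpreter and SDK versions before you begin might look like the following (a minimal sketch, assuming the Apache Beam SDK is already installed):

import sys
import apache_beam

# Print the Python interpreter version and the installed Beam SDK version.
print(sys.version)
print(apache_beam.__version__)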

Before you begin

  1. Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
  2. Install the Google Cloud CLI.

  3. If you're using an external identity provider (IdP), you must first sign in to the gcloud CLI with your federated identity.

  4. To initialize the gcloud CLI, run the following command:

    gcloud init
  5. Create or select a Google Cloud project.

    Roles required to select or create a project

    • Select a project: Selecting a project doesn't require a specific IAM role—you can select any project that you've been granted a role on.
    • Create a project: To create a project, you need the Project Creator role (roles/resourcemanager.projectCreator), which contains the resourcemanager.projects.create permission. Learn how to grant roles.
    Note: If you don't plan to keep the resources that you create in this procedure, create a project instead of selecting an existing project. After you finish these steps, you can delete the project, removing all resources associated with the project.
    • Create a Google Cloud project:

      gcloud projects create PROJECT_ID

      Replace PROJECT_ID with a name for the Google Cloud project you are creating.

    • Select the Google Cloud project that you created:

      gcloud config set project PROJECT_ID

      Replace PROJECT_ID with your Google Cloud project name.

  6. Verify that billing is enabled for your Google Cloud project.

  7. Enable the Dataflow, Compute Engine, Cloud Logging, Cloud Storage, Google Cloud Storage JSON API, Pub/Sub, Resource Manager, and Cloud Scheduler APIs:

    Roles required to enable APIs

    To enable APIs, you need the Service Usage Admin IAM role (roles/serviceusage.serviceUsageAdmin), which contains the serviceusage.services.enable permission. Learn how to grant roles.

    gcloud services enable dataflow.googleapis.com compute.googleapis.com logging.googleapis.com storage-component.googleapis.com storage-api.googleapis.com pubsub.googleapis.com cloudresourcemanager.googleapis.com cloudscheduler.googleapis.com
  8. Set up authentication:

    1. Ensure that you have the Create Service Accounts IAM role (roles/iam.serviceAccountCreator) and the Project IAM Admin role (roles/resourcemanager.projectIamAdmin). Learn how to grant roles.
    2. Create the service account:

      gcloud iam service-accounts create SERVICE_ACCOUNT_NAME

      Replace SERVICE_ACCOUNT_NAME with a name for the service account.

    3. Grant roles to the service account. Run the following command once for each of the following IAM roles: roles/dataflow.worker, roles/storage.objectAdmin, roles/pubsub.admin:

      gcloud projects add-iam-policy-binding PROJECT_ID --member="serviceAccount:SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com" --role=ROLE

      Replace the following:

      • SERVICE_ACCOUNT_NAME: the name of the service account
      • PROJECT_ID: the project ID where you created the service account
      • ROLE: the role to grant
      Note: The --role flag affects which resources the service account can access in your project. You can revoke these roles or grant additional roles later.
    4. Grant the required role to the principal that will attach the service account to other resources.

      gcloud iam service-accounts add-iam-policy-binding SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com --member="user:USER_EMAIL" --role=roles/iam.serviceAccountUser

      Replace the following:

      • SERVICE_ACCOUNT_NAME: the name of the service account
      • PROJECT_ID: the project ID where you created the service account
      • USER_EMAIL: the email address for a Google Account
  9. Create local authentication credentials for your user account:

    gcloud auth application-default login

    If an authentication error is returned, and you are using an external identity provider (IdP), confirm that you have signed in to the gcloud CLI with your federated identity.
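 
After this step, client libraries that use Application Default Credentials (ADC) can authenticate automatically. As an optional check, the following sketch confirms that credentials and a default project can be resolved locally. It assumes the google-auth Python package is installed.

# Optional check: confirm that Application Default Credentials resolve.
# Assumes: pip install google-auth
import google.auth

credentials, project_id = google.auth.default()
print(f"Application Default Credentials loaded for project: {project_id}")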

Set up your Pub/Sub project

  1. Create variables for your bucket, project, and region. Cloud Storage bucket names must be globally unique. Select a Dataflow region close to where you run the commands in this quickstart. The value of the REGION variable must be a valid region name. For more information about regions and locations, see Dataflow locations.

    BUCKET_NAME=BUCKET_NAME
    PROJECT_ID=$(gcloud config get-value project)
    TOPIC_ID=TOPIC_ID
    REGION=DATAFLOW_REGION
    SERVICE_ACCOUNT=SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com
  2. Create a Cloud Storage bucket owned by this project:

    gcloud storage buckets create gs://$BUCKET_NAME
  3. Create a Pub/Sub topic in this project:

    gcloud pubsub topics create $TOPIC_ID
  4. Create a Cloud Scheduler job in this project. The job publishes a message to a Pub/Sub topic at one-minute intervals.

    If an App Engine app does not exist for the project, this step will create one.

    gcloud scheduler jobs create pubsub publisher-job --schedule="* * * * *" \
      --topic=$TOPIC_ID --message-body="Hello!" --location=$REGION

    Start the job. To confirm that messages are being published, you can use the optional check shown after this list.

    gcloud scheduler jobs run publisher-job --location=$REGION
  5. Use the following commands to clone the quickstart repository and navigate to the sample code directory:

    Java

    git clone https://github.com/GoogleCloudPlatform/java-docs-samples.git
    cd java-docs-samples/pubsub/streaming-analytics

    Python

    git clone https://github.com/GoogleCloudPlatform/python-docs-samples.git
    cd python-docs-samples/pubsub/streaming-analytics
    pip install -r requirements.txt  # Install Apache Beam dependencies
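 
Optional: before starting the pipeline, you can confirm that the Cloud Scheduler job is publishing to the topic by pulling a few messages through a temporary subscription. The sketch below is not part of the official sample; it assumes the google-cloud-pubsub package is installed, uses a hypothetical subscription name (quickstart-check-sub), and deletes the subscription afterwards so it does not retain messages. Give the scheduler a minute or two to publish before running it.

# Optional check: pull a few messages from the topic via a temporary subscription.
# Assumes: pip install google-cloud-pubsub; the subscription name is hypothetical.
from google.cloud import pubsub_v1

project_id = "PROJECT_ID"      # replace with your project ID
topic_id = "TOPIC_ID"          # replace with your topic ID
subscription_id = "quickstart-check-sub"

subscriber = pubsub_v1.SubscriberClient()
topic_path = f"projects/{project_id}/topics/{topic_id}"
subscription_path = subscriber.subscription_path(project_id, subscription_id)

with subscriber:
    # Create a temporary subscription; only messages published after this point are received.
    subscriber.create_subscription(request={"name": subscription_path, "topic": topic_path})

    # Pull up to 3 messages (run this after the scheduler has had time to publish).
    response = subscriber.pull(
        request={"subscription": subscription_path, "max_messages": 3}, timeout=30.0
    )
    for received in response.received_messages:
        print(received.message.data.decode("utf-8"))

    # Clean up the temporary subscription.
    subscriber.delete_subscription(request={"subscription": subscription_path})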

Stream messages from Pub/Sub to Cloud Storage

Code sample

This sample code uses Dataflow to:

  • Read Pub/Sub messages.
  • Window (or group) messages into fixed-size intervals by publish timestamps.
  • Write the messages in each window to files in Cloud Storage.

Java

import java.io.IOException;
import org.apache.beam.examples.common.WriteOneFilePerWindow;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.StreamingOptions;
import org.apache.beam.sdk.options.Validation.Required;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.joda.time.Duration;

public class PubSubToGcs {
  /*
   * Define your own configuration options. Add your own arguments to be processed
   * by the command-line parser, and specify default values for them.
   */
  public interface PubSubToGcsOptions extends StreamingOptions {
    @Description("The Cloud Pub/Sub topic to read from.")
    @Required
    String getInputTopic();

    void setInputTopic(String value);

    @Description("Output file's window size in number of minutes.")
    @Default.Integer(1)
    Integer getWindowSize();

    void setWindowSize(Integer value);

    @Description("Path of the output file including its filename prefix.")
    @Required
    String getOutput();

    void setOutput(String value);
  }

  public static void main(String[] args) throws IOException {
    // The maximum number of shards when writing output.
    int numShards = 1;

    PubSubToGcsOptions options =
        PipelineOptionsFactory.fromArgs(args).withValidation().as(PubSubToGcsOptions.class);

    options.setStreaming(true);

    Pipeline pipeline = Pipeline.create(options);

    pipeline
        // 1) Read string messages from a Pub/Sub topic.
        .apply("Read PubSub Messages", PubsubIO.readStrings().fromTopic(options.getInputTopic()))
        // 2) Group the messages into fixed-sized minute intervals.
        .apply(Window.into(FixedWindows.of(Duration.standardMinutes(options.getWindowSize()))))
        // 3) Write one file to GCS for every window of messages.
        .apply("Write Files to GCS", new WriteOneFilePerWindow(options.getOutput(), numShards));

    // Execute the pipeline and wait until it finishes running.
    pipeline.run().waitUntilFinish();
  }
}

Python

import argparse
from datetime import datetime
import logging
import random

from apache_beam import (
    DoFn,
    GroupByKey,
    io,
    ParDo,
    Pipeline,
    PTransform,
    WindowInto,
    WithKeys,
)
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.transforms.window import FixedWindows


class GroupMessagesByFixedWindows(PTransform):
    """A composite transform that groups Pub/Sub messages based on publish time
    and outputs a list of tuples, each containing a message and its publish time.
    """

    def __init__(self, window_size, num_shards=5):
        # Set window size to 60 seconds.
        self.window_size = int(window_size * 60)
        self.num_shards = num_shards

    def expand(self, pcoll):
        return (
            pcoll
            # Bind window info to each element using element timestamp (or publish time).
            | "Window into fixed intervals"
            >> WindowInto(FixedWindows(self.window_size))
            | "Add timestamp to windowed elements" >> ParDo(AddTimestamp())
            # Assign a random key to each windowed element based on the number of shards.
            | "Add key" >> WithKeys(lambda _: random.randint(0, self.num_shards - 1))
            # Group windowed elements by key. All the elements in the same window must fit
            # memory for this. If not, you need to use `beam.util.BatchElements`.
            | "Group by key" >> GroupByKey()
        )


class AddTimestamp(DoFn):
    def process(self, element, publish_time=DoFn.TimestampParam):
        """Processes each windowed element by extracting the message body and its
        publish time into a tuple.
        """
        yield (
            element.decode("utf-8"),
            datetime.utcfromtimestamp(float(publish_time)).strftime(
                "%Y-%m-%d %H:%M:%S.%f"
            ),
        )


class WriteToGCS(DoFn):
    def __init__(self, output_path):
        self.output_path = output_path

    def process(self, key_value, window=DoFn.WindowParam):
        """Write messages in a batch to Google Cloud Storage."""

        ts_format = "%H:%M"
        window_start = window.start.to_utc_datetime().strftime(ts_format)
        window_end = window.end.to_utc_datetime().strftime(ts_format)
        shard_id, batch = key_value
        filename = "-".join([self.output_path, window_start, window_end, str(shard_id)])

        with io.gcsio.GcsIO().open(filename=filename, mode="w") as f:
            for message_body, publish_time in batch:
                f.write(f"{message_body},{publish_time}\n".encode())


def run(input_topic, output_path, window_size=1.0, num_shards=5, pipeline_args=None):
    # Set `save_main_session` to True so DoFns can access globally imported modules.
    pipeline_options = PipelineOptions(
        pipeline_args, streaming=True, save_main_session=True
    )

    with Pipeline(options=pipeline_options) as pipeline:
        (
            pipeline
            # Because `timestamp_attribute` is unspecified in `ReadFromPubSub`, Beam
            # binds the publish time returned by the Pub/Sub server for each message
            # to the element's timestamp parameter, accessible via `DoFn.TimestampParam`.
            # https://beam.apache.org/releases/pydoc/current/apache_beam.io.gcp.pubsub.html#apache_beam.io.gcp.pubsub.ReadFromPubSub
            | "Read from Pub/Sub" >> io.ReadFromPubSub(topic=input_topic)
            | "Window into" >> GroupMessagesByFixedWindows(window_size, num_shards)
            | "Write to GCS" >> ParDo(WriteToGCS(output_path))
        )


if __name__ == "__main__":
    logging.getLogger().setLevel(logging.INFO)

    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--input_topic",
        help="The Cloud Pub/Sub topic to read from."
        '"projects/<PROJECT_ID>/topics/<TOPIC_ID>".',
    )
    parser.add_argument(
        "--window_size",
        type=float,
        default=1.0,
        help="Output file's window size in minutes.",
    )
    parser.add_argument(
        "--output_path",
        help="Path of the output GCS file including the prefix.",
    )
    parser.add_argument(
        "--num_shards",
        type=int,
        default=5,
        help="Number of shards to use when writing windowed elements to GCS.",
    )
    known_args, pipeline_args = parser.parse_known_args()

    run(
        known_args.input_topic,
        known_args.output_path,
        known_args.window_size,
        known_args.num_shards,
        pipeline_args,
    )

Start the pipeline

To start the pipeline, run the following command:

Java

mvn compile exec:java \
  -Dexec.mainClass=com.examples.pubsub.streaming.PubSubToGcs \
  -Dexec.cleanupDaemonThreads=false \
  -Dexec.args=" \
    --project=$PROJECT_ID \
    --region=$REGION \
    --inputTopic=projects/$PROJECT_ID/topics/$TOPIC_ID \
    --output=gs://$BUCKET_NAME/samples/output \
    --gcpTempLocation=gs://$BUCKET_NAME/temp \
    --runner=DataflowRunner \
    --windowSize=2 \
    --serviceAccount=$SERVICE_ACCOUNT"

Python

python PubSubToGCS.py \
  --project=$PROJECT_ID \
  --region=$REGION \
  --input_topic=projects/$PROJECT_ID/topics/$TOPIC_ID \
  --output_path=gs://$BUCKET_NAME/samples/output \
  --runner=DataflowRunner \
  --window_size=2 \
  --num_shards=2 \
  --temp_location=gs://$BUCKET_NAME/temp \
  --service_account_email=$SERVICE_ACCOUNT

The preceding command runs locally and launches a Dataflow job that runs in the cloud. When the command returns JOB_MESSAGE_DETAILED: Workers have started successfully, exit the local program using Ctrl+C.

Observe job and pipeline progress

You can observe the job's progress in the Dataflow console.

Go to the Dataflow console

Open the job details view to see:

  • Job structure
  • Job logs
  • Stage metrics
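 
If you prefer to check job state from code instead of the console, the following sketch lists recent Dataflow jobs in your region through the Dataflow REST API. It assumes the google-api-python-client package is installed and that Application Default Credentials are configured (see the earlier authentication step); the project and region placeholders are yours to fill in.

# Optional: list Dataflow jobs and their states via the Dataflow REST API.
# Assumes: pip install google-api-python-client; ADC configured.
from googleapiclient.discovery import build

project_id = "PROJECT_ID"    # replace with your project ID
region = "DATAFLOW_REGION"   # replace with your Dataflow region

dataflow = build("dataflow", "v1b3")
response = (
    dataflow.projects()
    .locations()
    .jobs()
    .list(projectId=project_id, location=region)
    .execute()
)
for job in response.get("jobs", []):
    print(job["name"], job["currentState"])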

You may have to wait a few minutes to see the output files in Cloud Storage.

Alternatively, use the command line below to check which files have been written out. A Python alternative is sketched after the sample output.

gcloud storage ls gs://${BUCKET_NAME}/samples/

The output should look like the following:

Java

gs://{$BUCKET_NAME}/samples/output-22:30-22:32-0-of-1
gs://{$BUCKET_NAME}/samples/output-22:32-22:34-0-of-1
gs://{$BUCKET_NAME}/samples/output-22:34-22:36-0-of-1
gs://{$BUCKET_NAME}/samples/output-22:36-22:38-0-of-1

Python

gs://{$BUCKET_NAME}/samples/output-22:30-22:32-0
gs://{$BUCKET_NAME}/samples/output-22:30-22:32-1
gs://{$BUCKET_NAME}/samples/output-22:32-22:34-0
gs://{$BUCKET_NAME}/samples/output-22:32-22:34-1
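 
As mentioned above, you can also list the output from Python instead of gcloud. A minimal sketch, assuming the google-cloud-storage package is installed and Application Default Credentials are configured:

# Optional: list the pipeline's output objects with the Cloud Storage Python client.
# Assumes: pip install google-cloud-storage; ADC configured.
from google.cloud import storage

bucket_name = "BUCKET_NAME"  # replace with your bucket name

client = storage.Client()
for blob in client.list_blobs(bucket_name, prefix="samples/"):
    print(blob.name)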

Clean up

To avoid incurring charges to your Google Cloud account for the resources used on this page, delete the Google Cloud project with the resources.

  1. Delete the Cloud Scheduler job.

    gcloud scheduler jobs delete publisher-job --location=$REGION
  2. In the Dataflow console, stop the job. Cancel the pipeline without draining it.

  3. Delete the topic.

    gcloud pubsub topics delete $TOPIC_ID
  4. Delete the files created by the pipeline.

    gcloudstoragerm"gs://${BUCKET_NAME}/samples/output*"--recursive--continue-on-errorgcloudstoragerm"gs://${BUCKET_NAME}/temp/*"--recursive--continue-on-error
  5. Remove the Cloud Storage bucket.

    gcloud storage rm gs://${BUCKET_NAME} --recursive

  6. Delete the service account:
    gcloud iam service-accounts delete SERVICE_ACCOUNT_EMAIL
  7. Optional: Revoke the authentication credentials that you created, and delete the local credential file.

    gcloud auth application-default revoke
  8. Optional: Revoke credentials from the gcloud CLI.

    gcloud auth revoke
