Write from Dataflow to Cloud Storage

This document describes how to write text data from Dataflow to Cloud Storage by using the Apache Beam TextIO I/O connector.

Note: Depending on your scenario, consider using one of the Google-provided Dataflow templates. Several of these templates output to Cloud Storage.

Include the Google Cloud Platform library dependency

To use the TextIO connector with Cloud Storage, include the following dependency. This library provides a schema handler for "gs://" filenames.

Java

<dependency>
  <groupId>org.apache.beam</groupId>
  <artifactId>beam-sdks-java-io-google-cloud-platform</artifactId>
  <version>${beam.version}</version>
</dependency>
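${beam.version} is a Maven property. If your pom.xml doesn't define it yet, you can add one; the version number below is only illustrative, so substitute the Apache Beam release that you use:

<properties>
  <beam.version>2.55.0</beam.version>
</properties>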

Python

apache-beam[gcp]==VERSION
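For example, you can pin this dependency in a requirements.txt file or install it directly with pip; the version number here is only illustrative, so substitute the Apache Beam release that you use:

pip install 'apache-beam[gcp]==2.55.0'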

Go

import_"github.com/apache/beam/sdks/v2/go/pkg/beam/io/filesystem/gcs"

For more information, see Install the Apache Beam SDK.

Enable gRPC on Apache Beam I/O connector on Dataflow

You can connect to Cloud Storage using gRPC through the Apache Beam I/O connector on Dataflow. gRPC is a high-performance, open-source remote procedure call (RPC) framework developed by Google that you can use to interact with Cloud Storage.

To speed up your Dataflow job's write requests to Cloud Storage, you can enable the Apache Beam I/O connector on Dataflow to use gRPC.

Command line

  1. Ensure that you use the Apache Beam SDK version 2.55.0 or later.
  2. To run a Dataflow job, use the --additional-experiments=use_grpc_for_gcs pipeline option, as shown in the following example. For information about the different pipeline options, see Optional flags.
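For example, the following gcloud command launches a job from a Google-provided classic template with the experiment enabled; the job name, template path, region, and parameters are illustrative:

gcloud dataflow jobs run my-job \
    --gcs-location=gs://dataflow-templates/latest/Word_Count \
    --region=us-central1 \
    --additional-experiments=use_grpc_for_gcs \
    --parameters=inputFile=gs://my_bucket/input.txt,output=gs://my_bucket/output/results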

Apache Beam SDK

  1. Ensure that you use the Apache Beam SDK version 2.55.0 or later.
  2. To run a Dataflow job, use the --experiments=use_grpc_for_gcs pipeline option, as shown in the sketch after this list. For information about the different pipeline options, see Basic options.
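The following is a minimal Java sketch of adding the experiment programmatically when you build the pipeline options. The class name is illustrative, and it assumes the Dataflow runner dependency is on the classpath:

import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.sdk.options.ExperimentalOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

public class EnableGcsGrpc {
  public static void main(String[] args) {
    // Parse pipeline options from the command line, then add the experiment
    // so that the job's Cloud Storage I/O uses gRPC.
    DataflowPipelineOptions options =
        PipelineOptionsFactory.fromArgs(args).withValidation().as(DataflowPipelineOptions.class);
    ExperimentalOptions.addExperiment(
        options.as(ExperimentalOptions.class), "use_grpc_for_gcs");
    // ... build and run the pipeline with these options.
  }
}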

You can configure the Apache Beam I/O connector on Dataflow to generate gRPC-related metrics in Cloud Monitoring. The gRPC-related metrics can help you to do the following:

  • Monitor and optimize the performance of gRPC requests to Cloud Storage.
  • Troubleshoot and debug issues.
  • Gain insights into your application's usage and behavior.

For information about how to configure the Apache Beam I/O connector on Dataflow to generate gRPC-related metrics, see Use client-side metrics. If gathering metrics isn't necessary for your use case, you can choose to opt out of metrics collection. For instructions, see Opt-out of client-side metrics.

Parallelism

Parallelism is determined primarily by the number of shards. By default, the runner automatically sets this value. For most pipelines, using the default behavior is recommended. In this document, see Best practices.

Performance

The following table shows performance metrics for writing to Cloud Storage. The workloads were run on one e2-standard2 worker, using the Apache Beam SDK 2.49.0 for Java. They did not use Runner v2.

100 M records | 1 kB | 1 column    Throughput (bytes)    Throughput (elements)
Write                              130 MBps              130,000 elements per second

These metrics are based on simple batch pipelines. They are intended to compare performance between I/O connectors, and are not necessarily representative of real-world pipelines. Dataflow pipeline performance is complex, and is a function of VM type, the data being processed, the performance of external sources and sinks, and user code. Metrics are based on running the Java SDK, and aren't representative of the performance characteristics of other language SDKs. For more information, see Beam IO Performance.

Best practices

  • In general, avoid setting a specific number of shards. This allows the runner to select an appropriate value for your scale. To enable autosharding, call .withAutoSharding(), not .withNumShards(0); see the sketch after this list. If you tune the number of shards, we recommend writing between 100 MB and 1 GB per shard. However, the optimum value might depend on the workload.

  • Cloud Storage can scale to a very large number of requests per second. However, if your pipeline has large spikes in write volume, consider writing to multiple buckets, to avoid temporarily overloading any single Cloud Storage bucket.

  • In general, writing to Cloud Storage is more efficient when each write is larger (1 kB or greater). Writing small records to a large number of files can result in worse performance per byte.

  • When generating file names, consider using non-sequential file names, in order to distribute load. For more information, see Use a naming convention that distributes load evenly across key ranges.

  • When naming files, don't use the at sign ('@') followed by a number or an asterisk ('*'). For more information, see "@*" and "@N" are reserved sharding specs.
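The following minimal Java sketch relies on runner-chosen sharding instead of a fixed shard count; the input PCollection name and output prefix are illustrative:

// "lines" is a PCollection<String>; withAutoSharding() lets the runner pick
// the number of output shards instead of a fixed withNumShards() value.
lines.apply(
    TextIO.write()
        .to("gs://my_bucket/output/part")
        .withSuffix(".txt")
        .withAutoSharding());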

Example: Write text files to Cloud Storage

The following example creates a batch pipeline that writes text files using GZIP compression:

Java

To authenticate to Dataflow, set up Application Default Credentials. For more information, see Set up authentication for a local development environment.

import java.util.Arrays;
import java.util.List;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.Compression;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;

public class BatchWriteStorage {
  public interface Options extends PipelineOptions {
    @Description("The Cloud Storage bucket to write to")
    String getBucketName();

    void setBucketName(String value);
  }

  // Write text data to Cloud Storage
  public static void main(String[] args) {
    final List<String> wordsList = Arrays.asList("1", "2", "3", "4");

    var options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
    var pipeline = Pipeline.create(options);
    pipeline
        .apply(Create.of(wordsList))
        .apply(
            TextIO.write()
                .to(options.getBucketName())
                .withSuffix(".txt")
                .withCompression(Compression.GZIP));
    pipeline.run().waitUntilFinish();
  }
}

If the input PCollection is unbounded, you must define a window or a trigger on the collection, and then specify windowed writes by calling TextIO.Write.withWindowedWrites.
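The following is a minimal sketch of a windowed write. It assumes an unbounded PCollection<String> named messages (for example, read from Pub/Sub) and an illustrative output prefix:

import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.joda.time.Duration;

// Window the unbounded collection, then write one file per window and shard.
messages
    .apply(Window.<String>into(FixedWindows.of(Duration.standardMinutes(5))))
    .apply(
        TextIO.write()
            .to("gs://my_bucket/windowed/output")
            .withWindowedWrites()
            // Unbounded writes need a fixed shard count or withAutoSharding().
            .withNumShards(1));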

Python

To authenticate to Dataflow, set up Application Default Credentials. For more information, see Set up authentication for a local development environment.

import argparse
from typing import List

import apache_beam as beam
from apache_beam.io.textio import WriteToText
from apache_beam.options.pipeline_options import PipelineOptions
from typing_extensions import Self


def write_to_cloud_storage(argv: List[str] = None) -> None:
    # Parse the pipeline options passed into the application.
    class MyOptions(PipelineOptions):
        @classmethod
        # Define a custom pipeline option that specifies the Cloud Storage bucket.
        def _add_argparse_args(cls: Self, parser: argparse.ArgumentParser) -> None:
            parser.add_argument("--output", required=True)

    wordsList = ["1", "2", "3", "4"]
    options = MyOptions()

    with beam.Pipeline(options=options.view_as(PipelineOptions)) as pipeline:
        (
            pipeline
            | "Create elements" >> beam.Create(wordsList)
            | "Write Files" >> WriteToText(options.output, file_name_suffix=".txt")
        )

For the output path, specify a Cloud Storage path that includes the bucket name and a filename prefix. For example, if you specify gs://my_bucket/output/file, the TextIO connector writes to the Cloud Storage bucket named my_bucket, and the output files have the prefix output/file*.

By default, the TextIO connector shards the output files, using a naming convention like this: <file-prefix>-00000-of-00001. Optionally, you can specify a filename suffix and a compression scheme, as shown in the example.
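For example, with the prefix gs://my_bucket/output/file, the suffix .txt, and four output shards (the shard count here is only illustrative), the output names follow this pattern:

gs://my_bucket/output/file-00000-of-00004.txt
gs://my_bucket/output/file-00001-of-00004.txt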

To ensure idempotent writes, Dataflow writes to a temporary file and then copies the completed temporary file to the final file. To control where these temporary files are stored, use the withTempDirectory method.
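The following is a minimal Java sketch that stages temporary files under an illustrative gs:// path; FileSystems.matchNewResource builds the ResourceId that withTempDirectory expects:

import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.io.TextIO;

// Write output files to the bucket, staging temporary files under gs://my_bucket/temp/.
TextIO.write()
    .to("gs://my_bucket/output/file")
    .withSuffix(".txt")
    .withTempDirectory(FileSystems.matchNewResource("gs://my_bucket/temp/", true));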

