Write from Dataflow to Pub/Sub
This document describes how to write text data from Dataflow to Pub/Sub by using the Apache Beam PubSubIO I/O connector.
Overview
To write data to Pub/Sub, use the PubSubIO connector. The input elements can be either Pub/Sub messages or just the message data. If the input elements are Pub/Sub messages, you can optionally set attributes or an ordering key on each message.
You can use the Java, Python, or Go version of the PubSubIO connector, as follows:
Java
To write to a single topic, call the PubsubIO.writeMessages method. This method takes an input collection of PubsubMessage objects. The connector also defines convenience methods for writing strings, binary-encoded Avro messages, or binary-encoded protobuf messages. These methods convert the input collection into Pub/Sub messages.
To write to a dynamic set of topics based on the input data, call writeMessagesDynamic. Specify the destination topic for each message by calling PubsubMessage.withTopic on the message. For example, you can route messages to different topics based on the value of a particular field in your input data.
For more information, see the PubsubIO reference documentation.
Python
Call the pubsub.WriteToPubSub method. By default, this method takes an input collection of type bytes, representing the message payload. If the with_attributes parameter is True, the method takes a collection of PubsubMessage objects.
For more information, see the pubsub module reference documentation.
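As a minimal sketch of the default mode, the following pipeline encodes strings as UTF-8 bytes and passes them to WriteToPubSub without setting with_attributes. The sample strings and the helper names `encode_payload` and `run` are illustrative assumptions, not part of the connector. Running the pipeline requires the apache-beam[gcp] package and a runner that supports Pub/Sub writes.

```python
from typing import List


def encode_payload(text: str) -> bytes:
    """Encode a string as the UTF-8 bytes that form the message payload."""
    return text.encode("utf-8")


def run(topic: str, argv: List[str]) -> None:
    """Write a few byte payloads to `topic`.

    `topic` is a full topic path such as projects/MY_PROJECT/topics/MY_TOPIC
    (placeholder values).
    """
    # Imported here so that encode_payload can be used without Beam installed.
    import apache_beam as beam
    from apache_beam.io import WriteToPubSub
    from apache_beam.options.pipeline_options import PipelineOptions

    options = PipelineOptions(argv)
    with beam.Pipeline(options=options) as pipeline:
        (
            pipeline
            | "Create strings" >> beam.Create(["TV", "Phone", "Laptop"])
            | "Encode as bytes" >> beam.Map(encode_payload)
            # with_attributes defaults to False, so each element must be bytes.
            | "Write payloads" >> WriteToPubSub(topic=topic)
        )
```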
Go
To write data to Pub/Sub, call the pubsubio.Write method. This method takes an input collection of either PubSubMessage objects or byte slices that contain the message payloads.
For more information, see the pubsubio package reference documentation.
For more information about Pub/Sub messages, see Message format in the Pub/Sub documentation.
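The message size limit noted in this section follows from base64 arithmetic: base64 turns every 3 input bytes into 4 output characters. A rough sketch of that calculation follows. It assumes that the 10 MB limit means 10,000,000 bytes and ignores attributes and other per-message overhead; both are simplifications made for illustration.

```python
import base64

PUBSUB_LIMIT_BYTES = 10_000_000  # Assumed reading of the "10 MB" limit.


def base64_size(payload_size: int) -> int:
    """Return the length of the padded base64 encoding of payload_size bytes."""
    return 4 * ((payload_size + 2) // 3)


def fits_after_base64(payload_size: int, limit: int = PUBSUB_LIMIT_BYTES) -> bool:
    """Check whether a payload still fits in the limit once base64-encoded."""
    return base64_size(payload_size) <= limit


# The formula agrees with the standard library on a small payload.
assert base64_size(5) == len(base64.b64encode(b"hello"))

print(base64_size(7_500_000))        # 10000000
print(fits_after_base64(7_500_000))  # True
print(fits_after_base64(7_500_001))  # False
```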
Note: Pub/Sub messages have a 10 MB size limit. The Apache Beam PubSubIO connector requires additional space for data encoding, which lowers the maximum message size it can process. For example, the SDK for Java has a 7.5 MB message limit because of base64 encoding.
Timestamps
Pub/Sub sets a timestamp on each message. This timestamp represents the time when the message is published to Pub/Sub. In a streaming scenario, you might also care about the event timestamp, which is the time when the message data was generated. You can use the Apache Beam element timestamp to represent event time. Sources that create an unbounded PCollection often assign each new element a timestamp that corresponds to the event time.
For Java and Python, the Pub/Sub I/O connector can write each element's timestamp as a Pub/Sub message attribute. Message consumers can use this attribute to get the event timestamp.
Java
Call PubsubIO.Write<T>.withTimestampAttribute and specify the name of the attribute.
Python
Specify the timestamp_attribute parameter when you call WriteToPubSub.
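A minimal sketch of both sides, under stated assumptions: the writer sets timestamp_attribute on WriteToPubSub, and a consumer parses the attribute back into a datetime. The attribute name `event_ts`, the helper names, and the assumption that the value is a string of milliseconds since the Unix epoch are illustrative. Check the value format that your SDK version writes, and note that not every runner supports this parameter.

```python
from datetime import datetime, timezone
from typing import Mapping

TIMESTAMP_ATTRIBUTE = "event_ts"  # Hypothetical attribute name.


def make_write_transform(topic: str):
    """Build a write transform that stores each element's timestamp as an attribute."""
    # Imported lazily so the consumer helper below works without Beam installed.
    from apache_beam.io import WriteToPubSub

    return WriteToPubSub(
        topic=topic,
        with_attributes=True,
        timestamp_attribute=TIMESTAMP_ATTRIBUTE,
    )


def event_time(attributes: Mapping[str, str]) -> datetime:
    """Consumer side: parse the attribute, assumed to hold epoch milliseconds."""
    millis = int(attributes[TIMESTAMP_ATTRIBUTE])
    return datetime.fromtimestamp(millis / 1000, tz=timezone.utc)


print(event_time({"event_ts": "1613141590000"}).isoformat())
# 2021-02-12T14:53:10+00:00
```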
Message delivery
Dataflow supports exactly-once processing of messages within a pipeline. However, the Pub/Sub I/O connector can't guarantee exactly-once delivery of messages through Pub/Sub.
For Java and Python, you can configure the Pub/Sub I/O connector to write each element's unique ID as a message attribute. Message consumers can then use this attribute to deduplicate messages.
Java
Call PubsubIO.Write<T>.withIdAttribute and specify the name of the attribute.
Python
Specify the id_label parameter when you call WriteToPubSub.
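On the consumer side, deduplication by that attribute can be sketched as follows. The attribute name `unique_id`, the `Message` stand-in type, and the in-memory set of seen IDs are assumptions for illustration; a real consumer would typically bound or persist that set. The writer is assumed to have been configured with a matching id_label value.

```python
from dataclasses import dataclass, field
from typing import Dict, Iterable, Iterator, Set

ID_ATTRIBUTE = "unique_id"  # Hypothetical name; must match the writer's id_label.


@dataclass
class Message:
    """Stand-in for a received Pub/Sub message (payload plus attributes)."""

    data: bytes
    attributes: Dict[str, str] = field(default_factory=dict)


def deduplicate(messages: Iterable[Message]) -> Iterator[Message]:
    """Yield each message whose ID attribute has not been seen before."""
    seen: Set[str] = set()
    for message in messages:
        message_id = message.attributes.get(ID_ATTRIBUTE)
        if message_id is None:
            # No ID to compare against, so pass the message through.
            yield message
            continue
        if message_id not in seen:
            seen.add(message_id)
            yield message


received = [
    Message(b"TV", {"unique_id": "a1"}),
    Message(b"TV", {"unique_id": "a1"}),  # Redelivered duplicate.
    Message(b"Phone", {"unique_id": "b2"}),
]
print([m.data for m in deduplicate(received)])  # [b'TV', b'Phone']
```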
Direct output
If you enable at-least-once streaming mode in your pipeline, then the I/O connector uses direct output. In this mode, the connector doesn't checkpoint messages, which enables faster writes. However, retries in this mode might cause duplicate messages with different message IDs, possibly making it harder for message consumers to deduplicate the messages.
For pipelines that use exactly-once mode, you can enable direct output by setting the streaming_enable_pubsub_direct_output service option. Direct output reduces write latency and results in more efficient processing. Consider this option if your message consumers can handle duplicate messages with non-unique message IDs.
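As a sketch of how that service option might be passed from Python, the helper below assembles command-line flags for a Dataflow job. It assumes the Beam Python SDK's --dataflow_service_options flag is used to pass the service option; the helper name and the project and region values are placeholders.

```python
from typing import List


def build_pipeline_args(project: str, region: str, direct_output: bool) -> List[str]:
    """Assemble flags for a streaming Dataflow job (all values are placeholders)."""
    args = [
        "--runner=DataflowRunner",
        f"--project={project}",
        f"--region={region}",
        "--streaming",
    ]
    if direct_output:
        # Opt in to direct output for a pipeline that runs in exactly-once mode.
        args.append("--dataflow_service_options=streaming_enable_pubsub_direct_output")
    return args


args = build_pipeline_args("MY_PROJECT", "us-central1", direct_output=True)
print(args[-1])
# --dataflow_service_options=streaming_enable_pubsub_direct_output
```

The resulting list can then be handed to PipelineOptions when the pipeline is constructed.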
Examples
The following example creates a PCollection of Pub/Sub messages and writes them to a Pub/Sub topic. The topic is specified as a pipeline option. Each message contains payload data and a set of attributes.
Java
To authenticate to Dataflow, set up Application Default Credentials. For more information, see Set up authentication for a local development environment.
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.DefaultCoder;
import org.apache.beam.sdk.extensions.avro.coders.AvroCoder;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.TypeDescriptor;

public class PubSubWriteWithAttributes {
  public interface Options extends PipelineOptions {
    @Description("The Pub/Sub topic to write to. Format: projects/<PROJECT>/topics/<TOPIC>")
    String getTopic();

    void setTopic(String value);
  }

  // A custom datatype for the source data.
  @DefaultCoder(AvroCoder.class)
  static class ExampleData {
    public String name;
    public String product;
    public Long timestamp; // Epoch time in milliseconds

    public ExampleData() {}

    public ExampleData(String name, String product, Long timestamp) {
      this.name = name;
      this.product = product;
      this.timestamp = timestamp;
    }
  }

  // Write messages to a Pub/Sub topic.
  public static void main(String[] args) {
    // Example source data.
    final List<ExampleData> messages = Arrays.asList(
        new ExampleData("Robert", "TV", 1613141590000L),
        new ExampleData("Maria", "Phone", 1612718280000L),
        new ExampleData("Juan", "Laptop", 1611618000000L),
        new ExampleData("Rebeca", "Videogame", 1610000000000L)
    );

    // Parse the pipeline options passed into the application. Example:
    //   --runner=DirectRunner --topic=projects/MY_PROJECT/topics/MY_TOPIC
    // For more information, see https://beam.apache.org/documentation/programming-guide/#configuring-pipeline-options
    var options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
    var pipeline = Pipeline.create(options);
    pipeline
        // Create some data to write to Pub/Sub.
        .apply(Create.of(messages))
        // Convert the data to Pub/Sub messages.
        .apply(MapElements
            .into(TypeDescriptor.of(PubsubMessage.class))
            .via((message -> {
              byte[] payload = message.product.getBytes(StandardCharsets.UTF_8);
              // Create attributes for each message.
              HashMap<String, String> attributes = new HashMap<String, String>();
              attributes.put("buyer", message.name);
              attributes.put("timestamp", Long.toString(message.timestamp));
              return new PubsubMessage(payload, attributes);
            })))
        // Write the messages to Pub/Sub.
        .apply(PubsubIO.writeMessages().to(options.getTopic()));
    pipeline.run().waitUntilFinish();
  }
}

Python
To authenticate to Dataflow, set up Application Default Credentials. For more information, see Set up authentication for a local development environment.
import argparse
from typing import Any, Dict, List

import apache_beam as beam
from apache_beam.io import PubsubMessage
from apache_beam.io import WriteToPubSub
from apache_beam.options.pipeline_options import PipelineOptions

from typing_extensions import Self


def item_to_message(item: Dict[str, Any]) -> PubsubMessage:
    # Re-import needed types. When using the Dataflow runner, this
    # function executes on a worker, where the global namespace is not
    # available. For more information, see:
    # https://cloud.google.com/dataflow/docs/guides/common-errors#name-error
    from apache_beam.io import PubsubMessage

    attributes = {"buyer": item["name"], "timestamp": str(item["ts"])}
    data = bytes(item["product"], "utf-8")

    return PubsubMessage(data=data, attributes=attributes)


def write_to_pubsub(argv: List[str] = None) -> None:
    # Parse the pipeline options passed into the application. Example:
    #     --topic=$TOPIC_PATH --streaming
    # For more information, see
    # https://beam.apache.org/documentation/programming-guide/#configuring-pipeline-options
    class MyOptions(PipelineOptions):
        @classmethod
        # Define a custom pipeline option to specify the Pub/Sub topic.
        def _add_argparse_args(cls: Self, parser: argparse.ArgumentParser) -> None:
            parser.add_argument("--topic", required=True)

    example_data = [
        {"name": "Robert", "product": "TV", "ts": 1613141590000},
        {"name": "Maria", "product": "Phone", "ts": 1612718280000},
        {"name": "Juan", "product": "Laptop", "ts": 1611618000000},
        {"name": "Rebeca", "product": "Video game", "ts": 1610000000000},
    ]
    options = MyOptions()

    with beam.Pipeline(options=options) as pipeline:
        (
            pipeline
            | "Create elements" >> beam.Create(example_data)
            | "Convert to Pub/Sub messages" >> beam.Map(item_to_message)
            | WriteToPubSub(topic=options.topic, with_attributes=True)
        )

    print("Pipeline ran successfully.")

Except as otherwise noted, the content of this page is licensed under the Creative Commons Attribution 4.0 License, and code samples are licensed under the Apache 2.0 License. For details, see the Google Developers Site Policies. Java is a registered trademark of Oracle and/or its affiliates.
Last updated 2026-02-19 UTC.