Flow control Stay organized with collections Save and categorize content based on your preferences.
This document provides information about using flow control with messagespublished to a topic.
About flow control
A publisher client may attempt to publish messages faster than that client iscapable of sending data to the Pub/Sub service. Clients are limited bymany factors, including:
- Machine CPU, RAM, and network capacity
- Network settings, such as the number of outstanding requests and available bandwidth
- The latency of each publish request, largely determined by the network connections between the Pub/Subservice, the client, and Google Cloud
If the publish request rate exceeds these limits, requests accumulate in memoryuntil they fail with aDEADLINE_EXCEEDED error. This is especially likelywhen tens of thousands of messages are published in a loop, generating thousandsof requests in milliseconds.
You can diagnose this issue by checking the server side metrics in Monitoring. Youwon't be able to see the requests that have failed withDEADLINE_EXCEEDED, only thesuccessful requests. The rate of successful requests tells you thethroughput capacity of your client machines, providing a baseline for configuring flowcontrol.
To mitigate flow rate issues, configure your publisher client with flow control tolimit the rate of publish requests. You can configure the maximum number of bytes allocated for outstanding requests, and the maximum number of outstanding messages permitted. Set these limits according to the throughput capacity of your client machines.
Note: Use flow control if messages are periodically published at a higherrate than they can be sent. If this is a persistent problem rather thana transient spike in messages, consider increasing the number ofpublisher client instances.Before you begin
Before configuring the publish workflow, ensure you have completed the followingtasks:
- Learn abouttopics and the publishing workflow.
- Create a topic.
Required roles
To get the permissions that you need to use flow control, ask your administrator to grant you thePub/Sub Publisher (roles/pubsub.publisher) IAM role on your topic. For more information about granting roles, seeManage access to projects, folders, and organizations.
You might also be able to get the required permissions throughcustom roles or otherpredefined roles.
You needadditional permissions tocreate or update topics and subscriptions.
Use flow control with messages
Publisher flow control is available using thePub/Sub client libraries in the following languages:
C++
Before trying this sample, follow the C++ setup instructions inQuickstart: Using Client Libraries. For more information, see thePub/Sub C++ API reference documentation.
namespacepubsub=::google::cloud::pubsub;using::google::cloud::future;using::google::cloud::Options;using::google::cloud::StatusOr;[](std::stringproject_id,std::stringtopic_id){autotopic=pubsub::Topic(std::move(project_id),std::move(topic_id));// Configure the publisher to block if either (1) 100 or more messages, or// (2) messages with 100MiB worth of data have not been acknowledged by the// service. By default the publisher never blocks, and its capacity is only// limited by the system's memory.autopublisher=pubsub::Publisher(pubsub::MakePublisherConnection(std::move(topic),Options{}.set<pubsub::MaxPendingMessagesOption>(100).set<pubsub::MaxPendingBytesOption>(100*1024*1024L).set<pubsub::FullPublisherActionOption>(pubsub::FullPublisherAction::kBlocks)));std::vector<future<void>>ids;for(charconst*data:{"a","b","c"}){ids.push_back(publisher.Publish(pubsub::MessageBuilder().SetData(data).Build()).then([data](future<StatusOr<std::string>>f){autos=f.get();if(!s)return;std::cout <<"Sent '" <<data <<"' (" <<*s <<")\n";}));}publisher.Flush();// Block until they are actually sent.for(auto&id:ids)id.get();}Go
The following sample uses the major version of the Go Pub/Sub client library (v2). If you are still using the v1 library, seethe migration guide to v2.To see a list of v1 code samples, seethe deprecated code samples.
Before trying this sample, follow the Go setup instructions inQuickstart: Using Client Libraries.For more information, see thePub/Sub Go API reference documentation.
import("context""fmt""io""strconv""sync""sync/atomic""cloud.google.com/go/pubsub/v2")funcpublishWithFlowControlSettings(wio.Writer,projectID,topicIDstring)error{// projectID := "my-project-id"// topicID := "my-topic"ctx:=context.Background()client,err:=pubsub.NewClient(ctx,projectID)iferr!=nil{returnfmt.Errorf("pubsub.NewClient: %w",err)}deferclient.Close()// client.Publisher can be passed a topic ID (e.g. "my-topic") or// a fully qualified name (e.g. "projects/my-project/topics/my-topic").// If a topic ID is provided, the project ID from the client is used.// Reuse this publisher for all publish calls to send messages in batches.publisher:=client.Publisher(topicID)publisher.PublishSettings.FlowControlSettings=pubsub.FlowControlSettings{MaxOutstandingMessages:100,// default 1000MaxOutstandingBytes:10*1024*1024,// default 0 (unlimited)LimitExceededBehavior:pubsub.FlowControlBlock,// default Ignore, other options: Block and SignalError}varwgsync.WaitGroupvartotalErrorsuint64numMsgs:=1000// Rapidly publishing 1000 messages in a loop may be constrained by flow control.fori:=0;i <numMsgs;i++{wg.Add(1)result:=publisher.Publish(ctx,&pubsub.Message{Data:[]byte("message #"+strconv.Itoa(i)),})gofunc(iint,res*pubsub.PublishResult){fmt.Fprintf(w,"Publishing message %d\n",i)deferwg.Done()// The Get method blocks until a server-generated ID or// an error is returned for the published message._,err:=res.Get(ctx)iferr!=nil{// Error handling code can be added here.fmt.Fprintf(w,"Failed to publish: %v",err)atomic.AddUint64(&totalErrors,1)return}}(i,result)}wg.Wait()iftotalErrors >0{returnfmt.Errorf("%d of %d messages did not publish successfully",totalErrors,numMsgs)}returnnil}Java
Before trying this sample, follow the Java setup instructions inQuickstart: Using Client Libraries. For more information, see thePub/Sub Java API reference documentation.
importcom.google.api.core.ApiFuture;importcom.google.api.core.ApiFutures;importcom.google.api.gax.batching.BatchingSettings;importcom.google.api.gax.batching.FlowControlSettings;importcom.google.api.gax.batching.FlowController.LimitExceededBehavior;importcom.google.cloud.pubsub.v1.Publisher;importcom.google.protobuf.ByteString;importcom.google.pubsub.v1.PubsubMessage;importcom.google.pubsub.v1.TopicName;importjava.io.IOException;importjava.util.ArrayList;importjava.util.List;importjava.util.concurrent.ExecutionException;importjava.util.concurrent.TimeUnit;publicclassPublishWithFlowControlExample{publicstaticvoidmain(String...args)throwsException{// TODO(developer): Replace these variables before running the sample.StringprojectId="your-project-id";StringtopicId="your-topic-id";publishWithFlowControlExample(projectId,topicId);}publicstaticvoidpublishWithFlowControlExample(StringprojectId,StringtopicId)throwsIOException,ExecutionException,InterruptedException{TopicNametopicName=TopicName.of(projectId,topicId);Publisherpublisher=null;List<ApiFuture<String>>messageIdFutures=newArrayList<>();try{// Configure how many messages the publisher client can hold in memory// and what to do when messages exceed the limit.FlowControlSettingsflowControlSettings=FlowControlSettings.newBuilder()// Block more messages from being published when the limit is reached. The other// options are Ignore (or continue publishing) and ThrowException (or error out)..setLimitExceededBehavior(LimitExceededBehavior.Block).setMaxOutstandingRequestBytes(10*1024*1024L)// 10 MiB.setMaxOutstandingElementCount(100L)// 100 messages.build();// By default, messages are not batched.BatchingSettingsbatchingSettings=BatchingSettings.newBuilder().setFlowControlSettings(flowControlSettings).build();publisher=Publisher.newBuilder(topicName).setBatchingSettings(batchingSettings).build();// Publish 1000 messages in quick succession may be constrained by publisher flow control.for(inti=0;i <1000;i++){Stringmessage="message "+i;ByteStringdata=ByteString.copyFromUtf8(message);PubsubMessagepubsubMessage=PubsubMessage.newBuilder().setData(data).build();// Once published, returns a server-assigned message id (unique within the topic)ApiFuture<String>messageIdFuture=publisher.publish(pubsubMessage);messageIdFutures.add(messageIdFuture);}}finally{// Wait on any pending publish requests.List<String>messageIds=ApiFutures.allAsList(messageIdFutures).get();System.out.println("Published "+messageIds.size()+" messages with flow control settings.");if(publisher!=null){// When finished with the publisher, shut down to free up resources.publisher.shutdown();publisher.awaitTermination(1,TimeUnit.MINUTES);}}}}Node.js
Before trying this sample, follow the Node.js setup instructions inQuickstart: Using Client Libraries. For more information, see thePub/Sub Node.js API reference documentation.
/** * TODO(developer): Uncomment this variable before running the sample. */// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';// Imports the Google Cloud client libraryconst{PubSub}=require('@google-cloud/pubsub');// Creates a client; cache this for further useconstpubSubClient=newPubSub();asyncfunctionpublishWithFlowControl(topicNameOrId){// Create publisher optionsconstoptions={flowControlOptions:{maxOutstandingMessages:50,maxOutstandingBytes:10*1024*1024,// 10 MB},};// Get a publisher. Cache topic objects (publishers) and reuse them.consttopic=pubSubClient.topic(topicNameOrId,options);// For flow controlled publishing, we'll use a publisher flow controller// instead of `topic.publish()`.constflow=topic.flowControlled();// Publish messages in a fast loop.consttestMessage={data:Buffer.from('test!')};for(leti=0;i <1000;i++){// You can also just `await` on `publish()` unconditionally, but if// you want to avoid pausing to the event loop on each iteration,// you can manually check the return value before doing so.constwait=flow.publish(testMessage);if(wait){awaitwait;}}// Wait on any pending publish requests. Note that you can call `all()`// earlier if you like, and it will return a Promise for all messages// that have been sent to `flowController.publish()` so far.constmessageIds=awaitflow.all();console.log(`Published${messageIds.length} with flow control settings.`);}Node.js
Before trying this sample, follow the Node.js setup instructions inQuickstart: Using Client Libraries. For more information, see thePub/Sub Node.js API reference documentation.
/** * TODO(developer): Uncomment this variable before running the sample. */// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';// Imports the Google Cloud client libraryimport{PubSub,PublishOptions}from'@google-cloud/pubsub';// Creates a client; cache this for further useconstpubSubClient=newPubSub();asyncfunctionpublishWithFlowControl(topicNameOrId:string){// Create publisher optionsconstoptions:PublishOptions={flowControlOptions:{maxOutstandingMessages:50,maxOutstandingBytes:10*1024*1024,// 10 MB},};// Get a publisher. Cache topic objects (publishers) and reuse them.consttopic=pubSubClient.topic(topicNameOrId,options);// For flow controlled publishing, we'll use a publisher flow controller// instead of `topic.publish()`.constflow=topic.flowControlled();// Publish messages in a fast loop.consttestMessage={data:Buffer.from('test!')};for(leti=0;i <1000;i++){// You can also just `await` on `publish()` unconditionally, but if// you want to avoid pausing to the event loop on each iteration,// you can manually check the return value before doing so.constwait=flow.publish(testMessage);if(wait){awaitwait;}}// Wait on any pending publish requests. Note that you can call `all()`// earlier if you like, and it will return a Promise for all messages// that have been sent to `flowController.publish()` so far.constmessageIds=awaitflow.all();console.log(`Published${messageIds.length} with flow control settings.`);}Python
Before trying this sample, follow the Python setup instructions inQuickstart: Using Client Libraries. For more information, see thePub/Sub Python API reference documentation.
fromconcurrentimportfuturesfromgoogle.cloudimportpubsub_v1fromgoogle.cloud.pubsub_v1.typesimport(LimitExceededBehavior,PublisherOptions,PublishFlowControl,)# TODO(developer)# project_id = "your-project-id"# topic_id = "your-topic-id"# Configure how many messages the publisher client can hold in memory# and what to do when messages exceed the limit.flow_control_settings=PublishFlowControl(message_limit=100,# 100 messagesbyte_limit=10*1024*1024,# 10 MiBlimit_exceeded_behavior=LimitExceededBehavior.BLOCK,)publisher=pubsub_v1.PublisherClient(publisher_options=PublisherOptions(flow_control=flow_control_settings))topic_path=publisher.topic_path(project_id,topic_id)publish_futures=[]# Resolve the publish future in a separate thread.defcallback(publish_future:pubsub_v1.publisher.futures.Future)->None:message_id=publish_future.result()print(message_id)# Publish 1000 messages in quick succession may be constrained by# publisher flow control.forninrange(1,1000):data_str=f"Message number{n}"# Data must be a bytestringdata=data_str.encode("utf-8")publish_future=publisher.publish(topic_path,data)# Non-blocking. Allow the publisher client to batch messages.publish_future.add_done_callback(callback)publish_futures.append(publish_future)futures.wait(publish_futures,return_when=futures.ALL_COMPLETED)print(f"Published messages with flow control settings to{topic_path}.")Ruby
The following sample uses Ruby Pub/Sub client library v3. If you are still using the v2 library, see the migration guide to v3.To see a list of Ruby v2 code samples, seethe deprecated code samples.
Before trying this sample, follow the Ruby setup instructions inQuickstart: Using Client Libraries.For more information, see thePub/Sub Ruby API reference documentation.
# topic_id = "your-topic-id"pubsub=Google::Cloud::PubSub.newpublisher=pubsub.publishertopic_id,async:{# Configure how many messages the publisher client can hold in memory# and what to do when messages exceed the limit.flow_control:{message_limit:100,byte_limit:10*1024*1024,# 10 MiB# Block more messages from being published when the limit is reached. The# other options are :ignore and :error.limit_exceeded_behavior::block}}# Rapidly publishing 1000 messages in a loop may be constrained by flow# control.1000.timesdo|i|publisher.publish_async"message#{i}"do|result|raise"Failed to publish the message."unlessresult.succeeded?endend# Stop the async_publisher to send all queued messages immediately.publisher.async_publisher.stop.wait!puts"Published messages with flow control settings to#{topic_id}."What's next
Except as otherwise noted, the content of this page is licensed under theCreative Commons Attribution 4.0 License, and code samples are licensed under theApache 2.0 License. For details, see theGoogle Developers Site Policies. Java is a registered trademark of Oracle and/or its affiliates.
Last updated 2026-02-19 UTC.