Publishing messages to Lite topics

Note: Pub/Sub Lite is deprecated. Effective March 18, 2026, Pub/Sub Lite will be turned down.
  • Current customers: Pub/Sub Lite remains functional until March 18, 2026.
    If you have not used Pub/Sub Lite within the 90-day period preceding July 15, 2025 (April 15, 2025 - July 15, 2025), you won't be able to access Pub/Sub Lite starting on July 15, 2025.
  • New customers: Pub/Sub Lite is no longer available for new customers after September 24, 2024.

You can migrate your Pub/Sub Lite service to Google Cloud Managed Service for Apache Kafka or Pub/Sub.

This page explains how to publish messages to Lite topics. You can publish messages with the Pub/Sub Lite client library for Java.

After publishing messages and creating a Lite subscription to a Lite topic, you can receive messages from the Lite subscription.

Message format

A message consists of fields with the message data and metadata. Specify any of the following in the message:

  • The message data
  • An ordering key
  • An event time
  • Attributes with additional information about the message

The client library automatically assigns the message to a partition, and the Pub/Sub Lite service adds the following fields to the message:

  • A message ID unique within the partition
  • A timestamp for when the Pub/Sub Lite service stores the message in the partition

Publishing messages

To publish messages, request a streaming connection to the Lite topic and then send messages over the streaming connection.

Note: The publisher client object is long lived and maintains a connection to Pub/Sub Lite. Do not create this object for each message; reuse it for all messages that the same program publishes to the same topic.
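The reuse pattern in the preceding note can be sketched as a small cache that holds one publisher object per topic path. This is a minimal illustration, not part of the Pub/Sub Lite client library: `PublisherCache` and `fake_factory` are hypothetical names, and the factory stands in for whatever code constructs a real publisher client.

```python
from typing import Callable, Dict


class PublisherCache:
    """Keeps one long-lived publisher object per topic path (hypothetical helper)."""

    def __init__(self, factory: Callable[[str], object]) -> None:
        self._factory = factory
        self._publishers: Dict[str, object] = {}

    def get(self, topic_path: str) -> object:
        # Create the publisher only on first use, then reuse it.
        if topic_path not in self._publishers:
            self._publishers[topic_path] = self._factory(topic_path)
        return self._publishers[topic_path]


created = []


def fake_factory(topic_path: str) -> object:
    # Hypothetical stand-in for constructing a real publisher client.
    created.append(topic_path)
    return object()


cache = PublisherCache(fake_factory)
path = "projects/123456789/locations/us-central1-a/topics/my-topic"
first = cache.get(path)
second = cache.get(path)
print(first is second, len(created))
```

The sketch is not thread-safe; a program that publishes from several threads would need a lock around `get`.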

The following sample shows you how to publish messages to a Lite topic:

gcloud

This command requires Python 3.6 or greater, and requires the grpcio Python package to be installed. For macOS, Linux, and Cloud Shell users, run:

sudo pip3 install grpcio
export CLOUDSDK_PYTHON_SITEPACKAGES=1

To publish a message, use the gcloud pubsub lite-topics publish command:

gcloud pubsub lite-topics publish TOPIC_ID \
    --location=LITE_LOCATION \
    --message=MESSAGE_DATA

Replace the following:

  • TOPIC_ID: the ID of the Lite topic
  • LITE_LOCATION: the location of the Lite topic
  • MESSAGE_DATA: a string with the message data

Go

Before running this sample, follow the Go setup instructions in Pub/Sub Lite Client Libraries.

package main

import (
	"context"
	"flag"
	"fmt"
	"log"
	"sync"

	"cloud.google.com/go/pubsub"
	"cloud.google.com/go/pubsublite/pscompat"
	"golang.org/x/sync/errgroup"
)

func main() {
	// NOTE: Set these flags for an existing Pub/Sub Lite topic when running this
	// sample.
	projectID := flag.String("project_id", "", "Cloud Project ID")
	zone := flag.String("zone", "", "Cloud Zone where the topic resides, e.g. us-central1-a")
	topicID := flag.String("topic_id", "", "Existing Pub/Sub Lite topic")
	messageCount := flag.Int("message_count", 100, "The number of messages to send")
	flag.Parse()

	ctx := context.Background()
	topicPath := fmt.Sprintf("projects/%s/locations/%s/topics/%s", *projectID, *zone, *topicID)

	// Create the publisher client.
	publisher, err := pscompat.NewPublisherClient(ctx, topicPath)
	if err != nil {
		log.Fatalf("pscompat.NewPublisherClient error: %v", err)
	}

	// Ensure the publisher will be shut down.
	defer publisher.Stop()

	// Collect any messages that need to be republished with a new publisher
	// client.
	var toRepublish []*pubsub.Message
	var toRepublishMu sync.Mutex

	// Publish messages. Messages are automatically batched.
	g := new(errgroup.Group)
	for i := 0; i < *messageCount; i++ {
		msg := &pubsub.Message{
			Data: []byte(fmt.Sprintf("message-%d", i)),
		}
		result := publisher.Publish(ctx, msg)

		g.Go(func() error {
			// Get blocks until the result is ready.
			id, err := result.Get(ctx)
			if err != nil {
				// NOTE: A failed PublishResult indicates that the publisher client
				// encountered a fatal error and has permanently terminated. After the
				// fatal error has been resolved, a new publisher client instance must
				// be created to republish failed messages.
				fmt.Printf("Publish error: %v\n", err)
				toRepublishMu.Lock()
				toRepublish = append(toRepublish, msg)
				toRepublishMu.Unlock()
				return err
			}

			// Metadata decoded from the id contains the partition and offset.
			metadata, err := pscompat.ParseMessageMetadata(id)
			if err != nil {
				fmt.Printf("Failed to parse message metadata %q: %v\n", id, err)
				return err
			}
			fmt.Printf("Published: partition=%d, offset=%d\n", metadata.Partition, metadata.Offset)
			return nil
		})
	}
	if err := g.Wait(); err != nil {
		fmt.Printf("Publishing finished with error: %v\n", err)
	}
	fmt.Printf("Published %d messages\n", *messageCount-len(toRepublish))

	// Print the error that caused the publisher client to terminate (if any),
	// which may contain more context than PublishResults.
	if err := publisher.Error(); err != nil {
		fmt.Printf("Publisher client terminated due to error: %v\n", publisher.Error())
	}
}

Java

Before running this sample, follow the Java setup instructions in Pub/Sub Lite Client Libraries.

import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutures;
import com.google.api.gax.rpc.ApiException;
import com.google.cloud.pubsublite.CloudRegion;
import com.google.cloud.pubsublite.CloudRegionOrZone;
import com.google.cloud.pubsublite.CloudZone;
import com.google.cloud.pubsublite.MessageMetadata;
import com.google.cloud.pubsublite.ProjectNumber;
import com.google.cloud.pubsublite.TopicName;
import com.google.cloud.pubsublite.TopicPath;
import com.google.cloud.pubsublite.cloudpubsub.Publisher;
import com.google.cloud.pubsublite.cloudpubsub.PublisherSettings;
import com.google.protobuf.ByteString;
import com.google.pubsub.v1.PubsubMessage;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;

public class PublisherExample {

  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String cloudRegion = "your-cloud-region";
    char zoneId = 'b';
    // Choose an existing topic for the publish example to work.
    String topicId = "your-topic-id";
    long projectNumber = Long.parseLong("123456789");
    int messageCount = 100;
    // True if using a regional location. False if using a zonal location.
    // https://cloud.google.com/pubsub/lite/docs/topics
    boolean regional = false;

    publisherExample(cloudRegion, zoneId, projectNumber, topicId, messageCount, regional);
  }

  // Publish messages to a topic.
  public static void publisherExample(
      String cloudRegion,
      char zoneId,
      long projectNumber,
      String topicId,
      int messageCount,
      boolean regional)
      throws ApiException, ExecutionException, InterruptedException {

    CloudRegionOrZone location;
    if (regional) {
      location = CloudRegionOrZone.of(CloudRegion.of(cloudRegion));
    } else {
      location = CloudRegionOrZone.of(CloudZone.of(CloudRegion.of(cloudRegion), zoneId));
    }

    TopicPath topicPath =
        TopicPath.newBuilder()
            .setProject(ProjectNumber.of(projectNumber))
            .setLocation(location)
            .setName(TopicName.of(topicId))
            .build();
    Publisher publisher = null;
    List<ApiFuture<String>> futures = new ArrayList<>();

    try {
      PublisherSettings publisherSettings =
          PublisherSettings.newBuilder().setTopicPath(topicPath).build();

      publisher = Publisher.create(publisherSettings);

      // Start the publisher. Upon successful starting, its state will become RUNNING.
      publisher.startAsync().awaitRunning();

      for (int i = 0; i < messageCount; i++) {
        String message = "message-" + i;

        // Convert the message to a byte string.
        ByteString data = ByteString.copyFromUtf8(message);
        PubsubMessage pubsubMessage = PubsubMessage.newBuilder().setData(data).build();

        // Publish a message. Messages are automatically batched.
        ApiFuture<String> future = publisher.publish(pubsubMessage);
        futures.add(future);
      }
    } finally {
      ArrayList<MessageMetadata> metadata = new ArrayList<>();
      List<String> ackIds = ApiFutures.allAsList(futures).get();
      for (String id : ackIds) {
        // Decoded metadata contains partition and offset.
        metadata.add(MessageMetadata.decode(id));
      }
      System.out.println(metadata + "\nPublished " + ackIds.size() + " messages.");

      if (publisher != null) {
        // Shut down the publisher.
        publisher.stopAsync().awaitTerminated();
        System.out.println("Publisher is shut down.");
      }
    }
  }
}

Python

Before running this sample, follow the Python setup instructions in Pub/Sub Lite Client Libraries.

from google.cloud.pubsublite.cloudpubsub import PublisherClient
from google.cloud.pubsublite.types import (
    CloudRegion,
    CloudZone,
    MessageMetadata,
    TopicPath,
)

# TODO(developer):
# project_number = 1122334455
# cloud_region = "us-central1"
# zone_id = "a"
# topic_id = "your-topic-id"
# regional = True

if regional:
    location = CloudRegion(cloud_region)
else:
    location = CloudZone(CloudRegion(cloud_region), zone_id)

topic_path = TopicPath(project_number, location, topic_id)

# PublisherClient() must be used in a `with` block or have __enter__() called before use.
with PublisherClient() as publisher_client:
    data = "Hello world!"
    api_future = publisher_client.publish(topic_path, data.encode("utf-8"))
    # result() blocks. To resolve API futures asynchronously, use add_done_callback().
    message_id = api_future.result()
    message_metadata = MessageMetadata.decode(message_id)
    print(
        f"Published a message to {topic_path} with partition {message_metadata.partition.value} and offset {message_metadata.cursor.offset}."
    )

The client library asynchronously sends messages and handles errors. If an error occurs, the client library sends the message again, as follows:

  1. The Pub/Sub Lite service closes the stream.
  2. The client library buffers the messages and reestablishes a connection to the Lite topic.
  3. The client library sends the messages in order.
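The retry sequence above can be illustrated with a small simulation. This is a sketch under stated assumptions, not the client library's implementation: `FlakyStream` and `publish_in_order` are hypothetical names, and the stream fails exactly once to stand in for the service closing the connection.

```python
from collections import deque
from typing import Deque, List


class FlakyStream:
    """Hypothetical stream that closes once after a set number of sends."""

    def __init__(self, fail_after: int) -> None:
        self.fail_after = fail_after
        self.attempts = 0
        self.stored: List[str] = []

    def send(self, message: str) -> None:
        if self.attempts == self.fail_after:
            self.attempts += 1  # Fail only once.
            raise ConnectionError("stream closed")
        self.attempts += 1
        self.stored.append(message)


def publish_in_order(messages: List[str], stream: FlakyStream) -> List[str]:
    # Unsent messages stay in the buffer, so order is preserved across retries.
    buffer: Deque[str] = deque(messages)
    while buffer:
        try:
            stream.send(buffer[0])
        except ConnectionError:
            # Reconnecting is a no-op in this sketch; retry the same message.
            continue
        buffer.popleft()
    return stream.stored


messages = ["message-0", "message-1", "message-2", "message-3"]
stored = publish_in_order(messages, FlakyStream(fail_after=2))
print(stored)
```

A message leaves the buffer only after a successful send, which is what keeps the resent messages in their original order.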

After you publish a message, the Pub/Sub Lite service stores the message in a partition and returns the message ID to the publisher.

Using ordering keys

If messages have the same ordering key, the client library assigns the messages to the same partition. The ordering key must be a string of at most 1,024 bytes.

The ordering key is in the key field of a message. You can set ordering keys with the client library.

gcloud

This command requires Python 3.6 or greater, and requires the grpcio Python package to be installed. For macOS, Linux, and Cloud Shell users, run:

sudo pip3 install grpcio
export CLOUDSDK_PYTHON_SITEPACKAGES=1

To publish a message, use the gcloud pubsub lite-topics publish command:

gcloud pubsub lite-topics publish TOPIC_ID \
    --location=LITE_LOCATION \
    --ordering-key=ORDERING_KEY \
    --message=MESSAGE_DATA

Replace the following:

  • TOPIC_ID: the ID of the Lite topic
  • LITE_LOCATION: the location of the Lite topic
  • ORDERING_KEY: a string used to assign messages to partitions
  • MESSAGE_DATA: a string with the message data

Go

Before running this sample, follow the Go setup instructions in Pub/Sub Lite Client Libraries.

import (
	"context"
	"fmt"
	"io"

	"cloud.google.com/go/pubsub"
	"cloud.google.com/go/pubsublite/pscompat"
)

func publishWithOrderingKey(w io.Writer, projectID, zone, topicID string, messageCount int) error {
	// projectID := "my-project-id"
	// zone := "us-central1-a"
	// topicID := "my-topic"
	// messageCount := 10
	ctx := context.Background()
	topicPath := fmt.Sprintf("projects/%s/locations/%s/topics/%s", projectID, zone, topicID)

	// Create the publisher client.
	publisher, err := pscompat.NewPublisherClient(ctx, topicPath)
	if err != nil {
		return fmt.Errorf("pscompat.NewPublisherClient error: %w", err)
	}

	// Ensure the publisher will be shut down.
	defer publisher.Stop()

	// Messages of the same ordering key will always get published to the same
	// partition. When OrderingKey is unset, messages can get published to
	// different partitions if more than one partition exists for the topic.
	var results []*pubsub.PublishResult
	for i := 0; i < messageCount; i++ {
		r := publisher.Publish(ctx, &pubsub.Message{
			OrderingKey: "test_ordering_key",
			Data:        []byte(fmt.Sprintf("message-%d", i)),
		})
		results = append(results, r)
	}

	// Print publish results.
	var publishedCount int
	for _, r := range results {
		// Get blocks until the result is ready.
		id, err := r.Get(ctx)
		if err != nil {
			// NOTE: A failed PublishResult indicates that the publisher client
			// encountered a fatal error and has permanently terminated. After the
			// fatal error has been resolved, a new publisher client instance must be
			// created to republish failed messages.
			fmt.Fprintf(w, "Publish error: %v\n", err)
			continue
		}

		// Metadata decoded from the id contains the partition and offset.
		metadata, err := pscompat.ParseMessageMetadata(id)
		if err != nil {
			return fmt.Errorf("failed to parse message metadata %q: %w", id, err)
		}
		fmt.Fprintf(w, "Published: partition=%d, offset=%d\n", metadata.Partition, metadata.Offset)
		publishedCount++
	}

	fmt.Fprintf(w, "Published %d messages with ordering key\n", publishedCount)
	return publisher.Error()
}

Java

Before running this sample, follow the Java setup instructions in Pub/Sub Lite Client Libraries.

import com.google.api.core.ApiFuture;
import com.google.api.gax.rpc.ApiException;
import com.google.cloud.pubsublite.CloudRegion;
import com.google.cloud.pubsublite.CloudRegionOrZone;
import com.google.cloud.pubsublite.CloudZone;
import com.google.cloud.pubsublite.MessageMetadata;
import com.google.cloud.pubsublite.ProjectNumber;
import com.google.cloud.pubsublite.TopicName;
import com.google.cloud.pubsublite.TopicPath;
import com.google.cloud.pubsublite.cloudpubsub.Publisher;
import com.google.cloud.pubsublite.cloudpubsub.PublisherSettings;
import com.google.protobuf.ByteString;
import com.google.pubsub.v1.PubsubMessage;
import java.util.concurrent.ExecutionException;

public class PublishWithOrderingKeyExample {

  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String cloudRegion = "your-cloud-region";
    char zoneId = 'b';
    // Choose an existing topic for the publish example to work.
    String topicId = "your-topic-id";
    long projectNumber = Long.parseLong("123456789");
    // True if using a regional location. False if using a zonal location.
    // https://cloud.google.com/pubsub/lite/docs/topics
    boolean regional = false;

    publishWithOrderingKeyExample(cloudRegion, zoneId, projectNumber, topicId, regional);
  }

  // Publish a message to a topic with an ordering key.
  public static void publishWithOrderingKeyExample(
      String cloudRegion, char zoneId, long projectNumber, String topicId, boolean regional)
      throws ApiException, ExecutionException, InterruptedException {

    CloudRegionOrZone location;
    if (regional) {
      location = CloudRegionOrZone.of(CloudRegion.of(cloudRegion));
    } else {
      location = CloudRegionOrZone.of(CloudZone.of(CloudRegion.of(cloudRegion), zoneId));
    }

    TopicPath topicPath =
        TopicPath.newBuilder()
            .setProject(ProjectNumber.of(projectNumber))
            .setLocation(location)
            .setName(TopicName.of(topicId))
            .build();

    PublisherSettings publisherSettings =
        PublisherSettings.newBuilder().setTopicPath(topicPath).build();

    Publisher publisher = Publisher.create(publisherSettings);

    // Start the publisher. Upon successful starting, its state will become RUNNING.
    publisher.startAsync().awaitRunning();

    String message = "message-with-ordering-key";

    // Convert the message to a byte string.
    ByteString data = ByteString.copyFromUtf8(message);
    PubsubMessage pubsubMessage =
        PubsubMessage.newBuilder()
            .setData(data)
            // Messages of the same ordering key will always get published to the
            // same partition. When OrderingKey is unset, messages can get published
            // to different partitions if more than one partition exists for the topic.
            .setOrderingKey("testing")
            .build();

    // Publish a message.
    ApiFuture<String> future = publisher.publish(pubsubMessage);

    // Shut down the publisher.
    publisher.stopAsync().awaitTerminated();

    String ackId = future.get();
    MessageMetadata metadata = MessageMetadata.decode(ackId);
    System.out.println("Published a message with ordering key:\n" + metadata);
  }
}

Python

Before running this sample, follow the Python setup instructions in Pub/Sub Lite Client Libraries.

from google.cloud.pubsublite.cloudpubsub import PublisherClient
from google.cloud.pubsublite.types import (
    CloudRegion,
    CloudZone,
    MessageMetadata,
    TopicPath,
)

# TODO(developer):
# project_number = 1122334455
# cloud_region = "us-central1"
# zone_id = "a"
# topic_id = "your-topic-id"
# num_messages = 100
# regional = True

if regional:
    location = CloudRegion(cloud_region)
else:
    location = CloudZone(CloudRegion(cloud_region), zone_id)

topic_path = TopicPath(project_number, location, topic_id)

# PublisherClient() must be used in a `with` block or have __enter__() called before use.
with PublisherClient() as publisher_client:
    for message in range(num_messages):
        data = f"{message}"
        # Messages of the same ordering key will always get published to the same partition.
        # When ordering_key is unset, messages can get published to different partitions if
        # more than one partition exists for the topic.
        api_future = publisher_client.publish(
            topic_path, data.encode("utf-8"), ordering_key="testing"
        )
        # result() blocks. To resolve API futures asynchronously, use add_done_callback().
        message_id = api_future.result()
        message_metadata = MessageMetadata.decode(message_id)
        print(
            f"Published {data} to partition {message_metadata.partition.value} and offset {message_metadata.cursor.offset}."
        )

print(
    f"Finished publishing {num_messages} messages with an ordering key to {str(topic_path)}."
)

You can send multiple messages to the same partition using ordering keys, so subscribers receive the messages in order. The client library might assign multiple ordering keys to the same partition.
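The two properties described here, that one key always maps to one partition and that several keys can share a partition, follow from any deterministic key-to-partition mapping. The sketch below uses a SHA-256 hash modulo the partition count purely as an assumption for illustration; this page does not state which function the client library uses, and `partition_for_key` is a hypothetical name.

```python
import hashlib

MAX_ORDERING_KEY_BYTES = 1024  # Limit stated for ordering keys.


def partition_for_key(ordering_key: str, num_partitions: int) -> int:
    """Maps an ordering key to a partition (illustrative hash, not the library's)."""
    key_bytes = ordering_key.encode("utf-8")
    if len(key_bytes) > MAX_ORDERING_KEY_BYTES:
        raise ValueError("ordering key exceeds 1,024 bytes")
    digest = hashlib.sha256(key_bytes).digest()
    return int.from_bytes(digest[:8], "big") % num_partitions


# The same key always lands on the same partition.
same = partition_for_key("user-42", 2) == partition_for_key("user-42", 2)

# With three keys and two partitions, at least two keys must share a partition.
assigned = [partition_for_key(key, 2) for key in ("user-1", "user-2", "user-3")]
print(same, len(set(assigned)) < len(assigned))
```

Because keys can share a partition, ordering is guaranteed among messages with the same key, not isolation between keys.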

Set the event time

You can use event time to publish your Lite messages. Event time is a custom attribute that you can add to your message.

You can set the event timestamp with the client library or the gcloud CLI.

This command requires Python 3.6 or greater, and requires the grpcio Python package to be installed. For macOS, Linux, and Cloud Shell users, run:

sudo pip3 install grpcio
export CLOUDSDK_PYTHON_SITEPACKAGES=1

To publish a message, use the gcloud pubsub lite-topics publish command:

gcloud pubsub lite-topics publish TOPIC_ID \
    --location=LITE_LOCATION \
    --event-time=EVENT_TIME \
    --message=MESSAGE_DATA

Replace the following:

  • TOPIC_ID: the ID of the Lite topic
  • LITE_LOCATION: the location of the Lite topic
  • EVENT_TIME: a user-specified event time. For more information about time formats, run gcloud topic datetimes.
  • MESSAGE_DATA: a string with the message data
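When a script generates the EVENT_TIME value, one option is to format it as an ISO 8601 timestamp in UTC. The sketch below assumes that this form is among the formats that gcloud accepts; run gcloud topic datetimes to confirm. The helper name `format_event_time` is hypothetical.

```python
from datetime import datetime, timezone


def format_event_time(moment: datetime) -> str:
    """Formats a timezone-aware datetime as an ISO 8601 UTC string."""
    if moment.tzinfo is None:
        # Naive datetimes are ambiguous, so reject them instead of guessing a zone.
        raise ValueError("event time must be timezone-aware")
    return moment.astimezone(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")


example = datetime(2025, 1, 2, 3, 4, 5, tzinfo=timezone.utc)
event_time = format_event_time(example)
print(event_time)  # 2025-01-02T03:04:05Z
```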

Using attributes

Message attributes are key-value pairs with metadata about the message. The attributes can be text or byte strings.

The attributes are in the attributes field of a message. You can set attributes with the client library.

gcloud

This command requires Python 3.6 or greater, and requires the grpcio Python package to be installed. For macOS, Linux, and Cloud Shell users, run:

sudo pip3 install grpcio
export CLOUDSDK_PYTHON_SITEPACKAGES=1

To publish a message, use the gcloud pubsub lite-topics publish command:

gcloud pubsub lite-topics publish TOPIC_ID \
    --location=LITE_LOCATION \
    --message=MESSAGE_DATA \
    --attribute=KEY=VALUE,...

Replace the following:

  • TOPIC_ID: the ID of the Lite topic
  • LITE_LOCATION: the location of the Lite topic
  • MESSAGE_DATA: a string with the message data
  • KEY: the key of a message attribute
  • VALUE: the value for the key of the message attribute

Go

Before running this sample, follow the Go setup instructions in Pub/Sub Lite Client Libraries.

import (
	"context"
	"fmt"
	"io"

	"cloud.google.com/go/pubsub"
	"cloud.google.com/go/pubsublite/pscompat"
)

func publishWithCustomAttributes(w io.Writer, projectID, zone, topicID string) error {
	// projectID := "my-project-id"
	// zone := "us-central1-a"
	// topicID := "my-topic"
	ctx := context.Background()
	topicPath := fmt.Sprintf("projects/%s/locations/%s/topics/%s", projectID, zone, topicID)

	// Create the publisher client.
	publisher, err := pscompat.NewPublisherClient(ctx, topicPath)
	if err != nil {
		return fmt.Errorf("pscompat.NewPublisherClient error: %w", err)
	}

	// Ensure the publisher will be shut down.
	defer publisher.Stop()

	// Publish a message with custom attributes.
	result := publisher.Publish(ctx, &pubsub.Message{
		Data: []byte("message-with-custom-attributes"),
		Attributes: map[string]string{
			"year":   "2020",
			"author": "unknown",
		},
	})

	// Get blocks until the result is ready.
	id, err := result.Get(ctx)
	if err != nil {
		return fmt.Errorf("publish error: %w", err)
	}

	fmt.Fprintf(w, "Published a message with custom attributes: %v\n", id)
	return publisher.Error()
}

Java

Before running this sample, follow the Java setup instructions in Pub/Sub Lite Client Libraries.

import com.google.api.core.ApiFuture;
import com.google.api.gax.rpc.ApiException;
import com.google.cloud.pubsublite.CloudRegion;
import com.google.cloud.pubsublite.CloudRegionOrZone;
import com.google.cloud.pubsublite.CloudZone;
import com.google.cloud.pubsublite.MessageMetadata;
import com.google.cloud.pubsublite.ProjectNumber;
import com.google.cloud.pubsublite.TopicName;
import com.google.cloud.pubsublite.TopicPath;
import com.google.cloud.pubsublite.cloudpubsub.MessageTransforms;
import com.google.cloud.pubsublite.cloudpubsub.Publisher;
import com.google.cloud.pubsublite.cloudpubsub.PublisherSettings;
import com.google.common.collect.ImmutableMap;
import com.google.protobuf.ByteString;
import com.google.protobuf.util.Timestamps;
import com.google.pubsub.v1.PubsubMessage;
import java.time.Instant;
import java.util.concurrent.ExecutionException;

public class PublishWithCustomAttributesExample {

  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String cloudRegion = "your-cloud-region";
    char zoneId = 'b';
    // Choose an existing topic for the publish example to work.
    String topicId = "your-topic-id";
    long projectNumber = Long.parseLong("123456789");
    // True if using a regional location. False if using a zonal location.
    // https://cloud.google.com/pubsub/lite/docs/topics
    boolean regional = false;

    publishWithCustomAttributesExample(cloudRegion, zoneId, projectNumber, topicId, regional);
  }

  // Publish messages to a topic with custom attributes.
  public static void publishWithCustomAttributesExample(
      String cloudRegion, char zoneId, long projectNumber, String topicId, boolean regional)
      throws ApiException, ExecutionException, InterruptedException {

    CloudRegionOrZone location;
    if (regional) {
      location = CloudRegionOrZone.of(CloudRegion.of(cloudRegion));
    } else {
      location = CloudRegionOrZone.of(CloudZone.of(CloudRegion.of(cloudRegion), zoneId));
    }

    TopicPath topicPath =
        TopicPath.newBuilder()
            .setProject(ProjectNumber.of(projectNumber))
            .setLocation(location)
            .setName(TopicName.of(topicId))
            .build();

    PublisherSettings publisherSettings =
        PublisherSettings.newBuilder().setTopicPath(topicPath).build();

    Publisher publisher = Publisher.create(publisherSettings);

    // Start the publisher. Upon successful starting, its state will become RUNNING.
    publisher.startAsync().awaitRunning();

    // Prepare the message data as a byte string.
    String messageData = "message-with-custom-attributes";
    ByteString data = ByteString.copyFromUtf8(messageData);

    // Prepare a protobuf-encoded event timestamp for the message.
    Instant now = Instant.now();
    String eventTime =
        MessageTransforms.encodeAttributeEventTime(Timestamps.fromMillis(now.toEpochMilli()));

    PubsubMessage pubsubMessage =
        PubsubMessage.newBuilder()
            .setData(data)
            // Add two sets of custom attributes to the message.
            .putAllAttributes(ImmutableMap.of("year", "2020", "author", "unknown"))
            // Add an event timestamp as an attribute.
            .putAttributes(MessageTransforms.PUBSUB_LITE_EVENT_TIME_TIMESTAMP_PROTO, eventTime)
            .build();

    // Publish a message.
    ApiFuture<String> future = publisher.publish(pubsubMessage);

    // Shut down the publisher.
    publisher.stopAsync().awaitTerminated();

    String ackId = future.get();
    MessageMetadata metadata = MessageMetadata.decode(ackId);
    System.out.println("Published a message with custom attributes:\n" + metadata);
  }
}

Python

Before running this sample, follow the Python setup instructions in Pub/Sub Lite Client Libraries.

from google.cloud.pubsublite.cloudpubsub import PublisherClient
from google.cloud.pubsublite.types import (
    CloudRegion,
    CloudZone,
    MessageMetadata,
    TopicPath,
)

# TODO(developer):
# project_number = 1122334455
# cloud_region = "us-central1"
# zone_id = "a"
# topic_id = "your-topic-id"
# regional = True

if regional:
    location = CloudRegion(cloud_region)
else:
    location = CloudZone(CloudRegion(cloud_region), zone_id)

topic_path = TopicPath(project_number, location, topic_id)

# PublisherClient() must be used in a `with` block or have __enter__() called before use.
with PublisherClient() as publisher_client:
    data = "Hello world!"
    api_future = publisher_client.publish(
        topic_path,
        data.encode("utf-8"),
        year="2020",
        author="unknown",
    )
    # result() blocks. To resolve API futures asynchronously, use add_done_callback().
    message_id = api_future.result()
    message_metadata = MessageMetadata.decode(message_id)
    print(
        f"Published {data} to partition {message_metadata.partition.value} and offset {message_metadata.cursor.offset}."
    )

print(f"Finished publishing a message with custom attributes to {str(topic_path)}.")

Attributes can indicate how to process a message. Subscribers can parse the attributes field of a message and process the message according to its attributes.
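On the subscriber side, attribute-based processing often amounts to a lookup from an attribute value to a handler. The following sketch shows that idea without any client library calls. The `content_type` attribute, the handler functions, and `process` are all hypothetical names chosen for illustration.

```python
from typing import Callable, Dict, Mapping

Handler = Callable[[bytes], str]


def handle_text(data: bytes) -> str:
    return f"text:{data.decode('utf-8')}"


def handle_binary(data: bytes) -> str:
    return f"binary:{len(data)} bytes"


def handle_unknown(data: bytes) -> str:
    return "skipped"


# Hypothetical mapping from a "content_type" attribute value to a handler.
HANDLERS: Dict[str, Handler] = {
    "text": handle_text,
    "binary": handle_binary,
}


def process(data: bytes, attributes: Mapping[str, str]) -> str:
    # Fall back to a default handler when the attribute is missing or unrecognized.
    handler = HANDLERS.get(attributes.get("content_type", ""), handle_unknown)
    return handler(data)


print(process(b"hello", {"content_type": "text", "author": "unknown"}))
print(process(b"\x00\x01", {"content_type": "binary"}))
print(process(b"hello", {"year": "2020"}))
```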

Batching messages

The client library publishes messages in batches. Larger batches use fewer compute resources but increase latency. You can change the batch size with batching settings.

The following table lists the batching settings that you can configure:

Setting | Description | Default
Request size | The maximum size, in bytes, of the batch. | 3.5 MiB
Number of messages | The maximum number of messages in a batch. | 1,000 messages
Publish delay | The amount of time, in milliseconds, between adding the message to a batch and sending the batch to the Lite topic. | 50 milliseconds

You can configure batching settings with the client library.

Go

Before running this sample, follow the Go setup instructions in Pub/Sub Lite Client Libraries.

import (
	"context"
	"fmt"
	"io"
	"time"

	"cloud.google.com/go/pubsub"
	"cloud.google.com/go/pubsublite/pscompat"
)

func publishWithBatchSettings(w io.Writer, projectID, zone, topicID string, messageCount int) error {
	// projectID := "my-project-id"
	// zone := "us-central1-a"
	// topicID := "my-topic"
	// messageCount := 10
	ctx := context.Background()
	topicPath := fmt.Sprintf("projects/%s/locations/%s/topics/%s", projectID, zone, topicID)

	// Batch settings control how the publisher batches messages. These settings
	// apply per partition.
	// See https://pkg.go.dev/cloud.google.com/go/pubsublite/pscompat#pkg-variables
	// for DefaultPublishSettings.
	settings := pscompat.PublishSettings{
		ByteThreshold:  5 * 1024, // 5 KiB
		CountThreshold: 1000,     // 1,000 messages
		DelayThreshold: 100 * time.Millisecond,
	}

	// Create the publisher client.
	publisher, err := pscompat.NewPublisherClientWithSettings(ctx, topicPath, settings)
	if err != nil {
		return fmt.Errorf("pscompat.NewPublisherClientWithSettings error: %w", err)
	}

	// Ensure the publisher will be shut down.
	defer publisher.Stop()

	// Publish requests are sent to the server based on request size, message
	// count and time since last publish, whichever condition is met first.
	var results []*pubsub.PublishResult
	for i := 0; i < messageCount; i++ {
		r := publisher.Publish(ctx, &pubsub.Message{
			Data: []byte(fmt.Sprintf("message-%d", i)),
		})
		results = append(results, r)
	}

	// Print publish results.
	var publishedCount int
	for _, r := range results {
		// Get blocks until the result is ready.
		id, err := r.Get(ctx)
		if err != nil {
			// NOTE: A failed PublishResult indicates that the publisher client
			// encountered a fatal error and has permanently terminated. After the
			// fatal error has been resolved, a new publisher client instance must be
			// created to republish failed messages.
			fmt.Fprintf(w, "Publish error: %v\n", err)
			continue
		}
		fmt.Fprintf(w, "Published: %v\n", id)
		publishedCount++
	}

	fmt.Fprintf(w, "Published %d messages with batch settings\n", publishedCount)
	return publisher.Error()
}

Java

Before running this sample, follow the Java setup instructions in Pub/Sub Lite Client Libraries.

import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutures;
import com.google.api.gax.batching.BatchingSettings;
import com.google.api.gax.rpc.ApiException;
import com.google.cloud.pubsublite.CloudRegion;
import com.google.cloud.pubsublite.CloudRegionOrZone;
import com.google.cloud.pubsublite.CloudZone;
import com.google.cloud.pubsublite.MessageMetadata;
import com.google.cloud.pubsublite.ProjectNumber;
import com.google.cloud.pubsublite.TopicName;
import com.google.cloud.pubsublite.TopicPath;
import com.google.cloud.pubsublite.cloudpubsub.Publisher;
import com.google.cloud.pubsublite.cloudpubsub.PublisherSettings;
import com.google.protobuf.ByteString;
import com.google.pubsub.v1.PubsubMessage;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import org.threeten.bp.Duration;

public class PublishWithBatchSettingsExample {

  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String cloudRegion = "your-cloud-region";
    char zoneId = 'b';
    // Choose an existing topic for the publish example to work.
    String topicId = "your-topic-id";
    long projectNumber = Long.parseLong("123456789");
    int messageCount = 100;
    // True if using a regional location. False if using a zonal location.
    // https://cloud.google.com/pubsub/lite/docs/topics
    boolean regional = false;

    publishWithBatchSettingsExample(
        cloudRegion, zoneId, projectNumber, topicId, messageCount, regional);
  }

  // Publish messages to a topic with batch settings.
  public static void publishWithBatchSettingsExample(
      String cloudRegion,
      char zoneId,
      long projectNumber,
      String topicId,
      int messageCount,
      boolean regional)
      throws ApiException, ExecutionException, InterruptedException {

    CloudRegionOrZone location;
    if (regional) {
      location = CloudRegionOrZone.of(CloudRegion.of(cloudRegion));
    } else {
      location = CloudRegionOrZone.of(CloudZone.of(CloudRegion.of(cloudRegion), zoneId));
    }

    TopicPath topicPath =
        TopicPath.newBuilder()
            .setProject(ProjectNumber.of(projectNumber))
            .setLocation(location)
            .setName(TopicName.of(topicId))
            .build();
    Publisher publisher = null;
    List<ApiFuture<String>> futures = new ArrayList<>();

    try {
      // Batch settings control how the publisher batches messages
      long requestBytesThreshold = 5000L; // default : 3_500_000 bytes
      long messageCountBatchSize = 100L; // default : 1000L message
      Duration publishDelayThreshold = Duration.ofMillis(100); // default : 50 ms

      // Publish request get triggered based on request size, messages count & time since last
      // publish, whichever condition is met first.
      BatchingSettings batchingSettings =
          BatchingSettings.newBuilder()
              .setRequestByteThreshold(requestBytesThreshold)
              .setElementCountThreshold(messageCountBatchSize)
              .setDelayThreshold(publishDelayThreshold)
              .build();

      PublisherSettings publisherSettings =
          PublisherSettings.newBuilder()
              .setTopicPath(topicPath)
              .setBatchingSettings(batchingSettings)
              .build();

      publisher = Publisher.create(publisherSettings);

      // Start the publisher. Upon successful starting, its state will become RUNNING.
      publisher.startAsync().awaitRunning();

      for (int i = 0; i < messageCount; i++) {
        String message = "message-" + i;

        // Convert the message to a byte string.
        ByteString data = ByteString.copyFromUtf8(message);
        PubsubMessage pubsubMessage = PubsubMessage.newBuilder().setData(data).build();

        // Publish a message.
        ApiFuture<String> future = publisher.publish(pubsubMessage);
        futures.add(future);
      }
    } finally {
      ArrayList<MessageMetadata> metadata = new ArrayList<>();
      List<String> ackIds = ApiFutures.allAsList(futures).get();
      System.out.println("Published " + ackIds.size() + " messages with batch settings.");

      if (publisher != null) {
        // Shut down the publisher.
        publisher.stopAsync().awaitTerminated();
      }
    }
  }
}

Python

Before running this sample, follow the Python setup instructions in Pub/Sub Lite Client Libraries.

from google.cloud.pubsub_v1.types import BatchSettings
from google.cloud.pubsublite.cloudpubsub import PublisherClient
from google.cloud.pubsublite.types import (
    CloudRegion,
    CloudZone,
    MessageMetadata,
    TopicPath,
)

# TODO(developer):
# project_number = 1122334455
# cloud_region = "us-central1"
# zone_id = "a"
# topic_id = "your-topic-id"
# num_messages = 100
# regional = True

if regional:
    location = CloudRegion(cloud_region)
else:
    location = CloudZone(CloudRegion(cloud_region), zone_id)

topic_path = TopicPath(project_number, location, topic_id)
batch_settings = BatchSettings(
    # 2 MiB. Default to 3 MiB. Must be less than 4 MiB gRPC's per-message limit.
    max_bytes=2 * 1024 * 1024,
    # 100 ms. Default to 50 ms.
    max_latency=0.1,
    # Default to 1000.
    max_messages=100,
)

# PublisherClient() must be used in a `with` block or have __enter__() called before use.
with PublisherClient(
    per_partition_batching_settings=batch_settings
) as publisher_client:
    for message in range(num_messages):
        data = f"{message}"
        api_future = publisher_client.publish(topic_path, data.encode("utf-8"))
        # result() blocks. To resolve API futures asynchronously, use add_done_callback().
        message_id = api_future.result()
        message_metadata = MessageMetadata.decode(message_id)
        print(
            f"Published {data} to partition {message_metadata.partition.value} and offset {message_metadata.cursor.offset}."
        )

print(
    f"Finished publishing {num_messages} messages with batch settings to {str(topic_path)}."
)

When a publisher application starts, the client library creates a batch for each partition in a Lite topic. For example, if a Lite topic has two partitions, publishers create two batches and send each batch to a partition.

After you publish a message, the client library buffers it until the batch exceeds the maximum request size, the maximum number of messages, or the publish delay.
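The buffering rule can be modeled as one batch per partition with three flush conditions, whichever is met first. The sketch below is a simplified model and not the client library's code: the `Batch` class is hypothetical, time is passed in explicitly so the example is deterministic, and reaching a threshold exactly is treated as enough to flush.

```python
from dataclasses import dataclass, field
from typing import List, Optional


@dataclass
class Batch:
    """Buffers messages for one partition until a threshold is reached."""

    max_bytes: int
    max_messages: int
    max_delay_s: float
    messages: List[bytes] = field(default_factory=list)
    size_bytes: int = 0
    first_added_at: Optional[float] = None

    def add(self, data: bytes, now: float) -> None:
        # Remember when the oldest buffered message arrived.
        if self.first_added_at is None:
            self.first_added_at = now
        self.messages.append(data)
        self.size_bytes += len(data)

    def should_flush(self, now: float) -> bool:
        if not self.messages or self.first_added_at is None:
            return False
        return (
            self.size_bytes >= self.max_bytes
            or len(self.messages) >= self.max_messages
            or now - self.first_added_at >= self.max_delay_s
        )

    def flush(self) -> List[bytes]:
        # Hand back the buffered messages and reset the batch.
        sent, self.messages = self.messages, []
        self.size_bytes = 0
        self.first_added_at = None
        return sent


# One batch per partition, as for a Lite topic with two partitions.
batches = {
    partition: Batch(max_bytes=5000, max_messages=3, max_delay_s=0.05)
    for partition in range(2)
}
for i in range(3):
    batches[0].add(f"message-{i}".encode("utf-8"), now=0.0)
batches[1].add(b"single", now=0.0)

count_triggered = batches[0].should_flush(now=0.0)   # Message count reached.
too_early = batches[1].should_flush(now=0.01)        # No threshold reached yet.
delay_triggered = batches[1].should_flush(now=0.06)  # Publish delay elapsed.
print(count_triggered, too_early, delay_triggered)
```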

Ordering messages

Lite topics order messages in each partition by when you publish the messages. To assign messages to the same partition, use an ordering key.

Pub/Sub Lite delivers the messages from a partition in order, and subscribers can process the messages in order. For details, see Receiving messages.

Publish Idempotency

Pub/Sub Lite client libraries support idempotent publishing, from the following versions:

If publishing of a message is retried due to network or server errors, the message is stored exactly once. Idempotency is guaranteed only within the same session; it cannot be guaranteed if the same message is republished using a new publisher client. Idempotent publishing does not incur any additional service costs or increase publish latency.
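The session-scoped guarantee can be pictured as deduplication keyed on a session and a per-message sequence number. This page does not describe the service's internal mechanism, so the sketch below is only an assumption-based model: `PartitionLog`, the session IDs, and the sequence numbers are hypothetical.

```python
from typing import Dict, List, Tuple


class PartitionLog:
    """Toy partition that stores each (session, sequence number) at most once."""

    def __init__(self) -> None:
        self.messages: List[bytes] = []
        self._seen: Dict[Tuple[str, int], int] = {}

    def store(self, session_id: str, sequence_number: int, data: bytes) -> int:
        key = (session_id, sequence_number)
        if key in self._seen:
            # A retry within the same session: return the original offset.
            return self._seen[key]
        offset = len(self.messages)
        self.messages.append(data)
        self._seen[key] = offset
        return offset


log = PartitionLog()
first_offset = log.store("session-a", 0, b"hello")
retry_offset = log.store("session-a", 0, b"hello")   # Retried publish, same session.
new_session_offset = log.store("session-b", 0, b"hello")  # New publisher client.
print(first_offset, retry_offset, new_session_offset, len(log.messages))
```

In this model the retry is absorbed, while the republish from a new session is stored again, which mirrors why recreating the publisher client can produce duplicates.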

Enable or disable idempotent publishing

Idempotent publishing is enabled by default in the Pub/Sub Lite client libraries. It can be disabled using publisher client settings in the respective client library.

If idempotent publishing is enabled, the offset returned in a publish result might be -1. This value is returned when the message is identified as a duplicate of an already successfully published message, but the server did not have sufficient information to return the offset of the message at publish time. Messages received by subscribers always have a valid offset.
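Code that logs or stores publish offsets may want to treat -1 as "unknown" instead of as a real position. A minimal sketch, using a hypothetical helper name and plain integers in place of decoded message metadata:

```python
UNKNOWN_OFFSET = -1  # Value described above for duplicates without a known offset.


def describe_publish_result(partition: int, offset: int) -> str:
    """Builds a log line that does not present -1 as a real offset."""
    if offset == UNKNOWN_OFFSET:
        return f"partition={partition}, offset unknown (duplicate of an earlier publish)"
    return f"partition={partition}, offset={offset}"


print(describe_publish_result(0, 42))
print(describe_publish_result(0, -1))
```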

Troubleshooting

Duplicates received

As idempotency is limited to a single session, duplicates might be received if you recreate the publisher client to publish the same messages.

A subscriber client might receive the same message multiple times if partitions are automatically assigned to subscribers by the Pub/Sub Lite service (the default setting). A message might be redelivered to another subscriber client when a reassignment occurs.

Publisher error

State for a publisher session is garbage collected in the server after 7 days of inactivity. If a session is resumed after this time period, the publisher client terminates with an error message similar to "Failed Precondition: Expected message to have publish sequence number of..." and does not accept new messages. Recreate the publisher client to resolve this error.
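The recovery step can be sketched as "on a fatal error, build a fresh client and retry once". Everything in the following example is a stand-in: `FatalPublisherError`, `FakePublisher`, and `publish_with_recreate` are hypothetical and do not correspond to client library classes.

```python
from typing import Callable, List


class FatalPublisherError(Exception):
    """Hypothetical stand-in for the error raised by a terminated publisher."""


class FakePublisher:
    """Hypothetical publisher that either works or has permanently terminated."""

    def __init__(self, terminated: bool) -> None:
        self.terminated = terminated

    def publish(self, data: bytes) -> str:
        if self.terminated:
            raise FatalPublisherError("Failed Precondition")
        return f"published:{data.decode('utf-8')}"


def publish_with_recreate(factory: Callable[[], FakePublisher], data: bytes) -> str:
    publisher = factory()
    try:
        return publisher.publish(data)
    except FatalPublisherError:
        # The old client no longer accepts messages; create a new one and retry once.
        publisher = factory()
        return publisher.publish(data)


clients: List[FakePublisher] = [
    FakePublisher(terminated=True),
    FakePublisher(terminated=False),
]
result = publish_with_recreate(lambda: clients.pop(0), b"hello")
print(result)
```

Because idempotency is limited to a single session, messages republished through the new client can be stored as duplicates.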

Except as otherwise noted, the content of this page is licensed under the Creative Commons Attribution 4.0 License, and code samples are licensed under the Apache 2.0 License. For details, see the Google Developers Site Policies. Java is a registered trademark of Oracle and/or its affiliates.

Last updated 2026-02-19 UTC.