Receiving messages from Lite subscriptions

Note: Pub/Sub Lite is deprecated. Effective March 18, 2026, Pub/Sub Lite will be turned down.
  • Current customers: Pub/Sub Lite remains functional until March 18, 2026.
    If you have not used Pub/Sub Lite within the 90-day period preceding July 15, 2025 (April 15, 2025 - July 15, 2025), you won't be able to access Pub/Sub Lite starting on July 15, 2025.
  • New customers: Pub/Sub Lite is no longer available for new customers after September 24, 2024.

You can migrate your Pub/Sub Lite service to Google Cloud Managed Service for Apache Kafka or Pub/Sub.

This page explains how to receive messages from Lite subscriptions. You can receive messages with the Pub/Sub Lite client library for Java.

Lite subscriptions connect Lite topics to subscriber applications; subscribers receive messages from Lite subscriptions. Subscribers receive every message that publisher applications send to the Lite topic, including the messages that publishers send before you create the Lite subscription.

Before receiving messages from a Lite subscription, create a Lite topic, create a Lite subscription to the Lite topic, and publish messages to the Lite topic.

Receiving messages

To receive messages from a Lite subscription, request messages from the Lite subscription. The client library automatically connects to the partitions in the Lite topic attached to the Lite subscription. If more than one subscriber client is instantiated, messages are distributed across all clients. The number of partitions in the topic determines the maximum number of subscriber clients that can simultaneously connect to a subscription.

Subscribers might take up to one minute to initialize and start receiving messages. After initialization, messages are received with minimal latency.

The following sample shows you how to receive messages from Lite subscriptions:

gcloud

This command requires Python 3.6 or greater, and requires the grpcio Python package to be installed. For macOS, Linux, and Cloud Shell users, run:

    sudo pip3 install grpcio
    export CLOUDSDK_PYTHON_SITEPACKAGES=1

To receive messages, use the gcloud pubsub lite-subscriptions subscribe command:

    gcloud pubsub lite-subscriptions subscribe SUBSCRIPTION_ID \
        --location=LITE_LOCATION \
        --auto-ack

Replace the following:

  • SUBSCRIPTION_ID: the ID of the Lite subscription
  • LITE_LOCATION: the location of the Lite subscription

Go

Before running this sample, follow the Go setup instructions in Pub/Sub Lite Client Libraries.

    package main

    import (
        "context"
        "flag"
        "fmt"
        "log"
        "sync/atomic"
        "time"

        "cloud.google.com/go/pubsub"
        "cloud.google.com/go/pubsublite/pscompat"
    )

    func main() {
        // NOTE: Set these flags for an existing Pub/Sub Lite subscription containing
        // published messages when running this sample.
        projectID := flag.String("project_id", "", "Cloud Project ID")
        zone := flag.String("zone", "", "Cloud Zone where the topic resides, e.g. us-central1-a")
        subscriptionID := flag.String("subscription_id", "", "Existing Pub/Sub Lite subscription")
        timeout := flag.Duration("timeout", 90*time.Second, "The duration to receive messages")
        flag.Parse()

        ctx := context.Background()
        subscriptionPath := fmt.Sprintf("projects/%s/locations/%s/subscriptions/%s", *projectID, *zone, *subscriptionID)

        // Configure flow control settings. These settings apply per partition.
        // The message stream is paused based on the maximum size or number of
        // messages that the subscriber has already received, whichever condition is
        // met first.
        settings := pscompat.ReceiveSettings{
            // 10 MiB. Must be greater than the allowed size of the largest message
            // (1 MiB).
            MaxOutstandingBytes: 10 * 1024 * 1024,
            // 1,000 outstanding messages. Must be > 0.
            MaxOutstandingMessages: 1000,
        }

        // Create the subscriber client.
        subscriber, err := pscompat.NewSubscriberClientWithSettings(ctx, subscriptionPath, settings)
        if err != nil {
            log.Fatalf("pscompat.NewSubscriberClientWithSettings error: %v", err)
        }

        // Listen for messages until the timeout expires.
        log.Printf("Listening to messages on %s for %v...\n", subscriptionPath, *timeout)
        cctx, cancel := context.WithTimeout(ctx, *timeout)
        defer cancel()
        var receiveCount int32

        // Receive blocks until the context is cancelled or an error occurs.
        if err := subscriber.Receive(cctx, func(ctx context.Context, msg *pubsub.Message) {
            // NOTE: May be called concurrently; synchronize access to shared memory.
            atomic.AddInt32(&receiveCount, 1)

            // Metadata decoded from the message ID contains the partition and offset.
            metadata, err := pscompat.ParseMessageMetadata(msg.ID)
            if err != nil {
                log.Fatalf("Failed to parse %q: %v", msg.ID, err)
            }

            fmt.Printf("Received (partition=%d, offset=%d): %s\n", metadata.Partition, metadata.Offset, string(msg.Data))
            msg.Ack()
        }); err != nil {
            log.Fatalf("SubscriberClient.Receive error: %v", err)
        }

        fmt.Printf("Received %d messages\n", receiveCount)
    }

Java

Before running this sample, follow the Java setup instructions in Pub/Sub Lite Client Libraries.

    import com.google.api.gax.rpc.ApiException;
    import com.google.cloud.pubsub.v1.AckReplyConsumer;
    import com.google.cloud.pubsub.v1.MessageReceiver;
    import com.google.cloud.pubsublite.CloudRegion;
    import com.google.cloud.pubsublite.CloudRegionOrZone;
    import com.google.cloud.pubsublite.CloudZone;
    import com.google.cloud.pubsublite.MessageMetadata;
    import com.google.cloud.pubsublite.ProjectNumber;
    import com.google.cloud.pubsublite.SubscriptionName;
    import com.google.cloud.pubsublite.SubscriptionPath;
    import com.google.cloud.pubsublite.cloudpubsub.FlowControlSettings;
    import com.google.cloud.pubsublite.cloudpubsub.MessageTransforms;
    import com.google.cloud.pubsublite.cloudpubsub.Subscriber;
    import com.google.cloud.pubsublite.cloudpubsub.SubscriberSettings;
    import com.google.protobuf.Timestamp;
    import com.google.pubsub.v1.PubsubMessage;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.TimeoutException;

    public class SubscriberExample {

      public static void main(String... args) throws Exception {
        // TODO(developer): Replace these variables before running the sample.
        String cloudRegion = "your-cloud-region";
        char zoneId = 'b';
        // Choose an existing subscription for the subscribe example to work.
        String subscriptionId = "your-subscription-id";
        long projectNumber = Long.parseLong("123456789");
        // True if using a regional location. False if using a zonal location.
        // https://cloud.google.com/pubsub/lite/docs/topics
        boolean regional = false;

        subscriberExample(cloudRegion, zoneId, projectNumber, subscriptionId, regional);
      }

      public static void subscriberExample(
          String cloudRegion, char zoneId, long projectNumber, String subscriptionId, boolean regional)
          throws ApiException {

        CloudRegionOrZone location;
        if (regional) {
          location = CloudRegionOrZone.of(CloudRegion.of(cloudRegion));
        } else {
          location = CloudRegionOrZone.of(CloudZone.of(CloudRegion.of(cloudRegion), zoneId));
        }

        SubscriptionPath subscriptionPath =
            SubscriptionPath.newBuilder()
                .setLocation(location)
                .setProject(ProjectNumber.of(projectNumber))
                .setName(SubscriptionName.of(subscriptionId))
                .build();

        // The message stream is paused based on the maximum size or number of messages that the
        // subscriber has already received, whichever condition is met first.
        FlowControlSettings flowControlSettings =
            FlowControlSettings.builder()
                // 10 MiB. Must be greater than the allowed size of the largest message (1 MiB).
                .setBytesOutstanding(10 * 1024 * 1024L)
                // 1,000 outstanding messages. Must be >0.
                .setMessagesOutstanding(1000L)
                .build();

        MessageReceiver receiver =
            (PubsubMessage message, AckReplyConsumer consumer) -> {
              System.out.println("Id : " + MessageMetadata.decode(message.getMessageId()));
              System.out.println("Data : " + message.getData().toStringUtf8());
              System.out.println("Ordering key : " + message.getOrderingKey());
              System.out.println("Attributes : ");
              message
                  .getAttributesMap()
                  .forEach(
                      (key, value) -> {
                        // Compare string contents with equals(); == compares references.
                        if (MessageTransforms.PUBSUB_LITE_EVENT_TIME_TIMESTAMP_PROTO.equals(key)) {
                          Timestamp ts = MessageTransforms.decodeAttributeEventTime(value);
                          System.out.println(key + " = " + ts.toString());
                        } else {
                          System.out.println(key + " = " + value);
                        }
                      });

              // Acknowledge the message.
              consumer.ack();
            };

        SubscriberSettings subscriberSettings =
            SubscriberSettings.newBuilder()
                .setSubscriptionPath(subscriptionPath)
                .setReceiver(receiver)
                // Flow control settings are set at the partition level.
                .setPerPartitionFlowControlSettings(flowControlSettings)
                .build();

        Subscriber subscriber = Subscriber.create(subscriberSettings);

        // Start the subscriber. Upon successful starting, its state will become RUNNING.
        subscriber.startAsync().awaitRunning();

        System.out.println("Listening to messages on " + subscriptionPath.toString() + "...");

        try {
          System.out.println(subscriber.state());
          // Wait 90 seconds for the subscriber to reach TERMINATED state. If it encounters
          // unrecoverable errors before then, its state will change to FAILED and an
          // IllegalStateException will be thrown.
          subscriber.awaitTerminated(90, TimeUnit.SECONDS);
        } catch (TimeoutException t) {
          // Shut down the subscriber. This will change the state of the subscriber to TERMINATED.
          subscriber.stopAsync().awaitTerminated();
          System.out.println("Subscriber is shut down: " + subscriber.state());
        }
      }
    }

Python

Before running this sample, follow the Python setup instructions in Pub/Sub Lite Client Libraries.

    from concurrent.futures import TimeoutError

    from google.pubsub_v1 import PubsubMessage
    from google.cloud.pubsublite.cloudpubsub import SubscriberClient
    from google.cloud.pubsublite.types import (
        CloudRegion,
        CloudZone,
        FlowControlSettings,
        MessageMetadata,
        SubscriptionPath,
    )

    # TODO(developer):
    # project_number = 1122334455
    # cloud_region = "us-central1"
    # zone_id = "a"
    # subscription_id = "your-subscription-id"
    # timeout = 90
    # regional = True

    if regional:
        location = CloudRegion(cloud_region)
    else:
        location = CloudZone(CloudRegion(cloud_region), zone_id)

    subscription_path = SubscriptionPath(project_number, location, subscription_id)

    # Configure when to pause the message stream for more incoming messages based on the
    # maximum size or number of messages that a single-partition subscriber has received,
    # whichever condition is met first.
    per_partition_flow_control_settings = FlowControlSettings(
        # 1,000 outstanding messages. Must be >0.
        messages_outstanding=1000,
        # 10 MiB. Must be greater than the allowed size of the largest message (1 MiB).
        bytes_outstanding=10 * 1024 * 1024,
    )


    def callback(message: PubsubMessage):
        message_data = message.data.decode("utf-8")
        metadata = MessageMetadata.decode(message.message_id)
        print(
            f"Received {message_data} of ordering key {message.ordering_key} with id {metadata}."
        )
        message.ack()


    # SubscriberClient() must be used in a `with` block or have __enter__() called before use.
    with SubscriberClient() as subscriber_client:
        streaming_pull_future = subscriber_client.subscribe(
            subscription_path,
            callback=callback,
            per_partition_flow_control_settings=per_partition_flow_control_settings,
        )

        print(f"Listening for messages on {str(subscription_path)}...")

        try:
            streaming_pull_future.result(timeout=timeout)
        # Catch both exception types; `TimeoutError or KeyboardInterrupt` would only match the first.
        except (TimeoutError, KeyboardInterrupt):
            streaming_pull_future.cancel()
            assert streaming_pull_future.done()

The client library establishes bidirectional streaming connections to each of the partitions in the Lite topic.

  1. The subscriber requests connections to the partitions.

  2. The Pub/Sub Lite service delivers the messages to the subscriber.

After the subscriber processes the message, the subscriber must acknowledge the message. The client library asynchronously processes and acknowledges messages in a callback. To limit the number of unacknowledged messages the subscriber can store in memory, configure the flow control settings.

If multiple subscribers receive messages from the same Lite subscription, the Pub/Sub Lite service connects each subscriber to an equal proportion of partitions. For example, if two subscribers use the same Lite subscription and the Lite subscription is attached to a Lite topic with two partitions, each subscriber receives messages from one of the partitions.
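The equal-proportion assignment described above can be pictured with a small, self-contained model. This is only an illustrative sketch: the round-robin rule and the assign_partitions name are assumptions made for this example, not the Pub/Sub Lite service's actual assignment algorithm, and the code does not call the client library.

```python
from typing import Dict, List


def assign_partitions(num_partitions: int, subscribers: List[str]) -> Dict[str, List[int]]:
    """Spread partitions across subscribers round-robin (hypothetical model)."""
    assignment: Dict[str, List[int]] = {name: [] for name in subscribers}
    if not subscribers:
        return assignment
    for partition in range(num_partitions):
        # Each partition is owned by exactly one subscriber at a time.
        owner = subscribers[partition % len(subscribers)]
        assignment[owner].append(partition)
    return assignment


if __name__ == "__main__":
    # Two subscribers, two partitions: one partition each.
    print(assign_partitions(2, ["subscriber-a", "subscriber-b"]))
    # Three subscribers, two partitions: the third subscriber gets nothing.
    print(assign_partitions(2, ["subscriber-a", "subscriber-b", "subscriber-c"]))
```

In this model, a third subscriber on a two-partition topic is left with an empty assignment, which mirrors the statement that the number of partitions caps how many subscriber clients can usefully connect at the same time.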

Acknowledging messages

To acknowledge a message, send an acknowledgment to the Lite subscription.

Subscribers must acknowledge every message. Subscribers receive the oldest unacknowledged message first, followed by each subsequent message. If a subscriber skips one message, acknowledges the subsequent messages, and then reconnects, the subscriber receives the unacknowledged message and each subsequent, acknowledged message.

Lite subscriptions don't have an acknowledgment deadline and the Pub/Sub Lite service doesn't redeliver unacknowledged messages over an open streaming connection.
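The reconnect behavior described above can be sketched with a toy model of a single partition. The PartitionCursorModel class and its methods are hypothetical names introduced for illustration; the model assumes delivery after a reconnect restarts at the oldest unacknowledged offset, as the text states, and it is not the service's implementation.

```python
from typing import List, Set


class PartitionCursorModel:
    """Toy model of one partition's delivery position (hypothetical)."""

    def __init__(self, num_messages: int) -> None:
        self.num_messages = num_messages
        self.acked: Set[int] = set()

    def ack(self, offset: int) -> None:
        """Record an acknowledgment for the message at this offset."""
        self.acked.add(offset)

    def oldest_unacked(self) -> int:
        """Return the lowest offset that has not been acknowledged."""
        offset = 0
        while offset < self.num_messages and offset in self.acked:
            offset += 1
        return offset

    def offsets_after_reconnect(self) -> List[int]:
        """Offsets delivered after a reconnect: everything from the oldest unacked one."""
        return list(range(self.oldest_unacked(), self.num_messages))


if __name__ == "__main__":
    model = PartitionCursorModel(num_messages=5)
    # Acknowledge offsets 0, 2, and 3, but skip offset 1.
    for offset in (0, 2, 3):
        model.ack(offset)
    # Offsets 2 and 3 are delivered again even though they were acknowledged.
    print(model.offsets_after_reconnect())  # [1, 2, 3, 4]
```

The practical consequence is that a skipped acknowledgment holds back progress for every later message in that partition, so a subscriber that reconnects should expect to reprocess messages it already acknowledged.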

Using flow control

After the Pub/Sub Lite service delivers messages to subscribers, the subscribers store unacknowledged messages in memory. You can limit the number of outstanding messages that subscribers can store in memory using flow control settings. The flow control settings apply to each partition that a subscriber receives messages from.

You can configure the following flow control settings:

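  • The maximum number of outstanding messages. Must be greater than 0.

  • The maximum size, in bytes, of outstanding messages. Must be greater than the allowed size of the largest message (1 MiB).

The size of a message is in the size_bytes field. You can configure flow control settings with the client library.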

Go

To configure flow control settings, pass in ReceiveSettings when calling pscompat.NewSubscriberClientWithSettings. You can set the following parameters in ReceiveSettings:

  • MaxOutstandingMessages

  • MaxOutstandingBytes

For an example, see this flow control sample.

Java

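To configure flow control settings, use the methods in the FlowControlRequest.Builder class. The Java sample earlier on this page sets per-partition limits with setBytesOutstanding and setMessagesOutstanding on FlowControlSettings.builder().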

Python

To configure flow control settings, set the following parameters in the FlowControlSettings class:

  • bytes_outstanding

  • messages_outstanding

For example, if the maximum number of messages is 100 and the subscriber connects to 10 partitions, the subscriber cannot receive more than 100 messages from any of the 10 partitions. The total number of outstanding messages might be greater than 100, but the subscriber cannot store more than 100 messages from each partition.
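The per-partition arithmetic in this example can be checked with a minimal simulation. The PerPartitionFlowControl class below is a hypothetical stand-in written for illustration; it assumes a partition's stream pauses as soon as either the message limit or the byte limit would be exceeded, and it does not use or reproduce the client library's flow control code.

```python
from collections import defaultdict
from typing import DefaultDict


class PerPartitionFlowControl:
    """Toy model of per-partition flow control limits (hypothetical)."""

    def __init__(self, max_messages: int, max_bytes: int) -> None:
        if max_messages <= 0 or max_bytes <= 0:
            raise ValueError("limits must be greater than 0")
        self.max_messages = max_messages
        self.max_bytes = max_bytes
        # Outstanding (received but unacknowledged) totals, keyed by partition.
        self.messages: DefaultDict[int, int] = defaultdict(int)
        self.bytes: DefaultDict[int, int] = defaultdict(int)

    def on_receive(self, partition: int, size_bytes: int) -> bool:
        """Accept a message unless either limit for this partition would be exceeded."""
        if (self.messages[partition] + 1 > self.max_messages
                or self.bytes[partition] + size_bytes > self.max_bytes):
            return False  # The stream for this partition stays paused.
        self.messages[partition] += 1
        self.bytes[partition] += size_bytes
        return True

    def on_ack(self, partition: int, size_bytes: int) -> None:
        """Release capacity once a message is acknowledged."""
        self.messages[partition] -= 1
        self.bytes[partition] -= size_bytes

    def total_outstanding(self) -> int:
        """Sum of outstanding messages across all partitions."""
        return sum(self.messages.values())


if __name__ == "__main__":
    control = PerPartitionFlowControl(max_messages=100, max_bytes=10 * 1024 * 1024)
    for partition in range(10):
        for _ in range(150):  # Offer more messages than the limit allows.
            control.on_receive(partition, size_bytes=1024)
    print(control.messages[0])          # 100
    print(control.total_outstanding())  # 1000
```

With a limit of 100 messages and 10 partitions, the model holds at most 100 messages per partition and up to 1,000 in total, so memory should be budgeted for the per-partition limit multiplied by the number of partitions a subscriber is connected to.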

Except as otherwise noted, the content of this page is licensed under the Creative Commons Attribution 4.0 License, and code samples are licensed under the Apache 2.0 License. For details, see the Google Developers Site Policies. Java is a registered trademark of Oracle and/or its affiliates.

Last updated 2026-02-19 UTC.