Parse messages from a topic with a schema Stay organized with collections Save and categorize content based on your preferences.
Topics may use schemas to define a format that their messages must follow. When subscribing to a topic with a schema, the messages sent to the subscriber are guaranteed to be valid messages. These messages conform to the type and encoding specified in the schema settings associated with the topic.
The subscriber can determine the schema settings associated with a topic by looking at the following attributes:
- googclient_schemaname: The name of the schema used for validation. If the schema is deleted, the name is _deleted-schema_.
- googclient_schemaencoding: The encoding of the message, either JSON or BINARY.
- googclient_schemarevisionid: The revision ID of the schema used to parse and validate the message. Each revision has a unique revision ID associated with it. The revision ID is an auto-generated eight-character UUID. If the revision is deleted, the ID is _deleted-schema-revision_.
To learn more about schemas, see Schema overview.
Code samples for subscribing to topics associated with a schema
These samples show how to process messages when subscribing to topics configured with a schema.
C++
Before trying this sample, follow the C++ setup instructions in Quickstart: Using Client Libraries. For more information, see the Pub/Sub C++ API reference documentation.
Avronamespacepubsub=::google::cloud::pubsub;using::google::cloud::future;using::google::cloud::StatusOr;return[](pubsub::Subscribersubscriber){autosession=subscriber.Subscribe([](pubsub::Messageconst&m,pubsub::AckHandlerh){std::cout <<"Message contents: " <<m.data() <<"\n";std::move(h).ack();});returnsession;}namespacepubsub=::google::cloud::pubsub;using::google::cloud::future;using::google::cloud::StatusOr;return[](pubsub::Subscribersubscriber){autosession=subscriber.Subscribe([](pubsub::Messageconst&m,pubsub::AckHandlerh){google::cloud::pubsub::samples::Statestate;state.ParseFromString(std::string{m.data()});std::cout <<"Message contents: " <<state.DebugString() <<"\n";std::move(h).ack();});returnsession;}C#
Before trying this sample, follow the C# setup instructions in Quickstart: Using Client Libraries. For more information, see the Pub/Sub C# API reference documentation.
Avro
using Avro.IO;
using Avro.Specific;
using Google.Api.Gax;
using Google.Cloud.PubSub.V1;
using Newtonsoft.Json;
using System;
using System.IO;
using System.Threading;
using System.Threading.Tasks;

public class PullAvroMessagesAsyncSample
{
    /// <summary>
    /// Pulls messages from a subscription for five seconds and decodes each one as an
    /// Avro <c>AvroUtilities.State</c> record, choosing the decoder from the
    /// <c>googclient_schemaencoding</c> message attribute.
    /// </summary>
    /// <param name="projectId">ID of the project that owns the subscription.</param>
    /// <param name="subscriptionId">ID of the subscription to pull from.</param>
    /// <param name="acknowledge">True to ack every handled message, false to nack it.</param>
    /// <returns>The number of messages handled before the subscriber was stopped.</returns>
    public async Task<int> PullAvroMessagesAsync(string projectId, string subscriptionId, bool acknowledge)
    {
        SubscriptionName subscriptionName = SubscriptionName.FromProjectSubscription(projectId, subscriptionId);
        int messageCount = 0;
        SubscriberClient subscriber = await new SubscriberClientBuilder
        {
            SubscriptionName = subscriptionName,
            Settings = new SubscriberClient.Settings
            {
                AckExtensionWindow = TimeSpan.FromSeconds(4),
                AckDeadline = TimeSpan.FromSeconds(10),
                FlowControlSettings = new FlowControlSettings(maxOutstandingElementCount: 100, maxOutstandingByteCount: 10240)
            }
        }.BuildAsync();
        // SubscriberClient runs your message handle function on multiple
        // threads to maximize throughput.
        Task startTask = subscriber.StartAsync((PubsubMessage message, CancellationToken cancel) =>
        {
            // The map indexer throws KeyNotFoundException when the attribute is absent,
            // which would bypass the default branch below. TryGetValue leaves encoding
            // null in that case so the switch falls through to default.
            message.Attributes.TryGetValue("googclient_schemaencoding", out string encoding);
            // AvroUtilities is a namespace. Below are files using the namespace.
            // https://github.com/GoogleCloudPlatform/dotnet-docs-samples/blob/main/pubsub/api/Pubsub.Samples/Utilities/State.cs
            // https://github.com/GoogleCloudPlatform/dotnet-docs-samples/blob/main/pubsub/api/Pubsub.Samples/Utilities/StateUtils.cs
            AvroUtilities.State state = new AvroUtilities.State();
            switch (encoding)
            {
                case "BINARY":
                    using (var ms = new MemoryStream(message.Data.ToByteArray()))
                    {
                        var decoder = new BinaryDecoder(ms);
                        var reader = new SpecificDefaultReader(state.Schema, state.Schema);
                        reader.Read<AvroUtilities.State>(state, decoder);
                    }
                    break;
                case "JSON":
                    state = JsonConvert.DeserializeObject<AvroUtilities.State>(message.Data.ToStringUtf8());
                    break;
                default:
                    Console.WriteLine("Encoding not provided in message.");
                    break;
            }
            Console.WriteLine($"Message {message.MessageId}: {state}");
            // The handler runs on multiple threads, so the counter is bumped atomically.
            Interlocked.Increment(ref messageCount);
            return Task.FromResult(acknowledge ? SubscriberClient.Reply.Ack : SubscriberClient.Reply.Nack);
        });
        // Run for 5 seconds.
        await Task.Delay(5000);
        await subscriber.StopAsync(CancellationToken.None);
        // Lets make sure that the start task finished successfully after the call to stop.
        await startTask;
        return messageCount;
    }
}

using Google.Api.Gax;
using Google.Cloud.PubSub.V1;
using System;
using System.Threading;
using System.Threading.Tasks;

public class PullProtoMessagesAsyncSample
{
    /// <summary>
    /// Pulls messages from a subscription for five seconds and parses each one as a
    /// protocol buffer <c>Utilities.State</c> message, choosing the parser from the
    /// <c>googclient_schemaencoding</c> message attribute.
    /// </summary>
    /// <param name="projectId">ID of the project that owns the subscription.</param>
    /// <param name="subscriptionId">ID of the subscription to pull from.</param>
    /// <param name="acknowledge">True to ack every handled message, false to nack it.</param>
    /// <returns>The number of messages handled before the subscriber was stopped.</returns>
    public async Task<int> PullProtoMessagesAsync(string projectId, string subscriptionId, bool acknowledge)
    {
        SubscriptionName subscriptionName = SubscriptionName.FromProjectSubscription(projectId, subscriptionId);
        int messageCount = 0;
        SubscriberClient subscriber = await new SubscriberClientBuilder
        {
            SubscriptionName = subscriptionName,
            Settings = new SubscriberClient.Settings
            {
                AckExtensionWindow = TimeSpan.FromSeconds(4),
                AckDeadline = TimeSpan.FromSeconds(10),
                FlowControlSettings = new FlowControlSettings(maxOutstandingElementCount: 100, maxOutstandingByteCount: 10240)
            }
        }.BuildAsync();
        // SubscriberClient runs your message handle function on multiple
        // threads to maximize throughput.
        Task startTask = subscriber.StartAsync((PubsubMessage message, CancellationToken cancel) =>
        {
            // The map indexer throws KeyNotFoundException when the attribute is absent,
            // which would bypass the default branch below. TryGetValue leaves encoding
            // null in that case so the switch falls through to default.
            message.Attributes.TryGetValue("googclient_schemaencoding", out string encoding);
            Utilities.State state = null;
            switch (encoding)
            {
                case "BINARY":
                    state = Utilities.State.Parser.ParseFrom(message.Data.ToByteArray());
                    break;
                case "JSON":
                    state = Utilities.State.Parser.ParseJson(message.Data.ToStringUtf8());
                    break;
                default:
                    Console.WriteLine("Encoding not provided in message.");
                    break;
            }
            Console.WriteLine($"Message {message.MessageId}: {state}");
            // The handler runs on multiple threads, so the counter is bumped atomically.
            Interlocked.Increment(ref messageCount);
            return Task.FromResult(acknowledge ? SubscriberClient.Reply.Ack : SubscriberClient.Reply.Nack);
        });
        // Run for 5 seconds.
        await Task.Delay(5000);
        await subscriber.StopAsync(CancellationToken.None);
        // Lets make sure that the start task finished successfully after the call to stop.
        await startTask;
        return messageCount;
    }
}
Go
The following sample uses the major version of the Go Pub/Sub client library (v2). If you are still using the v1 library, see the migration guide to v2. To see a list of v1 code samples, see the deprecated code samples.
Before trying this sample, follow the Go setup instructions in Quickstart: Using Client Libraries. For more information, see the Pub/Sub Go API reference documentation.
Avroimport("context""fmt""io""os""sync""time""cloud.google.com/go/pubsub/v2""github.com/linkedin/goavro/v2")funcsubscribeWithAvroSchema(wio.Writer,projectID,subID,avscFilestring)error{// projectID := "my-project-id"// topicID := "my-topic"// avscFile = "path/to/an/avro/schema/file(.avsc)/formatted/in/json"ctx:=context.Background()client,err:=pubsub.NewClient(ctx,projectID)iferr!=nil{returnfmt.Errorf("pubsub.NewClient: %w",err)}avroSchema,err:=os.ReadFile(avscFile)iferr!=nil{returnfmt.Errorf("os.ReadFile err: %w",err)}codec,err:=goavro.NewCodec(string(avroSchema))iferr!=nil{returnfmt.Errorf("goavro.NewCodec err: %w",err)}sub:=client.Subscriber(subID)ctx2,cancel:=context.WithTimeout(ctx,10*time.Second)defercancel()varmusync.Mutexsub.Receive(ctx2,func(ctxcontext.Context,msg*pubsub.Message){mu.Lock()defermu.Unlock()encoding:=msg.Attributes["googclient_schemaencoding"]varstatemap[string]interface{}ifencoding=="BINARY"{data,_,err:=codec.NativeFromBinary(msg.Data)iferr!=nil{fmt.Fprintf(w,"codec.NativeFromBinary err: %v\n",err)msg.Nack()return}fmt.Fprintf(w,"Received a binary-encoded message:\n%#v\n",data)state=data.(map[string]interface{})}elseifencoding=="JSON"{data,_,err:=codec.NativeFromTextual(msg.Data)iferr!=nil{fmt.Fprintf(w,"codec.NativeFromTextual err: %v\n",err)msg.Nack()return}fmt.Fprintf(w,"Received a JSON-encoded message:\n%#v\n",data)state=data.(map[string]interface{})}else{fmt.Fprintf(w,"Unknown message type(%s), nacking\n",encoding)msg.Nack()return}fmt.Fprintf(w,"%s is abbreviated as %s\n",state["name"],state["post_abbr"])msg.Ack()})returnnil}import("context""fmt""io""sync""time""cloud.google.com/go/pubsub/v2"statepb"github.com/GoogleCloudPlatform/golang-samples/internal/pubsub/schemas""google.golang.org/protobuf/encoding/protojson""google.golang.org/protobuf/proto")funcsubscribeWithProtoSchema(wio.Writer,projectID,subIDstring)error{// projectID := "my-project-id"// subID := 
"my-sub"ctx:=context.Background()client,err:=pubsub.NewClient(ctx,projectID)iferr!=nil{returnfmt.Errorf("pubsub.NewClient: %w",err)}// Create an instance of the message to be decoded (a single U.S. state).state:=&statepb.State{}sub:=client.Subscriber(subID)ctx2,cancel:=context.WithTimeout(ctx,10*time.Second)defercancel()varmusync.Mutexsub.Receive(ctx2,func(ctxcontext.Context,msg*pubsub.Message){mu.Lock()defermu.Unlock()encoding:=msg.Attributes["googclient_schemaencoding"]ifencoding=="BINARY"{iferr:=proto.Unmarshal(msg.Data,state);err!=nil{fmt.Fprintf(w,"proto.Unmarshal err: %v\n",err)msg.Nack()return}fmt.Printf("Received a binary-encoded message:\n%#v\n",state)}elseifencoding=="JSON"{iferr:=protojson.Unmarshal(msg.Data,state);err!=nil{fmt.Fprintf(w,"proto.Unmarshal err: %v\n",err)msg.Nack()return}fmt.Fprintf(w,"Received a JSON-encoded message:\n%#v\n",state)}else{fmt.Fprintf(w,"Unknown message type(%s), nacking\n",encoding)msg.Nack()return}fmt.Fprintf(w,"%s is abbreviated as %s\n",state.Name,state.PostAbbr)msg.Ack()})returnnil}Java
Before trying this sample, follow the Java setup instructions in Quickstart: Using Client Libraries. For more information, see the Pub/Sub Java API reference documentation.
Avroimportcom.google.cloud.pubsub.v1.AckReplyConsumer;importcom.google.cloud.pubsub.v1.MessageReceiver;importcom.google.cloud.pubsub.v1.Subscriber;importcom.google.protobuf.ByteString;importcom.google.pubsub.v1.ProjectSubscriptionName;importcom.google.pubsub.v1.PubsubMessage;importjava.io.ByteArrayInputStream;importjava.io.IOException;importjava.io.InputStream;importjava.util.concurrent.TimeUnit;importjava.util.concurrent.TimeoutException;importorg.apache.avro.io.Decoder;importorg.apache.avro.io.DecoderFactory;importorg.apache.avro.specific.SpecificDatumReader;importutilities.State;publicclassSubscribeWithAvroSchemaExample{publicstaticvoidmain(String...args)throwsException{// TODO(developer): Replace these variables before running the sample.StringprojectId="your-project-id";// Use an existing subscription.StringsubscriptionId="your-subscription-id";subscribeWithAvroSchemaExample(projectId,subscriptionId);}publicstaticvoidsubscribeWithAvroSchemaExample(StringprojectId,StringsubscriptionId){ProjectSubscriptionNamesubscriptionName=ProjectSubscriptionName.of(projectId,subscriptionId);// Prepare a reader for the encoded Avro records.SpecificDatumReader<State>reader=newSpecificDatumReader<>(State.getClassSchema());// Instantiate an asynchronous message receiver.MessageReceiverreceiver=(PubsubMessagemessage,AckReplyConsumerconsumer)->{ByteStringdata=message.getData();// Get the schema encoding type.Stringencoding=message.getAttributesMap().get("googclient_schemaencoding");// Send the message data to a byte[] input stream.InputStreaminputStream=newByteArrayInputStream(data.toByteArray());Decoderdecoder=null;// Prepare an appropriate decoder for the message data in the input stream// based on the schema encoding type.block:try{switch(encoding){case"BINARY":decoder=DecoderFactory.get().directBinaryDecoder(inputStream,/* reuse= */null);System.out.println("Receiving a binary-encoded 
message:");break;case"JSON":decoder=DecoderFactory.get().jsonDecoder(State.getClassSchema(),inputStream);System.out.println("Receiving a JSON-encoded message:");break;default:breakblock;}// Obtain an object of the generated Avro class using the decoder.Statestate=reader.read(null,decoder);System.out.println(state.getName()+" is abbreviated as "+state.getPostAbbr());}catch(IOExceptione){System.err.println(e);}// Ack the message.consumer.ack();};Subscribersubscriber=null;try{subscriber=Subscriber.newBuilder(subscriptionName,receiver).build();subscriber.startAsync().awaitRunning();System.out.printf("Listening for messages on %s:\n",subscriptionName.toString());subscriber.awaitTerminated(30,TimeUnit.SECONDS);}catch(TimeoutExceptiontimeoutException){subscriber.stopAsync();}}}importcom.google.cloud.pubsub.v1.AckReplyConsumer;importcom.google.cloud.pubsub.v1.MessageReceiver;importcom.google.cloud.pubsub.v1.Subscriber;importcom.google.protobuf.ByteString;importcom.google.protobuf.InvalidProtocolBufferException;importcom.google.protobuf.util.JsonFormat;importcom.google.pubsub.v1.ProjectSubscriptionName;importcom.google.pubsub.v1.PubsubMessage;importjava.util.concurrent.TimeUnit;importjava.util.concurrent.TimeoutException;importutilities.StateProto.State;publicclassSubscribeWithProtoSchemaExample{publicstaticvoidmain(String...args)throwsException{// TODO(developer): Replace these variables before running the sample.StringprojectId="your-project-id";// Use an existing subscription.StringsubscriptionId="your-subscription-id";subscribeWithProtoSchemaExample(projectId,subscriptionId);}publicstaticvoidsubscribeWithProtoSchemaExample(StringprojectId,StringsubscriptionId){ProjectSubscriptionNamesubscriptionName=ProjectSubscriptionName.of(projectId,subscriptionId);MessageReceiverreceiver=(PubsubMessagemessage,AckReplyConsumerconsumer)->{ByteStringdata=message.getData();// Get the schema encoding 
type.Stringencoding=message.getAttributesMap().get("googclient_schemaencoding");block:try{switch(encoding){case"BINARY":// Obtain an object of the generated proto class.Statestate=State.parseFrom(data);System.out.println("Received a BINARY-formatted message: "+state);break;case"JSON":State.BuilderstateBuilder=State.newBuilder();JsonFormat.parser().merge(data.toStringUtf8(),stateBuilder);System.out.println("Received a JSON-formatted message:"+stateBuilder.build());break;default:breakblock;}}catch(InvalidProtocolBufferExceptione){e.printStackTrace();}consumer.ack();System.out.println("Ack'ed the message");};// Create subscriber client.Subscribersubscriber=Subscriber.newBuilder(subscriptionName,receiver).build();try{subscriber.startAsync().awaitRunning();System.out.printf("Listening for messages on %s:\n",subscriptionName);subscriber.awaitTerminated(30,TimeUnit.SECONDS);}catch(TimeoutExceptiontimeoutException){subscriber.stopAsync();}}}Node.js
Before trying this sample, follow the Node.js setup instructions in Quickstart: Using Client Libraries. For more information, see the Pub/Sub Node.js API reference documentation.
Avro/** * TODO(developer): Uncomment these variables before running the sample. */// const subscriptionNameOrId = 'YOUR_SUBSCRIPTION_NAME_OR_ID';// const timeout = 60;// Imports the Google Cloud client libraryconst{PubSub,Schema,Encodings}=require('@google-cloud/pubsub');// Node FS library, to load definitionsconstfs=require('fs');// And the Apache Avro libraryconstavro=require('avro-js');// Creates a client; cache this for further useconstpubSubClient=newPubSub();functionlistenForAvroRecords(subscriptionNameOrId,timeout){// References an existing subscriptionconstsubscription=pubSubClient.subscription(subscriptionNameOrId);// Make an encoder using the official avro-js library.constdefinition=fs.readFileSync('system-test/fixtures/provinces.avsc').toString();consttype=avro.parse(definition);// Create an event handler to handle messagesletmessageCount=0;constmessageHandler=asyncmessage=>{// "Ack" (acknowledge receipt of) the messagemessage.ack();// Get the schema metadata from the message.constschemaMetadata=Schema.metadataFromMessage(message.attributes);letresult;switch(schemaMetadata.encoding){caseEncodings.Binary:result=type.fromBuffer(message.data);break;caseEncodings.Json:result=type.fromString(message.data.toString());break;default:console.log(`Unknown schema encoding:${schemaMetadata.encoding}`);break;}console.log(`Received message${message.id}:`);console.log(`\tData:${JSON.stringify(result,null,4)}`);console.log(`\tAttributes:${message.attributes}`);messageCount+=1;};// Listen for new messages until timeout is hitsubscription.on('message',messageHandler);setTimeout(()=>{subscription.removeListener('message',messageHandler);console.log(`${messageCount} message(s) received.`);},timeout*1000);}/** * TODO(developer): Uncomment these variables before running the sample. 
*/// const subscriptionNameOrId = 'YOUR_SUBSCRIPTION_NAME_OR_ID';// const timeout = 60;// Imports the Google Cloud client libraryconst{PubSub,Schema,Encodings}=require('@google-cloud/pubsub');// And the protobufjs libraryconstprotobuf=require('protobufjs');// Creates a client; cache this for further useconstpubSubClient=newPubSub();asyncfunctionlistenForProtobufMessages(subscriptionNameOrId,timeout){// References an existing subscriptionconstsubscription=pubSubClient.subscription(subscriptionNameOrId);// Make an decoder using the protobufjs library.//// Since we're providing the test message for a specific schema here, we'll// also code in the path to a sample proto definition.constroot=protobuf.loadSync('system-test/fixtures/provinces.proto');constProvince=root.lookupType('utilities.Province');// Create an event handler to handle messagesletmessageCount=0;constmessageHandler=asyncmessage=>{// "Ack" (acknowledge receipt of) the messagemessage.ack();// Get the schema metadata from the message.constschemaMetadata=Schema.metadataFromMessage(message.attributes);letresult;switch(schemaMetadata.encoding){caseEncodings.Binary:result=Province.decode(message.data);break;caseEncodings.Json:// This doesn't require decoding with the protobuf library,// since it's plain JSON. But you can still validate it against// your schema.result=JSON.parse(message.data.toString());console.log(`Validation of JSON:${Province.verify(result)}`);break;default:console.log(`Unknown schema encoding:${schemaMetadata.encoding}`);break;}console.log(`Received message${message.id}:`);console.log(`\tData:${JSON.stringify(result,null,4)}`);console.log(`\tAttributes:${JSON.stringify(message.attributes,null,4)}`);messageCount+=1;};// Listen for new messages until timeout is hitsubscription.on('message',messageHandler);setTimeout(()=>{subscription.removeListener('message',messageHandler);console.log(`${messageCount} message(s) received.`);},timeout*1000);}Node.js
Before trying this sample, follow the Node.js setup instructions in Quickstart: Using Client Libraries. For more information, see the Pub/Sub Node.js API reference documentation.
Avro/** * TODO(developer): Uncomment these variables before running the sample. */// const subscriptionNameOrId = 'YOUR_SUBSCRIPTION_NAME_OR_ID';// const timeout = 60;// Imports the Google Cloud client libraryimport{PubSub,Schema,Encodings,Message}from'@google-cloud/pubsub';// Node FS library, to load definitionsimport*asfsfrom'fs';// And the Apache Avro libraryimport*asavrofrom'avro-js';// Creates a client; cache this for further useconstpubSubClient=newPubSub();functionlistenForAvroRecords(subscriptionNameOrId:string,timeout:number){// References an existing subscriptionconstsubscription=pubSubClient.subscription(subscriptionNameOrId);// Make an encoder using the official avro-js library.constdefinition=fs.readFileSync('system-test/fixtures/provinces.avsc').toString();consttype=avro.parse(definition);// Create an event handler to handle messagesletmessageCount=0;constmessageHandler=async(message:Message)=>{// "Ack" (acknowledge receipt of) the messagemessage.ack();// Get the schema metadata from the message.constschemaMetadata=Schema.metadataFromMessage(message.attributes);letresult:object|undefined;switch(schemaMetadata.encoding){caseEncodings.Binary:result=type.fromBuffer(message.data);break;caseEncodings.Json:result=type.fromString(message.data.toString());break;default:console.log(`Unknown schema encoding:${schemaMetadata.encoding}`);break;}console.log(`Received message${message.id}:`);console.log(`\tData:${JSON.stringify(result,null,4)}`);console.log(`\tAttributes:${message.attributes}`);messageCount+=1;};// Listen for new messages until timeout is hitsubscription.on('message',messageHandler);setTimeout(()=>{subscription.removeListener('message',messageHandler);console.log(`${messageCount} message(s) received.`);},timeout*1000);}/** * TODO(developer): Uncomment these variables before running the sample. 
*/// const subscriptionNameOrId = 'YOUR_SUBSCRIPTION_NAME_OR_ID';// const timeout = 60;// Imports the Google Cloud client libraryimport{PubSub,Schema,Encodings,Message}from'@google-cloud/pubsub';// And the protobufjs libraryimport*asprotobuffrom'protobufjs';// Creates a client; cache this for further useconstpubSubClient=newPubSub();asyncfunctionlistenForProtobufMessages(subscriptionNameOrId:string,timeout:number,){// References an existing subscriptionconstsubscription=pubSubClient.subscription(subscriptionNameOrId);// Make an decoder using the protobufjs library.//// Since we're providing the test message for a specific schema here, we'll// also code in the path to a sample proto definition.constroot=protobuf.loadSync('system-test/fixtures/provinces.proto');constProvince=root.lookupType('utilities.Province');// Create an event handler to handle messagesletmessageCount=0;constmessageHandler=async(message:Message)=>{// "Ack" (acknowledge receipt of) the messagemessage.ack();// Get the schema metadata from the message.constschemaMetadata=Schema.metadataFromMessage(message.attributes);letresult;switch(schemaMetadata.encoding){caseEncodings.Binary:result=Province.decode(message.data);break;caseEncodings.Json:// This doesn't require decoding with the protobuf library,// since it's plain JSON. But you can still validate it against// your schema.result=JSON.parse(message.data.toString());console.log(`Validation of JSON:${Province.verify(result)}`);break;default:console.log(`Unknown schema encoding:${schemaMetadata.encoding}`);break;}console.log(`Received message${message.id}:`);console.log(`\tData:${JSON.stringify(result,null,4)}`);console.log(`\tAttributes:${JSON.stringify(message.attributes,null,4)}`);messageCount+=1;};// Listen for new messages until timeout is hitsubscription.on('message',messageHandler);setTimeout(()=>{subscription.removeListener('message',messageHandler);console.log(`${messageCount} message(s) received.`);},timeout*1000);}PHP
Before trying this sample, follow the PHP setup instructions in Quickstart: Using Client Libraries. For more information, see the Pub/Sub PHP API reference documentation.
Avro
use Google\Cloud\PubSub\PubSubClient;

/**
 * Subscribe and pull messages using an AVRO schema.
 *
 * @param string $projectId
 * @param string $subscriptionId
 * @param string $definitionFile Path of the file whose contents are parsed as the Avro schema.
 */
function subscribe_avro_records($projectId, $subscriptionId, $definitionFile)
{
    $pubsub = new PubSubClient([
        'projectId' => $projectId,
    ]);

    $subscription = $pubsub->subscription($subscriptionId);
    $definition = file_get_contents($definitionFile);

    $messages = $subscription->pull();

    foreach ($messages as $message) {
        $decodedMessageData = '';
        $encoding = $message->attribute('googclient_schemaencoding');
        switch ($encoding) {
            case 'BINARY':
                $io = new \AvroStringIO($message->data());
                $schema = \AvroSchema::parse($definition);
                $reader = new \AvroIODatumReader($schema);
                $decoder = new \AvroIOBinaryDecoder($io);
                $decodedMessageData = json_encode($reader->read($decoder));
                break;
            case 'JSON':
                $decodedMessageData = $message->data();
                break;
        }

        // $encoding is a string ("BINARY"/"JSON"); %d would render it as 0.
        printf('Received a %s-encoded message %s', $encoding, $decodedMessageData);
    }
}

use Google\Cloud\PubSub\PubSubClient;

/**
 * Subscribe and pull messages using a protocol buffer schema.
 *
 * Relies on a proto message of the following form:
 * ```
 * syntax = "proto3";
 *
 * package utilities;
 *
 * message StateProto {
 *   string name = 1;
 *   string post_abbr = 2;
 * }
 * ```
 *
 * @param string $projectId
 * @param string $subscriptionId
 */
function subscribe_proto_messages($projectId, $subscriptionId)
{
    $pubsub = new PubSubClient([
        'projectId' => $projectId,
    ]);

    $subscription = $pubsub->subscription($subscriptionId);
    $messages = $subscription->pull();

    foreach ($messages as $message) {
        $decodedMessageData = '';
        $encoding = $message->attribute('googclient_schemaencoding');
        switch ($encoding) {
            case 'BINARY':
                $protobufMessage = new \Utilities\StateProto();
                $protobufMessage->mergeFromString($message->data());

                $decodedMessageData = $protobufMessage->serializeToJsonString();
                break;
            case 'JSON':
                $decodedMessageData = $message->data();
                break;
        }

        // $encoding is a string ("BINARY"/"JSON"); %d would render it as 0.
        printf('Received a %s-encoded message %s', $encoding, $decodedMessageData);
    }
}
Python
Before trying this sample, follow the Python setup instructions in Quickstart: Using Client Libraries. For more information, see the Pub/Sub Python API reference documentation.
Avroimportavro.schemaasschemafromavro.ioimportBinaryDecoder,DatumReaderfromconcurrent.futuresimportTimeoutErrorimportioimportjsonfromgoogle.cloud.pubsubimportSubscriberClient# TODO(developer)# project_id = "your-project-id"# subscription_id = "your-subscription-id"# avsc_file = "path/to/an/avro/schema/file/(.avsc)/formatted/in/json"# Number of seconds the subscriber listens for messages# timeout = 5.0subscriber=SubscriberClient()subscription_path=subscriber.subscription_path(project_id,subscription_id)withopen(avsc_file,"rb")asfile:avro_schema=schema.parse(file.read())defcallback(message:pubsub_v1.subscriber.message.Message)->None:# Get the message serialization type.encoding=message.attributes.get("googclient_schemaencoding")# Deserialize the message data accordingly.ifencoding=="BINARY":bout=io.BytesIO(message.data)decoder=BinaryDecoder(bout)reader=DatumReader(avro_schema)message_data=reader.read(decoder)print(f"Received a binary-encoded message:\n{message_data}")elifencoding=="JSON":message_data=json.loads(message.data)print(f"Received a JSON-encoded message:\n{message_data}")else:print(f"Received a message with no encoding:\n{message}")message.ack()streaming_pull_future=subscriber.subscribe(subscription_path,callback=callback)print(f"Listening for messages on{subscription_path}..\n")# Wrap subscriber in a 'with' block to automatically call close() when done.withsubscriber:try:# When `timeout` is not set, result() will block indefinitely,# unless an exception occurs first.streaming_pull_future.result(timeout=timeout)exceptTimeoutError:streaming_pull_future.cancel()# Trigger the shutdown.streaming_pull_future.result()# Block until the shutdown is complete.fromconcurrent.futuresimportTimeoutErrorfromgoogle.cloud.pubsubimportSubscriberClientfromgoogle.protobuf.json_formatimportParsefromutilitiesimportus_states_pb2# TODO(developer)# project_id = "your-project-id"# subscription_id = "your-subscription-id"# Number of seconds the subscriber listens for messages# 
timeout = 5.0subscriber=SubscriberClient()subscription_path=subscriber.subscription_path(project_id,subscription_id)# Instantiate a protoc-generated class defined in `us-states.proto`.state=us_states_pb2.StateProto()defcallback(message:pubsub_v1.subscriber.message.Message)->None:# Get the message serialization type.encoding=message.attributes.get("googclient_schemaencoding")# Deserialize the message data accordingly.ifencoding=="BINARY":state.ParseFromString(message.data)print(f"Received a binary-encoded message:\n{state}")elifencoding=="JSON":Parse(message.data,state)print(f"Received a JSON-encoded message:\n{state}")else:print(f"Received a message with no encoding:\n{message}")message.ack()streaming_pull_future=subscriber.subscribe(subscription_path,callback=callback)print(f"Listening for messages on{subscription_path}..\n")# Wrap subscriber in a 'with' block to automatically call close() when done.withsubscriber:try:# When `timeout` is not set, result() will block indefinitely,# unless an exception occurs first.streaming_pull_future.result(timeout=timeout)exceptTimeoutError:streaming_pull_future.cancel()# Trigger the shutdown.streaming_pull_future.result()# Block until the shutdown is complete.Ruby
The following sample uses Ruby Pub/Sub client library v3. If you are still using the v2 library, see the migration guide to v3. To see a list of Ruby v2 code samples, see the deprecated code samples.
Before trying this sample, follow the Ruby setup instructions in Quickstart: Using Client Libraries. For more information, see the Pub/Sub Ruby API reference documentation.
Avro
# subscription_id = "your-subscription-id"
# avsc_file = "path/to/an/avro/schema/file/(.avsc)/formatted/in/json"

pubsub = Google::Cloud::PubSub.new

subscriber = pubsub.subscriber subscription_id

# Decode each received message according to its schema encoding attribute,
# print it, then acknowledge it.
listener = subscriber.listen do |received_message|
  encoding = received_message.attributes["googclient_schemaencoding"]
  case encoding
  when "BINARY"
    require "avro"
    avro_schema = Avro::Schema.parse File.read(avsc_file)
    buffer = StringIO.new received_message.data
    decoder = Avro::IO::BinaryDecoder.new buffer
    reader = Avro::IO::DatumReader.new avro_schema
    message_data = reader.read decoder
    puts "Received a binary-encoded message:\n#{message_data}"
  when "JSON"
    require "json"
    message_data = JSON.parse received_message.data
    puts "Received a JSON-encoded message:\n#{message_data}"
  else
    # Without `puts` this string was built and silently discarded.
    puts "Received a message with no encoding:\n#{received_message.message_id}"
  end
  received_message.acknowledge!
end

listener.start
# Let the main thread sleep for 60 seconds so the thread for listening
# messages does not quit
sleep 60
listener.stop.wait!

# subscription_id = "your-subscription-id"

pubsub = Google::Cloud::PubSub.new

subscriber = pubsub.subscriber subscription_id

# Decode each received message as a Utilities::StateProto according to its
# schema encoding attribute, print it, then acknowledge it.
listener = subscriber.listen do |received_message|
  encoding = received_message.attributes["googclient_schemaencoding"]
  case encoding
  when "BINARY"
    state = Utilities::StateProto.decode received_message.data
    puts "Received a binary-encoded message:\n#{state}"
  when "JSON"
    require "json"
    state = Utilities::StateProto.decode_json received_message.data
    puts "Received a JSON-encoded message:\n#{state}"
  else
    # Without `puts` this string was built and silently discarded.
    puts "Received a message with no encoding:\n#{received_message.message_id}"
  end
  received_message.acknowledge!
end

listener.start
# Let the main thread sleep for 60 seconds so the thread for listening
# messages does not quit
sleep 60
listener.stop.wait!
Subscribe to a topic associated with an Avro schema with revisions
Avro requires messages to be parsed using the schema with which they are encoded. You can also translate messages to a different schema by using Avro schema resolution.
Pub/Sub ensures that all schema revisions are forward and backward compatible with all other revisions. This compatibility lets any revision be used as the reader or writer schema.
When parsing a message encoded with a different schema revision than the one your subscriber uses, you might need to get the original schema and pass it in as the writer schema.
It's best to cache the Avro reader object that can parse messages for each schema revision encountered in order to minimize latency and minimize the number of calls to the GetSchema API.
The following code shows these functions:
Read the attributes discussed in the previous section to determine which schema revision is used to encode the message.
Fetch the schema revision and cache a reader generated with it.
Parse the message into the schema that your subscriber uses.
Go
The following sample uses the major version of the Go Pub/Sub client library (v2). If you are still using the v1 library, see the migration guide to v2. To see a list of v1 code samples, see the deprecated code samples.
Before trying this sample, follow the Go setup instructions in Quickstart: Using Client Libraries. For more information, see the Pub/Sub Go API reference documentation.
import("context""fmt""io""sync""time""cloud.google.com/go/pubsub/v2"schema"cloud.google.com/go/pubsub/v2/apiv1""cloud.google.com/go/pubsub/v2/apiv1/pubsubpb""github.com/linkedin/goavro/v2")funcsubscribeWithAvroSchemaRevisions(wio.Writer,projectID,subIDstring)error{// projectID := "my-project-id"// topicID := "my-topic"ctx:=context.Background()client,err:=pubsub.NewClient(ctx,projectID)iferr!=nil{returnfmt.Errorf("pubsub.NewClient: %w",err)}schemaClient,err:=schema.NewSchemaClient(ctx)iferr!=nil{returnfmt.Errorf("pubsub.NewSchemaClient: %w",err)}// Create the cache for the codecs for different revision IDs.revisionCodecs:=make(map[string]*goavro.Codec)sub:=client.Subscriber(subID)ctx2,cancel:=context.WithTimeout(ctx,10*time.Second)defercancel()varmusync.Mutexsub.Receive(ctx2,func(ctxcontext.Context,msg*pubsub.Message){mu.Lock()defermu.Unlock()name:=msg.Attributes["googclient_schemaname"]revision:=msg.Attributes["googclient_schemarevisionid"]codec,ok:=revisionCodecs[revision]// If the codec doesn't exist in the map, this is the first time we// are seeing this revision. We need to fetch the schema and cache the// codec. 
It would be more typical to do this asynchronously, but is// shown here in a synchronous way to ease readability.if!ok{s:=&pubsubpb.GetSchemaRequest{Name:fmt.Sprintf("%s@%s",name,revision),View:pubsubpb.SchemaView_FULL,}schema,err:=schemaClient.GetSchema(ctx,s)iferr!=nil{fmt.Fprintf(w,"Nacking, cannot read message without schema: %v\n",err)msg.Nack()return}codec,err=goavro.NewCodec(schema.Definition)iferr!=nil{msg.Nack()fmt.Fprintf(w,"goavro.NewCodec err: %v\n",err)}revisionCodecs[revision]=codec}encoding:=msg.Attributes["googclient_schemaencoding"]varstatemap[string]interface{}ifencoding=="BINARY"{data,_,err:=codec.NativeFromBinary(msg.Data)iferr!=nil{fmt.Fprintf(w,"codec.NativeFromBinary err: %v\n",err)msg.Nack()return}fmt.Fprintf(w,"Received a binary-encoded message:\n%#v\n",data)state=data.(map[string]interface{})}elseifencoding=="JSON"{data,_,err:=codec.NativeFromTextual(msg.Data)iferr!=nil{fmt.Fprintf(w,"codec.NativeFromTextual err: %v\n",err)msg.Nack()return}fmt.Fprintf(w,"Received a JSON-encoded message:\n%#v\n",data)state=data.(map[string]interface{})}else{fmt.Fprintf(w,"Unknown message type(%s), nacking\n",encoding)msg.Nack()return}fmt.Fprintf(w,"%s is abbreviated as %s\n",state["name"],state["post_abbr"])msg.Ack()})returnnil}Java
Before trying this sample, follow the Java setup instructions in Quickstart: Using Client Libraries. For more information, see the Pub/Sub Java API reference documentation.
importcom.google.cloud.pubsub.v1.AckReplyConsumer;importcom.google.cloud.pubsub.v1.MessageReceiver;importcom.google.cloud.pubsub.v1.SchemaServiceClient;importcom.google.cloud.pubsub.v1.Subscriber;importcom.google.protobuf.ByteString;importcom.google.pubsub.v1.ProjectSubscriptionName;importcom.google.pubsub.v1.PubsubMessage;importcom.google.pubsub.v1.Schema;importjava.io.ByteArrayInputStream;importjava.io.IOException;importjava.io.InputStream;importjava.util.HashMap;importjava.util.Map;importjava.util.concurrent.TimeUnit;importjava.util.concurrent.TimeoutException;importorg.apache.avro.io.Decoder;importorg.apache.avro.io.DecoderFactory;importorg.apache.avro.specific.SpecificDatumReader;importutilities.State;publicclassSubscribeWithAvroSchemaRevisionsExample{publicstaticvoidmain(String...args)throwsException{// TODO(developer): Replace these variables before running the sample.StringprojectId="your-project-id";// Use an existing subscription.StringsubscriptionId="your-subscription-id";subscribeWithAvroSchemaRevisionsExample(projectId,subscriptionId);}staticSchemaServiceClientgetSchemaServiceClient(){try{returnSchemaServiceClient.create();}catch(IOExceptione){System.out.println("Could not get schema client: "+e);returnnull;}}publicstaticvoidsubscribeWithAvroSchemaRevisionsExample(StringprojectId,StringsubscriptionId){// Used to get the schemas for revsions.finalSchemaServiceClientschemaServiceClient=getSchemaServiceClient();if(schemaServiceClient==null){return;}// Cache for the readers for different revision IDs.Map<String,SpecificDatumReader<State>>revisionReaders=newHashMap<String,SpecificDatumReader<State>>();ProjectSubscriptionNamesubscriptionName=ProjectSubscriptionName.of(projectId,subscriptionId);// Instantiate an asynchronous message receiver.MessageReceiverreceiver=(PubsubMessagemessage,AckReplyConsumerconsumer)->{// Get the schema encoding 
type.Stringname=message.getAttributesMap().get("googclient_schemaname");Stringrevision=message.getAttributesMap().get("googclient_schemarevisionid");SpecificDatumReader<State>reader=null;synchronized(revisionReaders){reader=revisionReaders.get(revision);}if(reader==null){// This is the first time we are seeing this revision. We need to// fetch the schema and cache its decoder. It would be more typical// to do this asynchronously, but is shown here in a synchronous// way to ease readability.try{Schemaschema=schemaServiceClient.getSchema(name+"@"+revision);org.apache.avro.SchemaavroSchema=neworg.apache.avro.Schema.Parser().parse(schema.getDefinition());reader=newSpecificDatumReader<State>(avroSchema,State.getClassSchema());synchronized(revisionReaders){revisionReaders.put(revision,reader);}}catch(Exceptione){System.out.println("Could not get schema: "+e);// Without the schema, we cannot read the message, so nack it.consumer.nack();return;}}ByteStringdata=message.getData();// Send the message data to a byte[] input stream.InputStreaminputStream=newByteArrayInputStream(data.toByteArray());Stringencoding=message.getAttributesMap().get("googclient_schemaencoding");Decoderdecoder=null;// Prepare an appropriate decoder for the message data in the input stream// based on the schema encoding type.try{switch(encoding){case"BINARY":decoder=DecoderFactory.get().directBinaryDecoder(inputStream,/* reuse= */null);System.out.println("Receiving a binary-encoded message:");break;case"JSON":decoder=DecoderFactory.get().jsonDecoder(State.getClassSchema(),inputStream);System.out.println("Receiving a JSON-encoded message:");break;default:System.out.println("Unknown message type; nacking.");consumer.nack();break;}// Obtain an object of the generated Avro class using the decoder.Statestate=reader.read(null,decoder);System.out.println(state.getName()+" is abbreviated as "+state.getPostAbbr());// Ack the message.consumer.ack();}catch(IOExceptione){System.err.println(e);// If we failed to 
process the message, nack it.consumer.nack();}};Subscribersubscriber=null;try{subscriber=Subscriber.newBuilder(subscriptionName,receiver).build();subscriber.startAsync().awaitRunning();System.out.printf("Listening for messages on %s:\n",subscriptionName.toString());subscriber.awaitTerminated(30,TimeUnit.SECONDS);}catch(TimeoutExceptiontimeoutException){subscriber.stopAsync();}}}Python
Before trying this sample, follow the Python setup instructions in Quickstart: Using Client Libraries. For more information, see the Pub/Sub Python API reference documentation.
importavro.schemaasschemafromavro.ioimportBinaryDecoder,DatumReaderfromconcurrent.futuresimportTimeoutErrorimportioimportjsonfromgoogle.api_core.exceptionsimportNotFoundfromgoogle.cloud.pubsubimportSchemaServiceClient,SubscriberClientschema_client=SchemaServiceClient()# TODO(developer)# project_id = "your-project-id"# subscription_id = "your-subscription-id"# avsc_file = "path/to/an/avro/schema/file/(.avsc)/formatted/in/json"# Number of seconds the subscriber listens for messages# timeout = 5.0subscriber=SubscriberClient()subscription_path=subscriber.subscription_path(project_id,subscription_id)withopen(avsc_file,"rb")asfile:reader_avro_schema=schema.parse(file.read())# Dict to keep readers for different schema revisions.revisions_to_readers={}defcallback(message:pubsub_v1.subscriber.message.Message)->None:# Get the message serialization type.schema_name=message.attributes.get("googclient_schemaname")schema_revision_id=message.attributes.get("googclient_schemarevisionid")encoding=message.attributes.get("googclient_schemaencoding")ifschema_revision_idnotinrevisions_to_readers:schema_path=schema_name+"@"+schema_revision_idtry:received_avro_schema=schema_client.get_schema(request={"name":schema_path})exceptNotFound:print(f"{schema_path} not found.")message.nack()returnwriter_avro_schema=schema.parse(received_avro_schema.definition)revisions_to_readers[schema_revision_id]=DatumReader(writer_avro_schema,reader_avro_schema)reader=revisions_to_readers[schema_revision_id]# Deserialize the message data accordingly.ifencoding=="BINARY":bout=io.BytesIO(message.data)decoder=BinaryDecoder(bout)message_data=reader.read(decoder)print(f"Received a binary-encoded message:\n{message_data}")elifencoding=="JSON":message_data=json.loads(message.data)print(f"Received a JSON-encoded message:\n{message_data}")else:print(f"Received a message with no 
encoding:\n{message}")message.nack()message.ack()streaming_pull_future=subscriber.subscribe(subscription_path,callback=callback)print(f"Listening for messages on{subscription_path}..\n")# Wrap subscriber in a 'with' block to automatically call close() when done.withsubscriber:try:# When `timeout` is not set, result() will block indefinitely,# unless an exception occurs first.streaming_pull_future.result(timeout=timeout)exceptTimeoutError:streaming_pull_future.cancel()# Trigger the shutdown.streaming_pull_future.result()# Block until the shutdown is complete.Required roles
To get the permissions you need to validate a message against a schema, complete one of the following steps:
- Grant one of the following predefined roles to a service account:
roles/pubsub.admin, roles/pubsub.editor, or roles/pubsub.viewer.
- Create a custom role for a service account and add the following permissions:
pubsub.schemas.validate and pubsub.schemas.get. To learn more about custom roles, see Create and manage Custom IAM roles.
Except as otherwise noted, the content of this page is licensed under the Creative Commons Attribution 4.0 License, and code samples are licensed under the Apache 2.0 License. For details, see the Google Developers Site Policies. Java is a registered trademark of Oracle and/or its affiliates.
Last updated 2026-02-19 UTC.