Pub/Sub OpenTelemetry tracing
OpenTelemetry tracing lets you identify and trace the latency of various Pub/Sub client library operations, such as batching, lease management, and flow control. Collecting this information can help you debug client library issues.
Some potential use cases for OpenTelemetry tracing include the following:
- Your service is experiencing a higher publishing latency than normal.
- You are experiencing a high number of message redeliveries.
- A change to your subscriber client's callback function causes processing to take longer than usual.
Before you begin
Before configuring OpenTelemetry, complete the following tasks:
- Set up Pub/Sub using one of the client libraries.
- Install the OpenTelemetry SDK and set up a trace exporter and a tracer provider.
- Enable the Cloud Trace API.
- Understand how to read Cloud Observability traces.
Required roles
To ensure that the service account has the necessary permissions to export traces to Cloud Trace, ask your administrator to grant the service account the following IAM roles on your project:
Important: You must grant these roles to the service account, not to your user account. Failure to grant the roles to the correct principal might result in permission errors.
- All: Cloud Trace Agent (roles/cloudtrace.agent)
For more information about granting roles, see Manage access to projects, folders, and organizations.
These predefined roles contain the permissions required to export traces to Cloud Trace. To see the exact permissions that are required, expand the Required permissions section:
Required permissions
The following permissions are required to export traces to Cloud Trace:
- All: cloudtrace.traces.patch
Your administrator might also be able to give the service account these permissions with custom roles or other predefined roles.
OpenTelemetry tracing workflow
To set up OpenTelemetry tracing, you use the Pub/Sub client libraries and the OpenTelemetry SDK. With the SDK, you must set up a trace exporter and a tracer provider before connecting to the Pub/Sub libraries. In some libraries, setting up a tracer provider is optional.
- Trace exporter. The OpenTelemetry SDK uses the trace exporter to determine where to send traces.
- Tracer provider. The Pub/Sub client libraries use the tracer provider to create traces.
The following steps outline how to set up tracing:
- Instantiate a Cloud Trace OpenTelemetry exporter.
- If required, instantiate and register a tracer provider using the OpenTelemetry SDK.
- Configure your client with the enable OpenTelemetry tracing option.
- Use the Pub/Sub client libraries to publish a message.
How tracing works
For every message published, the client library creates a new trace. This trace represents the entire lifecycle of the message, from the moment you publish a message to when the message is acknowledged. A trace encapsulates information such as the duration of operations, parent and child spans, and linked spans.
A trace is made up of a root span and its corresponding child spans. These spans represent the work the client library does when processing a message. Each message trace contains the following:
- For publishing. Flow control, ordering key scheduling, batching, and the length of the publish RPC.
- For subscriptions. Concurrency control, ordering key scheduling, and lease management.
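To make the root-and-child structure concrete, the following minimal Python sketch models a trace as a flat list of spans and reports which direct child of the root took the longest. The `Span` class, the span names, and the timings are hypothetical and chosen only for illustration; they aren't the names or values that the client libraries emit.

```python
from dataclasses import dataclass
from typing import List, Optional


@dataclass
class Span:
    """Hypothetical, simplified span: a named, timed unit of work."""
    name: str
    span_id: str
    parent_id: Optional[str]  # None marks the root span
    start_ms: float
    end_ms: float

    @property
    def duration_ms(self) -> float:
        return self.end_ms - self.start_ms


def slowest_child(spans: List[Span]) -> Span:
    """Return the direct child of the root span with the longest duration."""
    root = next(s for s in spans if s.parent_id is None)
    children = [s for s in spans if s.parent_id == root.span_id]
    return max(children, key=lambda s: s.duration_ms)


# Illustrative publish-side trace; names and numbers are made up.
trace = [
    Span("create", "a", None, 0.0, 120.0),
    Span("flow control", "b", "a", 1.0, 3.0),
    Span("batching", "c", "a", 3.0, 98.0),
    Span("publish RPC", "d", "a", 98.0, 119.0),
]

worst = slowest_child(trace)
print(f"{worst.name}: {worst.duration_ms:.0f} ms")  # batching: 95 ms
```

In this made-up trace, most of the publish latency comes from time spent waiting in a batch, which is the kind of finding that the Cloud Trace view of a real trace can surface.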
To propagate trace context from the publish side to the subscribe side, the client libraries inject a tracing-specific attribute into each message on the publish side. This context propagation mechanism is enabled only when tracing is turned on, and the attribute key is prepended with the googclient_ prefix.
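The following Python sketch illustrates the idea of carrying trace context in a message attribute. It is a conceptual model, not the client libraries' implementation: the `inject` and `extract` helpers are hypothetical, and the full key name `googclient_traceparent` and the W3C `traceparent` value layout are assumptions, since this page only states that the key carries the `googclient_` prefix.

```python
import secrets
from typing import Dict, Optional, Tuple

# Assumed attribute key: only the "googclient_" prefix comes from this page.
# The "traceparent" suffix follows the W3C Trace Context header name.
PREFIX = "googclient_"
TRACEPARENT_KEY = PREFIX + "traceparent"


def inject(attributes: Dict[str, str], trace_id: str, span_id: str) -> None:
    """Publisher side: store the trace context as a message attribute."""
    # W3C traceparent layout: version-traceid-spanid-flags ("01" = sampled).
    attributes[TRACEPARENT_KEY] = f"00-{trace_id}-{span_id}-01"


def extract(attributes: Dict[str, str]) -> Optional[Tuple[str, str]]:
    """Subscriber side: recover (trace_id, span_id), or None if absent."""
    value = attributes.get(TRACEPARENT_KEY)
    if value is None:
        return None
    parts = value.split("-")
    if len(parts) != 4:
        return None
    _version, trace_id, span_id, _flags = parts
    return trace_id, span_id


trace_id = secrets.token_hex(16)  # 32 hex characters
span_id = secrets.token_hex(8)    # 16 hex characters

message_attributes = {"origin": "example"}  # a user-defined attribute
inject(message_attributes, trace_id, span_id)

# The subscriber can continue the same trace from the recovered IDs.
print(extract(message_attributes) == (trace_id, span_id))  # True
```

Because the context travels with the message itself, a subscriber in a different process or service can attach its spans to the trace that the publisher started.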
Publish messages with tracing
The following code sample shows you how to enable tracing by using the Pub/Sub client library and the OpenTelemetry SDK. In this sample, the tracing results are exported to Cloud Trace.
Note: Cloud Trace only provides exporters for Java, Go, Node, and Python. To export with any other language, use the OpenTelemetry Collector with Google Cloud Exporter. Setting up these processes requires additional setup steps that aren't documented on this page.
Considerations
When instantiating the tracer provider, you configure a sampling ratio with the OpenTelemetry SDK. This ratio determines how many traces the SDK should sample. A lower sampling rate can help reduce billing costs and prevent your service from exceeding the Cloud Trace span quota.
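As a rough illustration of what a sampling ratio means, the following Python sketch implements a simplified ratio-based sampling decision: a trace is kept when the lower 64 bits of its trace ID fall below `ratio * 2**64`. This is a model in the spirit of the `TraceIdRatioBased` samplers used in the samples on this page, not the SDK code itself; the `should_sample` function is hypothetical, and real SDK implementations may differ in detail.

```python
import random

TRACE_ID_LIMIT = (1 << 64) - 1  # mask for the lower 64 bits of a trace ID


def should_sample(trace_id: int, ratio: float) -> bool:
    """Keep a trace when the low 64 bits of its ID fall below ratio * 2**64."""
    if not 0.0 <= ratio <= 1.0:
        raise ValueError("ratio must be between 0.0 and 1.0")
    bound = round(ratio * (TRACE_ID_LIMIT + 1))
    return (trace_id & TRACE_ID_LIMIT) < bound


rng = random.Random(42)  # fixed seed so that the run is repeatable
trace_ids = [rng.getrandbits(128) for _ in range(10_000)]

for ratio in (1.0, 0.1, 0.0):
    kept = sum(should_sample(t, ratio) for t in trace_ids)
    print(f"ratio={ratio}: kept {kept} of {len(trace_ids)} traces")
```

A ratio of 1.0 keeps every trace and 0.0 keeps none, while 0.1 keeps roughly one trace in ten. Because the decision depends only on the trace ID, every component that sees the same trace ID reaches the same decision.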
Warning: Traces are subject to change. The name and attributes of a span might change without notice. Only use traces interactively; don't depend on them in automation. Automation that relies on traces can break if the underlying trace architecture changes without notice.
Go
```go
import (
	"context"
	"fmt"
	"io"

	"cloud.google.com/go/pubsub/v2"
	"go.opentelemetry.io/otel"
	"google.golang.org/api/option"

	texporter "github.com/GoogleCloudPlatform/opentelemetry-operations-go/exporter/trace"
	"go.opentelemetry.io/otel/sdk/resource"
	sdktrace "go.opentelemetry.io/otel/sdk/trace"
	semconv "go.opentelemetry.io/otel/semconv/v1.26.0"
)

// publishOpenTelemetryTracing publishes a single message with OpenTelemetry tracing
// enabled, exporting to Cloud Trace.
func publishOpenTelemetryTracing(w io.Writer, projectID, topicID string, sampling float64) error {
	// projectID := "my-project-id"
	// topicID := "my-topic"
	ctx := context.Background()

	exporter, err := texporter.New(texporter.WithProjectID(projectID),
		// Disable spans created by the exporter.
		texporter.WithTraceClientOptions(
			[]option.ClientOption{option.WithTelemetryDisabled()},
		),
	)
	if err != nil {
		return fmt.Errorf("error instantiating exporter: %w", err)
	}

	resources := resource.NewWithAttributes(
		semconv.SchemaURL,
		semconv.ServiceNameKey.String("publisher"),
	)

	// Instantiate a tracer provider with the following settings
	tp := sdktrace.NewTracerProvider(
		sdktrace.WithBatcher(exporter),
		sdktrace.WithResource(resources),
		sdktrace.WithSampler(
			sdktrace.ParentBased(sdktrace.TraceIDRatioBased(sampling)),
		),
	)

	defer tp.ForceFlush(ctx) // flushes any pending spans
	otel.SetTracerProvider(tp)

	// Create a new client with tracing enabled.
	client, err := pubsub.NewClientWithConfig(ctx, projectID, &pubsub.ClientConfig{
		EnableOpenTelemetryTracing: true,
	})
	if err != nil {
		return fmt.Errorf("pubsub: NewClient: %w", err)
	}
	defer client.Close()

	// client.Publisher can be passed a topic ID (e.g. "my-topic") or
	// a fully qualified name (e.g. "projects/my-project/topics/my-topic").
	// If a topic ID is provided, the project ID from the client is used.
	// Reuse this publisher for all publish calls to send messages in batches.
	publisher := client.Publisher(topicID)
	result := publisher.Publish(ctx, &pubsub.Message{
		Data: []byte("Publishing message with tracing"),
	})
	if _, err := result.Get(ctx); err != nil {
		return fmt.Errorf("pubsub: result.Get: %w", err)
	}
	fmt.Fprintln(w, "Published a traced message")
	return nil
}
```
C++
```cpp
// Create a few namespace aliases to make the code easier to read.
namespace gc = ::google::cloud;
namespace otel = gc::otel;
namespace pubsub = gc::pubsub;

// This example uses a simple wrapper to export (upload) OTel tracing data
// to Google Cloud Trace. More complex applications may use different
// authentication, or configure their own OTel exporter.
auto project = gc::Project(project_id);
auto configuration = otel::ConfigureBasicTracing(project);

auto publisher = pubsub::Publisher(pubsub::MakePublisherConnection(
    pubsub::Topic(project_id, topic_id),
    // Configure this publisher to enable OTel tracing. Some applications may
    // choose to disable tracing in some publishers or to dynamically enable
    // this option based on their own configuration.
    gc::Options{}.set<gc::OpenTelemetryTracingOption>(true)));

// After this point, use the Cloud Pub/Sub C++ client library as usual.
// In this example, we will send a few messages and configure a callback
// action for each one.
std::vector<gc::future<void>> ids;
for (int i = 0; i < 5; i++) {
  auto id = publisher.Publish(pubsub::MessageBuilder().SetData("Hi!").Build())
                .then([](gc::future<gc::StatusOr<std::string>> f) {
                  auto id = f.get();
                  if (!id) {
                    std::cout << "Error in publish: " << id.status() << "\n";
                    return;
                  }
                  std::cout << "Sent message with id: (" << *id << ")\n";
                });
  ids.push_back(std::move(id));
}
// Block until the messages are actually sent.
for (auto& id : ids) id.get();
```
Python
Before trying this sample, follow the Python setup instructions in Quickstart: Using Client Libraries. For more information, see the Pub/Sub Python API reference documentation.
```python
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import (
    BatchSpanProcessor,
)
from opentelemetry.exporter.cloud_trace import CloudTraceSpanExporter
from opentelemetry.sdk.trace.sampling import TraceIdRatioBased, ParentBased

from google.cloud.pubsub_v1 import PublisherClient
from google.cloud.pubsub_v1.types import PublisherOptions

# TODO(developer)
# topic_project_id = "your-topic-project-id"
# trace_project_id = "your-trace-project-id"
# topic_id = "your-topic-id"

# In this sample, we use a Google Cloud Trace to export the OpenTelemetry
# traces: https://cloud.google.com/trace/docs/setup/python-ot
# Choose and configure the exporter for your set up accordingly.

sampler = ParentBased(root=TraceIdRatioBased(1))
trace.set_tracer_provider(TracerProvider(sampler=sampler))

# Export to Google Trace.
cloud_trace_exporter = CloudTraceSpanExporter(
    project_id=trace_project_id,
)
trace.get_tracer_provider().add_span_processor(
    BatchSpanProcessor(cloud_trace_exporter)
)

# Set the `enable_open_telemetry_tracing` option to True when creating
# the publisher client. This in itself is necessary and sufficient for
# the library to export OpenTelemetry traces. However, where the traces
# must be exported to needs to be configured based on your OpenTelemetry
# set up. Refer: https://opentelemetry.io/docs/languages/python/exporters/
publisher = PublisherClient(
    publisher_options=PublisherOptions(
        enable_open_telemetry_tracing=True,
    ),
)

# The `topic_path` method creates a fully qualified identifier
# in the form `projects/{project_id}/topics/{topic_id}`
topic_path = publisher.topic_path(topic_project_id, topic_id)
# Publish messages.
for n in range(1, 10):
    data_str = f"Message number {n}"
    # Data must be a bytestring
    data = data_str.encode("utf-8")
    # When you publish a message, the client returns a future.
    future = publisher.publish(topic_path, data)
    print(future.result())

print(f"Published messages to {topic_path}.")
```
TypeScript
```typescript
/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const topicNameOrId = 'YOUR_TOPIC_OR_ID';
// const data = 'Hello, world!';

// Imports the Google Cloud client library
import {PubSub} from '@google-cloud/pubsub';

// Imports the OpenTelemetry API
import {NodeTracerProvider} from '@opentelemetry/sdk-trace-node';
import {diag, DiagConsoleLogger, DiagLogLevel} from '@opentelemetry/api';
import {SimpleSpanProcessor} from '@opentelemetry/sdk-trace-base';

// To output to the console for testing, use the ConsoleSpanExporter.
// import {ConsoleSpanExporter} from '@opentelemetry/sdk-trace-base';

// To output to Cloud Trace, import the OpenTelemetry bridge library.
import {TraceExporter} from '@google-cloud/opentelemetry-cloud-trace-exporter';

import {Resource} from '@opentelemetry/resources';
import {SEMRESATTRS_SERVICE_NAME} from '@opentelemetry/semantic-conventions';

// Seconds to wait for spans to be flushed. OTEL_TIMEOUT is used below but
// isn't defined in the snippet as published; this value is an assumption.
const OTEL_TIMEOUT = 2;

// Enable the diagnostic logger for OpenTelemetry
diag.setLogger(new DiagConsoleLogger(), DiagLogLevel.DEBUG);

// Log spans out to the console, for testing.
// const exporter = new ConsoleSpanExporter();

// Log spans out to Cloud Trace, for production.
const exporter = new TraceExporter();

// Build a tracer provider and a span processor to do
// something with the spans we're generating.
const provider = new NodeTracerProvider({
  resource: new Resource({
    [SEMRESATTRS_SERVICE_NAME]: 'otel publisher example',
  }),
});
const processor = new SimpleSpanProcessor(exporter);
provider.addSpanProcessor(processor);
provider.register();

// Creates a client; cache this for further use.
const pubSubClient = new PubSub({enableOpenTelemetryTracing: true});

async function publishMessage(topicNameOrId: string, data: string) {
  // Publishes the message as a string, e.g. "Hello, world!"
  // or JSON.stringify(someObject)
  const dataBuffer = Buffer.from(data);

  // Cache topic objects (publishers) and reuse them.
  const publisher = pubSubClient.topic(topicNameOrId);

  const messageId = await publisher.publishMessage({data: dataBuffer});
  console.log(`Message ${messageId} published.`);

  // The rest of the sample is in service to making sure that any
  // buffered Pub/Sub messages and/or OpenTelemetry spans are properly
  // flushed to the server side. In normal usage, you'd only need to do
  // something like this on process shutdown.
  await publisher.flush();
  await processor.forceFlush();
  await new Promise(r => setTimeout(r, OTEL_TIMEOUT * 1000));
}
```
Node.js
```javascript
/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const topicNameOrId = 'YOUR_TOPIC_OR_ID';
// const data = 'Hello, world!';

// Imports the Google Cloud client library
const {PubSub} = require('@google-cloud/pubsub');

// Imports the OpenTelemetry API
const {NodeTracerProvider} = require('@opentelemetry/sdk-trace-node');
const {diag, DiagConsoleLogger, DiagLogLevel} = require('@opentelemetry/api');
const {SimpleSpanProcessor} = require('@opentelemetry/sdk-trace-base');

// To output to the console for testing, use the ConsoleSpanExporter.
// import {ConsoleSpanExporter} from '@opentelemetry/sdk-trace-base';

// To output to Cloud Trace, import the OpenTelemetry bridge library.
const {
  TraceExporter,
} = require('@google-cloud/opentelemetry-cloud-trace-exporter');

const {Resource} = require('@opentelemetry/resources');
const {
  SEMRESATTRS_SERVICE_NAME,
} = require('@opentelemetry/semantic-conventions');

// Seconds to wait for spans to be flushed. OTEL_TIMEOUT is used below but
// isn't defined in the snippet as published; this value is an assumption.
const OTEL_TIMEOUT = 2;

// Enable the diagnostic logger for OpenTelemetry
diag.setLogger(new DiagConsoleLogger(), DiagLogLevel.DEBUG);

// Log spans out to the console, for testing.
// const exporter = new ConsoleSpanExporter();

// Log spans out to Cloud Trace, for production.
const exporter = new TraceExporter();

// Build a tracer provider and a span processor to do
// something with the spans we're generating.
const provider = new NodeTracerProvider({
  resource: new Resource({
    [SEMRESATTRS_SERVICE_NAME]: 'otel publisher example',
  }),
});
const processor = new SimpleSpanProcessor(exporter);
provider.addSpanProcessor(processor);
provider.register();

// Creates a client; cache this for further use.
const pubSubClient = new PubSub({enableOpenTelemetryTracing: true});

async function publishMessage(topicNameOrId, data) {
  // Publishes the message as a string, e.g. "Hello, world!"
  // or JSON.stringify(someObject)
  const dataBuffer = Buffer.from(data);

  // Cache topic objects (publishers) and reuse them.
  const publisher = pubSubClient.topic(topicNameOrId);

  const messageId = await publisher.publishMessage({data: dataBuffer});
  console.log(`Message ${messageId} published.`);

  // The rest of the sample is in service to making sure that any
  // buffered Pub/Sub messages and/or OpenTelemetry spans are properly
  // flushed to the server side. In normal usage, you'd only need to do
  // something like this on process shutdown.
  await publisher.flush();
  await processor.forceFlush();
  await new Promise(r => setTimeout(r, OTEL_TIMEOUT * 1000));
}
```
Java
```java
import com.google.api.core.ApiFuture;
import com.google.cloud.opentelemetry.trace.TraceConfiguration;
import com.google.cloud.opentelemetry.trace.TraceExporter;
import com.google.cloud.pubsub.v1.Publisher;
import com.google.protobuf.ByteString;
import com.google.pubsub.v1.PubsubMessage;
import com.google.pubsub.v1.TopicName;
import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.sdk.OpenTelemetrySdk;
import io.opentelemetry.sdk.resources.Resource;
import io.opentelemetry.sdk.trace.SdkTracerProvider;
import io.opentelemetry.sdk.trace.export.SimpleSpanProcessor;
import io.opentelemetry.sdk.trace.export.SpanExporter;
import io.opentelemetry.sdk.trace.samplers.Sampler;
import io.opentelemetry.semconv.ResourceAttributes;
import java.io.IOException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;

public class OpenTelemetryPublisherExample {
  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "your-project-id";
    String topicId = "your-topic-id";

    openTelemetryPublisherExample(projectId, topicId);
  }

  public static void openTelemetryPublisherExample(String projectId, String topicId)
      throws IOException, ExecutionException, InterruptedException {
    Resource resource =
        Resource.getDefault().toBuilder()
            .put(ResourceAttributes.SERVICE_NAME, "publisher-example")
            .build();

    // Creates a Cloud Trace exporter.
    SpanExporter traceExporter =
        TraceExporter.createWithConfiguration(
            TraceConfiguration.builder().setProjectId(projectId).build());

    SdkTracerProvider sdkTracerProvider =
        SdkTracerProvider.builder()
            .setResource(resource)
            .addSpanProcessor(SimpleSpanProcessor.create(traceExporter))
            .setSampler(Sampler.alwaysOn())
            .build();

    OpenTelemetry openTelemetry =
        OpenTelemetrySdk.builder().setTracerProvider(sdkTracerProvider).buildAndRegisterGlobal();

    TopicName topicName = TopicName.of(projectId, topicId);

    Publisher publisher = null;

    try {
      // Create a publisher instance with the created OpenTelemetry object and enabling tracing.
      publisher =
          Publisher.newBuilder(topicName)
              .setOpenTelemetry(openTelemetry)
              .setEnableOpenTelemetryTracing(true)
              .build();

      String message = "Hello World!";
      ByteString data = ByteString.copyFromUtf8(message);
      PubsubMessage pubsubMessage = PubsubMessage.newBuilder().setData(data).build();

      // Once published, returns a server-assigned message id (unique within the topic)
      ApiFuture<String> messageIdFuture = publisher.publish(pubsubMessage);
      String messageId = messageIdFuture.get();
      System.out.println("Published message ID: " + messageId);
    } finally {
      if (publisher != null) {
        // When finished with the publisher, shutdown to free up resources.
        publisher.shutdown();
        publisher.awaitTermination(1, TimeUnit.MINUTES);
      }
    }
  }
}
```
Receive messages with tracing
Go
```go
import (
	"context"
	"fmt"
	"io"
	"sync/atomic"
	"time"

	"cloud.google.com/go/pubsub/v2"
	texporter "github.com/GoogleCloudPlatform/opentelemetry-operations-go/exporter/trace"
	"go.opentelemetry.io/otel"
	"go.opentelemetry.io/otel/sdk/resource"
	sdktrace "go.opentelemetry.io/otel/sdk/trace"
	semconv "go.opentelemetry.io/otel/semconv/v1.24.0"
	"google.golang.org/api/option"
)

func subscribeOpenTelemetryTracing(w io.Writer, projectID, subID string, sampleRate float64) error {
	// projectID := "my-project-id"
	// subID := "my-sub"
	// sampleRate := 1.0
	ctx := context.Background()

	exporter, err := texporter.New(texporter.WithProjectID(projectID),
		// Disable spans created by the exporter.
		texporter.WithTraceClientOptions(
			[]option.ClientOption{option.WithTelemetryDisabled()},
		),
	)
	if err != nil {
		return fmt.Errorf("error instantiating exporter: %w", err)
	}

	resources := resource.NewWithAttributes(
		semconv.SchemaURL,
		semconv.ServiceNameKey.String("subscriber"),
	)

	// Instantiate a tracer provider with the following settings
	tp := sdktrace.NewTracerProvider(
		sdktrace.WithBatcher(exporter),
		sdktrace.WithResource(resources),
		sdktrace.WithSampler(
			sdktrace.ParentBased(sdktrace.TraceIDRatioBased(sampleRate)),
		),
	)

	defer tp.ForceFlush(ctx) // flushes any pending spans
	otel.SetTracerProvider(tp)

	// Create a new client with tracing enabled.
	client, err := pubsub.NewClientWithConfig(ctx, projectID, &pubsub.ClientConfig{
		EnableOpenTelemetryTracing: true,
	})
	if err != nil {
		return fmt.Errorf("pubsub.NewClient: %w", err)
	}
	defer client.Close()

	// client.Subscriber can be passed a subscription ID (e.g. "my-sub") or
	// a fully qualified name (e.g. "projects/my-project/subscriptions/my-sub").
	// If a subscription ID is provided, the project ID from the client is used.
	sub := client.Subscriber(subID)

	// Receive messages for 10 seconds, which simplifies testing.
	// Comment this out in production, since `Receive` should
	// be used as a long running operation.
	ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
	defer cancel()

	var received int32
	err = sub.Receive(ctx, func(_ context.Context, msg *pubsub.Message) {
		fmt.Fprintf(w, "Got message: %q\n", string(msg.Data))
		atomic.AddInt32(&received, 1)
		msg.Ack()
	})
	if err != nil {
		return fmt.Errorf("sub.Receive: %w", err)
	}
	fmt.Fprintf(w, "Received %d messages\n", received)

	return nil
}
```
C++
```cpp
#include "google/cloud/opentelemetry/configure_basic_tracing.h"
#include "google/cloud/opentelemetry_options.h"
#include "google/cloud/pubsub/message.h"
#include "google/cloud/pubsub/publisher.h"
#include "google/cloud/pubsub/subscriber.h"
#include "google/cloud/pubsub/subscription.h"
#include <chrono>
#include <future>
#include <iostream>
#include <string>

int main(int argc, char* argv[]) try {
  if (argc != 4) {
    std::cerr << "Usage: " << argv[0]
              << " <project-id> <topic-id> <subscription-id>\n";
    return 1;
  }
  std::string const project_id = argv[1];
  std::string const topic_id = argv[2];
  std::string const subscription_id = argv[3];

  // Create a few namespace aliases to make the code easier to read.
  namespace gc = ::google::cloud;
  namespace otel = gc::otel;
  namespace pubsub = gc::pubsub;

  auto constexpr kWaitTimeout = std::chrono::seconds(30);

  auto project = gc::Project(project_id);
  auto configuration = otel::ConfigureBasicTracing(project);

  // Publish a message with tracing enabled.
  auto publisher = pubsub::Publisher(pubsub::MakePublisherConnection(
      pubsub::Topic(project_id, topic_id),
      gc::Options{}.set<gc::OpenTelemetryTracingOption>(true)));
  // Block until the message is actually sent and throw on error.
  auto id = publisher.Publish(pubsub::MessageBuilder().SetData("Hi!").Build())
                .get()
                .value();
  std::cout << "Sent message with id: (" << id << ")\n";

  // Receive a message using streaming pull with tracing enabled.
  auto subscriber = pubsub::Subscriber(pubsub::MakeSubscriberConnection(
      pubsub::Subscription(project_id, subscription_id),
      gc::Options{}.set<gc::OpenTelemetryTracingOption>(true)));

  auto session =
      subscriber.Subscribe([&](pubsub::Message const& m, pubsub::AckHandler h) {
        std::cout << "Received message " << m << "\n";
        std::move(h).ack();
      });

  std::cout << "Waiting for messages on " + subscription_id + "...\n";

  // Blocks until the timeout is reached.
  auto result = session.wait_for(kWaitTimeout);
  if (result == std::future_status::timeout) {
    std::cout << "timeout reached, ending session\n";
    session.cancel();
  }

  return 0;
} catch (google::cloud::Status const& status) {
  std::cerr << "google::cloud::Status thrown: " << status << "\n";
  return 1;
}
```
Python
```python
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import (
    BatchSpanProcessor,
)
from opentelemetry.exporter.cloud_trace import CloudTraceSpanExporter
from opentelemetry.sdk.trace.sampling import TraceIdRatioBased, ParentBased

from google.cloud import pubsub_v1
from google.cloud.pubsub_v1 import SubscriberClient
from google.cloud.pubsub_v1.types import SubscriberOptions

# TODO(developer)
# subscription_project_id = "your-subscription-project-id"
# subscription_id = "your-subscription-id"
# cloud_trace_project_id = "your-cloud-trace-project-id"
# timeout = 300.0

# In this sample, we use a Google Cloud Trace to export the OpenTelemetry
# traces: https://cloud.google.com/trace/docs/setup/python-ot
# Choose and configure the exporter for your set up accordingly.

sampler = ParentBased(root=TraceIdRatioBased(1))
trace.set_tracer_provider(TracerProvider(sampler=sampler))

# Export to Google Trace
cloud_trace_exporter = CloudTraceSpanExporter(
    project_id=cloud_trace_project_id,
)
trace.get_tracer_provider().add_span_processor(
    BatchSpanProcessor(cloud_trace_exporter)
)

# Set the `enable_open_telemetry_tracing` option to True when creating
# the subscriber client. This in itself is necessary and sufficient for
# the library to export OpenTelemetry traces. However, where the traces
# must be exported to needs to be configured based on your OpenTelemetry
# set up. Refer: https://opentelemetry.io/docs/languages/python/exporters/
subscriber = SubscriberClient(
    subscriber_options=SubscriberOptions(enable_open_telemetry_tracing=True)
)

# The `subscription_path` method creates a fully qualified identifier
# in the form `projects/{project_id}/subscriptions/{subscription_id}`
subscription_path = subscriber.subscription_path(
    subscription_project_id, subscription_id
)


# Define callback to be called when a message is received.
def callback(message: pubsub_v1.subscriber.message.Message) -> None:
    # Ack message after processing it.
    print(message.data)
    message.ack()


# Wrap subscriber in a 'with' block to automatically call close() when done.
with subscriber:
    try:
        # Optimistically subscribe to messages on the subscription.
        streaming_pull_future = subscriber.subscribe(
            subscription_path, callback=callback
        )
        streaming_pull_future.result(timeout=timeout)
    except TimeoutError:
        print("Successfully subscribed until the timeout passed.")
        streaming_pull_future.cancel()  # Trigger the shutdown.
        streaming_pull_future.result()  # Block until the shutdown is complete.
```
TypeScript
```typescript
/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const subscriptionNameOrId = 'YOUR_SUBSCRIPTION_OR_ID';

// Imports the Google Cloud client library
import {Message, PubSub} from '@google-cloud/pubsub';

// Imports the OpenTelemetry API
import {NodeTracerProvider} from '@opentelemetry/sdk-trace-node';
import {diag, DiagConsoleLogger, DiagLogLevel} from '@opentelemetry/api';
import {SimpleSpanProcessor} from '@opentelemetry/sdk-trace-base';

// To output to the console for testing, use the ConsoleSpanExporter.
// import {ConsoleSpanExporter} from '@opentelemetry/sdk-trace-base';

// To output to Cloud Trace, import the OpenTelemetry bridge library.
import {TraceExporter} from '@google-cloud/opentelemetry-cloud-trace-exporter';

import {Resource} from '@opentelemetry/resources';
import {SEMRESATTRS_SERVICE_NAME} from '@opentelemetry/semantic-conventions';

// Timeouts in seconds. Both constants are used below but aren't defined
// in the snippet as published; these values are assumptions.
const SUBSCRIBER_TIMEOUT = 10;
const OTEL_TIMEOUT = 2;

// Enable the diagnostic logger for OpenTelemetry
diag.setLogger(new DiagConsoleLogger(), DiagLogLevel.DEBUG);

// Log spans out to the console, for testing.
// const exporter = new ConsoleSpanExporter();

// Log spans out to Cloud Trace, for production.
const exporter = new TraceExporter();

// Build a tracer provider and a span processor to do
// something with the spans we're generating.
const provider = new NodeTracerProvider({
  resource: new Resource({
    [SEMRESATTRS_SERVICE_NAME]: 'otel subscriber example',
  }),
});
const processor = new SimpleSpanProcessor(exporter);
provider.addSpanProcessor(processor);
provider.register();

// Creates a client; cache this for further use.
const pubSubClient = new PubSub({enableOpenTelemetryTracing: true});

async function subscriptionListen(subscriptionNameOrId: string) {
  const subscriber = pubSubClient.subscription(subscriptionNameOrId);

  // Message handler for subscriber
  const messageHandler = async (message: Message) => {
    console.log(`Message ${message.id} received.`);
    message.ack();
  };

  // Error handler for subscriber
  const errorHandler = async (error: Error) => {
    console.log('Received error:', error);
  };

  // Listens for new messages from the topic
  subscriber.on('message', messageHandler);
  subscriber.on('error', errorHandler);

  // Ensures that all spans got flushed by the exporter. This function
  // is in service to making sure that any buffered Pub/Sub messages
  // and/or OpenTelemetry spans are properly flushed to the server
  // side. In normal usage, you'd only need to do something like this
  // on process shutdown.
  async function shutdown() {
    await subscriber.close();
    await processor.forceFlush();
    await new Promise(r => setTimeout(r, OTEL_TIMEOUT * 1000));
  }

  // Wait a bit for the subscription to receive messages, then shut down
  // gracefully. This is for the sample only; normally you would not need
  // this delay.
  await new Promise<void>(r =>
    setTimeout(async () => {
      subscriber.removeAllListeners();
      await shutdown();
      r();
    }, SUBSCRIBER_TIMEOUT * 1000),
  );
}
```
Node.js
```javascript
/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const subscriptionNameOrId = 'YOUR_SUBSCRIPTION_OR_ID';

// Imports the Google Cloud client library
const {PubSub} = require('@google-cloud/pubsub');

// Imports the OpenTelemetry API
const {NodeTracerProvider} = require('@opentelemetry/sdk-trace-node');
const {diag, DiagConsoleLogger, DiagLogLevel} = require('@opentelemetry/api');
const {SimpleSpanProcessor} = require('@opentelemetry/sdk-trace-base');

// To output to the console for testing, use the ConsoleSpanExporter.
// import {ConsoleSpanExporter} from '@opentelemetry/sdk-trace-base';

// To output to Cloud Trace, import the OpenTelemetry bridge library.
const {
  TraceExporter,
} = require('@google-cloud/opentelemetry-cloud-trace-exporter');

const {Resource} = require('@opentelemetry/resources');
const {
  SEMRESATTRS_SERVICE_NAME,
} = require('@opentelemetry/semantic-conventions');

// Timeouts in seconds. Both constants are used below but aren't defined
// in the snippet as published; these values are assumptions.
const SUBSCRIBER_TIMEOUT = 10;
const OTEL_TIMEOUT = 2;

// Enable the diagnostic logger for OpenTelemetry
diag.setLogger(new DiagConsoleLogger(), DiagLogLevel.DEBUG);

// Log spans out to the console, for testing.
// const exporter = new ConsoleSpanExporter();

// Log spans out to Cloud Trace, for production.
const exporter = new TraceExporter();

// Build a tracer provider and a span processor to do
// something with the spans we're generating.
const provider = new NodeTracerProvider({
  resource: new Resource({
    [SEMRESATTRS_SERVICE_NAME]: 'otel subscriber example',
  }),
});
const processor = new SimpleSpanProcessor(exporter);
provider.addSpanProcessor(processor);
provider.register();

// Creates a client; cache this for further use.
const pubSubClient = new PubSub({enableOpenTelemetryTracing: true});

async function subscriptionListen(subscriptionNameOrId) {
  const subscriber = pubSubClient.subscription(subscriptionNameOrId);

  // Message handler for subscriber
  const messageHandler = async message => {
    console.log(`Message ${message.id} received.`);
    message.ack();
  };

  // Error handler for subscriber
  const errorHandler = async error => {
    console.log('Received error:', error);
  };

  // Listens for new messages from the topic
  subscriber.on('message', messageHandler);
  subscriber.on('error', errorHandler);

  // Ensures that all spans got flushed by the exporter. This function
  // is in service to making sure that any buffered Pub/Sub messages
  // and/or OpenTelemetry spans are properly flushed to the server
  // side. In normal usage, you'd only need to do something like this
  // on process shutdown.
  async function shutdown() {
    await subscriber.close();
    await processor.forceFlush();
    await new Promise(r => setTimeout(r, OTEL_TIMEOUT * 1000));
  }

  // Wait a bit for the subscription to receive messages, then shut down
  // gracefully. This is for the sample only; normally you would not need
  // this delay.
  await new Promise(r =>
    setTimeout(async () => {
      subscriber.removeAllListeners();
      await shutdown();
      r();
    }, SUBSCRIBER_TIMEOUT * 1000),
  );
}
```
Java
```java
import com.google.cloud.opentelemetry.trace.TraceConfiguration;
import com.google.cloud.opentelemetry.trace.TraceExporter;
import com.google.cloud.pubsub.v1.AckReplyConsumer;
import com.google.cloud.pubsub.v1.MessageReceiver;
import com.google.cloud.pubsub.v1.Subscriber;
import com.google.pubsub.v1.ProjectSubscriptionName;
import com.google.pubsub.v1.PubsubMessage;
import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.sdk.OpenTelemetrySdk;
import io.opentelemetry.sdk.resources.Resource;
import io.opentelemetry.sdk.trace.SdkTracerProvider;
import io.opentelemetry.sdk.trace.export.SimpleSpanProcessor;
import io.opentelemetry.sdk.trace.export.SpanExporter;
import io.opentelemetry.sdk.trace.samplers.Sampler;
import io.opentelemetry.semconv.ResourceAttributes;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class OpenTelemetrySubscriberExample {
  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "your-project-id";
    String subscriptionId = "your-subscription-id";

    openTelemetrySubscriberExample(projectId, subscriptionId);
  }

  public static void openTelemetrySubscriberExample(String projectId, String subscriptionId) {
    Resource resource =
        Resource.getDefault().toBuilder()
            .put(ResourceAttributes.SERVICE_NAME, "subscriber-example")
            .build();

    // Creates a Cloud Trace exporter.
    SpanExporter traceExporter =
        TraceExporter.createWithConfiguration(
            TraceConfiguration.builder().setProjectId(projectId).build());

    SdkTracerProvider sdkTracerProvider =
        SdkTracerProvider.builder()
            .setResource(resource)
            .addSpanProcessor(SimpleSpanProcessor.create(traceExporter))
            .setSampler(Sampler.alwaysOn())
            .build();

    OpenTelemetry openTelemetry =
        OpenTelemetrySdk.builder().setTracerProvider(sdkTracerProvider).buildAndRegisterGlobal();

    ProjectSubscriptionName subscriptionName =
        ProjectSubscriptionName.of(projectId, subscriptionId);

    // Instantiate an asynchronous message receiver.
    MessageReceiver receiver =
        (PubsubMessage message, AckReplyConsumer consumer) -> {
          // Handle incoming message, then ack the received message.
          System.out.println("Id: " + message.getMessageId());
          System.out.println("Data: " + message.getData().toStringUtf8());
          consumer.ack();
        };

    Subscriber subscriber = null;
    try {
      subscriber =
          Subscriber.newBuilder(subscriptionName, receiver)
              .setOpenTelemetry(openTelemetry)
              .setEnableOpenTelemetryTracing(true)
              .build();

      // Start the subscriber.
      subscriber.startAsync().awaitRunning();
      System.out.printf("Listening for messages on %s:\n", subscriptionName.toString());
      // Allow the subscriber to run for 30s unless an unrecoverable error occurs.
      subscriber.awaitTerminated(30, TimeUnit.SECONDS);
    } catch (TimeoutException timeoutException) {
      // Shut down the subscriber after 30s. Stop receiving messages.
      subscriber.stopAsync();
    }
  }
}
```
Analyze a trace
The following sections contain detailed information about how to track and analyze a trace in the Google Cloud console.
Considerations
- When publishing a batch of messages, the publish RPC span is captured in a separate trace.
- A publish RPC has multiple origin spans, since multiple create calls can result in a publish RPC when they are batched together.
Spans in OpenTelemetry can have zero or one parent span.
Spans that represent batched operations, such as a publish batch, logically have multiple parents, so they can't be represented with zero or one parent span.
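The single-parent constraint can be sketched with a small data model. In the following Python example, a batched publish RPC span has no parent and instead holds links to the create span of each message in the batch, which is one way to read the "linked spans" mentioned earlier on this page. The `Span` class, the span names, and the IDs are hypothetical and exist only for illustration.

```python
from dataclasses import dataclass, field
from typing import List, Optional, Tuple


@dataclass
class Span:
    """Hypothetical, simplified span with at most one parent."""
    name: str
    trace_id: str
    span_id: str
    parent_id: Optional[str] = None
    # Links reference spans in other traces as (trace_id, span_id) pairs.
    links: List[Tuple[str, str]] = field(default_factory=list)


# Three messages, each with its own trace and root span.
create_spans = [
    Span(name="create", trace_id=f"trace-{i}", span_id=f"create-{i}")
    for i in range(3)
]

# The batched publish RPC lives in a separate trace. It has no parent;
# instead it links back to every create span that fed the batch.
publish_rpc = Span(
    name="publish RPC",
    trace_id="trace-batch",
    span_id="rpc-0",
    links=[(s.trace_id, s.span_id) for s in create_spans],
)

print(publish_rpc.parent_id)   # None
print(len(publish_rpc.links))  # 3
```

Following a link from the publish RPC span leads to the trace of an individual message, which is how a batch-level span and per-message traces can be correlated even though they are stored separately.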
Track spans created during the message lifecycle
The following image shows an example of spans that are created in a single trace for a single message.

Each span can have additional attributes. Span attributes convey additional metadata such as the message's ordering key, the message ID, and the size of the message.

The main publish and subscribe spans are augmented with span events which correspond to when a network call is issued and when it is completed.

Troubleshoot common issues
The following issues can cause problems with tracing:
- The service account that you use for exporting traces doesn't have the required roles/cloudtrace.agent role.
- The quota of the maximum number of ingested spans in Cloud Trace has been reached.
- Your application is terminated without calling the appropriate flush function.
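The last issue can be illustrated with a toy model of a buffering span processor. In the following Python sketch, finished spans sit in a buffer until a flush is requested, so a process that exits without flushing loses them. The `BufferingProcessor` class is a hypothetical stand-in and not part of any OpenTelemetry SDK; in a real application you would call the flush or shutdown method that your SDK's tracer provider or span processor offers, as the samples on this page do.

```python
import atexit
from typing import Callable, List


class BufferingProcessor:
    """Hypothetical stand-in for a batching span processor."""

    def __init__(self, export: Callable[[List[str]], None]) -> None:
        self._export = export
        self._buffer: List[str] = []

    def on_end(self, span_name: str) -> None:
        # Finished spans are buffered instead of being exported immediately.
        self._buffer.append(span_name)

    def force_flush(self) -> None:
        # Export whatever is still buffered, then clear the buffer.
        if self._buffer:
            self._export(list(self._buffer))
            self._buffer.clear()


exported: List[str] = []
processor = BufferingProcessor(export=exported.extend)

# Flush on normal interpreter exit. atexit handlers don't run if the
# process is killed by an unhandled signal or calls os._exit().
atexit.register(processor.force_flush)

processor.on_end("publish")
processor.on_end("subscribe")
print(exported)  # [] because nothing has been flushed yet

processor.force_flush()
print(exported)  # ['publish', 'subscribe']
```

Registering the flush as an exit hook covers normal termination; for abrupt termination, such as a container being stopped, a signal handler that triggers the same flush is a common complement.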
Last updated 2026-02-19 UTC.