Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commitd30d4f2

Browse files
committed
feat: Add OpenTelemetry integration
1 parent04e261c commitd30d4f2

File tree

8 files changed

+321
-41
lines changed

8 files changed

+321
-41
lines changed

‎README.rst‎

Lines changed: 26 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -186,14 +186,39 @@ For example, to use JSON Web Tokens, provide a `google.auth.jwt.Credentials`_ in
186186
187187
# The same for the publisher, except that the "audience" claim needs to be adjusted
188188
publisher_audience="https://pubsub.googleapis.com/google.pubsub.v1.Publisher"
189-
credentials_pub= credentials.with_claims(audience=publisher_audience)
189+
credentials_pub= credentials.with_claims(audience=publisher_audience)
190190
publisher= pubsub_v1.PublisherClient(credentials=credentials_pub)
191191
192192
.. _Credentials:https://google-auth.readthedocs.io/en/latest/reference/google.auth.credentials.html#google.auth.credentials.Credentials
193193
.. _google-auth:https://google-auth.readthedocs.io/en/latest/index.html
194194
.. _google.auth.jwt.Credentials:https://google-auth.readthedocs.io/en/latest/reference/google.auth.jwt.html#google.auth.jwt.Credentials
195195

196196

197+
OpenTelemetry Tracing
198+
^^^^^^^^^^^^^^^^^^^^^
199+
200+
To enable OpenTelemetry tracing in Pub/Sub clients, the ``opentelemetry-api``, ``opentelemetry-sdk``,
201+
and ``opentelemetry-instrumentation`` libraries must be installed. After installation, OpenTelemetry
202+
can be used with any publisher or subscriber client by specifying an exporter for traces.
203+
204+
For example, for traces to be exported to Google Cloud Tracing, the Cloud Trace exporter must be specified.
205+
206+
..code-block::python
207+
208+
from opentelemetryimport trace
209+
from opentelemetry.sdk.traceimport TracerProvider
210+
from opentelemetry.sdk.trace.exportimport SimpleExportSpanProcessor
211+
from opentelemetry.exporter.cloud_traceimport CloudTraceSpanExporter
212+
213+
trace.set_tracer_provider(TracerProvider())
214+
trace.get_tracer_provider().add_span_processor(
215+
SimpleExportSpanProcessor(CloudTraceSpanExporter())
216+
)
217+
218+
For more information on OpenTelemetry, please consult the `OpenTelemetry documentation`_.
219+
220+
.. _OpenTelemetry documentation:https://opentelemetry-python.readthedocs.io
221+
197222
Versioning
198223
----------
199224

Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,91 @@
1+
# Copyright 2020, Google LLC All rights reserved.
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
importlogging
16+
fromcontextlibimportcontextmanager
17+
18+
fromgoogle.api_core.exceptionsimportGoogleAPICallError
19+
20+
_LOGGER=logging.getLogger(__name__)
21+
22+
try:
23+
fromopentelemetryimporttrace
24+
fromopentelemetryimportpropagators
25+
fromopentelemetry.traceimportSpanContext
26+
fromopentelemetry.traceimportget_current_span
27+
fromopentelemetry.traceimportset_span_in_context
28+
fromopentelemetry.trace.statusimportStatus
29+
fromopentelemetry.instrumentation.utilsimporthttp_status_to_canonical_code
30+
31+
USE_OPENTELEMETRY=True
32+
exceptImportError:
33+
_LOGGER.info(
34+
"This service supports OpenTelemetry, but OpenTelemetry could"
35+
"not be imported. To use OpenTelemetry, please install the"
36+
"opentelemetry-api, opentelemetry-sdk, and opentelemetry-instrumentation"
37+
"pip modules. See also"
38+
"https://opentelemetry-python.readthedocs.io/en/stable/getting-started.html"
39+
)
40+
USE_OPENTELEMETRY=False
41+
pass
42+
43+
44+
@contextmanager
45+
defcreate_span(span_name,attributes=None,parent=None):
46+
""" Creates a new span
47+
48+
Args:
49+
span_name (str): the name of the new span
50+
attributes ([dict], optional): A dictionary
51+
containing all attributes to add to a span. Defaults to None.
52+
parent ([dict], optional): A dictionary
53+
containing the attributes of a . Defaults to None.
54+
55+
Yields:
56+
[opentelemetry.trace.Span]: The newly created span, or None if
57+
OpenTelemetry could not be imported
58+
"""
59+
60+
# OpenTelemetry could not be imported.
61+
ifnotUSE_OPENTELEMETRY:
62+
yieldNone
63+
return
64+
65+
tracer=trace.get_tracer(__name__)
66+
67+
ifparentisnotNone:
68+
# Form the parent's context from the parent dict provided
69+
try:
70+
parent_span_context=SpanContext(
71+
trace_id=parent["trace_id"],
72+
span_id=parent["span_id"],
73+
is_remote=parent["is_remote"],
74+
trace_flags=parent["trace_flags"],
75+
trace_state=parent["trace_state"],
76+
)
77+
except:
78+
parent_span_context=None
79+
else:
80+
parent_span_context=None
81+
82+
# Create a new span and yield it
83+
withtracer.start_as_current_span(
84+
span_name,attributes=attributes,parent=parent_span_context
85+
)asspan:
86+
try:
87+
yieldspan
88+
exceptGoogleAPICallErroraserror:
89+
iferror.codeisnotNone:
90+
span.set_status(Status(http_status_to_canonical_code(error.code)))
91+
raise

‎google/cloud/pubsub_v1/publisher/client.py‎

Lines changed: 37 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@
2020
importpkg_resources
2121
importthreading
2222
importtime
23+
importsys
24+
importjson
2325

2426
importgrpc
2527
importsix
@@ -29,6 +31,7 @@
2931

3032
fromgoogle.cloud.pubsub_v1import_gapic
3133
fromgoogle.cloud.pubsub_v1importtypes
34+
fromgoogle.cloud.pubsub_v1.opentelemetry_tracingimportcreate_span
3235
fromgoogle.cloud.pubsub_v1.gapicimportpublisher_client
3336
fromgoogle.cloud.pubsub_v1.gapic.transportsimportpublisher_grpc_transport
3437
fromgoogle.cloud.pubsub_v1.publisherimportexceptions
@@ -369,38 +372,47 @@ def publish(self, topic, data, ordering_key="", **attrs):
369372
"be sent as text strings."
370373
)
371374

372-
# Create the Pub/Sub message object.
373-
message=types.PubsubMessage(
374-
data=data,ordering_key=ordering_key,attributes=attrs
375-
)
375+
span_name="{} publisher".format(topic)
376+
span_attributes= {"data":data.decode()}
377+
withcreate_span(span_name,attributes=span_attributes)asspan:
378+
ifspanisnotNone:
379+
# Add the context of the span as an attribute
380+
attrs["googclient_OpenTelemetrySpanContext"]=json.dumps(
381+
span.get_context().__dict__
382+
)
376383

377-
# Messages should go through flow control to prevent excessive
378-
# queuing on the client side (depending on the settings).
379-
try:
380-
self._flow_controller.add(message)
381-
exceptexceptions.FlowControlLimitErrorasexc:
382-
future=futures.Future()
383-
future.set_exception(exc)
384-
returnfuture
384+
# Create the Pub/Sub message object.
385+
message=types.PubsubMessage(
386+
data=data,ordering_key=ordering_key,attributes=attrs
387+
)
385388

386-
defon_publish_done(future):
387-
self._flow_controller.release(message)
389+
# Messages should go through flow control to prevent excessive
390+
# queuing on the client side (depending on the settings).
391+
try:
392+
self._flow_controller.add(message)
393+
exceptexceptions.FlowControlLimitErrorasexc:
394+
future=futures.Future()
395+
future.set_exception(exc)
396+
returnfuture
388397

389-
withself._batch_lock:
390-
ifself._is_stopped:
391-
raiseRuntimeError("Cannot publish on a stopped publisher.")
398+
defon_publish_done(future):
399+
self._flow_controller.release(message)
392400

393-
sequencer=self._get_or_create_sequencer(topic,ordering_key)
401+
withself._batch_lock:
402+
ifself._is_stopped:
403+
raiseRuntimeError("Cannot publish on a stopped publisher.")
394404

395-
# Delegate the publishing to the sequencer.
396-
future=sequencer.publish(message)
397-
future.add_done_callback(on_publish_done)
405+
sequencer=self._get_or_create_sequencer(topic,ordering_key)
398406

399-
# Create a timer thread if necessary to enforce the batching
400-
# timeout.
401-
self._ensure_commit_timer_runs_no_lock()
407+
# Delegate the publishing to the sequencer.
408+
future=sequencer.publish(message)
409+
future.add_done_callback(on_publish_done)
410+
411+
# Create a timer thread if necessary to enforce the batching
412+
# timeout.
413+
self._ensure_commit_timer_runs_no_lock()
402414

403-
returnfuture
415+
returnfuture
404416

405417
defensure_cleanup_and_commit_timer_runs(self):
406418
""" Ensure a cleanup/commit timer thread is running.

‎google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py‎

Lines changed: 36 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -19,13 +19,15 @@
1919
importlogging
2020
importthreading
2121
importuuid
22+
importjson
2223

2324
importgrpc
2425
importsix
2526

2627
fromgoogle.api_coreimportbidi
2728
fromgoogle.api_coreimportexceptions
2829
fromgoogle.cloud.pubsub_v1importtypes
30+
fromgoogle.cloud.pubsub_v1.opentelemetry_tracingimportcreate_span
2931
fromgoogle.cloud.pubsub_v1.subscriber._protocolimportdispatcher
3032
fromgoogle.cloud.pubsub_v1.subscriber._protocolimportheartbeater
3133
fromgoogle.cloud.pubsub_v1.subscriber._protocolimporthistogram
@@ -619,20 +621,40 @@ def _on_response(self, response):
619621

620622
withself._pause_resume_lock:
621623
forreceived_messageinresponse.received_messages:
622-
message=google.cloud.pubsub_v1.subscriber.message.Message(
623-
received_message.message,
624-
received_message.ack_id,
625-
received_message.delivery_attempt,
626-
self._scheduler.queue,
627-
)
628-
self._messages_on_hold.put(message)
629-
self._on_hold_bytes+=message.size
630-
req=requests.LeaseRequest(
631-
ack_id=message.ack_id,
632-
byte_size=message.size,
633-
ordering_key=message.ordering_key,
634-
)
635-
self.leaser.add([req])
624+
if (
625+
"googclient_OpenTelemetrySpanContext"
626+
inreceived_message.message.attributes.keys()
627+
):
628+
publisher_span_context=json.loads(
629+
received_message.message.attributes[
630+
"googclient_OpenTelemetrySpanContext"
631+
]
632+
)
633+
else:
634+
publisher_span_context=None
635+
span_attributes= {
636+
"ack_id":received_message.ack_id,
637+
"delivery_attempt":received_message.delivery_attempt,
638+
}
639+
withcreate_span(
640+
"subscriber",
641+
attributes=span_attributes,
642+
parent=publisher_span_context,
643+
):
644+
message=google.cloud.pubsub_v1.subscriber.message.Message(
645+
received_message.message,
646+
received_message.ack_id,
647+
received_message.delivery_attempt,
648+
self._scheduler.queue,
649+
)
650+
self._messages_on_hold.put(message)
651+
self._on_hold_bytes+=message.size
652+
req=requests.LeaseRequest(
653+
ack_id=message.ack_id,
654+
byte_size=message.size,
655+
ordering_key=message.ordering_key,
656+
)
657+
self.leaser.add([req])
636658

637659
self._maybe_release_messages()
638660

‎noxfile.py‎

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,14 @@ def lint_setup_py(session):
7070

7171
defdefault(session):
7272
# Install all test dependencies, then install this package in-place.
73-
session.install("mock","pytest","pytest-cov")
73+
session.install(
74+
"mock",
75+
"pytest",
76+
"pytest-cov",
77+
"opentelemetry-api",
78+
"opentelemetry-sdk",
79+
"opentelemetry-instrumentation",
80+
)
7481
session.install("-e",".")
7582

7683
# Run py.test against the unit tests.

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp