Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit1b6f3d2

Browse files
feat: Add OpenTelemetry support for Subscribe Side (#1252)
1 parent1ae49de commit1b6f3d2

17 files changed

+2066
-30
lines changed

‎google/cloud/pubsub_v1/open_telemetry/context_propagation.py‎

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,9 @@
1212
# See the License for the specific language governing permissions and
1313
# limitations under the License.
1414

15-
fromopentelemetry.propagators.textmapimportSetter
15+
fromtypingimportOptional,List
16+
17+
fromopentelemetry.propagators.textmapimportSetter,Getter
1618

1719
fromgoogle.pubsub_v1importPubsubMessage
1820

@@ -37,3 +39,17 @@ def set(self, carrier: PubsubMessage, key: str, value: str) -> None:
3739
None
3840
"""
3941
carrier.attributes["googclient_"+key]=value
42+
43+
44+
classOpenTelemetryContextGetter(Getter):
45+
"""
46+
Used by Open Telemetry for context propagation.
47+
"""
48+
49+
defget(self,carrier:PubsubMessage,key:str)->Optional[List[str]]:
50+
if ("googclient_"+key)notincarrier.attributes:
51+
returnNone
52+
return [carrier.attributes["googclient_"+key]]
53+
54+
defkeys(self,carrier:PubsubMessage)->List[str]:
55+
returnlist(map(str,carrier.attributes.keys()))
Lines changed: 280 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,280 @@
1+
# Copyright 2024, Google LLC All rights reserved.
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
fromtypingimportOptional,List
16+
fromdatetimeimportdatetime
17+
18+
fromopentelemetryimporttrace,context
19+
fromopentelemetry.trace.propagation.tracecontextimportTraceContextTextMapPropagator
20+
fromopentelemetry.trace.propagationimportset_span_in_context
21+
22+
fromgoogle.cloud.pubsub_v1.open_telemetry.context_propagationimport (
23+
OpenTelemetryContextGetter,
24+
)
25+
fromgoogle.pubsub_v1.typesimportPubsubMessage
26+
27+
_OPEN_TELEMETRY_TRACER_NAME:str="google.cloud.pubsub_v1"
28+
_OPEN_TELEMETRY_MESSAGING_SYSTEM:str="gcp_pubsub"
29+
30+
31+
classSubscribeOpenTelemetry:
32+
def__init__(self,message:PubsubMessage):
33+
self._message:PubsubMessage=message
34+
35+
# subscribe span will be initialized by the `start_subscribe_span`
36+
# method.
37+
self._subscribe_span:Optional[trace.Span]=None
38+
39+
# subscriber concurrency control span will be initialized by the
40+
# `start_subscribe_concurrency_control_span` method.
41+
self._concurrency_control_span:Optional[trace.Span]=None
42+
43+
# scheduler span will be initialized by the
44+
# `start_subscribe_scheduler_span` method.
45+
self._scheduler_span:Optional[trace.Span]=None
46+
47+
# This will be set by `start_subscribe_span` method and will be used
48+
# for other spans, such as process span.
49+
self._subscription_id:Optional[str]=None
50+
51+
# This will be set by `start_process_span` method.
52+
self._process_span:Optional[trace.Span]=None
53+
54+
# This will be set by `start_subscribe_span` method, if a publisher create span
55+
# context was extracted from trace propagation. And will be used by spans like
56+
# proces span to add links to the publisher create span.
57+
self._publisher_create_span_context:Optional[context.Context]=None
58+
59+
# This will be set by `start_subscribe_span` method and will be used
60+
# for other spans, such as modack span.
61+
self._project_id:Optional[str]=None
62+
63+
@property
64+
defsubscription_id(self)->Optional[str]:
65+
returnself._subscription_id
66+
67+
@property
68+
defproject_id(self)->Optional[str]:
69+
returnself._project_id
70+
71+
@property
72+
defsubscribe_span(self)->Optional[trace.Span]:
73+
returnself._subscribe_span
74+
75+
defstart_subscribe_span(
76+
self,
77+
subscription:str,
78+
exactly_once_enabled:bool,
79+
ack_id:str,
80+
delivery_attempt:int,
81+
)->None:
82+
tracer=trace.get_tracer(_OPEN_TELEMETRY_TRACER_NAME)
83+
parent_span_context=TraceContextTextMapPropagator().extract(
84+
carrier=self._message,
85+
getter=OpenTelemetryContextGetter(),
86+
)
87+
self._publisher_create_span_context=parent_span_context
88+
split_subscription:List[str]=subscription.split("/")
89+
assertlen(split_subscription)==4
90+
subscription_short_name=split_subscription[3]
91+
self._project_id=split_subscription[1]
92+
self._subscription_id=subscription_short_name
93+
withtracer.start_as_current_span(
94+
name=f"{subscription_short_name} subscribe",
95+
context=parent_span_contextifparent_span_contextelseNone,
96+
kind=trace.SpanKind.CONSUMER,
97+
attributes={
98+
"messaging.system":_OPEN_TELEMETRY_MESSAGING_SYSTEM,
99+
"messaging.destination.name":subscription_short_name,
100+
"gcp.project_id":subscription.split("/")[1],
101+
"messaging.message.id":self._message.message_id,
102+
"messaging.message.body.size":len(self._message.data),
103+
"messaging.gcp_pubsub.message.ack_id":ack_id,
104+
"messaging.gcp_pubsub.message.ordering_key":self._message.ordering_key,
105+
"messaging.gcp_pubsub.message.exactly_once_delivery":exactly_once_enabled,
106+
"code.function":"_on_response",
107+
"messaging.gcp_pubsub.message.delivery_attempt":delivery_attempt,
108+
},
109+
end_on_exit=False,
110+
)assubscribe_span:
111+
self._subscribe_span=subscribe_span
112+
113+
defadd_subscribe_span_event(self,event:str)->None:
114+
assertself._subscribe_spanisnotNone
115+
self._subscribe_span.add_event(
116+
name=event,
117+
attributes={
118+
"timestamp":str(datetime.now()),
119+
},
120+
)
121+
122+
defend_subscribe_span(self)->None:
123+
assertself._subscribe_spanisnotNone
124+
self._subscribe_span.end()
125+
126+
defset_subscribe_span_result(self,result:str)->None:
127+
assertself._subscribe_spanisnotNone
128+
self._subscribe_span.set_attribute(
129+
key="messaging.gcp_pubsub.result",
130+
value=result,
131+
)
132+
133+
defstart_subscribe_concurrency_control_span(self)->None:
134+
assertself._subscribe_spanisnotNone
135+
tracer=trace.get_tracer(_OPEN_TELEMETRY_TRACER_NAME)
136+
withtracer.start_as_current_span(
137+
name="subscriber concurrency control",
138+
kind=trace.SpanKind.INTERNAL,
139+
context=set_span_in_context(self._subscribe_span),
140+
end_on_exit=False,
141+
)asconcurrency_control_span:
142+
self._concurrency_control_span=concurrency_control_span
143+
144+
defend_subscribe_concurrency_control_span(self)->None:
145+
assertself._concurrency_control_spanisnotNone
146+
self._concurrency_control_span.end()
147+
148+
defstart_subscribe_scheduler_span(self)->None:
149+
assertself._subscribe_spanisnotNone
150+
tracer=trace.get_tracer(_OPEN_TELEMETRY_TRACER_NAME)
151+
withtracer.start_as_current_span(
152+
name="subscriber scheduler",
153+
kind=trace.SpanKind.INTERNAL,
154+
context=set_span_in_context(self._subscribe_span),
155+
end_on_exit=False,
156+
)asscheduler_span:
157+
self._scheduler_span=scheduler_span
158+
159+
defend_subscribe_scheduler_span(self)->None:
160+
assertself._scheduler_spanisnotNone
161+
self._scheduler_span.end()
162+
163+
defstart_process_span(self)->None:
164+
assertself._subscribe_spanisnotNone
165+
tracer=trace.get_tracer(_OPEN_TELEMETRY_TRACER_NAME)
166+
publish_create_span_link:Optional[trace.Link]=None
167+
ifself._publisher_create_span_context:
168+
publish_create_span:trace.Span=trace.get_current_span(
169+
self._publisher_create_span_context
170+
)
171+
span_context:Optional[
172+
trace.SpanContext
173+
]=publish_create_span.get_span_context()
174+
publish_create_span_link= (
175+
trace.Link(span_context)ifspan_contextelseNone
176+
)
177+
178+
withtracer.start_as_current_span(
179+
name=f"{self._subscription_id} process",
180+
attributes={
181+
"messaging.system":_OPEN_TELEMETRY_MESSAGING_SYSTEM,
182+
},
183+
kind=trace.SpanKind.INTERNAL,
184+
context=set_span_in_context(self._subscribe_span),
185+
links=[publish_create_span_link]ifpublish_create_span_linkelseNone,
186+
end_on_exit=False,
187+
)asprocess_span:
188+
self._process_span=process_span
189+
190+
defend_process_span(self)->None:
191+
assertself._process_spanisnotNone
192+
self._process_span.end()
193+
194+
defadd_process_span_event(self,event:str)->None:
195+
assertself._process_spanisnotNone
196+
self._process_span.add_event(
197+
name=event,
198+
attributes={
199+
"timestamp":str(datetime.now()),
200+
},
201+
)
202+
203+
204+
defstart_modack_span(
205+
subscribe_span_links:List[trace.Link],
206+
subscription_id:Optional[str],
207+
message_count:int,
208+
deadline:float,
209+
project_id:Optional[str],
210+
code_function:str,
211+
receipt_modack:bool,
212+
)->trace.Span:
213+
assertsubscription_idisnotNone
214+
assertproject_idisnotNone
215+
tracer=trace.get_tracer(_OPEN_TELEMETRY_TRACER_NAME)
216+
withtracer.start_as_current_span(
217+
name=f"{subscription_id} modack",
218+
attributes={
219+
"messaging.system":_OPEN_TELEMETRY_MESSAGING_SYSTEM,
220+
"messaging.batch.message_count":message_count,
221+
"messaging.gcp_pubsub.message.ack_deadline":deadline,
222+
"messaging.destination.name":subscription_id,
223+
"gcp.project_id":project_id,
224+
"messaging.operation.name":"modack",
225+
"code.function":code_function,
226+
"messaging.gcp_pubsub.is_receipt_modack":receipt_modack,
227+
},
228+
links=subscribe_span_links,
229+
kind=trace.SpanKind.CLIENT,
230+
end_on_exit=False,
231+
)asmodack_span:
232+
returnmodack_span
233+
234+
235+
defstart_ack_span(
236+
subscription_id:str,
237+
message_count:int,
238+
project_id:str,
239+
links:List[trace.Link],
240+
)->trace.Span:
241+
tracer=trace.get_tracer(_OPEN_TELEMETRY_TRACER_NAME)
242+
withtracer.start_as_current_span(
243+
name=f"{subscription_id} ack",
244+
attributes={
245+
"messaging.system":_OPEN_TELEMETRY_MESSAGING_SYSTEM,
246+
"messaging.batch.message_count":message_count,
247+
"messaging.operation":"ack",
248+
"gcp.project_id":project_id,
249+
"messaging.destination.name":subscription_id,
250+
"code.function":"ack",
251+
},
252+
kind=trace.SpanKind.CLIENT,
253+
links=links,
254+
end_on_exit=False,
255+
)asack_span:
256+
returnack_span
257+
258+
259+
defstart_nack_span(
260+
subscription_id:str,
261+
message_count:int,
262+
project_id:str,
263+
links:List[trace.Link],
264+
)->trace.Span:
265+
tracer=trace.get_tracer(_OPEN_TELEMETRY_TRACER_NAME)
266+
withtracer.start_as_current_span(
267+
name=f"{subscription_id} nack",
268+
attributes={
269+
"messaging.system":_OPEN_TELEMETRY_MESSAGING_SYSTEM,
270+
"messaging.batch.message_count":message_count,
271+
"messaging.operation":"nack",
272+
"gcp.project_id":project_id,
273+
"messaging.destination.name":subscription_id,
274+
"code.function":"modify_ack_deadline",
275+
},
276+
kind=trace.SpanKind.CLIENT,
277+
links=links,
278+
end_on_exit=False,
279+
)asnack_span:
280+
returnnack_span

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp