Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commitc3b9327

Browse files
authored
feat(pubsub): add delivery attempt property to message object received by user code (#10205)
- Return None when a DeadLetterPolicy hasn't been set on the subscription.
1 parentad93c6c commitc3b9327

File tree

4 files changed

+80
-6
lines changed

4 files changed

+80
-6
lines changed

‎pubsub/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py‎

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -569,7 +569,10 @@ def _on_response(self, response):
569569

570570
forreceived_messageinresponse.received_messages:
571571
message=google.cloud.pubsub_v1.subscriber.message.Message(
572-
received_message.message,received_message.ack_id,self._scheduler.queue
572+
received_message.message,
573+
received_message.ack_id,
574+
received_message.delivery_attempt,
575+
self._scheduler.queue,
573576
)
574577
# Making a decision based on the load, and modifying the data that
575578
# affects the load -> needs a lock, as that state can be modified

‎pubsub/google/cloud/pubsub_v1/subscriber/message.py‎

Lines changed: 29 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,7 @@ class Message(object):
7070
published.
7171
"""
7272

73-
def__init__(self,message,ack_id,request_queue):
73+
def__init__(self,message,ack_id,delivery_attempt,request_queue):
7474
"""Construct the Message.
7575
7676
.. note::
@@ -82,12 +82,16 @@ def __init__(self, message, ack_id, request_queue):
8282
message (~.pubsub_v1.types.PubsubMessage): The message received
8383
from Pub/Sub.
8484
ack_id (str): The ack_id received from Pub/Sub.
85+
delivery_attempt (int): The delivery attempt counter received
86+
from Pub/Sub if a DeadLetterPolicy is set on the subscription,
87+
and zero otherwise.
8588
request_queue (queue.Queue): A queue provided by the policy that
8689
can accept requests; the policy is responsible for handling
8790
those requests.
8891
"""
8992
self._message=message
9093
self._ack_id=ack_id
94+
self._delivery_attempt=delivery_attemptifdelivery_attempt>0elseNone
9195
self._request_queue=request_queue
9296
self.message_id=message.message_id
9397

@@ -162,6 +166,30 @@ def ack_id(self):
162166
"""str: the ID used to ack the message."""
163167
returnself._ack_id
164168

169+
@property
170+
defdelivery_attempt(self):
171+
"""The delivery attempt counter is 1 + (the sum of number of NACKs
172+
and number of ack_deadline exceeds) for this message. It is set to None
173+
if a DeadLetterPolicy is not set on the subscription.
174+
175+
A NACK is any call to ModifyAckDeadline with a 0 deadline. An ack_deadline
176+
exceeds event is whenever a message is not acknowledged within
177+
ack_deadline. Note that ack_deadline is initially
178+
Subscription.ackDeadlineSeconds, but may get extended automatically by
179+
the client library.
180+
181+
The first delivery of a given message will have this value as 1. The value
182+
is calculated at best effort and is approximate.
183+
184+
EXPERIMENTAL: This feature is part of a closed alpha release. This
185+
API might be changed in backward-incompatible ways and is not recommended
186+
for production use. It is not subject to any SLA or deprecation policy.
187+
188+
Returns:
189+
Optional[int]: The delivery attempt counter or None.
190+
"""
191+
returnself._delivery_attempt
192+
165193
defack(self):
166194
"""Acknowledge the given message.
167195

‎pubsub/tests/unit/pubsub_v1/subscriber/test_message.py‎

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -33,20 +33,21 @@
3333
PUBLISHED_SECONDS=datetime_helpers.to_milliseconds(PUBLISHED)//1000
3434

3535

36-
defcreate_message(data,ack_id="ACKID",**attrs):
36+
defcreate_message(data,ack_id="ACKID",delivery_attempt=0,**attrs):
3737
withmock.patch.object(time,"time")astime_:
3838
time_.return_value=RECEIVED_SECONDS
3939
msg=message.Message(
40-
types.PubsubMessage(
40+
message=types.PubsubMessage(
4141
attributes=attrs,
4242
data=data,
4343
message_id="message_id",
4444
publish_time=timestamp_pb2.Timestamp(
4545
seconds=PUBLISHED_SECONDS,nanos=PUBLISHED_MICROS*1000
4646
),
4747
),
48-
ack_id,
49-
queue.Queue(),
48+
ack_id=ack_id,
49+
delivery_attempt=delivery_attempt,
50+
request_queue=queue.Queue(),
5051
)
5152
returnmsg
5253

@@ -72,6 +73,17 @@ def test_ack_id():
7273
assertmsg.ack_id==ack_id
7374

7475

76+
deftest_delivery_attempt():
77+
delivery_attempt=10
78+
msg=create_message(b"foo",delivery_attempt=delivery_attempt)
79+
assertmsg.delivery_attempt==delivery_attempt
80+
81+
82+
deftest_delivery_attempt_is_none():
83+
msg=create_message(b"foo",delivery_attempt=0)
84+
assertmsg.delivery_attemptisNone
85+
86+
7587
deftest_publish_time():
7688
msg=create_message(b"foo")
7789
assertmsg.publish_time==PUBLISHED

‎pubsub/tests/unit/pubsub_v1/subscriber/test_streaming_pull_manager.py‎

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -627,6 +627,37 @@ def test__get_initial_request_wo_leaser():
627627
assertinitial_request.modify_deadline_seconds== []
628628

629629

630+
deftest__on_response_delivery_attempt():
631+
manager,_,dispatcher,leaser,_,scheduler=make_running_manager()
632+
manager._callback=mock.sentinel.callback
633+
634+
# Set up the messages.
635+
response=types.StreamingPullResponse(
636+
received_messages=[
637+
types.ReceivedMessage(
638+
ack_id="fack",message=types.PubsubMessage(data=b"foo",message_id="1")
639+
),
640+
types.ReceivedMessage(
641+
ack_id="back",
642+
message=types.PubsubMessage(data=b"bar",message_id="2"),
643+
delivery_attempt=6,
644+
),
645+
]
646+
)
647+
648+
# adjust message bookkeeping in leaser
649+
fake_leaser_add(leaser,init_msg_count=0,assumed_msg_size=42)
650+
651+
manager._on_response(response)
652+
653+
schedule_calls=scheduler.schedule.mock_calls
654+
assertlen(schedule_calls)==2
655+
msg1=schedule_calls[0][1][1]
656+
assertmsg1.delivery_attemptisNone
657+
msg2=schedule_calls[1][1][1]
658+
assertmsg2.delivery_attempt==6
659+
660+
630661
deftest__on_response_no_leaser_overload():
631662
manager,_,dispatcher,leaser,_,scheduler=make_running_manager()
632663
manager._callback=mock.sentinel.callback

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp