Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit86c50cd

Browse files
committed
Set default stream ACK deadline to subscriptions'
When subscribing, it makes sense to use the configured subscription'smaximum ACK deadline for the streamimng pull, instead of an optimisticminimum of 10 seconds.Using an optimistic deadline affects messages that are put on hold andare not lease managed, because by the time the client dispatches themto the user's callback, the optimistic ACK deadline could have alreadybeen missed, resulting in the backend unnecessary re-sending thosemessages, even if the subscription's ACK deadline has not been hit yet.
1 parentee0f70a commit86c50cd

File tree

3 files changed

+95
-22
lines changed

3 files changed

+95
-22
lines changed

‎pubsub/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py‎

Lines changed: 29 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -208,7 +208,7 @@ def load(self):
208208
float: The load value.
209209
"""
210210
ifself._leaserisNone:
211-
return0
211+
return0.0
212212

213213
returnmax(
214214
[
@@ -384,14 +384,26 @@ def open(self, callback, on_callback_error):
384384
)
385385

386386
# Create the RPC
387+
subscription=self._client.api.get_subscription(self._subscription)
388+
stream_ack_deadline_seconds=subscription.ack_deadline_seconds
389+
390+
get_initial_request=functools.partial(
391+
self._get_initial_request,stream_ack_deadline_seconds
392+
)
387393
self._rpc=bidi.ResumableBidiRpc(
388394
start_rpc=self._client.api.streaming_pull,
389-
initial_request=self._get_initial_request,
395+
initial_request=get_initial_request,
390396
should_recover=self._should_recover,
391397
throttle_reopen=True,
392398
)
393399
self._rpc.add_done_callback(self._on_rpc_done)
394400

401+
_LOGGER.debug(
402+
"Creating a stream, default ACK deadline set to {} seconds.".format(
403+
stream_ack_deadline_seconds
404+
)
405+
)
406+
395407
# Create references to threads
396408
self._dispatcher=dispatcher.Dispatcher(self,self._scheduler.queue)
397409
self._consumer=bidi.BackgroundConsumer(self._rpc,self._on_response)
@@ -462,12 +474,16 @@ def close(self, reason=None):
462474
forcallbackinself._close_callbacks:
463475
callback(self,reason)
464476

465-
def_get_initial_request(self):
477+
def_get_initial_request(self,stream_ack_deadline_seconds):
466478
"""Return the initial request for the RPC.
467479
468480
This defines the initial request that must always be sent to Pub/Sub
469481
immediately upon opening the subscription.
470482
483+
Args:
484+
stream_ack_deadline_seconds (int):
485+
The default message acknowledge deadline for the stream.
486+
471487
Returns:
472488
google.cloud.pubsub_v1.types.StreamingPullRequest: A request
473489
suitable for being the first request on the stream (and not
@@ -486,7 +502,7 @@ def _get_initial_request(self):
486502
request=types.StreamingPullRequest(
487503
modify_deadline_ack_ids=list(lease_ids),
488504
modify_deadline_seconds=[self.ack_deadline]*len(lease_ids),
489-
stream_ack_deadline_seconds=self.ack_histogram.percentile(99),
505+
stream_ack_deadline_seconds=stream_ack_deadline_seconds,
490506
subscription=self._subscription,
491507
)
492508

@@ -511,14 +527,6 @@ def _on_response(self, response):
511527
self._messages_on_hold.qsize(),
512528
)
513529

514-
# Immediately modack the messages we received, as this tells the server
515-
# that we've received them.
516-
items= [
517-
requests.ModAckRequest(message.ack_id,self._ack_histogram.percentile(99))
518-
formessageinresponse.received_messages
519-
]
520-
self._dispatcher.modify_ack_deadline(items)
521-
522530
invoke_callbacks_for= []
523531

524532
forreceived_messageinresponse.received_messages:
@@ -535,6 +543,15 @@ def _on_response(self, response):
535543
else:
536544
self._messages_on_hold.put(message)
537545

546+
# Immediately (i.e. without waiting for the auto lease management)
547+
# modack the messages we received and not put on hold, as this tells
548+
# the server that we've received them.
549+
items= [
550+
requests.ModAckRequest(message.ack_id,self._ack_histogram.percentile(99))
551+
formessageininvoke_callbacks_for
552+
]
553+
self._dispatcher.modify_ack_deadline(items)
554+
538555
_LOGGER.debug(
539556
"Scheduling callbacks for %s new messages, new total on hold %s.",
540557
len(invoke_callbacks_for),

‎pubsub/tests/system.py‎

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -381,6 +381,55 @@ class CallbackError(Exception):
381381
withpytest.raises(CallbackError):
382382
future.result(timeout=30)
383383

384+
deftest_streaming_pull_ack_deadline(
385+
self,publisher,subscriber,project,topic_path,subscription_path,cleanup
386+
):
387+
# Make sure the topic and subscription get deleted.
388+
cleanup.append((publisher.delete_topic,topic_path))
389+
cleanup.append((subscriber.delete_subscription,subscription_path))
390+
391+
# Create a topic and a subscription, then subscribe to the topic. This
392+
# must happen before the messages are published.
393+
publisher.create_topic(topic_path)
394+
395+
# Subscribe to the topic. This must happen before the messages
396+
# are published.
397+
subscriber.create_subscription(
398+
subscription_path,topic_path,ack_deadline_seconds=60
399+
)
400+
401+
# publish some messages and wait for completion
402+
self._publish_messages(publisher,topic_path,batch_sizes=[2])
403+
404+
# subscribe to the topic
405+
callback=StreamingPullCallback(
406+
processing_time=15,# more than the default ACK deadline of 10 seconds
407+
resolve_at_msg_count=3,# one more than the published messages count
408+
)
409+
flow_control=types.FlowControl(max_messages=1)
410+
sub_future=subscriber.subscribe(
411+
subscription_path,callback,flow_control=flow_control
412+
)
413+
414+
# We expect to process the first two messages in 2 * 15 seconds, and
415+
# any duplicate message that is re-sent by the backend in additional
416+
# 15 seconds, totalling 45 seconds (+ overhead) --> if there have been
417+
# no duplicates in 60 seconds, we can reasonably assume that there
418+
# won't be any.
419+
try:
420+
callback.done_future.result(timeout=60)
421+
exceptexceptions.TimeoutError:
422+
# future timed out, because we received no excessive messages
423+
assertsorted(callback.seen_message_ids)== [1,2]
424+
else:
425+
pytest.fail(
426+
"Expected to receive 2 messages, but got at least {}.".format(
427+
len(callback.seen_message_ids)
428+
)
429+
)
430+
finally:
431+
sub_future.cancel()
432+
384433
deftest_streaming_pull_max_messages(
385434
self,publisher,topic_path,subscriber,subscription_path,cleanup
386435
):

‎pubsub/tests/unit/pubsub_v1/subscriber/test_streaming_pull_manager.py‎

Lines changed: 17 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -405,6 +405,11 @@ def test_heartbeat_inactive():
405405
)
406406
deftest_open(heartbeater,dispatcher,leaser,background_consumer,resumable_bidi_rpc):
407407
manager=make_manager()
408+
manager._client.api.get_subscription.return_value=types.Subscription(
409+
name="projects/foo/subscriptions/bar",
410+
topic="projects/foo/topics/baz",
411+
ack_deadline_seconds=123,
412+
)
408413

409414
manager.open(mock.sentinel.callback,mock.sentinel.on_callback_error)
410415

@@ -426,10 +431,14 @@ def test_open(heartbeater, dispatcher, leaser, background_consumer, resumable_bi
426431

427432
resumable_bidi_rpc.assert_called_once_with(
428433
start_rpc=manager._client.api.streaming_pull,
429-
initial_request=manager._get_initial_request,
434+
initial_request=mock.ANY,
430435
should_recover=manager._should_recover,
431436
throttle_reopen=True,
432437
)
438+
initial_request_arg=resumable_bidi_rpc.call_args.kwargs["initial_request"]
439+
assertinitial_request_arg.func==manager._get_initial_request
440+
assertinitial_request_arg.args[0]==123
441+
433442
resumable_bidi_rpc.return_value.add_done_callback.assert_called_once_with(
434443
manager._on_rpc_done
435444
)
@@ -574,11 +583,11 @@ def test__get_initial_request():
574583
manager._leaser=mock.create_autospec(leaser.Leaser,instance=True)
575584
manager._leaser.ack_ids= ["1","2"]
576585

577-
initial_request=manager._get_initial_request()
586+
initial_request=manager._get_initial_request(123)
578587

579588
assertisinstance(initial_request,types.StreamingPullRequest)
580589
assertinitial_request.subscription=="subscription-name"
581-
assertinitial_request.stream_ack_deadline_seconds==10
590+
assertinitial_request.stream_ack_deadline_seconds==123
582591
assertinitial_request.modify_deadline_ack_ids== ["1","2"]
583592
assertinitial_request.modify_deadline_seconds== [10,10]
584593

@@ -587,11 +596,11 @@ def test__get_initial_request_wo_leaser():
587596
manager=make_manager()
588597
manager._leaser=None
589598

590-
initial_request=manager._get_initial_request()
599+
initial_request=manager._get_initial_request(123)
591600

592601
assertisinstance(initial_request,types.StreamingPullRequest)
593602
assertinitial_request.subscription=="subscription-name"
594-
assertinitial_request.stream_ack_deadline_seconds==10
603+
assertinitial_request.stream_ack_deadline_seconds==123
595604
assertinitial_request.modify_deadline_ack_ids== []
596605
assertinitial_request.modify_deadline_seconds== []
597606

@@ -660,12 +669,10 @@ def test__on_response_with_leaser_overload():
660669
# are called in the expected way.
661670
manager._on_response(response)
662671

672+
# only the messages that are added to the lease management and dispatched to
673+
# callbacks should have their ACK deadline extended
663674
dispatcher.modify_ack_deadline.assert_called_once_with(
664-
[
665-
requests.ModAckRequest("fack",10),
666-
requests.ModAckRequest("back",10),
667-
requests.ModAckRequest("zack",10),
668-
]
675+
[requests.ModAckRequest("fack",10)]
669676
)
670677

671678
# one message should be scheduled, the leaser capacity allows for it

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp