Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit e907f6e

Browse files
authored
fix: ACK deadline set for received messages can be too low (#416)
Fixes #413.

This PR makes sure that the ACK deadline set for the received messages is always consistent with what the leaser uses internally when extending the ACK deadlines for the leased messages.

See the issue description and a [comment](#413 (comment)) explaining a possible sequence of events that lead to a bug.

**PR checklist**
- [x] Make sure to open an issue as a [bug/issue](https://github.com/googleapis/python-pubsub/issues/new/choose) before writing your code! That way we can discuss the change, evaluate designs, and agree on the general idea
- [x] Ensure the tests and linter pass
- [x] Code coverage does not decrease (if any source code was changed)
- [x] Appropriate docs were updated (if necessary)
1 parent de5429a commit e907f6e

File tree

7 files changed

+191
-64
lines changed

7 files changed

+191
-64
lines changed

‎google/cloud/pubsub_v1/subscriber/_protocol/histogram.py‎

Lines changed: 22 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,10 @@
1515
from __future__importabsolute_import,division
1616

1717

18+
MIN_ACK_DEADLINE=10
19+
MAX_ACK_DEADLINE=600
20+
21+
1822
classHistogram(object):
1923
"""Representation of a single histogram.
2024
@@ -27,8 +31,9 @@ class Histogram(object):
2731
are free to use a different formula.
2832
2933
The precision of data stored is to the nearest integer. Additionally,
30-
values outside the range of ``10 <= x <= 600`` are stored as ``10`` or
31-
``600``, since these are the boundaries of leases in the actual API.
34+
values outside the range of ``MIN_ACK_DEADLINE <= x <= MAX_ACK_DEADLINE`` are stored
35+
as ``MIN_ACK_DEADLINE`` or ``MAX_ACK_DEADLINE``, since these are the boundaries of
36+
leases in the actual API.
3237
"""
3338

3439
def__init__(self,data=None):
@@ -83,41 +88,43 @@ def __repr__(self):
8388
defmax(self):
8489
"""Return the maximum value in this histogram.
8590
86-
If there are no values in the histogram at all, return600.
91+
If there are no values in the histogram at all, return ``MAX_ACK_DEADLINE``.
8792
8893
Returns:
8994
int: The maximum value in the histogram.
9095
"""
9196
iflen(self._data)==0:
92-
return600
97+
returnMAX_ACK_DEADLINE
9398
returnnext(iter(reversed(sorted(self._data.keys()))))
9499

95100
@property
96101
defmin(self):
97102
"""Return the minimum value in this histogram.
98103
99-
If there are no values in the histogram at all, return10.
104+
If there are no values in the histogram at all, return ``MIN_ACK_DEADLINE``.
100105
101106
Returns:
102107
int: The minimum value in the histogram.
103108
"""
104109
iflen(self._data)==0:
105-
return10
110+
returnMIN_ACK_DEADLINE
106111
returnnext(iter(sorted(self._data.keys())))
107112

108113
defadd(self,value):
109114
"""Add the value to this histogram.
110115
111116
Args:
112-
value (int): The value. Values outside of ``10 <= x <= 600``
113-
will be raised to ``10`` or reduced to ``600``.
117+
value (int): The value. Values outside of
118+
``MIN_ACK_DEADLINE <= x <= MAX_ACK_DEADLINE``
119+
will be raised to ``MIN_ACK_DEADLINE`` or reduced to
120+
``MAX_ACK_DEADLINE``.
114121
"""
115122
# If the value is out of bounds, bring it in bounds.
116123
value=int(value)
117-
ifvalue<10:
118-
value=10
119-
ifvalue>600:
120-
value=600
124+
ifvalue<MIN_ACK_DEADLINE:
125+
value=MIN_ACK_DEADLINE
126+
elifvalue>MAX_ACK_DEADLINE:
127+
value=MAX_ACK_DEADLINE
121128

122129
# Add the value to the histogram's data dictionary.
123130
self._data.setdefault(value,0)
@@ -129,7 +136,7 @@ def percentile(self, percent):
129136
130137
Args:
131138
percent (Union[int, float]): The percentile being sought. The
132-
default consumer implementationsuseconsistently use ``99``.
139+
default consumer implementations consistently use ``99``.
133140
134141
Returns:
135142
int: The value corresponding to the requested percentile.
@@ -150,5 +157,5 @@ def percentile(self, percent):
150157
returnk
151158

152159
# The only way to get here is if there was no data.
153-
# In this case, just return10 seconds.
154-
return10
160+
# In this case, just return the shortest possible deadline.
161+
returnMIN_ACK_DEADLINE

‎google/cloud/pubsub_v1/subscriber/_protocol/leaser.py‎

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -128,7 +128,9 @@ def maintain_leases(self):
128128
# Determine the appropriate duration for the lease. This is
129129
# based off of how long previous messages have taken to ack, with
130130
# a sensible default and within the ranges allowed by Pub/Sub.
131-
deadline=self._manager.ack_deadline
131+
# Also update the deadline currently used if enough new ACK data has been
132+
# gathered since the last deadline update.
133+
deadline=self._manager._obtain_ack_deadline(maybe_update=True)
132134
_LOGGER.debug("The current deadline value is %d seconds.",deadline)
133135

134136
# Make a copy of the leased messages. This is needed because it's

‎google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py‎

Lines changed: 44 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -143,7 +143,7 @@ def __init__(
143143
self._await_callbacks_on_shutdown=await_callbacks_on_shutdown
144144
self._ack_histogram=histogram.Histogram()
145145
self._last_histogram_size=0
146-
self._ack_deadline=10
146+
self._ack_deadline=histogram.MIN_ACK_DEADLINE
147147
self._rpc=None
148148
self._callback=None
149149
self._closing=threading.Lock()
@@ -179,6 +179,11 @@ def __init__(
179179
# currently on hold.
180180
self._pause_resume_lock=threading.Lock()
181181

182+
# A lock protecting the current ACK deadline used in the lease management. This
183+
# value can be potentially updated both by the leaser thread and by the message
184+
# consumer thread when invoking the internal _on_response() callback.
185+
self._ack_deadline_lock=threading.Lock()
186+
182187
# The threads created in ``.open()``.
183188
self._dispatcher=None
184189
self._leaser=None
@@ -223,29 +228,49 @@ def ack_histogram(self):
223228

224229
@property
225230
defack_deadline(self):
226-
"""Return the current ack deadline based on historical time-to-ack.
227-
228-
This method is "sticky". It will only perform the computations to
229-
check on the right ack deadline if the histogram has gained a
230-
significant amount of new information.
231+
"""Return the current ACK deadline based on historical data without updating it.
231232
232233
Returns:
233234
int: The ack deadline.
234235
"""
235-
target_size=min(
236-
self._last_histogram_size*2,self._last_histogram_size+100
237-
)
238-
hist_size=len(self.ack_histogram)
236+
returnself._obtain_ack_deadline(maybe_update=False)
237+
238+
def_obtain_ack_deadline(self,maybe_update:bool)->int:
239+
"""The actual `ack_deadline` implementation.
240+
241+
This method is "sticky". It will only perform the computations to check on the
242+
right ACK deadline if explicitly requested AND if the histogram with past
243+
time-to-ack data has gained a significant amount of new information.
244+
245+
Args:
246+
maybe_update (bool):
247+
If ``True``, also update the current ACK deadline before returning it if
248+
enough new ACK data has been gathered.
239249
240-
ifhist_size>target_size:
241-
self._last_histogram_size=hist_size
242-
self._ack_deadline=self.ack_histogram.percentile(percent=99)
250+
Returns:
251+
int: The current ACK deadline in seconds to use.
252+
"""
253+
withself._ack_deadline_lock:
254+
ifnotmaybe_update:
255+
returnself._ack_deadline
243256

244-
ifself.flow_control.max_duration_per_lease_extension>0:
245-
self._ack_deadline=min(
246-
self._ack_deadline,self.flow_control.max_duration_per_lease_extension
257+
target_size=min(
258+
self._last_histogram_size*2,self._last_histogram_size+100
247259
)
248-
returnself._ack_deadline
260+
hist_size=len(self.ack_histogram)
261+
262+
ifhist_size>target_size:
263+
self._last_histogram_size=hist_size
264+
self._ack_deadline=self.ack_histogram.percentile(percent=99)
265+
266+
ifself.flow_control.max_duration_per_lease_extension>0:
267+
# The setting in flow control could be too low, adjust if needed.
268+
flow_control_setting=max(
269+
self.flow_control.max_duration_per_lease_extension,
270+
histogram.MIN_ACK_DEADLINE,
271+
)
272+
self._ack_deadline=min(self._ack_deadline,flow_control_setting)
273+
returnself._ack_deadline
249274

250275
@property
251276
defload(self):
@@ -490,7 +515,7 @@ def open(self, callback, on_callback_error):
490515
)
491516

492517
# Create the RPC
493-
stream_ack_deadline_seconds=self.ack_histogram.percentile(99)
518+
stream_ack_deadline_seconds=self.ack_deadline
494519

495520
get_initial_request=functools.partial(
496521
self._get_initial_request,stream_ack_deadline_seconds
@@ -688,7 +713,7 @@ def _on_response(self, response):
688713
# modack the messages we received, as this tells the server that we've
689714
# received them.
690715
items= [
691-
requests.ModAckRequest(message.ack_id,self._ack_histogram.percentile(99))
716+
requests.ModAckRequest(message.ack_id,self.ack_deadline)
692717
formessageinreceived_messages
693718
]
694719
self._dispatcher.modify_ack_deadline(items)

‎google/cloud/pubsub_v1/types.py‎

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -152,7 +152,8 @@ class LimitExceededBehavior(str, enum.Enum):
152152
FlowControl.max_duration_per_lease_extension.__doc__= (
153153
"The max amount of time in seconds for a single lease extension attempt. "
154154
"Bounds the delay before a message redelivery if the subscriber "
155-
"fails to extend the deadline."
155+
"fails to extend the deadline. Must be between 10 and 600 (inclusive). Ignored "
156+
"if set to 0."
156157
)
157158

158159

‎tests/unit/pubsub_v1/subscriber/test_histogram.py‎

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ def test_contains():
3333

3434
deftest_max():
3535
histo=histogram.Histogram()
36-
asserthisto.max==600
36+
asserthisto.max==histogram.MAX_ACK_DEADLINE
3737
histo.add(120)
3838
asserthisto.max==120
3939
histo.add(150)
@@ -44,7 +44,7 @@ def test_max():
4444

4545
deftest_min():
4646
histo=histogram.Histogram()
47-
asserthisto.min==10
47+
asserthisto.min==histogram.MIN_ACK_DEADLINE
4848
histo.add(60)
4949
asserthisto.min==60
5050
histo.add(30)
@@ -63,20 +63,23 @@ def test_add():
6363

6464
deftest_add_lower_limit():
6565
histo=histogram.Histogram()
66-
histo.add(5)
67-
assert5notinhisto
68-
assert10inhisto
66+
low_value=histogram.MIN_ACK_DEADLINE-1
67+
histo.add(low_value)
68+
assertlow_valuenotinhisto
69+
asserthistogram.MIN_ACK_DEADLINEinhisto
6970

7071

7172
deftest_add_upper_limit():
7273
histo=histogram.Histogram()
73-
histo.add(12000)
74-
assert12000notinhisto
75-
assert600inhisto
74+
high_value=histogram.MAX_ACK_DEADLINE+1
75+
histo.add(high_value)
76+
asserthigh_valuenotinhisto
77+
asserthistogram.MAX_ACK_DEADLINEinhisto
7678

7779

7880
deftest_percentile():
7981
histo=histogram.Histogram()
82+
asserthisto.percentile(42)==histogram.MIN_ACK_DEADLINE# default when empty
8083
[histo.add(i)foriinrange(101,201)]
8184
asserthisto.percentile(100)==200
8285
asserthisto.percentile(101)==200

‎tests/unit/pubsub_v1/subscriber/test_leaser.py‎

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,7 @@ def create_manager(flow_control=types.FlowControl()):
8484
manager.is_active=True
8585
manager.flow_control=flow_control
8686
manager.ack_histogram=histogram.Histogram()
87-
manager.ack_deadline=10
87+
manager._obtain_ack_deadline.return_value=10
8888
returnmanager
8989

9090

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp