Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commite081beb

Browse files
authored
Fix: Surface Fatal Stream Errors to Future; Adjust Retryable Error Codes (#1422)
1 parent272b09f commite081beb

File tree

2 files changed

+92
-15
lines changed

2 files changed

+92
-15
lines changed

‎google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py‎

Lines changed: 47 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616

1717
importcollections
1818
importfunctools
19+
importinspect
1920
importitertools
2021
importlogging
2122
importthreading
@@ -62,14 +63,22 @@
6263
_REGULAR_SHUTDOWN_THREAD_NAME="Thread-RegularStreamShutdown"
6364
_RPC_ERROR_THREAD_NAME="Thread-OnRpcTerminated"
6465
_RETRYABLE_STREAM_ERRORS= (
66+
exceptions.Aborted,
6567
exceptions.DeadlineExceeded,
66-
exceptions.ServiceUnavailable,
68+
exceptions.GatewayTimeout,
6769
exceptions.InternalServerError,
70+
exceptions.ResourceExhausted,
71+
exceptions.ServiceUnavailable,
6872
exceptions.Unknown,
69-
exceptions.GatewayTimeout,
70-
exceptions.Aborted,
7173
)
72-
_TERMINATING_STREAM_ERRORS= (exceptions.Cancelled,)
74+
_TERMINATING_STREAM_ERRORS= (
75+
exceptions.Cancelled,
76+
exceptions.InvalidArgument,
77+
exceptions.NotFound,
78+
exceptions.PermissionDenied,
79+
exceptions.Unauthenticated,
80+
exceptions.Unauthorized,
81+
)
7382
_MAX_LOAD=1.0
7483
"""The load threshold above which to pause the incoming message stream."""
7584

@@ -98,6 +107,13 @@
98107
code_pb2.UNAVAILABLE,
99108
}
100109

110+
# `on_fatal_exception` was added in `google-api-core v2.25.1``, which allows us to inform
111+
# callers on unrecoverable errors. We can only pass this arg if it's available in the
112+
# `BackgroundConsumer` spec.
113+
_SHOULD_USE_ON_FATAL_ERROR_CALLBACK="on_fatal_exception"ininspect.getfullargspec(
114+
bidi.BackgroundConsumer
115+
)
116+
101117

102118
def_wrap_as_exception(maybe_exception:Any)->BaseException:
103119
"""Wrap an object as a Python exception, if needed.
@@ -876,7 +892,18 @@ def open(
876892
assertself._schedulerisnotNone
877893
scheduler_queue=self._scheduler.queue
878894
self._dispatcher=dispatcher.Dispatcher(self,scheduler_queue)
879-
self._consumer=bidi.BackgroundConsumer(self._rpc,self._on_response)
895+
896+
# `on_fatal_exception` is only available in more recent library versions.
897+
# For backwards compatibility reasons, we only pass it when `google-api-core` supports it.
898+
if_SHOULD_USE_ON_FATAL_ERROR_CALLBACK:
899+
self._consumer=bidi.BackgroundConsumer(
900+
self._rpc,
901+
self._on_response,
902+
on_fatal_exception=self._on_fatal_exception,
903+
)
904+
else:
905+
self._consumer=bidi.BackgroundConsumer(self._rpc,self._on_response)
906+
880907
self._leaser=leaser.Leaser(self)
881908
self._heartbeater=heartbeater.Heartbeater(self)
882909

@@ -1247,6 +1274,17 @@ def _on_response(self, response: gapic_types.StreamingPullResponse) -> None:
12471274

12481275
self.maybe_pause_consumer()
12491276

1277+
def_on_fatal_exception(self,exception:BaseException)->None:
1278+
"""
1279+
Called whenever `self.consumer` receives a non-retryable exception.
1280+
We close the manager on such non-retryable cases.
1281+
"""
1282+
_LOGGER.exception(
1283+
"Streaming pull terminating after receiving non-recoverable error: %s",
1284+
exception,
1285+
)
1286+
self.close(exception)
1287+
12501288
def_should_recover(self,exception:BaseException)->bool:
12511289
"""Determine if an error on the RPC stream should be recovered.
12521290
@@ -1283,8 +1321,10 @@ def _should_terminate(self, exception: BaseException) -> bool:
12831321
in a list of terminating exceptions.
12841322
"""
12851323
exception=_wrap_as_exception(exception)
1286-
ifisinstance(exception,_TERMINATING_STREAM_ERRORS):
1287-
_LOGGER.debug("Observed terminating stream error %s",exception)
1324+
is_api_error=isinstance(exception,exceptions.GoogleAPICallError)
1325+
# Terminate any non-API errors, or non-retryable errors (permission denied, unauthorized, etc.)
1326+
ifnotis_api_errororisinstance(exception,_TERMINATING_STREAM_ERRORS):
1327+
_LOGGER.error("Observed terminating stream error %s",exception)
12881328
returnTrue
12891329
_LOGGER.debug("Observed non-terminating stream error %s",exception)
12901330
returnFalse

‎tests/unit/pubsub_v1/subscriber/test_streaming_pull_manager.py‎

Lines changed: 45 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1333,7 +1333,13 @@ def test_open(heartbeater, dispatcher, leaser, background_consumer, resumable_bi
13331333
leaser.return_value.start.assert_called_once()
13341334
assertmanager.leaser==leaser.return_value
13351335

1336-
background_consumer.assert_called_once_with(manager._rpc,manager._on_response)
1336+
ifstreaming_pull_manager._SHOULD_USE_ON_FATAL_ERROR_CALLBACK:
1337+
background_consumer.assert_called_once_with(
1338+
manager._rpc,manager._on_response,manager._on_fatal_exception
1339+
)
1340+
else:
1341+
background_consumer.assert_called_once_with(manager._rpc,manager._on_response)
1342+
13371343
background_consumer.return_value.start.assert_called_once()
13381344
assertmanager._consumer==background_consumer.return_value
13391345

@@ -1432,6 +1438,31 @@ def test_close():
14321438
assertmanager.is_activeisFalse
14331439

14341440

1441+
deftest_closes_on_fatal_consumer_error():
1442+
(
1443+
manager,
1444+
consumer,
1445+
dispatcher,
1446+
leaser,
1447+
heartbeater,
1448+
scheduler,
1449+
)=make_running_manager()
1450+
1451+
ifstreaming_pull_manager._SHOULD_USE_ON_FATAL_ERROR_CALLBACK:
1452+
error=ValueError("some fatal exception")
1453+
manager._on_fatal_exception(error)
1454+
1455+
await_manager_shutdown(manager,timeout=3)
1456+
1457+
consumer.stop.assert_called_once()
1458+
leaser.stop.assert_called_once()
1459+
dispatcher.stop.assert_called_once()
1460+
heartbeater.stop.assert_called_once()
1461+
scheduler.shutdown.assert_called_once()
1462+
1463+
assertmanager.is_activeisFalse
1464+
1465+
14351466
deftest_close_inactive_consumer():
14361467
(
14371468
manager,
@@ -2270,18 +2301,24 @@ def test__should_recover_false():
22702301
deftest__should_terminate_true():
22712302
manager=make_manager()
22722303

2273-
details="Cancelled. Go away, before I taunt you a second time."
2274-
exc=exceptions.Cancelled(details)
2275-
2276-
assertmanager._should_terminate(exc)isTrue
2304+
forexcin [
2305+
exceptions.Cancelled(""),
2306+
exceptions.PermissionDenied(""),
2307+
TypeError(),
2308+
ValueError(),
2309+
]:
2310+
assertmanager._should_terminate(exc)
22772311

22782312

22792313
deftest__should_terminate_false():
22802314
manager=make_manager()
22812315

2282-
exc=TypeError("wahhhhhh")
2283-
2284-
assertmanager._should_terminate(exc)isFalse
2316+
forexcin [
2317+
exceptions.ResourceExhausted(""),
2318+
exceptions.ServiceUnavailable(""),
2319+
exceptions.DeadlineExceeded(""),
2320+
]:
2321+
assertnotmanager._should_terminate(exc)
22852322

22862323

22872324
@mock.patch("threading.Thread",autospec=True)

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp