|
16 | 16 |
|
17 | 17 | importcollections |
18 | 18 | importfunctools |
| 19 | +importinspect |
19 | 20 | importitertools |
20 | 21 | importlogging |
21 | 22 | importthreading |
|
62 | 63 | _REGULAR_SHUTDOWN_THREAD_NAME="Thread-RegularStreamShutdown" |
63 | 64 | _RPC_ERROR_THREAD_NAME="Thread-OnRpcTerminated" |
64 | 65 | _RETRYABLE_STREAM_ERRORS= ( |
| 66 | +exceptions.Aborted, |
65 | 67 | exceptions.DeadlineExceeded, |
66 | | -exceptions.ServiceUnavailable, |
| 68 | +exceptions.GatewayTimeout, |
67 | 69 | exceptions.InternalServerError, |
| 70 | +exceptions.ResourceExhausted, |
| 71 | +exceptions.ServiceUnavailable, |
68 | 72 | exceptions.Unknown, |
69 | | -exceptions.GatewayTimeout, |
70 | | -exceptions.Aborted, |
71 | 73 | ) |
72 | | -_TERMINATING_STREAM_ERRORS= (exceptions.Cancelled,) |
| 74 | +_TERMINATING_STREAM_ERRORS= ( |
| 75 | +exceptions.Cancelled, |
| 76 | +exceptions.InvalidArgument, |
| 77 | +exceptions.NotFound, |
| 78 | +exceptions.PermissionDenied, |
| 79 | +exceptions.Unauthenticated, |
| 80 | +exceptions.Unauthorized, |
| 81 | +) |
73 | 82 | _MAX_LOAD=1.0 |
74 | 83 | """The load threshold above which to pause the incoming message stream.""" |
75 | 84 |
|
|
98 | 107 | code_pb2.UNAVAILABLE, |
99 | 108 | } |
100 | 109 |
|
| 110 | +# `on_fatal_exception` was added in `google-api-core v2.25.1``, which allows us to inform |
| 111 | +# callers on unrecoverable errors. We can only pass this arg if it's available in the |
| 112 | +# `BackgroundConsumer` spec. |
| 113 | +_SHOULD_USE_ON_FATAL_ERROR_CALLBACK="on_fatal_exception"ininspect.getfullargspec( |
| 114 | +bidi.BackgroundConsumer |
| 115 | +) |
| 116 | + |
101 | 117 |
|
102 | 118 | def_wrap_as_exception(maybe_exception:Any)->BaseException: |
103 | 119 | """Wrap an object as a Python exception, if needed. |
@@ -876,7 +892,18 @@ def open( |
876 | 892 | assertself._schedulerisnotNone |
877 | 893 | scheduler_queue=self._scheduler.queue |
878 | 894 | self._dispatcher=dispatcher.Dispatcher(self,scheduler_queue) |
879 | | -self._consumer=bidi.BackgroundConsumer(self._rpc,self._on_response) |
| 895 | + |
| 896 | +# `on_fatal_exception` is only available in more recent library versions. |
| 897 | +# For backwards compatibility reasons, we only pass it when `google-api-core` supports it. |
| 898 | +if_SHOULD_USE_ON_FATAL_ERROR_CALLBACK: |
| 899 | +self._consumer=bidi.BackgroundConsumer( |
| 900 | +self._rpc, |
| 901 | +self._on_response, |
| 902 | +on_fatal_exception=self._on_fatal_exception, |
| 903 | + ) |
| 904 | +else: |
| 905 | +self._consumer=bidi.BackgroundConsumer(self._rpc,self._on_response) |
| 906 | + |
880 | 907 | self._leaser=leaser.Leaser(self) |
881 | 908 | self._heartbeater=heartbeater.Heartbeater(self) |
882 | 909 |
|
@@ -1247,6 +1274,17 @@ def _on_response(self, response: gapic_types.StreamingPullResponse) -> None: |
1247 | 1274 |
|
1248 | 1275 | self.maybe_pause_consumer() |
1249 | 1276 |
|
| 1277 | +def_on_fatal_exception(self,exception:BaseException)->None: |
| 1278 | +""" |
| 1279 | + Called whenever `self.consumer` receives a non-retryable exception. |
| 1280 | + We close the manager on such non-retryable cases. |
| 1281 | + """ |
| 1282 | +_LOGGER.exception( |
| 1283 | +"Streaming pull terminating after receiving non-recoverable error: %s", |
| 1284 | +exception, |
| 1285 | + ) |
| 1286 | +self.close(exception) |
| 1287 | + |
1250 | 1288 | def_should_recover(self,exception:BaseException)->bool: |
1251 | 1289 | """Determine if an error on the RPC stream should be recovered. |
1252 | 1290 |
|
@@ -1283,8 +1321,10 @@ def _should_terminate(self, exception: BaseException) -> bool: |
1283 | 1321 | in a list of terminating exceptions. |
1284 | 1322 | """ |
1285 | 1323 | exception=_wrap_as_exception(exception) |
1286 | | -ifisinstance(exception,_TERMINATING_STREAM_ERRORS): |
1287 | | -_LOGGER.debug("Observed terminating stream error %s",exception) |
| 1324 | +is_api_error=isinstance(exception,exceptions.GoogleAPICallError) |
| 1325 | +# Terminate any non-API errors, or non-retryable errors (permission denied, unauthorized, etc.) |
| 1326 | +ifnotis_api_errororisinstance(exception,_TERMINATING_STREAM_ERRORS): |
| 1327 | +_LOGGER.error("Observed terminating stream error %s",exception) |
1288 | 1328 | returnTrue |
1289 | 1329 | _LOGGER.debug("Observed non-terminating stream error %s",exception) |
1290 | 1330 | returnFalse |
|