Commit c59c393

Authored by Jesse
Retry attempts that fail due to a connection timeout (#24)
* Isolate delay bounding logic
* Move error details scope up one level
* Retry GetOperationStatus if an OSError was raised during execution. Add retry_delay_default to use in this case.
* Log when a request is retried due to an OSError. Emit warnings for unexpected OSError codes.
* Update docstring for make_request
* Nit: unit tests show the .warn message is deprecated. DeprecationWarning: The 'warn' function is deprecated, use 'warning' instead

Signed-off-by: Jesse Whitehouse <jesse@whitehouse.dev>
1 parent 2961524 · commit c59c393

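As a rough usage sketch (not part of this commit), the snippet below shows how the retry knobs described above might be supplied when opening a connection. It assumes the underscore-prefixed kwargs are forwarded from `databricks.sql.connect` through to `ThriftBackend`, as the docstring in the diff below suggests; the hostname, HTTP path, and token values are placeholders.

```python
# Illustrative only: placeholder credentials, and we assume these private
# retry kwargs are passed through to ThriftBackend unchanged.
from databricks import sql

connection = sql.connect(
    server_hostname="example.cloud.databricks.com",
    http_path="/sql/1.0/endpoints/abcdef1234567890",
    access_token="dapi-placeholder-token",
    _retry_delay_default=5,                   # used only when GetOperationStatus hits an OSError
    _retry_stop_after_attempts_count=30,      # total max attempts during the retry sequence
    _retry_stop_after_attempts_duration=900,  # overall retry budget, in seconds
)
```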
File tree: 2 files changed, +118 −15 lines changed


src/databricks/sql/thrift_backend.py

Lines changed: 61 additions & 15 deletions

@@ -1,4 +1,5 @@
 from decimal import Decimal
+import errno
 import logging
 import math
 import time
@@ -15,6 +16,9 @@

 from databricks.sql.thrift_api.TCLIService import TCLIService, ttypes
 from databricks.sql import *
+from databricks.sql.thrift_api.TCLIService.TCLIService import (
+    Client as TCLIServiceClient,
+)
 from databricks.sql.utils import (
     ArrowQueue,
     ExecuteResponse,
@@ -39,6 +43,7 @@
     "_retry_delay_max": (float, 60, 5, 3600),
     "_retry_stop_after_attempts_count": (int, 30, 1, 60),
     "_retry_stop_after_attempts_duration": (float, 900, 1, 86400),
+    "_retry_delay_default": (float, 5, 1, 60),
 }


@@ -71,6 +76,8 @@ def __init__(
         #  _retry_delay_min (default: 1)
         #  _retry_delay_max (default: 60)
         #    {min,max} pre-retry delay bounds
+        #  _retry_delay_default (default: 5)
+        #    Only used when GetOperationStatus fails due to a TCP/OS Error.
         #  _retry_stop_after_attempts_count (default: 30)
         #    total max attempts during retry sequence
         #  _retry_stop_after_attempts_duration (default: 900)
@@ -158,7 +165,7 @@ def _initialize_retry_args(self, kwargs):
                 "retry parameter: {} given_or_default {}".format(key, given_or_default)
             )
             if bound != given_or_default:
-                logger.warn(
+                logger.warning(
                     "Override out of policy retry parameter: "
                     + "{} given {}, restricted to {}".format(
                         key, given_or_default, bound
@@ -243,7 +250,9 @@ def _handle_request_error(self, error_info, attempt, elapsed):
     # FUTURE: Consider moving to https://github.com/litl/backoff or
     # https://github.com/jd/tenacity for retry logic.
     def make_request(self, method, request):
-        """Execute given request, attempting retries when receiving HTTP 429/503.
+        """Execute given request, attempting retries when
+            1. Receiving HTTP 429/503 from server
+            2. OSError is raised during a GetOperationStatus

         For delay between attempts, honor the given Retry-After header, but with bounds.
         Use lower bound of expontial-backoff based on _retry_delay_min,
@@ -260,17 +269,21 @@ def make_request(self, method, request):
        def get_elapsed():
            return time.time() - t0

+        def bound_retry_delay(attempt, proposed_delay):
+            """bound delay (seconds) by [min_delay*1.5^(attempt-1), max_delay]"""
+            delay = int(proposed_delay)
+            delay = max(delay, self._retry_delay_min * math.pow(1.5, attempt - 1))
+            delay = min(delay, self._retry_delay_max)
+            return delay
+
        def extract_retry_delay(attempt):
            # encapsulate retry checks, returns None || delay-in-secs
            # Retry IFF 429/503 code + Retry-After header set
            http_code = getattr(self._transport, "code", None)
            retry_after = getattr(self._transport, "headers", {}).get("Retry-After")
            if http_code in [429, 503] and retry_after:
                # bound delay (seconds) by [min_delay*1.5^(attempt-1), max_delay]
-                delay = int(retry_after)
-                delay = max(delay, self._retry_delay_min * math.pow(1.5, attempt - 1))
-                delay = min(delay, self._retry_delay_max)
-                return delay
+                return bound_retry_delay(attempt, int(retry_after))
            return None

        def attempt_request(attempt):
@@ -279,24 +292,57 @@ def attempt_request(attempt):
            # - non-None method_return -> success, return and be done
            # - non-None retry_delay -> sleep delay before retry
            # - error, error_message always set when available
+
+            error, error_message, retry_delay = None, None, None
            try:
                logger.debug("Sending request: {}".format(request))
                response = method(request)
                logger.debug("Received response: {}".format(response))
                return response
-            except Exception as error:
+            except OSError as err:
+                error = err
+                error_message = str(err)
+
+                gos_name = TCLIServiceClient.GetOperationStatus.__name__
+                if method.__name__ == gos_name:
+                    retry_delay = bound_retry_delay(attempt, self._retry_delay_default)
+
+                    # fmt: off
+                    # The built-in errno package encapsulates OSError codes, which are OS-specific.
+                    # log.info for errors we believe are not unusual or unexpected. log.warn for
+                    # others like EEXIST, EBADF, ERANGE which are not expected in this context.
+                    #
+                    # I manually tested this retry behaviour using mitmweb and confirmed that
+                    # GetOperationStatus requests are retried when I forced network connection
+                    # interruptions / timeouts / reconnects. See #24 for more info.
+                                            # | Debian | Darwin |
+                    info_errs = [           # |--------|--------|
+                        errno.ESHUTDOWN,    # |   32   |   32   |
+                        errno.EAFNOSUPPORT, # |   97   |   47   |
+                        errno.ECONNRESET,   # |  104   |   54   |
+                        errno.ETIMEDOUT,    # |  110   |   60   |
+                    ]
+                    # fmt: on
+
+                    log_string = f"{gos_name} failed with code {err.errno} and will attempt to retry"
+                    if err.errno in info_errs:
+                        logger.info(log_string)
+                    else:
+                        logger.warning(log_string)
+            except Exception as err:
+                error = err
                retry_delay = extract_retry_delay(attempt)
                error_message = ThriftBackend._extract_error_message_from_headers(
                    getattr(self._transport, "headers", {})
                )
-                return RequestErrorInfo(
-                    error=error,
-                    error_message=error_message,
-                    retry_delay=retry_delay,
-                    http_code=getattr(self._transport, "code", None),
-                    method=method.__name__,
-                    request=request,
-                )
+            return RequestErrorInfo(
+                error=error,
+                error_message=error_message,
+                retry_delay=retry_delay,
+                http_code=getattr(self._transport, "code", None),
+                method=method.__name__,
+                request=request,
+            )

        # The real work:
        # - for each available attempt:

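To make the delay-bounding rule concrete, here is a small standalone sketch (not the backend method itself) that mirrors `bound_retry_delay` using the default `_retry_delay_min=1` and `_retry_delay_max=60` from `_retry_policy`, and prints the delays an OSError-triggered `GetOperationStatus` retry would use with the default `_retry_delay_default` of 5 seconds.

```python
import math

# Standalone copy of the bounding rule for illustration:
# clamp the proposed delay to [delay_min * 1.5**(attempt-1), delay_max].
def bound_retry_delay(attempt, proposed_delay, delay_min=1.0, delay_max=60.0):
    delay = int(proposed_delay)
    delay = max(delay, delay_min * math.pow(1.5, attempt - 1))
    return min(delay, delay_max)

for attempt in (1, 5, 7, 11, 12):
    print(attempt, round(bound_retry_delay(attempt, 5), 2))
# attempt 1 -> 5, attempt 5 -> 5.06, attempt 7 -> 11.39,
# attempt 11 -> 57.67, attempt 12 -> 60 (capped at delay_max)
```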
tests/unit/test_thrift_backend.py

Lines changed: 57 additions & 0 deletions

@@ -19,6 +19,7 @@ def retry_policy_factory():
         "_retry_delay_max": (float, 60, None, None),
         "_retry_stop_after_attempts_count": (int, 30, None, None),
         "_retry_stop_after_attempts_duration": (float, 900, None, None),
+        "_retry_delay_default": (float, 5, 1, 60)
     }


@@ -968,6 +969,62 @@ def test_handle_execute_response_sets_active_op_handle(self):

         self.assertEqual(mock_resp.operationHandle, mock_cursor.active_op_handle)

+    @patch("thrift.transport.THttpClient.THttpClient")
+    @patch("databricks.sql.thrift_api.TCLIService.TCLIService.Client.GetOperationStatus")
+    @patch("databricks.sql.thrift_backend._retry_policy", new_callable=retry_policy_factory)
+    def test_make_request_will_retry_GetOperationStatus(
+            self, mock_retry_policy, mock_GetOperationStatus, t_transport_class):
+
+        import thrift, errno
+        from databricks.sql.thrift_api.TCLIService.TCLIService import Client
+        from databricks.sql.exc import RequestError
+        from databricks.sql.utils import NoRetryReason
+
+        this_gos_name = "GetOperationStatus"
+        mock_GetOperationStatus.__name__ = this_gos_name
+        mock_GetOperationStatus.side_effect = OSError(errno.ETIMEDOUT, "Connection timed out")
+
+        protocol = thrift.protocol.TBinaryProtocol.TBinaryProtocol(t_transport_class)
+        client = Client(protocol)
+
+        req = ttypes.TGetOperationStatusReq(
+            operationHandle=self.operation_handle,
+            getProgressUpdate=False,
+        )
+
+        EXPECTED_RETRIES = 2
+
+        thrift_backend = ThriftBackend(
+            "foobar",
+            443,
+            "path", [],
+            _retry_stop_after_attempts_count=EXPECTED_RETRIES,
+            _retry_delay_default=1)
+
+        with self.assertRaises(RequestError) as cm:
+            thrift_backend.make_request(client.GetOperationStatus, req)
+
+        self.assertEqual(NoRetryReason.OUT_OF_ATTEMPTS.value, cm.exception.context["no-retry-reason"])
+        self.assertEqual(f'{EXPECTED_RETRIES}/{EXPECTED_RETRIES}', cm.exception.context["attempt"])
+
+        # Unusual OSError code
+        mock_GetOperationStatus.side_effect = OSError(errno.EEXIST, "File does not exist")
+
+        with self.assertLogs("databricks.sql.thrift_backend", level=logging.WARNING) as cm:
+            with self.assertRaises(RequestError):
+                thrift_backend.make_request(client.GetOperationStatus, req)
+
+        # There should be two warning log messages: one for each retry
+        self.assertEqual(len(cm.output), EXPECTED_RETRIES)
+
+        # The warnings should be identical
+        self.assertEqual(cm.output[1], cm.output[0])
+
+        # The warnings should include this text
+        self.assertIn(f"{this_gos_name} failed with code {errno.EEXIST} and will attempt to retry", cm.output[0])
+
+
     @patch("thrift.transport.THttpClient.THttpClient")
     def test_make_request_wont_retry_if_headers_not_present(self, t_transport_class):
         t_transport_instance = t_transport_class.return_value