Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit af1851b

Browse files
committed
telemetry retry
Signed-off-by: Sai Shree Pradhan <saishree.pradhan@databricks.com>
1 parent c6f4a27, commit af1851b

File tree

5 files changed

+184
-5
lines changed

5 files changed

+184
-5
lines changed

‎src/databricks/sql/common/http.py‎

Lines changed: 70 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -5,8 +5,10 @@
55
importthreading
66
fromdataclassesimportdataclass
77
fromcontextlibimportcontextmanager
8-
fromtypingimportGenerator
8+
fromtypingimportGenerator,Optional
99
importlogging
10+
fromrequests.adaptersimportHTTPAdapter
11+
fromdatabricks.sql.auth.retryimportDatabricksRetryPolicy,CommandType
1012

1113
logger=logging.getLogger(__name__)
1214

@@ -81,3 +83,70 @@ def execute(
8183

8284
defclose(self):
8385
self.session.close()
86+
87+
88+
class TelemetryHTTPAdapter(HTTPAdapter):
    """
    HTTP adapter that readies the retry policy stored in ``max_retries``
    before each request is sent.

    For every outgoing request the policy is tagged with
    ``CommandType.OTHER`` and its retry timer is started, after which the
    request is handed to ``HTTPAdapter.send`` unchanged.
    """

    def send(self, request, **kwargs):
        """Prime the retry policy, then delegate to the parent adapter.

        Args:
            request: The prepared request to send.
            **kwargs: Forwarded untouched to ``HTTPAdapter.send``.

        Returns:
            Whatever ``HTTPAdapter.send`` returns for this request.
        """
        # NOTE(review): assumes max_retries is a DatabricksRetryPolicy (it
        # must expose command_type and start_retry_timer) -- confirm callers
        # never mount this adapter with a plain retry object.
        policy = self.max_retries
        policy.command_type = CommandType.OTHER
        policy.start_retry_timer()
        return super().send(request, **kwargs)
99+
100+
101+
class TelemetryHttpClient:  # TODO: Unify all the http clients in the PySQL Connector
    """
    Singleton HTTP client for sending telemetry data.

    Obtain the shared client through ``get_instance()``. The client owns a
    ``requests.Session`` whose ``http://`` and ``https://`` traffic goes
    through a ``TelemetryHTTPAdapter`` configured with a
    ``DatabricksRetryPolicy``.
    """

    # Shared instance and the lock guarding its creation and removal.
    _instance: Optional["TelemetryHttpClient"] = None
    _lock = threading.Lock()

    # Retry tuning handed to DatabricksRetryPolicy in __init__.
    # NOTE(review): delays/durations are presumably seconds -- confirm
    # against DatabricksRetryPolicy.
    TELEMETRY_RETRY_STOP_AFTER_ATTEMPTS_COUNT = 3
    TELEMETRY_RETRY_DELAY_MIN = 1.0
    TELEMETRY_RETRY_DELAY_MAX = 10.0
    TELEMETRY_RETRY_STOP_AFTER_ATTEMPTS_DURATION = 30.0

    def __init__(self):
        """Initializes the session and mounts the custom retry adapter."""
        retry_policy = DatabricksRetryPolicy(
            delay_min=self.TELEMETRY_RETRY_DELAY_MIN,
            delay_max=self.TELEMETRY_RETRY_DELAY_MAX,
            stop_after_attempts_count=self.TELEMETRY_RETRY_STOP_AFTER_ATTEMPTS_COUNT,
            stop_after_attempts_duration=self.TELEMETRY_RETRY_STOP_AFTER_ATTEMPTS_DURATION,
            delay_default=1.0,
            force_dangerous_codes=[],
        )
        # One adapter (and therefore one retry policy object) serves both schemes.
        adapter = TelemetryHTTPAdapter(max_retries=retry_policy)
        self.session = requests.Session()
        self.session.mount("https://", adapter)
        self.session.mount("http://", adapter)

    @classmethod
    def get_instance(cls) -> "TelemetryHttpClient":
        """Get the singleton instance of the TelemetryHttpClient.

        The instance is created on first use; the second ``None`` check
        under the lock keeps concurrent first callers from each creating
        their own client.
        """
        if cls._instance is None:
            with cls._lock:
                if cls._instance is None:
                    logger.debug("Initializing singleton TelemetryHttpClient")
                    cls._instance = TelemetryHttpClient()
        return cls._instance

    def post(self, url: str, **kwargs) -> requests.Response:
        """
        Executes a POST request using the configured session.

        This is a blocking call intended to be run in a background thread.

        Args:
            url: Target URL of the telemetry endpoint.
            **kwargs: Forwarded untouched to ``requests.Session.post``.

        Returns:
            The response returned by ``requests.Session.post``.
        """
        logger.debug("Executing telemetry POST request to: %s", url)
        return self.session.post(url, **kwargs)

    def close(self):
        """Closes the underlying requests.Session.

        This is an instance method: call it on a client object (for example
        the one returned by ``get_instance()``), not on the class itself.

        If this object is the registered singleton it is unregistered, so
        the next ``get_instance()`` call builds a fresh client. Closing any
        other instance leaves the registered singleton untouched.
        """
        logger.debug("Closing TelemetryHttpClient session.")
        # Unregister before closing so a concurrent get_instance() cannot be
        # handed a client whose session is in the middle of being closed, and
        # only when this object really is the singleton -- otherwise closing
        # a stray instance would orphan the shared client's open session.
        with TelemetryHttpClient._lock:
            if TelemetryHttpClient._instance is self:
                TelemetryHttpClient._instance = None
        self.session.close()

‎src/databricks/sql/exc.py‎

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,6 @@
22
importlogging
33

44
logger=logging.getLogger(__name__)
5-
fromdatabricks.sql.telemetry.telemetry_clientimportTelemetryClientFactory
6-
75

86
### PEP-249 Mandated ###
97
# https://peps.python.org/pep-0249/#exceptions
@@ -22,6 +20,8 @@ def __init__(
2220

2321
error_name=self.__class__.__name__
2422
ifsession_id_hex:
23+
fromdatabricks.sql.telemetry.telemetry_clientimportTelemetryClientFactory
24+
2525
telemetry_client=TelemetryClientFactory.get_telemetry_client(
2626
session_id_hex
2727
)

‎src/databricks/sql/telemetry/telemetry_client.py‎

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
importlogging
55
fromconcurrent.futuresimportThreadPoolExecutor
66
fromtypingimportDict,Optional
7+
fromdatabricks.sql.common.httpimportTelemetryHttpClient
78
fromdatabricks.sql.telemetry.models.eventimport (
89
TelemetryEvent,
910
DriverSystemConfiguration,
@@ -159,6 +160,7 @@ def __init__(
159160
self._driver_connection_params=None
160161
self._host_url=host_url
161162
self._executor=executor
163+
self._http_client=TelemetryHttpClient.get_instance()
162164

163165
def_export_event(self,event):
164166
"""Add an event to the batch queue and flush if batch is full"""
@@ -207,7 +209,7 @@ def _send_telemetry(self, events):
207209
try:
208210
logger.debug("Submitting telemetry request to thread pool")
209211
future=self._executor.submit(
210-
requests.post,
212+
self._http_client.post,
211213
url,
212214
data=request.to_json(),
213215
headers=headers,
@@ -433,6 +435,7 @@ def close(session_id_hex):
433435
)
434436
try:
435437
TelemetryClientFactory._executor.shutdown(wait=True)
438+
TelemetryHttpClient.close()
436439
exceptExceptionase:
437440
logger.debug("Failed to shutdown thread pool executor: %s",e)
438441
TelemetryClientFactory._executor=None

‎tests/e2e/test_telemetry_retry.py‎

Lines changed: 107 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,107 @@
1+
importpytest
2+
fromunittest.mockimportpatch,MagicMock
3+
importio
4+
importtime
5+
6+
fromdatabricks.sql.telemetry.telemetry_clientimportTelemetryClientFactory
7+
fromdatabricks.sql.auth.retryimportDatabricksRetryPolicy
8+
9+
PATCH_TARGET='urllib3.connectionpool.HTTPSConnectionPool._get_conn'
10+
11+
def create_mock_conn(responses):
    """Creates a mock connection object whose getresponse() method yields a series of responses.

    Args:
        responses: Iterable of dicts, one per response, read with the keys
            ``"status"`` (no default), ``"headers"`` (default ``{}``) and
            ``"body"`` (bytes, default ``b'{}'``).

    Returns:
        A ``MagicMock`` whose ``getresponse`` returns the mocked responses
        one per call, in the order given.
    """

    def build_response(spec):
        """Build one mocked HTTP response from its spec dict."""
        response = MagicMock()
        response.status = spec.get("status")
        response.headers = spec.get("headers", {})
        response.fp = io.BytesIO(spec.get("body", b'{}'))

        # Defined inside this helper so the closure captures *this* response.
        # Defining it directly in a loop over `responses` would late-bind the
        # loop variable and make every release_conn close the last body only.
        def release():
            response.fp.close()

        response.release_conn = release
        return response

    mock_conn = MagicMock()
    mock_conn.getresponse.side_effect = [build_response(spec) for spec in responses]
    return mock_conn
27+
28+
class TestTelemetryClientRetries:
    """Retry behaviour of the telemetry client, exercised against a mocked urllib3 connection."""

    @staticmethod
    def _reset_factory_state():
        """Put TelemetryClientFactory back into its uninitialized state."""
        TelemetryClientFactory._initialized = False
        TelemetryClientFactory._clients = {}
        TelemetryClientFactory._executor = None

    @pytest.fixture(autouse=True)
    def setup_and_teardown(self):
        """Reset the factory before each test; shut down any executor and reset again afterwards."""
        self._reset_factory_state()
        yield
        if TelemetryClientFactory._executor:
            TelemetryClientFactory._executor.shutdown(wait=True)
        self._reset_factory_state()

    def get_client(self, session_id, num_retries=3):
        """
        Return a telemetry client whose https adapter uses a fast retry policy.

        The client is initialized through TelemetryClientFactory, then the
        ``max_retries`` of its session's ``https://`` adapter is replaced by a
        policy with short delays that allows ``num_retries`` retries.
        """
        TelemetryClientFactory.initialize_telemetry_client(
            telemetry_enabled=True,
            session_id_hex=session_id,
            auth_provider=None,
            host_url="test.databricks.com",
        )
        client = TelemetryClientFactory.get_telemetry_client(session_id)

        policy_settings = {
            "delay_min": 0.01,
            "delay_max": 0.02,
            "stop_after_attempts_duration": 2.0,
            "stop_after_attempts_count": num_retries,
            "delay_default": 0.1,
            "force_dangerous_codes": [],
            "urllib3_kwargs": {'total': num_retries},
        }
        fast_policy = DatabricksRetryPolicy(**policy_settings)

        https_adapter = client._http_client.session.adapters.get("https://")
        https_adapter.max_retries = fast_policy
        return client

    @pytest.mark.parametrize(
        "status_code, description",
        [
            (401, "Unauthorized"),
            (403, "Forbidden"),
            (501, "Not Implemented"),
            (200, "Success"),
        ],
    )
    def test_non_retryable_status_codes_are_not_retried(self, status_code, description):
        """
        A single 401, 403, 501 or 200 response must result in exactly one request.
        """
        # The status code is part of the session ID to ease debugging on failure.
        client = self.get_client(f"session-{status_code}")
        single_response = [{"status": status_code}]

        with patch(PATCH_TARGET, return_value=create_mock_conn(single_response)) as mock_get_conn:
            client.export_failure_log("TestError", "Test message")
            TelemetryClientFactory.close(client._session_id_hex)

        mock_get_conn.return_value.getresponse.assert_called_once()

    def test_exceeds_retry_count_limit(self):
        """
        The client retries on 503, 429 and 502 until the retry budget is spent,
        and waits at least as long as the Retry-After header demands.
        """
        num_retries = 3
        retry_after = 1
        # One initial attempt plus every allowed retry.
        expected_total_calls = num_retries + 1
        client = self.get_client("session-exceed-limit", num_retries=num_retries)
        failing_responses = [
            {"status": 503, "headers": {"Retry-After": str(retry_after)}},
            {"status": 429},
            {"status": 502},
            {"status": 503},
        ]

        with patch(PATCH_TARGET, return_value=create_mock_conn(failing_responses)) as mock_get_conn:
            started = time.time()
            client.export_failure_log("TestError", "Test message")
            TelemetryClientFactory.close(client._session_id_hex)
            finished = time.time()

        elapsed = finished - started
        assert mock_get_conn.return_value.getresponse.call_count == expected_total_calls
        assert elapsed > retry_after

‎tests/unit/test_telemetry.py‎

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,7 @@ def test_network_request_flow(self, mock_post, mock_telemetry_client):
9090
args,kwargs=client._executor.submit.call_args
9191

9292
# Verify correct function and URL
93-
assertargs[0]==requests.post
93+
assertargs[0]==client._http_client.post
9494
assertargs[1]=="https://test-host.com/telemetry-ext"
9595
assertkwargs["headers"]["Authorization"]=="Bearer test-token"
9696

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp