Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commitfe8cd57

Browse files
authored
Testing for telemetry (#616)
* e2e test telemetrySigned-off-by: Sai Shree Pradhan <saishree.pradhan@databricks.com>* assert session id, statement idSigned-off-by: Sai Shree Pradhan <saishree.pradhan@databricks.com>* minor changes, added checks on server responseSigned-off-by: Sai Shree Pradhan <saishree.pradhan@databricks.com>* finally blockSigned-off-by: Sai Shree Pradhan <saishree.pradhan@databricks.com>* removed setup clean upSigned-off-by: Sai Shree Pradhan <saishree.pradhan@databricks.com>* finally in test_complex_typesSigned-off-by: Sai Shree Pradhan <saishree.pradhan@databricks.com>---------Signed-off-by: Sai Shree Pradhan <saishree.pradhan@databricks.com>
1 parente732e96 commitfe8cd57

File tree

2 files changed

+171
-3
lines changed

2 files changed

+171
-3
lines changed

‎tests/e2e/test_complex_types.py‎

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -39,9 +39,11 @@ def table_fixture(self, connection_details):
3939
)
4040
"""
4141
)
42-
yield
43-
# Clean up the table after the test
44-
cursor.execute("DELETE FROM pysql_test_complex_types_table")
42+
try:
43+
yield
44+
finally:
45+
# Clean up the table after the test
46+
cursor.execute("DELETE FROM pysql_test_complex_types_table")
4547

4648
@pytest.mark.parametrize(
4749
"field,expected_type",
Lines changed: 166 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,166 @@
1+
importrandom
2+
importthreading
3+
importtime
4+
fromunittest.mockimportpatch
5+
importpytest
6+
7+
fromdatabricks.sql.telemetry.models.enumsimportStatementType
8+
fromdatabricks.sql.telemetry.telemetry_clientimportTelemetryClient,TelemetryClientFactory
9+
fromtests.e2e.test_driverimportPySQLPytestTestCase
10+
11+
defrun_in_threads(target,num_threads,pass_index=False):
12+
"""Helper to run target function in multiple threads."""
13+
threads= [
14+
threading.Thread(target=target,args=(i,)ifpass_indexelse ())
15+
foriinrange(num_threads)
16+
]
17+
fortinthreads:
18+
t.start()
19+
fortinthreads:
20+
t.join()
21+
22+
23+
classTestE2ETelemetry(PySQLPytestTestCase):
24+
25+
@pytest.fixture(autouse=True)
26+
deftelemetry_setup_teardown(self):
27+
"""
28+
This fixture ensures the TelemetryClientFactory is in a clean state
29+
before each test and shuts it down afterward. Using a fixture makes
30+
this robust and automatic.
31+
"""
32+
try:
33+
yield
34+
finally:
35+
ifTelemetryClientFactory._executor:
36+
TelemetryClientFactory._executor.shutdown(wait=True)
37+
TelemetryClientFactory._executor=None
38+
TelemetryClientFactory._initialized=False
39+
40+
deftest_concurrent_queries_sends_telemetry(self):
41+
"""
42+
An E2E test where concurrent threads execute real queries against
43+
the staging endpoint, while we capture and verify the generated telemetry.
44+
"""
45+
num_threads=30
46+
capture_lock=threading.Lock()
47+
captured_telemetry= []
48+
captured_session_ids= []
49+
captured_statement_ids= []
50+
captured_responses= []
51+
captured_exceptions= []
52+
53+
original_send_telemetry=TelemetryClient._send_telemetry
54+
original_callback=TelemetryClient._telemetry_request_callback
55+
56+
defsend_telemetry_wrapper(self_client,events):
57+
withcapture_lock:
58+
captured_telemetry.extend(events)
59+
original_send_telemetry(self_client,events)
60+
61+
defcallback_wrapper(self_client,future,sent_count):
62+
"""
63+
Wraps the original callback to capture the server's response
64+
or any exceptions from the async network call.
65+
"""
66+
try:
67+
original_callback(self_client,future,sent_count)
68+
69+
# Now, capture the result for our assertions
70+
response=future.result()
71+
response.raise_for_status()# Raise an exception for 4xx/5xx errors
72+
telemetry_response=response.json()
73+
withcapture_lock:
74+
captured_responses.append(telemetry_response)
75+
exceptExceptionase:
76+
withcapture_lock:
77+
captured_exceptions.append(e)
78+
79+
withpatch.object(TelemetryClient,"_send_telemetry",send_telemetry_wrapper), \
80+
patch.object(TelemetryClient,"_telemetry_request_callback",callback_wrapper):
81+
82+
defexecute_query_worker(thread_id):
83+
"""Each thread creates a connection and executes a query."""
84+
85+
time.sleep(random.uniform(0,0.05))
86+
87+
withself.connection(extra_params={"enable_telemetry":True})asconn:
88+
# Capture the session ID from the connection before executing the query
89+
session_id_hex=conn.get_session_id_hex()
90+
withcapture_lock:
91+
captured_session_ids.append(session_id_hex)
92+
93+
withconn.cursor()ascursor:
94+
cursor.execute(f"SELECT{thread_id}")
95+
# Capture the statement ID after executing the query
96+
statement_id=cursor.query_id
97+
withcapture_lock:
98+
captured_statement_ids.append(statement_id)
99+
cursor.fetchall()
100+
101+
# Run the workers concurrently
102+
run_in_threads(execute_query_worker,num_threads,pass_index=True)
103+
104+
ifTelemetryClientFactory._executor:
105+
TelemetryClientFactory._executor.shutdown(wait=True)
106+
107+
# --- VERIFICATION ---
108+
assertnotcaptured_exceptions
109+
assertlen(captured_responses)>0
110+
111+
total_successful_events=0
112+
forresponseincaptured_responses:
113+
assert"errors"notinresponseornotresponse["errors"]
114+
if"numProtoSuccess"inresponse:
115+
total_successful_events+=response["numProtoSuccess"]
116+
asserttotal_successful_events==num_threads*2
117+
118+
assertlen(captured_telemetry)==num_threads*2# 2 events per thread (initial_telemetry_log, latency_log (execute))
119+
assertlen(captured_session_ids)==num_threads# One session ID per thread
120+
assertlen(captured_statement_ids)==num_threads# One statement ID per thread (per query)
121+
122+
# Separate initial logs from latency logs
123+
initial_logs= [
124+
eforeincaptured_telemetry
125+
ife.entry.sql_driver_log.operation_latency_msisNone
126+
ande.entry.sql_driver_log.driver_connection_paramsisnotNone
127+
ande.entry.sql_driver_log.system_configurationisnotNone
128+
]
129+
latency_logs= [
130+
eforeincaptured_telemetry
131+
ife.entry.sql_driver_log.operation_latency_msisnotNone
132+
ande.entry.sql_driver_log.sql_statement_idisnotNone
133+
ande.entry.sql_driver_log.sql_operation.statement_type==StatementType.QUERY
134+
]
135+
136+
# Verify counts
137+
assertlen(initial_logs)==num_threads
138+
assertlen(latency_logs)==num_threads
139+
140+
# Verify that telemetry events contain the exact session IDs we captured from connections
141+
telemetry_session_ids=set()
142+
foreventincaptured_telemetry:
143+
session_id=event.entry.sql_driver_log.session_id
144+
assertsession_idisnotNone
145+
telemetry_session_ids.add(session_id)
146+
147+
captured_session_ids_set=set(captured_session_ids)
148+
asserttelemetry_session_ids==captured_session_ids_set
149+
assertlen(captured_session_ids_set)==num_threads
150+
151+
# Verify that telemetry latency logs contain the exact statement IDs we captured from cursors
152+
telemetry_statement_ids=set()
153+
foreventinlatency_logs:
154+
statement_id=event.entry.sql_driver_log.sql_statement_id
155+
assertstatement_idisnotNone
156+
telemetry_statement_ids.add(statement_id)
157+
158+
captured_statement_ids_set=set(captured_statement_ids)
159+
asserttelemetry_statement_ids==captured_statement_ids_set
160+
assertlen(captured_statement_ids_set)==num_threads
161+
162+
# Verify that each latency log has a statement ID from our captured set
163+
foreventinlatency_logs:
164+
log=event.entry.sql_driver_log
165+
assertlog.sql_statement_idincaptured_statement_ids
166+
assertlog.session_idincaptured_session_ids

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp