Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit3dcbe49

Browse files
jprakash-dbvarun-edachali-dbx
authored andcommitted
Added example for async execute query (#537)
Added examples and fixed the async execute not working without pyarrowSigned-off-by: varun-edachali-dbx <varun.edachali@databricks.com>
1 parentce7e242 commit3dcbe49

File tree

4 files changed

+58
-22
lines changed

4 files changed

+58
-22
lines changed

‎examples/query_async_execute.py‎

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
fromdatabricksimportsql
2+
importos
3+
importtime
4+
5+
withsql.connect(
6+
server_hostname=os.getenv("DATABRICKS_SERVER_HOSTNAME"),
7+
http_path=os.getenv("DATABRICKS_HTTP_PATH"),
8+
access_token=os.getenv("DATABRICKS_TOKEN"),
9+
)asconnection:
10+
11+
withconnection.cursor()ascursor:
12+
long_running_query="""
13+
SELECT COUNT(*) FROM RANGE(10000 * 16) x
14+
JOIN RANGE(10000) y
15+
ON FROM_UNIXTIME(x.id * y.id, 'yyyy-MM-dd') LIKE '%not%a%date%'
16+
"""
17+
18+
# Non-blocking call
19+
cursor.execute_async(long_running_query)
20+
21+
# Polling every 5 seconds until the query is no longer pending
22+
whilecursor.is_query_pending():
23+
print("POLLING")
24+
time.sleep(5)
25+
26+
# Blocking call: fetch results when execution completes
27+
cursor.get_async_execution_result()
28+
29+
result=cursor.fetchall()
30+
31+
forresinresult:
32+
print(res)

‎src/databricks/sql/client.py‎

Lines changed: 14 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -896,6 +896,19 @@ def get_query_state(self) -> "TOperationState":
896896
self._check_not_closed()
897897
returnself.thrift_backend.get_query_state(self.active_op_handle)
898898

899+
defis_query_pending(self):
900+
"""
901+
Checks whether the async executing query is in pending state or not
902+
903+
:return:
904+
"""
905+
operation_state=self.get_query_state()
906+
907+
returnnotoperation_stateoroperation_statein [
908+
ttypes.TOperationState.RUNNING_STATE,
909+
ttypes.TOperationState.PENDING_STATE,
910+
]
911+
899912
defget_async_execution_result(self):
900913
"""
901914
@@ -905,13 +918,7 @@ def get_async_execution_result(self):
905918
"""
906919
self._check_not_closed()
907920

908-
defis_executing(operation_state)->"bool":
909-
returnnotoperation_stateoroperation_statein [
910-
ttypes.TOperationState.RUNNING_STATE,
911-
ttypes.TOperationState.PENDING_STATE,
912-
]
913-
914-
whileis_executing(self.get_query_state()):
921+
whileself.is_query_pending():
915922
# Poll after some default time
916923
time.sleep(self.ASYNC_DEFAULT_POLLING_INTERVAL)
917924

‎src/databricks/sql/thrift_backend.py‎

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -797,12 +797,15 @@ def get_execution_result(self, op_handle, cursor):
797797
t_result_set_metadata_resp.schema
798798
)
799799

800-
schema_bytes= (
801-
t_result_set_metadata_resp.arrowSchema
802-
orself._hive_schema_to_arrow_schema(t_result_set_metadata_resp.schema)
803-
.serialize()
804-
.to_pybytes()
805-
)
800+
ifpyarrow:
801+
schema_bytes= (
802+
t_result_set_metadata_resp.arrowSchema
803+
orself._hive_schema_to_arrow_schema(t_result_set_metadata_resp.schema)
804+
.serialize()
805+
.to_pybytes()
806+
)
807+
else:
808+
schema_bytes=None
806809

807810
queue=ResultSetQueueFactory.build_queue(
808811
row_set_type=resp.resultSetMetadata.resultFormat,

‎tests/e2e/test_driver.py‎

Lines changed: 3 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -179,20 +179,14 @@ def test_cloud_fetch(self):
179179

180180

181181
classTestPySQLAsyncQueriesSuite(PySQLPytestTestCase):
182-
defisExecuting(self,operation_state):
183-
returnnotoperation_stateoroperation_statein [
184-
ttypes.TOperationState.RUNNING_STATE,
185-
ttypes.TOperationState.PENDING_STATE,
186-
]
187-
188182
deftest_execute_async__long_running(self):
189183

190184
long_running_query="SELECT COUNT(*) FROM RANGE(10000 * 16) x JOIN RANGE(10000) y ON FROM_UNIXTIME(x.id * y.id, 'yyyy-MM-dd') LIKE '%not%a%date%'"
191185
withself.cursor()ascursor:
192186
cursor.execute_async(long_running_query)
193187

194188
## Polling after every POLLING_INTERVAL seconds
195-
whileself.isExecuting(cursor.get_query_state()):
189+
whilecursor.is_query_pending():
196190
time.sleep(self.POLLING_INTERVAL)
197191
log.info("Polling the status in test_execute_async")
198192

@@ -211,7 +205,7 @@ def test_execute_async__small_result(self):
211205
time.sleep(5)
212206

213207
## Polling after every POLLING_INTERVAL seconds
214-
whileself.isExecuting(cursor.get_query_state()):
208+
whilecursor.is_query_pending():
215209
time.sleep(self.POLLING_INTERVAL)
216210
log.info("Polling the status in test_execute_async")
217211

@@ -241,7 +235,7 @@ def test_execute_async__large_result(self):
241235
time.sleep(5)
242236

243237
## Polling after every POLLING_INTERVAL seconds
244-
whileself.isExecuting(cursor.get_query_state()):
238+
whilecursor.is_query_pending():
245239
time.sleep(self.POLLING_INTERVAL)
246240
log.info("Polling the status in test_execute_async")
247241

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp