Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commitab4b73b

Browse files
authored
Incorrect rows in inline fetch result (#479)
* Raised error when incorrect Row offset it returned* Changed error type* grammar fix* Added unit tests and modified the code* Updated error message* Updated the non retying to only inline case* Updated fix* Changed the flow* Minor update* Updated the retryable condition* Minor test fix* Added extra space
1 parent680b3b6 commitab4b73b

File tree

3 files changed

+12
-5
lines changed

3 files changed

+12
-5
lines changed

‎src/databricks/sql/client.py‎

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -808,6 +808,7 @@ def execute(
808808
self.thrift_backend,
809809
self.buffer_size_bytes,
810810
self.arraysize,
811+
self.connection.use_cloud_fetch,
811812
)
812813

813814
ifexecute_response.is_staging_operation:
@@ -1202,6 +1203,7 @@ def __init__(
12021203
thrift_backend:ThriftBackend,
12031204
result_buffer_size_bytes:int=DEFAULT_RESULT_BUFFER_SIZE_BYTES,
12041205
arraysize:int=10000,
1206+
use_cloud_fetch:bool=True,
12051207
):
12061208
"""
12071209
A ResultSet manages the results of a single command.
@@ -1223,6 +1225,7 @@ def __init__(
12231225
self.description=execute_response.description
12241226
self._arrow_schema_bytes=execute_response.arrow_schema_bytes
12251227
self._next_row_index=0
1228+
self._use_cloud_fetch=use_cloud_fetch
12261229

12271230
ifexecute_response.arrow_queue:
12281231
# In this case the server has taken the fast path and returned an initial batch of
@@ -1250,6 +1253,7 @@ def _fill_results_buffer(self):
12501253
lz4_compressed=self.lz4_compressed,
12511254
arrow_schema_bytes=self._arrow_schema_bytes,
12521255
description=self.description,
1256+
use_cloud_fetch=self._use_cloud_fetch,
12531257
)
12541258
self.results=results
12551259
self.has_more_rows=has_more_rows

‎src/databricks/sql/thrift_backend.py‎

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -321,7 +321,7 @@ def _handle_request_error(self, error_info, attempt, elapsed):
321321

322322
# FUTURE: Consider moving to https://github.com/litl/backoff or
323323
# https://github.com/jd/tenacity for retry logic.
324-
defmake_request(self,method,request):
324+
defmake_request(self,method,request,retryable=True):
325325
"""Execute given request, attempting retries when
326326
1. Receiving HTTP 429/503 from server
327327
2. OSError is raised during a GetOperationStatus
@@ -460,7 +460,7 @@ def attempt_request(attempt):
460460
# return on success
461461
# if available: bounded delay and retry
462462
# if not: raise error
463-
max_attempts=self._retry_stop_after_attempts_count
463+
max_attempts=self._retry_stop_after_attempts_countifretryableelse1
464464

465465
# use index-1 counting for logging/human consistency
466466
forattemptinrange(1,max_attempts+1):
@@ -1028,6 +1028,7 @@ def fetch_results(
10281028
lz4_compressed,
10291029
arrow_schema_bytes,
10301030
description,
1031+
use_cloud_fetch=True,
10311032
):
10321033
assertop_handleisnotNone
10331034

@@ -1044,10 +1045,11 @@ def fetch_results(
10441045
includeResultSetMetadata=True,
10451046
)
10461047

1047-
resp=self.make_request(self._client.FetchResults,req)
1048+
# Fetch results in Inline mode with FETCH_NEXT orientation are not idempotent and hence not retried
1049+
resp=self.make_request(self._client.FetchResults,req,use_cloud_fetch)
10481050
ifresp.results.startRowOffset>expected_row_start_offset:
1049-
logger.warning(
1050-
"Expected results to start from {} but they instead start at {}".format(
1051+
raiseDataError(
1052+
"fetch_results failed due to inconsistency in the state between the client and the server.Expected results to start from {} but they instead start at {}, some result batches must have been skipped".format(
10511053
expected_row_start_offset,resp.results.startRowOffset
10521054
)
10531055
)

‎tests/unit/test_fetches.py‎

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,7 @@ def fetch_results(
6666
lz4_compressed,
6767
arrow_schema_bytes,
6868
description,
69+
use_cloud_fetch=True,
6970
):
7071
nonlocalbatch_index
7172
results=FetchTests.make_arrow_queue(batch_list[batch_index])

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp