Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Incorrect rows in inline fetch result#479

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to ourterms of service andprivacy statement. We’ll occasionally send you account related emails.

Already on GitHub?Sign in to your account

Merged
jprakash-db merged 12 commits intomainfromjprakash-db/inline-incorrect-rows
Dec 22, 2024
Merged
Show file tree
Hide file tree
Changes fromall commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletionssrc/databricks/sql/client.py
View file
Open in desktop
Original file line numberDiff line numberDiff line change
Expand Up@@ -808,6 +808,7 @@ def execute(
self.thrift_backend,
self.buffer_size_bytes,
self.arraysize,
self.connection.use_cloud_fetch,
)

if execute_response.is_staging_operation:
Expand DownExpand Up@@ -1202,6 +1203,7 @@ def __init__(
thrift_backend: ThriftBackend,
result_buffer_size_bytes: int = DEFAULT_RESULT_BUFFER_SIZE_BYTES,
arraysize: int = 10000,
use_cloud_fetch: bool = True,
):
"""
A ResultSet manages the results of a single command.
Expand All@@ -1223,6 +1225,7 @@ def __init__(
self.description = execute_response.description
self._arrow_schema_bytes = execute_response.arrow_schema_bytes
self._next_row_index = 0
self._use_cloud_fetch = use_cloud_fetch

if execute_response.arrow_queue:
# In this case the server has taken the fast path and returned an initial batch of
Expand DownExpand Up@@ -1250,6 +1253,7 @@ def _fill_results_buffer(self):
lz4_compressed=self.lz4_compressed,
arrow_schema_bytes=self._arrow_schema_bytes,
description=self.description,
use_cloud_fetch=self._use_cloud_fetch,
)
self.results = results
self.has_more_rows = has_more_rows
Expand Down
12 changes: 7 additions & 5 deletionssrc/databricks/sql/thrift_backend.py
View file
Open in desktop
Original file line numberDiff line numberDiff line change
Expand Up@@ -321,7 +321,7 @@ def _handle_request_error(self, error_info, attempt, elapsed):

# FUTURE: Consider moving to https://github.com/litl/backoff or
# https://github.com/jd/tenacity for retry logic.
def make_request(self, method, request):
def make_request(self, method, request, retryable=True):
"""Execute given request, attempting retries when
1. Receiving HTTP 429/503 from server
2. OSError is raised during a GetOperationStatus
Expand DownExpand Up@@ -460,7 +460,7 @@ def attempt_request(attempt):
# return on success
# if available: bounded delay and retry
# if not: raise error
max_attempts = self._retry_stop_after_attempts_count
max_attempts = self._retry_stop_after_attempts_count if retryable else 1

# use index-1 counting for logging/human consistency
for attempt in range(1, max_attempts + 1):
Expand DownExpand Up@@ -1028,6 +1028,7 @@ def fetch_results(
lz4_compressed,
arrow_schema_bytes,
description,
use_cloud_fetch=True,
):
assert op_handle is not None

Expand All@@ -1044,10 +1045,11 @@ def fetch_results(
includeResultSetMetadata=True,
)

resp = self.make_request(self._client.FetchResults, req)
# Fetch results in Inline mode with FETCH_NEXT orientation are not idempotent and hence not retried
resp = self.make_request(self._client.FetchResults, req, use_cloud_fetch)
if resp.results.startRowOffset > expected_row_start_offset:
logger.warning(
"Expected results to start from {} but they instead start at {}".format(
raise DataError(
"fetch_results failed due to inconsistency in the state between the client and the server.Expected results to start from {} but they instead start at {}, some result batches must have been skipped".format(
expected_row_start_offset, resp.results.startRowOffset
)
)
Expand Down
1 change: 1 addition & 0 deletionstests/unit/test_fetches.py
View file
Open in desktop
Original file line numberDiff line numberDiff line change
Expand Up@@ -66,6 +66,7 @@ def fetch_results(
lz4_compressed,
arrow_schema_bytes,
description,
use_cloud_fetch=True,
):
nonlocal batch_index
results = FetchTests.make_arrow_queue(batch_list[batch_index])
Expand Down
Loading

[8]ページ先頭

©2009-2025 Movatter.jp