Download Manager: Stop shutdown in case of empty download tasks Queue #641

Open

varun-edachali-dbx wants to merge 78 commits into main from less-defensive-download

Commits (78)
5bf5d4c  Separate Session related functionality from Connection class (#571)  (varun-edachali-dbx, May 28, 2025)
400a8bd  Introduce Backend Interface (DatabricksClient) (#573)  (varun-edachali-dbx, May 30, 2025)
3c78ed7  Implement ResultSet Abstraction (backend interfaces for fetch phase) …  (varun-edachali-dbx, Jun 3, 2025)
9625229  Introduce Sea HTTP Client and test script (#583)  (varun-edachali-dbx, Jun 4, 2025)
0887bc1  Introduce `SeaDatabricksClient` (Session Implementation) (#582)  (varun-edachali-dbx, Jun 9, 2025)
6d63df0  Normalise Execution Response (clean backend interfaces) (#587)  (varun-edachali-dbx, Jun 11, 2025)
ba8d9fd  Introduce models for `SeaDatabricksClient` (#595)  (varun-edachali-dbx, Jun 12, 2025)
bb3f15a  Introduce preliminary SEA Result Set (#588)  (varun-edachali-dbx, Jun 12, 2025)
19f1fae  Merge branch 'main' into sea-migration  (varun-edachali-dbx, Jun 17, 2025)
6c5ba6d  remove invalid ExecuteResponse import  (varun-edachali-dbx, Jun 17, 2025)
5e5147b  Separate Session related functionality from Connection class (#571)  (varun-edachali-dbx, May 28, 2025)
57370b3  Introduce Backend Interface (DatabricksClient) (#573)  (varun-edachali-dbx, May 30, 2025)
75752bf  Implement ResultSet Abstraction (backend interfaces for fetch phase) …  (varun-edachali-dbx, Jun 3, 2025)
450b80d  remove un-necessary initialisation assertions  (varun-edachali-dbx, Jun 18, 2025)
a926f02  remove un-necessary line breaks  (varun-edachali-dbx, Jun 18, 2025)
55ad001  more un-necessary line breaks  (varun-edachali-dbx, Jun 18, 2025)
fa15730  constrain diff of test_closing_connection_closes_commands  (varun-edachali-dbx, Jun 18, 2025)
019c7fb  reduce diff of test_closing_connection_closes_commands  (varun-edachali-dbx, Jun 18, 2025)
726abe7  use pytest-like assertions for test_closing_connection_closes_commands  (varun-edachali-dbx, Jun 18, 2025)
bf6d41c  ensure command_id is not None  (varun-edachali-dbx, Jun 18, 2025)
5afa733  line breaks after multi-line pyfocs  (varun-edachali-dbx, Jun 18, 2025)
e3dfd36  ensure non null operationHandle for commandId creation  (varun-edachali-dbx, Jun 18, 2025)
63360b3  use command_id methods instead of explicit guid_to_hex_id conversion  (varun-edachali-dbx, Jun 18, 2025)
13ffb8d  remove un-necessary artifacts in test_session, add back assertion  (varun-edachali-dbx, Jun 18, 2025)
a74d279  Implement SeaDatabricksClient (Complete Execution Spec) (#590)  (varun-edachali-dbx, Jun 18, 2025)
d759050  add from __future__ import annotations to remove string literals arou…  (varun-edachali-dbx, Jun 19, 2025)
1e21434  move docstring of DatabricksClient within class  (varun-edachali-dbx, Jun 24, 2025)
cd4015b  move ThriftResultSet import to top of file  (varun-edachali-dbx, Jun 24, 2025)
ed8b610  make backend/utils __init__ file empty  (varun-edachali-dbx, Jun 24, 2025)
94d951e  use from __future__ import annotations to remove string literals arou…  (varun-edachali-dbx, Jun 24, 2025)
c20058e  use lazy logging  (varun-edachali-dbx, Jun 24, 2025)
fe3acb1  replace getters with property tag  (varun-edachali-dbx, Jun 24, 2025)
9fb6a76  Merge branch 'main' into backend-refactors  (varun-edachali-dbx, Jun 24, 2025)
61dfc4d  set active_command_id to None, not active_op_handle  (varun-edachali-dbx, Jun 24, 2025)
64fb9b2  align test_session with pytest instead of unittest  (varun-edachali-dbx, Jun 24, 2025)
cbf63f9  Merge branch 'main' into sea-migration  (varun-edachali-dbx, Jun 26, 2025)
59b4825  remove duplicate test, correct active_command_id attribute  (varun-edachali-dbx, Jun 26, 2025)
e380654  SeaDatabricksClient: Add Metadata Commands (#593)  (varun-edachali-dbx, Jun 26, 2025)
677a7b0  SEA volume operations fix: assign `manifest.is_volume_operation` to `…  (varun-edachali-dbx, Jun 26, 2025)
45585d4  Introduce manual SEA test scripts for Exec Phase (#589)  (varun-edachali-dbx, Jun 27, 2025)
70c7dc8  Complete Fetch Phase (for `INLINE` disposition and `JSON_ARRAY` forma…  (varun-edachali-dbx, Jul 2, 2025)
abf9aab  Merge branch 'main' into sea-migration  (varun-edachali-dbx, Jul 3, 2025)
9b4b606  Merge branch 'main' into backend-refactors  (varun-edachali-dbx, Jul 3, 2025)
4f11ff0  Introduce `row_limit` param (#607)  (varun-edachali-dbx, Jul 7, 2025)
45f5c26  Merge branch 'main' into backend-refactors  (varun-edachali-dbx, Jul 10, 2025)
2c9368a  formatting (black)  (varun-edachali-dbx, Jul 10, 2025)
9b1b1f5  remove repetition from Session.__init__  (varun-edachali-dbx, Jul 10, 2025)
77e23d3  Merge branch 'backend-refactors' into sea-migration  (varun-edachali-dbx, Jul 11, 2025)
3bd3aef  fix merge artifacts  (varun-edachali-dbx, Jul 11, 2025)
6d4701f  correct patch paths  (varun-edachali-dbx, Jul 11, 2025)
dc1cb6d  fix type issues  (varun-edachali-dbx, Jul 14, 2025)
5d04cd0  Merge branch 'main' into sea-migration  (varun-edachali-dbx, Jul 15, 2025)
922c448  explicitly close result queue  (varun-edachali-dbx, Jul 15, 2025)
1a0575a  Complete Fetch Phase (`EXTERNAL_LINKS` disposition and `ARROW` format…  (varun-edachali-dbx, Jul 16, 2025)
c07beb1  SEA Session Configuration Fix: Explicitly convert values to `str` (#…  (varun-edachali-dbx, Jul 16, 2025)
640cc82  SEA: add support for `Hybrid` disposition (#631)  (varun-edachali-dbx, Jul 17, 2025)
8fbca9d  SEA: Reduce network calls for synchronous commands (#633)  (varun-edachali-dbx, Jul 19, 2025)
806e5f5  SEA: Decouple Link Fetching (#632)  (varun-edachali-dbx, Jul 21, 2025)
2bb8328  make download manager less defensive  (varun-edachali-dbx, Jul 21, 2025)
ac52428  pa is not defined  (varun-edachali-dbx, Jul 21, 2025)
746df87  skip some pyarrow dependent tests if no pa  (varun-edachali-dbx, Jul 21, 2025)
e11c065  notify listeners on scheduling downloads  (varun-edachali-dbx, Jul 21, 2025)
3c1ff9b  ensure total links is maintained  (varun-edachali-dbx, Jul 21, 2025)
4b0f483  acquire download_condition lock  (varun-edachali-dbx, Jul 21, 2025)
feb387a  acquire lock before notifying  (varun-edachali-dbx, Jul 21, 2025)
b57c3f3  Chunk download latency (#634)  (saishreeeee, Jul 21, 2025)
57f8e48  constrain calls to get_next_downloaded_file  (varun-edachali-dbx, Jul 21, 2025)
7872e18  Merge branch 'sea-migration' into less-defensive-download  (varun-edachali-dbx, Jul 21, 2025)
620906b  formatting  (varun-edachali-dbx, Jul 21, 2025)
ef5836b  acquire lock before notif + formatting (black)  (varun-edachali-dbx, Jul 21, 2025)
4bb213c  remove redundant docstring  (varun-edachali-dbx, Jul 21, 2025)
cd8389f  Merge branch 'sea-migration' into less-defensive-download  (varun-edachali-dbx, Jul 21, 2025)
aab8ce5  Merge branch 'main' into less-defensive-download  (varun-edachali-dbx, Jul 28, 2025)
62ed2a2  description, partial results (small fixes)  (varun-edachali-dbx, Jul 28, 2025)
efe5e82  random newline remove  (varun-edachali-dbx, Jul 28, 2025)
21b4631  remove duplicate defn  (varun-edachali-dbx, Jul 28, 2025)
b8cce65  Merge branch 'main' into less-defensive-download  (varun-edachali-dbx, Aug 6, 2025)
345a9fc  reduce diff  (varun-edachali-dbx, Aug 6, 2025)
src/databricks/sql/backend/sea/queue.py - 6 changes: 3 additions & 3 deletions

@@ -364,14 +364,14 @@ def __init__(
         # Initialize table and position
         self.table = self._create_next_table()
 
-    def _create_next_table(self) -> Union["pyarrow.Table", None]:
+    def _create_next_table(self) -> "pyarrow.Table":
         """Create next table by retrieving the logical next downloaded file."""
         if self.link_fetcher is None:
-            return None
+            return self._create_empty_table()
 
         chunk_link = self.link_fetcher.get_chunk_link(self._current_chunk_index)
         if chunk_link is None:
-            return None
+            return self._create_empty_table()
 
         row_offset = chunk_link.row_offset
         # NOTE: link has already been submitted to download manager at this point
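Both early returns above now fall back to `_create_empty_table()` instead of `None`. The helper itself is outside this diff, so the construction below is an assumption: a minimal sketch of building a zero-row `pyarrow.Table` that preserves the column names, which is what lets callers keep slicing and concatenating without None checks.

import pyarrow

def create_empty_table(description) -> "pyarrow.Table":
    # `description` is a hypothetical cursor description: a list of
    # (column_name, type_code, ...) tuples. A zero-row table with the
    # right column names lets downstream code treat "no data yet" and
    # "real data" uniformly instead of special-casing None.
    return pyarrow.table({name: [] for name, *_ in description})

# Example: an empty two-column table with its schema preserved.
empty = create_empty_table([("id", "int"), ("amount", "decimal")])
assert empty.num_rows == 0 and empty.column_names == ["id", "amount"]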
src/databricks/sql/cloudfetch/download_manager.py - 28 changes: 18 additions & 10 deletions

@@ -1,13 +1,15 @@
 import logging
 
 from concurrent.futures import ThreadPoolExecutor, Future
+import threading
 from typing import List, Union, Tuple, Optional
 
 from databricks.sql.cloudfetch.downloader import (
     ResultSetDownloadHandler,
     DownloadableResultSettings,
     DownloadedFile,
 )
+from databricks.sql.exc import Error
 from databricks.sql.types import SSLOptions
 from databricks.sql.telemetry.models.event import StatementType
 from databricks.sql.thrift_api.TCLIService.ttypes import TSparkArrowResultLink

@@ -39,26 +41,24 @@ def __init__(
             self._pending_links.append((i, link))
         self.chunk_id += len(links)
 
-        self._download_tasks: List[Future[DownloadedFile]] = []
         self._max_download_threads: int = max_download_threads
+
+        self._download_condition = threading.Condition()
+        self._download_tasks: List[Future[DownloadedFile]] = []
         self._thread_pool = ThreadPoolExecutor(max_workers=self._max_download_threads)
 
         self._downloadable_result_settings = DownloadableResultSettings(lz4_compressed)
         self._ssl_options = ssl_options
         self.session_id_hex = session_id_hex
         self.statement_id = statement_id
 
-    def get_next_downloaded_file(
-        self, next_row_offset: int
-    ) -> Union[DownloadedFile, None]:
+    def get_next_downloaded_file(self, next_row_offset: int) -> DownloadedFile:
         """
         Get next file that starts at given offset.
 
         This function gets the next downloaded file in which its rows start at the specified next_row_offset
         in relation to the full result. File downloads are scheduled if not already, and once the correct
         download handler is located, the function waits for the download status and returns the resulting file.
-        If there are no more downloads, a download was not successful, or the correct file could not be located,
-        this function shuts down the thread pool and returns None.
 
         Args:
             next_row_offset (int): The offset of the starting row of the next file we want data from.

@@ -67,10 +67,11 @@ def get_next_downloaded_file(
         # Make sure the download queue is always full
         self._schedule_downloads()
 
-        # No more files to download from this batch of links
-        if len(self._download_tasks) == 0:
-            self._shutdown_manager()
-            return None
+        while len(self._download_tasks) == 0:
+            if self._thread_pool._shutdown:
+                raise Error("download manager shut down before file was ready")
+            with self._download_condition:
+                self._download_condition.wait()
 
         task = self._download_tasks.pop(0)
         # Future's `result()` method will wait for the call to complete, and return

@@ -113,6 +114,9 @@ def _schedule_downloads(self):
             task = self._thread_pool.submit(handler.run)
             self._download_tasks.append(task)
 
+        with self._download_condition:
+            self._download_condition.notify_all()
+
     def add_link(self, link: TSparkArrowResultLink):
         """
         Add more links to the download manager.

@@ -132,8 +136,12 @@ def add_link(self, link: TSparkArrowResultLink):
         self._pending_links.append((self.chunk_id, link))
         self.chunk_id += 1
 
+        self._schedule_downloads()
+
     def _shutdown_manager(self):
         # Clear download handlers and shutdown the thread pool
         self._pending_links = []
         self._download_tasks = []
         self._thread_pool.shutdown(wait=False)
+        with self._download_condition:
+            self._download_condition.notify_all()
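The net effect of these hunks: `get_next_downloaded_file` no longer shuts the pool down and returns `None` when the task queue is momentarily empty; it blocks on a `threading.Condition` until `_schedule_downloads` notifies it or the manager is shut down. A minimal, self-contained sketch of that wait/notify pattern (the class and names below are illustrative, not the driver's actual API):

import threading
from collections import deque

class BlockingTaskQueue:
    # Sketch of the consumer/producer handshake this diff introduces.
    def __init__(self):
        self._tasks = deque()
        self._shutdown = False
        self._condition = threading.Condition()

    def put(self, task):
        # Producer side: mirrors _schedule_downloads() notifying
        # waiters after submitting work to the thread pool.
        with self._condition:
            self._tasks.append(task)
            self._condition.notify_all()

    def shutdown(self):
        # Mirrors _shutdown_manager(): wake every waiter so blocked
        # consumers can observe the shutdown and raise.
        with self._condition:
            self._shutdown = True
            self._condition.notify_all()

    def get(self):
        # Consumer side: mirrors the new while-loop in
        # get_next_downloaded_file().
        with self._condition:
            while not self._tasks:
                if self._shutdown:
                    raise RuntimeError("shut down before a task was ready")
                self._condition.wait()
            return self._tasks.popleft()

Holding the lock across both the emptiness check and the wait() closes the window in which a notify_all() could fire between the two and be missed.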
src/databricks/sql/utils.py - 55 changes: 23 additions & 32 deletions

@@ -249,7 +249,7 @@ def __init__(
         self.chunk_id = chunk_id
 
         # Table state
-        self.table = None
+        self.table = self._create_empty_table()
         self.table_row_index = 0
 
         # Initialize download manager

@@ -273,24 +273,20 @@ def next_n_rows(self, num_rows: int) -> "pyarrow.Table":
             pyarrow.Table
         """
 
-        if not self.table:
-            logger.debug("CloudFetchQueue: no more rows available")
-            # Return empty pyarrow table to cause retry of fetch
-            return self._create_empty_table()
         logger.debug("CloudFetchQueue: trying to get {} next rows".format(num_rows))
         results = self.table.slice(0, 0)
         partial_result_chunks = [results]
-        while num_rows > 0 and self.table:
+        while num_rows > 0 and self.table.num_rows > 0:
+            # Replace current table with the next table if we are at the end of the current table
+            if self.table_row_index == self.table.num_rows:
+                self.table = self._create_next_table()
+                self.table_row_index = 0
             # Get remaining of num_rows or the rest of the current table, whichever is smaller
             length = min(num_rows, self.table.num_rows - self.table_row_index)
             table_slice = self.table.slice(self.table_row_index, length)
             partial_result_chunks.append(table_slice)
             self.table_row_index += table_slice.num_rows
 
-            # Replace current table with the next table if we are at the end of the current table
-            if self.table_row_index == self.table.num_rows:
-                self.table = self._create_next_table()
-                self.table_row_index = 0
             num_rows -= table_slice.num_rows
 
         logger.debug("CloudFetchQueue: collected {} next rows".format(results.num_rows))

@@ -304,12 +300,9 @@ def remaining_rows(self) -> "pyarrow.Table":
             pyarrow.Table
         """
 
-        if not self.table:
-            # Return empty pyarrow table to cause retry of fetch
-            return self._create_empty_table()
         results = self.table.slice(0, 0)
         partial_result_chunks = [results]
-        while self.table:
+        while self.table.num_rows > 0:
             table_slice = self.table.slice(
                 self.table_row_index, self.table.num_rows - self.table_row_index
             )

@@ -319,17 +312,11 @@ def remaining_rows(self) -> "pyarrow.Table":
             self.table_row_index = 0
         return pyarrow.concat_tables(partial_result_chunks, use_threads=True)
 
-    def _create_table_at_offset(self, offset: int) -> Union["pyarrow.Table", None]:
+    def _create_table_at_offset(self, offset: int) -> "pyarrow.Table":
         """Create next table at the given row offset"""
 
-        # Create next table by retrieving the logical next downloaded file, or return None to signal end of queue
         downloaded_file = self.download_manager.get_next_downloaded_file(offset)
-        if not downloaded_file:
-            logger.debug(
-                "CloudFetchQueue: Cannot find downloaded file for row {}".format(offset)
-            )
-            # None signals no more Arrow tables can be built from the remaining handlers if any remain
-            return None
         arrow_table = create_arrow_table_from_arrow_file(
             downloaded_file.file_bytes, self.description
         )

@@ -345,7 +332,7 @@ def _create_table_at_offset(self, offset: int) -> Union["pyarrow.Table", None]:
         return arrow_table
 
     @abstractmethod
-    def _create_next_table(self) -> Union["pyarrow.Table", None]:
+    def _create_next_table(self) -> "pyarrow.Table":
         """Create next table by retrieving the logical next downloaded file."""
         pass

@@ -364,7 +351,7 @@ class ThriftCloudFetchQueue(CloudFetchQueue):
 
     def __init__(
         self,
-        schema_bytes,
+        schema_bytes: Optional[bytes],
         max_download_threads: int,
         ssl_options: SSLOptions,
         session_id_hex: Optional[str],

@@ -398,6 +385,8 @@ def __init__(
             chunk_id=chunk_id,
         )
 
+        self.num_links_downloaded = 0
+
         self.start_row_index = start_row_offset
         self.result_links = result_links or []
         self.session_id_hex = session_id_hex

@@ -421,20 +410,23 @@ def __init__(
         # Initialize table and position
         self.table = self._create_next_table()
 
-    def _create_next_table(self) -> Union["pyarrow.Table", None]:
+    def _create_next_table(self) -> "pyarrow.Table":
+        if self.num_links_downloaded >= len(self.result_links):
+            return self._create_empty_table()
+
         logger.debug(
             "ThriftCloudFetchQueue: Trying to get downloaded file for row {}".format(
                 self.start_row_index
             )
         )
         arrow_table = self._create_table_at_offset(self.start_row_index)
-        if arrow_table:
-            self.start_row_index += arrow_table.num_rows
-            logger.debug(
-                "ThriftCloudFetchQueue: Found downloaded file, row count: {}, new start offset: {}".format(
-                    arrow_table.num_rows, self.start_row_index
-                )
-            )
+        self.num_links_downloaded += 1
+        self.start_row_index += arrow_table.num_rows
+        logger.debug(
+            "ThriftCloudFetchQueue: Found downloaded file, row count: {}, new start offset: {}".format(
+                arrow_table.num_rows, self.start_row_index
+            )
+        )
         return arrow_table
 
 

@@ -740,7 +732,6 @@ def convert_decimals_in_arrow_table(table, description) -> "pyarrow.Table":
 
 
 def convert_to_assigned_datatypes_in_column_table(column_table, description):
-
     converted_column_table = []
     for i, col in enumerate(column_table):
         if description[i][1] == "decimal":
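With the empty-table sentinel in place, the consumption loops in `next_n_rows` and `remaining_rows` advance to the next chunk at the top of each iteration and stop once the current table has zero rows. A standalone sketch of that loop shape, assuming `next_table` is a hypothetical stand-in for `_create_next_table` that yields an empty table once the queue is exhausted:

import pyarrow

def take_n_rows(next_table, num_rows: int) -> "pyarrow.Table":
    table = next_table()
    row_index = 0
    chunks = [table.slice(0, 0)]  # zero-row slice pins the schema
    while num_rows > 0 and table.num_rows > 0:
        # Swap in the next chunk at the top of the loop, so an
        # exhausted chunk is replaced before we slice from it.
        if row_index == table.num_rows:
            table = next_table()
            row_index = 0
        length = min(num_rows, table.num_rows - row_index)
        chunks.append(table.slice(row_index, length))
        row_index += length
        num_rows -= length
    return pyarrow.concat_tables(chunks)

# Example: pull 4 rows across a 3-row chunk and a 2-row chunk.
parts = iter([pyarrow.table({"x": [1, 2, 3]}), pyarrow.table({"x": [4, 5]})])
empty = pyarrow.table({"x": pyarrow.array([], type=pyarrow.int64())})
result = take_n_rows(lambda: next(parts, empty), 4)
assert result.to_pydict() == {"x": [1, 2, 3, 4]}

Because the sentinel is an empty table rather than None, the termination condition is simply `table.num_rows > 0`: if the queue runs dry mid-loop, the slice contributes zero rows and the loop exits on the next check.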