Commit 01b7a8d

Cloud Fetch download manager (#146)
* Cloud Fetch download manager
* Bug fix: submit handler.run
* Type annotations
* Namedtuple -> dataclass
* Shutdown thread pool and clear handlers
* Docstrings and comments
* handler.run is the correct call
* Link expiry buffer in secs
* Adding type annotations for download_handlers and downloadable_result_settings
* Move DownloadableResultSettings to downloader.py to avoid circular import
* Black linting
* Timeout is never None

Signed-off-by: Matthew Kim <11141331+mattdeekay@users.noreply.github.com>
1 parent 64be9bc · commit 01b7a8d

4 files changed, +399 -4 lines changed
Lines changed: 166 additions & 0 deletions
@@ -0,0 +1,166 @@
+import logging
+
+from concurrent.futures import ThreadPoolExecutor
+from dataclasses import dataclass
+from typing import List, Union
+
+from databricks.sql.cloudfetch.downloader import (
+    ResultSetDownloadHandler,
+    DownloadableResultSettings,
+)
+from databricks.sql.thrift_api.TCLIService.ttypes import TSparkArrowResultLink
+
+logger = logging.getLogger(__name__)
+
+
+@dataclass
+class DownloadedFile:
+    """
+    Class for the result file and metadata.
+
+    Attributes:
+        file_bytes (bytes): Downloaded file in bytes.
+        start_row_offset (int): The offset of the starting row in relation to the full result.
+        row_count (int): Number of rows the file represents in the result.
+    """
+
+    file_bytes: bytes
+    start_row_offset: int
+    row_count: int
+
+
+class ResultFileDownloadManager:
+    def __init__(self, max_download_threads: int, lz4_compressed: bool):
+        self.download_handlers: List[ResultSetDownloadHandler] = []
+        self.thread_pool = ThreadPoolExecutor(max_workers=max_download_threads + 1)
+        self.downloadable_result_settings = DownloadableResultSettings(lz4_compressed)
+        self.fetch_need_retry = False
+        self.num_consecutive_result_file_download_retries = 0
+
+    def add_file_links(
+        self, t_spark_arrow_result_links: List[TSparkArrowResultLink]
+    ) -> None:
+        """
+        Create download handler for each cloud fetch link.
+
+        Args:
+            t_spark_arrow_result_links: List of cloud fetch links consisting of file URL and metadata.
+        """
+        for link in t_spark_arrow_result_links:
+            if link.rowCount <= 0:
+                continue
+            self.download_handlers.append(
+                ResultSetDownloadHandler(self.downloadable_result_settings, link)
+            )
+
+    def get_next_downloaded_file(
+        self, next_row_offset: int
+    ) -> Union[DownloadedFile, None]:
+        """
+        Get next file that starts at given offset.
+
+        This function gets the next downloaded file in which its rows start at the specified next_row_offset
+        in relation to the full result. File downloads are scheduled if not already, and once the correct
+        download handler is located, the function waits for the download status and returns the resulting file.
+        If there are no more downloads, a download was not successful, or the correct file could not be located,
+        this function shuts down the thread pool and returns None.
+
+        Args:
+            next_row_offset (int): The offset of the starting row of the next file we want data from.
+        """
+        # No more files to download from this batch of links
+        if not self.download_handlers:
+            self._shutdown_manager()
+            return None
+
+        # Remove handlers we don't need anymore
+        self._remove_past_handlers(next_row_offset)
+
+        # Schedule the downloads
+        self._schedule_downloads()
+
+        # Find next file
+        idx = self._find_next_file_index(next_row_offset)
+        if idx is None:
+            self._shutdown_manager()
+            return None
+        handler = self.download_handlers[idx]
+
+        # Check (and wait) for download status
+        if self._check_if_download_successful(handler):
+            # Buffer should be empty so set buffer to new ArrowQueue with result_file
+            result = DownloadedFile(
+                handler.result_file,
+                handler.result_link.startRowOffset,
+                handler.result_link.rowCount,
+            )
+            self.download_handlers.pop(idx)
+            # Return True upon successful download to continue loop and not force a retry
+            return result
+        # Download was not successful for next download item, force a retry
+        self._shutdown_manager()
+        return None
+
+    def _remove_past_handlers(self, next_row_offset: int):
+        # Any link in which its start to end range doesn't include the next row to be fetched does not need downloading
+        i = 0
+        while i < len(self.download_handlers):
+            result_link = self.download_handlers[i].result_link
+            if result_link.startRowOffset + result_link.rowCount > next_row_offset:
+                i += 1
+                continue
+            self.download_handlers.pop(i)
+
+    def _schedule_downloads(self):
+        # Schedule downloads for all download handlers if not already scheduled.
+        for handler in self.download_handlers:
+            if handler.is_download_scheduled:
+                continue
+            try:
+                self.thread_pool.submit(handler.run)
+            except Exception as e:
+                logger.error(e)
+                break
+            handler.is_download_scheduled = True
+
+    def _find_next_file_index(self, next_row_offset: int):
+        # Get the handler index of the next file in order
+        next_indices = [
+            i
+            for i, handler in enumerate(self.download_handlers)
+            if handler.is_download_scheduled
+            and handler.result_link.startRowOffset == next_row_offset
+        ]
+        return next_indices[0] if len(next_indices) > 0 else None
+
+    def _check_if_download_successful(self, handler: ResultSetDownloadHandler):
+        # Check (and wait until download finishes) if download was successful
+        if not handler.is_file_download_successful():
+            if handler.is_link_expired:
+                self.fetch_need_retry = True
+                return False
+            elif handler.is_download_timedout:
+                # Consecutive file retries should not exceed threshold in settings
+                if (
+                    self.num_consecutive_result_file_download_retries
+                    >= self.downloadable_result_settings.max_consecutive_file_download_retries
+                ):
+                    self.fetch_need_retry = True
+                    return False
+                self.num_consecutive_result_file_download_retries += 1

+                # Re-submit handler run to thread pool and recursively check download status
+                self.thread_pool.submit(handler.run)
+                return self._check_if_download_successful(handler)
+            else:
+                self.fetch_need_retry = True
+                return False
+
+        self.num_consecutive_result_file_download_retries = 0
+        self.fetch_need_retry = False
+        return True
+
+    def _shutdown_manager(self):
+        # Clear download handlers and shutdown the thread pool to cancel pending futures
+        self.download_handlers = []
+        self.thread_pool.shutdown(wait=False, cancel_futures=True)
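
For context, here is a minimal sketch of how a fetch loop might consume this manager. It assumes the new module is importable as databricks.sql.cloudfetch.download_manager (the file path is not shown in this hunk) and that `links` is a list of TSparkArrowResultLink objects from a Thrift fetch response; it is an illustration, not the connector's actual fetch code.

# Illustrative only: a consumer loop over cloud fetch files.
# The import path below is an assumption based on the downloader.py path shown later.
from databricks.sql.cloudfetch.download_manager import ResultFileDownloadManager


def iterate_cloud_fetch_files(links, max_download_threads=10, lz4_compressed=True):
    """Yield raw file bytes for each downloaded cloud fetch file, in row order."""
    manager = ResultFileDownloadManager(max_download_threads, lz4_compressed)
    manager.add_file_links(links)

    next_row_offset = 0
    while True:
        downloaded_file = manager.get_next_downloaded_file(next_row_offset)
        if downloaded_file is None:
            # Either all links are exhausted or a download failed; a caller
            # could inspect manager.fetch_need_retry and re-fetch fresh links.
            break
        yield downloaded_file.file_bytes
        # The next file starts right after the rows covered by this one.
        next_row_offset = downloaded_file.start_row_offset + downloaded_file.row_count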

src/databricks/sql/cloudfetch/downloader.py

Lines changed: 25 additions & 3 deletions
@@ -1,4 +1,5 @@
 import logging
+from dataclasses import dataclass
 
 import requests
 import lz4.frame
@@ -10,10 +11,28 @@
 logger = logging.getLogger(__name__)
 
 
+@dataclass
+class DownloadableResultSettings:
+    """
+    Class for settings common to each download handler.
+
+    Attributes:
+        is_lz4_compressed (bool): Whether file is expected to be lz4 compressed.
+        link_expiry_buffer_secs (int): Time in seconds to prevent download of a link before it expires. Default 0 secs.
+        download_timeout (int): Timeout for download requests. Default 60 secs.
+        max_consecutive_file_download_retries (int): Number of consecutive download retries before shutting down.
+    """
+
+    is_lz4_compressed: bool
+    link_expiry_buffer_secs: int = 0
+    download_timeout: int = 60
+    max_consecutive_file_download_retries: int = 0
+
+
 class ResultSetDownloadHandler(threading.Thread):
     def __init__(
         self,
-        downloadable_result_settings,
+        downloadable_result_settings: DownloadableResultSettings,
         t_spark_arrow_result_link: TSparkArrowResultLink,
     ):
         super().__init__()
@@ -32,8 +51,11 @@ def is_file_download_successful(self) -> bool:
 
         This function will block until a file download finishes or until a timeout.
         """
-        timeout = self.settings.download_timeout
-        timeout = timeout if timeout and timeout > 0 else None
+        timeout = (
+            self.settings.download_timeout
+            if self.settings.download_timeout > 0
+            else None
+        )
         try:
             if not self.is_download_finished.wait(timeout=timeout):
                 self.is_download_timedout = True
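
As a quick illustration of the new settings dataclass and the timeout handling above, the following standalone sketch mirrors the dataclass locally (only is_lz4_compressed is required; the other fields use their defaults) and shows how a non-positive download_timeout now maps to None, i.e. block until the download finishes.

from dataclasses import dataclass


# Local stand-in mirroring the DownloadableResultSettings shown in the diff,
# so this snippet runs without the connector installed.
@dataclass
class DownloadableResultSettings:
    is_lz4_compressed: bool
    link_expiry_buffer_secs: int = 0
    download_timeout: int = 60
    max_consecutive_file_download_retries: int = 0


settings = DownloadableResultSettings(is_lz4_compressed=True)
print(settings.download_timeout)  # 60 (default)

# Same expression as the rewritten block in is_file_download_successful:
# a non-positive timeout means "wait indefinitely" (None passed to Event.wait).
timeout = settings.download_timeout if settings.download_timeout > 0 else None
print(timeout)  # 60

no_limit = DownloadableResultSettings(is_lz4_compressed=True, download_timeout=0)
print(no_limit.download_timeout if no_limit.download_timeout > 0 else None)  # None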

