Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commita47f2ee

Browse files
committed
Add more debug logging for CloudFetch
Signed-off-by: Levko Kravets <levko.ne@gmail.com>
1 parent6c16b70 commita47f2ee

File tree

2 files changed

+83
-2
lines changed

2 files changed

+83
-2
lines changed

‎src/databricks/sql/cloudfetch/download_manager.py‎

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,11 @@ def add_file_links(
4949
forlinkint_spark_arrow_result_links:
5050
iflink.rowCount<=0:
5151
continue
52+
logger.debug(
53+
"ResultFileDownloadManager.add_file_links: start offset {}, row count: {}".format(
54+
link.startRowOffset,link.rowCount
55+
)
56+
)
5257
self.download_handlers.append(
5358
ResultSetDownloadHandler(self.downloadable_result_settings,link)
5459
)
@@ -88,6 +93,12 @@ def get_next_downloaded_file(
8893

8994
# Check (and wait) for download status
9095
ifself._check_if_download_successful(handler):
96+
link=handler.result_link
97+
logger.debug(
98+
"ResultFileDownloadManager: file found for row index {}: start {}, row count: {}".format(
99+
next_row_offset,link.startRowOffset,link.rowCount
100+
)
101+
)
91102
# Buffer should be empty so set buffer to new ArrowQueue with result_file
92103
result=DownloadedFile(
93104
handler.result_file,
@@ -97,40 +108,78 @@ def get_next_downloaded_file(
97108
self.download_handlers.pop(idx)
98109
# Return True upon successful download to continue loop and not force a retry
99110
returnresult
111+
else:
112+
logger.debug(
113+
"ResultFileDownloadManager: cannot find file for row index {}".format(
114+
next_row_offset
115+
)
116+
)
117+
100118
# Download was not successful for next download item, force a retry
101119
self._shutdown_manager()
102120
returnNone
103121

104122
def_remove_past_handlers(self,next_row_offset:int):
123+
logger.debug(
124+
"ResultFileDownloadManager: removing past handlers, current offset: {}".format(
125+
next_row_offset
126+
)
127+
)
105128
# Any link in which its start to end range doesn't include the next row to be fetched does not need downloading
106129
i=0
107130
whilei<len(self.download_handlers):
108131
result_link=self.download_handlers[i].result_link
132+
logger.debug(
133+
"- checking result link: start {}, row count: {}, current offset: {}".format(
134+
result_link.startRowOffset,result_link.rowCount,next_row_offset
135+
)
136+
)
109137
ifresult_link.startRowOffset+result_link.rowCount>next_row_offset:
110138
i+=1
111139
continue
112140
self.download_handlers.pop(i)
113141

114142
def_schedule_downloads(self):
115143
# Schedule downloads for all download handlers if not already scheduled.
144+
logger.debug("ResultFileDownloadManager: schedule downloads")
116145
forhandlerinself.download_handlers:
117146
ifhandler.is_download_scheduled:
118147
continue
119148
try:
149+
logger.debug(
150+
"- start: {}, row count: {}".format(
151+
handler.result_link.startRowOffset,handler.result_link.rowCount
152+
)
153+
)
120154
self.thread_pool.submit(handler.run)
121155
exceptExceptionase:
122156
logger.error(e)
123157
break
124158
handler.is_download_scheduled=True
125159

126160
def_find_next_file_index(self,next_row_offset:int):
161+
logger.debug(
162+
"ResultFileDownloadManager: trying to find file for row {}".format(
163+
next_row_offset
164+
)
165+
)
127166
# Get the handler index of the next file in order
128167
next_indices= [
129168
i
130169
fori,handlerinenumerate(self.download_handlers)
131170
ifhandler.is_download_scheduled
171+
# TODO: shouldn't `next_row_offset` be tested against the range, not just start row offset?
132172
andhandler.result_link.startRowOffset==next_row_offset
133173
]
174+
175+
foriinnext_indices:
176+
link=self.download_handlers[i].result_link
177+
logger.debug(
178+
"- found file: start {}, row count {}".format(
179+
link.startRowOffset,link.rowCount
180+
)
181+
)
182+
134183
returnnext_indices[0]iflen(next_indices)>0elseNone
135184

136185
def_check_if_download_successful(self,handler:ResultSetDownloadHandler):

‎src/databricks/sql/utils.py‎

Lines changed: 34 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -156,6 +156,18 @@ def __init__(
156156
self.lz4_compressed=lz4_compressed
157157
self.description=description
158158

159+
logger.debug(
160+
"Initialize CloudFetch loader, row set start offset: {}, file list:".format(
161+
start_row_offset
162+
)
163+
)
164+
forresult_linkinresult_links:
165+
logger.debug(
166+
"- start row offset: {}, row count: {}".format(
167+
result_link.startRowOffset,result_link.rowCount
168+
)
169+
)
170+
159171
self.download_manager=ResultFileDownloadManager(
160172
self.max_download_threads,self.lz4_compressed
161173
)
@@ -175,8 +187,10 @@ def next_n_rows(self, num_rows: int) -> pyarrow.Table:
175187
pyarrow.Table
176188
"""
177189
ifnotself.table:
190+
logger.debug("CloudFetchQueue: no more rows available")
178191
# Return empty pyarrow table to cause retry of fetch
179192
returnself._create_empty_table()
193+
logger.debug("CloudFetchQueue: trying to get {} next rows".format(num_rows))
180194
results=self.table.slice(0,0)
181195
whilenum_rows>0andself.table:
182196
# Get remaining of num_rows or the rest of the current table, whichever is smaller
@@ -190,6 +204,8 @@ def next_n_rows(self, num_rows: int) -> pyarrow.Table:
190204
self.table=self._create_next_table()
191205
self.table_row_index=0
192206
num_rows-=table_slice.num_rows
207+
208+
logger.debug("CloudFetchQueue: collected {} next rows".format(results.num_rows))
193209
returnresults
194210

195211
defremaining_rows(self)->pyarrow.Table:
@@ -214,11 +230,21 @@ def remaining_rows(self) -> pyarrow.Table:
214230
returnresults
215231

216232
def_create_next_table(self)->Union[pyarrow.Table,None]:
233+
logger.debug(
234+
"CloudFetchQueue: Trying to get downloaded file for row {}".format(
235+
self.start_row_index
236+
)
237+
)
217238
# Create next table by retrieving the logical next downloaded file, or return None to signal end of queue
218239
downloaded_file=self.download_manager.get_next_downloaded_file(
219240
self.start_row_index
220241
)
221242
ifnotdownloaded_file:
243+
logger.debug(
244+
"CloudFetchQueue: Cannot find downloaded file for row {}".format(
245+
self.start_row_index
246+
)
247+
)
222248
# None signals no more Arrow tables can be built from the remaining handlers if any remain
223249
returnNone
224250
arrow_table=create_arrow_table_from_arrow_file(
@@ -228,12 +254,18 @@ def _create_next_table(self) -> Union[pyarrow.Table, None]:
228254
# The server rarely prepares the exact number of rows requested by the client in cloud fetch.
229255
# Subsequently, we drop the extraneous rows in the last file if more rows are retrieved than requested
230256
ifarrow_table.num_rows>downloaded_file.row_count:
231-
self.start_row_index+=downloaded_file.row_count
232-
returnarrow_table.slice(0,downloaded_file.row_count)
257+
arrow_table=arrow_table.slice(0,downloaded_file.row_count)
233258

234259
# At this point, whether the file has extraneous rows or not, the arrow table should have the correct num rows
235260
assertdownloaded_file.row_count==arrow_table.num_rows
236261
self.start_row_index+=arrow_table.num_rows
262+
263+
logger.debug(
264+
"CloudFetchQueue: Found downloaded file, row count: {}, new start offset: {}".format(
265+
arrow_table.num_rows,self.start_row_index
266+
)
267+
)
268+
237269
returnarrow_table
238270

239271
def_create_empty_table(self)->pyarrow.Table:

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp