Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit1e6c2e9

Browse files
Add more debug logging for CloudFetch (#395)
Signed-off-by: Levko Kravets <levko.ne@gmail.com>
1 parent6c16b70 commit1e6c2e9

File tree

2 files changed

+84
-2
lines changed

2 files changed

+84
-2
lines changed

‎src/databricks/sql/cloudfetch/download_manager.py‎

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,11 @@ def add_file_links(
4949
forlinkint_spark_arrow_result_links:
5050
iflink.rowCount<=0:
5151
continue
52+
logger.debug(
53+
"ResultFileDownloadManager.add_file_links: start offset {}, row count: {}".format(
54+
link.startRowOffset,link.rowCount
55+
)
56+
)
5257
self.download_handlers.append(
5358
ResultSetDownloadHandler(self.downloadable_result_settings,link)
5459
)
@@ -88,6 +93,12 @@ def get_next_downloaded_file(
8893

8994
# Check (and wait) for download status
9095
ifself._check_if_download_successful(handler):
96+
link=handler.result_link
97+
logger.debug(
98+
"ResultFileDownloadManager: file found for row index {}: start {}, row count: {}".format(
99+
next_row_offset,link.startRowOffset,link.rowCount
100+
)
101+
)
91102
# Buffer should be empty so set buffer to new ArrowQueue with result_file
92103
result=DownloadedFile(
93104
handler.result_file,
@@ -97,40 +108,78 @@ def get_next_downloaded_file(
97108
self.download_handlers.pop(idx)
98109
# Return True upon successful download to continue loop and not force a retry
99110
returnresult
111+
else:
112+
logger.debug(
113+
"ResultFileDownloadManager: cannot find file for row index {}".format(
114+
next_row_offset
115+
)
116+
)
117+
100118
# Download was not successful for next download item, force a retry
101119
self._shutdown_manager()
102120
returnNone
103121

104122
def_remove_past_handlers(self,next_row_offset:int):
123+
logger.debug(
124+
"ResultFileDownloadManager: removing past handlers, current offset: {}".format(
125+
next_row_offset
126+
)
127+
)
105128
# Any link in which its start to end range doesn't include the next row to be fetched does not need downloading
106129
i=0
107130
whilei<len(self.download_handlers):
108131
result_link=self.download_handlers[i].result_link
132+
logger.debug(
133+
"- checking result link: start {}, row count: {}, current offset: {}".format(
134+
result_link.startRowOffset,result_link.rowCount,next_row_offset
135+
)
136+
)
109137
ifresult_link.startRowOffset+result_link.rowCount>next_row_offset:
110138
i+=1
111139
continue
112140
self.download_handlers.pop(i)
113141

114142
def_schedule_downloads(self):
115143
# Schedule downloads for all download handlers if not already scheduled.
144+
logger.debug("ResultFileDownloadManager: schedule downloads")
116145
forhandlerinself.download_handlers:
117146
ifhandler.is_download_scheduled:
118147
continue
119148
try:
149+
logger.debug(
150+
"- start: {}, row count: {}".format(
151+
handler.result_link.startRowOffset,handler.result_link.rowCount
152+
)
153+
)
120154
self.thread_pool.submit(handler.run)
121155
exceptExceptionase:
122156
logger.error(e)
123157
break
124158
handler.is_download_scheduled=True
125159

126160
def_find_next_file_index(self,next_row_offset:int):
161+
logger.debug(
162+
"ResultFileDownloadManager: trying to find file for row {}".format(
163+
next_row_offset
164+
)
165+
)
127166
# Get the handler index of the next file in order
128167
next_indices= [
129168
i
130169
fori,handlerinenumerate(self.download_handlers)
131170
ifhandler.is_download_scheduled
171+
# TODO: shouldn't `next_row_offset` be tested against the range, not just start row offset?
132172
andhandler.result_link.startRowOffset==next_row_offset
133173
]
174+
175+
foriinnext_indices:
176+
link=self.download_handlers[i].result_link
177+
logger.debug(
178+
"- found file: start {}, row count {}".format(
179+
link.startRowOffset,link.rowCount
180+
)
181+
)
182+
134183
returnnext_indices[0]iflen(next_indices)>0elseNone
135184

136185
def_check_if_download_successful(self,handler:ResultSetDownloadHandler):

‎src/databricks/sql/utils.py‎

Lines changed: 35 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -156,6 +156,19 @@ def __init__(
156156
self.lz4_compressed=lz4_compressed
157157
self.description=description
158158

159+
logger.debug(
160+
"Initialize CloudFetch loader, row set start offset: {}, file list:".format(
161+
start_row_offset
162+
)
163+
)
164+
ifresult_linksisnotNone:
165+
forresult_linkinresult_links:
166+
logger.debug(
167+
"- start row offset: {}, row count: {}".format(
168+
result_link.startRowOffset,result_link.rowCount
169+
)
170+
)
171+
159172
self.download_manager=ResultFileDownloadManager(
160173
self.max_download_threads,self.lz4_compressed
161174
)
@@ -175,8 +188,10 @@ def next_n_rows(self, num_rows: int) -> pyarrow.Table:
175188
pyarrow.Table
176189
"""
177190
ifnotself.table:
191+
logger.debug("CloudFetchQueue: no more rows available")
178192
# Return empty pyarrow table to cause retry of fetch
179193
returnself._create_empty_table()
194+
logger.debug("CloudFetchQueue: trying to get {} next rows".format(num_rows))
180195
results=self.table.slice(0,0)
181196
whilenum_rows>0andself.table:
182197
# Get remaining of num_rows or the rest of the current table, whichever is smaller
@@ -190,6 +205,8 @@ def next_n_rows(self, num_rows: int) -> pyarrow.Table:
190205
self.table=self._create_next_table()
191206
self.table_row_index=0
192207
num_rows-=table_slice.num_rows
208+
209+
logger.debug("CloudFetchQueue: collected {} next rows".format(results.num_rows))
193210
returnresults
194211

195212
defremaining_rows(self)->pyarrow.Table:
@@ -214,11 +231,21 @@ def remaining_rows(self) -> pyarrow.Table:
214231
returnresults
215232

216233
def_create_next_table(self)->Union[pyarrow.Table,None]:
234+
logger.debug(
235+
"CloudFetchQueue: Trying to get downloaded file for row {}".format(
236+
self.start_row_index
237+
)
238+
)
217239
# Create next table by retrieving the logical next downloaded file, or return None to signal end of queue
218240
downloaded_file=self.download_manager.get_next_downloaded_file(
219241
self.start_row_index
220242
)
221243
ifnotdownloaded_file:
244+
logger.debug(
245+
"CloudFetchQueue: Cannot find downloaded file for row {}".format(
246+
self.start_row_index
247+
)
248+
)
222249
# None signals no more Arrow tables can be built from the remaining handlers if any remain
223250
returnNone
224251
arrow_table=create_arrow_table_from_arrow_file(
@@ -228,12 +255,18 @@ def _create_next_table(self) -> Union[pyarrow.Table, None]:
228255
# The server rarely prepares the exact number of rows requested by the client in cloud fetch.
229256
# Subsequently, we drop the extraneous rows in the last file if more rows are retrieved than requested
230257
ifarrow_table.num_rows>downloaded_file.row_count:
231-
self.start_row_index+=downloaded_file.row_count
232-
returnarrow_table.slice(0,downloaded_file.row_count)
258+
arrow_table=arrow_table.slice(0,downloaded_file.row_count)
233259

234260
# At this point, whether the file has extraneous rows or not, the arrow table should have the correct num rows
235261
assertdownloaded_file.row_count==arrow_table.num_rows
236262
self.start_row_index+=arrow_table.num_rows
263+
264+
logger.debug(
265+
"CloudFetchQueue: Found downloaded file, row count: {}, new start offset: {}".format(
266+
arrow_table.num_rows,self.start_row_index
267+
)
268+
)
269+
237270
returnarrow_table
238271

239272
def_create_empty_table(self)->pyarrow.Table:

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp