Skip to content

Commit 1e6c2e9

Browse files
Add more debug logging for CloudFetch (#395)
Signed-off-by: Levko Kravets <[email protected]>
1 parent 6c16b70 commit 1e6c2e9

File tree

2 files changed: +84 additions, −2 deletions

src/databricks/sql/cloudfetch/download_manager.py

+49
Original file line number | Diff line number | Diff line change
@@ -49,6 +49,11 @@ def add_file_links(
4949
for link in t_spark_arrow_result_links:
5050
if link.rowCount <= 0:
5151
continue
52+
logger.debug(
53+
"ResultFileDownloadManager.add_file_links: start offset {}, row count: {}".format(
54+
link.startRowOffset, link.rowCount
55+
)
56+
)
5257
self.download_handlers.append(
5358
ResultSetDownloadHandler(self.downloadable_result_settings, link)
5459
)
@@ -88,6 +93,12 @@ def get_next_downloaded_file(
8893

8994
# Check (and wait) for download status
9095
if self._check_if_download_successful(handler):
96+
link = handler.result_link
97+
logger.debug(
98+
"ResultFileDownloadManager: file found for row index {}: start {}, row count: {}".format(
99+
next_row_offset, link.startRowOffset, link.rowCount
100+
)
101+
)
91102
# Buffer should be empty so set buffer to new ArrowQueue with result_file
92103
result = DownloadedFile(
93104
handler.result_file,
@@ -97,40 +108,78 @@ def get_next_downloaded_file(
97108
self.download_handlers.pop(idx)
98109
# Return True upon successful download to continue loop and not force a retry
99110
return result
111+
else:
112+
logger.debug(
113+
"ResultFileDownloadManager: cannot find file for row index {}".format(
114+
next_row_offset
115+
)
116+
)
117+
100118
# Download was not successful for next download item, force a retry
101119
self._shutdown_manager()
102120
return None
103121

104122
def _remove_past_handlers(self, next_row_offset: int):
    """Discard download handlers whose files lie entirely before *next_row_offset*."""
    logger.debug(
        "ResultFileDownloadManager: removing past handlers, current offset: {}".format(
            next_row_offset
        )
    )
    # A link covering rows [start, start + count) is only useful if that range
    # still includes the next row to be fetched; everything earlier is dropped.
    idx = 0
    while idx < len(self.download_handlers):
        link = self.download_handlers[idx].result_link
        logger.debug(
            "- checking result link: start {}, row count: {}, current offset: {}".format(
                link.startRowOffset, link.rowCount, next_row_offset
            )
        )
        if link.startRowOffset + link.rowCount <= next_row_offset:
            # Entire file precedes the requested offset — remove it in place
            # and re-check the element that shifted into this slot.
            self.download_handlers.pop(idx)
        else:
            idx += 1
113141

114142
def _schedule_downloads(self):
    """Submit every not-yet-scheduled download handler to the thread pool."""
    logger.debug("ResultFileDownloadManager: schedule downloads")
    for handler in self.download_handlers:
        if not handler.is_download_scheduled:
            try:
                logger.debug(
                    "- start: {}, row count: {}".format(
                        handler.result_link.startRowOffset, handler.result_link.rowCount
                    )
                )
                self.thread_pool.submit(handler.run)
            except Exception as e:
                # A failed submission aborts further scheduling; the handler's
                # scheduled flag is intentionally left unset.
                logger.error(e)
                break
            handler.is_download_scheduled = True
125159

126160
def _find_next_file_index(self, next_row_offset: int):
    """Return the index of the scheduled handler whose file starts at *next_row_offset*, or None."""
    logger.debug(
        "ResultFileDownloadManager: trying to find file for row {}".format(
            next_row_offset
        )
    )
    # Collect every scheduled handler whose file begins exactly at the offset.
    # TODO: shouldn't `next_row_offset` be tested against the range, not just start row offset?
    candidates = []
    for pos, handler in enumerate(self.download_handlers):
        if (
            handler.is_download_scheduled
            and handler.result_link.startRowOffset == next_row_offset
        ):
            candidates.append(pos)

    for pos in candidates:
        link = self.download_handlers[pos].result_link
        logger.debug(
            "- found file: start {}, row count {}".format(
                link.startRowOffset, link.rowCount
            )
        )

    if candidates:
        return candidates[0]
    return None
135184

136185
def _check_if_download_successful(self, handler: ResultSetDownloadHandler):

src/databricks/sql/utils.py

+35-2
Original file line number | Diff line number | Diff line change
@@ -156,6 +156,19 @@ def __init__(
156156
self.lz4_compressed = lz4_compressed
157157
self.description = description
158158

159+
logger.debug(
160+
"Initialize CloudFetch loader, row set start offset: {}, file list:".format(
161+
start_row_offset
162+
)
163+
)
164+
if result_links is not None:
165+
for result_link in result_links:
166+
logger.debug(
167+
"- start row offset: {}, row count: {}".format(
168+
result_link.startRowOffset, result_link.rowCount
169+
)
170+
)
171+
159172
self.download_manager = ResultFileDownloadManager(
160173
self.max_download_threads, self.lz4_compressed
161174
)
@@ -175,8 +188,10 @@ def next_n_rows(self, num_rows: int) -> pyarrow.Table:
175188
pyarrow.Table
176189
"""
177190
if not self.table:
191+
logger.debug("CloudFetchQueue: no more rows available")
178192
# Return empty pyarrow table to cause retry of fetch
179193
return self._create_empty_table()
194+
logger.debug("CloudFetchQueue: trying to get {} next rows".format(num_rows))
180195
results = self.table.slice(0, 0)
181196
while num_rows > 0 and self.table:
182197
# Get remaining of num_rows or the rest of the current table, whichever is smaller
@@ -190,6 +205,8 @@ def next_n_rows(self, num_rows: int) -> pyarrow.Table:
190205
self.table = self._create_next_table()
191206
self.table_row_index = 0
192207
num_rows -= table_slice.num_rows
208+
209+
logger.debug("CloudFetchQueue: collected {} next rows".format(results.num_rows))
193210
return results
194211

195212
def remaining_rows(self) -> pyarrow.Table:
@@ -214,11 +231,21 @@ def remaining_rows(self) -> pyarrow.Table:
214231
return results
215232

216233
def _create_next_table(self) -> Union[pyarrow.Table, None]:
234+
logger.debug(
235+
"CloudFetchQueue: Trying to get downloaded file for row {}".format(
236+
self.start_row_index
237+
)
238+
)
217239
# Create next table by retrieving the logical next downloaded file, or return None to signal end of queue
218240
downloaded_file = self.download_manager.get_next_downloaded_file(
219241
self.start_row_index
220242
)
221243
if not downloaded_file:
244+
logger.debug(
245+
"CloudFetchQueue: Cannot find downloaded file for row {}".format(
246+
self.start_row_index
247+
)
248+
)
222249
# None signals no more Arrow tables can be built from the remaining handlers if any remain
223250
return None
224251
arrow_table = create_arrow_table_from_arrow_file(
@@ -228,12 +255,18 @@ def _create_next_table(self) -> Union[pyarrow.Table, None]:
228255
# The server rarely prepares the exact number of rows requested by the client in cloud fetch.
229256
# Subsequently, we drop the extraneous rows in the last file if more rows are retrieved than requested
230257
if arrow_table.num_rows > downloaded_file.row_count:
231-
self.start_row_index += downloaded_file.row_count
232-
return arrow_table.slice(0, downloaded_file.row_count)
258+
arrow_table = arrow_table.slice(0, downloaded_file.row_count)
233259

234260
# At this point, whether the file has extraneous rows or not, the arrow table should have the correct num rows
235261
assert downloaded_file.row_count == arrow_table.num_rows
236262
self.start_row_index += arrow_table.num_rows
263+
264+
logger.debug(
265+
"CloudFetchQueue: Found downloaded file, row count: {}, new start offset: {}".format(
266+
arrow_table.num_rows, self.start_row_index
267+
)
268+
)
269+
237270
return arrow_table
238271

239272
def _create_empty_table(self) -> pyarrow.Table:

0 commit comments

Comments (0)