Add more debug logging for CloudFetch #395

Merged · 1 commit · Jun 6, 2024
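
This PR threads `logger.debug` calls through the CloudFetch download manager and the Arrow result queue so the row-offset bookkeeping can be traced end to end. The messages go through the standard `logging` module, so they stay invisible until the connector's logger is dialed down to `DEBUG`. Below is a minimal sketch of how a caller might surface them; the `databricks.sql` logger name is an assumption inferred from the package path in this diff, not something the PR itself states:

```python
import logging
import sys

# Assumption: the connector names its loggers after the module path
# (src/databricks/sql/...), so configuring the "databricks.sql" parent
# logger should capture the new CloudFetch debug messages.
handler = logging.StreamHandler(sys.stderr)
handler.setFormatter(
    logging.Formatter("%(asctime)s %(name)s %(levelname)s %(message)s")
)

logger = logging.getLogger("databricks.sql")
logger.addHandler(handler)
logger.setLevel(logging.DEBUG)
```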
src/databricks/sql/cloudfetch/download_manager.py (49 additions, 0 deletions)
@@ -49,6 +49,11 @@ def add_file_links(
         for link in t_spark_arrow_result_links:
             if link.rowCount <= 0:
                 continue
+            logger.debug(
+                "ResultFileDownloadManager.add_file_links: start offset {}, row count: {}".format(
+                    link.startRowOffset, link.rowCount
+                )
+            )
             self.download_handlers.append(
                 ResultSetDownloadHandler(self.downloadable_result_settings, link)
             )
@@ -88,6 +93,12 @@ def get_next_downloaded_file(
 
             # Check (and wait) for download status
             if self._check_if_download_successful(handler):
+                link = handler.result_link
+                logger.debug(
+                    "ResultFileDownloadManager: file found for row index {}: start {}, row count: {}".format(
+                        next_row_offset, link.startRowOffset, link.rowCount
+                    )
+                )
                 # Buffer should be empty so set buffer to new ArrowQueue with result_file
                 result = DownloadedFile(
                     handler.result_file,
@@ -97,40 +108,78 @@ def get_next_downloaded_file(
                 self.download_handlers.pop(idx)
                 # Return True upon successful download to continue loop and not force a retry
                 return result
+            else:
+                logger.debug(
+                    "ResultFileDownloadManager: cannot find file for row index {}".format(
+                        next_row_offset
+                    )
+                )
 
         # Download was not successful for next download item, force a retry
         self._shutdown_manager()
         return None
 
     def _remove_past_handlers(self, next_row_offset: int):
+        logger.debug(
+            "ResultFileDownloadManager: removing past handlers, current offset: {}".format(
+                next_row_offset
+            )
+        )
         # Any link in which its start to end range doesn't include the next row to be fetched does not need downloading
         i = 0
         while i < len(self.download_handlers):
             result_link = self.download_handlers[i].result_link
+            logger.debug(
+                "- checking result link: start {}, row count: {}, current offset: {}".format(
+                    result_link.startRowOffset, result_link.rowCount, next_row_offset
+                )
+            )
             if result_link.startRowOffset + result_link.rowCount > next_row_offset:
                 i += 1
                 continue
             self.download_handlers.pop(i)
 
     def _schedule_downloads(self):
         # Schedule downloads for all download handlers if not already scheduled.
+        logger.debug("ResultFileDownloadManager: schedule downloads")
         for handler in self.download_handlers:
             if handler.is_download_scheduled:
                 continue
             try:
+                logger.debug(
+                    "- start: {}, row count: {}".format(
+                        handler.result_link.startRowOffset, handler.result_link.rowCount
+                    )
+                )
                 self.thread_pool.submit(handler.run)
             except Exception as e:
                 logger.error(e)
                 break
             handler.is_download_scheduled = True
 
     def _find_next_file_index(self, next_row_offset: int):
+        logger.debug(
+            "ResultFileDownloadManager: trying to find file for row {}".format(
+                next_row_offset
+            )
+        )
         # Get the handler index of the next file in order
         next_indices = [
            i
            for i, handler in enumerate(self.download_handlers)
            if handler.is_download_scheduled
+            # TODO: shouldn't `next_row_offset` be tested against the range, not just start row offset?
            and handler.result_link.startRowOffset == next_row_offset
         ]
+
+        for i in next_indices:
+            link = self.download_handlers[i].result_link
+            logger.debug(
+                "- found file: start {}, row count {}".format(
+                    link.startRowOffset, link.rowCount
+                )
+            )
 
         return next_indices[0] if len(next_indices) > 0 else None
 
     def _check_if_download_successful(self, handler: ResultSetDownloadHandler):
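Taken together, the download manager's bookkeeping revolves around a single moving row offset: `_remove_past_handlers` drops any link whose range `[startRowOffset, startRowOffset + rowCount)` ends at or before that offset, while `_find_next_file_index` matches only on an exact `startRowOffset`, which is exactly what the TODO above is questioning. Here is a self-contained sketch of the two checks, with a hypothetical `Link` dataclass standing in for the Thrift result link:

```python
from dataclasses import dataclass
from typing import List, Optional

@dataclass
class Link:  # hypothetical stand-in for the Thrift TSparkArrowResultLink
    startRowOffset: int
    rowCount: int

def keep_current_and_future(links: List[Link], next_row_offset: int) -> List[Link]:
    # Mirrors _remove_past_handlers: a link survives only if its range
    # [start, start + count) still contains or lies beyond the next row.
    return [l for l in links if l.startRowOffset + l.rowCount > next_row_offset]

def find_next(links: List[Link], next_row_offset: int) -> Optional[Link]:
    # Mirrors _find_next_file_index: exact match on the start offset only.
    for l in links:
        if l.startRowOffset == next_row_offset:
            return l
    return None

links = [Link(0, 100), Link(100, 100), Link(200, 50)]
assert keep_current_and_future(links, 100) == [Link(100, 100), Link(200, 50)]
assert find_next(links, 100) == Link(100, 100)
assert find_next(links, 150) is None  # a mid-range offset matches nothing
```

The last assertion is why the TODO matters: an offset that lands mid-file passes the range check but fails the equality check, so `get_next_downloaded_file` finds no handler and forces a shutdown-and-retry.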
src/databricks/sql/utils.py (35 additions, 2 deletions)
@@ -156,6 +156,19 @@ def __init__(
         self.lz4_compressed = lz4_compressed
         self.description = description
 
+        logger.debug(
+            "Initialize CloudFetch loader, row set start offset: {}, file list:".format(
+                start_row_offset
+            )
+        )
+        if result_links is not None:
+            for result_link in result_links:
+                logger.debug(
+                    "- start row offset: {}, row count: {}".format(
+                        result_link.startRowOffset, result_link.rowCount
+                    )
+                )
+
         self.download_manager = ResultFileDownloadManager(
             self.max_download_threads, self.lz4_compressed
         )
@@ -175,8 +188,10 @@ def next_n_rows(self, num_rows: int) -> pyarrow.Table:
             pyarrow.Table
         """
         if not self.table:
+            logger.debug("CloudFetchQueue: no more rows available")
             # Return empty pyarrow table to cause retry of fetch
             return self._create_empty_table()
+        logger.debug("CloudFetchQueue: trying to get {} next rows".format(num_rows))
         results = self.table.slice(0, 0)
         while num_rows > 0 and self.table:
             # Get remaining of num_rows or the rest of the current table, whichever is smaller
@@ -190,6 +205,8 @@ def next_n_rows(self, num_rows: int) -> pyarrow.Table:
                 self.table = self._create_next_table()
                 self.table_row_index = 0
             num_rows -= table_slice.num_rows
+
+        logger.debug("CloudFetchQueue: collected {} next rows".format(results.num_rows))
         return results
 
     def remaining_rows(self) -> pyarrow.Table:
@@ -214,11 +231,21 @@ def remaining_rows(self) -> pyarrow.Table:
         return results
 
     def _create_next_table(self) -> Union[pyarrow.Table, None]:
+        logger.debug(
+            "CloudFetchQueue: Trying to get downloaded file for row {}".format(
+                self.start_row_index
+            )
+        )
         # Create next table by retrieving the logical next downloaded file, or return None to signal end of queue
         downloaded_file = self.download_manager.get_next_downloaded_file(
             self.start_row_index
         )
         if not downloaded_file:
+            logger.debug(
+                "CloudFetchQueue: Cannot find downloaded file for row {}".format(
+                    self.start_row_index
+                )
+            )
             # None signals no more Arrow tables can be built from the remaining handlers if any remain
             return None
         arrow_table = create_arrow_table_from_arrow_file(
@@ -228,12 +255,18 @@ def _create_next_table(self) -> Union[pyarrow.Table, None]:
         # The server rarely prepares the exact number of rows requested by the client in cloud fetch.
         # Subsequently, we drop the extraneous rows in the last file if more rows are retrieved than requested
         if arrow_table.num_rows > downloaded_file.row_count:
-            self.start_row_index += downloaded_file.row_count
-            return arrow_table.slice(0, downloaded_file.row_count)
+            arrow_table = arrow_table.slice(0, downloaded_file.row_count)
 
+        # At this point, whether the file has extraneous rows or not, the arrow table should have the correct num rows
+        assert downloaded_file.row_count == arrow_table.num_rows
         self.start_row_index += arrow_table.num_rows
 
+        logger.debug(
+            "CloudFetchQueue: Found downloaded file, row count: {}, new start offset: {}".format(
+                arrow_table.num_rows, self.start_row_index
+            )
+        )
+
         return arrow_table
 
     def _create_empty_table(self) -> pyarrow.Table:
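
The reworked tail of `_create_next_table` keeps a single exit path: extraneous rows are sliced off first, the assert pins the table to the advertised row count, and `start_row_index` then advances by the rows actually kept, which is what the new debug line reports. A rough illustration of that invariant in plain pyarrow (the table contents are made up for the example):

```python
import pyarrow as pa

advertised_row_count = 3                   # row count promised by the result link
table = pa.table({"id": [1, 2, 3, 4, 5]})  # the downloaded file decoded 5 rows

start_row_index = 100                      # queue offset before consuming this file
if table.num_rows > advertised_row_count:
    # Drop the extraneous rows the server packed into the last file
    table = table.slice(0, advertised_row_count)

assert table.num_rows == advertised_row_count
start_row_index += table.num_rows          # advance by rows kept, not rows decoded

print(start_row_index)  # 103, not 105
```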