From 5ef946f28f01b7667268c2d7731473c293f07862 Mon Sep 17 00:00:00 2001
From: Levko Kravets
Date: Wed, 5 Jun 2024 22:33:53 +0300
Subject: [PATCH] Add more debug logging for CloudFetch

Signed-off-by: Levko Kravets
---
 .../sql/cloudfetch/download_manager.py | 49 +++++++++++++++++++
 src/databricks/sql/utils.py            | 37 +++++++++++++-
 2 files changed, 84 insertions(+), 2 deletions(-)

diff --git a/src/databricks/sql/cloudfetch/download_manager.py b/src/databricks/sql/cloudfetch/download_manager.py
index 9a997f39..60fa3c75 100644
--- a/src/databricks/sql/cloudfetch/download_manager.py
+++ b/src/databricks/sql/cloudfetch/download_manager.py
@@ -49,6 +49,11 @@ def add_file_links(
         for link in t_spark_arrow_result_links:
             if link.rowCount <= 0:
                 continue
+            logger.debug(
+                "ResultFileDownloadManager.add_file_links: start offset {}, row count: {}".format(
+                    link.startRowOffset, link.rowCount
+                )
+            )
             self.download_handlers.append(
                 ResultSetDownloadHandler(self.downloadable_result_settings, link)
             )
@@ -88,6 +93,12 @@ def get_next_downloaded_file(
 
         # Check (and wait) for download status
         if self._check_if_download_successful(handler):
+            link = handler.result_link
+            logger.debug(
+                "ResultFileDownloadManager: file found for row index {}: start {}, row count: {}".format(
+                    next_row_offset, link.startRowOffset, link.rowCount
+                )
+            )
             # Buffer should be empty so set buffer to new ArrowQueue with result_file
             result = DownloadedFile(
                 handler.result_file,
@@ -97,15 +108,32 @@
             self.download_handlers.pop(idx)
             # Return True upon successful download to continue loop and not force a retry
             return result
+        else:
+            logger.debug(
+                "ResultFileDownloadManager: cannot find file for row index {}".format(
+                    next_row_offset
+                )
+            )
+
         # Download was not successful for next download item, force a retry
         self._shutdown_manager()
         return None
 
     def _remove_past_handlers(self, next_row_offset: int):
+        logger.debug(
+            "ResultFileDownloadManager: removing past handlers, current offset: {}".format(
+                next_row_offset
+            )
+        )
         # Any link in which its start to end range doesn't include the next row to be fetched does not need downloading
         i = 0
         while i < len(self.download_handlers):
             result_link = self.download_handlers[i].result_link
+            logger.debug(
+                "- checking result link: start {}, row count: {}, current offset: {}".format(
+                    result_link.startRowOffset, result_link.rowCount, next_row_offset
+                )
+            )
             if result_link.startRowOffset + result_link.rowCount > next_row_offset:
                 i += 1
                 continue
@@ -113,22 +141,43 @@ def _schedule_downloads(self):
         # Schedule downloads for all download handlers if not already scheduled.
+        logger.debug("ResultFileDownloadManager: schedule downloads")
         for handler in self.download_handlers:
             if handler.is_download_scheduled:
                 continue
             try:
+                logger.debug(
+                    "- start: {}, row count: {}".format(
+                        handler.result_link.startRowOffset, handler.result_link.rowCount
+                    )
+                )
                 self.thread_pool.submit(handler.run)
             except Exception as e:
                 logger.error(e)
                 break
             handler.is_download_scheduled = True
 
     def _find_next_file_index(self, next_row_offset: int):
+        logger.debug(
+            "ResultFileDownloadManager: trying to find file for row {}".format(
+                next_row_offset
+            )
+        )
         # Get the handler index of the next file in order
         next_indices = [
             i
             for i, handler in enumerate(self.download_handlers)
             if handler.is_download_scheduled
+            # TODO: shouldn't `next_row_offset` be tested against the range, not just start row offset?
             and handler.result_link.startRowOffset == next_row_offset
         ]
+
+        for i in next_indices:
+            link = self.download_handlers[i].result_link
+            logger.debug(
+                "- found file: start {}, row count {}".format(
+                    link.startRowOffset, link.rowCount
+                )
+            )
+
         return next_indices[0] if len(next_indices) > 0 else None
 
     def _check_if_download_successful(self, handler: ResultSetDownloadHandler):
diff --git a/src/databricks/sql/utils.py b/src/databricks/sql/utils.py
index 7c3a014b..9f21a8eb 100644
--- a/src/databricks/sql/utils.py
+++ b/src/databricks/sql/utils.py
@@ -156,6 +156,19 @@ def __init__(
         self.lz4_compressed = lz4_compressed
         self.description = description
 
+        logger.debug(
+            "Initialize CloudFetch loader, row set start offset: {}, file list:".format(
+                start_row_offset
+            )
+        )
+        if result_links is not None:
+            for result_link in result_links:
+                logger.debug(
+                    "- start row offset: {}, row count: {}".format(
+                        result_link.startRowOffset, result_link.rowCount
+                    )
+                )
+
         self.download_manager = ResultFileDownloadManager(
             self.max_download_threads, self.lz4_compressed
         )
@@ -175,8 +188,10 @@ def next_n_rows(self, num_rows: int) -> pyarrow.Table:
             pyarrow.Table
         """
         if not self.table:
+            logger.debug("CloudFetchQueue: no more rows available")
             # Return empty pyarrow table to cause retry of fetch
             return self._create_empty_table()
+        logger.debug("CloudFetchQueue: trying to get {} next rows".format(num_rows))
         results = self.table.slice(0, 0)
         while num_rows > 0 and self.table:
             # Get remaining of num_rows or the rest of the current table, whichever is smaller
@@ -190,6 +205,8 @@
             self.table = self._create_next_table()
             self.table_row_index = 0
         num_rows -= table_slice.num_rows
+
+        logger.debug("CloudFetchQueue: collected {} next rows".format(results.num_rows))
         return results
 
     def remaining_rows(self) -> pyarrow.Table:
@@ -214,26 +231,42 @@
         return results
 
     def _create_next_table(self) -> Union[pyarrow.Table, None]:
+        logger.debug(
+            "CloudFetchQueue: Trying to get downloaded file for row {}".format(
+                self.start_row_index
+            )
+        )
         # Create next table by retrieving the logical next downloaded file, or return None to signal end of queue
         downloaded_file = self.download_manager.get_next_downloaded_file(
             self.start_row_index
         )
         if not downloaded_file:
+            logger.debug(
+                "CloudFetchQueue: Cannot find downloaded file for row {}".format(
+                    self.start_row_index
+                )
+            )
             # None signals no more Arrow tables can be built from the remaining handlers if any remain
             return None
         arrow_table = create_arrow_table_from_arrow_file(
             downloaded_file.file_bytes, self.description
         )
 
         # The server rarely prepares the exact number of rows requested by the client in cloud fetch.
         # Subsequently, we drop the extraneous rows in the last file if more rows are retrieved than requested
         if arrow_table.num_rows > downloaded_file.row_count:
-            self.start_row_index += downloaded_file.row_count
-            return arrow_table.slice(0, downloaded_file.row_count)
+            arrow_table = arrow_table.slice(0, downloaded_file.row_count)
 
         # At this point, whether the file has extraneous rows or not, the arrow table should have the correct num rows
         assert downloaded_file.row_count == arrow_table.num_rows
         self.start_row_index += arrow_table.num_rows
+
+        logger.debug(
+            "CloudFetchQueue: Found downloaded file, row count: {}, new start offset: {}".format(
+                arrow_table.num_rows, self.start_row_index
+            )
+        )
+
         return arrow_table
 
     def _create_empty_table(self) -> pyarrow.Table:
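
---

Note for reviewers: the new messages use the modules' existing `logger`
objects, so they only appear once DEBUG is enabled for the connector's
loggers. Assuming the usual `logging.getLogger(__name__)` naming (so both
files live under the `databricks.sql` hierarchy), a minimal sketch of how
one might surface them locally follows. The environment variable names and
the sample table are placeholders; any query whose result set is large
enough for the server to return CloudFetch links will exercise this path:

    import logging
    import os

    from databricks import sql

    # Send log records to stderr; raise only the connector's loggers to
    # DEBUG so the new CloudFetch messages appear without noise from
    # other libraries.
    logging.basicConfig(level=logging.WARNING)
    logging.getLogger("databricks.sql").setLevel(logging.DEBUG)

    with sql.connect(
        server_hostname=os.environ["DATABRICKS_SERVER_HOSTNAME"],  # placeholder
        http_path=os.environ["DATABRICKS_HTTP_PATH"],  # placeholder
        access_token=os.environ["DATABRICKS_TOKEN"],  # placeholder
    ) as connection:
        with connection.cursor() as cursor:
            # Placeholder query: a scan big enough to trigger cloud fetch,
            # driving ResultFileDownloadManager and CloudFetchQueue.
            cursor.execute("SELECT * FROM samples.tpch.lineitem LIMIT 1000000")
            rows = cursor.fetchall()
            print("fetched {} rows".format(len(rows)))

With DEBUG enabled, each downloaded file should produce a paired
"Trying to get downloaded file for row ..." / "Found downloaded file ..."
line from the hunks above, which is what makes gaps in the link list
visible when debugging CloudFetch issues.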