diff --git a/src/databricks/sql/cloudfetch/downloader.py b/src/databricks/sql/cloudfetch/downloader.py index 019c4ef9..697e09a5 100644 --- a/src/databricks/sql/cloudfetch/downloader.py +++ b/src/databricks/sql/cloudfetch/downloader.py @@ -1,15 +1,19 @@ import logging from dataclasses import dataclass - +from datetime import datetime import requests import lz4.frame import threading import time - +import os +from threading import get_ident from databricks.sql.thrift_api.TCLIService.ttypes import TSparkArrowResultLink +logging.basicConfig(format="%(asctime)s %(message)s") logger = logging.getLogger(__name__) +DEFAULT_CLOUD_FILE_TIMEOUT = int(os.getenv("DATABRICKS_CLOUD_FILE_TIMEOUT", 60)) + @dataclass class DownloadableResultSettings: @@ -25,7 +29,7 @@ class DownloadableResultSettings: is_lz4_compressed: bool link_expiry_buffer_secs: int = 0 - download_timeout: int = 60 + download_timeout: int = DEFAULT_CLOUD_FILE_TIMEOUT max_consecutive_file_download_retries: int = 0 @@ -57,16 +61,27 @@ def is_file_download_successful(self) -> bool: else None ) try: + msg = f"{datetime.now()} {(os.getpid(), get_ident())} wait for {timeout} for download file: startRow {self.result_link.startRowOffset}, rowCount{self.result_link.rowCount} " + logger.debug( + f"{datetime.now()} {(os.getpid(), get_ident())} wait for {timeout} for download file: startRow {self.result_link.startRowOffset}, rowCount{self.result_link.rowCount} " + ) + if not self.is_download_finished.wait(timeout=timeout): self.is_download_timedout = True logger.debug( - "Cloud fetch download timed out after {} seconds for link representing rows {} to {}".format( + "{} {} Cloud fetch download timed out after {} seconds for link representing rows {} to {}".format( + datetime.now(), + (os.getpid(), get_ident()), self.settings.download_timeout, self.result_link.startRowOffset, self.result_link.startRowOffset + self.result_link.rowCount, ) ) return False + + logger.debug( + f"{datetime.now()} {(os.getpid(), get_ident())} success wait for {timeout} for download file: startRow {self.result_link.startRowOffset}, rowCount{self.result_link.rowCount} " + ) except Exception as e: logger.error(e) return False @@ -92,10 +107,22 @@ def run(self): session.timeout = self.settings.download_timeout try: + logger.debug( + f"{datetime.now()} {(os.getpid(), get_ident())} start download file: startRow {self.result_link.startRowOffset}, rowCount{self.result_link.rowCount}" + ) + # Get the file via HTTP request response = session.get(self.result_link.fileLink) + logger.debug( + f"{datetime.now()} {(os.getpid(), get_ident())} finish download file: startRow {self.result_link.startRowOffset}, rowCount{self.result_link.rowCount}" + ) + if not response.ok: + logger.error( + f"{datetime.now()} {(os.getpid(), get_ident())} failed downloading file: startRow {self.result_link.startRowOffset}, rowCount{self.result_link.rowCount}" + ) + logger.error(response) self.is_file_downloaded_successfully = False return @@ -109,18 +136,27 @@ def run(self): self.result_file = decompressed_data # The size of the downloaded file should match the size specified from TSparkArrowResultLink - self.is_file_downloaded_successfully = ( - len(self.result_file) == self.result_link.bytesNum + success = len(self.result_file) == self.result_link.bytesNum + logger.debug( + f"{datetime.now()} {(os.getpid(), get_ident())} download successful file: startRow {self.result_link.startRowOffset}, rowCount{self.result_link.rowCount}" ) + self.is_file_downloaded_successfully = success except Exception as e: + logger.debug( + f"{datetime.now()} {(os.getpid(), get_ident())} exception download file: startRow {self.result_link.startRowOffset}, rowCount{self.result_link.rowCount}" + ) logger.error(e) self.is_file_downloaded_successfully = False finally: session and session.close() + logger.debug( + f"{datetime.now()} {(os.getpid(), get_ident())} signal finished file: startRow {self.result_link.startRowOffset}, rowCount{self.result_link.rowCount}" + ) # Awaken threads waiting for this to be true which signals the run is complete self.is_download_finished.set() + def _reset(self): """ Reset download-related flags for every retry of run() diff --git a/src/databricks/sql/thrift_backend.py b/src/databricks/sql/thrift_backend.py index 69ac760a..4a9e80c4 100644 --- a/src/databricks/sql/thrift_backend.py +++ b/src/databricks/sql/thrift_backend.py @@ -975,6 +975,7 @@ def fetch_results( arrow_schema_bytes, description, ): + logger.debug("ThriftBackend fetch_results") assert op_handle is not None req = ttypes.TFetchResultsReq( diff --git a/src/databricks/sql/utils.py b/src/databricks/sql/utils.py index 7c3a014b..14fa06af 100644 --- a/src/databricks/sql/utils.py +++ b/src/databricks/sql/utils.py @@ -156,8 +156,11 @@ def __init__( self.lz4_compressed = lz4_compressed self.description = description + # self.download_manager = ResultFileDownloadManager( + # self.max_download_threads, self.lz4_compressed + # ) self.download_manager = ResultFileDownloadManager( - self.max_download_threads, self.lz4_compressed + 1, self.lz4_compressed ) self.download_manager.add_file_links(result_links)