
Commit 7b835f0

rcypher-databricks and refurlan-db authored and committed
Debug logging and temp code changes (databricks#350)
Signed-off-by: Raymond Cypher <[email protected]>
1 parent a737ef3 commit 7b835f0


3 files changed: +47 -7 lines changed


src/databricks/sql/cloudfetch/downloader.py

+42 -6
@@ -1,15 +1,19 @@
 import logging
 from dataclasses import dataclass
-
+from datetime import datetime
 import requests
 import lz4.frame
 import threading
 import time
-
+import os
+from threading import get_ident
 from databricks.sql.thrift_api.TCLIService.ttypes import TSparkArrowResultLink
 
+logging.basicConfig(format="%(asctime)s %(message)s")
 logger = logging.getLogger(__name__)
 
+DEFAULT_CLOUD_FILE_TIMEOUT = int(os.getenv("DATABRICKS_CLOUD_FILE_TIMEOUT", 60))
+
 
 @dataclass
 class DownloadableResultSettings:
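
Note: the new DEFAULT_CLOUD_FILE_TIMEOUT is read from the environment once, at import time, and falls back to 60 seconds. A minimal standalone sketch of how the value resolves (the script name is hypothetical; the variable name and fallback come from the hunk above):

import os

# Same expression as the constant added above: unset -> 60, otherwise the exported value.
DEFAULT_CLOUD_FILE_TIMEOUT = int(os.getenv("DATABRICKS_CLOUD_FILE_TIMEOUT", 60))
print(DEFAULT_CLOUD_FILE_TIMEOUT)
# $ python check_timeout.py                                    -> 60
# $ DATABRICKS_CLOUD_FILE_TIMEOUT=120 python check_timeout.py  -> 120

Because the constant is evaluated when the module is imported, the variable has to be exported before the import, not after.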
@@ -25,7 +29,7 @@ class DownloadableResultSettings:
 
     is_lz4_compressed: bool
     link_expiry_buffer_secs: int = 0
-    download_timeout: int = 60
+    download_timeout: int = DEFAULT_CLOUD_FILE_TIMEOUT
     max_consecutive_file_download_retries: int = 0
 
 
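With the constant in place, DownloadableResultSettings.download_timeout defaults to the env-driven value instead of a hard-coded 60, and can still be overridden per instance. A small sketch, assuming only the fields visible in this hunk:

from databricks.sql.cloudfetch.downloader import DownloadableResultSettings

settings = DownloadableResultSettings(is_lz4_compressed=True)
print(settings.download_timeout)    # DEFAULT_CLOUD_FILE_TIMEOUT (60 unless DATABRICKS_CLOUD_FILE_TIMEOUT is set)

explicit = DownloadableResultSettings(is_lz4_compressed=True, download_timeout=5)
print(explicit.download_timeout)    # 5

Note that the dataclass default is captured at class-definition time, so it tracks whatever DEFAULT_CLOUD_FILE_TIMEOUT resolved to when downloader.py was imported.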
@@ -57,16 +61,27 @@ def is_file_download_successful(self) -> bool:
             else None
         )
         try:
+            msg = f"{datetime.now()} {(os.getpid(), get_ident())} wait for {timeout} for download file: startRow {self.result_link.startRowOffset}, rowCount{self.result_link.rowCount} "
+            logger.debug(
+                f"{datetime.now()} {(os.getpid(), get_ident())} wait for {timeout} for download file: startRow {self.result_link.startRowOffset}, rowCount{self.result_link.rowCount} "
+            )
+
             if not self.is_download_finished.wait(timeout=timeout):
                 self.is_download_timedout = True
                 logger.debug(
-                    "Cloud fetch download timed out after {} seconds for link representing rows {} to {}".format(
+                    "{} {} Cloud fetch download timed out after {} seconds for link representing rows {} to {}".format(
+                        datetime.now(),
+                        (os.getpid(), get_ident()),
                         self.settings.download_timeout,
                         self.result_link.startRowOffset,
                         self.result_link.startRowOffset + self.result_link.rowCount,
                     )
                 )
                 return False
+
+            logger.debug(
+                f"{datetime.now()} {(os.getpid(), get_ident())} success wait for {timeout} for download file: startRow {self.result_link.startRowOffset}, rowCount{self.result_link.rowCount} "
+            )
         except Exception as e:
             logger.error(e)
             return False
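
The "wait for" / "success wait" / "timed out" messages bracket a call to threading.Event.wait(), which blocks until the downloader thread calls set() or the timeout elapses, and returns False only in the timeout case. A standalone sketch of that behaviour:

import threading

is_download_finished = threading.Event()   # never set in this sketch, so wait() times out

if not is_download_finished.wait(timeout=0.1):
    print("timed out")          # the branch that logs the timeout message above
else:
    print("download finished")  # the branch that logs the "success wait" message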
@@ -92,10 +107,22 @@ def run(self):
         session.timeout = self.settings.download_timeout
 
         try:
+            logger.debug(
+                f"{datetime.now()} {(os.getpid(), get_ident())} start download file: startRow {self.result_link.startRowOffset}, rowCount{self.result_link.rowCount}"
+            )
+
             # Get the file via HTTP request
             response = session.get(self.result_link.fileLink)
 
+            logger.debug(
+                f"{datetime.now()} {(os.getpid(), get_ident())} finish download file: startRow {self.result_link.startRowOffset}, rowCount{self.result_link.rowCount}"
+            )
+
             if not response.ok:
+                logger.error(
+                    f"{datetime.now()} {(os.getpid(), get_ident())} failed downloading file: startRow {self.result_link.startRowOffset}, rowCount{self.result_link.rowCount}"
+                )
+                logger.error(response)
                 self.is_file_downloaded_successfully = False
                 return
 
@@ -109,18 +136,27 @@ def run(self):
             self.result_file = decompressed_data
 
             # The size of the downloaded file should match the size specified from TSparkArrowResultLink
-            self.is_file_downloaded_successfully = (
-                len(self.result_file) == self.result_link.bytesNum
+            success = len(self.result_file) == self.result_link.bytesNum
+            logger.debug(
+                f"{datetime.now()} {(os.getpid(), get_ident())} download successful file: startRow {self.result_link.startRowOffset}, rowCount{self.result_link.rowCount}"
             )
+            self.is_file_downloaded_successfully = success
         except Exception as e:
+            logger.debug(
+                f"{datetime.now()} {(os.getpid(), get_ident())} exception download file: startRow {self.result_link.startRowOffset}, rowCount{self.result_link.rowCount}"
+            )
             logger.error(e)
             self.is_file_downloaded_successfully = False
 
         finally:
             session and session.close()
+            logger.debug(
+                f"{datetime.now()} {(os.getpid(), get_ident())} signal finished file: startRow {self.result_link.startRowOffset}, rowCount{self.result_link.rowCount}"
+            )
             # Awaken threads waiting for this to be true which signals the run is complete
             self.is_download_finished.set()
 
+
     def _reset(self):
         """
         Reset download-related flags for every retry of run()
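
All of the new messages are emitted at DEBUG level, so they only appear once the downloader's logger is lowered to DEBUG; the logging.basicConfig(format="%(asctime)s %(message)s") call added at the top of the file installs a root handler with that format only if no handler exists yet. A sketch of switching the output on from application code, assuming the module logger keeps the usual logging.getLogger(__name__) name:

import logging

# Make sure a handler exists (a no-op if the application or the module's own
# basicConfig call already installed one), then enable DEBUG for the downloader.
logging.basicConfig(format="%(asctime)s %(message)s")
logging.getLogger("databricks.sql.cloudfetch.downloader").setLevel(logging.DEBUG)

Each record then renders as "%(asctime)s %(message)s", with the message itself repeating a datetime.now() timestamp and the (pid, thread ident) pair, so concurrent downloads can be told apart.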

src/databricks/sql/thrift_backend.py

+1
@@ -950,6 +950,7 @@ def fetch_results(
         arrow_schema_bytes,
         description,
     ):
+        logger.debug("ThriftBackend fetch_results")
         assert op_handle is not None
 
         req = ttypes.TFetchResultsReq(

src/databricks/sql/utils.py

+4 -1
@@ -146,8 +146,11 @@ def __init__(
         self.lz4_compressed = lz4_compressed
         self.description = description
 
+        # self.download_manager = ResultFileDownloadManager(
+        #     self.max_download_threads, self.lz4_compressed
+        # )
         self.download_manager = ResultFileDownloadManager(
-            self.max_download_threads, self.lz4_compressed
+            1, self.lz4_compressed
        )
         self.download_manager.add_file_links(result_links)
 