1
1
import logging
2
2
from dataclasses import dataclass
3
-
3
+ from datetime import datetime
4
4
import requests
5
5
import lz4 .frame
6
6
import threading
7
7
import time
8
-
8
+ import os
9
+ from threading import get_ident
9
10
from databricks .sql .thrift_api .TCLIService .ttypes import TSparkArrowResultLink
10
11
12
+ logging .basicConfig (format = "%(asctime)s %(message)s" )
11
13
logger = logging .getLogger (__name__ )
12
14
15
+ DEFAULT_CLOUD_FILE_TIMEOUT = int (os .getenv ("DATABRICKS_CLOUD_FILE_TIMEOUT" , 60 ))
16
+
13
17
14
18
@dataclass
15
19
class DownloadableResultSettings :
@@ -25,7 +29,7 @@ class DownloadableResultSettings:
25
29
26
30
is_lz4_compressed : bool
27
31
link_expiry_buffer_secs : int = 0
28
- download_timeout : int = 60
32
+ download_timeout : int = DEFAULT_CLOUD_FILE_TIMEOUT
29
33
max_consecutive_file_download_retries : int = 0
30
34
31
35
@@ -57,16 +61,27 @@ def is_file_download_successful(self) -> bool:
57
61
else None
58
62
)
59
63
try :
64
+ msg = f"{ datetime .now ()} { (os .getpid (), get_ident ())} wait for { timeout } for download file: startRow { self .result_link .startRowOffset } , rowCount{ self .result_link .rowCount } "
65
+ logger .debug (
66
+ f"{ datetime .now ()} { (os .getpid (), get_ident ())} wait for { timeout } for download file: startRow { self .result_link .startRowOffset } , rowCount{ self .result_link .rowCount } "
67
+ )
68
+
60
69
if not self .is_download_finished .wait (timeout = timeout ):
61
70
self .is_download_timedout = True
62
71
logger .debug (
63
- "Cloud fetch download timed out after {} seconds for link representing rows {} to {}" .format (
72
+ "{} {} Cloud fetch download timed out after {} seconds for link representing rows {} to {}" .format (
73
+ datetime .now (),
74
+ (os .getpid (), get_ident ()),
64
75
self .settings .download_timeout ,
65
76
self .result_link .startRowOffset ,
66
77
self .result_link .startRowOffset + self .result_link .rowCount ,
67
78
)
68
79
)
69
80
return False
81
+
82
+ logger .debug (
83
+ f"{ datetime .now ()} { (os .getpid (), get_ident ())} success wait for { timeout } for download file: startRow { self .result_link .startRowOffset } , rowCount{ self .result_link .rowCount } "
84
+ )
70
85
except Exception as e :
71
86
logger .error (e )
72
87
return False
@@ -92,10 +107,22 @@ def run(self):
92
107
session .timeout = self .settings .download_timeout
93
108
94
109
try :
110
+ logger .debug (
111
+ f"{ datetime .now ()} { (os .getpid (), get_ident ())} start download file: startRow { self .result_link .startRowOffset } , rowCount{ self .result_link .rowCount } "
112
+ )
113
+
95
114
# Get the file via HTTP request
96
115
response = session .get (self .result_link .fileLink )
97
116
117
+ logger .debug (
118
+ f"{ datetime .now ()} { (os .getpid (), get_ident ())} finish download file: startRow { self .result_link .startRowOffset } , rowCount{ self .result_link .rowCount } "
119
+ )
120
+
98
121
if not response .ok :
122
+ logger .error (
123
+ f"{ datetime .now ()} { (os .getpid (), get_ident ())} failed downloading file: startRow { self .result_link .startRowOffset } , rowCount{ self .result_link .rowCount } "
124
+ )
125
+ logger .error (response )
99
126
self .is_file_downloaded_successfully = False
100
127
return
101
128
@@ -109,18 +136,27 @@ def run(self):
109
136
self .result_file = decompressed_data
110
137
111
138
# The size of the downloaded file should match the size specified from TSparkArrowResultLink
112
- self .is_file_downloaded_successfully = (
113
- len (self .result_file ) == self .result_link .bytesNum
139
+ success = len (self .result_file ) == self .result_link .bytesNum
140
+ logger .debug (
141
+ f"{ datetime .now ()} { (os .getpid (), get_ident ())} download successful file: startRow { self .result_link .startRowOffset } , rowCount{ self .result_link .rowCount } "
114
142
)
143
+ self .is_file_downloaded_successfully = success
115
144
except Exception as e :
145
+ logger .debug (
146
+ f"{ datetime .now ()} { (os .getpid (), get_ident ())} exception download file: startRow { self .result_link .startRowOffset } , rowCount{ self .result_link .rowCount } "
147
+ )
116
148
logger .error (e )
117
149
self .is_file_downloaded_successfully = False
118
150
119
151
finally :
120
152
session and session .close ()
153
+ logger .debug (
154
+ f"{ datetime .now ()} { (os .getpid (), get_ident ())} signal finished file: startRow { self .result_link .startRowOffset } , rowCount{ self .result_link .rowCount } "
155
+ )
121
156
# Awaken threads waiting for this to be true which signals the run is complete
122
157
self .is_download_finished .set ()
123
158
159
+
124
160
def _reset (self ):
125
161
"""
126
162
Reset download-related flags for every retry of run()
0 commit comments