Skip to content

Commit 5148b07

Browse files
authored
Forward porting all changes into 4.x.x up till v3.7.3 (#529)
* Base changes * Black formatter * Cache version fix * Added the changed test_retry.py file * retry_test_mixins changes
1 parent 2ab9a5f commit 5148b07

File tree

10 files changed

+122
-42
lines changed

10 files changed

+122
-42
lines changed

.github/workflows/code-quality-checks.yml

+1-1
Original file line numberDiff line numberDiff line change
@@ -89,7 +89,7 @@ jobs:
8989
#----------------------------------------------
9090
- name: Load cached venv
9191
id: cached-poetry-dependencies
92-
uses: actions/cache@v2
92+
uses: actions/cache@v4
9393
with:
9494
path: .venv-pyarrow
9595
key: venv-pyarrow-${{ runner.os }}-${{ steps.setup-python.outputs.python-version }}-${{ github.event.repository.name }}-${{ hashFiles('**/poetry.lock') }}

CHANGELOG.md

+10
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,16 @@
55
- Split the connector into two separate packages: `databricks-sql-connector` and `databricks-sqlalchemy`. The `databricks-sql-connector` package contains the core functionality of the connector, while the `databricks-sqlalchemy` package contains the SQLAlchemy dialect for the connector.
66
- Pyarrow dependency is now optional in `databricks-sql-connector`. Users needing arrow are supposed to explicitly install pyarrow
77

8+
# 3.7.3 (2025-03-28)
9+
10+
- Fix: Unable to poll small results in execute_async function (databricks/databricks-sql-python#515 by @jprakash-db)
11+
- Updated log messages to show the status code and error messages of requests (databricks/databricks-sql-python#511 by @jprakash-db)
12+
- Fix: Incorrect metadata was fetched in case of queries with the same alias (databricks/databricks-sql-python#505 by @jprakash-db)
13+
14+
# 3.7.2 (2025-01-31)
15+
16+
- Updated the retry_delay_max and retry_timeout (databricks/databricks-sql-python#497 by @jprakash-db)
17+
818
# 3.7.1 (2025-01-07)
919

1020
- Relaxed the number of Http retry attempts (databricks/databricks-sql-python#486 by @jprakash-db)

src/databricks/sql/auth/retry.py

+12-9
Original file line numberDiff line numberDiff line change
@@ -290,8 +290,9 @@ def sleep_for_retry(self, response: BaseHTTPResponse) -> bool:
290290
else:
291291
proposed_wait = self.get_backoff_time()
292292

293-
proposed_wait = min(proposed_wait, self.delay_max)
293+
proposed_wait = max(proposed_wait, self.delay_max)
294294
self.check_proposed_wait(proposed_wait)
295+
logger.debug(f"Retrying after {proposed_wait} seconds")
295296
time.sleep(proposed_wait)
296297
return True
297298

@@ -344,23 +345,24 @@ def should_retry(self, method: str, status_code: int) -> Tuple[bool, str]:
344345
if a retry would violate the configured policy.
345346
"""
346347

348+
logger.info(f"Received status code {status_code} for {method} request")
349+
347350
# Request succeeded. Don't retry.
348351
if status_code == 200:
349352
return False, "200 codes are not retried"
350353

351354
if status_code == 401:
352-
raise NonRecoverableNetworkError(
353-
"Received 401 - UNAUTHORIZED. Confirm your authentication credentials."
355+
return (
356+
False,
357+
"Received 401 - UNAUTHORIZED. Confirm your authentication credentials.",
354358
)
355359

356360
if status_code == 403:
357-
raise NonRecoverableNetworkError(
358-
"Received 403 - FORBIDDEN. Confirm your authentication credentials."
359-
)
361+
return False, "403 codes are not retried"
360362

361363
# Request failed and server said NotImplemented. This isn't recoverable. Don't retry.
362364
if status_code == 501:
363-
raise NonRecoverableNetworkError("Received code 501 from server.")
365+
return False, "Received code 501 from server."
364366

365367
# Request failed and this method is not retryable. We only retry POST requests.
366368
if not self._is_method_retryable(method):
@@ -399,8 +401,9 @@ def should_retry(self, method: str, status_code: int) -> Tuple[bool, str]:
399401
and status_code not in self.status_forcelist
400402
and status_code not in self.force_dangerous_codes
401403
):
402-
raise UnsafeToRetryError(
403-
"ExecuteStatement command can only be retried for codes 429 and 503. Received code: {status_code}"
404+
return (
405+
False,
406+
"ExecuteStatement command can only be retried for codes 429 and 503",
404407
)
405408

406409
# Request failed with a dangerous code, was an ExecuteStatement, but user forced retries for this

src/databricks/sql/auth/thrift_http_client.py

+6
Original file line numberDiff line numberDiff line change
@@ -198,6 +198,12 @@ def flush(self):
198198
self.message = self.__resp.reason
199199
self.headers = self.__resp.headers
200200

201+
logger.info(
202+
"HTTP Response with status code {}, message: {}".format(
203+
self.code, self.message
204+
)
205+
)
206+
201207
@staticmethod
202208
def basic_proxy_auth_headers(proxy):
203209
if proxy is None or not proxy.username:

src/databricks/sql/client.py

+4
Original file line numberDiff line numberDiff line change
@@ -744,6 +744,7 @@ def execute(
744744
self,
745745
operation: str,
746746
parameters: Optional[TParameterCollection] = None,
747+
enforce_embedded_schema_correctness=False,
747748
) -> "Cursor":
748749
"""
749750
Execute a query and wait for execution to complete.
@@ -808,6 +809,7 @@ def execute(
808809
use_cloud_fetch=self.connection.use_cloud_fetch,
809810
parameters=prepared_params,
810811
async_op=False,
812+
enforce_embedded_schema_correctness=enforce_embedded_schema_correctness,
811813
)
812814
self.active_result_set = ResultSet(
813815
self.connection,
@@ -829,6 +831,7 @@ def execute_async(
829831
self,
830832
operation: str,
831833
parameters: Optional[TParameterCollection] = None,
834+
enforce_embedded_schema_correctness=False,
832835
) -> "Cursor":
833836
"""
834837
@@ -869,6 +872,7 @@ def execute_async(
869872
use_cloud_fetch=self.connection.use_cloud_fetch,
870873
parameters=prepared_params,
871874
async_op=True,
875+
enforce_embedded_schema_correctness=enforce_embedded_schema_correctness,
872876
)
873877

874878
return self

src/databricks/sql/thrift_backend.py

+9-3
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@
6666
# - 900s attempts-duration lines up w ODBC/JDBC drivers (for cluster startup > 10 mins)
6767
_retry_policy = { # (type, default, min, max)
6868
"_retry_delay_min": (float, 1, 0.1, 60),
69-
"_retry_delay_max": (float, 30, 5, 3600),
69+
"_retry_delay_max": (float, 60, 5, 3600),
7070
"_retry_stop_after_attempts_count": (int, 30, 1, 60),
7171
"_retry_stop_after_attempts_duration": (float, 900, 1, 86400),
7272
"_retry_delay_default": (float, 5, 1, 60),
@@ -883,6 +883,7 @@ def execute_command(
883883
use_cloud_fetch=True,
884884
parameters=[],
885885
async_op=False,
886+
enforce_embedded_schema_correctness=False,
886887
):
887888
assert session_handle is not None
888889

@@ -898,8 +899,12 @@ def execute_command(
898899
sessionHandle=session_handle,
899900
statement=operation,
900901
runAsync=True,
901-
getDirectResults=ttypes.TSparkGetDirectResults(
902-
maxRows=max_rows, maxBytes=max_bytes
902+
# For async operation we don't want the direct results
903+
getDirectResults=None
904+
if async_op
905+
else ttypes.TSparkGetDirectResults(
906+
maxRows=max_rows,
907+
maxBytes=max_bytes,
903908
),
904909
canReadArrowResult=True if pyarrow else False,
905910
canDecompressLZ4Result=lz4_compression,
@@ -910,6 +915,7 @@ def execute_command(
910915
},
911916
useArrowNativeTypes=spark_arrow_types,
912917
parameters=parameters,
918+
enforceEmbeddedSchemaCorrectness=enforce_embedded_schema_correctness,
913919
)
914920
resp = self.make_request(self._client.ExecuteStatement, req)
915921

tests/e2e/common/large_queries_mixin.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -94,7 +94,7 @@ def test_long_running_query(self):
9494
scale_factor = 1
9595
with self.cursor() as cursor:
9696
while duration < min_duration:
97-
assert scale_factor < 512, "Detected infinite loop"
97+
assert scale_factor < 1024, "Detected infinite loop"
9898
start = time.time()
9999

100100
cursor.execute(

tests/e2e/common/retry_test_mixins.py

+10-10
Original file line numberDiff line numberDiff line change
@@ -121,9 +121,9 @@ class PySQLRetryTestsMixin:
121121
# For testing purposes
122122
_retry_policy = {
123123
"_retry_delay_min": 0.1,
124-
"_retry_delay_max": 5,
124+
"_retry_delay_max": 3,
125125
"_retry_stop_after_attempts_count": 5,
126-
"_retry_stop_after_attempts_duration": 10,
126+
"_retry_stop_after_attempts_duration": 30,
127127
"_retry_delay_default": 0.5,
128128
}
129129

@@ -135,7 +135,7 @@ def test_retry_urllib3_settings_are_honored(self):
135135
urllib3_config = {"connect": 10, "read": 11, "redirect": 12}
136136
rp = DatabricksRetryPolicy(
137137
delay_min=0.1,
138-
delay_max=10.0,
138+
delay_max=3,
139139
stop_after_attempts_count=10,
140140
stop_after_attempts_duration=10.0,
141141
delay_default=1.0,
@@ -174,14 +174,14 @@ def test_retry_max_count_not_exceeded(self):
174174
def test_retry_exponential_backoff(self):
175175
"""GIVEN the retry policy is configured for reasonable exponential backoff
176176
WHEN the server sends nothing but 429 responses with retry-afters
177-
THEN the connector will use those retry-afters values as delay
177+
THEN the connector will use those retry-afters values as floor
178178
"""
179179
retry_policy = self._retry_policy.copy()
180180
retry_policy["_retry_delay_min"] = 1
181181

182182
time_start = time.time()
183183
with mocked_server_response(
184-
status=429, headers={"Retry-After": "3"}
184+
status=429, headers={"Retry-After": "8"}
185185
) as mock_obj:
186186
with pytest.raises(RequestError) as cm:
187187
with self.connection(extra_params=retry_policy) as conn:
@@ -191,14 +191,14 @@ def test_retry_exponential_backoff(self):
191191
assert isinstance(cm.value.args[1], MaxRetryDurationError)
192192

193193
# With setting delay_min to 1, the expected retry delays should be:
194-
# 3, 3, 3, 3
194+
# 8, 8, 8, 8
195195
# The first 3 retries are allowed, the 4th retry puts the total duration over the limit
196-
# of 10 seconds
196+
# of 30 seconds
197197
assert mock_obj.return_value.getresponse.call_count == 4
198-
assert duration > 6
198+
assert duration > 24
199199

200-
# Should be less than 7, but this is a safe margin for CI/CD slowness
201-
assert duration < 10
200+
# Should be less than 26, but this is a safe margin for CI/CD slowness
201+
assert duration < 30
202202

203203
def test_retry_max_duration_not_exceeded(self):
204204
"""GIVEN the max attempt duration of 10 seconds

tests/e2e/test_driver.py

+59-7
Original file line numberDiff line numberDiff line change
@@ -177,19 +177,22 @@ def test_cloud_fetch(self):
177177
for i in range(len(cf_result)):
178178
assert cf_result[i] == noop_result[i]
179179

180-
def test_execute_async(self):
181-
def isExecuting(operation_state):
182-
return not operation_state or operation_state in [
183-
ttypes.TOperationState.RUNNING_STATE,
184-
ttypes.TOperationState.PENDING_STATE,
185-
]
180+
181+
class TestPySQLAsyncQueriesSuite(PySQLPytestTestCase):
182+
def isExecuting(self, operation_state):
183+
return not operation_state or operation_state in [
184+
ttypes.TOperationState.RUNNING_STATE,
185+
ttypes.TOperationState.PENDING_STATE,
186+
]
187+
188+
def test_execute_async__long_running(self):
186189

187190
long_running_query = "SELECT COUNT(*) FROM RANGE(10000 * 16) x JOIN RANGE(10000) y ON FROM_UNIXTIME(x.id * y.id, 'yyyy-MM-dd') LIKE '%not%a%date%'"
188191
with self.cursor() as cursor:
189192
cursor.execute_async(long_running_query)
190193

191194
## Polling after every POLLING_INTERVAL seconds
192-
while isExecuting(cursor.get_query_state()):
195+
while self.isExecuting(cursor.get_query_state()):
193196
time.sleep(self.POLLING_INTERVAL)
194197
log.info("Polling the status in test_execute_async")
195198

@@ -198,6 +201,55 @@ def isExecuting(operation_state):
198201

199202
assert result[0].asDict() == {"count(1)": 0}
200203

204+
def test_execute_async__small_result(self):
205+
small_result_query = "SELECT 1"
206+
207+
with self.cursor() as cursor:
208+
cursor.execute_async(small_result_query)
209+
210+
## Fake sleep for 5 secs
211+
time.sleep(5)
212+
213+
## Polling after every POLLING_INTERVAL seconds
214+
while self.isExecuting(cursor.get_query_state()):
215+
time.sleep(self.POLLING_INTERVAL)
216+
log.info("Polling the status in test_execute_async")
217+
218+
cursor.get_async_execution_result()
219+
result = cursor.fetchall()
220+
221+
assert result[0].asDict() == {"1": 1}
222+
223+
def test_execute_async__large_result(self):
224+
x_dimension = 1000
225+
y_dimension = 1000
226+
large_result_query = f"""
227+
SELECT
228+
x.id AS x_id,
229+
y.id AS y_id,
230+
FROM_UNIXTIME(x.id * y.id, 'yyyy-MM-dd') AS date
231+
FROM
232+
RANGE({x_dimension}) x
233+
JOIN
234+
RANGE({y_dimension}) y
235+
"""
236+
237+
with self.cursor() as cursor:
238+
cursor.execute_async(large_result_query)
239+
240+
## Fake sleep for 5 secs
241+
time.sleep(5)
242+
243+
## Polling after every POLLING_INTERVAL seconds
244+
while self.isExecuting(cursor.get_query_state()):
245+
time.sleep(self.POLLING_INTERVAL)
246+
log.info("Polling the status in test_execute_async")
247+
248+
cursor.get_async_execution_result()
249+
result = cursor.fetchall()
250+
251+
assert len(result) == x_dimension * y_dimension
252+
201253

202254
# Exclude Retry tests because they require specific setups, and LargeQueries too slow for core
203255
# tests

tests/unit/test_retry.py

+10-11
Original file line numberDiff line numberDiff line change
@@ -34,8 +34,11 @@ def test_sleep__no_retry_after(self, t_mock, retry_policy, error_history):
3434
retry_policy.history = [error_history, error_history]
3535
retry_policy.sleep(HTTPResponse(status=503))
3636

37-
expected_backoff_time = self.calculate_backoff_time(
38-
0, retry_policy.delay_min, retry_policy.delay_max
37+
expected_backoff_time = max(
38+
self.calculate_backoff_time(
39+
0, retry_policy.delay_min, retry_policy.delay_max
40+
),
41+
retry_policy.delay_max,
3942
)
4043
t_mock.assert_called_with(expected_backoff_time)
4144

@@ -54,8 +57,11 @@ def test_sleep__no_retry_after_header__multiple_retries(self, t_mock, retry_poli
5457
expected_backoff_times = []
5558
for attempt in range(num_attempts):
5659
expected_backoff_times.append(
57-
self.calculate_backoff_time(
58-
attempt, retry_policy.delay_min, retry_policy.delay_max
60+
max(
61+
self.calculate_backoff_time(
62+
attempt, retry_policy.delay_min, retry_policy.delay_max
63+
),
64+
retry_policy.delay_max,
5965
)
6066
)
6167

@@ -77,10 +83,3 @@ def test_excessive_retry_attempts_error(self, t_mock, retry_policy):
7783
retry_policy.sleep(HTTPResponse(status=503))
7884
# Internally urllib3 calls the increment function generating a new instance for every retry
7985
retry_policy = retry_policy.increment()
80-
81-
@patch("time.sleep")
82-
def test_sleep__retry_after_present(self, t_mock, retry_policy, error_history):
83-
retry_policy._retry_start_time = time.time()
84-
retry_policy.history = [error_history, error_history, error_history]
85-
retry_policy.sleep(HTTPResponse(status=503, headers={"Retry-After": "3"}))
86-
t_mock.assert_called_with(3)

0 commit comments

Comments
 (0)