Forward-porting all changes up to v3.7.3 into 4.x.x #529

Merged · 5 commits · Mar 7, 2025
2 changes: 1 addition & 1 deletion .github/workflows/code-quality-checks.yml
@@ -89,7 +89,7 @@ jobs:
#----------------------------------------------
- name: Load cached venv
id: cached-poetry-dependencies
-    uses: actions/cache@v2
+    uses: actions/cache@v4
with:
path: .venv-pyarrow
key: venv-pyarrow-${{ runner.os }}-${{ steps.setup-python.outputs.python-version }}-${{ github.event.repository.name }}-${{ hashFiles('**/poetry.lock') }}
10 changes: 10 additions & 0 deletions CHANGELOG.md
@@ -5,6 +5,16 @@
- Split the connector into two separate packages: `databricks-sql-connector` and `databricks-sqlalchemy`. The `databricks-sql-connector` package contains the core functionality of the connector, while the `databricks-sqlalchemy` package contains the SQLAlchemy dialect for the connector.
- The pyarrow dependency is now optional in `databricks-sql-connector`; users who need Arrow must install `pyarrow` explicitly

+ # 3.7.3 (2025-03-28)

+ - Fix: Unable to poll small results in the `execute_async` function (databricks/databricks-sql-python#515 by @jprakash-db)
+ - Updated log messages to show the status code and error messages of requests (databricks/databricks-sql-python#511 by @jprakash-db)
+ - Fix: Incorrect metadata was fetched for queries with the same alias (databricks/databricks-sql-python#505 by @jprakash-db)

+ # 3.7.2 (2025-01-31)

+ - Updated the `retry_delay_max` and `retry_timeout` (databricks/databricks-sql-python#497 by @jprakash-db)

# 3.7.1 (2025-01-07)

- Relaxed the number of HTTP retry attempts (databricks/databricks-sql-python#486 by @jprakash-db)
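
For readers tuning the retry behavior referenced in the 3.7.2 entry: these settings surface as underscore-prefixed keyword arguments to `databricks.sql.connect`, the same mechanism the e2e retry fixtures later in this diff exercise via `extra_params`. A hedged sketch; credentials are placeholders and the kwarg forwarding is inferred from those fixtures:

```python
from databricks import sql

connection = sql.connect(
    server_hostname="...",  # placeholder
    http_path="...",        # placeholder
    access_token="...",     # placeholder
    _retry_delay_min=1.0,
    _retry_delay_max=60.0,  # the new default after this forward port
    _retry_stop_after_attempts_count=30,
    _retry_stop_after_attempts_duration=900.0,
)
```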
21 changes: 12 additions & 9 deletions src/databricks/sql/auth/retry.py
@@ -290,8 +290,9 @@ def sleep_for_retry(self, response: BaseHTTPResponse) -> bool:
else:
proposed_wait = self.get_backoff_time()

- proposed_wait = min(proposed_wait, self.delay_max)
+ proposed_wait = max(proposed_wait, self.delay_max)
self.check_proposed_wait(proposed_wait)
+ logger.debug(f"Retrying after {proposed_wait} seconds")
time.sleep(proposed_wait)
return True
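
Editor's note on the hunk above: swapping `min` for `max` flips the role of `delay_max` from a ceiling on the computed wait to a floor under it, which is why the e2e test further down now describes Retry-After values as a floor. A standalone illustration; names are assumptions that only loosely mirror `sleep_for_retry`:

```python
from typing import Optional

def proposed_wait_seconds(
    retry_after: Optional[float], backoff: float, delay_max: float
) -> float:
    # Server-provided Retry-After takes precedence over computed backoff.
    proposed = retry_after if retry_after is not None else backoff
    # Old: min(proposed, delay_max) capped the wait at delay_max.
    # New: max(proposed, delay_max) treats delay_max as a minimum wait.
    return max(proposed, delay_max)

# With delay_max=3, a Retry-After of 8 now yields an 8 s sleep
# (previously capped to 3 s); plain backoff sleeps at least 3 s.
assert proposed_wait_seconds(8, 1.0, 3) == 8
assert proposed_wait_seconds(None, 1.0, 3) == 3
```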

@@ -344,23 +345,24 @@ def should_retry(self, method: str, status_code: int) -> Tuple[bool, str]:
if a retry would violate the configured policy.
"""

+ logger.info(f"Received status code {status_code} for {method} request")

# Request succeeded. Don't retry.
if status_code == 200:
return False, "200 codes are not retried"

if status_code == 401:
- raise NonRecoverableNetworkError(
- "Received 401 - UNAUTHORIZED. Confirm your authentication credentials."
+ return (
+ False,
+ "Received 401 - UNAUTHORIZED. Confirm your authentication credentials.",
)

if status_code == 403:
- raise NonRecoverableNetworkError(
- "Received 403 - FORBIDDEN. Confirm your authentication credentials."
- )
+ return False, "403 codes are not retried"

# Request failed and server said NotImplemented. This isn't recoverable. Don't retry.
if status_code == 501:
- raise NonRecoverableNetworkError("Received code 501 from server.")
+ return False, "Received code 501 from server."

# Request failed and this method is not retryable. We only retry POST requests.
if not self._is_method_retryable(method):
@@ -399,8 +401,9 @@ def should_retry(self, method: str, status_code: int) -> Tuple[bool, str]:
and status_code not in self.status_forcelist
and status_code not in self.force_dangerous_codes
):
- raise UnsafeToRetryError(
- "ExecuteStatement command can only be retried for codes 429 and 503. Received code: {status_code}"
+ return (
+ False,
+ "ExecuteStatement command can only be retried for codes 429 and 503",
)

# Request failed with a dangerous code, was an ExecuteStatement, but user forced retries for this
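
Net effect of the changes in this method: 401, 403, 501, and unsafe ExecuteStatement codes no longer raise `NonRecoverableNetworkError` or `UnsafeToRetryError` from inside the retry policy; they now return `(False, reason)`, so urllib3's standard retry machinery stops and reports the reason. A simplified standalone sketch of the new contract; the real method consults additional policy state:

```python
from typing import Tuple

def should_retry_sketch(method: str, status_code: int) -> Tuple[bool, str]:
    """Simplified mirror of the forward-ported contract: terminal codes
    report (False, reason) instead of raising."""
    if status_code == 200:
        return False, "200 codes are not retried"
    if status_code == 401:
        return (
            False,
            "Received 401 - UNAUTHORIZED. Confirm your authentication credentials.",
        )
    if status_code == 403:
        return False, "403 codes are not retried"
    if status_code == 501:
        return False, "Received code 501 from server."
    if method.upper() != "POST":
        # Mirrors the "we only retry POST requests" policy in the diff.
        return False, f"{method} requests are not retried in this sketch"
    return True, "retryable under this sketch's simplified policy"
```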
6 changes: 6 additions & 0 deletions src/databricks/sql/auth/thrift_http_client.py
@@ -198,6 +198,12 @@ def flush(self):
self.message = self.__resp.reason
self.headers = self.__resp.headers

+ logger.info(
+ "HTTP Response with status code {}, message: {}".format(
+ self.code, self.message
+ )
+ )

@staticmethod
def basic_proxy_auth_headers(proxy):
if proxy is None or not proxy.username:
4 changes: 4 additions & 0 deletions src/databricks/sql/client.py
@@ -744,6 +744,7 @@ def execute(
self,
operation: str,
parameters: Optional[TParameterCollection] = None,
+ enforce_embedded_schema_correctness=False,
) -> "Cursor":
"""
Execute a query and wait for execution to complete.
@@ -808,6 +809,7 @@ def execute(
use_cloud_fetch=self.connection.use_cloud_fetch,
parameters=prepared_params,
async_op=False,
+ enforce_embedded_schema_correctness=enforce_embedded_schema_correctness,
)
self.active_result_set = ResultSet(
self.connection,
@@ -829,6 +831,7 @@ def execute_async(
self,
operation: str,
parameters: Optional[TParameterCollection] = None,
+ enforce_embedded_schema_correctness=False,
) -> "Cursor":
"""

@@ -869,6 +872,7 @@ def execute_async(
use_cloud_fetch=self.connection.use_cloud_fetch,
parameters=prepared_params,
async_op=True,
+ enforce_embedded_schema_correctness=enforce_embedded_schema_correctness,
)

return self
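
With direct results disabled for async operations (see the `thrift_backend.py` hunk below), the user-facing flow matches the e2e tests later in this diff. A usage sketch; connection parameters are placeholders and the one-second poll interval is arbitrary:

```python
import time

from databricks import sql
from databricks.sql.thrift_api.TCLIService import ttypes

def is_executing(state) -> bool:
    # No state yet, RUNNING, or PENDING all mean the query is still going.
    return not state or state in (
        ttypes.TOperationState.RUNNING_STATE,
        ttypes.TOperationState.PENDING_STATE,
    )

with sql.connect(
    server_hostname="...", http_path="...", access_token="..."  # placeholders
) as connection:
    with connection.cursor() as cursor:
        cursor.execute_async("SELECT 1")
        while is_executing(cursor.get_query_state()):
            time.sleep(1)
        cursor.get_async_execution_result()
        print(cursor.fetchall())
```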
12 changes: 9 additions & 3 deletions src/databricks/sql/thrift_backend.py
@@ -66,7 +66,7 @@
# - 900s attempts-duration lines up w ODBC/JDBC drivers (for cluster startup > 10 mins)
_retry_policy = { # (type, default, min, max)
"_retry_delay_min": (float, 1, 0.1, 60),
"_retry_delay_max": (float, 30, 5, 3600),
"_retry_delay_max": (float, 60, 5, 3600),
"_retry_stop_after_attempts_count": (int, 30, 1, 60),
"_retry_stop_after_attempts_duration": (float, 900, 1, 86400),
"_retry_delay_default": (float, 5, 1, 60),
@@ -883,6 +883,7 @@ def execute_command(
use_cloud_fetch=True,
parameters=[],
async_op=False,
+ enforce_embedded_schema_correctness=False,
):
assert session_handle is not None

@@ -898,8 +899,12 @@ def execute_command(
sessionHandle=session_handle,
statement=operation,
runAsync=True,
- getDirectResults=ttypes.TSparkGetDirectResults(
- maxRows=max_rows, maxBytes=max_bytes
+ # For async operation we don't want the direct results
+ getDirectResults=None
+ if async_op
+ else ttypes.TSparkGetDirectResults(
+ maxRows=max_rows,
+ maxBytes=max_bytes,
),
canReadArrowResult=True if pyarrow else False,
canDecompressLZ4Result=lz4_compression,
@@ -910,6 +915,7 @@
},
useArrowNativeTypes=spark_arrow_types,
parameters=parameters,
+ enforceEmbeddedSchemaCorrectness=enforce_embedded_schema_correctness,
)
resp = self.make_request(self._client.ExecuteStatement, req)

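
This hunk is the heart of the small-results fix (#515): when direct results are requested, the server may return a small result set inline with the `ExecuteStatement` response, leaving no operation state to poll. Passing `None` for async keeps the poll-then-fetch contract intact. The decision in isolation, as a hedged sketch:

```python
from databricks.sql.thrift_api.TCLIService import ttypes

def direct_results_request(async_op: bool, max_rows: int, max_bytes: int):
    # Async executions pass None so the server holds results until they
    # are explicitly fetched after polling completes.
    if async_op:
        return None
    return ttypes.TSparkGetDirectResults(maxRows=max_rows, maxBytes=max_bytes)
```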
2 changes: 1 addition & 1 deletion tests/e2e/common/large_queries_mixin.py
@@ -94,7 +94,7 @@ def test_long_running_query(self):
scale_factor = 1
with self.cursor() as cursor:
while duration < min_duration:
- assert scale_factor < 512, "Detected infinite loop"
+ assert scale_factor < 1024, "Detected infinite loop"
start = time.time()

cursor.execute(
20 changes: 10 additions & 10 deletions tests/e2e/common/retry_test_mixins.py
@@ -121,9 +121,9 @@ class PySQLRetryTestsMixin:
# For testing purposes
_retry_policy = {
"_retry_delay_min": 0.1,
"_retry_delay_max": 5,
"_retry_delay_max": 3,
"_retry_stop_after_attempts_count": 5,
"_retry_stop_after_attempts_duration": 10,
"_retry_stop_after_attempts_duration": 30,
"_retry_delay_default": 0.5,
}

@@ -135,7 +135,7 @@ def test_retry_urllib3_settings_are_honored(self):
urllib3_config = {"connect": 10, "read": 11, "redirect": 12}
rp = DatabricksRetryPolicy(
delay_min=0.1,
- delay_max=10.0,
+ delay_max=3,
stop_after_attempts_count=10,
stop_after_attempts_duration=10.0,
delay_default=1.0,
@@ -174,14 +174,14 @@ def test_retry_max_count_not_exceeded(self):
def test_retry_exponential_backoff(self):
"""GIVEN the retry policy is configured for reasonable exponential backoff
WHEN the server sends nothing but 429 responses with retry-afters
- THEN the connector will use those retry-afters values as delay
+ THEN the connector will use those retry-afters values as floor
"""
retry_policy = self._retry_policy.copy()
retry_policy["_retry_delay_min"] = 1

time_start = time.time()
with mocked_server_response(
- status=429, headers={"Retry-After": "3"}
+ status=429, headers={"Retry-After": "8"}
) as mock_obj:
with pytest.raises(RequestError) as cm:
with self.connection(extra_params=retry_policy) as conn:
Expand All @@ -191,14 +191,14 @@ def test_retry_exponential_backoff(self):
assert isinstance(cm.value.args[1], MaxRetryDurationError)

# With setting delay_min to 1, the expected retry delays should be:
- # 3, 3, 3, 3
+ # 8, 8, 8, 8
# The first 3 retries are allowed, the 4th retry puts the total duration over the limit
- # of 10 seconds
+ # of 30 seconds
assert mock_obj.return_value.getresponse.call_count == 4
- assert duration > 6
+ assert duration > 24

- # Should be less than 7, but this is a safe margin for CI/CD slowness
- assert duration < 10
+ # Should be less than 26, but this is a safe margin for CI/CD slowness
+ assert duration < 30

def test_retry_max_duration_not_exceeded(self):
"""GIVEN the max attempt duration of 10 seconds
66 changes: 59 additions & 7 deletions tests/e2e/test_driver.py
@@ -177,19 +177,22 @@ def test_cloud_fetch(self):
for i in range(len(cf_result)):
assert cf_result[i] == noop_result[i]

- def test_execute_async(self):
- def isExecuting(operation_state):
- return not operation_state or operation_state in [
- ttypes.TOperationState.RUNNING_STATE,
- ttypes.TOperationState.PENDING_STATE,
- ]

+ class TestPySQLAsyncQueriesSuite(PySQLPytestTestCase):
+ def isExecuting(self, operation_state):
+ return not operation_state or operation_state in [
+ ttypes.TOperationState.RUNNING_STATE,
+ ttypes.TOperationState.PENDING_STATE,
+ ]

+ def test_execute_async__long_running(self):

long_running_query = "SELECT COUNT(*) FROM RANGE(10000 * 16) x JOIN RANGE(10000) y ON FROM_UNIXTIME(x.id * y.id, 'yyyy-MM-dd') LIKE '%not%a%date%'"
with self.cursor() as cursor:
cursor.execute_async(long_running_query)

## Polling after every POLLING_INTERVAL seconds
- while isExecuting(cursor.get_query_state()):
+ while self.isExecuting(cursor.get_query_state()):
time.sleep(self.POLLING_INTERVAL)
log.info("Polling the status in test_execute_async")

@@ -198,6 +201,55 @@ def isExecuting(operation_state):

assert result[0].asDict() == {"count(1)": 0}

+ def test_execute_async__small_result(self):
+ small_result_query = "SELECT 1"

+ with self.cursor() as cursor:
+ cursor.execute_async(small_result_query)

+ ## Fake sleep for 5 secs
+ time.sleep(5)

+ ## Polling after every POLLING_INTERVAL seconds
+ while self.isExecuting(cursor.get_query_state()):
+ time.sleep(self.POLLING_INTERVAL)
+ log.info("Polling the status in test_execute_async")

+ cursor.get_async_execution_result()
+ result = cursor.fetchall()

+ assert result[0].asDict() == {"1": 1}

+ def test_execute_async__large_result(self):
+ x_dimension = 1000
+ y_dimension = 1000
+ large_result_query = f"""
+ SELECT
+ x.id AS x_id,
+ y.id AS y_id,
+ FROM_UNIXTIME(x.id * y.id, 'yyyy-MM-dd') AS date
+ FROM
+ RANGE({x_dimension}) x
+ JOIN
+ RANGE({y_dimension}) y
+ """

+ with self.cursor() as cursor:
+ cursor.execute_async(large_result_query)

+ ## Fake sleep for 5 secs
+ time.sleep(5)

+ ## Polling after every POLLING_INTERVAL seconds
+ while self.isExecuting(cursor.get_query_state()):
+ time.sleep(self.POLLING_INTERVAL)
+ log.info("Polling the status in test_execute_async")

+ cursor.get_async_execution_result()
+ result = cursor.fetchall()

+ assert len(result) == x_dimension * y_dimension


# Exclude Retry tests because they require specific setups, and LargeQueries too slow for core
# tests
21 changes: 10 additions & 11 deletions tests/unit/test_retry.py
@@ -34,8 +34,11 @@ def test_sleep__no_retry_after(self, t_mock, retry_policy, error_history):
retry_policy.history = [error_history, error_history]
retry_policy.sleep(HTTPResponse(status=503))

- expected_backoff_time = self.calculate_backoff_time(
- 0, retry_policy.delay_min, retry_policy.delay_max
+ expected_backoff_time = max(
+ self.calculate_backoff_time(
+ 0, retry_policy.delay_min, retry_policy.delay_max
+ ),
+ retry_policy.delay_max,
)
t_mock.assert_called_with(expected_backoff_time)

@@ -54,8 +57,11 @@ def test_sleep__no_retry_after_header__multiple_retries(self, t_mock, retry_policy):
expected_backoff_times = []
for attempt in range(num_attempts):
expected_backoff_times.append(
- self.calculate_backoff_time(
- attempt, retry_policy.delay_min, retry_policy.delay_max
+ max(
+ self.calculate_backoff_time(
+ attempt, retry_policy.delay_min, retry_policy.delay_max
+ ),
+ retry_policy.delay_max,
)
)

Expand All @@ -77,10 +83,3 @@ def test_excessive_retry_attempts_error(self, t_mock, retry_policy):
retry_policy.sleep(HTTPResponse(status=503))
# Internally urllib3 calls the increment function generating a new instance for every retry
retry_policy = retry_policy.increment()

- @patch("time.sleep")
- def test_sleep__retry_after_present(self, t_mock, retry_policy, error_history):
- retry_policy._retry_start_time = time.time()
- retry_policy.history = [error_history, error_history, error_history]
- retry_policy.sleep(HTTPResponse(status=503, headers={"Retry-After": "3"}))
- t_mock.assert_called_with(3)