From 79d21972b4ae281cc4cf212a22dc4890b97ff523 Mon Sep 17 00:00:00 2001 From: Jothi Prakash Date: Fri, 7 Mar 2025 00:35:01 +0530 Subject: [PATCH 1/5] Base changes --- CHANGELOG.md | 10 +++ src/databricks/sql/auth/retry.py | 21 +++--- src/databricks/sql/auth/thrift_http_client.py | 6 ++ src/databricks/sql/client.py | 4 ++ src/databricks/sql/thrift_backend.py | 12 +++- tests/e2e/common/large_queries_mixin.py | 2 +- tests/e2e/test_driver.py | 68 ++++++++++++++++--- 7 files changed, 102 insertions(+), 21 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 9d64f4d9..6753b60f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,16 @@ - Split the connector into two separate packages: `databricks-sql-connector` and `databricks-sqlalchemy`. The `databricks-sql-connector` package contains the core functionality of the connector, while the `databricks-sqlalchemy` package contains the SQLAlchemy dialect for the connector. - Pyarrow dependency is now optional in `databricks-sql-connector`. Users needing arrow are supposed to explicitly install pyarrow +# 3.7.3 (2025-03-28) + +- Fix: Unable to poll small results in execute_async function (databricks/databricks-sql-python#515 by @jprakash-db) +- Updated log messages to show the status code and error messages of requests (databricks/databricks-sql-python#511 by @jprakash-db) +- Fix: Incorrect metadata was fetched in case of queries with the same alias (databricks/databricks-sql-python#505 by @jprakash-db) + +# 3.7.2 (2025-01-31) + +- Updated the retry_dela_max and retry_timeout (databricks/databricks-sql-python#497 by @jprakash-db) + # 3.7.1 (2025-01-07) - Relaxed the number of Http retry attempts (databricks/databricks-sql-python#486 by @jprakash-db) diff --git a/src/databricks/sql/auth/retry.py b/src/databricks/sql/auth/retry.py index 1c6644ff..57cfeed5 100755 --- a/src/databricks/sql/auth/retry.py +++ b/src/databricks/sql/auth/retry.py @@ -290,8 +290,9 @@ def sleep_for_retry(self, response: BaseHTTPResponse) -> bool: else: proposed_wait = self.get_backoff_time() - proposed_wait = min(proposed_wait, self.delay_max) + proposed_wait = max(proposed_wait, self.delay_max) self.check_proposed_wait(proposed_wait) + logger.debug(f"Retrying after {proposed_wait} seconds") time.sleep(proposed_wait) return True @@ -344,23 +345,24 @@ def should_retry(self, method: str, status_code: int) -> Tuple[bool, str]: if a retry would violate the configured policy. """ + logger.info(f"Received status code {status_code} for {method} request") + # Request succeeded. Don't retry. if status_code == 200: return False, "200 codes are not retried" if status_code == 401: - raise NonRecoverableNetworkError( - "Received 401 - UNAUTHORIZED. Confirm your authentication credentials." + return ( + False, + "Received 401 - UNAUTHORIZED. Confirm your authentication credentials.", ) if status_code == 403: - raise NonRecoverableNetworkError( - "Received 403 - FORBIDDEN. Confirm your authentication credentials." - ) + return False, "403 codes are not retried" # Request failed and server said NotImplemented. This isn't recoverable. Don't retry. if status_code == 501: - raise NonRecoverableNetworkError("Received code 501 from server.") + return False, "Received code 501 from server." # Request failed and this method is not retryable. We only retry POST requests. if not self._is_method_retryable(method): @@ -399,8 +401,9 @@ def should_retry(self, method: str, status_code: int) -> Tuple[bool, str]: and status_code not in self.status_forcelist and status_code not in self.force_dangerous_codes ): - raise UnsafeToRetryError( - "ExecuteStatement command can only be retried for codes 429 and 503. Received code: {status_code}" + return ( + False, + "ExecuteStatement command can only be retried for codes 429 and 503", ) # Request failed with a dangerous code, was an ExecuteStatement, but user forced retries for this diff --git a/src/databricks/sql/auth/thrift_http_client.py b/src/databricks/sql/auth/thrift_http_client.py index 6273ab28..f0daae16 100644 --- a/src/databricks/sql/auth/thrift_http_client.py +++ b/src/databricks/sql/auth/thrift_http_client.py @@ -198,6 +198,12 @@ def flush(self): self.message = self.__resp.reason self.headers = self.__resp.headers + logger.info( + "HTTP Response with status code {}, message: {}".format( + self.code, self.message + ) + ) + @staticmethod def basic_proxy_auth_headers(proxy): if proxy is None or not proxy.username: diff --git a/src/databricks/sql/client.py b/src/databricks/sql/client.py index dca286ef..9c16d69d 100755 --- a/src/databricks/sql/client.py +++ b/src/databricks/sql/client.py @@ -744,6 +744,7 @@ def execute( self, operation: str, parameters: Optional[TParameterCollection] = None, + enforce_embedded_schema_correctness=False ) -> "Cursor": """ Execute a query and wait for execution to complete. @@ -808,6 +809,7 @@ def execute( use_cloud_fetch=self.connection.use_cloud_fetch, parameters=prepared_params, async_op=False, + enforce_embedded_schema_correctness=enforce_embedded_schema_correctness ) self.active_result_set = ResultSet( self.connection, @@ -829,6 +831,7 @@ def execute_async( self, operation: str, parameters: Optional[TParameterCollection] = None, + enforce_embedded_schema_correctness=False ) -> "Cursor": """ @@ -869,6 +872,7 @@ def execute_async( use_cloud_fetch=self.connection.use_cloud_fetch, parameters=prepared_params, async_op=True, + enforce_embedded_schema_correctness=enforce_embedded_schema_correctness ) return self diff --git a/src/databricks/sql/thrift_backend.py b/src/databricks/sql/thrift_backend.py index f76350a2..03c44bef 100644 --- a/src/databricks/sql/thrift_backend.py +++ b/src/databricks/sql/thrift_backend.py @@ -66,7 +66,7 @@ # - 900s attempts-duration lines up w ODBC/JDBC drivers (for cluster startup > 10 mins) _retry_policy = { # (type, default, min, max) "_retry_delay_min": (float, 1, 0.1, 60), - "_retry_delay_max": (float, 30, 5, 3600), + "_retry_delay_max": (float, 60, 5, 3600), "_retry_stop_after_attempts_count": (int, 30, 1, 60), "_retry_stop_after_attempts_duration": (float, 900, 1, 86400), "_retry_delay_default": (float, 5, 1, 60), @@ -883,6 +883,7 @@ def execute_command( use_cloud_fetch=True, parameters=[], async_op=False, + enforce_embedded_schema_correctness=False ): assert session_handle is not None @@ -898,8 +899,12 @@ def execute_command( sessionHandle=session_handle, statement=operation, runAsync=True, - getDirectResults=ttypes.TSparkGetDirectResults( - maxRows=max_rows, maxBytes=max_bytes + # For async operation we don't want the direct results + getDirectResults=None + if async_op + else ttypes.TSparkGetDirectResults( + maxRows=max_rows, + maxBytes=max_bytes, ), canReadArrowResult=True if pyarrow else False, canDecompressLZ4Result=lz4_compression, @@ -910,6 +915,7 @@ def execute_command( }, useArrowNativeTypes=spark_arrow_types, parameters=parameters, + enforceEmbeddedSchemaCorrectness=enforce_embedded_schema_correctness ) resp = self.make_request(self._client.ExecuteStatement, req) diff --git a/tests/e2e/common/large_queries_mixin.py b/tests/e2e/common/large_queries_mixin.py index 41ef029b..ed8ac457 100644 --- a/tests/e2e/common/large_queries_mixin.py +++ b/tests/e2e/common/large_queries_mixin.py @@ -94,7 +94,7 @@ def test_long_running_query(self): scale_factor = 1 with self.cursor() as cursor: while duration < min_duration: - assert scale_factor < 512, "Detected infinite loop" + assert scale_factor < 1024, "Detected infinite loop" start = time.time() cursor.execute( diff --git a/tests/e2e/test_driver.py b/tests/e2e/test_driver.py index 2f0881cd..22b84c87 100644 --- a/tests/e2e/test_driver.py +++ b/tests/e2e/test_driver.py @@ -177,19 +177,22 @@ def test_cloud_fetch(self): for i in range(len(cf_result)): assert cf_result[i] == noop_result[i] - def test_execute_async(self): - def isExecuting(operation_state): - return not operation_state or operation_state in [ - ttypes.TOperationState.RUNNING_STATE, - ttypes.TOperationState.PENDING_STATE, - ] + +class TestPySQLAsyncQueriesSuite(PySQLPytestTestCase): + def isExecuting(self, operation_state): + return not operation_state or operation_state in [ + ttypes.TOperationState.RUNNING_STATE, + ttypes.TOperationState.PENDING_STATE, + ] + + def test_execute_async__long_running(self): long_running_query = "SELECT COUNT(*) FROM RANGE(10000 * 16) x JOIN RANGE(10000) y ON FROM_UNIXTIME(x.id * y.id, 'yyyy-MM-dd') LIKE '%not%a%date%'" with self.cursor() as cursor: cursor.execute_async(long_running_query) ## Polling after every POLLING_INTERVAL seconds - while isExecuting(cursor.get_query_state()): + while self.isExecuting(cursor.get_query_state()): time.sleep(self.POLLING_INTERVAL) log.info("Polling the status in test_execute_async") @@ -198,6 +201,55 @@ def isExecuting(operation_state): assert result[0].asDict() == {"count(1)": 0} + def test_execute_async__small_result(self): + small_result_query = "SELECT 1" + + with self.cursor() as cursor: + cursor.execute_async(small_result_query) + + ## Fake sleep for 5 secs + time.sleep(5) + + ## Polling after every POLLING_INTERVAL seconds + while self.isExecuting(cursor.get_query_state()): + time.sleep(self.POLLING_INTERVAL) + log.info("Polling the status in test_execute_async") + + cursor.get_async_execution_result() + result = cursor.fetchall() + + assert result[0].asDict() == {"1": 1} + + def test_execute_async__large_result(self): + x_dimension = 1000 + y_dimension = 1000 + large_result_query = f""" + SELECT + x.id AS x_id, + y.id AS y_id, + FROM_UNIXTIME(x.id * y.id, 'yyyy-MM-dd') AS date + FROM + RANGE({x_dimension}) x + JOIN + RANGE({y_dimension}) y + """ + + with self.cursor() as cursor: + cursor.execute_async(large_result_query) + + ## Fake sleep for 5 secs + time.sleep(5) + + ## Polling after every POLLING_INTERVAL seconds + while self.isExecuting(cursor.get_query_state()): + time.sleep(self.POLLING_INTERVAL) + log.info("Polling the status in test_execute_async") + + cursor.get_async_execution_result() + result = cursor.fetchall() + + assert len(result) == x_dimension * y_dimension + # Exclude Retry tests because they require specific setups, and LargeQueries too slow for core # tests @@ -828,4 +880,4 @@ def test_initial_namespace(self): cursor.execute("select current_catalog()") assert cursor.fetchone()[0] == self.arguments["catalog"] cursor.execute("select current_database()") - assert cursor.fetchone()[0] == table_name + assert cursor.fetchone()[0] == table_name \ No newline at end of file From 4cf8d5a81a3a363a401e1f7e80e712996f49d8d1 Mon Sep 17 00:00:00 2001 From: Jothi Prakash Date: Fri, 7 Mar 2025 00:35:49 +0530 Subject: [PATCH 2/5] Black formatter --- src/databricks/sql/client.py | 8 ++++---- src/databricks/sql/thrift_backend.py | 4 ++-- tests/e2e/test_driver.py | 2 +- 3 files changed, 7 insertions(+), 7 deletions(-) diff --git a/src/databricks/sql/client.py b/src/databricks/sql/client.py index 9c16d69d..b3cafea0 100755 --- a/src/databricks/sql/client.py +++ b/src/databricks/sql/client.py @@ -744,7 +744,7 @@ def execute( self, operation: str, parameters: Optional[TParameterCollection] = None, - enforce_embedded_schema_correctness=False + enforce_embedded_schema_correctness=False, ) -> "Cursor": """ Execute a query and wait for execution to complete. @@ -809,7 +809,7 @@ def execute( use_cloud_fetch=self.connection.use_cloud_fetch, parameters=prepared_params, async_op=False, - enforce_embedded_schema_correctness=enforce_embedded_schema_correctness + enforce_embedded_schema_correctness=enforce_embedded_schema_correctness, ) self.active_result_set = ResultSet( self.connection, @@ -831,7 +831,7 @@ def execute_async( self, operation: str, parameters: Optional[TParameterCollection] = None, - enforce_embedded_schema_correctness=False + enforce_embedded_schema_correctness=False, ) -> "Cursor": """ @@ -872,7 +872,7 @@ def execute_async( use_cloud_fetch=self.connection.use_cloud_fetch, parameters=prepared_params, async_op=True, - enforce_embedded_schema_correctness=enforce_embedded_schema_correctness + enforce_embedded_schema_correctness=enforce_embedded_schema_correctness, ) return self diff --git a/src/databricks/sql/thrift_backend.py b/src/databricks/sql/thrift_backend.py index 03c44bef..972d9a9b 100644 --- a/src/databricks/sql/thrift_backend.py +++ b/src/databricks/sql/thrift_backend.py @@ -883,7 +883,7 @@ def execute_command( use_cloud_fetch=True, parameters=[], async_op=False, - enforce_embedded_schema_correctness=False + enforce_embedded_schema_correctness=False, ): assert session_handle is not None @@ -915,7 +915,7 @@ def execute_command( }, useArrowNativeTypes=spark_arrow_types, parameters=parameters, - enforceEmbeddedSchemaCorrectness=enforce_embedded_schema_correctness + enforceEmbeddedSchemaCorrectness=enforce_embedded_schema_correctness, ) resp = self.make_request(self._client.ExecuteStatement, req) diff --git a/tests/e2e/test_driver.py b/tests/e2e/test_driver.py index 22b84c87..45fea480 100644 --- a/tests/e2e/test_driver.py +++ b/tests/e2e/test_driver.py @@ -880,4 +880,4 @@ def test_initial_namespace(self): cursor.execute("select current_catalog()") assert cursor.fetchone()[0] == self.arguments["catalog"] cursor.execute("select current_database()") - assert cursor.fetchone()[0] == table_name \ No newline at end of file + assert cursor.fetchone()[0] == table_name From 7ed4226fc7f581cc1676e44eb1f061e807427aea Mon Sep 17 00:00:00 2001 From: Jothi Prakash Date: Fri, 7 Mar 2025 12:18:06 +0530 Subject: [PATCH 3/5] Cache version fix --- .github/workflows/code-quality-checks.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/code-quality-checks.yml b/.github/workflows/code-quality-checks.yml index 3f89265c..4889156b 100644 --- a/.github/workflows/code-quality-checks.yml +++ b/.github/workflows/code-quality-checks.yml @@ -89,7 +89,7 @@ jobs: #---------------------------------------------- - name: Load cached venv id: cached-poetry-dependencies - uses: actions/cache@v2 + uses: actions/cache@v4 with: path: .venv-pyarrow key: venv-pyarrow-${{ runner.os }}-${{ steps.setup-python.outputs.python-version }}-${{ github.event.repository.name }}-${{ hashFiles('**/poetry.lock') }} From a162de314fa663bc48bc4d72a5861f3659688724 Mon Sep 17 00:00:00 2001 From: Jothi Prakash Date: Fri, 7 Mar 2025 12:22:39 +0530 Subject: [PATCH 4/5] Added the changed test_retry.py file --- tests/unit/test_retry.py | 21 ++++++++++----------- 1 file changed, 10 insertions(+), 11 deletions(-) diff --git a/tests/unit/test_retry.py b/tests/unit/test_retry.py index 1e18e1f4..897a1d11 100644 --- a/tests/unit/test_retry.py +++ b/tests/unit/test_retry.py @@ -34,8 +34,11 @@ def test_sleep__no_retry_after(self, t_mock, retry_policy, error_history): retry_policy.history = [error_history, error_history] retry_policy.sleep(HTTPResponse(status=503)) - expected_backoff_time = self.calculate_backoff_time( - 0, retry_policy.delay_min, retry_policy.delay_max + expected_backoff_time = max( + self.calculate_backoff_time( + 0, retry_policy.delay_min, retry_policy.delay_max + ), + retry_policy.delay_max, ) t_mock.assert_called_with(expected_backoff_time) @@ -54,8 +57,11 @@ def test_sleep__no_retry_after_header__multiple_retries(self, t_mock, retry_poli expected_backoff_times = [] for attempt in range(num_attempts): expected_backoff_times.append( - self.calculate_backoff_time( - attempt, retry_policy.delay_min, retry_policy.delay_max + max( + self.calculate_backoff_time( + attempt, retry_policy.delay_min, retry_policy.delay_max + ), + retry_policy.delay_max, ) ) @@ -77,10 +83,3 @@ def test_excessive_retry_attempts_error(self, t_mock, retry_policy): retry_policy.sleep(HTTPResponse(status=503)) # Internally urllib3 calls the increment function generating a new instance for every retry retry_policy = retry_policy.increment() - - @patch("time.sleep") - def test_sleep__retry_after_present(self, t_mock, retry_policy, error_history): - retry_policy._retry_start_time = time.time() - retry_policy.history = [error_history, error_history, error_history] - retry_policy.sleep(HTTPResponse(status=503, headers={"Retry-After": "3"})) - t_mock.assert_called_with(3) From 31946cf5e4c57deba92b55e5ccb9e0ccb3e66c80 Mon Sep 17 00:00:00 2001 From: Jothi Prakash Date: Fri, 7 Mar 2025 13:23:13 +0530 Subject: [PATCH 5/5] retry_test_mixins changes --- tests/e2e/common/retry_test_mixins.py | 20 ++++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/tests/e2e/common/retry_test_mixins.py b/tests/e2e/common/retry_test_mixins.py index 942955ca..b5d01a45 100755 --- a/tests/e2e/common/retry_test_mixins.py +++ b/tests/e2e/common/retry_test_mixins.py @@ -121,9 +121,9 @@ class PySQLRetryTestsMixin: # For testing purposes _retry_policy = { "_retry_delay_min": 0.1, - "_retry_delay_max": 5, + "_retry_delay_max": 3, "_retry_stop_after_attempts_count": 5, - "_retry_stop_after_attempts_duration": 10, + "_retry_stop_after_attempts_duration": 30, "_retry_delay_default": 0.5, } @@ -135,7 +135,7 @@ def test_retry_urllib3_settings_are_honored(self): urllib3_config = {"connect": 10, "read": 11, "redirect": 12} rp = DatabricksRetryPolicy( delay_min=0.1, - delay_max=10.0, + delay_max=3, stop_after_attempts_count=10, stop_after_attempts_duration=10.0, delay_default=1.0, @@ -174,14 +174,14 @@ def test_retry_max_count_not_exceeded(self): def test_retry_exponential_backoff(self): """GIVEN the retry policy is configured for reasonable exponential backoff WHEN the server sends nothing but 429 responses with retry-afters - THEN the connector will use those retry-afters values as delay + THEN the connector will use those retry-afters values as floor """ retry_policy = self._retry_policy.copy() retry_policy["_retry_delay_min"] = 1 time_start = time.time() with mocked_server_response( - status=429, headers={"Retry-After": "3"} + status=429, headers={"Retry-After": "8"} ) as mock_obj: with pytest.raises(RequestError) as cm: with self.connection(extra_params=retry_policy) as conn: @@ -191,14 +191,14 @@ def test_retry_exponential_backoff(self): assert isinstance(cm.value.args[1], MaxRetryDurationError) # With setting delay_min to 1, the expected retry delays should be: - # 3, 3, 3, 3 + # 8, 8, 8, 8 # The first 3 retries are allowed, the 4th retry puts the total duration over the limit - # of 10 seconds + # of 30 seconds assert mock_obj.return_value.getresponse.call_count == 4 - assert duration > 6 + assert duration > 24 - # Should be less than 7, but this is a safe margin for CI/CD slowness - assert duration < 10 + # Should be less than 26, but this is a safe margin for CI/CD slowness + assert duration < 30 def test_retry_max_duration_not_exceeded(self): """GIVEN the max attempt duration of 10 seconds