From e637408a1121644df2d351959e6699f965b513d5 Mon Sep 17 00:00:00 2001 From: Jothi Prakash Date: Wed, 30 Oct 2024 00:11:16 +0530 Subject: [PATCH 1/9] Built the basic flow for the async pipeline - testing is remaining --- src/databricks/sql/client.py | 7 +++++++ src/databricks/sql/thrift_backend.py | 14 +++++++++----- 2 files changed, 16 insertions(+), 5 deletions(-) diff --git a/src/databricks/sql/client.py b/src/databricks/sql/client.py index 4e0ab941..5560557c 100755 --- a/src/databricks/sql/client.py +++ b/src/databricks/sql/client.py @@ -733,6 +733,7 @@ def execute( self, operation: str, parameters: Optional[TParameterCollection] = None, + perform_async = True ) -> "Cursor": """ Execute a query and wait for execution to complete. @@ -796,6 +797,7 @@ def execute( cursor=self, use_cloud_fetch=self.connection.use_cloud_fetch, parameters=prepared_params, + perform_async=perform_async, ) self.active_result_set = ResultSet( self.connection, @@ -812,6 +814,11 @@ def execute( return self + def executeAsync(self, + operation: str, + parameters: Optional[TParameterCollection] = None,): + return execute(operation, parameters, True) + def executemany(self, operation, seq_of_parameters): """ Execute the operation once for every set of passed in parameters. diff --git a/src/databricks/sql/thrift_backend.py b/src/databricks/sql/thrift_backend.py index cf5cd906..6916278b 100644 --- a/src/databricks/sql/thrift_backend.py +++ b/src/databricks/sql/thrift_backend.py @@ -817,6 +817,7 @@ def execute_command( cursor, use_cloud_fetch=True, parameters=[], + perform_async=False, ): assert session_handle is not None @@ -846,7 +847,8 @@ def execute_command( parameters=parameters, ) resp = self.make_request(self._client.ExecuteStatement, req) - return self._handle_execute_response(resp, cursor) + + return self._handle_execute_response(resp, cursor, perform_async) def get_catalogs(self, session_handle, max_rows, max_bytes, cursor): assert session_handle is not None @@ -934,14 +936,16 @@ def get_columns( resp = self.make_request(self._client.GetColumns, req) return self._handle_execute_response(resp, cursor) - def _handle_execute_response(self, resp, cursor): + def _handle_execute_response(self, resp, cursor, perform_async=False): cursor.active_op_handle = resp.operationHandle self._check_direct_results_for_error(resp.directResults) - final_operation_state = self._wait_until_command_done( + if perform_async: + final_operation_state=ttypes.TStatusCode.STILL_EXECUTING_STATUS + else: + final_operation_state=self._wait_until_command_done( resp.operationHandle, - resp.directResults and resp.directResults.operationStatus, - ) + resp.directResults and resp.directResults.operationStatus) return self._results_message_to_execute_response(resp, final_operation_state) From a1743704f5d910354c842b85eca7156e9a901466 Mon Sep 17 00:00:00 2001 From: Jothi Prakash Date: Sat, 2 Nov 2024 16:23:44 +0530 Subject: [PATCH 2/9] Implemented the flow for the get_execution_result, but the problem of invalid operation handle still persists --- src/databricks/sql/constants.py | 0 1 file changed, 0 insertions(+), 0 deletions(-) create mode 100644 src/databricks/sql/constants.py diff --git a/src/databricks/sql/constants.py b/src/databricks/sql/constants.py new file mode 100644 index 00000000..e69de29b From 925b2a3039295aedecd057e829c8b5ac19148a75 Mon Sep 17 00:00:00 2001 From: Jothi Prakash Date: Sat, 2 Nov 2024 16:25:02 +0530 Subject: [PATCH 3/9] Missed adding some files in previous commit --- src/databricks/sql/client.py | 35 ++++++++-- src/databricks/sql/constants.py | 12 ++++ src/databricks/sql/thrift_backend.py | 98 ++++++++++++++++++++++++++-- 3 files changed, 134 insertions(+), 11 deletions(-) diff --git a/src/databricks/sql/client.py b/src/databricks/sql/client.py index 5560557c..be43ae06 100755 --- a/src/databricks/sql/client.py +++ b/src/databricks/sql/client.py @@ -733,7 +733,7 @@ def execute( self, operation: str, parameters: Optional[TParameterCollection] = None, - perform_async = True + perform_async = False ) -> "Cursor": """ Execute a query and wait for execution to complete. @@ -814,10 +814,37 @@ def execute( return self - def executeAsync(self, + def execute_async(self, operation: str, parameters: Optional[TParameterCollection] = None,): - return execute(operation, parameters, True) + return self.execute(operation, parameters, True) + + def get_query_status(self): + self._check_not_closed() + return self.thrift_backend.get_query_status(self.active_op_handle) + + def get_execution_result(self): + self._check_not_closed() + + operation_state = self.get_query_status() + if operation_state.statusCode == ttypes.TStatusCode.SUCCESS_STATUS or operation_state.statusCode == ttypes.TStatusCode.SUCCESS_WITH_INFO_STATUS: + execute_response=self.thrift_backend.get_execution_result(self.active_op_handle) + self.active_result_set = ResultSet( + self.connection, + execute_response, + self.thrift_backend, + self.buffer_size_bytes, + self.arraysize, + ) + + if execute_response.is_staging_operation: + self._handle_staging_operation( + staging_allowed_local_path=self.thrift_backend.staging_allowed_local_path + ) + + return self + else: + raise Error(f"get_execution_result failed with status code {operation_state.statusCode}") def executemany(self, operation, seq_of_parameters): """ @@ -1126,7 +1153,7 @@ def __init__( self._arrow_schema_bytes = execute_response.arrow_schema_bytes self._next_row_index = 0 - if execute_response.arrow_queue: + if execute_response.arrow_queue or True: # In this case the server has taken the fast path and returned an initial batch of # results self.results = execute_response.arrow_queue diff --git a/src/databricks/sql/constants.py b/src/databricks/sql/constants.py index e69de29b..2245f4f7 100644 --- a/src/databricks/sql/constants.py +++ b/src/databricks/sql/constants.py @@ -0,0 +1,12 @@ +from databricks.sql.thrift_api.TCLIService import ttypes + +class QueryExecutionStatus: + INITIALIZED_STATE=ttypes.TOperationState.INITIALIZED_STATE + RUNNING_STATE = ttypes.TOperationState.RUNNING_STATE + FINISHED_STATE = ttypes.TOperationState.FINISHED_STATE + CANCELED_STATE = ttypes.TOperationState.CANCELED_STATE + CLOSED_STATE = ttypes.TOperationState.CLOSED_STATE + ERROR_STATE = ttypes.TOperationState.ERROR_STATE + UKNOWN_STATE = ttypes.TOperationState.UKNOWN_STATE + PENDING_STATE = ttypes.TOperationState.PENDING_STATE + TIMEDOUT_STATE = ttypes.TOperationState.TIMEDOUT_STATE \ No newline at end of file diff --git a/src/databricks/sql/thrift_backend.py b/src/databricks/sql/thrift_backend.py index 6916278b..bbb90f1d 100644 --- a/src/databricks/sql/thrift_backend.py +++ b/src/databricks/sql/thrift_backend.py @@ -769,6 +769,66 @@ def _results_message_to_execute_response(self, resp, operation_state): arrow_schema_bytes=schema_bytes, ) + def get_execution_result(self, op_handle): + + assert op_handle is not None + + req = ttypes.TFetchResultsReq( + operationHandle=ttypes.TOperationHandle( + op_handle.operationId, + op_handle.operationType, + False, + op_handle.modifiedRowCount, + ), + maxRows=max_rows, + maxBytes=max_bytes, + orientation=ttypes.TFetchOrientation.FETCH_NEXT, + includeResultSetMetadata=True, + ) + + resp = self.make_request(self._client.FetchResults, req) + + t_result_set_metadata_resp = resp.resultSetMetaData + + lz4_compressed = t_result_set_metadata_resp.lz4Compressed + is_staging_operation = t_result_set_metadata_resp.isStagingOperation + has_more_rows = resp.hasMoreRows + description = self._hive_schema_to_description( + t_result_set_metadata_resp.schema + ) + + if pyarrow: + schema_bytes = ( + t_result_set_metadata_resp.arrowSchema + or self._hive_schema_to_arrow_schema(t_result_set_metadata_resp.schema) + .serialize() + .to_pybytes() + ) + else: + schema_bytes = None + + queue = ResultSetQueueFactory.build_queue( + row_set_type=resp.resultSetMetadata.resultFormat, + t_row_set=resp.results, + arrow_schema_bytes=schema_bytes, + max_download_threads=self.max_download_threads, + lz4_compressed=lz4_compressed, + description=description, + ssl_options=self._ssl_options, + ) + + return ExecuteResponse( + arrow_queue=queue, + status=resp.status, + has_been_closed_server_side=has_been_closed_server_side, + has_more_rows=has_more_rows, + lz4_compressed=lz4_compressed, + is_staging_operation=is_staging_operation, + command_handle=resp.operationHandle, + description=description, + arrow_schema_bytes=schema_bytes, + ) + def _wait_until_command_done(self, op_handle, initial_operation_status_resp): if initial_operation_status_resp: self._check_command_not_in_error_or_closed_state( @@ -787,6 +847,12 @@ def _wait_until_command_done(self, op_handle, initial_operation_status_resp): self._check_command_not_in_error_or_closed_state(op_handle, poll_resp) return operation_state + def get_query_status(self, op_handle): + poll_resp = self._poll_for_status(op_handle) + operation_state = poll_resp.status + self._check_command_not_in_error_or_closed_state(op_handle, poll_resp) + return operation_state + @staticmethod def _check_direct_results_for_error(t_spark_direct_results): if t_spark_direct_results: @@ -848,7 +914,10 @@ def execute_command( ) resp = self.make_request(self._client.ExecuteStatement, req) - return self._handle_execute_response(resp, cursor, perform_async) + if perform_async: + return self._handle_execute_response_async(resp, cursor) + else: + return self._handle_execute_response(resp, cursor) def get_catalogs(self, session_handle, max_rows, max_bytes, cursor): assert session_handle is not None @@ -936,19 +1005,34 @@ def get_columns( resp = self.make_request(self._client.GetColumns, req) return self._handle_execute_response(resp, cursor) - def _handle_execute_response(self, resp, cursor, perform_async=False): + def _handle_execute_response(self, resp, cursor): cursor.active_op_handle = resp.operationHandle self._check_direct_results_for_error(resp.directResults) - if perform_async: - final_operation_state=ttypes.TStatusCode.STILL_EXECUTING_STATUS - else: - final_operation_state=self._wait_until_command_done( + final_operation_state = self._wait_until_command_done( resp.operationHandle, - resp.directResults and resp.directResults.operationStatus) + resp.directResults and resp.directResults.operationStatus, + ) return self._results_message_to_execute_response(resp, final_operation_state) + def _handle_execute_response_async(self, resp, cursor): + cursor.active_op_handle = resp.operationHandle + self._check_direct_results_for_error(resp.directResults) + operation_status = resp.status.statusCode + + return ExecuteResponse( + arrow_queue=None, + status=operation_status, + has_been_closed_server_side=None, + has_more_rows=None, + lz4_compressed=None, + is_staging_operation=None, + command_handle=resp.operationHandle, + description=None, + arrow_schema_bytes=None, + ) + def fetch_results( self, op_handle, From 756ac17def5d3c60bff7d49e58fbeaf3a2c34e4f Mon Sep 17 00:00:00 2001 From: Jothi Prakash Date: Mon, 4 Nov 2024 11:18:23 +0530 Subject: [PATCH 4/9] Working prototype of execute_async, get_query_state and get_execution_result --- src/databricks/sql/client.py | 32 +++++++++++++++--------- src/databricks/sql/constants.py | 12 --------- src/databricks/sql/thrift_backend.py | 37 +++++++++++++--------------- 3 files changed, 37 insertions(+), 44 deletions(-) delete mode 100644 src/databricks/sql/constants.py diff --git a/src/databricks/sql/client.py b/src/databricks/sql/client.py index be43ae06..7373be38 100755 --- a/src/databricks/sql/client.py +++ b/src/databricks/sql/client.py @@ -733,7 +733,7 @@ def execute( self, operation: str, parameters: Optional[TParameterCollection] = None, - perform_async = False + async_op=False, ) -> "Cursor": """ Execute a query and wait for execution to complete. @@ -797,7 +797,7 @@ def execute( cursor=self, use_cloud_fetch=self.connection.use_cloud_fetch, parameters=prepared_params, - perform_async=perform_async, + async_op=async_op, ) self.active_result_set = ResultSet( self.connection, @@ -805,6 +805,7 @@ def execute( self.thrift_backend, self.buffer_size_bytes, self.arraysize, + async_op, ) if execute_response.is_staging_operation: @@ -814,21 +815,25 @@ def execute( return self - def execute_async(self, - operation: str, - parameters: Optional[TParameterCollection] = None,): + def execute_async( + self, + operation: str, + parameters: Optional[TParameterCollection] = None, + ): return self.execute(operation, parameters, True) - def get_query_status(self): + def get_query_state(self): self._check_not_closed() - return self.thrift_backend.get_query_status(self.active_op_handle) + return self.thrift_backend.get_query_state(self.active_op_handle) def get_execution_result(self): self._check_not_closed() - operation_state = self.get_query_status() - if operation_state.statusCode == ttypes.TStatusCode.SUCCESS_STATUS or operation_state.statusCode == ttypes.TStatusCode.SUCCESS_WITH_INFO_STATUS: - execute_response=self.thrift_backend.get_execution_result(self.active_op_handle) + operation_state = self.get_query_state() + if operation_state == ttypes.TOperationState.FINISHED_STATE: + execute_response = self.thrift_backend.get_execution_result( + self.active_op_handle, self + ) self.active_result_set = ResultSet( self.connection, execute_response, @@ -844,7 +849,9 @@ def get_execution_result(self): return self else: - raise Error(f"get_execution_result failed with status code {operation_state.statusCode}") + raise Error( + f"get_execution_result failed with Operation status {operation_state}" + ) def executemany(self, operation, seq_of_parameters): """ @@ -1131,6 +1138,7 @@ def __init__( thrift_backend: ThriftBackend, result_buffer_size_bytes: int = DEFAULT_RESULT_BUFFER_SIZE_BYTES, arraysize: int = 10000, + async_op=False, ): """ A ResultSet manages the results of a single command. @@ -1153,7 +1161,7 @@ def __init__( self._arrow_schema_bytes = execute_response.arrow_schema_bytes self._next_row_index = 0 - if execute_response.arrow_queue or True: + if execute_response.arrow_queue or async_op: # In this case the server has taken the fast path and returned an initial batch of # results self.results = execute_response.arrow_queue diff --git a/src/databricks/sql/constants.py b/src/databricks/sql/constants.py deleted file mode 100644 index 2245f4f7..00000000 --- a/src/databricks/sql/constants.py +++ /dev/null @@ -1,12 +0,0 @@ -from databricks.sql.thrift_api.TCLIService import ttypes - -class QueryExecutionStatus: - INITIALIZED_STATE=ttypes.TOperationState.INITIALIZED_STATE - RUNNING_STATE = ttypes.TOperationState.RUNNING_STATE - FINISHED_STATE = ttypes.TOperationState.FINISHED_STATE - CANCELED_STATE = ttypes.TOperationState.CANCELED_STATE - CLOSED_STATE = ttypes.TOperationState.CLOSED_STATE - ERROR_STATE = ttypes.TOperationState.ERROR_STATE - UKNOWN_STATE = ttypes.TOperationState.UKNOWN_STATE - PENDING_STATE = ttypes.TOperationState.PENDING_STATE - TIMEDOUT_STATE = ttypes.TOperationState.TIMEDOUT_STATE \ No newline at end of file diff --git a/src/databricks/sql/thrift_backend.py b/src/databricks/sql/thrift_backend.py index bbb90f1d..f6c10649 100644 --- a/src/databricks/sql/thrift_backend.py +++ b/src/databricks/sql/thrift_backend.py @@ -769,7 +769,7 @@ def _results_message_to_execute_response(self, resp, operation_state): arrow_schema_bytes=schema_bytes, ) - def get_execution_result(self, op_handle): + def get_execution_result(self, op_handle, cursor): assert op_handle is not None @@ -780,15 +780,15 @@ def get_execution_result(self, op_handle): False, op_handle.modifiedRowCount, ), - maxRows=max_rows, - maxBytes=max_bytes, + maxRows=cursor.arraysize, + maxBytes=cursor.buffer_size_bytes, orientation=ttypes.TFetchOrientation.FETCH_NEXT, includeResultSetMetadata=True, ) resp = self.make_request(self._client.FetchResults, req) - t_result_set_metadata_resp = resp.resultSetMetaData + t_result_set_metadata_resp = resp.resultSetMetadata lz4_compressed = t_result_set_metadata_resp.lz4Compressed is_staging_operation = t_result_set_metadata_resp.isStagingOperation @@ -797,15 +797,12 @@ def get_execution_result(self, op_handle): t_result_set_metadata_resp.schema ) - if pyarrow: - schema_bytes = ( - t_result_set_metadata_resp.arrowSchema - or self._hive_schema_to_arrow_schema(t_result_set_metadata_resp.schema) - .serialize() - .to_pybytes() - ) - else: - schema_bytes = None + schema_bytes = ( + t_result_set_metadata_resp.arrowSchema + or self._hive_schema_to_arrow_schema(t_result_set_metadata_resp.schema) + .serialize() + .to_pybytes() + ) queue = ResultSetQueueFactory.build_queue( row_set_type=resp.resultSetMetadata.resultFormat, @@ -820,11 +817,11 @@ def get_execution_result(self, op_handle): return ExecuteResponse( arrow_queue=queue, status=resp.status, - has_been_closed_server_side=has_been_closed_server_side, + has_been_closed_server_side=False, has_more_rows=has_more_rows, lz4_compressed=lz4_compressed, is_staging_operation=is_staging_operation, - command_handle=resp.operationHandle, + command_handle=op_handle, description=description, arrow_schema_bytes=schema_bytes, ) @@ -847,9 +844,9 @@ def _wait_until_command_done(self, op_handle, initial_operation_status_resp): self._check_command_not_in_error_or_closed_state(op_handle, poll_resp) return operation_state - def get_query_status(self, op_handle): + def get_query_state(self, op_handle): poll_resp = self._poll_for_status(op_handle) - operation_state = poll_resp.status + operation_state = poll_resp.operationState self._check_command_not_in_error_or_closed_state(op_handle, poll_resp) return operation_state @@ -883,7 +880,7 @@ def execute_command( cursor, use_cloud_fetch=True, parameters=[], - perform_async=False, + async_op=False, ): assert session_handle is not None @@ -914,7 +911,7 @@ def execute_command( ) resp = self.make_request(self._client.ExecuteStatement, req) - if perform_async: + if async_op: return self._handle_execute_response_async(resp, cursor) else: return self._handle_execute_response(resp, cursor) @@ -1012,7 +1009,7 @@ def _handle_execute_response(self, resp, cursor): final_operation_state = self._wait_until_command_done( resp.operationHandle, resp.directResults and resp.directResults.operationStatus, - ) + ) return self._results_message_to_execute_response(resp, final_operation_state) From beffa2fb9cc3a86238105e40d6e2567f4aa07053 Mon Sep 17 00:00:00 2001 From: Jothi Prakash Date: Mon, 4 Nov 2024 12:02:29 +0530 Subject: [PATCH 5/9] Added integration tests for execute_async --- tests/e2e/test_driver.py | 21 +++++++++++++++++++++ 1 file changed, 21 insertions(+) diff --git a/tests/e2e/test_driver.py b/tests/e2e/test_driver.py index cfd1e969..0d6d3d29 100644 --- a/tests/e2e/test_driver.py +++ b/tests/e2e/test_driver.py @@ -36,6 +36,7 @@ compare_dbr_versions, is_thrift_v5_plus, ) +from databricks.sql.thrift_api.TCLIService import ttypes from tests.e2e.common.core_tests import CoreTestMixin, SmokeTestMixin from tests.e2e.common.large_queries_mixin import LargeQueriesMixin from tests.e2e.common.timestamp_tests import TimestampTestsMixin @@ -175,6 +176,26 @@ def test_cloud_fetch(self): for i in range(len(cf_result)): assert cf_result[i] == noop_result[i] + def test_execute_async(self): + def isExecuting(operation_state): + return not operation_state or operation_state in [ + ttypes.TOperationState.RUNNING_STATE, + ttypes.TOperationState.PENDING_STATE, + ] + + long_running_query = "SELECT COUNT(*) FROM RANGE(10000 * 16) x JOIN RANGE(10000) y ON FROM_UNIXTIME(x.id * y.id, 'yyyy-MM-dd') LIKE '%not%a%date%'" + with self.cursor() as cursor: + cursor.execute_async(long_running_query) + + ## Polling + while isExecuting(cursor.get_query_state()): + log.info("Polling the status in test_execute_async") + + cursor.get_execution_result() + result = cursor.fetchall() + + assert result[0].asDict() == {"count(1)": 0} + # Exclude Retry tests because they require specific setups, and LargeQueries too slow for core # tests From 8bf4442b58d3bd78553e938ef224e397d2d8aa48 Mon Sep 17 00:00:00 2001 From: Jothi Prakash Date: Sun, 17 Nov 2024 16:43:52 +0530 Subject: [PATCH 6/9] add docs for functions --- src/databricks/sql/client.py | 34 ++++++++++++++++++++++++---- src/databricks/sql/thrift_backend.py | 4 +++- tests/e2e/test_driver.py | 3 ++- 3 files changed, 35 insertions(+), 6 deletions(-) diff --git a/src/databricks/sql/client.py b/src/databricks/sql/client.py index 7373be38..9ade21fc 100755 --- a/src/databricks/sql/client.py +++ b/src/databricks/sql/client.py @@ -46,7 +46,7 @@ from databricks.sql.experimental.oauth_persistence import OAuthPersistence from databricks.sql.thrift_api.TCLIService.ttypes import ( - TSparkParameter, + TSparkParameter, TOperationState, ) @@ -764,6 +764,11 @@ def execute( Both will result in the query equivalent to "SELECT * FROM table WHERE field = 'foo' being sent to the server + async_op: + Denotes whether the execute command will execute the request asynchronously or not + By default it is set to False, if set True the execution request will be submitted and the code + will be non-blocking. User can later poll and request the result when ready + :returns self """ @@ -819,14 +824,35 @@ def execute_async( self, operation: str, parameters: Optional[TParameterCollection] = None, - ): - return self.execute(operation, parameters, True) + ) -> "Cursor": + """ + + Execute a query and do not wait for it to complete and just move ahead + + Internally it calls execute function with async_op=True + :param operation: + :param parameters: + :return: + """ + self.execute(operation, parameters, True) + return self - def get_query_state(self): + def get_query_state(self) -> "TOperationState": + """ + Get the state of the async executing query or basically poll the status of the query + + :return: + """ self._check_not_closed() return self.thrift_backend.get_query_state(self.active_op_handle) def get_execution_result(self): + """ + + Checks for the status of the async executing query and fetches the result if the query is finished + If executed sets the active_result_set to the obtained result + :return: + """ self._check_not_closed() operation_state = self.get_query_state() diff --git a/src/databricks/sql/thrift_backend.py b/src/databricks/sql/thrift_backend.py index f6c10649..ad7d41e3 100644 --- a/src/databricks/sql/thrift_backend.py +++ b/src/databricks/sql/thrift_backend.py @@ -7,6 +7,8 @@ import threading from typing import List, Union +from databricks.sql.thrift_api.TCLIService.ttypes import TOperationState + try: import pyarrow except ImportError: @@ -844,7 +846,7 @@ def _wait_until_command_done(self, op_handle, initial_operation_status_resp): self._check_command_not_in_error_or_closed_state(op_handle, poll_resp) return operation_state - def get_query_state(self, op_handle): + def get_query_state(self, op_handle) -> "TOperationState": poll_resp = self._poll_for_status(op_handle) operation_state = poll_resp.operationState self._check_command_not_in_error_or_closed_state(op_handle, poll_resp) diff --git a/tests/e2e/test_driver.py b/tests/e2e/test_driver.py index 0d6d3d29..54fc7a38 100644 --- a/tests/e2e/test_driver.py +++ b/tests/e2e/test_driver.py @@ -187,8 +187,9 @@ def isExecuting(operation_state): with self.cursor() as cursor: cursor.execute_async(long_running_query) - ## Polling + ## Polling after every 10 seconds while isExecuting(cursor.get_query_state()): + time.sleep(10) log.info("Polling the status in test_execute_async") cursor.get_execution_result() From b44b298a83c62375e133e7edf053c47e2366b372 Mon Sep 17 00:00:00 2001 From: Jothi Prakash Date: Sun, 24 Nov 2024 12:02:25 +0530 Subject: [PATCH 7/9] Refractored the async code --- src/databricks/sql/client.py | 59 +++++++++++++++++++++++----- src/databricks/sql/thrift_backend.py | 15 +------ tests/e2e/test_driver.py | 7 ++-- 3 files changed, 55 insertions(+), 26 deletions(-) diff --git a/src/databricks/sql/client.py b/src/databricks/sql/client.py index 9ade21fc..c72b100d 100755 --- a/src/databricks/sql/client.py +++ b/src/databricks/sql/client.py @@ -1,3 +1,4 @@ +import time from typing import Dict, Tuple, List, Optional, Any, Union, Sequence import pandas @@ -430,6 +431,8 @@ def __init__( self.escaper = ParamEscaper() self.lastrowid = None + self.ASYNC_DEFAULT_POLLING_INTERVAL = 2 + # The ideal return type for this method is perhaps Self, but that was not added until 3.11, and we support pre-3.11 pythons, currently. def __enter__(self) -> "Cursor": return self @@ -733,7 +736,6 @@ def execute( self, operation: str, parameters: Optional[TParameterCollection] = None, - async_op=False, ) -> "Cursor": """ Execute a query and wait for execution to complete. @@ -802,7 +804,7 @@ def execute( cursor=self, use_cloud_fetch=self.connection.use_cloud_fetch, parameters=prepared_params, - async_op=async_op, + async_op=False, ) self.active_result_set = ResultSet( self.connection, @@ -810,7 +812,6 @@ def execute( self.thrift_backend, self.buffer_size_bytes, self.arraysize, - async_op, ) if execute_response.is_staging_operation: @@ -829,12 +830,43 @@ def execute_async( Execute a query and do not wait for it to complete and just move ahead - Internally it calls execute function with async_op=True :param operation: :param parameters: :return: """ - self.execute(operation, parameters, True) + param_approach = self._determine_parameter_approach(parameters) + if param_approach == ParameterApproach.NONE: + prepared_params = NO_NATIVE_PARAMS + prepared_operation = operation + + elif param_approach == ParameterApproach.INLINE: + prepared_operation, prepared_params = self._prepare_inline_parameters( + operation, parameters + ) + elif param_approach == ParameterApproach.NATIVE: + normalized_parameters = self._normalize_tparametercollection(parameters) + param_structure = self._determine_parameter_structure(normalized_parameters) + transformed_operation = transform_paramstyle( + operation, normalized_parameters, param_structure + ) + prepared_operation, prepared_params = self._prepare_native_parameters( + transformed_operation, normalized_parameters, param_structure + ) + + self._check_not_closed() + self._close_and_clear_active_result_set() + self.thrift_backend.execute_command( + operation=prepared_operation, + session_handle=self.connection._session_handle, + max_rows=self.arraysize, + max_bytes=self.buffer_size_bytes, + lz4_compression=self.connection.lz4_compression, + cursor=self, + use_cloud_fetch=self.connection.use_cloud_fetch, + parameters=prepared_params, + async_op=True, + ) + return self def get_query_state(self) -> "TOperationState": @@ -846,15 +878,25 @@ def get_query_state(self) -> "TOperationState": self._check_not_closed() return self.thrift_backend.get_query_state(self.active_op_handle) - def get_execution_result(self): + def get_async_execution_result(self): """ Checks for the status of the async executing query and fetches the result if the query is finished - If executed sets the active_result_set to the obtained result + Otherwise it will keep polling the status of the query till there is a Not pending state :return: """ self._check_not_closed() + def is_executing(operation_state) -> "bool": + return not operation_state or operation_state in [ + ttypes.TOperationState.RUNNING_STATE, + ttypes.TOperationState.PENDING_STATE, + ] + + while(is_executing(self.get_query_state())): + # Poll after some default time + time.sleep(self.ASYNC_DEFAULT_POLLING_INTERVAL) + operation_state = self.get_query_state() if operation_state == ttypes.TOperationState.FINISHED_STATE: execute_response = self.thrift_backend.get_execution_result( @@ -1164,7 +1206,6 @@ def __init__( thrift_backend: ThriftBackend, result_buffer_size_bytes: int = DEFAULT_RESULT_BUFFER_SIZE_BYTES, arraysize: int = 10000, - async_op=False, ): """ A ResultSet manages the results of a single command. @@ -1187,7 +1228,7 @@ def __init__( self._arrow_schema_bytes = execute_response.arrow_schema_bytes self._next_row_index = 0 - if execute_response.arrow_queue or async_op: + if execute_response.arrow_queue: # In this case the server has taken the fast path and returned an initial batch of # results self.results = execute_response.arrow_queue diff --git a/src/databricks/sql/thrift_backend.py b/src/databricks/sql/thrift_backend.py index ad7d41e3..dbfd5936 100644 --- a/src/databricks/sql/thrift_backend.py +++ b/src/databricks/sql/thrift_backend.py @@ -914,7 +914,7 @@ def execute_command( resp = self.make_request(self._client.ExecuteStatement, req) if async_op: - return self._handle_execute_response_async(resp, cursor) + self._handle_execute_response_async(resp, cursor) else: return self._handle_execute_response(resp, cursor) @@ -1018,19 +1018,6 @@ def _handle_execute_response(self, resp, cursor): def _handle_execute_response_async(self, resp, cursor): cursor.active_op_handle = resp.operationHandle self._check_direct_results_for_error(resp.directResults) - operation_status = resp.status.statusCode - - return ExecuteResponse( - arrow_queue=None, - status=operation_status, - has_been_closed_server_side=None, - has_more_rows=None, - lz4_compressed=None, - is_staging_operation=None, - command_handle=resp.operationHandle, - description=None, - arrow_schema_bytes=None, - ) def fetch_results( self, diff --git a/tests/e2e/test_driver.py b/tests/e2e/test_driver.py index 54fc7a38..2f0881cd 100644 --- a/tests/e2e/test_driver.py +++ b/tests/e2e/test_driver.py @@ -79,6 +79,7 @@ class PySQLPytestTestCase: } arraysize = 1000 buffer_size_bytes = 104857600 + POLLING_INTERVAL = 2 @pytest.fixture(autouse=True) def get_details(self, connection_details): @@ -187,12 +188,12 @@ def isExecuting(operation_state): with self.cursor() as cursor: cursor.execute_async(long_running_query) - ## Polling after every 10 seconds + ## Polling after every POLLING_INTERVAL seconds while isExecuting(cursor.get_query_state()): - time.sleep(10) + time.sleep(self.POLLING_INTERVAL) log.info("Polling the status in test_execute_async") - cursor.get_execution_result() + cursor.get_async_execution_result() result = cursor.fetchall() assert result[0].asDict() == {"count(1)": 0} From 69b32e9f742247c019cb5fbf912a925450d70b7d Mon Sep 17 00:00:00 2001 From: Jothi Prakash Date: Mon, 25 Nov 2024 22:57:28 +0530 Subject: [PATCH 8/9] Fixed java doc --- src/databricks/sql/client.py | 5 ----- 1 file changed, 5 deletions(-) diff --git a/src/databricks/sql/client.py b/src/databricks/sql/client.py index c72b100d..39fa67df 100755 --- a/src/databricks/sql/client.py +++ b/src/databricks/sql/client.py @@ -766,11 +766,6 @@ def execute( Both will result in the query equivalent to "SELECT * FROM table WHERE field = 'foo' being sent to the server - async_op: - Denotes whether the execute command will execute the request asynchronously or not - By default it is set to False, if set True the execution request will be submitted and the code - will be non-blocking. User can later poll and request the result when ready - :returns self """ From 051169096ad2fed9d138b62113f79efc9621d5b7 Mon Sep 17 00:00:00 2001 From: Jothi Prakash Date: Tue, 26 Nov 2024 01:13:33 +0530 Subject: [PATCH 9/9] Reformatted --- src/databricks/sql/client.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/src/databricks/sql/client.py b/src/databricks/sql/client.py index 39fa67df..8ea81e12 100755 --- a/src/databricks/sql/client.py +++ b/src/databricks/sql/client.py @@ -47,7 +47,8 @@ from databricks.sql.experimental.oauth_persistence import OAuthPersistence from databricks.sql.thrift_api.TCLIService.ttypes import ( - TSparkParameter, TOperationState, + TSparkParameter, + TOperationState, ) @@ -888,7 +889,7 @@ def is_executing(operation_state) -> "bool": ttypes.TOperationState.PENDING_STATE, ] - while(is_executing(self.get_query_state())): + while is_executing(self.get_query_state()): # Poll after some default time time.sleep(self.ASYNC_DEFAULT_POLLING_INTERVAL)