From 1c5831313c61d436d654e2bda6063927c8b43314 Mon Sep 17 00:00:00 2001 From: Jesse Whitehouse Date: Fri, 19 Jan 2024 20:43:42 -0500 Subject: [PATCH 1/6] Attempt to completely remove query_secret and send an empty UUID. Signed-off-by: Jesse Whitehouse --- src/databricks/sql/ae.py | 19 +++++++------------ src/databricks/sql/client.py | 9 +-------- tests/e2e/test_execute_async.py | 33 ++++++++++++++------------------- 3 files changed, 22 insertions(+), 39 deletions(-) diff --git a/src/databricks/sql/ae.py b/src/databricks/sql/ae.py index 0751e1bb..04acb32d 100644 --- a/src/databricks/sql/ae.py +++ b/src/databricks/sql/ae.py @@ -87,7 +87,6 @@ def __init__( thrift_backend: "ThriftBackend", connection: "Connection", query_id: UUID, - query_secret: UUID, status: Optional[AsyncExecutionStatus] = AsyncExecutionStatus.UNKNOWN, execute_statement_response: Optional[ Union[FakeExecuteStatementResponse, ttypes.TExecuteStatementResp] @@ -96,7 +95,6 @@ def __init__( self._connection = connection self._thrift_backend = thrift_backend self.query_id = query_id - self.query_secret = query_secret self.status = status if execute_statement_response: @@ -155,7 +153,7 @@ def _thrift_cancel_operation(self) -> None: def _thrift_get_operation_status(self) -> ttypes.TGetOperationStatusResp: """Execute TGetOperationStatusReq - Raises an AsyncExecutionError if the query_id:query_secret pair is not found on the server. + Raises an AsyncExecutionError if the query_id is not found on the server. """ try: return self._thrift_backend._poll_for_status(self.t_operation_handle) @@ -166,10 +164,10 @@ def _thrift_get_operation_status(self) -> ttypes.TGetOperationStatusResp: ) from e def serialize(self) -> str: - """Return a string representing the query_id and secret of this AsyncExecution. + """Return a hex string representing the query_id of this AsyncExecution. - Use this to preserve a reference to the query_id and query_secret.""" - return f"{self.query_id}:{self.query_secret}" + Use this to preserve a reference to the query_id""" + return f"{self.query_id}" def sync_status(self) -> None: """Synchronise the status of this AsyncExecution with the server query execution state.""" @@ -212,7 +210,7 @@ def t_operation_handle(self) -> ttypes.TOperationHandle: handle = ttypes.TOperationHandle( operationId=ttypes.THandleIdentifier( - guid=self.query_id.bytes, secret=self.query_secret.bytes + guid=self.query_id.bytes, secret=UUID(int=0).bytes ), operationType=ttypes.TOperationType.EXECUTE_STATEMENT, hasResultSet=True, @@ -238,7 +236,6 @@ def from_thrift_response( connection=connection, thrift_backend=thrift_backend, query_id=UUID(bytes=resp.operationHandle.operationId.guid), - query_secret=UUID(bytes=resp.operationHandle.operationId.secret), status=_toperationstate_to_ae_status( resp.directResults.operationStatus.operationState ), @@ -251,11 +248,10 @@ def from_query_id_and_secret( connection: "Connection", thrift_backend: "ThriftBackend", query_id: UUID, - query_secret: UUID, ) -> "AsyncExecution": - """Return a valid AsyncExecution object from a query_id and query_secret. + """Return a valid AsyncExecution object from a query_id. - Raises an AsyncExecutionException if the query_id:query_secret pair is not found on the server. + Raises an AsyncExecutionException if the query_id pair is not found on the server. """ # build a copy of this execution @@ -263,7 +259,6 @@ def from_query_id_and_secret( connection=connection, thrift_backend=thrift_backend, query_id=query_id, - query_secret=query_secret, ) # check to make sure this is a valid one ae.sync_status() diff --git a/src/databricks/sql/client.py b/src/databricks/sql/client.py index 97b86ba0..48d40f70 100644 --- a/src/databricks/sql/client.py +++ b/src/databricks/sql/client.py @@ -405,13 +405,12 @@ def execute_async( ) def get_async_execution( - self, query_id: Union[str, UUID], query_secret: Union[str, UUID] + self, query_id: Union[str, UUID] ) -> "AsyncExecution": """Get an AsyncExecution object for an existing query. Args: query_id: The query id of the query to retrieve - query_secret: The query secret of the query to retrieve Returns: An AsyncExecution object that can be used to poll for status and retrieve results. @@ -422,16 +421,10 @@ def get_async_execution( else: _qid = UUID(hex=query_id) - if isinstance(query_secret, UUID): - _qs = query_secret - else: - _qs = UUID(hex=query_secret) - return AsyncExecution.from_query_id_and_secret( connection=self, thrift_backend=self.thrift_backend, query_id=_qid, - query_secret=_qs, ) diff --git a/tests/e2e/test_execute_async.py b/tests/e2e/test_execute_async.py index 6fde60ea..1eeec8de 100644 --- a/tests/e2e/test_execute_async.py +++ b/tests/e2e/test_execute_async.py @@ -82,28 +82,24 @@ def test_cant_get_results_after_cancel(self, long_running_ae: AsyncExecution): def test_get_async_execution_can_check_status( self, long_running_ae: AsyncExecution ): - query_id, query_secret = str(long_running_ae.query_id), str( - long_running_ae.query_secret - ) + query_id = long_running_ae.serialize() with self.connection() as conn: - ae = conn.get_async_execution(query_id, query_secret) + ae = conn.get_async_execution(query_id) assert ae.is_running def test_get_async_execution_can_cancel_across_threads( self, long_running_ae: AsyncExecution ): - query_id, query_secret = str(long_running_ae.query_id), str( - long_running_ae.query_secret - ) + query_id = long_running_ae.serialize() - def cancel_query_in_separate_thread(query_id, query_secret): + def cancel_query_in_separate_thread(query_id): with self.connection() as conn: - ae = conn.get_async_execution(query_id, query_secret) + ae = conn.get_async_execution(query_id) ae.cancel() threading.Thread( - target=cancel_query_in_separate_thread, args=(query_id, query_secret) + target=cancel_query_in_separate_thread, args=(query_id) ).start() time.sleep(5) @@ -154,11 +150,10 @@ def test_get_async_execution_with_badly_formed_query_id(self): ae = conn.get_async_execution("foo", "bar") def test_serialize(self, long_running_ae: AsyncExecution): - serialized = long_running_ae.serialize() - query_id, query_secret = serialized.split(":") + query_id = long_running_ae.serialize() with self.connection() as conn: - ae = conn.get_async_execution(query_id, query_secret) + ae = conn.get_async_execution(query_id) assert ae.is_running def test_get_async_execution_no_results_when_direct_results_were_sent(self): @@ -166,18 +161,18 @@ def test_get_async_execution_no_results_when_direct_results_were_sent(self): with self.connection() as conn: ae = conn.execute_async(DIRECT_RESULTS_QUERY, {"param": 1}) - query_id, query_secret = ae.serialize().split(":") + query_id = ae.serialize() ae.get_result() with self.connection() as conn: with pytest.raises(AsyncExecutionException, match="Query not found"): - ae_late = conn.get_async_execution(query_id, query_secret) + ae_late = conn.get_async_execution(query_id) def test_get_async_execution_and_fetch_results(self, long_ish_ae: AsyncExecution): - query_id, query_secret = long_ish_ae.serialize().split(":") + query_id = long_ish_ae.serialize() with self.connection() as conn: - ae = conn.get_async_execution(query_id, query_secret) + ae = conn.get_async_execution(query_id) while ae.is_running: time.sleep(1) @@ -194,8 +189,8 @@ def test_get_async_execution_twice(self): with self.connection() as conn_1, self.connection() as conn_2: ae_1 = conn_1.execute_async(LONG_ISH_QUERY) - query_id, query_secret = ae_1.serialize().split(":") - ae_2 = conn_2.get_async_execution(query_id, query_secret) + query_id = ae_1.serialize() + ae_2 = conn_2.get_async_execution(query_id) while ae_1.is_running: time.sleep(1) From e74c69327f116c371c0e21754e7ce0d8af1b3e94 Mon Sep 17 00:00:00 2001 From: Jesse Whitehouse Date: Fri, 19 Jan 2024 20:43:49 -0500 Subject: [PATCH 2/6] Revert "Attempt to completely remove query_secret and send an empty UUID." This reverts commit 1c5831313c61d436d654e2bda6063927c8b43314. Signed-off-by: Jesse Whitehouse --- src/databricks/sql/ae.py | 19 ++++++++++++------- src/databricks/sql/client.py | 9 ++++++++- tests/e2e/test_execute_async.py | 33 +++++++++++++++++++-------------- 3 files changed, 39 insertions(+), 22 deletions(-) diff --git a/src/databricks/sql/ae.py b/src/databricks/sql/ae.py index 04acb32d..0751e1bb 100644 --- a/src/databricks/sql/ae.py +++ b/src/databricks/sql/ae.py @@ -87,6 +87,7 @@ def __init__( thrift_backend: "ThriftBackend", connection: "Connection", query_id: UUID, + query_secret: UUID, status: Optional[AsyncExecutionStatus] = AsyncExecutionStatus.UNKNOWN, execute_statement_response: Optional[ Union[FakeExecuteStatementResponse, ttypes.TExecuteStatementResp] @@ -95,6 +96,7 @@ def __init__( self._connection = connection self._thrift_backend = thrift_backend self.query_id = query_id + self.query_secret = query_secret self.status = status if execute_statement_response: @@ -153,7 +155,7 @@ def _thrift_cancel_operation(self) -> None: def _thrift_get_operation_status(self) -> ttypes.TGetOperationStatusResp: """Execute TGetOperationStatusReq - Raises an AsyncExecutionError if the query_id is not found on the server. + Raises an AsyncExecutionError if the query_id:query_secret pair is not found on the server. """ try: return self._thrift_backend._poll_for_status(self.t_operation_handle) @@ -164,10 +166,10 @@ def _thrift_get_operation_status(self) -> ttypes.TGetOperationStatusResp: ) from e def serialize(self) -> str: - """Return a hex string representing the query_id of this AsyncExecution. + """Return a string representing the query_id and secret of this AsyncExecution. - Use this to preserve a reference to the query_id""" - return f"{self.query_id}" + Use this to preserve a reference to the query_id and query_secret.""" + return f"{self.query_id}:{self.query_secret}" def sync_status(self) -> None: """Synchronise the status of this AsyncExecution with the server query execution state.""" @@ -210,7 +212,7 @@ def t_operation_handle(self) -> ttypes.TOperationHandle: handle = ttypes.TOperationHandle( operationId=ttypes.THandleIdentifier( - guid=self.query_id.bytes, secret=UUID(int=0).bytes + guid=self.query_id.bytes, secret=self.query_secret.bytes ), operationType=ttypes.TOperationType.EXECUTE_STATEMENT, hasResultSet=True, @@ -236,6 +238,7 @@ def from_thrift_response( connection=connection, thrift_backend=thrift_backend, query_id=UUID(bytes=resp.operationHandle.operationId.guid), + query_secret=UUID(bytes=resp.operationHandle.operationId.secret), status=_toperationstate_to_ae_status( resp.directResults.operationStatus.operationState ), @@ -248,10 +251,11 @@ def from_query_id_and_secret( connection: "Connection", thrift_backend: "ThriftBackend", query_id: UUID, + query_secret: UUID, ) -> "AsyncExecution": - """Return a valid AsyncExecution object from a query_id. + """Return a valid AsyncExecution object from a query_id and query_secret. - Raises an AsyncExecutionException if the query_id pair is not found on the server. + Raises an AsyncExecutionException if the query_id:query_secret pair is not found on the server. """ # build a copy of this execution @@ -259,6 +263,7 @@ def from_query_id_and_secret( connection=connection, thrift_backend=thrift_backend, query_id=query_id, + query_secret=query_secret, ) # check to make sure this is a valid one ae.sync_status() diff --git a/src/databricks/sql/client.py b/src/databricks/sql/client.py index 48d40f70..97b86ba0 100644 --- a/src/databricks/sql/client.py +++ b/src/databricks/sql/client.py @@ -405,12 +405,13 @@ def execute_async( ) def get_async_execution( - self, query_id: Union[str, UUID] + self, query_id: Union[str, UUID], query_secret: Union[str, UUID] ) -> "AsyncExecution": """Get an AsyncExecution object for an existing query. Args: query_id: The query id of the query to retrieve + query_secret: The query secret of the query to retrieve Returns: An AsyncExecution object that can be used to poll for status and retrieve results. @@ -421,10 +422,16 @@ def get_async_execution( else: _qid = UUID(hex=query_id) + if isinstance(query_secret, UUID): + _qs = query_secret + else: + _qs = UUID(hex=query_secret) + return AsyncExecution.from_query_id_and_secret( connection=self, thrift_backend=self.thrift_backend, query_id=_qid, + query_secret=_qs, ) diff --git a/tests/e2e/test_execute_async.py b/tests/e2e/test_execute_async.py index 1eeec8de..6fde60ea 100644 --- a/tests/e2e/test_execute_async.py +++ b/tests/e2e/test_execute_async.py @@ -82,24 +82,28 @@ def test_cant_get_results_after_cancel(self, long_running_ae: AsyncExecution): def test_get_async_execution_can_check_status( self, long_running_ae: AsyncExecution ): - query_id = long_running_ae.serialize() + query_id, query_secret = str(long_running_ae.query_id), str( + long_running_ae.query_secret + ) with self.connection() as conn: - ae = conn.get_async_execution(query_id) + ae = conn.get_async_execution(query_id, query_secret) assert ae.is_running def test_get_async_execution_can_cancel_across_threads( self, long_running_ae: AsyncExecution ): - query_id = long_running_ae.serialize() + query_id, query_secret = str(long_running_ae.query_id), str( + long_running_ae.query_secret + ) - def cancel_query_in_separate_thread(query_id): + def cancel_query_in_separate_thread(query_id, query_secret): with self.connection() as conn: - ae = conn.get_async_execution(query_id) + ae = conn.get_async_execution(query_id, query_secret) ae.cancel() threading.Thread( - target=cancel_query_in_separate_thread, args=(query_id) + target=cancel_query_in_separate_thread, args=(query_id, query_secret) ).start() time.sleep(5) @@ -150,10 +154,11 @@ def test_get_async_execution_with_badly_formed_query_id(self): ae = conn.get_async_execution("foo", "bar") def test_serialize(self, long_running_ae: AsyncExecution): - query_id = long_running_ae.serialize() + serialized = long_running_ae.serialize() + query_id, query_secret = serialized.split(":") with self.connection() as conn: - ae = conn.get_async_execution(query_id) + ae = conn.get_async_execution(query_id, query_secret) assert ae.is_running def test_get_async_execution_no_results_when_direct_results_were_sent(self): @@ -161,18 +166,18 @@ def test_get_async_execution_no_results_when_direct_results_were_sent(self): with self.connection() as conn: ae = conn.execute_async(DIRECT_RESULTS_QUERY, {"param": 1}) - query_id = ae.serialize() + query_id, query_secret = ae.serialize().split(":") ae.get_result() with self.connection() as conn: with pytest.raises(AsyncExecutionException, match="Query not found"): - ae_late = conn.get_async_execution(query_id) + ae_late = conn.get_async_execution(query_id, query_secret) def test_get_async_execution_and_fetch_results(self, long_ish_ae: AsyncExecution): - query_id = long_ish_ae.serialize() + query_id, query_secret = long_ish_ae.serialize().split(":") with self.connection() as conn: - ae = conn.get_async_execution(query_id) + ae = conn.get_async_execution(query_id, query_secret) while ae.is_running: time.sleep(1) @@ -189,8 +194,8 @@ def test_get_async_execution_twice(self): with self.connection() as conn_1, self.connection() as conn_2: ae_1 = conn_1.execute_async(LONG_ISH_QUERY) - query_id = ae_1.serialize() - ae_2 = conn_2.get_async_execution(query_id) + query_id, query_secret = ae_1.serialize().split(":") + ae_2 = conn_2.get_async_execution(query_id, query_secret) while ae_1.is_running: time.sleep(1) From a4a6d9aaa6330ad5186eb1f81a26193d19043c96 Mon Sep 17 00:00:00 2001 From: Jesse Whitehouse Date: Fri, 19 Jan 2024 22:02:14 -0500 Subject: [PATCH 3/6] Add documentation for execute_async. Update changelog. Signed-off-by: Jesse Whitehouse --- CHANGELOG.md | 4 ++ docs/execute_async.md | 94 +++++++++++++++++++++++++++++++++ tests/e2e/test_execute_async.py | 2 +- 3 files changed, 99 insertions(+), 1 deletion(-) create mode 100644 docs/execute_async.md diff --git a/CHANGELOG.md b/CHANGELOG.md index 5c15f2f9..43928d81 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,9 @@ # Release History +## 3.1.0 (TBD) + +- Add `execute_async` and `get_async_execution` methods to `Connection` + ## 3.0.1 (2023-12-01) - Other: updated docstring comment about default parameterization approach (#287) diff --git a/docs/execute_async.md b/docs/execute_async.md new file mode 100644 index 00000000..97f26a42 --- /dev/null +++ b/docs/execute_async.md @@ -0,0 +1,94 @@ +# Run queries asynchronously + +This driver supports running queries asynchronously using `execute_async()`. This method can return control to the calling code almost immediately, instead of blocking until the query completes. `execute_async()` never returns a `ResultSet`. Instead it returns a query handle that you can use to poll for the query status, cancel the query, or fetch the query results. + +You can use this method to submit multiple queries in rapid succession and to pick up a running query executions that were kicked-off from a separate thread or `Connection`. This can be especially useful for recovering running executions within serverless functions. + +**Note:** Asynchronous execution is not the same as _asyncio_ in Python. + + +# Requirements + +- `databricks-sql-connector>=3.1.0` + +# Interface + +To run a query asynchronously, use `databricks.sql.client.Connection.execute_async()`. This method takes the same arguments as `execute()`. To pick up an existing query run, use `databricks.sql.client.Connection.get_async_execution()`. Both methods return an `AsyncExecution` object, which lets you interact with a query by exposing these properties and methods: + +**Properties** +- `AsyncExecution.status` is the last-known status of this query run, expressed as an `AsyncExecutionStatus` enumeration value. For example, `RUNNING`, `FINISHED`, or `CANCELED`. When you first call `execute_async()` the resulting status will usually be `RUNNING`. Calling `sync_status()` will refresh the value of this property. +- `AsyncExecution.query_id` is the `UUID` for this query run. You can use this to look-up the query in the Databricks SQL query history. +- `AsyncExecution.query_secret` is the `UUID` secret for this query run. Both the `query_id` and `secret` are needed to fetch results for a running query. A query can be canceled using only its `query_id`. +- `AsyncExecution.is_available` is a boolean that indicates whether this execution can be picked up from another `Connection` or thread. See [below](#note-about-direct-result-queries) for more information. + +**Methods** +- `AsyncExecution.sync_status()` performs a network round-trip to synchronize `.status`. +- `AsyncExecution.get_results()` returns the `ResultSet` for this query run. This is the same return signature as a synchronous `.execute()` call. Note that if you call `get_results()` on a query that is still running, the code will block until the query finishes running and the result has been fetched. +- `AsyncExecution.cancel()` performs a network round-trip to cancel this query execution. +- `AsyncExceution.serialize()` returns a string of the `query_id:query_secret` for this execution. + +**Note:** You do not need to directly instantiate an `AsyncExecution` object in your client code. Instead, use the `execute_async` and `get_async_execution` methods to run or pick up a running query. + +# Code Examples + +### Run a query asynchronously + +This snippet mirrors the synchronous example in this repository's README. + +```python +import os +import time +from databricks import sql + +host = os.getenv("DATABRICKS_HOST") +http_path = os.getenv("DATABRICKS_HTTP_PATH") +access_token = os.getenv("DATABRICKS_TOKEN") + +with sql.connect(server_hostname=host, http_path=http_path, access_token=access_token) as connection: + query = connection.execute_async("SELECT :param `p`, * FROM RANGE(10" {"param": "foo"}) + + # Poll for the result every 5 seconds + while query.is_running: + time.sleep(5) + query.sync_status() + + # this will raise a AsyncExecutionUnrecoverableResultException if the query was canceled + result = query.get_results().fetchall() + +``` + +### Pick up a running query + +Both a `query_id` and `query_secret` are required to pick up a running query. This example runs a query from one `Connection` and fetches its result from a different connection. + +```python +import os +import time +from databricks import sql + +host = os.getenv("DATABRICKS_HOST") +http_path = os.getenv("DATABRICKS_HTTP_PATH") +access_token = os.getenv("DATABRICKS_TOKEN") + +with sql.connect(server_hostname=host, http_path=http_path, access_token=access_token) as connection: + query = connection.execute_async("SELECT :param `p`, * FROM RANGE(10" {"param": "foo"}) + query_id_and_secret = query.serialize() + +# The connection created above has now closed + +with sql.connect(server_hostname=host, http_path=http_path, access_token=access_token) as connection: + query_id, query_secret = query_id_and_secret.split(":") + query = connection.get_async_execution(query_id, query_secret) + + while query.is_running: + time.sleep(5) + query.sync_status() + + result = query.get_results().fetchall() +``` + +# Note about direct result queries + +To minimise network roundtrips for small queries, Databricks will eagerly return a query result if the query completes within five seconds. This means that `execute_async()` may take up to five seconds to return control back to your calling code. When this happens, `AsyncExecution.is_available` will evaluate `False` and the query result will have already been cached in this `AsyncExecution` object. Calling `get_results()` will not invoke a network round-trip. + +Queries that execute in this fashion cannot be picked up with `get_async_execution()` and their results are not persisted on the server to be fetched by a separate thread. Therefore, before calling `.serialize()` to persist a `query_id:query_secret` pair, you should check if `AsyncExecution.available == True` first. \ No newline at end of file diff --git a/tests/e2e/test_execute_async.py b/tests/e2e/test_execute_async.py index 6fde60ea..f7db58bb 100644 --- a/tests/e2e/test_execute_async.py +++ b/tests/e2e/test_execute_async.py @@ -162,7 +162,7 @@ def test_serialize(self, long_running_ae: AsyncExecution): assert ae.is_running def test_get_async_execution_no_results_when_direct_results_were_sent(self): - """It remains to be seen whether results can be fetched repeatedly from a "picked up" execution.""" + """When DirectResults are sent, they cannot be fetched from a separate thread.""" with self.connection() as conn: ae = conn.execute_async(DIRECT_RESULTS_QUERY, {"param": 1}) From 629cb2b85490301db650a365ad2019ffc4c6c7b5 Mon Sep 17 00:00:00 2001 From: Jesse Whitehouse Date: Mon, 22 Jan 2024 15:24:46 -0500 Subject: [PATCH 4/6] Fix a few typos and clarify terms Signed-off-by: Jesse Whitehouse --- docs/execute_async.md | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/docs/execute_async.md b/docs/execute_async.md index 97f26a42..de5d7af2 100644 --- a/docs/execute_async.md +++ b/docs/execute_async.md @@ -1,8 +1,8 @@ # Run queries asynchronously -This driver supports running queries asynchronously using `execute_async()`. This method can return control to the calling code almost immediately, instead of blocking until the query completes. `execute_async()` never returns a `ResultSet`. Instead it returns a query handle that you can use to poll for the query status, cancel the query, or fetch the query results. +This driver supports running queries asynchronously using `execute_async()`. This method can return control to the calling code almost immediately instead of blocking until the query completes. `execute_async()` never returns a `ResultSet`. Instead it returns a query handle that you can use to poll for the query status, cancel the query, or fetch the query results. -You can use this method to submit multiple queries in rapid succession and to pick up a running query executions that were kicked-off from a separate thread or `Connection`. This can be especially useful for recovering running executions within serverless functions. +You can use this method to submit multiple queries in rapid succession and to pick up running query executions that were kicked-off from a separate thread or `Connection`. This can be especially useful for recovering running queries within serverless functions. **Note:** Asynchronous execution is not the same as _asyncio_ in Python. @@ -16,15 +16,15 @@ You can use this method to submit multiple queries in rapid succession and to pi To run a query asynchronously, use `databricks.sql.client.Connection.execute_async()`. This method takes the same arguments as `execute()`. To pick up an existing query run, use `databricks.sql.client.Connection.get_async_execution()`. Both methods return an `AsyncExecution` object, which lets you interact with a query by exposing these properties and methods: **Properties** -- `AsyncExecution.status` is the last-known status of this query run, expressed as an `AsyncExecutionStatus` enumeration value. For example, `RUNNING`, `FINISHED`, or `CANCELED`. When you first call `execute_async()` the resulting status will usually be `RUNNING`. Calling `sync_status()` will refresh the value of this property. +- `AsyncExecution.status` is the last-known status of this query run, expressed as an `AsyncExecutionStatus` enumeration value. For example, `RUNNING`, `FINISHED`, or `CANCELED`. When you first call `execute_async()` the resulting status will usually be `RUNNING`. Calling `sync_status()` will refresh the value of this property. In most usages, you do not need to access `.status` directly and can instead use `.is_running`, `.is_canceled`, and `.is_finished` instead. - `AsyncExecution.query_id` is the `UUID` for this query run. You can use this to look-up the query in the Databricks SQL query history. -- `AsyncExecution.query_secret` is the `UUID` secret for this query run. Both the `query_id` and `secret` are needed to fetch results for a running query. A query can be canceled using only its `query_id`. +- `AsyncExecution.query_secret` is the `UUID` secret for this query run. Both the `query_id` and `secret` are needed to fetch results for a running query. - `AsyncExecution.is_available` is a boolean that indicates whether this execution can be picked up from another `Connection` or thread. See [below](#note-about-direct-result-queries) for more information. **Methods** - `AsyncExecution.sync_status()` performs a network round-trip to synchronize `.status`. - `AsyncExecution.get_results()` returns the `ResultSet` for this query run. This is the same return signature as a synchronous `.execute()` call. Note that if you call `get_results()` on a query that is still running, the code will block until the query finishes running and the result has been fetched. -- `AsyncExecution.cancel()` performs a network round-trip to cancel this query execution. +- `AsyncExecution.cancel()` performs a network round-trip to cancel this query run. - `AsyncExceution.serialize()` returns a string of the `query_id:query_secret` for this execution. **Note:** You do not need to directly instantiate an `AsyncExecution` object in your client code. Instead, use the `execute_async` and `get_async_execution` methods to run or pick up a running query. @@ -91,4 +91,4 @@ with sql.connect(server_hostname=host, http_path=http_path, access_token=access_ To minimise network roundtrips for small queries, Databricks will eagerly return a query result if the query completes within five seconds. This means that `execute_async()` may take up to five seconds to return control back to your calling code. When this happens, `AsyncExecution.is_available` will evaluate `False` and the query result will have already been cached in this `AsyncExecution` object. Calling `get_results()` will not invoke a network round-trip. -Queries that execute in this fashion cannot be picked up with `get_async_execution()` and their results are not persisted on the server to be fetched by a separate thread. Therefore, before calling `.serialize()` to persist a `query_id:query_secret` pair, you should check if `AsyncExecution.available == True` first. \ No newline at end of file +Queries that execute in this fashion cannot be picked up with `get_async_execution()` and their results are not persisted on the server to be fetched by a separate thread. Therefore, before calling `.serialize()` to persist a `query_id:query_secret` pair, you should check if `AsyncExecution.is_available == True` first. \ No newline at end of file From d1f153d69eacbc6797e485b646f136935331810b Mon Sep 17 00:00:00 2001 From: Jesse Whitehouse Date: Mon, 22 Jan 2024 16:55:44 -0500 Subject: [PATCH 5/6] Update docs about direct results queries Signed-off-by: Jesse Whitehouse --- docs/execute_async.md | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/docs/execute_async.md b/docs/execute_async.md index de5d7af2..a7fc4c2a 100644 --- a/docs/execute_async.md +++ b/docs/execute_async.md @@ -19,7 +19,7 @@ To run a query asynchronously, use `databricks.sql.client.Connection.execute_asy - `AsyncExecution.status` is the last-known status of this query run, expressed as an `AsyncExecutionStatus` enumeration value. For example, `RUNNING`, `FINISHED`, or `CANCELED`. When you first call `execute_async()` the resulting status will usually be `RUNNING`. Calling `sync_status()` will refresh the value of this property. In most usages, you do not need to access `.status` directly and can instead use `.is_running`, `.is_canceled`, and `.is_finished` instead. - `AsyncExecution.query_id` is the `UUID` for this query run. You can use this to look-up the query in the Databricks SQL query history. - `AsyncExecution.query_secret` is the `UUID` secret for this query run. Both the `query_id` and `secret` are needed to fetch results for a running query. -- `AsyncExecution.is_available` is a boolean that indicates whether this execution can be picked up from another `Connection` or thread. See [below](#note-about-direct-result-queries) for more information. +- `AsyncExecution.returned_as_direct_result` is a boolean that indicates whether this returned a direct result. See [below](#note-about-direct-result-queries) for more information. **Methods** - `AsyncExecution.sync_status()` performs a network round-trip to synchronize `.status`. @@ -89,6 +89,6 @@ with sql.connect(server_hostname=host, http_path=http_path, access_token=access_ # Note about direct result queries -To minimise network roundtrips for small queries, Databricks will eagerly return a query result if the query completes within five seconds. This means that `execute_async()` may take up to five seconds to return control back to your calling code. When this happens, `AsyncExecution.is_available` will evaluate `False` and the query result will have already been cached in this `AsyncExecution` object. Calling `get_results()` will not invoke a network round-trip. +To minimise network roundtrips for small queries, Databricks will eagerly return a query result if the query completes within five seconds and its results can be sent in a single response. This means that `execute_async()` may take up to five seconds to return control back to your calling code. When this happens, `AsyncExecution.returned_as_direct_result` will evaluate `True` and the query result will have already been cached in this `AsyncExecution` object. Calling `get_results()` will not invoke a network round-trip because the query will not be available at the server. -Queries that execute in this fashion cannot be picked up with `get_async_execution()` and their results are not persisted on the server to be fetched by a separate thread. Therefore, before calling `.serialize()` to persist a `query_id:query_secret` pair, you should check if `AsyncExecution.is_available == True` first. \ No newline at end of file +Queries that execute in this fashion cannot be picked up with `get_async_execution()` and their results are not persisted on the server to be fetched by a separate thread. Therefore, before calling `.serialize()` to persist a `query_id:query_secret` pair, you should check if `AsyncExecution.returned_as_direct_result == True` first. \ No newline at end of file From fadb3b196f2ecf0561303474473d28f1fb206393 Mon Sep 17 00:00:00 2001 From: Jesse Whitehouse Date: Mon, 22 Jan 2024 16:58:55 -0500 Subject: [PATCH 6/6] Update sample code. Signed-off-by: Jesse Whitehouse --- docs/execute_async.md | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/docs/execute_async.md b/docs/execute_async.md index a7fc4c2a..f5f88af4 100644 --- a/docs/execute_async.md +++ b/docs/execute_async.md @@ -71,8 +71,11 @@ http_path = os.getenv("DATABRICKS_HTTP_PATH") access_token = os.getenv("DATABRICKS_TOKEN") with sql.connect(server_hostname=host, http_path=http_path, access_token=access_token) as connection: - query = connection.execute_async("SELECT :param `p`, * FROM RANGE(10" {"param": "foo"}) - query_id_and_secret = query.serialize() + query = connection.execute_async("SELECT :param `p`, * FROM RANGE(1000000000)" {"param": "foo"}) + + # see note in this document about queries that return direct results + if query.returned_as_direct_result: + assert False, "Queries that return direct results cannot be picked up with get_async_execution()" # The connection created above has now closed