Skip to content

Commit c7830db

Browse files
Merge remote-tracking branch 'origin/sea-migration' into fetch-interface
2 parents f94adef + eaa542d commit c7830db

File tree

7 files changed

+274
-7
lines changed

7 files changed

+274
-7
lines changed

.github/CODEOWNERS

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,4 +2,4 @@
22
# the repo. Unless a later match takes precedence, these
33
# users will be requested for review when someone opens a
44
# pull request.
5-
* @deeksha-db @samikshya-db @jprakash-db @yunbodeng-db @jackyhu-db @benc-db
5+
* @deeksha-db @samikshya-db @jprakash-db @jackyhu-db @madhav-db @gopalldb @jayantsing-db @vikrantpuppala @shivam2680

.github/workflows/code-quality-checks.yml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,13 @@ on:
33
push:
44
branches:
55
- main
6+
- sea-migration
7+
- telemetry
68
pull_request:
79
branches:
810
- main
11+
- sea-migration
12+
- telemetry
913
jobs:
1014
run-unit-tests:
1115
runs-on: ubuntu-latest

.github/workflows/integration.yml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,11 @@ on:
44
paths-ignore:
55
- "**.MD"
66
- "**.md"
7+
pull_request:
8+
branches:
9+
- main
10+
- sea-migration
11+
- telemetry
712

813
jobs:
914
run-e2e-tests:

src/databricks/sql/backend/thrift_backend.py

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -133,6 +133,13 @@ def __init__(
133133
# max_download_threads
134134
# Number of threads for handling cloud fetch downloads. Defaults to 10
135135

136+
logger.debug(
137+
"ThriftBackend.__init__(server_hostname=%s, port=%s, http_path=%s)",
138+
server_hostname,
139+
port,
140+
http_path,
141+
)
142+
136143
port = port or 443
137144
if kwargs.get("_connection_uri"):
138145
uri = kwargs.get("_connection_uri")
@@ -404,6 +411,8 @@ def attempt_request(attempt):
404411

405412
# TODO: don't use exception handling for GOS polling...
406413

414+
logger.error("ThriftBackend.attempt_request: HTTPError: %s", err)
415+
407416
gos_name = TCLIServiceClient.GetOperationStatus.__name__
408417
if method.__name__ == gos_name:
409418
delay_default = (
@@ -448,6 +457,7 @@ def attempt_request(attempt):
448457
else:
449458
logger.warning(log_string)
450459
except Exception as err:
460+
logger.error("ThriftBackend.attempt_request: Exception: %s", err)
451461
error = err
452462
retry_delay = extract_retry_delay(attempt)
453463
error_message = (
@@ -914,6 +924,12 @@ def execute_command(
914924
):
915925
assert session_handle is not None
916926

927+
logger.debug(
928+
"ThriftBackend.execute_command(operation=%s, session_handle=%s)",
929+
operation,
930+
session_handle,
931+
)
932+
917933
spark_arrow_types = ttypes.TSparkArrowTypes(
918934
timestampAsArrow=self._use_arrow_native_timestamps,
919935
decimalAsArrow=self._use_arrow_native_decimals,
@@ -1150,6 +1166,7 @@ def fetch_results(
11501166
return queue, resp.hasMoreRows
11511167

11521168
def close_command(self, op_handle):
1169+
logger.debug("ThriftBackend.close_command(op_handle=%s)", op_handle)
11531170
req = ttypes.TCloseOperationReq(operationHandle=op_handle)
11541171
resp = self.make_request(self._client.CloseOperation, req)
11551172
return resp.status

src/databricks/sql/client.py

Lines changed: 39 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -216,6 +216,12 @@ def read(self) -> Optional[OAuthToken]:
216216
# use_cloud_fetch
217217
# Enable use of cloud fetch to extract large query results in parallel via cloud storage
218218

219+
logger.debug(
220+
"Connection.__init__(server_hostname=%s, http_path=%s)",
221+
server_hostname,
222+
http_path,
223+
)
224+
219225
if access_token:
220226
access_token_kv = {"access_token": access_token}
221227
kwargs = {**kwargs, **access_token_kv}
@@ -278,7 +284,13 @@ def __enter__(self) -> "Connection":
278284
return self
279285

280286
def __exit__(self, exc_type, exc_value, traceback):
281-
self.close()
287+
try:
288+
self.close()
289+
except BaseException as e:
290+
logger.warning(f"Exception during connection close in __exit__: {e}")
291+
if exc_type is None:
292+
raise
293+
return False
282294

283295
def __del__(self):
284296
if self.open:
@@ -401,7 +413,14 @@ def __enter__(self) -> "Cursor":
401413
return self
402414

403415
def __exit__(self, exc_type, exc_value, traceback):
404-
self.close()
416+
try:
417+
logger.debug("Cursor context manager exiting, calling close()")
418+
self.close()
419+
except BaseException as e:
420+
logger.warning(f"Exception during cursor close in __exit__: {e}")
421+
if exc_type is None:
422+
raise
423+
return False
405424

406425
def __iter__(self):
407426
if self.active_result_set:
@@ -732,6 +751,9 @@ def execute(
732751
733752
:returns self
734753
"""
754+
logger.debug(
755+
"Cursor.execute(operation=%s, parameters=%s)", operation, parameters
756+
)
735757

736758
param_approach = self._determine_parameter_approach(parameters)
737759
if param_approach == ParameterApproach.NONE:
@@ -1066,7 +1088,21 @@ def cancel(self) -> None:
10661088
def close(self) -> None:
10671089
"""Close cursor"""
10681090
self.open = False
1069-
self.active_op_handle = None
1091+
1092+
# Close active operation handle if it exists
1093+
if self.active_op_handle:
1094+
try:
1095+
self.thrift_backend.close_command(self.active_op_handle)
1096+
except RequestError as e:
1097+
if isinstance(e.args[1], CursorAlreadyClosedError):
1098+
logger.info("Operation was canceled by a prior request")
1099+
else:
1100+
logging.warning(f"Error closing operation handle: {e}")
1101+
except Exception as e:
1102+
logging.warning(f"Error closing operation handle: {e}")
1103+
finally:
1104+
self.active_op_handle = None
1105+
10701106
if self.active_result_set:
10711107
self._close_and_clear_active_result_set()
10721108

tests/e2e/test_driver.py

Lines changed: 97 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@
5050

5151
from tests.e2e.common.uc_volume_tests import PySQLUCVolumeTestSuiteMixin
5252

53-
from databricks.sql.exc import SessionAlreadyClosedError
53+
from databricks.sql.exc import SessionAlreadyClosedError, CursorAlreadyClosedError
5454

5555
log = logging.getLogger(__name__)
5656

@@ -820,7 +820,6 @@ def test_close_connection_closes_cursors(self):
820820
ars = cursor.active_result_set
821821

822822
# We must manually run this check because thrift_backend always forces `has_been_closed_server_side` to True
823-
824823
# Cursor op state should be open before connection is closed
825824
status_request = ttypes.TGetOperationStatusReq(
826825
operationHandle=ars.command_id, getProgressUpdate=False
@@ -847,9 +846,104 @@ def test_closing_a_closed_connection_doesnt_fail(self, caplog):
847846
with self.connection() as conn:
848847
# First .close() call is explicit here
849848
conn.close()
850-
851849
assert "Session appears to have been closed already" in caplog.text
852850

851+
conn = None
852+
try:
853+
with pytest.raises(KeyboardInterrupt):
854+
with self.connection() as c:
855+
conn = c
856+
raise KeyboardInterrupt("Simulated interrupt")
857+
finally:
858+
if conn is not None:
859+
assert not conn.open, "Connection should be closed after KeyboardInterrupt"
860+
861+
def test_cursor_close_properly_closes_operation(self):
862+
"""Test that Cursor.close() properly closes the active operation handle on the server."""
863+
with self.connection() as conn:
864+
cursor = conn.cursor()
865+
try:
866+
cursor.execute("SELECT 1 AS test")
867+
assert cursor.active_op_handle is not None
868+
cursor.close()
869+
assert cursor.active_op_handle is None
870+
assert not cursor.open
871+
finally:
872+
if cursor.open:
873+
cursor.close()
874+
875+
conn = None
876+
cursor = None
877+
try:
878+
with self.connection() as c:
879+
conn = c
880+
with pytest.raises(KeyboardInterrupt):
881+
with conn.cursor() as cur:
882+
cursor = cur
883+
raise KeyboardInterrupt("Simulated interrupt")
884+
finally:
885+
if cursor is not None:
886+
assert not cursor.open, "Cursor should be closed after KeyboardInterrupt"
887+
888+
def test_nested_cursor_context_managers(self):
889+
"""Test that nested cursor context managers properly close operations on the server."""
890+
with self.connection() as conn:
891+
with conn.cursor() as cursor1:
892+
cursor1.execute("SELECT 1 AS test1")
893+
assert cursor1.active_op_handle is not None
894+
895+
with conn.cursor() as cursor2:
896+
cursor2.execute("SELECT 2 AS test2")
897+
assert cursor2.active_op_handle is not None
898+
899+
# After inner context manager exit, cursor2 should be not open
900+
assert not cursor2.open
901+
assert cursor2.active_op_handle is None
902+
903+
# After outer context manager exit, cursor1 should be not open
904+
assert not cursor1.open
905+
assert cursor1.active_op_handle is None
906+
907+
def test_cursor_error_handling(self):
908+
"""Test that cursor close handles errors properly to prevent orphaned operations."""
909+
with self.connection() as conn:
910+
cursor = conn.cursor()
911+
912+
cursor.execute("SELECT 1 AS test")
913+
914+
op_handle = cursor.active_op_handle
915+
916+
assert op_handle is not None
917+
918+
# Manually close the operation to simulate server-side closure
919+
conn.thrift_backend.close_command(op_handle)
920+
921+
cursor.close()
922+
923+
assert not cursor.open
924+
925+
def test_result_set_close(self):
926+
"""Test that ResultSet.close() properly closes operations on the server and handles state correctly."""
927+
with self.connection() as conn:
928+
cursor = conn.cursor()
929+
try:
930+
cursor.execute("SELECT * FROM RANGE(10)")
931+
932+
result_set = cursor.active_result_set
933+
assert result_set is not None
934+
935+
initial_op_state = result_set.op_state
936+
937+
result_set.close()
938+
939+
assert result_set.op_state == result_set.thrift_backend.CLOSED_OP_STATE
940+
assert result_set.op_state != initial_op_state
941+
942+
# Closing the result set again should be a no-op and not raise exceptions
943+
result_set.close()
944+
finally:
945+
cursor.close()
946+
853947

854948
# use a RetrySuite to encapsulate these tests which we'll typically want to run together; however keep
855949
# the 429/503 subsuites separate since they execute under different circumstances.

0 commit comments

Comments
 (0)