Skip to content

Commit 36525c3

Browse files
Merge remote-tracking branch 'origin/sea-migration' into backend-interface
2 parents e84b09f + eaa542d commit 36525c3

File tree

7 files changed

+274
-7
lines changed

7 files changed

+274
-7
lines changed

.github/CODEOWNERS

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,4 +2,4 @@
22
# the repo. Unless a later match takes precedence, these
33
# users will be requested for review when someone opens a
44
# pull request.
5-
* @deeksha-db @samikshya-db @jprakash-db @yunbodeng-db @jackyhu-db @benc-db
5+
* @deeksha-db @samikshya-db @jprakash-db @jackyhu-db @madhav-db @gopalldb @jayantsing-db @vikrantpuppala @shivam2680

.github/workflows/code-quality-checks.yml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,13 @@ on:
33
push:
44
branches:
55
- main
6+
- sea-migration
7+
- telemetry
68
pull_request:
79
branches:
810
- main
11+
- sea-migration
12+
- telemetry
913
jobs:
1014
run-unit-tests:
1115
runs-on: ubuntu-latest

.github/workflows/integration.yml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,11 @@ on:
44
paths-ignore:
55
- "**.MD"
66
- "**.md"
7+
pull_request:
8+
branches:
9+
- main
10+
- sea-migration
11+
- telemetry
712

813
jobs:
914
run-e2e-tests:

src/databricks/sql/backend/thrift_backend.py

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -132,6 +132,13 @@ def __init__(
132132
# max_download_threads
133133
# Number of threads for handling cloud fetch downloads. Defaults to 10
134134

135+
logger.debug(
136+
"ThriftBackend.__init__(server_hostname=%s, port=%s, http_path=%s)",
137+
server_hostname,
138+
port,
139+
http_path,
140+
)
141+
135142
port = port or 443
136143
if kwargs.get("_connection_uri"):
137144
uri = kwargs.get("_connection_uri")
@@ -403,6 +410,8 @@ def attempt_request(attempt):
403410

404411
# TODO: don't use exception handling for GOS polling...
405412

413+
logger.error("ThriftBackend.attempt_request: HTTPError: %s", err)
414+
406415
gos_name = TCLIServiceClient.GetOperationStatus.__name__
407416
if method.__name__ == gos_name:
408417
delay_default = (
@@ -447,6 +456,7 @@ def attempt_request(attempt):
447456
else:
448457
logger.warning(log_string)
449458
except Exception as err:
459+
logger.error("ThriftBackend.attempt_request: Exception: %s", err)
450460
error = err
451461
retry_delay = extract_retry_delay(attempt)
452462
error_message = (
@@ -904,6 +914,12 @@ def execute_command(
904914
):
905915
assert session_handle is not None
906916

917+
logger.debug(
918+
"ThriftBackend.execute_command(operation=%s, session_handle=%s)",
919+
operation,
920+
session_handle,
921+
)
922+
907923
spark_arrow_types = ttypes.TSparkArrowTypes(
908924
timestampAsArrow=self._use_arrow_native_timestamps,
909925
decimalAsArrow=self._use_arrow_native_decimals,
@@ -1090,6 +1106,7 @@ def fetch_results(
10901106
return queue, resp.hasMoreRows
10911107

10921108
def close_command(self, op_handle):
1109+
logger.debug("ThriftBackend.close_command(op_handle=%s)", op_handle)
10931110
req = ttypes.TCloseOperationReq(operationHandle=op_handle)
10941111
resp = self.make_request(self._client.CloseOperation, req)
10951112
return resp.status

src/databricks/sql/client.py

Lines changed: 39 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -216,6 +216,12 @@ def read(self) -> Optional[OAuthToken]:
216216
# use_cloud_fetch
217217
# Enable use of cloud fetch to extract large query results in parallel via cloud storage
218218

219+
logger.debug(
220+
"Connection.__init__(server_hostname=%s, http_path=%s)",
221+
server_hostname,
222+
http_path,
223+
)
224+
219225
if access_token:
220226
access_token_kv = {"access_token": access_token}
221227
kwargs = {**kwargs, **access_token_kv}
@@ -278,7 +284,13 @@ def __enter__(self) -> "Connection":
278284
return self
279285

280286
def __exit__(self, exc_type, exc_value, traceback):
281-
self.close()
287+
try:
288+
self.close()
289+
except BaseException as e:
290+
logger.warning(f"Exception during connection close in __exit__: {e}")
291+
if exc_type is None:
292+
raise
293+
return False
282294

283295
def __del__(self):
284296
if self.open:
@@ -401,7 +413,14 @@ def __enter__(self) -> "Cursor":
401413
return self
402414

403415
def __exit__(self, exc_type, exc_value, traceback):
404-
self.close()
416+
try:
417+
logger.debug("Cursor context manager exiting, calling close()")
418+
self.close()
419+
except BaseException as e:
420+
logger.warning(f"Exception during cursor close in __exit__: {e}")
421+
if exc_type is None:
422+
raise
423+
return False
405424

406425
def __iter__(self):
407426
if self.active_result_set:
@@ -732,6 +751,9 @@ def execute(
732751
733752
:returns self
734753
"""
754+
logger.debug(
755+
"Cursor.execute(operation=%s, parameters=%s)", operation, parameters
756+
)
735757

736758
param_approach = self._determine_parameter_approach(parameters)
737759
if param_approach == ParameterApproach.NONE:
@@ -1112,7 +1134,21 @@ def cancel(self) -> None:
11121134
def close(self) -> None:
11131135
"""Close cursor"""
11141136
self.open = False
1115-
self.active_op_handle = None
1137+
1138+
# Close active operation handle if it exists
1139+
if self.active_op_handle:
1140+
try:
1141+
self.thrift_backend.close_command(self.active_op_handle)
1142+
except RequestError as e:
1143+
if isinstance(e.args[1], CursorAlreadyClosedError):
1144+
logger.info("Operation was canceled by a prior request")
1145+
else:
1146+
logging.warning(f"Error closing operation handle: {e}")
1147+
except Exception as e:
1148+
logging.warning(f"Error closing operation handle: {e}")
1149+
finally:
1150+
self.active_op_handle = None
1151+
11161152
if self.active_result_set:
11171153
self._close_and_clear_active_result_set()
11181154

tests/e2e/test_driver.py

Lines changed: 97 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@
5050

5151
from tests.e2e.common.uc_volume_tests import PySQLUCVolumeTestSuiteMixin
5252

53-
from databricks.sql.exc import SessionAlreadyClosedError
53+
from databricks.sql.exc import SessionAlreadyClosedError, CursorAlreadyClosedError
5454

5555
log = logging.getLogger(__name__)
5656

@@ -820,7 +820,6 @@ def test_close_connection_closes_cursors(self):
820820
ars = cursor.active_result_set
821821

822822
# We must manually run this check because thrift_backend always forces `has_been_closed_server_side` to True
823-
824823
# Cursor op state should be open before connection is closed
825824
status_request = ttypes.TGetOperationStatusReq(
826825
operationHandle=ars.command_id, getProgressUpdate=False
@@ -847,9 +846,104 @@ def test_closing_a_closed_connection_doesnt_fail(self, caplog):
847846
with self.connection() as conn:
848847
# First .close() call is explicit here
849848
conn.close()
850-
851849
assert "Session appears to have been closed already" in caplog.text
852850

851+
conn = None
852+
try:
853+
with pytest.raises(KeyboardInterrupt):
854+
with self.connection() as c:
855+
conn = c
856+
raise KeyboardInterrupt("Simulated interrupt")
857+
finally:
858+
if conn is not None:
859+
assert not conn.open, "Connection should be closed after KeyboardInterrupt"
860+
861+
def test_cursor_close_properly_closes_operation(self):
862+
"""Test that Cursor.close() properly closes the active operation handle on the server."""
863+
with self.connection() as conn:
864+
cursor = conn.cursor()
865+
try:
866+
cursor.execute("SELECT 1 AS test")
867+
assert cursor.active_op_handle is not None
868+
cursor.close()
869+
assert cursor.active_op_handle is None
870+
assert not cursor.open
871+
finally:
872+
if cursor.open:
873+
cursor.close()
874+
875+
conn = None
876+
cursor = None
877+
try:
878+
with self.connection() as c:
879+
conn = c
880+
with pytest.raises(KeyboardInterrupt):
881+
with conn.cursor() as cur:
882+
cursor = cur
883+
raise KeyboardInterrupt("Simulated interrupt")
884+
finally:
885+
if cursor is not None:
886+
assert not cursor.open, "Cursor should be closed after KeyboardInterrupt"
887+
888+
def test_nested_cursor_context_managers(self):
889+
"""Test that nested cursor context managers properly close operations on the server."""
890+
with self.connection() as conn:
891+
with conn.cursor() as cursor1:
892+
cursor1.execute("SELECT 1 AS test1")
893+
assert cursor1.active_op_handle is not None
894+
895+
with conn.cursor() as cursor2:
896+
cursor2.execute("SELECT 2 AS test2")
897+
assert cursor2.active_op_handle is not None
898+
899+
# After inner context manager exit, cursor2 should be not open
900+
assert not cursor2.open
901+
assert cursor2.active_op_handle is None
902+
903+
# After outer context manager exit, cursor1 should be not open
904+
assert not cursor1.open
905+
assert cursor1.active_op_handle is None
906+
907+
def test_cursor_error_handling(self):
908+
"""Test that cursor close handles errors properly to prevent orphaned operations."""
909+
with self.connection() as conn:
910+
cursor = conn.cursor()
911+
912+
cursor.execute("SELECT 1 AS test")
913+
914+
op_handle = cursor.active_op_handle
915+
916+
assert op_handle is not None
917+
918+
# Manually close the operation to simulate server-side closure
919+
conn.thrift_backend.close_command(op_handle)
920+
921+
cursor.close()
922+
923+
assert not cursor.open
924+
925+
def test_result_set_close(self):
926+
"""Test that ResultSet.close() properly closes operations on the server and handles state correctly."""
927+
with self.connection() as conn:
928+
cursor = conn.cursor()
929+
try:
930+
cursor.execute("SELECT * FROM RANGE(10)")
931+
932+
result_set = cursor.active_result_set
933+
assert result_set is not None
934+
935+
initial_op_state = result_set.op_state
936+
937+
result_set.close()
938+
939+
assert result_set.op_state == result_set.thrift_backend.CLOSED_OP_STATE
940+
assert result_set.op_state != initial_op_state
941+
942+
# Closing the result set again should be a no-op and not raise exceptions
943+
result_set.close()
944+
finally:
945+
cursor.close()
946+
853947

854948
# use a RetrySuite to encapsulate these tests which we'll typically want to run together; however keep
855949
# the 429/503 subsuites separate since they execute under different circumstances.

0 commit comments

Comments
 (0)