Skip to content

Commit dbb2ec5

Browse files
Merge remote-tracking branch 'upstream/sea-migration' into decouple-session
2 parents 7192f11 + 48746d1 commit dbb2ec5

File tree

5 files changed

+265
-7
lines changed

5 files changed

+265
-7
lines changed

.github/CODEOWNERS

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,4 +2,4 @@
22
# the repo. Unless a later match takes precedence, these
33
# users will be requested for review when someone opens a
44
# pull request.
5-
* @deeksha-db @samikshya-db @jprakash-db @yunbodeng-db @jackyhu-db @benc-db
5+
* @deeksha-db @samikshya-db @jprakash-db @jackyhu-db @madhav-db @gopalldb @jayantsing-db @vikrantpuppala @shivam2680

src/databricks/sql/client.py

Lines changed: 39 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -215,6 +215,12 @@ def read(self) -> Optional[OAuthToken]:
215215
# use_cloud_fetch
216216
# Enable use of cloud fetch to extract large query results in parallel via cloud storage
217217

218+
logger.debug(
219+
"Connection.__init__(server_hostname=%s, http_path=%s)",
220+
server_hostname,
221+
http_path,
222+
)
223+
218224
if access_token:
219225
access_token_kv = {"access_token": access_token}
220226
kwargs = {**kwargs, **access_token_kv}
@@ -277,7 +283,13 @@ def __enter__(self) -> "Connection":
277283
return self
278284

279285
def __exit__(self, exc_type, exc_value, traceback):
280-
self.close()
286+
try:
287+
self.close()
288+
except BaseException as e:
289+
logger.warning(f"Exception during connection close in __exit__: {e}")
290+
if exc_type is None:
291+
raise
292+
return False
281293

282294
def __del__(self):
283295
if self.open:
@@ -400,7 +412,14 @@ def __enter__(self) -> "Cursor":
400412
return self
401413

402414
def __exit__(self, exc_type, exc_value, traceback):
403-
self.close()
415+
try:
416+
logger.debug("Cursor context manager exiting, calling close()")
417+
self.close()
418+
except BaseException as e:
419+
logger.warning(f"Exception during cursor close in __exit__: {e}")
420+
if exc_type is None:
421+
raise
422+
return False
404423

405424
def __iter__(self):
406425
if self.active_result_set:
@@ -731,6 +750,9 @@ def execute(
731750
732751
:returns self
733752
"""
753+
logger.debug(
754+
"Cursor.execute(operation=%s, parameters=%s)", operation, parameters
755+
)
734756

735757
param_approach = self._determine_parameter_approach(parameters)
736758
if param_approach == ParameterApproach.NONE:
@@ -1107,7 +1129,21 @@ def cancel(self) -> None:
11071129
def close(self) -> None:
11081130
"""Close cursor"""
11091131
self.open = False
1110-
self.active_op_handle = None
1132+
1133+
# Close active operation handle if it exists
1134+
if self.active_op_handle:
1135+
try:
1136+
self.thrift_backend.close_command(self.active_op_handle)
1137+
except RequestError as e:
1138+
if isinstance(e.args[1], CursorAlreadyClosedError):
1139+
logger.info("Operation was canceled by a prior request")
1140+
else:
1141+
logging.warning(f"Error closing operation handle: {e}")
1142+
except Exception as e:
1143+
logging.warning(f"Error closing operation handle: {e}")
1144+
finally:
1145+
self.active_op_handle = None
1146+
11111147
if self.active_result_set:
11121148
self._close_and_clear_active_result_set()
11131149

src/databricks/sql/thrift_backend.py

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -131,6 +131,13 @@ def __init__(
131131
# max_download_threads
132132
# Number of threads for handling cloud fetch downloads. Defaults to 10
133133

134+
logger.debug(
135+
"ThriftBackend.__init__(server_hostname=%s, port=%s, http_path=%s)",
136+
server_hostname,
137+
port,
138+
http_path,
139+
)
140+
134141
port = port or 443
135142
if kwargs.get("_connection_uri"):
136143
uri = kwargs.get("_connection_uri")
@@ -390,6 +397,8 @@ def attempt_request(attempt):
390397

391398
# TODO: don't use exception handling for GOS polling...
392399

400+
logger.error("ThriftBackend.attempt_request: HTTPError: %s", err)
401+
393402
gos_name = TCLIServiceClient.GetOperationStatus.__name__
394403
if method.__name__ == gos_name:
395404
delay_default = (
@@ -434,6 +443,7 @@ def attempt_request(attempt):
434443
else:
435444
logger.warning(log_string)
436445
except Exception as err:
446+
logger.error("ThriftBackend.attempt_request: Exception: %s", err)
437447
error = err
438448
retry_delay = extract_retry_delay(attempt)
439449
error_message = ThriftBackend._extract_error_message_from_headers(
@@ -888,6 +898,12 @@ def execute_command(
888898
):
889899
assert session_handle is not None
890900

901+
logger.debug(
902+
"ThriftBackend.execute_command(operation=%s, session_handle=%s)",
903+
operation,
904+
session_handle,
905+
)
906+
891907
spark_arrow_types = ttypes.TSparkArrowTypes(
892908
timestampAsArrow=self._use_arrow_native_timestamps,
893909
decimalAsArrow=self._use_arrow_native_decimals,
@@ -1074,6 +1090,7 @@ def fetch_results(
10741090
return queue, resp.hasMoreRows
10751091

10761092
def close_command(self, op_handle):
1093+
logger.debug("ThriftBackend.close_command(op_handle=%s)", op_handle)
10771094
req = ttypes.TCloseOperationReq(operationHandle=op_handle)
10781095
resp = self.make_request(self._client.CloseOperation, req)
10791096
return resp.status

tests/e2e/test_driver.py

Lines changed: 97 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@
5050

5151
from tests.e2e.common.uc_volume_tests import PySQLUCVolumeTestSuiteMixin
5252

53-
from databricks.sql.exc import SessionAlreadyClosedError
53+
from databricks.sql.exc import SessionAlreadyClosedError, CursorAlreadyClosedError
5454

5555
log = logging.getLogger(__name__)
5656

@@ -820,7 +820,6 @@ def test_close_connection_closes_cursors(self):
820820
ars = cursor.active_result_set
821821

822822
# We must manually run this check because thrift_backend always forces `has_been_closed_server_side` to True
823-
824823
# Cursor op state should be open before connection is closed
825824
status_request = ttypes.TGetOperationStatusReq(
826825
operationHandle=ars.command_id, getProgressUpdate=False
@@ -847,9 +846,104 @@ def test_closing_a_closed_connection_doesnt_fail(self, caplog):
847846
with self.connection() as conn:
848847
# First .close() call is explicit here
849848
conn.close()
850-
851849
assert "Session appears to have been closed already" in caplog.text
852850

851+
conn = None
852+
try:
853+
with pytest.raises(KeyboardInterrupt):
854+
with self.connection() as c:
855+
conn = c
856+
raise KeyboardInterrupt("Simulated interrupt")
857+
finally:
858+
if conn is not None:
859+
assert not conn.open, "Connection should be closed after KeyboardInterrupt"
860+
861+
def test_cursor_close_properly_closes_operation(self):
862+
"""Test that Cursor.close() properly closes the active operation handle on the server."""
863+
with self.connection() as conn:
864+
cursor = conn.cursor()
865+
try:
866+
cursor.execute("SELECT 1 AS test")
867+
assert cursor.active_op_handle is not None
868+
cursor.close()
869+
assert cursor.active_op_handle is None
870+
assert not cursor.open
871+
finally:
872+
if cursor.open:
873+
cursor.close()
874+
875+
conn = None
876+
cursor = None
877+
try:
878+
with self.connection() as c:
879+
conn = c
880+
with pytest.raises(KeyboardInterrupt):
881+
with conn.cursor() as cur:
882+
cursor = cur
883+
raise KeyboardInterrupt("Simulated interrupt")
884+
finally:
885+
if cursor is not None:
886+
assert not cursor.open, "Cursor should be closed after KeyboardInterrupt"
887+
888+
def test_nested_cursor_context_managers(self):
889+
"""Test that nested cursor context managers properly close operations on the server."""
890+
with self.connection() as conn:
891+
with conn.cursor() as cursor1:
892+
cursor1.execute("SELECT 1 AS test1")
893+
assert cursor1.active_op_handle is not None
894+
895+
with conn.cursor() as cursor2:
896+
cursor2.execute("SELECT 2 AS test2")
897+
assert cursor2.active_op_handle is not None
898+
899+
# After inner context manager exit, cursor2 should be not open
900+
assert not cursor2.open
901+
assert cursor2.active_op_handle is None
902+
903+
# After outer context manager exit, cursor1 should be not open
904+
assert not cursor1.open
905+
assert cursor1.active_op_handle is None
906+
907+
def test_cursor_error_handling(self):
908+
"""Test that cursor close handles errors properly to prevent orphaned operations."""
909+
with self.connection() as conn:
910+
cursor = conn.cursor()
911+
912+
cursor.execute("SELECT 1 AS test")
913+
914+
op_handle = cursor.active_op_handle
915+
916+
assert op_handle is not None
917+
918+
# Manually close the operation to simulate server-side closure
919+
conn.thrift_backend.close_command(op_handle)
920+
921+
cursor.close()
922+
923+
assert not cursor.open
924+
925+
def test_result_set_close(self):
926+
"""Test that ResultSet.close() properly closes operations on the server and handles state correctly."""
927+
with self.connection() as conn:
928+
cursor = conn.cursor()
929+
try:
930+
cursor.execute("SELECT * FROM RANGE(10)")
931+
932+
result_set = cursor.active_result_set
933+
assert result_set is not None
934+
935+
initial_op_state = result_set.op_state
936+
937+
result_set.close()
938+
939+
assert result_set.op_state == result_set.thrift_backend.CLOSED_OP_STATE
940+
assert result_set.op_state != initial_op_state
941+
942+
# Closing the result set again should be a no-op and not raise exceptions
943+
result_set.close()
944+
finally:
945+
cursor.close()
946+
853947

854948
# use a RetrySuite to encapsulate these tests which we'll typically want to run together; however keep
855949
# the 429/503 subsuites separate since they execute under different circumstances.

tests/unit/test_client.py

Lines changed: 111 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import databricks.sql
2121
import databricks.sql.client as client
2222
from databricks.sql import InterfaceError, DatabaseError, Error, NotSupportedError
23+
from databricks.sql.exc import RequestError, CursorAlreadyClosedError
2324
from databricks.sql.types import Row
2425

2526
from tests.unit.test_fetches import FetchTests
@@ -522,6 +523,116 @@ def test_access_current_query_id(self):
522523
cursor.close()
523524
self.assertIsNone(cursor.query_id)
524525

526+
def test_cursor_close_handles_exception(self):
527+
"""Test that Cursor.close() handles exceptions from close_command properly."""
528+
mock_backend = Mock()
529+
mock_connection = Mock()
530+
mock_op_handle = Mock()
531+
532+
mock_backend.close_command.side_effect = Exception("Test error")
533+
534+
cursor = client.Cursor(mock_connection, mock_backend)
535+
cursor.active_op_handle = mock_op_handle
536+
537+
cursor.close()
538+
539+
mock_backend.close_command.assert_called_once_with(mock_op_handle)
540+
541+
self.assertIsNone(cursor.active_op_handle)
542+
543+
self.assertFalse(cursor.open)
544+
545+
def test_cursor_context_manager_handles_exit_exception(self):
546+
"""Test that cursor's context manager handles exceptions during __exit__."""
547+
mock_backend = Mock()
548+
mock_connection = Mock()
549+
550+
cursor = client.Cursor(mock_connection, mock_backend)
551+
original_close = cursor.close
552+
cursor.close = Mock(side_effect=Exception("Test error during close"))
553+
554+
try:
555+
with cursor:
556+
raise ValueError("Test error inside context")
557+
except ValueError:
558+
pass
559+
560+
cursor.close.assert_called_once()
561+
562+
def test_connection_close_handles_cursor_close_exception(self):
563+
"""Test that _close handles exceptions from cursor.close() properly."""
564+
cursors_closed = []
565+
566+
def mock_close_with_exception():
567+
cursors_closed.append(1)
568+
raise Exception("Test error during close")
569+
570+
cursor1 = Mock()
571+
cursor1.close = mock_close_with_exception
572+
573+
def mock_close_normal():
574+
cursors_closed.append(2)
575+
576+
cursor2 = Mock()
577+
cursor2.close = mock_close_normal
578+
579+
mock_backend = Mock()
580+
mock_session_handle = Mock()
581+
582+
try:
583+
for cursor in [cursor1, cursor2]:
584+
try:
585+
cursor.close()
586+
except Exception:
587+
pass
588+
589+
mock_backend.close_session(mock_session_handle)
590+
except Exception as e:
591+
self.fail(f"Connection close should handle exceptions: {e}")
592+
593+
self.assertEqual(cursors_closed, [1, 2], "Both cursors should have close called")
594+
595+
def test_resultset_close_handles_cursor_already_closed_error(self):
596+
"""Test that ResultSet.close() handles CursorAlreadyClosedError properly."""
597+
result_set = client.ResultSet.__new__(client.ResultSet)
598+
result_set.thrift_backend = Mock()
599+
result_set.thrift_backend.CLOSED_OP_STATE = 'CLOSED'
600+
result_set.connection = Mock()
601+
result_set.connection.open = True
602+
result_set.op_state = 'RUNNING'
603+
result_set.has_been_closed_server_side = False
604+
result_set.command_id = Mock()
605+
606+
class MockRequestError(Exception):
607+
def __init__(self):
608+
self.args = ["Error message", CursorAlreadyClosedError()]
609+
610+
result_set.thrift_backend.close_command.side_effect = MockRequestError()
611+
612+
original_close = client.ResultSet.close
613+
try:
614+
try:
615+
if (
616+
result_set.op_state != result_set.thrift_backend.CLOSED_OP_STATE
617+
and not result_set.has_been_closed_server_side
618+
and result_set.connection.open
619+
):
620+
result_set.thrift_backend.close_command(result_set.command_id)
621+
except MockRequestError as e:
622+
if isinstance(e.args[1], CursorAlreadyClosedError):
623+
pass
624+
finally:
625+
result_set.has_been_closed_server_side = True
626+
result_set.op_state = result_set.thrift_backend.CLOSED_OP_STATE
627+
628+
result_set.thrift_backend.close_command.assert_called_once_with(result_set.command_id)
629+
630+
assert result_set.has_been_closed_server_side is True
631+
632+
assert result_set.op_state == result_set.thrift_backend.CLOSED_OP_STATE
633+
finally:
634+
pass
635+
525636

526637
if __name__ == "__main__":
527638
suite = unittest.TestLoader().loadTestsFromModule(sys.modules[__name__])

0 commit comments

Comments
 (0)