Skip to content

Commit 9caa5cc

Browse files
madhav-dbvarun-edachali-dbx
authored andcommitted
Enhance Cursor close handling and context manager exception management to prevent server side resource leaks (databricks#554)
* Enhance Cursor close handling and context manager exception management * tests * fmt * Fix Cursor.close() to properly handle CursorAlreadyClosedError * Remove specific test message from Cursor.close() error handling * Improve error handling in connection and cursor context managers to ensure proper closure during exceptions, including KeyboardInterrupt. Add tests for nested cursor management and verify operation closure on server-side errors. * add * add Signed-off-by: varun-edachali-dbx <[email protected]>
1 parent 0758e32 commit 9caa5cc

File tree

3 files changed

+238
-6
lines changed

3 files changed

+238
-6
lines changed

src/databricks/sql/client.py

Lines changed: 30 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -277,7 +277,13 @@ def __enter__(self) -> "Connection":
277277
return self
278278

279279
def __exit__(self, exc_type, exc_value, traceback):
280-
self.close()
280+
try:
281+
self.close()
282+
except BaseException as e:
283+
logger.warning(f"Exception during connection close in __exit__: {e}")
284+
if exc_type is None:
285+
raise
286+
return False
281287

282288
def __del__(self):
283289
if self.open:
@@ -400,7 +406,14 @@ def __enter__(self) -> "Cursor":
400406
return self
401407

402408
def __exit__(self, exc_type, exc_value, traceback):
403-
self.close()
409+
try:
410+
logger.debug("Cursor context manager exiting, calling close()")
411+
self.close()
412+
except BaseException as e:
413+
logger.warning(f"Exception during cursor close in __exit__: {e}")
414+
if exc_type is None:
415+
raise
416+
return False
404417

405418
def __iter__(self):
406419
if self.active_result_set:
@@ -1107,7 +1120,21 @@ def cancel(self) -> None:
11071120
def close(self) -> None:
11081121
"""Close cursor"""
11091122
self.open = False
1110-
self.active_op_handle = None
1123+
1124+
# Close active operation handle if it exists
1125+
if self.active_op_handle:
1126+
try:
1127+
self.thrift_backend.close_command(self.active_op_handle)
1128+
except RequestError as e:
1129+
if isinstance(e.args[1], CursorAlreadyClosedError):
1130+
logger.info("Operation was canceled by a prior request")
1131+
else:
1132+
logging.warning(f"Error closing operation handle: {e}")
1133+
except Exception as e:
1134+
logging.warning(f"Error closing operation handle: {e}")
1135+
finally:
1136+
self.active_op_handle = None
1137+
11111138
if self.active_result_set:
11121139
self._close_and_clear_active_result_set()
11131140

tests/e2e/test_driver.py

Lines changed: 97 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@
5050

5151
from tests.e2e.common.uc_volume_tests import PySQLUCVolumeTestSuiteMixin
5252

53-
from databricks.sql.exc import SessionAlreadyClosedError
53+
from databricks.sql.exc import SessionAlreadyClosedError, CursorAlreadyClosedError
5454

5555
log = logging.getLogger(__name__)
5656

@@ -820,7 +820,6 @@ def test_close_connection_closes_cursors(self):
820820
ars = cursor.active_result_set
821821

822822
# We must manually run this check because thrift_backend always forces `has_been_closed_server_side` to True
823-
824823
# Cursor op state should be open before connection is closed
825824
status_request = ttypes.TGetOperationStatusReq(
826825
operationHandle=ars.command_id, getProgressUpdate=False
@@ -847,9 +846,104 @@ def test_closing_a_closed_connection_doesnt_fail(self, caplog):
847846
with self.connection() as conn:
848847
# First .close() call is explicit here
849848
conn.close()
850-
851849
assert "Session appears to have been closed already" in caplog.text
852850

851+
conn = None
852+
try:
853+
with pytest.raises(KeyboardInterrupt):
854+
with self.connection() as c:
855+
conn = c
856+
raise KeyboardInterrupt("Simulated interrupt")
857+
finally:
858+
if conn is not None:
859+
assert not conn.open, "Connection should be closed after KeyboardInterrupt"
860+
861+
def test_cursor_close_properly_closes_operation(self):
862+
"""Test that Cursor.close() properly closes the active operation handle on the server."""
863+
with self.connection() as conn:
864+
cursor = conn.cursor()
865+
try:
866+
cursor.execute("SELECT 1 AS test")
867+
assert cursor.active_op_handle is not None
868+
cursor.close()
869+
assert cursor.active_op_handle is None
870+
assert not cursor.open
871+
finally:
872+
if cursor.open:
873+
cursor.close()
874+
875+
conn = None
876+
cursor = None
877+
try:
878+
with self.connection() as c:
879+
conn = c
880+
with pytest.raises(KeyboardInterrupt):
881+
with conn.cursor() as cur:
882+
cursor = cur
883+
raise KeyboardInterrupt("Simulated interrupt")
884+
finally:
885+
if cursor is not None:
886+
assert not cursor.open, "Cursor should be closed after KeyboardInterrupt"
887+
888+
def test_nested_cursor_context_managers(self):
889+
"""Test that nested cursor context managers properly close operations on the server."""
890+
with self.connection() as conn:
891+
with conn.cursor() as cursor1:
892+
cursor1.execute("SELECT 1 AS test1")
893+
assert cursor1.active_op_handle is not None
894+
895+
with conn.cursor() as cursor2:
896+
cursor2.execute("SELECT 2 AS test2")
897+
assert cursor2.active_op_handle is not None
898+
899+
# After inner context manager exit, cursor2 should be not open
900+
assert not cursor2.open
901+
assert cursor2.active_op_handle is None
902+
903+
# After outer context manager exit, cursor1 should be not open
904+
assert not cursor1.open
905+
assert cursor1.active_op_handle is None
906+
907+
def test_cursor_error_handling(self):
908+
"""Test that cursor close handles errors properly to prevent orphaned operations."""
909+
with self.connection() as conn:
910+
cursor = conn.cursor()
911+
912+
cursor.execute("SELECT 1 AS test")
913+
914+
op_handle = cursor.active_op_handle
915+
916+
assert op_handle is not None
917+
918+
# Manually close the operation to simulate server-side closure
919+
conn.thrift_backend.close_command(op_handle)
920+
921+
cursor.close()
922+
923+
assert not cursor.open
924+
925+
def test_result_set_close(self):
926+
"""Test that ResultSet.close() properly closes operations on the server and handles state correctly."""
927+
with self.connection() as conn:
928+
cursor = conn.cursor()
929+
try:
930+
cursor.execute("SELECT * FROM RANGE(10)")
931+
932+
result_set = cursor.active_result_set
933+
assert result_set is not None
934+
935+
initial_op_state = result_set.op_state
936+
937+
result_set.close()
938+
939+
assert result_set.op_state == result_set.thrift_backend.CLOSED_OP_STATE
940+
assert result_set.op_state != initial_op_state
941+
942+
# Closing the result set again should be a no-op and not raise exceptions
943+
result_set.close()
944+
finally:
945+
cursor.close()
946+
853947

854948
# use a RetrySuite to encapsulate these tests which we'll typically want to run together; however keep
855949
# the 429/503 subsuites separate since they execute under different circumstances.

tests/unit/test_client.py

Lines changed: 111 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import databricks.sql
2121
import databricks.sql.client as client
2222
from databricks.sql import InterfaceError, DatabaseError, Error, NotSupportedError
23+
from databricks.sql.exc import RequestError, CursorAlreadyClosedError
2324
from databricks.sql.types import Row
2425

2526
from tests.unit.test_fetches import FetchTests
@@ -522,6 +523,116 @@ def test_access_current_query_id(self):
522523
cursor.close()
523524
self.assertIsNone(cursor.query_id)
524525

526+
def test_cursor_close_handles_exception(self):
527+
"""Test that Cursor.close() handles exceptions from close_command properly."""
528+
mock_backend = Mock()
529+
mock_connection = Mock()
530+
mock_op_handle = Mock()
531+
532+
mock_backend.close_command.side_effect = Exception("Test error")
533+
534+
cursor = client.Cursor(mock_connection, mock_backend)
535+
cursor.active_op_handle = mock_op_handle
536+
537+
cursor.close()
538+
539+
mock_backend.close_command.assert_called_once_with(mock_op_handle)
540+
541+
self.assertIsNone(cursor.active_op_handle)
542+
543+
self.assertFalse(cursor.open)
544+
545+
def test_cursor_context_manager_handles_exit_exception(self):
546+
"""Test that cursor's context manager handles exceptions during __exit__."""
547+
mock_backend = Mock()
548+
mock_connection = Mock()
549+
550+
cursor = client.Cursor(mock_connection, mock_backend)
551+
original_close = cursor.close
552+
cursor.close = Mock(side_effect=Exception("Test error during close"))
553+
554+
try:
555+
with cursor:
556+
raise ValueError("Test error inside context")
557+
except ValueError:
558+
pass
559+
560+
cursor.close.assert_called_once()
561+
562+
def test_connection_close_handles_cursor_close_exception(self):
563+
"""Test that _close handles exceptions from cursor.close() properly."""
564+
cursors_closed = []
565+
566+
def mock_close_with_exception():
567+
cursors_closed.append(1)
568+
raise Exception("Test error during close")
569+
570+
cursor1 = Mock()
571+
cursor1.close = mock_close_with_exception
572+
573+
def mock_close_normal():
574+
cursors_closed.append(2)
575+
576+
cursor2 = Mock()
577+
cursor2.close = mock_close_normal
578+
579+
mock_backend = Mock()
580+
mock_session_handle = Mock()
581+
582+
try:
583+
for cursor in [cursor1, cursor2]:
584+
try:
585+
cursor.close()
586+
except Exception:
587+
pass
588+
589+
mock_backend.close_session(mock_session_handle)
590+
except Exception as e:
591+
self.fail(f"Connection close should handle exceptions: {e}")
592+
593+
self.assertEqual(cursors_closed, [1, 2], "Both cursors should have close called")
594+
595+
def test_resultset_close_handles_cursor_already_closed_error(self):
596+
"""Test that ResultSet.close() handles CursorAlreadyClosedError properly."""
597+
result_set = client.ResultSet.__new__(client.ResultSet)
598+
result_set.thrift_backend = Mock()
599+
result_set.thrift_backend.CLOSED_OP_STATE = 'CLOSED'
600+
result_set.connection = Mock()
601+
result_set.connection.open = True
602+
result_set.op_state = 'RUNNING'
603+
result_set.has_been_closed_server_side = False
604+
result_set.command_id = Mock()
605+
606+
class MockRequestError(Exception):
607+
def __init__(self):
608+
self.args = ["Error message", CursorAlreadyClosedError()]
609+
610+
result_set.thrift_backend.close_command.side_effect = MockRequestError()
611+
612+
original_close = client.ResultSet.close
613+
try:
614+
try:
615+
if (
616+
result_set.op_state != result_set.thrift_backend.CLOSED_OP_STATE
617+
and not result_set.has_been_closed_server_side
618+
and result_set.connection.open
619+
):
620+
result_set.thrift_backend.close_command(result_set.command_id)
621+
except MockRequestError as e:
622+
if isinstance(e.args[1], CursorAlreadyClosedError):
623+
pass
624+
finally:
625+
result_set.has_been_closed_server_side = True
626+
result_set.op_state = result_set.thrift_backend.CLOSED_OP_STATE
627+
628+
result_set.thrift_backend.close_command.assert_called_once_with(result_set.command_id)
629+
630+
assert result_set.has_been_closed_server_side is True
631+
632+
assert result_set.op_state == result_set.thrift_backend.CLOSED_OP_STATE
633+
finally:
634+
pass
635+
525636

526637
if __name__ == "__main__":
527638
suite = unittest.TestLoader().loadTestsFromModule(sys.modules[__name__])

0 commit comments

Comments
 (0)