From fa13d754288f924e88ad6fa3ba1dac4b442b299b Mon Sep 17 00:00:00 2001 From: Jesse Whitehouse Date: Thu, 28 Jul 2022 16:04:25 -0500 Subject: [PATCH 01/23] Explicitly catch OSError and socket.timeout errors, with automatic retry Signed-off-by: Jesse Whitehouse --- src/databricks/sql/thrift_backend.py | 26 +++++++++++++++++--------- 1 file changed, 17 insertions(+), 9 deletions(-) diff --git a/src/databricks/sql/thrift_backend.py b/src/databricks/sql/thrift_backend.py index d812f93b..df9bf801 100644 --- a/src/databricks/sql/thrift_backend.py +++ b/src/databricks/sql/thrift_backend.py @@ -5,6 +5,7 @@ import threading from uuid import uuid4 from ssl import CERT_NONE, CERT_OPTIONAL, CERT_REQUIRED, create_default_context +import socket import pyarrow import thrift.transport.THttpClient @@ -279,24 +280,31 @@ def attempt_request(attempt): # - non-None method_return -> success, return and be done # - non-None retry_delay -> sleep delay before retry # - error, error_message always set when available + + error = None try: logger.debug("Sending request: {}".format(request)) response = method(request) logger.debug("Received response: {}".format(response)) return response - except Exception as error: + except (socket.timeout, OSError) as err: + retry_delay = 5 + error_message = str(err) + error = err + except Exception as err: retry_delay = extract_retry_delay(attempt) error_message = ThriftBackend._extract_error_message_from_headers( getattr(self._transport, "headers", {}) ) - return RequestErrorInfo( - error=error, - error_message=error_message, - retry_delay=retry_delay, - http_code=getattr(self._transport, "code", None), - method=method.__name__, - request=request, - ) + error = err + return RequestErrorInfo( + error=error, + error_message=error_message, + retry_delay=retry_delay, + http_code=getattr(self._transport, "code", None), + method=method.__name__, + request=request, + ) # The real work: # - for each available attempt: From b44d0b88e4774e306aff9bc9fa1091c537a72d7a Mon Sep 17 00:00:00 2001 From: Jesse Whitehouse Date: Thu, 28 Jul 2022 16:45:42 -0500 Subject: [PATCH 02/23] Add unit and e2e tests for retries on timeout behaviour. Signed-off-by: Jesse Whitehouse --- tests/e2e/driver_tests.py | 38 ++++++++++++++++++++++++++- tests/unit/test_thrift_backend.py | 43 +++++++++++++++++++++++++++++++ 2 files changed, 80 insertions(+), 1 deletion(-) diff --git a/tests/e2e/driver_tests.py b/tests/e2e/driver_tests.py index 358f0b26..f829b311 100644 --- a/tests/e2e/driver_tests.py +++ b/tests/e2e/driver_tests.py @@ -7,7 +7,7 @@ import sys import threading import time -from unittest import loader, skipIf, skipUnless, TestCase +from unittest import loader, skipIf, skipUnless, TestCase, mock from uuid import uuid4 import numpy as np @@ -360,6 +360,42 @@ def execute_really_long_query(): cursor.execute("SELECT * FROM range(3)") self.assertEqual(len(cursor.fetchall()), 3) + def test_retry_after_connection_timeout(self): + + import socket + from databricks.sql.thrift_api.TCLIService import ttypes + + ATTEMPTS_TO_TRY = 2 + + with self.cursor({}) as cursor: + cursor.execute("SELECT id FROM RANGE(10)") + op_handle = cursor.active_op_handle + thrift_backend = cursor.thrift_backend + + + thrift_backend._retry_stop_after_attempts_count = ATTEMPTS_TO_TRY + req = ttypes.TGetOperationStatusReq( + operationHandle=op_handle, + getProgressUpdate=False, + ) + + + with self.assertRaises(OperationalError) as cm: + with mock.patch("socket.create_connection") as mock_create_connection: + mock_create_connection.side_effect = OSError("[Errno 110]: Connection timed out") + thrift_backend.make_request(thrift_backend._client.GetOperationStatus, req) + + self.assertIn("{0}/{0}".format(ATTEMPTS_TO_TRY), cm.exception.message_with_context()) + + with self.assertRaises(OperationalError) as cm: + with mock.patch("socket.create_connection") as mock_create_connection: + mock_create_connection.side_effect = socket.timeout + thrift_backend.make_request(thrift_backend._client.GetOperationStatus, req) + + self.assertIn("{0}/{0}".format(ATTEMPTS_TO_TRY), cm.exception.message_with_context()) + + + @skipIf(pysql_has_version('<', '2'), 'requires pysql v2') def test_can_execute_command_after_failure(self): with self.cursor({}) as cursor: diff --git a/tests/unit/test_thrift_backend.py b/tests/unit/test_thrift_backend.py index d411df76..5a1f9e90 100644 --- a/tests/unit/test_thrift_backend.py +++ b/tests/unit/test_thrift_backend.py @@ -984,6 +984,49 @@ def test_make_request_wont_retry_if_headers_not_present(self, t_transport_class) self.assertIn("This method fails", str(cm.exception.message_with_context())) + @patch("thrift.transport.THttpClient.THttpClient") + def test_will_retry_on_connection_timeout(self, t_transport_class): + + import socket + + mock_method = Mock() + mock_method.__name__ = "method name" + mock_method.side_effect = socket.timeout + + thrift_backend = ThriftBackend( + "foobar", + 443, + "path", + [], + _retry_stop_after_attempts_count=2, + _retry_delay_max=5, + _retry_delay_min=0, + ) + + with self.assertRaises(OperationalError) as cm: + thrift_backend.make_request(mock_method, Mock()) + + self.assertIn("2/2", cm.exception.message_with_context()) + + mock_method = Mock() + mock_method.__name__ = "method name" + mock_method.side_effect = OSError("[Errno 110] Connection timed out") + + thrift_backend = ThriftBackend( + "foobar", + 443, + "path", + [], + _retry_stop_after_attempts_count=2, + _retry_delay_max=5, + _retry_delay_min=0, + ) + + with self.assertRaises(OperationalError) as cm: + thrift_backend.make_request(mock_method, Mock()) + + self.assertIn("2/2", cm.exception.message_with_context()) + @patch("thrift.transport.THttpClient.THttpClient") def test_make_request_wont_retry_if_error_code_not_429_or_503(self, t_transport_class): t_transport_instance = t_transport_class.return_value From 22d2ac0128ca350ec9f907a2b975f76f95d86f19 Mon Sep 17 00:00:00 2001 From: Jesse Whitehouse Date: Thu, 28 Jul 2022 16:50:23 -0500 Subject: [PATCH 03/23] Add default delay to the retry policy Signed-off-by: Jesse Whitehouse --- src/databricks/sql/thrift_backend.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/src/databricks/sql/thrift_backend.py b/src/databricks/sql/thrift_backend.py index df9bf801..821f149d 100644 --- a/src/databricks/sql/thrift_backend.py +++ b/src/databricks/sql/thrift_backend.py @@ -40,6 +40,7 @@ "_retry_delay_max": (float, 60, 5, 3600), "_retry_stop_after_attempts_count": (int, 30, 1, 60), "_retry_stop_after_attempts_duration": (float, 900, 1, 86400), + "_retry_delay_default": (float, 5, None, None) } @@ -80,6 +81,8 @@ def __init__( # next calculated pre-retry delay would go past # _retry_stop_after_attempts_duration, stop now.) # + #_retry_delay_default (default: 5) + # used when Retry-After is not specified by the server # _retry_stop_after_attempts_count # The maximum number of times we should retry retryable requests (defaults to 24) # _socket_timeout @@ -288,7 +291,7 @@ def attempt_request(attempt): logger.debug("Received response: {}".format(response)) return response except (socket.timeout, OSError) as err: - retry_delay = 5 + retry_delay = self._retry_delay_default error_message = str(err) error = err except Exception as err: From 44f12b7e3c534eb2a8b62cc663ff326e76bdf705 Mon Sep 17 00:00:00 2001 From: Jesse Whitehouse Date: Thu, 28 Jul 2022 16:50:43 -0500 Subject: [PATCH 04/23] Don't make unit test take 5+ seconds Signed-off-by: Jesse Whitehouse --- tests/unit/test_thrift_backend.py | 13 +------------ 1 file changed, 1 insertion(+), 12 deletions(-) diff --git a/tests/unit/test_thrift_backend.py b/tests/unit/test_thrift_backend.py index 5a1f9e90..09f6fdfd 100644 --- a/tests/unit/test_thrift_backend.py +++ b/tests/unit/test_thrift_backend.py @@ -999,8 +999,7 @@ def test_will_retry_on_connection_timeout(self, t_transport_class): "path", [], _retry_stop_after_attempts_count=2, - _retry_delay_max=5, - _retry_delay_min=0, + _retry_delay_default=0.25 ) with self.assertRaises(OperationalError) as cm: @@ -1012,16 +1011,6 @@ def test_will_retry_on_connection_timeout(self, t_transport_class): mock_method.__name__ = "method name" mock_method.side_effect = OSError("[Errno 110] Connection timed out") - thrift_backend = ThriftBackend( - "foobar", - 443, - "path", - [], - _retry_stop_after_attempts_count=2, - _retry_delay_max=5, - _retry_delay_min=0, - ) - with self.assertRaises(OperationalError) as cm: thrift_backend.make_request(mock_method, Mock()) From 09cefec137ed0cf951e2e43670f77299fcefd707 Mon Sep 17 00:00:00 2001 From: Jesse Whitehouse Date: Thu, 28 Jul 2022 17:02:49 -0500 Subject: [PATCH 05/23] Fix broken unit tests: test_retry_args_bounding It doesn't like retry policy bounds == None. Signed-off-by: Jesse Whitehouse --- src/databricks/sql/thrift_backend.py | 2 +- tests/unit/test_thrift_backend.py | 1 + 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/src/databricks/sql/thrift_backend.py b/src/databricks/sql/thrift_backend.py index 821f149d..3060e68c 100644 --- a/src/databricks/sql/thrift_backend.py +++ b/src/databricks/sql/thrift_backend.py @@ -40,7 +40,7 @@ "_retry_delay_max": (float, 60, 5, 3600), "_retry_stop_after_attempts_count": (int, 30, 1, 60), "_retry_stop_after_attempts_duration": (float, 900, 1, 86400), - "_retry_delay_default": (float, 5, None, None) + "_retry_delay_default": (float, 5, 1, 60) } diff --git a/tests/unit/test_thrift_backend.py b/tests/unit/test_thrift_backend.py index 09f6fdfd..f671c5c1 100644 --- a/tests/unit/test_thrift_backend.py +++ b/tests/unit/test_thrift_backend.py @@ -19,6 +19,7 @@ def retry_policy_factory(): "_retry_delay_max": (float, 60, None, None), "_retry_stop_after_attempts_count": (int, 30, None, None), "_retry_stop_after_attempts_duration": (float, 900, None, None), + "_retry_delay_default": (float, 5, 0, 10), } From de96fce0e21dd7093ea331e3f6b54e93c4abdd35 Mon Sep 17 00:00:00 2001 From: Jesse Whitehouse Date: Thu, 28 Jul 2022 17:05:44 -0500 Subject: [PATCH 06/23] Black thrift_backend.py Signed-off-by: Jesse Whitehouse --- src/databricks/sql/thrift_backend.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/databricks/sql/thrift_backend.py b/src/databricks/sql/thrift_backend.py index 3060e68c..d77dae0c 100644 --- a/src/databricks/sql/thrift_backend.py +++ b/src/databricks/sql/thrift_backend.py @@ -40,7 +40,7 @@ "_retry_delay_max": (float, 60, 5, 3600), "_retry_stop_after_attempts_count": (int, 30, 1, 60), "_retry_stop_after_attempts_duration": (float, 900, 1, 86400), - "_retry_delay_default": (float, 5, 1, 60) + "_retry_delay_default": (float, 5, 1, 60), } @@ -81,7 +81,7 @@ def __init__( # next calculated pre-retry delay would go past # _retry_stop_after_attempts_duration, stop now.) # - #_retry_delay_default (default: 5) + # _retry_delay_default (default: 5) # used when Retry-After is not specified by the server # _retry_stop_after_attempts_count # The maximum number of times we should retry retryable requests (defaults to 24) From 21c06d414f0401a395c65259656b80969c124c93 Mon Sep 17 00:00:00 2001 From: Jesse Whitehouse Date: Thu, 28 Jul 2022 17:21:23 -0500 Subject: [PATCH 07/23] Only retry OSError's that mention Errno 110 Other OSError's like `EINTR` could indicate a call was interrupted after it was received by the server, which would potentially not be idempotent Signed-off-by: Jesse Whitehouse --- src/databricks/sql/thrift_backend.py | 11 ++++++++++- tests/unit/test_thrift_backend.py | 25 +++++++++++++++++++++++++ 2 files changed, 35 insertions(+), 1 deletion(-) diff --git a/src/databricks/sql/thrift_backend.py b/src/databricks/sql/thrift_backend.py index d77dae0c..2116a5b8 100644 --- a/src/databricks/sql/thrift_backend.py +++ b/src/databricks/sql/thrift_backend.py @@ -290,10 +290,19 @@ def attempt_request(attempt): response = method(request) logger.debug("Received response: {}".format(response)) return response - except (socket.timeout, OSError) as err: + except socket.timeout as err: + # socket.timeout means the connection was timed out by the socket package retry_delay = self._retry_delay_default error_message = str(err) error = err + except OSError as err: + # OSError 110 means the connection was timed out by the operating system + if "Errno 110" in str(err): + retry_delay = self._retry_delay_default + else: + retry_delay = None + error_message = str(err) + error = err except Exception as err: retry_delay = extract_retry_delay(attempt) error_message = ThriftBackend._extract_error_message_from_headers( diff --git a/tests/unit/test_thrift_backend.py b/tests/unit/test_thrift_backend.py index f671c5c1..1e37010e 100644 --- a/tests/unit/test_thrift_backend.py +++ b/tests/unit/test_thrift_backend.py @@ -1017,6 +1017,31 @@ def test_will_retry_on_connection_timeout(self, t_transport_class): self.assertIn("2/2", cm.exception.message_with_context()) + @patch("thrift.transport.THttpClient.THttpClient") + def test_will_not_retry_on_non_timeout_oserror(self, t_transport_class): + + + + mock_method = Mock() + mock_method.__name__ = "method name" + mock_method.side_effect = OSError("I am not a timeout error") + + thrift_backend = ThriftBackend( + "foobar", + 443, + "path", + [], + _retry_stop_after_attempts_count=2, + _retry_delay_default=0.25 + ) + + with self.assertRaises(OperationalError) as cm: + thrift_backend.make_request(mock_method, Mock()) + + self.assertIn("I am not a timeout error", str(cm.exception.message_with_context())) + self.assertIn("1/2", cm.exception.message_with_context()) + + @patch("thrift.transport.THttpClient.THttpClient") def test_make_request_wont_retry_if_error_code_not_429_or_503(self, t_transport_class): t_transport_instance = t_transport_class.return_value From befdf3dc1b4e0368311c1c3a46c68d15f3dc4b86 Mon Sep 17 00:00:00 2001 From: Jesse Whitehouse Date: Thu, 28 Jul 2022 17:25:40 -0500 Subject: [PATCH 08/23] Fix unnecessary indentation Signed-off-by: Jesse Whitehouse --- tests/unit/test_thrift_backend.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/tests/unit/test_thrift_backend.py b/tests/unit/test_thrift_backend.py index 1e37010e..85b05218 100644 --- a/tests/unit/test_thrift_backend.py +++ b/tests/unit/test_thrift_backend.py @@ -1008,14 +1008,14 @@ def test_will_retry_on_connection_timeout(self, t_transport_class): self.assertIn("2/2", cm.exception.message_with_context()) - mock_method = Mock() - mock_method.__name__ = "method name" - mock_method.side_effect = OSError("[Errno 110] Connection timed out") + mock_method = Mock() + mock_method.__name__ = "method name" + mock_method.side_effect = OSError("[Errno 110] Connection timed out") - with self.assertRaises(OperationalError) as cm: - thrift_backend.make_request(mock_method, Mock()) + with self.assertRaises(OperationalError) as cm: + thrift_backend.make_request(mock_method, Mock()) - self.assertIn("2/2", cm.exception.message_with_context()) + self.assertIn("2/2", cm.exception.message_with_context()) @patch("thrift.transport.THttpClient.THttpClient") def test_will_not_retry_on_non_timeout_oserror(self, t_transport_class): From fb1275b04f17a840320c113567bc9bf5b786d2d2 Mon Sep 17 00:00:00 2001 From: Jesse Whitehouse Date: Fri, 29 Jul 2022 13:42:32 -0500 Subject: [PATCH 09/23] Clarify docstrings after review feedback Signed-off-by: Jesse Whitehouse --- src/databricks/sql/thrift_backend.py | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/src/databricks/sql/thrift_backend.py b/src/databricks/sql/thrift_backend.py index 2116a5b8..bdaf1006 100644 --- a/src/databricks/sql/thrift_backend.py +++ b/src/databricks/sql/thrift_backend.py @@ -247,7 +247,7 @@ def _handle_request_error(self, error_info, attempt, elapsed): # FUTURE: Consider moving to https://github.com/litl/backoff or # https://github.com/jd/tenacity for retry logic. def make_request(self, method, request): - """Execute given request, attempting retries when receiving HTTP 429/503. + """Execute given request, attempting retries when TCP connection fils or when receiving HTTP 429/503. For delay between attempts, honor the given Retry-After header, but with bounds. Use lower bound of expontial-backoff based on _retry_delay_min, @@ -265,8 +265,12 @@ def get_elapsed(): return time.time() - t0 def extract_retry_delay(attempt): - # encapsulate retry checks, returns None || delay-in-secs - # Retry IFF 429/503 code + Retry-After header set + """ + Encapsulate retry checks based on HTTP headers. Returns None || delay-in-secs + + Retry IFF 429/503 code + Retry-After header set + """ + http_code = getattr(self._transport, "code", None) retry_after = getattr(self._transport, "headers", {}).get("Retry-After") if http_code in [429, 503] and retry_after: From c76ee65ee674a3c1fcd1b45417b0493bcccdf899 Mon Sep 17 00:00:00 2001 From: Jesse Whitehouse Date: Fri, 29 Jul 2022 13:45:29 -0500 Subject: [PATCH 10/23] Default retry_delay = None (not retryable) Only make it non-null for retryable requests Signed-off-by: Jesse Whitehouse --- src/databricks/sql/thrift_backend.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/src/databricks/sql/thrift_backend.py b/src/databricks/sql/thrift_backend.py index bdaf1006..df5eb4d1 100644 --- a/src/databricks/sql/thrift_backend.py +++ b/src/databricks/sql/thrift_backend.py @@ -289,6 +289,9 @@ def attempt_request(attempt): # - error, error_message always set when available error = None + + # If retry_delay is None the request is treated as non-retryable + retry_delay = None try: logger.debug("Sending request: {}".format(request)) response = method(request) @@ -303,8 +306,6 @@ def attempt_request(attempt): # OSError 110 means the connection was timed out by the operating system if "Errno 110" in str(err): retry_delay = self._retry_delay_default - else: - retry_delay = None error_message = str(err) error = err except Exception as err: From 74748341bbdc36617ba2c5c9a0a75ece2127bed8 Mon Sep 17 00:00:00 2001 From: Jesse Whitehouse Date: Tue, 2 Aug 2022 08:59:21 -0500 Subject: [PATCH 11/23] This approach passes the e2e tests, but they take exactly 4 mins 51 secs every time. Whereas the previous approach they passed in ten seconds. Signed-off-by: Jesse Whitehouse --- src/databricks/sql/thrift_backend.py | 30 ++++++++++++-- tests/e2e/driver_tests.py | 58 ++++++++++++++++++++-------- 2 files changed, 67 insertions(+), 21 deletions(-) diff --git a/src/databricks/sql/thrift_backend.py b/src/databricks/sql/thrift_backend.py index df5eb4d1..ce01b99b 100644 --- a/src/databricks/sql/thrift_backend.py +++ b/src/databricks/sql/thrift_backend.py @@ -5,7 +5,7 @@ import threading from uuid import uuid4 from ssl import CERT_NONE, CERT_OPTIONAL, CERT_REQUIRED, create_default_context -import socket +import socket, types import pyarrow import thrift.transport.THttpClient @@ -298,12 +298,34 @@ def attempt_request(attempt): logger.debug("Received response: {}".format(response)) return response except socket.timeout as err: - # socket.timeout means the connection was timed out by the socket package - retry_delay = self._retry_delay_default + # We only retry for socket.timeout if the operation that timed out was a connection request + # Otherwise idempotency is not guaranteed because something may have been transmitted to the server + + def _dig_through_traceback(tb: types.TracebackType, mod, meth): + """Recursively search the traceback stack to see if mod.meth raised the exception + """ + _mod, _meth = mod, meth + tb_meth = tb.tb_frame.f_code.co_name + tb_mod = tb.tb_frame.f_code.co_filename.split("/")[-1].replace(".py", "") + + if tb_meth == _meth and _mod == tb_mod: + return True + elif tb.tb_next is None: + return False + + return _dig_through_traceback(tb.tb_next, mod, meth) + + tb = err.__traceback__ + failed_during_socket_connect = _dig_through_traceback(tb, "socket", "create_connection") + failed_during_http_open = _dig_through_traceback(tb, "client", "connect") + + if failed_during_socket_connect and failed_during_http_open: + retry_delay = self._retry_delay_default + error_message = str(err) error = err except OSError as err: - # OSError 110 means the connection was timed out by the operating system + # OSError 110 means EHOSTUNREACHABLE, which means the connection was timed out by the operating system if "Errno 110" in str(err): retry_delay = self._retry_delay_default error_message = str(err) diff --git a/tests/e2e/driver_tests.py b/tests/e2e/driver_tests.py index f829b311..70cfd304 100644 --- a/tests/e2e/driver_tests.py +++ b/tests/e2e/driver_tests.py @@ -361,36 +361,60 @@ def execute_really_long_query(): self.assertEqual(len(cursor.fetchall()), 3) def test_retry_after_connection_timeout(self): + # We only retry a request that failed because of a socket timeout in this case + # when the timeout occurred while trying to connect with the thrift server. + # In this situation, we know that a command was not sent to the server because + # no connection was made. import socket + from unittest.mock import Mock from databricks.sql.thrift_api.TCLIService import ttypes - ATTEMPTS_TO_TRY = 2 - - with self.cursor({}) as cursor: - cursor.execute("SELECT id FROM RANGE(10)") - op_handle = cursor.active_op_handle - thrift_backend = cursor.thrift_backend + # First let's check the non-retry behavior + # Given the client has successfully connected to the server already + # When a socket.timeout exception is raised + # Then the request is not retried + with self.assertRaises(OperationalError) as cm: + + # No mocks at this point. If calling self.cursor() succeeds + # that means there is an open / working connection to thrift server. + with self.cursor({}) as cursor: + + # Next apply a patch to the transport which raises a socket.timeout + # whenever any data is sent over the wire + with mock.patch("http.client.HTTPConnection.send") as mock_send: + mock_send.side_effect = socket.timeout + cursor.execute("I AM A VERY DANGEROUS, NOT IDEMPOTENT QUERY!!!") + self.assertIn("non-retryable error", cm.exception.message_with_context()) + + + # Second, let's check whether a request is retried if it fails during + # the connection attempt, instead of the send attempt. + ATTEMPTS_TO_TRY = 2 - thrift_backend._retry_stop_after_attempts_count = ATTEMPTS_TO_TRY - req = ttypes.TGetOperationStatusReq( - operationHandle=op_handle, - getProgressUpdate=False, - ) + # This is a normal query execution + # with self.cursor({}) as cursor: + # thrift_backend = cursor.thrift_backend with self.assertRaises(OperationalError) as cm: - with mock.patch("socket.create_connection") as mock_create_connection: - mock_create_connection.side_effect = OSError("[Errno 110]: Connection timed out") - thrift_backend.make_request(thrift_backend._client.GetOperationStatus, req) + with mock.patch("socket.socket.connect") as mock_connect: + mock_connect.side_effect = OSError("[Errno 110]: Connection timed out") + with self.cursor() as cursor: + # Connection will fail + cursor.execute("SOME RANDOM STATEMENT") + pass self.assertIn("{0}/{0}".format(ATTEMPTS_TO_TRY), cm.exception.message_with_context()) with self.assertRaises(OperationalError) as cm: - with mock.patch("socket.create_connection") as mock_create_connection: - mock_create_connection.side_effect = socket.timeout - thrift_backend.make_request(thrift_backend._client.GetOperationStatus, req) + with mock.patch("socket.socket.connect") as mock_connect: + mock_connect.side_effect = socket.timeout + with self.cursor() as cursor: + # Connection will fail + cursor.execute("SOME RANDOM STATEMENT") + pass self.assertIn("{0}/{0}".format(ATTEMPTS_TO_TRY), cm.exception.message_with_context()) From fa1fd50c4388abd2ac41727b50a90d522e8e98c5 Mon Sep 17 00:00:00 2001 From: Jesse Whitehouse Date: Thu, 4 Aug 2022 10:20:32 -0500 Subject: [PATCH 12/23] Revert all changes since `main` Signed-off-by: Jesse Whitehouse --- src/databricks/sql/thrift_backend.py | 71 +++++----------------------- tests/e2e/driver_tests.py | 62 +----------------------- tests/unit/test_thrift_backend.py | 58 ----------------------- 3 files changed, 13 insertions(+), 178 deletions(-) diff --git a/src/databricks/sql/thrift_backend.py b/src/databricks/sql/thrift_backend.py index ce01b99b..d812f93b 100644 --- a/src/databricks/sql/thrift_backend.py +++ b/src/databricks/sql/thrift_backend.py @@ -5,7 +5,6 @@ import threading from uuid import uuid4 from ssl import CERT_NONE, CERT_OPTIONAL, CERT_REQUIRED, create_default_context -import socket, types import pyarrow import thrift.transport.THttpClient @@ -40,7 +39,6 @@ "_retry_delay_max": (float, 60, 5, 3600), "_retry_stop_after_attempts_count": (int, 30, 1, 60), "_retry_stop_after_attempts_duration": (float, 900, 1, 86400), - "_retry_delay_default": (float, 5, 1, 60), } @@ -81,8 +79,6 @@ def __init__( # next calculated pre-retry delay would go past # _retry_stop_after_attempts_duration, stop now.) # - # _retry_delay_default (default: 5) - # used when Retry-After is not specified by the server # _retry_stop_after_attempts_count # The maximum number of times we should retry retryable requests (defaults to 24) # _socket_timeout @@ -247,7 +243,7 @@ def _handle_request_error(self, error_info, attempt, elapsed): # FUTURE: Consider moving to https://github.com/litl/backoff or # https://github.com/jd/tenacity for retry logic. def make_request(self, method, request): - """Execute given request, attempting retries when TCP connection fils or when receiving HTTP 429/503. + """Execute given request, attempting retries when receiving HTTP 429/503. For delay between attempts, honor the given Retry-After header, but with bounds. Use lower bound of expontial-backoff based on _retry_delay_min, @@ -265,12 +261,8 @@ def get_elapsed(): return time.time() - t0 def extract_retry_delay(attempt): - """ - Encapsulate retry checks based on HTTP headers. Returns None || delay-in-secs - - Retry IFF 429/503 code + Retry-After header set - """ - + # encapsulate retry checks, returns None || delay-in-secs + # Retry IFF 429/503 code + Retry-After header set http_code = getattr(self._transport, "code", None) retry_after = getattr(self._transport, "headers", {}).get("Retry-After") if http_code in [429, 503] and retry_after: @@ -287,63 +279,24 @@ def attempt_request(attempt): # - non-None method_return -> success, return and be done # - non-None retry_delay -> sleep delay before retry # - error, error_message always set when available - - error = None - - # If retry_delay is None the request is treated as non-retryable - retry_delay = None try: logger.debug("Sending request: {}".format(request)) response = method(request) logger.debug("Received response: {}".format(response)) return response - except socket.timeout as err: - # We only retry for socket.timeout if the operation that timed out was a connection request - # Otherwise idempotency is not guaranteed because something may have been transmitted to the server - - def _dig_through_traceback(tb: types.TracebackType, mod, meth): - """Recursively search the traceback stack to see if mod.meth raised the exception - """ - _mod, _meth = mod, meth - tb_meth = tb.tb_frame.f_code.co_name - tb_mod = tb.tb_frame.f_code.co_filename.split("/")[-1].replace(".py", "") - - if tb_meth == _meth and _mod == tb_mod: - return True - elif tb.tb_next is None: - return False - - return _dig_through_traceback(tb.tb_next, mod, meth) - - tb = err.__traceback__ - failed_during_socket_connect = _dig_through_traceback(tb, "socket", "create_connection") - failed_during_http_open = _dig_through_traceback(tb, "client", "connect") - - if failed_during_socket_connect and failed_during_http_open: - retry_delay = self._retry_delay_default - - error_message = str(err) - error = err - except OSError as err: - # OSError 110 means EHOSTUNREACHABLE, which means the connection was timed out by the operating system - if "Errno 110" in str(err): - retry_delay = self._retry_delay_default - error_message = str(err) - error = err - except Exception as err: + except Exception as error: retry_delay = extract_retry_delay(attempt) error_message = ThriftBackend._extract_error_message_from_headers( getattr(self._transport, "headers", {}) ) - error = err - return RequestErrorInfo( - error=error, - error_message=error_message, - retry_delay=retry_delay, - http_code=getattr(self._transport, "code", None), - method=method.__name__, - request=request, - ) + return RequestErrorInfo( + error=error, + error_message=error_message, + retry_delay=retry_delay, + http_code=getattr(self._transport, "code", None), + method=method.__name__, + request=request, + ) # The real work: # - for each available attempt: diff --git a/tests/e2e/driver_tests.py b/tests/e2e/driver_tests.py index 70cfd304..358f0b26 100644 --- a/tests/e2e/driver_tests.py +++ b/tests/e2e/driver_tests.py @@ -7,7 +7,7 @@ import sys import threading import time -from unittest import loader, skipIf, skipUnless, TestCase, mock +from unittest import loader, skipIf, skipUnless, TestCase from uuid import uuid4 import numpy as np @@ -360,66 +360,6 @@ def execute_really_long_query(): cursor.execute("SELECT * FROM range(3)") self.assertEqual(len(cursor.fetchall()), 3) - def test_retry_after_connection_timeout(self): - # We only retry a request that failed because of a socket timeout in this case - # when the timeout occurred while trying to connect with the thrift server. - # In this situation, we know that a command was not sent to the server because - # no connection was made. - - import socket - from unittest.mock import Mock - from databricks.sql.thrift_api.TCLIService import ttypes - - # First let's check the non-retry behavior - # Given the client has successfully connected to the server already - # When a socket.timeout exception is raised - # Then the request is not retried - with self.assertRaises(OperationalError) as cm: - - # No mocks at this point. If calling self.cursor() succeeds - # that means there is an open / working connection to thrift server. - with self.cursor({}) as cursor: - - # Next apply a patch to the transport which raises a socket.timeout - # whenever any data is sent over the wire - with mock.patch("http.client.HTTPConnection.send") as mock_send: - mock_send.side_effect = socket.timeout - cursor.execute("I AM A VERY DANGEROUS, NOT IDEMPOTENT QUERY!!!") - self.assertIn("non-retryable error", cm.exception.message_with_context()) - - - # Second, let's check whether a request is retried if it fails during - # the connection attempt, instead of the send attempt. - - ATTEMPTS_TO_TRY = 2 - - # This is a normal query execution - # with self.cursor({}) as cursor: - # thrift_backend = cursor.thrift_backend - - - with self.assertRaises(OperationalError) as cm: - with mock.patch("socket.socket.connect") as mock_connect: - mock_connect.side_effect = OSError("[Errno 110]: Connection timed out") - with self.cursor() as cursor: - # Connection will fail - cursor.execute("SOME RANDOM STATEMENT") - pass - - self.assertIn("{0}/{0}".format(ATTEMPTS_TO_TRY), cm.exception.message_with_context()) - - with self.assertRaises(OperationalError) as cm: - with mock.patch("socket.socket.connect") as mock_connect: - mock_connect.side_effect = socket.timeout - with self.cursor() as cursor: - # Connection will fail - cursor.execute("SOME RANDOM STATEMENT") - pass - - self.assertIn("{0}/{0}".format(ATTEMPTS_TO_TRY), cm.exception.message_with_context()) - - - @skipIf(pysql_has_version('<', '2'), 'requires pysql v2') def test_can_execute_command_after_failure(self): with self.cursor({}) as cursor: diff --git a/tests/unit/test_thrift_backend.py b/tests/unit/test_thrift_backend.py index 85b05218..d411df76 100644 --- a/tests/unit/test_thrift_backend.py +++ b/tests/unit/test_thrift_backend.py @@ -19,7 +19,6 @@ def retry_policy_factory(): "_retry_delay_max": (float, 60, None, None), "_retry_stop_after_attempts_count": (int, 30, None, None), "_retry_stop_after_attempts_duration": (float, 900, None, None), - "_retry_delay_default": (float, 5, 0, 10), } @@ -985,63 +984,6 @@ def test_make_request_wont_retry_if_headers_not_present(self, t_transport_class) self.assertIn("This method fails", str(cm.exception.message_with_context())) - @patch("thrift.transport.THttpClient.THttpClient") - def test_will_retry_on_connection_timeout(self, t_transport_class): - - import socket - - mock_method = Mock() - mock_method.__name__ = "method name" - mock_method.side_effect = socket.timeout - - thrift_backend = ThriftBackend( - "foobar", - 443, - "path", - [], - _retry_stop_after_attempts_count=2, - _retry_delay_default=0.25 - ) - - with self.assertRaises(OperationalError) as cm: - thrift_backend.make_request(mock_method, Mock()) - - self.assertIn("2/2", cm.exception.message_with_context()) - - mock_method = Mock() - mock_method.__name__ = "method name" - mock_method.side_effect = OSError("[Errno 110] Connection timed out") - - with self.assertRaises(OperationalError) as cm: - thrift_backend.make_request(mock_method, Mock()) - - self.assertIn("2/2", cm.exception.message_with_context()) - - @patch("thrift.transport.THttpClient.THttpClient") - def test_will_not_retry_on_non_timeout_oserror(self, t_transport_class): - - - - mock_method = Mock() - mock_method.__name__ = "method name" - mock_method.side_effect = OSError("I am not a timeout error") - - thrift_backend = ThriftBackend( - "foobar", - 443, - "path", - [], - _retry_stop_after_attempts_count=2, - _retry_delay_default=0.25 - ) - - with self.assertRaises(OperationalError) as cm: - thrift_backend.make_request(mock_method, Mock()) - - self.assertIn("I am not a timeout error", str(cm.exception.message_with_context())) - self.assertIn("1/2", cm.exception.message_with_context()) - - @patch("thrift.transport.THttpClient.THttpClient") def test_make_request_wont_retry_if_error_code_not_429_or_503(self, t_transport_class): t_transport_instance = t_transport_class.return_value From bf65b81df48cd9ec718c84ff6f1d20e47864c546 Mon Sep 17 00:00:00 2001 From: Jesse Whitehouse Date: Tue, 2 Aug 2022 10:12:52 -0500 Subject: [PATCH 13/23] isolate delay bounding Signed-off-by: Jesse Whitehouse --- src/databricks/sql/thrift_backend.py | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/src/databricks/sql/thrift_backend.py b/src/databricks/sql/thrift_backend.py index d812f93b..3497f2da 100644 --- a/src/databricks/sql/thrift_backend.py +++ b/src/databricks/sql/thrift_backend.py @@ -260,6 +260,14 @@ def make_request(self, method, request): def get_elapsed(): return time.time() - t0 + def bound_retry_delay(attempt, proposed_delay): + """bound delay (seconds) by [min_delay*1.5^(attempt-1), max_delay] + """ + delay = int(proposed_delay) + delay = max(delay, self._retry_delay_min * math.pow(1.5, attempt - 1)) + delay = min(delay, self._retry_delay_max) + return delay + def extract_retry_delay(attempt): # encapsulate retry checks, returns None || delay-in-secs # Retry IFF 429/503 code + Retry-After header set @@ -267,10 +275,7 @@ def extract_retry_delay(attempt): retry_after = getattr(self._transport, "headers", {}).get("Retry-After") if http_code in [429, 503] and retry_after: # bound delay (seconds) by [min_delay*1.5^(attempt-1), max_delay] - delay = int(retry_after) - delay = max(delay, self._retry_delay_min * math.pow(1.5, attempt - 1)) - delay = min(delay, self._retry_delay_max) - return delay + return bound_retry_delay(attempt, int(retry_after)) return None def attempt_request(attempt): From a0d340e659f8e142959cb11bbab7cb13bf03651f Mon Sep 17 00:00:00 2001 From: Jesse Whitehouse Date: Tue, 2 Aug 2022 10:21:47 -0500 Subject: [PATCH 14/23] Move error details scope up one-level. Signed-off-by: Jesse Whitehouse --- src/databricks/sql/thrift_backend.py | 21 ++++++++++++--------- 1 file changed, 12 insertions(+), 9 deletions(-) diff --git a/src/databricks/sql/thrift_backend.py b/src/databricks/sql/thrift_backend.py index 3497f2da..cc4a0bfc 100644 --- a/src/databricks/sql/thrift_backend.py +++ b/src/databricks/sql/thrift_backend.py @@ -284,24 +284,27 @@ def attempt_request(attempt): # - non-None method_return -> success, return and be done # - non-None retry_delay -> sleep delay before retry # - error, error_message always set when available + + error, error_message, retry_delay = None, None, None try: logger.debug("Sending request: {}".format(request)) response = method(request) logger.debug("Received response: {}".format(response)) return response - except Exception as error: + except Exception as err: + error = err retry_delay = extract_retry_delay(attempt) error_message = ThriftBackend._extract_error_message_from_headers( getattr(self._transport, "headers", {}) ) - return RequestErrorInfo( - error=error, - error_message=error_message, - retry_delay=retry_delay, - http_code=getattr(self._transport, "code", None), - method=method.__name__, - request=request, - ) + return RequestErrorInfo( + error=error, + error_message=error_message, + retry_delay=retry_delay, + http_code=getattr(self._transport, "code", None), + method=method.__name__, + request=request, + ) # The real work: # - for each available attempt: From a55cf9dc1af85a750d1bc71ccb5dfe5be4dfbaf2 Mon Sep 17 00:00:00 2001 From: Jesse Whitehouse Date: Tue, 2 Aug 2022 10:14:51 -0500 Subject: [PATCH 15/23] Retry GetOperationStatus if an OSError was raised during execution Add retry_delay_default to use in this case. Signed-off-by: Jesse Whitehouse --- src/databricks/sql/thrift_backend.py | 13 ++++++++++ tests/unit/test_thrift_backend.py | 39 ++++++++++++++++++++++++++++ 2 files changed, 52 insertions(+) diff --git a/src/databricks/sql/thrift_backend.py b/src/databricks/sql/thrift_backend.py index cc4a0bfc..8c5296dc 100644 --- a/src/databricks/sql/thrift_backend.py +++ b/src/databricks/sql/thrift_backend.py @@ -15,6 +15,9 @@ from databricks.sql.thrift_api.TCLIService import TCLIService, ttypes from databricks.sql import * +from databricks.sql.thrift_api.TCLIService.TCLIService import ( + Client as TCLIServiceClient, +) from databricks.sql.utils import ( ArrowQueue, ExecuteResponse, @@ -39,6 +42,7 @@ "_retry_delay_max": (float, 60, 5, 3600), "_retry_stop_after_attempts_count": (int, 30, 1, 60), "_retry_stop_after_attempts_duration": (float, 900, 1, 86400), + "_retry_delay_default": (float, 5, 1, 60) } @@ -71,6 +75,8 @@ def __init__( # _retry_delay_min (default: 1) # _retry_delay_max (default: 60) # {min,max} pre-retry delay bounds + # _retry_delay_default (default: 5) + # Only used when GetOperationStatus fails due to a TCP/OS Error. # _retry_stop_after_attempts_count (default: 30) # total max attempts during retry sequence # _retry_stop_after_attempts_duration (default: 900) @@ -291,6 +297,13 @@ def attempt_request(attempt): response = method(request) logger.debug("Received response: {}".format(response)) return response + except OSError as err: + error = err + error_message = str(err) + + gos_name = TCLIServiceClient.GetOperationStatus.__name__ + if method.__name__ == gos_name: + retry_delay = bound_retry_delay(attempt, self._retry_delay_default) except Exception as err: error = err retry_delay = extract_retry_delay(attempt) diff --git a/tests/unit/test_thrift_backend.py b/tests/unit/test_thrift_backend.py index d411df76..d2b8745c 100644 --- a/tests/unit/test_thrift_backend.py +++ b/tests/unit/test_thrift_backend.py @@ -19,6 +19,7 @@ def retry_policy_factory(): "_retry_delay_max": (float, 60, None, None), "_retry_stop_after_attempts_count": (int, 30, None, None), "_retry_stop_after_attempts_duration": (float, 900, None, None), + "_retry_delay_default": (float, 5, 1, 60) } @@ -968,6 +969,44 @@ def test_handle_execute_response_sets_active_op_handle(self): self.assertEqual(mock_resp.operationHandle, mock_cursor.active_op_handle) + @patch("thrift.transport.THttpClient.THttpClient") + @patch("databricks.sql.thrift_api.TCLIService.TCLIService.Client.GetOperationStatus") + @patch("databricks.sql.thrift_backend._retry_policy", new_callable=retry_policy_factory) + def test_make_request_will_retry_GetOperationStatus( + self, mock_retry_policy, mock_GetOperationStatus, t_transport_class): + + import thrift + from databricks.sql.thrift_api.TCLIService.TCLIService import Client + from databricks.sql.exc import RequestError + from databricks.sql.utils import NoRetryReason + + mock_GetOperationStatus.__name__ = "GetOperationStatus" + mock_GetOperationStatus.side_effect = TimeoutError(110) + + protocol = thrift.protocol.TBinaryProtocol.TBinaryProtocol(t_transport_class) + client = Client(protocol) + + req = ttypes.TGetOperationStatusReq( + operationHandle=self.operation_handle, + getProgressUpdate=False, + ) + + EXPECTED_RETRIES = 2 + + thrift_backend = ThriftBackend( + "foobar", + 443, + "path", [], + _retry_stop_after_attempts_count=EXPECTED_RETRIES, + _retry_delay_default=0.1) + + with self.assertRaises(RequestError) as cm: + thrift_backend.make_request(client.GetOperationStatus, req) + + self.assertEqual(NoRetryReason.OUT_OF_ATTEMPTS.value, cm.exception.context["no-retry-reason"]) + self.assertEqual(f'{EXPECTED_RETRIES}/{EXPECTED_RETRIES}', cm.exception.context["attempt"]) + + @patch("thrift.transport.THttpClient.THttpClient") def test_make_request_wont_retry_if_headers_not_present(self, t_transport_class): t_transport_instance = t_transport_class.return_value From 38411a8c0ed8db59d403ab24bd82a4e1dee62da1 Mon Sep 17 00:00:00 2001 From: Jesse Whitehouse Date: Thu, 4 Aug 2022 08:58:57 -0500 Subject: [PATCH 16/23] Log when a request is retried due to an OSError. Emit warnings for unexpected OSError codes Signed-off-by: Jesse Whitehouse --- src/databricks/sql/thrift_backend.py | 22 ++++++++++++++++++++++ tests/unit/test_thrift_backend.py | 28 +++++++++++++++++++++++----- 2 files changed, 45 insertions(+), 5 deletions(-) diff --git a/src/databricks/sql/thrift_backend.py b/src/databricks/sql/thrift_backend.py index 8c5296dc..2cd116c5 100644 --- a/src/databricks/sql/thrift_backend.py +++ b/src/databricks/sql/thrift_backend.py @@ -1,4 +1,5 @@ from decimal import Decimal +import errno import logging import math import time @@ -304,6 +305,27 @@ def attempt_request(attempt): gos_name = TCLIServiceClient.GetOperationStatus.__name__ if method.__name__ == gos_name: retry_delay = bound_retry_delay(attempt, self._retry_delay_default) + + # fmt: off + # The built-in errno package encapsulates OSError codes, which are OS-specific. + # log.info for errors we believe are not unusual or unexpected. log.warn for + # for others like EEXIST, EBADF, ERANGE which are not expected in this context. + # | Debian | Darwin | + info_errs = [ # |--------|--------| + errno.ESHUTDOWN, # | 32 | 32 | + errno.EAFNOSUPPORT, # | 97 | 47 | + errno.ECONNRESET, # | 104 | 54 | + errno.ETIMEDOUT, # | 110 | 60 | + ] + + # fmt: on + log_string = ( + f"{gos_name} failed with code {err.errno} and will attempt to retry" + ) + if err.errno in info_errs: + logger.info(log_string) + else: + logger.warning(log_string) except Exception as err: error = err retry_delay = extract_retry_delay(attempt) diff --git a/tests/unit/test_thrift_backend.py b/tests/unit/test_thrift_backend.py index d2b8745c..e8c5a727 100644 --- a/tests/unit/test_thrift_backend.py +++ b/tests/unit/test_thrift_backend.py @@ -975,13 +975,14 @@ def test_handle_execute_response_sets_active_op_handle(self): def test_make_request_will_retry_GetOperationStatus( self, mock_retry_policy, mock_GetOperationStatus, t_transport_class): - import thrift + import thrift, errno from databricks.sql.thrift_api.TCLIService.TCLIService import Client from databricks.sql.exc import RequestError from databricks.sql.utils import NoRetryReason - mock_GetOperationStatus.__name__ = "GetOperationStatus" - mock_GetOperationStatus.side_effect = TimeoutError(110) + this_gos_name = "GetOperationStatus" + mock_GetOperationStatus.__name__ = this_gos_name + mock_GetOperationStatus.side_effect = OSError(errno.ETIMEDOUT, "Connection timed out") protocol = thrift.protocol.TBinaryProtocol.TBinaryProtocol(t_transport_class) client = Client(protocol) @@ -998,13 +999,30 @@ def test_make_request_will_retry_GetOperationStatus( 443, "path", [], _retry_stop_after_attempts_count=EXPECTED_RETRIES, - _retry_delay_default=0.1) + _retry_delay_default=1) + with self.assertRaises(RequestError) as cm: thrift_backend.make_request(client.GetOperationStatus, req) self.assertEqual(NoRetryReason.OUT_OF_ATTEMPTS.value, cm.exception.context["no-retry-reason"]) - self.assertEqual(f'{EXPECTED_RETRIES}/{EXPECTED_RETRIES}', cm.exception.context["attempt"]) + self.assertEqual(f'{EXPECTED_RETRIES}/{EXPECTED_RETRIES}', cm.exception.context["attempt"]) + + # Unusual OSError code + mock_GetOperationStatus.side_effect = OSError(errno.EEXIST, "File does not exist") + + with self.assertLogs("databricks.sql.thrift_backend", level=logging.WARNING) as cm: + with self.assertRaises(RequestError): + thrift_backend.make_request(client.GetOperationStatus, req) + + # There should be two warning log messages: one for each retry + self.assertEqual(len(cm.output), EXPECTED_RETRIES) + + # The warnings should be identical + self.assertEqual(cm.output[1], cm.output[0]) + + # The warnings should include this text + self.assertIn(f"{this_gos_name} failed with code {errno.EEXIST} and will attempt to retry", cm.output[0]) @patch("thrift.transport.THttpClient.THttpClient") From 5096ef0c939c7eb89419edba67ca02d00cec4004 Mon Sep 17 00:00:00 2001 From: Jesse Whitehouse Date: Thu, 4 Aug 2022 09:46:33 -0500 Subject: [PATCH 17/23] Update docstring for make_request Signed-off-by: Jesse Whitehouse --- src/databricks/sql/thrift_backend.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/databricks/sql/thrift_backend.py b/src/databricks/sql/thrift_backend.py index 2cd116c5..a1bc811d 100644 --- a/src/databricks/sql/thrift_backend.py +++ b/src/databricks/sql/thrift_backend.py @@ -250,7 +250,9 @@ def _handle_request_error(self, error_info, attempt, elapsed): # FUTURE: Consider moving to https://github.com/litl/backoff or # https://github.com/jd/tenacity for retry logic. def make_request(self, method, request): - """Execute given request, attempting retries when receiving HTTP 429/503. + """Execute given request, attempting retries when + 1. Receiving HTTP 429/503 from server + 2. OSError is raised during a GetOperationStatus For delay between attempts, honor the given Retry-After header, but with bounds. Use lower bound of expontial-backoff based on _retry_delay_min, From 5c1ee793522f9110c07026b906395599e61c84fd Mon Sep 17 00:00:00 2001 From: Jesse Whitehouse Date: Thu, 4 Aug 2022 09:47:21 -0500 Subject: [PATCH 18/23] Nit: unit tests show the .warn message is deprecated. DeprecationWarning: The 'warn' function is deprecated, use 'warning' instead Signed-off-by: Jesse Whitehouse --- src/databricks/sql/thrift_backend.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/databricks/sql/thrift_backend.py b/src/databricks/sql/thrift_backend.py index a1bc811d..271a04bd 100644 --- a/src/databricks/sql/thrift_backend.py +++ b/src/databricks/sql/thrift_backend.py @@ -165,7 +165,7 @@ def _initialize_retry_args(self, kwargs): "retry parameter: {} given_or_default {}".format(key, given_or_default) ) if bound != given_or_default: - logger.warn( + logger.warning( "Override out of policy retry parameter: " + "{} given {}, restricted to {}".format( key, given_or_default, bound From 1f87a384b3857313633d8ec9295a16b5ffbed612 Mon Sep 17 00:00:00 2001 From: Jesse Whitehouse Date: Thu, 4 Aug 2022 09:47:27 -0500 Subject: [PATCH 19/23] Black thrift_backend.py Signed-off-by: Jesse Whitehouse --- src/databricks/sql/thrift_backend.py | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/src/databricks/sql/thrift_backend.py b/src/databricks/sql/thrift_backend.py index 271a04bd..13eeaf70 100644 --- a/src/databricks/sql/thrift_backend.py +++ b/src/databricks/sql/thrift_backend.py @@ -43,7 +43,7 @@ "_retry_delay_max": (float, 60, 5, 3600), "_retry_stop_after_attempts_count": (int, 30, 1, 60), "_retry_stop_after_attempts_duration": (float, 900, 1, 86400), - "_retry_delay_default": (float, 5, 1, 60) + "_retry_delay_default": (float, 5, 1, 60), } @@ -77,7 +77,7 @@ def __init__( # _retry_delay_max (default: 60) # {min,max} pre-retry delay bounds # _retry_delay_default (default: 5) - # Only used when GetOperationStatus fails due to a TCP/OS Error. + # Only used when GetOperationStatus fails due to a TCP/OS Error. # _retry_stop_after_attempts_count (default: 30) # total max attempts during retry sequence # _retry_stop_after_attempts_duration (default: 900) @@ -270,8 +270,7 @@ def get_elapsed(): return time.time() - t0 def bound_retry_delay(attempt, proposed_delay): - """bound delay (seconds) by [min_delay*1.5^(attempt-1), max_delay] - """ + """bound delay (seconds) by [min_delay*1.5^(attempt-1), max_delay]""" delay = int(proposed_delay) delay = max(delay, self._retry_delay_min * math.pow(1.5, attempt - 1)) delay = min(delay, self._retry_delay_max) @@ -293,7 +292,7 @@ def attempt_request(attempt): # - non-None method_return -> success, return and be done # - non-None retry_delay -> sleep delay before retry # - error, error_message always set when available - + error, error_message, retry_delay = None, None, None try: logger.debug("Sending request: {}".format(request)) From baff3d5250c6818c52647ba6cecebb9655ebc39a Mon Sep 17 00:00:00 2001 From: Jesse Whitehouse Date: Thu, 4 Aug 2022 10:50:36 -0500 Subject: [PATCH 20/23] Rerun black on thrift_backend.py Signed-off-by: Jesse Whitehouse --- src/databricks/sql/thrift_backend.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/src/databricks/sql/thrift_backend.py b/src/databricks/sql/thrift_backend.py index 13eeaf70..650a163d 100644 --- a/src/databricks/sql/thrift_backend.py +++ b/src/databricks/sql/thrift_backend.py @@ -320,9 +320,7 @@ def attempt_request(attempt): ] # fmt: on - log_string = ( - f"{gos_name} failed with code {err.errno} and will attempt to retry" - ) + log_string = f"{gos_name} failed with code {err.errno} and will attempt to retry" if err.errno in info_errs: logger.info(log_string) else: From 10016ea254359a8c3d6ad2d92c2ae86a0efe9c0f Mon Sep 17 00:00:00 2001 From: Jesse Whitehouse Date: Fri, 5 Aug 2022 15:51:45 -0500 Subject: [PATCH 21/23] Add comment about manual tests Signed-off-by: Jesse Whitehouse --- src/databricks/sql/thrift_backend.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/src/databricks/sql/thrift_backend.py b/src/databricks/sql/thrift_backend.py index 650a163d..cf665d5e 100644 --- a/src/databricks/sql/thrift_backend.py +++ b/src/databricks/sql/thrift_backend.py @@ -311,6 +311,10 @@ def attempt_request(attempt): # The built-in errno package encapsulates OSError codes, which are OS-specific. # log.info for errors we believe are not unusual or unexpected. log.warn for # for others like EEXIST, EBADF, ERANGE which are not expected in this context. + # + # I manually tested this retry behaviour using mitmweb and confirmed that + # GetOperationStatus requests are retried when I forced network connection + # interruptions / timeouts / reconnects. See #24 for more info. # | Debian | Darwin | info_errs = [ # |--------|--------| errno.ESHUTDOWN, # | 32 | 32 | From 4db4ad019b6830d85ee4ac2363b4be713ef609ae Mon Sep 17 00:00:00 2001 From: Jesse Whitehouse Date: Fri, 5 Aug 2022 16:15:48 -0500 Subject: [PATCH 22/23] Bump to v2.0.3 Signed-off-by: Jesse Whitehouse --- CHANGELOG.md | 5 ++++- pyproject.toml | 2 +- src/databricks/sql/__init__.py | 2 +- 3 files changed, 6 insertions(+), 3 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 2730415a..aeaba174 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,7 +1,10 @@ # Release History -## 2.x.x (Unreleased) +## 2.0.x (Unreleased) +## 2.0.3 (2022-08-05) + +- Add retry logic for `GetOperationStatus` requests that fail with an `OSError` - Reorganised code to use Poetry for dependency management. ## 2.0.2 (2022-05-04) - Better exception handling in automatic connection close diff --git a/pyproject.toml b/pyproject.toml index de9160de..ac211b9d 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "databricks-sql-connector" -version = "2.0.2" +version = "2.0.3" description = "Databricks SQL Connector for Python" authors = ["Databricks "] license = "Apache-2.0" diff --git a/src/databricks/sql/__init__.py b/src/databricks/sql/__init__.py index 8f67d465..f1d979f6 100644 --- a/src/databricks/sql/__init__.py +++ b/src/databricks/sql/__init__.py @@ -28,7 +28,7 @@ def __repr__(self): DATE = DBAPITypeObject("date") ROWID = DBAPITypeObject() -__version__ = "2.0.2" +__version__ = "2.0.3" USER_AGENT_NAME = "PyDatabricksSqlConnector" # These two functions are pyhive legacy From 767e34ce1fcf98aa874786373361c3d8680fcd37 Mon Sep 17 00:00:00 2001 From: Jesse Whitehouse Date: Fri, 5 Aug 2022 16:19:56 -0500 Subject: [PATCH 23/23] Revert "Bump to v2.0.3" This reverts commit 4db4ad019b6830d85ee4ac2363b4be713ef609ae. Signed-off-by: Jesse Whitehouse --- CHANGELOG.md | 5 +---- pyproject.toml | 2 +- src/databricks/sql/__init__.py | 2 +- 3 files changed, 3 insertions(+), 6 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index aeaba174..2730415a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,10 +1,7 @@ # Release History -## 2.0.x (Unreleased) +## 2.x.x (Unreleased) -## 2.0.3 (2022-08-05) - -- Add retry logic for `GetOperationStatus` requests that fail with an `OSError` - Reorganised code to use Poetry for dependency management. ## 2.0.2 (2022-05-04) - Better exception handling in automatic connection close diff --git a/pyproject.toml b/pyproject.toml index ac211b9d..de9160de 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "databricks-sql-connector" -version = "2.0.3" +version = "2.0.2" description = "Databricks SQL Connector for Python" authors = ["Databricks "] license = "Apache-2.0" diff --git a/src/databricks/sql/__init__.py b/src/databricks/sql/__init__.py index f1d979f6..8f67d465 100644 --- a/src/databricks/sql/__init__.py +++ b/src/databricks/sql/__init__.py @@ -28,7 +28,7 @@ def __repr__(self): DATE = DBAPITypeObject("date") ROWID = DBAPITypeObject() -__version__ = "2.0.3" +__version__ = "2.0.2" USER_AGENT_NAME = "PyDatabricksSqlConnector" # These two functions are pyhive legacy