Skip to content

Retry attempts that fail due to a connection timeout #24

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 23 commits into from
Aug 5, 2022
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
fa13d75
Explicitly catch OSError and socket.timeout errors, with automatic retry
susodapop Jul 28, 2022
b44d0b8
Add unit and e2e tests for retries on timeout behaviour.
susodapop Jul 28, 2022
22d2ac0
Add default delay to the retry policy
susodapop Jul 28, 2022
44f12b7
Don't make unit test take 5+ seconds
susodapop Jul 28, 2022
09cefec
Fix broken unit tests: test_retry_args_bounding
susodapop Jul 28, 2022
de96fce
Black thrift_backend.py
susodapop Jul 28, 2022
21c06d4
Only retry OSError's that mention Errno 110
susodapop Jul 28, 2022
befdf3d
Fix unnecessary indentation
susodapop Jul 28, 2022
fb1275b
Clarify docstrings after review feedback
susodapop Jul 29, 2022
c76ee65
Default retry_delay = None (not retryable)
susodapop Jul 29, 2022
7474834
This approach passes the e2e tests, but they take exactly 4 mins 51 secs
susodapop Aug 2, 2022
fa1fd50
Revert all changes since `main`
susodapop Aug 4, 2022
bf65b81
isolate delay bounding
susodapop Aug 2, 2022
a0d340e
Move error details scope up one-level.
susodapop Aug 2, 2022
a55cf9d
Retry GetOperationStatus if an OSError was raised during execution
susodapop Aug 2, 2022
38411a8
Log when a request is retried due to an OSError.
susodapop Aug 4, 2022
5096ef0
Update docstring for make_request
susodapop Aug 4, 2022
5c1ee79
Nit: unit tests show the .warn message is deprecated.
susodapop Aug 4, 2022
1f87a38
Black thrift_backend.py
susodapop Aug 4, 2022
baff3d5
Rerun black on thrift_backend.py
susodapop Aug 4, 2022
10016ea
Add comment about manual tests
susodapop Aug 5, 2022
4db4ad0
Bump to v2.0.3
susodapop Aug 5, 2022
767e34c
Revert "Bump to v2.0.3"
susodapop Aug 5, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 37 additions & 12 deletions src/databricks/sql/thrift_backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import threading
from uuid import uuid4
from ssl import CERT_NONE, CERT_OPTIONAL, CERT_REQUIRED, create_default_context
import socket

import pyarrow
import thrift.transport.THttpClient
Expand Down Expand Up @@ -39,6 +40,7 @@
"_retry_delay_max": (float, 60, 5, 3600),
"_retry_stop_after_attempts_count": (int, 30, 1, 60),
"_retry_stop_after_attempts_duration": (float, 900, 1, 86400),
"_retry_delay_default": (float, 5, 1, 60),
}


Expand Down Expand Up @@ -79,6 +81,8 @@ def __init__(
# next calculated pre-retry delay would go past
# _retry_stop_after_attempts_duration, stop now.)
#
# _retry_delay_default (default: 5)
# used when Retry-After is not specified by the server
# _retry_stop_after_attempts_count
# The maximum number of times we should retry retryable requests (defaults to 30)
# _socket_timeout
Expand Down Expand Up @@ -243,7 +247,7 @@ def _handle_request_error(self, error_info, attempt, elapsed):
# FUTURE: Consider moving to https://github.com/litl/backoff or
# https://github.com/jd/tenacity for retry logic.
def make_request(self, method, request):
"""Execute given request, attempting retries when receiving HTTP 429/503.
"""Execute given request, attempting retries when TCP connection fails or when receiving HTTP 429/503.

For delay between attempts, honor the given Retry-After header, but with bounds.
Use lower bound of exponential-backoff based on _retry_delay_min,
Expand All @@ -261,8 +265,12 @@ def get_elapsed():
return time.time() - t0

def extract_retry_delay(attempt):
# encapsulate retry checks, returns None || delay-in-secs
# Retry IFF 429/503 code + Retry-After header set
"""
Encapsulate retry checks based on HTTP headers. Returns None || delay-in-secs

Retry IFF 429/503 code + Retry-After header set
"""

http_code = getattr(self._transport, "code", None)
retry_after = getattr(self._transport, "headers", {}).get("Retry-After")
if http_code in [429, 503] and retry_after:
Expand All @@ -279,24 +287,41 @@ def attempt_request(attempt):
# - non-None method_return -> success, return and be done
# - non-None retry_delay -> sleep delay before retry
# - error, error_message always set when available

error = None

# If retry_delay is None the request is treated as non-retryable
retry_delay = None
try:
logger.debug("Sending request: {}".format(request))
response = method(request)
logger.debug("Received response: {}".format(response))
return response
except Exception as error:
except socket.timeout as err:
# socket.timeout means the connection was timed out by the socket package
retry_delay = self._retry_delay_default
error_message = str(err)
error = err
except OSError as err:
# OSError 110 means the connection was timed out by the operating system
if "Errno 110" in str(err):
retry_delay = self._retry_delay_default
error_message = str(err)
error = err
except Exception as err:
retry_delay = extract_retry_delay(attempt)
error_message = ThriftBackend._extract_error_message_from_headers(
getattr(self._transport, "headers", {})
)
return RequestErrorInfo(
error=error,
error_message=error_message,
retry_delay=retry_delay,
http_code=getattr(self._transport, "code", None),
method=method.__name__,
request=request,
)
error = err
return RequestErrorInfo(
error=error,
error_message=error_message,
retry_delay=retry_delay,
http_code=getattr(self._transport, "code", None),
method=method.__name__,
request=request,
)

# The real work:
# - for each available attempt:
Expand Down
38 changes: 37 additions & 1 deletion tests/e2e/driver_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
import sys
import threading
import time
from unittest import loader, skipIf, skipUnless, TestCase
from unittest import loader, skipIf, skipUnless, TestCase, mock
from uuid import uuid4

import numpy as np
Expand Down Expand Up @@ -360,6 +360,42 @@ def execute_really_long_query():
cursor.execute("SELECT * FROM range(3)")
self.assertEqual(len(cursor.fetchall()), 3)

def test_retry_after_connection_timeout(self):

import socket
from databricks.sql.thrift_api.TCLIService import ttypes

ATTEMPTS_TO_TRY = 2

with self.cursor({}) as cursor:
cursor.execute("SELECT id FROM RANGE(10)")
op_handle = cursor.active_op_handle
thrift_backend = cursor.thrift_backend


thrift_backend._retry_stop_after_attempts_count = ATTEMPTS_TO_TRY
req = ttypes.TGetOperationStatusReq(
operationHandle=op_handle,
getProgressUpdate=False,
)


with self.assertRaises(OperationalError) as cm:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This could be more specific and assert RequestError.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I actually removed the e2e tests for this behaviour because they were no more useful than unit tests. I'll add an e2e test for this scenario after we merge support for http proxies. Then we can simulate real timeouts.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The unit tests assert on RequestError, by the way.

with mock.patch("socket.create_connection") as mock_create_connection:
mock_create_connection.side_effect = OSError("[Errno 110]: Connection timed out")
thrift_backend.make_request(thrift_backend._client.GetOperationStatus, req)

self.assertIn("{0}/{0}".format(ATTEMPTS_TO_TRY), cm.exception.message_with_context())

with self.assertRaises(OperationalError) as cm:
with mock.patch("socket.create_connection") as mock_create_connection:
mock_create_connection.side_effect = socket.timeout
thrift_backend.make_request(thrift_backend._client.GetOperationStatus, req)

self.assertIn("{0}/{0}".format(ATTEMPTS_TO_TRY), cm.exception.message_with_context())



@skipIf(pysql_has_version('<', '2'), 'requires pysql v2')
def test_can_execute_command_after_failure(self):
with self.cursor({}) as cursor:
Expand Down
58 changes: 58 additions & 0 deletions tests/unit/test_thrift_backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ def retry_policy_factory():
"_retry_delay_max": (float, 60, None, None),
"_retry_stop_after_attempts_count": (int, 30, None, None),
"_retry_stop_after_attempts_duration": (float, 900, None, None),
"_retry_delay_default": (float, 5, 0, 10),
}


Expand Down Expand Up @@ -984,6 +985,63 @@ def test_make_request_wont_retry_if_headers_not_present(self, t_transport_class)

self.assertIn("This method fails", str(cm.exception.message_with_context()))

@patch("thrift.transport.THttpClient.THttpClient")
def test_will_retry_on_connection_timeout(self, t_transport_class):
    """Check that connection-timeout errors are retried by make_request.

    Two error shapes are exercised against the same backend, which is
    configured with ``_retry_stop_after_attempts_count=2``:

    * ``socket.timeout`` raised by the request method, and
    * an ``OSError`` whose message mentions ``Errno 110``.

    In both cases make_request must raise OperationalError and the error's
    context message must contain "2/2" (presumably the attempt counter,
    i.e. both allowed attempts were used -- confirm against
    message_with_context).

    Each scenario runs in its own subTest so a failure in the first one
    does not prevent the second one from being checked and reported.
    """
    import socket

    thrift_backend = ThriftBackend(
        "foobar",
        443,
        "path",
        [],
        _retry_stop_after_attempts_count=2,
        _retry_delay_default=0.25,
    )

    timeout_errors = (
        socket.timeout,
        OSError("[Errno 110] Connection timed out"),
    )

    for timeout_error in timeout_errors:
        with self.subTest(error=repr(timeout_error)):
            # A fresh mock per scenario keeps call counts independent.
            mock_method = Mock()
            # make_request reads method.__name__ when building error info,
            # and a plain Mock does not provide one.
            mock_method.__name__ = "method name"
            mock_method.side_effect = timeout_error

            with self.assertRaises(OperationalError) as cm:
                thrift_backend.make_request(mock_method, Mock())

            self.assertIn("2/2", cm.exception.message_with_context())

@patch("thrift.transport.THttpClient.THttpClient")
def test_will_not_retry_on_non_timeout_oserror(self, t_transport_class):
    """Check that an OSError without a timeout marker is not retried.

    The request method raises an OSError whose message does not mention
    ``Errno 110``. make_request must raise OperationalError, and the
    error's context message must carry the original OSError text together
    with "1/2" (presumably attempt 1 of the 2 allowed -- confirm against
    message_with_context).
    """
    # Plain Mock objects have no __name__, so it is set explicitly for the
    # error info that make_request builds.
    failing_call = Mock(side_effect=OSError("I am not a timeout error"))
    failing_call.__name__ = "method name"

    retry_settings = {
        "_retry_stop_after_attempts_count": 2,
        "_retry_delay_default": 0.25,
    }
    backend = ThriftBackend("foobar", 443, "path", [], **retry_settings)

    with self.assertRaises(OperationalError) as raised:
        backend.make_request(failing_call, Mock())

    context = raised.exception.message_with_context()
    self.assertIn("I am not a timeout error", str(context))
    self.assertIn("1/2", context)


@patch("thrift.transport.THttpClient.THttpClient")
def test_make_request_wont_retry_if_error_code_not_429_or_503(self, t_transport_class):
t_transport_instance = t_transport_class.return_value
Expand Down