Commit c59c393
Author: Jesse
Parent: 2961524

Retry attempts that fail due to a connection timeout (databricks#24)

* Isolate delay bounding logic
* Move error details scope up one-level.
* Retry GetOperationStatus if an OSError was raised during execution. Add retry_delay_default to use in this case.
* Log when a request is retried due to an OSError. Emit warnings for unexpected OSError codes
* Update docstring for make_request
* Nit: unit tests show the .warn message is deprecated. DeprecationWarning: The 'warn' function is deprecated, use 'warning' instead

Signed-off-by: Jesse Whitehouse <[email protected]>
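
For context, the new _retry_delay_default setting travels through the same keyword-argument path as the existing retry settings. The snippet below is a minimal sketch mirroring the new unit test; the hostname, port, and HTTP path are placeholder values, not a working endpoint:

    from databricks.sql.thrift_backend import ThriftBackend

    # Placeholder connection details (illustrative only).
    backend = ThriftBackend(
        "example.cloud.databricks.com",      # server hostname
        443,                                 # port
        "/sql/placeholder/path",             # http_path
        [],                                  # http_headers
        _retry_stop_after_attempts_count=2,  # stop after two attempts
        _retry_delay_default=1,              # 1s delay when GetOperationStatus hits an OSError
    )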

File tree (2 files changed: +118 -15 lines changed)

  src/databricks/sql/thrift_backend.py
  tests/unit/test_thrift_backend.py

src/databricks/sql/thrift_backend.py

Lines changed: 61 additions & 15 deletions
@@ -1,4 +1,5 @@
 from decimal import Decimal
+import errno
 import logging
 import math
 import time
@@ -15,6 +16,9 @@
 
 from databricks.sql.thrift_api.TCLIService import TCLIService, ttypes
 from databricks.sql import *
+from databricks.sql.thrift_api.TCLIService.TCLIService import (
+    Client as TCLIServiceClient,
+)
 from databricks.sql.utils import (
     ArrowQueue,
     ExecuteResponse,
@@ -39,6 +43,7 @@
     "_retry_delay_max": (float, 60, 5, 3600),
     "_retry_stop_after_attempts_count": (int, 30, 1, 60),
     "_retry_stop_after_attempts_duration": (float, 900, 1, 86400),
+    "_retry_delay_default": (float, 5, 1, 60),
 }
 
 
@@ -71,6 +76,8 @@ def __init__(
         # _retry_delay_min (default: 1)
         # _retry_delay_max (default: 60)
         #   {min,max} pre-retry delay bounds
+        # _retry_delay_default (default: 5)
+        #   Only used when GetOperationStatus fails due to a TCP/OS Error.
         # _retry_stop_after_attempts_count (default: 30)
         #   total max attempts during retry sequence
         # _retry_stop_after_attempts_duration (default: 900)
@@ -158,7 +165,7 @@ def _initialize_retry_args(self, kwargs):
                 "retry parameter: {} given_or_default {}".format(key, given_or_default)
             )
             if bound != given_or_default:
-                logger.warn(
+                logger.warning(
                     "Override out of policy retry parameter: "
                     + "{} given {}, restricted to {}".format(
                         key, given_or_default, bound
@@ -243,7 +250,9 @@ def _handle_request_error(self, error_info, attempt, elapsed):
     # FUTURE: Consider moving to https://github.com/litl/backoff or
     # https://github.com/jd/tenacity for retry logic.
     def make_request(self, method, request):
-        """Execute given request, attempting retries when receiving HTTP 429/503.
+        """Execute given request, attempting retries when
+            1. Receiving HTTP 429/503 from server
+            2. OSError is raised during a GetOperationStatus
 
         For delay between attempts, honor the given Retry-After header, but with bounds.
         Use lower bound of expontial-backoff based on _retry_delay_min,
@@ -260,17 +269,21 @@ def make_request(self, method, request):
         def get_elapsed():
             return time.time() - t0
 
+        def bound_retry_delay(attempt, proposed_delay):
+            """bound delay (seconds) by [min_delay*1.5^(attempt-1), max_delay]"""
+            delay = int(proposed_delay)
+            delay = max(delay, self._retry_delay_min * math.pow(1.5, attempt - 1))
+            delay = min(delay, self._retry_delay_max)
+            return delay
+
         def extract_retry_delay(attempt):
             # encapsulate retry checks, returns None || delay-in-secs
             # Retry IFF 429/503 code + Retry-After header set
             http_code = getattr(self._transport, "code", None)
             retry_after = getattr(self._transport, "headers", {}).get("Retry-After")
             if http_code in [429, 503] and retry_after:
                 # bound delay (seconds) by [min_delay*1.5^(attempt-1), max_delay]
-                delay = int(retry_after)
-                delay = max(delay, self._retry_delay_min * math.pow(1.5, attempt - 1))
-                delay = min(delay, self._retry_delay_max)
-                return delay
+                return bound_retry_delay(attempt, int(retry_after))
             return None
 
         def attempt_request(attempt):
@@ -279,24 +292,57 @@ def attempt_request(attempt):
             # - non-None method_return -> success, return and be done
             # - non-None retry_delay -> sleep delay before retry
             # - error, error_message always set when available
+
+            error, error_message, retry_delay = None, None, None
             try:
                 logger.debug("Sending request: {}".format(request))
                 response = method(request)
                 logger.debug("Received response: {}".format(response))
                 return response
-            except Exception as error:
+            except OSError as err:
+                error = err
+                error_message = str(err)
+
+                gos_name = TCLIServiceClient.GetOperationStatus.__name__
+                if method.__name__ == gos_name:
+                    retry_delay = bound_retry_delay(attempt, self._retry_delay_default)
+
+                    # fmt: off
+                    # The built-in errno package encapsulates OSError codes, which are OS-specific.
+                    # log.info for errors we believe are not unusual or unexpected. log.warn for
+                    # for others like EEXIST, EBADF, ERANGE which are not expected in this context.
+                    #
+                    # I manually tested this retry behaviour using mitmweb and confirmed that
+                    # GetOperationStatus requests are retried when I forced network connection
+                    # interruptions / timeouts / reconnects. See #24 for more info.
+                                            # | Debian | Darwin |
+                    info_errs = [           # |--------|--------|
+                        errno.ESHUTDOWN,    # |   32   |   32   |
+                        errno.EAFNOSUPPORT, # |   97   |   47   |
+                        errno.ECONNRESET,   # |  104   |   54   |
+                        errno.ETIMEDOUT,    # |  110   |   60   |
+                    ]
+
+                    # fmt: on
+                    log_string = f"{gos_name} failed with code {err.errno} and will attempt to retry"
+                    if err.errno in info_errs:
+                        logger.info(log_string)
+                    else:
+                        logger.warning(log_string)
+            except Exception as err:
+                error = err
                 retry_delay = extract_retry_delay(attempt)
                 error_message = ThriftBackend._extract_error_message_from_headers(
                     getattr(self._transport, "headers", {})
                 )
-                return RequestErrorInfo(
-                    error=error,
-                    error_message=error_message,
-                    retry_delay=retry_delay,
-                    http_code=getattr(self._transport, "code", None),
-                    method=method.__name__,
-                    request=request,
-                )
+            return RequestErrorInfo(
+                error=error,
+                error_message=error_message,
+                retry_delay=retry_delay,
+                http_code=getattr(self._transport, "code", None),
+                method=method.__name__,
+                request=request,
+            )
 
         # The real work:
         # - for each available attempt:
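
As an aside on the delay-bounding logic isolated above (not part of the diff), the clamp applied by bound_retry_delay can be reproduced standalone. The sketch below assumes the default bounds _retry_delay_min=1 and _retry_delay_max=60 from the retry policy table:

    import math

    RETRY_DELAY_MIN = 1   # default _retry_delay_min
    RETRY_DELAY_MAX = 60  # default _retry_delay_max

    def bound_retry_delay(attempt, proposed_delay):
        # Clamp delay (seconds) to [min_delay * 1.5^(attempt - 1), max_delay].
        delay = int(proposed_delay)
        delay = max(delay, RETRY_DELAY_MIN * math.pow(1.5, attempt - 1))
        delay = min(delay, RETRY_DELAY_MAX)
        return delay

    # With the new _retry_delay_default of 5 seconds:
    print(bound_retry_delay(1, 5))   # 5      (exponential floor 1.5^0 = 1.0 is lower)
    print(bound_retry_delay(10, 5))  # ~38.44 (floor 1.5^9 dominates)
    print(bound_retry_delay(12, 5))  # 60     (capped at _retry_delay_max)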

tests/unit/test_thrift_backend.py

Lines changed: 57 additions & 0 deletions
@@ -19,6 +19,7 @@ def retry_policy_factory():
         "_retry_delay_max": (float, 60, None, None),
         "_retry_stop_after_attempts_count": (int, 30, None, None),
         "_retry_stop_after_attempts_duration": (float, 900, None, None),
+        "_retry_delay_default": (float, 5, 1, 60)
     }
 
 
@@ -968,6 +969,62 @@ def test_handle_execute_response_sets_active_op_handle(self):
 
         self.assertEqual(mock_resp.operationHandle, mock_cursor.active_op_handle)
 
+    @patch("thrift.transport.THttpClient.THttpClient")
+    @patch("databricks.sql.thrift_api.TCLIService.TCLIService.Client.GetOperationStatus")
+    @patch("databricks.sql.thrift_backend._retry_policy", new_callable=retry_policy_factory)
+    def test_make_request_will_retry_GetOperationStatus(
+            self, mock_retry_policy, mock_GetOperationStatus, t_transport_class):
+
+        import thrift, errno
+        from databricks.sql.thrift_api.TCLIService.TCLIService import Client
+        from databricks.sql.exc import RequestError
+        from databricks.sql.utils import NoRetryReason
+
+        this_gos_name = "GetOperationStatus"
+        mock_GetOperationStatus.__name__ = this_gos_name
+        mock_GetOperationStatus.side_effect = OSError(errno.ETIMEDOUT, "Connection timed out")
+
+        protocol = thrift.protocol.TBinaryProtocol.TBinaryProtocol(t_transport_class)
+        client = Client(protocol)
+
+        req = ttypes.TGetOperationStatusReq(
+            operationHandle=self.operation_handle,
+            getProgressUpdate=False,
+        )
+
+        EXPECTED_RETRIES = 2
+
+        thrift_backend = ThriftBackend(
+            "foobar",
+            443,
+            "path", [],
+            _retry_stop_after_attempts_count=EXPECTED_RETRIES,
+            _retry_delay_default=1)
+
+
+        with self.assertRaises(RequestError) as cm:
+            thrift_backend.make_request(client.GetOperationStatus, req)
+
+        self.assertEqual(NoRetryReason.OUT_OF_ATTEMPTS.value, cm.exception.context["no-retry-reason"])
+        self.assertEqual(f'{EXPECTED_RETRIES}/{EXPECTED_RETRIES}', cm.exception.context["attempt"])
+
+        # Unusual OSError code
+        mock_GetOperationStatus.side_effect = OSError(errno.EEXIST, "File does not exist")
+
+        with self.assertLogs("databricks.sql.thrift_backend", level=logging.WARNING) as cm:
+            with self.assertRaises(RequestError):
+                thrift_backend.make_request(client.GetOperationStatus, req)
+
+        # There should be two warning log messages: one for each retry
+        self.assertEqual(len(cm.output), EXPECTED_RETRIES)
+
+        # The warnings should be identical
+        self.assertEqual(cm.output[1], cm.output[0])
+
+        # The warnings should include this text
+        self.assertIn(f"{this_gos_name} failed with code {errno.EEXIST} and will attempt to retry", cm.output[0])
+
+
 
     @patch("thrift.transport.THttpClient.THttpClient")
     def test_make_request_wont_retry_if_headers_not_present(self, t_transport_class):
         t_transport_instance = t_transport_class.return_value
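
The two halves of the new test line up with the two log levels added in make_request: ETIMEDOUT is in the expected info_errs list, while EEXIST is not and therefore produces the warnings the test captures. A minimal standalone sketch of that classification, with illustrative names rather than the connector's own helpers:

    import errno
    import logging

    logger = logging.getLogger("databricks.sql.thrift_backend")

    # Mirrors the info_errs list added in make_request.
    INFO_ERRS = [errno.ESHUTDOWN, errno.EAFNOSUPPORT, errno.ECONNRESET, errno.ETIMEDOUT]

    def log_retry(err, gos_name="GetOperationStatus"):
        # Expected network failures log at INFO; anything else logs at WARNING.
        log_string = f"{gos_name} failed with code {err.errno} and will attempt to retry"
        if err.errno in INFO_ERRS:
            logger.info(log_string)
        else:
            logger.warning(log_string)

    log_retry(OSError(errno.ETIMEDOUT, "Connection timed out"))  # -> INFO
    log_retry(OSError(errno.EEXIST, "File exists"))              # -> WARNING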
