-
Notifications
You must be signed in to change notification settings - Fork 186
feat: implement exponential random retry strategy #225
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 7 commits
62622e4
d642c74
659e934
56bb02d
03b5ff4
5b16095
34767da
9c8977a
0b3c5b0
cd08898
61c0593
97da7ae
3e641f3
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -256,17 +256,20 @@ The batching is configurable by ``write_options``\ : | |
- the number of milliseconds to increase the batch flush interval by a random amount | ||
- ``0`` | ||
* - **retry_interval** | ||
- the number of milliseconds to retry unsuccessful write. The retry interval is used when the InfluxDB server does not specify "Retry-After" header. | ||
- the number of milliseconds to retry first unsuccessful write. The next retry delay is computed using exponential random backoff. The retry interval is used when the InfluxDB server does not specify "Retry-After" header. | ||
- ``5000`` | ||
* - **max_retry_time** | ||
- maximum total retry timout in milliseconds. | ||
- ``180_000`` | ||
* - **max_retries** | ||
- the number of max retries when write fails | ||
- ``3`` | ||
- ``10`` | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is this correct? |
||
* - **max_retry_delay** | ||
- the maximum delay between each retry attempt in milliseconds | ||
- ``180_000`` | ||
- ``125_000`` | ||
* - **exponential_base** | ||
- the base for the exponential retry delay, the next delay is computed as ``retry_interval * exponential_base^(attempts-1) + random(jitter_interval)`` | ||
- ``5`` | ||
- the base for the exponential retry delay, the next delay is computed using random exponential backoff. Example for ``retry_interval=5_000, exponential_base=2, max_retry_delay=125_000, total=5`` Retry delays are random distributed values within the ranges of ``[5_000-10_000, 10_000-20_000, 20_000-40_000, 40_000-80_000, 80_000-125_000]`` | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Please add note how looks formula to compute delay. |
||
- ``2`` | ||
|
||
|
||
.. code-block:: python | ||
|
Original file line number | Diff line number | Diff line change | ||
---|---|---|---|---|
@@ -1,10 +1,12 @@ | ||||
"""Implementation for Retry strategy during HTTP requests.""" | ||||
|
||||
import logging | ||||
from datetime import datetime, timedelta | ||||
from itertools import takewhile | ||||
from random import random | ||||
|
||||
from urllib3 import Retry | ||||
from urllib3.exceptions import MaxRetryError, ResponseError | ||||
|
||||
from influxdb_client.client.exceptions import InfluxDBError | ||||
|
||||
|
@@ -15,28 +17,45 @@ class WritesRetry(Retry): | |||
""" | ||||
Writes retry configuration. | ||||
|
||||
:param int jitter_interval: random milliseconds when retrying writes | ||||
:param int max_retry_delay: maximum delay when retrying write | ||||
:param int exponential_base: base for the exponential retry delay, the next delay is computed as | ||||
`backoff_factor * exponential_base^(attempts-1) + random(jitter_interval)` | ||||
:param int max_retry_time: maximum total retry timout in seconds, attempt after this timout throws MaxRetryError | ||||
:param int total: maximum number of retries | ||||
:param num backoff_factor: initial first retry delay range in seconds | ||||
:param num max_retry_delay: maximum delay when retrying write in seconds | ||||
:param int exponential_base: base for the exponential retry delay, | ||||
|
||||
The next delay is computed as random value between range | ||||
`backoff_factor * exponential_base^(attempts-1)` and `backoff_factor * exponential_base^(attempts) | ||||
|
||||
Example: for backoff_factor=5, exponential_base=2, max_retry_delay=125, total=5 | ||||
retry delays are random distributed values within the ranges of | ||||
[5-10, 10-20, 20-40, 40-80, 80-125] | ||||
|
||||
""" | ||||
|
||||
def __init__(self, jitter_interval=0, max_retry_delay=180, exponential_base=5, **kw): | ||||
def __init__(self, max_retry_time=180, total=10, backoff_factor=5, max_retry_delay=125, exponential_base=2, **kw): | ||||
"""Initialize defaults.""" | ||||
super().__init__(**kw) | ||||
self.jitter_interval = jitter_interval | ||||
self.total = total | ||||
self.backoff_factor = backoff_factor | ||||
self.max_retry_delay = max_retry_delay | ||||
self.max_retry_time = max_retry_time | ||||
self.exponential_base = exponential_base | ||||
self.retry_timeout = datetime.now() + timedelta(seconds=max_retry_time) | ||||
|
||||
def new(self, **kw): | ||||
"""Initialize defaults.""" | ||||
if 'jitter_interval' not in kw: | ||||
kw['jitter_interval'] = self.jitter_interval | ||||
if 'max_retry_delay' not in kw: | ||||
kw['max_retry_delay'] = self.max_retry_delay | ||||
|
||||
if 'max_retry_time' not in kw: | ||||
kw['max_retry_time'] = self.max_retry_time | ||||
|
||||
if 'exponential_base' not in kw: | ||||
kw['exponential_base'] = self.exponential_base | ||||
return super().new(**kw) | ||||
|
||||
new = super().new(**kw) | ||||
new.retry_timeout = self.retry_timeout | ||||
return new | ||||
|
||||
def is_retry(self, method, status_code, has_retry_after=False): | ||||
"""is_retry doesn't require retry_after header. If there is not Retry-After we will use backoff.""" | ||||
|
@@ -58,18 +77,27 @@ def get_backoff_time(self): | |||
if consecutive_errors_len < 0: | ||||
return 0 | ||||
|
||||
backoff_value = self.backoff_factor * (self.exponential_base ** consecutive_errors_len) + self._jitter_delay() | ||||
return min(self.max_retry_delay, backoff_value) | ||||
range_start = self.backoff_factor | ||||
range_stop = self.backoff_factor * self.exponential_base | ||||
|
||||
i = 1 | ||||
while i <= consecutive_errors_len: | ||||
i += 1 | ||||
range_start = range_stop | ||||
range_stop = range_stop * self.exponential_base | ||||
if range_stop > self.max_retry_delay: | ||||
break | ||||
|
||||
def get_retry_after(self, response): | ||||
"""Get the value of Retry-After header and append random jitter delay.""" | ||||
retry_after = super().get_retry_after(response) | ||||
if retry_after: | ||||
retry_after += self._jitter_delay() | ||||
return retry_after | ||||
if range_stop > self.max_retry_delay: | ||||
range_stop = self.max_retry_delay | ||||
|
||||
return range_start + (range_stop - range_start) * self._random() | ||||
|
||||
def increment(self, method=None, url=None, response=None, error=None, _pool=None, _stacktrace=None): | ||||
"""Return a new Retry object with incremented retry counters.""" | ||||
if self.retry_timeout < datetime.now(): | ||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Does it also react the same way when retry is disabled? (max_retry_time is 0) There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. yes, max_retry_time=0 means retry is disabled, here is the test:
|
||||
raise MaxRetryError(_pool, url, error or ResponseError("max_retry_time exceeded")) | ||||
|
||||
new_retry = super().increment(method, url, response, error, _pool, _stacktrace) | ||||
|
||||
if response is not None: | ||||
|
@@ -87,5 +115,5 @@ def increment(self, method=None, url=None, response=None, error=None, _pool=None | |||
|
||||
return new_retry | ||||
|
||||
def _jitter_delay(self): | ||||
return self.jitter_interval * random() | ||||
def _random(self): | ||||
return random() |
Uh oh!
There was an error while loading. Please reload this page.