Skip to content

Commit 31a3f1c

Browse files
authored
feat: add exponential backoff strategy for retry (#140)
1 parent bfa0ac4 commit 31a3f1c

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

45 files changed

+2517
-371
lines changed

CHANGELOG.md

+1
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
## 1.10.0 [unreleased]
22

33
### Features
4+
1. [#140](https://github.com/influxdata/influxdb-client-python/pull/140): Added an exponential backoff strategy for batching writes and allowed configuring the default retry strategy. The default value for `retry_interval` is 5_000 milliseconds.
45
1. [#136](https://github.com/influxdata/influxdb-client-python/pull/136): Allows users to skip verification of the SSL certificate
56
1. [#143](https://github.com/influxdata/influxdb-client-python/pull/143): Skipping verification of the SSL certificate can be configured via config file or environment properties
67
1. [#141](https://github.com/influxdata/influxdb-client-python/pull/141): Added the possibility to use nanosecond datetime precision via `pandas.Timestamp`

README.rst

+31-2
Original file line numberDiff line numberDiff line change
@@ -250,7 +250,16 @@ The batching is configurable by ``write_options``\ :
250250
- ``0``
251251
* - **retry_interval**
252252
- the number of milliseconds to wait before retrying an unsuccessful write. The retry interval is used when the InfluxDB server does not specify a "Retry-After" header.
253-
- ``1000``
253+
- ``5000``
254+
* - **max_retries**
255+
- the maximum number of retries when a write fails
256+
- ``3``
257+
* - **max_retry_delay**
258+
- the maximum delay between each retry attempt in milliseconds
259+
- ``180_000``
260+
* - **exponential_base**
261+
- the base for the exponential retry delay, the next delay is computed as ``retry_interval * exponential_base^(attempts-1) + random(jitter_interval)``
262+
- ``5``
254263

255264

256265
.. code-block:: python
@@ -265,7 +274,10 @@ The batching is configurable by ``write_options``\ :
265274
_write_client = _client.write_api(write_options=WriteOptions(batch_size=500,
266275
flush_interval=10_000,
267276
jitter_interval=2_000,
268-
retry_interval=5_000))
277+
retry_interval=5_000,
278+
max_retries=5,
279+
max_retry_delay=30_000,
280+
exponential_base=2))
269281
270282
"""
271283
Write Line Protocol formatted as string
@@ -899,6 +911,23 @@ The following forward compatible APIs are available:
899911

900912
For detail info see `InfluxDB 1.8 example <examples/influxdb_18_example.py>`_.
901913

914+
HTTP Retry Strategy
915+
^^^^^^^^^^^^^^^^^^^
916+
By default the client uses a retry strategy only for batching writes (for more info see `Batching`_).
917+
For other HTTP requests there is no retry strategy by default, but one can be configured via the ``retries``
918+
parameter of ``InfluxDBClient``.
919+
920+
For more info about how to configure HTTP retries, see the `urllib3 documentation <https://urllib3.readthedocs.io/en/latest/reference/index.html?highlight=retry#urllib3.Retry>`_.
921+
922+
.. code-block:: python
923+
924+
from urllib3 import Retry
925+
926+
from influxdb_client import InfluxDBClient
927+
928+
retries = Retry(connect=5, read=2, redirect=5)
929+
client = InfluxDBClient(url="http://localhost:9999", token="my-token", org="my-org", retries=retries)
930+
902931
Nanosecond precision
903932
^^^^^^^^^^^^^^^^^^^^
904933

influxdb_client/api_client.py

+26-15
Original file line numberDiff line numberDiff line change
@@ -63,14 +63,14 @@ class ApiClient(object):
6363
_pool = None
6464

6565
def __init__(self, configuration=None, header_name=None, header_value=None,
66-
cookie=None, pool_threads=None):
66+
cookie=None, pool_threads=None, retries=False):
6767
"""Initialize generic API client."""
6868
if configuration is None:
6969
configuration = Configuration()
7070
self.configuration = configuration
7171
self.pool_threads = pool_threads
7272

73-
self.rest_client = rest.RESTClientObject(configuration)
73+
self.rest_client = rest.RESTClientObject(configuration, retries=retries)
7474
self.default_headers = {}
7575
if header_name is not None:
7676
self.default_headers[header_name] = header_value
@@ -113,7 +113,7 @@ def __call_api(
113113
query_params=None, header_params=None, body=None, post_params=None,
114114
files=None, response_type=None, auth_settings=None,
115115
_return_http_data_only=None, collection_formats=None,
116-
_preload_content=True, _request_timeout=None):
116+
_preload_content=True, _request_timeout=None, urlopen_kw=None):
117117

118118
config = self.configuration
119119

@@ -164,12 +164,14 @@ def __call_api(
164164
# request url
165165
url = self.configuration.host + resource_path
166166

167+
urlopen_kw = urlopen_kw or {}
168+
167169
# perform request and return response
168170
response_data = self.request(
169171
method, url, query_params=query_params, headers=header_params,
170172
post_params=post_params, body=body,
171173
_preload_content=_preload_content,
172-
_request_timeout=_request_timeout)
174+
_request_timeout=_request_timeout, **urlopen_kw)
173175

174176
self.last_response = response_data
175177

@@ -295,7 +297,7 @@ def call_api(self, resource_path, method,
295297
body=None, post_params=None, files=None,
296298
response_type=None, auth_settings=None, async_req=None,
297299
_return_http_data_only=None, collection_formats=None,
298-
_preload_content=True, _request_timeout=None):
300+
_preload_content=True, _request_timeout=None, urlopen_kw=None):
299301
"""Make the HTTP request (synchronous) and Return deserialized data.
300302
301303
To make an async_req request, set the async_req parameter.
@@ -325,6 +327,8 @@ def call_api(self, resource_path, method,
325327
number provided, it will be total request
326328
timeout. It can also be a pair (tuple) of
327329
(connection, read) timeouts.
330+
:param urlopen_kw: Additional parameters are passed to
331+
:meth:`urllib3.request.RequestMethods.request`
328332
:return:
329333
If async_req parameter is True,
330334
the request will be called asynchronously.
@@ -338,7 +342,7 @@ def call_api(self, resource_path, method,
338342
body, post_params, files,
339343
response_type, auth_settings,
340344
_return_http_data_only, collection_formats,
341-
_preload_content, _request_timeout)
345+
_preload_content, _request_timeout, urlopen_kw)
342346
else:
343347
thread = self.pool.apply_async(self.__call_api, (resource_path,
344348
method, path_params, query_params,
@@ -347,64 +351,71 @@ def call_api(self, resource_path, method,
347351
response_type, auth_settings,
348352
_return_http_data_only,
349353
collection_formats,
350-
_preload_content, _request_timeout))
354+
_preload_content, _request_timeout, urlopen_kw))
351355
return thread
352356

353357
def request(self, method, url, query_params=None, headers=None,
354358
post_params=None, body=None, _preload_content=True,
355-
_request_timeout=None):
359+
_request_timeout=None, **urlopen_kw):
356360
"""Make the HTTP request using RESTClient."""
357361
if method == "GET":
358362
return self.rest_client.GET(url,
359363
query_params=query_params,
360364
_preload_content=_preload_content,
361365
_request_timeout=_request_timeout,
362-
headers=headers)
366+
headers=headers,
367+
**urlopen_kw)
363368
elif method == "HEAD":
364369
return self.rest_client.HEAD(url,
365370
query_params=query_params,
366371
_preload_content=_preload_content,
367372
_request_timeout=_request_timeout,
368-
headers=headers)
373+
headers=headers,
374+
**urlopen_kw)
369375
elif method == "OPTIONS":
370376
return self.rest_client.OPTIONS(url,
371377
query_params=query_params,
372378
headers=headers,
373379
post_params=post_params,
374380
_preload_content=_preload_content,
375381
_request_timeout=_request_timeout,
376-
body=body)
382+
body=body,
383+
**urlopen_kw)
377384
elif method == "POST":
378385
return self.rest_client.POST(url,
379386
query_params=query_params,
380387
headers=headers,
381388
post_params=post_params,
382389
_preload_content=_preload_content,
383390
_request_timeout=_request_timeout,
384-
body=body)
391+
body=body,
392+
**urlopen_kw)
385393
elif method == "PUT":
386394
return self.rest_client.PUT(url,
387395
query_params=query_params,
388396
headers=headers,
389397
post_params=post_params,
390398
_preload_content=_preload_content,
391399
_request_timeout=_request_timeout,
392-
body=body)
400+
body=body,
401+
**urlopen_kw)
393402
elif method == "PATCH":
394403
return self.rest_client.PATCH(url,
395404
query_params=query_params,
396405
headers=headers,
397406
post_params=post_params,
398407
_preload_content=_preload_content,
399408
_request_timeout=_request_timeout,
400-
body=body)
409+
body=body,
410+
**urlopen_kw)
401411
elif method == "DELETE":
402412
return self.rest_client.DELETE(url,
403413
query_params=query_params,
404414
headers=headers,
405415
_preload_content=_preload_content,
406416
_request_timeout=_request_timeout,
407-
body=body)
417+
body=body,
418+
**urlopen_kw)
408419
else:
409420
raise ValueError(
410421
"http method must be `GET`, `HEAD`, `OPTIONS`,"

influxdb_client/client/influxdb_client.py

+6-3
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,9 @@ def __init__(self, url, token, debug=None, timeout=10000, enable_gzip=False, org
3232
:param enable_gzip: Enable Gzip compression for http requests. Currently only the "Write" and "Query" endpoints
3333
supports the Gzip compression.
3434
:param org: organization name (used as a default in query and write API)
35-
:key verify_ssl: Set this to false to skip verifying SSL certificate when calling API from https server.
35+
:key bool verify_ssl: Set this to false to skip verifying SSL certificate when calling API from https server.
36+
:key urllib3.util.retry.Retry retries: Set the default retry strategy that is used for all HTTP requests
37+
except batching writes. As a default there is no one retry strategy.
3638
3739
"""
3840
self.url = url
@@ -55,8 +57,10 @@ def __init__(self, url, token, debug=None, timeout=10000, enable_gzip=False, org
5557
auth_header_name = "Authorization"
5658
auth_header_value = "Token " + auth_token
5759

60+
retries = kwargs.get('retries', False)
61+
5862
self.api_client = ApiClient(configuration=conf, header_name=auth_header_name,
59-
header_value=auth_header_value)
63+
header_value=auth_header_value, retries=retries)
6064

6165
@classmethod
6266
def from_config_file(cls, config_file: str = "config.ini", debug=None, enable_gzip=False):
@@ -217,7 +221,6 @@ def health(self) -> HealthCheck:
217221
health = health_service.get_health()
218222
return health
219223
except Exception as e:
220-
print(e)
221224
return HealthCheck(name="influxdb", message=str(e), status="fail")
222225

223226
def ready(self) -> Ready:

influxdb_client/client/write/retry.py

+67
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
"""Implementation for Retry strategy during HTTP requests."""
2+
3+
from itertools import takewhile
4+
from random import random
5+
6+
from urllib3 import Retry
7+
8+
9+
class WritesRetry(Retry):
    """
    Retry configuration for writes, built on top of :class:`urllib3.Retry`.

    Extends the base strategy with an exponential backoff that has a configurable base,
    an upper bound for the computed delay and a random jitter.

    :param jitter_interval: upper bound of the random delay that is added to every computed delay
    :param max_retry_delay: maximum delay returned by :meth:`get_backoff_time`
    :param int exponential_base: base for the exponential retry delay, the next delay is computed as
                                 `backoff_factor * exponential_base^(attempts-1) + random(jitter_interval)`

    NOTE(review): ``jitter_interval`` and ``max_retry_delay`` are combined directly with
    ``backoff_factor`` and with the parsed ``Retry-After`` value, so they have to be expressed
    in the same unit as those (urllib3 documents them in seconds) - callers presumably convert
    from milliseconds before constructing this object; confirm against the caller.
    """

    def __init__(self, jitter_interval=0, max_retry_delay=180, exponential_base=5, **kw):
        """
        Initialize the write-specific settings and forward everything else to ``urllib3.Retry``.

        :param jitter_interval: upper bound of the random delay added to each delay
        :param max_retry_delay: upper bound of the delay computed by :meth:`get_backoff_time`
        :param exponential_base: base of the exponential backoff
        :param kw: keyword arguments passed unchanged to ``urllib3.Retry``
        """
        super().__init__(**kw)
        self.jitter_interval = jitter_interval
        self.max_retry_delay = max_retry_delay
        self.exponential_base = exponential_base

    def new(self, **kw):
        """
        Create a copy of this retry object that keeps the write-specific settings.

        The base implementation is not aware of the attributes added by this class, so they are
        injected into ``kw`` unless the caller overrides them explicitly.
        """
        kw.setdefault('jitter_interval', self.jitter_interval)
        kw.setdefault('max_retry_delay', self.max_retry_delay)
        kw.setdefault('exponential_base', self.exponential_base)
        return super().new(**kw)

    def is_retry(self, method, status_code, has_retry_after=False):
        """
        Decide whether the request should be retried.

        The ``Retry-After`` header is not required; if it is missing the backoff delay is used.

        :param method: HTTP method of the request
        :param status_code: HTTP status code of the response
        :param has_retry_after: accepted for interface compatibility, not used for the decision
        :return: ``True`` if the method is retryable, ``total`` is truthy and the status is >= 429
        """
        if not self._is_method_retryable(method):
            return False

        # Coerce to bool so that a falsy ``total`` (``0``, ``None``) is not leaked as the result.
        return bool(self.total and status_code >= 429)

    def get_backoff_time(self):
        """
        Compute the delay before the next attempt.

        Variant of exponential backoff with initial and max delay and a random jitter delay.

        :return: ``0`` when there is no error in the trailing history, otherwise
                 ``backoff_factor * exponential_base^(errors-1) + jitter`` capped by ``max_retry_delay``
        """
        # Only the last consecutive sequence of errors is relevant (redirects break the sequence).
        consecutive_errors = sum(
            1 for _ in takewhile(lambda entry: entry.redirect_location is None, reversed(self.history))
        )
        if consecutive_errors < 1:
            return 0

        # The first failure doesn't increase the backoff, hence the exponent starts at zero.
        exponent = consecutive_errors - 1
        delay = self.backoff_factor * (self.exponential_base ** exponent) + self._jitter_delay()
        return min(self.max_retry_delay, delay)

    def get_retry_after(self, response):
        """
        Get the value of the ``Retry-After`` header and append a random jitter delay.

        A falsy value from the base implementation (header missing or zero) is returned unchanged.
        """
        retry_after = super().get_retry_after(response)
        if retry_after:
            retry_after += self._jitter_delay()
        return retry_after

    def _jitter_delay(self):
        """Return a random delay from the interval ``[0, jitter_interval)``."""
        return self.jitter_interval * random()

0 commit comments

Comments
 (0)