Skip to content

Commit 9a545ad

Browse files
committed
chore: add support to configure retries
1 parent 2f36b2d commit 9a545ad

File tree

5 files changed

+59
-3
lines changed

5 files changed

+59
-3
lines changed

.circleci/config.yml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -130,6 +130,7 @@ jobs:
130130
pip install -e . --user
131131
pip install -e .\[async\] --user
132132
pip install -e .\[extra\] --user
133+
pip install aiohttp-retry --user
133134
export PYTHONPATH="$PWD"
134135
cd examples
135136
python ./monitoring_and_alerting.py
@@ -139,6 +140,7 @@ jobs:
139140
python ./asynchronous.py
140141
python ./asynchronous_management.py
141142
python ./asynchronous_batching.py
143+
python ./asynchronous_retry.py
142144
check-sphinx:
143145
docker:
144146
- image: *default-python

examples/README.md

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,5 +33,6 @@
3333
## Asynchronous
3434
- [asynchronous.py](asynchronous.py) - How to use Asyncio with InfluxDB client
3535
- [asynchronous_management.py](asynchronous_management.py) - How to use asynchronous Management API
36-
- [asynchronous_batching.py](asynchronous_batching.py) - HHow to use [RxPY](https://rxpy.readthedocs.io/en/latest/) to prepare batches for Asyncio client
36+
- [asynchronous_batching.py](asynchronous_batching.py) - How to use [RxPY](https://rxpy.readthedocs.io/en/latest/) to prepare batches
37+
- [asynchronous_retry.py](asynchronous_retry.py) - How to use [aiohttp-retry](https://github.com/inyutin/aiohttp_retry) to configure retries
3738

examples/asynchronous_retry.py

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
"""
2+
How to use `aiohttp-retry` with async client.
3+
4+
This example depends on `aiohttp_retry <https://github.com/inyutin/aiohttp_retry>`_.
5+
Install ``aiohttp_retry`` by: pip install aiohttp-retry.
6+
7+
"""
8+
import asyncio
9+
10+
from aiohttp_retry import ExponentialRetry, RetryClient
11+
12+
from influxdb_client import Point
13+
from influxdb_client.client.influxdb_client_async import InfluxDBClientAsync
14+
15+
16+
async def main():
17+
"""
18+
Configure Retries - for more info see https://github.com/inyutin/aiohttp_retry
19+
"""
20+
retry_options = ExponentialRetry(attempts=3)
21+
async with InfluxDBClientAsync(url="http://localhost:8086", token="my-token", org="my-org",
22+
client_session_type=RetryClient,
23+
client_session_kwargs={"retry_options": retry_options}) as client:
24+
"""
25+
Write data:
26+
"""
27+
print(f"\n------- Written data: -------\n")
28+
write_api = client.write_api()
29+
_point1 = Point("async_m").tag("location", "Prague").field("temperature", 25.3)
30+
_point2 = Point("async_m").tag("location", "New York").field("temperature", 24.3)
31+
successfully = await write_api.write(bucket="my-bucket", record=[_point1, _point2])
32+
print(f" > successfully: {successfully}")
33+
34+
"""
35+
Query: Stream of FluxRecords
36+
"""
37+
print(f"\n------- Query: Stream of FluxRecords -------\n")
38+
query_api = client.query_api()
39+
records = await query_api.query_stream('from(bucket:"my-bucket") '
40+
'|> range(start: -10m) '
41+
'|> filter(fn: (r) => r["_measurement"] == "async_m")')
42+
async for record in records:
43+
print(record)
44+
45+
46+
if __name__ == "__main__":
47+
asyncio.run(main())

influxdb_client/_async/rest.py

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -122,11 +122,14 @@ def __init__(self, configuration, pools_size=4, maxsize=None, **kwargs):
122122
timeout = aiohttp.client.DEFAULT_TIMEOUT
123123

124124
# https pool manager
125-
self.pool_manager = aiohttp.ClientSession(
125+
_client_session_type = kwargs.get('client_session_type', aiohttp.ClientSession)
126+
_client_session_kwargs = kwargs.get('client_session_kwargs', {})
127+
self.pool_manager = _client_session_type(
126128
connector=connector,
127129
trust_env=True,
128130
timeout=timeout,
129-
trace_configs=[trace_config] if configuration.debug else None
131+
trace_configs=[trace_config] if configuration.debug else None,
132+
**_client_session_kwargs
130133
)
131134

132135
async def close(self):

influxdb_client/client/influxdb_client_async.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,9 @@ def __init__(self, url, token, org: str = None, debug=None, timeout=10_000, enab
4040
(defaults to false, don't set to true when talking to InfluxDB 2)
4141
:key bool allow_redirects: If set to ``False``, do not follow HTTP redirects. ``True`` by default.
4242
:key int max_redirects: Maximum number of HTTP redirects to follow. ``10`` by default.
43+
:key dict client_session_kwargs: Additional configuration arguments for :class:`~aiohttp.ClientSession`
44+
:key type client_session_type: Type of aiohttp client to use. Useful for third party wrappers like
45+
``aiohttp-retry``. :class:`~aiohttp.ClientSession` by default.
4346
:key list[str] profilers: list of enabled Flux profilers
4447
"""
4548
super().__init__(url=url, token=token, org=org, debug=debug, timeout=timeout, enable_gzip=enable_gzip, **kwargs)

0 commit comments

Comments
 (0)