Skip to content

Commit 20c867d

Browse files
authored
feat: add Async version of the InfluxDB client (#413)
1 parent a6ee6f6 commit 20c867d

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

79 files changed

+16463
-11124
lines changed

.circleci/config.yml

+12-4
Original file line numberDiff line numberDiff line change
@@ -128,11 +128,19 @@ jobs:
128128
name: Checks that examples are runnable
129129
command: |
130130
pip install -e . --user
131+
pip install -e .\[async\] --user
132+
pip install -e .\[extra\] --user
133+
pip install aiohttp-retry --user
131134
export PYTHONPATH="$PWD"
132-
python examples/monitoring_and_alerting.py
133-
python examples/buckets_management.py
134-
python examples/write_structured_data.py
135-
python examples/write_api_callbacks.py
135+
cd examples
136+
python ./monitoring_and_alerting.py
137+
python ./buckets_management.py
138+
python ./write_structured_data.py
139+
python ./write_api_callbacks.py
140+
python ./asynchronous.py
141+
python ./asynchronous_management.py
142+
python ./asynchronous_batching.py
143+
python ./asynchronous_retry.py
136144
check-sphinx:
137145
docker:
138146
- image: *default-python

CHANGELOG.md

+1
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
1. [#412](https://github.com/influxdata/influxdb-client-python/pull/412): `DeleteApi` uses default value from `InfluxDBClient.org` if an `org` parameter is not specified
77
1. [#405](https://github.com/influxdata/influxdb-client-python/pull/405): Add `InfluxLoggingHandler`. A handler to use the client in native python logging.
88
1. [#404](https://github.com/influxdata/influxdb-client-python/pull/404): Add `InvocableScriptsApi` to create, update, list, delete and invoke scripts by seamless way
9+
1. [#413](https://github.com/influxdata/influxdb-client-python/pull/413): Add support for `async/await` with asyncio via `InfluxDBClientAsync`, for more info see: **How to use Asyncio**
910

1011
### Bug Fixes
1112
1. [#419](https://github.com/influxdata/influxdb-client-python/pull/419): Use `allowed_methods` to clear deprecation warning [urllib3]

README.rst

+213-21
Original file line numberDiff line numberDiff line change
@@ -120,6 +120,12 @@ Then import the package:
120120
121121
import influxdb_client
122122
123+
If your application uses async/await in Python you can install with the ``async`` extra::
124+
125+
$ pip install influxdb-client[async]
126+
127+
For more info se `How to use Asyncio`_.
128+
123129
Setuptools
124130
^^^^^^^^^^
125131

@@ -581,27 +587,6 @@ Examples:
581587
582588
self.client = InfluxDBClient.from_env_properties()
583589
584-
Asynchronous client
585-
"""""""""""""""""""
586-
587-
Data are writes in an asynchronous HTTP request.
588-
589-
.. code-block:: python
590-
591-
from influxdb_client import InfluxDBClient, Point
592-
from influxdb_client.client.write_api import ASYNCHRONOUS
593-
594-
client = InfluxDBClient(url="http://localhost:8086", token="my-token", org="my-org")
595-
write_api = client.write_api(write_options=ASYNCHRONOUS)
596-
597-
_point1 = Point("my_measurement").tag("location", "Prague").field("temperature", 25.3)
598-
_point2 = Point("my_measurement").tag("location", "New York").field("temperature", 24.3)
599-
600-
async_result = write_api.write(bucket="my-bucket", record=[_point1, _point2])
601-
async_result.get()
602-
603-
client.close()
604-
605590
Synchronous client
606591
""""""""""""""""""
607592

@@ -1324,6 +1309,213 @@ that is replacement for python ``datetime.datetime`` object and also you should
13241309
13251310
.. marker-nanosecond-end
13261311
1312+
How to use Asyncio
1313+
^^^^^^^^^^^^^^^^^^
1314+
.. marker-asyncio-start
1315+
1316+
Starting from version 1.27.0 for Python 3.6+ the ``influxdb-client`` package supports ``async/await`` based on
1317+
`asyncio <https://docs.python.org/3/library/asyncio.html>`_ and `aiohttp <https://docs.aiohttp.org>`_.
1318+
You can install ``aiohttp`` directly:
1319+
1320+
.. code-block:: bash
1321+
1322+
$ python -m pip install influxdb-client aiohttp
1323+
1324+
or use the ``[async]`` extra:
1325+
1326+
.. code-block:: bash
1327+
1328+
$ python -m pip install influxdb-client[async]
1329+
1330+
.. warning::
1331+
1332+
The ``InfluxDBClientAsync`` should be initialised inside ``async coroutine``
1333+
otherwise there can be unexpected behaviour.
1334+
For more info see: `Why is creating a ClientSession outside of an event loop dangerous? <https://docs.aiohttp.org/en/stable/faq.html#why-is-creating-a-clientsession-outside-of-an-event-loop-dangerous>`__.
1335+
1336+
Async APIs
1337+
""""""""""
1338+
All async APIs are available via :class:`~influxdb_client.client.influxdb_client_async.InfluxDBClientAsync`.
1339+
The ``async`` version of the client supports following asynchronous APIs:
1340+
1341+
* :class:`~influxdb_client.client.write_api_async.WriteApiAsync`
1342+
* :class:`~influxdb_client.client.query_api_async.QueryApiAsync`
1343+
* :class:`~influxdb_client.client.delete_api_async.DeleteApiAsync`
1344+
* Management services into ``influxdb_client.service`` supports async operation
1345+
1346+
and also check to readiness of the InfluxDB via ``/ping`` endpoint:
1347+
1348+
.. code-block:: python
1349+
1350+
import asyncio
1351+
1352+
from influxdb_client.client.influxdb_client_async import InfluxDBClientAsync
1353+
1354+
1355+
async def main():
1356+
async with InfluxDBClientAsync(url="http://localhost:8086", token="my-token", org="my-org") as client:
1357+
ready = await client.ping()
1358+
print(f"InfluxDB: {ready}")
1359+
1360+
1361+
if __name__ == "__main__":
1362+
asyncio.run(main())
1363+
1364+
Async Write API
1365+
"""""""""""""""
1366+
1367+
The :class:`~influxdb_client.client.write_api_async.WriteApiAsync` supports ingesting data as:
1368+
1369+
* ``string`` or ``bytes`` that is formatted as a InfluxDB's line protocol
1370+
* `Data Point <https://github.com/influxdata/influxdb-client-python/blob/master/influxdb_client/client/write/point.py#L16>`__ structure
1371+
* Dictionary style mapping with keys: ``measurement``, ``tags``, ``fields`` and ``time`` or custom structure
1372+
* `NamedTuple <https://docs.python.org/3/library/collections.html#collections.namedtuple>`_
1373+
* `Data Classes <https://docs.python.org/3/library/dataclasses.html>`_
1374+
* `Pandas DataFrame <https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.DataFrame.html>`_
1375+
* List of above items
1376+
1377+
.. code-block:: python
1378+
1379+
import asyncio
1380+
1381+
from influxdb_client import Point
1382+
from influxdb_client.client.influxdb_client_async import InfluxDBClientAsync
1383+
1384+
1385+
async def main():
1386+
async with InfluxDBClientAsync(url="http://localhost:8086", token="my-token", org="my-org") as client:
1387+
1388+
write_api = client.write_api()
1389+
1390+
_point1 = Point("async_m").tag("location", "Prague").field("temperature", 25.3)
1391+
_point2 = Point("async_m").tag("location", "New York").field("temperature", 24.3)
1392+
1393+
successfully = await write_api.write(bucket="my-bucket", record=[_point1, _point2])
1394+
1395+
print(f" > successfully: {successfully}")
1396+
1397+
1398+
if __name__ == "__main__":
1399+
asyncio.run(main())
1400+
1401+
1402+
Async Query API
1403+
"""""""""""""""
1404+
1405+
The :class:`~influxdb_client.client.query_api_async.QueryApiAsync` supports retrieve data as:
1406+
1407+
* List of :class:`~influxdb_client.client.flux_table.FluxTable`
1408+
* Stream of :class:`~influxdb_client.client.flux_table.FluxRecord` via :class:`~typing.AsyncGenerator`
1409+
* `Pandas DataFrame <https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.DataFrame.html>`_
1410+
* Stream of `Pandas DataFrame <https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.DataFrame.html>`_ via :class:`~typing.AsyncGenerator`
1411+
* Raw :class:`~str` output
1412+
1413+
.. code-block:: python
1414+
1415+
import asyncio
1416+
1417+
from influxdb_client.client.influxdb_client_async import InfluxDBClientAsync
1418+
1419+
1420+
async def main():
1421+
async with InfluxDBClientAsync(url="http://localhost:8086", token="my-token", org="my-org") as client:
1422+
# Stream of FluxRecords
1423+
query_api = client.query_api()
1424+
records = await query_api.query_stream('from(bucket:"my-bucket") '
1425+
'|> range(start: -10m) '
1426+
'|> filter(fn: (r) => r["_measurement"] == "async_m")')
1427+
async for record in records:
1428+
print(record)
1429+
1430+
1431+
if __name__ == "__main__":
1432+
asyncio.run(main())
1433+
1434+
1435+
Async Delete API
1436+
""""""""""""""""
1437+
1438+
.. code-block:: python
1439+
1440+
import asyncio
1441+
from datetime import datetime
1442+
1443+
from influxdb_client.client.influxdb_client_async import InfluxDBClientAsync
1444+
1445+
1446+
async def main():
1447+
async with InfluxDBClientAsync(url="http://localhost:8086", token="my-token", org="my-org") as client:
1448+
start = datetime.utcfromtimestamp(0)
1449+
stop = datetime.now()
1450+
# Delete data with location = 'Prague'
1451+
successfully = await client.delete_api().delete(start=start, stop=stop, bucket="my-bucket",
1452+
predicate="location = \"Prague\"")
1453+
print(f" > successfully: {successfully}")
1454+
1455+
1456+
if __name__ == "__main__":
1457+
asyncio.run(main())
1458+
1459+
1460+
Management API
1461+
""""""""""""""
1462+
1463+
.. code-block:: python
1464+
1465+
import asyncio
1466+
1467+
from influxdb_client import OrganizationsService
1468+
from influxdb_client.client.influxdb_client_async import InfluxDBClientAsync
1469+
1470+
1471+
async def main():
1472+
async with InfluxDBClientAsync(url='http://localhost:8086', token='my-token', org='my-org') as client:
1473+
# Initialize async OrganizationsService
1474+
organizations_service = OrganizationsService(api_client=client.api_client)
1475+
1476+
# Find organization with name 'my-org'
1477+
organizations = await organizations_service.get_orgs(org='my-org')
1478+
for organization in organizations.orgs:
1479+
print(f'name: {organization.name}, id: {organization.id}')
1480+
1481+
1482+
if __name__ == "__main__":
1483+
asyncio.run(main())
1484+
1485+
1486+
Proxy and redirects
1487+
"""""""""""""""""""
1488+
1489+
You can configure the client to tunnel requests through an HTTP proxy.
1490+
The following proxy options are supported:
1491+
1492+
- ``proxy`` - Set this to configure the http proxy to be used, ex. ``http://localhost:3128``
1493+
- ``proxy_headers`` - A dictionary containing headers that will be sent to the proxy. Could be used for proxy authentication.
1494+
1495+
.. code-block:: python
1496+
1497+
from influxdb_client.client.influxdb_client_async import InfluxDBClientAsync
1498+
1499+
1500+
async with InfluxDBClientAsync(url="http://localhost:8086",
1501+
token="my-token",
1502+
org="my-org",
1503+
proxy="http://localhost:3128") as client:
1504+
1505+
.. note::
1506+
1507+
If your proxy notify the client with permanent redirect (``HTTP 301``) to **different host**.
1508+
The client removes ``Authorization`` header, because otherwise the contents of ``Authorization`` is sent to third parties
1509+
which is a security vulnerability.
1510+
1511+
Client automatically follows HTTP redirects. The default redirect policy is to follow up to ``10`` consecutive requests. The redirects can be configured via:
1512+
1513+
- ``allow_redirects`` - If set to ``False``, do not follow HTTP redirects. ``True`` by default.
1514+
- ``max_redirects`` - Maximum number of HTTP redirects to follow. ``10`` by default.
1515+
1516+
1517+
.. marker-asyncio-end
1518+
13271519
Local tests
13281520
-----------
13291521

docs/api.rst

+6
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,12 @@ QueryApi
1414
.. autoclass:: influxdb_client.QueryApi
1515
:members:
1616

17+
.. autoclass:: influxdb_client.client.flux_table.FluxTable
18+
:members:
19+
20+
.. autoclass:: influxdb_client.client.flux_table.FluxRecord
21+
:members:
22+
1723
WriteApi
1824
""""""""
1925
.. autoclass:: influxdb_client.WriteApi

docs/api_async.rst

+25
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
Async API Reference
2+
===================
3+
4+
.. contents::
5+
:local:
6+
7+
InfluxDBClientAsync
8+
"""""""""""""""""""
9+
.. autoclass:: influxdb_client.client.influxdb_client_async.InfluxDBClientAsync
10+
:members:
11+
12+
QueryApiAsync
13+
"""""""""""""
14+
.. autoclass:: influxdb_client.client.query_api_async.QueryApiAsync
15+
:members:
16+
17+
WriteApiAsync
18+
"""""""""""""
19+
.. autoclass:: influxdb_client.client.write_api_async.WriteApiAsync
20+
:members:
21+
22+
DeleteApiAsync
23+
""""""""""""""
24+
.. autoclass:: influxdb_client.client.delete_api_async.DeleteApiAsync
25+
:members:

docs/index.rst

+1
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ InfluxDB 2.0 python client
88

99
usage
1010
api
11+
api_async
1112
migration
1213

1314
.. include:: ../README.rst

docs/usage.rst

+6
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,12 @@ Handling Errors
5252
:start-after: marker-handling-errors-start
5353
:end-before: marker-handling-errors-end
5454

55+
How to use Asyncio
56+
^^^^^^^^^^^^^^^^^^
57+
.. include:: ../README.rst
58+
:start-after: marker-asyncio-start
59+
:end-before: marker-asyncio-end
60+
5561
Debugging
5662
^^^^^^^^^
5763

examples/README.md

+6
Original file line numberDiff line numberDiff line change
@@ -29,4 +29,10 @@
2929
- [influxdb_18_example.py](influxdb_18_example.py) - How to connect to InfluxDB 1.8
3030
- [nanosecond_precision.py](nanosecond_precision.py) - How to use nanoseconds precision
3131
- [invocable_scripts.py](invocable_scripts.py) - How to use Invocable scripts Cloud API to create custom endpoints that query data
32+
33+
## Asynchronous
34+
- [asynchronous.py](asynchronous.py) - How to use Asyncio with InfluxDB client
35+
- [asynchronous_management.py](asynchronous_management.py) - How to use asynchronous Management API
36+
- [asynchronous_batching.py](asynchronous_batching.py) - How to use [RxPY](https://rxpy.readthedocs.io/en/latest/) to prepare batches
37+
- [asynchronous_retry.py](asynchronous_retry.py) - How to use [aiohttp-retry](https://github.com/inyutin/aiohttp_retry) to configure retries
3238

0 commit comments

Comments
 (0)