Skip to content

feat: add Async version of the InfluxDB client #413

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 46 commits into from
Mar 28, 2022
Merged
Show file tree
Hide file tree
Changes from 45 commits
Commits
Show all changes
46 commits
Select commit Hold shift + click to select a range
197a82e
feat: prepare async client
bednar Mar 3, 2022
9dc43cb
fix: code & doc style
bednar Mar 3, 2022
8cc39a7
feat: prepared _BaseClient to share sources between sync and async cl…
bednar Mar 3, 2022
9cd8a78
fix: CI
bednar Mar 3, 2022
7b6931a
fix: code style
bednar Mar 3, 2022
3c55787
feat: implement async version, ping
bednar Mar 3, 2022
44c9d3e
feat: add asynchronous.py example
bednar Mar 3, 2022
5848ace
chore: add tests for ping and version
bednar Mar 3, 2022
1719120
chore: add tests for ping and version
bednar Mar 3, 2022
a47dddc
chore: prepare query and write api for async API
bednar Mar 3, 2022
c525847
chore: prepare QueryApi async
bednar Mar 7, 2022
01dcceb
chore: add async support into FluxCSVParser
bednar Mar 7, 2022
b9014ff
chore: reuse `_parse_flux_response_row`
bednar Mar 7, 2022
a02f178
feat: add async query into raw output
bednar Mar 7, 2022
0b091a1
feat: add async query into raw output
bednar Mar 7, 2022
1fed67c
feat: add async query into FluxRecord stream
bednar Mar 7, 2022
2fe01da
docs: add Async API Reference
bednar Mar 8, 2022
2b44aa6
feat: add async query into Pandas Dataframe
bednar Mar 8, 2022
6d69424
fix: check example
bednar Mar 8, 2022
02f4770
feat: prepare async version of WriteApi
bednar Mar 8, 2022
1cae01c
feat: async version of WriteApi
bednar Mar 8, 2022
8573636
fix: code style
bednar Mar 8, 2022
e8bcc46
chore: delete_service is able to run async request
bednar Mar 8, 2022
e3b1dcd
chore: delete_service is able to run async request
bednar Mar 8, 2022
e2a5a2a
docs: clarify used types of queries
bednar Mar 8, 2022
4ed75c1
chore: delete api return True for successfully written data
bednar Mar 8, 2022
db3fe2f
docs: polish documentation
bednar Mar 8, 2022
54584d0
docs: for async client
bednar Mar 9, 2022
4200f04
fix: code style
bednar Mar 9, 2022
ba4d57b
docs: update CHANGELOG.md
bednar Mar 9, 2022
15a2bd4
fix: check twine
bednar Mar 9, 2022
e62f8a5
feat: implement debug logging for async client
bednar Mar 10, 2022
e40611c
feat: implement timeout for async client
bednar Mar 10, 2022
f032e05
feat: implement gzip for async client
bednar Mar 10, 2022
518f08d
docs: document features
bednar Mar 10, 2022
b3ed164
feat: add initialization from config and env variables
bednar Mar 10, 2022
c10fb1d
fix: tests
bednar Mar 10, 2022
f5fba4f
docs: add warning about async initialization
bednar Mar 11, 2022
7430a18
feat: add async method to all APIs
bednar Mar 14, 2022
50a1f6b
docs: add example how to use asynchronous management API
bednar Mar 14, 2022
c2efbb3
docs: add asynchronous example into README.rst
bednar Mar 14, 2022
039c35a
docs: add asynchronous batching
bednar Mar 15, 2022
914035f
fix: running example on CircleCI
bednar Mar 15, 2022
2f36b2d
feat: add support to configure redirects
bednar Mar 15, 2022
9a545ad
chore: add support to configure retries
bednar Mar 15, 2022
b1f2e5d
docs: clarify parsing a line of query response
bednar Mar 28, 2022
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 12 additions & 4 deletions .circleci/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -128,11 +128,19 @@ jobs:
name: Checks that examples are runnable
command: |
pip install -e . --user
pip install -e .\[async\] --user
pip install -e .\[extra\] --user
pip install aiohttp-retry --user
export PYTHONPATH="$PWD"
python examples/monitoring_and_alerting.py
python examples/buckets_management.py
python examples/write_structured_data.py
python examples/write_api_callbacks.py
cd examples
python ./monitoring_and_alerting.py
python ./buckets_management.py
python ./write_structured_data.py
python ./write_api_callbacks.py
python ./asynchronous.py
python ./asynchronous_management.py
python ./asynchronous_batching.py
python ./asynchronous_retry.py
check-sphinx:
docker:
- image: *default-python
Expand Down
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
1. [#412](https://github.com/influxdata/influxdb-client-python/pull/412): `DeleteApi` uses default value from `InfluxDBClient.org` if an `org` parameter is not specified
1. [#405](https://github.com/influxdata/influxdb-client-python/pull/405): Add `InfluxLoggingHandler`. A handler to use the client in native python logging.
1. [#404](https://github.com/influxdata/influxdb-client-python/pull/404): Add `InvocableScriptsApi` to create, update, list, delete and invoke scripts by seamless way
1. [#413](https://github.com/influxdata/influxdb-client-python/pull/413): Add support for `async/await` with asyncio via `InfluxDBClientAsync`, for more info see: **How to use Asyncio**

### CI
1. [#411](https://github.com/influxdata/influxdb-client-python/pull/411): Use new Codecov uploader for reporting code coverage
Expand Down
234 changes: 213 additions & 21 deletions README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,12 @@ Then import the package:

import influxdb_client

If your application uses async/await in Python you can install with the ``async`` extra::

$ pip install influxdb-client[async]

For more info se `How to use Asyncio`_.

Setuptools
^^^^^^^^^^

Expand Down Expand Up @@ -581,27 +587,6 @@ Examples:

self.client = InfluxDBClient.from_env_properties()

Asynchronous client
"""""""""""""""""""

Data are writes in an asynchronous HTTP request.

.. code-block:: python

from influxdb_client import InfluxDBClient, Point
from influxdb_client.client.write_api import ASYNCHRONOUS

client = InfluxDBClient(url="http://localhost:8086", token="my-token", org="my-org")
write_api = client.write_api(write_options=ASYNCHRONOUS)

_point1 = Point("my_measurement").tag("location", "Prague").field("temperature", 25.3)
_point2 = Point("my_measurement").tag("location", "New York").field("temperature", 24.3)

async_result = write_api.write(bucket="my-bucket", record=[_point1, _point2])
async_result.get()

client.close()

Synchronous client
""""""""""""""""""

Expand Down Expand Up @@ -1324,6 +1309,213 @@ that is replacement for python ``datetime.datetime`` object and also you should

.. marker-nanosecond-end

How to use Asyncio
^^^^^^^^^^^^^^^^^^
.. marker-asyncio-start

Starting from version 1.27.0 for Python 3.6+ the ``influxdb-client`` package supports ``async/await`` based on
`asyncio <https://docs.python.org/3/library/asyncio.html>`_ and `aiohttp <https://docs.aiohttp.org>`_.
You can install ``aiohttp`` directly:

.. code-block:: bash

$ python -m pip install influxdb-client aiohttp

or use the ``[async]`` extra:

.. code-block:: bash

$ python -m pip install influxdb-client[async]

.. warning::

The ``InfluxDBClientAsync`` should be initialised inside ``async coroutine``
otherwise there can be unexpected behaviour.
For more info see: `Why is creating a ClientSession outside of an event loop dangerous? <https://docs.aiohttp.org/en/stable/faq.html#why-is-creating-a-clientsession-outside-of-an-event-loop-dangerous>`__.

Async APIs
""""""""""
All async APIs are available via :class:`~influxdb_client.client.influxdb_client_async.InfluxDBClientAsync`.
The ``async`` version of the client supports following asynchronous APIs:

* :class:`~influxdb_client.client.write_api_async.WriteApiAsync`
* :class:`~influxdb_client.client.query_api_async.QueryApiAsync`
* :class:`~influxdb_client.client.delete_api_async.DeleteApiAsync`
* Management services into ``influxdb_client.service`` supports async operation

and also check to readiness of the InfluxDB via ``/ping`` endpoint:

.. code-block:: python

import asyncio

from influxdb_client.client.influxdb_client_async import InfluxDBClientAsync


async def main():
async with InfluxDBClientAsync(url="http://localhost:8086", token="my-token", org="my-org") as client:
ready = await client.ping()
print(f"InfluxDB: {ready}")


if __name__ == "__main__":
asyncio.run(main())

Async Write API
"""""""""""""""

The :class:`~influxdb_client.client.write_api_async.WriteApiAsync` supports ingesting data as:

* ``string`` or ``bytes`` that is formatted as a InfluxDB's line protocol
* `Data Point <https://github.com/influxdata/influxdb-client-python/blob/master/influxdb_client/client/write/point.py#L16>`__ structure
* Dictionary style mapping with keys: ``measurement``, ``tags``, ``fields`` and ``time`` or custom structure
* `NamedTuple <https://docs.python.org/3/library/collections.html#collections.namedtuple>`_
* `Data Classes <https://docs.python.org/3/library/dataclasses.html>`_
* `Pandas DataFrame <https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.DataFrame.html>`_
* List of above items

.. code-block:: python

import asyncio

from influxdb_client import Point
from influxdb_client.client.influxdb_client_async import InfluxDBClientAsync


async def main():
async with InfluxDBClientAsync(url="http://localhost:8086", token="my-token", org="my-org") as client:

write_api = client.write_api()

_point1 = Point("async_m").tag("location", "Prague").field("temperature", 25.3)
_point2 = Point("async_m").tag("location", "New York").field("temperature", 24.3)

successfully = await write_api.write(bucket="my-bucket", record=[_point1, _point2])

print(f" > successfully: {successfully}")


if __name__ == "__main__":
asyncio.run(main())


Async Query API
"""""""""""""""

The :class:`~influxdb_client.client.query_api_async.QueryApiAsync` supports retrieve data as:

* List of :class:`~influxdb_client.client.flux_table.FluxTable`
* Stream of :class:`~influxdb_client.client.flux_table.FluxRecord` via :class:`~typing.AsyncGenerator`
* `Pandas DataFrame <https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.DataFrame.html>`_
* Stream of `Pandas DataFrame <https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.DataFrame.html>`_ via :class:`~typing.AsyncGenerator`
* Raw :class:`~str` output

.. code-block:: python

import asyncio

from influxdb_client.client.influxdb_client_async import InfluxDBClientAsync


async def main():
async with InfluxDBClientAsync(url="http://localhost:8086", token="my-token", org="my-org") as client:
# Stream of FluxRecords
query_api = client.query_api()
records = await query_api.query_stream('from(bucket:"my-bucket") '
'|> range(start: -10m) '
'|> filter(fn: (r) => r["_measurement"] == "async_m")')
async for record in records:
print(record)


if __name__ == "__main__":
asyncio.run(main())


Async Delete API
""""""""""""""""

.. code-block:: python

import asyncio
from datetime import datetime

from influxdb_client.client.influxdb_client_async import InfluxDBClientAsync


async def main():
async with InfluxDBClientAsync(url="http://localhost:8086", token="my-token", org="my-org") as client:
start = datetime.utcfromtimestamp(0)
stop = datetime.now()
# Delete data with location = 'Prague'
successfully = await client.delete_api().delete(start=start, stop=stop, bucket="my-bucket",
predicate="location = \"Prague\"")
print(f" > successfully: {successfully}")


if __name__ == "__main__":
asyncio.run(main())


Management API
""""""""""""""

.. code-block:: python

import asyncio

from influxdb_client import OrganizationsService
from influxdb_client.client.influxdb_client_async import InfluxDBClientAsync


async def main():
async with InfluxDBClientAsync(url='http://localhost:8086', token='my-token', org='my-org') as client:
# Initialize async OrganizationsService
organizations_service = OrganizationsService(api_client=client.api_client)

# Find organization with name 'my-org'
organizations = await organizations_service.get_orgs(org='my-org')
for organization in organizations.orgs:
print(f'name: {organization.name}, id: {organization.id}')


if __name__ == "__main__":
asyncio.run(main())


Proxy and redirects
"""""""""""""""""""

You can configure the client to tunnel requests through an HTTP proxy.
The following proxy options are supported:

- ``proxy`` - Set this to configure the http proxy to be used, ex. ``http://localhost:3128``
- ``proxy_headers`` - A dictionary containing headers that will be sent to the proxy. Could be used for proxy authentication.

.. code-block:: python

from influxdb_client.client.influxdb_client_async import InfluxDBClientAsync


async with InfluxDBClientAsync(url="http://localhost:8086",
token="my-token",
org="my-org",
proxy="http://localhost:3128") as client:

.. note::

If your proxy notify the client with permanent redirect (``HTTP 301``) to **different host**.
The client removes ``Authorization`` header, because otherwise the contents of ``Authorization`` is sent to third parties
which is a security vulnerability.

Client automatically follows HTTP redirects. The default redirect policy is to follow up to ``10`` consecutive requests. The redirects can be configured via:

- ``allow_redirects`` - If set to ``False``, do not follow HTTP redirects. ``True`` by default.
- ``max_redirects`` - Maximum number of HTTP redirects to follow. ``10`` by default.


.. marker-asyncio-end

Local tests
-----------

Expand Down
6 changes: 6 additions & 0 deletions docs/api.rst
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,12 @@ QueryApi
.. autoclass:: influxdb_client.QueryApi
:members:

.. autoclass:: influxdb_client.client.flux_table.FluxTable
:members:

.. autoclass:: influxdb_client.client.flux_table.FluxRecord
:members:

WriteApi
""""""""
.. autoclass:: influxdb_client.WriteApi
Expand Down
25 changes: 25 additions & 0 deletions docs/api_async.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
Async API Reference
===================

.. contents::
:local:

InfluxDBClientAsync
"""""""""""""""""""
.. autoclass:: influxdb_client.client.influxdb_client_async.InfluxDBClientAsync
:members:

QueryApiAsync
"""""""""""""
.. autoclass:: influxdb_client.client.query_api_async.QueryApiAsync
:members:

WriteApiAsync
"""""""""""""
.. autoclass:: influxdb_client.client.write_api_async.WriteApiAsync
:members:

DeleteApiAsync
""""""""""""""
.. autoclass:: influxdb_client.client.delete_api_async.DeleteApiAsync
:members:
1 change: 1 addition & 0 deletions docs/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ InfluxDB 2.0 python client

usage
api
api_async
migration

.. include:: ../README.rst
Expand Down
6 changes: 6 additions & 0 deletions docs/usage.rst
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,12 @@ Handling Errors
:start-after: marker-handling-errors-start
:end-before: marker-handling-errors-end

How to use Asyncio
^^^^^^^^^^^^^^^^^^
.. include:: ../README.rst
:start-after: marker-asyncio-start
:end-before: marker-asyncio-end

Debugging
^^^^^^^^^

Expand Down
6 changes: 6 additions & 0 deletions examples/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,4 +29,10 @@
- [influxdb_18_example.py](influxdb_18_example.py) - How to connect to InfluxDB 1.8
- [nanosecond_precision.py](nanosecond_precision.py) - How to use nanoseconds precision
- [invocable_scripts.py](invocable_scripts.py) - How to use Invocable scripts Cloud API to create custom endpoints that query data

## Asynchronous
- [asynchronous.py](asynchronous.py) - How to use Asyncio with InfluxDB client
- [asynchronous_management.py](asynchronous_management.py) - How to use asynchronous Management API
- [asynchronous_batching.py](asynchronous_batching.py) - How to use [RxPY](https://rxpy.readthedocs.io/en/latest/) to prepare batches
- [asynchronous_retry.py](asynchronous_retry.py) - How to use [aiohttp-retry](https://github.com/inyutin/aiohttp_retry) to configure retries

Loading