Skip to content

Provide coroutine based Api for asynchronous operations #205

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
planetmarshall opened this issue Mar 14, 2021 · 10 comments · Fixed by #413
Closed

Provide coroutine based Api for asynchronous operations #205

planetmarshall opened this issue Mar 14, 2021 · 10 comments · Fixed by #413
Labels
enhancement New feature or request
Milestone

Comments

@planetmarshall
Copy link

The standard "pythonic" way of implementing an asynchronous API is with coroutines, eg I would expect usage such as

async def write_data(client):
    data = {...}
    await client.write_data(data)

This would make the InfluxDB API consistent with other asynchronous APIs (eg the asyncio standard library)

@bednar
Copy link
Contributor

bednar commented Mar 15, 2021

@planetmarshall, thanks for using our client, We will take a look.

@bednar bednar added the enhancement New feature or request label Mar 15, 2021
@planetmarshall
Copy link
Author

I used the run_in_executor approach in https://github.com/planetmarshall/solis-service

async def write_record(self, record):
    writer = _client.write_api(write_options=SYNCHRONOUS)
    bucket = 'my_bucket'
    return asyncio.get_running_loop().run_in_executor(
            None, partial(writer.write, bucket, record=record))

@taybin
Copy link

taybin commented Apr 12, 2021

The bigger issue is that openapi-generator's default python generator doesn't really support python's asyncio. OpenAPITools/openapi-generator#763.

However, I dug into their source code and they have a python/asyncio template which does seem to have async/await support. I haven't used openapi-generator and don't know how to activate it.

Enabling that appears to be the ticket to fixing this and making influxdb-client-python behave like a good async python library.

@bednar
Copy link
Contributor

bednar commented Mar 9, 2022

cc @planetmarshall, @taybin, @varna9000, @honglei, @pedvide, @bjenkins348, @zamonia500, @Olegt0rr, @ravngr, @DontPanicO, @hanssens, @wereii, @Fogapod, @j-trotter-rl, @meseta, @pengyao, @mymtw, @illagrenan, @trunet

Starting from upcoming major version the influxdb-client package will supports async/await based on asyncio and aiohttp. Please try a preview version by:

pip install 'git+https://github.com/influxdata/influxdb-client-python.git@asyncio#egg=influxdb-client[ciso,async]'

For more info about API changes and installation instructions see:

Any feedback will be appreciated 👂

@hanssens
Copy link

I just gave draft PR #413 a test spin and it works just fine so far. Great work, @bednar!

Not sure if it's worth mentioning, but a small thing I found is that I got asyncio errors when attemping to use the async client from a class variable. I.e. when the client is initialisation outside of the async function:

  class Foo:
      def __init__(self):
          self.async_client = InfluxDBClientAsync(url=self.url, token=self.token, org=self.org)

      async def push_to_influx(self, bucket: str, points: Iterable[Point]):
          await self.async_client.write_api().write(bucket=bucket, record=points) # <-- errors

Instead, moving the client initialisation into the scope of the function itself does work:

  async def push_to_influx(self, bucket: str, points: Iterable[Point]):
      async with InfluxDBClientAsync(url=self.url, token=self.token, org=self.org) as async_client:
              await async_client.write_api().write(bucket=bucket, record=points)

@bednar
Copy link
Contributor

bednar commented Mar 11, 2022

@hanssens, thanks for feedback 👍

The client should be initialised inside async coroutine otherwise there can be unexpected behaviour. I will update doc about requirements to initialise the async client. For more info see: https://github.com/aio-libs/aiohttp/blob/master/docs/faq.rst#why-is-creating-a-clientsession-outside-of-an-event-loop-dangerous

If you would like to init the client in the __init__ of another class, you can use something like:

import asyncio
from typing import Iterable

from influxdb_client import Point
from influxdb_client.client.influxdb_client_async import InfluxDBClientAsync


class Foo:
    url = "http://localhost:8086"
    token = "my-token"
    org = "my-org"

    def __init__(self):
        self.async_client = InfluxDBClientAsync(url=self.url, token=self.token, org=self.org)

    async def close(self) -> None:
        await self.async_client.close()

    async def push_to_influx(self, bucket: str, points: Iterable[Point]):
        await self.async_client.write_api().write(bucket=bucket, record=points)  # <-- err


async def main():
    foo = Foo()
    _point1 = Point("async_m").tag("location", "Prague").field("temperature", 25.3)
    _point2 = Point("async_m").tag("location", "New York").field("temperature", 24.3)
    await foo.push_to_influx(bucket="my-bucket", points=[_point1, _point2])
    await foo.close()


if __name__ == "__main__":
    asyncio.run(main())

Regards

@hanssens
Copy link

Thanks for the tip @bednar.

In the meantime, I've been running the async variant for almost two weeks non-stop in production. It processes approx. 5K points every couple of seconds, 24/7. FWIW => looks stable and usable to me. Again, great job. Hope this makes it to the next release soon.

@wereii
Copy link

wereii commented Mar 24, 2022

The client should be initialised inside async coroutine otherwise there can be unexpected behaviour.

This could be actively shielded against by having a check like this somewhere in the client itself:

import asyncio

try:
    _ = asyncio.events.get_running_loop()
    # RuntimeError: no running event loop
except RuntimeError as exc:
    # wrap it or leave it to error out

@bednar
Copy link
Contributor

bednar commented Mar 25, 2022

@hanssens thanks for your feedback 👍

The #413 will be a part of next major release in April

@bednar
Copy link
Contributor

bednar commented Mar 25, 2022

The client should be initialised inside async coroutine otherwise there can be unexpected behaviour.

This could be actively shielded against by having a check like this somewhere in the client itself:

import asyncio

try:
    _ = asyncio.events.get_running_loop()
    # RuntimeError: no running event loop
except RuntimeError as exc:
    # wrap it or leave it to error out

@wereii

Thanks for tip, I will take a look

@bednar bednar added this to the 1.28.0 milestone Mar 28, 2022
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
enhancement New feature or request
Projects
None yet
Development

Successfully merging a pull request may close this issue.

5 participants