Request: Throw exception when write_api fails #122

Closed
datarian opened this issue Jul 1, 2020 · 6 comments
Labels
question Further information is requested

@datarian

datarian commented Jul 1, 2020

I am writing an InfluxDB hook for Apache Airflow. The relevant code to write a list of line protocol points is as follows:

from typing import List, Union
from influxdb_client import WriteOptions

def write_points(self, points: Union[List[str], str], write_options: dict = WRITE_OPTIONS_BATCH) -> bool:
    """
    Writes one or more point(s) to InfluxDB at the bucket specified for the hook.
    Point structure is: "measurement_name,key(s) field=value,field=value timestamp[ns]"
    If timestamp is omitted, the local UTC server time is inserted. Key is also optional.
    """
    if not isinstance(points, list):
        points = [points]
    if not self._validate_lp_list(points):
        raise ValueError(f"The lineprotocol data is not valid.\nFirst point: {points[0]}")
    # Bind the name up front so the finally block is safe even if write_api() raises.
    write_client = None
    try:
        write_client = self.client.write_api(
            write_options=WriteOptions(**write_options))
        bucket = self.bucket[0] if isinstance(self.bucket, tuple) else self.bucket
        write_client.write(bucket, self.org, points)
    except Exception as e:
        self.log.error(f"Failed to set up the write client or write the points. Reason: {e}")
        raise
    finally:
        # Flush and dispose of the write client only if it was actually created.
        if write_client is not None:
            write_client.__del__()
    return True

Unfortunately, write_client.write() does not throw exceptions when something goes wrong. All that happens is a message in the error log. Would it be possible to make write_api raise an exception for such cases? If not, how else would it be possible to know when things go wrong, apart from the logs?

An example of what I see in the logs is this:

[2020-07-01 10:20:29,312] {{logging_mixin.py:112}} INFO - [2020-07-01 10:20:29,311] {{write_api.py:399}} ERROR - The batch item wasn't processed successfully because: (400) Reason: Bad Request HTTP response headers: HTTPHeaderDict({'Content-Type': 'application/json; charset=utf-8', 'X-Platform-Error-Code': 'invalid', 'Date': 'Wed, 01 Jul 2020 10:20:29 GMT', 'Content-Length': '254'})

@bednar
Contributor

bednar commented Jul 1, 2020

Hi @datarian,

the batching API writes data on a background thread, so it cannot raise an exception to the caller (any exception is raised on the background thread).

If you would like to know immediately whether your data was written successfully, use the synchronous version of the write API:

from influxdb_client import InfluxDBClient, Point
from influxdb_client.client.write_api import SYNCHRONOUS

client = InfluxDBClient(url="http://localhost:9999", token="my-token", org="my-org")
write_api = client.write_api(write_options=SYNCHRONOUS)

write_api.write(bucket="my-bucket", record=data)

client.__del__()
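
To illustrate why a batching write cannot raise to the caller, here is a minimal standard-library sketch. It does not use or reproduce the client's internals: `failing_write`, `worker`, and the `errors` queue are hypothetical names chosen for illustration. The failure happens on a worker thread, so the caller's `try`/`except` never sees it, and the error only becomes visible because the worker hands it over explicitly.

```python
import queue
import threading


def failing_write():
    # Stand-in for a write that the server rejects (hypothetical helper).
    raise RuntimeError("(400) Bad Request")


# Channel the worker uses to report failures back to the calling thread.
errors = queue.Queue()


def worker(on_error):
    try:
        failing_write()
    except Exception as exc:
        # Without this hand-off the exception would stay on the worker thread.
        on_error(exc)


caller_saw_exception = False
try:
    thread = threading.Thread(target=worker, args=(errors.put,))
    thread.start()
    thread.join()
except RuntimeError:
    # Never reached: the exception was raised on the worker thread, not here.
    caller_saw_exception = True

reported = errors.get_nowait()
print(caller_saw_exception, repr(reported))
```

The same idea applies to any background writer: the caller learns about a failure only through something the worker passes back, such as a queue, a callback, or a log entry.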

Regards

@bednar bednar added the question Further information is requested label Jul 2, 2020
@mdegat01

mdegat01 commented Jul 9, 2020

@bednar I have this issue as well. It's not specific to the batch API either; in fact, I think there's a breaking change here. I recently updated from v1.6.0 to v1.8.0, and an exception I was expecting is no longer thrown. On v1.6.0, here is what occurs when I try to connect to a host that does not exist:

>>> from influxdb_client import InfluxDBClient
>>> influx = InfluxDBClient(url="http://localhost:9000", org="dslkfjdslj", token="kjlkj")
>>> from influxdb_client.client.write_api import SYNCHRONOUS
>>> write_api = influx.write_api(write_options=SYNCHRONOUS)
>>> try:
...   write_api.write([])
... except Exception as exc:
...   print(type(exc))
...
<class 'urllib3.exceptions.MaxRetryError'>

And here is that same test after installing v1.8.0:

>>> from influxdb_client import InfluxDBClient
>>> from influxdb_client.client.write_api import SYNCHRONOUS
>>> influx = InfluxDBClient(url="http://localhost:9000", org="dslkfjdslj", token="kjlkj")
>>> write_api = influx.write_api(write_options=SYNCHRONOUS)
>>> try:
...   write_api.write([])
... except Exception as exc:
...   print(type(exc))
...
>>>

No exception is thrown at all anymore when the connection to the host fails entirely, even when using SYNCHRONOUS. This is a problem because, as part of setting up the application that uses this library, I ask the user to provide the details of the InfluxDB instance they want me to write to, and I need a way to validate that what they entered is correct. There no longer seems to be a way to do that, since the write API neither throws an exception nor returns a response indicating that it failed to connect.

@mdegat01

mdegat01 commented Jul 9, 2020

In fact, this seems to be an issue for any write, not just when the host is missing. Here's me trying to synchronously write to a valid host with everything else invalid on v1.6.0:

>>> from influxdb_client import InfluxDBClient
>>> from influxdb_client.client.write_api import SYNCHRONOUS
>>> influx = InfluxDBClient(url="https://us-west-2-1.aws.cloud2.influxdata.com", org="dlfsdklfjs", token="lkjdlksjf")
>>> write_api = influx.write_api(write_options=SYNCHRONOUS)
>>> try:
...   write_api.write(record=[], bucket="lkjlkj")
... except Exception as exc:
...   print(type(exc))
...
<class 'influxdb_client.rest.ApiException'>
>>> try:
...   write_api.write(record=[], bucket="lkjlkj")
... except Exception as exc:
...   print(exc)
...
(401)
Reason: Unauthorized
HTTP response headers: HTTPHeaderDict({'Date': 'Thu, 09 Jul 2020 22:34:37 GMT', 'Content-Type': 'application/json; charset=utf-8', 'Content-Length': '55', 'Connection': 'keep-alive', 'Strict-Transport-Security': 'max-age=15724800; includeSubDomains', 'x-platform-error-code': 'unauthorized'})
HTTP response body: {"code":"unauthorized","message":"unauthorized access"}

And here's the same test on v1.8.0:

>>> from influxdb_client import InfluxDBClient
>>> from influxdb_client.client.write_api import SYNCHRONOUS
>>> influx = InfluxDBClient(url="https://us-west-2-1.aws.cloud2.influxdata.com", org="dlfsdklfjs", token="lkjdlksjf")
>>> write_api = influx.write_api(write_options=SYNCHRONOUS)
>>> try:
...   write_api.write(record=[], bucket="lkjlkj")
... except Exception as exc:
...   print(exc)
...
>>>

@bednar
Contributor

bednar commented Jul 10, 2020

Hi @mdegat01,

the client didn't raise an exception because the array is empty.

If you would like to test credentials for writing, use:

write_api.write(record=b'',  bucket="lkjlkj")

I've added a test to ensure that the client works as you expect:

def test_check_write_permission_by_empty_data(self):
and also prepared a PR to home-assistant: home-assistant/core#37710.
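
The empty-payload check can be wrapped in a small helper so that setup code gets the error back as a value. This is only a sketch under assumptions, not part of the client library: `probe_write_access` and `is_auth_failure` are hypothetical names, `write_api` is assumed to be any object exposing `write(bucket=..., record=...)`, and the `status` attribute is assumed to hold the HTTP status code, as suggested by the `(401)` printed for `ApiException` earlier in this thread.

```python
from typing import Any, Optional


def probe_write_access(write_api: Any, bucket: str) -> Optional[Exception]:
    """Write an empty payload and return the exception raised, or None."""
    try:
        write_api.write(bucket=bucket, record=b"")
    except Exception as exc:
        # Broad on purpose: connection errors and API errors have different types.
        return exc
    return None


def is_auth_failure(exc: Optional[Exception]) -> bool:
    """True when the exception carries an HTTP 401/403 status.

    Assumption: the status code lives in an attribute named `status`.
    """
    return getattr(exc, "status", None) in (401, 403)
```

Returning the exception instead of a boolean leaves the interpretation to the caller, which matters if a given client version reports even an accepted empty write as an error.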

Regards

@mdegat01

Thanks @bednar ! That's exactly what I was looking for. You're right, that works great now.

Unfortunately, it looks like that still throws an 'Invalid Inputs' exception on success. I was hoping to eliminate exception handling from the success path in the code, so I created a separate feature request for your consideration. But yeah, thanks again for your help and the PR 👍

@bednar
Contributor

bednar commented Jul 13, 2020

@mdegat01 you are welcome

@bednar bednar added this to the 1.10.0 milestone Jul 20, 2020
@bednar bednar closed this as completed Jul 20, 2020