From 39eb1ddd476515576100e25bc8056a478e2c8fc3 Mon Sep 17 00:00:00 2001 From: Jakub Bednar Date: Fri, 7 Oct 2022 14:16:45 +0200 Subject: [PATCH 1/2] fix: exception propagation for asynchronous `QueryApi` --- influxdb_client/client/query_api_async.py | 36 ++++++++++++----------- tests/test_InfluxDBClientAsync.py | 9 ++++++ 2 files changed, 28 insertions(+), 17 deletions(-) diff --git a/influxdb_client/client/query_api_async.py b/influxdb_client/client/query_api_async.py index 0cbe4a3a..995adff4 100644 --- a/influxdb_client/client/query_api_async.py +++ b/influxdb_client/client/query_api_async.py @@ -8,7 +8,8 @@ from influxdb_client.client._base import _BaseQueryApi from influxdb_client.client.flux_table import FluxRecord, TableList from influxdb_client.client.query_api import QueryOptions -from influxdb_client.rest import _UTF_8_encoding +from influxdb_client.rest import _UTF_8_encoding, ApiException +from .._async.rest import RESTResponseAsync class QueryApiAsync(_BaseQueryApi): @@ -98,10 +99,7 @@ async def query(self, query: str, org=None, params: dict = None) -> TableList: """ # noqa: E501 org = self._org_param(org) - response = await self._query_api.post_query_async(org=org, - query=self._create_query(query, self.default_dialect, params), - async_req=False, _preload_content=False, - _return_http_data_only=True) + response = await self._post_query(org=org, query=self._create_query(query, self.default_dialect, params)) return await self._to_tables_async(response, query_options=self._get_query_options()) @@ -118,10 +116,7 @@ async def query_stream(self, query: str, org=None, params: dict = None) -> Async """ # noqa: E501 org = self._org_param(org) - response = await self._query_api.post_query_async(org=org, - query=self._create_query(query, self.default_dialect, params), - async_req=False, _preload_content=False, - _return_http_data_only=True) + response = await self._post_query(org=org, query=self._create_query(query, self.default_dialect, params)) return await self._to_flux_record_stream_async(response, query_options=self._get_query_options()) @@ -193,11 +188,8 @@ async def query_data_frame_stream(self, query: str, org=None, data_frame_index: """ # noqa: E501 org = self._org_param(org) - response = await self._query_api.post_query_async(org=org, - query=self._create_query(query, self.default_dialect, params, - dataframe_query=True), - async_req=False, _preload_content=False, - _return_http_data_only=True) + response = await self._post_query(org=org, query=self._create_query(query, self.default_dialect, params, + dataframe_query=True)) return await self._to_data_frame_stream_async(data_frame_index=data_frame_index, response=response, query_options=self._get_query_options()) @@ -215,8 +207,18 @@ async def query_raw(self, query: str, org=None, dialect=_BaseQueryApi.default_di :return: :class:`~str` """ org = self._org_param(org) - result = await self._query_api.post_query_async(org=org, query=self._create_query(query, dialect, params), - async_req=False, _preload_content=False, - _return_http_data_only=True) + result = await self._post_query(org=org, query=self._create_query(query, dialect, params)) raw_bytes = await result.read() return raw_bytes.decode(_UTF_8_encoding) + + async def _post_query(self, org, query): + response = await self._query_api.post_query_async(org=org, + query=query, + async_req=False, + _preload_content=False, + _return_http_data_only=True) + if not 200 <= response.status <= 299: + data = await response.read() + raise ApiException(http_resp=RESTResponseAsync(response, data)) + + return response diff --git a/tests/test_InfluxDBClientAsync.py b/tests/test_InfluxDBClientAsync.py index ec339235..ddd005c7 100644 --- a/tests/test_InfluxDBClientAsync.py +++ b/tests/test_InfluxDBClientAsync.py @@ -374,6 +374,15 @@ async def test_parse_csv_with_new_lines_in_column(self, mocked): self.assertEqual(4, len(records)) + @async_test + async def test_query_exception_propagation(self): + await self.client.close() + self.client = InfluxDBClientAsync(url="http://localhost:8086", token="wrong", org="my-org") + + with pytest.raises(InfluxDBError) as e: + await self.client.query_api().query("buckets()", "my-org") + self.assertEqual("unauthorized access", e.value.message) + async def _prepare_data(self, measurement: str): _point1 = Point(measurement).tag("location", "Prague").field("temperature", 25.3) _point2 = Point(measurement).tag("location", "New York").field("temperature", 24.3) From 696da818402103012c3ad75c074458f7b4ab0e6d Mon Sep 17 00:00:00 2001 From: Jakub Bednar Date: Fri, 7 Oct 2022 14:20:57 +0200 Subject: [PATCH 2/2] docs: update CHANGELOG.md --- CHANGELOG.md | 3 +++ 1 file changed, 3 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 2aaa175a..afafbe49 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -3,6 +3,9 @@ ### Features 1. [#510](https://github.com/influxdata/influxdb-client-python/pull/510): Allow to use client's optional configs for initialization from file or environment properties +### Bug Fixes +1. [#512](https://github.com/influxdata/influxdb-client-python/pull/512): Exception propagation for asynchronous `QueryApi` [async/await] + ## 1.33.0 [2022-09-29] ### Features