From 2d15d6b47c144d2601d90f9aa4e218813eb27ae6 Mon Sep 17 00:00:00 2001 From: Jakub Bednar Date: Thu, 13 Oct 2022 14:54:26 +0200 Subject: [PATCH 1/2] fix(async): parsing query response with two-bytes UTF-8 character --- influxdb_client/client/flux_csv_parser.py | 6 +++++- tests/test_InfluxDBClientAsync.py | 19 +++++++++++++++++++ 2 files changed, 24 insertions(+), 1 deletion(-) diff --git a/influxdb_client/client/flux_csv_parser.py b/influxdb_client/client/flux_csv_parser.py index db4c4c7e..2881afb1 100644 --- a/influxdb_client/client/flux_csv_parser.py +++ b/influxdb_client/client/flux_csv_parser.py @@ -384,6 +384,10 @@ def _print_profiler_info(self, flux_record: FluxRecord): class _StreamReaderToWithAsyncRead: def __init__(self, response): self.response = response + self.decoder = codecs.getincrementaldecoder(_UTF_8_encoding)() async def read(self, size: int) -> str: - return (await self.response.read(size)).decode(_UTF_8_encoding) + raw_bytes = (await self.response.read(size)) + if not raw_bytes: + return self.decoder.decode(b'', final=True) + return self.decoder.decode(raw_bytes, final=False) diff --git a/tests/test_InfluxDBClientAsync.py b/tests/test_InfluxDBClientAsync.py index ddd005c7..3283da40 100644 --- a/tests/test_InfluxDBClientAsync.py +++ b/tests/test_InfluxDBClientAsync.py @@ -383,6 +383,25 @@ async def test_query_exception_propagation(self): await self.client.query_api().query("buckets()", "my-org") self.assertEqual("unauthorized access", e.value.message) + @async_test + @aioresponses() + async def test_parse_utf8_two_bytes_character(self, mocked): + await self.client.close() + self.client = InfluxDBClientAsync("http://localhost") + + body = '''#group,false,false,false,false,true,true,true +#datatype,string,long,dateTime:RFC3339,string,string,string,string +#default,_result,,,,,, +,result,table,_time,_value,_field,_measurement,type +''' + for i in range(1000): + body += f",,0,2022-10-13T12:28:31.{i}Z,ÂÂÂ,value,async,error\n" + + mocked.post('http://localhost/api/v2/query?org=my-org', status=200, body=body) + + data_frame = await self.client.query_api().query_data_frame("from()", "my-org") + self.assertEqual(1000, len(data_frame)) + async def _prepare_data(self, measurement: str): _point1 = Point(measurement).tag("location", "Prague").field("temperature", 25.3) _point2 = Point(measurement).tag("location", "New York").field("temperature", 24.3) From 6bc35df339f37c6e7e65579f78b03ad745e0cda9 Mon Sep 17 00:00:00 2001 From: Jakub Bednar Date: Thu, 13 Oct 2022 14:58:08 +0200 Subject: [PATCH 2/2] docs: update CHANGELOG.md --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index afafbe49..68ce0f4c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,7 @@ ### Bug Fixes 1. [#512](https://github.com/influxdata/influxdb-client-python/pull/512): Exception propagation for asynchronous `QueryApi` [async/await] +1. [#518](https://github.com/influxdata/influxdb-client-python/pull/518): Parsing query response with two-bytes UTF-8 character [async/await] ## 1.33.0 [2022-09-29]