diff --git a/CHANGELOG.md b/CHANGELOG.md index db25655b..bb371dd5 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,8 @@ ## 1.33.0 [unreleased] +### Bug Fixes +1. [#497](https://github.com/influxdata/influxdb-client-python/pull/497): Parsing InfluxDB response with new line character in CSV column [async/await] + ## 1.32.0 [2022-08-25] :warning: This release drop supports for Python 3.6. As of 2021-12-23, 3.6 has reached the end-of-life phase of its release cycle. 3.6.15 was the final security release. For more info see: https://peps.python.org/pep-0494/#lifespan diff --git a/README.rst b/README.rst index ccccbd9b..8ccfa103 100644 --- a/README.rst +++ b/README.rst @@ -1371,12 +1371,12 @@ How to use Asyncio .. marker-asyncio-start Starting from version 1.27.0 for Python 3.7+ the ``influxdb-client`` package supports ``async/await`` based on -`asyncio <https://docs.python.org/3/library/asyncio.html>`_ and `aiohttp <https://docs.aiohttp.org>`_. -You can install ``aiohttp`` directly: +`asyncio <https://docs.python.org/3/library/asyncio.html>`_, `aiohttp <https://docs.aiohttp.org>`_ and `aiocsv <https://pypi.org/project/aiocsv/>`_. +You can install ``aiohttp`` and ``aiocsv`` directly: .. 
code-block:: bash - $ python -m pip install influxdb-client aiohttp + $ python -m pip install influxdb-client aiohttp aiocsv or use the ``[async]`` extra: diff --git a/influxdb_client/client/flux_csv_parser.py b/influxdb_client/client/flux_csv_parser.py index 0a31fe59..4756d32a 100644 --- a/influxdb_client/client/flux_csv_parser.py +++ b/influxdb_client/client/flux_csv_parser.py @@ -99,7 +99,8 @@ def __exit__(self, exc_type, exc_val, exc_tb): async def __aenter__(self) -> 'FluxCsvParser': """Initialize CSV reader.""" - self._reader = self._response.content + from aiocsv import AsyncReader + self._reader = AsyncReader(_StreamReaderToWithAsyncRead(self._response.content)) return self @@ -134,11 +135,9 @@ async def _parse_flux_response_async(self): metadata = _FluxCsvParserMetadata() try: - async for line in self._reader: - csv = list(csv_parser.reader([line.decode(_UTF_8_encoding)])) - if len(csv) >= 1: - for val in self._parse_flux_response_row(metadata, csv[0]): - yield val + async for csv in self._reader: + for val in self._parse_flux_response_row(metadata, csv): + yield val # Return latest DataFrame if (self._serialization_mode is FluxSerializationMode.dataFrame) & hasattr(self, '_data_frame'): @@ -371,3 +370,11 @@ def _print_profiler_info(self, flux_record: FluxRecord): print(f"{name:<20}: \n\n{val}") elif val is not None: print(f"{name:<20}: {val:<20}") + + +class _StreamReaderToWithAsyncRead: + def __init__(self, response): + self.response = response + + async def read(self, size: int) -> str: + return (await self.response.read(size)).decode(_UTF_8_encoding) diff --git a/setup.py b/setup.py index 14fc3334..176e9f5a 100644 --- a/setup.py +++ b/setup.py @@ -39,7 +39,8 @@ ] async_requires = [ - 'aiohttp>=3.8.1' + 'aiohttp>=3.8.1', + 'aiocsv>=1.2.2' ] with open('README.rst', 'r') as f: diff --git a/tests/test_InfluxDBClientAsync.py b/tests/test_InfluxDBClientAsync.py index d347d19e..a40aac95 100644 --- a/tests/test_InfluxDBClientAsync.py +++ 
b/tests/test_InfluxDBClientAsync.py @@ -11,6 +11,7 @@ from influxdb_client import Point, WritePrecision, BucketsService from influxdb_client.client.exceptions import InfluxDBError from influxdb_client.client.influxdb_client_async import InfluxDBClientAsync +from influxdb_client.client.query_api import QueryOptions from influxdb_client.client.warnings import MissingPivotFunction from tests.base_test import generate_name @@ -312,6 +313,48 @@ async def test_query_and_debug(self): results = await buckets_service.get_buckets() self.assertIn("my-bucket", list(map(lambda bucket: bucket.name, results.buckets))) + @async_test + @aioresponses() + async def test_parse_csv_with_new_lines_in_column(self, mocked): + await self.client.close() + self.client = InfluxDBClientAsync("http://localhost") + mocked.post('http://localhost/api/v2/query?org=my-org', status=200, body='''#datatype,string,long,dateTime:RFC3339 +#group,false,false,false +#default,_result,, +,result,table,_time +,,0,2022-09-09T10:22:13.744147091Z + +#datatype,string,long,string,long,long,long,long,long,long,long,long,long,string,long,long,string +#group,false,false,true,false,false,false,false,false,false,false,false,false,false,false,false,false +#default,_profiler,,,,,,,,,,,,,,, +,result,table,_measurement,TotalDuration,CompileDuration,QueueDuration,PlanDuration,RequeueDuration,ExecuteDuration,Concurrency,MaxAllocated,TotalAllocated,RuntimeErrors,influxdb/scanned-bytes,influxdb/scanned-values,flux/query-plan +,,0,profiler/query,17305459,6292042,116958,0,0,10758125,0,448,0,,0,0,"digraph { + ""ReadRange4"" + ""keep2"" + ""limit3"" + + ""ReadRange4"" -> ""keep2"" + ""keep2"" -> ""limit3"" +} + +" + +#datatype,string,long,string,string,string,long,long,long,long,double +#group,false,false,true,false,false,false,false,false,false,false +#default,_profiler,,,,,,,,, +,result,table,_measurement,Type,Label,Count,MinDuration,MaxDuration,DurationSum,MeanDuration 
+,,1,profiler/operator,*influxdb.readFilterSource,ReadRange4,1,888209,888209,888209,888209 +,,1,profiler/operator,*universe.schemaMutationTransformation,keep2,4,1875,42042,64209,16052.25 +,,1,profiler/operator,*universe.limitTransformation,limit3,3,1333,38750,47874,15958''') + + records = [] + await self.client\ + .query_api(QueryOptions(profilers=["operator", "query"], + profiler_callback=lambda record: records.append(record))) \ + .query("buckets()", "my-org") + + self.assertEqual(4, len(records)) + async def _prepare_data(self, measurement: str): _point1 = Point(measurement).tag("location", "Prague").field("temperature", 25.3) _point2 = Point(measurement).tag("location", "New York").field("temperature", 24.3)