Skip to content

Commit 0891fc5

Browse files
committed
fix: parsing response with new line in column [async/await]
1 parent 942dad6 commit 0891fc5

File tree

3 files changed

+18
-10
lines changed

3 files changed

+18
-10
lines changed

README.rst

+3-3
Original file line numberDiff line numberDiff line change
@@ -1371,12 +1371,12 @@ How to use Asyncio
13711371
.. marker-asyncio-start
13721372
13731373
Starting from version 1.27.0 for Python 3.7+ the ``influxdb-client`` package supports ``async/await`` based on
1374-
`asyncio <https://docs.python.org/3/library/asyncio.html>`_ and `aiohttp <https://docs.aiohttp.org>`_.
1375-
You can install ``aiohttp`` directly:
1374+
`asyncio <https://docs.python.org/3/library/asyncio.html>`_, `aiohttp <https://docs.aiohttp.org>`_ and `aiocsv <https://pypi.org/project/aiocsv/>`_.
1375+
You can install ``aiohttp`` and ``aiocsv`` directly:
13761376

13771377
.. code-block:: bash
13781378
1379-
$ python -m pip install influxdb-client aiohttp
1379+
$ python -m pip install influxdb-client aiohttp aiocsv
13801380
13811381
or use the ``[async]`` extra:
13821382

influxdb_client/client/flux_csv_parser.py

+13-6
Original file line numberDiff line numberDiff line change
@@ -99,7 +99,8 @@ def __exit__(self, exc_type, exc_val, exc_tb):
9999

100100
async def __aenter__(self) -> 'FluxCsvParser':
101101
"""Initialize CSV reader."""
102-
self._reader = self._response.content
102+
from aiocsv import AsyncReader
103+
self._reader = AsyncReader(_StreamReaderToWithAsyncRead(self._response.content))
103104

104105
return self
105106

@@ -134,11 +135,9 @@ async def _parse_flux_response_async(self):
134135
metadata = _FluxCsvParserMetadata()
135136

136137
try:
137-
async for line in self._reader:
138-
csv = list(csv_parser.reader([line.decode(_UTF_8_encoding)]))
139-
if len(csv) >= 1:
140-
for val in self._parse_flux_response_row(metadata, csv[0]):
141-
yield val
138+
async for csv in self._reader:
139+
for val in self._parse_flux_response_row(metadata, csv):
140+
yield val
142141

143142
# Return latest DataFrame
144143
if (self._serialization_mode is FluxSerializationMode.dataFrame) & hasattr(self, '_data_frame'):
@@ -371,3 +370,11 @@ def _print_profiler_info(self, flux_record: FluxRecord):
371370
print(f"{name:<20}: \n\n{val}")
372371
elif val is not None:
373372
print(f"{name:<20}: {val:<20}")
373+
374+
375+
class _StreamReaderToWithAsyncRead:
376+
def __init__(self, response):
377+
self.response = response
378+
379+
async def read(self, size: int) -> str:
380+
return (await self.response.read(size)).decode(_UTF_8_encoding)

setup.py

+2-1
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,8 @@
3939
]
4040

4141
async_requires = [
42-
'aiohttp>=3.8.1'
42+
'aiohttp>=3.8.1',
43+
'aiocsv>=1.2.2'
4344
]
4445

4546
with open('README.rst', 'r') as f:

0 commit comments

Comments
 (0)