Skip to content

Commit ff5e8d0

Browse files
committed
feat: Add support for stream response (#30)
1 parent bc2813b commit ff5e8d0

File tree

3 files changed

+98
-30
lines changed

3 files changed

+98
-30
lines changed

influxdb_client/client/flux_csv_parser.py

+29-21
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,8 @@
22
import codecs
33
import csv as csv_parser
44

5-
from dateutil.parser import parse as timestamp_parser
65
import ciso8601
6+
from urllib3 import HTTPResponse
77

88
from influxdb_client.client.flux_table import FluxTable, FluxColumn, FluxRecord
99

@@ -20,21 +20,32 @@ class FluxCsvParserException(Exception):
2020

2121
class FluxCsvParser(object):
2222

23-
def __init__(self) -> None:
23+
def __init__(self, response: HTTPResponse, stream: bool) -> None:
24+
self._response = response
25+
self.tables = []
26+
self._stream = stream
2427
pass
2528

26-
def parse_flux_response(self, response, cancellable, consumer):
29+
def __enter__(self):
30+
self._reader = csv_parser.reader(codecs.iterdecode(self._response, 'utf-8'))
31+
return self
32+
33+
def __exit__(self, exc_type, exc_val, exc_tb):
34+
self._response.close()
35+
36+
def generator(self):
37+
with self as parser:
38+
yield from parser._parse_flux_response()
39+
40+
def _parse_flux_response(self):
2741
table_index = 0
2842
start_new_table = False
2943
table = None
3044
parsing_state_error = False
31-
reader = csv_parser.reader(codecs.iterdecode(response, 'utf-8'))
3245

33-
for csv in reader:
46+
for csv in self._reader:
3447
# debug
3548
# print("parsing: ", csv)
36-
if (cancellable is not None) and cancellable.canceled:
37-
return
3849

3950
# Response has HTTP status ok, but response is error.
4051
if len(csv) < 1:
@@ -55,7 +66,7 @@ def parse_flux_response(self, response, cancellable, consumer):
5566
if "#datatype" == token:
5667
start_new_table = True
5768
table = FluxTable()
58-
consumer.accept_table(index=table_index, cancellable=cancellable, flux_table=table)
69+
self._insert_table(table, table_index)
5970
table_index = table_index + 1
6071
elif table is None:
6172
raise FluxCsvParserException("Unable to parse CSV response. FluxTable definition was not found.")
@@ -85,11 +96,16 @@ def parse_flux_response(self, response, cancellable, consumer):
8596
flux_columns = table.columns
8697
table = FluxTable()
8798
table.columns.extend(flux_columns)
88-
consumer.accept_table(table_index, cancellable, table)
99+
self._insert_table(table, table_index)
89100
table_index = table_index + 1
90101

91102
flux_record = self.parse_record(table_index - 1, table, csv)
92-
consumer.accept_record(table_index - 1, cancellable, flux_record)
103+
104+
if not self._stream:
105+
self.tables[table_index - 1].records.append(flux_record)
106+
107+
yield flux_record
108+
93109
# debug
94110
# print(flux_record)
95111

@@ -163,14 +179,6 @@ def add_column_names_and_tags(table, csv):
163179
column.label = csv[i]
164180
i += 1
165181

166-
167-
class FluxResponseConsumerTable:
168-
169-
def __init__(self) -> None:
170-
self.tables = []
171-
172-
def accept_table(self, index, cancellable, flux_table):
173-
self.tables.insert(index, flux_table)
174-
175-
def accept_record(self, index, cancellable, flux_record):
176-
self.tables[index].records.append(flux_record)
182+
def _insert_table(self, table, table_index):
183+
if not self._stream:
184+
self.tables.insert(table_index, table)

influxdb_client/client/query_api.py

+15-9
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,11 @@
11
import codecs
22
import csv
3-
from typing import List
3+
from typing import List, Union, Iterable
44

55
from influxdb_client import Dialect
66
from influxdb_client import Query, QueryService
7-
from influxdb_client.client.flux_csv_parser import FluxCsvParser, FluxResponseConsumerTable
8-
from influxdb_client.client.flux_table import FluxTable
7+
from influxdb_client.client.flux_csv_parser import FluxCsvParser
8+
from influxdb_client.client.flux_table import FluxTable, FluxRecord
99

1010

1111
class QueryApi(object):
@@ -35,6 +35,7 @@ def query_csv(self, query: str, org=None, dialect: Dialect = default_dialect):
3535
org = self._influxdb_client.org
3636
response = self._query_api.post_query(org=org, query=self._create_query(query, dialect), async_req=False,
3737
_preload_content=False)
38+
3839
return csv.reader(codecs.iterdecode(response, 'utf-8'))
3940

4041
def query_raw(self, query: str, org=None, dialect=default_dialect):
@@ -50,27 +51,32 @@ def query_raw(self, query: str, org=None, dialect=default_dialect):
5051
org = self._influxdb_client.org
5152
result = self._query_api.post_query(org=org, query=self._create_query(query, dialect), async_req=False,
5253
_preload_content=False)
54+
5355
return result
54-
# return codecs.iterdecode(result, 'utf-8')
5556

56-
def query(self, query: str, org=None, dialect=default_dialect) -> List['FluxTable']:
57+
def query(self, query: str, org=None, dialect=default_dialect, stream=False) \
58+
-> Union[List['FluxTable'], Iterable['FluxRecord']]:
5759
"""
5860
Synchronously executes the Flux query and returns the result as a List['FluxTable'] or, when ``stream=True``, as a generator of FluxRecord
5961
6062
:param query: the Flux query
6163
:param org: organization name (optional if already specified in InfluxDBClient)
6264
:param dialect: csv dialect format
65+
:param stream: if True, return a lazy generator of FluxRecord instead of a materialized List['FluxTable']
6366
:return:
6467
"""
6568
if org is None:
6669
org = self._influxdb_client.org
6770
response = self._query_api.post_query(org=org, query=self._create_query(query, dialect), async_req=False,
6871
_preload_content=False, _return_http_data_only=False)
69-
consumer = FluxResponseConsumerTable()
70-
parser = FluxCsvParser()
7172

72-
parser.parse_flux_response(response=response, cancellable=None, consumer=consumer)
73-
return consumer.tables
73+
_parser = FluxCsvParser(response=response, stream=stream)
74+
if stream:
75+
return _parser.generator()
76+
77+
list(_parser.generator())
78+
79+
return _parser.tables
7480

7581
# private helper for c
7682
@staticmethod

tests/test_QueryApiStream.py

+54
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
import itertools
2+
import time
3+
import types
4+
5+
from influxdb_client import WritePrecision
6+
from influxdb_client.client.write_api import SYNCHRONOUS
7+
from tests.base_test import BaseTest
8+
9+
10+
class QueryStreamApi(BaseTest):
11+
12+
def setUp(self) -> None:
13+
super().setUp()
14+
self.write_client = self.client.write_api(write_options=SYNCHRONOUS)
15+
self.bucket = self.create_test_bucket()
16+
17+
def tearDown(self) -> None:
18+
self.write_client.__del__()
19+
super().tearDown()
20+
21+
def test_block(self):
22+
self._prepareData()
23+
24+
_result = self.query_api.query(
25+
f'from(bucket:"{self.bucket.name}") |> range(start: 1970-01-01T00:00:00.000000001Z)', self.org)
26+
27+
self.assertEqual(len(_result), 1)
28+
self.assertEqual(len(_result[0].records), 100)
29+
30+
def test_stream(self):
31+
self._prepareData()
32+
33+
_result = self.query_api.query(
34+
f'from(bucket:"{self.bucket.name}") |> range(start: 1970-01-01T00:00:00.000000001Z)', self.org, stream=True)
35+
36+
self.assertTrue(isinstance(_result, types.GeneratorType))
37+
_result_list = list(_result)
38+
39+
self.assertEqual(len(_result_list), 100)
40+
41+
def test_stream_break(self):
42+
self._prepareData()
43+
44+
_result = self.query_api.query(
45+
f'from(bucket:"{self.bucket.name}") |> range(start: 1970-01-01T00:00:00.000000001Z)', self.org, stream=True)
46+
47+
_result_list = list(itertools.islice(_result, 10))
48+
49+
self.assertEqual(len(_result_list), 10)
50+
51+
def _prepareData(self):
52+
_list = [f'h2o_feet,location=coyote_creek water_level={x} {x}' for x in range(1, 101)]
53+
self.write_client.write(self.bucket.name, self.org, _list, write_precision=WritePrecision.S)
54+
time.sleep(1)

0 commit comments

Comments
 (0)