
Commit 9e96c58

feat: Add on-fly parsing of response to pandas (#29)
1 parent 6407789 commit 9e96c58

File tree

influxdb_client/client/flux_csv_parser.py
influxdb_client/client/query_api.py
tests/test_QueryApiDataFrame.py

3 files changed: +209 -26 lines changed
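
For context, the client now exposes three ways to consume a query result, backed by the serialization modes introduced in the diffs below. A minimal usage sketch, assuming a reachable InfluxDB 2.x instance (the URL, token, org, and bucket are placeholders):

from influxdb_client import InfluxDBClient

# Placeholder connection details for a local InfluxDB 2.x instance.
client = InfluxDBClient("http://localhost:9999", "my-token", org="my-org")
query_api = client.query_api()
flux = 'from(bucket: "my-bucket") |> range(start: -5s)'

tables = query_api.query(flux)                 # FluxSerializationMode.tables
records = query_api.query_stream(flux)         # FluxSerializationMode.stream
data_frame = query_api.query_data_frame(flux)  # FluxSerializationMode.dataFrame (new in this commit)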

influxdb_client/client/flux_csv_parser.py (+33 -5)
@@ -1,8 +1,10 @@
 import base64
 import codecs
 import csv as csv_parser
+from enum import Enum

 import ciso8601
+from pandas import DataFrame
 from urllib3 import HTTPResponse

 from influxdb_client.client.flux_table import FluxTable, FluxColumn, FluxRecord
@@ -18,12 +20,18 @@ class FluxCsvParserException(Exception):
     pass


+class FluxSerializationMode(Enum):
+    tables = 1
+    stream = 2
+    dataFrame = 3
+
+
 class FluxCsvParser(object):

-    def __init__(self, response: HTTPResponse, stream: bool) -> None:
+    def __init__(self, response: HTTPResponse, serialization_mode: FluxSerializationMode) -> None:
         self._response = response
         self.tables = []
-        self._stream = stream
+        self._serialization_mode = serialization_mode
         pass

     def __enter__(self):
@@ -64,6 +72,11 @@ def _parse_flux_response(self):
             token = csv[0]
             # start new table
             if "#datatype" == token:
+
+                # Return already parsed DataFrame
+                if (self._serialization_mode is FluxSerializationMode.dataFrame) & hasattr(self, '_dataFrame'):
+                    yield self._dataFrame
+
                 start_new_table = True
                 table = FluxTable()
                 self._insert_table(table, table_index)
@@ -86,6 +99,12 @@ def _parse_flux_response(self):
             if start_new_table:
                 self.add_column_names_and_tags(table, csv)
                 start_new_table = False
+                # Create DataFrame with default values
+                if self._serialization_mode is FluxSerializationMode.dataFrame:
+                    self._dataFrame = DataFrame(data=[], columns=[], index=None)
+                    for column in table.columns:
+                        self._dataFrame[column.label] = column.default_value
+                    pass
                 continue

             # to int converions todo
@@ -101,14 +120,23 @@ def _parse_flux_response(self):

             flux_record = self.parse_record(table_index - 1, table, csv)

-            if not self._stream:
+            if self._serialization_mode is FluxSerializationMode.tables:
                 self.tables[table_index - 1].records.append(flux_record)

-            yield flux_record
+            if self._serialization_mode is FluxSerializationMode.stream:
+                yield flux_record
+
+            if self._serialization_mode is FluxSerializationMode.dataFrame:
+                self._dataFrame.loc[len(self._dataFrame.index)] = flux_record.values
+                pass

             # debug
             # print(flux_record)

+        # Return latest DataFrame
+        if (self._serialization_mode is FluxSerializationMode.dataFrame) & hasattr(self, '_dataFrame'):
+            yield self._dataFrame
+
     def parse_record(self, table_index, table, csv):
         record = FluxRecord(table_index)

@@ -180,5 +208,5 @@ def add_column_names_and_tags(table, csv):
             i += 1

     def _insert_table(self, table, table_index):
-        if not self._stream:
+        if self._serialization_mode is FluxSerializationMode.tables:
             self.tables.insert(table_index, table)
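
In dataFrame mode the parser builds the result incrementally instead of materializing FluxTable objects first: it seeds an empty DataFrame from the table's column labels and default values, then appends each parsed record via DataFrame.loc[len(df.index)] = values. A standalone sketch of that pandas pattern, with hypothetical column labels and row values standing in for the parsed schema:

from pandas import DataFrame

# Seed an empty frame whose columns mirror the parsed table's schema.
df = DataFrame(data=[], columns=[], index=None)
for label in ['_time', '_value', '_field']:  # hypothetical labels; the parser uses column.label
    df[label] = None                         # analogous to column.default_value

# Append each record as a new row keyed by the next integer index.
df.loc[len(df.index)] = ['2019-11-12T08:09:05Z', 11125907456, 'used']
df.loc[len(df.index)] = ['2019-11-12T08:09:06Z', 11127103488, 'used']

print(df)  # two rows, integer-indexed 0 and 1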

influxdb_client/client/query_api.py (+13 -21)
@@ -2,11 +2,9 @@
 import csv
 from typing import List, Generator, Any

-from pandas import DataFrame
-
 from influxdb_client import Dialect
 from influxdb_client import Query, QueryService
-from influxdb_client.client.flux_csv_parser import FluxCsvParser
+from influxdb_client.client.flux_csv_parser import FluxCsvParser, FluxSerializationMode
 from influxdb_client.client.flux_table import FluxTable, FluxRecord


@@ -70,7 +68,7 @@ def query(self, query: str, org=None) -> List['FluxTable']:
         response = self._query_api.post_query(org=org, query=self._create_query(query, self.default_dialect),
                                               async_req=False, _preload_content=False, _return_http_data_only=False)

-        _parser = FluxCsvParser(response=response, stream=False)
+        _parser = FluxCsvParser(response=response, serialization_mode=FluxSerializationMode.tables)

         list(_parser.generator())

@@ -90,13 +88,14 @@ def query_stream(self, query: str, org=None) -> Generator['FluxRecord', Any, None]:
         response = self._query_api.post_query(org=org, query=self._create_query(query, self.default_dialect),
                                               async_req=False, _preload_content=False, _return_http_data_only=False)

-        _parser = FluxCsvParser(response=response, stream=True)
+        _parser = FluxCsvParser(response=response, serialization_mode=FluxSerializationMode.stream)

         return _parser.generator()

     def query_data_frame(self, query: str, org=None):
         """
-        Synchronously executes the Flux query and return Pandas DataFrame
+        Synchronously executes the Flux query and return Pandas DataFrame.
+        Note that if a query returns more than one table, the client generates a DataFrame for each of them.

         :param query: the Flux query
         :param org: organization name (optional if already specified in InfluxDBClient)
@@ -105,23 +104,16 @@ def query_data_frame(self, query: str, org=None):
         if org is None:
             org = self._influxdb_client.org

-        flux_tables = self.query(query=query, org=org)
-
-        if len(flux_tables) == 0:
-            return DataFrame
+        response = self._query_api.post_query(org=org, query=self._create_query(query, self.default_dialect),
+                                              async_req=False, _preload_content=False, _return_http_data_only=False)

-        if len(flux_tables) > 1:
-            raise Exception("Flux query result must contain one table.")
+        _parser = FluxCsvParser(response=response, serialization_mode=FluxSerializationMode.dataFrame)
+        _dataFrames = list(_parser.generator())

-        table = flux_tables[0]
-        data = []
-        column_names = list(map(lambda c: c.label, table.columns))
-        for record in table:
-            row = []
-            for column_name in column_names:
-                row.append(record[column_name])
-            data.append(row)
-        return DataFrame(data=data, columns=column_names, index=None)
+        if len(_dataFrames) == 1:
+            return _dataFrames[0]
+        else:
+            return _dataFrames

     # private helper for c
     @staticmethod
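
With this change query_data_frame no longer raises on multi-table results: it returns a single DataFrame when the Flux response contains one table and a list of DataFrames otherwise, so callers should handle either shape. A hedged usage sketch (connection details are placeholders):

from pandas import DataFrame

from influxdb_client import InfluxDBClient

# Placeholder connection details; the tests below show a stubbed variant.
client = InfluxDBClient("http://localhost:9999", "my-token", org="my-org")

result = client.query_api().query_data_frame(
    'from(bucket: "my-bucket") |> range(start: -5s)')

if isinstance(result, DataFrame):  # a single Flux table
    print(result.head())
else:                              # several Flux tables -> a list of DataFrames
    for data_frame in result:
        print(data_frame.head())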

tests/test_QueryApiDataFrame.py (+163)
@@ -0,0 +1,163 @@
+import httpretty
+from pandas import DataFrame
+from pandas._libs.tslibs.timestamps import Timestamp
+
+from influxdb_client import InfluxDBClient
+from tests.base_test import BaseTest
+
+
+class QueryDataFrameApi(BaseTest):
+
+    def setUp(self) -> None:
+        super(QueryDataFrameApi, self).setUp()
+        # https://github.com/gabrielfalcao/HTTPretty/issues/368
+        import warnings
+        warnings.filterwarnings("ignore", category=ResourceWarning, message="unclosed.*")
+        warnings.filterwarnings("ignore", category=PendingDeprecationWarning, message="isAlive*")
+
+        httpretty.enable()
+        httpretty.reset()
+
+    def tearDown(self) -> None:
+        self.client.__del__()
+        httpretty.disable()
+
+    def test_one_table(self):
+        query_response = \
+            '#datatype,string,long,dateTime:RFC3339,dateTime:RFC3339,dateTime:RFC3339,long,string,string,string\n' \
+            '#group,false,false,true,true,false,false,true,true,true\n' \
+            '#default,_result,,,,,,,,\n' \
+            ',result,table,_start,_stop,_time,_value,_field,_measurement,host\n' \
+            ',,0,2019-11-12T08:09:04.795385031Z,2019-11-12T08:09:09.795385031Z,2019-11-12T08:09:05Z,11125907456,used,mem,mac.local\n' \
+            ',,0,2019-11-12T08:09:04.795385031Z,2019-11-12T08:09:09.795385031Z,2019-11-12T08:09:06Z,11127103488,used,mem,mac.local\n' \
+            ',,0,2019-11-12T08:09:04.795385031Z,2019-11-12T08:09:09.795385031Z,2019-11-12T08:09:07Z,11127291904,used,mem,mac.local\n' \
+            ',,0,2019-11-12T08:09:04.795385031Z,2019-11-12T08:09:09.795385031Z,2019-11-12T08:09:08Z,11126190080,used,mem,mac.local\n' \
+            ',,0,2019-11-12T08:09:04.795385031Z,2019-11-12T08:09:09.795385031Z,2019-11-12T08:09:09Z,11127832576,used,mem,mac.local\n' \
+            '\n\n'
+
+        httpretty.register_uri(httpretty.POST, uri="http://localhost/api/v2/query", status=200, body=query_response)
+
+        self.client = InfluxDBClient("http://localhost", "my-token", org="my-org", enable_gzip=False)
+
+        _dataFrame = self.client.query_api().query_data_frame(
+            'from(bucket: "my-bucket") '
+            '|> range(start: -5s, stop: now()) '
+            '|> filter(fn: (r) => r._measurement == "mem") '
+            '|> filter(fn: (r) => r._field == "used")',
+            "my-org")
+
+        self.assertEqual(DataFrame, type(_dataFrame))
+        self.assertListEqual(
+            ["result", "table", "_start", "_stop", "_time", "_value", "_field", "_measurement", "host"],
+            list(_dataFrame.columns))
+        self.assertListEqual([0, 1, 2, 3, 4], list(_dataFrame.index))
+        self.assertEqual(5, len(_dataFrame))
+        self.assertEqual("_result", _dataFrame['result'][0])
+        self.assertEqual("_result", _dataFrame['result'][1])
+        self.assertEqual("_result", _dataFrame['result'][2])
+        self.assertEqual("_result", _dataFrame['result'][3])
+        self.assertEqual("_result", _dataFrame['result'][4])
+        self.assertEqual(0, _dataFrame['table'][0], None)
+        self.assertEqual(0, _dataFrame['table'][1], None)
+        self.assertEqual(0, _dataFrame['table'][2], None)
+        self.assertEqual(0, _dataFrame['table'][3], None)
+        self.assertEqual(0, _dataFrame['table'][4], None)
+        self.assertEqual(Timestamp('2019-11-12 08:09:04.795385+0000'), _dataFrame['_start'][0])
+        self.assertEqual(Timestamp('2019-11-12 08:09:04.795385+0000'), _dataFrame['_start'][1])
+        self.assertEqual(Timestamp('2019-11-12 08:09:04.795385+0000'), _dataFrame['_start'][2])
+        self.assertEqual(Timestamp('2019-11-12 08:09:04.795385+0000'), _dataFrame['_start'][3])
+        self.assertEqual(Timestamp('2019-11-12 08:09:04.795385+0000'), _dataFrame['_start'][4])
+        self.assertEqual(Timestamp('2019-11-12 08:09:09.795385+0000'), _dataFrame['_stop'][0])
+        self.assertEqual(Timestamp('2019-11-12 08:09:09.795385+0000'), _dataFrame['_stop'][1])
+        self.assertEqual(Timestamp('2019-11-12 08:09:09.795385+0000'), _dataFrame['_stop'][2])
+        self.assertEqual(Timestamp('2019-11-12 08:09:09.795385+0000'), _dataFrame['_stop'][3])
+        self.assertEqual(Timestamp('2019-11-12 08:09:09.795385+0000'), _dataFrame['_stop'][4])
+        self.assertEqual(Timestamp('2019-11-12 08:09:05+0000'), _dataFrame['_time'][0])
+        self.assertEqual(Timestamp('2019-11-12 08:09:06+0000'), _dataFrame['_time'][1])
+        self.assertEqual(Timestamp('2019-11-12 08:09:07+0000'), _dataFrame['_time'][2])
+        self.assertEqual(Timestamp('2019-11-12 08:09:08+0000'), _dataFrame['_time'][3])
+        self.assertEqual(Timestamp('2019-11-12 08:09:09+0000'), _dataFrame['_time'][4])
+        self.assertEqual(11125907456, _dataFrame['_value'][0])
+        self.assertEqual(11127103488, _dataFrame['_value'][1])
+        self.assertEqual(11127291904, _dataFrame['_value'][2])
+        self.assertEqual(11126190080, _dataFrame['_value'][3])
+        self.assertEqual(11127832576, _dataFrame['_value'][4])
+        self.assertEqual('used', _dataFrame['_field'][0])
+        self.assertEqual('used', _dataFrame['_field'][1])
+        self.assertEqual('used', _dataFrame['_field'][2])
+        self.assertEqual('used', _dataFrame['_field'][3])
+        self.assertEqual('used', _dataFrame['_field'][4])
+        self.assertEqual('mem', _dataFrame['_measurement'][0])
+        self.assertEqual('mem', _dataFrame['_measurement'][1])
+        self.assertEqual('mem', _dataFrame['_measurement'][2])
+        self.assertEqual('mem', _dataFrame['_measurement'][3])
+        self.assertEqual('mem', _dataFrame['_measurement'][4])
+        self.assertEqual('mac.local', _dataFrame['host'][0])
+        self.assertEqual('mac.local', _dataFrame['host'][1])
+        self.assertEqual('mac.local', _dataFrame['host'][2])
+        self.assertEqual('mac.local', _dataFrame['host'][3])
+        self.assertEqual('mac.local', _dataFrame['host'][4])
+
+    def test_more_table(self):
+        query_response = \
+            '#datatype,string,long,dateTime:RFC3339,dateTime:RFC3339,dateTime:RFC3339,long,string,string,string\n' \
+            '#group,false,false,true,true,false,false,true,true,true\n' \
+            '#default,_result,,,,,,,,\n' \
+            ',result,table,_start,_stop,_time,_value,_field,_measurement,host\n' \
+            ',,0,2019-11-12T08:09:04.795385031Z,2019-11-12T08:09:09.795385031Z,2019-11-12T08:09:05Z,11125907456,used,mem,mac.local\n' \
+            ',,0,2019-11-12T08:09:04.795385031Z,2019-11-12T08:09:09.795385031Z,2019-11-12T08:09:06Z,11127103488,used,mem,mac.local\n' \
+            ',,0,2019-11-12T08:09:04.795385031Z,2019-11-12T08:09:09.795385031Z,2019-11-12T08:09:07Z,11127291904,used,mem,mac.local\n' \
+            ',,0,2019-11-12T08:09:04.795385031Z,2019-11-12T08:09:09.795385031Z,2019-11-12T08:09:08Z,11126190080,used,mem,mac.local\n' \
+            ',,0,2019-11-12T08:09:04.795385031Z,2019-11-12T08:09:09.795385031Z,2019-11-12T08:09:09Z,11127832576,used,mem,mac.local\n' \
+            '\n\n' \
+            '#datatype,string,long,dateTime:RFC3339,dateTime:RFC3339,dateTime:RFC3339,long,string,string,string\n' \
+            '#group,false,false,true,true,false,false,true,true,true\n' \
+            '#default,_result,,,,,,,,\n' \
+            ',result,table,_start,_stop,_time,_value,_field,_measurement,host\n' \
+            ',,1,2019-11-12T08:09:04.795385031Z,2019-11-12T08:09:09.795385031Z,2019-11-12T08:09:05Z,6053961728,available,mem,mac.local\n' \
+            ',,1,2019-11-12T08:09:04.795385031Z,2019-11-12T08:09:09.795385031Z,2019-11-12T08:09:06Z,6052765696,available,mem,mac.local\n' \
+            ',,1,2019-11-12T08:09:04.795385031Z,2019-11-12T08:09:09.795385031Z,2019-11-12T08:09:07Z,6052577280,available,mem,mac.local\n' \
+            ',,1,2019-11-12T08:09:04.795385031Z,2019-11-12T08:09:09.795385031Z,2019-11-12T08:09:08Z,6053679104,available,mem,mac.local\n' \
+            ',,1,2019-11-12T08:09:04.795385031Z,2019-11-12T08:09:09.795385031Z,2019-11-12T08:09:09Z,6052036608,available,mem,mac.local\n' \
+            '\n\n' \
+            '#datatype,string,long,dateTime:RFC3339,dateTime:RFC3339,dateTime:RFC3339,long,string,string,string\n' \
+            '#group,false,false,true,true,false,false,true,true,true\n' \
+            '#default,_result,,,,,,,,\n' \
+            ',result,table,_start,_stop,_time,_value,_field,_measurement,host\n' \
+            ',,2,2019-11-12T08:09:04.795385031Z,2019-11-12T08:09:09.795385031Z,2019-11-12T08:09:05Z,18632704,free,mem,mac.local\n' \
+            ',,2,2019-11-12T08:09:04.795385031Z,2019-11-12T08:09:09.795385031Z,2019-11-12T08:09:06Z,17420288,free,mem,mac.local\n' \
+            ',,2,2019-11-12T08:09:04.795385031Z,2019-11-12T08:09:09.795385031Z,2019-11-12T08:09:07Z,17256448,free,mem,mac.local\n' \
+            ',,2,2019-11-12T08:09:04.795385031Z,2019-11-12T08:09:09.795385031Z,2019-11-12T08:09:08Z,18362368,free,mem,mac.local\n' \
+            ',,2,2019-11-12T08:09:04.795385031Z,2019-11-12T08:09:09.795385031Z,2019-11-12T08:09:09Z,16723968,free,mem,mac.local\n\n'
+
+        httpretty.register_uri(httpretty.POST, uri="http://localhost/api/v2/query", status=200, body=query_response)
+
+        self.client = InfluxDBClient("http://localhost", "my-token", org="my-org", enable_gzip=False)
+
+        _dataFrames = self.client.query_api().query_data_frame(
+            'from(bucket: "my-bucket") '
+            '|> range(start: -5s, stop: now()) '
+            '|> filter(fn: (r) => r._measurement == "mem") '
+            '|> filter(fn: (r) => r._field == "available" or r._field == "free" or r._field == "used")',
+            "my-org")
+
+        self.assertEqual(list, type(_dataFrames))
+        self.assertEqual(len(_dataFrames), 3)
+
+        self.assertListEqual(
+            ["result", "table", "_start", "_stop", "_time", "_value", "_field", "_measurement", "host"],
+            list(_dataFrames[0].columns))
+        self.assertListEqual([0, 1, 2, 3, 4], list(_dataFrames[0].index))
+        self.assertEqual(5, len(_dataFrames[0]))
+
+        self.assertListEqual(
+            ["result", "table", "_start", "_stop", "_time", "_value", "_field", "_measurement", "host"],
+            list(_dataFrames[1].columns))
+        self.assertListEqual([0, 1, 2, 3, 4], list(_dataFrames[1].index))
+        self.assertEqual(5, len(_dataFrames[1]))
+
+        self.assertListEqual(
+            ["result", "table", "_start", "_stop", "_time", "_value", "_field", "_measurement", "host"],
+            list(_dataFrames[2].columns))
+        self.assertListEqual([0, 1, 2, 3, 4], list(_dataFrames[2].index))
+        self.assertEqual(5, len(_dataFrames[2]))
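
The tests above exercise the new code path without a running server by registering a canned annotated-CSV body for POST /api/v2/query with httpretty. A minimal standalone sketch of that stubbing pattern (the shortened CSV body and the requests call are illustrative):

import httpretty
import requests

httpretty.enable()
httpretty.reset()

# Any POST to the registered URI now returns the canned body.
httpretty.register_uri(httpretty.POST, uri="http://localhost/api/v2/query", status=200,
                       body='#datatype,string,long\n#group,false,false\n#default,_result,\n,result,table\n,,0\n\n')

response = requests.post("http://localhost/api/v2/query", data='from(bucket: "my-bucket")')
print(response.text)

httpretty.disable()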
