Skip to content

Commit 11b9034

Browse files
authored
fix: Optimize serializing data into Pandas DataFrame (#74)
1 parent ab18935 commit 11b9034

File tree

3 files changed

+56
-7
lines changed

3 files changed

+56
-7
lines changed

CHANGELOG.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,8 @@
11
## 1.6.0 [unreleased]
22

3+
### Bugs
4+
1. [#72](https://github.com/influxdata/influxdb-client-python/issues/72): Optimize serializing data into Pandas DataFrame
5+
36
## 1.5.0 [2020-03-13]
47

58
### Features

influxdb_client/client/flux_csv_parser.py

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ def __init__(self, response: HTTPResponse, serialization_mode: FluxSerialization
3434
self.tables = []
3535
self._serialization_mode = serialization_mode
3636
self._data_frame_index = data_frame_index
37+
self._data_frame_values = []
3738
pass
3839

3940
def __enter__(self):
@@ -135,20 +136,27 @@ def _parse_flux_response(self):
135136
yield flux_record
136137

137138
if self._serialization_mode is FluxSerializationMode.dataFrame:
138-
self._data_frame.loc[len(self._data_frame.index)] = flux_record.values
139+
self._data_frame_values.append(flux_record.values)
139140
pass
140141

141-
# debug
142-
# print(flux_record)
143-
144142
# Return latest DataFrame
145143
if (self._serialization_mode is FluxSerializationMode.dataFrame) & hasattr(self, '_data_frame'):
146144
yield self._prepare_data_frame()
147145

148146
def _prepare_data_frame(self):
147+
from ..extras import pd
148+
149+
# We have to create temporary DataFrame because we want to preserve default column values
150+
_temp_df = pd.DataFrame(self._data_frame_values)
151+
self._data_frame_values = []
152+
153+
# Custom DataFrame index
149154
if self._data_frame_index:
150155
self._data_frame = self._data_frame.set_index(self._data_frame_index)
151-
return self._data_frame
156+
_temp_df = _temp_df.set_index(self._data_frame_index)
157+
158+
# Append data
159+
return self._data_frame.append(_temp_df)
152160

153161
def parse_record(self, table_index, table, csv):
154162
record = FluxRecord(table_index)

tests/test_QueryApiDataFrame.py

Lines changed: 40 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,13 @@
1+
import random
2+
13
import httpretty
4+
import rx
25
from pandas import DataFrame
36
from pandas._libs.tslibs.timestamps import Timestamp
7+
from rx import operators as ops
48

5-
from influxdb_client import InfluxDBClient
6-
from tests.base_test import BaseTest
9+
from influxdb_client import InfluxDBClient, Point, WritePrecision, WriteOptions
10+
from tests.base_test import BaseTest, current_milli_time
711

812

913
class QueryDataFrameApi(BaseTest):
@@ -250,3 +254,37 @@ def test_more_table_custom_index(self):
250254
Timestamp('2019-11-12 08:09:07+0000'), Timestamp('2019-11-12 08:09:08+0000'),
251255
Timestamp('2019-11-12 08:09:09+0000')], list(_dataFrames[2].index))
252256
self.assertEqual(5, len(_dataFrames[2]))
257+
258+
259+
class QueryDataFrameIntegrationApi(BaseTest):
260+
261+
def test_large_amount_of_data(self):
262+
_measurement_name = "data_frame_" + str(current_milli_time())
263+
264+
def _create_point(index) -> Point:
265+
return Point(_measurement_name) \
266+
.tag("deviceType", str(random.choice(['A', 'B']))) \
267+
.tag("name", random.choice(['A', 'B'])) \
268+
.field("uuid", random.randint(0, 10_000)) \
269+
.field("co2", random.randint(0, 10_000)) \
270+
.field("humid", random.randint(0, 10_000)) \
271+
.field("lux", random.randint(0, 10_000)) \
272+
.field("water", random.randint(0, 10_000)) \
273+
.field("shine", random.randint(0, 10_000)) \
274+
.field("temp", random.randint(0, 10_000)) \
275+
.field("voc", random.randint(0, 10_000)) \
276+
.time(time=(1583828781 + index), write_precision=WritePrecision.S)
277+
278+
data = rx.range(0, 2_000).pipe(ops.map(lambda index: _create_point(index)))
279+
280+
write_api = self.client.write_api(write_options=WriteOptions(batch_size=500))
281+
write_api.write(org="my-org", bucket="my-bucket", record=data, write_precision=WritePrecision.S)
282+
write_api.__del__()
283+
284+
query = 'from(bucket: "my-bucket")' \
285+
'|> range(start: 2020-02-19T23:30:00Z, stop: now())' \
286+
f'|> filter(fn: (r) => r._measurement == "{_measurement_name}")'
287+
288+
result = self.client.query_api().query_data_frame(org="my-org", query=query)
289+
290+
self.assertGreater(len(result), 1)

0 commit comments

Comments (0)