
Commit eedf2cd

fix: Fixed serialization of DataFrame with empty (NaN) values #118
1 parent 1bb088f commit eedf2cd

4 files changed: +165 −56

CHANGELOG.md

+1
@@ -6,6 +6,7 @@
 
 ### Bug Fixes
 1. [#117](https://github.com/influxdata/influxdb-client-python/pull/117): Fixed appending default tags for single Point
+1. [#118](https://github.com/influxdata/influxdb-client-python/issues/118): Fixed serialization of DataFrame with empty (NaN) values
 
 ## 1.8.0 [2020-06-19]
 

influxdb_client/client/write/dataframe_serializer.py

+96
@@ -0,0 +1,96 @@
+import re
+from functools import reduce
+from itertools import chain
+
+from influxdb_client.client.write.point import _ESCAPE_KEY
+
+"""
+Functions for serializing a Pandas DataFrame.
+Much of the code here is inspired by the aioinflux package found here: https://github.com/gusutabopb/aioinflux
+"""
+
+
+def _replace(data_frame):
+    from ...extras import np
+
+    # string columns
+    obj_cols = {k for k, v in dict(data_frame.dtypes).items() if v is np.dtype('O')}
+
+    # number columns
+    other_cols = set(data_frame.columns) - obj_cols
+
+    obj_nans = (f'{k}=nan' for k in obj_cols)
+    other_nans = (f'{k}=nani?' for k in other_cols)
+
+    replacements = [
+        ('|'.join(chain(obj_nans, other_nans)), ''),
+        (',{2,}', ','),
+        ('|'.join([', ,', ', ', ' ,']), ' '),
+    ]
+
+    return replacements
+
+
+def _itertuples(data_frame):
+    cols = [data_frame.iloc[:, k] for k in range(len(data_frame.columns))]
+    return zip(data_frame.index, *cols)
+
+
+def data_frame_to_list_of_points(data_frame, point_settings, **kwargs):
+    from ...extras import pd, np
+    if not isinstance(data_frame, pd.DataFrame):
+        raise TypeError('Must be DataFrame, but type was: {0}.'
+                        .format(type(data_frame)))
+
+    if 'data_frame_measurement_name' not in kwargs:
+        raise TypeError('"data_frame_measurement_name" is a Required Argument')
+
+    if isinstance(data_frame.index, pd.PeriodIndex):
+        data_frame.index = data_frame.index.to_timestamp()
+    else:
+        data_frame.index = pd.to_datetime(data_frame.index)
+
+    if data_frame.index.tzinfo is None:
+        data_frame.index = data_frame.index.tz_localize('UTC')
+
+    measurement_name = kwargs.get('data_frame_measurement_name')
+    data_frame_tag_columns = kwargs.get('data_frame_tag_columns')
+    data_frame_tag_columns = set(data_frame_tag_columns or [])
+
+    tags = []
+    fields = []
+
+    if point_settings.defaultTags:
+        for key, value in point_settings.defaultTags.items():
+            data_frame[key] = value
+            data_frame_tag_columns.add(key)
+
+    for index, (key, value) in enumerate(data_frame.dtypes.items()):
+        key = str(key).translate(_ESCAPE_KEY)
+
+        if key in data_frame_tag_columns:
+            tags.append(f"{key}={{str(p[{index + 1}]).translate(_ESCAPE_KEY)}}")
+        elif issubclass(value.type, np.integer):
+            fields.append(f"{key}={{p[{index + 1}]}}i")
+        elif issubclass(value.type, (np.float, np.bool_)):
+            fields.append(f"{key}={{p[{index + 1}]}}")
+        else:
+            fields.append(f"{key}=\"{{str(p[{index + 1}]).translate(_ESCAPE_KEY)}}\"")
+
+    fmt = (f'{measurement_name}', f'{"," if tags else ""}', ','.join(tags),
+           ' ', ','.join(fields), ' {p[0].value}')
+    f = eval("lambda p: f'{}'".format(''.join(fmt)))
+
+    for k, v in dict(data_frame.dtypes).items():
+        if k in data_frame_tag_columns:
+            data_frame[k].replace('', np.nan, inplace=True)
+
+    isnull = data_frame.isnull().any(axis=1)
+
+    if isnull.any():
+        rep = _replace(data_frame)
+        lp = (reduce(lambda a, b: re.sub(*b, a), rep, f(p))
+              for p in _itertuples(data_frame))
+        return list(lp)
+    else:
+        return list(map(f, _itertuples(data_frame)))
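
Note: the core of the fix is the regex cleanup built by _replace(). The generated f-string formatter still renders missing values as "nan" (or "nani" for integer fields, hence the "nani?" pattern), and the replacement rules strip those tokens and then repair the commas and spaces left behind. Below is a minimal standalone sketch of that mechanism, not part of the commit, using hypothetical column names ("city" as an object/tag column, "temperature" and "humidity" as numeric fields):

import re
from functools import reduce
from itertools import chain

# Hypothetical column split for illustration only.
obj_cols = {"city"}                        # string (object) columns
other_cols = {"temperature", "humidity"}   # numeric columns

# Same idea as _replace(): drop "<col>=nan" tokens, then tidy the leftovers.
obj_nans = (f'{k}=nan' for k in obj_cols)
other_nans = (f'{k}=nani?' for k in other_cols)
replacements = [
    ('|'.join(chain(obj_nans, other_nans)), ''),   # remove empty fields/tags
    (',{2,}', ','),                                # collapse runs of commas
    ('|'.join([', ,', ', ', ' ,']), ' '),          # fix comma/space boundaries
]

# A formatted point where "humidity" was NaN in the DataFrame row:
line = 'weather,city=london temperature=21.5,humidity=nan 1586044800000000000'
cleaned = reduce(lambda acc, rule: re.sub(*rule, acc), replacements, line)
print(cleaned)
# -> weather,city=london temperature=21.5 1586044800000000000

The replacements are applied in order via reduce, exactly as in the NaN branch of data_frame_to_list_of_points above.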

influxdb_client/client/write_api.py

+6-55
@@ -15,7 +15,8 @@
 
 from influxdb_client import WritePrecision, WriteService
 from influxdb_client.client.abstract_client import AbstractClient
-from influxdb_client.client.write.point import Point, DEFAULT_WRITE_PRECISION, _ESCAPE_KEY
+from influxdb_client.client.write.dataframe_serializer import data_frame_to_list_of_points
+from influxdb_client.client.write.point import Point, DEFAULT_WRITE_PRECISION
 from influxdb_client.rest import ApiException
 
 logger = logging.getLogger(__name__)
@@ -253,9 +254,10 @@ def _serialize(self, record, write_precision, payload, **kwargs):
             self._serialize(record.to_line_protocol(), record.write_precision, payload, **kwargs)
 
         elif isinstance(record, dict):
-            self._serialize(Point.from_dict(record, write_precision=write_precision), write_precision, payload, **kwargs)
+            self._serialize(Point.from_dict(record, write_precision=write_precision), write_precision, payload,
+                            **kwargs)
         elif 'DataFrame' in type(record).__name__:
-            _data = self._data_frame_to_list_of_points(record, precision=write_precision, **kwargs)
+            _data = data_frame_to_list_of_points(record, self._point_settings, **kwargs)
             self._serialize(_data, write_precision, payload, **kwargs)
 
         elif isinstance(record, list):
@@ -281,7 +283,7 @@ def _write_batching(self, bucket, org, data,
                                  precision, **kwargs)
 
         elif 'DataFrame' in type(data).__name__:
-            self._write_batching(bucket, org, self._data_frame_to_list_of_points(data, precision, **kwargs),
+            self._write_batching(bucket, org, data_frame_to_list_of_points(data, self._point_settings, **kwargs),
                                  precision, **kwargs)
 
         elif isinstance(data, list):
@@ -303,57 +305,6 @@ def _append_default_tag(self, key, val, record):
             for item in record:
                 self._append_default_tag(key, val, item)
 
-    def _itertuples(self, data_frame):
-        cols = [data_frame.iloc[:, k] for k in range(len(data_frame.columns))]
-        return zip(data_frame.index, *cols)
-
-    def _data_frame_to_list_of_points(self, data_frame, precision, **kwargs):
-        from ..extras import pd, np
-        if not isinstance(data_frame, pd.DataFrame):
-            raise TypeError('Must be DataFrame, but type was: {0}.'
-                            .format(type(data_frame)))
-
-        if 'data_frame_measurement_name' not in kwargs:
-            raise TypeError('"data_frame_measurement_name" is a Required Argument')
-
-        if isinstance(data_frame.index, pd.PeriodIndex):
-            data_frame.index = data_frame.index.to_timestamp()
-        else:
-            data_frame.index = pd.to_datetime(data_frame.index)
-
-        if data_frame.index.tzinfo is None:
-            data_frame.index = data_frame.index.tz_localize('UTC')
-
-        measurement_name = kwargs.get('data_frame_measurement_name')
-        data_frame_tag_columns = kwargs.get('data_frame_tag_columns')
-        data_frame_tag_columns = set(data_frame_tag_columns or [])
-
-        tags = []
-        fields = []
-
-        if self._point_settings.defaultTags:
-            for key, value in self._point_settings.defaultTags.items():
-                data_frame[key] = value
-                data_frame_tag_columns.add(key)
-
-        for index, (key, value) in enumerate(data_frame.dtypes.items()):
-            key = str(key).translate(_ESCAPE_KEY)
-
-            if key in data_frame_tag_columns:
-                tags.append(f"{key}={{p[{index + 1}].translate(_ESCAPE_KEY)}}")
-            elif issubclass(value.type, np.integer):
-                fields.append(f"{key}={{p[{index + 1}]}}i")
-            elif issubclass(value.type, (np.float, np.bool_)):
-                fields.append(f"{key}={{p[{index + 1}]}}")
-            else:
-                fields.append(f"{key}=\"{{p[{index + 1}].translate(_ESCAPE_KEY)}}\"")
-
-        fmt = (f'{measurement_name}', f'{"," if tags else ""}', ','.join(tags),
-               ' ', ','.join(fields), ' {p[0].value}')
-        f = eval("lambda p: f'{}'".format(''.join(fmt)))
-
-        return list(map(f, self._itertuples(data_frame)))
-
     def _http(self, batch_item: _BatchItem):
 
         logger.debug("Write time series data into InfluxDB: %s", batch_item)

tests/test_WriteApiDataFrame.py

+62-1
@@ -5,7 +5,8 @@
 from datetime import timedelta
 
 from influxdb_client import InfluxDBClient, WriteOptions, WriteApi
-from influxdb_client.client.write_api import SYNCHRONOUS
+from influxdb_client.client.write.dataframe_serializer import data_frame_to_list_of_points
+from influxdb_client.client.write_api import SYNCHRONOUS, PointSettings
 from tests.base_test import BaseTest
 
 
@@ -86,3 +87,63 @@ def test_write_num_py(self):
         self.assertEqual(result[0].records[1].get_value(), 200.0)
 
         pass
+
+    def test_write_nan(self):
+        from influxdb_client.extras import pd, np
+
+        now = pd.Timestamp('2020-04-05 00:00+00:00')
+
+        data_frame = pd.DataFrame(data=[[3.1955, np.nan, 20.514305, np.nan],
+                                        [5.7310, np.nan, 23.328710, np.nan],
+                                        [np.nan, 3.138664, np.nan, 20.755026],
+                                        [5.7310, 5.139563, 23.328710, 19.791240]],
+                                  index=[now, now + timedelta(minutes=30), now + timedelta(minutes=60),
+                                         now + timedelta(minutes=90)],
+                                  columns=["actual_kw_price", "forecast_kw_price", "actual_general_use",
+                                           "forecast_general_use"])
+
+        points = data_frame_to_list_of_points(data_frame=data_frame, point_settings=PointSettings(),
+                                              data_frame_measurement_name='measurement')
+
+        self.assertEqual(4, len(points))
+        self.assertEqual("measurement actual_kw_price=3.1955,actual_general_use=20.514305 1586044800000000000",
+                         points[0])
+        self.assertEqual("measurement actual_kw_price=5.731,actual_general_use=23.32871 1586046600000000000",
+                         points[1])
+        self.assertEqual("measurement forecast_kw_price=3.138664,forecast_general_use=20.755026 1586048400000000000",
+                         points[2])
+        self.assertEqual("measurement actual_kw_price=5.731,forecast_kw_price=5.139563,actual_general_use=23.32871,"
+                         "forecast_general_use=19.79124 1586050200000000000",
+                         points[3])
+
+    def test_write_tag_nan(self):
+        from influxdb_client.extras import pd, np
+
+        now = pd.Timestamp('2020-04-05 00:00+00:00')
+
+        data_frame = pd.DataFrame(data=[["", 3.1955, 20.514305],
+                                        ['', 5.7310, 23.328710],
+                                        [np.nan, 5.7310, 23.328710],
+                                        ["tag", 3.138664, 20.755026]],
+                                  index=[now, now + timedelta(minutes=30),
+                                         now + timedelta(minutes=60), now + timedelta(minutes=90)],
+                                  columns=["tag", "actual_kw_price", "forecast_kw_price"])
+
+        write_api = self.client.write_api(write_options=SYNCHRONOUS, point_settings=PointSettings())
+
+        points = data_frame_to_list_of_points(data_frame=data_frame,
+                                              point_settings=PointSettings(),
+                                              data_frame_measurement_name='measurement',
+                                              data_frame_tag_columns={"tag"})
+
+        self.assertEqual(4, len(points))
+        self.assertEqual("measurement actual_kw_price=3.1955,forecast_kw_price=20.514305 1586044800000000000",
+                         points[0])
+        self.assertEqual("measurement actual_kw_price=5.731,forecast_kw_price=23.32871 1586046600000000000",
+                         points[1])
+        self.assertEqual("measurement actual_kw_price=5.731,forecast_kw_price=23.32871 1586048400000000000",
+                         points[2])
+        self.assertEqual("measurement,tag=tag actual_kw_price=3.138664,forecast_kw_price=20.755026 1586050200000000000",
+                         points[3])
+
+        write_api.__del__()
