Skip to content
This repository was archived by the owner on Oct 29, 2024. It is now read-only.

Commit 7c858bf

Browse files
authored
feat(dataframe_client): handle np.nan, np.inf values in DataFrameClient (#812)
* feat(dataframe_client): handle np.nan, np.inf values in DataFrameClient * chore(dataframe): handle cases where tagset is empty * chore(dataframe): add tests for Nan lines but with tag values
1 parent 516274f commit 7c858bf

File tree

3 files changed

+147
-16
lines changed

3 files changed

+147
-16
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.
1717
- Add support for messagepack (#734 thx @lovasoa)
1818
- Add support for 'show series' (#357 thx @gaker)
1919
- Add support for custom request session in InfluxDBClient (#360 thx @dschien)
20+
- Add support for handling np.nan and np.inf values in DataFrameClient (#436 thx @nmerket)
2021

2122
### Changed
2223
- Clean up stale CI config (#755)

influxdb/_dataframe_client.py

Lines changed: 27 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -270,14 +270,31 @@ def _convert_dataframe_to_json(dataframe,
270270
"h": 1e9 * 3600,
271271
}.get(time_precision, 1)
272272

273+
if not tag_columns:
274+
points = [
275+
{'measurement': measurement,
276+
'fields':
277+
rec.replace([np.inf, -np.inf], np.nan).dropna().to_dict(),
278+
'time': np.int64(ts.value / precision_factor)}
279+
for ts, (_, rec) in zip(
280+
dataframe.index,
281+
dataframe[field_columns].iterrows()
282+
)
283+
]
284+
285+
return points
286+
273287
points = [
274288
{'measurement': measurement,
275289
'tags': dict(list(tag.items()) + list(tags.items())),
276-
'fields': rec,
290+
'fields':
291+
rec.replace([np.inf, -np.inf], np.nan).dropna().to_dict(),
277292
'time': np.int64(ts.value / precision_factor)}
278-
for ts, tag, rec in zip(dataframe.index,
279-
dataframe[tag_columns].to_dict('record'),
280-
dataframe[field_columns].to_dict('record'))
293+
for ts, tag, (_, rec) in zip(
294+
dataframe.index,
295+
dataframe[tag_columns].to_dict('record'),
296+
dataframe[field_columns].iterrows()
297+
)
281298
]
282299

283300
return points
@@ -379,21 +396,18 @@ def _convert_dataframe_to_lines(self,
379396
tags = ''
380397

381398
# Make an array of formatted field keys and values
382-
field_df = dataframe[field_columns]
383-
# Keep the positions where Null values are found
384-
mask_null = field_df.isnull().values
399+
field_df = dataframe[field_columns].replace([np.inf, -np.inf], np.nan)
400+
nans = pd.isnull(field_df)
385401

386402
field_df = self._stringify_dataframe(field_df,
387403
numeric_precision,
388404
datatype='field')
389405

390406
field_df = (field_df.columns.values + '=').tolist() + field_df
391-
field_df[field_df.columns[1:]] = ',' + field_df[
392-
field_df.columns[1:]]
393-
field_df = field_df.where(~mask_null, '') # drop Null entries
394-
fields = field_df.sum(axis=1)
395-
# take out leading , where first column has a Null value
396-
fields = fields.str.lstrip(",")
407+
field_df[field_df.columns[1:]] = ',' + field_df[field_df.columns[1:]]
408+
field_df[nans] = ''
409+
410+
fields = field_df.sum(axis=1).map(lambda x: x.lstrip(','))
397411
del field_df
398412

399413
# Generate line protocol string

influxdb/tests/dataframe_client_test.py

Lines changed: 119 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,16 +13,16 @@
1313
import warnings
1414
import requests_mock
1515

16-
from influxdb.tests import skip_if_pypy, using_pypy
1716
from nose.tools import raises
17+
from influxdb.tests import skip_if_pypy, using_pypy
1818

1919
from .client_test import _mocked_session
2020

2121
if not using_pypy:
2222
import pandas as pd
2323
from pandas.util.testing import assert_frame_equal
2424
from influxdb import DataFrameClient
25-
import numpy
25+
import numpy as np
2626

2727

2828
@skip_if_pypy
@@ -462,7 +462,7 @@ def test_write_points_from_dataframe_with_numeric_precision(self):
462462
["2", 2, 2.2222222222222]],
463463
index=[now, now + timedelta(hours=1)])
464464

465-
if numpy.lib.NumpyVersion(numpy.__version__) <= '1.13.3':
465+
if np.lib.NumpyVersion(np.__version__) <= '1.13.3':
466466
expected_default_precision = (
467467
b'foo,hello=there 0=\"1\",1=1i,2=1.11111111111 0\n'
468468
b'foo,hello=there 0=\"2\",1=2i,2=2.22222222222 3600000000000\n'
@@ -1032,3 +1032,119 @@ def test_dsn_constructor(self):
10321032
client = DataFrameClient.from_dsn('influxdb://localhost:8086')
10331033
self.assertIsInstance(client, DataFrameClient)
10341034
self.assertEqual('http://localhost:8086', client._baseurl)
1035+
1036+
def test_write_points_from_dataframe_with_nan_line(self):
1037+
"""Test write points from dataframe with Nan lines."""
1038+
now = pd.Timestamp('1970-01-01 00:00+00:00')
1039+
dataframe = pd.DataFrame(data=[["1", 1, np.inf], ["2", 2, np.nan]],
1040+
index=[now, now + timedelta(hours=1)],
1041+
columns=["column_one", "column_two",
1042+
"column_three"])
1043+
expected = (
1044+
b"foo column_one=\"1\",column_two=1i 0\n"
1045+
b"foo column_one=\"2\",column_two=2i "
1046+
b"3600000000000\n"
1047+
)
1048+
1049+
with requests_mock.Mocker() as m:
1050+
m.register_uri(requests_mock.POST,
1051+
"http://localhost:8086/write",
1052+
status_code=204)
1053+
1054+
cli = DataFrameClient(database='db')
1055+
1056+
cli.write_points(dataframe, 'foo', protocol='line')
1057+
self.assertEqual(m.last_request.body, expected)
1058+
1059+
cli.write_points(dataframe, 'foo', tags=None, protocol='line')
1060+
self.assertEqual(m.last_request.body, expected)
1061+
1062+
def test_write_points_from_dataframe_with_nan_json(self):
1063+
"""Test write points from json with NaN lines."""
1064+
now = pd.Timestamp('1970-01-01 00:00+00:00')
1065+
dataframe = pd.DataFrame(data=[["1", 1, np.inf], ["2", 2, np.nan]],
1066+
index=[now, now + timedelta(hours=1)],
1067+
columns=["column_one", "column_two",
1068+
"column_three"])
1069+
expected = (
1070+
b"foo column_one=\"1\",column_two=1i 0\n"
1071+
b"foo column_one=\"2\",column_two=2i "
1072+
b"3600000000000\n"
1073+
)
1074+
1075+
with requests_mock.Mocker() as m:
1076+
m.register_uri(requests_mock.POST,
1077+
"http://localhost:8086/write",
1078+
status_code=204)
1079+
1080+
cli = DataFrameClient(database='db')
1081+
1082+
cli.write_points(dataframe, 'foo', protocol='json')
1083+
self.assertEqual(m.last_request.body, expected)
1084+
1085+
cli.write_points(dataframe, 'foo', tags=None, protocol='json')
1086+
self.assertEqual(m.last_request.body, expected)
1087+
1088+
def test_write_points_from_dataframe_with_tags_and_nan_line(self):
1089+
"""Test write points from dataframe with NaN lines and tags."""
1090+
now = pd.Timestamp('1970-01-01 00:00+00:00')
1091+
dataframe = pd.DataFrame(data=[['blue', 1, "1", 1, np.inf],
1092+
['red', 0, "2", 2, np.nan]],
1093+
index=[now, now + timedelta(hours=1)],
1094+
columns=["tag_one", "tag_two", "column_one",
1095+
"column_two", "column_three"])
1096+
expected = (
1097+
b"foo,tag_one=blue,tag_two=1 "
1098+
b"column_one=\"1\",column_two=1i "
1099+
b"0\n"
1100+
b"foo,tag_one=red,tag_two=0 "
1101+
b"column_one=\"2\",column_two=2i "
1102+
b"3600000000000\n"
1103+
)
1104+
1105+
with requests_mock.Mocker() as m:
1106+
m.register_uri(requests_mock.POST,
1107+
"http://localhost:8086/write",
1108+
status_code=204)
1109+
1110+
cli = DataFrameClient(database='db')
1111+
1112+
cli.write_points(dataframe, 'foo', protocol='line',
1113+
tag_columns=['tag_one', 'tag_two'])
1114+
self.assertEqual(m.last_request.body, expected)
1115+
1116+
cli.write_points(dataframe, 'foo', tags=None, protocol='line',
1117+
tag_columns=['tag_one', 'tag_two'])
1118+
self.assertEqual(m.last_request.body, expected)
1119+
1120+
def test_write_points_from_dataframe_with_tags_and_nan_json(self):
1121+
"""Test write points from json with NaN lines and tags."""
1122+
now = pd.Timestamp('1970-01-01 00:00+00:00')
1123+
dataframe = pd.DataFrame(data=[['blue', 1, "1", 1, np.inf],
1124+
['red', 0, "2", 2, np.nan]],
1125+
index=[now, now + timedelta(hours=1)],
1126+
columns=["tag_one", "tag_two", "column_one",
1127+
"column_two", "column_three"])
1128+
expected = (
1129+
b"foo,tag_one=blue,tag_two=1 "
1130+
b"column_one=\"1\",column_two=1i "
1131+
b"0\n"
1132+
b"foo,tag_one=red,tag_two=0 "
1133+
b"column_one=\"2\",column_two=2i "
1134+
b"3600000000000\n"
1135+
)
1136+
1137+
with requests_mock.Mocker() as m:
1138+
m.register_uri(requests_mock.POST,
1139+
"http://localhost:8086/write",
1140+
status_code=204)
1141+
1142+
cli = DataFrameClient(database='db')
1143+
1144+
cli.write_points(dataframe, 'foo', protocol='json',
1145+
tag_columns=['tag_one', 'tag_two'])
1146+
self.assertEqual(m.last_request.body, expected)
1147+
1148+
cli.write_points(dataframe, 'foo', tags=None, protocol='json',
1149+
tag_columns=['tag_one', 'tag_two'])
1150+
self.assertEqual(m.last_request.body, expected)

0 commit comments

Comments
 (0)