Skip to content

Commit 7a3975c

Browse files
author
Noel Merket
committed
1 parent 8e02ea8 commit 7a3975c

File tree

3 files changed

+60
-5
lines changed

3 files changed

+60
-5
lines changed

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,3 +21,4 @@ docs/build/
2121
.coverage
2222
cover
2323
env
24+
.idea

influxdb/_dataframe_client.py

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
import math
1111

1212
import pandas as pd
13+
import numpy as np
1314

1415
from .client import InfluxDBClient
1516
from .line_protocol import _escape_tag
@@ -213,11 +214,11 @@ def _convert_dataframe_to_json(self,
213214
points = [
214215
{'measurement': measurement,
215216
'tags': dict(list(tag.items()) + list(tags.items())),
216-
'fields': rec,
217+
'fields': rec.replace([np.inf, -np.inf], np.nan).dropna().to_dict(),
217218
'time': int(ts.value / precision_factor)}
218-
for ts, tag, rec in zip(dataframe.index,
219+
for ts, tag, (_, rec) in zip(dataframe.index,
219220
dataframe[tag_columns].to_dict('record'),
220-
dataframe[field_columns].to_dict('record'))
221+
dataframe[field_columns].iterrows())
221222
]
222223

223224
return points
@@ -311,12 +312,14 @@ def _convert_dataframe_to_lines(self,
311312
tags = ''
312313

313314
# Make an array of formatted field keys and values
314-
field_df = dataframe[field_columns]
315+
field_df = dataframe[field_columns].replace([np.inf, -np.inf], np.nan)
316+
nans = pd.isnull(field_df)
315317
field_df = self._stringify_dataframe(
316318
field_df, numeric_precision, datatype='field')
317319
field_df = (field_df.columns.values + '=').tolist() + field_df
318320
field_df[field_df.columns[1:]] = ',' + field_df[field_df.columns[1:]]
319-
fields = field_df.sum(axis=1)
321+
field_df[nans] = ''
322+
fields = field_df.sum(axis=1).map(lambda x: x.lstrip(','))
320323
del field_df
321324

322325
# Generate line protocol string

influxdb/tests/dataframe_client_test.py

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import pandas as pd
2222
from pandas.util.testing import assert_frame_equal
2323
from influxdb import DataFrameClient
24+
import numpy as np
2425

2526

2627
@skipIfPYpy
@@ -571,3 +572,53 @@ def test_dsn_constructor(self):
571572
client = DataFrameClient.from_DSN('influxdb://localhost:8086')
572573
self.assertIsInstance(client, DataFrameClient)
573574
self.assertEqual('http://localhost:8086', client._baseurl)
575+
576+
def test_write_points_from_dataframe_with_nan_line(self):
577+
now = pd.Timestamp('1970-01-01 00:00+00:00')
578+
dataframe = pd.DataFrame(data=[["1", 1, np.inf], ["2", 2, np.nan]],
579+
index=[now, now + timedelta(hours=1)],
580+
columns=["column_one", "column_two",
581+
"column_three"])
582+
expected = (
583+
b"foo column_one=\"1\",column_two=1i 0\n"
584+
b"foo column_one=\"2\",column_two=2i "
585+
b"3600000000000\n"
586+
)
587+
588+
with requests_mock.Mocker() as m:
589+
m.register_uri(requests_mock.POST,
590+
"http://localhost:8086/write",
591+
status_code=204)
592+
593+
cli = DataFrameClient(database='db')
594+
595+
cli.write_points(dataframe, 'foo', protocol='line')
596+
self.assertEqual(m.last_request.body, expected)
597+
598+
cli.write_points(dataframe, 'foo', tags=None, protocol='line')
599+
self.assertEqual(m.last_request.body, expected)
600+
601+
def test_write_points_from_dataframe_with_nan_json(self):
602+
now = pd.Timestamp('1970-01-01 00:00+00:00')
603+
dataframe = pd.DataFrame(data=[["1", 1, np.inf], ["2", 2, np.nan]],
604+
index=[now, now + timedelta(hours=1)],
605+
columns=["column_one", "column_two",
606+
"column_three"])
607+
expected = (
608+
b"foo column_one=\"1\",column_two=1i 0\n"
609+
b"foo column_one=\"2\",column_two=2i "
610+
b"3600000000000\n"
611+
)
612+
613+
with requests_mock.Mocker() as m:
614+
m.register_uri(requests_mock.POST,
615+
"http://localhost:8086/write",
616+
status_code=204)
617+
618+
cli = DataFrameClient(database='db')
619+
620+
cli.write_points(dataframe, 'foo', protocol='json')
621+
self.assertEqual(m.last_request.body, expected)
622+
623+
cli.write_points(dataframe, 'foo', tags=None, protocol='json')
624+
self.assertEqual(m.last_request.body, expected)

0 commit comments

Comments
 (0)