diff --git a/influxdb/_dataframe_client.py b/influxdb/_dataframe_client.py index 6a66558b..42a1f607 100644 --- a/influxdb/_dataframe_client.py +++ b/influxdb/_dataframe_client.py @@ -10,6 +10,7 @@ from collections import defaultdict import pandas as pd +import numpy as np from .client import InfluxDBClient from .line_protocol import _escape_tag @@ -256,11 +257,11 @@ def _convert_dataframe_to_json(dataframe, points = [ {'measurement': measurement, 'tags': dict(list(tag.items()) + list(tags.items())), - 'fields': rec, + 'fields': rec.replace([np.inf, -np.inf], np.nan).dropna().to_dict(), 'time': int(ts.value / precision_factor)} - for ts, tag, rec in zip(dataframe.index, + for ts, tag, (_, rec) in zip(dataframe.index, dataframe[tag_columns].to_dict('record'), - dataframe[field_columns].to_dict('record')) + dataframe[field_columns].iterrows()) ] return points @@ -356,13 +357,14 @@ def _convert_dataframe_to_lines(self, tags = '' # Make an array of formatted field keys and values - field_df = dataframe[field_columns] - field_df = self._stringify_dataframe(field_df, - numeric_precision, - datatype='field') + field_df = dataframe[field_columns].replace([np.inf, -np.inf], np.nan) + nans = pd.isnull(field_df) + field_df = self._stringify_dataframe( + field_df, numeric_precision, datatype='field') field_df = (field_df.columns.values + '=').tolist() + field_df field_df[field_df.columns[1:]] = ',' + field_df[field_df.columns[1:]] - fields = field_df.sum(axis=1) + field_df[nans] = '' + fields = field_df.sum(axis=1).map(lambda x: x.lstrip(',')) del field_df # Generate line protocol string diff --git a/influxdb/tests/dataframe_client_test.py b/influxdb/tests/dataframe_client_test.py index 02aaac5f..3c0a3429 100644 --- a/influxdb/tests/dataframe_client_test.py +++ b/influxdb/tests/dataframe_client_test.py @@ -22,6 +22,7 @@ import pandas as pd from pandas.util.testing import assert_frame_equal from influxdb import DataFrameClient + import numpy as np @skipIfPYpy @@ -592,3 +593,53 @@ def test_dsn_constructor(self): client = DataFrameClient.from_dsn('influxdb://localhost:8086') self.assertIsInstance(client, DataFrameClient) self.assertEqual('http://localhost:8086', client._baseurl) + + def test_write_points_from_dataframe_with_nan_line(self): + now = pd.Timestamp('1970-01-01 00:00+00:00') + dataframe = pd.DataFrame(data=[["1", 1, np.inf], ["2", 2, np.nan]], + index=[now, now + timedelta(hours=1)], + columns=["column_one", "column_two", + "column_three"]) + expected = ( + b"foo column_one=\"1\",column_two=1i 0\n" + b"foo column_one=\"2\",column_two=2i " + b"3600000000000\n" + ) + + with requests_mock.Mocker() as m: + m.register_uri(requests_mock.POST, + "http://localhost:8086/write", + status_code=204) + + cli = DataFrameClient(database='db') + + cli.write_points(dataframe, 'foo', protocol='line') + self.assertEqual(m.last_request.body, expected) + + cli.write_points(dataframe, 'foo', tags=None, protocol='line') + self.assertEqual(m.last_request.body, expected) + + def test_write_points_from_dataframe_with_nan_json(self): + now = pd.Timestamp('1970-01-01 00:00+00:00') + dataframe = pd.DataFrame(data=[["1", 1, np.inf], ["2", 2, np.nan]], + index=[now, now + timedelta(hours=1)], + columns=["column_one", "column_two", + "column_three"]) + expected = ( + b"foo column_one=\"1\",column_two=1i 0\n" + b"foo column_one=\"2\",column_two=2i " + b"3600000000000\n" + ) + + with requests_mock.Mocker() as m: + m.register_uri(requests_mock.POST, + "http://localhost:8086/write", + status_code=204) + + cli = DataFrameClient(database='db') + + cli.write_points(dataframe, 'foo', protocol='json') + self.assertEqual(m.last_request.body, expected) + + cli.write_points(dataframe, 'foo', tags=None, protocol='json') + self.assertEqual(m.last_request.body, expected)