Skip to content
This repository was archived by the owner on Oct 29, 2024. It is now read-only.

feat(dataframe_client): handle np.nan, np.inf values in DataFrameClient #812

Merged
merged 3 commits into from
Apr 9, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.
- Add support for messagepack (#734 thx @lovasoa)
- Add support for 'show series' (#357 thx @gaker)
- Add support for custom request session in InfluxDBClient (#360 thx @dschien)
- Add support for handling np.nan and np.inf values in DataFrameClient (#436 thx @nmerket)

### Changed
- Clean up stale CI config (#755)
Expand Down
40 changes: 27 additions & 13 deletions influxdb/_dataframe_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -270,14 +270,31 @@ def _convert_dataframe_to_json(dataframe,
"h": 1e9 * 3600,
}.get(time_precision, 1)

if not tag_columns:
points = [
{'measurement': measurement,
'fields':
rec.replace([np.inf, -np.inf], np.nan).dropna().to_dict(),
'time': np.int64(ts.value / precision_factor)}
for ts, (_, rec) in zip(
dataframe.index,
dataframe[field_columns].iterrows()
)
]

return points

points = [
{'measurement': measurement,
'tags': dict(list(tag.items()) + list(tags.items())),
'fields': rec,
'fields':
rec.replace([np.inf, -np.inf], np.nan).dropna().to_dict(),
'time': np.int64(ts.value / precision_factor)}
for ts, tag, rec in zip(dataframe.index,
dataframe[tag_columns].to_dict('record'),
dataframe[field_columns].to_dict('record'))
for ts, tag, (_, rec) in zip(
dataframe.index,
dataframe[tag_columns].to_dict('record'),
dataframe[field_columns].iterrows()
)
]

return points
Expand Down Expand Up @@ -379,21 +396,18 @@ def _convert_dataframe_to_lines(self,
tags = ''

# Make an array of formatted field keys and values
field_df = dataframe[field_columns]
# Keep the positions where Null values are found
mask_null = field_df.isnull().values
field_df = dataframe[field_columns].replace([np.inf, -np.inf], np.nan)
nans = pd.isnull(field_df)

field_df = self._stringify_dataframe(field_df,
numeric_precision,
datatype='field')

field_df = (field_df.columns.values + '=').tolist() + field_df
field_df[field_df.columns[1:]] = ',' + field_df[
field_df.columns[1:]]
field_df = field_df.where(~mask_null, '') # drop Null entries
fields = field_df.sum(axis=1)
# take out leading , where first column has a Null value
fields = fields.str.lstrip(",")
field_df[field_df.columns[1:]] = ',' + field_df[field_df.columns[1:]]
field_df[nans] = ''

fields = field_df.sum(axis=1).map(lambda x: x.lstrip(','))
del field_df

# Generate line protocol string
Expand Down
122 changes: 119 additions & 3 deletions influxdb/tests/dataframe_client_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,16 +13,16 @@
import warnings
import requests_mock

from influxdb.tests import skip_if_pypy, using_pypy
from nose.tools import raises
from influxdb.tests import skip_if_pypy, using_pypy

from .client_test import _mocked_session

if not using_pypy:
import pandas as pd
from pandas.util.testing import assert_frame_equal
from influxdb import DataFrameClient
import numpy
import numpy as np


@skip_if_pypy
Expand Down Expand Up @@ -462,7 +462,7 @@ def test_write_points_from_dataframe_with_numeric_precision(self):
["2", 2, 2.2222222222222]],
index=[now, now + timedelta(hours=1)])

if numpy.lib.NumpyVersion(numpy.__version__) <= '1.13.3':
if np.lib.NumpyVersion(np.__version__) <= '1.13.3':
expected_default_precision = (
b'foo,hello=there 0=\"1\",1=1i,2=1.11111111111 0\n'
b'foo,hello=there 0=\"2\",1=2i,2=2.22222222222 3600000000000\n'
Expand Down Expand Up @@ -1032,3 +1032,119 @@ def test_dsn_constructor(self):
client = DataFrameClient.from_dsn('influxdb://localhost:8086')
self.assertIsInstance(client, DataFrameClient)
self.assertEqual('http://localhost:8086', client._baseurl)

def test_write_points_from_dataframe_with_nan_line(self):
"""Test write points from dataframe with Nan lines."""
now = pd.Timestamp('1970-01-01 00:00+00:00')
dataframe = pd.DataFrame(data=[["1", 1, np.inf], ["2", 2, np.nan]],
index=[now, now + timedelta(hours=1)],
columns=["column_one", "column_two",
"column_three"])
expected = (
b"foo column_one=\"1\",column_two=1i 0\n"
b"foo column_one=\"2\",column_two=2i "
b"3600000000000\n"
)

with requests_mock.Mocker() as m:
m.register_uri(requests_mock.POST,
"http://localhost:8086/write",
status_code=204)

cli = DataFrameClient(database='db')

cli.write_points(dataframe, 'foo', protocol='line')
self.assertEqual(m.last_request.body, expected)

cli.write_points(dataframe, 'foo', tags=None, protocol='line')
self.assertEqual(m.last_request.body, expected)

def test_write_points_from_dataframe_with_nan_json(self):
"""Test write points from json with NaN lines."""
now = pd.Timestamp('1970-01-01 00:00+00:00')
dataframe = pd.DataFrame(data=[["1", 1, np.inf], ["2", 2, np.nan]],
index=[now, now + timedelta(hours=1)],
columns=["column_one", "column_two",
"column_three"])
expected = (
b"foo column_one=\"1\",column_two=1i 0\n"
b"foo column_one=\"2\",column_two=2i "
b"3600000000000\n"
)

with requests_mock.Mocker() as m:
m.register_uri(requests_mock.POST,
"http://localhost:8086/write",
status_code=204)

cli = DataFrameClient(database='db')

cli.write_points(dataframe, 'foo', protocol='json')
self.assertEqual(m.last_request.body, expected)

cli.write_points(dataframe, 'foo', tags=None, protocol='json')
self.assertEqual(m.last_request.body, expected)

def test_write_points_from_dataframe_with_tags_and_nan_line(self):
"""Test write points from dataframe with NaN lines and tags."""
now = pd.Timestamp('1970-01-01 00:00+00:00')
dataframe = pd.DataFrame(data=[['blue', 1, "1", 1, np.inf],
['red', 0, "2", 2, np.nan]],
index=[now, now + timedelta(hours=1)],
columns=["tag_one", "tag_two", "column_one",
"column_two", "column_three"])
expected = (
b"foo,tag_one=blue,tag_two=1 "
b"column_one=\"1\",column_two=1i "
b"0\n"
b"foo,tag_one=red,tag_two=0 "
b"column_one=\"2\",column_two=2i "
b"3600000000000\n"
)

with requests_mock.Mocker() as m:
m.register_uri(requests_mock.POST,
"http://localhost:8086/write",
status_code=204)

cli = DataFrameClient(database='db')

cli.write_points(dataframe, 'foo', protocol='line',
tag_columns=['tag_one', 'tag_two'])
self.assertEqual(m.last_request.body, expected)

cli.write_points(dataframe, 'foo', tags=None, protocol='line',
tag_columns=['tag_one', 'tag_two'])
self.assertEqual(m.last_request.body, expected)

def test_write_points_from_dataframe_with_tags_and_nan_json(self):
"""Test write points from json with NaN lines and tags."""
now = pd.Timestamp('1970-01-01 00:00+00:00')
dataframe = pd.DataFrame(data=[['blue', 1, "1", 1, np.inf],
['red', 0, "2", 2, np.nan]],
index=[now, now + timedelta(hours=1)],
columns=["tag_one", "tag_two", "column_one",
"column_two", "column_three"])
expected = (
b"foo,tag_one=blue,tag_two=1 "
b"column_one=\"1\",column_two=1i "
b"0\n"
b"foo,tag_one=red,tag_two=0 "
b"column_one=\"2\",column_two=2i "
b"3600000000000\n"
)

with requests_mock.Mocker() as m:
m.register_uri(requests_mock.POST,
"http://localhost:8086/write",
status_code=204)

cli = DataFrameClient(database='db')

cli.write_points(dataframe, 'foo', protocol='json',
tag_columns=['tag_one', 'tag_two'])
self.assertEqual(m.last_request.body, expected)

cli.write_points(dataframe, 'foo', tags=None, protocol='json',
tag_columns=['tag_one', 'tag_two'])
self.assertEqual(m.last_request.body, expected)