Skip to content
This repository was archived by the owner on Oct 29, 2024. It is now read-only.

[Fix](#649) changes to support accurate ns timepoints #650

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ matrix:

install:
- pip install tox-travis
- pip install setuptools
- pip install setuptools==30.0.0
- pip install coveralls
- mkdir -p "influxdb_install/${INFLUXDB_VER}"
- if [ -n "${INFLUXDB_VER}" ] ; then wget "https://dl.influxdata.com/influxdb/releases/influxdb_${INFLUXDB_VER}_amd64.deb" ; fi
Expand Down
1 change: 1 addition & 0 deletions README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ Main dependency is:
Additional dependencies are:

- pandas: for writing from and reading to DataFrames (http://pandas.pydata.org/)
- pandas: for ns resolution of timepoints
- Sphinx: Tool to create and manage the documentation (http://sphinx-doc.org/)
- Nose: to auto-discover tests (http://nose.readthedocs.org/en/latest/)
- Mock: to mock tests (https://pypi.python.org/pypi/mock)
Expand Down
61 changes: 29 additions & 32 deletions influxdb/_dataframe_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,27 @@
from .client import InfluxDBClient
from .line_protocol import _escape_tag

# Precisions factors must be int for correct calculation to ints.
# if precision is a float the result of a floor calc is an approximation
# Example : the issue is only observable with nanosecond resolution
# values are greater than 895ns
# ts = pd.Timestamp('2013-01-01 23:10:55.123456987+00:00')
# ts_ns = np.int64(ts.value)
# # For conversion to microsecond
# precision_factor=1e3
# expected_ts_us = 1357081855123456
# # following is INCORRECT 1357081855123457
# np.int64(ts_ns // precision_factor)
# # following is CORRECT 1357081855123456
# np.int64(ts_ns // np.int64(precision_factor)

_time_precision_factors = {"n": 1,
"u": np.int64(1e3),
"ms": np.int64(1e6),
"s": np.int64(1e9),
"m": np.int64(1e9 * 60),
"h": np.int64(1e9 * 3600), }


def _pandas_time_unit(time_precision):
unit = time_precision
Expand Down Expand Up @@ -261,20 +282,13 @@ def _convert_dataframe_to_json(dataframe,
# Convert dtype for json serialization
dataframe = dataframe.astype('object')

precision_factor = {
"n": 1,
"u": 1e3,
"ms": 1e6,
"s": 1e9,
"m": 1e9 * 60,
"h": 1e9 * 3600,
}.get(time_precision, 1)
precision_factor = _time_precision_factors.get(time_precision, 1)

points = [
{'measurement': measurement,
'tags': dict(list(tag.items()) + list(tags.items())),
'fields': rec,
'time': np.int64(ts.value / precision_factor)}
'time': np.int64(ts.value // precision_factor)}
for ts, tag, rec in zip(dataframe.index,
dataframe[tag_columns].to_dict('record'),
dataframe[field_columns].to_dict('record'))
Expand Down Expand Up @@ -331,21 +345,14 @@ def _convert_dataframe_to_lines(self,
field_columns = list(column_series[~column_series.isin(
tag_columns)])

precision_factor = {
"n": 1,
"u": 1e3,
"ms": 1e6,
"s": 1e9,
"m": 1e9 * 60,
"h": 1e9 * 3600,
}.get(time_precision, 1)
precision_factor = _time_precision_factors.get(time_precision, 1)

# Make array of timestamp ints
if isinstance(dataframe.index, pd.PeriodIndex):
time = ((dataframe.index.to_timestamp().values.astype(np.int64) /
time = ((dataframe.index.to_timestamp().values.astype(np.int64) //
precision_factor).astype(np.int64).astype(str))
else:
time = ((pd.to_datetime(dataframe.index).values.astype(np.int64) /
time = ((pd.to_datetime(dataframe.index).values.astype(np.int64) //
precision_factor).astype(np.int64).astype(str))

# If tag columns exist, make an array of formatted tag keys and values
Expand Down Expand Up @@ -454,16 +461,6 @@ def _stringify_dataframe(dframe, numeric_precision, datatype='field'):
return dframe

def _datetime_to_epoch(self, datetime, time_precision='s'):
seconds = (datetime - self.EPOCH).total_seconds()
if time_precision == 'h':
return seconds / 3600
elif time_precision == 'm':
return seconds / 60
elif time_precision == 's':
return seconds
elif time_precision == 'ms':
return seconds * 1e3
elif time_precision == 'u':
return seconds * 1e6
elif time_precision == 'n':
return seconds * 1e9
nanoseconds = (datetime - self.EPOCH).value
precision_factor = _time_precision_factors.get(time_precision, 1)
return np.int64(nanoseconds // np.int64(precision_factor))
63 changes: 37 additions & 26 deletions influxdb/line_protocol.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,44 +8,55 @@

from datetime import datetime
from numbers import Integral

from pytz import UTC
from dateutil.parser import parse
from six import binary_type, text_type, integer_types, PY2

EPOCH = UTC.localize(datetime.utcfromtimestamp(0))
import pandas as pd # Provide for ns timestamps
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add pandas to requirements.txt (and remove pytz and python-dateutil), build is currently failing because of that, see pypy build logs: https://travis-ci.org/influxdata/influxdb-python/jobs/516866320

Great fix btw, looking forward to being merged.

import numpy as np # Provided for accurate precision_factor conversion

EPOCH = pd.Timestamp(0, tz='UTC')

# Precisions factors must be int for correct calculation to ints.
# if precision is float the result of a floor calc is an approximation
# Example : the issue is only observable with nanosecond resolution
# values are greater than 895ns
# ts = pd.Timestamp('2013-01-01 23:10:55.123456987+00:00')
# ts_ns = np.int64(ts.value)
# # For conversion to microsecond
# precision_factor=1e3
# expected_ts_us = 1357081855123456
# np.int64(ts_ns // precision_factor) # is INCORRECT 1357081855123457
# np.int64(ts_ns // np.int64(precision_factor) # is CORRECT 1357081855123456

_time_precision_factors = {"n": 1,
"u": np.int64(1e3),
"ms": np.int64(1e6),
"s": np.int64(1e9),
"m": np.int64(1e9 * 60),
"h": np.int64(1e9 * 3600), }

def _convert_timestamp(timestamp, precision=None):

def _convert_timestamp(timestamp, time_precision=None):
if isinstance(timestamp, Integral):
return timestamp # assume precision is correct if timestamp is int

if isinstance(_get_unicode(timestamp), text_type):
timestamp = parse(timestamp)
timestamp = pd.Timestamp(timestamp)

if isinstance(timestamp, datetime):
if isinstance(timestamp, datetime): # change to pandas.Timestamp
if not timestamp.tzinfo:
timestamp = UTC.localize(timestamp)

ns = (timestamp - EPOCH).total_seconds() * 1e9
if precision is None or precision == 'n':
return ns

if precision == 'u':
return ns / 1e3

if precision == 'ms':
return ns / 1e6

if precision == 's':
return ns / 1e9

if precision == 'm':
return ns / 1e9 / 60
timestamp = pd.Timestamp(timestamp, tz='UTC')
else:
timestamp = pd.Timestamp(timestamp)

if precision == 'h':
return ns / 1e9 / 3600
if isinstance(timestamp, pd._libs.tslib.Timestamp):
if not timestamp.tzinfo: # set to UTC for time since EPOCH
timestamp = pd.Timestamp(timestamp, tz='UTC')
else:
timestamp = timestamp.astimezone('UTC')

nanoseconds = (timestamp - EPOCH).value
precision_factor = _time_precision_factors.get(time_precision, 1)
return np.int64(nanoseconds // np.int64(precision_factor))
raise ValueError(timestamp)


Expand Down
28 changes: 14 additions & 14 deletions influxdb/tests/client_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ def setUp(self):
"host": "server01",
"region": "us-west"
},
"time": "2009-11-10T23:00:00.123456Z",
"time": "2009-11-10 23:10:55.123456987",
"fields": {
"value": 0.64
}
Expand Down Expand Up @@ -210,7 +210,7 @@ def test_write_points(self):
)
self.assertEqual(
'cpu_load_short,host=server01,region=us-west '
'value=0.64 1257894000123456000\n',
'value=0.64 1257894655123456987\n',
m.last_request.body.decode('utf-8'),
)

Expand All @@ -232,7 +232,7 @@ def test_write_points_toplevel_attributes(self):
)
self.assertEqual(
'cpu_load_short,host=server01,region=us-west,tag=hello '
'value=0.64 1257894000123456000\n',
'value=0.64 1257894655123456987\n',
m.last_request.body.decode('utf-8'),
)

Expand Down Expand Up @@ -281,7 +281,7 @@ def test_write_points_udp(self):

self.assertEqual(
'cpu_load_short,host=server01,region=us-west '
'value=0.64 1257894000123456000\n',
'value=0.64 1257894655123456987\n',
received_data.decode()
)

Expand All @@ -306,35 +306,35 @@ def test_write_points_with_precision(self):
cli.write_points(self.dummy_points, time_precision='n')
self.assertEqual(
b'cpu_load_short,host=server01,region=us-west '
b'value=0.64 1257894000123456000\n',
b'value=0.64 1257894655123456987\n',
m.last_request.body,
)

cli.write_points(self.dummy_points, time_precision='u')
self.assertEqual(
b'cpu_load_short,host=server01,region=us-west '
b'value=0.64 1257894000123456\n',
b'value=0.64 1257894655123456\n',
m.last_request.body,
)

cli.write_points(self.dummy_points, time_precision='ms')
self.assertEqual(
b'cpu_load_short,host=server01,region=us-west '
b'value=0.64 1257894000123\n',
b'value=0.64 1257894655123\n',
m.last_request.body,
)

cli.write_points(self.dummy_points, time_precision='s')
self.assertEqual(
b"cpu_load_short,host=server01,region=us-west "
b"value=0.64 1257894000\n",
b"value=0.64 1257894655\n",
m.last_request.body,
)

cli.write_points(self.dummy_points, time_precision='m')
self.assertEqual(
b'cpu_load_short,host=server01,region=us-west '
b'value=0.64 20964900\n',
b'value=0.64 20964910\n',
m.last_request.body,
)

Expand Down Expand Up @@ -377,39 +377,39 @@ def test_write_points_with_precision_udp(self):
received_data, addr = s.recvfrom(1024)
self.assertEqual(
b'cpu_load_short,host=server01,region=us-west '
b'value=0.64 1257894000123456000\n',
b'value=0.64 1257894655123456987\n',
received_data,
)

cli.write_points(self.dummy_points, time_precision='u')
received_data, addr = s.recvfrom(1024)
self.assertEqual(
b'cpu_load_short,host=server01,region=us-west '
b'value=0.64 1257894000123456\n',
b'value=0.64 1257894655123456\n',
received_data,
)

cli.write_points(self.dummy_points, time_precision='ms')
received_data, addr = s.recvfrom(1024)
self.assertEqual(
b'cpu_load_short,host=server01,region=us-west '
b'value=0.64 1257894000123\n',
b'value=0.64 1257894655123\n',
received_data,
)

cli.write_points(self.dummy_points, time_precision='s')
received_data, addr = s.recvfrom(1024)
self.assertEqual(
b"cpu_load_short,host=server01,region=us-west "
b"value=0.64 1257894000\n",
b"value=0.64 1257894655\n",
received_data,
)

cli.write_points(self.dummy_points, time_precision='m')
received_data, addr = s.recvfrom(1024)
self.assertEqual(
b'cpu_load_short,host=server01,region=us-west '
b'value=0.64 20964900\n',
b'value=0.64 20964910\n',
received_data,
)

Expand Down
36 changes: 27 additions & 9 deletions influxdb/tests/dataframe_client_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -994,37 +994,55 @@ def test_get_list_database(self):
)

def test_datetime_to_epoch(self):
"""Test convert datetime to epoch in TestDataFrameClient object."""
timestamp = pd.Timestamp('2013-01-01 00:00:00.000+00:00')
"""Test convert datetime to epoch in TestDataFrameClient object.

Precisions factors must be int for correct calculation to ints.
if precision is float the result of a floor calc is an approximation
Choosing the test value is important that nanosecond resolution
values are greater than 895ns

Example : the issue is only observable ns > 895ns
# ts = pd.Timestamp('2013-01-01 23:10:55.123456987+00:00')
# ts_ns = np.int64(ts.value)
# # For conversion to microsecond
# precision_factor=1e3
# expected_ts_us = 1357081855123456
# following is INCORRECT 1357081855123457
# np.int64(ts_ns // precision_factor)
# following is CORRECT 1357081855123456
# np.int64(ts_ns // np.int64(precision_factor)

"""
timestamp = pd.Timestamp('2013-01-01 23:10:55.123456987+00:00')
cli = DataFrameClient('host', 8086, 'username', 'password', 'db')

self.assertEqual(
cli._datetime_to_epoch(timestamp),
1356998400.0
1357081855
)
self.assertEqual(
cli._datetime_to_epoch(timestamp, time_precision='h'),
1356998400.0 / 3600
1357081855 // 3600
)
self.assertEqual(
cli._datetime_to_epoch(timestamp, time_precision='m'),
1356998400.0 / 60
1357081855 // 60
)
self.assertEqual(
cli._datetime_to_epoch(timestamp, time_precision='s'),
1356998400.0
1357081855
)
self.assertEqual(
cli._datetime_to_epoch(timestamp, time_precision='ms'),
1356998400000.0
1357081855123
)
self.assertEqual(
cli._datetime_to_epoch(timestamp, time_precision='u'),
1356998400000000.0
1357081855123456
)
self.assertEqual(
cli._datetime_to_epoch(timestamp, time_precision='n'),
1356998400000000000.0
1357081855123456987
)

def test_dsn_constructor(self):
Expand Down
Loading