Skip to content
This repository was archived by the owner on Oct 29, 2024. It is now read-only.

[Fix](#649) changes to support accurate ns timepoints #650

Closed
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ matrix:

install:
- pip install tox-travis
- pip install setuptools
- pip install setuptools==30.0.0
- pip install coveralls
- mkdir -p "influxdb_install/${INFLUXDB_VER}"
- if [ -n "${INFLUXDB_VER}" ] ; then wget "https://dl.influxdata.com/influxdb/releases/influxdb_${INFLUXDB_VER}_amd64.deb" ; fi
Expand Down
1 change: 1 addition & 0 deletions README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ Main dependency is:
Additional dependencies are:

- pandas: for writing from and reading to DataFrames, and for ns resolution of timepoints (http://pandas.pydata.org/)
- Sphinx: Tool to create and manage the documentation (http://sphinx-doc.org/)
- Nose: to auto-discover tests (http://nose.readthedocs.org/en/latest/)
- Mock: to mock tests (https://pypi.python.org/pypi/mock)
Expand Down
20 changes: 10 additions & 10 deletions influxdb/_dataframe_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -274,7 +274,7 @@ def _convert_dataframe_to_json(dataframe,
{'measurement': measurement,
'tags': dict(list(tag.items()) + list(tags.items())),
'fields': rec,
'time': np.int64(ts.value / precision_factor)}
'time': np.int64(ts.value // precision_factor)}
for ts, tag, rec in zip(dataframe.index,
dataframe[tag_columns].to_dict('record'),
dataframe[field_columns].to_dict('record'))
Expand Down Expand Up @@ -342,10 +342,10 @@ def _convert_dataframe_to_lines(self,

# Make array of timestamp ints
if isinstance(dataframe.index, pd.PeriodIndex):
time = ((dataframe.index.to_timestamp().values.astype(np.int64) /
time = ((dataframe.index.to_timestamp().values.astype(np.int64) //
precision_factor).astype(np.int64).astype(str))
else:
time = ((pd.to_datetime(dataframe.index).values.astype(np.int64) /
time = ((pd.to_datetime(dataframe.index).values.astype(np.int64) //
precision_factor).astype(np.int64).astype(str))

# If tag columns exist, make an array of formatted tag keys and values
Expand Down Expand Up @@ -452,16 +452,16 @@ def _stringify_dataframe(dframe, numeric_precision, datatype='field'):
return dframe

def _datetime_to_epoch(self, datetime, time_precision='s'):
seconds = (datetime - self.EPOCH).total_seconds()
nanoseconds = (datetime - self.EPOCH).value
if time_precision == 'h':
return seconds / 3600
return np.int64(nanoseconds // 1e9 // 3600)
elif time_precision == 'm':
return seconds / 60
return np.int64(nanoseconds // 1e9 // 60)
elif time_precision == 's':
return seconds
return np.int64(nanoseconds // 1e9)
elif time_precision == 'ms':
return seconds * 1e3
return np.int64(nanoseconds // 1e6)
elif time_precision == 'u':
return seconds * 1e6
return np.int64(nanoseconds // 1e3)
elif time_precision == 'n':
return seconds * 1e9
return nanoseconds
32 changes: 20 additions & 12 deletions influxdb/line_protocol.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,37 +9,45 @@
from datetime import datetime
from numbers import Integral

from pytz import UTC
from dateutil.parser import parse
from six import iteritems, binary_type, text_type, integer_types, PY2

EPOCH = UTC.localize(datetime.utcfromtimestamp(0))
import pandas as pd # Provide for ns timestamps
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add pandas to requirements.txt (and remove pytz and python-dateutil), build is currently failing because of that, see pypy build logs: https://travis-ci.org/influxdata/influxdb-python/jobs/516866320

Great fix btw, looking forward to being merged.


EPOCH = pd.Timestamp(0, tz='UTC')


def _convert_timestamp(timestamp, precision=None):
if isinstance(timestamp, Integral):
return timestamp # assume precision is correct if timestamp is int

if isinstance(_get_unicode(timestamp), text_type):
timestamp = parse(timestamp)
timestamp = pd.Timestamp(timestamp)

if isinstance(timestamp, datetime):
if isinstance(timestamp, datetime): # change to pandas.Timestamp
if not timestamp.tzinfo:
timestamp = UTC.localize(timestamp)
timestamp = pd.Timestamp(timestamp, tz='UTC')
else:
timestamp = pd.Timestamp(timestamp)

if isinstance(timestamp, pd._libs.tslib.Timestamp):
if not timestamp.tzinfo: # set to UTC for time since EPOCH
timestamp = pd.Timestamp(timestamp, tz='UTC')
else:
timestamp = timestamp.astimezone('UTC')

ns = (timestamp - EPOCH).total_seconds() * 1e9
ns = (timestamp - EPOCH).value
if precision is None or precision == 'n':
return ns
elif precision == 'u':
return ns / 1e3
return ns // 1e3
elif precision == 'ms':
return ns / 1e6
return ns // 1e6
elif precision == 's':
return ns / 1e9
return ns // 1e9
elif precision == 'm':
return ns / 1e9 / 60
return ns // 1e9 // 60
elif precision == 'h':
return ns / 1e9 / 3600
return ns // 1e9 // 3600

raise ValueError(timestamp)

Expand Down
28 changes: 14 additions & 14 deletions influxdb/tests/client_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ def setUp(self):
"host": "server01",
"region": "us-west"
},
"time": "2009-11-10T23:00:00.123456Z",
"time": "2009-11-10 23:10:55.123456789",
"fields": {
"value": 0.64
}
Expand Down Expand Up @@ -202,7 +202,7 @@ def test_write_points(self):
)
self.assertEqual(
'cpu_load_short,host=server01,region=us-west '
'value=0.64 1257894000123456000\n',
'value=0.64 1257894655123456789\n',
m.last_request.body.decode('utf-8'),
)

Expand All @@ -224,7 +224,7 @@ def test_write_points_toplevel_attributes(self):
)
self.assertEqual(
'cpu_load_short,host=server01,region=us-west,tag=hello '
'value=0.64 1257894000123456000\n',
'value=0.64 1257894655123456789\n',
m.last_request.body.decode('utf-8'),
)

Expand Down Expand Up @@ -273,7 +273,7 @@ def test_write_points_udp(self):

self.assertEqual(
'cpu_load_short,host=server01,region=us-west '
'value=0.64 1257894000123456000\n',
'value=0.64 1257894655123456789\n',
received_data.decode()
)

Expand All @@ -298,35 +298,35 @@ def test_write_points_with_precision(self):
cli.write_points(self.dummy_points, time_precision='n')
self.assertEqual(
b'cpu_load_short,host=server01,region=us-west '
b'value=0.64 1257894000123456000\n',
b'value=0.64 1257894655123456789\n',
m.last_request.body,
)

cli.write_points(self.dummy_points, time_precision='u')
self.assertEqual(
b'cpu_load_short,host=server01,region=us-west '
b'value=0.64 1257894000123456\n',
b'value=0.64 1257894655123456\n',
m.last_request.body,
)

cli.write_points(self.dummy_points, time_precision='ms')
self.assertEqual(
b'cpu_load_short,host=server01,region=us-west '
b'value=0.64 1257894000123\n',
b'value=0.64 1257894655123\n',
m.last_request.body,
)

cli.write_points(self.dummy_points, time_precision='s')
self.assertEqual(
b"cpu_load_short,host=server01,region=us-west "
b"value=0.64 1257894000\n",
b"value=0.64 1257894655\n",
m.last_request.body,
)

cli.write_points(self.dummy_points, time_precision='m')
self.assertEqual(
b'cpu_load_short,host=server01,region=us-west '
b'value=0.64 20964900\n',
b'value=0.64 20964910\n',
m.last_request.body,
)

Expand All @@ -352,39 +352,39 @@ def test_write_points_with_precision_udp(self):
received_data, addr = s.recvfrom(1024)
self.assertEqual(
b'cpu_load_short,host=server01,region=us-west '
b'value=0.64 1257894000123456000\n',
b'value=0.64 1257894655123456789\n',
received_data,
)

cli.write_points(self.dummy_points, time_precision='u')
received_data, addr = s.recvfrom(1024)
self.assertEqual(
b'cpu_load_short,host=server01,region=us-west '
b'value=0.64 1257894000123456\n',
b'value=0.64 1257894655123456\n',
received_data,
)

cli.write_points(self.dummy_points, time_precision='ms')
received_data, addr = s.recvfrom(1024)
self.assertEqual(
b'cpu_load_short,host=server01,region=us-west '
b'value=0.64 1257894000123\n',
b'value=0.64 1257894655123\n',
received_data,
)

cli.write_points(self.dummy_points, time_precision='s')
received_data, addr = s.recvfrom(1024)
self.assertEqual(
b"cpu_load_short,host=server01,region=us-west "
b"value=0.64 1257894000\n",
b"value=0.64 1257894655\n",
received_data,
)

cli.write_points(self.dummy_points, time_precision='m')
received_data, addr = s.recvfrom(1024)
self.assertEqual(
b'cpu_load_short,host=server01,region=us-west '
b'value=0.64 20964900\n',
b'value=0.64 20964910\n',
received_data,
)

Expand Down
16 changes: 8 additions & 8 deletions influxdb/tests/dataframe_client_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -930,36 +930,36 @@ def test_get_list_database(self):

def test_datetime_to_epoch(self):
"""Test convert datetime to epoch in TestDataFrameClient object."""
timestamp = pd.Timestamp('2013-01-01 00:00:00.000+00:00')
timestamp = pd.Timestamp('2013-01-01 23:10:55.123456789+00:00')
cli = DataFrameClient('host', 8086, 'username', 'password', 'db')

self.assertEqual(
cli._datetime_to_epoch(timestamp),
1356998400.0
1357081855
)
self.assertEqual(
cli._datetime_to_epoch(timestamp, time_precision='h'),
1356998400.0 / 3600
1357081855 // 3600
)
self.assertEqual(
cli._datetime_to_epoch(timestamp, time_precision='m'),
1356998400.0 / 60
1357081855 // 60
)
self.assertEqual(
cli._datetime_to_epoch(timestamp, time_precision='s'),
1356998400.0
1357081855
)
self.assertEqual(
cli._datetime_to_epoch(timestamp, time_precision='ms'),
1356998400000.0
1357081855123
)
self.assertEqual(
cli._datetime_to_epoch(timestamp, time_precision='u'),
1356998400000000.0
1357081855123456
)
self.assertEqual(
cli._datetime_to_epoch(timestamp, time_precision='n'),
1356998400000000000.0
1357081855123456789
)

def test_dsn_constructor(self):
Expand Down
47 changes: 36 additions & 11 deletions influxdb/tests/test_line_protocol.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
from pytz import UTC, timezone

from influxdb import line_protocol
import pandas as pd


class TestLineProtocol(unittest.TestCase):
Expand Down Expand Up @@ -48,31 +49,55 @@ def test_make_lines(self):

def test_timezone(self):
"""Test timezone in TestLineProtocol object."""
# datetime tests
dt = datetime(2009, 11, 10, 23, 0, 0, 123456)
utc = UTC.localize(dt)
berlin = timezone('Europe/Berlin').localize(dt)
eastern = berlin.astimezone(timezone('US/Eastern'))
data = {
"points": [
{"measurement": "A", "fields": {"val": 1},
"time": 0},
{"measurement": "A", "fields": {"val": 1},
"time": "2009-11-10T23:00:00.123456Z"},
{"measurement": "A", "fields": {"val": 1}, "time": dt},
{"measurement": "A", "fields": {"val": 1}, "time": utc},
{"measurement": "A", "fields": {"val": 1}, "time": berlin},
{"measurement": "A", "fields": {"val": 1}, "time": eastern},
]
# pandas ns timestamp tests
pddt = pd.Timestamp('2009-11-10 23:00:00.123456789')
pdutc = pd.Timestamp(pddt, tz='UTC')
pdberlin = pdutc.astimezone('Europe/Berlin')
pdeastern = pdberlin.astimezone('US/Eastern')

data = {"points": [
{"measurement": "A", "fields": {"val": 1}, "time": 0},
# string representations
# String version for datetime
{"measurement": "A", "fields": {"val": 1},
"time": "2009-11-10T23:00:00.123456Z"},
# String version for pandas ns timestamp
{"measurement": "A", "fields": {"val": 1},
"time": "2009-11-10 23:00:00.123456789"},
# datetime
{"measurement": "A", "fields": {"val": 1}, "time": dt},
{"measurement": "A", "fields": {"val": 1}, "time": utc},
{"measurement": "A", "fields": {"val": 1}, "time": berlin},
{"measurement": "A", "fields": {"val": 1}, "time": eastern},
# pandas timestamp
{"measurement": "A", "fields": {"val": 1}, "time": pddt},
{"measurement": "A", "fields": {"val": 1}, "time": pdutc},
{"measurement": "A", "fields": {"val": 1}, "time": pdberlin},
{"measurement": "A", "fields": {"val": 1}, "time": pdeastern},
]
}

self.assertEqual(
line_protocol.make_lines(data),
'\n'.join([
'A val=1i 0',
'A val=1i 1257894000123456000',
'A val=1i 1257894000123456789',
# datetime results
'A val=1i 1257894000123456000',
'A val=1i 1257894000123456000',
'A val=1i 1257890400123456000',
'A val=1i 1257890400123456000',
# pandas ns timestamp results
'A val=1i 1257894000123456789',
'A val=1i 1257894000123456789',
'A val=1i 1257894000123456789',
'A val=1i 1257894000123456789',
]) + '\n'
)

Expand Down