Skip to content

Commit bfa0ac4

Browse files
authored
feat: Added possibility to use datetime nanoseconds precision by pandas.Timestamp (influxdata#141)
1 parent 7d5f2c2 commit bfa0ac4

11 files changed

+261
-40
lines changed

CHANGELOG.md

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,8 @@
22

33
### Features
44
1. [#136](https://github.com/influxdata/influxdb-client-python/pull/136): Allows users to skip of verifying SSL certificate
5-
1. [#143](https://github.com/influxdata/influxdb-client-python/pull/143): Skip of verifying SSL certificate could be configured via config file or environment properties
5+
1. [#143](https://github.com/influxdata/influxdb-client-python/pull/143): Skip of verifying SSL certificate could be configured via config file or environment properties
6+
1. [#141](https://github.com/influxdata/influxdb-client-python/pull/141): Added possibility to use datetime nanoseconds precision by `pandas.Timestamp`
67

78
## 1.9.0 [2020-07-17]
89

README.rst

Lines changed: 67 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -899,10 +899,76 @@ The following forward compatible APIs are available:
899899

900900
For detail info see `InfluxDB 1.8 example <examples/influxdb_18_example.py>`_.
901901

902+
Nanosecond precision
903+
^^^^^^^^^^^^^^^^^^^^
904+
905+
The Python's `datetime <https://docs.python.org/3/library/datetime.html>`_ doesn't support precision with nanoseconds
906+
so the library during writes and queries ignores everything after microseconds.
907+
908+
If you would like to use ``datetime`` with nanosecond precision you should use
909+
`pandas.Timestamp <https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.Timestamp.html#pandas.Timestamp>`_
910+
that is replacement for python ``datetime.datetime`` object and also you should set a proper ``DateTimeHelper`` to the client.
911+
912+
* sources - `nanosecond_precision.py <https://github.com/influxdata/influxdb-client-python/blob/master/examples/nanosecond_precision.py>`_
913+
914+
.. code-block:: python
915+
916+
from influxdb_client import Point, InfluxDBClient
917+
from influxdb_client.client.util.date_utils_pandas import PandasDateTimeHelper
918+
from influxdb_client.client.write_api import SYNCHRONOUS
919+
920+
"""
921+
Set PandasDate helper which supports nanoseconds.
922+
"""
923+
import influxdb_client.client.util.date_utils as date_utils
924+
925+
date_utils.date_helper = PandasDateTimeHelper()
926+
927+
"""
928+
Prepare client.
929+
"""
930+
client = InfluxDBClient(url="http://localhost:9999", token="my-token", org="my-org")
931+
932+
write_api = client.write_api(write_options=SYNCHRONOUS)
933+
query_api = client.query_api()
934+
935+
"""
936+
Prepare data
937+
"""
938+
939+
point = Point("h2o_feet") \
940+
.field("water_level", 10) \
941+
.tag("location", "pacific") \
942+
.time('1996-02-25T21:20:00.001001231Z')
943+
944+
print(f'Time serialized with nanosecond precision: {point.to_line_protocol()}')
945+
print()
946+
947+
write_api.write(bucket="my-bucket", record=point)
948+
949+
"""
950+
Query: using Stream
951+
"""
952+
query = '''
953+
from(bucket:"my-bucket")
954+
|> range(start: 0, stop: now())
955+
|> filter(fn: (r) => r._measurement == "h2o_feet")
956+
'''
957+
records = query_api.query_stream(query)
958+
959+
for record in records:
960+
print(f'Temperature in {record["location"]} is {record["_value"]} at time: {record["_time"]}')
961+
962+
"""
963+
Close client
964+
"""
965+
client.__del__()
966+
967+
902968
Local tests
903969
-----------
904970

905-
.. code-block:: python
971+
.. code-block:: console
906972
907973
# start/restart InfluxDB2 on local machine using docker
908974
./scripts/influxdb-restart.sh

examples/nanosecond_precision.py

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
from influxdb_client import Point, InfluxDBClient
2+
from influxdb_client.client.util.date_utils_pandas import PandasDateTimeHelper
3+
from influxdb_client.client.write_api import SYNCHRONOUS
4+
5+
"""
6+
Set PandasDate helper which supports nanoseconds.
7+
"""
8+
import influxdb_client.client.util.date_utils as date_utils
9+
10+
date_utils.date_helper = PandasDateTimeHelper()
11+
12+
"""
13+
Prepare client.
14+
"""
15+
client = InfluxDBClient(url="http://localhost:9999", token="my-token", org="my-org")
16+
17+
write_api = client.write_api(write_options=SYNCHRONOUS)
18+
query_api = client.query_api()
19+
20+
"""
21+
Prepare data
22+
"""
23+
24+
point = Point("h2o_feet") \
25+
.field("water_level", 10) \
26+
.tag("location", "pacific") \
27+
.time('1996-02-25T21:20:00.001001231Z')
28+
29+
print(f'Time serialized with nanosecond precision: {point.to_line_protocol()}')
30+
print()
31+
32+
write_api.write(bucket="my-bucket", record=point)
33+
34+
"""
35+
Query: using Stream
36+
"""
37+
query = '''
38+
from(bucket:"my-bucket")
39+
|> range(start: 0, stop: now())
40+
|> filter(fn: (r) => r._measurement == "h2o_feet")
41+
'''
42+
records = query_api.query_stream(query)
43+
44+
for record in records:
45+
print(f'Temperature in {record["location"]} is {record["_value"]} at time: {record["_time"]}')
46+
47+
"""
48+
Close client
49+
"""
50+
client.__del__()

influxdb_client/client/date_utils.py

Lines changed: 0 additions & 18 deletions
This file was deleted.

influxdb_client/client/flux_csv_parser.py

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99

1010
from urllib3 import HTTPResponse
1111

12-
from influxdb_client.client.date_utils import get_date_parse_function
12+
from influxdb_client.client.util.date_utils import get_date_helper
1313
from influxdb_client.client.flux_table import FluxTable, FluxColumn, FluxRecord
1414

1515

@@ -208,10 +208,7 @@ def _to_value(self, str_val, column):
208208
return base64.b64decode(str_val)
209209

210210
if "dateTime:RFC3339" == column.data_type or "dateTime:RFC3339Nano" == column.data_type:
211-
# todo nanosecods precision
212-
# return str_val
213-
return get_date_parse_function()(str_val)
214-
# return timestamp_parser(str_val)
211+
return get_date_helper().parse_date(str_val)
215212

216213
if "duration" == column.data_type:
217214
# todo better type ?
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
"""Utils package."""
Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
"""Utils to get right Date parsing function."""
2+
3+
from dateutil import parser
4+
5+
date_helper = None
6+
7+
8+
class DateHelper:
9+
"""DateHelper to groups different implementations of date operations."""
10+
11+
def parse_date(self, date_string: str):
12+
"""
13+
Parse string into Date or Timestamp.
14+
15+
:return: Returns a :class:`datetime.datetime` object or compliant implementation
16+
like :class:`class 'pandas._libs.tslibs.timestamps.Timestamp`
17+
"""
18+
pass
19+
20+
def to_nanoseconds(self, delta):
21+
"""
22+
Get number of nanoseconds in timedelta.
23+
24+
Solution comes from v1 client. Thx.
25+
https://github.com/influxdata/influxdb-python/pull/811
26+
"""
27+
nanoseconds_in_days = delta.days * 86400 * 10 ** 9
28+
nanoseconds_in_seconds = delta.seconds * 10 ** 9
29+
nanoseconds_in_micros = delta.microseconds * 10 ** 3
30+
31+
return nanoseconds_in_days + nanoseconds_in_seconds + nanoseconds_in_micros
32+
33+
34+
def get_date_helper() -> DateHelper:
35+
"""
36+
Return DateHelper with proper implementation.
37+
38+
If there is a 'ciso8601' than use 'ciso8601.parse_datetime' else use 'dateutil.parse'.
39+
"""
40+
global date_helper
41+
if date_helper is None:
42+
date_helper = DateHelper()
43+
try:
44+
import ciso8601
45+
date_helper.parse_date = ciso8601.parse_datetime
46+
except ModuleNotFoundError:
47+
date_helper.parse_date = parser.parse
48+
49+
return date_helper
Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
"""Pandas date utils."""
2+
from influxdb_client.client.util.date_utils import DateHelper
3+
from influxdb_client.extras import pd
4+
5+
6+
class PandasDateTimeHelper(DateHelper):
7+
"""DateHelper that use Pandas library with nanosecond precision."""
8+
9+
def parse_date(self, date_string: str):
10+
"""Parse date string into `class 'pandas._libs.tslibs.timestamps.Timestamp`."""
11+
return pd.to_datetime(date_string)
12+
13+
def to_nanoseconds(self, delta):
14+
"""Get number of nanoseconds with nanos precision."""
15+
return super().to_nanoseconds(delta) + (delta.nanoseconds if hasattr(delta, 'nanoseconds') else 0)

influxdb_client/client/write/point.py

Lines changed: 4 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010
from pytz import UTC
1111
from six import iteritems
1212

13-
from influxdb_client.client.date_utils import get_date_parse_function
13+
from influxdb_client.client.util.date_utils import get_date_helper
1414
from influxdb_client.domain.write_precision import WritePrecision
1515

1616
EPOCH = UTC.localize(datetime.utcfromtimestamp(0))
@@ -164,24 +164,13 @@ def _escape_string(value):
164164
return str(value).translate(_ESCAPE_STRING)
165165

166166

167-
def _to_nanoseconds(delta):
168-
"""
169-
Solution comes from v1 client. Thx.
170-
171-
https://github.com/influxdata/influxdb-python/pull/811
172-
"""
173-
nanoseconds_in_days = delta.days * 86400 * 10 ** 9
174-
nanoseconds_in_seconds = delta.seconds * 10 ** 9
175-
nanoseconds_in_micros = delta.microseconds * 10 ** 3
176-
return nanoseconds_in_days + nanoseconds_in_seconds + nanoseconds_in_micros
177-
178-
179167
def _convert_timestamp(timestamp, precision=DEFAULT_WRITE_PRECISION):
168+
date_helper = get_date_helper()
180169
if isinstance(timestamp, Integral):
181170
return timestamp # assume precision is correct if timestamp is int
182171

183172
if isinstance(timestamp, str):
184-
timestamp = get_date_parse_function()(timestamp)
173+
timestamp = date_helper.parse_date(timestamp)
185174

186175
if isinstance(timestamp, timedelta) or isinstance(timestamp, datetime):
187176

@@ -192,7 +181,7 @@ def _convert_timestamp(timestamp, precision=DEFAULT_WRITE_PRECISION):
192181
timestamp = timestamp.astimezone(UTC)
193182
timestamp = timestamp - EPOCH
194183

195-
ns = _to_nanoseconds(timestamp)
184+
ns = date_helper.to_nanoseconds(timestamp)
196185

197186
if precision is None or precision == WritePrecision.NS:
198187
return ns

tests/test_PandasDateTimeHelper.py

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
import unittest
2+
from datetime import datetime, timedelta
3+
4+
from pytz import UTC
5+
6+
from influxdb_client.client.util.date_utils_pandas import PandasDateTimeHelper
7+
8+
9+
class PandasDateTimeHelperTest(unittest.TestCase):
10+
11+
def setUp(self) -> None:
12+
self.helper = PandasDateTimeHelper()
13+
14+
def test_parse_date(self):
15+
date = self.helper.parse_date('2020-08-07T06:21:57.331249158Z')
16+
17+
self.assertEqual(date.year, 2020)
18+
self.assertEqual(date.month, 8)
19+
self.assertEqual(date.day, 7)
20+
self.assertEqual(date.hour, 6)
21+
self.assertEqual(date.minute, 21)
22+
self.assertEqual(date.second, 57)
23+
self.assertEqual(date.microsecond, 331249)
24+
self.assertEqual(date.nanosecond, 158)
25+
26+
def test_to_nanoseconds(self):
27+
date = self.helper.parse_date('2020-08-07T06:21:57.331249158Z')
28+
nanoseconds = self.helper.to_nanoseconds(date - UTC.localize(datetime.utcfromtimestamp(0)))
29+
30+
self.assertEqual(nanoseconds, 1596781317331249158)
31+
32+
def test_to_nanoseconds_buildin_timedelta(self):
33+
nanoseconds = self.helper.to_nanoseconds(timedelta(days=1))
34+
35+
self.assertEqual(nanoseconds, 86400000000000)

tests/test_WriteApi.py

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -383,6 +383,42 @@ def test_check_write_permission_by_empty_data(self):
383383

384384
client.__del__()
385385

386+
def test_write_query_data_nanoseconds(self):
387+
388+
from influxdb_client.client.util.date_utils_pandas import PandasDateTimeHelper
389+
import influxdb_client.client.util.date_utils as date_utils
390+
391+
date_utils.date_helper = PandasDateTimeHelper()
392+
393+
bucket = self.create_test_bucket()
394+
395+
point = Point("h2o_feet") \
396+
.field("water_level", 155) \
397+
.tag("location", "creek level")\
398+
.time('1996-02-25T21:20:00.001001231Z')
399+
400+
self.write_client.write(bucket.name, self.org, [point])
401+
402+
flux_result = self.client.query_api().query(
403+
f'from(bucket:"{bucket.name}") |> range(start: 1970-01-01T00:00:00.000000001Z)')
404+
self.assertEqual(1, len(flux_result))
405+
406+
record = flux_result[0].records[0]
407+
408+
self.assertEqual(self.id_tag, record["id"])
409+
self.assertEqual(record["_value"], 155)
410+
self.assertEqual(record["location"], "creek level")
411+
self.assertEqual(record["_time"].year, 1996)
412+
self.assertEqual(record["_time"].month, 2)
413+
self.assertEqual(record["_time"].day, 25)
414+
self.assertEqual(record["_time"].hour, 21)
415+
self.assertEqual(record["_time"].minute, 20)
416+
self.assertEqual(record["_time"].second, 00)
417+
self.assertEqual(record["_time"].microsecond, 1001)
418+
self.assertEqual(record["_time"].nanosecond, 231)
419+
420+
date_utils.date_helper = None
421+
386422

387423
class AsynchronousWriteTest(BaseTest):
388424

0 commit comments

Comments
 (0)