Skip to content

Commit 2db3afc

Browse files
authored
feat(data_frame): add possibility to specify timestamp column (#440)
* feat(data_frame): add possibility to specify timestamp column * fix: code style * docs: add documentation about supported `str` formats * docs: add possibility to specify timestamp column * docs: update CHANGELOG.md
1 parent 8543469 commit 2db3afc

File tree

5 files changed

+146
-15
lines changed

5 files changed

+146
-15
lines changed

CHANGELOG.md

+3
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,8 @@
11
## 1.30.0 [unreleased]
22

3+
### Features
4+
1. [#440](https://github.com/influxdata/influxdb-client-python/pull/440): Add possibility to specify timestamp column and its timezone [DataFrame]
5+
36
## 1.29.1 [2022-05-23]
47

58
### Bug Fixes

influxdb_client/client/write/dataframe_serializer.py

+36-11
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,11 @@ def __init__(self, data_frame, point_settings, precision=DEFAULT_WRITE_PRECISION
4141
:param chunk_size: The size of chunk for serializing into chunks.
4242
:key data_frame_measurement_name: name of measurement for writing Pandas DataFrame
4343
:key data_frame_tag_columns: list of DataFrame columns which are tags, rest columns will be fields
44-
"""
44+
:key data_frame_timestamp_column: name of DataFrame column which contains a timestamp. The column can be defined as a :class:`~str` value
45+
formatted as `2018-10-26`, `2018-10-26 12:00`, `2018-10-26 12:00:00-05:00`
46+
or other formats and types supported by `pandas.to_datetime <https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.to_datetime.html#pandas.to_datetime>`_ - ``DataFrame``
47+
:key data_frame_timestamp_timezone: name of the timezone which is used for timestamp column - ``DataFrame``
48+
""" # noqa: E501
4549
# This function is hard to understand but for good reason:
4650
# the approach used here is considerably more efficient
4751
# than the alternatives.
@@ -92,19 +96,32 @@ def __init__(self, data_frame, point_settings, precision=DEFAULT_WRITE_PRECISION
9296
if data_frame_measurement_name is None:
9397
raise TypeError('"data_frame_measurement_name" is a Required Argument')
9498

99+
timestamp_column = kwargs.get('data_frame_timestamp_column', None)
100+
timestamp_timezone = kwargs.get('data_frame_timestamp_timezone', None)
95101
data_frame = data_frame.copy(deep=False)
96-
if isinstance(data_frame.index, pd.PeriodIndex):
97-
data_frame.index = data_frame.index.to_timestamp()
102+
data_frame_timestamp = data_frame.index if timestamp_column is None else data_frame[timestamp_column]
103+
if isinstance(data_frame_timestamp, pd.PeriodIndex):
104+
data_frame_timestamp = data_frame_timestamp.to_timestamp()
98105
else:
99106
# TODO: this is almost certainly not what you want
100107
# when the index is the default RangeIndex.
101108
# Instead, it would probably be better to leave
102109
# out the timestamp unless a time column is explicitly
103110
# enabled.
104-
data_frame.index = pd.to_datetime(data_frame.index, unit=precision)
111+
data_frame_timestamp = pd.to_datetime(data_frame_timestamp, unit=precision)
112+
113+
if timestamp_timezone:
114+
if isinstance(data_frame_timestamp, pd.DatetimeIndex):
115+
data_frame_timestamp = data_frame_timestamp.tz_localize(timestamp_timezone)
116+
else:
117+
data_frame_timestamp = data_frame_timestamp.dt.tz_localize(timestamp_timezone)
105118

106-
if data_frame.index.tzinfo is None:
107-
data_frame.index = data_frame.index.tz_localize('UTC')
119+
if hasattr(data_frame_timestamp, 'tzinfo') and data_frame_timestamp.tzinfo is None:
120+
data_frame_timestamp = data_frame_timestamp.tz_localize('UTC')
121+
if timestamp_column is None:
122+
data_frame.index = data_frame_timestamp
123+
else:
124+
data_frame[timestamp_column] = data_frame_timestamp
108125

109126
data_frame_tag_columns = kwargs.get('data_frame_tag_columns')
110127
data_frame_tag_columns = set(data_frame_tag_columns or [])
@@ -141,6 +158,7 @@ def __init__(self, data_frame, point_settings, precision=DEFAULT_WRITE_PRECISION
141158
# null_columns has a bool value for each column holding
142159
# whether that column contains any null (NaN or None) values.
143160
null_columns = data_frame.isnull().any()
161+
timestamp_index = 0
144162

145163
# Iterate through the columns building up the expression for each column.
146164
for index, (key, value) in columns:
@@ -164,6 +182,9 @@ def __init__(self, data_frame, point_settings, precision=DEFAULT_WRITE_PRECISION
164182
key_value = f',{key_format}={{str({val_format}).translate(_ESCAPE_KEY)}}'
165183
tags.append(key_value)
166184
continue
185+
elif timestamp_column is not None and key in timestamp_column:
186+
timestamp_index = field_index
187+
continue
167188

168189
# This column is a field column.
169190
# Note: no comma separator is needed for the first field.
@@ -195,13 +216,13 @@ def __init__(self, data_frame, point_settings, precision=DEFAULT_WRITE_PRECISION
195216

196217
tags = ''.join(tags)
197218
fields = ''.join(fields)
198-
timestamp = '{p[0].value}'
219+
timestamp = '{p[%s].value}' % timestamp_index
199220
if precision == WritePrecision.US:
200-
timestamp = '{int(p[0].value / 1e3)}'
221+
timestamp = '{int(p[%s].value / 1e3)}' % timestamp_index
201222
elif precision == WritePrecision.MS:
202-
timestamp = '{int(p[0].value / 1e6)}'
223+
timestamp = '{int(p[%s].value / 1e6)}' % timestamp_index
203224
elif precision == WritePrecision.S:
204-
timestamp = '{int(p[0].value / 1e9)}'
225+
timestamp = '{int(p[%s].value / 1e9)}' % timestamp_index
205226

206227
f = eval(f'lambda p: f"""{{measurement_name}}{tags} {fields} {timestamp}"""', {
207228
'measurement_name': measurement_name,
@@ -268,5 +289,9 @@ def data_frame_to_list_of_points(data_frame, point_settings, precision=DEFAULT_W
268289
:param precision: The precision for the unix timestamps within the body line-protocol.
269290
:key data_frame_measurement_name: name of measurement for writing Pandas DataFrame
270291
:key data_frame_tag_columns: list of DataFrame columns which are tags, rest columns will be fields
271-
"""
292+
:key data_frame_timestamp_column: name of DataFrame column which contains a timestamp. The column can be defined as a :class:`~str` value
293+
formatted as `2018-10-26`, `2018-10-26 12:00`, `2018-10-26 12:00:00-05:00`
294+
or other formats and types supported by `pandas.to_datetime <https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.to_datetime.html#pandas.to_datetime>`_ - ``DataFrame``
295+
:key data_frame_timestamp_timezone: name of the timezone which is used for timestamp column - ``DataFrame``
296+
""" # noqa: E501
272297
return DataframeSerializer(data_frame, point_settings, precision, **kwargs).serialize()

influxdb_client/client/write_api.py

+6-2
Original file line numberDiff line numberDiff line change
@@ -294,6 +294,10 @@ def write(self, bucket: str, org: str = None,
294294
:key data_frame_measurement_name: name of measurement for writing Pandas DataFrame - ``DataFrame``
295295
:key data_frame_tag_columns: list of DataFrame columns which are tags,
296296
rest columns will be fields - ``DataFrame``
297+
:key data_frame_timestamp_column: name of DataFrame column which contains a timestamp. The column can be defined as a :class:`~str` value
298+
formatted as `2018-10-26`, `2018-10-26 12:00`, `2018-10-26 12:00:00-05:00`
299+
or other formats and types supported by `pandas.to_datetime <https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.to_datetime.html#pandas.to_datetime>`_ - ``DataFrame``
300+
:key data_frame_timestamp_timezone: name of the timezone which is used for timestamp column - ``DataFrame``
297301
:key record_measurement_key: key of record with specified measurement -
298302
``dictionary``, ``NamedTuple``, ``dataclass``
299303
:key record_measurement_name: static measurement name - ``dictionary``, ``NamedTuple``, ``dataclass``
@@ -322,8 +326,8 @@ def write(self, bucket: str, org: str = None,
322326
write_api.write("my-bucket", "my-org", point)
323327
324328
DataFrame:
325-
The index of `Pandas DataFrame <https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.DataFrame.html>`_
326-
is used as a ``timestamp`` for written data. The index should be `PeriodIndex <https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.PeriodIndex.html#pandas.PeriodIndex>`_
329+
If the ``data_frame_timestamp_column`` is not specified the index of `Pandas DataFrame <https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.DataFrame.html>`_
330+
is used as a ``timestamp`` for written data. The index can be `PeriodIndex <https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.PeriodIndex.html#pandas.PeriodIndex>`_
327331
or its must be transformable to ``datetime`` by
328332
`pandas.to_datetime <https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.to_datetime.html#pandas.to_datetime>`_.
329333

influxdb_client/client/write_api_async.py

+6-2
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,10 @@ async def write(self, bucket: str, org: str = None,
5858
:key data_frame_measurement_name: name of measurement for writing Pandas DataFrame - ``DataFrame``
5959
:key data_frame_tag_columns: list of DataFrame columns which are tags,
6060
rest columns will be fields - ``DataFrame``
61+
:key data_frame_timestamp_column: name of DataFrame column which contains a timestamp. The column can be defined as a :class:`~str` value
62+
formatted as `2018-10-26`, `2018-10-26 12:00`, `2018-10-26 12:00:00-05:00`
63+
or other formats and types supported by `pandas.to_datetime <https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.to_datetime.html#pandas.to_datetime>`_ - ``DataFrame``
64+
:key data_frame_timestamp_timezone: name of the timezone which is used for timestamp column - ``DataFrame``
6165
:key record_measurement_key: key of record with specified measurement -
6266
``dictionary``, ``NamedTuple``, ``dataclass``
6367
:key record_measurement_name: static measurement name - ``dictionary``, ``NamedTuple``, ``dataclass``
@@ -87,8 +91,8 @@ async def write(self, bucket: str, org: str = None,
8791
await write_api.write("my-bucket", "my-org", point)
8892
8993
DataFrame:
90-
The index of `Pandas DataFrame <https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.DataFrame.html>`_
91-
is used as a ``timestamp`` for written data. The index should be `PeriodIndex <https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.PeriodIndex.html#pandas.PeriodIndex>`_
94+
If the ``data_frame_timestamp_column`` is not specified the index of `Pandas DataFrame <https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.DataFrame.html>`_
95+
is used as a ``timestamp`` for written data. The index can be `PeriodIndex <https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.PeriodIndex.html#pandas.PeriodIndex>`_
9296
or its must be transformable to ``datetime`` by
9397
`pandas.to_datetime <https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.to_datetime.html#pandas.to_datetime>`_.
9498

tests/test_WriteApiDataFrame.py

+95
Original file line numberDiff line numberDiff line change
@@ -416,6 +416,101 @@ def test_without_tags_and_fields_with_nan(self):
416416
self.assertEqual("test a=1.0 1609459260000000000", points[1])
417417
self.assertEqual("test a=2.0,b=1.0 1609459320000000000", points[2])
418418

419+
def test_use_timestamp_from_specified_column(self):
420+
from influxdb_client.extras import pd
421+
data_frame = pd.DataFrame(data={
422+
'column_time': ['2020-04-05', '2020-05-05'],
423+
'value1': [10, 20],
424+
'value2': [30, 40],
425+
}, index=['A', 'B'])
426+
427+
points = data_frame_to_list_of_points(data_frame=data_frame,
428+
data_frame_measurement_name="test",
429+
data_frame_timestamp_column="column_time",
430+
point_settings=PointSettings())
431+
432+
self.assertEqual(2, len(points))
433+
self.assertEqual('test value1=10i,value2=30i 1586044800000000000', points[0])
434+
self.assertEqual('test value1=20i,value2=40i 1588636800000000000', points[1])
435+
436+
def test_str_format_for_timestamp(self):
437+
from influxdb_client.extras import pd
438+
439+
time_formats = [
440+
('2018-10-26', 'test value1=10i,value2=20i 1540512000000000000'),
441+
('2018-10-26 10:00', 'test value1=10i,value2=20i 1540548000000000000'),
442+
('2018-10-26 10:00:00-05:00', 'test value1=10i,value2=20i 1540566000000000000'),
443+
('2018-10-26T11:00:00+00:00', 'test value1=10i,value2=20i 1540551600000000000'),
444+
('2018-10-26 12:00:00+00:00', 'test value1=10i,value2=20i 1540555200000000000'),
445+
('2018-10-26T16:00:00-01:00', 'test value1=10i,value2=20i 1540573200000000000'),
446+
]
447+
448+
for time_format in time_formats:
449+
data_frame = pd.DataFrame(data={
450+
'column_time': [time_format[0]],
451+
'value1': [10],
452+
'value2': [20],
453+
}, index=['A'])
454+
points = data_frame_to_list_of_points(data_frame=data_frame,
455+
data_frame_measurement_name="test",
456+
data_frame_timestamp_column="column_time",
457+
point_settings=PointSettings())
458+
459+
self.assertEqual(1, len(points))
460+
self.assertEqual(time_format[1], points[0])
461+
462+
def test_specify_timezone(self):
463+
from influxdb_client.extras import pd
464+
data_frame = pd.DataFrame(data={
465+
'column_time': ['2020-05-24 10:00', '2020-05-24 01:00'],
466+
'value1': [10, 20],
467+
'value2': [30, 40],
468+
}, index=['A', 'B'])
469+
470+
points = data_frame_to_list_of_points(data_frame=data_frame,
471+
data_frame_measurement_name="test",
472+
data_frame_timestamp_column="column_time",
473+
data_frame_timestamp_timezone="Europe/Berlin",
474+
point_settings=PointSettings())
475+
476+
self.assertEqual(2, len(points))
477+
self.assertEqual('test value1=10i,value2=30i 1590307200000000000', points[0])
478+
self.assertEqual('test value1=20i,value2=40i 1590274800000000000', points[1])
479+
480+
def test_specify_timezone_date_time_index(self):
481+
from influxdb_client.extras import pd
482+
data_frame = pd.DataFrame(data={
483+
'value1': [10, 20],
484+
'value2': [30, 40],
485+
}, index=[pd.Timestamp('2020-05-24 10:00'), pd.Timestamp('2020-05-24 01:00')])
486+
487+
points = data_frame_to_list_of_points(data_frame=data_frame,
488+
data_frame_measurement_name="test",
489+
data_frame_timestamp_timezone="Europe/Berlin",
490+
point_settings=PointSettings())
491+
492+
self.assertEqual(2, len(points))
493+
self.assertEqual('test value1=10i,value2=30i 1590307200000000000', points[0])
494+
self.assertEqual('test value1=20i,value2=40i 1590274800000000000', points[1])
495+
496+
def test_specify_timezone_period_time_index(self):
497+
from influxdb_client.extras import pd
498+
data_frame = pd.DataFrame(data={
499+
'value1': [10, 20],
500+
'value2': [30, 40],
501+
}, index=pd.period_range(start='2020-05-24 10:00', freq='H', periods=2))
502+
503+
print(data_frame.to_string())
504+
505+
points = data_frame_to_list_of_points(data_frame=data_frame,
506+
data_frame_measurement_name="test",
507+
data_frame_timestamp_timezone="Europe/Berlin",
508+
point_settings=PointSettings())
509+
510+
self.assertEqual(2, len(points))
511+
self.assertEqual('test value1=10i,value2=30i 1590307200000000000', points[0])
512+
self.assertEqual('test value1=20i,value2=40i 1590310800000000000', points[1])
513+
419514

420515
class DataSerializerChunksTest(unittest.TestCase):
421516
def test_chunks(self):

0 commit comments

Comments
 (0)