
Commit b40f24b

feat(data_frame): add possibility to specify timestamp column
1 parent 562f4fc commit b40f24b

File tree

4 files changed: +40 −9 lines changed


influxdb_client/client/write/dataframe_serializer.py

Lines changed: 21 additions & 9 deletions
@@ -41,6 +41,7 @@ def __init__(self, data_frame, point_settings, precision=DEFAULT_WRITE_PRECISION
         :param chunk_size: The size of chunk for serializing into chunks.
         :key data_frame_measurement_name: name of measurement for writing Pandas DataFrame
         :key data_frame_tag_columns: list of DataFrame columns which are tags, rest columns will be fields
+        :key data_frame_timestamp_column: DataFrame column which contains timestamp
         """
         # This function is hard to understand but for good reason:
         # the approach used here is considerably more efficient
@@ -92,19 +93,25 @@ def __init__(self, data_frame, point_settings, precision=DEFAULT_WRITE_PRECISION
         if data_frame_measurement_name is None:
             raise TypeError('"data_frame_measurement_name" is a Required Argument')

+        timestamp_column = kwargs.get('data_frame_timestamp_column', None)
         data_frame = data_frame.copy(deep=False)
-        if isinstance(data_frame.index, pd.PeriodIndex):
-            data_frame.index = data_frame.index.to_timestamp()
+        data_frame_timestamp = data_frame.index if timestamp_column is None else data_frame[timestamp_column]
+        if isinstance(data_frame_timestamp, pd.PeriodIndex):
+            data_frame_timestamp = data_frame_timestamp.to_timestamp()
         else:
             # TODO: this is almost certainly not what you want
             # when the index is the default RangeIndex.
             # Instead, it would probably be better to leave
             # out the timestamp unless a time column is explicitly
             # enabled.
-            data_frame.index = pd.to_datetime(data_frame.index, unit=precision)
+            data_frame_timestamp = pd.to_datetime(data_frame_timestamp, unit=precision)

-        if data_frame.index.tzinfo is None:
-            data_frame.index = data_frame.index.tz_localize('UTC')
+        if hasattr(data_frame_timestamp, 'tzinfo') and data_frame_timestamp.tzinfo is None:
+            data_frame_timestamp = data_frame_timestamp.tz_localize('UTC')
+        if timestamp_column is None:
+            data_frame.index = data_frame_timestamp
+        else:
+            data_frame[timestamp_column] = data_frame_timestamp

         data_frame_tag_columns = kwargs.get('data_frame_tag_columns')
         data_frame_tag_columns = set(data_frame_tag_columns or [])
@@ -141,6 +148,7 @@ def __init__(self, data_frame, point_settings, precision=DEFAULT_WRITE_PRECISION
         # null_columns has a bool value for each column holding
         # whether that column contains any null (NaN or None) values.
         null_columns = data_frame.isnull().any()
+        timestamp_index = 0

         # Iterate through the columns building up the expression for each column.
         for index, (key, value) in columns:
@@ -164,6 +172,9 @@ def __init__(self, data_frame, point_settings, precision=DEFAULT_WRITE_PRECISION
                     key_value = f',{key_format}={{str({val_format}).translate(_ESCAPE_KEY)}}'
                 tags.append(key_value)
                 continue
+            elif timestamp_column is not None and key in timestamp_column:
+                timestamp_index = field_index
+                continue

             # This column is a field column.
             # Note: no comma separator is needed for the first field.
@@ -195,13 +206,13 @@ def __init__(self, data_frame, point_settings, precision=DEFAULT_WRITE_PRECISION

         tags = ''.join(tags)
         fields = ''.join(fields)
-        timestamp = '{p[0].value}'
+        timestamp = '{p[%s].value}' % timestamp_index
         if precision == WritePrecision.US:
-            timestamp = '{int(p[0].value / 1e3)}'
+            timestamp = '{int(p[%s].value / 1e3)}' % timestamp_index
         elif precision == WritePrecision.MS:
-            timestamp = '{int(p[0].value / 1e6)}'
+            timestamp = '{int(p[%s].value / 1e6)}' % timestamp_index
         elif precision == WritePrecision.S:
-            timestamp = '{int(p[0].value / 1e9)}'
+            timestamp = '{int(p[%s].value / 1e9)}' % timestamp_index

         f = eval(f'lambda p: f"""{{measurement_name}}{tags} {fields} {timestamp}"""', {
             'measurement_name': measurement_name,
@@ -268,5 +279,6 @@ def data_frame_to_list_of_points(data_frame, point_settings, precision=DEFAULT_W
     :param precision: The precision for the unix timestamps within the body line-protocol.
     :key data_frame_measurement_name: name of measurement for writing Pandas DataFrame
     :key data_frame_tag_columns: list of DataFrame columns which are tags, rest columns will be fields
+    :key data_frame_timestamp_column: DataFrame column which contains timestamps
     """
     return DataframeSerializer(data_frame, point_settings, precision, **kwargs).serialize()
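
The hunks above route the timestamp through a single data_frame_timestamp value: the named column when data_frame_timestamp_column is given, the DataFrame index otherwise, converted via pd.to_datetime and localized to UTC when naive. Below is a minimal standalone sketch of that resolution logic, not part of the commit; resolve_timestamps and the sample frame are illustrative only, and the real serializer additionally passes the write precision as the unit for pd.to_datetime.

# A minimal standalone sketch of the timestamp resolution shown in the diff above.
import pandas as pd

def resolve_timestamps(data_frame: pd.DataFrame, timestamp_column: str = None):
    # Use the named column when given, otherwise fall back to the index,
    # mirroring the data_frame_timestamp selection in the serializer.
    ts = data_frame.index if timestamp_column is None else data_frame[timestamp_column]
    if isinstance(ts, pd.PeriodIndex):
        ts = ts.to_timestamp()
    else:
        ts = pd.to_datetime(ts)
    # Only objects that expose tzinfo (e.g. a DatetimeIndex) are localized;
    # naive timestamps are interpreted as UTC, as in the serializer.
    if hasattr(ts, 'tzinfo') and ts.tzinfo is None:
        ts = ts.tz_localize('UTC')
    return ts

df = pd.DataFrame({'column_time': ['2020-04-05', '2020-05-05'], 'value1': [10, 20]})
print(resolve_timestamps(df, timestamp_column='column_time'))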

influxdb_client/client/write_api.py

Lines changed: 1 addition & 0 deletions
@@ -294,6 +294,7 @@ def write(self, bucket: str, org: str = None,
         :key data_frame_measurement_name: name of measurement for writing Pandas DataFrame - ``DataFrame``
         :key data_frame_tag_columns: list of DataFrame columns which are tags,
                                      rest columns will be fields - ``DataFrame``
+        :key data_frame_timestamp_column: DataFrame column which contains timestamp - ``DataFrame``
         :key record_measurement_key: key of record with specified measurement -
                                      ``dictionary``, ``NamedTuple``, ``dataclass``
         :key record_measurement_name: static measurement name - ``dictionary``, ``NamedTuple``, ``dataclass``
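
With the docstring entry above, the new kwarg is passed to WriteApi.write alongside the existing data_frame_* options. A hedged usage sketch follows: the url, token, org and bucket values are placeholders, and the DataFrame mirrors the one used in the test further below.

# Usage sketch only; connection values are placeholders, not part of this commit.
import pandas as pd
from influxdb_client import InfluxDBClient
from influxdb_client.client.write_api import SYNCHRONOUS

df = pd.DataFrame({
    'column_time': ['2020-04-05', '2020-05-05'],
    'value1': [10, 20],
})

with InfluxDBClient(url="http://localhost:8086", token="my-token", org="my-org") as client:
    write_api = client.write_api(write_options=SYNCHRONOUS)
    write_api.write(
        bucket="my-bucket",
        record=df,
        data_frame_measurement_name="test",
        data_frame_timestamp_column="column_time",  # the kwarg introduced by this commit
    )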

influxdb_client/client/write_api_async.py

Lines changed: 1 addition & 0 deletions
@@ -58,6 +58,7 @@ async def write(self, bucket: str, org: str = None,
         :key data_frame_measurement_name: name of measurement for writing Pandas DataFrame - ``DataFrame``
         :key data_frame_tag_columns: list of DataFrame columns which are tags,
                                      rest columns will be fields - ``DataFrame``
+        :key data_frame_timestamp_column: DataFrame column which contains timestamp - ``DataFrame``
         :key record_measurement_key: key of record with specified measurement -
                                      ``dictionary``, ``NamedTuple``, ``dataclass``
         :key record_measurement_name: static measurement name - ``dictionary``, ``NamedTuple``, ``dataclass``
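
The async API documents the same kwarg. A corresponding sketch, assuming InfluxDBClientAsync as the entry point and again using placeholder connection values:

# Async usage sketch only; placeholders throughout, mirrors the sync example above.
import asyncio
import pandas as pd
from influxdb_client.client.influxdb_client_async import InfluxDBClientAsync

async def main():
    df = pd.DataFrame({'column_time': ['2020-04-05', '2020-05-05'], 'value1': [10, 20]})
    async with InfluxDBClientAsync(url="http://localhost:8086", token="my-token", org="my-org") as client:
        await client.write_api().write(
            bucket="my-bucket",
            record=df,
            data_frame_measurement_name="test",
            data_frame_timestamp_column="column_time",
        )

asyncio.run(main())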

tests/test_WriteApiDataFrame.py

Lines changed: 17 additions & 0 deletions
@@ -416,6 +416,23 @@ def test_without_tags_and_fields_with_nan(self):
         self.assertEqual("test a=1.0 1609459260000000000", points[1])
         self.assertEqual("test a=2.0,b=1.0 1609459320000000000", points[2])

+    def test_use_timestamp_from_specified_column(self):
+        from influxdb_client.extras import pd
+        data_frame = pd.DataFrame(data={
+            'column_time': ['2020-04-05', '2020-05-05'],
+            'value1': [10, 20],
+            'value2': [30, 40],
+        }, index=['A', 'B'])
+
+        points = data_frame_to_list_of_points(data_frame=data_frame,
+                                              data_frame_measurement_name="test",
+                                              data_frame_timestamp_column="column_time",
+                                              point_settings=PointSettings())
+
+        self.assertEqual(2, len(points))
+        self.assertEqual('test value1=10i,value2=30i 1586044800000000000', points[0])
+        self.assertEqual('test value1=20i,value2=40i 1588636800000000000', points[1])
+

 class DataSerializerChunksTest(unittest.TestCase):
     def test_chunks(self):
