diff --git a/CHANGELOG.md b/CHANGELOG.md index 29fc118a..e8fea709 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,8 @@ ## 1.30.0 [unreleased] +### Features +1. [#440](https://github.com/influxdata/influxdb-client-python/pull/440): Add possibility to specify timestamp column and its timezone [DataFrame] + ## 1.29.1 [2022-05-23] ### Bug Fixes diff --git a/influxdb_client/client/write/dataframe_serializer.py b/influxdb_client/client/write/dataframe_serializer.py index 3e55104d..b5791ebf 100644 --- a/influxdb_client/client/write/dataframe_serializer.py +++ b/influxdb_client/client/write/dataframe_serializer.py @@ -41,7 +41,11 @@ def __init__(self, data_frame, point_settings, precision=DEFAULT_WRITE_PRECISION :param chunk_size: The size of chunk for serializing into chunks. :key data_frame_measurement_name: name of measurement for writing Pandas DataFrame :key data_frame_tag_columns: list of DataFrame columns which are tags, rest columns will be fields - """ + :key data_frame_timestamp_column: name of DataFrame column which contains a timestamp. The column can be defined as a :class:`~str` value + formatted as `2018-10-26`, `2018-10-26 12:00`, `2018-10-26 12:00:00-05:00` + or other formats and types supported by `pandas.to_datetime `_ - ``DataFrame`` + :key data_frame_timestamp_timezone: name of the timezone which is used for timestamp column - ``DataFrame`` + """ # noqa: E501 # This function is hard to understand but for good reason: # the approach used here is considerably more efficient # than the alternatives. @@ -92,19 +96,32 @@ def __init__(self, data_frame, point_settings, precision=DEFAULT_WRITE_PRECISION if data_frame_measurement_name is None: raise TypeError('"data_frame_measurement_name" is a Required Argument') + timestamp_column = kwargs.get('data_frame_timestamp_column', None) + timestamp_timezone = kwargs.get('data_frame_timestamp_timezone', None) data_frame = data_frame.copy(deep=False) - if isinstance(data_frame.index, pd.PeriodIndex): - data_frame.index = data_frame.index.to_timestamp() + data_frame_timestamp = data_frame.index if timestamp_column is None else data_frame[timestamp_column] + if isinstance(data_frame_timestamp, pd.PeriodIndex): + data_frame_timestamp = data_frame_timestamp.to_timestamp() else: # TODO: this is almost certainly not what you want # when the index is the default RangeIndex. # Instead, it would probably be better to leave # out the timestamp unless a time column is explicitly # enabled. - data_frame.index = pd.to_datetime(data_frame.index, unit=precision) + data_frame_timestamp = pd.to_datetime(data_frame_timestamp, unit=precision) + + if timestamp_timezone: + if isinstance(data_frame_timestamp, pd.DatetimeIndex): + data_frame_timestamp = data_frame_timestamp.tz_localize(timestamp_timezone) + else: + data_frame_timestamp = data_frame_timestamp.dt.tz_localize(timestamp_timezone) - if data_frame.index.tzinfo is None: - data_frame.index = data_frame.index.tz_localize('UTC') + if hasattr(data_frame_timestamp, 'tzinfo') and data_frame_timestamp.tzinfo is None: + data_frame_timestamp = data_frame_timestamp.tz_localize('UTC') + if timestamp_column is None: + data_frame.index = data_frame_timestamp + else: + data_frame[timestamp_column] = data_frame_timestamp data_frame_tag_columns = kwargs.get('data_frame_tag_columns') data_frame_tag_columns = set(data_frame_tag_columns or []) @@ -141,6 +158,7 @@ def __init__(self, data_frame, point_settings, precision=DEFAULT_WRITE_PRECISION # null_columns has a bool value for each column holding # whether that column contains any null (NaN or None) values. null_columns = data_frame.isnull().any() + timestamp_index = 0 # Iterate through the columns building up the expression for each column. for index, (key, value) in columns: @@ -164,6 +182,9 @@ def __init__(self, data_frame, point_settings, precision=DEFAULT_WRITE_PRECISION key_value = f',{key_format}={{str({val_format}).translate(_ESCAPE_KEY)}}' tags.append(key_value) continue + elif timestamp_column is not None and key in timestamp_column: + timestamp_index = field_index + continue # This column is a field column. # Note: no comma separator is needed for the first field. @@ -195,13 +216,13 @@ def __init__(self, data_frame, point_settings, precision=DEFAULT_WRITE_PRECISION tags = ''.join(tags) fields = ''.join(fields) - timestamp = '{p[0].value}' + timestamp = '{p[%s].value}' % timestamp_index if precision == WritePrecision.US: - timestamp = '{int(p[0].value / 1e3)}' + timestamp = '{int(p[%s].value / 1e3)}' % timestamp_index elif precision == WritePrecision.MS: - timestamp = '{int(p[0].value / 1e6)}' + timestamp = '{int(p[%s].value / 1e6)}' % timestamp_index elif precision == WritePrecision.S: - timestamp = '{int(p[0].value / 1e9)}' + timestamp = '{int(p[%s].value / 1e9)}' % timestamp_index f = eval(f'lambda p: f"""{{measurement_name}}{tags} {fields} {timestamp}"""', { 'measurement_name': measurement_name, @@ -268,5 +289,9 @@ def data_frame_to_list_of_points(data_frame, point_settings, precision=DEFAULT_W :param precision: The precision for the unix timestamps within the body line-protocol. :key data_frame_measurement_name: name of measurement for writing Pandas DataFrame :key data_frame_tag_columns: list of DataFrame columns which are tags, rest columns will be fields - """ + :key data_frame_timestamp_column: name of DataFrame column which contains a timestamp. The column can be defined as a :class:`~str` value + formatted as `2018-10-26`, `2018-10-26 12:00`, `2018-10-26 12:00:00-05:00` + or other formats and types supported by `pandas.to_datetime `_ - ``DataFrame`` + :key data_frame_timestamp_timezone: name of the timezone which is used for timestamp column - ``DataFrame`` + """ # noqa: E501 return DataframeSerializer(data_frame, point_settings, precision, **kwargs).serialize() diff --git a/influxdb_client/client/write_api.py b/influxdb_client/client/write_api.py index 5c78bf23..c069e684 100644 --- a/influxdb_client/client/write_api.py +++ b/influxdb_client/client/write_api.py @@ -294,6 +294,10 @@ def write(self, bucket: str, org: str = None, :key data_frame_measurement_name: name of measurement for writing Pandas DataFrame - ``DataFrame`` :key data_frame_tag_columns: list of DataFrame columns which are tags, rest columns will be fields - ``DataFrame`` + :key data_frame_timestamp_column: name of DataFrame column which contains a timestamp. The column can be defined as a :class:`~str` value + formatted as `2018-10-26`, `2018-10-26 12:00`, `2018-10-26 12:00:00-05:00` + or other formats and types supported by `pandas.to_datetime `_ - ``DataFrame`` + :key data_frame_timestamp_timezone: name of the timezone which is used for timestamp column - ``DataFrame`` :key record_measurement_key: key of record with specified measurement - ``dictionary``, ``NamedTuple``, ``dataclass`` :key record_measurement_name: static measurement name - ``dictionary``, ``NamedTuple``, ``dataclass`` @@ -322,8 +326,8 @@ def write(self, bucket: str, org: str = None, write_api.write("my-bucket", "my-org", point) DataFrame: - The index of `Pandas DataFrame `_ - is used as a ``timestamp`` for written data. The index should be `PeriodIndex `_ + If the ``data_frame_timestamp_column`` is not specified the index of `Pandas DataFrame `_ + is used as a ``timestamp`` for written data. The index can be `PeriodIndex `_ or its must be transformable to ``datetime`` by `pandas.to_datetime `_. diff --git a/influxdb_client/client/write_api_async.py b/influxdb_client/client/write_api_async.py index b312c77b..f9351783 100644 --- a/influxdb_client/client/write_api_async.py +++ b/influxdb_client/client/write_api_async.py @@ -58,6 +58,10 @@ async def write(self, bucket: str, org: str = None, :key data_frame_measurement_name: name of measurement for writing Pandas DataFrame - ``DataFrame`` :key data_frame_tag_columns: list of DataFrame columns which are tags, rest columns will be fields - ``DataFrame`` + :key data_frame_timestamp_column: name of DataFrame column which contains a timestamp. The column can be defined as a :class:`~str` value + formatted as `2018-10-26`, `2018-10-26 12:00`, `2018-10-26 12:00:00-05:00` + or other formats and types supported by `pandas.to_datetime `_ - ``DataFrame`` + :key data_frame_timestamp_timezone: name of the timezone which is used for timestamp column - ``DataFrame`` :key record_measurement_key: key of record with specified measurement - ``dictionary``, ``NamedTuple``, ``dataclass`` :key record_measurement_name: static measurement name - ``dictionary``, ``NamedTuple``, ``dataclass`` @@ -87,8 +91,8 @@ async def write(self, bucket: str, org: str = None, await write_api.write("my-bucket", "my-org", point) DataFrame: - The index of `Pandas DataFrame `_ - is used as a ``timestamp`` for written data. The index should be `PeriodIndex `_ + If the ``data_frame_timestamp_column`` is not specified the index of `Pandas DataFrame `_ + is used as a ``timestamp`` for written data. The index can be `PeriodIndex `_ or its must be transformable to ``datetime`` by `pandas.to_datetime `_. diff --git a/tests/test_WriteApiDataFrame.py b/tests/test_WriteApiDataFrame.py index d285463d..fcefda35 100644 --- a/tests/test_WriteApiDataFrame.py +++ b/tests/test_WriteApiDataFrame.py @@ -416,6 +416,101 @@ def test_without_tags_and_fields_with_nan(self): self.assertEqual("test a=1.0 1609459260000000000", points[1]) self.assertEqual("test a=2.0,b=1.0 1609459320000000000", points[2]) + def test_use_timestamp_from_specified_column(self): + from influxdb_client.extras import pd + data_frame = pd.DataFrame(data={ + 'column_time': ['2020-04-05', '2020-05-05'], + 'value1': [10, 20], + 'value2': [30, 40], + }, index=['A', 'B']) + + points = data_frame_to_list_of_points(data_frame=data_frame, + data_frame_measurement_name="test", + data_frame_timestamp_column="column_time", + point_settings=PointSettings()) + + self.assertEqual(2, len(points)) + self.assertEqual('test value1=10i,value2=30i 1586044800000000000', points[0]) + self.assertEqual('test value1=20i,value2=40i 1588636800000000000', points[1]) + + def test_str_format_for_timestamp(self): + from influxdb_client.extras import pd + + time_formats = [ + ('2018-10-26', 'test value1=10i,value2=20i 1540512000000000000'), + ('2018-10-26 10:00', 'test value1=10i,value2=20i 1540548000000000000'), + ('2018-10-26 10:00:00-05:00', 'test value1=10i,value2=20i 1540566000000000000'), + ('2018-10-26T11:00:00+00:00', 'test value1=10i,value2=20i 1540551600000000000'), + ('2018-10-26 12:00:00+00:00', 'test value1=10i,value2=20i 1540555200000000000'), + ('2018-10-26T16:00:00-01:00', 'test value1=10i,value2=20i 1540573200000000000'), + ] + + for time_format in time_formats: + data_frame = pd.DataFrame(data={ + 'column_time': [time_format[0]], + 'value1': [10], + 'value2': [20], + }, index=['A']) + points = data_frame_to_list_of_points(data_frame=data_frame, + data_frame_measurement_name="test", + data_frame_timestamp_column="column_time", + point_settings=PointSettings()) + + self.assertEqual(1, len(points)) + self.assertEqual(time_format[1], points[0]) + + def test_specify_timezone(self): + from influxdb_client.extras import pd + data_frame = pd.DataFrame(data={ + 'column_time': ['2020-05-24 10:00', '2020-05-24 01:00'], + 'value1': [10, 20], + 'value2': [30, 40], + }, index=['A', 'B']) + + points = data_frame_to_list_of_points(data_frame=data_frame, + data_frame_measurement_name="test", + data_frame_timestamp_column="column_time", + data_frame_timestamp_timezone="Europe/Berlin", + point_settings=PointSettings()) + + self.assertEqual(2, len(points)) + self.assertEqual('test value1=10i,value2=30i 1590307200000000000', points[0]) + self.assertEqual('test value1=20i,value2=40i 1590274800000000000', points[1]) + + def test_specify_timezone_date_time_index(self): + from influxdb_client.extras import pd + data_frame = pd.DataFrame(data={ + 'value1': [10, 20], + 'value2': [30, 40], + }, index=[pd.Timestamp('2020-05-24 10:00'), pd.Timestamp('2020-05-24 01:00')]) + + points = data_frame_to_list_of_points(data_frame=data_frame, + data_frame_measurement_name="test", + data_frame_timestamp_timezone="Europe/Berlin", + point_settings=PointSettings()) + + self.assertEqual(2, len(points)) + self.assertEqual('test value1=10i,value2=30i 1590307200000000000', points[0]) + self.assertEqual('test value1=20i,value2=40i 1590274800000000000', points[1]) + + def test_specify_timezone_period_time_index(self): + from influxdb_client.extras import pd + data_frame = pd.DataFrame(data={ + 'value1': [10, 20], + 'value2': [30, 40], + }, index=pd.period_range(start='2020-05-24 10:00', freq='H', periods=2)) + + print(data_frame.to_string()) + + points = data_frame_to_list_of_points(data_frame=data_frame, + data_frame_measurement_name="test", + data_frame_timestamp_timezone="Europe/Berlin", + point_settings=PointSettings()) + + self.assertEqual(2, len(points)) + self.assertEqual('test value1=10i,value2=30i 1590307200000000000', points[0]) + self.assertEqual('test value1=20i,value2=40i 1590310800000000000', points[1]) + class DataSerializerChunksTest(unittest.TestCase): def test_chunks(self):