From 212cee30c8b60119df01d2b7f6ee02ba97006a22 Mon Sep 17 00:00:00 2001 From: Pavlina Rolincova Date: Tue, 28 Apr 2020 14:45:44 +0200 Subject: [PATCH 1/7] feat: support for writing pandas DataFrame (#79) --- influxdb_client/client/write_api.py | 96 ++++++++++++++++++++++++----- tests/test_WriteApi.py | 36 +++++++++++ 2 files changed, 118 insertions(+), 14 deletions(-) diff --git a/influxdb_client/client/write_api.py b/influxdb_client/client/write_api.py index d51eafa7..18503ed3 100644 --- a/influxdb_client/client/write_api.py +++ b/influxdb_client/client/write_api.py @@ -183,21 +183,24 @@ def __init__(self, influxdb_client, write_options: WriteOptions = WriteOptions() def write(self, bucket: str, org: str = None, record: Union[ str, List['str'], Point, List['Point'], dict, List['dict'], bytes, List['bytes'], Observable] = None, - write_precision: WritePrecision = DEFAULT_WRITE_PRECISION) -> None: + write_precision: WritePrecision = DEFAULT_WRITE_PRECISION, data_frame_measurement_name: str = None, + data_frame_tag_columns: List['str'] = None) -> None: """ Writes time-series data into influxdb. :param str org: specifies the destination organization for writes; take either the ID or Name interchangeably; if both orgID and org are specified, org takes precedence. 
(required) :param str bucket: specifies the destination bucket for writes (required) :param WritePrecision write_precision: specifies the precision for the unix timestamps within the body line-protocol - :param record: Points, line protocol, RxPY Observable to write + :param record: Points, line protocol, Pandas DataFrame, RxPY Observable to write + :param data_frame_measurement_name: name of measurement for writing Pandas DataFrame + :param data_frame_tag_columns: list of DataFrame columns which are tags, rest columns will be fields """ if org is None: org = self._influxdb_client.org - if self._point_settings.defaultTags and record: + if self._point_settings.defaultTags and record is not None: for key, val in self._point_settings.defaultTags.items(): if isinstance(record, dict): record.get("tags")[key] = val @@ -211,7 +214,9 @@ def write(self, bucket: str, org: str = None, if self._write_options.write_type is WriteType.batching: return self._write_batching(bucket, org, record, write_precision) - final_string = self._serialize(record, write_precision) + final_string = self._serialize(record, write_precision, + data_frame_measurement_name, + data_frame_tag_columns) _async_req = True if self._write_options.write_type == WriteType.asynchronous else False @@ -235,7 +240,7 @@ def __del__(self): self._disposable = None pass - def _serialize(self, record, write_precision) -> bytes: + def _serialize(self, record, write_precision, data_frame_measurement_name, data_frame_tag_columns) -> bytes: _result = b'' if isinstance(record, bytes): _result = record @@ -244,40 +249,103 @@ def _serialize(self, record, write_precision) -> bytes: _result = record.encode("utf-8") elif isinstance(record, Point): - _result = self._serialize(record.to_line_protocol(), write_precision=write_precision) + _result = self._serialize(record.to_line_protocol(), write_precision, + data_frame_measurement_name, data_frame_tag_columns) elif isinstance(record, dict): _result = 
self._serialize(Point.from_dict(record, write_precision=write_precision), - write_precision=write_precision) + write_precision, + data_frame_measurement_name, data_frame_tag_columns) + elif 'DataFrame' in type(record).__name__: + _result = self._serialize(self._data_frame_to_list_of_points(record, data_frame_measurement_name, + data_frame_tag_columns, + precision=write_precision), + write_precision, + data_frame_measurement_name, data_frame_tag_columns) + elif isinstance(record, list): - _result = b'\n'.join([self._serialize(item, write_precision=write_precision) for item in record]) + _result = b'\n'.join([self._serialize(item, write_precision, + data_frame_measurement_name, data_frame_tag_columns) for item in record]) return _result - def _write_batching(self, bucket, org, data, precision=DEFAULT_WRITE_PRECISION): + def _write_batching(self, bucket, org, data, + data_frame_measurement_name, data_frame_tag_columns, + precision=DEFAULT_WRITE_PRECISION): _key = _BatchItemKey(bucket, org, precision) if isinstance(data, bytes): self._subject.on_next(_BatchItem(key=_key, data=data)) elif isinstance(data, str): - self._write_batching(bucket, org, data.encode("utf-8"), precision) + self._write_batching(bucket, org, data.encode("utf-8"), + data_frame_measurement_name, data_frame_tag_columns, precision) elif isinstance(data, Point): - self._write_batching(bucket, org, data.to_line_protocol(), precision) + self._write_batching(bucket, org, data.to_line_protocol(), + data_frame_measurement_name, data_frame_tag_columns, precision) elif isinstance(data, dict): - self._write_batching(bucket, org, Point.from_dict(data, write_precision=precision), precision) + self._write_batching(bucket, org, Point.from_dict(data, write_precision=precision), + data_frame_measurement_name, data_frame_tag_columns, precision) + + elif 'DataFrame' in type(data).__name__: + self._write_batching(bucket, org, self._data_frame_to_list_of_points(data, data_frame_measurement_name, + 
data_frame_tag_columns, precision), + data_frame_measurement_name, data_frame_tag_columns, precision) elif isinstance(data, list): for item in data: - self._write_batching(bucket, org, item, precision) + self._write_batching(bucket, org, item, + data_frame_measurement_name, data_frame_tag_columns, precision) elif isinstance(data, Observable): - data.subscribe(lambda it: self._write_batching(bucket, org, it, precision)) + data.subscribe(lambda it: self._write_batching(bucket, org, it, + data_frame_measurement_name, data_frame_tag_columns, + precision)) pass return None + def _data_frame_to_list_of_points(self, dataframe, data_frame_measurement_name, data_frame_tag_columns, precision='s'): + from ..extras import pd + if not isinstance(dataframe, pd.DataFrame): + raise TypeError('Must be DataFrame, but type was: {0}.' + .format(type(dataframe))) + if not (isinstance(dataframe.index, pd.PeriodIndex) or + isinstance(dataframe.index, pd.DatetimeIndex)): + raise TypeError('Must be DataFrame with DatetimeIndex or \ + PeriodIndex.') + + if isinstance(dataframe.index, pd.PeriodIndex): + dataframe.index = dataframe.index.to_timestamp() + else: + dataframe.index = pd.to_datetime(dataframe.index) + + if dataframe.index.tzinfo is None: + dataframe.index = dataframe.index.tz_localize('UTC') + + data = [] + + c = 0 + for v in dataframe.values: + point = Point(measurement_name=data_frame_measurement_name) + + count = 0 + for f in v: + column = dataframe.columns[count] + if data_frame_tag_columns and column in data_frame_tag_columns: + point.tag(column, f) + else: + point.field(column, f) + count += 1 + + point.time(dataframe.index[c], precision) + c += 1 + + data.append(point) + + return data + def _http(self, batch_item: _BatchItem): logger.debug("Write time series data into InfluxDB: %s", batch_item) diff --git a/tests/test_WriteApi.py b/tests/test_WriteApi.py index 1b3ec547..2e0041ba 100644 --- a/tests/test_WriteApi.py +++ b/tests/test_WriteApi.py @@ -6,6 +6,7 @@ import os 
import unittest import time +from datetime import timedelta from multiprocessing.pool import ApplyResult from influxdb_client import Point, WritePrecision, InfluxDBClient @@ -224,6 +225,41 @@ def test_write_bytes(self): self.delete_test_bucket(_bucket) + def test_write_data_frame(self): + from influxdb_client.extras import pd + + bucket = self.create_test_bucket() + + now = pd.Timestamp('1970-01-01 00:00+00:00') + data_frame = pd.DataFrame(data=[["coyote_creek", 1.0], ["coyote_creek", 2.0]], + index=[now + timedelta(hours=1), now + timedelta(hours=2)], + columns=["location", "water_level"]) + + self.write_client.write(bucket.name, record=data_frame, data_frame_measurement_name='h2o_feet', + data_frame_tag_columns=['location']) + + result = self.query_api.query( + "from(bucket:\"" + bucket.name + "\") |> range(start: 1970-01-01T00:00:00.000000001Z)", self.org) + + self.assertEqual(1, len(result)) + self.assertEqual(2, len(result[0].records)) + + self.assertEqual(result[0].records[0].get_measurement(), "h2o_feet") + self.assertEqual(result[0].records[0].get_value(), 1.0) + self.assertEqual(result[0].records[0].values.get("location"), "coyote_creek") + self.assertEqual(result[0].records[0].get_field(), "water_level") + self.assertEqual(result[0].records[0].get_time(), + datetime.datetime(1970, 1, 1, 1, 0, tzinfo=datetime.timezone.utc)) + + self.assertEqual(result[0].records[1].get_measurement(), "h2o_feet") + self.assertEqual(result[0].records[1].get_value(), 2.0) + self.assertEqual(result[0].records[1].values.get("location"), "coyote_creek") + self.assertEqual(result[0].records[1].get_field(), "water_level") + self.assertEqual(result[0].records[1].get_time(), + datetime.datetime(1970, 1, 1, 2, 0, tzinfo=datetime.timezone.utc)) + + self.delete_test_bucket(bucket) + def test_use_default_org(self): bucket = self.create_test_bucket() From aa491824c911737cf2e5e3eb7ab67c4d180ca513 Mon Sep 17 00:00:00 2001 From: Pavlina Rolincova Date: Wed, 29 Apr 2020 10:38:05 +0200 
Subject: [PATCH 2/7] feat: support for writing pandas DataFrame (#79) - default tags --- influxdb_client/client/write_api.py | 30 +++++++++++++---------- tests/test_WriteApi.py | 38 +++++++++++++++++++++++++++++ 2 files changed, 55 insertions(+), 13 deletions(-) diff --git a/influxdb_client/client/write_api.py b/influxdb_client/client/write_api.py index 18503ed3..08f6067b 100644 --- a/influxdb_client/client/write_api.py +++ b/influxdb_client/client/write_api.py @@ -306,42 +306,46 @@ def _write_batching(self, bucket, org, data, return None - def _data_frame_to_list_of_points(self, dataframe, data_frame_measurement_name, data_frame_tag_columns, precision='s'): + def _data_frame_to_list_of_points(self, data_frame, data_frame_measurement_name, data_frame_tag_columns, precision): from ..extras import pd - if not isinstance(dataframe, pd.DataFrame): + if not isinstance(data_frame, pd.DataFrame): raise TypeError('Must be DataFrame, but type was: {0}.' - .format(type(dataframe))) - if not (isinstance(dataframe.index, pd.PeriodIndex) or - isinstance(dataframe.index, pd.DatetimeIndex)): + .format(type(data_frame))) + if not (isinstance(data_frame.index, pd.PeriodIndex) or + isinstance(data_frame.index, pd.DatetimeIndex)): raise TypeError('Must be DataFrame with DatetimeIndex or \ PeriodIndex.') - if isinstance(dataframe.index, pd.PeriodIndex): - dataframe.index = dataframe.index.to_timestamp() + if isinstance(data_frame.index, pd.PeriodIndex): + data_frame.index = data_frame.index.to_timestamp() else: - dataframe.index = pd.to_datetime(dataframe.index) + data_frame.index = pd.to_datetime(data_frame.index) - if dataframe.index.tzinfo is None: - dataframe.index = dataframe.index.tz_localize('UTC') + if data_frame.index.tzinfo is None: + data_frame.index = data_frame.index.tz_localize('UTC') data = [] c = 0 - for v in dataframe.values: + for v in data_frame.values: point = Point(measurement_name=data_frame_measurement_name) count = 0 for f in v: - column = 
dataframe.columns[count] + column = data_frame.columns[count] if data_frame_tag_columns and column in data_frame_tag_columns: point.tag(column, f) else: point.field(column, f) count += 1 - point.time(dataframe.index[c], precision) + point.time(data_frame.index[c], precision) c += 1 + if self._point_settings.defaultTags: + for key, val in self._point_settings.defaultTags.items(): + point.tag(key, val) + data.append(point) return data diff --git a/tests/test_WriteApi.py b/tests/test_WriteApi.py index 2e0041ba..926a6bf9 100644 --- a/tests/test_WriteApi.py +++ b/tests/test_WriteApi.py @@ -398,6 +398,44 @@ def test_use_default_tags_with_dictionaries(self): self.delete_test_bucket(bucket) + def test_use_default_tags_with_data_frame(self): + from influxdb_client.extras import pd + + bucket = self.create_test_bucket() + + now = pd.Timestamp('1970-01-01 00:00+00:00') + data_frame = pd.DataFrame(data=[["coyote_creek", 1.0], ["coyote_creek", 2.0]], + index=[now + timedelta(hours=1), now + timedelta(hours=2)], + columns=["location", "water_level"]) + + self.write_client.write(bucket.name, record=data_frame, data_frame_measurement_name='h2o_feet', + data_frame_tag_columns=['location']) + + time.sleep(1) + + query = 'from(bucket:"' + bucket.name + '") |> range(start: 1970-01-01T00:00:00.000000001Z)' + + flux_result = self.client.query_api().query(query) + + self.assertEqual(1, len(flux_result)) + + records = flux_result[0].records + + self.assertEqual(2, len(records)) + + rec = records[0] + rec2 = records[1] + + self.assertEqual(self.id_tag, rec["id"]) + self.assertEqual(self.customer_tag, rec["customer"]) + self.assertEqual("LA", rec[self.data_center_key]) + + self.assertEqual(self.id_tag, rec2["id"]) + self.assertEqual(self.customer_tag, rec2["customer"]) + self.assertEqual("LA", rec2[self.data_center_key]) + + self.delete_test_bucket(bucket) + def test_write_bytes(self): bucket = self.create_test_bucket() From baf695186b7a7fcf234e80eda68ed226880d6f0e Mon Sep 17 00:00:00 2001 
From: Pavlina Rolincova Date: Wed, 29 Apr 2020 11:28:24 +0200 Subject: [PATCH 3/7] feat: support for writing pandas DataFrame (#79) - readme --- CHANGELOG.md | 3 +++ README.rst | 12 ++++++++++++ 2 files changed, 15 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 636b73e9..55aba4dc 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,8 @@ ## 1.7.0 [unreleased] +### Features +1. [#79](https://github.com/influxdata/influxdb-client-python/issues/79): Added support for writing Pandas DataFrame + ### Bug Fixes 1. [#85](https://github.com/influxdata/influxdb-client-python/issues/85): Fixed a possibility to generate empty write batch diff --git a/README.rst b/README.rst index 52b7eeb0..b8b1dd49 100644 --- a/README.rst +++ b/README.rst @@ -49,6 +49,7 @@ InfluxDB 2.0 client features - `Line Protocol `_ - `Data Point `__ - `RxPY `__ Observable + - `Pandas DataFrame `_ - `How to writes <#writes>`_ - `InfluxDB 2.0 API `_ client for management - the client is generated from the `swagger `_ by using the `openapi-generator `_ @@ -219,6 +220,7 @@ The data could be written as 3. Dictionary style mapping with keys: ``measurement``, ``tags``, ``fields`` and ``time`` 4. List of above items 5. A ``batching`` type of write also supports an ``Observable`` that produce one of an above item +6. 
`Pandas DataFrame `_ Batching @@ -302,6 +304,16 @@ The batching is configurable by ``write_options``\ : _write_client.write("my-bucket", "my-org", _data) + """ + Write Pandas DataFrame + """ + now = pd.Timestamp.now('UTC') + data_frame = pd.DataFrame(data=[["coyote_creek", 1.0], ["coyote_creek", 2.0]], + index=[now, now + timedelta(hours=1)], + columns=["location", "water_level"]) + + self.write_client.write(bucket.name, record=data_frame, data_frame_measurement_name='h2o_feet', + data_frame_tag_columns=['location']) """ Close client From 0cacefc062087190484f6bc66a338cd2c6dae1d6 Mon Sep 17 00:00:00 2001 From: Pavlina Rolincova Date: Wed, 29 Apr 2020 12:50:03 +0200 Subject: [PATCH 4/7] feat: support for writing pandas DataFrame (#79) - batching --- influxdb_client/client/write_api.py | 7 ++----- tests/test_WriteApiBatching.py | 27 +++++++++++++++++++++++++++ 2 files changed, 29 insertions(+), 5 deletions(-) diff --git a/influxdb_client/client/write_api.py b/influxdb_client/client/write_api.py index 08f6067b..e04ca23e 100644 --- a/influxdb_client/client/write_api.py +++ b/influxdb_client/client/write_api.py @@ -212,7 +212,8 @@ def write(self, bucket: str, org: str = None, r.tag(key, val) if self._write_options.write_type is WriteType.batching: - return self._write_batching(bucket, org, record, write_precision) + return self._write_batching(bucket, org, record, data_frame_measurement_name, data_frame_tag_columns, + write_precision) final_string = self._serialize(record, write_precision, data_frame_measurement_name, @@ -311,10 +312,6 @@ def _data_frame_to_list_of_points(self, data_frame, data_frame_measurement_name, if not isinstance(data_frame, pd.DataFrame): raise TypeError('Must be DataFrame, but type was: {0}.' 
.format(type(data_frame))) - if not (isinstance(data_frame.index, pd.PeriodIndex) or - isinstance(data_frame.index, pd.DatetimeIndex)): - raise TypeError('Must be DataFrame with DatetimeIndex or \ - PeriodIndex.') if isinstance(data_frame.index, pd.PeriodIndex): data_frame.index = data_frame.index.to_timestamp() diff --git a/tests/test_WriteApiBatching.py b/tests/test_WriteApiBatching.py index b661c202..6f8fd393 100644 --- a/tests/test_WriteApiBatching.py +++ b/tests/test_WriteApiBatching.py @@ -407,6 +407,33 @@ def test_to_low_flush_interval(self): httpretty.reset() + def test_batching_data_frame(self): + from influxdb_client.extras import pd + + httpretty.register_uri(httpretty.POST, uri="http://localhost/api/v2/write", status=204) + httpretty.register_uri(httpretty.POST, uri="http://localhost/api/v2/write", status=204) + + data_frame = pd.DataFrame(data=[["coyote_creek", 1.0], ["coyote_creek", 2.0], + ["coyote_creek", 3.0], ["coyote_creek", 4.0]], + index=[1, 2, 3, 4], + columns=["location", "level water_level"]) + + self._write_client.write("my-bucket", "my-org", record=data_frame, + data_frame_measurement_name='h2o_feet', + data_frame_tag_columns=['location']) + + time.sleep(1) + + _requests = httpretty.httpretty.latest_requests + + self.assertEqual(2, len(_requests)) + _request1 = "h2o_feet,location=coyote_creek level\\ water_level=1.0 1\n" \ + "h2o_feet,location=coyote_creek level\\ water_level=2.0 2" + _request2 = "h2o_feet,location=coyote_creek level\\ water_level=3.0 3\n" \ + "h2o_feet,location=coyote_creek level\\ water_level=4.0 4" + + self.assertEqual(_request1, _requests[0].parsed_body) + self.assertEqual(_request2, _requests[1].parsed_body) if __name__ == '__main__': unittest.main() From 3b4732659a9d9d195950bf8c6f4f2c97a43ea576 Mon Sep 17 00:00:00 2001 From: Pavlina Rolincova Date: Wed, 29 Apr 2020 12:53:33 +0200 Subject: [PATCH 5/7] feat: support for writing pandas DataFrame (#79) - readme --- README.rst | 12 ++++++------ 1 file changed, 6 
insertions(+), 6 deletions(-) diff --git a/README.rst b/README.rst index b8b1dd49..1c9c3e75 100644 --- a/README.rst +++ b/README.rst @@ -307,13 +307,13 @@ The batching is configurable by ``write_options``\ : """ Write Pandas DataFrame """ - now = pd.Timestamp.now('UTC') - data_frame = pd.DataFrame(data=[["coyote_creek", 1.0], ["coyote_creek", 2.0]], - index=[now, now + timedelta(hours=1)], - columns=["location", "water_level"]) + _now = pd.Timestamp.now('UTC') + _data_frame = pd.DataFrame(data=[["coyote_creek", 1.0], ["coyote_creek", 2.0]], + index=[_now, _now + timedelta(hours=1)], + columns=["location", "water_level"]) - self.write_client.write(bucket.name, record=data_frame, data_frame_measurement_name='h2o_feet', - data_frame_tag_columns=['location']) + _write_client.write("my-bucket", "my-org", record=_data_frame, data_frame_measurement_name='h2o_feet', + data_frame_tag_columns=['location']) """ Close client From 3a3aab8c7b038d63019455bf7bab67c7aeebd0e3 Mon Sep 17 00:00:00 2001 From: Pavlina Rolincova Date: Mon, 4 May 2020 10:13:17 +0200 Subject: [PATCH 6/7] fix: support for writing pandas DataFrame (#79) --- influxdb_client/client/write_api.py | 67 ++++++++++++----------------- 1 file changed, 27 insertions(+), 40 deletions(-) diff --git a/influxdb_client/client/write_api.py b/influxdb_client/client/write_api.py index e04ca23e..a2620395 100644 --- a/influxdb_client/client/write_api.py +++ b/influxdb_client/client/write_api.py @@ -183,8 +183,7 @@ def __init__(self, influxdb_client, write_options: WriteOptions = WriteOptions() def write(self, bucket: str, org: str = None, record: Union[ str, List['str'], Point, List['Point'], dict, List['dict'], bytes, List['bytes'], Observable] = None, - write_precision: WritePrecision = DEFAULT_WRITE_PRECISION, data_frame_measurement_name: str = None, - data_frame_tag_columns: List['str'] = None) -> None: + write_precision: WritePrecision = DEFAULT_WRITE_PRECISION, **kwargs) -> None: """ Writes time-series data into influxdb.
@@ -212,12 +211,10 @@ def write(self, bucket: str, org: str = None, r.tag(key, val) if self._write_options.write_type is WriteType.batching: - return self._write_batching(bucket, org, record, data_frame_measurement_name, data_frame_tag_columns, - write_precision) + return self._write_batching(bucket, org, record, + write_precision, **kwargs) - final_string = self._serialize(record, write_precision, - data_frame_measurement_name, - data_frame_tag_columns) + final_string = self._serialize(record, write_precision, **kwargs) _async_req = True if self._write_options.write_type == WriteType.asynchronous else False @@ -241,7 +238,7 @@ def __del__(self): self._disposable = None pass - def _serialize(self, record, write_precision, data_frame_measurement_name, data_frame_tag_columns) -> bytes: + def _serialize(self, record, write_precision, **kwargs) -> bytes: _result = b'' if isinstance(record, bytes): _result = record @@ -250,64 +247,57 @@ def _serialize(self, record, write_precision, data_frame_measurement_name, data_ _result = record.encode("utf-8") elif isinstance(record, Point): - _result = self._serialize(record.to_line_protocol(), write_precision, - data_frame_measurement_name, data_frame_tag_columns) + _result = self._serialize(record.to_line_protocol(), write_precision, **kwargs) elif isinstance(record, dict): _result = self._serialize(Point.from_dict(record, write_precision=write_precision), - write_precision, - data_frame_measurement_name, data_frame_tag_columns) + write_precision, **kwargs) elif 'DataFrame' in type(record).__name__: - _result = self._serialize(self._data_frame_to_list_of_points(record, data_frame_measurement_name, - data_frame_tag_columns, - precision=write_precision), + _result = self._serialize(self._data_frame_to_list_of_points(record, + precision=write_precision, **kwargs), write_precision, - data_frame_measurement_name, data_frame_tag_columns) + **kwargs) elif isinstance(record, list): _result = b'\n'.join([self._serialize(item, 
write_precision, - data_frame_measurement_name, data_frame_tag_columns) for item in record]) + **kwargs) for item in record]) return _result def _write_batching(self, bucket, org, data, - data_frame_measurement_name, data_frame_tag_columns, - precision=DEFAULT_WRITE_PRECISION): + precision=DEFAULT_WRITE_PRECISION, + **kwargs): _key = _BatchItemKey(bucket, org, precision) if isinstance(data, bytes): self._subject.on_next(_BatchItem(key=_key, data=data)) elif isinstance(data, str): self._write_batching(bucket, org, data.encode("utf-8"), - data_frame_measurement_name, data_frame_tag_columns, precision) + precision, **kwargs) elif isinstance(data, Point): self._write_batching(bucket, org, data.to_line_protocol(), - data_frame_measurement_name, data_frame_tag_columns, precision) + precision, **kwargs) elif isinstance(data, dict): self._write_batching(bucket, org, Point.from_dict(data, write_precision=precision), - data_frame_measurement_name, data_frame_tag_columns, precision) + precision, **kwargs) elif 'DataFrame' in type(data).__name__: - self._write_batching(bucket, org, self._data_frame_to_list_of_points(data, data_frame_measurement_name, - data_frame_tag_columns, precision), - data_frame_measurement_name, data_frame_tag_columns, precision) + self._write_batching(bucket, org, self._data_frame_to_list_of_points(data, precision, **kwargs), + precision, **kwargs) elif isinstance(data, list): for item in data: - self._write_batching(bucket, org, item, - data_frame_measurement_name, data_frame_tag_columns, precision) + self._write_batching(bucket, org, item, precision, **kwargs) elif isinstance(data, Observable): - data.subscribe(lambda it: self._write_batching(bucket, org, it, - data_frame_measurement_name, data_frame_tag_columns, - precision)) + data.subscribe(lambda it: self._write_batching(bucket, org, it, precision, **kwargs)) pass return None - def _data_frame_to_list_of_points(self, data_frame, data_frame_measurement_name, data_frame_tag_columns, precision): + 
def _data_frame_to_list_of_points(self, data_frame, precision, **kwargs): from ..extras import pd if not isinstance(data_frame, pd.DataFrame): raise TypeError('Must be DataFrame, but type was: {0}.' @@ -323,21 +313,18 @@ def _data_frame_to_list_of_points(self, data_frame, data_frame_measurement_name, data = [] - c = 0 - for v in data_frame.values: - point = Point(measurement_name=data_frame_measurement_name) + for c, (row) in enumerate(data_frame.values): + point = Point(measurement_name=kwargs.get('data_frame_measurement_name')) - count = 0 - for f in v: + for count, (value) in enumerate(row): column = data_frame.columns[count] + data_frame_tag_columns = kwargs.get('data_frame_tag_columns') if data_frame_tag_columns and column in data_frame_tag_columns: - point.tag(column, f) + point.tag(column, value) else: - point.field(column, f) - count += 1 + point.field(column, value) point.time(data_frame.index[c], precision) - c += 1 if self._point_settings.defaultTags: for key, val in self._point_settings.defaultTags.items(): From f951d43797fbff07376f76fb254ad4cb58ccbefe Mon Sep 17 00:00:00 2001 From: Pavlina Rolincova Date: Mon, 4 May 2020 10:47:01 +0200 Subject: [PATCH 7/7] feat: support for writing pandas DataFrame (#79) - check if measurement name is in kwargs --- influxdb_client/client/write_api.py | 3 +++ tests/test_WriteApi.py | 16 ++++++++++++++++ 2 files changed, 19 insertions(+) diff --git a/influxdb_client/client/write_api.py b/influxdb_client/client/write_api.py index a2620395..4ddf778a 100644 --- a/influxdb_client/client/write_api.py +++ b/influxdb_client/client/write_api.py @@ -303,6 +303,9 @@ def _data_frame_to_list_of_points(self, data_frame, precision, **kwargs): raise TypeError('Must be DataFrame, but type was: {0}.' 
.format(type(data_frame))) + if 'data_frame_measurement_name' not in kwargs: + raise TypeError('"data_frame_measurement_name" is a Required Argument') + if isinstance(data_frame.index, pd.PeriodIndex): data_frame.index = data_frame.index.to_timestamp() else: diff --git a/tests/test_WriteApi.py b/tests/test_WriteApi.py index 926a6bf9..42d0798d 100644 --- a/tests/test_WriteApi.py +++ b/tests/test_WriteApi.py @@ -260,6 +260,22 @@ def test_write_data_frame(self): self.delete_test_bucket(bucket) + def test_write_data_frame_without_measurement_name(self): + from influxdb_client.extras import pd + + bucket = self.create_test_bucket() + + now = pd.Timestamp('1970-01-01 00:00+00:00') + data_frame = pd.DataFrame(data=[["coyote_creek", 1.0], ["coyote_creek", 2.0]], + index=[now + timedelta(hours=1), now + timedelta(hours=2)], + columns=["location", "water_level"]) + + with self.assertRaises(TypeError) as cm: + self.write_client.write(bucket.name, record=data_frame) + exception = cm.exception + + self.assertEqual('"data_frame_measurement_name" is a Required Argument', exception.__str__()) + def test_use_default_org(self): bucket = self.create_test_bucket()