From 232816b95b5124df8e01724ee629f43ad5ea2381 Mon Sep 17 00:00:00 2001 From: Jakub Bednar Date: Tue, 15 Oct 2019 10:02:30 +0200 Subject: [PATCH 1/2] feat: Add support to write by dictionary (#24) --- CHANGELOG.md | 3 ++ README.rst | 23 +++++++++- influxdb_client/client/write/point.py | 11 +++++ influxdb_client/client/write_api.py | 11 ++++- tests/test_WriteApi.py | 62 +++++++++++++++++++++++++++ tests/test_WriteApiBatching.py | 43 +++++++++++++------ 6 files changed, 136 insertions(+), 17 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 00265f41..eaef1890 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,8 @@ ## 0.0.3 [unreleased] +### Features +1. [#24](https://github.com/influxdata/influxdb-client-python/issues/24): Added possibility to write dictionary-style object + ### API 1. [#21](https://github.com/bonitoo-io/influxdb-client-python/pull/21): Updated swagger to latest version diff --git a/README.rst b/README.rst index 9f69a5e6..d4baa58e 100644 --- a/README.rst +++ b/README.rst @@ -45,7 +45,7 @@ InfluxDB 2.0 client features - Writing data using - `Line Protocol `_ - `Data Point `__ - - `RxPY`_ Observable + - `RxPY `_ Observable - Not implemented yet - write user types using decorator - write Pandas DataFrame @@ -151,7 +151,16 @@ Writes The `WriteApi `_ supports synchronous, asynchronous and batching writes into InfluxDB 2.0. The data should be passed as a `InfluxDB Line Protocol `_\ , `Data Point `_ or Observable stream. -*The default instance of ``WriteApi`` use batching.* +*The default instance of WriteApi use batching.* + +The data could be written as: + +1. ``string`` that is formatted as a InfluxDB's line protocol +2. `Data Point `__ structure +3. Dictionary style mapping with keys: ``measurement``, ``tags``, ``fields`` and ``time`` +4. List of above items +5. A ``batching`` type of write also supports an ``Observable`` that produce one of an above item + Batching """""""" @@ -201,6 +210,16 @@ The batching is configurable by ``write_options``\ : _write_client.write("my-bucket", "my-org", ["h2o_feet,location=coyote_creek water_level=2.0 2", "h2o_feet,location=coyote_creek water_level=3.0 3"]) + """ + Write Dictionary-style object + """ + _write_client.write("my-bucket", "my-org", {"measurement": "h2o_feet", "tags": {"location": "coyote_creek"}, + "fields": {"water_level": 1.0}, "time": 1}) + _write_client.write("my-bucket", "my-org", [{"measurement": "h2o_feet", "tags": {"location": "coyote_creek"}, + "fields": {"water_level": 2.0}, "time": 2}, + {"measurement": "h2o_feet", "tags": {"location": "coyote_creek"}, + "fields": {"water_level": 3.0}, "time": 3}]) + """ Write Data Point """ diff --git a/influxdb_client/client/write/point.py b/influxdb_client/client/write/point.py index 59476fef..53ac1c14 100644 --- a/influxdb_client/client/write/point.py +++ b/influxdb_client/client/write/point.py @@ -13,6 +13,7 @@ EPOCH = UTC.localize(datetime.utcfromtimestamp(0)) DEFAULT_WRITE_PRECISION = WritePrecision.NS + class Point(object): """ Point defines the values that will be written to the database. @@ -24,6 +25,16 @@ def measurement(measurement): p = Point(measurement) return p + @staticmethod + def from_dict(dictionary: dict, write_precision: WritePrecision = DEFAULT_WRITE_PRECISION): + point = Point(dictionary['measurement']) + for tag_key, tag_value in dictionary['tags'].items(): + point.tag(tag_key, tag_value) + for field_key, field_value in dictionary['fields'].items(): + point.field(field_key, field_value) + point.time(dictionary['time'], write_precision=write_precision) + return point + def __init__(self, measurement_name): self._tags = {} self._fields = {} diff --git a/influxdb_client/client/write_api.py b/influxdb_client/client/write_api.py index 8ff0c6a1..df01241e 100644 --- a/influxdb_client/client/write_api.py +++ b/influxdb_client/client/write_api.py @@ -143,7 +143,8 @@ def __init__(self, influxdb_client, write_options: WriteOptions = WriteOptions() self._subject = None self._disposable = None - def write(self, bucket: str, org: str, record: Union[str, List['str'], Point, List['Point'], Observable], + def write(self, bucket: str, org: str, + record: Union[str, List['str'], Point, List['Point'], dict, List['dict'], Observable], write_precision: WritePrecision = DEFAULT_WRITE_PRECISION) -> None: """ Writes time-series data into influxdb. @@ -166,6 +167,9 @@ def write(self, bucket: str, org: str, record: Union[str, List['str'], Point, Li if isinstance(record, Point): final_string = record.to_line_protocol() + if isinstance(record, dict): + final_string = Point.from_dict(record, write_precision=write_precision).to_line_protocol() + if isinstance(record, list): lines = [] for item in record: @@ -173,6 +177,8 @@ def write(self, bucket: str, org: str, record: Union[str, List['str'], Point, Li lines.append(item) if isinstance(item, Point): lines.append(item.to_line_protocol()) + if isinstance(item, dict): + lines.append(Point.from_dict(item, write_precision=write_precision).to_line_protocol()) final_string = '\n'.join(lines) _async_req = True if self._write_options.write_type == WriteType.asynchronous else False @@ -205,6 +211,9 @@ def _write_batching(self, bucket, org, data, precision=DEFAULT_WRITE_PRECISION): elif isinstance(data, Point): self._subject.on_next(_BatchItem(key=_key, data=data.to_line_protocol())) + elif isinstance(data, dict): + self._write_batching(bucket, org, Point.from_dict(data, write_precision=precision), precision) + elif isinstance(data, list): for item in data: self._write_batching(bucket, org, item, precision) diff --git a/tests/test_WriteApi.py b/tests/test_WriteApi.py index 69eb6ec0..1d6667fa 100644 --- a/tests/test_WriteApi.py +++ b/tests/test_WriteApi.py @@ -4,6 +4,7 @@ import datetime import unittest +import time from multiprocessing.pool import ApplyResult from influxdb_client import Point, WritePrecision @@ -135,6 +136,27 @@ def test_write_error(self): self.assertEqual(400, exception.status) self.assertEqual("Bad Request", exception.reason) + def test_write_dictionary(self): + _bucket = self.create_test_bucket() + _point = {"measurement": "h2o_feet", "tags": {"location": "coyote_creek"}, + "time": "2009-11-10T23:00:00Z", "fields": {"water_level": 1.0}} + + self.write_client.write(_bucket.name, self.org, _point) + self.write_client.flush() + + result = self.query_api.query( + "from(bucket:\"" + _bucket.name + "\") |> range(start: 1970-01-01T00:00:00.000000001Z) |> last()", self.org) + + self.assertEqual(len(result), 1) + self.assertEqual(result[0].records[0].get_measurement(), "h2o_feet") + self.assertEqual(result[0].records[0].get_value(), 1.0) + self.assertEqual(result[0].records[0].values.get("location"), "coyote_creek") + self.assertEqual(result[0].records[0].get(), "water_level") + self.assertEqual(result[0].records[0].get_time(), + datetime.datetime(2009, 11, 10, 23, 0, tzinfo=datetime.timezone.utc)) + + self.delete_test_bucket(_bucket) + class AsynchronousWriteTest(BaseTest): @@ -156,6 +178,46 @@ def test_write_result(self): self.assertEqual(None, result.get()) self.delete_test_bucket(_bucket) + def test_write_dictionaries(self): + bucket = self.create_test_bucket() + + _point1 = {"measurement": "h2o_feet", "tags": {"location": "coyote_creek"}, + "time": "2009-11-10T22:00:00Z", "fields": {"water_level": 1.0}} + _point2 = {"measurement": "h2o_feet", "tags": {"location": "coyote_creek"}, + "time": "2009-11-10T23:00:00Z", "fields": {"water_level": 2.0}} + + _point_list = [_point1, _point2] + + self.write_client.write(bucket.name, self.org, _point_list) + time.sleep(1) + + query = 'from(bucket:"' + bucket.name + '") |> range(start: 1970-01-01T00:00:00.000000001Z)' + print(query) + + flux_result = self.client.query_api().query(query) + + self.assertEqual(1, len(flux_result)) + + records = flux_result[0].records + + self.assertEqual(2, len(records)) + + self.assertEqual("h2o_feet", records[0].get_measurement()) + self.assertEqual(1, records[0].get_value()) + self.assertEqual("water_level", records[0].get_field()) + self.assertEqual("coyote_creek", records[0].values.get('location')) + self.assertEqual(records[0].get_time(), + datetime.datetime(2009, 11, 10, 22, 0, tzinfo=datetime.timezone.utc)) + + self.assertEqual("h2o_feet", records[1].get_measurement()) + self.assertEqual(2, records[1].get_value()) + self.assertEqual("water_level", records[1].get_field()) + self.assertEqual("coyote_creek", records[1].values.get('location')) + self.assertEqual(records[1].get_time(), + datetime.datetime(2009, 11, 10, 23, 0, tzinfo=datetime.timezone.utc)) + + self.delete_test_bucket(bucket) + if __name__ == '__main__': unittest.main() diff --git a/tests/test_WriteApiBatching.py b/tests/test_WriteApiBatching.py index 7a0d1445..024d5f6a 100644 --- a/tests/test_WriteApiBatching.py +++ b/tests/test_WriteApiBatching.py @@ -33,9 +33,6 @@ def setUp(self) -> None: self.influxdb_client = InfluxDBClient(url=conf.host, token="my-token") - # self._api_client = influxdb_client.ApiClient(configuration=conf, header_name="Authorization", - # header_value="Token my-token") - write_options = WriteOptions(batch_size=2, flush_interval=5_000, retry_interval=3_000) self._write_client = WriteApi(influxdb_client=self.influxdb_client, write_options=write_options) @@ -48,10 +45,10 @@ def test_batch_size(self): httpretty.register_uri(httpretty.POST, uri="http://localhost/api/v2/write", status=204) self._write_client.write("my-bucket", "my-org", - ["h2o_feet,location=coyote_creek level\\ water_level=1.0 1", - "h2o_feet,location=coyote_creek level\\ water_level=2.0 2", - "h2o_feet,location=coyote_creek level\\ water_level=3.0 3", - "h2o_feet,location=coyote_creek level\\ water_level=4.0 4"]) + ["h2o_feet,location=coyote_creek level\\ water_level=1.0 1", + "h2o_feet,location=coyote_creek level\\ water_level=2.0 2", + "h2o_feet,location=coyote_creek level\\ water_level=3.0 3", + "h2o_feet,location=coyote_creek level\\ water_level=4.0 4"]) time.sleep(1) @@ -88,23 +85,23 @@ def test_batch_size_group_by(self): httpretty.register_uri(httpretty.POST, uri="http://localhost/api/v2/write", status=204) self._write_client.write("my-bucket", "my-org", - "h2o_feet,location=coyote_creek level\\ water_level=1.0 1") + "h2o_feet,location=coyote_creek level\\ water_level=1.0 1") self._write_client.write("my-bucket", "my-org", - "h2o_feet,location=coyote_creek level\\ water_level=2.0 2", + "h2o_feet,location=coyote_creek level\\ water_level=2.0 2", write_precision=WritePrecision.S) self._write_client.write("my-bucket", "my-org-a", - "h2o_feet,location=coyote_creek level\\ water_level=3.0 3") + "h2o_feet,location=coyote_creek level\\ water_level=3.0 3") self._write_client.write("my-bucket", "my-org-a", - "h2o_feet,location=coyote_creek level\\ water_level=4.0 4") + "h2o_feet,location=coyote_creek level\\ water_level=4.0 4") self._write_client.write("my-bucket2", "my-org-a", - "h2o_feet,location=coyote_creek level\\ water_level=5.0 5") + "h2o_feet,location=coyote_creek level\\ water_level=5.0 5") self._write_client.write("my-bucket", "my-org-a", - "h2o_feet,location=coyote_creek level\\ water_level=6.0 6") + "h2o_feet,location=coyote_creek level\\ water_level=6.0 6") time.sleep(1) @@ -270,11 +267,25 @@ def test_record_types(self): .pipe(ops.map(lambda i: "h2o_feet,location=coyote_creek level\\ water_level={0}.0 {0}".format(i))) self._write_client.write("my-bucket", "my-org", _data) + # Dictionary item + _dict = {"measurement": "h2o_feet", "tags": {"location": "coyote_creek"}, + "time": 13, "fields": {"level water_level": 13.0}} + self._write_client.write("my-bucket", "my-org", _dict) + + # Dictionary list + _dict1 = {"measurement": "h2o_feet", "tags": {"location": "coyote_creek"}, + "time": 14, "fields": {"level water_level": 14.0}} + _dict2 = {"measurement": "h2o_feet", "tags": {"location": "coyote_creek"}, + "time": 15, "fields": {"level water_level": 15.0}} + _dict3 = {"measurement": "h2o_feet", "tags": {"location": "coyote_creek"}, + "time": 16, "fields": {"level water_level": 16.0}} + self._write_client.write("my-bucket", "my-org", [_dict1, _dict2, _dict3]) + time.sleep(1) _requests = httpretty.httpretty.latest_requests - self.assertEqual(6, len(_requests)) + self.assertEqual(8, len(_requests)) self.assertEqual("h2o_feet,location=coyote_creek level\\ water_level=1.0 1\n" "h2o_feet,location=coyote_creek level\\ water_level=2.0 2", _requests[0].parsed_body) @@ -288,6 +299,10 @@ def test_record_types(self): "h2o_feet,location=coyote_creek level\\ water_level=10.0 10", _requests[4].parsed_body) self.assertEqual("h2o_feet,location=coyote_creek level\\ water_level=11.0 11\n" "h2o_feet,location=coyote_creek level\\ water_level=12.0 12", _requests[5].parsed_body) + self.assertEqual("h2o_feet,location=coyote_creek level\\ water_level=13.0 13\n" + "h2o_feet,location=coyote_creek level\\ water_level=14.0 14", _requests[6].parsed_body) + self.assertEqual("h2o_feet,location=coyote_creek level\\ water_level=15.0 15\n" + "h2o_feet,location=coyote_creek level\\ water_level=16.0 16", _requests[7].parsed_body) pass From 44b71b1930b07781f971542dd4abdbebb655ac7f Mon Sep 17 00:00:00 2001 From: Jakub Bednar Date: Tue, 15 Oct 2019 10:09:31 +0200 Subject: [PATCH 2/2] feat: Add support to write by dictionary (#24) --- README.rst | 3 ++- tests/test_WriteApi.py | 2 +- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/README.rst b/README.rst index d4baa58e..ba9d4261 100644 --- a/README.rst +++ b/README.rst @@ -153,7 +153,8 @@ The data should be passed as a `InfluxDB Line Protocol `__ structure diff --git a/tests/test_WriteApi.py b/tests/test_WriteApi.py index 1d6667fa..0df891be 100644 --- a/tests/test_WriteApi.py +++ b/tests/test_WriteApi.py @@ -151,7 +151,7 @@ def test_write_dictionary(self): self.assertEqual(result[0].records[0].get_measurement(), "h2o_feet") self.assertEqual(result[0].records[0].get_value(), 1.0) self.assertEqual(result[0].records[0].values.get("location"), "coyote_creek") - self.assertEqual(result[0].records[0].get(), "water_level") + self.assertEqual(result[0].records[0].get_field(), "water_level") self.assertEqual(result[0].records[0].get_time(), datetime.datetime(2009, 11, 10, 23, 0, tzinfo=datetime.timezone.utc))