From 211c6a0c74cd20a08b34f6e5610773d7b3fd60c3 Mon Sep 17 00:00:00 2001 From: Jakub Bednar Date: Fri, 11 Sep 2020 09:29:04 +0200 Subject: [PATCH 1/2] feat: write supports Iterable --- influxdb_client/client/write_api.py | 14 +++++----- tests/test_WriteApi.py | 41 +++++++++++++++++++++++++++++ tests/test_WriteApiBatching.py | 9 ++++++- 3 files changed, 57 insertions(+), 7 deletions(-) diff --git a/influxdb_client/client/write_api.py b/influxdb_client/client/write_api.py index cb97eebd..6f70ae83 100644 --- a/influxdb_client/client/write_api.py +++ b/influxdb_client/client/write_api.py @@ -8,7 +8,7 @@ from enum import Enum from random import random from time import sleep -from typing import Union, List, Any +from typing import Union, Any, Iterable import rx from rx import operators as ops, Observable @@ -212,7 +212,7 @@ def __init__(self, influxdb_client, write_options: WriteOptions = WriteOptions() def write(self, bucket: str, org: str = None, record: Union[ - str, List['str'], Point, List['Point'], dict, List['dict'], bytes, List['bytes'], Observable] = None, + str, Iterable['str'], Point, Iterable['Point'], dict, Iterable['dict'], bytes, Iterable['bytes'], Observable] = None, write_precision: WritePrecision = DEFAULT_WRITE_PRECISION, **kwargs) -> Any: """ Write time-series data into InfluxDB. @@ -291,7 +291,7 @@ def _serialize(self, record, write_precision, payload, **kwargs): _data = data_frame_to_list_of_points(record, self._point_settings, **kwargs) self._serialize(_data, write_precision, payload, **kwargs) - elif isinstance(record, list): + elif isinstance(record, Iterable): for item in record: self._serialize(item, write_precision, payload, **kwargs) @@ -317,7 +317,7 @@ def _write_batching(self, bucket, org, data, self._write_batching(bucket, org, data_frame_to_list_of_points(data, self._point_settings, **kwargs), precision, **kwargs) - elif isinstance(data, list): + elif isinstance(data, Iterable): for item in data: self._write_batching(bucket, org, item, precision, **kwargs) @@ -328,11 +328,13 @@ def _write_batching(self, bucket, org, data, return None def _append_default_tag(self, key, val, record): - if isinstance(record, Point): + if isinstance(record, bytes) or isinstance(record, str): + pass + elif isinstance(record, Point): record.tag(key, val) elif isinstance(record, dict): record.get("tags")[key] = val - elif isinstance(record, list): + elif isinstance(record, Iterable): for item in record: self._append_default_tag(key, val, item) diff --git a/tests/test_WriteApi.py b/tests/test_WriteApi.py index efc34409..6f781161 100644 --- a/tests/test_WriteApi.py +++ b/tests/test_WriteApi.py @@ -227,6 +227,47 @@ def test_write_bytes(self): self.delete_test_bucket(_bucket) + def test_write_tuple(self): + bucket = self.create_test_bucket() + + _record1 = "h2o_feet,location=coyote_creek level\\ water_level=1.0 1" + _record2 = "h2o_feet,location=coyote_creek level\\ water_level=2.0 2" + _bytes = "h2o_feet,location=coyote_creek level\\ water_level=3.0 3".encode("utf-8") + + p = (Point("h2o_feet").tag("location", "coyote_creek").field("level water_level", 4.0).time(4)) + + tuple = (_record1, _record2, _bytes, (p, )) + + self.write_client = self.client.write_api(write_options=SYNCHRONOUS) + self.write_client.write(bucket.name, self.org, tuple) + + query = f'from(bucket:"{bucket.name}") |> range(start: 1970-01-01T00:00:00.000000001Z)' + + flux_result = self.client.query_api().query(query) + + self.assertEqual(1, len(flux_result)) + + records = flux_result[0].records + + self.assertEqual(4, len(records)) + + self.assertEqual("h2o_feet", records[0].get_measurement()) + self.assertEqual(1, records[0].get_value()) + self.assertEqual("level water_level", records[0].get_field()) + + self.assertEqual("h2o_feet", records[1].get_measurement()) + self.assertEqual(2, records[1].get_value()) + self.assertEqual("level water_level", records[1].get_field()) + + self.assertEqual("h2o_feet", records[2].get_measurement()) + self.assertEqual(3, records[2].get_value()) + self.assertEqual("level water_level", records[2].get_field()) + + self.assertEqual("h2o_feet", records[3].get_measurement()) + self.assertEqual(4, records[3].get_value()) + self.assertEqual("level water_level", records[3].get_field()) + self.delete_test_bucket(bucket) + def test_write_data_frame(self): from influxdb_client.extras import pd diff --git a/tests/test_WriteApiBatching.py b/tests/test_WriteApiBatching.py index b1f61d84..0e95c2cb 100644 --- a/tests/test_WriteApiBatching.py +++ b/tests/test_WriteApiBatching.py @@ -311,11 +311,16 @@ def test_record_types(self): _bytes2 = "h2o_feet,location=coyote_creek level\\ water_level=18.0 18".encode("utf-8") self._write_client.write("my-bucket", "my-org", [_bytes1, _bytes2]) + # Tuple + _bytes3 = "h2o_feet,location=coyote_creek level\\ water_level=19.0 19".encode("utf-8") + _bytes4 = "h2o_feet,location=coyote_creek level\\ water_level=20.0 20".encode("utf-8") + self._write_client.write("my-bucket", "my-org", (_bytes3, _bytes4, )) + time.sleep(1) _requests = httpretty.httpretty.latest_requests - self.assertEqual(9, len(_requests)) + self.assertEqual(10, len(_requests)) self.assertEqual("h2o_feet,location=coyote_creek level\\ water_level=1.0 1\n" "h2o_feet,location=coyote_creek level\\ water_level=2.0 2", _requests[0].parsed_body) @@ -335,6 +340,8 @@ def test_record_types(self): "h2o_feet,location=coyote_creek level\\ water_level=16.0 16", _requests[7].parsed_body) self.assertEqual("h2o_feet,location=coyote_creek level\\ water_level=17.0 17\n" "h2o_feet,location=coyote_creek level\\ water_level=18.0 18", _requests[8].parsed_body) + self.assertEqual("h2o_feet,location=coyote_creek level\\ water_level=19.0 19\n" + "h2o_feet,location=coyote_creek level\\ water_level=20.0 20", _requests[9].parsed_body) pass From 86a7fb0b8e3e4bc8fa3bb91b7faecf496790c586 Mon Sep 17 00:00:00 2001 From: Jakub Bednar Date: Fri, 11 Sep 2020 09:33:02 +0200 Subject: [PATCH 2/2] docs: CHANGELOG.md updated --- CHANGELOG.md | 5 ++++- influxdb_client/client/write_api.py | 3 ++- 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 154678e0..bba19fb4 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,7 +1,10 @@ ## 1.11.0 [unreleased] +### Features +1. [#152](https://github.com/influxdata/influxdb-client-python/pull/152): WriteApi supports generic Iterable type + ### API - 1. [#151](https://github.com/influxdata/influxdb-client-python/pull/151): Default port changed from 9999 -> 8086 +1. [#151](https://github.com/influxdata/influxdb-client-python/pull/151): Default port changed from 9999 -> 8086 ## 1.10.0 [2020-08-14] diff --git a/influxdb_client/client/write_api.py b/influxdb_client/client/write_api.py index 6f70ae83..55fa1fe6 100644 --- a/influxdb_client/client/write_api.py +++ b/influxdb_client/client/write_api.py @@ -212,7 +212,8 @@ def __init__(self, influxdb_client, write_options: WriteOptions = WriteOptions() def write(self, bucket: str, org: str = None, record: Union[ - str, Iterable['str'], Point, Iterable['Point'], dict, Iterable['dict'], bytes, Iterable['bytes'], Observable] = None, + str, Iterable['str'], Point, Iterable['Point'], dict, Iterable['dict'], bytes, Iterable['bytes'], + Observable] = None, write_precision: WritePrecision = DEFAULT_WRITE_PRECISION, **kwargs) -> Any: """ Write time-series data into InfluxDB.