Skip to content

Commit 95ad5ec

Browse files
authored
feat: Added possibility to write dictionary-style object (#25)
1 parent 4a59e91 commit 95ad5ec

File tree

6 files changed

+137
-17
lines changed

6 files changed

+137
-17
lines changed

CHANGELOG.md

+3
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,8 @@
11
## 0.0.3 [unreleased]
22

3+
### Features
4+
1. [#24](https://github.com/influxdata/influxdb-client-python/issues/24): Added possibility to write dictionary-style object
5+
36
### API
47
1. [#21](https://github.com/bonitoo-io/influxdb-client-python/pull/21): Updated swagger to latest version
58

README.rst

+22-2
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ InfluxDB 2.0 client features
4545
- Writing data using
4646
- `Line Protocol <https://docs.influxdata.com/influxdb/v1.6/write_protocols/line_protocol_tutorial>`_
4747
- `Data Point <https://github.com/influxdata/influxdb-client-python/blob/master/influxdb_client/client/write/point.py#L16>`__
48-
- `RxPY`_ Observable
48+
- `RxPY <https://rxpy.readthedocs.io/en/latest/>`_ Observable
4949
- Not implemented yet
5050
- write user types using decorator
5151
- write Pandas DataFrame
@@ -151,7 +151,17 @@ Writes
151151
The `WriteApi <https://github.com/influxdata/influxdb-client-python/blob/master/influxdb_client/client/write_api.py>`_ supports synchronous, asynchronous and batching writes into InfluxDB 2.0.
152152
The data should be passed as a `InfluxDB Line Protocol <https://docs.influxdata.com/influxdb/v1.6/write_protocols/line_protocol_tutorial/>`_\ , `Data Point <https://github.com/influxdata/influxdb-client-python/blob/master/influxdb_client/client/write/point.py>`_ or Observable stream.
153153

154-
*The default instance of ``WriteApi`` use batching.*
154+
*The default instance of WriteApi use batching.*
155+
156+
The data could be written as
157+
""""""""""""""""""""""""""""
158+
159+
1. ``string`` that is formatted as a InfluxDB's line protocol
160+
2. `Data Point <https://github.com/influxdata/influxdb-client-python/blob/master/influxdb_client/client/write/point.py#L16>`__ structure
161+
3. Dictionary style mapping with keys: ``measurement``, ``tags``, ``fields`` and ``time``
162+
4. List of above items
163+
5. A ``batching`` type of write also supports an ``Observable`` that produce one of an above item
164+
155165

156166
Batching
157167
""""""""
@@ -201,6 +211,16 @@ The batching is configurable by ``write_options``\ :
201211
_write_client.write("my-bucket", "my-org", ["h2o_feet,location=coyote_creek water_level=2.0 2",
202212
"h2o_feet,location=coyote_creek water_level=3.0 3"])
203213
214+
"""
215+
Write Dictionary-style object
216+
"""
217+
_write_client.write("my-bucket", "my-org", {"measurement": "h2o_feet", "tags": {"location": "coyote_creek"},
218+
"fields": {"water_level": 1.0}, "time": 1})
219+
_write_client.write("my-bucket", "my-org", [{"measurement": "h2o_feet", "tags": {"location": "coyote_creek"},
220+
"fields": {"water_level": 2.0}, "time": 2},
221+
{"measurement": "h2o_feet", "tags": {"location": "coyote_creek"},
222+
"fields": {"water_level": 3.0}, "time": 3}])
223+
204224
"""
205225
Write Data Point
206226
"""

influxdb_client/client/write/point.py

+11
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
EPOCH = UTC.localize(datetime.utcfromtimestamp(0))
1414
DEFAULT_WRITE_PRECISION = WritePrecision.NS
1515

16+
1617
class Point(object):
1718
"""
1819
Point defines the values that will be written to the database.
@@ -24,6 +25,16 @@ def measurement(measurement):
2425
p = Point(measurement)
2526
return p
2627

28+
@staticmethod
29+
def from_dict(dictionary: dict, write_precision: WritePrecision = DEFAULT_WRITE_PRECISION):
30+
point = Point(dictionary['measurement'])
31+
for tag_key, tag_value in dictionary['tags'].items():
32+
point.tag(tag_key, tag_value)
33+
for field_key, field_value in dictionary['fields'].items():
34+
point.field(field_key, field_value)
35+
point.time(dictionary['time'], write_precision=write_precision)
36+
return point
37+
2738
def __init__(self, measurement_name):
2839
self._tags = {}
2940
self._fields = {}

influxdb_client/client/write_api.py

+10-1
Original file line numberDiff line numberDiff line change
@@ -143,7 +143,8 @@ def __init__(self, influxdb_client, write_options: WriteOptions = WriteOptions()
143143
self._subject = None
144144
self._disposable = None
145145

146-
def write(self, bucket: str, org: str, record: Union[str, List['str'], Point, List['Point'], Observable],
146+
def write(self, bucket: str, org: str,
147+
record: Union[str, List['str'], Point, List['Point'], dict, List['dict'], Observable],
147148
write_precision: WritePrecision = DEFAULT_WRITE_PRECISION) -> None:
148149
"""
149150
Writes time-series data into influxdb.
@@ -166,13 +167,18 @@ def write(self, bucket: str, org: str, record: Union[str, List['str'], Point, Li
166167
if isinstance(record, Point):
167168
final_string = record.to_line_protocol()
168169

170+
if isinstance(record, dict):
171+
final_string = Point.from_dict(record, write_precision=write_precision).to_line_protocol()
172+
169173
if isinstance(record, list):
170174
lines = []
171175
for item in record:
172176
if isinstance(item, str):
173177
lines.append(item)
174178
if isinstance(item, Point):
175179
lines.append(item.to_line_protocol())
180+
if isinstance(item, dict):
181+
lines.append(Point.from_dict(item, write_precision=write_precision).to_line_protocol())
176182
final_string = '\n'.join(lines)
177183

178184
_async_req = True if self._write_options.write_type == WriteType.asynchronous else False
@@ -205,6 +211,9 @@ def _write_batching(self, bucket, org, data, precision=DEFAULT_WRITE_PRECISION):
205211
elif isinstance(data, Point):
206212
self._subject.on_next(_BatchItem(key=_key, data=data.to_line_protocol()))
207213

214+
elif isinstance(data, dict):
215+
self._write_batching(bucket, org, Point.from_dict(data, write_precision=precision), precision)
216+
208217
elif isinstance(data, list):
209218
for item in data:
210219
self._write_batching(bucket, org, item, precision)

tests/test_WriteApi.py

+62
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44

55
import datetime
66
import unittest
7+
import time
78
from multiprocessing.pool import ApplyResult
89

910
from influxdb_client import Point, WritePrecision
@@ -135,6 +136,27 @@ def test_write_error(self):
135136
self.assertEqual(400, exception.status)
136137
self.assertEqual("Bad Request", exception.reason)
137138

139+
def test_write_dictionary(self):
140+
_bucket = self.create_test_bucket()
141+
_point = {"measurement": "h2o_feet", "tags": {"location": "coyote_creek"},
142+
"time": "2009-11-10T23:00:00Z", "fields": {"water_level": 1.0}}
143+
144+
self.write_client.write(_bucket.name, self.org, _point)
145+
self.write_client.flush()
146+
147+
result = self.query_api.query(
148+
"from(bucket:\"" + _bucket.name + "\") |> range(start: 1970-01-01T00:00:00.000000001Z) |> last()", self.org)
149+
150+
self.assertEqual(len(result), 1)
151+
self.assertEqual(result[0].records[0].get_measurement(), "h2o_feet")
152+
self.assertEqual(result[0].records[0].get_value(), 1.0)
153+
self.assertEqual(result[0].records[0].values.get("location"), "coyote_creek")
154+
self.assertEqual(result[0].records[0].get_field(), "water_level")
155+
self.assertEqual(result[0].records[0].get_time(),
156+
datetime.datetime(2009, 11, 10, 23, 0, tzinfo=datetime.timezone.utc))
157+
158+
self.delete_test_bucket(_bucket)
159+
138160

139161
class AsynchronousWriteTest(BaseTest):
140162

@@ -156,6 +178,46 @@ def test_write_result(self):
156178
self.assertEqual(None, result.get())
157179
self.delete_test_bucket(_bucket)
158180

181+
def test_write_dictionaries(self):
182+
bucket = self.create_test_bucket()
183+
184+
_point1 = {"measurement": "h2o_feet", "tags": {"location": "coyote_creek"},
185+
"time": "2009-11-10T22:00:00Z", "fields": {"water_level": 1.0}}
186+
_point2 = {"measurement": "h2o_feet", "tags": {"location": "coyote_creek"},
187+
"time": "2009-11-10T23:00:00Z", "fields": {"water_level": 2.0}}
188+
189+
_point_list = [_point1, _point2]
190+
191+
self.write_client.write(bucket.name, self.org, _point_list)
192+
time.sleep(1)
193+
194+
query = 'from(bucket:"' + bucket.name + '") |> range(start: 1970-01-01T00:00:00.000000001Z)'
195+
print(query)
196+
197+
flux_result = self.client.query_api().query(query)
198+
199+
self.assertEqual(1, len(flux_result))
200+
201+
records = flux_result[0].records
202+
203+
self.assertEqual(2, len(records))
204+
205+
self.assertEqual("h2o_feet", records[0].get_measurement())
206+
self.assertEqual(1, records[0].get_value())
207+
self.assertEqual("water_level", records[0].get_field())
208+
self.assertEqual("coyote_creek", records[0].values.get('location'))
209+
self.assertEqual(records[0].get_time(),
210+
datetime.datetime(2009, 11, 10, 22, 0, tzinfo=datetime.timezone.utc))
211+
212+
self.assertEqual("h2o_feet", records[1].get_measurement())
213+
self.assertEqual(2, records[1].get_value())
214+
self.assertEqual("water_level", records[1].get_field())
215+
self.assertEqual("coyote_creek", records[1].values.get('location'))
216+
self.assertEqual(records[1].get_time(),
217+
datetime.datetime(2009, 11, 10, 23, 0, tzinfo=datetime.timezone.utc))
218+
219+
self.delete_test_bucket(bucket)
220+
159221

160222
if __name__ == '__main__':
161223
unittest.main()

tests/test_WriteApiBatching.py

+29-14
Original file line numberDiff line numberDiff line change
@@ -33,9 +33,6 @@ def setUp(self) -> None:
3333

3434
self.influxdb_client = InfluxDBClient(url=conf.host, token="my-token")
3535

36-
# self._api_client = influxdb_client.ApiClient(configuration=conf, header_name="Authorization",
37-
# header_value="Token my-token")
38-
3936
write_options = WriteOptions(batch_size=2, flush_interval=5_000, retry_interval=3_000)
4037
self._write_client = WriteApi(influxdb_client=self.influxdb_client, write_options=write_options)
4138

@@ -48,10 +45,10 @@ def test_batch_size(self):
4845
httpretty.register_uri(httpretty.POST, uri="http://localhost/api/v2/write", status=204)
4946

5047
self._write_client.write("my-bucket", "my-org",
51-
["h2o_feet,location=coyote_creek level\\ water_level=1.0 1",
52-
"h2o_feet,location=coyote_creek level\\ water_level=2.0 2",
53-
"h2o_feet,location=coyote_creek level\\ water_level=3.0 3",
54-
"h2o_feet,location=coyote_creek level\\ water_level=4.0 4"])
48+
["h2o_feet,location=coyote_creek level\\ water_level=1.0 1",
49+
"h2o_feet,location=coyote_creek level\\ water_level=2.0 2",
50+
"h2o_feet,location=coyote_creek level\\ water_level=3.0 3",
51+
"h2o_feet,location=coyote_creek level\\ water_level=4.0 4"])
5552

5653
time.sleep(1)
5754

@@ -88,23 +85,23 @@ def test_batch_size_group_by(self):
8885
httpretty.register_uri(httpretty.POST, uri="http://localhost/api/v2/write", status=204)
8986

9087
self._write_client.write("my-bucket", "my-org",
91-
"h2o_feet,location=coyote_creek level\\ water_level=1.0 1")
88+
"h2o_feet,location=coyote_creek level\\ water_level=1.0 1")
9289

9390
self._write_client.write("my-bucket", "my-org",
94-
"h2o_feet,location=coyote_creek level\\ water_level=2.0 2",
91+
"h2o_feet,location=coyote_creek level\\ water_level=2.0 2",
9592
write_precision=WritePrecision.S)
9693

9794
self._write_client.write("my-bucket", "my-org-a",
98-
"h2o_feet,location=coyote_creek level\\ water_level=3.0 3")
95+
"h2o_feet,location=coyote_creek level\\ water_level=3.0 3")
9996

10097
self._write_client.write("my-bucket", "my-org-a",
101-
"h2o_feet,location=coyote_creek level\\ water_level=4.0 4")
98+
"h2o_feet,location=coyote_creek level\\ water_level=4.0 4")
10299

103100
self._write_client.write("my-bucket2", "my-org-a",
104-
"h2o_feet,location=coyote_creek level\\ water_level=5.0 5")
101+
"h2o_feet,location=coyote_creek level\\ water_level=5.0 5")
105102

106103
self._write_client.write("my-bucket", "my-org-a",
107-
"h2o_feet,location=coyote_creek level\\ water_level=6.0 6")
104+
"h2o_feet,location=coyote_creek level\\ water_level=6.0 6")
108105

109106
time.sleep(1)
110107

@@ -270,11 +267,25 @@ def test_record_types(self):
270267
.pipe(ops.map(lambda i: "h2o_feet,location=coyote_creek level\\ water_level={0}.0 {0}".format(i)))
271268
self._write_client.write("my-bucket", "my-org", _data)
272269

270+
# Dictionary item
271+
_dict = {"measurement": "h2o_feet", "tags": {"location": "coyote_creek"},
272+
"time": 13, "fields": {"level water_level": 13.0}}
273+
self._write_client.write("my-bucket", "my-org", _dict)
274+
275+
# Dictionary list
276+
_dict1 = {"measurement": "h2o_feet", "tags": {"location": "coyote_creek"},
277+
"time": 14, "fields": {"level water_level": 14.0}}
278+
_dict2 = {"measurement": "h2o_feet", "tags": {"location": "coyote_creek"},
279+
"time": 15, "fields": {"level water_level": 15.0}}
280+
_dict3 = {"measurement": "h2o_feet", "tags": {"location": "coyote_creek"},
281+
"time": 16, "fields": {"level water_level": 16.0}}
282+
self._write_client.write("my-bucket", "my-org", [_dict1, _dict2, _dict3])
283+
273284
time.sleep(1)
274285

275286
_requests = httpretty.httpretty.latest_requests
276287

277-
self.assertEqual(6, len(_requests))
288+
self.assertEqual(8, len(_requests))
278289

279290
self.assertEqual("h2o_feet,location=coyote_creek level\\ water_level=1.0 1\n"
280291
"h2o_feet,location=coyote_creek level\\ water_level=2.0 2", _requests[0].parsed_body)
@@ -288,6 +299,10 @@ def test_record_types(self):
288299
"h2o_feet,location=coyote_creek level\\ water_level=10.0 10", _requests[4].parsed_body)
289300
self.assertEqual("h2o_feet,location=coyote_creek level\\ water_level=11.0 11\n"
290301
"h2o_feet,location=coyote_creek level\\ water_level=12.0 12", _requests[5].parsed_body)
302+
self.assertEqual("h2o_feet,location=coyote_creek level\\ water_level=13.0 13\n"
303+
"h2o_feet,location=coyote_creek level\\ water_level=14.0 14", _requests[6].parsed_body)
304+
self.assertEqual("h2o_feet,location=coyote_creek level\\ water_level=15.0 15\n"
305+
"h2o_feet,location=coyote_creek level\\ water_level=16.0 16", _requests[7].parsed_body)
291306

292307
pass
293308

0 commit comments

Comments
 (0)