Commit 33d98b7

fix: writes use precision from Point instead of a default precision (#108)
1 parent 1bdf569 commit 33d98b7

File tree: 7 files changed (+116 additions, −30 deletions)
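
The effect of the fix from the caller's perspective: each Point keeps the precision passed to Point.time(), and the WriteApi now honors it instead of flattening everything to the default nanosecond precision of write(). Below is a minimal sketch of that usage, not code from this commit; it assumes a reachable InfluxDB 2.x instance, and the url, token, org and bucket values are placeholders.

    from datetime import datetime, timezone

    from influxdb_client import InfluxDBClient, Point, WritePrecision
    from influxdb_client.client.write_api import SYNCHRONOUS

    # Placeholder connection details; adjust for your own instance.
    client = InfluxDBClient(url="http://localhost:8086", token="my-token", org="my-org")
    write_api = client.write_api(write_options=SYNCHRONOUS)

    # Each Point carries its own precision via .time(); after this fix the
    # WriteApi groups points by that precision instead of forcing the
    # default (ns) precision of the write() call onto all of them.
    p_seconds = Point("h2o_feet").tag("location", "coyote_creek") \
        .field("water_level", 5.0) \
        .time(datetime(2020, 4, 20, 6, 30, tzinfo=timezone.utc), WritePrecision.S)

    p_millis = Point("h2o_feet").tag("location", "coyote_creek") \
        .field("water_level", 6.0) \
        .time(datetime(2020, 4, 20, 5, 30, tzinfo=timezone.utc), WritePrecision.MS)

    # One call, two underlying POSTs: precision=s and precision=ms.
    write_api.write("my-bucket", "my-org", [p_seconds, p_millis])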

.gitignore
Lines changed: 1 addition & 1 deletion

@@ -113,5 +113,5 @@ sandbox
 
 # OpenAPI-generator
 /.openapi-generator*
-/tests/writer.pickle
+**/writer.pickle
 /tests/data_frame_file.csv

CHANGELOG.md
Lines changed: 1 addition & 0 deletions

@@ -5,6 +5,7 @@
 
 ### Bug Fixes
 1. [#105](https://github.com/influxdata/influxdb-client-python/pull/105): Fixed mapping dictionary without timestamp and tags into LineProtocol
+1. [#108](https://github.com/influxdata/influxdb-client-python/pull/108): The WriteApi uses precision from Point instead of a default precision
 
 ## 1.7.0 [2020-05-15]
 
influxdb_client/client/write/point.py
Lines changed: 4 additions & 0 deletions

@@ -84,6 +84,10 @@ def to_line_protocol(self):
 
         return f"{_measurement}{_tags}{_fields}{_time}"
 
+    @property
+    def write_precision(self):
+        return self._write_precision
+
 
 def _append_tags(tags):
     _return = []
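
The new read-only write_precision property exposes the precision recorded by Point.time() (nanoseconds by default) so the WriteApi can read it back. A small illustrative snippet, assuming the influxdb_client package is installed; note that WritePrecision values are plain strings such as "ns" and "s":

    from influxdb_client import Point, WritePrecision

    p = Point("h2o").tag("location", "europe").field("level", 2)
    print(p.write_precision)      # "ns" -- the default precision

    p.time(5, WritePrecision.S)   # the precision is stored alongside the timestamp
    print(p.write_precision)      # "s"
    print(p.to_line_protocol())   # e.g. 'h2o,location=europe level=2i 5'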

influxdb_client/client/write_api.py
Lines changed: 26 additions & 23 deletions

@@ -1,14 +1,12 @@
 # coding: utf-8
 import logging
 import os
-import re
+from collections import defaultdict
 from datetime import timedelta
 from enum import Enum
-from functools import reduce
-from itertools import chain
 from random import random
 from time import sleep
-from typing import Union, List
+from typing import Union, List, Any
 
 import rx
 from rx import operators as ops, Observable
@@ -186,13 +184,13 @@ def __init__(self, influxdb_client, write_options: WriteOptions = WriteOptions()
     def write(self, bucket: str, org: str = None,
               record: Union[
                   str, List['str'], Point, List['Point'], dict, List['dict'], bytes, List['bytes'], Observable] = None,
-              write_precision: WritePrecision = DEFAULT_WRITE_PRECISION, **kwargs) -> None:
+              write_precision: WritePrecision = DEFAULT_WRITE_PRECISION, **kwargs) -> Any:
         """
         Writes time-series data into influxdb.
 
         :param str org: specifies the destination organization for writes; take either the ID or Name interchangeably; if both orgID and org are specified, org takes precedence. (required)
         :param str bucket: specifies the destination bucket for writes (required)
-        :param WritePrecision write_precision: specifies the precision for the unix timestamps within the body line-protocol
+        :param WritePrecision write_precision: specifies the precision for the unix timestamps within the body line-protocol. The precision specified on a Point takes precedence and is used for the write.
         :param record: Points, line protocol, Pandas DataFrame, RxPY Observable to write
         :param data_frame_measurement_name: name of measurement for writing Pandas DataFrame
         :param data_frame_tag_columns: list of DataFrame columns which are tags, rest columns will be fields
@@ -217,11 +215,21 @@ def write(self, bucket: str, org: str = None,
             return self._write_batching(bucket, org, record,
                                         write_precision, **kwargs)
 
-        final_string = self._serialize(record, write_precision, **kwargs)
+        payloads = defaultdict(list)
+        self._serialize(record, write_precision, payloads, **kwargs)
 
         _async_req = True if self._write_options.write_type == WriteType.asynchronous else False
 
-        return self._post_write(_async_req, bucket, org, final_string, write_precision)
+        def write_payload(payload):
+            final_string = b'\n'.join(payload[1])
+            return self._post_write(_async_req, bucket, org, final_string, payload[0])
+
+        results = list(map(write_payload, payloads.items()))
+        if not _async_req:
+            return None
+        elif len(results) == 1:
+            return results[0]
+        return results
 
     def flush(self):
         # TODO
@@ -241,44 +249,39 @@ def __del__(self):
             self._disposable = None
         pass
 
-    def _serialize(self, record, write_precision, **kwargs) -> bytes:
-        _result = b''
+    def _serialize(self, record, write_precision, payload, **kwargs):
         if isinstance(record, bytes):
-            _result = record
+            payload[write_precision].append(record)
 
         elif isinstance(record, str):
-            _result = record.encode("utf-8")
+            self._serialize(record.encode("utf-8"), write_precision, payload, **kwargs)
 
         elif isinstance(record, Point):
-            _result = self._serialize(record.to_line_protocol(), write_precision, **kwargs)
+            self._serialize(record.to_line_protocol(), record.write_precision, payload, **kwargs)
 
         elif isinstance(record, dict):
-            _result = self._serialize(Point.from_dict(record, write_precision=write_precision),
-                                      write_precision, **kwargs)
+            self._serialize(Point.from_dict(record, write_precision=write_precision), write_precision, payload, **kwargs)
         elif 'DataFrame' in type(record).__name__:
             _data = self._data_frame_to_list_of_points(record, precision=write_precision, **kwargs)
-            _result = self._serialize(_data, write_precision, **kwargs)
+            self._serialize(_data, write_precision, payload, **kwargs)
 
         elif isinstance(record, list):
-            _result = b'\n'.join([self._serialize(item, write_precision,
-                                                  **kwargs) for item in record])
-
-        return _result
+            for item in record:
+                self._serialize(item, write_precision, payload, **kwargs)
 
     def _write_batching(self, bucket, org, data,
                         precision=DEFAULT_WRITE_PRECISION,
                         **kwargs):
-        _key = _BatchItemKey(bucket, org, precision)
         if isinstance(data, bytes):
+            _key = _BatchItemKey(bucket, org, precision)
             self._subject.on_next(_BatchItem(key=_key, data=data))
 
         elif isinstance(data, str):
             self._write_batching(bucket, org, data.encode("utf-8"),
                                  precision, **kwargs)
 
         elif isinstance(data, Point):
-            self._write_batching(bucket, org, data.to_line_protocol(),
-                                 precision, **kwargs)
+            self._write_batching(bucket, org, data.to_line_protocol(), data.write_precision, **kwargs)
 
         elif isinstance(data, dict):
             self._write_batching(bucket, org, Point.from_dict(data, write_precision=precision),
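
The core of the change is visible above: _serialize() no longer concatenates everything into a single payload, it appends each serialized record into a dict keyed by precision (with a Point contributing its own write_precision), and write() then issues one POST per precision group. The following standalone sketch mirrors that grouping pattern; it is not the library code itself, and group_by_precision and the Record alias are illustrative names.

    from collections import defaultdict
    from typing import Dict, List, Tuple

    # Illustrative stand-in for (line-protocol bytes, precision) pairs.
    Record = Tuple[bytes, str]

    def group_by_precision(records: List[Record]) -> Dict[str, bytes]:
        """Group line-protocol records by precision and join each group into one
        newline-separated payload, mirroring the defaultdict(list) approach used
        in WriteApi._serialize()/write() above."""
        payloads = defaultdict(list)
        for line, precision in records:
            payloads[precision].append(line)
        # One payload (and therefore one HTTP POST with ?precision=<key>) per group.
        return {precision: b'\n'.join(lines) for precision, lines in payloads.items()}

    if __name__ == "__main__":
        records = [
            (b"h2o_feet,location=coyote_creek water_level=5.0 5", "s"),
            (b"h2o_feet,location=coyote_creek water_level=6.0 6", "ns"),
        ]
        for precision, body in group_by_precision(records).items():
            print(precision, body)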

tests/test_WriteApi.py
Lines changed: 64 additions & 5 deletions

@@ -238,7 +238,7 @@ def test_write_data_frame(self):
                                data_frame_tag_columns=['location'])
 
         result = self.query_api.query(
-            "from(bucket:\"" + bucket.name + "\") |> range(start: 1970-01-01T00:00:00.000000001Z)", self.org)
+            f"from(bucket:\"{bucket.name}\") |> range(start: 1970-01-01T00:00:00.000000001Z)", self.org)
 
         self.assertEqual(1, len(result))
         self.assertEqual(2, len(result[0].records))
@@ -295,7 +295,7 @@ def test_write_empty_data(self):
         bucket = self.create_test_bucket()
 
         with self.assertRaises(ApiException) as cm:
-            self.write_client.write(bucket.name)
+            self.write_client.write(bucket.name, record="")
         exception = cm.exception
 
         self.assertEqual(400, exception.status)
@@ -306,6 +306,34 @@ def test_write_empty_data(self):
 
         self.assertEqual(len(result), 0)
 
+    def test_write_point_different_precision(self):
+        bucket = self.create_test_bucket()
+
+        point1 = Point('test_precision') \
+            .field('power', 10) \
+            .tag('powerFlow', 'low') \
+            .time(datetime.datetime(2020, 4, 20, 6, 30, tzinfo=datetime.timezone.utc), WritePrecision.S)
+
+        point2 = Point('test_precision') \
+            .field('power', 20) \
+            .tag('powerFlow', 'high') \
+            .time(datetime.datetime(2020, 4, 20, 5, 30, tzinfo=datetime.timezone.utc), WritePrecision.MS)
+
+        writer = self.client.write_api(write_options=SYNCHRONOUS)
+        writer.write(bucket.name, self.org, [point1, point2])
+
+        result = self.query_api.query(
+            f"from(bucket:\"{bucket.name}\") |> range(start: 1970-01-01T00:00:00.000000001Z) |> last() "
+            "|> sort(columns: [\"_time\"], desc: false)", self.org)
+
+        self.assertEqual(len(result), 2)
+        self.assertEqual(len(result[0].records), 1)
+        self.assertEqual(len(result[1].records), 1)
+        self.assertEqual(result[0].records[0].get_time(),
+                         datetime.datetime(2020, 4, 20, 5, 30, tzinfo=datetime.timezone.utc))
+        self.assertEqual(result[1].records[0].get_time(),
+                         datetime.datetime(2020, 4, 20, 6, 30, tzinfo=datetime.timezone.utc))
+
 
 class AsynchronousWriteTest(BaseTest):
 
@@ -381,9 +409,9 @@ def test_use_default_tags_with_dictionaries(self):
         bucket = self.create_test_bucket()
 
         _point1 = {"measurement": "h2o_feet", "tags": {"location": "coyote_creek"},
-                           "time": "2009-11-10T22:00:00Z", "fields": {"water_level": 1.0}}
+                   "time": "2009-11-10T22:00:00Z", "fields": {"water_level": 1.0}}
         _point2 = {"measurement": "h2o_feet", "tags": {"location": "coyote_creek"},
-                           "time": "2009-11-10T23:00:00Z", "fields": {"water_level": 2.0}}
+                   "time": "2009-11-10T23:00:00Z", "fields": {"water_level": 2.0}}
 
         _point_list = [_point1, _point2]
 
@@ -424,7 +452,7 @@ def test_use_default_tags_with_data_frame(self):
                                   columns=["location", "water_level"])
 
         async_result = self.write_client.write(bucket.name, record=data_frame, data_frame_measurement_name='h2o_feet',
-                                                data_frame_tag_columns=['location'])
+                                               data_frame_tag_columns=['location'])
         async_result.get()
 
         query = 'from(bucket:"' + bucket.name + '") |> range(start: 1970-01-01T00:00:00.000000001Z)'
@@ -487,6 +515,37 @@ def test_write_bytes(self):
 
         self.delete_test_bucket(bucket)
 
+    def test_write_point_different_precision(self):
+        bucket = self.create_test_bucket()
+
+        _point1 = Point("h2o_feet").tag("location", "coyote_creek").field("level water_level", 5.0).time(5,
+                                                                                                          WritePrecision.S)
+        _point2 = Point("h2o_feet").tag("location", "coyote_creek").field("level water_level", 6.0).time(6,
+                                                                                                          WritePrecision.US)
+
+        _point_list = [_point1, _point2]
+
+        async_results = self.write_client.write(bucket.name, self.org, _point_list)
+        self.assertEqual(2, len(async_results))
+        for async_result in async_results:
+            async_result.get()
+
+        query = f'from(bucket:"{bucket.name}") |> range(start: 1970-01-01T00:00:00.000000001Z) '\
+                '|> sort(columns: [\"_time\"], desc: false)'
+
+        flux_result = self.client.query_api().query(query)
+
+        self.assertEqual(1, len(flux_result))
+
+        records = flux_result[0].records
+
+        self.assertEqual(2, len(records))
+        self.assertEqual(records[0].get_time(),
+                         datetime.datetime(1970, 1, 1, 0, 0, 0, 6, tzinfo=datetime.timezone.utc))
+        self.assertEqual(records[1].get_time(),
+                         datetime.datetime(1970, 1, 1, 0, 0, 5, tzinfo=datetime.timezone.utc))
+
+
 
 class PointSettingTest(BaseTest):
 

tests/test_WriteApiBatching.py
Lines changed: 19 additions & 0 deletions

@@ -315,6 +315,25 @@ def test_record_types(self):
 
         pass
 
+    def test_write_point_different_precision(self):
+        httpretty.register_uri(httpretty.POST, uri="http://localhost/api/v2/write", status=204)
+
+        _point1 = Point("h2o_feet").tag("location", "coyote_creek").field("level water_level", 5.0).time(5, WritePrecision.S)
+        _point2 = Point("h2o_feet").tag("location", "coyote_creek").field("level water_level", 6.0).time(6, WritePrecision.NS)
+
+        self._write_client.write("my-bucket", "my-org", [_point1, _point2])
+
+        time.sleep(1)
+
+        _requests = httpretty.httpretty.latest_requests
+
+        self.assertEqual(2, len(_requests))
+
+        self.assertEqual("h2o_feet,location=coyote_creek level\\ water_level=5.0 5", _requests[0].parsed_body)
+        self.assertEqual("s", _requests[0].querystring["precision"][0])
+        self.assertEqual("h2o_feet,location=coyote_creek level\\ water_level=6.0 6", _requests[1].parsed_body)
+        self.assertEqual("ns", _requests[1].querystring["precision"][0])
+
     def test_write_result(self):
         httpretty.register_uri(httpretty.POST, uri="http://localhost/api/v2/write", status=204)
 

tests/test_point.py
Lines changed: 1 addition & 1 deletion

@@ -104,7 +104,7 @@ def test_TimePrecisionDefault(self):
             .tag("location", "europe") \
             .field("level", 2)
 
-        self.assertEqual(WritePrecision.NS, point._write_precision)
+        self.assertEqual(WritePrecision.NS, point.write_precision)
 
     def test_TimeSpanFormatting(self):
         point = Point.measurement("h2o") \
