Skip to content

Commit 110cc51

Browse files
committed
feat: Add support to write by byte array (#26)
1 parent 95ad5ec commit 110cc51

File tree

4 files changed

+110
-33
lines changed

4 files changed

+110
-33
lines changed

README.rst

+9-2
Original file line numberDiff line numberDiff line change
@@ -156,7 +156,7 @@ The data should be passed as a `InfluxDB Line Protocol <https://docs.influxdata.
156156
The data could be written as
157157
""""""""""""""""""""""""""""
158158

159-
1. ``string`` that is formatted as a InfluxDB's line protocol
159+
1. ``string`` or ``bytes`` that is formatted as a InfluxDB's line protocol
160160
2. `Data Point <https://github.com/influxdata/influxdb-client-python/blob/master/influxdb_client/client/write/point.py#L16>`__ structure
161161
3. Dictionary style mapping with keys: ``measurement``, ``tags``, ``fields`` and ``time``
162162
4. List of above items
@@ -205,12 +205,19 @@ The batching is configurable by ``write_options``\ :
205205
retry_interval=5_000))
206206
207207
"""
208-
Write Line Protocol
208+
Write Line Protocol formatted as string
209209
"""
210210
_write_client.write("my-bucket", "my-org", "h2o_feet,location=coyote_creek water_level=1.0 1")
211211
_write_client.write("my-bucket", "my-org", ["h2o_feet,location=coyote_creek water_level=2.0 2",
212212
"h2o_feet,location=coyote_creek water_level=3.0 3"])
213213
214+
"""
215+
Write Line Protocol formatted as byte array
216+
"""
217+
_write_client.write("my-bucket", "my-org", "h2o_feet,location=coyote_creek water_level=1.0 1".encode())
218+
_write_client.write("my-bucket", "my-org", ["h2o_feet,location=coyote_creek water_level=2.0 2".encode(),
219+
"h2o_feet,location=coyote_creek water_level=3.0 3".encode()])
220+
214221
"""
215222
Write Dictionary-style object
216223
"""

influxdb_client/client/write_api.py

+30-27
Original file line numberDiff line numberDiff line change
@@ -98,7 +98,7 @@ def __str__(self) -> str:
9898

9999

100100
def _body_reduce(batch_items):
101-
return '\n'.join(map(lambda batch_item: batch_item.data, batch_items))
101+
return b'\n'.join(map(lambda batch_item: batch_item.data, batch_items))
102102

103103

104104
def _create_batch(group: GroupedObservable):
@@ -144,7 +144,8 @@ def __init__(self, influxdb_client, write_options: WriteOptions = WriteOptions()
144144
self._disposable = None
145145

146146
def write(self, bucket: str, org: str,
147-
record: Union[str, List['str'], Point, List['Point'], dict, List['dict'], Observable],
147+
record: Union[
148+
str, List['str'], Point, List['Point'], dict, List['dict'], bytes, List['bytes'], Observable],
148149
write_precision: WritePrecision = DEFAULT_WRITE_PRECISION) -> None:
149150
"""
150151
Writes time-series data into influxdb.
@@ -159,27 +160,7 @@ def write(self, bucket: str, org: str,
159160
if self._write_options.write_type is WriteType.batching:
160161
return self._write_batching(bucket, org, record, write_precision)
161162

162-
final_string = ''
163-
164-
if isinstance(record, str):
165-
final_string = record
166-
167-
if isinstance(record, Point):
168-
final_string = record.to_line_protocol()
169-
170-
if isinstance(record, dict):
171-
final_string = Point.from_dict(record, write_precision=write_precision).to_line_protocol()
172-
173-
if isinstance(record, list):
174-
lines = []
175-
for item in record:
176-
if isinstance(item, str):
177-
lines.append(item)
178-
if isinstance(item, Point):
179-
lines.append(item.to_line_protocol())
180-
if isinstance(item, dict):
181-
lines.append(Point.from_dict(item, write_precision=write_precision).to_line_protocol())
182-
final_string = '\n'.join(lines)
163+
final_string = self._serialize(record, write_precision)
183164

184165
_async_req = True if self._write_options.write_type == WriteType.asynchronous else False
185166

@@ -203,13 +184,35 @@ def __del__(self):
203184
self._disposable = None
204185
pass
205186

187+
def _serialize(self, record, write_precision) -> bytes:
188+
_result = b''
189+
if isinstance(record, bytes):
190+
_result = record
191+
192+
elif isinstance(record, str):
193+
_result = record.encode("utf-8")
194+
195+
elif isinstance(record, Point):
196+
_result = self._serialize(record.to_line_protocol(), write_precision=write_precision)
197+
198+
elif isinstance(record, dict):
199+
_result = self._serialize(Point.from_dict(record, write_precision=write_precision),
200+
write_precision=write_precision)
201+
elif isinstance(record, list):
202+
_result = b'\n'.join([self._serialize(item, write_precision=write_precision) for item in record])
203+
204+
return _result
205+
206206
def _write_batching(self, bucket, org, data, precision=DEFAULT_WRITE_PRECISION):
207207
_key = _BatchItemKey(bucket, org, precision)
208-
if isinstance(data, str):
208+
if isinstance(data, bytes):
209209
self._subject.on_next(_BatchItem(key=_key, data=data))
210210

211+
elif isinstance(data, str):
212+
self._write_batching(bucket, org, data.encode("utf-8"), precision)
213+
211214
elif isinstance(data, Point):
212-
self._subject.on_next(_BatchItem(key=_key, data=data.to_line_protocol()))
215+
self._write_batching(bucket, org, data.to_line_protocol(), precision)
213216

214217
elif isinstance(data, dict):
215218
self._write_batching(bucket, org, Point.from_dict(data, write_precision=precision), precision)
@@ -234,7 +237,7 @@ def _http(self, batch_item: _BatchItem):
234237
return _BatchResponse(data=batch_item)
235238

236239
def _post_write(self, _async_req, bucket, org, body, precision):
237-
return self._write_service.post_write(org=org, bucket=bucket, body=body.encode("utf-8"), precision=precision,
240+
return self._write_service.post_write(org=org, bucket=bucket, body=body, precision=precision,
238241
async_req=_async_req, content_encoding="identity",
239242
content_type="text/plain; charset=utf-8")
240243

@@ -272,4 +275,4 @@ def _on_error(ex):
272275

273276
def _on_complete(self):
274277
self._disposable.dispose()
275-
logger.info("the batching processor was dispose")
278+
logger.info("the batching processor was disposed")

tests/test_WriteApi.py

+58
Original file line numberDiff line numberDiff line change
@@ -157,6 +157,26 @@ def test_write_dictionary(self):
157157

158158
self.delete_test_bucket(_bucket)
159159

160+
def test_write_bytes(self):
161+
_bucket = self.create_test_bucket()
162+
_bytes = "h2o_feet,location=coyote_creek level\\ water_level=1.0 1".encode("utf-8")
163+
164+
self.write_client.write(_bucket.name, self.org, _bytes)
165+
self.write_client.flush()
166+
167+
result = self.query_api.query(
168+
"from(bucket:\"" + _bucket.name + "\") |> range(start: 1970-01-01T00:00:00.000000001Z) |> last()", self.org)
169+
170+
self.assertEqual(len(result), 1)
171+
self.assertEqual(result[0].records[0].get_measurement(), "h2o_feet")
172+
self.assertEqual(result[0].records[0].get_value(), 1.0)
173+
self.assertEqual(result[0].records[0].values.get("location"), "coyote_creek")
174+
self.assertEqual(result[0].records[0].get_field(), "level water_level")
175+
self.assertEqual(result[0].records[0].get_time(),
176+
datetime.datetime(1970, 1, 1, 0, 0, tzinfo=datetime.timezone.utc))
177+
178+
self.delete_test_bucket(_bucket)
179+
160180

161181
class AsynchronousWriteTest(BaseTest):
162182

@@ -218,6 +238,44 @@ def test_write_dictionaries(self):
218238

219239
self.delete_test_bucket(bucket)
220240

241+
def test_write_bytes(self):
242+
bucket = self.create_test_bucket()
243+
244+
_bytes1 = "h2o_feet,location=coyote_creek level\\ water_level=1.0 1".encode("utf-8")
245+
_bytes2 = "h2o_feet,location=coyote_creek level\\ water_level=2.0 2".encode("utf-8")
246+
247+
_bytes_list = [_bytes1, _bytes2]
248+
249+
self.write_client.write(bucket.name, self.org, _bytes_list, write_precision=WritePrecision.S)
250+
time.sleep(1)
251+
252+
query = 'from(bucket:"' + bucket.name + '") |> range(start: 1970-01-01T00:00:00.000000001Z)'
253+
print(query)
254+
255+
flux_result = self.client.query_api().query(query)
256+
257+
self.assertEqual(1, len(flux_result))
258+
259+
records = flux_result[0].records
260+
261+
self.assertEqual(2, len(records))
262+
263+
self.assertEqual("h2o_feet", records[0].get_measurement())
264+
self.assertEqual(1, records[0].get_value())
265+
self.assertEqual("level water_level", records[0].get_field())
266+
self.assertEqual("coyote_creek", records[0].values.get('location'))
267+
self.assertEqual(records[0].get_time(),
268+
datetime.datetime(1970, 1, 1, 0, 0, 1, tzinfo=datetime.timezone.utc))
269+
270+
self.assertEqual("h2o_feet", records[1].get_measurement())
271+
self.assertEqual(2, records[1].get_value())
272+
self.assertEqual("level water_level", records[1].get_field())
273+
self.assertEqual("coyote_creek", records[1].values.get('location'))
274+
self.assertEqual(records[1].get_time(),
275+
datetime.datetime(1970, 1, 1, 0, 0, 2, tzinfo=datetime.timezone.utc))
276+
277+
self.delete_test_bucket(bucket)
278+
221279

222280
if __name__ == '__main__':
223281
unittest.main()

tests/test_WriteApiBatching.py

+13-4
Original file line numberDiff line numberDiff line change
@@ -277,15 +277,22 @@ def test_record_types(self):
277277
"time": 14, "fields": {"level water_level": 14.0}}
278278
_dict2 = {"measurement": "h2o_feet", "tags": {"location": "coyote_creek"},
279279
"time": 15, "fields": {"level water_level": 15.0}}
280-
_dict3 = {"measurement": "h2o_feet", "tags": {"location": "coyote_creek"},
281-
"time": 16, "fields": {"level water_level": 16.0}}
282-
self._write_client.write("my-bucket", "my-org", [_dict1, _dict2, _dict3])
280+
self._write_client.write("my-bucket", "my-org", [_dict1, _dict2])
281+
282+
# Bytes item
283+
_bytes = "h2o_feet,location=coyote_creek level\\ water_level=16.0 16".encode("utf-8")
284+
self._write_client.write("my-bucket", "my-org", _bytes)
285+
286+
# Bytes list
287+
_bytes1 = "h2o_feet,location=coyote_creek level\\ water_level=17.0 17".encode("utf-8")
288+
_bytes2 = "h2o_feet,location=coyote_creek level\\ water_level=18.0 18".encode("utf-8")
289+
self._write_client.write("my-bucket", "my-org", [_bytes1, _bytes2])
283290

284291
time.sleep(1)
285292

286293
_requests = httpretty.httpretty.latest_requests
287294

288-
self.assertEqual(8, len(_requests))
295+
self.assertEqual(9, len(_requests))
289296

290297
self.assertEqual("h2o_feet,location=coyote_creek level\\ water_level=1.0 1\n"
291298
"h2o_feet,location=coyote_creek level\\ water_level=2.0 2", _requests[0].parsed_body)
@@ -303,6 +310,8 @@ def test_record_types(self):
303310
"h2o_feet,location=coyote_creek level\\ water_level=14.0 14", _requests[6].parsed_body)
304311
self.assertEqual("h2o_feet,location=coyote_creek level\\ water_level=15.0 15\n"
305312
"h2o_feet,location=coyote_creek level\\ water_level=16.0 16", _requests[7].parsed_body)
313+
self.assertEqual("h2o_feet,location=coyote_creek level\\ water_level=17.0 17\n"
314+
"h2o_feet,location=coyote_creek level\\ water_level=18.0 18", _requests[8].parsed_body)
306315

307316
pass
308317

0 commit comments

Comments
 (0)