1
1
# coding: utf-8
2
2
import logging
3
3
import os
4
- import re
4
+ from collections import defaultdict
5
5
from datetime import timedelta
6
6
from enum import Enum
7
- from functools import reduce
8
- from itertools import chain
9
7
from random import random
10
8
from time import sleep
11
- from typing import Union , List
9
+ from typing import Union , List , Any
12
10
13
11
import rx
14
12
from rx import operators as ops , Observable
@@ -186,13 +184,13 @@ def __init__(self, influxdb_client, write_options: WriteOptions = WriteOptions()
186
184
def write (self , bucket : str , org : str = None ,
187
185
record : Union [
188
186
str , List ['str' ], Point , List ['Point' ], dict , List ['dict' ], bytes , List ['bytes' ], Observable ] = None ,
189
- write_precision : WritePrecision = DEFAULT_WRITE_PRECISION , ** kwargs ) -> None :
187
+ write_precision : WritePrecision = DEFAULT_WRITE_PRECISION , ** kwargs ) -> Any :
190
188
"""
191
189
Writes time-series data into influxdb.
192
190
193
191
:param str org: specifies the destination organization for writes; take either the ID or Name interchangeably; if both orgID and org are specified, org takes precedence. (required)
194
192
:param str bucket: specifies the destination bucket for writes (required)
195
- :param WritePrecision write_precision: specifies the precision for the unix timestamps within the body line-protocol
193
+ :param WritePrecision write_precision: specifies the precision for the unix timestamps within the body line-protocol. The precision specified on a Point has precedes and is use for write.
196
194
:param record: Points, line protocol, Pandas DataFrame, RxPY Observable to write
197
195
:param data_frame_measurement_name: name of measurement for writing Pandas DataFrame
198
196
:param data_frame_tag_columns: list of DataFrame columns which are tags, rest columns will be fields
@@ -217,11 +215,21 @@ def write(self, bucket: str, org: str = None,
217
215
return self ._write_batching (bucket , org , record ,
218
216
write_precision , ** kwargs )
219
217
220
- final_string = self ._serialize (record , write_precision , ** kwargs )
218
+ payloads = defaultdict (list )
219
+ self ._serialize (record , write_precision , payloads , ** kwargs )
221
220
222
221
_async_req = True if self ._write_options .write_type == WriteType .asynchronous else False
223
222
224
- return self ._post_write (_async_req , bucket , org , final_string , write_precision )
223
+ def write_payload (payload ):
224
+ final_string = b'\n ' .join (payload [1 ])
225
+ return self ._post_write (_async_req , bucket , org , final_string , payload [0 ])
226
+
227
+ results = list (map (write_payload , payloads .items ()))
228
+ if not _async_req :
229
+ return None
230
+ elif len (results ) == 1 :
231
+ return results [0 ]
232
+ return results
225
233
226
234
def flush (self ):
227
235
# TODO
@@ -241,44 +249,39 @@ def __del__(self):
241
249
self ._disposable = None
242
250
pass
243
251
244
- def _serialize (self , record , write_precision , ** kwargs ) -> bytes :
245
- _result = b''
252
+ def _serialize (self , record , write_precision , payload , ** kwargs ):
246
253
if isinstance (record , bytes ):
247
- _result = record
254
+ payload [ write_precision ]. append ( record )
248
255
249
256
elif isinstance (record , str ):
250
- _result = record .encode ("utf-8" )
257
+ self . _serialize ( record .encode ("utf-8" ), write_precision , payload , ** kwargs )
251
258
252
259
elif isinstance (record , Point ):
253
- _result = self ._serialize (record .to_line_protocol (), write_precision , ** kwargs )
260
+ self ._serialize (record .to_line_protocol (), record . write_precision , payload , ** kwargs )
254
261
255
262
elif isinstance (record , dict ):
256
- _result = self ._serialize (Point .from_dict (record , write_precision = write_precision ),
257
- write_precision , ** kwargs )
263
+ self ._serialize (Point .from_dict (record , write_precision = write_precision ), write_precision , payload , ** kwargs )
258
264
elif 'DataFrame' in type (record ).__name__ :
259
265
_data = self ._data_frame_to_list_of_points (record , precision = write_precision , ** kwargs )
260
- _result = self ._serialize (_data , write_precision , ** kwargs )
266
+ self ._serialize (_data , write_precision , payload , ** kwargs )
261
267
262
268
elif isinstance (record , list ):
263
- _result = b'\n ' .join ([self ._serialize (item , write_precision ,
264
- ** kwargs ) for item in record ])
265
-
266
- return _result
269
+ for item in record :
270
+ self ._serialize (item , write_precision , payload , ** kwargs )
267
271
268
272
def _write_batching (self , bucket , org , data ,
269
273
precision = DEFAULT_WRITE_PRECISION ,
270
274
** kwargs ):
271
- _key = _BatchItemKey (bucket , org , precision )
272
275
if isinstance (data , bytes ):
276
+ _key = _BatchItemKey (bucket , org , precision )
273
277
self ._subject .on_next (_BatchItem (key = _key , data = data ))
274
278
275
279
elif isinstance (data , str ):
276
280
self ._write_batching (bucket , org , data .encode ("utf-8" ),
277
281
precision , ** kwargs )
278
282
279
283
elif isinstance (data , Point ):
280
- self ._write_batching (bucket , org , data .to_line_protocol (),
281
- precision , ** kwargs )
284
+ self ._write_batching (bucket , org , data .to_line_protocol (), data .write_precision , ** kwargs )
282
285
283
286
elif isinstance (data , dict ):
284
287
self ._write_batching (bucket , org , Point .from_dict (data , write_precision = precision ),
0 commit comments