Commit 709b3eb

feat: client performance (#44)
* fix: optimize serialization to LineProtocol
* fix: add reasonable arguments for importing data
* doc: clarify how to use the client to import a large amount of data
* example: add an example of how to stream data into InfluxDB
1 parent fdec77d commit 709b3eb
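
The performance work centers on how points are serialized. As the diffs below show, a Point can be rendered to its LineProtocol string once via to_line_protocol() and handed to the write API as a plain string. A minimal sketch of that call (the measurement, tag, and field values here are illustrative, not from the commit):

from influxdb_client import Point

# Build a Point and serialize it to LineProtocol once,
# rather than re-escaping it on every write.
point = Point("financial-analysis") \
    .tag("type", "vix-daily") \
    .field("close", 25.45) \
    .time("2019-01-01")
print(point.to_line_protocol())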

File tree

8 files changed: +323 −100 lines changed

CHANGELOG.md

+3

@@ -1,5 +1,8 @@
 ## 1.2.0 [unreleased]
 
+### Features
+1. [#44](https://github.com/influxdata/influxdb-client-python/pull/44): Optimized serialization into LineProtocol; clarified how to use the client to import a large amount of data
+
 ### API
 1. [#42](https://github.com/influxdata/influxdb-client-python/pull/42): Updated swagger to latest version
 
README.rst

+18 −4

@@ -428,6 +428,8 @@ Examples
 How to efficiently import large dataset
 """""""""""""""""""""""""""""""""""""""
 
+The following example shows how to import a dataset of dozens of megabytes.
+If you would like to import gigabytes of data, use our multiprocessing example `import_data_set_multiprocessing.py <https://github.com/influxdata/influxdb-client-python/blob/master/examples/import_data_set_multiprocessing.py>`_ to use the full capability of your hardware.
 
 * sources - `import_data_set.py <https://github.com/influxdata/influxdb-client-python/blob/master/examples/import_data_set.py>`_
 
@@ -441,7 +443,6 @@ How to efficiently import large dataset
 
 from collections import OrderedDict
 from csv import DictReader
-from datetime import datetime
 
 import rx
 from rx import operators as ops
@@ -466,13 +467,26 @@ How to efficiently import large dataset
     :param row: the row of CSV file
     :return: Parsed csv row to [Point]
     """
+
+    """
+    For better performance it is sometimes useful to create the LineProtocol directly, avoiding unnecessary escaping overhead:
+    """
+    # from pytz import UTC
+    # import ciso8601
+    # from influxdb_client.client.write.point import EPOCH
+    #
+    # time = (UTC.localize(ciso8601.parse_datetime(row["Date"])) - EPOCH).total_seconds() * 1e9
+    # return f"financial-analysis,type=vix-daily" \
+    #        f" close={float(row['VIX Close'])},high={float(row['VIX High'])},low={float(row['VIX Low'])},open={float(row['VIX Open'])}" \
+    #        f" {int(time)}"
+
     return Point("financial-analysis") \
         .tag("type", "vix-daily") \
         .field("open", float(row['VIX Open'])) \
         .field("high", float(row['VIX High'])) \
         .field("low", float(row['VIX Low'])) \
         .field("close", float(row['VIX Close'])) \
-        .time(datetime.strptime(row['Date'], '%Y-%m-%d'))
+        .time(row['Date'])
 
 
 """
@@ -485,9 +499,9 @@ How to efficiently import large dataset
 client = InfluxDBClient(url="http://localhost:9999", token="my-token", org="my-org", debug=True)
 
 """
-Create client that writes data in batches with 500 items.
+Create client that writes data in batches of 50_000 items.
 """
-write_api = client.write_api(write_options=WriteOptions(batch_size=500, jitter_interval=1_000))
+write_api = client.write_api(write_options=WriteOptions(batch_size=50_000, flush_interval=10_000))
 
 """
 Write data into InfluxDB
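
The commented-out fast path above can be made self-contained. A minimal runnable sketch, assuming the third-party pytz and ciso8601 packages are installed (EPOCH is imported from the client, exactly as in the snippet; the sample row values are made up):

import ciso8601
from pytz import UTC

from influxdb_client.client.write.point import EPOCH


def parse_row_fast(row: dict) -> str:
    # Nanosecond timestamp relative to the client's EPOCH constant
    time = (UTC.localize(ciso8601.parse_datetime(row["Date"])) - EPOCH).total_seconds() * 1e9
    # Hand-built LineProtocol: measurement,tags fields timestamp
    return f"financial-analysis,type=vix-daily" \
           f" close={float(row['VIX Close'])},high={float(row['VIX High'])},low={float(row['VIX Low'])},open={float(row['VIX Open'])}" \
           f" {int(time)}"


print(parse_row_fast({"Date": "2019-01-01", "VIX Open": "25.00", "VIX High": "26.30",
                      "VIX Low": "24.90", "VIX Close": "25.45"}))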

examples/import_data_set.py

+16 −4

@@ -6,7 +6,6 @@
 
 from collections import OrderedDict
 from csv import DictReader
-from datetime import datetime
 
 import rx
 from rx import operators as ops
@@ -32,13 +31,26 @@ def parse_row(row: OrderedDict):
     :param row: the row of CSV file
     :return: Parsed csv row to [Point]
     """
+
+    """
+    For better performance it is sometimes useful to create the LineProtocol directly, avoiding unnecessary escaping overhead:
+    """
+    # from pytz import UTC
+    # import ciso8601
+    # from influxdb_client.client.write.point import EPOCH
+    #
+    # time = (UTC.localize(ciso8601.parse_datetime(row["Date"])) - EPOCH).total_seconds() * 1e9
+    # return f"financial-analysis,type=vix-daily" \
+    #        f" close={float(row['VIX Close'])},high={float(row['VIX High'])},low={float(row['VIX Low'])},open={float(row['VIX Open'])}" \
+    #        f" {int(time)}"
+
     return Point("financial-analysis") \
         .tag("type", "vix-daily") \
         .field("open", float(row['VIX Open'])) \
         .field("high", float(row['VIX High'])) \
         .field("low", float(row['VIX Low'])) \
         .field("close", float(row['VIX Close'])) \
-        .time(datetime.strptime(row['Date'], '%Y-%m-%d'))
+        .time(row['Date'])
 
 
 """
@@ -51,9 +63,9 @@ def parse_row(row: OrderedDict):
 client = InfluxDBClient(url="http://localhost:9999", token="my-token", org="my-org", debug=True)
 
 """
-Create client that writes data in batches with 500 items.
+Create client that writes data in batches of 50_000 items.
 """
-write_api = client.write_api(write_options=WriteOptions(batch_size=500, jitter_interval=1_000))
+write_api = client.write_api(write_options=WriteOptions(batch_size=50_000, flush_interval=10_000))
 
 """
 Write data into InfluxDB
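
Both examples raise the write API from 500-item batches to 50_000-item batches flushed every 10 seconds. A short sketch of the batching knobs this commit touches, with the same illustrative connection values (the dropped jitter_interval parameter randomly delays each flush):

from influxdb_client import InfluxDBClient, WriteOptions
from influxdb_client.client.write_api import WriteType

client = InfluxDBClient(url="http://localhost:9999", token="my-token", org="my-org")
write_api = client.write_api(
    write_options=WriteOptions(write_type=WriteType.batching,
                               batch_size=50_000,       # points per write request
                               flush_interval=10_000))  # ms before a partial batch is flushed
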
examples/import_data_set_multiprocessing.py

+219

@@ -0,0 +1,219 @@
+"""
+Import public NYC taxi and for-hire vehicle (Uber, Lyft, etc.) trip data into InfluxDB 2.0
+
+https://github.com/toddwschneider/nyc-taxi-data
+"""
+import concurrent.futures
+import io
+import multiprocessing
+from collections import OrderedDict
+from csv import DictReader
+from datetime import datetime
+from multiprocessing import Value
+from urllib.request import urlopen
+
+import rx
+from rx import operators as ops
+
+from influxdb_client import Point, InfluxDBClient, WriteOptions
+from influxdb_client.client.write_api import WriteType
+
+
+class ProgressTextIOWrapper(io.TextIOWrapper):
+    """
+    TextIOWrapper that stores the progress of reading.
+    """
+    def __init__(self, *args, **kwargs):
+        io.TextIOWrapper.__init__(self, *args, **kwargs)
+        self.progress = None
+
+    def readline(self, *args, **kwarg) -> str:
+        readline = super().readline(*args, **kwarg)
+        self.progress.value += len(readline)
+        return readline
+
+
+class InfluxDBWriter(multiprocessing.Process):
+    """
+    Writer that writes data in batches of 50_000 items.
+    """
+    def __init__(self, queue):
+        multiprocessing.Process.__init__(self)
+        self.queue = queue
+        self.client = InfluxDBClient(url="http://localhost:9999", token="my-token", org="my-org", debug=False)
+        self.write_api = self.client.write_api(
+            write_options=WriteOptions(write_type=WriteType.batching, batch_size=50_000, flush_interval=10_000))
+
+    def run(self):
+        while True:
+            next_task = self.queue.get()
+            if next_task is None:
+                # Poison pill means terminate
+                self.terminate()
+                self.queue.task_done()
+                break
+            self.write_api.write(org="my-org", bucket="my-bucket", record=next_task)
+            self.queue.task_done()
+
+    def terminate(self) -> None:
+        proc_name = self.name
+        print()
+        print('Writer: flushing data...')
+        self.write_api.__del__()
+        self.client.__del__()
+        print('Writer {}: closed'.format(proc_name))
+
+
+def parse_row(row: OrderedDict):
+    """Parse row of CSV file into Point with structure:
+
+    taxi-trip-data,DOLocationID=152,PULocationID=79,dispatching_base_num=B02510 dropoff_datetime="2019-01-01 01:27:24" 1546304267000000000
+
+    CSV format:
+    dispatching_base_num,pickup_datetime,dropoff_datetime,PULocationID,DOLocationID,SR_Flag
+    B00001,2019-01-01 00:30:00,2019-01-01 02:51:55,,,
+    B00001,2019-01-01 00:45:00,2019-01-01 00:54:49,,,
+    B00001,2019-01-01 00:15:00,2019-01-01 00:54:52,,,
+    B00008,2019-01-01 00:19:00,2019-01-01 00:39:00,,,
+    B00008,2019-01-01 00:27:00,2019-01-01 00:37:00,,,
+    B00008,2019-01-01 00:48:00,2019-01-01 01:02:00,,,
+    B00008,2019-01-01 00:50:00,2019-01-01 00:59:00,,,
+    B00008,2019-01-01 00:51:00,2019-01-01 00:56:00,,,
+    B00009,2019-01-01 00:44:00,2019-01-01 00:58:00,,,
+    B00009,2019-01-01 00:19:00,2019-01-01 00:36:00,,,
+    B00009,2019-01-01 00:36:00,2019-01-01 00:49:00,,,
+    B00009,2019-01-01 00:26:00,2019-01-01 00:32:00,,,
+    ...
+
+    :param row: the row of CSV file
+    :return: Parsed csv row to [Point]
+    """
+    return Point("taxi-trip-data") \
+        .tag("dispatching_base_num", row['dispatching_base_num']) \
+        .tag("PULocationID", row['PULocationID']) \
+        .tag("DOLocationID", row['DOLocationID']) \
+        .tag("SR_Flag", row['SR_Flag']) \
+        .field("dropoff_datetime", row['dropoff_datetime']) \
+        .time(row['pickup_datetime']) \
+        .to_line_protocol()
+
+
+def parse_rows(rows, total_size):
+    """
+    Parse a bunch of CSV rows into LineProtocol
+
+    :param total_size: Total size of file
+    :param rows: CSV rows
+    :return: List of line protocols
+    """
+    _parsed_rows = list(map(parse_row, rows))
+
+    counter_.value += len(_parsed_rows)
+    if counter_.value % 10_000 == 0:
+        print('{0:8}{1}'.format(counter_.value, ' - {0:.2f} %'
+                                .format(100 * float(progress_.value) / float(int(total_size))) if total_size else ""))
+
+    queue_.put(_parsed_rows)
+    return None
+
+
+def init_counter(counter, progress, queue):
+    """
+    Initialize shared counters for displaying progress
+    """
+    global counter_
+    counter_ = counter
+    global progress_
+    progress_ = progress
+    global queue_
+    queue_ = queue
+
+
+"""
+Create multiprocess shared environment
+"""
+queue_ = multiprocessing.Manager().Queue()
+counter_ = Value('i', 0)
+progress_ = Value('i', 0)
+startTime = datetime.now()
+
+url = "https://s3.amazonaws.com/nyc-tlc/trip+data/fhv_tripdata_2019-01.csv"
+# url = "file:///Users/bednar/Developer/influxdata/influxdb-client-python/examples/fhv_tripdata_2019-01.csv"
+
+"""
+Open the URL to stream data
+"""
+content_length = None
+response = urlopen(url)
+if response.headers:
+    content_length = response.headers['Content-length']
+io_wrapper = ProgressTextIOWrapper(response)
+io_wrapper.progress = progress_
+
+"""
+Start writer as a new process
+"""
+writer = InfluxDBWriter(queue_)
+writer.start()
+
+"""
+Create process pool for parallel encoding into LineProtocol
+"""
+cpu_count = multiprocessing.cpu_count()
+with concurrent.futures.ProcessPoolExecutor(cpu_count, initializer=init_counter,
+                                            initargs=(counter_, progress_, queue_)) as executor:
+    """
+    Convert the incoming HTTP stream into a sequence of LineProtocol
+    """
+    data = rx \
+        .from_iterable(DictReader(io_wrapper)) \
+        .pipe(ops.buffer_with_count(10_000),
+              # Parse 10_000 rows into LineProtocol on a subprocess
+              ops.flat_map(lambda rows: executor.submit(parse_rows, rows, content_length)))
+
+    """
+    Write data into InfluxDB
+    """
+    data.subscribe(on_next=lambda x: None, on_error=lambda ex: print(f'Unexpected error: {ex}'))
+
+"""
+Terminate Writer
+"""
+queue_.put(None)
+queue_.join()
+
+print()
+print(f'Import finished in: {datetime.now() - startTime}')
+print()
+
+"""
+Querying 10 pickups from dispatching 'B00008'
+"""
+query = 'from(bucket:"my-bucket")' \
+        '|> range(start: 2019-01-01T00:00:00Z, stop: now()) ' \
+        '|> filter(fn: (r) => r._measurement == "taxi-trip-data")' \
+        '|> filter(fn: (r) => r.dispatching_base_num == "B00008")' \
+        '|> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")' \
+        '|> rename(columns: {_time: "pickup_datetime"})' \
+        '|> drop(columns: ["_start", "_stop"])|> limit(n:10, offset: 0)'
+
+client = InfluxDBClient(url="http://localhost:9999", token="my-token", org="my-org", debug=False)
+result = client.query_api().query(org="my-org", query=query)
+
+"""
+Processing results
+"""
+print()
+print("=== Querying 10 pickups from dispatching 'B00008' ===")
+print()
+for table in result:
+    for record in table.records:
+        print(
+            f'Dispatching: {record["dispatching_base_num"]} pickup: {record["pickup_datetime"]} dropoff: {record["dropoff_datetime"]}')
+
+"""
+Close client
+"""
+client.__del__()
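
The writer process above is shut down with a "poison pill": putting None on the joinable queue tells it to flush and exit, and queue_.join() blocks until every parsed batch has been consumed. A stripped-down sketch of the same pattern, independent of InfluxDB:

import multiprocessing


def worker(queue):
    while True:
        task = queue.get()
        if task is None:
            # Poison pill: flush/cleanup would happen here
            queue.task_done()
            break
        print(f'processing {task}')
        queue.task_done()


if __name__ == '__main__':
    queue = multiprocessing.JoinableQueue()
    proc = multiprocessing.Process(target=worker, args=(queue,))
    proc.start()
    for item in range(3):
        queue.put(item)
    queue.put(None)  # signal termination
    queue.join()     # wait until every item, including the pill, is handled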
