
Commit 8c8ab27

docs: add an example: How to use RxPY to prepare batches by maximum bytes count.
1 parent 1380242 commit 8c8ab27

2 files changed: +117 -0 lines changed


examples/README.md

Lines changed: 1 addition & 0 deletions
```diff
@@ -9,6 +9,7 @@
 - [import_data_set_sync_batching.py](import_data_set_sync_batching.py) - How to use [RxPY](https://rxpy.readthedocs.io/en/latest/) to prepare batches for synchronous write into InfluxDB
 - [write_api_callbacks.py](write_api_callbacks.py) - How to handle batch events
 - [write_structured_data.py](write_structured_data.py) - How to write structured data - [NamedTuple](https://docs.python.org/3/library/collections.html#collections.namedtuple), [Data Classes](https://docs.python.org/3/library/dataclasses.html) - (_requires Python v3.8+_)
+- [write_batching_by_bytes_count.py](write_batching_by_bytes_count.py) - How to use RxPY to prepare batches by maximum bytes count.
 
 ## Queries
 - [query.py](query.py) - How to query data into `FluxTable`s, `Stream` and `CSV`
```

examples/write_batching_by_bytes_count.py

Lines changed: 116 additions & 0 deletions
@@ -0,0 +1,116 @@
```python
"""
How to use RxPY to prepare batches by maximum bytes count.
"""

from csv import DictReader
from functools import reduce
from typing import Collection

import rx
from rx import operators as ops, Observable

from influxdb_client import InfluxDBClient, Point
from influxdb_client.client.write.retry import WritesRetry
from influxdb_client.client.write_api import SYNCHRONOUS


def csv_to_generator(csv_file_path):
    """
    Parse your CSV file into a generator of Points.
    """
    for row in DictReader(open(csv_file_path, 'r')):
        point = Point('financial-analysis') \
            .tag('type', 'vix-daily') \
            .field('open', float(row['VIX Open'])) \
            .field('high', float(row['VIX High'])) \
            .field('low', float(row['VIX Low'])) \
            .field('close', float(row['VIX Close'])) \
            .time(row['Date'])
        yield point


def _buffer_bytes_size(buffer: Collection[bytes]):
    """
    Calculate the size of the buffer: the total length of all items
    plus one byte per item to account for the newline separators.
    """
    return reduce(lambda total, actual: total + actual, map(lambda x: len(x), buffer), 0) + len(buffer)


def buffer_by_bytes_count(bytes_count: int = 5120):
    """
    Buffer emitted items until the byte count is reached.
    """

    def _buffer_by_bytes_count(source: Observable) -> Observable:
        def subscribe(observer, scheduler=None):
            observer.buffer = []

            def on_next(current):
                observer.buffer.append(current)
                # Emit a new batch if the buffer size is greater than the boundary
                if (_buffer_bytes_size(observer.buffer) + len(current)) >= bytes_count:
                    # emit batch
                    observer.on_next(observer.buffer)
                    observer.buffer = []

            def on_error(exception):
                observer.buffer = []
                observer.on_error(exception)

            def on_completed():
                if len(observer.buffer) > 0:
                    # flush the rest of the buffer
                    observer.on_next(observer.buffer)
                    observer.buffer = []
                observer.on_completed()

            return source.subscribe(
                on_next,
                on_error,
                on_completed,
                scheduler)

        return Observable(subscribe)

    return _buffer_by_bytes_count


"""
Define Retry strategy - 3 attempts => 2, 4, 8
"""
retries = WritesRetry(total=3, retry_interval=1, exponential_base=2)

with InfluxDBClient(url='http://localhost:8086', token='my-token', org='my-org', retries=retries) as client:
    """
    Use the synchronous version of WriteApi to strongly depend on the result of the write.
    """
    write_api = client.write_api(write_options=SYNCHRONOUS)

    """
    Prepare batches from the generator:
    1. Map Point into LineProtocol
    2. Map LineProtocol into bytes
    3. Create batches by bytes count - 5120 bytes (5 KiB)
    """
    batches = rx \
        .from_iterable(csv_to_generator('vix-daily.csv')) \
        .pipe(ops.map(lambda point: point.to_line_protocol())) \
        .pipe(ops.map(lambda line_protocol: line_protocol.encode("utf-8"))) \
        .pipe(buffer_by_bytes_count(bytes_count=5120))


    def write_batch(batch):
        """
        Synchronous write
        """
        print('Writing batch...')
        write_api.write(bucket='my-bucket', record=batch)
        print(f' > {_buffer_bytes_size(batch)} bytes')


    """
    Write batches
    """
    batches.subscribe(on_next=lambda batch: write_batch(batch),
                      on_error=lambda ex: print(f'Unexpected error: {ex}'),
                      on_completed=lambda: print('Import finished!'))
```
