Skip to content

Commit 039c35a

Browse files
committed
docs: add asynchronous batching
1 parent c2efbb3 commit 039c35a

File tree

3 files changed

+71
-0
lines changed

3 files changed

+71
-0
lines changed

.circleci/config.yml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -137,6 +137,7 @@ jobs:
137137
python examples/write_api_callbacks.py
138138
python examples/asynchronous.py
139139
python examples/asynchronous_management.py
140+
python examples/asynchronous_batching.py
140141
check-sphinx:
141142
docker:
142143
- image: *default-python

examples/README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,4 +33,5 @@
3333
## Asynchronous
3434
- [asynchronous.py](asynchronous.py) - How to use Asyncio with InfluxDB client
3535
- [asynchronous_management.py](asynchronous_management.py) - How to use asynchronous Management API
36+
- [asynchronous_batching.py](asynchronous_batching.py) - HHow to use [RxPY](https://rxpy.readthedocs.io/en/latest/) to prepare batches for Asyncio client
3637

examples/asynchronous_batching.py

Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
"""
2+
How to use RxPY to prepare batches for asyncio client.
3+
"""
4+
import asyncio
5+
from csv import DictReader
6+
7+
import rx
8+
from rx import operators as ops
9+
from rx.scheduler.eventloop import AsyncIOScheduler
10+
11+
from influxdb_client import Point
12+
from influxdb_client.client.influxdb_client_async import InfluxDBClientAsync
13+
14+
15+
def csv_to_generator(csv_file_path):
16+
"""
17+
Parse your CSV file into generator
18+
"""
19+
for row in DictReader(open(csv_file_path, 'r')):
20+
point = Point('financial-analysis') \
21+
.tag('type', 'vix-daily') \
22+
.field('open', float(row['VIX Open'])) \
23+
.field('high', float(row['VIX High'])) \
24+
.field('low', float(row['VIX Low'])) \
25+
.field('close', float(row['VIX Close'])) \
26+
.time(row['Date'])
27+
yield point
28+
29+
30+
async def main():
31+
async with InfluxDBClientAsync(url='http://localhost:8086', token='my-token', org='my-org') as client:
32+
write_api = client.write_api()
33+
34+
"""
35+
Async write
36+
"""
37+
38+
async def async_write(batch):
39+
"""
40+
Prepare async task
41+
"""
42+
await write_api.write(bucket='my-bucket', record=batch)
43+
return batch
44+
45+
"""
46+
Prepare batches from generator
47+
"""
48+
batches = rx \
49+
.from_iterable(csv_to_generator('vix-daily.csv')) \
50+
.pipe(ops.buffer_with_count(500)) \
51+
.pipe(ops.map(lambda batch: rx.from_future(asyncio.ensure_future(async_write(batch)))), ops.merge_all())
52+
53+
done = asyncio.Future()
54+
55+
"""
56+
Write batches by subscribing to Rx generator
57+
"""
58+
batches.subscribe(on_next=lambda batch: print(f'Written batch... {len(batch)}'),
59+
on_error=lambda ex: print(f'Unexpected error: {ex}'),
60+
on_completed=lambda: done.set_result(0),
61+
scheduler=AsyncIOScheduler(asyncio.get_event_loop()))
62+
"""
63+
Wait to finish all writes
64+
"""
65+
await done
66+
67+
68+
if __name__ == "__main__":
69+
asyncio.run(main())

0 commit comments

Comments
 (0)