Skip to content

Commit 4f1e14e

Browse files
authored
docs: added an example how to use RxPY and sync batching (#202)
1 parent 75f4579 commit 4f1e14e

File tree

5 files changed

+105
-12
lines changed

5 files changed

+105
-12
lines changed

CHANGELOG.md

+3
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,8 @@
11
## 1.16.0 [unreleased]
22

3+
### Documentation
4+
1. [#202](https://github.com/influxdata/influxdb-client-python/pull/202): Added an example how to use RxPY and sync batching
5+
36
## 1.15.0 [2021-03-05]
47

58
### Bug Fixes

README.rst

+10-12
Original file line numberDiff line numberDiff line change
@@ -236,8 +236,6 @@ The data could be written as
236236
Batching
237237
""""""""
238238

239-
.. marker-batching-start
240-
241239
The batching is configurable by ``write_options``\ :
242240

243241
.. list-table::
@@ -348,11 +346,9 @@ The batching is configurable by ``write_options``\ :
348346
_write_client.close()
349347
_client.close()
350348
351-
.. marker-batching-end
352349
353350
Default Tags
354351
""""""""""""
355-
.. marker-default-tags-start
356352

357353
Sometimes is useful to store same information in every measurement e.g. ``hostname``, ``location``, ``customer``.
358354
The client is able to use static value or env property as a tag value.
@@ -415,8 +411,6 @@ Examples:
415411
416412
self.client = InfluxDBClient.from_env_properties()
417413
418-
.. marker-default-tags-end
419-
420414
Asynchronous client
421415
"""""""""""""""""""
422416

@@ -458,6 +452,8 @@ Data are writes in a synchronous HTTP request.
458452
459453
client.close()
460454
455+
.. marker-writes-end
456+
461457
Queries
462458
^^^^^^^
463459

@@ -595,6 +591,8 @@ Output:
595591
Examples
596592
^^^^^^^^
597593

594+
.. marker-examples-start
595+
598596
How to efficiently import large dataset
599597
"""""""""""""""""""""""""""""""""""""""
600598

@@ -703,12 +701,8 @@ If you would like to import gigabytes of data then use our multiprocessing examp
703701
"""
704702
client.close()
705703
706-
.. marker-writes-end
707-
708-
709704
Efficiency write data from IOT sensor
710705
"""""""""""""""""""""""""""""""""""""
711-
.. marker-iot-start
712706

713707
* sources - `iot_sensor.py <https://github.com/influxdata/influxdb-client-python/blob/master/examples/iot_sensor.py>`_
714708

@@ -791,8 +785,6 @@ Efficiency write data from IOT sensor
791785
792786
input()
793787
794-
.. marker-iot-end
795-
796788
Connect to InfluxDB Cloud
797789
"""""""""""""""""""""""""
798790
The following example demonstrate a simplest way how to write and query date with the InfluxDB Cloud.
@@ -888,6 +880,12 @@ The second example shows how to use client capabilities to realtime visualizatio
888880

889881
.. image:: https://raw.githubusercontent.com/influxdata/influxdb-client-python/master/docs/images/realtime-result.gif
890882

883+
Other examples
884+
""""""""""""""
885+
886+
You could find all examples at GitHub: `influxdb-client-python/examples <https://github.com/influxdata/influxdb-client-python/tree/master/examples#examples>`_.
887+
888+
.. marker-examples-end
891889
892890
Advanced Usage
893891
--------------

docs/usage.rst

+6
Original file line numberDiff line numberDiff line change
@@ -43,3 +43,9 @@ Both request header and body will be logged to standard output.
4343
.. code-block:: python
4444
4545
_client = InfluxDBClient(url="http://localhost:8086", token="my-token", debug=True, org="my-org")
46+
47+
Examples
48+
^^^^^^^^
49+
.. include:: ../README.rst
50+
:start-after: marker-examples-start
51+
:end-before: marker-examples-end

examples/README.md

+18
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
# Examples
2+
3+
## Writes
4+
- [import_data_set.py](import_data_set.py) - How to import CSV file
5+
- [import_data_set_multiprocessing.py](import_data_set_multiprocessing.py) - How to large CSV file by Python Multiprocessing
6+
- [ingest_dataframe_default_tags.py](ingest_dataframe_default_tags.py) - How to ingest DataFrame with default tags
7+
- [iot_sensor.py](iot_sensor.py) - How to write sensor data every minute by [RxPY](https://rxpy.readthedocs.io/en/latest/)
8+
- [import_data_set_sync_batching.py](import_data_set_sync_batching.py) - How to use [RxPY](https://rxpy.readthedocs.io/en/latest/) to prepare batches for synchronous write into InfluxDB
9+
10+
## Queries
11+
- [query.py](query.py) - How to query data into `FluxTable`s, `Stream` and `CSV`
12+
- [query_from_file.py](query_from_file.py) - How to use a Flux query defined in a separate file
13+
14+
## Others
15+
- [influx_cloud.py](influx_cloud.py) - How to connect to InfluxDB 2 Cloud
16+
- [influxdb_18_example.py](influxdb_18_example.py) - How to connect to InfluxDB 1.8
17+
- [nanosecond_precision.py](nanosecond_precision.py) - How to use nanoseconds precision
18+
+68
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
"""
2+
How to use RxPY to prepare batches for synchronous write into InfluxDB
3+
"""
4+
5+
from csv import DictReader
6+
7+
import rx
8+
from rx import operators as ops
9+
10+
from influxdb_client import InfluxDBClient, Point
11+
from influxdb_client.client.write.retry import WritesRetry
12+
from influxdb_client.client.write_api import SYNCHRONOUS
13+
14+
15+
def csv_to_generator(csv_file_path):
16+
"""
17+
Parse your CSV file into generator
18+
"""
19+
for row in DictReader(open(csv_file_path, 'r')):
20+
point = Point('financial-analysis') \
21+
.tag('type', 'vix-daily') \
22+
.field('open', float(row['VIX Open'])) \
23+
.field('high', float(row['VIX High'])) \
24+
.field('low', float(row['VIX Low'])) \
25+
.field('close', float(row['VIX Close'])) \
26+
.time(row['Date'])
27+
yield point
28+
29+
30+
"""
31+
Define Retry strategy - 3 attempts => 2, 4, 8
32+
"""
33+
retries = WritesRetry(total=3, backoff_factor=1, exponential_base=2)
34+
client = InfluxDBClient(url='http://localhost:8086', token='my-token', org='my-org', retries=retries)
35+
36+
"""
37+
Use synchronous version of WriteApi to strongly depends on result of write
38+
"""
39+
write_api = client.write_api(write_options=SYNCHRONOUS)
40+
41+
"""
42+
Prepare batches from generator
43+
"""
44+
batches = rx \
45+
.from_iterable(csv_to_generator('vix-daily.csv')) \
46+
.pipe(ops.buffer_with_count(500))
47+
48+
49+
def write_batch(batch):
50+
"""
51+
Synchronous write
52+
"""
53+
print(f'Writing... {len(batch)}')
54+
write_api.write(bucket='my-bucket', record=batch)
55+
56+
57+
"""
58+
Write batches
59+
"""
60+
batches.subscribe(on_next=lambda batch: write_batch(batch),
61+
on_error=lambda ex: print(f'Unexpected error: {ex}'),
62+
on_completed=lambda: print('Import finished!'))
63+
64+
"""
65+
Dispose client
66+
"""
67+
write_api.close()
68+
client.close()

0 commit comments

Comments
 (0)