Skip to content

Commit fb813e9

Browse files
committed
#2: Added example with iot sensor
1 parent b99c7e8 commit fb813e9

File tree

7 files changed

+180
-10
lines changed

7 files changed

+180
-10
lines changed

CHANGELOG.md

+4
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
## 1.0.0 [unreleased]
2+
3+
### Features
4+
1. [#2](https://github.com/bonitoo-io/influxdb-client-python/issues/2): The write client is able to write data in batches (configuration: `batch_size`, `flush_interval`, `jitter_interval`, `retry_interval`)

README.md

+87
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ InfluxDB 2.0 python client library. TODO...
99
- [Features](#how-to-use)
1010
- [Writing data](#writes)
1111
- [How to efficiently import large dataset](#how-to-efficiently-import-large-dataset)
12+
- [Efficiency write data from IOT sensor](#efficiency-write-data-from-iot-sensor)
1213

1314
## Requirements
1415

@@ -101,6 +102,8 @@ The [WriteApiClient](https://github.com/bonitoo-io/influxdb-client-python/blob/m
101102

102103
#### How to efficiently import large dataset
103104

105+
- sources - [import_data_set.py](https://github.com/bonitoo-io/influxdb-client-python/blob/master/influxdb2_test/import_data_set.py)
106+
104107
```python
105108
"""
106109
Import VIX - CBOE Volatility Index - from "vix-daily.csv" file into InfluxDB 2.0
@@ -189,5 +192,89 @@ for table in result:
189192
Close client
190193
"""
191194
client.__del__()
195+
```
196+
197+
#### Efficiency write data from IOT sensor
198+
199+
- sources - [iot_sensor.py](https://github.com/bonitoo-io/influxdb-client-python/blob/master/influxdb2_test/iot_sensor.py)
200+
201+
```python
202+
"""
203+
Efficiency write data from IOT sensor - write changed temperature every minute
204+
"""
205+
import atexit
206+
import platform
207+
from datetime import timedelta
208+
209+
import psutil as psutil
210+
import rx
211+
from rx import operators as ops
212+
213+
from influxdb2.client.influxdb_client import InfluxDBClient
214+
from influxdb2.client.write_api import WriteApi
215+
from influxdb2.client.write_api import WriteOptions
216+
217+
218+
def on_exit(db_client: InfluxDBClient, write_api: WriteApi):
219+
"""Close clients after terminate a script.
220+
221+
:param db_client: InfluxDB client
222+
:param write_api: WriteApi
223+
:return: nothing
224+
"""
225+
write_api.__del__()
226+
db_client.__del__()
227+
228+
229+
def sensor_temperature():
230+
"""Read a CPU temperature. The [psutil] doesn't support MacOS so we use [sysctl].
231+
232+
:return: actual CPU temperature
233+
"""
234+
os_name = platform.system()
235+
if os_name == 'Darwin':
236+
from subprocess import check_output
237+
output = check_output(["sysctl", "machdep.xcpm.cpu_thermal_level"])
238+
import re
239+
return re.findall(r'\d+', str(output))[0]
240+
else:
241+
return psutil.sensors_temperatures()["coretemp"][0]
242+
243+
244+
def line_protocol(temperature):
245+
"""Create a InfluxDB line protocol with structure:
246+
247+
iot_sensor,hostname=mine_sensor_12,type=temperature value=68
248+
249+
:param temperature: the sensor temperature
250+
:return: Line protocol to write into InfluxDB
251+
"""
252+
253+
import socket
254+
return 'iot_sensor,hostname={},type=temperature value={}'.format(socket.gethostname(), temperature)
255+
256+
257+
"""
258+
Read temperature every minute; distinct_until_changed - produce only if temperature change
259+
"""
260+
data = rx.interval(period=timedelta(seconds=60))\
261+
.pipe(ops.map(lambda t: sensor_temperature()),
262+
ops.map(lambda temperature: line_protocol(temperature)),
263+
ops.distinct_until_changed())
264+
265+
_db_client = InfluxDBClient(url="http://localhost:9999/api/v2", token="my-token-123", org="my-org", debug=True)
266+
267+
"""
268+
Create client that writes data into InfluxDB
269+
"""
270+
_write_api = _db_client.write_api(write_options=WriteOptions(batch_size=1))
271+
_write_api.write(org="my-org", bucket="my-bucket", record=data)
272+
273+
274+
"""
275+
Call after terminate a script
276+
"""
277+
atexit.register(on_exit, _db_client, _write_api)
192278

279+
input()
193280
```

influxdb2/client/influxdb_client.py

+2-2
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
from influxdb2.client.organizations_api import OrganizationsApi
77
from influxdb2.client.query_api import QueryApi
88
from influxdb2.client.users_api import UsersApi
9-
from influxdb2.client.write_api import WriteApiClient, WriteOptions
9+
from influxdb2.client.write_api import WriteApi, WriteOptions
1010

1111

1212
class InfluxDBClient(object):
@@ -40,7 +40,7 @@ def __init__(self,
4040

4141
def write_api(self, write_options=WriteOptions()):
4242
service = influxdb2.service.write_service.WriteService(self.api_client)
43-
return WriteApiClient(service=service, write_options=write_options)
43+
return WriteApi(service=service, write_options=write_options)
4444

4545
def query_api(self):
4646
return QueryApi(self)

influxdb2/client/write_api.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -108,7 +108,7 @@ def _window_to_group(value):
108108
ops.group_by(_group_by), ops.map(_group_to_batch), ops.merge_all())), ops.merge_all())
109109

110110

111-
class WriteApiClient(AbstractClient):
111+
class WriteApi(AbstractClient):
112112

113113
def __init__(self, service, write_options=WriteOptions()) -> None:
114114
self._write_service = service

influxdb2_test/iot_sensor.py

+78
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
1+
"""
2+
Efficiency write data from IOT sensor - write changed temperature every minute
3+
"""
4+
import atexit
5+
import platform
6+
from datetime import timedelta
7+
8+
import psutil as psutil
9+
import rx
10+
from rx import operators as ops
11+
12+
from influxdb2.client.influxdb_client import InfluxDBClient
13+
from influxdb2.client.write_api import WriteApi
14+
from influxdb2.client.write_api import WriteOptions
15+
16+
17+
def on_exit(db_client: InfluxDBClient, write_api: WriteApi):
18+
"""Close clients after terminate a script.
19+
20+
:param db_client: InfluxDB client
21+
:param write_api: WriteApi
22+
:return: nothing
23+
"""
24+
write_api.__del__()
25+
db_client.__del__()
26+
27+
28+
def sensor_temperature():
29+
"""Read a CPU temperature. The [psutil] doesn't support MacOS so we use [sysctl].
30+
31+
:return: actual CPU temperature
32+
"""
33+
os_name = platform.system()
34+
if os_name == 'Darwin':
35+
from subprocess import check_output
36+
output = check_output(["sysctl", "machdep.xcpm.cpu_thermal_level"])
37+
import re
38+
return re.findall(r'\d+', str(output))[0]
39+
else:
40+
return psutil.sensors_temperatures()["coretemp"][0]
41+
42+
43+
def line_protocol(temperature):
44+
"""Create a InfluxDB line protocol with structure:
45+
46+
iot_sensor,hostname=mine_sensor_12,type=temperature value=68
47+
48+
:param temperature: the sensor temperature
49+
:return: Line protocol to write into InfluxDB
50+
"""
51+
52+
import socket
53+
return 'iot_sensor,hostname={},type=temperature value={}'.format(socket.gethostname(), temperature)
54+
55+
56+
"""
57+
Read temperature every minute; distinct_until_changed - produce only if temperature change
58+
"""
59+
data = rx.interval(period=timedelta(seconds=60))\
60+
.pipe(ops.map(lambda t: sensor_temperature()),
61+
ops.map(lambda temperature: line_protocol(temperature)),
62+
ops.distinct_until_changed())
63+
64+
_db_client = InfluxDBClient(url="http://localhost:9999/api/v2", token="my-token-123", org="my-org", debug=True)
65+
66+
"""
67+
Create client that writes data into InfluxDB
68+
"""
69+
_write_api = _db_client.write_api(write_options=WriteOptions(batch_size=1))
70+
_write_api.write(org="my-org", bucket="my-bucket", record=data)
71+
72+
73+
"""
74+
Call after terminate a script
75+
"""
76+
atexit.register(on_exit, _db_client, _write_api)
77+
78+
input()

influxdb2_test/test_WriteApiBatching.py

+7-7
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@
1111
import influxdb2
1212
from influxdb2 import WritePrecision, WriteService
1313
from influxdb2.client.write.point import Point
14-
from influxdb2.client.write_api import WriteOptions, WriteApiClient
14+
from influxdb2.client.write_api import WriteOptions, WriteApi
1515
from influxdb2_test.base_test import BaseTest
1616

1717

@@ -33,9 +33,9 @@ def setUp(self) -> None:
3333
self._api_client = influxdb2.ApiClient(configuration=conf, header_name="Authorization",
3434
header_value="Token my-token")
3535

36-
self._write_client = WriteApiClient(service=WriteService(api_client=self._api_client),
37-
write_options=WriteOptions(batch_size=2, flush_interval=5_000,
38-
retry_interval=3_000))
36+
self._write_client = WriteApi(service=WriteService(api_client=self._api_client),
37+
write_options=WriteOptions(batch_size=2, flush_interval=5_000,
38+
retry_interval=3_000))
3939

4040
def tearDown(self) -> None:
4141
pass
@@ -159,9 +159,9 @@ def test_flush_interval(self):
159159

160160
def test_jitter_interval(self):
161161
self._write_client.__del__()
162-
self._write_client = WriteApiClient(service=WriteService(api_client=self._api_client),
163-
write_options=WriteOptions(batch_size=2, flush_interval=5_000,
164-
jitter_interval=3_000))
162+
self._write_client = WriteApi(service=WriteService(api_client=self._api_client),
163+
write_options=WriteOptions(batch_size=2, flush_interval=5_000,
164+
jitter_interval=3_000))
165165

166166
httpretty.register_uri(httpretty.POST, uri="http://localhost/write", status=204)
167167

test-requirements.txt

+1
Original file line numberDiff line numberDiff line change
@@ -5,4 +5,5 @@ py>=1.4.31
55
randomize>=0.13
66
pytest>=5.0.0
77
httpretty>=0.9.6
8+
psutil>=5.6.3
89

0 commit comments

Comments
 (0)