Skip to content

Commit 3753ec8

Browse files
committed
#2: Added support for synchronous and asynchronous writes
1 parent fbd445f commit 3753ec8

File tree

6 files changed

+100
-36
lines changed

6 files changed

+100
-36
lines changed

README.md

+24-4
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,14 @@
22

33
[![Build Status](https://travis-ci.org/bonitoo-io/influxdb-client-python.svg?branch=master)](https://travis-ci.org/bonitoo-io/influxdb-client-python)
44

5-
65
InfluxDB 2.0 python client library. TODO...
76

8-
## Requirements.
7+
- [Requirements](#requirements)
8+
- [Getting Started](#getting-started)
9+
- [How To Use](#how-to-use)
10+
- [To write into InfluxDB 2.0](#writes)
11+
12+
## Requirements
913

1014
Python 2.7 and 3.4+
1115

@@ -45,12 +49,13 @@ Please follow the [installation procedure](#installation--usage) and then run th
4549
```python
4650
from influxdb2.client.influxdb_client import InfluxDBClient
4751
from influxdb2.client.write.point import Point
52+
from influxdb2.client.write_api import SYNCHRONOUS
4853

4954
bucket = "test_bucket"
5055

5156
client = InfluxDBClient(url="http://localhost:9999/api/v2", token="my-token-123", org="my-org")
5257

53-
write_api = client.write_api()
58+
write_api = client.write_api(write_options=SYNCHRONOUS)
5459
query_api = client.query_api()
5560

5661
p = Point("my_measurement").tag("location", "Prague").field("temperature", 25.3)
@@ -72,7 +77,22 @@ val_count = 0
7277
for row in csv_result:
7378
for cell in row:
7479
val_count += 1
80+
```
7581

82+
## How to use
7683

84+
### Writes
7785

78-
```
86+
The [WriteApiClient](https://github.com/bonitoo-io/influxdb-client-python/blob/master/influxdb2/client/write_api.py) that supports synchronous, asynchronous and batching writes into InfluxDB 2.0.
87+
88+
The write client could be configured by `WriteOptions`:
89+
90+
| Property | Description | Default Value |
91+
| --- | --- | --- |
92+
| [**write_type**](#write_type) | how the client writes data ; allowed values: `batching`, `asynchronous`, `synchronous`| `batching` |
93+
| **batch_size** | the number of data point to collect in batch | `1000` |
94+
95+
##### write_type
96+
* `batching` - data are writes in batches defined by `batch_size`, `flush_interval`, ...
97+
* `asynchronous` - data are writes in asynchronous HTTP request
98+
* `synchronous` - data are writes in synchronous HTTP request

influxdb2/client/influxdb_client.py

+3-4
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
from influxdb2.client.organizations_api import OrganizationsApi
77
from influxdb2.client.query_api import QueryApi
88
from influxdb2.client.users_api import UsersApi
9-
from influxdb2.client.write_api import WriteApiClient
9+
from influxdb2.client.write_api import WriteApiClient, WriteOptions
1010

1111

1212
class InfluxDBClient(object):
@@ -38,10 +38,9 @@ def __init__(self,
3838
self.api_client = influxdb2.ApiClient(configuration=conf, header_name=auth_header_name,
3939
header_value=auth_header_value)
4040

41-
def write_api(self):
41+
def write_api(self, write_options=WriteOptions()):
4242
service = influxdb2.service.write_service.WriteService(self.api_client)
43-
return WriteApiClient(service=service)
44-
# return
43+
return WriteApiClient(service=service, write_options=write_options)
4544

4645
def query_api(self):
4746
return QueryApi(self)

influxdb2/client/write_api.py

+23-12
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,22 @@
11
# coding: utf-8
2-
3-
4-
from rx.scheduler import NewThreadScheduler
5-
from rx.subject import Subject
2+
from enum import Enum
63

74
from influxdb2 import WritePrecision
85
from influxdb2.client.abstract_client import AbstractClient
96
from influxdb2.client.write.point import Point
107

118

9+
class WriteType(Enum):
10+
batching = 1
11+
asynchronous = 2
12+
synchronous = 3
13+
14+
1215
class WriteOptions(object):
1316

14-
def __init__(self, batch_size=5000, flush_interval=1000, jitter_interval=0, retry_interval=1000,
15-
buffer_limit=10000,
16-
write_scheduler=NewThreadScheduler) -> None:
17+
def __init__(self, write_type=WriteType.batching, batch_size=None, flush_interval=None, jitter_interval=None,
18+
retry_interval=None, buffer_limit=None, write_scheduler=None) -> None:
19+
self.write_type = write_type
1720
self.batch_size = batch_size
1821
self.flush_interval = flush_interval
1922
self.jitter_interval = jitter_interval
@@ -22,13 +25,15 @@ def __init__(self, batch_size=5000, flush_interval=1000, jitter_interval=0, retr
2225
self.write_scheduler = write_scheduler
2326

2427

28+
SYNCHRONOUS = WriteOptions(write_type=WriteType.synchronous)
29+
ASYNCHRONOUS = WriteOptions(write_type=WriteType.asynchronous)
30+
31+
2532
class WriteApiClient(AbstractClient):
2633

27-
def __init__(self, service, write_options=None) -> None:
34+
def __init__(self, service, write_options=WriteOptions()) -> None:
2835
self._write_service = service
29-
self.write_options = write_options
30-
31-
_subject = Subject
36+
self._write_options = write_options
3237

3338
def write(self, bucket, org, record, write_precision=None):
3439

@@ -52,8 +57,14 @@ def write(self, bucket, org, record, write_precision=None):
5257
lines.append(item.to_line_protocol())
5358
final_string = '\n'.join(lines)
5459

55-
return self._write_service.post_write(org=org, bucket=bucket, body=final_string, precision=write_precision)
60+
_async_req = True if self._write_options.write_type == WriteType.asynchronous else False
61+
62+
return self._write_service.post_write(org=org, bucket=bucket, body=final_string, precision=write_precision,
63+
async_req=_async_req)
5664

5765
def flush(self):
5866
# TODO
5967
pass
68+
69+
def __del__(self):
70+
pass

influxdb2_test/base_test.py

-3
Original file line numberDiff line numberDiff line change
@@ -26,9 +26,6 @@ def setUp(self) -> None:
2626
self.client = InfluxDBClient(conf.host, auth_token, debug=conf.debug, org="my-org")
2727
self.api_client = self.client.api_client
2828

29-
self.write_api = influxdb2.service.write_service.WriteService(self.api_client)
30-
self.write_client = self.client.write_api()
31-
3229
self.query_client = self.client.query_api()
3330
self.buckets_client = self.client.buckets_api()
3431
self.my_organization = self.find_my_org()

influxdb2_test/example.py

+2-5
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,16 @@
11
import codecs
2-
import io
3-
import builtins
42
from datetime import datetime
53

6-
import pandas
7-
84
from influxdb2 import WritePrecision
95
from influxdb2.client.influxdb_client import InfluxDBClient
106
from influxdb2.client.write.point import Point
7+
from influxdb2.client.write_api import SYNCHRONOUS
118

129
bucket = "test_bucket"
1310

1411
client = InfluxDBClient(url="http://localhost:9999/api/v2", token="my-token-123", org="my-org")
1512

16-
write_api = client.write_api()
13+
write_api = client.write_api(write_options=SYNCHRONOUS)
1714
query_api = client.query_api()
1815

1916
p = Point("my_measurement").tag("location", "Prague").field("temperature", 25.3).time(datetime.now(), WritePrecision.MS)

influxdb2_test/test_WriteApi.py

+48-8
Original file line numberDiff line numberDiff line change
@@ -4,12 +4,22 @@
44

55
import datetime
66
import unittest
7+
from multiprocessing.pool import ApplyResult
78

89
from influxdb2 import WritePrecision
10+
from influxdb2.client.write_api import SYNCHRONOUS, ASYNCHRONOUS
911
from influxdb2_test.base_test import BaseTest
1012

1113

12-
class SimpleWriteTest(BaseTest):
14+
class SynchronousWriteTest(BaseTest):
15+
16+
def setUp(self) -> None:
17+
super().setUp()
18+
self.write_client = self.client.write_api(write_options=SYNCHRONOUS)
19+
20+
def tearDown(self) -> None:
21+
self.write_client.__del__()
22+
super().tearDown()
1323

1424
def test_write_line_protocol(self):
1525
bucket = self.create_test_bucket()
@@ -32,11 +42,11 @@ def test_write_line_protocol(self):
3242

3343
#####################################
3444

35-
def test_write_precission(self):
45+
def test_write_precision(self):
3646
bucket = self.create_test_bucket()
3747

38-
self.client.write_api().write(org="my-org", bucket=bucket.name, record="air,location=Python humidity=99",
39-
write_precision=WritePrecision.MS)
48+
self.write_client.write(org="my-org", bucket=bucket.name, record="air,location=Python humidity=99",
49+
write_precision=WritePrecision.MS)
4050

4151
result = self.query_client.query(
4252
"from(bucket:\"" + bucket.name + "\") |> range(start: 1970-01-01T00:00:00.000000001Z) |> last()", self.org)
@@ -45,17 +55,17 @@ def test_write_precission(self):
4555

4656
self.delete_test_bucket(bucket)
4757

48-
def test_WriteRecordsList(self):
58+
def test_write_records_list(self):
4959
bucket = self.create_test_bucket()
5060

5161
_record1 = "h2o_feet,location=coyote_creek level\\ water_level=1.0 1"
5262
_record2 = "h2o_feet,location=coyote_creek level\\ water_level=2.0 2"
5363

54-
list = [_record1, _record2]
64+
record_list = [_record1, _record2]
5565

56-
self.client.write_api().write(bucket.name, self.org, list)
66+
self.write_client.write(bucket.name, self.org, record_list)
5767

58-
self.client.write_api().flush()
68+
self.write_client.flush()
5969

6070
query = 'from(bucket:"' + bucket.name + '") |> range(start: 1970-01-01T00:00:00.000000001Z)'
6171
print(query)
@@ -76,6 +86,36 @@ def test_WriteRecordsList(self):
7686
self.assertEqual(2, records[1].get_value())
7787
self.assertEqual("level water_level", records[1].get_field())
7888

89+
def test_write_result(self):
90+
91+
_bucket = self.create_test_bucket()
92+
93+
_record = "h2o_feet,location=coyote_creek level\\ water_level=1.0 1"
94+
result = self.write_client.write(_bucket.name, self.org, _record)
95+
96+
# The success response is 204 - No Content
97+
self.assertEqual(None, result)
98+
99+
100+
class AsynchronousWriteTest(BaseTest):
101+
102+
def setUp(self) -> None:
103+
super().setUp()
104+
self.write_client = self.client.write_api(write_options=ASYNCHRONOUS)
105+
106+
def tearDown(self) -> None:
107+
self.write_client.__del__()
108+
super().tearDown()
109+
110+
def test_write_result(self):
111+
_bucket = self.create_test_bucket()
112+
113+
_record = "h2o_feet,location=coyote_creek level\\ water_level=1.0 1"
114+
result = self.write_client.write(_bucket.name, self.org, _record)
115+
116+
self.assertEqual(ApplyResult, type(result))
117+
self.assertEqual(None, result.get())
118+
79119

80120
if __name__ == '__main__':
81121
unittest.main()

0 commit comments

Comments
 (0)