Skip to content

Commit 832a661

Browse files
committed
#2: Implemented flush_interval
1 parent d732462 commit 832a661

File tree

3 files changed

+33
-7
lines changed

3 files changed

+33
-7
lines changed

README.md

+1
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,7 @@ The [WriteApiClient](https://github.com/bonitoo-io/influxdb-client-python/blob/m
8989
| --- | --- | --- |
9090
| [**write_type**](#write_type) | how the client writes data; allowed values: `batching`, `asynchronous`, `synchronous`| `batching` |
9191
| **batch_size** | the number of data point to collect in batch | `1000` |
92+
| **flush_interval** | the number of milliseconds before the batch is written | `1000` |
9293

9394
##### write_type
9495
* `batching` - data are writes in batches defined by `batch_size`, `flush_interval`, ...

influxdb2/client/write_api.py

+8-6
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,8 @@
11
# coding: utf-8
22
import logging
3-
import time
3+
from datetime import timedelta
44
from enum import Enum
5+
from time import sleep
56

67
import rx
78
from rx import operators as ops, Observable
@@ -24,8 +25,8 @@ class WriteType(Enum):
2425

2526
class WriteOptions(object):
2627

27-
def __init__(self, write_type=WriteType.batching, batch_size=1_000, flush_interval=None, jitter_interval=None,
28-
retry_interval=None, buffer_limit=None, write_scheduler=None) -> None:
28+
def __init__(self, write_type=WriteType.batching, batch_size=1_000, flush_interval=1_000, jitter_interval=None,
29+
retry_interval=None, buffer_limit=None, write_scheduler=NewThreadScheduler()) -> None:
2930
self.write_type = write_type
3031
self.batch_size = batch_size
3132
self.flush_interval = flush_interval
@@ -118,9 +119,10 @@ def __init__(self, service, write_options=WriteOptions()) -> None:
118119
if self._write_options.write_type is WriteType.batching:
119120
self._subject = Subject()
120121

121-
observable = self._subject.pipe(ops.observe_on(NewThreadScheduler()))
122+
observable = self._subject.pipe(ops.observe_on(self._write_options.write_scheduler))
122123
self._disposable = observable\
123-
.pipe(ops.window_with_count(write_options.batch_size),
124+
.pipe(ops.window_with_time_or_count(count=write_options.batch_size,
125+
timespan=timedelta(milliseconds=write_options.flush_interval)),
124126
ops.flat_map(lambda v: _window_to_group(v)),
125127
ops.map(mapper=lambda x: self._retryable(x)),
126128
ops.merge_all()) \
@@ -172,7 +174,7 @@ def __del__(self):
172174
self._subject.dispose()
173175
self._subject = None
174176
# TODO remove sleep
175-
time.sleep(2)
177+
sleep(2)
176178
if self._disposable:
177179
self._disposable.dispose()
178180
self._disposable = None

influxdb2_test/test_WriteApiBatching.py

+24-1
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ def setUp(self) -> None:
3434
header_value="Token my-token")
3535

3636
self._write_client = WriteApiClient(service=WriteService(api_client=self._api_client),
37-
write_options=WriteOptions(batch_size=2))
37+
write_options=WriteOptions(batch_size=2, flush_interval=5_000))
3838

3939
def tearDown(self) -> None:
4040
pass
@@ -133,6 +133,29 @@ def test_batch_size_group_by(self):
133133

134134
pass
135135

136+
def test_flush_interval(self):
137+
httpretty.register_uri(httpretty.POST, uri="http://localhost/write", status=204)
138+
139+
self._write_client.write("my-bucket", "my-org",
140+
["h2o_feet,location=coyote_creek level\\ water_level=1.0 1",
141+
"h2o_feet,location=coyote_creek level\\ water_level=2.0 2"])
142+
143+
time.sleep(1)
144+
self.assertEqual(1, len(httpretty.httpretty.latest_requests))
145+
146+
self._write_client.write("my-bucket", "my-org", "h2o_feet,location=coyote_creek level\\ water_level=3.0 3")
147+
148+
time.sleep(2)
149+
150+
self.assertEqual(1, len(httpretty.httpretty.latest_requests))
151+
152+
time.sleep(3)
153+
154+
self.assertEqual(2, len(httpretty.httpretty.latest_requests))
155+
156+
self.assertEqual("h2o_feet,location=coyote_creek level\\ water_level=3.0 3",
157+
httpretty.httpretty.latest_requests[1].parsed_body)
158+
136159
def test_recover_from_error(self):
137160
httpretty.register_uri(httpretty.POST, uri="http://localhost/write", status=204)
138161
httpretty.register_uri(httpretty.POST, uri="http://localhost/write", status=400)

0 commit comments

Comments
 (0)