Skip to content

Commit d732462

Browse files
committed
#2: Support observable as a data to write
1 parent 06c6c8d commit d732462

File tree

2 files changed

+51
-10
lines changed

2 files changed

+51
-10
lines changed

influxdb2/client/write_api.py

+8-6
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
from enum import Enum
55

66
import rx
7-
from rx import operators as ops
7+
from rx import operators as ops, Observable
88
from rx.core import GroupedObservable
99
from rx.scheduler import NewThreadScheduler
1010
from rx.subject import Subject
@@ -185,13 +185,15 @@ def _write_batching(self, bucket, org, data, precision=DEFAULT_WRITE_PRECISION):
185185

186186
elif isinstance(data, Point):
187187
self._subject.on_next(_BatchItem(key=_key, data=data.to_line_protocol()))
188+
188189
elif isinstance(data, list):
189190
for item in data:
190-
if isinstance(item, str):
191-
self._subject.on_next(
192-
_BatchItem(key=_key, data=item))
193-
if isinstance(item, Point):
194-
self._subject.on_next(_BatchItem(key=_key, data=item.to_line_protocol()))
191+
self._write_batching(bucket, org, item, precision)
192+
193+
elif isinstance(data, Observable):
194+
data.subscribe(lambda it: self._write_batching(bucket, org, it, precision))
195+
pass
196+
195197
return None
196198

197199
def _http(self, batch_item: _BatchItem):

influxdb2_test/test_WriteApiBatching.py

+43-4
Original file line numberDiff line numberDiff line change
@@ -6,9 +6,11 @@
66
import unittest
77

88
import httpretty
9+
import rx
910

1011
import influxdb2
1112
from influxdb2 import WritePrecision, WriteService
13+
from influxdb2.client.write.point import Point
1214
from influxdb2.client.write_api import WriteOptions, WriteApiClient
1315
from influxdb2_test.base_test import BaseTest
1416

@@ -132,7 +134,6 @@ def test_batch_size_group_by(self):
132134
pass
133135

134136
def test_recover_from_error(self):
135-
136137
httpretty.register_uri(httpretty.POST, uri="http://localhost/write", status=204)
137138
httpretty.register_uri(httpretty.POST, uri="http://localhost/write", status=400)
138139

@@ -152,10 +153,48 @@ def test_recover_from_error(self):
152153

153154
pass
154155

155-
@unittest.skip(reason="TODO")
156156
def test_record_types(self):
157-
self.assertTrue(False, msg="TODO")
158-
self.assertTrue(False, msg="Add observable")
157+
httpretty.register_uri(httpretty.POST, uri="http://localhost/write", status=204)
158+
159+
# Record item
160+
_record = "h2o_feet,location=coyote_creek level\\ water_level=1.0 1"
161+
self._write_client.write("my-bucket", "my-org", _record)
162+
163+
# Point item
164+
_point = Point("h2o_feet").tag("location", "coyote_creek").field("level water_level", 2.0).time(2)
165+
self._write_client.write("my-bucket", "my-org", _point)
166+
167+
# Record list
168+
self._write_client.write("my-bucket", "my-org",
169+
["h2o_feet,location=coyote_creek level\\ water_level=3.0 3",
170+
"h2o_feet,location=coyote_creek level\\ water_level=4.0 4"])
171+
172+
# Point list
173+
_point1 = Point("h2o_feet").tag("location", "coyote_creek").field("level water_level", 5.0).time(5)
174+
_point2 = Point("h2o_feet").tag("location", "coyote_creek").field("level water_level", 6.0).time(6)
175+
self._write_client.write("my-bucket", "my-org", [_point1, _point2])
176+
177+
# Observable
178+
_recordObs = "h2o_feet,location=coyote_creek level\\ water_level=7.0 7"
179+
_pointObs = Point("h2o_feet").tag("location", "coyote_creek").field("level water_level", 8.0).time(8)
180+
181+
self._write_client.write("my-bucket", "my-org", rx.of(_recordObs, _pointObs))
182+
183+
time.sleep(1)
184+
185+
_requests = httpretty.httpretty.latest_requests
186+
187+
self.assertEqual(4, len(_requests))
188+
189+
self.assertEqual("h2o_feet,location=coyote_creek level\\ water_level=1.0 1\n"
190+
"h2o_feet,location=coyote_creek level\\ water_level=2.0 2", _requests[0].parsed_body)
191+
self.assertEqual("h2o_feet,location=coyote_creek level\\ water_level=3.0 3\n"
192+
"h2o_feet,location=coyote_creek level\\ water_level=4.0 4", _requests[1].parsed_body)
193+
self.assertEqual("h2o_feet,location=coyote_creek level\\ water_level=5.0 5\n"
194+
"h2o_feet,location=coyote_creek level\\ water_level=6.0 6", _requests[2].parsed_body)
195+
self.assertEqual("h2o_feet,location=coyote_creek level\\ water_level=7.0 7\n"
196+
"h2o_feet,location=coyote_creek level\\ water_level=8.0 8", _requests[3].parsed_body)
197+
159198
pass
160199

161200
def test_write_result(self):

0 commit comments

Comments
 (0)