Skip to content

Commit 3820f52

Browse files
authored
fix: rest_api and write_api is able to be pickled (#70)
1 parent 877fcaa commit 3820f52

File tree

6 files changed

+112
-0
lines changed

6 files changed

+112
-0
lines changed

.gitignore

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,7 @@ celerybeat-schedule
8787
.venv
8888
env/
8989
venv/
90+
venv-*/
9091
ENV/
9192
env.bak/
9293
venv.bak/
@@ -112,3 +113,4 @@ sandbox
112113

113114
# OpenAPI-generator
114115
/.openapi-generator*
116+
/tests/writer.pickle

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
### Bugs
77
1. [#61](https://github.com/influxdata/influxdb-client-python/issues/61): Correctly parse CSV where multiple results include multiple tables
88
1. [#66](https://github.com/influxdata/influxdb-client-python/issues/66): Correctly close connection pool manager at exit
9+
1. [#69](https://github.com/influxdata/influxdb-client-python/issues/69): `InfluxDBClient` and `WriteApi` could serialized by [pickle](https://docs.python.org/3/library/pickle.html#object.__getstate__) (python3.7 or higher)
910

1011
## 1.4.0 [2020-02-14]
1112

influxdb_client/client/write_api.py

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,17 @@ def __init__(self, write_type: WriteType = WriteType.batching,
5252
self.retry_interval = retry_interval
5353
self.write_scheduler = write_scheduler
5454

55+
def __getstate__(self):
56+
state = self.__dict__.copy()
57+
# Remove write scheduler
58+
del state['write_scheduler']
59+
return state
60+
61+
def __setstate__(self, state):
62+
self.__dict__.update(state)
63+
# Init default write Scheduler
64+
self.write_scheduler = ThreadPoolScheduler(max_workers=1)
65+
5566

5667
SYNCHRONOUS = WriteOptions(write_type=WriteType.synchronous)
5768
ASYNCHRONOUS = WriteOptions(write_type=WriteType.asynchronous)
@@ -322,3 +333,16 @@ def _on_error(ex):
322333
def _on_complete(self):
323334
self._disposable.dispose()
324335
logger.info("the batching processor was disposed")
336+
337+
def __getstate__(self):
338+
state = self.__dict__.copy()
339+
# Remove rx
340+
del state['_subject']
341+
del state['_disposable']
342+
del state['_write_service']
343+
return state
344+
345+
def __setstate__(self, state):
346+
self.__dict__.update(state)
347+
# Init Rx
348+
self.__init__(self._influxdb_client, self._write_options, self._point_settings)

influxdb_client/rest.py

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,10 @@ def __init__(self, configuration, pools_size=4, maxsize=None):
5858
# maxsize is the number of requests to host that are allowed in parallel # noqa: E501
5959
# Custom SSL certificates and client certificates: http://urllib3.readthedocs.io/en/latest/advanced-usage.html # noqa: E501
6060

61+
self.configuration = configuration
62+
self.pools_size = pools_size
63+
self.maxsize = maxsize
64+
6165
# cert_reqs
6266
if configuration.verify_ssl:
6367
cert_reqs = ssl.CERT_REQUIRED
@@ -293,6 +297,17 @@ def PATCH(self, url, headers=None, query_params=None, post_params=None,
293297
_request_timeout=_request_timeout,
294298
body=body)
295299

300+
def __getstate__(self):
301+
state = self.__dict__.copy()
302+
# Remove Pool managaer
303+
del state['pool_manager']
304+
return state
305+
306+
def __setstate__(self, state):
307+
self.__dict__.update(state)
308+
# Init Pool manager
309+
self.__init__(self.configuration, self.pools_size, self.maxsize)
310+
296311

297312
class ApiException(Exception):
298313

openapi-generator/src/main/resources/python/rest.mustache

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,10 @@ class RESTClientObject(object):
5050
# maxsize is the number of requests to host that are allowed in parallel # noqa: E501
5151
# Custom SSL certificates and client certificates: http://urllib3.readthedocs.io/en/latest/advanced-usage.html # noqa: E501
5252

53+
self.configuration = configuration
54+
self.pools_size = pools_size
55+
self.maxsize = maxsize
56+
5357
# cert_reqs
5458
if configuration.verify_ssl:
5559
cert_reqs = ssl.CERT_REQUIRED
@@ -285,6 +289,17 @@ class RESTClientObject(object):
285289
_request_timeout=_request_timeout,
286290
body=body)
287291

292+
def __getstate__(self):
293+
state = self.__dict__.copy()
294+
# Remove Pool managaer
295+
del state['pool_manager']
296+
return state
297+
298+
def __setstate__(self, state):
299+
self.__dict__.update(state)
300+
# Init Pool manager
301+
self.__init__(self.configuration, self.pools_size, self.maxsize)
302+
288303

289304
class ApiException(Exception):
290305

tests/test_WriteApiPickle.py

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
import pickle
2+
import sys
3+
4+
import pytest
5+
6+
from influxdb_client import InfluxDBClient, WriteOptions
7+
from influxdb_client.client.write_api import WriteType
8+
from tests.base_test import current_milli_time, BaseTest
9+
10+
11+
class InfluxDBWriterToPickle:
12+
13+
def __init__(self):
14+
self.client = InfluxDBClient(url="http://localhost:9999", token="my-token", org="my-org", debug=False)
15+
self.write_api = self.client.write_api(
16+
write_options=WriteOptions(write_type=WriteType.batching, batch_size=50_000, flush_interval=10_000))
17+
18+
def write(self, record):
19+
self.write_api.write(bucket="my-bucket", record=record)
20+
21+
def terminate(self) -> None:
22+
self.write_api.__del__()
23+
self.client.__del__()
24+
25+
26+
class WriteApiPickle(BaseTest):
27+
28+
def setUp(self) -> None:
29+
super().setUp()
30+
31+
def tearDown(self) -> None:
32+
super().tearDown()
33+
34+
@pytest.mark.skipif(sys.version_info < (3, 7), reason="requires python3.7 or higher")
35+
def test_write_line_protocol(self):
36+
writer = InfluxDBWriterToPickle()
37+
38+
pickle_out = open("writer.pickle", "wb")
39+
pickle.dump(writer, pickle_out)
40+
pickle_out.close()
41+
42+
writer = pickle.load(open("writer.pickle", "rb"))
43+
44+
measurement = "h2o_feet_" + str(current_milli_time())
45+
writer.write(record=f"{measurement},location=coyote_creek water_level=1.0")
46+
writer.terminate()
47+
48+
tables = self.query_api.query(
49+
f'from(bucket: "my-bucket") |> range(start: 0) |> filter(fn: (r) => r._measurement == "{measurement}")')
50+
51+
self.assertEqual(len(tables), 1)
52+
self.assertEqual(len(tables[0].records), 1)
53+
self.assertEqual(tables[0].records[0].get_measurement(), measurement)
54+
self.assertEqual(tables[0].records[0].get_value(), 1.0)
55+
self.assertEqual(tables[0].records[0].get_field(), "water_level")

0 commit comments

Comments
 (0)