Skip to content

feat: add MultiprocessingWriter to help user write data in independent OS process #356

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Nov 12, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
- `BucketsApi` - add possibility to: `update`
- `OrganizationsApi` - add possibility to: `update`
- `UsersApi` - add possibility to: `update`, `delete`, `find`
1. [#356](https://github.com/influxdata/influxdb-client-python/pull/356): Add `MultiprocessingWriter` to write data in independent OS process

### Bug Fixes
1. [#359](https://github.com/influxdata/influxdb-client-python/pull/359): Correct serialization empty columns into LineProtocol [DataFrame]
Expand Down
12 changes: 12 additions & 0 deletions docs/api.rst
Original file line number Diff line number Diff line change
Expand Up @@ -69,3 +69,15 @@ DeleteApi

.. autoclass:: influxdb_client.domain.DeletePredicateRequest
:members:

Helpers
"""""""
.. autoclass:: influxdb_client.client.util.date_utils.DateHelper
:members:

.. autoclass:: influxdb_client.client.util.date_utils_pandas.PandasDateTimeHelper
:members:

.. autoclass:: influxdb_client.client.util.multiprocessing_helper.MultiprocessingWriter
:members:

205 changes: 205 additions & 0 deletions influxdb_client/client/util/multiprocessing_helper.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,205 @@
"""
Helper classes that make it easier to use the client in a multiprocessing environment.

For more information how the multiprocessing works see Python's
`reference docs <https://docs.python.org/3/library/multiprocessing.html>`_.
"""
import logging
import multiprocessing

from influxdb_client import InfluxDBClient, WriteOptions
from influxdb_client.client.exceptions import InfluxDBError

logger = logging.getLogger(__name__)


def _success_callback(conf: (str, str, str), data: str):
    """Log a successfully written batch.

    Default ``success_callback`` used by :class:`MultiprocessingWriter`.

    :param conf: batch configuration tuple passed by ``WriteApi``
    :param data: serialized batch payload (Line Protocol)
    """
    # Lazy %-args: the message is only formatted when DEBUG logging is enabled.
    logger.debug("Written batch: %s, data: %s", conf, data)


def _error_callback(conf: (str, str, str), data: str, exception: InfluxDBError):
    """Log an unsuccessfully written batch.

    Default ``error_callback`` used by :class:`MultiprocessingWriter`.

    :param conf: batch configuration tuple passed by ``WriteApi``
    :param data: serialized batch payload (Line Protocol)
    :param exception: error that caused the batch to fail
    """
    # A failed write is an error, not a debug detail - log at ERROR so the
    # default callback does not silently hide write failures.
    logger.error("Cannot write batch: %s, data: %s due: %s", conf, data, exception)


def _retry_callback(conf: (str, str, str), data: str, exception: InfluxDBError):
    """Log a retryable write error.

    Default ``retry_callback`` used by :class:`MultiprocessingWriter`.

    :param conf: batch configuration tuple passed by ``WriteApi``
    :param data: serialized batch payload (Line Protocol)
    :param exception: retryable error reported by ``WriteApi``
    """
    # Lazy %-args: the message is only formatted when DEBUG logging is enabled.
    logger.debug("Retryable error occurs for batch: %s, data: %s retry: %s", conf, data, exception)


class _PoisonPill:
"""To notify process to terminate."""

pass


class MultiprocessingWriter(multiprocessing.Process):
    """
    Helper class that writes data into InfluxDB from an independent OS process.

    Example:
        .. code-block:: python

            from influxdb_client import WriteOptions
            from influxdb_client.client.util.multiprocessing_helper import MultiprocessingWriter


            def main():
                writer = MultiprocessingWriter(url="http://localhost:8086", token="my-token", org="my-org",
                                               write_options=WriteOptions(batch_size=100))
                writer.start()

                for x in range(1, 1000):
                    writer.write(bucket="my-bucket", record=f"mem,tag=a value={x}i {x}")

                writer.__del__()


            if __name__ == '__main__':
                main()


    How to use with context_manager:
        .. code-block:: python

            from influxdb_client import WriteOptions
            from influxdb_client.client.util.multiprocessing_helper import MultiprocessingWriter


            def main():
                with MultiprocessingWriter(url="http://localhost:8086", token="my-token", org="my-org",
                                           write_options=WriteOptions(batch_size=100)) as writer:
                    for x in range(1, 1000):
                        writer.write(bucket="my-bucket", record=f"mem,tag=a value={x}i {x}")


            if __name__ == '__main__':
                main()


    How to handle batch events:
        .. code-block:: python

            from influxdb_client import WriteOptions
            from influxdb_client.client.exceptions import InfluxDBError
            from influxdb_client.client.util.multiprocessing_helper import MultiprocessingWriter


            class BatchingCallback(object):

                def success(self, conf: (str, str, str), data: str):
                    print(f"Written batch: {conf}, data: {data}")

                def error(self, conf: (str, str, str), data: str, exception: InfluxDBError):
                    print(f"Cannot write batch: {conf}, data: {data} due: {exception}")

                def retry(self, conf: (str, str, str), data: str, exception: InfluxDBError):
                    print(f"Retryable error occurs for batch: {conf}, data: {data} retry: {exception}")


            def main():
                callback = BatchingCallback()
                with MultiprocessingWriter(url="http://localhost:8086", token="my-token", org="my-org",
                                           success_callback=callback.success,
                                           error_callback=callback.error,
                                           retry_callback=callback.retry) as writer:

                    for x in range(1, 1000):
                        writer.write(bucket="my-bucket", record=f"mem,tag=a value={x}i {x}")


            if __name__ == '__main__':
                main()


    """

    # Lifecycle flags tracked in the parent process:
    # __started__ - set to True by ``start()``; __disposed__ - set to True by ``__del__``.
    __started__ = False
    __disposed__ = False

    def __init__(self, **kwargs) -> None:
        """
        Initialize defaults.

        For more information how to initialize the writer see the examples above.

        :param kwargs: arguments are passed into ``__init__`` function of ``InfluxDBClient`` and ``write_api``.
        """
        multiprocessing.Process.__init__(self)
        self.kwargs = kwargs
        # Client and WriteApi are created lazily in ``run()``, i.e. inside the
        # child process, so they are never pickled across the process boundary.
        self.client = None
        self.write_api = None
        # Manager-backed joinable queue shared between parent and child;
        # ``task_done()``/``join()`` give ``__del__`` its flush barrier.
        self.queue_ = multiprocessing.Manager().Queue()

    def write(self, **kwargs) -> None:
        """
        Append time-series data into underlying queue.

        For more information how to pass arguments see the examples above.

        :param kwargs: arguments are passed into ``write`` function of ``WriteApi``
        :return: None
        """
        # NOTE(review): asserts are stripped under ``python -O``; the exact
        # messages below are relied upon by the unit tests.
        assert self.__disposed__ is False, 'Cannot write data: the writer is closed.'
        assert self.__started__ is True, 'Cannot write data: the writer is not started.'
        self.queue_.put(kwargs)

    def run(self):
        """Initialize ``InfluxDBClient`` and wait for incoming records to write into InfluxDB."""
        # Initialize Client and Write API
        # (runs in the child process - the client is created here, never pickled)
        self.client = InfluxDBClient(**self.kwargs)
        self.write_api = self.client.write_api(write_options=self.kwargs.get('write_options', WriteOptions()),
                                               success_callback=self.kwargs.get('success_callback', _success_callback),
                                               error_callback=self.kwargs.get('error_callback', _error_callback),
                                               retry_callback=self.kwargs.get('retry_callback', _retry_callback))
        # Infinite loop - until poison pill
        while True:
            next_record = self.queue_.get()
            if type(next_record) is _PoisonPill:
                # Poison pill means break the loop
                self.terminate()
                self.queue_.task_done()
                break
            self.write_api.write(**next_record)
            self.queue_.task_done()

    def start(self) -> None:
        """Start independent process for writing data into InfluxDB."""
        super().start()
        self.__started__ = True

    def terminate(self) -> None:
        """
        Cleanup resources in independent process.

        This function **cannot be used** to terminate the ``MultiprocessingWriter``.
        If you want to finish your writes please call: ``__del__``.
        """
        # Overrides ``multiprocessing.Process.terminate``: instead of sending a
        # termination signal it flushes and disposes the API objects. ``run()``
        # invokes it in the child process when the poison pill arrives.
        if self.write_api:
            logger.info("flushing data...")
            self.write_api.__del__()
            self.write_api = None
        if self.client:
            self.client.__del__()
            self.client = None
        logger.info("closed")

    def __enter__(self):
        """Enter the runtime context related to this object."""
        self.start()
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Exit the runtime context related to this object."""
        self.__del__()

    def __del__(self):
        """Dispose the client and write_api."""
        if self.__started__:
            # Shutdown sequence: send poison pill, wait for the queue to drain
            # (flush barrier), then join the child process.
            self.queue_.put(_PoisonPill())
            self.queue_.join()
            self.join()
            self.queue_ = None
        self.__started__ = False
        self.__disposed__ = True
7 changes: 6 additions & 1 deletion influxdb_client/client/write_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -565,4 +565,9 @@ def __setstate__(self, state):
"""Set your object with the provided dict."""
self.__dict__.update(state)
# Init Rx
self.__init__(self._influxdb_client, self._write_options, self._point_settings)
self.__init__(self._influxdb_client,
self._write_options,
self._point_settings,
success_callback=self._success_callback,
error_callback=self._error_callback,
retry_callback=self._retry_callback)
72 changes: 72 additions & 0 deletions tests/test_MultiprocessingWriter.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
import os
import unittest
from datetime import datetime

from influxdb_client import WritePrecision, InfluxDBClient
from influxdb_client.client.util.date_utils import get_date_helper
from influxdb_client.client.util.multiprocessing_helper import MultiprocessingWriter
from influxdb_client.client.write_api import SYNCHRONOUS


# noinspection PyMethodMayBeStatic
class MultiprocessingWriterTest(unittest.TestCase):
    """Integration tests for ``MultiprocessingWriter``.

    Requires a running InfluxDB instance; connection options are taken from
    the ``INFLUXDB_V2_URL``/``INFLUXDB_V2_TOKEN``/``INFLUXDB_V2_ORG``
    environment variables, with local-development defaults.
    """

    def setUp(self) -> None:
        self.url = os.getenv('INFLUXDB_V2_URL', "http://localhost:8086")
        self.token = os.getenv('INFLUXDB_V2_TOKEN', "my-token")
        self.org = os.getenv('INFLUXDB_V2_ORG', "my-org")
        self.writer = None

    def tearDown(self) -> None:
        # Dispose the writer - flushes pending data and joins the child process.
        if self.writer:
            self.writer.__del__()

    def test_write_without_start(self):
        self.writer = MultiprocessingWriter(url=self.url, token=self.token, org=self.org,
                                            write_options=SYNCHRONOUS)

        # Writing before start() must be rejected with a descriptive message.
        with self.assertRaises(AssertionError) as ve:
            self.writer.write(bucket="my-bucket", record="mem,tag=a value=5")

        self.assertEqual('Cannot write data: the writer is not started.', f'{ve.exception}')

    def test_write_after_terminate(self):
        self.writer = MultiprocessingWriter(url=self.url, token=self.token, org=self.org,
                                            write_options=SYNCHRONOUS)
        self.writer.start()
        self.writer.__del__()

        # Writing after disposal must be rejected with a descriptive message.
        with self.assertRaises(AssertionError) as ve:
            self.writer.write(bucket="my-bucket", record="mem,tag=a value=5")

        self.assertEqual('Cannot write data: the writer is closed.', f'{ve.exception}')

    def test_terminate_twice(self):
        # Repeated disposal/termination must be idempotent.
        with MultiprocessingWriter(url=self.url, token=self.token, org=self.org, write_options=SYNCHRONOUS) as writer:
            writer.__del__()
            writer.terminate()
            writer.terminate()
            writer.__del__()

    def test_use_context_manager(self):
        with MultiprocessingWriter(url=self.url, token=self.token, org=self.org, write_options=SYNCHRONOUS) as writer:
            self.assertIsNotNone(writer)

    def test_pass_parameters(self):
        # Unique measurement suffix so the query below only sees this test's data.
        unique = get_date_helper().to_nanoseconds(datetime.utcnow() - datetime.utcfromtimestamp(0))

        # write data
        with MultiprocessingWriter(url=self.url, token=self.token, org=self.org, write_options=SYNCHRONOUS) as writer:
            writer.write(bucket="my-bucket", record=f"mem_{unique},tag=a value=5i 10", write_precision=WritePrecision.S)

        # query data
        with InfluxDBClient(url=self.url, token=self.token, org=self.org) as client:
            query_api = client.query_api()
            tables = query_api.query(
                f'from(bucket: "my-bucket") |> range(start: 0) |> filter(fn: (r) => r._measurement == "mem_{unique}")',
                self.org)
            record = tables[0].records[0]
            self.assertIsNotNone(record)
            self.assertEqual("a", record["tag"])
            self.assertEqual(5, record["_value"])
            self.assertEqual(get_date_helper().to_utc(datetime.utcfromtimestamp(10)), record["_time"])