feat(write): add support for callback notification for batching mode #341

Merged 11 commits on Oct 13, 2021
1 change: 1 addition & 0 deletions .circleci/config.yml
@@ -124,6 +124,7 @@ jobs:
python examples/monitoring_and_alerting.py
python examples/buckets_management.py
python examples/write_structured_data.py
python examples/write_api_callbacks.py
check-sphinx:
docker:
- image: *default-python
2 changes: 2 additions & 0 deletions CHANGELOG.md
@@ -3,9 +3,11 @@
### Features
1. [#330](https://github.com/influxdata/influxdb-client-python/pull/330): Add support for write structured data - `NamedTuple`, `Data Classes`
1. [#335](https://github.com/influxdata/influxdb-client-python/pull/335): Add support for custom precision for index specified as number [DataFrame]
1. [#341](https://github.com/influxdata/influxdb-client-python/pull/341): Add support for handling batch events

### Documentation
1. [#331](https://github.com/influxdata/influxdb-client-python/pull/331): Add [Migration Guide](MIGRATION_GUIDE.rst)
1. [#341](https://github.com/influxdata/influxdb-client-python/pull/341): How to handle client errors

## 1.21.0 [2021-09-17]

59 changes: 58 additions & 1 deletion README.rst
@@ -83,6 +83,7 @@ InfluxDB 2.0 client features
- `Proxy configuration`_
- `Nanosecond precision`_
- `Delete data`_
- `Handling Errors`_

Installation
------------
@@ -1152,8 +1153,62 @@ The following forward compatible APIs are available:

For detail info see `InfluxDB 1.8 example <examples/influxdb_18_example.py>`_.

Handling Errors
^^^^^^^^^^^^^^^
.. marker-handling-errors-start

Errors happen, and it's important that your code is prepared for them. All client-related exceptions derive from
``InfluxDBError``. If an exception cannot be recovered from inside the client, it is raised to the application
and left for the developer to handle.

Almost all APIs raise unrecoverable exceptions directly, to be handled like this:

.. code-block:: python

from influxdb_client import InfluxDBClient
from influxdb_client.client.exceptions import InfluxDBError
from influxdb_client.client.write_api import SYNCHRONOUS

with InfluxDBClient(url="http://localhost:8086", token="my-token", org="my-org") as client:
try:
client.write_api(write_options=SYNCHRONOUS).write("my-bucket", record="mem,tag=a value=86")
except InfluxDBError as e:
if e.response.status == 401:
raise Exception(f"Insufficient write permissions to 'my-bucket'.") from e
raise


The only exception is the **batching** ``WriteAPI`` (for more info see `Batching`_), where you need to register custom callbacks to handle batch events.
This is because this API runs in the background in a separate thread, so the underlying exceptions cannot be
returned to the caller directly.

.. code-block:: python

from influxdb_client import InfluxDBClient
from influxdb_client.client.exceptions import InfluxDBError


class BatchingCallback(object):

def success(self, conf: (str, str, str), data: str):
print(f"Written batch: {conf}, data: {data}")

def error(self, conf: (str, str, str), data: str, exception: InfluxDBError):
print(f"Cannot write batch: {conf}, data: {data} due: {exception}")

def retry(self, conf: (str, str, str), data: str, exception: InfluxDBError):
print(f"Retryable error occurs for batch: {conf}, data: {data} retry: {exception}")


with InfluxDBClient(url="http://localhost:8086", token="my-token", org="my-org") as client:
callback = BatchingCallback()
with client.write_api(success_callback=callback.success,
error_callback=callback.error,
retry_callback=callback.retry) as write_api:
pass
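
Any write performed inside the ``with`` block is batched in the background and reported through these callbacks.
A minimal sketch of exercising them (reusing the ``BatchingCallback`` defined above; the bucket name is illustrative,
see `examples/write_api_callbacks.py <examples/write_api_callbacks.py>`_ for a complete runnable example):

.. code-block:: python

    from influxdb_client import InfluxDBClient

    with InfluxDBClient(url="http://localhost:8086", token="my-token", org="my-org") as client:
        callback = BatchingCallback()  # the class defined above
        with client.write_api(success_callback=callback.success,
                              error_callback=callback.error,
                              retry_callback=callback.retry) as write_api:
            # every flushed batch invokes success, error or retry accordingly
            write_api.write(bucket="my-bucket", record="mem,tag=a value=86")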

HTTP Retry Strategy
^^^^^^^^^^^^^^^^^^^
"""""""""""""""""""
By default the client uses a retry strategy only for batching writes (for more info see `Batching`_).
For other HTTP requests no retry strategy is applied by default, but one can be configured via the ``retries``
parameter of ``InfluxDBClient``.
@@ -1169,6 +1224,8 @@ For more info about how to configure HTTP retry see details in `urllib3 documentation`_
retries = Retry(connect=5, read=2, redirect=5)
client = InfluxDBClient(url="http://localhost:8086", token="my-token", org="my-org", retries=retries)
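
The retry behaviour of the batching write path itself is configured through ``WriteOptions``; a sketch of tuning
its retry-related parameters (the values below are illustrative only, intervals are in milliseconds):

.. code-block:: python

    from influxdb_client import InfluxDBClient, WriteOptions

    with InfluxDBClient(url="http://localhost:8086", token="my-token", org="my-org") as client:
        # illustrative retry tuning; see the Batching section for the full set of options
        write_options = WriteOptions(batch_size=500,
                                     flush_interval=10_000,
                                     retry_interval=5_000,
                                     max_retries=5,
                                     max_retry_delay=30_000,
                                     exponential_base=2)
        with client.write_api(write_options=write_options) as write_api:
            write_api.write(bucket="my-bucket", record="mem,tag=a value=86")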

.. marker-handling-errors-end

Nanosecond precision
^^^^^^^^^^^^^^^^^^^^
.. marker-nanosecond-start
6 changes: 6 additions & 0 deletions docs/usage.rst
@@ -46,6 +46,12 @@ Nanosecond precision
:start-after: marker-nanosecond-start
:end-before: marker-nanosecond-end

Handling Errors
^^^^^^^^^^^^^^^
.. include:: ../README.rst
:start-after: marker-handling-errors-start
:end-before: marker-handling-errors-end

Debugging
^^^^^^^^^

1 change: 1 addition & 0 deletions examples/README.md
@@ -7,6 +7,7 @@
- [ingest_large_dataframe.py](ingest_large_dataframe.py) - How to ingest large DataFrame
- [iot_sensor.py](iot_sensor.py) - How to write sensor data every minute by [RxPY](https://rxpy.readthedocs.io/en/latest/)
- [import_data_set_sync_batching.py](import_data_set_sync_batching.py) - How to use [RxPY](https://rxpy.readthedocs.io/en/latest/) to prepare batches for synchronous write into InfluxDB
- [write_api_callbacks.py](write_api_callbacks.py) - How to handle batch events
- [write_structured_data.py](write_structured_data.py) - How to write structured data - [NamedTuple](https://docs.python.org/3/library/collections.html#collections.namedtuple), [Data Classes](https://docs.python.org/3/library/dataclasses.html) - (_requires Python v3.8+_)

## Queries
49 changes: 49 additions & 0 deletions examples/write_api_callbacks.py
@@ -0,0 +1,49 @@
"""
How to use WriteApi's callbacks to be notified about the state of background batches.
"""

from influxdb_client import InfluxDBClient, Point
from influxdb_client.client.exceptions import InfluxDBError

"""
Configuration
"""
url = 'http://localhost:8086'
token = 'my-token'
org = 'my-org'
bucket = 'my-bucket'

"""
Data
"""
points = [Point("my-temperature").tag("location", "Prague").field("temperature", 25.3),
Point("my-temperature").tag("location", "New York").field("temperature", 18.4)]


class BatchingCallback(object):

def success(self, conf: (str, str, str), data: str):
"""Successfully writen batch."""
print(f"Written batch: {conf}, data: {data}")

def error(self, conf: (str, str, str), data: str, exception: InfluxDBError):
"""Unsuccessfully writen batch."""
print(f"Cannot write batch: {conf}, data: {data} due: {exception}")

def retry(self, conf: (str, str, str), data: str, exception: InfluxDBError):
"""Retryable error."""
print(f"Retryable error occurs for batch: {conf}, data: {data} retry: {exception}")


callback = BatchingCallback()
with InfluxDBClient(url=url, token=token, org=org) as client:
"""
Use batching API
"""
with client.write_api(success_callback=callback.success,
error_callback=callback.error,
retry_callback=callback.retry) as write_api:
write_api.write(bucket=bucket, record=points)
print()
print("Wait to finishing ingesting...")
print()
5 changes: 3 additions & 2 deletions influxdb_client/client/__init__.py
@@ -1,11 +1,11 @@
# flake8: noqa

"""
Influx API Service.
Influx OSS API Service.

No description provided (generated by Openapi Generator https://github.com/openapitools/openapi-generator) # noqa: E501

OpenAPI spec version: 0.1.0
OpenAPI spec version: 2.0.0
Generated by: https://openapi-generator.tech
"""

@@ -19,6 +19,7 @@
from influxdb_client.service.checks_service import ChecksService
from influxdb_client.service.dbr_ps_service import DBRPsService
from influxdb_client.service.dashboards_service import DashboardsService
from influxdb_client.service.delete_service import DeleteService
from influxdb_client.service.health_service import HealthService
from influxdb_client.service.labels_service import LabelsService
from influxdb_client.service.notification_endpoints_service import NotificationEndpointsService
81 changes: 77 additions & 4 deletions influxdb_client/client/influxdb_client.py
@@ -247,15 +247,88 @@ def from_env_properties(cls, debug=None, enable_gzip=False):
connection_pool_maxsize=_to_int(connection_pool_maxsize), auth_basic=_to_bool(auth_basic),
profilers=profilers)

def write_api(self, write_options=WriteOptions(), point_settings=PointSettings()) -> WriteApi:
def write_api(self, write_options=WriteOptions(), point_settings=PointSettings(), **kwargs) -> WriteApi:
"""
Create a Write API instance.

:param point_settings:
:param write_options: write api configuration
Example:
.. code-block:: python

from influxdb_client import InfluxDBClient
from influxdb_client.client.write_api import SYNCHRONOUS


# Initialize SYNCHRONOUS instance of WriteApi
with InfluxDBClient(url="http://localhost:8086", token="my-token", org="my-org") as client:
write_api = client.write_api(write_options=SYNCHRONOUS)

If you would like to use **background batching**, configure the client like this:

.. code-block:: python

from influxdb_client import InfluxDBClient

# Initialize background batching instance of WriteApi
with InfluxDBClient(url="http://localhost:8086", token="my-token", org="my-org") as client:
with client.write_api() as write_api:
pass

It is also possible to use callbacks to be notified about the state of background batches:

.. code-block:: python

from influxdb_client import InfluxDBClient
from influxdb_client.client.exceptions import InfluxDBError


class BatchingCallback(object):

def success(self, conf: (str, str, str), data: str):
print(f"Written batch: {conf}, data: {data}")

def error(self, conf: (str, str, str), data: str, exception: InfluxDBError):
print(f"Cannot write batch: {conf}, data: {data} due: {exception}")

def retry(self, conf: (str, str, str), data: str, exception: InfluxDBError):
print(f"Retryable error occurs for batch: {conf}, data: {data} retry: {exception}")


with InfluxDBClient(url="http://localhost:8086", token="my-token", org="my-org") as client:
callback = BatchingCallback()
with client.write_api(success_callback=callback.success,
error_callback=callback.error,
retry_callback=callback.retry) as write_api:
pass

:param write_options: Write API configuration
:param point_settings: settings to store default tags
:key success_callback: The callable ``callback`` to run after a batch has been successfully written.

The callable must accept two arguments:
- `Tuple`: ``(bucket, organization, precision)``
- `str`: written data

**[batching mode]**

:key error_callback: The callable ``callback`` to run after a batch has failed to be written.

The callable must accept three arguments:
- `Tuple`: ``(bucket, organization, precision)``
- `str`: written data
- `Exception`: the error that occurred

**[batching mode]**

:key retry_callback: The callable ``callback`` to run after a retryable error occurred.

The callable must accept three arguments:
- `Tuple`: ``(bucket, organization, precision)``
- `str`: written data
- `Exception`: a retryable error

**[batching mode]**
:return: write api instance
"""
return WriteApi(influxdb_client=self, write_options=write_options, point_settings=point_settings)
return WriteApi(influxdb_client=self, write_options=write_options, point_settings=point_settings, **kwargs)

def query_api(self, query_options: QueryOptions = QueryOptions()) -> QueryApi:
"""
5 changes: 3 additions & 2 deletions influxdb_client/client/write/__init__.py
@@ -1,11 +1,11 @@
# flake8: noqa

"""
Influx API Service.
Influx OSS API Service.

No description provided (generated by Openapi Generator https://github.com/openapitools/openapi-generator) # noqa: E501

OpenAPI spec version: 0.1.0
OpenAPI spec version: 2.0.0
Generated by: https://openapi-generator.tech
"""

@@ -19,6 +19,7 @@
from influxdb_client.service.checks_service import ChecksService
from influxdb_client.service.dbr_ps_service import DBRPsService
from influxdb_client.service.dashboards_service import DashboardsService
from influxdb_client.service.delete_service import DeleteService
from influxdb_client.service.health_service import HealthService
from influxdb_client.service.labels_service import LabelsService
from influxdb_client.service.notification_endpoints_service import NotificationEndpointsService
42 changes: 28 additions & 14 deletions influxdb_client/client/write/retry.py
@@ -4,6 +4,7 @@
from datetime import datetime, timedelta
from itertools import takewhile
from random import random
from typing import Callable

from urllib3 import Retry
from urllib3.exceptions import MaxRetryError, ResponseError
@@ -17,25 +18,32 @@ class WritesRetry(Retry):
"""
Writes retry configuration.

:param int jitter_interval: random milliseconds when retrying writes
:param num max_retry_delay: maximum delay when retrying write in seconds
:param int max_retry_time: maximum total retry timeout in seconds, attempt after this timeout throws MaxRetryError
:param int total: maximum number of retries
:param num retry_interval: initial first retry delay range in seconds
:param int exponential_base: base for the exponential retry delay

The next delay is computed as a random value between
`retry_interval * exponential_base^(attempts-1)` and `retry_interval * exponential_base^(attempts)`

Example: for retry_interval=5, exponential_base=2, max_retry_delay=125, total=5
retry delays are randomly distributed values within the ranges of
[5-10, 10-20, 20-40, 40-80, 80-125]
`retry_interval * exponential_base^(attempts-1)` and `retry_interval * exponential_base^(attempts)`

Example:
for retry_interval=5, exponential_base=2, max_retry_delay=125, total=5
retry delays are randomly distributed values within the ranges of
[5-10, 10-20, 20-40, 40-80, 80-125]
"""

def __init__(self, jitter_interval=0, max_retry_delay=125, exponential_base=2, max_retry_time=180, total=5,
retry_interval=5, **kw):
"""Initialize defaults."""
retry_interval=5, retry_callback: Callable[[Exception], int] = None, **kw):
"""
Initialize defaults.

:param int jitter_interval: random milliseconds when retrying writes
:param num max_retry_delay: maximum delay when retrying write in seconds
:param int max_retry_time: maximum total retry timeout in seconds,
attempt after this timeout throws MaxRetryError
:param int total: maximum number of retries
:param num retry_interval: initial first retry delay range in seconds
:param int exponential_base: base for the exponential retry delay
:param Callable[[Exception], int] retry_callback: the callable ``callback`` to run after a retryable
error occurred.
The callable must accept one argument:
- `Exception`: a retryable error
"""
super().__init__(**kw)
self.jitter_interval = jitter_interval
self.total = total
@@ -44,6 +52,7 @@ def __init__(self, jitter_interval=0, max_retry_delay=125, exponential_base=2, m
self.max_retry_time = max_retry_time
self.exponential_base = exponential_base
self.retry_timeout = datetime.now() + timedelta(seconds=max_retry_time)
self.retry_callback = retry_callback

def new(self, **kw):
"""Initialize defaults."""
@@ -57,6 +66,8 @@ def new(self, **kw):
kw['max_retry_time'] = self.max_retry_time
if 'exponential_base' not in kw:
kw['exponential_base'] = self.exponential_base
if 'retry_callback' not in kw:
kw['retry_callback'] = self.retry_callback

new = super().new(**kw)
new.retry_timeout = self.retry_timeout
@@ -123,6 +134,9 @@ def increment(self, method=None, url=None, response=None, error=None, _pool=None
if isinstance(parsed_error, InfluxDBError):
message += f" Retry in {parsed_error.retry_after}s."

if self.retry_callback:
self.retry_callback(parsed_error)

logger.warning(message)

return new_retry