From e857942178694020247ad05bb2a71530654b5e25 Mon Sep 17 00:00:00 2001 From: Jakub Bednar Date: Wed, 6 Oct 2021 14:02:40 +0200 Subject: [PATCH 01/11] feat(write): add support for callback notification for batching mode --- influxdb_client/client/influxdb_client.py | 43 +++++++- influxdb_client/client/write/retry.py | 42 +++++--- influxdb_client/client/write_api.py | 96 +++++++++++++---- tests/test_WriteApiBatching.py | 123 +++++++++++++++++++++- 4 files changed, 263 insertions(+), 41 deletions(-) diff --git a/influxdb_client/client/influxdb_client.py b/influxdb_client/client/influxdb_client.py index e0c471e2..f70f71d3 100644 --- a/influxdb_client/client/influxdb_client.py +++ b/influxdb_client/client/influxdb_client.py @@ -247,15 +247,50 @@ def from_env_properties(cls, debug=None, enable_gzip=False): connection_pool_maxsize=_to_int(connection_pool_maxsize), auth_basic=_to_bool(auth_basic), profilers=profilers) - def write_api(self, write_options=WriteOptions(), point_settings=PointSettings()) -> WriteApi: + def write_api(self, write_options=WriteOptions(), point_settings=PointSettings(), **kwargs) -> WriteApi: """ Create a Write API instance. - :param point_settings: - :param write_options: write api configuration + Example: + .. code-block:: python + + from influxdb_client import InfluxDBClient + from influxdb_client.client.write_api import SYNCHRONOUS + + + # Initialize SYNCHRONOUS instance of WriteApi + with InfluxDBClient(url="http://localhost:8086", token="my-token", org="my-org") as client: + write_api = client.write_api(write_options=SYNCHRONOUS) + + :param write_options: Write API configuration + :param point_settings: settings to store default tags + :key success_callback: The callable ``callback`` to run after successfully writen a batch. + + The callable must accept two arguments: + - `Tuple`: ``(bucket, organization, precision)`` + - `str`: written data + + **[batching mode]** + + :key error_callback: The callable ``callback`` to run after unsuccessfully writen a batch. + + The callable must accept three arguments: + - `Tuple`: ``(bucket, organization, precision)`` + - `str`: written data + - `Exception`: an occurred error + + **[batching mode]** + :key retry_callback: The callable ``callback`` to run after retryable error occurred. + + The callable must accept three arguments: + - `Tuple`: ``(bucket, organization, precision)`` + - `str`: written data + - `Exception`: an retryable error + + **[batching mode]** :return: write api instance """ - return WriteApi(influxdb_client=self, write_options=write_options, point_settings=point_settings) + return WriteApi(influxdb_client=self, write_options=write_options, point_settings=point_settings, **kwargs) def query_api(self, query_options: QueryOptions = QueryOptions()) -> QueryApi: """ diff --git a/influxdb_client/client/write/retry.py b/influxdb_client/client/write/retry.py index 136b8c1d..50fdaa46 100644 --- a/influxdb_client/client/write/retry.py +++ b/influxdb_client/client/write/retry.py @@ -4,6 +4,7 @@ from datetime import datetime, timedelta from itertools import takewhile from random import random +from typing import Callable from urllib3 import Retry from urllib3.exceptions import MaxRetryError, ResponseError @@ -17,25 +18,32 @@ class WritesRetry(Retry): """ Writes retry configuration. - :param int jitter_interval: random milliseconds when retrying writes - :param num max_retry_delay: maximum delay when retrying write in seconds - :param int max_retry_time: maximum total retry timeout in seconds, attempt after this timout throws MaxRetryError - :param int total: maximum number of retries - :param num retry_interval: initial first retry delay range in seconds - :param int exponential_base: base for the exponential retry delay, - The next delay is computed as random value between range - `retry_interval * exponential_base^(attempts-1)` and `retry_interval * exponential_base^(attempts) - - Example: for retry_interval=5, exponential_base=2, max_retry_delay=125, total=5 - retry delays are random distributed values within the ranges of - [5-10, 10-20, 20-40, 40-80, 80-125] + `retry_interval * exponential_base^(attempts-1)` and `retry_interval * exponential_base^(attempts) + Example: + for retry_interval=5, exponential_base=2, max_retry_delay=125, total=5 + retry delays are random distributed values within the ranges of + [5-10, 10-20, 20-40, 40-80, 80-125] """ def __init__(self, jitter_interval=0, max_retry_delay=125, exponential_base=2, max_retry_time=180, total=5, - retry_interval=5, **kw): - """Initialize defaults.""" + retry_interval=5, retry_callback: Callable[[Exception], int] = None, **kw): + """ + Initialize defaults. + + :param int jitter_interval: random milliseconds when retrying writes + :param num max_retry_delay: maximum delay when retrying write in seconds + :param int max_retry_time: maximum total retry timeout in seconds, + attempt after this timout throws MaxRetryError + :param int total: maximum number of retries + :param num retry_interval: initial first retry delay range in seconds + :param int exponential_base: base for the exponential retry delay, + :param Callable[[Exception], int] retry_callback: the callable ``callback`` to run after retryable + error occurred. + The callable must accept one argument: + - `Exception`: an retryable error + """ super().__init__(**kw) self.jitter_interval = jitter_interval self.total = total @@ -44,6 +52,7 @@ def __init__(self, jitter_interval=0, max_retry_delay=125, exponential_base=2, m self.max_retry_time = max_retry_time self.exponential_base = exponential_base self.retry_timeout = datetime.now() + timedelta(seconds=max_retry_time) + self.retry_callback = retry_callback def new(self, **kw): """Initialize defaults.""" @@ -57,6 +66,8 @@ def new(self, **kw): kw['max_retry_time'] = self.max_retry_time if 'exponential_base' not in kw: kw['exponential_base'] = self.exponential_base + if 'retry_callback' not in kw: + kw['retry_callback'] = self.retry_callback new = super().new(**kw) new.retry_timeout = self.retry_timeout @@ -123,6 +134,9 @@ def increment(self, method=None, url=None, response=None, error=None, _pool=None if isinstance(parsed_error, InfluxDBError): message += f" Retry in {parsed_error.retry_after}s." + if self.retry_callback: + self.retry_callback(parsed_error) + logger.warning(message) return new_retry diff --git a/influxdb_client/client/write_api.py b/influxdb_client/client/write_api.py index 9b662c5c..af7c5b03 100644 --- a/influxdb_client/client/write_api.py +++ b/influxdb_client/client/write_api.py @@ -79,8 +79,14 @@ def __init__(self, write_type: WriteType = WriteType.batching, self.exponential_base = exponential_base self.write_scheduler = write_scheduler - def to_retry_strategy(self): - """Create a Retry strategy from write options.""" + def to_retry_strategy(self, **kwargs): + """ + Create a Retry strategy from write options. + + :key retry_callback: The callable ``callback`` to run after retryable error occurred. + The callable must accept one argument: + - `Exception`: an retryable error + """ return WritesRetry( total=self.max_retries, retry_interval=self.retry_interval / 1_000, @@ -88,6 +94,7 @@ def to_retry_strategy(self): max_retry_delay=self.max_retry_delay / 1_000, max_retry_time=self.max_retry_time / 1_000, exponential_base=self.exponential_base, + retry_callback=kwargs.get("retry_callback", None), method_whitelist=["POST"]) def __getstate__(self): @@ -135,18 +142,6 @@ def add_default_tag(self, key, value) -> None: self.defaultTags[key] = self._get_value(value) -class _BatchItem(object): - def __init__(self, key, data, size=1) -> None: - self.key = key - self.data = data - self.size = size - pass - - def __str__(self) -> str: - return '_BatchItem[key:\'{}\', size: \'{}\']' \ - .format(str(self.key), str(self.size)) - - class _BatchItemKey(object): def __init__(self, bucket, org, precision=DEFAULT_WRITE_PRECISION) -> None: self.bucket = bucket @@ -166,8 +161,23 @@ def __str__(self) -> str: .format(str(self.bucket), str(self.org), str(self.precision)) +class _BatchItem(object): + def __init__(self, key: _BatchItemKey, data, size=1) -> None: + self.key = key + self.data = data + self.size = size + pass + + def to_key_tuple(self) -> (str, str, str): + return self.key.bucket, self.key.org, self.key.precision + + def __str__(self) -> str: + return '_BatchItem[key:\'{}\', size: \'{}\']' \ + .format(str(self.key), str(self.size)) + + class _BatchResponse(object): - def __init__(self, data: _BatchItem, exception=None): + def __init__(self, data: _BatchItem, exception: Exception = None): self.data = data self.exception = exception pass @@ -197,13 +207,48 @@ class WriteApi: write_api = client.write_api(write_options=SYNCHRONOUS) """ - def __init__(self, influxdb_client, write_options: WriteOptions = WriteOptions(), - point_settings: PointSettings = PointSettings()) -> None: - """Initialize defaults.""" + def __init__(self, + influxdb_client, + write_options: WriteOptions = WriteOptions(), + point_settings: PointSettings = PointSettings(), + **kwargs) -> None: + """ + Initialize defaults. + + :param influxdb_client: with default settings (organization) + :param write_options: write api configuration + :param point_settings: settings to store default tags. + :key success_callback: The callable ``callback`` to run after successfully writen a batch. + + The callable must accept two arguments: + - `Tuple`: ``(bucket, organization, precision)`` + - `str`: written data + + **[batching mode]** + :key error_callback: The callable ``callback`` to run after unsuccessfully writen a batch. + + The callable must accept three arguments: + - `Tuple`: ``(bucket, organization, precision)`` + - `str`: written data + - `Exception`: an occurred error + + **[batching mode]** + :key retry_callback: The callable ``callback`` to run after retryable error occurred. + + The callable must accept three arguments: + - `Tuple`: ``(bucket, organization, precision)`` + - `str`: written data + - `Exception`: an retryable error + + **[batching mode]** + """ self._influxdb_client = influxdb_client self._write_service = WriteService(influxdb_client.api_client) self._write_options = write_options self._point_settings = point_settings + self._success_callback = kwargs.get('success_callback', None) + self._error_callback = kwargs.get('error_callback', None) + self._retry_callback = kwargs.get('retry_callback', None) if influxdb_client.default_tags: for key, value in influxdb_client.default_tags.items(): @@ -452,7 +497,13 @@ def _http(self, batch_item: _BatchItem): logger.debug("Write time series data into InfluxDB: %s", batch_item) - retry = self._write_options.to_retry_strategy() + if self._retry_callback: + def _retry_callback_delegate(exception): + return self._retry_callback(batch_item.to_key_tuple(), batch_item.data, exception) + else: + _retry_callback_delegate = None + + retry = self._write_options.to_retry_strategy(retry_callback=_retry_callback_delegate) self._post_write(False, batch_item.key.bucket, batch_item.key.org, batch_item.data, batch_item.key.precision, urlopen_kw={'retries': retry}) @@ -483,12 +534,15 @@ def _to_response(self, data: _BatchItem, delay: timedelta): def _jitter_delay(self): return timedelta(milliseconds=random() * self._write_options.jitter_interval) - @staticmethod - def _on_next(response: _BatchResponse): + def _on_next(self, response: _BatchResponse): if response.exception: logger.error("The batch item wasn't processed successfully because: %s", response.exception) + if self._error_callback: + self._error_callback(response.data.to_key_tuple(), response.data.data, response.exception) else: logger.debug("The batch item: %s was processed successfully.", response) + if self._success_callback: + self._success_callback(response.data.to_key_tuple(), response.data.data) @staticmethod def _on_error(ex): diff --git a/tests/test_WriteApiBatching.py b/tests/test_WriteApiBatching.py index ec10e9cf..d18ebdc0 100644 --- a/tests/test_WriteApiBatching.py +++ b/tests/test_WriteApiBatching.py @@ -14,8 +14,10 @@ import influxdb_client from influxdb_client import WritePrecision, InfluxDBClient, CLIENT_VERSION +from influxdb_client.client.exceptions import InfluxDBError from influxdb_client.client.write.point import Point from influxdb_client.client.write_api import WriteOptions, WriteApi, PointSettings +from influxdb_client.rest import ApiException class BatchingWriteTest(unittest.TestCase): @@ -263,7 +265,7 @@ def test_retry_disabled_max_retry_time(self): self._write_client.close() self._write_client = WriteApi(influxdb_client=self.influxdb_client, - write_options=WriteOptions(max_retry_time=0,batch_size=2, flush_interval=1_000)) + write_options=WriteOptions(max_retry_time=0, batch_size=2, flush_interval=1_000)) self._write_client.write("my-bucket", "my-org", ["h2o_feet,location=coyote_creek level\\ water_level=1 1", @@ -349,7 +351,7 @@ def test_record_types(self): # Tuple _bytes3 = "h2o_feet,location=coyote_creek level\\ water_level=19 19".encode("utf-8") _bytes4 = "h2o_feet,location=coyote_creek level\\ water_level=20 20".encode("utf-8") - self._write_client.write("my-bucket", "my-org", (_bytes3, _bytes4, )) + self._write_client.write("my-bucket", "my-org", (_bytes3, _bytes4,)) time.sleep(1) @@ -573,6 +575,123 @@ class Car: self.assertEqual(1, len(_requests)) self.assertEqual("performance,engine=12V-BT,type=sport-cars speed=125.25", _requests[0].parsed_body) + def test_success_callback(self): + httpretty.register_uri(httpretty.POST, uri="http://localhost/api/v2/write", status=204) + + class SuccessCallback(object): + def __init__(self): + self.conf = None + self.data = None + + def __call__(self, conf: (str, str, str), data: str): + self.conf = conf + self.data = data + + callback = SuccessCallback() + + self._write_client.close() + self._write_client = WriteApi(influxdb_client=self.influxdb_client, + write_options=WriteOptions(batch_size=2), success_callback=callback) + + self._write_client.write("my-bucket", "my-org", + ["h2o_feet,location=coyote_creek water_level=1 1", + "h2o_feet,location=coyote_creek water_level=2 2"]) + + time.sleep(1) + _requests = httpretty.httpretty.latest_requests + self.assertEqual(1, len(_requests)) + self.assertEqual("h2o_feet,location=coyote_creek water_level=1 1\n" + "h2o_feet,location=coyote_creek water_level=2 2", _requests[0].parsed_body) + + self.assertEqual(b"h2o_feet,location=coyote_creek water_level=1 1\n" + b"h2o_feet,location=coyote_creek water_level=2 2", callback.data) + self.assertEqual("my-bucket", callback.conf[0]) + self.assertEqual("my-org", callback.conf[1]) + self.assertEqual("ns", callback.conf[2]) + + def test_error_callback(self): + httpretty.register_uri(httpretty.POST, uri="http://localhost/api/v2/write", status=400) + + class ErrorCallback(object): + def __init__(self): + self.conf = None + self.data = None + self.error = None + + def __call__(self, conf: (str, str, str), data: str, error: ApiException): + self.conf = conf + self.data = data + self.error = error + + callback = ErrorCallback() + + self._write_client.close() + self._write_client = WriteApi(influxdb_client=self.influxdb_client, + write_options=WriteOptions(batch_size=2), error_callback=callback) + + self._write_client.write("my-bucket", "my-org", + ["h2o_feet,location=coyote_creek water_level=1 x", + "h2o_feet,location=coyote_creek water_level=2 2"]) + + time.sleep(1) + _requests = httpretty.httpretty.latest_requests + self.assertEqual(1, len(_requests)) + self.assertEqual("h2o_feet,location=coyote_creek water_level=1 x\n" + "h2o_feet,location=coyote_creek water_level=2 2", _requests[0].parsed_body) + + self.assertEqual(b"h2o_feet,location=coyote_creek water_level=1 x\n" + b"h2o_feet,location=coyote_creek water_level=2 2", callback.data) + self.assertEqual("my-bucket", callback.conf[0]) + self.assertEqual("my-org", callback.conf[1]) + self.assertEqual("ns", callback.conf[2]) + self.assertIsNotNone(callback.error) + self.assertEqual(ApiException, type(callback.error)) + self.assertEqual(400, callback.error.status) + + def test_retry_callback(self): + httpretty.register_uri(httpretty.POST, uri="http://localhost/api/v2/write", status=204) + httpretty.register_uri(httpretty.POST, uri="http://localhost/api/v2/write", status=429, adding_headers={'Retry-After': '1'}) + httpretty.register_uri(httpretty.POST, uri="http://localhost/api/v2/write", status=503, adding_headers={'Retry-After': '1'}) + + class RetryCallback(object): + def __init__(self): + self.count = 0 + self.conf = None + self.data = None + self.error = None + + def __call__(self, conf: (str, str, str), data: str, error: InfluxDBError): + self.count += 1 + self.conf = conf + self.data = data + self.error = error + + callback = RetryCallback() + + self._write_client.close() + self._write_client = WriteApi(influxdb_client=self.influxdb_client, + write_options=WriteOptions(batch_size=2), retry_callback=callback) + + self._write_client.write("my-bucket", "my-org", + ["h2o_feet,location=coyote_creek water_level=1 1", + "h2o_feet,location=coyote_creek water_level=2 2"]) + + time.sleep(3) + _requests = httpretty.httpretty.latest_requests + self.assertEqual(3, len(_requests)) + self.assertEqual("h2o_feet,location=coyote_creek water_level=1 1\n" + "h2o_feet,location=coyote_creek water_level=2 2", _requests[0].parsed_body) + + self.assertEqual(2, callback.count) + self.assertEqual(b"h2o_feet,location=coyote_creek water_level=1 1\n" + b"h2o_feet,location=coyote_creek water_level=2 2", callback.data) + self.assertEqual("my-bucket", callback.conf[0]) + self.assertEqual("my-org", callback.conf[1]) + self.assertEqual("ns", callback.conf[2]) + self.assertIsNotNone(callback.error) + self.assertEqual(InfluxDBError, type(callback.error)) + self.assertEqual(429, callback.error.response.status) + if __name__ == '__main__': unittest.main() From 871c73871343be6a1f77c5bc188ea7ba0f5433b8 Mon Sep 17 00:00:00 2001 From: Jakub Bednar Date: Wed, 6 Oct 2021 15:04:00 +0200 Subject: [PATCH 02/11] docs: add an example how to use callbacks --- .circleci/config.yml | 1 + examples/README.md | 1 + examples/write_api_callbacks.py | 53 +++++++++++++++++++++++++++++++++ 3 files changed, 55 insertions(+) create mode 100644 examples/write_api_callbacks.py diff --git a/.circleci/config.yml b/.circleci/config.yml index 561d3bb3..96510792 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -124,6 +124,7 @@ jobs: python examples/monitoring_and_alerting.py python examples/buckets_management.py python examples/write_structured_data.py + python examples/write_api_callbacks.py check-sphinx: docker: - image: *default-python diff --git a/examples/README.md b/examples/README.md index a4ed5210..36191f53 100644 --- a/examples/README.md +++ b/examples/README.md @@ -7,6 +7,7 @@ - [ingest_large_dataframe.py](ingest_large_dataframe.py) - How to ingest large DataFrame - [iot_sensor.py](iot_sensor.py) - How to write sensor data every minute by [RxPY](https://rxpy.readthedocs.io/en/latest/) - [import_data_set_sync_batching.py](import_data_set_sync_batching.py) - How to use [RxPY](https://rxpy.readthedocs.io/en/latest/) to prepare batches for synchronous write into InfluxDB +- [write_api_callbacks.py](write_api_callbacks.py) - How to use `WriteApi`'s callbacks to notify about state of background batches - [write_structured_data.py](write_structured_data.py) - How to write structured data - [NamedTuple](https://docs.python.org/3/library/collections.html#collections.namedtuple), [Data Classes](https://docs.python.org/3/library/dataclasses.html) - (_requires Python v3.8+_) ## Queries diff --git a/examples/write_api_callbacks.py b/examples/write_api_callbacks.py new file mode 100644 index 00000000..49c36616 --- /dev/null +++ b/examples/write_api_callbacks.py @@ -0,0 +1,53 @@ +""" +How to use WriteApi's callbacks to notify about state of background batches. +""" + +from influxdb_client import InfluxDBClient, Point + +""" +Configuration +""" +url = 'http://localhost:8086' +token = 'my-token' +org = 'my-org' +bucket = 'my-bucket' + +""" +Data +""" +points = [Point("my-temperature").tag("location", "Prague").field("temperature", 25.3), + Point("my-temperature").tag("location", "New York").field("temperature", 18.4)] + + +class BatchingCallback(object): + """ + Define callback for success, error and retry state + """ + def __init__(self): + pass + + def success(self, conf: (str, str, str), data: str): + """Successfully writen batch.""" + print(f"Written batch: {conf}, data: {data}") + + def error(self, conf: (str, str, str), data: str, exception: Exception): + """Unsuccessfully writen batch.""" + print(f"Cannot write batch: {conf}, data: {data} due: {exception}") + + def retry(self, conf: (str, str, str), data: str, exception: Exception): + """Retryable error.""" + print(f"Retryable error occurs for batch: {conf}, data: {data} retry: {exception}") + + +callback = BatchingCallback() +with InfluxDBClient(url=url, token=token, org=org, debug=False) as client: + """ + Use batching API + """ + with client.write_api(success_callback=callback.success, + error_callback=callback.error, + retry_callback=callback.retry) as write_api: + write_api.write(bucket=bucket, record=points) + print() + print("Wait to finishing ingesting...") + print() From 1a17bfeff2b134bb048fcbef1e0964345004e422 Mon Sep 17 00:00:00 2001 From: Jakub Bednar Date: Wed, 6 Oct 2021 15:04:18 +0200 Subject: [PATCH 03/11] docs: add an example how to use callbacks --- examples/write_api_callbacks.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/examples/write_api_callbacks.py b/examples/write_api_callbacks.py index 49c36616..52c1c111 100644 --- a/examples/write_api_callbacks.py +++ b/examples/write_api_callbacks.py @@ -40,7 +40,7 @@ def retry(self, conf: (str, str, str), data: str, exception: Exception): callback = BatchingCallback() -with InfluxDBClient(url=url, token=token, org=org, debug=False) as client: +with InfluxDBClient(url=url, token=token, org=org) as client: """ Use batching API """ From 0ac20c1985f6ebb7d020aa0d6503ddadeaab9e53 Mon Sep 17 00:00:00 2001 From: Jakub Bednar Date: Wed, 6 Oct 2021 15:15:50 +0200 Subject: [PATCH 04/11] docs: how to initialize batching client --- influxdb_client/client/influxdb_client.py | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/influxdb_client/client/influxdb_client.py b/influxdb_client/client/influxdb_client.py index f70f71d3..e239f97d 100644 --- a/influxdb_client/client/influxdb_client.py +++ b/influxdb_client/client/influxdb_client.py @@ -262,6 +262,23 @@ def write_api(self, write_options=WriteOptions(), point_settings=PointSettings() with InfluxDBClient(url="http://localhost:8086", token="my-token", org="my-org") as client: write_api = client.write_api(write_options=SYNCHRONOUS) + If you would like to use a **background batching**, you have to configure client like this: + + .. code-block:: python + + from influxdb_client import InfluxDBClient + + # Initialize background batching instance of WriteApi + with InfluxDBClient(url="http://localhost:8086", token="my-token", org="my-org") as client: + with client.write_api() as write_api: + pass + + There is also possibility to use callbacks to notify about state of background batches: + + .. code-block:: python + + TBD + :param write_options: Write API configuration :param point_settings: settings to store default tags :key success_callback: The callable ``callback`` to run after successfully writen a batch. From 95e29798e4b568294dc4e2e3d45b718f975ab299 Mon Sep 17 00:00:00 2001 From: Jakub Bednar Date: Thu, 7 Oct 2021 07:54:16 +0200 Subject: [PATCH 05/11] docs: add an example how to use callbacks --- examples/write_api_callbacks.py | 5 ----- 1 file changed, 5 deletions(-) diff --git a/examples/write_api_callbacks.py b/examples/write_api_callbacks.py index 52c1c111..db634177 100644 --- a/examples/write_api_callbacks.py +++ b/examples/write_api_callbacks.py @@ -20,11 +20,6 @@ class BatchingCallback(object): - """ - Define callback for success, error and retry state - """ - def __init__(self): - pass def success(self, conf: (str, str, str), data: str): """Successfully writen batch.""" From 01175f19e65d02330729651b23c19f8a3340e1c4 Mon Sep 17 00:00:00 2001 From: Jakub Bednar Date: Thu, 7 Oct 2021 07:57:51 +0200 Subject: [PATCH 06/11] docs: add an example how to use callbacks --- influxdb_client/client/influxdb_client.py | 22 +++++++++++++++++++++- 1 file changed, 21 insertions(+), 1 deletion(-) diff --git a/influxdb_client/client/influxdb_client.py b/influxdb_client/client/influxdb_client.py index e239f97d..f0d2ca1c 100644 --- a/influxdb_client/client/influxdb_client.py +++ b/influxdb_client/client/influxdb_client.py @@ -277,7 +277,27 @@ def write_api(self, write_options=WriteOptions(), point_settings=PointSettings() .. code-block:: python - TBD + from influxdb_client import InfluxDBClient + + + class BatchingCallback(object): + + def success(self, conf: (str, str, str), data: str): + print(f"Written batch: {conf}, data: {data}") + + def error(self, conf: (str, str, str), data: str, exception: Exception): + print(f"Cannot write batch: {conf}, data: {data} due: {exception}") + + def retry(self, conf: (str, str, str), data: str, exception: Exception): + print(f"Retryable error occurs for batch: {conf}, data: {data} retry: {exception}") + + + with InfluxDBClient(url="http://localhost:8086", token="my-token", org="my-org") as client: + callback = BatchingCallback() + with client.write_api(success_callback=callback.success, + error_callback=callback.error, + retry_callback=callback.retry) as write_api: + pass :param write_options: Write API configuration :param point_settings: settings to store default tags From dcd43dd30ccba98c9a02966f5e5ac9f8c9277a74 Mon Sep 17 00:00:00 2001 From: Jakub Bednar Date: Thu, 7 Oct 2021 08:24:35 +0200 Subject: [PATCH 07/11] feat: `InfluxDBError` is a base exception for all produced exceptions --- influxdb_client/client/__init__.py | 5 +++-- influxdb_client/client/write/__init__.py | 5 +++-- influxdb_client/rest.py | 5 ++++- influxdb_client/service/authorizations_service.py | 4 +--- influxdb_client/service/buckets_service.py | 4 +--- influxdb_client/service/cells_service.py | 4 +--- influxdb_client/service/checks_service.py | 4 +--- influxdb_client/service/dashboards_service.py | 4 +--- influxdb_client/service/dbr_ps_service.py | 4 +--- influxdb_client/service/default_service.py | 4 +--- influxdb_client/service/delete_service.py | 4 +--- influxdb_client/service/health_service.py | 4 +--- influxdb_client/service/labels_service.py | 4 +--- influxdb_client/service/notification_endpoints_service.py | 4 +--- influxdb_client/service/notification_rules_service.py | 4 +--- influxdb_client/service/organizations_service.py | 4 +--- influxdb_client/service/query_service.py | 4 +--- influxdb_client/service/ready_service.py | 4 +--- influxdb_client/service/rules_service.py | 4 +--- influxdb_client/service/scraper_targets_service.py | 4 +--- influxdb_client/service/secrets_service.py | 4 +--- influxdb_client/service/setup_service.py | 4 +--- influxdb_client/service/sources_service.py | 4 +--- influxdb_client/service/tasks_service.py | 4 +--- influxdb_client/service/telegrafs_service.py | 4 +--- influxdb_client/service/templates_service.py | 4 +--- influxdb_client/service/users_service.py | 4 +--- influxdb_client/service/variables_service.py | 4 +--- influxdb_client/service/views_service.py | 4 +--- influxdb_client/service/write_service.py | 4 +--- tests/__init__.py | 5 +++-- tests/test_QueryApi.py | 8 ++++---- tests/test_TasksApi.py | 2 +- 33 files changed, 45 insertions(+), 93 deletions(-) diff --git a/influxdb_client/client/__init__.py b/influxdb_client/client/__init__.py index 1545ae8c..3ce3d1f6 100644 --- a/influxdb_client/client/__init__.py +++ b/influxdb_client/client/__init__.py @@ -1,11 +1,11 @@ # flake8: noqa """ -Influx API Service. +Influx OSS API Service. No description provided (generated by Openapi Generator https://github.com/openapitools/openapi-generator) # noqa: E501 -OpenAPI spec version: 0.1.0 +OpenAPI spec version: 2.0.0 Generated by: https://openapi-generator.tech """ @@ -19,6 +19,7 @@ from influxdb_client.service.checks_service import ChecksService from influxdb_client.service.dbr_ps_service import DBRPsService from influxdb_client.service.dashboards_service import DashboardsService +from influxdb_client.service.delete_service import DeleteService from influxdb_client.service.health_service import HealthService from influxdb_client.service.labels_service import LabelsService from influxdb_client.service.notification_endpoints_service import NotificationEndpointsService diff --git a/influxdb_client/client/write/__init__.py b/influxdb_client/client/write/__init__.py index 1545ae8c..3ce3d1f6 100644 --- a/influxdb_client/client/write/__init__.py +++ b/influxdb_client/client/write/__init__.py @@ -1,11 +1,11 @@ # flake8: noqa """ -Influx API Service. +Influx OSS API Service. No description provided (generated by Openapi Generator https://github.com/openapitools/openapi-generator) # noqa: E501 -OpenAPI spec version: 0.1.0 +OpenAPI spec version: 2.0.0 Generated by: https://openapi-generator.tech """ @@ -19,6 +19,7 @@ from influxdb_client.service.checks_service import ChecksService from influxdb_client.service.dbr_ps_service import DBRPsService from influxdb_client.service.dashboards_service import DashboardsService +from influxdb_client.service.delete_service import DeleteService from influxdb_client.service.health_service import HealthService from influxdb_client.service.labels_service import LabelsService from influxdb_client.service.notification_endpoints_service import NotificationEndpointsService diff --git a/influxdb_client/rest.py b/influxdb_client/rest.py index 49a35921..8fc5e53a 100644 --- a/influxdb_client/rest.py +++ b/influxdb_client/rest.py @@ -22,6 +22,8 @@ import six from six.moves.urllib.parse import urlencode +from influxdb_client.client.exceptions import InfluxDBError + try: import urllib3 except ImportError: @@ -346,7 +348,7 @@ def __setstate__(self, state): self.__init__(self.configuration, self.pools_size, self.maxsize, self.retries) -class ApiException(Exception): +class ApiException(InfluxDBError): """NOTE: This class is auto generated by OpenAPI Generator. Ref: https://openapi-generator.tech @@ -355,6 +357,7 @@ class ApiException(Exception): def __init__(self, status=None, reason=None, http_resp=None): """Initialize with HTTP response.""" + super().__init__(response=http_resp) if http_resp: self.status = http_resp.status self.reason = http_resp.reason diff --git a/influxdb_client/service/authorizations_service.py b/influxdb_client/service/authorizations_service.py index 502f6ff6..61a78aa2 100644 --- a/influxdb_client/service/authorizations_service.py +++ b/influxdb_client/service/authorizations_service.py @@ -17,8 +17,6 @@ # python 2 and python 3 compatibility library import six -from influxdb_client.api_client import ApiClient - class AuthorizationsService(object): """NOTE: This class is auto generated by OpenAPI Generator. @@ -31,7 +29,7 @@ class AuthorizationsService(object): def __init__(self, api_client=None): # noqa: E501,D401,D403 """AuthorizationsService - a operation defined in OpenAPI.""" if api_client is None: - api_client = ApiClient() + raise ValueError("Invalid value for `api_client`, must be defined.") self.api_client = api_client def delete_authorizations_id(self, auth_id, **kwargs): # noqa: E501,D401,D403 diff --git a/influxdb_client/service/buckets_service.py b/influxdb_client/service/buckets_service.py index c0bfda4f..0ae373a6 100644 --- a/influxdb_client/service/buckets_service.py +++ b/influxdb_client/service/buckets_service.py @@ -17,8 +17,6 @@ # python 2 and python 3 compatibility library import six -from influxdb_client.api_client import ApiClient - class BucketsService(object): """NOTE: This class is auto generated by OpenAPI Generator. @@ -31,7 +29,7 @@ class BucketsService(object): def __init__(self, api_client=None): # noqa: E501,D401,D403 """BucketsService - a operation defined in OpenAPI.""" if api_client is None: - api_client = ApiClient() + raise ValueError("Invalid value for `api_client`, must be defined.") self.api_client = api_client def delete_buckets_id(self, bucket_id, **kwargs): # noqa: E501,D401,D403 diff --git a/influxdb_client/service/cells_service.py b/influxdb_client/service/cells_service.py index ee9622d0..21c9741a 100644 --- a/influxdb_client/service/cells_service.py +++ b/influxdb_client/service/cells_service.py @@ -17,8 +17,6 @@ # python 2 and python 3 compatibility library import six -from influxdb_client.api_client import ApiClient - class CellsService(object): """NOTE: This class is auto generated by OpenAPI Generator. @@ -31,7 +29,7 @@ class CellsService(object): def __init__(self, api_client=None): # noqa: E501,D401,D403 """CellsService - a operation defined in OpenAPI.""" if api_client is None: - api_client = ApiClient() + raise ValueError("Invalid value for `api_client`, must be defined.") self.api_client = api_client def delete_dashboards_id_cells_id(self, dashboard_id, cell_id, **kwargs): # noqa: E501,D401,D403 diff --git a/influxdb_client/service/checks_service.py b/influxdb_client/service/checks_service.py index 717c4263..9525993d 100644 --- a/influxdb_client/service/checks_service.py +++ b/influxdb_client/service/checks_service.py @@ -17,8 +17,6 @@ # python 2 and python 3 compatibility library import six -from influxdb_client.api_client import ApiClient - class ChecksService(object): """NOTE: This class is auto generated by OpenAPI Generator. @@ -31,7 +29,7 @@ class ChecksService(object): def __init__(self, api_client=None): # noqa: E501,D401,D403 """ChecksService - a operation defined in OpenAPI.""" if api_client is None: - api_client = ApiClient() + raise ValueError("Invalid value for `api_client`, must be defined.") self.api_client = api_client def create_check(self, post_check, **kwargs): # noqa: E501,D401,D403 diff --git a/influxdb_client/service/dashboards_service.py b/influxdb_client/service/dashboards_service.py index 04ce379a..47fa7e96 100644 --- a/influxdb_client/service/dashboards_service.py +++ b/influxdb_client/service/dashboards_service.py @@ -17,8 +17,6 @@ # python 2 and python 3 compatibility library import six -from influxdb_client.api_client import ApiClient - class DashboardsService(object): """NOTE: This class is auto generated by OpenAPI Generator. @@ -31,7 +29,7 @@ class DashboardsService(object): def __init__(self, api_client=None): # noqa: E501,D401,D403 """DashboardsService - a operation defined in OpenAPI.""" if api_client is None: - api_client = ApiClient() + raise ValueError("Invalid value for `api_client`, must be defined.") self.api_client = api_client def delete_dashboards_id(self, dashboard_id, **kwargs): # noqa: E501,D401,D403 diff --git a/influxdb_client/service/dbr_ps_service.py b/influxdb_client/service/dbr_ps_service.py index 83290744..bc58883f 100644 --- a/influxdb_client/service/dbr_ps_service.py +++ b/influxdb_client/service/dbr_ps_service.py @@ -17,8 +17,6 @@ # python 2 and python 3 compatibility library import six -from influxdb_client.api_client import ApiClient - class DBRPsService(object): """NOTE: This class is auto generated by OpenAPI Generator. @@ -31,7 +29,7 @@ class DBRPsService(object): def __init__(self, api_client=None): # noqa: E501,D401,D403 """DBRPsService - a operation defined in OpenAPI.""" if api_client is None: - api_client = ApiClient() + raise ValueError("Invalid value for `api_client`, must be defined.") self.api_client = api_client def delete_dbrpid(self, org_id, dbrp_id, **kwargs): # noqa: E501,D401,D403 diff --git a/influxdb_client/service/default_service.py b/influxdb_client/service/default_service.py index 4a199216..d79a76f9 100644 --- a/influxdb_client/service/default_service.py +++ b/influxdb_client/service/default_service.py @@ -17,8 +17,6 @@ # python 2 and python 3 compatibility library import six -from influxdb_client.api_client import ApiClient - class DefaultService(object): """NOTE: This class is auto generated by OpenAPI Generator. @@ -31,7 +29,7 @@ class DefaultService(object): def __init__(self, api_client=None): # noqa: E501,D401,D403 """DefaultService - a operation defined in OpenAPI.""" if api_client is None: - api_client = ApiClient() + raise ValueError("Invalid value for `api_client`, must be defined.") self.api_client = api_client def get_routes(self, **kwargs): # noqa: E501,D401,D403 diff --git a/influxdb_client/service/delete_service.py b/influxdb_client/service/delete_service.py index 7ff51de7..4adadf13 100644 --- a/influxdb_client/service/delete_service.py +++ b/influxdb_client/service/delete_service.py @@ -17,8 +17,6 @@ # python 2 and python 3 compatibility library import six -from influxdb_client.api_client import ApiClient - class DeleteService(object): """NOTE: This class is auto generated by OpenAPI Generator. @@ -31,7 +29,7 @@ class DeleteService(object): def __init__(self, api_client=None): # noqa: E501,D401,D403 """DeleteService - a operation defined in OpenAPI.""" if api_client is None: - api_client = ApiClient() + raise ValueError("Invalid value for `api_client`, must be defined.") self.api_client = api_client def post_delete(self, delete_predicate_request, **kwargs): # noqa: E501,D401,D403 diff --git a/influxdb_client/service/health_service.py b/influxdb_client/service/health_service.py index 3843366b..0ffc48a4 100644 --- a/influxdb_client/service/health_service.py +++ b/influxdb_client/service/health_service.py @@ -17,8 +17,6 @@ # python 2 and python 3 compatibility library import six -from influxdb_client.api_client import ApiClient - class HealthService(object): """NOTE: This class is auto generated by OpenAPI Generator. @@ -31,7 +29,7 @@ class HealthService(object): def __init__(self, api_client=None): # noqa: E501,D401,D403 """HealthService - a operation defined in OpenAPI.""" if api_client is None: - api_client = ApiClient() + raise ValueError("Invalid value for `api_client`, must be defined.") self.api_client = api_client def get_health(self, **kwargs): # noqa: E501,D401,D403 diff --git a/influxdb_client/service/labels_service.py b/influxdb_client/service/labels_service.py index 6453a4bc..7319645e 100644 --- a/influxdb_client/service/labels_service.py +++ b/influxdb_client/service/labels_service.py @@ -17,8 +17,6 @@ # python 2 and python 3 compatibility library import six -from influxdb_client.api_client import ApiClient - class LabelsService(object): """NOTE: This class is auto generated by OpenAPI Generator. @@ -31,7 +29,7 @@ class LabelsService(object): def __init__(self, api_client=None): # noqa: E501,D401,D403 """LabelsService - a operation defined in OpenAPI.""" if api_client is None: - api_client = ApiClient() + raise ValueError("Invalid value for `api_client`, must be defined.") self.api_client = api_client def delete_labels_id(self, label_id, **kwargs): # noqa: E501,D401,D403 diff --git a/influxdb_client/service/notification_endpoints_service.py b/influxdb_client/service/notification_endpoints_service.py index 8594c31d..5d498f2f 100644 --- a/influxdb_client/service/notification_endpoints_service.py +++ b/influxdb_client/service/notification_endpoints_service.py @@ -17,8 +17,6 @@ # python 2 and python 3 compatibility library import six -from influxdb_client.api_client import ApiClient - class NotificationEndpointsService(object): """NOTE: This class is auto generated by OpenAPI Generator. @@ -31,7 +29,7 @@ class NotificationEndpointsService(object): def __init__(self, api_client=None): # noqa: E501,D401,D403 """NotificationEndpointsService - a operation defined in OpenAPI.""" if api_client is None: - api_client = ApiClient() + raise ValueError("Invalid value for `api_client`, must be defined.") self.api_client = api_client def create_notification_endpoint(self, post_notification_endpoint, **kwargs): # noqa: E501,D401,D403 diff --git a/influxdb_client/service/notification_rules_service.py b/influxdb_client/service/notification_rules_service.py index 3ae278f1..6b543643 100644 --- a/influxdb_client/service/notification_rules_service.py +++ b/influxdb_client/service/notification_rules_service.py @@ -17,8 +17,6 @@ # python 2 and python 3 compatibility library import six -from influxdb_client.api_client import ApiClient - class NotificationRulesService(object): """NOTE: This class is auto generated by OpenAPI Generator. @@ -31,7 +29,7 @@ class NotificationRulesService(object): def __init__(self, api_client=None): # noqa: E501,D401,D403 """NotificationRulesService - a operation defined in OpenAPI.""" if api_client is None: - api_client = ApiClient() + raise ValueError("Invalid value for `api_client`, must be defined.") self.api_client = api_client def create_notification_rule(self, post_notification_rule, **kwargs): # noqa: E501,D401,D403 diff --git a/influxdb_client/service/organizations_service.py b/influxdb_client/service/organizations_service.py index c07cd340..e44c06f7 100644 --- a/influxdb_client/service/organizations_service.py +++ b/influxdb_client/service/organizations_service.py @@ -17,8 +17,6 @@ # python 2 and python 3 compatibility library import six -from influxdb_client.api_client import ApiClient - class OrganizationsService(object): """NOTE: This class is auto generated by OpenAPI Generator. @@ -31,7 +29,7 @@ class OrganizationsService(object): def __init__(self, api_client=None): # noqa: E501,D401,D403 """OrganizationsService - a operation defined in OpenAPI.""" if api_client is None: - api_client = ApiClient() + raise ValueError("Invalid value for `api_client`, must be defined.") self.api_client = api_client def delete_orgs_id(self, org_id, **kwargs): # noqa: E501,D401,D403 diff --git a/influxdb_client/service/query_service.py b/influxdb_client/service/query_service.py index 45625471..8638a595 100644 --- a/influxdb_client/service/query_service.py +++ b/influxdb_client/service/query_service.py @@ -17,8 +17,6 @@ # python 2 and python 3 compatibility library import six -from influxdb_client.api_client import ApiClient - class QueryService(object): """NOTE: This class is auto generated by OpenAPI Generator. @@ -31,7 +29,7 @@ class QueryService(object): def __init__(self, api_client=None): # noqa: E501,D401,D403 """QueryService - a operation defined in OpenAPI.""" if api_client is None: - api_client = ApiClient() + raise ValueError("Invalid value for `api_client`, must be defined.") self.api_client = api_client def get_query_suggestions(self, **kwargs): # noqa: E501,D401,D403 diff --git a/influxdb_client/service/ready_service.py b/influxdb_client/service/ready_service.py index 81edeb37..aa2bb21e 100644 --- a/influxdb_client/service/ready_service.py +++ b/influxdb_client/service/ready_service.py @@ -17,8 +17,6 @@ # python 2 and python 3 compatibility library import six -from influxdb_client.api_client import ApiClient - class ReadyService(object): """NOTE: This class is auto generated by OpenAPI Generator. @@ -31,7 +29,7 @@ class ReadyService(object): def __init__(self, api_client=None): # noqa: E501,D401,D403 """ReadyService - a operation defined in OpenAPI.""" if api_client is None: - api_client = ApiClient() + raise ValueError("Invalid value for `api_client`, must be defined.") self.api_client = api_client def get_ready(self, **kwargs): # noqa: E501,D401,D403 diff --git a/influxdb_client/service/rules_service.py b/influxdb_client/service/rules_service.py index 0ae274d6..8b2b95a1 100644 --- a/influxdb_client/service/rules_service.py +++ b/influxdb_client/service/rules_service.py @@ -17,8 +17,6 @@ # python 2 and python 3 compatibility library import six -from influxdb_client.api_client import ApiClient - class RulesService(object): """NOTE: This class is auto generated by OpenAPI Generator. @@ -31,7 +29,7 @@ class RulesService(object): def __init__(self, api_client=None): # noqa: E501,D401,D403 """RulesService - a operation defined in OpenAPI.""" if api_client is None: - api_client = ApiClient() + raise ValueError("Invalid value for `api_client`, must be defined.") self.api_client = api_client def get_notification_rules_id_query(self, rule_id, **kwargs): # noqa: E501,D401,D403 diff --git a/influxdb_client/service/scraper_targets_service.py b/influxdb_client/service/scraper_targets_service.py index 39bfbd3c..be35a7fa 100644 --- a/influxdb_client/service/scraper_targets_service.py +++ b/influxdb_client/service/scraper_targets_service.py @@ -17,8 +17,6 @@ # python 2 and python 3 compatibility library import six -from influxdb_client.api_client import ApiClient - class ScraperTargetsService(object): """NOTE: This class is auto generated by OpenAPI Generator. @@ -31,7 +29,7 @@ class ScraperTargetsService(object): def __init__(self, api_client=None): # noqa: E501,D401,D403 """ScraperTargetsService - a operation defined in OpenAPI.""" if api_client is None: - api_client = ApiClient() + raise ValueError("Invalid value for `api_client`, must be defined.") self.api_client = api_client def delete_scrapers_id(self, scraper_target_id, **kwargs): # noqa: E501,D401,D403 diff --git a/influxdb_client/service/secrets_service.py b/influxdb_client/service/secrets_service.py index 26df30fb..21fe6e05 100644 --- a/influxdb_client/service/secrets_service.py +++ b/influxdb_client/service/secrets_service.py @@ -17,8 +17,6 @@ # python 2 and python 3 compatibility library import six -from influxdb_client.api_client import ApiClient - class SecretsService(object): """NOTE: This class is auto generated by OpenAPI Generator. @@ -31,7 +29,7 @@ class SecretsService(object): def __init__(self, api_client=None): # noqa: E501,D401,D403 """SecretsService - a operation defined in OpenAPI.""" if api_client is None: - api_client = ApiClient() + raise ValueError("Invalid value for `api_client`, must be defined.") self.api_client = api_client def get_orgs_id_secrets(self, org_id, **kwargs): # noqa: E501,D401,D403 diff --git a/influxdb_client/service/setup_service.py b/influxdb_client/service/setup_service.py index f28b913f..190cd841 100644 --- a/influxdb_client/service/setup_service.py +++ b/influxdb_client/service/setup_service.py @@ -17,8 +17,6 @@ # python 2 and python 3 compatibility library import six -from influxdb_client.api_client import ApiClient - class SetupService(object): """NOTE: This class is auto generated by OpenAPI Generator. @@ -31,7 +29,7 @@ class SetupService(object): def __init__(self, api_client=None): # noqa: E501,D401,D403 """SetupService - a operation defined in OpenAPI.""" if api_client is None: - api_client = ApiClient() + raise ValueError("Invalid value for `api_client`, must be defined.") self.api_client = api_client def get_setup(self, **kwargs): # noqa: E501,D401,D403 diff --git a/influxdb_client/service/sources_service.py b/influxdb_client/service/sources_service.py index e83eb013..acf2fa39 100644 --- a/influxdb_client/service/sources_service.py +++ b/influxdb_client/service/sources_service.py @@ -17,8 +17,6 @@ # python 2 and python 3 compatibility library import six -from influxdb_client.api_client import ApiClient - class SourcesService(object): """NOTE: This class is auto generated by OpenAPI Generator. @@ -31,7 +29,7 @@ class SourcesService(object): def __init__(self, api_client=None): # noqa: E501,D401,D403 """SourcesService - a operation defined in OpenAPI.""" if api_client is None: - api_client = ApiClient() + raise ValueError("Invalid value for `api_client`, must be defined.") self.api_client = api_client def delete_sources_id(self, source_id, **kwargs): # noqa: E501,D401,D403 diff --git a/influxdb_client/service/tasks_service.py b/influxdb_client/service/tasks_service.py index 309756c2..a6a9e95a 100644 --- a/influxdb_client/service/tasks_service.py +++ b/influxdb_client/service/tasks_service.py @@ -17,8 +17,6 @@ # python 2 and python 3 compatibility library import six -from influxdb_client.api_client import ApiClient - class TasksService(object): """NOTE: This class is auto generated by OpenAPI Generator. @@ -31,7 +29,7 @@ class TasksService(object): def __init__(self, api_client=None): # noqa: E501,D401,D403 """TasksService - a operation defined in OpenAPI.""" if api_client is None: - api_client = ApiClient() + raise ValueError("Invalid value for `api_client`, must be defined.") self.api_client = api_client def delete_tasks_id(self, task_id, **kwargs): # noqa: E501,D401,D403 diff --git a/influxdb_client/service/telegrafs_service.py b/influxdb_client/service/telegrafs_service.py index 5fca177a..64e5c0c2 100644 --- a/influxdb_client/service/telegrafs_service.py +++ b/influxdb_client/service/telegrafs_service.py @@ -17,8 +17,6 @@ # python 2 and python 3 compatibility library import six -from influxdb_client.api_client import ApiClient - class TelegrafsService(object): """NOTE: This class is auto generated by OpenAPI Generator. @@ -31,7 +29,7 @@ class TelegrafsService(object): def __init__(self, api_client=None): # noqa: E501,D401,D403 """TelegrafsService - a operation defined in OpenAPI.""" if api_client is None: - api_client = ApiClient() + raise ValueError("Invalid value for `api_client`, must be defined.") self.api_client = api_client def delete_telegrafs_id(self, telegraf_id, **kwargs): # noqa: E501,D401,D403 diff --git a/influxdb_client/service/templates_service.py b/influxdb_client/service/templates_service.py index e5583bc3..390fde8e 100644 --- a/influxdb_client/service/templates_service.py +++ b/influxdb_client/service/templates_service.py @@ -17,8 +17,6 @@ # python 2 and python 3 compatibility library import six -from influxdb_client.api_client import ApiClient - class TemplatesService(object): """NOTE: This class is auto generated by OpenAPI Generator. @@ -31,7 +29,7 @@ class TemplatesService(object): def __init__(self, api_client=None): # noqa: E501,D401,D403 """TemplatesService - a operation defined in OpenAPI.""" if api_client is None: - api_client = ApiClient() + raise ValueError("Invalid value for `api_client`, must be defined.") self.api_client = api_client def delete_documents_templates_id(self, template_id, **kwargs): # noqa: E501,D401,D403 diff --git a/influxdb_client/service/users_service.py b/influxdb_client/service/users_service.py index 1dd6b1a6..d80861c0 100644 --- a/influxdb_client/service/users_service.py +++ b/influxdb_client/service/users_service.py @@ -17,8 +17,6 @@ # python 2 and python 3 compatibility library import six -from influxdb_client.api_client import ApiClient - class UsersService(object): """NOTE: This class is auto generated by OpenAPI Generator. @@ -31,7 +29,7 @@ class UsersService(object): def __init__(self, api_client=None): # noqa: E501,D401,D403 """UsersService - a operation defined in OpenAPI.""" if api_client is None: - api_client = ApiClient() + raise ValueError("Invalid value for `api_client`, must be defined.") self.api_client = api_client def delete_users_id(self, user_id, **kwargs): # noqa: E501,D401,D403 diff --git a/influxdb_client/service/variables_service.py b/influxdb_client/service/variables_service.py index f1705a32..999e4c9b 100644 --- a/influxdb_client/service/variables_service.py +++ b/influxdb_client/service/variables_service.py @@ -17,8 +17,6 @@ # python 2 and python 3 compatibility library import six -from influxdb_client.api_client import ApiClient - class VariablesService(object): """NOTE: This class is auto generated by OpenAPI Generator. @@ -31,7 +29,7 @@ class VariablesService(object): def __init__(self, api_client=None): # noqa: E501,D401,D403 """VariablesService - a operation defined in OpenAPI.""" if api_client is None: - api_client = ApiClient() + raise ValueError("Invalid value for `api_client`, must be defined.") self.api_client = api_client def delete_variables_id(self, variable_id, **kwargs): # noqa: E501,D401,D403 diff --git a/influxdb_client/service/views_service.py b/influxdb_client/service/views_service.py index 47a2ee5e..02f1c61b 100644 --- a/influxdb_client/service/views_service.py +++ b/influxdb_client/service/views_service.py @@ -17,8 +17,6 @@ # python 2 and python 3 compatibility library import six -from influxdb_client.api_client import ApiClient - class ViewsService(object): """NOTE: This class is auto generated by OpenAPI Generator. @@ -31,7 +29,7 @@ class ViewsService(object): def __init__(self, api_client=None): # noqa: E501,D401,D403 """ViewsService - a operation defined in OpenAPI.""" if api_client is None: - api_client = ApiClient() + raise ValueError("Invalid value for `api_client`, must be defined.") self.api_client = api_client def get_dashboards_id_cells_id_view(self, dashboard_id, cell_id, **kwargs): # noqa: E501,D401,D403 diff --git a/influxdb_client/service/write_service.py b/influxdb_client/service/write_service.py index eca85b09..68baf36c 100644 --- a/influxdb_client/service/write_service.py +++ b/influxdb_client/service/write_service.py @@ -17,8 +17,6 @@ # python 2 and python 3 compatibility library import six -from influxdb_client.api_client import ApiClient - class WriteService(object): """NOTE: This class is auto generated by OpenAPI Generator. @@ -31,7 +29,7 @@ class WriteService(object): def __init__(self, api_client=None): # noqa: E501,D401,D403 """WriteService - a operation defined in OpenAPI.""" if api_client is None: - api_client = ApiClient() + raise ValueError("Invalid value for `api_client`, must be defined.") self.api_client = api_client def post_write(self, org, bucket, body, **kwargs): # noqa: E501,D401,D403 diff --git a/tests/__init__.py b/tests/__init__.py index 1545ae8c..3ce3d1f6 100644 --- a/tests/__init__.py +++ b/tests/__init__.py @@ -1,11 +1,11 @@ # flake8: noqa """ -Influx API Service. +Influx OSS API Service. No description provided (generated by Openapi Generator https://github.com/openapitools/openapi-generator) # noqa: E501 -OpenAPI spec version: 0.1.0 +OpenAPI spec version: 2.0.0 Generated by: https://openapi-generator.tech """ @@ -19,6 +19,7 @@ from influxdb_client.service.checks_service import ChecksService from influxdb_client.service.dbr_ps_service import DBRPsService from influxdb_client.service.dashboards_service import DashboardsService +from influxdb_client.service.delete_service import DeleteService from influxdb_client.service.health_service import HealthService from influxdb_client.service.labels_service import LabelsService from influxdb_client.service.notification_endpoints_service import NotificationEndpointsService diff --git a/tests/test_QueryApi.py b/tests/test_QueryApi.py index c3127fd4..29375203 100644 --- a/tests/test_QueryApi.py +++ b/tests/test_QueryApi.py @@ -479,11 +479,11 @@ def test_profiler_mock(self): self.client = InfluxDBClient("http://localhost", "my-token", org="my-org", enable_gzip=False) query_api = self.client.query_api(query_options=QueryOptions(profilers=["query", "operator"])) tables = query_api.query(query=query) - self.assertEquals(len(tables), 1) - self.assertEquals(len(tables[0].columns), 10) - self.assertEquals(len(tables[0].records), 6) + self.assertEqual(len(tables), 1) + self.assertEqual(len(tables[0].columns), 10) + self.assertEqual(len(tables[0].records), 6) - self.assertEquals(tables[0].records[5].values, + self.assertEqual(tables[0].records[5].values, {'result': '_result', 'table': 0, '_start': datetime.datetime(2021, 5, 24, 8, 40, 44, 785000, tzinfo=tzutc()), '_stop': datetime.datetime(2021, 5, 24, 8, 45, 44, 785000, tzinfo=tzutc()), diff --git a/tests/test_TasksApi.py b/tests/test_TasksApi.py index 9511f338..40cebd57 100644 --- a/tests/test_TasksApi.py +++ b/tests/test_TasksApi.py @@ -165,7 +165,7 @@ def test_find_task_by_user_id(self): self.organization.id) tasks = self.tasks_api.find_tasks_by_user(task_user_id=task_user.id) print(tasks) - self.assertEquals(len(tasks), 1) + self.assertEqual(len(tasks), 1) def test_delete_task(self): task = self.tasks_api.create_task_cron(self.generate_name("it_task"), TASK_FLUX, "0 2 * * *", From 9b5ac687243135ca05ecb24fcec1e7688e36fc06 Mon Sep 17 00:00:00 2001 From: Jakub Bednar Date: Thu, 7 Oct 2021 09:03:28 +0200 Subject: [PATCH 08/11] docs: how to handle errors --- README.rst | 58 +++++++++++++++++++++++++++++++++++++++++++++++++- docs/usage.rst | 6 ++++++ 2 files changed, 63 insertions(+), 1 deletion(-) diff --git a/README.rst b/README.rst index 73580246..1bc2e937 100644 --- a/README.rst +++ b/README.rst @@ -83,6 +83,7 @@ InfluxDB 2.0 client features - `Proxy configuration`_ - `Nanosecond precision`_ - `Delete data`_ + - `Handling Errors`_ Installation ------------ @@ -1152,8 +1153,61 @@ The following forward compatible APIs are available: For detail info see `InfluxDB 1.8 example `_. +Handling Errors +^^^^^^^^^^^^^^^ +.. marker-handling-errors-start + +Errors happen and it's important that your code is prepared for them. All client related exceptions are delivered from +``InfluxDBError``. If the exception cannot be recovered in the client it is returned to the application. +These exceptions are left for the developer to handle. + +Almost all APIs directly return unrecoverable exceptions to be handled this way: + +.. code-block:: python + + from influxdb_client import InfluxDBClient + from influxdb_client.client.exceptions import InfluxDBError + from influxdb_client.client.write_api import SYNCHRONOUS + + with InfluxDBClient(url="http://localhost:8086", token="my-token", org="my-org") as client: + try: + client.write_api(write_options=SYNCHRONOUS).write("my-bucket", record="mem,tag=a value=86") + except InfluxDBError as e: + if e.response.status == 401: + raise Exception(f"Insufficient write permissions to 'my-bucket'.") from e + raise + + +The only exception is **batching** ``WriteAPI`` (for more info see `Batching`_). where you need to register custom callbacks to handle batch events. +This is because this API runs in the ``background`` in a ``separate`` thread and isn't possible to directly +return underlying exceptions. + +.. code-block:: python + + from influxdb_client import InfluxDBClient + + + class BatchingCallback(object): + + def success(self, conf: (str, str, str), data: str): + print(f"Written batch: {conf}, data: {data}") + + def error(self, conf: (str, str, str), data: str, exception: Exception): + print(f"Cannot write batch: {conf}, data: {data} due: {exception}") + + def retry(self, conf: (str, str, str), data: str, exception: Exception): + print(f"Retryable error occurs for batch: {conf}, data: {data} retry: {exception}") + + + with InfluxDBClient(url="http://localhost:8086", token="my-token", org="my-org") as client: + callback = BatchingCallback() + with client.write_api(success_callback=callback.success, + error_callback=callback.error, + retry_callback=callback.retry) as write_api: + pass + HTTP Retry Strategy -^^^^^^^^^^^^^^^^^^^ +""""""""""""""""""" By default the client uses a retry strategy only for batching writes (for more info see `Batching`_). For other HTTP requests there is no one retry strategy, but it could be configured by ``retries`` parameter of ``InfluxDBClient``. @@ -1169,6 +1223,8 @@ For more info about how configure HTTP retry see details in `urllib3 documentati retries = Retry(connect=5, read=2, redirect=5) client = InfluxDBClient(url="http://localhost:8086", token="my-token", org="my-org", retries=retries) +.. marker-handling-errors-end + Nanosecond precision ^^^^^^^^^^^^^^^^^^^^ .. marker-nanosecond-start diff --git a/docs/usage.rst b/docs/usage.rst index 9388dc68..1fae61fa 100644 --- a/docs/usage.rst +++ b/docs/usage.rst @@ -46,6 +46,12 @@ Nanosecond precision :start-after: marker-nanosecond-start :end-before: marker-nanosecond-end +Handling Errors +^^^^^^^^^^^^^^^ +.. include:: ../README.rst + :start-after: marker-handling-errors-start + :end-before: marker-handling-errors-end + Debugging ^^^^^^^^^ From 922dc0929b41515c5e231feaea32db740bba59b2 Mon Sep 17 00:00:00 2001 From: Jakub Bednar Date: Thu, 7 Oct 2021 09:10:54 +0200 Subject: [PATCH 09/11] docs: update CHANGELOG.md --- CHANGELOG.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index ca628bc6..4ebe951b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -3,9 +3,11 @@ ### Features 1. [#330](https://github.com/influxdata/influxdb-client-python/pull/330): Add support for write structured data - `NamedTuple`, `Data Classes` 1. [#335](https://github.com/influxdata/influxdb-client-python/pull/335): Add support for custom precision for index specified as number [DataFrame] +1. [#341](https://github.com/influxdata/influxdb-client-python/pull/341): Add support for handling batch events ### Documentation 1. [#331](https://github.com/influxdata/influxdb-client-python/pull/331): Add [Migration Guide](MIGRATION_GUIDE.rst) +1. [#341](https://github.com/influxdata/influxdb-client-python/pull/341): How to handle client errors ## 1.21.0 [2021-09-17] From 239bce1ec9766d9c3cf6f7ccf213714c6de85dce Mon Sep 17 00:00:00 2001 From: Jakub Bednar Date: Thu, 7 Oct 2021 09:18:53 +0200 Subject: [PATCH 10/11] chore: use InfluxDBError instead ApiException --- README.rst | 5 +++-- examples/README.md | 2 +- examples/write_api_callbacks.py | 5 +++-- influxdb_client/client/influxdb_client.py | 5 +++-- tests/test_WriteApiBatching.py | 8 ++++---- 5 files changed, 14 insertions(+), 11 deletions(-) diff --git a/README.rst b/README.rst index 1bc2e937..124056e6 100644 --- a/README.rst +++ b/README.rst @@ -1185,6 +1185,7 @@ return underlying exceptions. .. code-block:: python from influxdb_client import InfluxDBClient + from influxdb_client.client.exceptions import InfluxDBError class BatchingCallback(object): @@ -1192,10 +1193,10 @@ return underlying exceptions. def success(self, conf: (str, str, str), data: str): print(f"Written batch: {conf}, data: {data}") - def error(self, conf: (str, str, str), data: str, exception: Exception): + def error(self, conf: (str, str, str), data: str, exception: InfluxDBError): print(f"Cannot write batch: {conf}, data: {data} due: {exception}") - def retry(self, conf: (str, str, str), data: str, exception: Exception): + def retry(self, conf: (str, str, str), data: str, exception: InfluxDBError): print(f"Retryable error occurs for batch: {conf}, data: {data} retry: {exception}") diff --git a/examples/README.md b/examples/README.md index 36191f53..08620e4b 100644 --- a/examples/README.md +++ b/examples/README.md @@ -7,7 +7,7 @@ - [ingest_large_dataframe.py](ingest_large_dataframe.py) - How to ingest large DataFrame - [iot_sensor.py](iot_sensor.py) - How to write sensor data every minute by [RxPY](https://rxpy.readthedocs.io/en/latest/) - [import_data_set_sync_batching.py](import_data_set_sync_batching.py) - How to use [RxPY](https://rxpy.readthedocs.io/en/latest/) to prepare batches for synchronous write into InfluxDB -- [write_api_callbacks.py](write_api_callbacks.py) - How to use `WriteApi`'s callbacks to notify about state of background batches +- [write_api_callbacks.py](write_api_callbacks.py) - How to handle batch events - [write_structured_data.py](write_structured_data.py) - How to write structured data - [NamedTuple](https://docs.python.org/3/library/collections.html#collections.namedtuple), [Data Classes](https://docs.python.org/3/library/dataclasses.html) - (_requires Python v3.8+_) ## Queries diff --git a/examples/write_api_callbacks.py b/examples/write_api_callbacks.py index db634177..560fb3c1 100644 --- a/examples/write_api_callbacks.py +++ b/examples/write_api_callbacks.py @@ -3,6 +3,7 @@ """ from influxdb_client import InfluxDBClient, Point +from influxdb_client.client.exceptions import InfluxDBError """ Configuration @@ -25,11 +26,11 @@ def success(self, conf: (str, str, str), data: str): """Successfully writen batch.""" print(f"Written batch: {conf}, data: {data}") - def error(self, conf: (str, str, str), data: str, exception: Exception): + def error(self, conf: (str, str, str), data: str, exception: InfluxDBError): """Unsuccessfully writen batch.""" print(f"Cannot write batch: {conf}, data: {data} due: {exception}") - def retry(self, conf: (str, str, str), data: str, exception: Exception): + def retry(self, conf: (str, str, str), data: str, exception: InfluxDBError): """Retryable error.""" print(f"Retryable error occurs for batch: {conf}, data: {data} retry: {exception}") diff --git a/influxdb_client/client/influxdb_client.py b/influxdb_client/client/influxdb_client.py index f0d2ca1c..bacc88dd 100644 --- a/influxdb_client/client/influxdb_client.py +++ b/influxdb_client/client/influxdb_client.py @@ -278,6 +278,7 @@ def write_api(self, write_options=WriteOptions(), point_settings=PointSettings() .. code-block:: python from influxdb_client import InfluxDBClient + from influxdb_client.client.exceptions import InfluxDBError class BatchingCallback(object): @@ -285,10 +286,10 @@ class BatchingCallback(object): def success(self, conf: (str, str, str), data: str): print(f"Written batch: {conf}, data: {data}") - def error(self, conf: (str, str, str), data: str, exception: Exception): + def error(self, conf: (str, str, str), data: str, exception: InfluxDBError): print(f"Cannot write batch: {conf}, data: {data} due: {exception}") - def retry(self, conf: (str, str, str), data: str, exception: Exception): + def retry(self, conf: (str, str, str), data: str, exception: InfluxDBError): print(f"Retryable error occurs for batch: {conf}, data: {data} retry: {exception}") diff --git a/tests/test_WriteApiBatching.py b/tests/test_WriteApiBatching.py index d18ebdc0..8d567fbf 100644 --- a/tests/test_WriteApiBatching.py +++ b/tests/test_WriteApiBatching.py @@ -618,7 +618,7 @@ def __init__(self): self.data = None self.error = None - def __call__(self, conf: (str, str, str), data: str, error: ApiException): + def __call__(self, conf: (str, str, str), data: str, error: InfluxDBError): self.conf = conf self.data = data self.error = error @@ -645,8 +645,8 @@ def __call__(self, conf: (str, str, str), data: str, error: ApiException): self.assertEqual("my-org", callback.conf[1]) self.assertEqual("ns", callback.conf[2]) self.assertIsNotNone(callback.error) - self.assertEqual(ApiException, type(callback.error)) - self.assertEqual(400, callback.error.status) + self.assertIsInstance(callback.error, InfluxDBError) + self.assertEqual(400, callback.error.response.status) def test_retry_callback(self): httpretty.register_uri(httpretty.POST, uri="http://localhost/api/v2/write", status=204) @@ -689,7 +689,7 @@ def __call__(self, conf: (str, str, str), data: str, error: InfluxDBError): self.assertEqual("my-org", callback.conf[1]) self.assertEqual("ns", callback.conf[2]) self.assertIsNotNone(callback.error) - self.assertEqual(InfluxDBError, type(callback.error)) + self.assertIsInstance(callback.error, InfluxDBError) self.assertEqual(429, callback.error.response.status) From ffd984402693b3a34e830d23503295427bb4a39e Mon Sep 17 00:00:00 2001 From: Jakub Bednar Date: Tue, 12 Oct 2021 10:40:36 +0200 Subject: [PATCH 11/11] chore: optimize imports --- tests/test_WriteApiBatching.py | 1 - 1 file changed, 1 deletion(-) diff --git a/tests/test_WriteApiBatching.py b/tests/test_WriteApiBatching.py index 8d567fbf..e9e91034 100644 --- a/tests/test_WriteApiBatching.py +++ b/tests/test_WriteApiBatching.py @@ -17,7 +17,6 @@ from influxdb_client.client.exceptions import InfluxDBError from influxdb_client.client.write.point import Point from influxdb_client.client.write_api import WriteOptions, WriteApi, PointSettings -from influxdb_client.rest import ApiException class BatchingWriteTest(unittest.TestCase):