diff --git a/.circleci/config.yml b/.circleci/config.yml index 561d3bb3..96510792 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -124,6 +124,7 @@ jobs: python examples/monitoring_and_alerting.py python examples/buckets_management.py python examples/write_structured_data.py + python examples/write_api_callbacks.py check-sphinx: docker: - image: *default-python diff --git a/CHANGELOG.md b/CHANGELOG.md index ca628bc6..4ebe951b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -3,9 +3,11 @@ ### Features 1. [#330](https://github.com/influxdata/influxdb-client-python/pull/330): Add support for write structured data - `NamedTuple`, `Data Classes` 1. [#335](https://github.com/influxdata/influxdb-client-python/pull/335): Add support for custom precision for index specified as number [DataFrame] +1. [#341](https://github.com/influxdata/influxdb-client-python/pull/341): Add support for handling batch events ### Documentation 1. [#331](https://github.com/influxdata/influxdb-client-python/pull/331): Add [Migration Guide](MIGRATION_GUIDE.rst) +1. [#341](https://github.com/influxdata/influxdb-client-python/pull/341): How to handle client errors ## 1.21.0 [2021-09-17] diff --git a/README.rst b/README.rst index 73580246..124056e6 100644 --- a/README.rst +++ b/README.rst @@ -83,6 +83,7 @@ InfluxDB 2.0 client features - `Proxy configuration`_ - `Nanosecond precision`_ - `Delete data`_ + - `Handling Errors`_ Installation ------------ @@ -1152,8 +1153,62 @@ The following forward compatible APIs are available: For detail info see `InfluxDB 1.8 example `_. +Handling Errors +^^^^^^^^^^^^^^^ +.. marker-handling-errors-start + +Errors happen and it's important that your code is prepared for them. All client related exceptions are delivered from +``InfluxDBError``. If the exception cannot be recovered in the client it is returned to the application. +These exceptions are left for the developer to handle. + +Almost all APIs directly return unrecoverable exceptions to be handled this way: + +.. code-block:: python + + from influxdb_client import InfluxDBClient + from influxdb_client.client.exceptions import InfluxDBError + from influxdb_client.client.write_api import SYNCHRONOUS + + with InfluxDBClient(url="http://localhost:8086", token="my-token", org="my-org") as client: + try: + client.write_api(write_options=SYNCHRONOUS).write("my-bucket", record="mem,tag=a value=86") + except InfluxDBError as e: + if e.response.status == 401: + raise Exception(f"Insufficient write permissions to 'my-bucket'.") from e + raise + + +The only exception is **batching** ``WriteAPI`` (for more info see `Batching`_). where you need to register custom callbacks to handle batch events. +This is because this API runs in the ``background`` in a ``separate`` thread and isn't possible to directly +return underlying exceptions. + +.. code-block:: python + + from influxdb_client import InfluxDBClient + from influxdb_client.client.exceptions import InfluxDBError + + + class BatchingCallback(object): + + def success(self, conf: (str, str, str), data: str): + print(f"Written batch: {conf}, data: {data}") + + def error(self, conf: (str, str, str), data: str, exception: InfluxDBError): + print(f"Cannot write batch: {conf}, data: {data} due: {exception}") + + def retry(self, conf: (str, str, str), data: str, exception: InfluxDBError): + print(f"Retryable error occurs for batch: {conf}, data: {data} retry: {exception}") + + + with InfluxDBClient(url="http://localhost:8086", token="my-token", org="my-org") as client: + callback = BatchingCallback() + with client.write_api(success_callback=callback.success, + error_callback=callback.error, + retry_callback=callback.retry) as write_api: + pass + HTTP Retry Strategy -^^^^^^^^^^^^^^^^^^^ +""""""""""""""""""" By default the client uses a retry strategy only for batching writes (for more info see `Batching`_). For other HTTP requests there is no one retry strategy, but it could be configured by ``retries`` parameter of ``InfluxDBClient``. @@ -1169,6 +1224,8 @@ For more info about how configure HTTP retry see details in `urllib3 documentati retries = Retry(connect=5, read=2, redirect=5) client = InfluxDBClient(url="http://localhost:8086", token="my-token", org="my-org", retries=retries) +.. marker-handling-errors-end + Nanosecond precision ^^^^^^^^^^^^^^^^^^^^ .. marker-nanosecond-start diff --git a/docs/usage.rst b/docs/usage.rst index 9388dc68..1fae61fa 100644 --- a/docs/usage.rst +++ b/docs/usage.rst @@ -46,6 +46,12 @@ Nanosecond precision :start-after: marker-nanosecond-start :end-before: marker-nanosecond-end +Handling Errors +^^^^^^^^^^^^^^^ +.. include:: ../README.rst + :start-after: marker-handling-errors-start + :end-before: marker-handling-errors-end + Debugging ^^^^^^^^^ diff --git a/examples/README.md b/examples/README.md index a4ed5210..08620e4b 100644 --- a/examples/README.md +++ b/examples/README.md @@ -7,6 +7,7 @@ - [ingest_large_dataframe.py](ingest_large_dataframe.py) - How to ingest large DataFrame - [iot_sensor.py](iot_sensor.py) - How to write sensor data every minute by [RxPY](https://rxpy.readthedocs.io/en/latest/) - [import_data_set_sync_batching.py](import_data_set_sync_batching.py) - How to use [RxPY](https://rxpy.readthedocs.io/en/latest/) to prepare batches for synchronous write into InfluxDB +- [write_api_callbacks.py](write_api_callbacks.py) - How to handle batch events - [write_structured_data.py](write_structured_data.py) - How to write structured data - [NamedTuple](https://docs.python.org/3/library/collections.html#collections.namedtuple), [Data Classes](https://docs.python.org/3/library/dataclasses.html) - (_requires Python v3.8+_) ## Queries diff --git a/examples/write_api_callbacks.py b/examples/write_api_callbacks.py new file mode 100644 index 00000000..560fb3c1 --- /dev/null +++ b/examples/write_api_callbacks.py @@ -0,0 +1,49 @@ +""" +How to use WriteApi's callbacks to notify about state of background batches. +""" + +from influxdb_client import InfluxDBClient, Point +from influxdb_client.client.exceptions import InfluxDBError + +""" +Configuration +""" +url = 'http://localhost:8086' +token = 'my-token' +org = 'my-org' +bucket = 'my-bucket' + +""" +Data +""" +points = [Point("my-temperature").tag("location", "Prague").field("temperature", 25.3), + Point("my-temperature").tag("location", "New York").field("temperature", 18.4)] + + +class BatchingCallback(object): + + def success(self, conf: (str, str, str), data: str): + """Successfully writen batch.""" + print(f"Written batch: {conf}, data: {data}") + + def error(self, conf: (str, str, str), data: str, exception: InfluxDBError): + """Unsuccessfully writen batch.""" + print(f"Cannot write batch: {conf}, data: {data} due: {exception}") + + def retry(self, conf: (str, str, str), data: str, exception: InfluxDBError): + """Retryable error.""" + print(f"Retryable error occurs for batch: {conf}, data: {data} retry: {exception}") + + +callback = BatchingCallback() +with InfluxDBClient(url=url, token=token, org=org) as client: + """ + Use batching API + """ + with client.write_api(success_callback=callback.success, + error_callback=callback.error, + retry_callback=callback.retry) as write_api: + write_api.write(bucket=bucket, record=points) + print() + print("Wait to finishing ingesting...") + print() diff --git a/influxdb_client/client/__init__.py b/influxdb_client/client/__init__.py index 1545ae8c..3ce3d1f6 100644 --- a/influxdb_client/client/__init__.py +++ b/influxdb_client/client/__init__.py @@ -1,11 +1,11 @@ # flake8: noqa """ -Influx API Service. +Influx OSS API Service. No description provided (generated by Openapi Generator https://github.com/openapitools/openapi-generator) # noqa: E501 -OpenAPI spec version: 0.1.0 +OpenAPI spec version: 2.0.0 Generated by: https://openapi-generator.tech """ @@ -19,6 +19,7 @@ from influxdb_client.service.checks_service import ChecksService from influxdb_client.service.dbr_ps_service import DBRPsService from influxdb_client.service.dashboards_service import DashboardsService +from influxdb_client.service.delete_service import DeleteService from influxdb_client.service.health_service import HealthService from influxdb_client.service.labels_service import LabelsService from influxdb_client.service.notification_endpoints_service import NotificationEndpointsService diff --git a/influxdb_client/client/influxdb_client.py b/influxdb_client/client/influxdb_client.py index e0c471e2..bacc88dd 100644 --- a/influxdb_client/client/influxdb_client.py +++ b/influxdb_client/client/influxdb_client.py @@ -247,15 +247,88 @@ def from_env_properties(cls, debug=None, enable_gzip=False): connection_pool_maxsize=_to_int(connection_pool_maxsize), auth_basic=_to_bool(auth_basic), profilers=profilers) - def write_api(self, write_options=WriteOptions(), point_settings=PointSettings()) -> WriteApi: + def write_api(self, write_options=WriteOptions(), point_settings=PointSettings(), **kwargs) -> WriteApi: """ Create a Write API instance. - :param point_settings: - :param write_options: write api configuration + Example: + .. code-block:: python + + from influxdb_client import InfluxDBClient + from influxdb_client.client.write_api import SYNCHRONOUS + + + # Initialize SYNCHRONOUS instance of WriteApi + with InfluxDBClient(url="http://localhost:8086", token="my-token", org="my-org") as client: + write_api = client.write_api(write_options=SYNCHRONOUS) + + If you would like to use a **background batching**, you have to configure client like this: + + .. code-block:: python + + from influxdb_client import InfluxDBClient + + # Initialize background batching instance of WriteApi + with InfluxDBClient(url="http://localhost:8086", token="my-token", org="my-org") as client: + with client.write_api() as write_api: + pass + + There is also possibility to use callbacks to notify about state of background batches: + + .. code-block:: python + + from influxdb_client import InfluxDBClient + from influxdb_client.client.exceptions import InfluxDBError + + + class BatchingCallback(object): + + def success(self, conf: (str, str, str), data: str): + print(f"Written batch: {conf}, data: {data}") + + def error(self, conf: (str, str, str), data: str, exception: InfluxDBError): + print(f"Cannot write batch: {conf}, data: {data} due: {exception}") + + def retry(self, conf: (str, str, str), data: str, exception: InfluxDBError): + print(f"Retryable error occurs for batch: {conf}, data: {data} retry: {exception}") + + + with InfluxDBClient(url="http://localhost:8086", token="my-token", org="my-org") as client: + callback = BatchingCallback() + with client.write_api(success_callback=callback.success, + error_callback=callback.error, + retry_callback=callback.retry) as write_api: + pass + + :param write_options: Write API configuration + :param point_settings: settings to store default tags + :key success_callback: The callable ``callback`` to run after successfully writen a batch. + + The callable must accept two arguments: + - `Tuple`: ``(bucket, organization, precision)`` + - `str`: written data + + **[batching mode]** + + :key error_callback: The callable ``callback`` to run after unsuccessfully writen a batch. + + The callable must accept three arguments: + - `Tuple`: ``(bucket, organization, precision)`` + - `str`: written data + - `Exception`: an occurred error + + **[batching mode]** + :key retry_callback: The callable ``callback`` to run after retryable error occurred. + + The callable must accept three arguments: + - `Tuple`: ``(bucket, organization, precision)`` + - `str`: written data + - `Exception`: an retryable error + + **[batching mode]** :return: write api instance """ - return WriteApi(influxdb_client=self, write_options=write_options, point_settings=point_settings) + return WriteApi(influxdb_client=self, write_options=write_options, point_settings=point_settings, **kwargs) def query_api(self, query_options: QueryOptions = QueryOptions()) -> QueryApi: """ diff --git a/influxdb_client/client/write/__init__.py b/influxdb_client/client/write/__init__.py index 1545ae8c..3ce3d1f6 100644 --- a/influxdb_client/client/write/__init__.py +++ b/influxdb_client/client/write/__init__.py @@ -1,11 +1,11 @@ # flake8: noqa """ -Influx API Service. +Influx OSS API Service. No description provided (generated by Openapi Generator https://github.com/openapitools/openapi-generator) # noqa: E501 -OpenAPI spec version: 0.1.0 +OpenAPI spec version: 2.0.0 Generated by: https://openapi-generator.tech """ @@ -19,6 +19,7 @@ from influxdb_client.service.checks_service import ChecksService from influxdb_client.service.dbr_ps_service import DBRPsService from influxdb_client.service.dashboards_service import DashboardsService +from influxdb_client.service.delete_service import DeleteService from influxdb_client.service.health_service import HealthService from influxdb_client.service.labels_service import LabelsService from influxdb_client.service.notification_endpoints_service import NotificationEndpointsService diff --git a/influxdb_client/client/write/retry.py b/influxdb_client/client/write/retry.py index 136b8c1d..50fdaa46 100644 --- a/influxdb_client/client/write/retry.py +++ b/influxdb_client/client/write/retry.py @@ -4,6 +4,7 @@ from datetime import datetime, timedelta from itertools import takewhile from random import random +from typing import Callable from urllib3 import Retry from urllib3.exceptions import MaxRetryError, ResponseError @@ -17,25 +18,32 @@ class WritesRetry(Retry): """ Writes retry configuration. - :param int jitter_interval: random milliseconds when retrying writes - :param num max_retry_delay: maximum delay when retrying write in seconds - :param int max_retry_time: maximum total retry timeout in seconds, attempt after this timout throws MaxRetryError - :param int total: maximum number of retries - :param num retry_interval: initial first retry delay range in seconds - :param int exponential_base: base for the exponential retry delay, - The next delay is computed as random value between range - `retry_interval * exponential_base^(attempts-1)` and `retry_interval * exponential_base^(attempts) - - Example: for retry_interval=5, exponential_base=2, max_retry_delay=125, total=5 - retry delays are random distributed values within the ranges of - [5-10, 10-20, 20-40, 40-80, 80-125] + `retry_interval * exponential_base^(attempts-1)` and `retry_interval * exponential_base^(attempts) + Example: + for retry_interval=5, exponential_base=2, max_retry_delay=125, total=5 + retry delays are random distributed values within the ranges of + [5-10, 10-20, 20-40, 40-80, 80-125] """ def __init__(self, jitter_interval=0, max_retry_delay=125, exponential_base=2, max_retry_time=180, total=5, - retry_interval=5, **kw): - """Initialize defaults.""" + retry_interval=5, retry_callback: Callable[[Exception], int] = None, **kw): + """ + Initialize defaults. + + :param int jitter_interval: random milliseconds when retrying writes + :param num max_retry_delay: maximum delay when retrying write in seconds + :param int max_retry_time: maximum total retry timeout in seconds, + attempt after this timout throws MaxRetryError + :param int total: maximum number of retries + :param num retry_interval: initial first retry delay range in seconds + :param int exponential_base: base for the exponential retry delay, + :param Callable[[Exception], int] retry_callback: the callable ``callback`` to run after retryable + error occurred. + The callable must accept one argument: + - `Exception`: an retryable error + """ super().__init__(**kw) self.jitter_interval = jitter_interval self.total = total @@ -44,6 +52,7 @@ def __init__(self, jitter_interval=0, max_retry_delay=125, exponential_base=2, m self.max_retry_time = max_retry_time self.exponential_base = exponential_base self.retry_timeout = datetime.now() + timedelta(seconds=max_retry_time) + self.retry_callback = retry_callback def new(self, **kw): """Initialize defaults.""" @@ -57,6 +66,8 @@ def new(self, **kw): kw['max_retry_time'] = self.max_retry_time if 'exponential_base' not in kw: kw['exponential_base'] = self.exponential_base + if 'retry_callback' not in kw: + kw['retry_callback'] = self.retry_callback new = super().new(**kw) new.retry_timeout = self.retry_timeout @@ -123,6 +134,9 @@ def increment(self, method=None, url=None, response=None, error=None, _pool=None if isinstance(parsed_error, InfluxDBError): message += f" Retry in {parsed_error.retry_after}s." + if self.retry_callback: + self.retry_callback(parsed_error) + logger.warning(message) return new_retry diff --git a/influxdb_client/client/write_api.py b/influxdb_client/client/write_api.py index 9b662c5c..af7c5b03 100644 --- a/influxdb_client/client/write_api.py +++ b/influxdb_client/client/write_api.py @@ -79,8 +79,14 @@ def __init__(self, write_type: WriteType = WriteType.batching, self.exponential_base = exponential_base self.write_scheduler = write_scheduler - def to_retry_strategy(self): - """Create a Retry strategy from write options.""" + def to_retry_strategy(self, **kwargs): + """ + Create a Retry strategy from write options. + + :key retry_callback: The callable ``callback`` to run after retryable error occurred. + The callable must accept one argument: + - `Exception`: an retryable error + """ return WritesRetry( total=self.max_retries, retry_interval=self.retry_interval / 1_000, @@ -88,6 +94,7 @@ def to_retry_strategy(self): max_retry_delay=self.max_retry_delay / 1_000, max_retry_time=self.max_retry_time / 1_000, exponential_base=self.exponential_base, + retry_callback=kwargs.get("retry_callback", None), method_whitelist=["POST"]) def __getstate__(self): @@ -135,18 +142,6 @@ def add_default_tag(self, key, value) -> None: self.defaultTags[key] = self._get_value(value) -class _BatchItem(object): - def __init__(self, key, data, size=1) -> None: - self.key = key - self.data = data - self.size = size - pass - - def __str__(self) -> str: - return '_BatchItem[key:\'{}\', size: \'{}\']' \ - .format(str(self.key), str(self.size)) - - class _BatchItemKey(object): def __init__(self, bucket, org, precision=DEFAULT_WRITE_PRECISION) -> None: self.bucket = bucket @@ -166,8 +161,23 @@ def __str__(self) -> str: .format(str(self.bucket), str(self.org), str(self.precision)) +class _BatchItem(object): + def __init__(self, key: _BatchItemKey, data, size=1) -> None: + self.key = key + self.data = data + self.size = size + pass + + def to_key_tuple(self) -> (str, str, str): + return self.key.bucket, self.key.org, self.key.precision + + def __str__(self) -> str: + return '_BatchItem[key:\'{}\', size: \'{}\']' \ + .format(str(self.key), str(self.size)) + + class _BatchResponse(object): - def __init__(self, data: _BatchItem, exception=None): + def __init__(self, data: _BatchItem, exception: Exception = None): self.data = data self.exception = exception pass @@ -197,13 +207,48 @@ class WriteApi: write_api = client.write_api(write_options=SYNCHRONOUS) """ - def __init__(self, influxdb_client, write_options: WriteOptions = WriteOptions(), - point_settings: PointSettings = PointSettings()) -> None: - """Initialize defaults.""" + def __init__(self, + influxdb_client, + write_options: WriteOptions = WriteOptions(), + point_settings: PointSettings = PointSettings(), + **kwargs) -> None: + """ + Initialize defaults. + + :param influxdb_client: with default settings (organization) + :param write_options: write api configuration + :param point_settings: settings to store default tags. + :key success_callback: The callable ``callback`` to run after successfully writen a batch. + + The callable must accept two arguments: + - `Tuple`: ``(bucket, organization, precision)`` + - `str`: written data + + **[batching mode]** + :key error_callback: The callable ``callback`` to run after unsuccessfully writen a batch. + + The callable must accept three arguments: + - `Tuple`: ``(bucket, organization, precision)`` + - `str`: written data + - `Exception`: an occurred error + + **[batching mode]** + :key retry_callback: The callable ``callback`` to run after retryable error occurred. + + The callable must accept three arguments: + - `Tuple`: ``(bucket, organization, precision)`` + - `str`: written data + - `Exception`: an retryable error + + **[batching mode]** + """ self._influxdb_client = influxdb_client self._write_service = WriteService(influxdb_client.api_client) self._write_options = write_options self._point_settings = point_settings + self._success_callback = kwargs.get('success_callback', None) + self._error_callback = kwargs.get('error_callback', None) + self._retry_callback = kwargs.get('retry_callback', None) if influxdb_client.default_tags: for key, value in influxdb_client.default_tags.items(): @@ -452,7 +497,13 @@ def _http(self, batch_item: _BatchItem): logger.debug("Write time series data into InfluxDB: %s", batch_item) - retry = self._write_options.to_retry_strategy() + if self._retry_callback: + def _retry_callback_delegate(exception): + return self._retry_callback(batch_item.to_key_tuple(), batch_item.data, exception) + else: + _retry_callback_delegate = None + + retry = self._write_options.to_retry_strategy(retry_callback=_retry_callback_delegate) self._post_write(False, batch_item.key.bucket, batch_item.key.org, batch_item.data, batch_item.key.precision, urlopen_kw={'retries': retry}) @@ -483,12 +534,15 @@ def _to_response(self, data: _BatchItem, delay: timedelta): def _jitter_delay(self): return timedelta(milliseconds=random() * self._write_options.jitter_interval) - @staticmethod - def _on_next(response: _BatchResponse): + def _on_next(self, response: _BatchResponse): if response.exception: logger.error("The batch item wasn't processed successfully because: %s", response.exception) + if self._error_callback: + self._error_callback(response.data.to_key_tuple(), response.data.data, response.exception) else: logger.debug("The batch item: %s was processed successfully.", response) + if self._success_callback: + self._success_callback(response.data.to_key_tuple(), response.data.data) @staticmethod def _on_error(ex): diff --git a/influxdb_client/rest.py b/influxdb_client/rest.py index 49a35921..8fc5e53a 100644 --- a/influxdb_client/rest.py +++ b/influxdb_client/rest.py @@ -22,6 +22,8 @@ import six from six.moves.urllib.parse import urlencode +from influxdb_client.client.exceptions import InfluxDBError + try: import urllib3 except ImportError: @@ -346,7 +348,7 @@ def __setstate__(self, state): self.__init__(self.configuration, self.pools_size, self.maxsize, self.retries) -class ApiException(Exception): +class ApiException(InfluxDBError): """NOTE: This class is auto generated by OpenAPI Generator. Ref: https://openapi-generator.tech @@ -355,6 +357,7 @@ class ApiException(Exception): def __init__(self, status=None, reason=None, http_resp=None): """Initialize with HTTP response.""" + super().__init__(response=http_resp) if http_resp: self.status = http_resp.status self.reason = http_resp.reason diff --git a/influxdb_client/service/authorizations_service.py b/influxdb_client/service/authorizations_service.py index 502f6ff6..61a78aa2 100644 --- a/influxdb_client/service/authorizations_service.py +++ b/influxdb_client/service/authorizations_service.py @@ -17,8 +17,6 @@ # python 2 and python 3 compatibility library import six -from influxdb_client.api_client import ApiClient - class AuthorizationsService(object): """NOTE: This class is auto generated by OpenAPI Generator. @@ -31,7 +29,7 @@ class AuthorizationsService(object): def __init__(self, api_client=None): # noqa: E501,D401,D403 """AuthorizationsService - a operation defined in OpenAPI.""" if api_client is None: - api_client = ApiClient() + raise ValueError("Invalid value for `api_client`, must be defined.") self.api_client = api_client def delete_authorizations_id(self, auth_id, **kwargs): # noqa: E501,D401,D403 diff --git a/influxdb_client/service/buckets_service.py b/influxdb_client/service/buckets_service.py index c0bfda4f..0ae373a6 100644 --- a/influxdb_client/service/buckets_service.py +++ b/influxdb_client/service/buckets_service.py @@ -17,8 +17,6 @@ # python 2 and python 3 compatibility library import six -from influxdb_client.api_client import ApiClient - class BucketsService(object): """NOTE: This class is auto generated by OpenAPI Generator. @@ -31,7 +29,7 @@ class BucketsService(object): def __init__(self, api_client=None): # noqa: E501,D401,D403 """BucketsService - a operation defined in OpenAPI.""" if api_client is None: - api_client = ApiClient() + raise ValueError("Invalid value for `api_client`, must be defined.") self.api_client = api_client def delete_buckets_id(self, bucket_id, **kwargs): # noqa: E501,D401,D403 diff --git a/influxdb_client/service/cells_service.py b/influxdb_client/service/cells_service.py index ee9622d0..21c9741a 100644 --- a/influxdb_client/service/cells_service.py +++ b/influxdb_client/service/cells_service.py @@ -17,8 +17,6 @@ # python 2 and python 3 compatibility library import six -from influxdb_client.api_client import ApiClient - class CellsService(object): """NOTE: This class is auto generated by OpenAPI Generator. @@ -31,7 +29,7 @@ class CellsService(object): def __init__(self, api_client=None): # noqa: E501,D401,D403 """CellsService - a operation defined in OpenAPI.""" if api_client is None: - api_client = ApiClient() + raise ValueError("Invalid value for `api_client`, must be defined.") self.api_client = api_client def delete_dashboards_id_cells_id(self, dashboard_id, cell_id, **kwargs): # noqa: E501,D401,D403 diff --git a/influxdb_client/service/checks_service.py b/influxdb_client/service/checks_service.py index 717c4263..9525993d 100644 --- a/influxdb_client/service/checks_service.py +++ b/influxdb_client/service/checks_service.py @@ -17,8 +17,6 @@ # python 2 and python 3 compatibility library import six -from influxdb_client.api_client import ApiClient - class ChecksService(object): """NOTE: This class is auto generated by OpenAPI Generator. @@ -31,7 +29,7 @@ class ChecksService(object): def __init__(self, api_client=None): # noqa: E501,D401,D403 """ChecksService - a operation defined in OpenAPI.""" if api_client is None: - api_client = ApiClient() + raise ValueError("Invalid value for `api_client`, must be defined.") self.api_client = api_client def create_check(self, post_check, **kwargs): # noqa: E501,D401,D403 diff --git a/influxdb_client/service/dashboards_service.py b/influxdb_client/service/dashboards_service.py index 04ce379a..47fa7e96 100644 --- a/influxdb_client/service/dashboards_service.py +++ b/influxdb_client/service/dashboards_service.py @@ -17,8 +17,6 @@ # python 2 and python 3 compatibility library import six -from influxdb_client.api_client import ApiClient - class DashboardsService(object): """NOTE: This class is auto generated by OpenAPI Generator. @@ -31,7 +29,7 @@ class DashboardsService(object): def __init__(self, api_client=None): # noqa: E501,D401,D403 """DashboardsService - a operation defined in OpenAPI.""" if api_client is None: - api_client = ApiClient() + raise ValueError("Invalid value for `api_client`, must be defined.") self.api_client = api_client def delete_dashboards_id(self, dashboard_id, **kwargs): # noqa: E501,D401,D403 diff --git a/influxdb_client/service/dbr_ps_service.py b/influxdb_client/service/dbr_ps_service.py index 83290744..bc58883f 100644 --- a/influxdb_client/service/dbr_ps_service.py +++ b/influxdb_client/service/dbr_ps_service.py @@ -17,8 +17,6 @@ # python 2 and python 3 compatibility library import six -from influxdb_client.api_client import ApiClient - class DBRPsService(object): """NOTE: This class is auto generated by OpenAPI Generator. @@ -31,7 +29,7 @@ class DBRPsService(object): def __init__(self, api_client=None): # noqa: E501,D401,D403 """DBRPsService - a operation defined in OpenAPI.""" if api_client is None: - api_client = ApiClient() + raise ValueError("Invalid value for `api_client`, must be defined.") self.api_client = api_client def delete_dbrpid(self, org_id, dbrp_id, **kwargs): # noqa: E501,D401,D403 diff --git a/influxdb_client/service/default_service.py b/influxdb_client/service/default_service.py index 4a199216..d79a76f9 100644 --- a/influxdb_client/service/default_service.py +++ b/influxdb_client/service/default_service.py @@ -17,8 +17,6 @@ # python 2 and python 3 compatibility library import six -from influxdb_client.api_client import ApiClient - class DefaultService(object): """NOTE: This class is auto generated by OpenAPI Generator. @@ -31,7 +29,7 @@ class DefaultService(object): def __init__(self, api_client=None): # noqa: E501,D401,D403 """DefaultService - a operation defined in OpenAPI.""" if api_client is None: - api_client = ApiClient() + raise ValueError("Invalid value for `api_client`, must be defined.") self.api_client = api_client def get_routes(self, **kwargs): # noqa: E501,D401,D403 diff --git a/influxdb_client/service/delete_service.py b/influxdb_client/service/delete_service.py index 7ff51de7..4adadf13 100644 --- a/influxdb_client/service/delete_service.py +++ b/influxdb_client/service/delete_service.py @@ -17,8 +17,6 @@ # python 2 and python 3 compatibility library import six -from influxdb_client.api_client import ApiClient - class DeleteService(object): """NOTE: This class is auto generated by OpenAPI Generator. @@ -31,7 +29,7 @@ class DeleteService(object): def __init__(self, api_client=None): # noqa: E501,D401,D403 """DeleteService - a operation defined in OpenAPI.""" if api_client is None: - api_client = ApiClient() + raise ValueError("Invalid value for `api_client`, must be defined.") self.api_client = api_client def post_delete(self, delete_predicate_request, **kwargs): # noqa: E501,D401,D403 diff --git a/influxdb_client/service/health_service.py b/influxdb_client/service/health_service.py index 3843366b..0ffc48a4 100644 --- a/influxdb_client/service/health_service.py +++ b/influxdb_client/service/health_service.py @@ -17,8 +17,6 @@ # python 2 and python 3 compatibility library import six -from influxdb_client.api_client import ApiClient - class HealthService(object): """NOTE: This class is auto generated by OpenAPI Generator. @@ -31,7 +29,7 @@ class HealthService(object): def __init__(self, api_client=None): # noqa: E501,D401,D403 """HealthService - a operation defined in OpenAPI.""" if api_client is None: - api_client = ApiClient() + raise ValueError("Invalid value for `api_client`, must be defined.") self.api_client = api_client def get_health(self, **kwargs): # noqa: E501,D401,D403 diff --git a/influxdb_client/service/labels_service.py b/influxdb_client/service/labels_service.py index 6453a4bc..7319645e 100644 --- a/influxdb_client/service/labels_service.py +++ b/influxdb_client/service/labels_service.py @@ -17,8 +17,6 @@ # python 2 and python 3 compatibility library import six -from influxdb_client.api_client import ApiClient - class LabelsService(object): """NOTE: This class is auto generated by OpenAPI Generator. @@ -31,7 +29,7 @@ class LabelsService(object): def __init__(self, api_client=None): # noqa: E501,D401,D403 """LabelsService - a operation defined in OpenAPI.""" if api_client is None: - api_client = ApiClient() + raise ValueError("Invalid value for `api_client`, must be defined.") self.api_client = api_client def delete_labels_id(self, label_id, **kwargs): # noqa: E501,D401,D403 diff --git a/influxdb_client/service/notification_endpoints_service.py b/influxdb_client/service/notification_endpoints_service.py index 8594c31d..5d498f2f 100644 --- a/influxdb_client/service/notification_endpoints_service.py +++ b/influxdb_client/service/notification_endpoints_service.py @@ -17,8 +17,6 @@ # python 2 and python 3 compatibility library import six -from influxdb_client.api_client import ApiClient - class NotificationEndpointsService(object): """NOTE: This class is auto generated by OpenAPI Generator. @@ -31,7 +29,7 @@ class NotificationEndpointsService(object): def __init__(self, api_client=None): # noqa: E501,D401,D403 """NotificationEndpointsService - a operation defined in OpenAPI.""" if api_client is None: - api_client = ApiClient() + raise ValueError("Invalid value for `api_client`, must be defined.") self.api_client = api_client def create_notification_endpoint(self, post_notification_endpoint, **kwargs): # noqa: E501,D401,D403 diff --git a/influxdb_client/service/notification_rules_service.py b/influxdb_client/service/notification_rules_service.py index 3ae278f1..6b543643 100644 --- a/influxdb_client/service/notification_rules_service.py +++ b/influxdb_client/service/notification_rules_service.py @@ -17,8 +17,6 @@ # python 2 and python 3 compatibility library import six -from influxdb_client.api_client import ApiClient - class NotificationRulesService(object): """NOTE: This class is auto generated by OpenAPI Generator. @@ -31,7 +29,7 @@ class NotificationRulesService(object): def __init__(self, api_client=None): # noqa: E501,D401,D403 """NotificationRulesService - a operation defined in OpenAPI.""" if api_client is None: - api_client = ApiClient() + raise ValueError("Invalid value for `api_client`, must be defined.") self.api_client = api_client def create_notification_rule(self, post_notification_rule, **kwargs): # noqa: E501,D401,D403 diff --git a/influxdb_client/service/organizations_service.py b/influxdb_client/service/organizations_service.py index c07cd340..e44c06f7 100644 --- a/influxdb_client/service/organizations_service.py +++ b/influxdb_client/service/organizations_service.py @@ -17,8 +17,6 @@ # python 2 and python 3 compatibility library import six -from influxdb_client.api_client import ApiClient - class OrganizationsService(object): """NOTE: This class is auto generated by OpenAPI Generator. @@ -31,7 +29,7 @@ class OrganizationsService(object): def __init__(self, api_client=None): # noqa: E501,D401,D403 """OrganizationsService - a operation defined in OpenAPI.""" if api_client is None: - api_client = ApiClient() + raise ValueError("Invalid value for `api_client`, must be defined.") self.api_client = api_client def delete_orgs_id(self, org_id, **kwargs): # noqa: E501,D401,D403 diff --git a/influxdb_client/service/query_service.py b/influxdb_client/service/query_service.py index 45625471..8638a595 100644 --- a/influxdb_client/service/query_service.py +++ b/influxdb_client/service/query_service.py @@ -17,8 +17,6 @@ # python 2 and python 3 compatibility library import six -from influxdb_client.api_client import ApiClient - class QueryService(object): """NOTE: This class is auto generated by OpenAPI Generator. @@ -31,7 +29,7 @@ class QueryService(object): def __init__(self, api_client=None): # noqa: E501,D401,D403 """QueryService - a operation defined in OpenAPI.""" if api_client is None: - api_client = ApiClient() + raise ValueError("Invalid value for `api_client`, must be defined.") self.api_client = api_client def get_query_suggestions(self, **kwargs): # noqa: E501,D401,D403 diff --git a/influxdb_client/service/ready_service.py b/influxdb_client/service/ready_service.py index 81edeb37..aa2bb21e 100644 --- a/influxdb_client/service/ready_service.py +++ b/influxdb_client/service/ready_service.py @@ -17,8 +17,6 @@ # python 2 and python 3 compatibility library import six -from influxdb_client.api_client import ApiClient - class ReadyService(object): """NOTE: This class is auto generated by OpenAPI Generator. @@ -31,7 +29,7 @@ class ReadyService(object): def __init__(self, api_client=None): # noqa: E501,D401,D403 """ReadyService - a operation defined in OpenAPI.""" if api_client is None: - api_client = ApiClient() + raise ValueError("Invalid value for `api_client`, must be defined.") self.api_client = api_client def get_ready(self, **kwargs): # noqa: E501,D401,D403 diff --git a/influxdb_client/service/rules_service.py b/influxdb_client/service/rules_service.py index 0ae274d6..8b2b95a1 100644 --- a/influxdb_client/service/rules_service.py +++ b/influxdb_client/service/rules_service.py @@ -17,8 +17,6 @@ # python 2 and python 3 compatibility library import six -from influxdb_client.api_client import ApiClient - class RulesService(object): """NOTE: This class is auto generated by OpenAPI Generator. @@ -31,7 +29,7 @@ class RulesService(object): def __init__(self, api_client=None): # noqa: E501,D401,D403 """RulesService - a operation defined in OpenAPI.""" if api_client is None: - api_client = ApiClient() + raise ValueError("Invalid value for `api_client`, must be defined.") self.api_client = api_client def get_notification_rules_id_query(self, rule_id, **kwargs): # noqa: E501,D401,D403 diff --git a/influxdb_client/service/scraper_targets_service.py b/influxdb_client/service/scraper_targets_service.py index 39bfbd3c..be35a7fa 100644 --- a/influxdb_client/service/scraper_targets_service.py +++ b/influxdb_client/service/scraper_targets_service.py @@ -17,8 +17,6 @@ # python 2 and python 3 compatibility library import six -from influxdb_client.api_client import ApiClient - class ScraperTargetsService(object): """NOTE: This class is auto generated by OpenAPI Generator. @@ -31,7 +29,7 @@ class ScraperTargetsService(object): def __init__(self, api_client=None): # noqa: E501,D401,D403 """ScraperTargetsService - a operation defined in OpenAPI.""" if api_client is None: - api_client = ApiClient() + raise ValueError("Invalid value for `api_client`, must be defined.") self.api_client = api_client def delete_scrapers_id(self, scraper_target_id, **kwargs): # noqa: E501,D401,D403 diff --git a/influxdb_client/service/secrets_service.py b/influxdb_client/service/secrets_service.py index 26df30fb..21fe6e05 100644 --- a/influxdb_client/service/secrets_service.py +++ b/influxdb_client/service/secrets_service.py @@ -17,8 +17,6 @@ # python 2 and python 3 compatibility library import six -from influxdb_client.api_client import ApiClient - class SecretsService(object): """NOTE: This class is auto generated by OpenAPI Generator. @@ -31,7 +29,7 @@ class SecretsService(object): def __init__(self, api_client=None): # noqa: E501,D401,D403 """SecretsService - a operation defined in OpenAPI.""" if api_client is None: - api_client = ApiClient() + raise ValueError("Invalid value for `api_client`, must be defined.") self.api_client = api_client def get_orgs_id_secrets(self, org_id, **kwargs): # noqa: E501,D401,D403 diff --git a/influxdb_client/service/setup_service.py b/influxdb_client/service/setup_service.py index f28b913f..190cd841 100644 --- a/influxdb_client/service/setup_service.py +++ b/influxdb_client/service/setup_service.py @@ -17,8 +17,6 @@ # python 2 and python 3 compatibility library import six -from influxdb_client.api_client import ApiClient - class SetupService(object): """NOTE: This class is auto generated by OpenAPI Generator. @@ -31,7 +29,7 @@ class SetupService(object): def __init__(self, api_client=None): # noqa: E501,D401,D403 """SetupService - a operation defined in OpenAPI.""" if api_client is None: - api_client = ApiClient() + raise ValueError("Invalid value for `api_client`, must be defined.") self.api_client = api_client def get_setup(self, **kwargs): # noqa: E501,D401,D403 diff --git a/influxdb_client/service/sources_service.py b/influxdb_client/service/sources_service.py index e83eb013..acf2fa39 100644 --- a/influxdb_client/service/sources_service.py +++ b/influxdb_client/service/sources_service.py @@ -17,8 +17,6 @@ # python 2 and python 3 compatibility library import six -from influxdb_client.api_client import ApiClient - class SourcesService(object): """NOTE: This class is auto generated by OpenAPI Generator. @@ -31,7 +29,7 @@ class SourcesService(object): def __init__(self, api_client=None): # noqa: E501,D401,D403 """SourcesService - a operation defined in OpenAPI.""" if api_client is None: - api_client = ApiClient() + raise ValueError("Invalid value for `api_client`, must be defined.") self.api_client = api_client def delete_sources_id(self, source_id, **kwargs): # noqa: E501,D401,D403 diff --git a/influxdb_client/service/tasks_service.py b/influxdb_client/service/tasks_service.py index 309756c2..a6a9e95a 100644 --- a/influxdb_client/service/tasks_service.py +++ b/influxdb_client/service/tasks_service.py @@ -17,8 +17,6 @@ # python 2 and python 3 compatibility library import six -from influxdb_client.api_client import ApiClient - class TasksService(object): """NOTE: This class is auto generated by OpenAPI Generator. @@ -31,7 +29,7 @@ class TasksService(object): def __init__(self, api_client=None): # noqa: E501,D401,D403 """TasksService - a operation defined in OpenAPI.""" if api_client is None: - api_client = ApiClient() + raise ValueError("Invalid value for `api_client`, must be defined.") self.api_client = api_client def delete_tasks_id(self, task_id, **kwargs): # noqa: E501,D401,D403 diff --git a/influxdb_client/service/telegrafs_service.py b/influxdb_client/service/telegrafs_service.py index 5fca177a..64e5c0c2 100644 --- a/influxdb_client/service/telegrafs_service.py +++ b/influxdb_client/service/telegrafs_service.py @@ -17,8 +17,6 @@ # python 2 and python 3 compatibility library import six -from influxdb_client.api_client import ApiClient - class TelegrafsService(object): """NOTE: This class is auto generated by OpenAPI Generator. @@ -31,7 +29,7 @@ class TelegrafsService(object): def __init__(self, api_client=None): # noqa: E501,D401,D403 """TelegrafsService - a operation defined in OpenAPI.""" if api_client is None: - api_client = ApiClient() + raise ValueError("Invalid value for `api_client`, must be defined.") self.api_client = api_client def delete_telegrafs_id(self, telegraf_id, **kwargs): # noqa: E501,D401,D403 diff --git a/influxdb_client/service/templates_service.py b/influxdb_client/service/templates_service.py index e5583bc3..390fde8e 100644 --- a/influxdb_client/service/templates_service.py +++ b/influxdb_client/service/templates_service.py @@ -17,8 +17,6 @@ # python 2 and python 3 compatibility library import six -from influxdb_client.api_client import ApiClient - class TemplatesService(object): """NOTE: This class is auto generated by OpenAPI Generator. @@ -31,7 +29,7 @@ class TemplatesService(object): def __init__(self, api_client=None): # noqa: E501,D401,D403 """TemplatesService - a operation defined in OpenAPI.""" if api_client is None: - api_client = ApiClient() + raise ValueError("Invalid value for `api_client`, must be defined.") self.api_client = api_client def delete_documents_templates_id(self, template_id, **kwargs): # noqa: E501,D401,D403 diff --git a/influxdb_client/service/users_service.py b/influxdb_client/service/users_service.py index 1dd6b1a6..d80861c0 100644 --- a/influxdb_client/service/users_service.py +++ b/influxdb_client/service/users_service.py @@ -17,8 +17,6 @@ # python 2 and python 3 compatibility library import six -from influxdb_client.api_client import ApiClient - class UsersService(object): """NOTE: This class is auto generated by OpenAPI Generator. @@ -31,7 +29,7 @@ class UsersService(object): def __init__(self, api_client=None): # noqa: E501,D401,D403 """UsersService - a operation defined in OpenAPI.""" if api_client is None: - api_client = ApiClient() + raise ValueError("Invalid value for `api_client`, must be defined.") self.api_client = api_client def delete_users_id(self, user_id, **kwargs): # noqa: E501,D401,D403 diff --git a/influxdb_client/service/variables_service.py b/influxdb_client/service/variables_service.py index f1705a32..999e4c9b 100644 --- a/influxdb_client/service/variables_service.py +++ b/influxdb_client/service/variables_service.py @@ -17,8 +17,6 @@ # python 2 and python 3 compatibility library import six -from influxdb_client.api_client import ApiClient - class VariablesService(object): """NOTE: This class is auto generated by OpenAPI Generator. @@ -31,7 +29,7 @@ class VariablesService(object): def __init__(self, api_client=None): # noqa: E501,D401,D403 """VariablesService - a operation defined in OpenAPI.""" if api_client is None: - api_client = ApiClient() + raise ValueError("Invalid value for `api_client`, must be defined.") self.api_client = api_client def delete_variables_id(self, variable_id, **kwargs): # noqa: E501,D401,D403 diff --git a/influxdb_client/service/views_service.py b/influxdb_client/service/views_service.py index 47a2ee5e..02f1c61b 100644 --- a/influxdb_client/service/views_service.py +++ b/influxdb_client/service/views_service.py @@ -17,8 +17,6 @@ # python 2 and python 3 compatibility library import six -from influxdb_client.api_client import ApiClient - class ViewsService(object): """NOTE: This class is auto generated by OpenAPI Generator. @@ -31,7 +29,7 @@ class ViewsService(object): def __init__(self, api_client=None): # noqa: E501,D401,D403 """ViewsService - a operation defined in OpenAPI.""" if api_client is None: - api_client = ApiClient() + raise ValueError("Invalid value for `api_client`, must be defined.") self.api_client = api_client def get_dashboards_id_cells_id_view(self, dashboard_id, cell_id, **kwargs): # noqa: E501,D401,D403 diff --git a/influxdb_client/service/write_service.py b/influxdb_client/service/write_service.py index eca85b09..68baf36c 100644 --- a/influxdb_client/service/write_service.py +++ b/influxdb_client/service/write_service.py @@ -17,8 +17,6 @@ # python 2 and python 3 compatibility library import six -from influxdb_client.api_client import ApiClient - class WriteService(object): """NOTE: This class is auto generated by OpenAPI Generator. @@ -31,7 +29,7 @@ class WriteService(object): def __init__(self, api_client=None): # noqa: E501,D401,D403 """WriteService - a operation defined in OpenAPI.""" if api_client is None: - api_client = ApiClient() + raise ValueError("Invalid value for `api_client`, must be defined.") self.api_client = api_client def post_write(self, org, bucket, body, **kwargs): # noqa: E501,D401,D403 diff --git a/tests/__init__.py b/tests/__init__.py index 1545ae8c..3ce3d1f6 100644 --- a/tests/__init__.py +++ b/tests/__init__.py @@ -1,11 +1,11 @@ # flake8: noqa """ -Influx API Service. +Influx OSS API Service. No description provided (generated by Openapi Generator https://github.com/openapitools/openapi-generator) # noqa: E501 -OpenAPI spec version: 0.1.0 +OpenAPI spec version: 2.0.0 Generated by: https://openapi-generator.tech """ @@ -19,6 +19,7 @@ from influxdb_client.service.checks_service import ChecksService from influxdb_client.service.dbr_ps_service import DBRPsService from influxdb_client.service.dashboards_service import DashboardsService +from influxdb_client.service.delete_service import DeleteService from influxdb_client.service.health_service import HealthService from influxdb_client.service.labels_service import LabelsService from influxdb_client.service.notification_endpoints_service import NotificationEndpointsService diff --git a/tests/test_QueryApi.py b/tests/test_QueryApi.py index c3127fd4..29375203 100644 --- a/tests/test_QueryApi.py +++ b/tests/test_QueryApi.py @@ -479,11 +479,11 @@ def test_profiler_mock(self): self.client = InfluxDBClient("http://localhost", "my-token", org="my-org", enable_gzip=False) query_api = self.client.query_api(query_options=QueryOptions(profilers=["query", "operator"])) tables = query_api.query(query=query) - self.assertEquals(len(tables), 1) - self.assertEquals(len(tables[0].columns), 10) - self.assertEquals(len(tables[0].records), 6) + self.assertEqual(len(tables), 1) + self.assertEqual(len(tables[0].columns), 10) + self.assertEqual(len(tables[0].records), 6) - self.assertEquals(tables[0].records[5].values, + self.assertEqual(tables[0].records[5].values, {'result': '_result', 'table': 0, '_start': datetime.datetime(2021, 5, 24, 8, 40, 44, 785000, tzinfo=tzutc()), '_stop': datetime.datetime(2021, 5, 24, 8, 45, 44, 785000, tzinfo=tzutc()), diff --git a/tests/test_TasksApi.py b/tests/test_TasksApi.py index 9511f338..40cebd57 100644 --- a/tests/test_TasksApi.py +++ b/tests/test_TasksApi.py @@ -165,7 +165,7 @@ def test_find_task_by_user_id(self): self.organization.id) tasks = self.tasks_api.find_tasks_by_user(task_user_id=task_user.id) print(tasks) - self.assertEquals(len(tasks), 1) + self.assertEqual(len(tasks), 1) def test_delete_task(self): task = self.tasks_api.create_task_cron(self.generate_name("it_task"), TASK_FLUX, "0 2 * * *", diff --git a/tests/test_WriteApiBatching.py b/tests/test_WriteApiBatching.py index ec10e9cf..e9e91034 100644 --- a/tests/test_WriteApiBatching.py +++ b/tests/test_WriteApiBatching.py @@ -14,6 +14,7 @@ import influxdb_client from influxdb_client import WritePrecision, InfluxDBClient, CLIENT_VERSION +from influxdb_client.client.exceptions import InfluxDBError from influxdb_client.client.write.point import Point from influxdb_client.client.write_api import WriteOptions, WriteApi, PointSettings @@ -263,7 +264,7 @@ def test_retry_disabled_max_retry_time(self): self._write_client.close() self._write_client = WriteApi(influxdb_client=self.influxdb_client, - write_options=WriteOptions(max_retry_time=0,batch_size=2, flush_interval=1_000)) + write_options=WriteOptions(max_retry_time=0, batch_size=2, flush_interval=1_000)) self._write_client.write("my-bucket", "my-org", ["h2o_feet,location=coyote_creek level\\ water_level=1 1", @@ -349,7 +350,7 @@ def test_record_types(self): # Tuple _bytes3 = "h2o_feet,location=coyote_creek level\\ water_level=19 19".encode("utf-8") _bytes4 = "h2o_feet,location=coyote_creek level\\ water_level=20 20".encode("utf-8") - self._write_client.write("my-bucket", "my-org", (_bytes3, _bytes4, )) + self._write_client.write("my-bucket", "my-org", (_bytes3, _bytes4,)) time.sleep(1) @@ -573,6 +574,123 @@ class Car: self.assertEqual(1, len(_requests)) self.assertEqual("performance,engine=12V-BT,type=sport-cars speed=125.25", _requests[0].parsed_body) + def test_success_callback(self): + httpretty.register_uri(httpretty.POST, uri="http://localhost/api/v2/write", status=204) + + class SuccessCallback(object): + def __init__(self): + self.conf = None + self.data = None + + def __call__(self, conf: (str, str, str), data: str): + self.conf = conf + self.data = data + + callback = SuccessCallback() + + self._write_client.close() + self._write_client = WriteApi(influxdb_client=self.influxdb_client, + write_options=WriteOptions(batch_size=2), success_callback=callback) + + self._write_client.write("my-bucket", "my-org", + ["h2o_feet,location=coyote_creek water_level=1 1", + "h2o_feet,location=coyote_creek water_level=2 2"]) + + time.sleep(1) + _requests = httpretty.httpretty.latest_requests + self.assertEqual(1, len(_requests)) + self.assertEqual("h2o_feet,location=coyote_creek water_level=1 1\n" + "h2o_feet,location=coyote_creek water_level=2 2", _requests[0].parsed_body) + + self.assertEqual(b"h2o_feet,location=coyote_creek water_level=1 1\n" + b"h2o_feet,location=coyote_creek water_level=2 2", callback.data) + self.assertEqual("my-bucket", callback.conf[0]) + self.assertEqual("my-org", callback.conf[1]) + self.assertEqual("ns", callback.conf[2]) + + def test_error_callback(self): + httpretty.register_uri(httpretty.POST, uri="http://localhost/api/v2/write", status=400) + + class ErrorCallback(object): + def __init__(self): + self.conf = None + self.data = None + self.error = None + + def __call__(self, conf: (str, str, str), data: str, error: InfluxDBError): + self.conf = conf + self.data = data + self.error = error + + callback = ErrorCallback() + + self._write_client.close() + self._write_client = WriteApi(influxdb_client=self.influxdb_client, + write_options=WriteOptions(batch_size=2), error_callback=callback) + + self._write_client.write("my-bucket", "my-org", + ["h2o_feet,location=coyote_creek water_level=1 x", + "h2o_feet,location=coyote_creek water_level=2 2"]) + + time.sleep(1) + _requests = httpretty.httpretty.latest_requests + self.assertEqual(1, len(_requests)) + self.assertEqual("h2o_feet,location=coyote_creek water_level=1 x\n" + "h2o_feet,location=coyote_creek water_level=2 2", _requests[0].parsed_body) + + self.assertEqual(b"h2o_feet,location=coyote_creek water_level=1 x\n" + b"h2o_feet,location=coyote_creek water_level=2 2", callback.data) + self.assertEqual("my-bucket", callback.conf[0]) + self.assertEqual("my-org", callback.conf[1]) + self.assertEqual("ns", callback.conf[2]) + self.assertIsNotNone(callback.error) + self.assertIsInstance(callback.error, InfluxDBError) + self.assertEqual(400, callback.error.response.status) + + def test_retry_callback(self): + httpretty.register_uri(httpretty.POST, uri="http://localhost/api/v2/write", status=204) + httpretty.register_uri(httpretty.POST, uri="http://localhost/api/v2/write", status=429, adding_headers={'Retry-After': '1'}) + httpretty.register_uri(httpretty.POST, uri="http://localhost/api/v2/write", status=503, adding_headers={'Retry-After': '1'}) + + class RetryCallback(object): + def __init__(self): + self.count = 0 + self.conf = None + self.data = None + self.error = None + + def __call__(self, conf: (str, str, str), data: str, error: InfluxDBError): + self.count += 1 + self.conf = conf + self.data = data + self.error = error + + callback = RetryCallback() + + self._write_client.close() + self._write_client = WriteApi(influxdb_client=self.influxdb_client, + write_options=WriteOptions(batch_size=2), retry_callback=callback) + + self._write_client.write("my-bucket", "my-org", + ["h2o_feet,location=coyote_creek water_level=1 1", + "h2o_feet,location=coyote_creek water_level=2 2"]) + + time.sleep(3) + _requests = httpretty.httpretty.latest_requests + self.assertEqual(3, len(_requests)) + self.assertEqual("h2o_feet,location=coyote_creek water_level=1 1\n" + "h2o_feet,location=coyote_creek water_level=2 2", _requests[0].parsed_body) + + self.assertEqual(2, callback.count) + self.assertEqual(b"h2o_feet,location=coyote_creek water_level=1 1\n" + b"h2o_feet,location=coyote_creek water_level=2 2", callback.data) + self.assertEqual("my-bucket", callback.conf[0]) + self.assertEqual("my-org", callback.conf[1]) + self.assertEqual("ns", callback.conf[2]) + self.assertIsNotNone(callback.error) + self.assertIsInstance(callback.error, InfluxDBError) + self.assertEqual(429, callback.error.response.status) + if __name__ == '__main__': unittest.main()