diff --git a/CHANGELOG.md b/CHANGELOG.md index 6708882e..4e8da89f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,8 @@ ## 1.37.0 [unreleased] +### Bug Fixes +1. [#559](https://github.com/influxdata/influxdb-client-python/pull/559): Exceptions in callbacks can cause deadlocks + ## 1.36.0 [2023-01-26] ### Features diff --git a/README.rst b/README.rst index 5e69b75e..de6e64fc 100644 --- a/README.rst +++ b/README.rst @@ -447,6 +447,9 @@ The batching is configurable by ``write_options``\ : * - **max_retry_delay** - the maximum delay between each retry attempt in milliseconds - ``125_000`` + * - **max_close_wait** + - the maximum amount of time to wait for batches to flush when `.close()` is called + - ``300_000`` * - **exponential_base** - the base for the exponential retry delay, the next delay is computed using random exponential backoff as a random value within the interval ``retry_interval * exponential_base^(attempts-1)`` and ``retry_interval * exponential_base^(attempts)``. Example for ``retry_interval=5_000, exponential_base=2, max_retry_delay=125_000, total=5`` Retry delays are random distributed values within the ranges of ``[5_000-10_000, 10_000-20_000, 20_000-40_000, 40_000-80_000, 80_000-125_000]`` - ``2`` @@ -470,6 +473,7 @@ The batching is configurable by ``write_options``\ : retry_interval=5_000, max_retries=5, max_retry_delay=30_000, + max_close_wait=300_000, exponential_base=2)) as _write_client: """ diff --git a/influxdb_client/client/write_api.py b/influxdb_client/client/write_api.py index f03a3c5f..0fcf27c1 100644 --- a/influxdb_client/client/write_api.py +++ b/influxdb_client/client/write_api.py @@ -51,6 +51,7 @@ def __init__(self, write_type: WriteType = WriteType.batching, max_retry_delay=125_000, max_retry_time=180_000, exponential_base=2, + max_close_wait=300_000, write_scheduler=ThreadPoolScheduler(max_workers=1)) -> None: """ Create write api configuration. @@ -66,6 +67,7 @@ def __init__(self, write_type: WriteType = WriteType.batching, :param max_retry_delay: the maximum delay between each retry attempt in milliseconds :param max_retry_time: total timeout for all retry attempts in milliseconds, if 0 retry is disabled :param exponential_base: base for the exponential retry delay + :parama max_close_wait: the maximum time to wait for writes to be flushed if close() is called :param write_scheduler: """ self.write_type = write_type @@ -78,6 +80,7 @@ def __init__(self, write_type: WriteType = WriteType.batching, self.max_retry_time = max_retry_time self.exponential_base = exponential_base self.write_scheduler = write_scheduler + self.max_close_wait = max_close_wait def to_retry_strategy(self, **kwargs): """ @@ -410,9 +413,31 @@ def __del__(self): self._subject.dispose() self._subject = None - # Wait for finish writing + """ + We impose a maximum wait time to ensure that we do not cause a deadlock if the + background thread has exited abnormally + + Each iteration waits 100ms, but sleep expects the unit to be seconds so convert + the maximum wait time to seconds. + + We keep a counter of how long we've waited + """ + max_wait_time = self._write_options.max_close_wait / 1000 + waited = 0 + sleep_period = 0.1 + + # Wait for writing to finish while not self._disposable.is_disposed: - sleep(0.1) + sleep(sleep_period) + waited += sleep_period + + # Have we reached the upper limit? + if waited >= max_wait_time: + logger.warning( + "Reached max_close_wait (%s seconds) waiting for batches to finish writing. Force closing", + max_wait_time + ) + break if self._disposable: self._disposable = None @@ -505,11 +530,25 @@ def _on_next(self, response: _BatchResponse): if response.exception: logger.error("The batch item wasn't processed successfully because: %s", response.exception) if self._error_callback: - self._error_callback(response.data.to_key_tuple(), response.data.data, response.exception) + try: + self._error_callback(response.data.to_key_tuple(), response.data.data, response.exception) + except Exception as e: + """ + Unfortunately, because callbacks are user-provided generic code, exceptions can be entirely + arbitrary + + We trap it, log that it occurred and then proceed - there's not much more that we can + really do. + """ + logger.error("The configured error callback threw an exception: %s", e) + else: logger.debug("The batch item: %s was processed successfully.", response) if self._success_callback: - self._success_callback(response.data.to_key_tuple(), response.data.data) + try: + self._success_callback(response.data.to_key_tuple(), response.data.data) + except Exception as e: + logger.error("The configured success callback threw an exception: %s", e) @staticmethod def _on_error(ex): diff --git a/setup.py b/setup.py index 2afe906c..5c63fa2d 100644 --- a/setup.py +++ b/setup.py @@ -21,6 +21,7 @@ 'randomize>=0.13', 'pytest>=5.0.0', 'pytest-cov>=3.0.0', + 'pytest-timeout>=2.1.0', 'httpretty==1.0.5', 'psutil>=5.6.3', 'aioresponses>=0.7.3', diff --git a/tests/test_WriteApiBatching.py b/tests/test_WriteApiBatching.py index 917a4a57..8befd7e5 100644 --- a/tests/test_WriteApiBatching.py +++ b/tests/test_WriteApiBatching.py @@ -647,6 +647,51 @@ def __call__(self, conf: (str, str, str), data: str, error: InfluxDBError): self.assertIsInstance(callback.error, InfluxDBError) self.assertEqual(400, callback.error.response.status) + @pytest.mark.timeout(timeout=20) + def test_error_callback_exception(self): + httpretty.register_uri(httpretty.POST, uri="http://localhost/api/v2/write", status=400) + + class ErrorCallback(object): + def __init__(self): + self.conf = None + self.data = None + self.error = None + + def __call__(self, conf: (str, str, str), data: str, error: InfluxDBError): + self.conf = conf + self.data = data + self.error = error + raise Exception('Test generated an error') + + + callback = ErrorCallback() + + self._write_client.close() + self._write_client = WriteApi(influxdb_client=self.influxdb_client, + write_options=WriteOptions(batch_size=2, max_close_wait=2_000), error_callback=callback) + + self._write_client.write("my-bucket", "my-org", + ["h2o_feet,location=coyote_creek water_level=1 x", + "h2o_feet,location=coyote_creek water_level=2 2"]) + + time.sleep(1) + _requests = httpretty.httpretty.latest_requests + self.assertEqual(1, len(_requests)) + self.assertEqual("h2o_feet,location=coyote_creek water_level=1 x\n" + "h2o_feet,location=coyote_creek water_level=2 2", _requests[0].parsed_body) + + self.assertEqual(b"h2o_feet,location=coyote_creek water_level=1 x\n" + b"h2o_feet,location=coyote_creek water_level=2 2", callback.data) + self.assertEqual("my-bucket", callback.conf[0]) + self.assertEqual("my-org", callback.conf[1]) + self.assertEqual("ns", callback.conf[2]) + self.assertIsNotNone(callback.error) + self.assertIsInstance(callback.error, InfluxDBError) + self.assertEqual(400, callback.error.response.status) + + self._write_client.close() + + def test_retry_callback(self): httpretty.register_uri(httpretty.POST, uri="http://localhost/api/v2/write", status=204) httpretty.register_uri(httpretty.POST, uri="http://localhost/api/v2/write", status=429, adding_headers={'Retry-After': '1'})