Skip to content

fix: prevent exceptions in callbacks from causing deadlocks #559

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Feb 13, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
## 1.37.0 [unreleased]

### Bug Fixes
1. [#559](https://github.com/influxdata/influxdb-client-python/pull/559): Exceptions in callbacks can cause deadlocks

## 1.36.0 [2023-01-26]

### Features
Expand Down
4 changes: 4 additions & 0 deletions README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -447,6 +447,9 @@ The batching is configurable by ``write_options``\ :
* - **max_retry_delay**
- the maximum delay between each retry attempt in milliseconds
- ``125_000``
* - **max_close_wait**
- the maximum amount of time to wait for batches to flush when `.close()` is called
- ``300_000``
* - **exponential_base**
- the base for the exponential retry delay, the next delay is computed using random exponential backoff as a random value within the interval ``retry_interval * exponential_base^(attempts-1)`` and ``retry_interval * exponential_base^(attempts)``. Example for ``retry_interval=5_000, exponential_base=2, max_retry_delay=125_000, total=5`` Retry delays are random distributed values within the ranges of ``[5_000-10_000, 10_000-20_000, 20_000-40_000, 40_000-80_000, 80_000-125_000]``
- ``2``
Expand All @@ -470,6 +473,7 @@ The batching is configurable by ``write_options``\ :
retry_interval=5_000,
max_retries=5,
max_retry_delay=30_000,
max_close_wait=300_000,
exponential_base=2)) as _write_client:

"""
Expand Down
47 changes: 43 additions & 4 deletions influxdb_client/client/write_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ def __init__(self, write_type: WriteType = WriteType.batching,
max_retry_delay=125_000,
max_retry_time=180_000,
exponential_base=2,
max_close_wait=300_000,
write_scheduler=ThreadPoolScheduler(max_workers=1)) -> None:
"""
Create write api configuration.
Expand All @@ -66,6 +67,7 @@ def __init__(self, write_type: WriteType = WriteType.batching,
:param max_retry_delay: the maximum delay between each retry attempt in milliseconds
:param max_retry_time: total timeout for all retry attempts in milliseconds, if 0 retry is disabled
:param exponential_base: base for the exponential retry delay
:param max_close_wait: the maximum time to wait for writes to be flushed if close() is called
:param write_scheduler:
"""
self.write_type = write_type
Expand All @@ -78,6 +80,7 @@ def __init__(self, write_type: WriteType = WriteType.batching,
self.max_retry_time = max_retry_time
self.exponential_base = exponential_base
self.write_scheduler = write_scheduler
self.max_close_wait = max_close_wait

def to_retry_strategy(self, **kwargs):
"""
Expand Down Expand Up @@ -410,9 +413,31 @@ def __del__(self):
self._subject.dispose()
self._subject = None

# Wait for finish writing
"""
We impose a maximum wait time to ensure that we do not cause a deadlock if the
background thread has exited abnormally

Each iteration waits 100ms, but sleep expects the unit to be seconds so convert
the maximum wait time to seconds.

We keep a counter of how long we've waited
"""
max_wait_time = self._write_options.max_close_wait / 1000
waited = 0
sleep_period = 0.1

# Wait for writing to finish
while not self._disposable.is_disposed:
sleep(0.1)
sleep(sleep_period)
waited += sleep_period

# Have we reached the upper limit?
if waited >= max_wait_time:
logger.warning(
"Reached max_close_wait (%s seconds) waiting for batches to finish writing. Force closing",
max_wait_time
)
break

if self._disposable:
self._disposable = None
Expand Down Expand Up @@ -505,11 +530,25 @@ def _on_next(self, response: _BatchResponse):
if response.exception:
logger.error("The batch item wasn't processed successfully because: %s", response.exception)
if self._error_callback:
self._error_callback(response.data.to_key_tuple(), response.data.data, response.exception)
try:
self._error_callback(response.data.to_key_tuple(), response.data.data, response.exception)
except Exception as e:
"""
Unfortunately, because callbacks are user-provided generic code, exceptions can be entirely
arbitrary

We trap it, log that it occurred and then proceed - there's not much more that we can
really do.
"""
logger.error("The configured error callback threw an exception: %s", e)

else:
logger.debug("The batch item: %s was processed successfully.", response)
if self._success_callback:
self._success_callback(response.data.to_key_tuple(), response.data.data)
try:
self._success_callback(response.data.to_key_tuple(), response.data.data)
except Exception as e:
logger.error("The configured success callback threw an exception: %s", e)

@staticmethod
def _on_error(ex):
Expand Down
1 change: 1 addition & 0 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
'randomize>=0.13',
'pytest>=5.0.0',
'pytest-cov>=3.0.0',
'pytest-timeout>=2.1.0',
'httpretty==1.0.5',
'psutil>=5.6.3',
'aioresponses>=0.7.3',
Expand Down
45 changes: 45 additions & 0 deletions tests/test_WriteApiBatching.py
Original file line number Diff line number Diff line change
Expand Up @@ -647,6 +647,51 @@ def __call__(self, conf: (str, str, str), data: str, error: InfluxDBError):
self.assertIsInstance(callback.error, InfluxDBError)
self.assertEqual(400, callback.error.response.status)

@pytest.mark.timeout(timeout=20)
def test_error_callback_exception(self):
httpretty.register_uri(httpretty.POST, uri="http://localhost/api/v2/write", status=400)

class ErrorCallback(object):
def __init__(self):
self.conf = None
self.data = None
self.error = None

def __call__(self, conf: (str, str, str), data: str, error: InfluxDBError):
self.conf = conf
self.data = data
self.error = error
raise Exception('Test generated an error')


callback = ErrorCallback()

self._write_client.close()
self._write_client = WriteApi(influxdb_client=self.influxdb_client,
write_options=WriteOptions(batch_size=2, max_close_wait=2_000), error_callback=callback)

self._write_client.write("my-bucket", "my-org",
["h2o_feet,location=coyote_creek water_level=1 x",
"h2o_feet,location=coyote_creek water_level=2 2"])

time.sleep(1)
_requests = httpretty.httpretty.latest_requests
self.assertEqual(1, len(_requests))
self.assertEqual("h2o_feet,location=coyote_creek water_level=1 x\n"
"h2o_feet,location=coyote_creek water_level=2 2", _requests[0].parsed_body)

self.assertEqual(b"h2o_feet,location=coyote_creek water_level=1 x\n"
b"h2o_feet,location=coyote_creek water_level=2 2", callback.data)
self.assertEqual("my-bucket", callback.conf[0])
self.assertEqual("my-org", callback.conf[1])
self.assertEqual("ns", callback.conf[2])
self.assertIsNotNone(callback.error)
self.assertIsInstance(callback.error, InfluxDBError)
self.assertEqual(400, callback.error.response.status)

self._write_client.close()


def test_retry_callback(self):
httpretty.register_uri(httpretty.POST, uri="http://localhost/api/v2/write", status=204)
httpretty.register_uri(httpretty.POST, uri="http://localhost/api/v2/write", status=429, adding_headers={'Retry-After': '1'})
Expand Down