Skip to content

Commit 497738c

Browse files
authored
fix: Prevent exceptions in callbacks from causing deadlocks (influxdata#559)
1 parent a07fea6 commit 497738c

File tree

5 files changed

+96
-4
lines changed

5 files changed

+96
-4
lines changed

CHANGELOG.md

+3
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,8 @@
11
## 1.37.0 [unreleased]
22

3+
### Bug Fixes
4+
1. [#559](https://github.com/influxdata/influxdb-client-python/pull/559): Prevent exceptions in callbacks from causing deadlocks
5+
36
## 1.36.0 [2023-01-26]
47

58
### Features

README.rst

+4
Original file line numberDiff line numberDiff line change
@@ -447,6 +447,9 @@ The batching is configurable by ``write_options``\ :
447447
* - **max_retry_delay**
448448
- the maximum delay between each retry attempt in milliseconds
449449
- ``125_000``
450+
* - **max_close_wait**
451+
- the maximum amount of time in milliseconds to wait for batches to flush when ``.close()`` is called
452+
- ``300_000``
450453
* - **exponential_base**
451454
- the base for the exponential retry delay, the next delay is computed using random exponential backoff as a random value within the interval ``retry_interval * exponential_base^(attempts-1)`` and ``retry_interval * exponential_base^(attempts)``. Example for ``retry_interval=5_000, exponential_base=2, max_retry_delay=125_000, total=5`` Retry delays are random distributed values within the ranges of ``[5_000-10_000, 10_000-20_000, 20_000-40_000, 40_000-80_000, 80_000-125_000]``
452455
- ``2``
@@ -470,6 +473,7 @@ The batching is configurable by ``write_options``\ :
470473
retry_interval=5_000,
471474
max_retries=5,
472475
max_retry_delay=30_000,
476+
max_close_wait=300_000,
473477
exponential_base=2)) as _write_client:
474478
475479
"""

influxdb_client/client/write_api.py

+43-4
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@ def __init__(self, write_type: WriteType = WriteType.batching,
5151
max_retry_delay=125_000,
5252
max_retry_time=180_000,
5353
exponential_base=2,
54+
max_close_wait=300_000,
5455
write_scheduler=ThreadPoolScheduler(max_workers=1)) -> None:
5556
"""
5657
Create write api configuration.
@@ -66,6 +67,7 @@ def __init__(self, write_type: WriteType = WriteType.batching,
6667
:param max_retry_delay: the maximum delay between each retry attempt in milliseconds
6768
:param max_retry_time: total timeout for all retry attempts in milliseconds, if 0 retry is disabled
6869
:param exponential_base: base for the exponential retry delay
70+
:param max_close_wait: the maximum time in milliseconds to wait for writes to be flushed if close() is called
6971
:param write_scheduler:
7072
"""
7173
self.write_type = write_type
@@ -78,6 +80,7 @@ def __init__(self, write_type: WriteType = WriteType.batching,
7880
self.max_retry_time = max_retry_time
7981
self.exponential_base = exponential_base
8082
self.write_scheduler = write_scheduler
83+
self.max_close_wait = max_close_wait
8184

8285
def to_retry_strategy(self, **kwargs):
8386
"""
@@ -410,9 +413,31 @@ def __del__(self):
410413
self._subject.dispose()
411414
self._subject = None
412415

413-
# Wait for finish writing
416+
"""
417+
We impose a maximum wait time to ensure that we do not cause a deadlock if the
418+
background thread has exited abnormally
419+
420+
Each iteration waits 100ms, but sleep expects the unit to be seconds so convert
421+
the maximum wait time to seconds.
422+
423+
We keep a counter of how long we've waited
424+
"""
425+
max_wait_time = self._write_options.max_close_wait / 1000
426+
waited = 0
427+
sleep_period = 0.1
428+
429+
# Wait for writing to finish
414430
while not self._disposable.is_disposed:
415-
sleep(0.1)
431+
sleep(sleep_period)
432+
waited += sleep_period
433+
434+
# Have we reached the upper limit?
435+
if waited >= max_wait_time:
436+
logger.warning(
437+
"Reached max_close_wait (%s seconds) waiting for batches to finish writing. Force closing",
438+
max_wait_time
439+
)
440+
break
416441

417442
if self._disposable:
418443
self._disposable = None
@@ -505,11 +530,25 @@ def _on_next(self, response: _BatchResponse):
505530
if response.exception:
506531
logger.error("The batch item wasn't processed successfully because: %s", response.exception)
507532
if self._error_callback:
508-
self._error_callback(response.data.to_key_tuple(), response.data.data, response.exception)
533+
try:
534+
self._error_callback(response.data.to_key_tuple(), response.data.data, response.exception)
535+
except Exception as e:
536+
"""
537+
Unfortunately, because callbacks are user-provided generic code, exceptions can be entirely
538+
arbitrary
539+
540+
We trap it, log that it occurred and then proceed - there's not much more that we can
541+
really do.
542+
"""
543+
logger.error("The configured error callback threw an exception: %s", e)
544+
509545
else:
510546
logger.debug("The batch item: %s was processed successfully.", response)
511547
if self._success_callback:
512-
self._success_callback(response.data.to_key_tuple(), response.data.data)
548+
try:
549+
self._success_callback(response.data.to_key_tuple(), response.data.data)
550+
except Exception as e:
551+
logger.error("The configured success callback threw an exception: %s", e)
513552

514553
@staticmethod
515554
def _on_error(ex):

setup.py

+1
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
'randomize>=0.13',
2222
'pytest>=5.0.0',
2323
'pytest-cov>=3.0.0',
24+
'pytest-timeout>=2.1.0',
2425
'httpretty==1.0.5',
2526
'psutil>=5.6.3',
2627
'aioresponses>=0.7.3',

tests/test_WriteApiBatching.py

+45
Original file line numberDiff line numberDiff line change
@@ -647,6 +647,51 @@ def __call__(self, conf: (str, str, str), data: str, error: InfluxDBError):
647647
self.assertIsInstance(callback.error, InfluxDBError)
648648
self.assertEqual(400, callback.error.response.status)
649649

650+
@pytest.mark.timeout(timeout=20)
651+
def test_error_callback_exception(self):
652+
httpretty.register_uri(httpretty.POST, uri="http://localhost/api/v2/write", status=400)
653+
654+
class ErrorCallback(object):
655+
def __init__(self):
656+
self.conf = None
657+
self.data = None
658+
self.error = None
659+
660+
def __call__(self, conf: (str, str, str), data: str, error: InfluxDBError):
661+
self.conf = conf
662+
self.data = data
663+
self.error = error
664+
raise Exception('Test generated an error')
665+
666+
667+
callback = ErrorCallback()
668+
669+
self._write_client.close()
670+
self._write_client = WriteApi(influxdb_client=self.influxdb_client,
671+
write_options=WriteOptions(batch_size=2, max_close_wait=2_000), error_callback=callback)
672+
673+
self._write_client.write("my-bucket", "my-org",
674+
["h2o_feet,location=coyote_creek water_level=1 x",
675+
"h2o_feet,location=coyote_creek water_level=2 2"])
676+
677+
time.sleep(1)
678+
_requests = httpretty.httpretty.latest_requests
679+
self.assertEqual(1, len(_requests))
680+
self.assertEqual("h2o_feet,location=coyote_creek water_level=1 x\n"
681+
"h2o_feet,location=coyote_creek water_level=2 2", _requests[0].parsed_body)
682+
683+
self.assertEqual(b"h2o_feet,location=coyote_creek water_level=1 x\n"
684+
b"h2o_feet,location=coyote_creek water_level=2 2", callback.data)
685+
self.assertEqual("my-bucket", callback.conf[0])
686+
self.assertEqual("my-org", callback.conf[1])
687+
self.assertEqual("ns", callback.conf[2])
688+
self.assertIsNotNone(callback.error)
689+
self.assertIsInstance(callback.error, InfluxDBError)
690+
self.assertEqual(400, callback.error.response.status)
691+
692+
self._write_client.close()
693+
694+
650695
def test_retry_callback(self):
651696
httpretty.register_uri(httpretty.POST, uri="http://localhost/api/v2/write", status=204)
652697
httpretty.register_uri(httpretty.POST, uri="http://localhost/api/v2/write", status=429, adding_headers={'Retry-After': '1'})

0 commit comments

Comments
 (0)