Skip to content

Commit 6bdb4c3

Browse files
committed
fix: prevent process deadlock if the batch write thread dies
This is one part of a fix for influxdata#558. It: * Adds a WriteOption `max_close_wait` (default 300,000ms) * Adjusts `__del__` so that we'll only wait `max_close_wait`ms for queued writes to complete * Adds a warning if the threshold is hit
1 parent a07fea6 commit 6bdb4c3

File tree

1 file changed

+27
-2
lines changed

1 file changed

+27
-2
lines changed

influxdb_client/client/write_api.py

Lines changed: 27 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@ def __init__(self, write_type: WriteType = WriteType.batching,
5151
max_retry_delay=125_000,
5252
max_retry_time=180_000,
5353
exponential_base=2,
54+
max_close_wait=300_000,
5455
write_scheduler=ThreadPoolScheduler(max_workers=1)) -> None:
5556
"""
5657
Create write api configuration.
@@ -66,6 +67,7 @@ def __init__(self, write_type: WriteType = WriteType.batching,
6667
:param max_retry_delay: the maximum delay between each retry attempt in milliseconds
6768
:param max_retry_time: total timeout for all retry attempts in milliseconds, if 0 retry is disabled
6869
:param exponential_base: base for the exponential retry delay
70+
:param max_close_wait: the maximum time to wait, in milliseconds, for writes to be flushed if close() is called
6971
:param write_scheduler:
7072
"""
7173
self.write_type = write_type
@@ -78,6 +80,7 @@ def __init__(self, write_type: WriteType = WriteType.batching,
7880
self.max_retry_time = max_retry_time
7981
self.exponential_base = exponential_base
8082
self.write_scheduler = write_scheduler
83+
self.max_close_wait = max_close_wait
8184

8285
def to_retry_strategy(self, **kwargs):
8386
"""
@@ -410,9 +413,31 @@ def __del__(self):
410413
self._subject.dispose()
411414
self._subject = None
412415

413-
# Wait for finish writing
416+
"""
417+
We impose a maximum wait time to ensure that we do not cause a deadlock if the
418+
background thread has exited abnormally
419+
420+
Each iteration waits 100ms, but sleep expects the unit to be seconds so convert
421+
the maximum wait time to seconds.
422+
423+
We keep a counter of how long we've waited
424+
"""
425+
max_wait_time = self._write_options.max_close_wait / 1000
426+
waited = 0
427+
sleep_period = 0.1
428+
429+
# Wait for writing to finish
414430
while not self._disposable.is_disposed:
415-
sleep(0.1)
431+
sleep(sleep_period)
432+
waited += sleep_period
433+
434+
# Have we reached the upper limit?
435+
if waited >= max_wait_time:
436+
logger.warning(
437+
"Reached max_close_wait (%s seconds) waiting for batches to finish writing. Force closing",
438+
max_wait_time
439+
)
440+
break
416441

417442
if self._disposable:
418443
self._disposable = None

0 commit comments

Comments
 (0)