Skip to content

Commit 391323e

Browse files
Avoid hanging forever after a parallel job was killed (#7834) (#7930)
* Replace multiprocessing.pool with concurrent.futures.ProcessPoolExecutor to avoid deadlocks. In a multiprocessing.pool, if a process terminates in a non-clean fashion (for example, due to OOM or a segmentation fault), the pool will silently replace said process, but the work that the process was supposed to do will never be done, causing pylint to hang indefinitely. The concurrent.futures.ProcessPoolExecutor will raise a BrokenProcessPool exception in that case, avoiding the hang. Co-authored-by: Daniël van Noord <[email protected]> (cherry picked from commit 5eca8ec) Co-authored-by: Daniel <[email protected]>
1 parent 4655b92 commit 391323e

File tree

4 files changed

+52
-17
lines changed

4 files changed

+52
-17
lines changed

doc/whatsnew/fragments/3899.bugfix

+4
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
Pylint will no longer deadlock if a parallel job is killed, but will fail
2+
immediately instead.
3+
4+
Closes #3899

pylint/lint/parallel.py

+11-10
Original file line numberDiff line numberDiff line change
@@ -23,19 +23,23 @@
2323
except ImportError:
2424
multiprocessing = None # type: ignore[assignment]
2525

26+
try:
27+
from concurrent.futures import ProcessPoolExecutor
28+
except ImportError:
29+
ProcessPoolExecutor = None # type: ignore[assignment,misc]
30+
2631
if TYPE_CHECKING:
2732
from pylint.lint import PyLinter
2833

29-
# PyLinter object used by worker processes when checking files using multiprocessing
34+
# PyLinter object used by worker processes when checking files using parallel mode
3035
# should only be used by the worker processes
3136
_worker_linter: PyLinter | None = None
3237

3338

3439
def _worker_initialize(
3540
linter: bytes, arguments: None | str | Sequence[str] = None
3641
) -> None:
37-
"""Function called to initialize a worker for a Process within a multiprocessing
38-
Pool.
42+
"""Function called to initialize a worker for a Process within a concurrent Pool.
3943
4044
:param linter: A linter-class (PyLinter) instance pickled with dill
4145
:param arguments: File or module name(s) to lint and to be added to sys.path
@@ -137,9 +141,9 @@ def check_parallel(
137141
# is identical to the linter object here. This is required so that
138142
# a custom PyLinter object can be used.
139143
initializer = functools.partial(_worker_initialize, arguments=arguments)
140-
with multiprocessing.Pool(
141-
jobs, initializer=initializer, initargs=[dill.dumps(linter)]
142-
) as pool:
144+
with ProcessPoolExecutor(
145+
max_workers=jobs, initializer=initializer, initargs=(dill.dumps(linter),)
146+
) as executor:
143147
linter.open()
144148
all_stats = []
145149
all_mapreduce_data: defaultdict[
@@ -158,7 +162,7 @@ def check_parallel(
158162
stats,
159163
msg_status,
160164
mapreduce_data,
161-
) in pool.imap_unordered(_worker_check_single_file, files):
165+
) in executor.map(_worker_check_single_file, files):
162166
linter.file_state.base_name = base_name
163167
linter.file_state._is_base_filestate = False
164168
linter.set_current_module(module, file_path)
@@ -168,8 +172,5 @@ def check_parallel(
168172
all_mapreduce_data[worker_idx].append(mapreduce_data)
169173
linter.msg_status |= msg_status
170174

171-
pool.close()
172-
pool.join()
173-
174175
_merge_mapreduce_data(linter, all_mapreduce_data)
175176
linter.stats = merge_stats([linter.stats] + all_stats)

pylint/lint/run.py

+7-2
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,11 @@
3030
except ImportError:
3131
multiprocessing = None # type: ignore[assignment]
3232

33+
try:
34+
from concurrent.futures import ProcessPoolExecutor
35+
except ImportError:
36+
ProcessPoolExecutor = None # type: ignore[assignment,misc]
37+
3338

3439
def _query_cpu() -> int | None:
3540
"""Try to determine number of CPUs allotted in a docker container.
@@ -185,9 +190,9 @@ def __init__(
185190
)
186191
sys.exit(32)
187192
if linter.config.jobs > 1 or linter.config.jobs == 0:
188-
if multiprocessing is None:
193+
if ProcessPoolExecutor is None:
189194
print(
190-
"Multiprocessing library is missing, fallback to single process",
195+
"concurrent.futures module is missing, fallback to single process",
191196
file=sys.stderr,
192197
)
193198
linter.set_option("jobs", 1)

tests/test_check_parallel.py

+30-5
Original file line numberDiff line numberDiff line change
@@ -9,8 +9,9 @@
99
from __future__ import annotations
1010

1111
import argparse
12-
import multiprocessing
1312
import os
13+
from concurrent.futures import ProcessPoolExecutor
14+
from concurrent.futures.process import BrokenProcessPool
1415
from pickle import PickleError
1516

1617
import dill
@@ -182,10 +183,10 @@ def test_worker_initialize_pickling(self) -> None:
182183
"""
183184
linter = PyLinter(reporter=Reporter())
184185
linter.attribute = argparse.ArgumentParser() # type: ignore[attr-defined]
185-
with multiprocessing.Pool(
186-
2, initializer=worker_initialize, initargs=[dill.dumps(linter)]
187-
) as pool:
188-
pool.imap_unordered(print, [1, 2])
186+
with ProcessPoolExecutor(
187+
max_workers=2, initializer=worker_initialize, initargs=(dill.dumps(linter),)
188+
) as executor:
189+
executor.map(print, [1, 2])
189190

190191
def test_worker_check_single_file_uninitialised(self) -> None:
191192
pylint.lint.parallel._worker_linter = None
@@ -571,3 +572,27 @@ def test_map_reduce(self, num_files, num_jobs, num_checkers):
571572
assert str(stats_single_proc.by_msg) == str(
572573
stats_check_parallel.by_msg
573574
), "Single-proc and check_parallel() should return the same thing"
575+
576+
@pytest.mark.timeout(5)
577+
def test_no_deadlock_due_to_initializer_error(self) -> None:
578+
"""Tests that an error in the initializer for the parallel jobs doesn't
579+
lead to a deadlock.
580+
"""
581+
linter = PyLinter(reporter=Reporter())
582+
583+
linter.register_checker(SequentialTestChecker(linter))
584+
585+
# Create a dummy file, the actual contents of which will be ignored by the
586+
# register test checkers, but it will trigger at least a single-job to be run.
587+
single_file_container = _gen_file_datas(count=1)
588+
589+
# The error in the initializer should trigger a BrokenProcessPool exception
590+
with pytest.raises(BrokenProcessPool):
591+
check_parallel(
592+
linter,
593+
jobs=1,
594+
files=iter(single_file_container),
595+
# This will trigger an exception in the initializer for the parallel jobs
596+
# because arguments has to be an Iterable.
597+
arguments=1, # type: ignore[arg-type]
598+
)

0 commit comments

Comments (0)