Uh oh!
There was an error while loading.Please reload this page.
- Notifications
You must be signed in to change notification settings - Fork32k
gh-66587: Fix deadlock from pool worker death without communication#16103
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to ourterms of service andprivacy statement. We’ll occasionally send you account related emails.
Already on GitHub?Sign in to your account
base:main
Are you sure you want to change the base?
Changes fromall commits
c8f4896
315ec3d
bcbd7d3
e1a9eb5
6459284
fa54afb
File filter
Filter by extension
Conversations
Uh oh!
There was an error while loading.Please reload this page.
Jump to
Uh oh!
There was an error while loading.Please reload this page.
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -121,6 +121,7 @@ def worker(inqueue, outqueue, initializer=None, initargs=(), maxtasks=None, | ||
break | ||
job, i, func, args, kwds = task | ||
put((job, i, (None, os.getpid()))) # Provide info on who took job | ||
try: | ||
result = (True, func(*args, **kwds)) | ||
except Exception as e: | ||
@@ -223,12 +224,14 @@ def __init__(self, processes=None, initializer=None, initargs=(), | ||
sentinels = self._get_sentinels() | ||
self._job_assignments = {} | ||
self._worker_handler = threading.Thread( | ||
target=Pool._handle_workers, | ||
args=(self._cache, self._taskqueue, self._ctx, self.Process, | ||
self._processes, self._pool, self._inqueue, self._outqueue, | ||
self._initializer, self._initargs, self._maxtasksperchild, | ||
self._wrap_exception, sentinels, self._change_notifier, | ||
self._job_assignments) | ||
) | ||
self._worker_handler.daemon = True | ||
self._worker_handler._state = RUN | ||
@@ -246,7 +249,8 @@ def __init__(self, processes=None, initializer=None, initargs=(), | ||
self._result_handler = threading.Thread( | ||
target=Pool._handle_results, | ||
args=(self._outqueue, self._quick_get, self._cache, | ||
self._job_assignments) | ||
) | ||
self._result_handler.daemon = True | ||
self._result_handler._state = RUN | ||
@@ -267,8 +271,6 @@ def __del__(self, _warn=warnings.warn, RUN=RUN): | ||
if self._state == RUN: | ||
_warn(f"unclosed running multiprocessing pool {self!r}", | ||
ResourceWarning, source=self) | ||
def __repr__(self): | ||
cls = self.__class__ | ||
@@ -287,7 +289,7 @@ def _get_worker_sentinels(workers): | ||
workers if hasattr(worker, "sentinel")] | ||
@staticmethod | ||
def _join_exited_workers(pool, outqueue, job_assignments): | ||
"""Cleanup after any worker processes which have exited due to reaching | ||
their specified lifetime. Returns True if any workers were cleaned up. | ||
""" | ||
@@ -297,8 +299,16 @@ def _join_exited_workers(pool): | ||
if worker.exitcode is not None: | ||
# worker exited | ||
util.debug('cleaning up worker %d' % i) | ||
pid = worker.ident | ||
worker.join() | ||
cleaned = True | ||
job_info = job_assignments.pop(pid, None) | ||
if job_info is not None: | ||
# If the worker process died without communicating back | ||
# while running a job, add a default result for it. | ||
outqueue.put( | ||
(*job_info, (False, RuntimeError("Worker died"))) | ||
) | ||
del pool[i] | ||
return cleaned | ||
@@ -333,10 +343,10 @@ def _repopulate_pool_static(ctx, Process, processes, pool, inqueue, | ||
@staticmethod | ||
def _maintain_pool(ctx, Process, processes, pool, inqueue, outqueue, | ||
initializer, initargs, maxtasksperchild, | ||
wrap_exception, job_assignments): | ||
"""Clean up any exited workers and start replacements for them. | ||
""" | ||
if Pool._join_exited_workers(pool, outqueue, job_assignments): | ||
Pool._repopulate_pool_static(ctx, Process, processes, pool, | ||
inqueue, outqueue, initializer, | ||
initargs, maxtasksperchild, | ||
@@ -507,15 +517,16 @@ def _wait_for_updates(sentinels, change_notifier, timeout=None): | ||
def _handle_workers(cls, cache, taskqueue, ctx, Process, processes, | ||
pool, inqueue, outqueue, initializer, initargs, | ||
maxtasksperchild, wrap_exception, sentinels, | ||
change_notifier, job_assignments): | ||
thread = threading.current_thread() | ||
# Keep maintaining workers until the cache gets drained, unless the pool | ||
# is terminated. | ||
while thread._state == RUN or (cache and thread._state != TERMINATE): | ||
cls._maintain_pool(ctx, Process, processes, pool, inqueue, | ||
outqueue, initializer, initargs, | ||
maxtasksperchild, wrap_exception, | ||
job_assignments) | ||
current_sentinels = [*cls._get_worker_sentinels(pool), *sentinels] | ||
@@ -571,7 +582,7 @@ def _handle_tasks(taskqueue, put, outqueue, pool, cache): | ||
util.debug('task handler exiting') | ||
@staticmethod | ||
def _handle_results(outqueue, get, cache, job_assignments): | ||
thread = threading.current_thread() | ||
while 1: | ||
@@ -590,12 +601,18 @@ def _handle_results(outqueue, get, cache): | ||
util.debug('result handler got sentinel') | ||
break | ||
job, i, (task_info, value) = task | ||
ambv marked this conversation as resolved. Show resolvedHide resolvedUh oh!There was an error while loading.Please reload this page. | ||
if task_info is None: | ||
# task_info is True or False when a task has completed but | ||
# None indicates information about which process has | ||
# accepted a job from the queue. | ||
job_assignments[value] = (job, i) | ||
else: | ||
try: | ||
cache[job]._set(i, (task_info, value)) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others.Learn more. Why don't you remove the job from | ||
except KeyError: | ||
pass | ||
task = job = task_info = value = None | ||
while cache and thread._state != TERMINATE: | ||
try: | ||
@@ -607,12 +624,16 @@ def _handle_results(outqueue, get, cache): | ||
if task is None: | ||
util.debug('result handler ignoring extra sentinel') | ||
continue | ||
job, i, (task_info, value) = task | ||
if task_info is None: | ||
job_assignments[value] = (job, i) | ||
else: | ||
try: | ||
cache[job]._set(i, (task_info, value)) | ||
except KeyError: | ||
pass | ||
task = job = task_info = value = None | ||
if hasattr(outqueue, '_reader'): | ||
util.debug('ensuring that outqueue is not full') | ||
@@ -672,7 +693,8 @@ def join(self): | ||
def _help_stuff_finish(inqueue, task_handler, size): | ||
# task_handler may be blocked trying to put items on inqueue | ||
util.debug('removing tasks from inqueue until task handler finished') | ||
if inqueue._reader.poll(): | ||
inqueue._rlock.acquire() | ||
ambv marked this conversation as resolved. Show resolvedHide resolvedUh oh!There was an error while loading.Please reload this page. | ||
while task_handler.is_alive() and inqueue._reader.poll(): | ||
inqueue._reader.recv() | ||
time.sleep(0) | ||
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -3218,6 +3218,27 @@ def errback(exc): | ||
p.close() | ||
p.join() | ||
def test_pool_worker_died_without_communicating(self): | ||
# Issue22393: test fix of indefinite hang caused by worker processes | ||
# exiting abruptly (such as via os._exit()) without communicating | ||
# back to the pool at all. | ||
prog = ( | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others.Learn more. This can be written much more clearly using a multi-line string. See for example a very similar case in | ||
"import os, multiprocessing as mp; " | ||
"is_main = (__name__ == '__main__'); " | ||
"p = mp.Pool(1) if is_main else print('worker'); " | ||
"p.map(os._exit, [1]) if is_main else None; " | ||
"(p.close() or p.join()) if is_main else None" | ||
) | ||
# Only if there is a regression will this ever trigger a | ||
# subprocess.TimeoutExpired. | ||
completed_process = subprocess.run( | ||
[sys.executable, '-E', '-S', '-O', '-c', prog], | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others.Learn more. The '-O' flag probably shouldn't be used here, but '-S' and '-E' seem fine. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others.Learn more. Also, consider calling Or just use | ||
check=False, | ||
timeout=100, | ||
capture_output=True | ||
) | ||
self.assertNotEqual(0, completed_process.returncode) | ||
class _TestPoolWorkerLifetime(BaseTestCase): | ||
ALLOWED_TYPES = ('processes', ) | ||
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1 @@ | ||
Fix deadlock from multiprocessing.Pool worker death without communication. |
Uh oh!
There was an error while loading.Please reload this page.