Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

gh-66587: Fix deadlock from pool worker death without communication#16103

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
applio wants to merge 6 commits into python:main
base:main
Choose a base branch
Loading
from applio:fix_multiprocessing_worker_died_indefinite_hang
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
68 changes: 45 additions & 23 deletions Lib/multiprocessing/pool.py
View file
Open in desktop
Original file line numberDiff line numberDiff line change
Expand Up@@ -121,6 +121,7 @@ def worker(inqueue, outqueue, initializer=None, initargs=(), maxtasks=None,
break

job, i, func, args, kwds = task
put((job, i, (None, os.getpid()))) # Provide info on who took job
try:
result = (True, func(*args, **kwds))
except Exception as e:
Expand DownExpand Up@@ -223,12 +224,14 @@ def __init__(self, processes=None, initializer=None, initargs=(),

sentinels = self._get_sentinels()

self._job_assignments = {}
self._worker_handler = threading.Thread(
target=Pool._handle_workers,
args=(self._cache, self._taskqueue, self._ctx, self.Process,
self._processes, self._pool, self._inqueue, self._outqueue,
self._initializer, self._initargs, self._maxtasksperchild,
self._wrap_exception, sentinels, self._change_notifier)
self._wrap_exception, sentinels, self._change_notifier,
self._job_assignments)
)
self._worker_handler.daemon = True
self._worker_handler._state = RUN
Expand All@@ -246,7 +249,8 @@ def __init__(self, processes=None, initializer=None, initargs=(),

self._result_handler = threading.Thread(
target=Pool._handle_results,
args=(self._outqueue, self._quick_get, self._cache)
args=(self._outqueue, self._quick_get, self._cache,
self._job_assignments)
)
self._result_handler.daemon = True
self._result_handler._state = RUN
Expand All@@ -267,8 +271,6 @@ def __del__(self, _warn=warnings.warn, RUN=RUN):
if self._state == RUN:
_warn(f"unclosed running multiprocessing pool {self!r}",
ResourceWarning, source=self)
if getattr(self, '_change_notifier', None) is not None:
self._change_notifier.put(None)

def __repr__(self):
cls = self.__class__
Expand All@@ -287,7 +289,7 @@ def _get_worker_sentinels(workers):
workers if hasattr(worker, "sentinel")]

@staticmethod
def _join_exited_workers(pool):
def _join_exited_workers(pool, outqueue, job_assignments):
"""Cleanup after any worker processes which have exited due to reaching
their specified lifetime. Returns True if any workers were cleaned up.
"""
Expand All@@ -297,8 +299,16 @@ def _join_exited_workers(pool):
if worker.exitcode is not None:
# worker exited
util.debug('cleaning up worker %d' % i)
pid = worker.ident
worker.join()
cleaned = True
job_info = job_assignments.pop(pid, None)
if job_info is not None:
# If the worker process died without communicating back
# while running a job, add a default result for it.
outqueue.put(
(*job_info, (False, RuntimeError("Worker died")))
)
del pool[i]
return cleaned

Expand DownExpand Up@@ -333,10 +343,10 @@ def _repopulate_pool_static(ctx, Process, processes, pool, inqueue,
@staticmethod
def _maintain_pool(ctx, Process, processes, pool, inqueue, outqueue,
initializer, initargs, maxtasksperchild,
wrap_exception):
wrap_exception, job_assignments):
"""Clean up any exited workers and start replacements for them.
"""
if Pool._join_exited_workers(pool):
if Pool._join_exited_workers(pool, outqueue, job_assignments):
Pool._repopulate_pool_static(ctx, Process, processes, pool,
inqueue, outqueue, initializer,
initargs, maxtasksperchild,
Expand DownExpand Up@@ -507,15 +517,16 @@ def _wait_for_updates(sentinels, change_notifier, timeout=None):
def _handle_workers(cls, cache, taskqueue, ctx, Process, processes,
pool, inqueue, outqueue, initializer, initargs,
maxtasksperchild, wrap_exception, sentinels,
change_notifier):
change_notifier, job_assignments):
thread = threading.current_thread()

# Keep maintaining workers until the cache gets drained, unless the pool
# is terminated.
while thread._state == RUN or (cache and thread._state != TERMINATE):
cls._maintain_pool(ctx, Process, processes, pool, inqueue,
outqueue, initializer, initargs,
maxtasksperchild, wrap_exception)
maxtasksperchild, wrap_exception,
job_assignments)

current_sentinels = [*cls._get_worker_sentinels(pool), *sentinels]

Expand DownExpand Up@@ -571,7 +582,7 @@ def _handle_tasks(taskqueue, put, outqueue, pool, cache):
util.debug('task handler exiting')

@staticmethod
def _handle_results(outqueue, get, cache):
def _handle_results(outqueue, get, cache, job_assignments):
thread = threading.current_thread()

while 1:
Expand All@@ -590,12 +601,18 @@ def _handle_results(outqueue, get, cache):
util.debug('result handler got sentinel')
break

job, i, obj = task
try:
cache[job]._set(i, obj)
except KeyError:
pass
task = job = obj = None
job, i, (task_info, value) = task
if task_info is None:
# task_info is True or False when a task has completed but
# None indicates information about which process has
# accepted a job from the queue.
job_assignments[value] = (job, i)
else:
try:
cache[job]._set(i, (task_info, value))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why don't you remove the job from job_assignments here? It would avoid an unnecessary operation when a worker dies gracefully.

except KeyError:
pass
task = job = task_info = value = None

while cache and thread._state != TERMINATE:
try:
Expand All@@ -607,12 +624,16 @@ def _handle_results(outqueue, get, cache):
if task is None:
util.debug('result handler ignoring extra sentinel')
continue
job, i, obj = task
try:
cache[job]._set(i, obj)
except KeyError:
pass
task = job = obj = None

job, i, (task_info, value) = task
if task_info is None:
job_assignments[value] = (job, i)
else:
try:
cache[job]._set(i, (task_info, value))
except KeyError:
pass
task = job = task_info = value = None

if hasattr(outqueue, '_reader'):
util.debug('ensuring that outqueue is not full')
Expand DownExpand Up@@ -672,7 +693,8 @@ def join(self):
def _help_stuff_finish(inqueue, task_handler, size):
# task_handler may be blocked trying to put items on inqueue
util.debug('removing tasks from inqueue until task handler finished')
inqueue._rlock.acquire()
if inqueue._reader.poll():
inqueue._rlock.acquire()
while task_handler.is_alive() and inqueue._reader.poll():
inqueue._reader.recv()
time.sleep(0)
Expand Down
21 changes: 21 additions & 0 deletionsLib/test/_test_multiprocessing.py
View file
Open in desktop
Original file line numberDiff line numberDiff line change
Expand Up@@ -3218,6 +3218,27 @@ def errback(exc):
p.close()
p.join()

def test_pool_worker_died_without_communicating(self):
# Issue22393: test fix of indefinite hang caused by worker processes
# exiting abruptly (such as via os._exit()) without communicating
# back to the pool at all.
prog = (
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This can be written much more clearly using a multi-line string. See for example a very similar case in test_shared_memory_cleaned_after_process_termination in this file.

"import os, multiprocessing as mp; "
"is_main = (__name__ == '__main__'); "
"p = mp.Pool(1) if is_main else print('worker'); "
"p.map(os._exit, [1]) if is_main else None; "
"(p.close() or p.join()) if is_main else None"
)
# Only if there is a regression will this ever trigger a
# subprocess.TimeoutExpired.
completed_process = subprocess.run(
[sys.executable, '-E', '-S', '-O', '-c', prog],
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others.Learn more.

The '-O' flag probably shouldn't be used here, but '-S' and '-E' seem fine.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others.Learn more.

Also, consider calling test.support.script_utils.interpreter_requires_environment(), and only use the '-E' flag if that returns False, as done by the other Python script running utils in test.support.script_utils.

Or just use test.support.script_utils.run_python_until_end() instead of subprocess.run().

check=False,
timeout=100,
capture_output=True
)
self.assertNotEqual(0, completed_process.returncode)

class _TestPoolWorkerLifetime(BaseTestCase):
ALLOWED_TYPES = ('processes', )

Expand Down
View file
Open in desktop
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
Fix deadlock from multiprocessing.Pool worker death without communication.
Loading

[8]ページ先頭

©2009-2025 Movatter.jp