bpo-22393: Fix multiprocessing.Pool hangs if a worker process dies unexpectedly #10441
Doc/library/multiprocessing.rst
@@ -2237,6 +2237,8 @@ with the :class:`Pool` class.
   one of the workers of a :class:`Pool` has terminated in a non-clean
   fashion (for example, if it was killed from the outside).

   .. versionadded:: 3.8

.. class:: AsyncResult
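As a usage illustration of the documented behaviour (not part of the patch): the sketch below assumes this patch is applied, and the pool size, sleep lengths, SIGTERM, and the peek at the private _pool attribute mirror the new tests further down. On an unpatched interpreter, result.get() would simply hang.

import multiprocessing
import os
import signal
import time
from multiprocessing.pool import BrokenProcessPool  # exception added by this patch


def main():
    pool = multiprocessing.Pool(2)
    result = pool.apply_async(time.sleep, (5,))
    time.sleep(0.5)                 # let a worker pick the task up
    # Simulate an external kill; _pool is private, used here only for illustration.
    os.kill(pool._pool[0].pid, signal.SIGTERM)
    try:
        result.get()
    except BrokenProcessPool:
        print('pool is broken; the pending result no longer hangs')
    pool.close()
    pool.join()


if __name__ == '__main__':
    main()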
Lib/multiprocessing/pool.py
@@ -290,12 +290,12 @@ def _maintain_pool(self):
        if need_repopulate is None:
            with self._worker_state_lock:
                self._worker_handler._state = BROKEN
            err = BrokenProcessPool(
                'A worker in the pool terminated abruptly.')
            # Exhaust MapResult with errors
[Review comment: This also applies to …]
            for i, cache_ent in list(self._cache.items()):
[Review comment: Out of curiosity, is there any reason why we iterate on a list of …]
                cache_ent._set_all((False, err))

    def _setup_queues(self):
        self._inqueue = self._ctx.SimpleQueue()
@@ -679,7 +679,7 @@ def _terminate_pool(cls, taskqueue, inqueue, outqueue, pool,
                    # worker has not yet exited
                    util.debug('cleaning up worker %d' % p.pid)
                    p.join()

        util.debug('terminate pool finished')

    def __enter__(self):
        self._check_running()
@@ -731,6 +731,9 @@ def _set(self, i, obj):
        self._event.set()
        del self._cache[self._job]

    def _set_all(self, obj):
        self._set(0, obj)


AsyncResult = ApplyResult       # create alias -- see #17805

#
@@ -774,6 +777,12 @@ def _set(self, i, success_result):
                del self._cache[self._job]
                self._event.set()

    def _set_all(self, obj):
        item = 0
        while self._number_left > 0:
            self._set(item, obj)
            item += 1

#
# Class whose instances are returned by `Pool.imap()`
#
@@ -831,6 +840,10 @@ def _set(self, i, obj):
                if self._index == self._length:
                    del self._cache[self._job]

    def _set_all(self, obj):
        while self._index != self._length:
            self._set(self._index, obj)

    def _set_length(self, length):
        with self._cond:
            self._length = length
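For orientation (not part of the patch), here is a toy, self-contained analogue of the _set_all() idea above; ToyResult and its methods are invented for illustration and are not the pool internals. The point is that every chunk that will never be produced is filled with the same error, so a caller blocked in get() is released with an exception instead of waiting forever.

import threading


class ToyResult:
    """Stands in for a pending MapResult: a waiter blocks in get()
    until every expected chunk has been delivered."""

    def __init__(self, nchunks):
        self._event = threading.Event()
        self._number_left = nchunks
        self._success = True
        self._error = None

    def _set(self, i, success_result):
        success, value = success_result
        if not success:
            self._success = False
            self._error = value
        self._number_left -= 1
        if self._number_left == 0:
            self._event.set()

    def _set_all(self, success_result):
        # Same shape as MapResult._set_all in the patch: keep delivering
        # the error until no chunk is left, which releases the waiter.
        while self._number_left > 0:
            self._set(0, success_result)

    def get(self):
        self._event.wait()
        if not self._success:
            raise self._error


res = ToyResult(3)
res._set(0, (True, 'ok'))                            # one chunk arrived normally
res._set_all((False, RuntimeError('worker died')))   # pool broke; flush the rest
try:
    res.get()
except RuntimeError as exc:
    print('get() unblocked with:', exc)              # instead of hanging forever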
Lib/test/_test_multiprocessing.py
@@ -2584,10 +2584,13 @@ def raising():
def unpickleable_result():
    return lambda: 42

def bad_exit_os(value):
    if value:
        from os import _exit as exit
        # from sys import exit
        exit(123)

def bad_exit_sys(value):
    if value:
        from sys import exit
        exit(123)
@@ -2633,24 +2636,52 @@ def errback(exc):
        p.close()
        p.join()

    def test_external_signal_kills_worker_apply_async(self):
        """mimics that a worker was killed from external signal"""
        from multiprocessing.pool import BrokenProcessPool
        p = multiprocessing.Pool(2)
        res = p.apply_async(time.sleep, (5,))
        res = p.apply_async(time.sleep, (2,))
        res = p.apply_async(time.sleep, (1,))
        # Kill one of the pool workers, after some have entered
        # execution (hence, the 0.5s wait)
        time.sleep(0.5)
        pid = p._pool[0].pid
        os.kill(pid, signal.SIGTERM)
        with self.assertRaises(BrokenProcessPool):
            res.get()
        p.close()
        p.join()

    def test_external_signal_kills_worker_imap_unordered(self):
        """mimics that a worker was killed from external signal"""
        from multiprocessing.pool import BrokenProcessPool
        p = multiprocessing.Pool(2)
        with self.assertRaises(BrokenProcessPool):
            res = list(p.imap_unordered(bad_exit_os, [0, 0, 1, 0]))
        p.close()
        p.join()
    def test_external_signal_kills_worker_map_async1(self):
        """mimics that a worker was killed from external signal"""
        from multiprocessing.pool import BrokenProcessPool
        p = multiprocessing.Pool(2)
        res = p.map_async(time.sleep, [5] * 10)
        # Kill one of the pool workers, after some have entered
        # execution (hence, the 0.5s wait)
        time.sleep(0.5)
        pid = p._pool[0].pid
        os.kill(pid, signal.SIGTERM)
        with self.assertRaises(BrokenProcessPool):
            res.get()
        p.close()
        p.join()

    def test_external_signal_kills_worker_map_async2(self):
        """mimics that a worker was killed from external signal"""
        from multiprocessing.pool import BrokenProcessPool
        p = multiprocessing.Pool(2)
        res = p.map_async(time.sleep, (2, ))
        # Kill one of the pool workers.
        pid = p._pool[0].pid
        os.kill(pid, signal.SIGTERM)
[Review comment: Same here.]
@@ -2659,14 +2690,47 @@ def test_broken_process_pool2(self):
        p.close()
        p.join()

    def test_map_async_with_broken_pool(self):
        """submit task to a broken pool"""
        from multiprocessing.pool import BrokenProcessPool
        p = multiprocessing.Pool(2)
        pid = p._pool[0].pid
        res = p.map_async(time.sleep, (2, ))
        # Kill one of the pool workers.
        os.kill(pid, signal.SIGTERM)
        with self.assertRaises(BrokenProcessPool):
            res.get()
        p.close()
        p.join()

    def test_internal_signal_kills_worker_map1(self):
        from multiprocessing.pool import BrokenProcessPool
        p = multiprocessing.Pool(2)
        with self.assertRaises(BrokenProcessPool):
            res = p.map(bad_exit_os, [0, 0, 1, 0])
        p.close()
        p.join()

    def test_internal_signal_kills_worker_map2(self):
        from multiprocessing.pool import BrokenProcessPool
        p = multiprocessing.Pool(2)
        with self.assertRaises(BrokenProcessPool):
            res = p.map(bad_exit_sys, [0, 0, 1, 0])
        p.close()
        p.join()

    def test_internal_signal_kills_worker_map_async3(self):
        from multiprocessing.pool import BrokenProcessPool
        p = multiprocessing.Pool(2)
        res = p.map_async(time.sleep, [5] * 10)
        # Kill one of the pool workers, after some have entered
        # execution (hence, the 0.5s wait)
        time.sleep(0.5)
        p._pool[0].terminate()
        with self.assertRaises(BrokenProcessPool):
            res.get()
        p.close()
        p.join()


class _TestPoolWorkerLifetime(BaseTestCase):

    ALLOWED_TYPES = ('processes', )