Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

bpo-22393: Fix multiprocessing.Pool hangs if a worker process dies unexpectedly#10441

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
oesteban wants to merge 16 commits into python:main from oesteban:fix-issue-22393
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
16 commits
Select commit. Hold shift + click to select a range
d37e360
add tests
oesteban Nov 6, 2018
bc08d85
base patch
oesteban Nov 9, 2018
4eac116
finishing up fix
oesteban Nov 9, 2018
c8ba754
cleanup not needed imports
oesteban Nov 9, 2018
b36663b
avert race condition
oesteban Nov 9, 2018
f8500e2
add documentation
oesteban Nov 9, 2018
848d304
make patchcheck
oesteban Nov 9, 2018
1f93322
add News entry
oesteban Nov 9, 2018
a172df6
stylistic fixes, avoid shadowing ``worker`` variable name
oesteban Nov 12, 2018
4d614b3
address some of @effigies' comments
oesteban Nov 12, 2018
65f6eaf
protect changes of state of worker handler thread with lock
oesteban Nov 12, 2018
6d9c4ca
Merge branch 'fix-issue-22393' of github.com:oesteban/cpython into fi…
oesteban Nov 12, 2018
706f178
Merge remote-tracking branch 'upstream/master' into fix-issue-22393
oesteban Dec 17, 2018
933c77a
address @pitrou's comments
oesteban Dec 18, 2018
efcc185
fix typo
oesteban Dec 18, 2018
7c21ddd
fix typo (sorry for the rebound commit)
oesteban Dec 18, 2018
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
PrevPrevious commit
NextNext commit
address @pitrou's comments
  • Loading branch information
@oesteban
oesteban committed Dec 18, 2018
commit 933c77a7dd86bf1ea1f9a63e5569ff387d8b366e
2 changes: 2 additions & 0 deletions Doc/library/multiprocessing.rst
View file
Open in desktop
Original file line numberDiff line numberDiff line change
Expand Up@@ -2237,6 +2237,8 @@ with the :class:`Pool` class.
one of the workers of a :class:`Pool` has terminated in a non-clean
fashion (for example, if it was killed from the outside).

.. versionadded:: 3.8


.. class:: AsyncResult

Expand Down
25 changes: 19 additions & 6 deletions Lib/multiprocessing/pool.py
View file
Open in desktop
Original file line numberDiff line numberDiff line change
Expand Up@@ -290,12 +290,12 @@ def _maintain_pool(self):
if need_repopulate is None:
with self._worker_state_lock:
self._worker_handler._state = BROKEN

err = BrokenProcessPool(
'A worker in the pool terminated abruptly.')
# Exhaust MapResult with errors
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This also applies to ApplyResult, right?

for i, cache_ent in list(self._cache.items()):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Out of curiosity, is there any reason why we iterate on a list of self._cache?

err = BrokenProcessPool(
'A worker of the pool terminated abruptly.')
# Exhaust MapResult with errors
while cache_ent._number_left > 0:
cache_ent._set(i, (False, err))
cache_ent._set_all((False, err))

def _setup_queues(self):
self._inqueue = self._ctx.SimpleQueue()
Expand DownExpand Up@@ -679,7 +679,7 @@ def _terminate_pool(cls, taskqueue, inqueue, outqueue, pool,
# worker has not yet exited
util.debug('cleaning up worker %d' % p.pid)
p.join()
util.debug('terminate pool finalized')
util.debug('terminate pool finished')

def __enter__(self):
self._check_running()
Expand DownExpand Up@@ -731,6 +731,9 @@ def _set(self, i, obj):
self._event.set()
del self._cache[self._job]

def _set_all(self, obj):
self._set(0, obj)

AsyncResult = ApplyResult # create alias -- see #17805

#
Expand DownExpand Up@@ -774,6 +777,12 @@ def _set(self, i, success_result):
del self._cache[self._job]
self._event.set()

def _set_all(self, obj):
item = 0
while self._number_left > 0:
self._set(item, obj)
item += 1

#
# Class whose instances are returned by `Pool.imap()`
#
Expand DownExpand Up@@ -831,6 +840,10 @@ def _set(self, i, obj):
if self._index == self._length:
del self._cache[self._job]

def _set_all(self, obj):
while self._index != self._length:
self._set(self._index, obj)

def _set_length(self, length):
with self._cond:
self._length = length
Expand Down
86 changes: 75 additions & 11 deletions Lib/test/_test_multiprocessing.py
View file
Open in desktop
Original file line numberDiff line numberDiff line change
Expand Up@@ -2584,10 +2584,13 @@ def raising():
def unpickleable_result():
return lambda: 42

def waiting(args):
time.sleep(7)
def bad_exit_os(value):
if value:
from os import _exit as exit
# from sys import exit
exit(123)

def bad_exit(value):
def bad_exit_sys(value):
if value:
from sys import exit
exit(123)
Expand DownExpand Up@@ -2633,24 +2636,52 @@ def errback(exc):
p.close()
p.join()

def test_broken_process_pool1(self):
def test_external_signal_kills_worker_apply_async(self):
"""mimics that a worker was killed from external signal"""
from multiprocessing.pool import BrokenProcessPool
p = multiprocessing.Pool(2)
res = p.map_async(waiting, range(10))
# Kill one of the pool workers.
waiting(None)
res = p.apply_async(time.sleep, (5,))
res = p.apply_async(time.sleep, (2,))
res = p.apply_async(time.sleep, (1,))
# Kill one of the pool workers, after some have entered
# execution (hence, the 0.5s wait)
time.sleep(0.5)
pid = p._pool[0].pid
os.kill(pid, signal.SIGTERM)
with self.assertRaises(BrokenProcessPool):
res.get()
p.close()
p.join()

def test_external_signal_kills_worker_imap_unordered(self):
"""mimics that a worker was killed from external signal"""
from multiprocessing.pool import BrokenProcessPool
p = multiprocessing.Pool(2)
with self.assertRaises(BrokenProcessPool):
res = list(p.imap_unordered(bad_exit_os, [0, 0, 1, 0]))
p.close()
p.join()

def test_external_signal_kills_worker_map_async1(self):
"""mimics that a worker was killed from external signal"""
from multiprocessing.pool import BrokenProcessPool
p = multiprocessing.Pool(2)
res = p.map_async(time.sleep, [5] * 10)
# Kill one of the pool workers, after some have entered
# execution (hence, the 0.5s wait)
time.sleep(0.5)
pid = p._pool[0].pid
os.kill(pid, signal.SIGTERM)
with self.assertRaises(BrokenProcessPool):
res.get()
p.close()
p.join()

def test_broken_process_pool2(self):
def test_external_signal_kills_worker_map_async2(self):
"""mimics that a worker was killed from external signal"""
from multiprocessing.pool import BrokenProcessPool
p = multiprocessing.Pool(2)
res = p.map_async(waiting, [1])
res = p.map_async(time.sleep, (2, ))
# Kill one of the pool workers.
pid = p._pool[0].pid
os.kill(pid, signal.SIGTERM)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same here.

Expand All@@ -2659,14 +2690,47 @@ def test_broken_process_pool2(self):
p.close()
p.join()

def test_broken_process_pool3(self):
def test_map_async_with_broken_pool(self):
"""submit task to a broken pool"""
from multiprocessing.pool import BrokenProcessPool
p = multiprocessing.Pool(2)
pid = p._pool[0].pid
res = p.map_async(time.sleep, (2, ))
# Kill one of the pool workers.
os.kill(pid, signal.SIGTERM)
with self.assertRaises(BrokenProcessPool):
res = p.map(bad_exit, [0, 0, 1, 0])
res.get()
p.close()
p.join()

def test_internal_signal_kills_worker_map1(self):
from multiprocessing.pool import BrokenProcessPool
p = multiprocessing.Pool(2)
with self.assertRaises(BrokenProcessPool):
res = p.map(bad_exit_os, [0, 0, 1, 0])
p.close()
p.join()

def test_internal_signal_kills_worker_map2(self):
from multiprocessing.pool import BrokenProcessPool
p = multiprocessing.Pool(2)
with self.assertRaises(BrokenProcessPool):
res = p.map(bad_exit_sys, [0, 0, 1, 0])
p.close()
p.join()

def test_internal_signal_kills_worker_map_async3(self):
from multiprocessing.pool import BrokenProcessPool
p = multiprocessing.Pool(2)
res = p.map_async(time.sleep, [5] * 10)
# Kill one of the pool workers, after some have entered
# execution (hence, the 0.5s wait)
time.sleep(0.5)
p._pool[0].terminate()
with self.assertRaises(BrokenProcessPool):
res.get()
p.close()
p.join()

class _TestPoolWorkerLifetime(BaseTestCase):
ALLOWED_TYPES = ('processes', )
Expand Down

[8]ページ先頭

©2009-2025 Movatter.jp