Uh oh!
There was an error while loading.Please reload this page.
- Notifications
You must be signed in to change notification settings - Fork32.3k
gh-132969: Fix error/hang when shutdown(wait=False) and task exited abnormally#133222
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to ourterms of service andprivacy statement. We’ll occasionally send you account related emails.
Already on GitHub?Sign in to your account
Uh oh!
There was an error while loading.Please reload this page.
Changes from6 commits
8ac1fcf
1774445
0938eeb
dbc0ba8
99ab321
fe3650c
40cefc2
c177c91
2cd70b1
57265b5
d0c2bb0
709d9fd
1a8919b
87eb623
ec2543a
3b3721f
7ef7872
efd0a1d
b335d38
b060786
c6457c3
2beb0d9
65f30b0
54ae7b8
e132f5e
9908810
f5f3dc2
0fe743b
310765e
86d0c2a
4f46875
File filter
Filter by extension
Conversations
Uh oh!
There was an error while loading.Please reload this page.
Jump to
Uh oh!
There was an error while loading.Please reload this page.
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -755,6 +755,10 @@ def _start_executor_manager_thread(self): | ||
self._executor_manager_thread_wakeup | ||
def _adjust_process_count(self): | ||
# gh-132969 | ||
ogbiggles marked this conversation as resolved. Show resolvedHide resolvedUh oh!There was an error while loading.Please reload this page. | ||
if self._processes is None: | ||
return | ||
# if there's an idle process, we don't need to spawn a new one. | ||
if self._idle_worker_semaphore.acquire(blocking=False): | ||
return | ||
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -20,6 +20,16 @@ def sleep_and_print(t, msg): | ||
sys.stdout.flush() | ||
def failing_task_132969(n: int) -> int: | ||
ogbiggles marked this conversation as resolved. Show resolvedHide resolvedUh oh!There was an error while loading.Please reload this page. | ||
raise ValueError("failing task") | ||
def good_task_132969(n: int) -> int: | ||
time.sleep(0.1 * n) | ||
return n | ||
class ExecutorShutdownTest: | ||
def test_run_after_shutdown(self): | ||
self.executor.shutdown() | ||
@@ -330,6 +340,46 @@ def test_shutdown_no_wait(self): | ||
# shutdown. | ||
assert all([r == abs(v) for r, v in zip(res, range(-5, 5))]) | ||
def _run_test_issue_132969(self, max_workers: int) -> int: | ||
if sys.platform == "win32": | ||
raise unittest.SkipTest("skip test since forkserver is not available on Windows") | ||
ogbiggles marked this conversation as resolved. Show resolvedHide resolvedUh oh!There was an error while loading.Please reload this page. | ||
# max_workers=2 will repro exception | ||
# max_workers=4 will repro exception and then hang | ||
import multiprocessing as mp | ||
ogbiggles marked this conversation as resolved. Show resolvedHide resolvedUh oh!There was an error while loading.Please reload this page. | ||
# Repro conditions | ||
# max_tasks_per_child=1 | ||
# a task ends abnormally | ||
# shutdown(wait=False) is called | ||
executor = futures.ProcessPoolExecutor( | ||
max_workers=max_workers, | ||
max_tasks_per_child=1, | ||
mp_context=mp.get_context("forkserver")) | ||
ogbiggles marked this conversation as resolved. Show resolvedHide resolvedUh oh!There was an error while loading.Please reload this page. | ||
f1 = executor.submit(good_task_132969, 1) | ||
f2 = executor.submit(failing_task_132969, 2) | ||
f3 = executor.submit(good_task_132969, 3) | ||
result:int = 0 | ||
ogbiggles marked this conversation as resolved. Show resolvedHide resolvedUh oh!There was an error while loading.Please reload this page. | ||
try: | ||
result += f1.result() | ||
result += f2.result() | ||
result += f3.result() | ||
except ValueError: | ||
# stop processing results upon first exception | ||
pass | ||
executor.shutdown(wait=False) | ||
return result | ||
def test_shutdown_len_exception_132969(self): | ||
ogbiggles marked this conversation as resolved. Show resolvedHide resolvedUh oh!There was an error while loading.Please reload this page. | ||
result = self._run_test_issue_132969(2) | ||
self.assertEqual(result, 1) | ||
def test_shutdown_process_hang_132969(self): | ||
result = self._run_test_issue_132969(4) | ||
self.assertEqual(result, 1) | ||
create_executor_tests(globals(), ProcessPoolShutdownTest, | ||
executor_mixins=(ProcessPoolForkMixin, | ||
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1 @@ | ||
Fixes error+hang when ProcessPoolExecutor shutdown called with wait=False and a task ended abnormally. | ||
ogbiggles marked this conversation as resolved. Show resolvedHide resolvedUh oh!There was an error while loading.Please reload this page. |
Uh oh!
There was an error while loading.Please reload this page.