gh-66587: Fix deadlock from pool worker death without communication #16103
base: main
Conversation
…ueue; adds test for issue22393/issue38084.
This looks good to me, just a few remarks:
Also pinging @tomMoral
For mine, I think this fix seems more elegant than #10441, but the tests in that PR seem to have more coverage. I personally prefer to just have the task fail, and the pool continue. The current behaviour is that the broken worker is immediately replaced and other work continues, but if you wait on the failed task then it will never complete. Now it does complete (with a failure), which means robust code can re-queue it if appropriate. I don't see any reason to tear down the entire pool. A few comments on the PR incoming.
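For anyone coming from the tracker, a minimal standalone sketch of the failure mode under discussion (the worker function and the timeout are illustrative, not taken from this PR):

    import multiprocessing
    import os

    def crash(_):
        # Simulate a worker dying without ever reporting back to the pool.
        os._exit(1)

    if __name__ == '__main__':
        with multiprocessing.Pool(processes=2) as pool:
            result = pool.map_async(crash, range(4))
            # Without the fix the pool replaces the dead workers but never
            # resolves the lost tasks, so a bare result.get() hangs forever;
            # with the fix the tasks complete with a failure and get() raises.
            result.get(timeout=30)   # timeout only so this demo cannot hang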
Lib/multiprocessing/pool.py Outdated
    worker.join()
    cleaned = True
    if pid in job_assignments:
Suggested change:

    if pid in job_assignments:

becomes:

    job = job_assignments.pop(pid, None)
    if job:
        outqueue.put((job, i, (False, RuntimeError("Worker died"))))
And some additional simplification below, of course.
Here is a batch of comments.
I have to say that I like this solution, as it is the most robust way of handling this (a kind of scheduler). But it also comes with more complexity and increased communication needs -> more chances for deadlocks.
One of the main arguments for the fail-on-error design is that there is no way to know in the main process whether the worker that died held a lock on one of the communication queues. In that situation, the only way to recover the system and avoid a deadlock is to kill the Pool and re-spawn one.
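To make that hazard concrete, here is a rough standalone sketch (not code from this PR): multiprocessing queues serialize writers with an internal lock, and a lock held by a process that dies is never released, so every later writer blocks forever.

    import multiprocessing as mp
    import os

    def die_holding(lock):
        lock.acquire()   # stand-in for the queue's internal write lock
        os._exit(1)      # the worker dies without ever releasing it

    if __name__ == '__main__':
        lock = mp.Lock()
        p = mp.Process(target=die_holding, args=(lock,))
        p.start()
        p.join()
        # The lock is still held by the dead process and nothing will ever
        # release it, so any put() guarded by it would block indefinitely.
        print(lock.acquire(timeout=5))   # prints False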
        job_assignments[value] = job
    else:
        try:
            cache[job]._set(i, (task_info, value))
Why don't you remove the job from job_assignments here? It would avoid an unnecessary operation when a worker dies gracefully.
Co-Authored-By: Steve Dower <steve.dower@microsoft.com>
Additional tests would certainly be a good idea.
    # Issue22393: test fix of indefinite hang caused by worker processes
    # exiting abruptly (such as via os._exit()) without communicating
    # back to the pool at all.
    prog = (
This can be written much more clearly using a multi-line string. See for example a very similar case in test_shared_memory_cleaned_after_process_termination in this file.
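Something along these lines, for instance; the exact program text is illustrative, only the multi-line-string shape is the point:

    prog = '''if 1:
        import os
        from multiprocessing import Pool

        with Pool(2) as pool:
            # Each worker calls os._exit() and dies without reporting back.
            try:
                pool.map_async(os._exit, [1, 1, 1, 1]).get()
            except RuntimeError:
                pass   # with the fix, the lost tasks fail instead of hanging
        '''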
    # Only if there is a regression will this ever trigger a
    # subprocess.TimeoutExpired.
    completed_process = subprocess.run(
        [sys.executable, '-E', '-S', '-O', '-c', prog],
The '-O' flag probably shouldn't be used here, but '-S' and '-E' seem fine.
Also, consider calling test.support.script_helper.interpreter_requires_environment(), and only use the '-E' flag if that returns False, as done by the other Python-script-running utilities in test.support.script_helper. Or just use test.support.script_helper.run_python_until_end() instead of subprocess.run().
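For reference, a rough sketch of the second option (assuming the helper keeps its current behaviour of building the command line and deciding -E/-I itself):

    from test.support import script_helper

    # run_python_until_end() constructs the interpreter command line and only
    # adds -E (or -I) when the interpreter does not require environment
    # variables to run.
    res, cmd_line = script_helper.run_python_until_end('-c', prog)
    self.assertEqual(res.rc, 0, res.err)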
@applio, I'm not sure where this one is at, but I believe there are some comments that still need to be addressed. I don't know if it's waiting on anything else, but it would probably be nice to get this merged.
Closing and re-opening to re-trigger CI.
bedevere-bot commented Sep 23, 2021
This missed the boat for inclusion in Python 3.9, which accepts security fixes only as of today.
The following commit authors need to sign the Contributor License Agreement:
Adds tracking of which worker process in the pool takes which job from the queue.
When a worker process dies without communicating back, the task/job it was running is lost with it. Because the pool now tracks which job each worker took off the job queue, the parent process can, upon detecting the death, put an item on the result queue marking that task/job as failed.
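In outline, the bookkeeping looks roughly like this; the identifier names follow the review snippets above, but the surrounding structure is heavily simplified:

    job_assignments = {}   # worker pid -> job it pulled from the task queue

    def record_assignment(pid, job):
        # Result handler: a worker has announced which job it picked up.
        job_assignments[pid] = job

    def report_lost_task(pid, i, outqueue):
        # Worker supervision: a worker exited without sending its result.
        job = job_assignments.pop(pid, None)
        if job is not None:
            # Wake whoever is waiting on this job instead of hanging forever.
            outqueue.put((job, i, (False, RuntimeError("Worker died"))))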
To guard against a future regression, the supplied test runs the scenario in a subprocess constrained by a timeout, so an indefinite hang cannot stall the test run.
https://bugs.python.org/issue22393