Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commitba05a4e

Browse files
csm10495picnixzblurb-it[bot]
authored
gh-128041: Addterminate_workers andkill_workers methods to ProcessPoolExecutor (GH-130849)
This adds two new methods to `multiprocessing`'s `ProcessPoolExecutor`:- **`terminate_workers()`**: forcefully terminates worker processes using `Process.terminate()`- **`kill_workers()`**: forcefully kills worker processes using `Process.kill()`These methods provide users with a direct way to stop worker processes without `shutdown()` or relying on implementation details, addressing situations where immediate termination is needed.Co-authored-by: Bénédikt Tran <10796600+picnixz@users.noreply.github.com>Co-authored-by: blurb-it[bot] <43283697+blurb-it[bot]@users.noreply.github.com>Co-authored-by: Sam Gross@colesburyCommit-message-mostly-authored-by: Claude Sonnet 3.7 (because why not -greg)
1 parent02de9cb commitba05a4e

File tree

5 files changed

+224
-0
lines changed

5 files changed

+224
-0
lines changed

‎Doc/library/concurrent.futures.rst‎

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -415,6 +415,30 @@ to a :class:`ProcessPoolExecutor` will result in deadlock.
415415
require the *fork* start method for:class:`ProcessPoolExecutor` you must
416416
explicitly pass ``mp_context=multiprocessing.get_context("fork")``.
417417

418+
..method::terminate_workers()
419+
420+
Attempt to terminate all living worker processes immediately by calling
421+
:meth:`Process.terminate <multiprocessing.Process.terminate>` on each of them.
422+
Internally, it will also call:meth:`Executor.shutdown` to ensure that all
423+
other resources associated with the executor are freed.
424+
425+
After calling this method the caller should no longer submit tasks to the
426+
executor.
427+
428+
..versionadded::next
429+
430+
..method::kill_workers()
431+
432+
Attempt to kill all living worker processes immediately by calling
433+
:meth:`Process.kill <multiprocessing.Process.kill>` on each of them.
434+
Internally, it will also call:meth:`Executor.shutdown` to ensure that all
435+
other resources associated with the executor are freed.
436+
437+
After calling this method the caller should no longer submit tasks to the
438+
executor.
439+
440+
..versionadded::next
441+
418442
.. _processpoolexecutor-example:
419443

420444
ProcessPoolExecutor Example

‎Doc/whatsnew/3.14.rst‎

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -444,6 +444,11 @@ contextvars
444444
* Support context manager protocol by:class:`contextvars.Token`.
445445
(Contributed by Andrew Svetlov in:gh:`129889`.)
446446

447+
* Add:meth:`concurrent.futures.ProcessPoolExecutor.terminate_workers` and
448+
:meth:`concurrent.futures.ProcessPoolExecutor.kill_workers` as
449+
ways to terminate or kill all living worker processes in the given pool.
450+
(Contributed by Charles Machalow in:gh:`130849`.)
451+
447452

448453
ctypes
449454
------

‎Lib/concurrent/futures/process.py‎

Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -626,6 +626,14 @@ class BrokenProcessPool(_base.BrokenExecutor):
626626
while a future was in the running state.
627627
"""
628628

629+
_TERMINATE="terminate"
630+
_KILL="kill"
631+
632+
_SHUTDOWN_CALLBACK_OPERATION= {
633+
_TERMINATE,
634+
_KILL
635+
}
636+
629637

630638
classProcessPoolExecutor(_base.Executor):
631639
def__init__(self,max_workers=None,mp_context=None,
@@ -855,3 +863,66 @@ def shutdown(self, wait=True, *, cancel_futures=False):
855863
self._executor_manager_thread_wakeup=None
856864

857865
shutdown.__doc__=_base.Executor.shutdown.__doc__
866+
867+
def_force_shutdown(self,operation):
868+
"""Attempts to terminate or kill the executor's workers based off the
869+
given operation. Iterates through all of the current processes and
870+
performs the relevant task if the process is still alive.
871+
872+
After terminating workers, the pool will be in a broken state
873+
and no longer usable (for instance, new tasks should not be
874+
submitted).
875+
"""
876+
ifoperationnotin_SHUTDOWN_CALLBACK_OPERATION:
877+
raiseValueError(f"Unsupported operation:{operation!r}")
878+
879+
processes= {}
880+
ifself._processes:
881+
processes=self._processes.copy()
882+
883+
# shutdown will invalidate ._processes, so we copy it right before
884+
# calling. If we waited here, we would deadlock if a process decides not
885+
# to exit.
886+
self.shutdown(wait=False,cancel_futures=True)
887+
888+
ifnotprocesses:
889+
return
890+
891+
forprocinprocesses.values():
892+
try:
893+
ifnotproc.is_alive():
894+
continue
895+
exceptValueError:
896+
# The process is already exited/closed out.
897+
continue
898+
899+
try:
900+
ifoperation==_TERMINATE:
901+
proc.terminate()
902+
elifoperation==_KILL:
903+
proc.kill()
904+
exceptProcessLookupError:
905+
# The process just ended before our signal
906+
continue
907+
908+
defterminate_workers(self):
909+
"""Attempts to terminate the executor's workers.
910+
Iterates through all of the current worker processes and terminates
911+
each one that is still alive.
912+
913+
After terminating workers, the pool will be in a broken state
914+
and no longer usable (for instance, new tasks should not be
915+
submitted).
916+
"""
917+
returnself._force_shutdown(operation=_TERMINATE)
918+
919+
defkill_workers(self):
920+
"""Attempts to kill the executor's workers.
921+
Iterates through all of the current worker processes and kills
922+
each one that is still alive.
923+
924+
After killing workers, the pool will be in a broken state
925+
and no longer usable (for instance, new tasks should not be
926+
submitted).
927+
"""
928+
returnself._force_shutdown(operation=_KILL)

‎Lib/test/test_concurrent_futures/test_process_pool.py‎

Lines changed: 120 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,17 @@
11
importos
2+
importqueue
3+
importsignal
24
importsys
35
importthreading
46
importtime
57
importunittest
8+
importunittest.mock
69
fromconcurrentimportfutures
710
fromconcurrent.futures.processimportBrokenProcessPool
811

912
fromtestimportsupport
1013
fromtest.supportimporthashlib_helper
14+
fromtest.test_importlib.metadata.fixturesimportparameterize
1115

1216
from .executorimportExecutorTest,mul
1317
from .utilimport (
@@ -22,6 +26,21 @@ def __init__(self, mgr):
2226
def__del__(self):
2327
self.event.set()
2428

29+
TERMINATE_WORKERS=futures.ProcessPoolExecutor.terminate_workers.__name__
30+
KILL_WORKERS=futures.ProcessPoolExecutor.kill_workers.__name__
31+
FORCE_SHUTDOWN_PARAMS= [
32+
dict(function_name=TERMINATE_WORKERS),
33+
dict(function_name=KILL_WORKERS),
34+
]
35+
36+
def_put_wait_put(queue,event):
37+
""" Used as part of test_terminate_workers """
38+
queue.put('started')
39+
event.wait()
40+
41+
# We should never get here since the event will not get set
42+
queue.put('finished')
43+
2544

2645
classProcessPoolExecutorTest(ExecutorTest):
2746

@@ -218,6 +237,107 @@ def mock_start_new_thread(func, *args, **kwargs):
218237
list(executor.map(mul, [(2,3)]*10))
219238
executor.shutdown()
220239

240+
deftest_terminate_workers(self):
241+
mock_fn=unittest.mock.Mock()
242+
withself.executor_type(max_workers=1)asexecutor:
243+
executor._force_shutdown=mock_fn
244+
executor.terminate_workers()
245+
246+
mock_fn.assert_called_once_with(operation=futures.process._TERMINATE)
247+
248+
deftest_kill_workers(self):
249+
mock_fn=unittest.mock.Mock()
250+
withself.executor_type(max_workers=1)asexecutor:
251+
executor._force_shutdown=mock_fn
252+
executor.kill_workers()
253+
254+
mock_fn.assert_called_once_with(operation=futures.process._KILL)
255+
256+
deftest_force_shutdown_workers_invalid_op(self):
257+
withself.executor_type(max_workers=1)asexecutor:
258+
self.assertRaises(ValueError,
259+
executor._force_shutdown,
260+
operation='invalid operation'),
261+
262+
@parameterize(*FORCE_SHUTDOWN_PARAMS)
263+
deftest_force_shutdown_workers(self,function_name):
264+
manager=self.get_context().Manager()
265+
q=manager.Queue()
266+
e=manager.Event()
267+
268+
withself.executor_type(max_workers=1)asexecutor:
269+
executor.submit(_put_wait_put,q,e)
270+
271+
# We should get started, but not finished since we'll terminate the
272+
# workers just after and never set the event.
273+
self.assertEqual(q.get(timeout=support.SHORT_TIMEOUT),'started')
274+
275+
worker_process=list(executor._processes.values())[0]
276+
277+
Mock=unittest.mock.Mock
278+
worker_process.terminate=Mock(wraps=worker_process.terminate)
279+
worker_process.kill=Mock(wraps=worker_process.kill)
280+
281+
getattr(executor,function_name)()
282+
worker_process.join()
283+
284+
iffunction_name==TERMINATE_WORKERS:
285+
worker_process.terminate.assert_called()
286+
eliffunction_name==KILL_WORKERS:
287+
worker_process.kill.assert_called()
288+
else:
289+
self.fail(f"Unknown operation:{function_name}")
290+
291+
self.assertRaises(queue.Empty,q.get,timeout=0.01)
292+
293+
@parameterize(*FORCE_SHUTDOWN_PARAMS)
294+
deftest_force_shutdown_workers_dead_workers(self,function_name):
295+
withself.executor_type(max_workers=1)asexecutor:
296+
future=executor.submit(os._exit,1)
297+
self.assertRaises(BrokenProcessPool,future.result)
298+
299+
# even though the pool is broken, this shouldn't raise
300+
getattr(executor,function_name)()
301+
302+
@parameterize(*FORCE_SHUTDOWN_PARAMS)
303+
deftest_force_shutdown_workers_not_started_yet(self,function_name):
304+
ctx=self.get_context()
305+
withunittest.mock.patch.object(ctx,'Process')asmock_process:
306+
withself.executor_type(max_workers=1,mp_context=ctx)asexecutor:
307+
# The worker has not been started yet, terminate/kill_workers
308+
# should basically no-op
309+
getattr(executor,function_name)()
310+
311+
mock_process.return_value.kill.assert_not_called()
312+
mock_process.return_value.terminate.assert_not_called()
313+
314+
@parameterize(*FORCE_SHUTDOWN_PARAMS)
315+
deftest_force_shutdown_workers_stops_pool(self,function_name):
316+
withself.executor_type(max_workers=1)asexecutor:
317+
task=executor.submit(time.sleep,0)
318+
self.assertIsNone(task.result())
319+
320+
worker_process=list(executor._processes.values())[0]
321+
getattr(executor,function_name)()
322+
323+
self.assertRaises(RuntimeError,executor.submit,time.sleep,0)
324+
325+
# A signal sent, is not a signal reacted to.
326+
# So wait a moment here for the process to die.
327+
# If we don't, every once in a while we may get an ENV CHANGE
328+
# error since the process would be alive immediately after the
329+
# test run.. and die a moment later.
330+
worker_process.join(support.SHORT_TIMEOUT)
331+
332+
# Oddly enough, even though join completes, sometimes it takes a
333+
# moment for the process to actually be marked as dead.
334+
# ... that seems a bit buggy.
335+
# We need it dead before ending the test to ensure it doesn't
336+
# get marked as an ENV CHANGE due to living child process.
337+
for_insupport.sleeping_retry(support.SHORT_TIMEOUT):
338+
ifnotworker_process.is_alive():
339+
break
340+
221341

222342
create_executor_tests(globals(),ProcessPoolExecutorTest,
223343
executor_mixins=(ProcessPoolForkMixin,
Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
Add:meth:`concurrent.futures.ProcessPoolExecutor.terminate_workers` and
2+
:meth:`concurrent.futures.ProcessPoolExecutor.kill_workers` as
3+
ways to terminate or kill all living worker processes in the given pool.
4+
(Contributed by Charles Machalow in:gh:`130849`.)

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp