2626PENDING ,RUNNING ,CANCELLED ,CANCELLED_AND_NOTIFIED ,FINISHED ,Future ,
2727BrokenExecutor )
2828from concurrent .futures .process import BrokenProcessPool ,_check_system_limits
29- from multiprocessing import get_context
3029
3130import multiprocessing .process
3231import multiprocessing .util
32+ import multiprocessing as mp
3333
3434
3535if support .check_sanitizer (address = True ,memory = True ):
@@ -130,7 +130,6 @@ def setUp(self):
130130self .executor = self .executor_type (
131131max_workers = self .worker_count ,
132132** self .executor_kwargs )
133- self ._prime_executor ()
134133
135134def tearDown (self ):
136135self .executor .shutdown (wait = True )
@@ -144,15 +143,7 @@ def tearDown(self):
144143super ().tearDown ()
145144
146145def get_context (self ):
147- return get_context (self .ctx )
148-
149- def _prime_executor (self ):
150- # Make sure that the executor is ready to do work before running the
151- # tests. This should reduce the probability of timeouts in the tests.
152- futures = [self .executor .submit (time .sleep ,0.1 )
153- for _ in range (self .worker_count )]
154- for f in futures :
155- f .result ()
146+ return mp .get_context (self .ctx )
156147
157148
158149class ThreadPoolMixin (ExecutorMixin ):
@@ -275,9 +266,6 @@ def test_initializer(self):
275266with self .assertRaises (BrokenExecutor ):
276267self .executor .submit (get_init_status )
277268
278- def _prime_executor (self ):
279- pass
280-
281269@contextlib .contextmanager
282270def _assert_logged (self ,msg ):
283271if self .log_queue is not None :
@@ -364,14 +352,14 @@ def test_hang_issue12364(self):
364352f .result ()
365353
366354def test_cancel_futures (self ):
367- executor = self .executor_type ( max_workers = 3 )
368- fs = [executor .submit (time .sleep ,.1 )for _ in range (50 )]
369- executor .shutdown (cancel_futures = True )
355+ assert self .worker_count <= 5 , "test needs few workers"
356+ fs = [self . executor .submit (time .sleep ,.1 )for _ in range (50 )]
357+ self . executor .shutdown (cancel_futures = True )
370358# We can't guarantee the exact number of cancellations, but we can
371- # guarantee that *some* were cancelled. With setting max_workers to 3,
372- # most of the submitted futures should have been cancelled.
359+ # guarantee that *some* were cancelled. With few workers, many of
360+ # the submitted futures should have been cancelled.
373361cancelled = [fut for fut in fs if fut .cancelled ()]
374- self .assertTrue (len (cancelled )>= 35 , msg = f" { len ( cancelled ) = } " )
362+ self .assertGreater (len (cancelled ), 20 )
375363
376364# Ensure the other futures were able to finish.
377365# Use "not fut.cancelled()" instead of "fut.done()" to include futures
@@ -384,33 +372,32 @@ def test_cancel_futures(self):
384372# Similar to the number of cancelled futures, we can't guarantee the
385373# exact number that completed. But, we can guarantee that at least
386374# one finished.
387- self .assertTrue (len (others )> 0 , msg = f" { len ( others ) = } " )
375+ self .assertGreater (len (others ), 0 )
388376
389- def test_hang_issue39205 (self ):
377+ def test_hang_gh83386 (self ):
390378"""shutdown(wait=False) doesn't hang at exit with running futures.
391379
392- See https://bugs.python.org/issue39205.
380+ See https://github.com/python/cpython/issues/83386.
393381 """
394382if self .executor_type == futures .ProcessPoolExecutor :
395383raise unittest .SkipTest (
396- "Hangs due to https://bugs.python.org/issue39205")
384+ "Hangs, see https://github.com/python/cpython/issues/83386")
397385
398386rc ,out ,err = assert_python_ok ('-c' ,"""if True:
399387 from concurrent.futures import {executor_type}
400388 from test.test_concurrent_futures import sleep_and_print
401389 if __name__ == "__main__":
390+ if {context!r}: multiprocessing.set_start_method({context!r})
402391 t = {executor_type}(max_workers=3)
403392 t.submit(sleep_and_print, 1.0, "apple")
404393 t.shutdown(wait=False)
405- """ .format (executor_type = self .executor_type .__name__ ))
394+ """ .format (executor_type = self .executor_type .__name__ ,
395+ context = getattr (self ,'ctx' ,None )))
406396self .assertFalse (err )
407397self .assertEqual (out .strip (),b"apple" )
408398
409399
410400class ThreadPoolShutdownTest (ThreadPoolMixin ,ExecutorShutdownTest ,BaseTestCase ):
411- def _prime_executor (self ):
412- pass
413-
414401def test_threads_terminate (self ):
415402def acquire_lock (lock ):
416403lock .acquire ()
@@ -505,14 +492,11 @@ def test_cancel_futures_wait_false(self):
505492
506493
507494class ProcessPoolShutdownTest (ExecutorShutdownTest ):
508- def _prime_executor (self ):
509- pass
510-
511495def test_processes_terminate (self ):
512496def acquire_lock (lock ):
513497lock .acquire ()
514498
515- mp_context = get_context ()
499+ mp_context = self . get_context ()
516500sem = mp_context .Semaphore (0 )
517501for _ in range (3 ):
518502self .executor .submit (acquire_lock ,sem )
@@ -526,7 +510,8 @@ def acquire_lock(lock):
526510p .join ()
527511
528512def test_context_manager_shutdown (self ):
529- with futures .ProcessPoolExecutor (max_workers = 5 )as e :
513+ with futures .ProcessPoolExecutor (
514+ max_workers = 5 ,mp_context = self .get_context ())as e :
530515processes = e ._processes
531516self .assertEqual (list (e .map (abs ,range (- 5 ,5 ))),
532517 [5 ,4 ,3 ,2 ,1 ,0 ,1 ,2 ,3 ,4 ])
@@ -535,7 +520,8 @@ def test_context_manager_shutdown(self):
535520p .join ()
536521
537522def test_del_shutdown (self ):
538- executor = futures .ProcessPoolExecutor (max_workers = 5 )
523+ executor = futures .ProcessPoolExecutor (
524+ max_workers = 5 ,mp_context = self .get_context ())
539525res = executor .map (abs ,range (- 5 ,5 ))
540526executor_manager_thread = executor ._executor_manager_thread
541527processes = executor ._processes
@@ -558,7 +544,8 @@ def test_del_shutdown(self):
558544def test_shutdown_no_wait (self ):
559545# Ensure that the executor cleans up the processes when calling
560546# shutdown with wait=False
561- executor = futures .ProcessPoolExecutor (max_workers = 5 )
547+ executor = futures .ProcessPoolExecutor (
548+ max_workers = 5 ,mp_context = self .get_context ())
562549res = executor .map (abs ,range (- 5 ,5 ))
563550processes = executor ._processes
564551call_queue = executor ._call_queue
@@ -935,7 +922,7 @@ def submit(pool):
935922pool .submit (submit ,pool )
936923
937924for _ in range (50 ):
938- with futures .ProcessPoolExecutor (1 ,mp_context = get_context ('fork' ))as workers :
925+ with futures .ProcessPoolExecutor (1 ,mp_context = mp . get_context ('fork' ))as workers :
939926workers .submit (tuple )
940927
941928
@@ -1005,7 +992,7 @@ def test_traceback(self):
1005992def test_ressources_gced_in_workers (self ):
1006993# Ensure that argument for a job are correctly gc-ed after the job
1007994# is finished
1008- mgr = get_context ( self .ctx ).Manager ()
995+ mgr = self .get_context ( ).Manager ()
1009996obj = EventfulGCObj (mgr )
1010997future = self .executor .submit (id ,obj )
1011998future .result ()
@@ -1021,38 +1008,41 @@ def test_ressources_gced_in_workers(self):
10211008mgr .join ()
10221009
10231010def test_saturation (self ):
1024- executor = self .executor_type ( 4 )
1025- mp_context = get_context ()
1011+ executor = self .executor
1012+ mp_context = self . get_context ()
10261013sem = mp_context .Semaphore (0 )
10271014job_count = 15 * executor ._max_workers
1028- try :
1029- for _ in range (job_count ):
1030- executor .submit (sem .acquire )
1031- self .assertEqual (len (executor ._processes ),executor ._max_workers )
1032- for _ in range (job_count ):
1033- sem .release ()
1034- finally :
1035- executor .shutdown ()
1015+ for _ in range (job_count ):
1016+ executor .submit (sem .acquire )
1017+ self .assertEqual (len (executor ._processes ),executor ._max_workers )
1018+ for _ in range (job_count ):
1019+ sem .release ()
10361020
10371021def test_idle_process_reuse_one (self ):
1038- executor = self .executor_type (4 )
1022+ executor = self .executor
1023+ assert executor ._max_workers >= 4
10391024executor .submit (mul ,21 ,2 ).result ()
10401025executor .submit (mul ,6 ,7 ).result ()
10411026executor .submit (mul ,3 ,14 ).result ()
10421027self .assertEqual (len (executor ._processes ),1 )
1043- executor .shutdown ()
10441028
10451029def test_idle_process_reuse_multiple (self ):
1046- executor = self .executor_type (4 )
1030+ executor = self .executor
1031+ assert executor ._max_workers <= 5
10471032executor .submit (mul ,12 ,7 ).result ()
10481033executor .submit (mul ,33 ,25 )
10491034executor .submit (mul ,25 ,26 ).result ()
10501035executor .submit (mul ,18 ,29 )
1051- self .assertLessEqual (len (executor ._processes ),2 )
1036+ executor .submit (mul ,1 ,2 ).result ()
1037+ executor .submit (mul ,0 ,9 )
1038+ self .assertLessEqual (len (executor ._processes ),3 )
10521039executor .shutdown ()
10531040
10541041def test_max_tasks_per_child (self ):
1055- executor = self .executor_type (1 ,max_tasks_per_child = 3 )
1042+ # not using self.executor as we need to control construction.
1043+ # arguably this could go in another class w/o that mixin.
1044+ executor = self .executor_type (
1045+ 1 ,mp_context = self .get_context (),max_tasks_per_child = 3 )
10561046f1 = executor .submit (os .getpid )
10571047original_pid = f1 .result ()
10581048# The worker pid remains the same as the worker could be reused
@@ -1072,7 +1062,10 @@ def test_max_tasks_per_child(self):
10721062executor .shutdown ()
10731063
10741064def test_max_tasks_early_shutdown (self ):
1075- executor = self .executor_type (3 ,max_tasks_per_child = 1 )
1065+ # not using self.executor as we need to control construction.
1066+ # arguably this could go in another class w/o that mixin.
1067+ executor = self .executor_type (
1068+ 3 ,mp_context = self .get_context (),max_tasks_per_child = 1 )
10761069futures = []
10771070for i in range (6 ):
10781071futures .append (executor .submit (mul ,i ,i ))
@@ -1182,7 +1175,7 @@ def _check_crash(self, error, func, *args, ignore_stderr=False):
11821175self .executor .shutdown (wait = True )
11831176
11841177executor = self .executor_type (
1185- max_workers = 2 ,mp_context = get_context ( self .ctx ))
1178+ max_workers = 2 ,mp_context = self .get_context ( ))
11861179res = executor .submit (func ,* args )
11871180
11881181if ignore_stderr :
@@ -1261,7 +1254,7 @@ def test_shutdown_deadlock(self):
12611254# if a worker fails after the shutdown call.
12621255self .executor .shutdown (wait = True )
12631256with self .executor_type (max_workers = 2 ,
1264- mp_context = get_context ( self .ctx ))as executor :
1257+ mp_context = self .get_context ( ))as executor :
12651258self .executor = executor # Allow clean up in fail_on_deadlock
12661259f = executor .submit (_crash ,delay = .1 )
12671260executor .shutdown (wait = True )
@@ -1274,7 +1267,7 @@ def test_shutdown_deadlock_pickle(self):
12741267# Reported in bpo-39104.
12751268self .executor .shutdown (wait = True )
12761269with self .executor_type (max_workers = 2 ,
1277- mp_context = get_context ( self .ctx ))as executor :
1270+ mp_context = self .get_context ( ))as executor :
12781271self .executor = executor # Allow clean up in fail_on_deadlock
12791272
12801273# Start the executor and get the executor_manager_thread to collect