multiprocessing
— Process-based parallelism¶
Source code: Lib/multiprocessing/
Availability: not Android, not iOS, not WASI.
This module is not supported on mobile platforms or WebAssembly platforms.
Introduction¶
multiprocessing is a package that supports spawning processes using an API similar to the threading module. The multiprocessing package offers both local and remote concurrency, effectively side-stepping the Global Interpreter Lock by using subprocesses instead of threads. Due to this, the multiprocessing module allows the programmer to fully leverage multiple processors on a given machine. It runs on both POSIX and Windows.
The multiprocessing module also introduces APIs which do not have analogs in the threading module. A prime example of this is the Pool object which offers a convenient means of parallelizing the execution of a function across multiple input values, distributing the input data across processes (data parallelism). The following example demonstrates the common practice of defining such functions in a module so that child processes can successfully import that module. This basic example of data parallelism using Pool,

    from multiprocessing import Pool

    def f(x):
        return x*x

    if __name__ == '__main__':
        with Pool(5) as p:
            print(p.map(f, [1, 2, 3]))

will print to standard output

    [1, 4, 9]
See also
concurrent.futures.ProcessPoolExecutor offers a higher level interface to push tasks to a background process without blocking execution of the calling process. Compared to using the Pool interface directly, the concurrent.futures API more readily allows the submission of work to the underlying process pool to be separated from waiting for the results.
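As a rough illustration of separating submission from waiting, the sketch below submits a few calls to a ProcessPoolExecutor and collects the results afterwards. The helper name gather_results and the use of the built-in pow as the task are assumptions made for this example only.

```python
import concurrent.futures


def gather_results(futures):
    """Wait for every future and return the results, sorted for stable output."""
    return sorted(f.result() for f in concurrent.futures.as_completed(futures))


if __name__ == '__main__':
    with concurrent.futures.ProcessPoolExecutor(max_workers=2) as executor:
        # submit() returns immediately with a Future for each call ...
        futures = [executor.submit(pow, 2, n) for n in range(5)]
        # ... and the waiting happens later, wherever it is convenient.
        print(gather_results(futures))
```

The helper only depends on the Future interface, so it works the same way with futures from any executor.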
The Process class¶
In multiprocessing, processes are spawned by creating a Process object and then calling its start() method. Process follows the API of threading.Thread. A trivial example of a multiprocess program is

    from multiprocessing import Process

    def f(name):
        print('hello', name)

    if __name__ == '__main__':
        p = Process(target=f, args=('bob',))
        p.start()
        p.join()

To show the individual process IDs involved, here is an expanded example:

    from multiprocessing import Process
    import os

    def info(title):
        print(title)
        print('module name:', __name__)
        print('parent process:', os.getppid())
        print('process id:', os.getpid())

    def f(name):
        info('function f')
        print('hello', name)

    if __name__ == '__main__':
        info('main line')
        p = Process(target=f, args=('bob',))
        p.start()
        p.join()

For an explanation of why the if __name__ == '__main__' part is necessary, see Programming guidelines.
Contexts and start methods¶
Depending on the platform, multiprocessing supports three ways to start a process. These start methods are
- spawn
The parent process starts a fresh Python interpreter process. The child process will only inherit those resources necessary to run the process object’s run() method. In particular, unnecessary file descriptors and handles from the parent process will not be inherited. Starting a process using this method is rather slow compared to using fork or forkserver.
Available on POSIX and Windows platforms. The default on Windows and macOS.
- fork
The parent process uses os.fork() to fork the Python interpreter. The child process, when it begins, is effectively identical to the parent process. All resources of the parent are inherited by the child process. Note that safely forking a multithreaded process is problematic.
Available on POSIX systems. Currently the default on POSIX except macOS.
Note
The default start method will change away from fork in Python 3.14. Code that requires fork should explicitly specify that via get_context() or set_start_method().
Changed in version 3.12: If Python is able to detect that your process has multiple threads, the os.fork() function that this start method calls internally will raise a DeprecationWarning. Use a different start method. See the os.fork() documentation for further explanation.
- forkserver
When the program starts and selects the forkserver start method, a server process is spawned. From then on, whenever a new process is needed, the parent process connects to the server and requests that it fork a new process. The fork server process is single threaded unless system libraries or preloaded imports spawn threads as a side-effect so it is generally safe for it to use os.fork(). No unnecessary resources are inherited.
Available on POSIX platforms which support passing file descriptors over Unix pipes such as Linux.
Changed in version 3.4: spawn added on all POSIX platforms, and forkserver added for some POSIX platforms. Child processes no longer inherit all of the parent’s inheritable handles on Windows.
Changed in version 3.8: On macOS, the spawn start method is now the default. The fork start method should be considered unsafe as it can lead to crashes of the subprocess as macOS system libraries may start threads. See bpo-33725.
On POSIX using the spawn or forkserver start methods will also start a resource tracker process which tracks the unlinked named system resources (such as named semaphores or SharedMemory objects) created by processes of the program. When all processes have exited the resource tracker unlinks any remaining tracked object. Usually there should be none, but if a process was killed by a signal there may be some “leaked” resources. (Neither leaked semaphores nor shared memory segments will be automatically unlinked until the next reboot. This is problematic for both objects because the system allows only a limited number of named semaphores, and shared memory segments occupy some space in the main memory.)
To select a start method, call set_start_method() in the if __name__ == '__main__' clause of the main module. For example:

    import multiprocessing as mp

    def foo(q):
        q.put('hello')

    if __name__ == '__main__':
        mp.set_start_method('spawn')
        q = mp.Queue()
        p = mp.Process(target=foo, args=(q,))
        p.start()
        print(q.get())
        p.join()

set_start_method() should not be used more than once in the program.
Alternatively, you can use get_context() to obtain a context object. Context objects have the same API as the multiprocessing module, and allow one to use multiple start methods in the same program.

    import multiprocessing as mp

    def foo(q):
        q.put('hello')

    if __name__ == '__main__':
        ctx = mp.get_context('spawn')
        q = ctx.Queue()
        p = ctx.Process(target=foo, args=(q,))
        p.start()
        print(q.get())
        p.join()

Note that objects related to one context may not be compatible with processes for a different context. In particular, locks created using the fork context cannot be passed to processes started using the spawn or forkserver start methods.
A library which wants to use a particular start method should probably use get_context() to avoid interfering with the choice of the library user.
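A minimal sketch of that advice for library code follows. The helper name make_library_queue is hypothetical; the point is only that get_context() hands back a private context and leaves the global start method, which belongs to the application, untouched.

```python
import multiprocessing


def make_library_queue(method='spawn'):
    """Hypothetical library helper: build a queue bound to a private context.

    get_context() returns a context object without changing the global
    start method, so the application's own choice is left alone.
    """
    ctx = multiprocessing.get_context(method)
    return ctx, ctx.Queue()
```

Processes that use the returned queue should be created from the same context (ctx.Process), since objects from different contexts may not be compatible.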
Warning
The 'spawn' and 'forkserver' start methods generally cannot be used with “frozen” executables (i.e., binaries produced by packages like PyInstaller and cx_Freeze) on POSIX systems. The 'fork' start method may work if code does not use threads.
Exchanging objects between processes¶
multiprocessing supports two types of communication channel between processes:
Queues
The Queue class is a near clone of queue.Queue. For example:

    from multiprocessing import Process, Queue

    def f(q):
        q.put([42, None, 'hello'])

    if __name__ == '__main__':
        q = Queue()
        p = Process(target=f, args=(q,))
        p.start()
        print(q.get())    # prints "[42, None, 'hello']"
        p.join()

Queues are thread and process safe. Any object put into a multiprocessing queue will be serialized.
Pipes
The Pipe() function returns a pair of connection objects connected by a pipe which by default is duplex (two-way). For example:

    from multiprocessing import Process, Pipe

    def f(conn):
        conn.send([42, None, 'hello'])
        conn.close()

    if __name__ == '__main__':
        parent_conn, child_conn = Pipe()
        p = Process(target=f, args=(child_conn,))
        p.start()
        print(parent_conn.recv())   # prints "[42, None, 'hello']"
        p.join()

The two connection objects returned by Pipe() represent the two ends of the pipe. Each connection object has send() and recv() methods (among others). Note that data in a pipe may become corrupted if two processes (or threads) try to read from or write to the same end of the pipe at the same time. Of course there is no risk of corruption from processes using different ends of the pipe at the same time.
The send() method serializes the object and recv() re-creates the object.
Synchronization between processes¶
multiprocessing contains equivalents of all the synchronization primitives from threading. For instance one can use a lock to ensure that only one process prints to standard output at a time:

    from multiprocessing import Process, Lock

    def f(l, i):
        l.acquire()
        try:
            print('hello world', i)
        finally:
            l.release()

    if __name__ == '__main__':
        lock = Lock()

        for num in range(10):
            Process(target=f, args=(lock, num)).start()

Without using the lock output from the different processes is liable to get all mixed up.
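The acquire()/try/finally pattern can also be written with a with statement. The single-process sketch below is only meant to show the lock's behaviour; the variable names are illustrative, and in a real program the lock would be passed to child processes as in the example above.

```python
from multiprocessing import Lock

lock = Lock()

# The with statement acquires the lock on entry and releases it on exit.
with lock:
    # A multiprocessing Lock is not recursive: a non-blocking second
    # acquire fails while the lock is already held.
    held_elsewhere = not lock.acquire(block=False)

# After the with block the lock is free again.
free_again = lock.acquire(block=False)
if free_again:
    lock.release()
```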
Sharing state between processes¶
As mentioned above, when doing concurrent programming it is usually best to avoid using shared state as far as possible. This is particularly true when using multiple processes.
However, if you really do need to use some shared data then multiprocessing provides a couple of ways of doing so.
Shared memory
Data can be stored in a shared memory map using Value or Array. For example, the following code

    from multiprocessing import Process, Value, Array

    def f(n, a):
        n.value = 3.1415927
        for i in range(len(a)):
            a[i] = -a[i]

    if __name__ == '__main__':
        num = Value('d', 0.0)
        arr = Array('i', range(10))

        p = Process(target=f, args=(num, arr))
        p.start()
        p.join()

        print(num.value)
        print(arr[:])

will print

    3.1415927
    [0, -1, -2, -3, -4, -5, -6, -7, -8, -9]

The 'd' and 'i' arguments used when creating num and arr are typecodes of the kind used by the array module: 'd' indicates a double precision float and 'i' indicates a signed integer. These shared objects will be process and thread-safe.
For more flexibility in using shared memory one can use the multiprocessing.sharedctypes module which supports the creation of arbitrary ctypes objects allocated from shared memory.
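One caveat worth sketching: individual reads and writes of a Value are synchronized, but an update such as += is a read followed by a write, so it should be wrapped in the lock returned by get_lock(). The example below runs in a single process to keep it short; the names counter, samples and record are illustrative only.

```python
from multiprocessing import Array, Value

counter = Value('i', 0)          # shared signed int, created with a lock
samples = Array('d', [0.0] * 3)  # shared array of doubles


def record(value, slot):
    """Bump the counter under its lock, then store the sample."""
    with counter.get_lock():
        # "+=" reads and then writes, so hold the lock across both steps.
        counter.value += 1
    samples[slot] = value


record(1.5, 0)
record(2.5, 1)
```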
Server process
A manager object returned by Manager() controls a server process which holds Python objects and allows other processes to manipulate them using proxies.
A manager returned by Manager() will support types list, dict, Namespace, Lock, RLock, Semaphore, BoundedSemaphore, Condition, Event, Barrier, Queue, Value and Array. For example,

    from multiprocessing import Process, Manager

    def f(d, l):
        d[1] = '1'
        d['2'] = 2
        d[0.25] = None
        l.reverse()

    if __name__ == '__main__':
        with Manager() as manager:
            d = manager.dict()
            l = manager.list(range(10))

            p = Process(target=f, args=(d, l))
            p.start()
            p.join()

            print(d)
            print(l)

will print

    {0.25: None, 1: '1', '2': 2}
    [9, 8, 7, 6, 5, 4, 3, 2, 1, 0]

Server process managers are more flexible than using shared memory objects because they can be made to support arbitrary object types. Also, a single manager can be shared by processes on different computers over a network. They are, however, slower than using shared memory.
Using a pool of workers¶
The Pool class represents a pool of worker processes. It has methods which allow tasks to be offloaded to the worker processes in a few different ways.
For example:

    from multiprocessing import Pool, TimeoutError
    import time
    import os

    def f(x):
        return x*x

    if __name__ == '__main__':
        # start 4 worker processes
        with Pool(processes=4) as pool:

            # print "[0, 1, 4,..., 81]"
            print(pool.map(f, range(10)))

            # print same numbers in arbitrary order
            for i in pool.imap_unordered(f, range(10)):
                print(i)

            # evaluate "f(20)" asynchronously
            res = pool.apply_async(f, (20,))      # runs in *only* one process
            print(res.get(timeout=1))             # prints "400"

            # evaluate "os.getpid()" asynchronously
            res = pool.apply_async(os.getpid, ()) # runs in *only* one process
            print(res.get(timeout=1))             # prints the PID of that process

            # launching multiple evaluations asynchronously *may* use more processes
            multiple_results = [pool.apply_async(os.getpid, ()) for i in range(4)]
            print([res.get(timeout=1) for res in multiple_results])

            # make a single worker sleep for 10 seconds
            res = pool.apply_async(time.sleep, (10,))
            try:
                print(res.get(timeout=1))
            except TimeoutError:
                print("We lacked patience and got a multiprocessing.TimeoutError")

            print("For the moment, the pool remains available for more work")

        # exiting the 'with'-block has stopped the pool
        print("Now the pool is closed and no longer available")

Note that the methods of a pool should only ever be used by the process which created it.
Note
Functionality within this package requires that the __main__ module be importable by the children. This is covered in Programming guidelines; however, it is worth pointing out here. This means that some examples, such as the multiprocessing.pool.Pool examples, will not work in the interactive interpreter. For example:

    >>> from multiprocessing import Pool
    >>> p = Pool(5)
    >>> def f(x):
    ...     return x*x
    ...
    >>> with p:
    ...     p.map(f, [1,2,3])
    Process PoolWorker-1:
    Process PoolWorker-2:
    Process PoolWorker-3:
    Traceback (most recent call last):
    Traceback (most recent call last):
    Traceback (most recent call last):
    AttributeError: Can't get attribute 'f' on <module '__main__' (<class '_frozen_importlib.BuiltinImporter'>)>
    AttributeError: Can't get attribute 'f' on <module '__main__' (<class '_frozen_importlib.BuiltinImporter'>)>
    AttributeError: Can't get attribute 'f' on <module '__main__' (<class '_frozen_importlib.BuiltinImporter'>)>

(If you try this it will actually output three full tracebacks interleaved in a semi-random fashion, and then you may have to stop the parent process somehow.)
Reference¶
The multiprocessing package mostly replicates the API of the threading module.
Process and exceptions¶
- class multiprocessing.Process(group=None, target=None, name=None, args=(), kwargs={}, *, daemon=None)¶
Process objects represent activity that is run in a separate process. The Process class has equivalents of all the methods of threading.Thread.
The constructor should always be called with keyword arguments. group should always be None; it exists solely for compatibility with threading.Thread. target is the callable object to be invoked by the run() method. It defaults to None, meaning nothing is called. name is the process name (see name for more details). args is the argument tuple for the target invocation. kwargs is a dictionary of keyword arguments for the target invocation. If provided, the keyword-only daemon argument sets the process daemon flag to True or False. If None (the default), this flag will be inherited from the creating process.
By default, no arguments are passed to target. The args argument, which defaults to (), can be used to specify a list or tuple of the arguments to pass to target.
If a subclass overrides the constructor, it must make sure it invokes the base class constructor (Process.__init__()) before doing anything else to the process.
Changed in version 3.3: Added the daemon parameter.
- run()¶
Method representing the process’s activity.
You may override this method in a subclass. The standard run() method invokes the callable object passed to the object’s constructor as the target argument, if any, with sequential and keyword arguments taken from the args and kwargs arguments, respectively.
Using a list or tuple as the args argument passed to Process achieves the same effect.
Example:

    >>> from multiprocessing import Process
    >>> p = Process(target=print, args=[1])
    >>> p.run()
    1
    >>> p = Process(target=print, args=(1,))
    >>> p.run()
    1
- start()¶
Start the process’s activity.
This must be called at most once per process object. It arranges for the object’s run() method to be invoked in a separate process.
- join([timeout])¶
If the optional argument timeout is None (the default), the method blocks until the process whose join() method is called terminates. If timeout is a positive number, it blocks at most timeout seconds. Note that the method returns None if its process terminates or if the method times out. Check the process’s exitcode to determine if it terminated.
A process can be joined many times.
A process cannot join itself because this would cause a deadlock. It is an error to attempt to join a process before it has been started.
- name¶
The process’s name. The name is a string used for identification purposes only. It has no semantics. Multiple processes may be given the same name.
The initial name is set by the constructor. If no explicit name is provided to the constructor, a name of the form ‘Process-N1:N2:…:Nk’ is constructed, where each Nk is the N-th child of its parent.
- is_alive()¶
Return whether the process is alive.
Roughly, a process object is alive from the moment the start() method returns until the child process terminates.
- daemon¶
The process’s daemon flag, a Boolean value. This must be set before start() is called.
The initial value is inherited from the creating process.
When a process exits, it attempts to terminate all of its daemonic child processes.
Note that a daemonic process is not allowed to create child processes. Otherwise a daemonic process would leave its children orphaned if it gets terminated when its parent process exits. Additionally, these are not Unix daemons or services, they are normal processes that will be terminated (and not joined) if non-daemonic processes have exited.
In addition to the threading.Thread API, Process objects also support the following attributes and methods:
- pid¶
Return the process ID. Before the process is spawned, this will be None.
- exitcode¶
The child’s exit code. This will be None if the process has not yet terminated.
If the child’s run() method returned normally, the exit code will be 0. If it terminated via sys.exit() with an integer argument N, the exit code will be N.
If the child terminated due to an exception not caught within run(), the exit code will be 1. If it was terminated by signal N, the exit code will be the negative value -N.
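The exit code rules above can be condensed into a small helper. This is only a sketch: describe_exitcode is a hypothetical name, and mapping a negative code to a signal name assumes a platform where the signal module knows that signal number.

```python
import signal
import sys
from multiprocessing import Process


def describe_exitcode(code):
    """Translate a Process.exitcode value into a short description."""
    if code is None:
        return 'not finished'
    if code < 0:
        # A negative value -N means "terminated by signal N".
        return 'killed by ' + signal.Signals(-code).name
    return 'exited with status %d' % code


if __name__ == '__main__':
    p = Process(target=sys.exit, args=(3,))
    print(describe_exitcode(p.exitcode))   # the process has not run yet
    p.start()
    p.join()
    print(describe_exitcode(p.exitcode))   # child called sys.exit(3)
```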
- authkey¶
The process’s authentication key (a byte string).
When multiprocessing is initialized the main process is assigned a random string using os.urandom().
When a Process object is created, it will inherit the authentication key of its parent process, although this may be changed by setting authkey to another byte string.
- sentinel¶
A numeric handle of a system object which will become “ready” when the process ends.
You can use this value if you want to wait on several events at once using multiprocessing.connection.wait(). Otherwise calling join() is simpler.
On Windows, this is an OS handle usable with the WaitForSingleObject and WaitForMultipleObjects family of API calls. On POSIX, this is a file descriptor usable with primitives from the select module.
Added in version 3.3.
- terminate()¶
Terminate the process. On POSIX this is done using the SIGTERM signal; on Windows TerminateProcess() is used. Note that exit handlers and finally clauses, etc., will not be executed.
Note that descendant processes of the process will not be terminated; they will simply become orphaned.
Warning
If this method is used when the associated process is using a pipe or queue then the pipe or queue is liable to become corrupted and may become unusable by other processes. Similarly, if the process has acquired a lock or semaphore etc. then terminating it is liable to cause other processes to deadlock.
- kill()¶
Same as terminate() but using the SIGKILL signal on POSIX.
Added in version 3.7.
- close()¶
Close the Process object, releasing all resources associated with it. ValueError is raised if the underlying process is still running. Once close() returns successfully, most other methods and attributes of the Process object will raise ValueError.
Added in version 3.7.
Note that the start(), join(), is_alive(), terminate() and exitcode methods should only be called by the process that created the process object.
Example usage of some of the methods of Process:

    >>> import multiprocessing, time, signal
    >>> mp_context = multiprocessing.get_context('spawn')
    >>> p = mp_context.Process(target=time.sleep, args=(1000,))
    >>> print(p, p.is_alive())
    <...Process ... initial> False
    >>> p.start()
    >>> print(p, p.is_alive())
    <...Process ... started> True
    >>> p.terminate()
    >>> time.sleep(0.1)
    >>> print(p, p.is_alive())
    <...Process ... stopped exitcode=-SIGTERM> False
    >>> p.exitcode == -signal.SIGTERM
    True
- exception multiprocessing.ProcessError¶
The base class of all multiprocessing exceptions.
- exception multiprocessing.BufferTooShort¶
Exception raised by Connection.recv_bytes_into() when the supplied buffer object is too small for the message read.
If e is an instance of BufferTooShort then e.args[0] will give the message as a byte string.
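A small single-process sketch of this behaviour follows; the variable names are illustrative. Both ends of the pipe live in the same process here purely to keep the example short.

```python
from multiprocessing import BufferTooShort, Pipe

receiver, sender = Pipe(duplex=False)
sender.send_bytes(b'hello world')

small = bytearray(4)               # deliberately too small for the message
try:
    receiver.recv_bytes_into(small)
    recovered = None
except BufferTooShort as e:
    # The complete message is still available on the exception.
    recovered = e.args[0]
```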
- exception multiprocessing.AuthenticationError¶
Raised when there is an authentication error.
- exception multiprocessing.TimeoutError¶
Raised by methods with a timeout when the timeout expires.
Pipes and Queues¶
When using multiple processes, one generally uses message passing for communication between processes and avoids having to use any synchronization primitives like locks.
For passing messages one can use Pipe() (for a connection between two processes) or a queue (which allows multiple producers and consumers).
The Queue, SimpleQueue and JoinableQueue types are multi-producer, multi-consumer FIFO queues modelled on the queue.Queue class in the standard library. They differ in that Queue lacks the task_done() and join() methods introduced into Python 2.5’s queue.Queue class.
If you use JoinableQueue then you must call JoinableQueue.task_done() for each task removed from the queue or else the semaphore used to count the number of unfinished tasks may eventually overflow, raising an exception.
One difference from other Python queue implementations is that multiprocessing queues serialize all objects that are put into them using pickle. The object returned by the get method is a re-created object that does not share memory with the original object.
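The copy semantics can be observed even within one process. In this sketch (variable names are illustrative) the object that comes out of the queue is equal to the one that went in, but changing it does not affect the original.

```python
from multiprocessing import Queue

q = Queue()
original = {'values': [1, 2, 3]}
q.put(original)

# get() unpickles the data, producing an equal but separate object.
received = q.get(timeout=5)
received['values'].append(4)       # does not touch `original`
```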
Note that one can also create a shared queue by using a manager object (see Managers).
Note
multiprocessing uses the usual queue.Empty and queue.Full exceptions to signal a timeout. They are not available in the multiprocessing namespace so you need to import them from queue.
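For instance, a bounded queue used from a single process can trigger both exceptions. This is a minimal sketch with illustrative variable names and short timeouts chosen only for the demonstration.

```python
import queue                      # home of the Empty and Full exceptions
from multiprocessing import Queue

q = Queue(maxsize=1)
q.put('first')

try:
    q.put('second', timeout=0.1)  # no free slot appears in time
    put_failed = False
except queue.Full:
    put_failed = True

item = q.get(timeout=5)

try:
    q.get(timeout=0.1)            # nothing left to read
    get_failed = False
except queue.Empty:
    get_failed = True
```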
Note
When an object is put on a queue, the object is pickled and a background thread later flushes the pickled data to an underlying pipe. This has some consequences which are a little surprising, but should not cause any practical difficulties; if they really bother you then you can instead use a queue created with a manager.
After putting an object on an empty queue there may be an infinitesimal delay before the queue’s empty() method returns False and get_nowait() can return without raising queue.Empty.
If multiple processes are enqueuing objects, it is possible for the objects to be received at the other end out-of-order. However, objects enqueued by the same process will always be in the expected order with respect to each other.
Warning
If a process is killed using Process.terminate() or os.kill() while it is trying to use a Queue, then the data in the queue is likely to become corrupted. This may cause any other process to get an exception when it tries to use the queue later on.
Warning
As mentioned above, if a child process has put items on a queue (and it has not used JoinableQueue.cancel_join_thread), then that process will not terminate until all buffered items have been flushed to the pipe.
This means that if you try joining that process you may get a deadlock unless you are sure that all items which have been put on the queue have been consumed. Similarly, if the child process is non-daemonic then the parent process may hang on exit when it tries to join all its non-daemonic children.
Note that a queue created using a manager does not have this issue. See Programming guidelines.
For an example of the usage of queues for interprocess communication see Examples.
- multiprocessing.Pipe([duplex])¶
Returns a pair (conn1, conn2) of Connection objects representing the ends of a pipe.
If duplex is True (the default) then the pipe is bidirectional. If duplex is False then the pipe is unidirectional: conn1 can only be used for receiving messages and conn2 can only be used for sending messages.
The send() method serializes the object using pickle and the recv() re-creates the object.
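A quick single-process sketch of a unidirectional pipe is shown below. The names reader and writer are illustrative. That sending on the receive-only end raises OSError is what current CPython does rather than something the text above promises, so treat that part as an observation.

```python
from multiprocessing import Pipe

# With duplex=False the first connection only receives and the
# second only sends.
reader, writer = Pipe(duplex=False)
writer.send({'answer': 42})
message = reader.recv()

try:
    reader.send('wrong direction')
    misuse_error = None
except OSError as exc:
    # Observed CPython behaviour for a receive-only connection.
    misuse_error = exc
```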
- class multiprocessing.Queue([maxsize])¶
Returns a process shared queue implemented using a pipe and a few locks/semaphores. When a process first puts an item on the queue a feeder thread is started which transfers objects from a buffer into the pipe.
The usual queue.Empty and queue.Full exceptions from the standard library’s queue module are raised to signal timeouts.
Queue implements all the methods of queue.Queue except for task_done() and join().
- qsize()¶
Return the approximate size of the queue. Because of multithreading/multiprocessing semantics, this number is not reliable.
Note that this may raise NotImplementedError on platforms like macOS where sem_getvalue() is not implemented.
- empty()¶
Return True if the queue is empty, False otherwise. Because of multithreading/multiprocessing semantics, this is not reliable.
May raise an OSError on closed queues. (not guaranteed)
- full()¶
Return True if the queue is full, False otherwise. Because of multithreading/multiprocessing semantics, this is not reliable.
- put(obj[, block[, timeout]])¶
Put obj into the queue. If the optional argument block is True (the default) and timeout is None (the default), block if necessary until a free slot is available. If timeout is a positive number, it blocks at most timeout seconds and raises the queue.Full exception if no free slot was available within that time. Otherwise (block is False), put an item on the queue if a free slot is immediately available, else raise the queue.Full exception (timeout is ignored in that case).
Changed in version 3.8: If the queue is closed, ValueError is raised instead of AssertionError.
- put_nowait(obj)¶
Equivalent to put(obj, False).
- get([block[, timeout]])¶
Remove and return an item from the queue. If the optional argument block is True (the default) and timeout is None (the default), block if necessary until an item is available. If timeout is a positive number, it blocks at most timeout seconds and raises the queue.Empty exception if no item was available within that time. Otherwise (block is False), return an item if one is immediately available, else raise the queue.Empty exception (timeout is ignored in that case).
Changed in version 3.8: If the queue is closed, ValueError is raised instead of OSError.
- get_nowait()¶
Equivalent to get(False).
multiprocessing.Queue has a few additional methods not found in queue.Queue. These methods are usually unnecessary for most code:
- close()¶
Indicate that no more data will be put on this queue by the current process. The background thread will quit once it has flushed all buffered data to the pipe. This is called automatically when the queue is garbage collected.
- join_thread()¶
Join the background thread. This can only be used after close() has been called. It blocks until the background thread exits, ensuring that all data in the buffer has been flushed to the pipe.
By default if a process is not the creator of the queue then on exit it will attempt to join the queue’s background thread. The process can call cancel_join_thread() to make join_thread() do nothing.
- cancel_join_thread()¶
Prevent join_thread() from blocking. In particular, this prevents the background thread from being joined automatically when the process exits (see join_thread()).
A better name for this method might be allow_exit_without_flush(). It is likely to cause enqueued data to be lost, and you almost certainly will not need to use it. It is really only there if you need the current process to exit immediately without waiting to flush enqueued data to the underlying pipe, and you don’t care about lost data.
Note
This class’s functionality requires a functioning shared semaphore implementation on the host operating system. Without one, the functionality in this class will be disabled, and attempts to instantiate a Queue will result in an ImportError. See bpo-3770 for additional information. The same holds true for any of the specialized queue types listed below.
- class multiprocessing.SimpleQueue¶
It is a simplified Queue type, very close to a locked Pipe.
- close()¶
Close the queue: release internal resources.
A queue must not be used anymore after it is closed. For example, get(), put() and empty() methods must no longer be called.
Added in version 3.9.
- empty()¶
Return True if the queue is empty, False otherwise.
Always raises an OSError if the SimpleQueue is closed.
- get()¶
Remove and return an item from the queue.
- put(item)¶
Put item into the queue.
- class multiprocessing.JoinableQueue([maxsize])¶
JoinableQueue, a Queue subclass, is a queue which additionally has task_done() and join() methods.
- task_done()¶
Indicate that a formerly enqueued task is complete. Used by queue consumers. For each get() used to fetch a task, a subsequent call to task_done() tells the queue that the processing on the task is complete.
If a join() is currently blocking, it will resume when all items have been processed (meaning that a task_done() call was received for every item that had been put() into the queue).
Raises a ValueError if called more times than there were items placed in the queue.
- join()¶
Block until all items in the queue have been gotten and processed.
The count of unfinished tasks goes up whenever an item is added to the queue. The count goes down whenever a consumer calls task_done() to indicate that the item was retrieved and all work on it is complete. When the count of unfinished tasks drops to zero, join() unblocks.
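The bookkeeping between get(), task_done() and join() can be traced in a single process. In this sketch (names are illustrative; a real consumer would normally run in a separate process) every fetched item is acknowledged once, join() then returns immediately, and one extra task_done() call is rejected.

```python
from multiprocessing import JoinableQueue

q = JoinableQueue()
for task in ('a', 'b'):
    q.put(task)

processed = []
for _ in range(2):
    task = q.get(timeout=5)
    processed.append(task.upper())
    q.task_done()          # one call per item fetched with get()

q.join()                   # returns at once: every item is accounted for

try:
    q.task_done()          # one call too many
    extra_call_rejected = False
except ValueError:
    extra_call_rejected = True
```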
Miscellaneous¶
- multiprocessing.active_children()¶
Return a list of all live children of the current process.
Calling this has the side effect of “joining” any processes which have already finished.
- multiprocessing.cpu_count()¶
Return the number of CPUs in the system.
This number is not equivalent to the number of CPUs the current process can use. The number of usable CPUs can be obtained with os.process_cpu_count() (or len(os.sched_getaffinity(0))).
When the number of CPUs cannot be determined a NotImplementedError is raised.
Changed in version 3.13: The return value can also be overridden using the -X cpu_count flag or PYTHON_CPU_COUNT as this is merely a wrapper around the os cpu count APIs.
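Since os.process_cpu_count() only exists on newer Python versions and os.sched_getaffinity() only on some platforms, a portable lookup needs fallbacks. The helper below is a sketch under that assumption; usable_cpu_count is a hypothetical name.

```python
import multiprocessing
import os


def usable_cpu_count():
    """Best-effort count of CPUs the current process may actually use."""
    if hasattr(os, 'process_cpu_count'):        # newer Python versions
        count = os.process_cpu_count()
    elif hasattr(os, 'sched_getaffinity'):      # some POSIX platforms
        count = len(os.sched_getaffinity(0))
    else:
        count = os.cpu_count()
    return count or 1                           # the os calls may return None


if __name__ == '__main__':
    print('CPUs in system:', multiprocessing.cpu_count())
    print('CPUs usable   :', usable_cpu_count())
```

A value like this is a reasonable default for sizing a Pool when the process is restricted to a subset of the machine's CPUs.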
- multiprocessing.current_process()¶
Return the Process object corresponding to the current process.
An analogue of threading.current_thread().
- multiprocessing.parent_process()¶
Return the Process object corresponding to the parent process of the current_process(). For the main process, parent_process will be None.
Added in version 3.8.
- multiprocessing.freeze_support()¶
Add support for when a program which uses multiprocessing has been frozen to produce a Windows executable. (Has been tested with py2exe, PyInstaller and cx_Freeze.)
One needs to call this function straight after the if __name__ == '__main__' line of the main module. For example:

    from multiprocessing import Process, freeze_support

    def f():
        print('hello world!')

    if __name__ == '__main__':
        freeze_support()
        Process(target=f).start()

If the freeze_support() line is omitted then trying to run the frozen executable will raise RuntimeError.
Calling freeze_support() has no effect when invoked on any operating system other than Windows. In addition, if the module is being run normally by the Python interpreter on Windows (the program has not been frozen), then freeze_support() has no effect.
- multiprocessing.get_all_start_methods()¶
Returns a list of the supported start methods, the first of which is the default. The possible start methods are 'fork', 'spawn' and 'forkserver'. Not all platforms support all methods. See Contexts and start methods.
Added in version 3.4.
- multiprocessing.get_context(method=None)¶
Return a context object which has the same attributes as the multiprocessing module.
If method is None then the default context is returned. Otherwise method should be 'fork', 'spawn', 'forkserver'. ValueError is raised if the specified start method is not available. See Contexts and start methods.
Added in version 3.4.
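Because not every platform supports every start method, code that prefers one method can check get_all_start_methods() before asking for a context. The helper below is a hypothetical sketch; the preference order in its default argument is an assumption made for illustration, not a recommendation from the text above.

```python
import multiprocessing


def pick_context(preferred=('forkserver', 'spawn')):
    """Return a context for the first preferred start method available here."""
    available = multiprocessing.get_all_start_methods()
    for name in preferred:
        if name in available:
            return multiprocessing.get_context(name)
    raise ValueError('none of %r is available' % (preferred,))
```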
- multiprocessing.get_start_method(allow_none=False)¶
Return the name of the start method used for starting processes.
If the start method has not been fixed and allow_none is false, then the start method is fixed to the default and the name is returned. If the start method has not been fixed and allow_none is true then None is returned.
The return value can be 'fork', 'spawn', 'forkserver' or None. See Contexts and start methods.
Added in version 3.4.
Changed in version 3.8: On macOS, the spawn start method is now the default. The fork start method should be considered unsafe as it can lead to crashes of the subprocess. See bpo-33725.
- multiprocessing.set_executable(executable)¶
Set the path of the Python interpreter to use when starting a child process. (By default sys.executable is used.) Embedders will probably need to do something like

    set_executable(os.path.join(sys.exec_prefix, 'pythonw.exe'))

before they can create child processes.
Changed in version 3.4: Now supported on POSIX when the 'spawn' start method is used.
Changed in version 3.11: Accepts a path-like object.
- multiprocessing.set_forkserver_preload(module_names)¶
Set a list of module names for the forkserver main process to attempt to import so that their already imported state is inherited by forked processes. Any ImportError when doing so is silently ignored. This can be used as a performance enhancement to avoid repeated work in every process.
For this to work, it must be called before the forkserver process has been launched (before creating a Pool or starting a Process).
Only meaningful when using the 'forkserver' start method. See Contexts and start methods.
Added in version 3.4.
- multiprocessing.set_start_method(method,force=False)¶
Set the method which should be used to start child processes. The method argument can be 'fork', 'spawn' or 'forkserver'. Raises RuntimeError if the start method has already been set and force is not True. If method is None and force is True then the start method is set to None. If method is None and force is False then the context is set to the default context.
Note that this should be called at most once, and it should be protected inside the if __name__ == '__main__' clause of the main module.
See Contexts and start methods.
Added in version 3.4.
Note
multiprocessing contains no analogues of threading.active_count(), threading.enumerate(), threading.settrace(), threading.setprofile(), threading.Timer, or threading.local.
Connection Objects¶
Connection objects allow the sending and receiving of picklable objects or strings. They can be thought of as message oriented connected sockets.
Connection objects are usually created using Pipe – see also Listeners and Clients.
- class multiprocessing.connection.Connection¶
- send(obj)¶
Send an object to the other end of the connection which should be read using recv().
The object must be picklable. Very large pickles (approximately 32 MiB+, though it depends on the OS) may raise a ValueError exception.
- recv()¶
Return an object sent from the other end of the connection using send(). Blocks until there is something to receive. Raises EOFError if there is nothing left to receive and the other end was closed.
- fileno()¶
Return the file descriptor or handle used by the connection.
- close()¶
Close the connection.
This is called automatically when the connection is garbage collected.
- poll([timeout])¶
Return whether there is any data available to be read.
If timeout is not specified then it will return immediately. If timeout is a number then this specifies the maximum time in seconds to block. If timeout is None then an infinite timeout is used.
Note that multiple connection objects may be polled at once by using multiprocessing.connection.wait().
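To make the difference between poll() on one connection and wait() on several concrete, here is a minimal single-process sketch. The helper name ready_readers is hypothetical, and both ends of each pipe live in the same process only to keep the example self-contained.

```python
from multiprocessing import Pipe
from multiprocessing.connection import wait


def ready_readers(readers, timeout=1.0):
    """Return the connections in `readers` that can be read without blocking.

    `ready_readers` is a hypothetical helper wrapping connection.wait().
    """
    return wait(readers, timeout)


if __name__ == "__main__":
    r1, w1 = Pipe(duplex=False)   # one-way pipes: (reader, writer)
    r2, w2 = Pipe(duplex=False)
    print(r1.poll())              # no timeout given: returns immediately
    w2.send("ping")
    for conn in ready_readers([r1, r2]):
        print(conn.recv())        # only connections with pending data
```

With many worker connections, a single wait() call of this kind avoids polling each connection in turn.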
- send_bytes(buffer[,offset[,size]])¶
Send byte data from a bytes-like object as a complete message.
If offset is given then data is read from that position in buffer. If size is given then that many bytes will be read from buffer. Very large buffers (approximately 32 MiB+, though it depends on the OS) may raise a ValueError exception.
- recv_bytes([maxlength])¶
Return a complete message of byte data sent from the other end of the connection as a bytes object. Blocks until there is something to receive. Raises EOFError if there is nothing left to receive and the other end has closed.
If maxlength is specified and the message is longer than maxlength then OSError is raised and the connection will no longer be readable.
- recv_bytes_into(buffer[,offset])¶
Read into buffer a complete message of byte data sent from the other end of the connection and return the number of bytes in the message. Blocks until there is something to receive. Raises EOFError if there is nothing left to receive and the other end was closed.
buffer must be a writable bytes-like object. If offset is given then the message will be written into the buffer from that position. Offset must be a non-negative integer less than the length of buffer (in bytes).
If the buffer is too short then a BufferTooShort exception is raised and the complete message is available as e.args[0] where e is the exception instance.
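The error paths described above can be sketched in a single process as follows. The helper name receive_into is an assumption for illustration; it shows how the complete message can be recovered from a BufferTooShort exception, and the final block shows EOFError after the sending end is closed.

```python
from multiprocessing import BufferTooShort, Pipe


def receive_into(conn, size):
    """Read one message into a `size`-byte buffer and return it as bytes.

    `receive_into` is a hypothetical helper. If the buffer is too short,
    the complete message is taken from the exception's args[0].
    """
    buf = bytearray(size)
    try:
        count = conn.recv_bytes_into(buf)
    except BufferTooShort as exc:
        return exc.args[0]
    return bytes(buf[:count])


if __name__ == "__main__":
    a, b = Pipe()
    a.send_bytes(b"0123456789")
    print(receive_into(b, 4))    # buffer too small: message recovered
    a.send_bytes(b"abc")
    print(receive_into(b, 16))   # buffer large enough
    a.close()
    try:
        b.recv()
    except EOFError:
        print("other end closed")
```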
Changed in version 3.3: Connection objects themselves can now be transferred between processes using Connection.send() and Connection.recv().
Connection objects also now support the context management protocol – see Context Manager Types. __enter__() returns the connection object, and __exit__() calls close().
For example:

    >>> from multiprocessing import Pipe
    >>> a, b = Pipe()
    >>> a.send([1, 'hello', None])
    >>> b.recv()
    [1, 'hello', None]
    >>> b.send_bytes(b'thank you')
    >>> a.recv_bytes()
    b'thank you'
    >>> import array
    >>> arr1 = array.array('i', range(5))
    >>> arr2 = array.array('i', [0] * 10)
    >>> a.send_bytes(arr1)
    >>> count = b.recv_bytes_into(arr2)
    >>> assert count == len(arr1) * arr1.itemsize
    >>> arr2
    array('i', [0, 1, 2, 3, 4, 0, 0, 0, 0, 0])
Warning
The Connection.recv() method automatically unpickles the data it receives, which can be a security risk unless you can trust the process which sent the message.
Therefore, unless the connection object was produced using Pipe() you should only use the recv() and send() methods after performing some sort of authentication. See Authentication keys.
Warning
If a process is killed while it is trying to read or write to a pipe then the data in the pipe is likely to become corrupted, because it may become impossible to be sure where the message boundaries lie.
Synchronization primitives¶
Generally synchronization primitives are not as necessary in a multiprocess program as they are in a multithreaded program. See the documentation for the threading module.
Note that one can also create synchronization primitives by using a manager object – see Managers.
- class multiprocessing.Barrier(parties[,action[,timeout]])¶
A barrier object: a clone of threading.Barrier.
Added in version 3.3.
- class multiprocessing.BoundedSemaphore([value])¶
A bounded semaphore object: a close analog of threading.BoundedSemaphore.
A solitary difference from its close analog exists: its acquire method’s first argument is named block, as is consistent with Lock.acquire().
Note
On macOS, this is indistinguishable from Semaphore because sem_getvalue() is not implemented on that platform.
- class multiprocessing.Condition([lock])¶
A condition variable: an alias for threading.Condition.
If lock is specified then it should be a Lock or RLock object from multiprocessing.
Changed in version 3.3: The wait_for() method was added.
- class multiprocessing.Event¶
A clone of
threading.Event
.
- class multiprocessing.Lock¶
A non-recursive lock object: a close analog of threading.Lock. Once a process or thread has acquired a lock, subsequent attempts to acquire it from any process or thread will block until it is released; any process or thread may release it. The concepts and behaviors of threading.Lock as it applies to threads are replicated here in multiprocessing.Lock as it applies to either processes or threads, except as noted.
Note that Lock is actually a factory function which returns an instance of multiprocessing.synchronize.Lock initialized with a default context.
Lock supports the context manager protocol and thus may be used in with statements.
- acquire(block=True,timeout=None)¶
Acquire a lock, blocking or non-blocking.
With the block argument set to True (the default), the method call will block until the lock is in an unlocked state, then set it to locked and return True. Note that the name of this first argument differs from that in threading.Lock.acquire().
With the block argument set to False, the method call does not block. If the lock is currently in a locked state, return False; otherwise set the lock to a locked state and return True.
When invoked with a positive, floating-point value for timeout, block for at most the number of seconds specified by timeout as long as the lock can not be acquired. Invocations with a negative value for timeout are equivalent to a timeout of zero. Invocations with a timeout value of None (the default) set the timeout period to infinite. Note that the treatment of negative or None values for timeout differs from the implemented behavior in threading.Lock.acquire(). The timeout argument has no practical implications if the block argument is set to False and is thus ignored. Returns True if the lock has been acquired or False if the timeout period has elapsed.
- release()¶
Release a lock. This can be called from any process or thread, not only the process or thread which originally acquired the lock.
Behavior is the same as in threading.Lock.release() except that when invoked on an unlocked lock, a ValueError is raised.
- class multiprocessing.RLock¶
A recursive lock object: a close analog of threading.RLock. A recursive lock must be released by the process or thread that acquired it. Once a process or thread has acquired a recursive lock, the same process or thread may acquire it again without blocking; that process or thread must release it once for each time it has been acquired.
Note that RLock is actually a factory function which returns an instance of multiprocessing.synchronize.RLock initialized with a default context.
RLock supports the context manager protocol and thus may be used in with statements.
- acquire(block=True,timeout=None)¶
Acquire a lock, blocking or non-blocking.
When invoked with the block argument set to True, block until the lock is in an unlocked state (not owned by any process or thread) unless the lock is already owned by the current process or thread. The current process or thread then takes ownership of the lock (if it does not already have ownership) and the recursion level inside the lock increments by one, resulting in a return value of True. Note that there are several differences in this first argument’s behavior compared to the implementation of threading.RLock.acquire(), starting with the name of the argument itself.
When invoked with the block argument set to False, do not block. If the lock has already been acquired (and thus is owned) by another process or thread, the current process or thread does not take ownership and the recursion level within the lock is not changed, resulting in a return value of False. If the lock is in an unlocked state, the current process or thread takes ownership and the recursion level is incremented, resulting in a return value of True.
Use and behaviors of the timeout argument are the same as in Lock.acquire(). Note that some of these behaviors of timeout differ from the implemented behaviors in threading.RLock.acquire().
- release()¶
Release a lock, decrementing the recursion level. If after the decrement the recursion level is zero, reset the lock to unlocked (not owned by any process or thread) and if any other processes or threads are blocked waiting for the lock to become unlocked, allow exactly one of them to proceed. If after the decrement the recursion level is still nonzero, the lock remains locked and owned by the calling process or thread.
Only call this method when the calling process or thread owns the lock. An AssertionError is raised if this method is called by a process or thread other than the owner or if the lock is in an unlocked (unowned) state. Note that the type of exception raised in this situation differs from the implemented behavior in threading.RLock.release().
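A short single-process sketch of the recursion level and of the AssertionError raised when releasing an unowned recursive lock. The helper name nested_depth is an assumption made for this example.

```python
from multiprocessing import RLock


def nested_depth(rlock, depth):
    """Acquire `rlock` up to `depth` times as the same owner, then undo it.

    Returns how many acquisitions succeeded. `nested_depth` is a
    hypothetical helper, not part of multiprocessing.
    """
    acquired = 0
    for _ in range(depth):
        if rlock.acquire(block=False):
            acquired += 1
    # One release() per successful acquire() brings the level back to zero.
    for _ in range(acquired):
        rlock.release()
    return acquired


if __name__ == "__main__":
    rlock = RLock()
    print(nested_depth(rlock, 3))
    try:
        rlock.release()            # lock is unowned at this point
    except AssertionError as exc:
        print("release refused:", exc)
```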
- class multiprocessing.Semaphore([value])¶
A semaphore object: a close analog of threading.Semaphore.
A solitary difference from its close analog exists: its acquire method’s first argument is named block, as is consistent with Lock.acquire().
Note
On macOS, sem_timedwait is unsupported, so calling acquire() with a timeout will emulate that function’s behavior using a sleeping loop.
Note
Some of this package’s functionality requires a functioning shared semaphore implementation on the host operating system. Without one, the multiprocessing.synchronize module will be disabled, and attempts to import it will result in an ImportError. See bpo-3770 for additional information.
Shared ctypes Objects¶
It is possible to create shared objects using shared memory which can be inherited by child processes.
- multiprocessing.Value(typecode_or_type,*args,lock=True)¶
Return a ctypes object allocated from shared memory. By default the return value is actually a synchronized wrapper for the object. The object itself can be accessed via the value attribute of a Value.
typecode_or_type determines the type of the returned object: it is either a ctypes type or a one character typecode of the kind used by the array module. *args is passed on to the constructor for the type.
If lock is True (the default) then a new recursive lock object is created to synchronize access to the value. If lock is a Lock or RLock object then that will be used to synchronize access to the value. If lock is False then access to the returned object will not be automatically protected by a lock, so it will not necessarily be “process-safe”.
Operations like += which involve a read and write are not atomic. So if, for instance, you want to atomically increment a shared value it is insufficient to just do

    counter.value += 1

Assuming the associated lock is recursive (which it is by default) you can instead do

    with counter.get_lock():
        counter.value += 1

Note that lock is a keyword-only argument.
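A fuller sketch of the locked-increment pattern follows. To stay self-contained it drives the shared Value from several threads of one process; this is an assumption of the example, and the same add_many function could be passed as the target of a Process instead. The names add_many and run_workers are hypothetical.

```python
import threading
from multiprocessing import Value


def add_many(counter, times):
    """Increment the shared counter `times` times, one locked step at a time."""
    for _ in range(times):
        # Hold the wrapper's lock across the whole read-modify-write.
        with counter.get_lock():
            counter.value += 1


def run_workers(workers=4, times=1000):
    """Run `add_many` concurrently and return the final count."""
    counter = Value("i", 0)  # lock=True by default: a recursive lock is created
    threads = [
        threading.Thread(target=add_many, args=(counter, times))
        for _ in range(workers)
    ]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return counter.value


if __name__ == "__main__":
    print(run_workers())
```

Because every increment happens while the lock is held, the final count equals workers * times; without the with block, updates could be lost.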
- multiprocessing.Array(typecode_or_type,size_or_initializer,*,lock=True)¶
Return a ctypes array allocated from shared memory. By default the return value is actually a synchronized wrapper for the array.
typecode_or_type determines the type of the elements of the returned array: it is either a ctypes type or a one character typecode of the kind used by the array module. If size_or_initializer is an integer, then it determines the length of the array, and the array will be initially zeroed. Otherwise, size_or_initializer is a sequence which is used to initialize the array and whose length determines the length of the array.
If lock is True (the default) then a new lock object is created to synchronize access to the value. If lock is a Lock or RLock object then that will be used to synchronize access to the value. If lock is False then access to the returned object will not be automatically protected by a lock, so it will not necessarily be “process-safe”.
Note that lock is a keyword-only argument.
Note that an array of ctypes.c_char has value and raw attributes which allow one to use it to store and retrieve strings.
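The two forms of size_or_initializer, and the value attribute of a character array, can be seen in this minimal single-process sketch. The function name make_arrays is an assumption for illustration only.

```python
from multiprocessing import Array


def make_arrays():
    """Create three shared arrays and return their contents.

    `make_arrays` is a hypothetical helper, not part of multiprocessing.
    """
    zeros = Array("i", 3)              # integer size: three zeroed ints
    values = Array("d", [1.5, 2.5])    # sequence: length and contents taken from it
    text = Array("c", b"hello")        # c_char array: has value and raw
    with text.get_lock():
        text.value = text.value.upper()
    # Slicing a numeric shared array yields a plain list copy.
    return zeros[:], values[:], text.value


if __name__ == "__main__":
    print(make_arrays())
```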
The multiprocessing.sharedctypes module¶
The multiprocessing.sharedctypes module provides functions for allocating ctypes objects from shared memory which can be inherited by child processes.
Note
Although it is possible to store a pointer in shared memory remember that this will refer to a location in the address space of a specific process. However, the pointer is quite likely to be invalid in the context of a second process and trying to dereference the pointer from the second process may cause a crash.
- multiprocessing.sharedctypes.RawArray(typecode_or_type,size_or_initializer)¶
Return a ctypes array allocated from shared memory.
typecode_or_type determines the type of the elements of the returned array: it is either a ctypes type or a one character typecode of the kind used by the array module. If size_or_initializer is an integer then it determines the length of the array, and the array will be initially zeroed. Otherwise size_or_initializer is a sequence which is used to initialize the array and whose length determines the length of the array.
Note that setting and getting an element is potentially non-atomic – use Array() instead to make sure that access is automatically synchronized using a lock.
- multiprocessing.sharedctypes.RawValue(typecode_or_type,*args)¶
Return a ctypes object allocated from shared memory.
typecode_or_type determines the type of the returned object: it is either a ctypes type or a one character typecode of the kind used by the array module. *args is passed on to the constructor for the type.
Note that setting and getting the value is potentially non-atomic – use Value() instead to make sure that access is automatically synchronized using a lock.
Note that an array of ctypes.c_char has value and raw attributes which allow one to use it to store and retrieve strings – see documentation for ctypes.
- multiprocessing.sharedctypes.Array(typecode_or_type,size_or_initializer,*,lock=True)¶
The same as RawArray() except that depending on the value of lock a process-safe synchronization wrapper may be returned instead of a raw ctypes array.
If lock is True (the default) then a new lock object is created to synchronize access to the value. If lock is a Lock or RLock object then that will be used to synchronize access to the value. If lock is False then access to the returned object will not be automatically protected by a lock, so it will not necessarily be “process-safe”.
Note that lock is a keyword-only argument.
- multiprocessing.sharedctypes.Value(typecode_or_type,*args,lock=True)¶
The same as RawValue() except that depending on the value of lock a process-safe synchronization wrapper may be returned instead of a raw ctypes object.
If lock is True (the default) then a new lock object is created to synchronize access to the value. If lock is a Lock or RLock object then that will be used to synchronize access to the value. If lock is False then access to the returned object will not be automatically protected by a lock, so it will not necessarily be “process-safe”.
Note that lock is a keyword-only argument.
- multiprocessing.sharedctypes.copy(obj)¶
Return a ctypes object allocated from shared memory which is a copy of the ctypes object obj.
- multiprocessing.sharedctypes.synchronized(obj[,lock])¶
Return a process-safe wrapper object for a ctypes object which uses lock to synchronize access. If lock is None (the default) then a multiprocessing.RLock object is created automatically.
A synchronized wrapper will have two methods in addition to those of the object it wraps: get_obj() returns the wrapped object and get_lock() returns the lock object used for synchronization.
Note that accessing the ctypes object through the wrapper can be a lot slower than accessing the raw ctypes object.
Changed in version 3.5: Synchronized objects support the context manager protocol.
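The relationship between RawValue(), copy() and synchronized() might be sketched as below, using a small structure in a single process. The Point class and the build_shared_point function are assumptions made for this illustration.

```python
from ctypes import Structure, c_double
from multiprocessing.sharedctypes import RawValue, copy, synchronized


class Point(Structure):
    # Illustrative structure; any ctypes.Structure subclass would do.
    _fields_ = [("x", c_double), ("y", c_double)]


def build_shared_point():
    """Create a raw shared Point, copy it, then wrap the original with a lock."""
    raw = RawValue(Point, 1.0, 2.0)    # no lock: access is unsynchronized
    duplicate = copy(raw)              # independent copy in shared memory
    wrapped = synchronized(raw)        # lock=None: an RLock is created
    with wrapped.get_lock():
        wrapped.x += 10.0              # writes through to the wrapped object
    return raw.x, duplicate.x, wrapped.get_obj() is raw


if __name__ == "__main__":
    print(build_shared_point())
```

The wrapper and the raw object share the same memory, so the update is visible through raw, while the copy made earlier keeps its original value.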
The table below compares the syntax for creating shared ctypes objects from shared memory with the normal ctypes syntax. (In the table MyStruct is some subclass of ctypes.Structure.)
ctypes | sharedctypes using type | sharedctypes using typecode |
---|---|---|
c_double(2.4) | RawValue(c_double, 2.4) | RawValue('d', 2.4) |
MyStruct(4, 6) | RawValue(MyStruct, 4, 6) | |
(c_short * 7)() | RawArray(c_short, 7) | RawArray('h', 7) |
(c_int * 3)(9, 2, 8) | RawArray(c_int, (9, 2, 8)) | RawArray('i', (9, 2, 8)) |
Below is an example where a number of ctypes objects are modified by a child process:

    from multiprocessing import Process, Lock
    from multiprocessing.sharedctypes import Value, Array
    from ctypes import Structure, c_double

    class Point(Structure):
        _fields_ = [('x', c_double), ('y', c_double)]

    def modify(n, x, s, A):
        n.value **= 2
        x.value **= 2
        s.value = s.value.upper()
        for a in A:
            a.x **= 2
            a.y **= 2

    if __name__ == '__main__':
        lock = Lock()

        n = Value('i', 7)
        x = Value(c_double, 1.0/3.0, lock=False)
        s = Array('c', b'hello world', lock=lock)
        A = Array(Point, [(1.875,-6.25), (-5.75,2.0), (2.375,9.5)], lock=lock)

        p = Process(target=modify, args=(n, x, s, A))
        p.start()
        p.join()

        print(n.value)
        print(x.value)
        print(s.value)
        print([(a.x, a.y) for a in A])
The results printed are

    49
    0.1111111111111111
    b'HELLO WORLD'
    [(3.515625, 39.0625), (33.0625, 4.0), (5.640625, 90.25)]
Managers¶
Managers provide a way to create data which can be shared between different processes, including sharing over a network between processes running on different machines. A manager object controls a server process which manages shared objects. Other processes can access the shared objects by using proxies.
- multiprocessing.Manager()¶
Returns a started SyncManager object which can be used for sharing objects between processes. The returned manager object corresponds to a spawned child process and has methods which will create shared objects and return corresponding proxies.
Manager processes will be shut down as soon as they are garbage collected or their parent process exits. The manager classes are defined in the multiprocessing.managers module:
- class multiprocessing.managers.BaseManager(address=None,authkey=None,serializer='pickle',ctx=None,*,shutdown_timeout=1.0)¶
Create a BaseManager object.
Once created one should call start() or get_server().serve_forever() to ensure that the manager object refers to a started manager process.
address is the address on which the manager process listens for new connections. If address is None then an arbitrary one is chosen.
authkey is the authentication key which will be used to check the validity of incoming connections to the server process. If authkey is None then current_process().authkey is used. Otherwise authkey is used and it must be a byte string.
serializer must be 'pickle' (use pickle serialization) or 'xmlrpclib' (use xmlrpc.client serialization).
ctx is a context object, or None (use the current context). See the get_context() function.
shutdown_timeout is a timeout in seconds used to wait until the process used by the manager completes in the shutdown() method. If the shutdown times out, the process is terminated. If terminating the process also times out, the process is killed.
Changed in version 3.11: Added the shutdown_timeout parameter.
- start([initializer[,initargs]])¶
Start a subprocess to start the manager. If initializer is not None then the subprocess will call initializer(*initargs) when it starts.
- get_server()¶
Returns a Server object which represents the actual server under the control of the Manager. The Server object supports the serve_forever() method:

    >>> from multiprocessing.managers import BaseManager
    >>> manager = BaseManager(address=('', 50000), authkey=b'abc')
    >>> server = manager.get_server()
    >>> server.serve_forever()

Server additionally has an address attribute.
- connect()¶
Connect a local manager object to a remote manager process:

    >>> from multiprocessing.managers import BaseManager
    >>> m = BaseManager(address=('127.0.0.1', 50000), authkey=b'abc')
    >>> m.connect()
- shutdown()¶
Stop the process used by the manager. This is only available if start() has been used to start the server process.
This can be called multiple times.
- register(typeid[,callable[,proxytype[,exposed[,method_to_typeid[,create_method]]]]])¶
A classmethod which can be used for registering a type or callable with the manager class.
typeid is a “type identifier” which is used to identify a particular type of shared object. This must be a string.
callable is a callable used for creating objects for this type identifier. If a manager instance will be connected to the server using the connect() method, or if the create_method argument is False then this can be left as None.
proxytype is a subclass of BaseProxy which is used to create proxies for shared objects with this typeid. If None then a proxy class is created automatically.
exposed is used to specify a sequence of method names which proxies for this typeid should be allowed to access using BaseProxy._callmethod(). (If exposed is None then proxytype._exposed_ is used instead if it exists.) In the case where no exposed list is specified, all “public methods” of the shared object will be accessible. (Here a “public method” means any attribute which has a __call__() method and whose name does not begin with '_'.)
method_to_typeid is a mapping used to specify the return type of those exposed methods which should return a proxy. It maps method names to typeid strings. (If method_to_typeid is None then proxytype._method_to_typeid_ is used instead if it exists.) If a method’s name is not a key of this mapping or if the mapping is None then the object returned by the method will be copied by value.
create_method determines whether a method should be created with name typeid which can be used to tell the server process to create a new shared object and return a proxy for it. By default it is True.
BaseManager instances also have one read-only property:
- address¶
The address used by the manager.
Changed in version 3.3: Manager objects support the context management protocol – see Context Manager Types. __enter__() starts the server process (if it has not already started) and then returns the manager object. __exit__() calls shutdown().
In previous versions __enter__() did not start the manager’s server process if it was not already started.
- class multiprocessing.managers.SyncManager¶
A subclass of BaseManager which can be used for the synchronization of processes. Objects of this type are returned by multiprocessing.Manager().
Its methods create and return Proxy Objects for a number of commonly used data types to be synchronized across processes. This notably includes shared lists and dictionaries.
- Barrier(parties[,action[,timeout]])¶
Create a shared threading.Barrier object and return a proxy for it.
Added in version 3.3.
- BoundedSemaphore([value])¶
Create a shared threading.BoundedSemaphore object and return a proxy for it.
- Condition([lock])¶
Create a shared threading.Condition object and return a proxy for it.
If lock is supplied then it should be a proxy for a threading.Lock or threading.RLock object.
Changed in version 3.3: The wait_for() method was added.
- Event()¶
Create a shared
threading.Event
object and return a proxy for it.
- Lock()¶
Create a shared
threading.Lock
object and return a proxy for it.
- Queue([maxsize])¶
Create a shared
queue.Queue
object and return a proxy for it.
- RLock()¶
Create a shared
threading.RLock
object and return a proxy for it.
- Semaphore([value])¶
Create a shared threading.Semaphore object and return a proxy for it.
- Array(typecode,sequence)¶
Create an array and return a proxy for it.
- Value(typecode,value)¶
Create an object with a writable value attribute and return a proxy for it.
Changed in version 3.6: Shared objects are capable of being nested. For example, a shared container object such as a shared list can contain other shared objects which will all be managed and synchronized by the SyncManager.
- class multiprocessing.managers.Namespace¶
A type that can register with SyncManager.
A namespace object has no public methods, but does have writable attributes. Its representation shows the values of its attributes.
However, when using a proxy for a namespace object, an attribute beginning with '_' will be an attribute of the proxy and not an attribute of the referent:

    >>> mp_context = multiprocessing.get_context('spawn')
    >>> manager = mp_context.Manager()
    >>> Global = manager.Namespace()
    >>> Global.x = 10
    >>> Global.y = 'hello'
    >>> Global._z = 12.3    # this is an attribute of the proxy
    >>> print(Global)
    Namespace(x=10, y='hello')
Customized managers¶
To create one’s own manager, one creates a subclass of BaseManager and uses the register() classmethod to register new types or callables with the manager class. For example:

    from multiprocessing.managers import BaseManager

    class MathsClass:
        def add(self, x, y):
            return x + y
        def mul(self, x, y):
            return x * y

    class MyManager(BaseManager):
        pass

    MyManager.register('Maths', MathsClass)

    if __name__ == '__main__':
        with MyManager() as manager:
            maths = manager.Maths()
            print(maths.add(4, 3))         # prints 7
            print(maths.mul(7, 8))         # prints 56
Using a remote manager¶
It is possible to run a manager server on one machine and have clients use it from other machines (assuming that the firewalls involved allow it).
Running the following commands creates a server for a single shared queue which remote clients can access:

    >>> from multiprocessing.managers import BaseManager
    >>> from queue import Queue
    >>> queue = Queue()
    >>> class QueueManager(BaseManager): pass
    >>> QueueManager.register('get_queue', callable=lambda: queue)
    >>> m = QueueManager(address=('', 50000), authkey=b'abracadabra')
    >>> s = m.get_server()
    >>> s.serve_forever()

One client can access the server as follows:

    >>> from multiprocessing.managers import BaseManager
    >>> class QueueManager(BaseManager): pass
    >>> QueueManager.register('get_queue')
    >>> m = QueueManager(address=('foo.bar.org', 50000), authkey=b'abracadabra')
    >>> m.connect()
    >>> queue = m.get_queue()
    >>> queue.put('hello')

Another client can also use it:

    >>> from multiprocessing.managers import BaseManager
    >>> class QueueManager(BaseManager): pass
    >>> QueueManager.register('get_queue')
    >>> m = QueueManager(address=('foo.bar.org', 50000), authkey=b'abracadabra')
    >>> m.connect()
    >>> queue = m.get_queue()
    >>> queue.get()
    'hello'

Local processes can also access that queue, using the code from above on the client to access it remotely:

    >>> from multiprocessing import Process, Queue
    >>> from multiprocessing.managers import BaseManager
    >>> class Worker(Process):
    ...     def __init__(self, q):
    ...         self.q = q
    ...         super().__init__()
    ...     def run(self):
    ...         self.q.put('local hello')
    ...
    >>> queue = Queue()
    >>> w = Worker(queue)
    >>> w.start()
    >>> class QueueManager(BaseManager): pass
    ...
    >>> QueueManager.register('get_queue', callable=lambda: queue)
    >>> m = QueueManager(address=('', 50000), authkey=b'abracadabra')
    >>> s = m.get_server()
    >>> s.serve_forever()
Proxy Objects¶
A proxy is an object which refers to a shared object which lives (presumably) in a different process. The shared object is said to be the referent of the proxy. Multiple proxy objects may have the same referent.
A proxy object has methods which invoke corresponding methods of its referent (although not every method of the referent will necessarily be available through the proxy). In this way, a proxy can be used just like its referent can:

    >>> mp_context = multiprocessing.get_context('spawn')
    >>> manager = mp_context.Manager()
    >>> l = manager.list([i*i for i in range(10)])
    >>> print(l)
    [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
    >>> print(repr(l))
    <ListProxy object, typeid 'list' at 0x...>
    >>> l[4]
    16
    >>> l[2:5]
    [4, 9, 16]

Notice that applying str() to a proxy will return the representation of the referent, whereas applying repr() will return the representation of the proxy.
An important feature of proxy objects is that they are picklable so they can be passed between processes. As such, a referent can contain Proxy Objects. This permits nesting of these managed lists, dicts, and other Proxy Objects:

    >>> a = manager.list()
    >>> b = manager.list()
    >>> a.append(b)         # referent of a now contains referent of b
    >>> print(a, b)
    [<ListProxy object, typeid 'list' at ...>] []
    >>> b.append('hello')
    >>> print(a[0], b)
    ['hello'] ['hello']

Similarly, dict and list proxies may be nested inside one another:

    >>> l_outer = manager.list([manager.dict() for i in range(2)])
    >>> d_first_inner = l_outer[0]
    >>> d_first_inner['a'] = 1
    >>> d_first_inner['b'] = 2
    >>> l_outer[1]['c'] = 3
    >>> l_outer[1]['z'] = 26
    >>> print(l_outer[0])
    {'a': 1, 'b': 2}
    >>> print(l_outer[1])
    {'c': 3, 'z': 26}

If standard (non-proxy) list or dict objects are contained in a referent, modifications to those mutable values will not be propagated through the manager because the proxy has no way of knowing when the values contained within are modified. However, storing a value in a container proxy (which triggers a __setitem__ on the proxy object) does propagate through the manager and so to effectively modify such an item, one could re-assign the modified value to the container proxy:

    # create a list proxy and append a mutable object (a dictionary)
    lproxy = manager.list()
    lproxy.append({})
    # now mutate the dictionary
    d = lproxy[0]
    d['a'] = 1
    d['b'] = 2
    # at this point, the changes to d are not yet synced, but by
    # updating the dictionary, the proxy is notified of the change
    lproxy[0] = d

This approach is perhaps less convenient than employing nested Proxy Objects for most use cases but also demonstrates a level of control over the synchronization.
Note
The proxy types in multiprocessing do nothing to support comparisons by value. So, for instance, we have:

    >>> manager.list([1,2,3]) == [1,2,3]
    False

One should just use a copy of the referent instead when making comparisons.
- class multiprocessing.managers.BaseProxy¶
Proxy objects are instances of subclasses of BaseProxy.
- _callmethod(methodname[,args[,kwds]])¶
Call and return the result of a method of the proxy’s referent.
If proxy is a proxy whose referent is obj then the expression

    proxy._callmethod(methodname, args, kwds)

will evaluate the expression

    getattr(obj, methodname)(*args, **kwds)

in the manager’s process.
The returned value will be a copy of the result of the call or a proxy to a new shared object – see documentation for the method_to_typeid argument of BaseManager.register().
If an exception is raised by the call, then it is re-raised by _callmethod(). If some other exception is raised in the manager’s process then this is converted into a RemoteError exception and is raised by _callmethod().
Note in particular that an exception will be raised if methodname has not been exposed.
An example of the usage of _callmethod():

    >>> l = manager.list(range(10))
    >>> l._callmethod('__len__')
    10
    >>> l._callmethod('__getitem__', (slice(2, 7),)) # equivalent to l[2:7]
    [2, 3, 4, 5, 6]
    >>> l._callmethod('__getitem__', (20,))          # equivalent to l[20]
    Traceback (most recent call last):
    ...
    IndexError: list index out of range
- _getvalue()¶
Return a copy of the referent.
If the referent is unpicklable then this will raise an exception.
- __repr__()¶
Return a representation of the proxy object.
- __str__()¶
Return the representation of the referent.
Cleanup¶
A proxy object uses a weakref callback so that when it gets garbage collected it deregisters itself from the manager which owns its referent.
A shared object gets deleted from the manager process when there are no longer any proxies referring to it.
Process Pools¶
One can create a pool of processes which will carry out tasks submitted to it with the Pool class.
- class multiprocessing.pool.Pool([processes[, initializer[, initargs[, maxtasksperchild[, context]]]]])¶
A process pool object which controls a pool of worker processes to which jobs can be submitted. It supports asynchronous results with timeouts and callbacks and has a parallel map implementation.
processes is the number of worker processes to use. If processes is None then the number returned by os.process_cpu_count() is used.
If initializer is not None then each worker process will call initializer(*initargs) when it starts.
maxtasksperchild is the number of tasks a worker process can complete before it will exit and be replaced with a fresh worker process, to enable unused resources to be freed. The default maxtasksperchild is None, which means worker processes will live as long as the pool.
context can be used to specify the context used for starting the worker processes. Usually a pool is created using the function multiprocessing.Pool() or the Pool() method of a context object. In both cases context is set appropriately.
Note that the methods of the pool object should only be called by the process which created the pool.
Warning
multiprocessing.pool objects have internal resources that need to be properly managed (like any other resource) by using the pool as a context manager or by calling close() and terminate() manually. Failure to do this can lead to the process hanging on finalization.
Note that it is not correct to rely on the garbage collector to destroy the pool as CPython does not assure that the finalizer of the pool will be called (see object.__del__() for more information).
Changed in version 3.2: Added the maxtasksperchild parameter.
Changed in version 3.4: Added the context parameter.
Changed in version 3.13: processes uses os.process_cpu_count() by default, instead of os.cpu_count().
Note
Worker processes within a Pool typically live for the complete duration of the Pool’s work queue. A frequent pattern found in other systems (such as Apache, mod_wsgi, etc) to free resources held by workers is to allow a worker within a pool to complete only a set amount of work before exiting, being cleaned up, and being replaced by a newly spawned process. The maxtasksperchild argument to the Pool exposes this ability to the end user.
- apply(func[, args[, kwds]])¶
Call func with arguments args and keyword arguments kwds. It blocks until the result is ready. Given this blocks, apply_async() is better suited for performing work in parallel. Additionally, func is only executed in one of the workers of the pool.
- apply_async(func[, args[, kwds[, callback[, error_callback]]]])¶
A variant of the apply() method which returns an AsyncResult object.
If callback is specified then it should be a callable which accepts a single argument. When the result becomes ready callback is applied to it, that is unless the call failed, in which case the error_callback is applied instead.
If error_callback is specified then it should be a callable which accepts a single argument. If the target function fails, then the error_callback is called with the exception instance.
Callbacks should complete immediately since otherwise the thread which handles the results will get blocked.
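The callback behaviour described above can be sketched as follows. The worker function reciprocal and the helper run are hypothetical names chosen for illustration; both callbacks only append to a list, so they return immediately as recommended.

```python
from multiprocessing import Pool


def reciprocal(x):
    """Worker function: raises ZeroDivisionError for x == 0."""
    return 1.0 / x


def run(values):
    results, errors = [], []
    with Pool(processes=2) as pool:
        pending = [
            pool.apply_async(reciprocal, (v,),
                             callback=results.append,       # called on success
                             error_callback=errors.append)  # called on failure
            for v in values
        ]
        # Wait for every task before leaving the "with" block, because
        # __exit__() calls terminate().
        for p in pending:
            p.wait()
    return sorted(results), errors


if __name__ == "__main__":
    ok, failed = run([1, 2, 0, 4])
    print(ok)
    print([type(e).__name__ for e in failed])
```

The callbacks run in a helper thread of the process that owns the pool, not in the workers, which is why plain lists are enough to collect the outcomes here.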
- map(func, iterable[, chunksize])¶
A parallel equivalent of the map() built-in function (it supports only one iterable argument though, for multiple iterables see starmap()). It blocks until the result is ready.
This method chops the iterable into a number of chunks which it submits to the process pool as separate tasks. The (approximate) size of these chunks can be specified by setting chunksize to a positive integer.
Note that it may cause high memory usage for very long iterables. Consider using imap() or imap_unordered() with an explicit chunksize option for better efficiency.
- map_async(func, iterable[, chunksize[, callback[, error_callback]]])¶
A variant of the map() method which returns an AsyncResult object.
If callback is specified then it should be a callable which accepts a single argument. When the result becomes ready callback is applied to it, that is unless the call failed, in which case the error_callback is applied instead.
If error_callback is specified then it should be a callable which accepts a single argument. If the target function fails, then the error_callback is called with the exception instance.
Callbacks should complete immediately since otherwise the thread which handles the results will get blocked.
- imap(func, iterable[, chunksize])¶
A lazier version of map().
The chunksize argument is the same as the one used by the map() method. For very long iterables using a large value for chunksize can make the job complete much faster than using the default value of 1.
Also if chunksize is 1 then the next() method of the iterator returned by the imap() method has an optional timeout parameter: next(timeout) will raise multiprocessing.TimeoutError if the result cannot be returned within timeout seconds.
- imap_unordered(func, iterable[, chunksize])¶
The same as imap() except that the ordering of the results from the returned iterator should be considered arbitrary. (Only when there is only one worker process is the order guaranteed to be “correct”.)
- starmap(func, iterable[, chunksize])¶
Like map() except that the elements of the iterable are expected to be iterables that are unpacked as arguments.
Hence an iterable of [(1, 2), (3, 4)] results in [func(1, 2), func(3, 4)].
Added in version 3.3.
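For instance, a two-argument function can be applied to a list of argument tuples. The names power and powers below are hypothetical and serve only to illustrate the unpacking.

```python
from multiprocessing import Pool


def power(base, exponent):
    return base ** exponent


def powers(pairs):
    with Pool(processes=2) as pool:
        # Each (base, exponent) tuple is unpacked into power(base, exponent).
        return pool.starmap(power, pairs)


if __name__ == "__main__":
    print(powers([(2, 3), (3, 2), (10, 0)]))
```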
- starmap_async(func, iterable[, chunksize[, callback[, error_callback]]])¶
A combination of starmap() and map_async() that iterates over an iterable of iterables and calls func with the iterables unpacked. Returns a result object.
Added in version 3.3.
- close()¶
Prevents any more tasks from being submitted to the pool. Once all the tasks have been completed the worker processes will exit.
- terminate()¶
Stops the worker processes immediately without completing outstanding work. When the pool object is garbage collected terminate() will be called immediately.
- join()¶
Wait for the worker processes to exit. One must call close() or terminate() before using join().
Changed in version 3.3: Pool objects now support the context management protocol – see Context Manager Types. __enter__() returns the pool object, and __exit__() calls terminate().
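Because __exit__() calls terminate() rather than close(), code that wants queued tasks to run to completion without collecting each result first can manage the pool explicitly. The following is one possible sketch of that pattern; work and run_to_completion are hypothetical names.

```python
from multiprocessing import Pool


def work(x):
    return x + 1


def run_to_completion(values):
    pool = Pool(processes=2)
    try:
        pending = [pool.apply_async(work, (v,)) for v in values]
        pool.close()       # no more submissions; queued tasks still run
    except BaseException:
        pool.terminate()   # on error, stop workers without draining the queue
        raise
    finally:
        pool.join()        # valid after either close() or terminate()
    return [p.get() for p in pending]


if __name__ == "__main__":
    print(run_to_completion([1, 2, 3]))
```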
- class multiprocessing.pool.AsyncResult¶
The class of the result returned by Pool.apply_async() and Pool.map_async().
- get([timeout])¶
Return the result when it arrives. If timeout is not None and the result does not arrive within timeout seconds then multiprocessing.TimeoutError is raised. If the remote call raised an exception then that exception will be reraised by get().
- wait([timeout])¶
Wait until the result is available or until timeout seconds pass.
- ready()¶
Return whether the call has completed.
- successful()¶
Return whether the call completed without raising an exception. Will raise ValueError if the result is not ready.
Changed in version 3.7: If the result is not ready, ValueError is raised instead of AssertionError.
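The methods above can be exercised together as in this sketch. The worker slow_double and its half-second delay are assumptions made so that the first get() reliably times out.

```python
import multiprocessing
import time


def slow_double(x, delay):
    time.sleep(delay)
    return 2 * x


def inspect_result():
    with multiprocessing.Pool(processes=1) as pool:
        res = pool.apply_async(slow_double, (21, 0.5))
        try:
            res.get(timeout=0.05)   # too early: the worker is still sleeping
            timed_out = False
        except multiprocessing.TimeoutError:
            timed_out = True
        res.wait()                  # block until the result is available
        return timed_out, res.ready(), res.successful(), res.get()


if __name__ == "__main__":
    print(inspect_result())
```

Calling successful() only after wait() avoids the ValueError that is raised when the result is not ready yet.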
The following example demonstrates the use of a pool:

    from multiprocessing import Pool
    import time

    def f(x):
        return x*x

    if __name__ == '__main__':
        with Pool(processes=4) as pool:         # start 4 worker processes
            result = pool.apply_async(f, (10,)) # evaluate "f(10)" asynchronously in a single process
            print(result.get(timeout=1))        # prints "100" unless your computer is *very* slow

            print(pool.map(f, range(10)))       # prints "[0, 1, 4,..., 81]"

            it = pool.imap(f, range(10))
            print(next(it))                     # prints "0"
            print(next(it))                     # prints "1"
            print(it.next(timeout=1))           # prints "4" unless your computer is *very* slow

            result = pool.apply_async(time.sleep, (10,))
            print(result.get(timeout=1))        # raises multiprocessing.TimeoutError
Listeners and Clients¶
Usually message passing between processes is done using queues or by using Connection objects returned by Pipe().
However, the multiprocessing.connection module allows some extra flexibility. It basically gives a high level message oriented API for dealing with sockets or Windows named pipes. It also has support for digest authentication using the hmac module, and for polling multiple connections at the same time.
- multiprocessing.connection.deliver_challenge(connection, authkey)¶
Send a randomly generated message to the other end of the connection and wait for a reply.
If the reply matches the digest of the message using authkey as the key then a welcome message is sent to the other end of the connection. Otherwise AuthenticationError is raised.
- multiprocessing.connection.answer_challenge(connection, authkey)¶
Receive a message, calculate the digest of the message using authkey as the key, and then send the digest back.
If a welcome message is not received, then AuthenticationError is raised.
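The two functions are designed to be called on opposite ends of a connection. As an illustrative sketch, the handshake below runs both ends in one process, using a Pipe() and a helper thread; the function name handshake and the returned tuple are hypothetical conveniences.

```python
import threading
from multiprocessing import AuthenticationError, Pipe
from multiprocessing.connection import answer_challenge, deliver_challenge


def handshake(server_key, client_key):
    """Run one challenge/response over a pipe; return (server_ok, client_ok)."""
    server_end, client_end = Pipe()
    outcome = {}

    def challenger():
        try:
            deliver_challenge(server_end, server_key)
            outcome["server"] = True
        except AuthenticationError:
            outcome["server"] = False

    t = threading.Thread(target=challenger)
    t.start()
    try:
        answer_challenge(client_end, client_key)
        outcome["client"] = True
    except AuthenticationError:
        outcome["client"] = False
    t.join()
    server_end.close()
    client_end.close()
    return outcome["server"], outcome["client"]


print(handshake(b"shared key", b"shared key"))
print(handshake(b"shared key", b"different key"))
```

With matching keys both sides succeed; with mismatched keys the challenger rejects the digest and the answering side never receives the welcome message, so both raise AuthenticationError.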
- multiprocessing.connection.Client(address[, family[, authkey]])¶
Attempt to set up a connection to the listener which is using address address, returning a Connection.
The type of the connection is determined by the family argument, but this can generally be omitted since it can usually be inferred from the format of address. (See Address Formats)
If authkey is given and not None, it should be a byte string and will be used as the secret key for an HMAC-based authentication challenge. No authentication is done if authkey is None. AuthenticationError is raised if authentication fails. See Authentication keys.
- class multiprocessing.connection.Listener([address[, family[, backlog[, authkey]]]])¶
A wrapper for a bound socket or Windows named pipe which is ‘listening’ for connections.
address is the address to be used by the bound socket or named pipe of the listener object.
Note
If an address of ‘0.0.0.0’ is used, the address will not be a connectable end point on Windows. If you require a connectable end-point, you should use ‘127.0.0.1’.
family is the type of socket (or named pipe) to use. This can be one of the strings 'AF_INET' (for a TCP socket), 'AF_UNIX' (for a Unix domain socket) or 'AF_PIPE' (for a Windows named pipe). Of these only the first is guaranteed to be available. If family is None then the family is inferred from the format of address. If address is also None then a default is chosen. This default is the family which is assumed to be the fastest available. See Address Formats. Note that if family is 'AF_UNIX' and address is None then the socket will be created in a private temporary directory created using tempfile.mkstemp().
If the listener object uses a socket then backlog (1 by default) is passed to the listen() method of the socket once it has been bound.
If authkey is given and not None, it should be a byte string and will be used as the secret key for an HMAC-based authentication challenge. No authentication is done if authkey is None. AuthenticationError is raised if authentication fails. See Authentication keys.
- accept()¶
Accept a connection on the bound socket or named pipe of the listener object and return a Connection object. If authentication is attempted and fails, then AuthenticationError is raised.
- close()¶
Close the bound socket or named pipe of the listener object. This is called automatically when the listener is garbage collected. However it is advisable to call it explicitly.
Listener objects have the following read-only properties:
- address¶
The address which is being used by the Listener object.
- last_accepted¶
The address from which the last accepted connection came. If this is unavailable then it is None.
Changed in version 3.3: Listener objects now support the context management protocol – see Context Manager Types. __enter__() returns the listener object, and __exit__() calls close().
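The sketch below exercises accept(), the address property and the authkey check inside a single process. It assumes a usable loopback interface, lets the operating system pick a free port by binding to port 0, and uses a helper thread in place of a separate server process; the name exchange is hypothetical.

```python
import threading
from multiprocessing import AuthenticationError
from multiprocessing.connection import Client, Listener


def exchange(listener_key, client_key):
    """Serve one connection on an ephemeral local port; report what each side saw."""
    seen = {}
    # Port 0 asks the OS for any free port; listener.address reports the real one.
    with Listener(("127.0.0.1", 0), authkey=listener_key) as listener:

        def serve():
            try:
                with listener.accept() as conn:
                    conn.send("hello from listener")
                seen["listener"] = "accepted"
            except AuthenticationError:
                seen["listener"] = "rejected"

        t = threading.Thread(target=serve)
        t.start()
        try:
            with Client(listener.address, authkey=client_key) as conn:
                seen["client"] = conn.recv()
        except AuthenticationError:
            seen["client"] = "rejected"
        t.join()
    return seen


print(exchange(b"secret", b"secret"))
print(exchange(b"secret", b"wrong"))
```

When the keys differ, accept() and Client() both raise AuthenticationError, so no pickled data is ever exchanged with the unauthenticated peer.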
- multiprocessing.connection.wait(object_list, timeout=None)¶
Wait till an object in object_list is ready. Returns the list of those objects in object_list which are ready. If timeout is a float then the call blocks for at most that many seconds. If timeout is None then it will block for an unlimited period. A negative timeout is equivalent to a zero timeout.
For both POSIX and Windows, an object can appear in object_list if it is
a readable Connection object;
a connected and readable socket.socket object; or
the sentinel attribute of a Process object.
A connection or socket object is ready when there is data available to be read from it, or the other end has been closed.
POSIX: wait(object_list, timeout) is almost equivalent to select.select(object_list, [], [], timeout). The difference is that, if select.select() is interrupted by a signal, it can raise OSError with an error number of EINTR, whereas wait() will not.
Windows: An item in object_list must either be an integer handle which is waitable (according to the definition used by the documentation of the Win32 function WaitForMultipleObjects()) or it can be an object with a fileno() method which returns a socket handle or pipe handle. (Note that pipe handles and socket handles are not waitable handles.)
Added in version 3.3.
Examples
The following server code creates a listener which uses 'secret password' as an authentication key. It then waits for a connection and sends some data to the client:

    from multiprocessing.connection import Listener
    from array import array

    address = ('localhost', 6000)     # family is deduced to be 'AF_INET'

    with Listener(address, authkey=b'secret password') as listener:
        with listener.accept() as conn:
            print('connection accepted from', listener.last_accepted)

            conn.send([2.25, None, 'junk', float])

            conn.send_bytes(b'hello')

            conn.send_bytes(array('i', [42, 1729]))

The following code connects to the server and receives some data from the server:

    from multiprocessing.connection import Client
    from array import array

    address = ('localhost', 6000)

    with Client(address, authkey=b'secret password') as conn:
        print(conn.recv())                  # => [2.25, None, 'junk', <class 'float'>]

        print(conn.recv_bytes())            # => b'hello'

        arr = array('i', [0, 0, 0, 0, 0])
        print(conn.recv_bytes_into(arr))    # => 8
        print(arr)                          # => array('i', [42, 1729, 0, 0, 0])
The following code uses wait() to wait for messages from multiple processes at once:

    from multiprocessing import Process, Pipe, current_process
    from multiprocessing.connection import wait

    def foo(w):
        for i in range(10):
            w.send((i, current_process().name))
        w.close()

    if __name__ == '__main__':
        readers = []

        for i in range(4):
            r, w = Pipe(duplex=False)
            readers.append(r)
            p = Process(target=foo, args=(w,))
            p.start()
            # We close the writable end of the pipe now to be sure that
            # p is the only process which owns a handle for it.  This
            # ensures that when p closes its handle for the writable end,
            # wait() will promptly report the readable end as being ready.
            w.close()

        while readers:
            for r in wait(readers):
                try:
                    msg = r.recv()
                except EOFError:
                    readers.remove(r)
                else:
                    print(msg)
Address Formats¶
An 'AF_INET' address is a tuple of the form (hostname, port) where hostname is a string and port is an integer.
An 'AF_UNIX' address is a string representing a filename on the filesystem.
An 'AF_PIPE' address is a string of the form r'\\.\pipe\PipeName'. To use Client() to connect to a named pipe on a remote computer called ServerName one should use an address of the form r'\\ServerName\pipe\PipeName' instead.
Note that any string beginning with two backslashes is assumed by default to be an 'AF_PIPE' address rather than an 'AF_UNIX' address.
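The inference rules above can be summarised in a few lines. The function infer_family is a hypothetical helper written for illustration; it is not part of multiprocessing and only mirrors the rules as stated in this section.

```python
def infer_family(address):
    """Guess the address family from the documented address formats.

    Hypothetical helper for illustration; not part of multiprocessing.
    """
    if isinstance(address, tuple):
        return "AF_INET"      # (hostname, port)
    if isinstance(address, str) and address.startswith("\\\\"):
        return "AF_PIPE"      # r'\\.\pipe\PipeName' or r'\\ServerName\pipe\PipeName'
    if isinstance(address, str):
        return "AF_UNIX"      # a filename on the filesystem
    raise ValueError(f"unrecognized address: {address!r}")


print(infer_family(("localhost", 6000)))
print(infer_family("/tmp/listener.sock"))
print(infer_family(r"\\.\pipe\PipeName"))
```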
Authentication keys¶
When one uses Connection.recv, the data received is automatically unpickled. Unfortunately unpickling data from an untrusted source is a security risk. Therefore Listener and Client() use the hmac module to provide digest authentication.
An authentication key is a byte string which can be thought of as a password: once a connection is established both ends will demand proof that the other knows the authentication key. (Demonstrating that both ends are using the same key does not involve sending the key over the connection.)
If authentication is requested but no authentication key is specified then the return value of current_process().authkey is used (see Process). This value will be automatically inherited by any Process object that the current process creates. This means that (by default) all processes of a multi-process program will share a single authentication key which can be used when setting up connections between themselves.
Suitable authentication keys can also be generated by using os.urandom().
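To illustrate why proving knowledge of the key does not require sending it, here is a simplified challenge/response sketch built directly on the hmac module. It is not the exact wire protocol used by multiprocessing.connection; the function names, the 32-byte sizes and the choice of SHA-256 are assumptions made for the example.

```python
import hmac
import os


def make_challenge():
    return os.urandom(32)     # random, single-use message


def respond(authkey, challenge):
    # Only the digest travels back over the connection, never the key itself.
    return hmac.new(authkey, challenge, "sha256").digest()


def verify(authkey, challenge, response):
    expected = hmac.new(authkey, challenge, "sha256").digest()
    return hmac.compare_digest(expected, response)


authkey = os.urandom(32)      # a suitable random authentication key
challenge = make_challenge()
print(verify(authkey, challenge, respond(authkey, challenge)))
print(verify(authkey, challenge, respond(b"wrong key", challenge)))
```

Because each challenge is random, a digest captured from one handshake is of no use for answering a later one.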
Logging¶
Some support for logging is available. Note, however, that the logging package does not use process shared locks so it is possible (depending on the handler type) for messages from different processes to get mixed up.
- multiprocessing.get_logger()¶
Returns the logger used by multiprocessing. If necessary, a new one will be created.
When first created the logger has level logging.NOTSET and no default handler. Messages sent to this logger will not by default propagate to the root logger.
Note that on Windows child processes will only inherit the level of the parent process’s logger – any other customization of the logger will not be inherited.
- multiprocessing.log_to_stderr(level=None)¶
This function performs a call to get_logger() but in addition to returning the logger created by get_logger, it adds a handler which sends output to sys.stderr using format '[%(levelname)s/%(processName)s] %(message)s'. You can set the level of the logger by passing a level argument.
Below is an example session with logging turned on:

    >>> import multiprocessing, logging
    >>> logger = multiprocessing.log_to_stderr()
    >>> logger.setLevel(logging.INFO)
    >>> logger.warning('doomed')
    [WARNING/MainProcess] doomed
    >>> m = multiprocessing.Manager()
    [INFO/SyncManager-...] child process calling self.run()
    [INFO/SyncManager-...] created temp directory /.../pymp-...
    [INFO/SyncManager-...] manager serving at '/.../listener-...'
    >>> del m
    [INFO/MainProcess] sending shutdown message to manager
    [INFO/SyncManager-...] manager exiting with exitcode 0

For a full table of logging levels, see the logging module.
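As a short sketch, the level can also be supplied directly to log_to_stderr() instead of calling setLevel() afterwards. The message texts are illustrative.

```python
import logging
import multiprocessing

# Passing a level configures the logger in the same call that adds the handler.
logger = multiprocessing.log_to_stderr(logging.INFO)
logger.info("visible at INFO level")
logger.debug("suppressed: below the configured level")

print(logger is multiprocessing.get_logger())   # the same logger object
print(logger.level == logging.INFO)
print(bool(logger.propagate))                   # not passed on to the root logger
```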
The multiprocessing.dummy module¶
multiprocessing.dummy replicates the API of multiprocessing but is no more than a wrapper around the threading module.
In particular, the Pool function provided by multiprocessing.dummy returns an instance of ThreadPool, which is a subclass of Pool that supports all the same method calls but uses a pool of worker threads rather than worker processes.
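A brief sketch of what this means in practice: the pool returned by multiprocessing.dummy.Pool accepts the usual method calls, but the work runs in threads of the current process. The variable names are illustrative.

```python
import os
from multiprocessing.dummy import Pool
from multiprocessing.pool import ThreadPool

with Pool(3) as pool:
    is_thread_pool = isinstance(pool, ThreadPool)
    lengths = pool.map(len, ["a", "bb", "ccc"])
    # Every task reports the PID of the current process because the
    # workers are threads, not child processes.
    worker_pids = set(pool.map(lambda _: os.getpid(), range(8)))

print(is_thread_pool)
print(lengths)
print(worker_pids == {os.getpid()})
```

Since tasks are not sent to another process, a lambda can be used as the task function here.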
- class multiprocessing.pool.ThreadPool([processes[, initializer[, initargs]]])¶
A thread pool object which controls a pool of worker threads to which jobs can be submitted. ThreadPool instances are fully interface compatible with Pool instances, and their resources must also be properly managed, either by using the pool as a context manager or by calling close() and terminate() manually.
processes is the number of worker threads to use. If processes is None then the number returned by os.process_cpu_count() is used.
If initializer is not None then each worker thread will call initializer(*initargs) when it starts.
Unlike Pool, maxtasksperchild and context cannot be provided.
Note
A ThreadPool shares the same interface as Pool, which is designed around a pool of processes and predates the introduction of the concurrent.futures module. As such, it inherits some operations that don’t make sense for a pool backed by threads, and it has its own type for representing the status of asynchronous jobs, AsyncResult, that is not understood by any other libraries.
Users should generally prefer to use concurrent.futures.ThreadPoolExecutor, which has a simpler interface that was designed around threads from the start, and which returns concurrent.futures.Future instances that are compatible with many other libraries, including asyncio.
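For comparison, the same small job is shown below with both interfaces. The function square and the five-second timeouts are illustrative choices.

```python
from concurrent.futures import ThreadPoolExecutor
from multiprocessing.pool import ThreadPool


def square(x):
    return x * x


# ThreadPool: apply_async() returns a multiprocessing AsyncResult.
with ThreadPool(2) as pool:
    async_result = pool.apply_async(square, (7,))
    via_threadpool = async_result.get(timeout=5)

# ThreadPoolExecutor: submit() returns a concurrent.futures.Future.
with ThreadPoolExecutor(max_workers=2) as executor:
    future = executor.submit(square, 7)
    via_executor = future.result(timeout=5)
    mapped = list(executor.map(square, range(4)))

print(via_threadpool, via_executor, mapped)
```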
Programming guidelines¶
There are certain guidelines and idioms which should be adhered to when using multiprocessing.
All start methods¶
The following applies to all start methods.
Avoid shared state
As far as possible one should try to avoid shifting large amounts of data between processes.
It is probably best to stick to using queues or pipes for communication between processes rather than using the lower level synchronization primitives.
Picklability
Ensure that the arguments to the methods of proxies are picklable.
Thread safety of proxies
Do not use a proxy object from more than one thread unless you protect it with a lock.
(There is never a problem with different processes using the same proxy.)
Joining zombie processes
On POSIX when a process finishes but has not been joined it becomes a zombie. There should never be very many because each time a new process starts (or active_children() is called) all completed processes which have not yet been joined will be joined. Also calling a finished process’s Process.is_alive will join the process. Even so it is probably good practice to explicitly join all the processes that you start.
Better to inherit than pickle/unpickle
When using the spawn or forkserver start methods many types from multiprocessing need to be picklable so that child processes can use them. However, one should generally avoid sending shared objects to other processes using pipes or queues. Instead you should arrange the program so that a process which needs access to a shared resource created elsewhere can inherit it from an ancestor process.
Avoid terminating processes
Using the Process.terminate method to stop a process is liable to cause any shared resources (such as locks, semaphores, pipes and queues) currently being used by the process to become broken or unavailable to other processes.
Therefore it is probably best to only consider using Process.terminate on processes which never use any shared resources.
Joining processes that use queues
Bear in mind that a process that has put items in a queue will wait before terminating until all the buffered items are fed by the “feeder” thread to the underlying pipe. (The child process can call the Queue.cancel_join_thread method of the queue to avoid this behaviour.)
This means that whenever you use a queue you need to make sure that all items which have been put on the queue will eventually be removed before the process is joined. Otherwise you cannot be sure that processes which have put items on the queue will terminate. Remember also that non-daemonic processes will be joined automatically.
An example which will deadlock is the following:

    from multiprocessing import Process, Queue

    def f(q):
        q.put('X' * 1000000)

    if __name__ == '__main__':
        queue = Queue()
        p = Process(target=f, args=(queue,))
        p.start()
        p.join()                    # this deadlocks
        obj = queue.get()

A fix here would be to swap the last two lines (or simply remove the p.join() line).
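With the last two lines swapped, the corrected program might look like the following sketch. The names produce and fetch are hypothetical; the payload size matches the example above.

```python
from multiprocessing import Process, Queue


def produce(q, size):
    q.put("X" * size)   # large item: the feeder thread must flush it to the pipe


def fetch(size=1_000_000):
    queue = Queue()
    p = Process(target=produce, args=(queue, size))
    p.start()
    obj = queue.get()   # drain the queue first ...
    p.join()            # ... so the child can finish flushing and exit
    return len(obj), p.exitcode


if __name__ == "__main__":
    print(fetch())
```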
Explicitly pass resources to child processes
On POSIX using the fork start method, a child process can make use of a shared resource created in a parent process using a global resource. However, it is better to pass the object as an argument to the constructor for the child process.
Apart from making the code (potentially) compatible with Windows and the other start methods this also ensures that as long as the child process is still alive the object will not be garbage collected in the parent process. This might be important if some resource is freed when the object is garbage collected in the parent process.
So for instance

    from multiprocessing import Process, Lock

    def f():
        ...  # do something using "lock"

    if __name__ == '__main__':
        lock = Lock()
        for i in range(10):
            Process(target=f).start()

should be rewritten as

    from multiprocessing import Process, Lock

    def f(l):
        ...  # do something using "l"

    if __name__ == '__main__':
        lock = Lock()
        for i in range(10):
            Process(target=f, args=(lock,)).start()
Beware of replacing sys.stdin with a “file like object”
multiprocessing originally unconditionally called:

    os.close(sys.stdin.fileno())

in the multiprocessing.Process._bootstrap() method, which resulted in issues with processes-in-processes. This has been changed to:

    sys.stdin.close()
    sys.stdin = open(os.open(os.devnull, os.O_RDONLY), closefd=False)

This solves the fundamental issue of processes colliding with each other resulting in a bad file descriptor error, but introduces a potential danger to applications which replace sys.stdin with a “file-like object” with output buffering. This danger is that if multiple processes call close() on this file-like object, it could result in the same data being flushed to the object multiple times, resulting in corruption.
If you write a file-like object and implement your own caching, you can make it fork-safe by storing the pid whenever you append to the cache, and discarding the cache when the pid changes. For example:

    @property
    def cache(self):
        pid = os.getpid()
        if pid != self._pid:
            self._pid = pid
            self._cache = []
        return self._cache
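Placed in a complete class, the property might be used as in the sketch below. ForkSafeBuffer and its write() and flush() methods are hypothetical and only illustrate the pid check; a real file-like object would implement more of the file protocol.

```python
import os


class ForkSafeBuffer:
    """Hypothetical write buffer that drops inherited data after a fork."""

    def __init__(self):
        self._pid = os.getpid()
        self._cache = []

    @property
    def cache(self):
        pid = os.getpid()
        if pid != self._pid:
            # Running in a different process than the one that filled the
            # cache: discard the inherited entries so they are not flushed twice.
            self._pid = pid
            self._cache = []
        return self._cache

    def write(self, data):
        self.cache.append(data)

    def flush(self):
        # Hand back the buffered data and start over with an empty cache.
        pending, self._cache = self.cache, []
        return "".join(pending)


buf = ForkSafeBuffer()
buf.write("a")
buf.write("b")
print(buf.flush())
```

All access goes through the cache property, so the pid check runs before every append and every flush.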
The spawn and forkserver start methods¶
There are a few extra restrictions which don’t apply to the fork start method.
More picklability
Ensure that all arguments to Process.__init__() are picklable. Also, if you subclass Process then make sure that instances will be picklable when the Process.start method is called.
Global variables
Bear in mind that if code run in a child process tries to access a global variable, then the value it sees (if any) may not be the same as the value in the parent process at the time that Process.start was called.
However, global variables which are just module level constants cause no problems.
Safe importing of main module
Make sure that the main module can be safely imported by a new Python interpreter without causing unintended side effects (such as starting a new process).
For example, using the spawn or forkserver start method running the following module would fail with a RuntimeError:

    from multiprocessing import Process

    def foo():
        print('hello')

    p = Process(target=foo)
    p.start()

Instead one should protect the “entry point” of the program by using if __name__ == '__main__': as follows:

    from multiprocessing import Process, freeze_support, set_start_method

    def foo():
        print('hello')

    if __name__ == '__main__':
        freeze_support()
        set_start_method('spawn')
        p = Process(target=foo)
        p.start()

(The freeze_support() line can be omitted if the program will be run normally instead of frozen.)
This allows the newly spawned Python interpreter to safely import the module and then run the module’s foo() function.
Similar restrictions apply if a pool or manager is created in the main module.
Examples¶
Demonstration of how to create and use customized managers and proxies:

    from multiprocessing import freeze_support
    from multiprocessing.managers import BaseManager, BaseProxy
    import operator

    ##

    class Foo:
        def f(self):
            print('you called Foo.f()')
        def g(self):
            print('you called Foo.g()')
        def _h(self):
            print('you called Foo._h()')

    # A simple generator function
    def baz():
        for i in range(10):
            yield i*i

    # Proxy type for generator objects
    class GeneratorProxy(BaseProxy):
        _exposed_ = ['__next__']
        def __iter__(self):
            return self
        def __next__(self):
            return self._callmethod('__next__')

    # Function to return the operator module
    def get_operator_module():
        return operator

    ##

    class MyManager(BaseManager):
        pass

    # register the Foo class; make `f()` and `g()` accessible via proxy
    MyManager.register('Foo1', Foo)

    # register the Foo class; make `g()` and `_h()` accessible via proxy
    MyManager.register('Foo2', Foo, exposed=('g', '_h'))

    # register the generator function baz; use `GeneratorProxy` to make proxies
    MyManager.register('baz', baz, proxytype=GeneratorProxy)

    # register get_operator_module(); make public functions accessible via proxy
    MyManager.register('operator', get_operator_module)

    ##

    def test():
        manager = MyManager()
        manager.start()

        print('-' * 20)

        f1 = manager.Foo1()
        f1.f()
        f1.g()
        assert not hasattr(f1, '_h')
        assert sorted(f1._exposed_) == sorted(['f', 'g'])

        print('-' * 20)

        f2 = manager.Foo2()
        f2.g()
        f2._h()
        assert not hasattr(f2, 'f')
        assert sorted(f2._exposed_) == sorted(['g', '_h'])

        print('-' * 20)

        it = manager.baz()
        for i in it:
            print('<%d>' % i, end=' ')
        print()

        print('-' * 20)

        op = manager.operator()
        print('op.add(23, 45) =', op.add(23, 45))
        print('op.pow(2, 94) =', op.pow(2, 94))
        print('op._exposed_ =', op._exposed_)

    ##

    if __name__ == '__main__':
        freeze_support()
        test()
Using Pool:

    import multiprocessing
    import time
    import random
    import sys

    #
    # Functions used by test code
    #

    def calculate(func, args):
        result = func(*args)
        return '%s says that %s%s = %s' % (
            multiprocessing.current_process().name,
            func.__name__, args, result
            )

    def calculatestar(args):
        return calculate(*args)

    def mul(a, b):
        time.sleep(0.5 * random.random())
        return a * b

    def plus(a, b):
        time.sleep(0.5 * random.random())
        return a + b

    def f(x):
        return 1.0 / (x - 5.0)

    def pow3(x):
        return x ** 3

    def noop(x):
        pass

    #
    # Test code
    #

    def test():
        PROCESSES = 4
        print('Creating pool with %d processes\n' % PROCESSES)

        with multiprocessing.Pool(PROCESSES) as pool:
            #
            # Tests
            #

            TASKS = [(mul, (i, 7)) for i in range(10)] + \
                    [(plus, (i, 8)) for i in range(10)]

            results = [pool.apply_async(calculate, t) for t in TASKS]
            imap_it = pool.imap(calculatestar, TASKS)
            imap_unordered_it = pool.imap_unordered(calculatestar, TASKS)

            print('Ordered results using pool.apply_async():')
            for r in results:
                print('\t', r.get())
            print()

            print('Ordered results using pool.imap():')
            for x in imap_it:
                print('\t', x)
            print()

            print('Unordered results using pool.imap_unordered():')
            for x in imap_unordered_it:
                print('\t', x)
            print()

            print('Ordered results using pool.map() --- will block till complete:')
            for x in pool.map(calculatestar, TASKS):
                print('\t', x)
            print()

            #
            # Test error handling
            #

            print('Testing error handling:')

            try:
                print(pool.apply(f, (5,)))
            except ZeroDivisionError:
                print('\tGot ZeroDivisionError as expected from pool.apply()')
            else:
                raise AssertionError('expected ZeroDivisionError')

            try:
                print(pool.map(f, list(range(10))))
            except ZeroDivisionError:
                print('\tGot ZeroDivisionError as expected from pool.map()')
            else:
                raise AssertionError('expected ZeroDivisionError')

            try:
                print(list(pool.imap(f, list(range(10)))))
            except ZeroDivisionError:
                print('\tGot ZeroDivisionError as expected from list(pool.imap())')
            else:
                raise AssertionError('expected ZeroDivisionError')

            it = pool.imap(f, list(range(10)))
            for i in range(10):
                try:
                    x = next(it)
                except ZeroDivisionError:
                    if i == 5:
                        pass
                except StopIteration:
                    break
                else:
                    if i == 5:
                        raise AssertionError('expected ZeroDivisionError')

            assert i == 9
            print('\tGot ZeroDivisionError as expected from IMapIterator.next()')
            print()

            #
            # Testing timeouts
            #

            print('Testing ApplyResult.get() with timeout:', end=' ')
            res = pool.apply_async(calculate, TASKS[0])
            while 1:
                sys.stdout.flush()
                try:
                    sys.stdout.write('\n\t%s' % res.get(0.02))
                    break
                except multiprocessing.TimeoutError:
                    sys.stdout.write('.')
            print()
            print()

            print('Testing IMapIterator.next() with timeout:', end=' ')
            it = pool.imap(calculatestar, TASKS)
            while 1:
                sys.stdout.flush()
                try:
                    sys.stdout.write('\n\t%s' % it.next(0.02))
                except StopIteration:
                    break
                except multiprocessing.TimeoutError:
                    sys.stdout.write('.')
            print()
            print()


    if __name__ == '__main__':
        multiprocessing.freeze_support()
        test()
An example showing how to use queues to feed tasks to a collection of worker processes and collect the results:

    import time
    import random

    from multiprocessing import Process, Queue, current_process, freeze_support

    #
    # Function run by worker processes
    #

    def worker(input, output):
        for func, args in iter(input.get, 'STOP'):
            result = calculate(func, args)
            output.put(result)

    #
    # Function used to calculate result
    #

    def calculate(func, args):
        result = func(*args)
        return '%s says that %s%s = %s' % \
            (current_process().name, func.__name__, args, result)

    #
    # Functions referenced by tasks
    #

    def mul(a, b):
        time.sleep(0.5*random.random())
        return a * b

    def plus(a, b):
        time.sleep(0.5*random.random())
        return a + b

    #
    #
    #

    def test():
        NUMBER_OF_PROCESSES = 4
        TASKS1 = [(mul, (i, 7)) for i in range(20)]
        TASKS2 = [(plus, (i, 8)) for i in range(10)]

        # Create queues
        task_queue = Queue()
        done_queue = Queue()

        # Submit tasks
        for task in TASKS1:
            task_queue.put(task)

        # Start worker processes
        for i in range(NUMBER_OF_PROCESSES):
            Process(target=worker, args=(task_queue, done_queue)).start()

        # Get and print results
        print('Unordered results:')
        for i in range(len(TASKS1)):
            print('\t', done_queue.get())

        # Add more tasks using `put()`
        for task in TASKS2:
            task_queue.put(task)

        # Get and print some more results
        for i in range(len(TASKS2)):
            print('\t', done_queue.get())

        # Tell child processes to stop
        for i in range(NUMBER_OF_PROCESSES):
            task_queue.put('STOP')


    if __name__ == '__main__':
        freeze_support()
        test()