mpi4py.futures

Added in version 3.0.0.

This package provides a high-level interface for asynchronously executing callables on a pool of worker processes using MPI for inter-process communication.

The mpi4py.futures package is based on concurrent.futures from the Python standard library. More precisely, mpi4py.futures provides the MPIPoolExecutor class as a concrete implementation of the abstract class Executor. The submit() interface schedules a callable to be executed asynchronously and returns a Future object representing the execution of the callable. Future instances can be queried for the call result or exception. Sets of Future instances can be passed to the wait() and as_completed() functions.
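
Because MPIPoolExecutor implements the abstract Executor interface, code written against concurrent.futures should carry over with little change. The sketch below uses the standard library ThreadPoolExecutor as a stand-in so it runs without an MPI installation; replacing it with MPIPoolExecutor is assumed to work the same way as long as the callables and arguments are picklable.

```python
from concurrent.futures import ThreadPoolExecutor, as_completed, wait

# ThreadPoolExecutor stands in for MPIPoolExecutor so the sketch runs without MPI.
with ThreadPoolExecutor(max_workers=2) as executor:
    # submit() returns a Future immediately; the call runs asynchronously.
    futures = [executor.submit(pow, 2, k) for k in range(8)]

    # wait() blocks until every Future in the collection has finished.
    done, not_done = wait(futures)

    # as_completed() yields Futures as they finish, in completion order,
    # so the results are sorted here to get a deterministic list.
    results = sorted(f.result() for f in as_completed(futures))

print(results)  # [1, 2, 4, 8, 16, 32, 64, 128]
```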

See also

Module concurrent.futures

Documentation of the concurrent.futures standard module.

MPIPoolExecutor

The MPIPoolExecutor class uses a pool of MPI processes to execute calls asynchronously. Performing computations in separate processes side-steps the global interpreter lock, but it also means that only picklable objects can be executed and returned. The __main__ module must be importable by worker processes, thus MPIPoolExecutor instances may not work in the interactive interpreter.
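
Since tasks and results must be picklable, it can help to check a callable or argument before submitting it. The helper below is a hypothetical convenience, not part of mpi4py, and it assumes the standard pickle module is what serializes the objects.

```python
import pickle
import threading

def is_picklable(obj):
    """Hypothetical helper: report whether pickle can serialize obj."""
    try:
        pickle.dumps(obj)
    except (pickle.PicklingError, TypeError, AttributeError):
        return False
    return True

print(is_picklable(pow))               # True: built-ins pickle by reference
print(is_picklable(lambda x: x))       # False: a lambda has no importable name
print(is_picklable(threading.Lock()))  # False: locks cannot be pickled
```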

MPIPoolExecutor takes advantage of the dynamic process management features introduced in the MPI-2 standard. In particular, the MPI.Intracomm.Spawn method of MPI.COMM_SELF is used in the master (or parent) process to spawn new worker (or child) processes running a Python interpreter. The master process uses a separate thread (one for each MPIPoolExecutor instance) to communicate back and forth with the workers. The worker processes serve the execution of tasks in the main (and only) thread until they are signaled for completion.

Note

The worker processes must import the main script in order to unpickle any callable defined in the __main__ module and submitted from the master process. Furthermore, the callables may need access to other global variables. At the worker processes, mpi4py.futures executes the main script code (using the runpy module) under the __worker__ namespace to define the __main__ module. The __main__ and __worker__ modules are added to sys.modules (both at the master and worker processes) to ensure proper pickling and unpickling.

Warning

During the initial import phase at the workers, the main script cannot create and use new MPIPoolExecutor instances. Otherwise, each worker would attempt to spawn a new pool of workers, leading to infinite recursion. mpi4py.futures detects such recursive attempts to spawn new workers and aborts the MPI execution environment. As the main script code is run under the __worker__ namespace, the easiest way to avoid spawn recursion is using the idiom if __name__ == '__main__': ... in the main script.
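
The effect of the guard can be reproduced with runpy alone. This sketch only imitates the documented behavior (running the main script under the __worker__ name); it is not the mpi4py.futures code, and main_script.py is a throwaway file created for the demonstration.

```python
import os
import runpy
import tempfile

# Stand-in for a user's main script; the guarded block is where a real
# program would create an MPIPoolExecutor.
SCRIPT = """
calls = []

def task(x):
    return x * x

if __name__ == '__main__':
    calls.append('pool created')
"""

with tempfile.TemporaryDirectory() as tmp:
    path = os.path.join(tmp, "main_script.py")
    with open(path, "w") as f:
        f.write(SCRIPT)
    # Worker-side import: definitions run, the guarded block is skipped.
    worker_ns = runpy.run_path(path, run_name="__worker__")
    # Normal execution at the master: the guarded block runs.
    master_ns = runpy.run_path(path, run_name="__main__")

print(worker_ns["calls"], master_ns["calls"])  # [] ['pool created']
print(worker_ns["task"](3))                    # 9
```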

class mpi4py.futures.MPIPoolExecutor(max_workers=None, initializer=None, initargs=(), **kwargs)

An Executor subclass that executes calls asynchronously using a pool of at most max_workers processes. If max_workers is None or not given, its value is determined from the MPI4PY_FUTURES_MAX_WORKERS environment variable if set, or the MPI universe size if set, otherwise a single worker process is spawned. If max_workers is lower than or equal to 0, then a ValueError will be raised.
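
The fallback order described above can be restated as a small function. This is a hypothetical sketch for illustration only (resolve_max_workers, universe_size and world_size are invented names); in particular, subtracting the processes already running from the universe size is an assumption inferred from the Julia example below, where a universe size of 17 yields 16 workers.

```python
import os

def resolve_max_workers(max_workers=None, *, universe_size=None,
                        world_size=1, environ=None):
    """Hypothetical restatement of the documented max_workers fallback order."""
    environ = os.environ if environ is None else environ
    if max_workers is None:
        value = environ.get("MPI4PY_FUTURES_MAX_WORKERS")
        if value:
            # 1. The environment variable wins when it is set.
            max_workers = int(value)
        elif universe_size is not None:
            # 2. Assumption: processes already running count against the
            #    universe size, matching "-usize 17" yielding 16 workers.
            max_workers = max(universe_size - world_size, 1)
        else:
            # 3. Otherwise a single worker process.
            max_workers = 1
    if max_workers <= 0:
        raise ValueError(f"max_workers must be greater than 0, got {max_workers}")
    return max_workers
```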

initializer is an optional callable that is called at the start of each worker process before executing any tasks; initargs is a tuple of arguments passed to the initializer. If initializer raises an exception, all pending tasks and any attempt to submit new tasks to the pool will raise a BrokenExecutor exception.
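
The standard library ThreadPoolExecutor follows the same initializer contract, which makes it a convenient way to observe this failure mode without MPI. In this sketch, failing_init is a hypothetical initializer that always raises; the pending task then fails with BrokenThreadPool (a BrokenExecutor subclass), and a later submission raises it immediately. The thread pool also logs the initializer traceback to standard error.

```python
from concurrent.futures import BrokenExecutor, ThreadPoolExecutor

def failing_init(tag):
    # Hypothetical initializer that cannot set up its worker.
    raise RuntimeError(f"cannot initialize worker {tag}")

second_error = None
with ThreadPoolExecutor(max_workers=1, initializer=failing_init,
                        initargs=("w0",)) as executor:
    pending = executor.submit(pow, 2, 10)
    # Blocks until the pool marks the pending task as failed.
    first_error = pending.exception()
    try:
        executor.submit(pow, 2, 11)  # the pool is now broken
    except BrokenExecutor as exc:
        second_error = exc

print(type(first_error).__name__, type(second_error).__name__)
```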

Other parameters:

  • python_exe: Path to the Python interpreter executable used to spawn worker processes, otherwise sys.executable is used.

  • python_args: list or iterable with additional command line flags to pass to the Python executable. Command line flags determined from inspection of sys.flags, sys.warnoptions and sys._xoptions are passed unconditionally.

  • mpi_info: dict or iterable yielding (key, value) pairs. These (key, value) pairs are passed (through an MPI.Info object) to the MPI.Intracomm.Spawn call used to spawn worker processes. This mechanism allows telling the MPI runtime system where and how to start the processes. Check the documentation of the backend MPI implementation about the set of keys it interprets and the corresponding format for values.

  • globals: dict or iterable yielding (name, value) pairs to initialize the main module namespace in worker processes.

  • main: If set to False, do not import the __main__ module in worker processes. Setting main to False prevents worker processes from accessing definitions in the parent __main__ namespace.

  • path: list or iterable with paths to append to sys.path in worker processes to extend the module search path.

  • wdir: Path to set the current working directory in worker processes using os.chdir(). The initial working directory is set by the MPI implementation. Quality MPI implementations should honor a wdir info key passed through mpi_info, although such a feature is not mandatory.

  • env: dict or iterable yielding (name, value) pairs with environment variables to update os.environ in worker processes. The initial environment is set by the MPI implementation. MPI implementations may allow setting the initial environment through mpi_info; however, such a feature is neither required nor recommended by the MPI standard.

  • use_pkl5: If set to True, use pickle with out-of-band buffers for interprocess communication. If use_pkl5 is set to None or not given, its value is determined from the MPI4PY_FUTURES_USE_PKL5 environment variable. Using pickle with out-of-band buffers may benefit applications dealing with large buffer-like objects like NumPy arrays. See mpi4py.util.pkl5 for additional information.

  • backoff: float value specifying the maximum number of seconds a worker thread or process suspends execution with time.sleep() while idle-waiting. If not set, its value is determined from the MPI4PY_FUTURES_BACKOFF environment variable if set, otherwise the default value of 0.001 seconds is used. Lower values will reduce latency and increase execution throughput for very short-lived tasks, albeit at the expense of spinning CPU cores and increased energy consumption.
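
Pickle with out-of-band buffers refers to pickle protocol 5 (PEP 574). The snippet below shows that mechanism with the standard pickle module only; it does not use mpi4py.util.pkl5, and the bytearray merely stands in for a large NumPy array. The pickle stream stays tiny because the buffer is handed to buffer_callback instead of being copied into the stream.

```python
import pickle

data = bytearray(1_000_000)  # zero-filled; stands in for a large NumPy array

# In-band: the buffer contents are copied into the pickle stream.
in_band = pickle.dumps(data, protocol=5)

# Out-of-band: the stream only records a placeholder; the buffer is handed
# to buffer_callback so it can travel separately.
buffers = []
out_of_band = pickle.dumps(pickle.PickleBuffer(data), protocol=5,
                           buffer_callback=buffers.append)
restored = pickle.loads(out_of_band, buffers=buffers)

print(len(in_band) > 1_000_000, len(out_of_band) < 100)  # True True
```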

submit(fn, /, *args, **kwargs)

Schedule the callable fn to be executed as fn(*args, **kwargs) and return a Future object representing the execution of the callable.

    from mpi4py.futures import MPIPoolExecutor

    executor = MPIPoolExecutor(max_workers=1)
    future = executor.submit(pow, 321, 1234)
    print(future.result())

map(fn, *iterables, timeout=None, chunksize=1, buffersize=None, **kwargs)

Similar to map(fn, *iterables) except:

  • The iterables are consumed immediately rather than lazily, unless buffersize is specified to limit the number of submitted tasks whose results have not yet been yielded. If the task buffer is full, the caller blocks and iteration over the iterables pauses until a result is yielded from the buffer.

  • fn is executed asynchronously and several calls to fn may be made concurrently, out-of-order, in separate processes.
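
The buffersize behavior described in the first item can be pictured as a sliding window of submitted futures. The generator below is a hypothetical sketch built on any concurrent.futures executor (ThreadPoolExecutor here, so it runs without MPI); it is not the mpi4py implementation, and bounded_map is an invented name.

```python
import collections
import itertools
from concurrent.futures import ThreadPoolExecutor

def bounded_map(executor, fn, iterable, buffersize):
    """Hypothetical sketch: at most `buffersize` submitted, not-yet-yielded tasks."""
    args = iter(iterable)
    # Fill the window with the first `buffersize` submissions.
    window = collections.deque(
        executor.submit(fn, x) for x in itertools.islice(args, buffersize)
    )
    while window:
        # Yield the oldest result first, which keeps the output in order.
        yield window.popleft().result()
        # Only after a result has been consumed, submit one more task.
        for x in itertools.islice(args, 1):
            window.append(executor.submit(fn, x))

with ThreadPoolExecutor(max_workers=2) as executor:
    source = iter(range(100))
    squares = bounded_map(executor, lambda x: x * x, source, buffersize=3)
    first = next(squares)       # 0
    next_unread = next(source)  # 3: only three items were consumed so far
    squares.close()
    all_squares = list(bounded_map(executor, lambda x: x * x, range(10), 3))
```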

The returned iterator raises a TimeoutError if __next__() is called and the result isn’t available after timeout seconds from the original call to map(). timeout can be an int or a float. If timeout is not specified or None, there is no limit to the wait time.

If fn raises an exception, then that exception will be raised when its value is retrieved from the iterator.

This method chops iterables into a number of chunks which it submits to the pool as separate tasks. The (approximate) size of these chunks can be specified by setting chunksize to a positive integer. For very long iterables, using a large value for chunksize can significantly improve performance compared to the default size of one.
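
The chunking idea can be sketched in plain Python: zip the iterables into argument tuples, group them into lists of chunksize, and let each task evaluate a whole list, so the per-task communication cost is paid once per chunk rather than once per call. chunked and run_chunk are hypothetical names; the actual chunking done by mpi4py.futures may differ in detail (the text above only promises an approximate size).

```python
import itertools

def chunked(iterables, chunksize):
    """Hypothetical sketch: group zipped argument tuples into lists of chunksize."""
    args = zip(*iterables)
    while chunk := list(itertools.islice(args, chunksize)):
        yield chunk

def run_chunk(fn, chunk):
    # One task per chunk: the worker applies fn to every argument tuple it holds.
    return [fn(*a) for a in chunk]

chunks = list(chunked([[2] * 10, range(10)], chunksize=4))
results = [r for chunk in chunks for r in run_chunk(pow, chunk)]

print([len(c) for c in chunks])                        # [4, 4, 2]
print(results == list(map(pow, [2] * 10, range(10))))  # True
```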

By default, the returned iterator yields results in-order, waiting for successive tasks to complete. This behavior can be changed by passing the keyword argument unordered as True; the result iterator will then yield a result as soon as any of the tasks complete.
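
The difference between ordered and unordered delivery can be seen with the standard library, used here so the sketch runs without MPI. ThreadPoolExecutor.map() has no unordered keyword, so as_completed() over submitted futures plays that role; with MPIPoolExecutor, the text above states that passing unordered=True to map() yields completion-order results directly. slow_identity is a hypothetical task.

```python
import time
from concurrent.futures import ThreadPoolExecutor, as_completed

def slow_identity(x):
    # Larger inputs finish sooner, so completion order differs from input order.
    time.sleep(0.05 * (3 - x))
    return x

with ThreadPoolExecutor(max_workers=3) as executor:
    ordered = list(executor.map(slow_identity, range(3)))
    futures = [executor.submit(slow_identity, x) for x in range(3)]
    unordered = [f.result() for f in as_completed(futures)]

print(ordered)    # [0, 1, 2]
print(unordered)  # completion order, typically [2, 1, 0]
```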

    from mpi4py.futures import MPIPoolExecutor

    executor = MPIPoolExecutor(max_workers=3)
    for result in executor.map(pow, [2]*32, range(32)):
        print(result)

Changed in version 4.1.0: Added the buffersize parameter.

starmap(fn, iterable, timeout=None, chunksize=1, buffersize=None, **kwargs)

Similar to itertools.starmap(fn, iterable). Used instead of map() when argument parameters are already grouped in tuples from a single iterable (the data has been “pre-zipped”). map(fn, *iterable) is equivalent to starmap(fn, zip(*iterable)).
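
The equivalence can be checked with the built-in map() and itertools.starmap(), whose behavior the executor methods mirror; no MPI is involved in this sketch.

```python
from itertools import starmap

# Two argument "columns", as they would be passed to map().
bases = [2] * 5
exponents = range(5)

via_map = list(map(pow, bases, exponents))
# Pre-zipped form: one (base, exponent) tuple per call, as starmap() expects.
via_starmap = list(starmap(pow, zip(bases, exponents)))

print(via_map, via_starmap)  # [1, 2, 4, 8, 16] [1, 2, 4, 8, 16]
```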

    from mpi4py.futures import MPIPoolExecutor

    executor = MPIPoolExecutor(max_workers=3)
    iterable = ((2, n) for n in range(32))
    for result in executor.starmap(pow, iterable):
        print(result)

Changed in version 4.1.0: Added the buffersize parameter.

shutdown(wait=True, cancel_futures=False)

Signal the executor that it should free any resources that it is using when the currently pending futures are done executing. Calls to submit() and map() made after shutdown() will raise RuntimeError.

If wait is True then this method will not return until all the pending futures are done executing and the resources associated with the executor have been freed. If wait is False then this method will return immediately and the resources associated with the executor will be freed when all pending futures are done executing. Regardless of the value of wait, the entire Python program will not exit until all pending futures are done executing.

If cancel_futures is True, this method will cancel all pending futures that the executor has not started running. Any futures that are completed or running won’t be cancelled, regardless of the value of cancel_futures.
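
The cancel_futures semantics match those of the standard library executors (the parameter appeared there in Python 3.9), so they can be observed with ThreadPoolExecutor. In this sketch the single worker is held busy by blocker, a hypothetical task, so the three queued tasks have not started when shutdown() runs and are cancelled, while the running one completes.

```python
import threading
from concurrent.futures import ThreadPoolExecutor

started = threading.Event()
release = threading.Event()

def blocker():
    started.set()   # signal that this task is now running
    release.wait()  # hold the only worker until released
    return "finished"

executor = ThreadPoolExecutor(max_workers=1)
running = executor.submit(blocker)
started.wait()  # make sure the first task is running before shutting down
queued = [executor.submit(pow, 2, k) for k in range(3)]

executor.shutdown(wait=False, cancel_futures=True)
release.set()

print(running.result())                 # finished
print([f.cancelled() for f in queued])  # [True, True, True]
```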

You can avoid having to call this method explicitly if you use the with statement, which will shut down the executor instance (waiting as if shutdown() were called with wait set to True).

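    import time
    from mpi4py.futures import MPIPoolExecutor

    with MPIPoolExecutor(max_workers=1) as executor:
        future = executor.submit(time.sleep, 2)
    # Leaving the with block waits for pending futures, so the task is done here.
    assert future.done()
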
bootup(wait=True)

Signal the executor that it should eagerly allocate any required resources (in particular, MPI worker processes). If wait is True, then bootup() will not return until the executor resources are ready to process submissions. Resources are automatically allocated in the first call to submit(), thus calling bootup() explicitly is seldom needed.

num_workers

Number of worker processes in the pool.

MPI4PY_FUTURES_MAX_WORKERS

If the max_workers parameter to MPIPoolExecutor is None or not given, the MPI4PY_FUTURES_MAX_WORKERS environment variable provides a fallback value for the maximum number of MPI worker processes to spawn.

Added in version 3.1.0.

MPI4PY_FUTURES_USE_PKL5

If the use_pkl5 keyword argument to MPIPoolExecutor is None or not given, the MPI4PY_FUTURES_USE_PKL5 environment variable provides a fallback value for whether the executor should use pickle with out-of-band buffers for interprocess communication. Accepted values are 0 and 1 (interpreted as False and True, respectively), and strings specifying a YAML boolean value (case-insensitive). Using pickle with out-of-band buffers may benefit applications dealing with large buffer-like objects like NumPy arrays. See mpi4py.util.pkl5 for additional information.

Added in version 4.0.0.
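
A parser for such a variable might look like the sketch below. env_flag is a hypothetical helper and the exact set of accepted YAML spellings is an assumption (here 0/1 plus true/false, yes/no, on/off and y/n, matched case-insensitively); the mpi4py sources remain the authority on what is actually accepted.

```python
import os

# Assumed vocabulary: 0/1 plus common YAML 1.1 boolean spellings.
_TRUE = {"1", "true", "yes", "on", "y"}
_FALSE = {"0", "false", "no", "off", "n"}

def env_flag(name, default=False, environ=None):
    """Hypothetical parser for a boolean environment variable."""
    environ = os.environ if environ is None else environ
    value = environ.get(name)
    if value is None:
        return default  # unset: fall back to the default
    key = value.strip().lower()  # case-insensitive match
    if key in _TRUE:
        return True
    if key in _FALSE:
        return False
    raise ValueError(f"{name}={value!r} is not a boolean value")

print(env_flag("MPI4PY_FUTURES_USE_PKL5", environ={"MPI4PY_FUTURES_USE_PKL5": "Yes"}))
```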

MPI4PY_FUTURES_BACKOFF

If the backoff keyword argument to MPIPoolExecutor is not given, the MPI4PY_FUTURES_BACKOFF environment variable can be set to a float value specifying the maximum number of seconds a worker thread or process suspends execution with time.sleep() while idle-waiting. If not set, the default backoff value is 0.001 seconds. Lower values will reduce latency and increase execution throughput for very short-lived tasks, albeit at the expense of spinning CPU cores and increased energy consumption.

Added in version 4.0.0.
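
The text fixes only the upper bound on each sleep. How the sleep grows towards that bound is not described here, so the loop below assumes a capped exponential backoff, a common way to trade wake-up latency for CPU usage; idle_wait and its initial parameter are hypothetical.

```python
import time

def idle_wait(poll, max_backoff=0.001, initial=1e-6):
    """Hypothetical polling loop: sleep between polls, doubling up to max_backoff."""
    delay = initial
    sleeps = []
    while not poll():
        time.sleep(delay)
        sleeps.append(delay)
        # Never sleep longer than max_backoff seconds at a time.
        delay = min(delay * 2, max_backoff)
    return sleeps

# Toy poll condition that becomes true on the 15th check.
checks = iter(range(100))
sleeps = idle_wait(lambda: next(checks) >= 14)
print(len(sleeps), max(sleeps))  # 14 0.001
```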

Note

As the master process uses a separate thread to perform MPI communication with the workers, the backend MPI implementation should provide support for MPI.THREAD_MULTIPLE. However, some popular MPI implementations do not yet support concurrent MPI calls from multiple threads. Additionally, users may decide to initialize MPI with a lower level of thread support. If the level of thread support in the backend MPI is less than MPI.THREAD_MULTIPLE, mpi4py.futures will use a global lock to serialize MPI calls. If the level of thread support is less than MPI.THREAD_SERIALIZED, mpi4py.futures will emit a RuntimeWarning.

Warning

If the level of thread support in the backend MPI is less than MPI.THREAD_SERIALIZED (i.e., it is either MPI.THREAD_SINGLE or MPI.THREAD_FUNNELED), in theory mpi4py.futures cannot be used. Rather than raising an exception, mpi4py.futures emits a warning and takes a “cross-fingers” attitude to continue execution in the hope that serializing MPI calls with a global lock will actually work.
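
The policy in the note and warning above can be summarized as a small decision function. ThreadLevel and plan_locking are hypothetical names, and the enum only mirrors the ordering of the MPI thread levels (SINGLE < FUNNELED < SERIALIZED < MULTIPLE); with mpi4py itself, the level actually provided can be queried with MPI.Query_thread() and compared against the MPI.THREAD_* constants.

```python
import enum
import warnings

class ThreadLevel(enum.IntEnum):
    # Only the ordering matters; it mirrors the MPI thread support levels.
    SINGLE = 0
    FUNNELED = 1
    SERIALIZED = 2
    MULTIPLE = 3

def plan_locking(provided):
    """Hypothetical restatement of the policy: return True if a global lock is used."""
    if provided < ThreadLevel.SERIALIZED:
        # Below SERIALIZED: warn, but carry on with the global lock anyway.
        warnings.warn(
            f"thread level {provided.name} is below SERIALIZED; "
            "serializing MPI calls with a global lock and hoping for the best",
            RuntimeWarning,
        )
    return provided < ThreadLevel.MULTIPLE

print(plan_locking(ThreadLevel.MULTIPLE), plan_locking(ThreadLevel.SERIALIZED))  # False True
```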

MPICommExecutor

Legacy MPI-1 implementations (as well as some vendor MPI-2 implementations) do not support the dynamic process management features introduced in the MPI-2 standard. Additionally, job schedulers and batch systems in supercomputing facilities may pose additional complications to applications using the MPI_Comm_spawn() routine.

With these issues in mind, mpi4py.futures supports an additional, more traditional, SPMD-like usage pattern requiring MPI-1 calls only. Python applications are started the usual way, e.g., using the mpiexec command. Python code should make a collective call to the MPICommExecutor context manager to partition the set of MPI processes within an MPI communicator into one master process and many worker processes. The master process gets access to an MPIPoolExecutor instance to submit tasks. Meanwhile, the worker processes follow a different execution path and team up to execute the tasks submitted from the master.

Besides alleviating the lack of dynamic process management features in legacy MPI-1 or partial MPI-2 implementations, the MPICommExecutor context manager may be useful in classic MPI-based Python applications willing to take advantage of the simple, task-based, master/worker approach available in the mpi4py.futures package.

class mpi4py.futures.MPICommExecutor(comm=None, root=0)

Context manager for MPIPoolExecutor. This context manager splits an MPI (intra)communicator comm (defaults to MPI.COMM_WORLD if not provided or None) into two disjoint sets: a single master process (with rank root in comm) and the remaining worker processes. These sets are then connected through an intercommunicator. The target of the with statement is assigned either an MPIPoolExecutor instance (at the master) or None (at the workers).

    from mpi4py import MPI
    from mpi4py.futures import MPICommExecutor

    with MPICommExecutor(MPI.COMM_WORLD, root=0) as executor:
        if executor is not None:
            future = executor.submit(abs, -42)
            assert future.result() == 42
            answer = set(executor.map(abs, [-42, 42]))
            assert answer == {42}

Warning

If MPICommExecutor is passed a communicator of size one (e.g., MPI.COMM_SELF), then the executor instance assigned to the target of the with statement will execute all submitted tasks in a single worker thread, thus ensuring that task execution still progresses asynchronously. However, the GIL will prevent the main and worker threads from running concurrently in multicore processors. Moreover, the thread context switching may noticeably harm the performance of CPU-bound tasks. In case of I/O-bound tasks, the GIL is not usually an issue; however, as a single worker thread is used, it progresses one task at a time. We advise against using MPICommExecutor with communicators of size one and suggest refactoring your code to use a ThreadPoolExecutor instead.

Command line

Recalling the issues related to the lack of support for dynamic process management features in MPI implementations, mpi4py.futures supports an alternative usage pattern where Python code (either from scripts, modules, or zip files) is run under command line control of the mpi4py.futures package by passing -m mpi4py.futures to the python executable. The mpi4py.futures invocation should be passed a pyfile path to a script (or a zipfile/directory containing a __main__.py file). Additionally, mpi4py.futures accepts -m mod to execute a module named mod, -c cmd to execute a command string cmd, or even - to read commands from standard input (sys.stdin). Summarizing, mpi4py.futures can be invoked in the following ways:

  • $ mpiexec -n numprocs python -m mpi4py.futures pyfile [arg] ...

  • $ mpiexec -n numprocs python -m mpi4py.futures -m mod [arg] ...

  • $ mpiexec -n numprocs python -m mpi4py.futures -c cmd [arg] ...

  • $ mpiexec -n numprocs python -m mpi4py.futures - [arg] ...

Before starting the main script execution, mpi4py.futures splits MPI.COMM_WORLD into one master (the process with rank 0 in MPI.COMM_WORLD) and numprocs - 1 workers and connects them through an MPI intercommunicator. Afterwards, the master process proceeds with the execution of the user script code, which eventually creates MPIPoolExecutor instances to submit tasks. Meanwhile, the worker processes follow a different execution path to serve the master. Upon successful termination of the main script at the master, the entire MPI execution environment exits gracefully. In case of any unhandled exception in the main script, the master process calls MPI.COMM_WORLD.Abort(1) to prevent deadlocks and force termination of the entire MPI execution environment.

Warning

Running scripts under command line control of mpi4py.futures is quite similar to executing a single-process application that spawns additional workers as required. However, there is a very important difference users should be aware of. All MPIPoolExecutor instances created at the master will share the pool of workers. Tasks submitted at the master from many different executors will be scheduled for execution in random order as soon as a worker is idle. Any executor can easily starve all the workers (e.g., by calling MPIPoolExecutor.map() with long iterables). If that ever happens, submissions from other executors will not be serviced until free workers are available.

See also

Command line

Documentation on Python command line interface.

Parallel tasks

The mpi4py.futures package favors an embarrassingly parallel execution model involving a series of sequential tasks independent of each other and executed asynchronously. Albeit unnatural, MPIPoolExecutor can still be used for handling workloads involving parallel tasks, where worker processes communicate and coordinate with each other via MPI.

mpi4py.futures.get_comm_workers()

Access an intracommunicator grouping MPI worker processes.

Executing parallel tasks with mpi4py.futures requires following some rules, as illustrated in the example cpi.py:

  • Use MPIPoolExecutor.num_workers to determine the number of worker processes in the executor and submit exactly one callable per worker process using the MPIPoolExecutor.submit() method.

  • The submitted callable must use get_comm_workers() to access an intracommunicator grouping MPI worker processes. Afterwards, it is highly recommended to call the Barrier() method on the communicator. The barrier synchronization ensures that every worker process is executing the submitted callable exactly once. Afterwards, the parallel task can safely perform any kind of point-to-point or collective operation using the returned communicator.

  • The Future instances returned by MPIPoolExecutor.submit() should be collected in a sequence. Use wait() with the sequence of Future instances to ensure logical completion of the parallel task.
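
The three rules can be mimicked with threads, purely as an analogy that runs without MPI: threading.Barrier stands in for comm.Barrier(), and a shared list read after a second barrier stands in for allreduce(). With real MPI workers the rank would come from get_comm_workers().Get_rank() rather than an argument; everything here is a sketch, not mpi4py internals. If fewer than P tasks ran at once, the barrier would time out instead of completing, which is why exactly one task per worker matters.

```python
import math
import threading
from concurrent.futures import ThreadPoolExecutor, wait

P = 4                                       # plays the role of executor.num_workers
barrier = threading.Barrier(P, timeout=10)  # plays the role of comm.Barrier()
partials = [0.0] * P                        # shared slots standing in for allreduce

def compute_pi(rank, n):
    # Synchronize first: this only returns once all P tasks are running.
    barrier.wait()
    h = 1.0 / n
    s = sum(4.0 / (1.0 + (h * (i - 0.5)) ** 2) for i in range(rank + 1, n + 1, P))
    partials[rank] = s * h
    # Crude reduce-to-all: wait until every partial sum has been stored.
    barrier.wait()
    return sum(partials)

with ThreadPoolExecutor(max_workers=P) as executor:
    # Exactly one task per worker; the rank is passed explicitly here.
    fs = [executor.submit(compute_pi, rank, 128) for rank in range(P)]
    wait(fs)  # logical completion of the parallel task

pi = fs[0].result()
print(f"error: {abs(pi - math.pi):.3e}")  # error: 5.086e-06
```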

Utilities

The mpi4py.futures package provides additional utilities for handling Future instances.

mpi4py.futures.collect(fs)

Gather a collection of futures in a new future.

Parameters:

fs – Collection of futures.

Returns:

New future producing as result a list with results from fs.
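
The behavior of collect() can be pictured with a plain concurrent.futures.Future. collect_sketch is a hypothetical stand-in, not the mpi4py function; the text does not say how failed inputs are handled, so propagating the first failure (in input order) is an assumption.

```python
import threading
from concurrent.futures import Future

def collect_sketch(fs):
    """Hypothetical sketch of collect(): one Future whose result is a list."""
    fs = list(fs)
    out = Future()
    lock = threading.Lock()
    remaining = [len(fs)]

    def on_done(_):
        with lock:
            remaining[0] -= 1
            if remaining[0] > 0:
                return
        try:
            # Results keep the input order, not the completion order.
            out.set_result([f.result() for f in fs])
        except Exception as exc:
            # Assumption: the first failed input (in input order) fails the output.
            out.set_exception(exc)

    if not fs:
        out.set_result([])
    for f in fs:
        f.add_done_callback(on_done)
    return out

a, b = Future(), Future()
combined = collect_sketch([a, b])
b.set_result(2)  # completion order differs from input order
a.set_result(1)
print(combined.result())  # [1, 2]
```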

mpi4py.futures.compose(future, resulthook=None, excepthook=None)

Compose the completion of a future with result and exception handlers.

Parameters:
  • future – Input future instance.

  • resulthook – Function to be called once the input future completes with success. Once the input future finishes running with success, its result value is the input argument for resulthook. The result of resulthook is set as the result of the output future. If resulthook is None, the output future is completed directly with the result of the input future.

  • excepthook – Function to be called once the input future completes with failure. Once the input future finishes running with failure, its exception value is the input argument for excepthook. If excepthook returns an Exception instance, it is set as the exception of the output future. Otherwise, the result of excepthook is set as the result of the output future. If excepthook is None, the output future is set as failed with the exception from the input future.

Returns:

Output future instance to be completed once the input future is completed and either resulthook or excepthook finishes executing.
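
The rules for resulthook and excepthook translate almost line by line into a done-callback. compose_sketch below is a hypothetical restatement built on concurrent.futures.Future, not the mpi4py code; failing the output when a hook itself raises, or when the input is cancelled, is an assumption the text does not cover.

```python
from concurrent.futures import Future

def compose_sketch(future, resulthook=None, excepthook=None):
    """Hypothetical restatement of the documented compose() rules."""
    out = Future()

    def on_done(f):
        try:
            exc = f.exception()
            if exc is None:
                value = f.result()
                # Success: pass the result through resulthook, if any.
                out.set_result(value if resulthook is None else resulthook(value))
            elif excepthook is None:
                # Failure without a hook: propagate the input exception.
                out.set_exception(exc)
            else:
                value = excepthook(exc)
                # An Exception instance returned by the hook fails the output;
                # any other value becomes the output result.
                if isinstance(value, Exception):
                    out.set_exception(value)
                else:
                    out.set_result(value)
        except Exception as err:
            # Assumption: a raising hook or a cancelled input fails the output.
            out.set_exception(err)

    future.add_done_callback(on_done)
    return out

ok = Future()
doubled = compose_sketch(ok, resulthook=lambda x: 2 * x)
ok.set_result(21)
print(doubled.result())  # 42

bad = Future()
recovered = compose_sketch(bad, excepthook=lambda e: f"recovered from {e}")
bad.set_exception(ValueError("boom"))
print(recovered.result())  # recovered from boom
```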

Examples

Computing the Julia set

The following julia.py script computes the Julia set and dumps an image to disk in binary PGM format. The code starts by importing MPIPoolExecutor from the mpi4py.futures package. Next, some global constants and functions implement the computation of the Julia set. The computations are protected with the standard if __name__ == '__main__': ... idiom. The image is computed by whole scanlines, submitting all these tasks at once using the map method. The result iterator yields scanlines in-order as the tasks complete. Finally, each scanline is dumped to disk.

julia.py
    from mpi4py.futures import MPIPoolExecutor

    x0, x1, w = -2.0, +2.0, 640*2
    y0, y1, h = -1.5, +1.5, 480*2
    dx = (x1 - x0) / w
    dy = (y1 - y0) / h

    c = complex(0, 0.65)

    def julia(x, y):
        z = complex(x, y)
        n = 255
        while abs(z) < 3 and n > 1:
            z = z**2 + c
            n -= 1
        return n

    def julia_line(k):
        line = bytearray(w)
        y = y1 - k * dy
        for j in range(w):
            x = x0 + j * dx
            line[j] = julia(x, y)
        return line

    if __name__ == '__main__':

        with MPIPoolExecutor() as executor:
            image = executor.map(julia_line, range(h))
            with open('julia.pgm', 'wb') as f:
                f.write(b'P5 %d %d %d\n' % (w, h, 255))
                for line in image:
                    f.write(line)

The recommended way to execute the script is by using the mpiexec command specifying one MPI process (master) and (optional but recommended) the desired MPI universe size, which determines the number of additional dynamically spawned processes (workers). The MPI universe size is provided either by a batch system or set by the user via command-line arguments to mpiexec or environment variables. Below we provide examples for MPICH and Open MPI implementations [1]. In all of these examples, the mpiexec command launches a single master process running the Python interpreter and executing the main script. When required, mpi4py.futures spawns the pool of 16 worker processes. The master submits tasks to the workers and waits for the results. The workers receive incoming tasks, execute them, and send back the results to the master.

When using the MPICH implementation or its derivatives based on the Hydra process manager, users can set the MPI universe size via the -usize argument to mpiexec:

    $ mpiexec -n 1 -usize 17 python julia.py

or, alternatively, by setting the MPIEXEC_UNIVERSE_SIZE environment variable:

    $ env MPIEXEC_UNIVERSE_SIZE=17 mpiexec -n 1 python julia.py

In the Open MPI implementation, the MPI universe size can be set via the -host argument to mpiexec:

    $ mpiexec -n 1 -host localhost:17 python julia.py

Another way to specify the number of workers is to use the mpi4py.futures-specific environment variable MPI4PY_FUTURES_MAX_WORKERS:

    $ env MPI4PY_FUTURES_MAX_WORKERS=16 mpiexec -n 1 python julia.py

Note that in this case, the MPI universe size is ignored.

Alternatively, users may decide to execute the script in a more traditional way, that is, all the MPI processes are started at once. The user script is run under command-line control of mpi4py.futures by passing the -m flag to the python executable:

    $ mpiexec -n 17 python -m mpi4py.futures julia.py

As explained previously, the 17 processes are partitioned into one master and 16 workers. The master process executes the main script while the workers execute the tasks submitted by the master.

[1]

When using an MPI implementation other than MPICH or Open MPI, please check the documentation of the implementation and/or batch system for the ways to specify the desired MPI universe size.

Computing Pi (parallel task)

The number \(\pi\) can be approximated via numerical integration with the simple midpoint rule, that is:

\[\pi = \int_{0}^{1} \frac{4}{1+x^2} \,dx \approx\frac{1}{n} \sum_{i=1}^{n}\frac{4}{1 + \left[\frac{1}{n} \left(i-\frac{1}{2}\right) \right]^2} .\]
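
Before running in parallel, the formula can be checked serially. For the integrand \(f(x) = 4/(1+x^2)\) with \(h = 1/n\), the midpoint sum \(M_n\) satisfies \(M_n - \pi \approx -\frac{h^2}{24}\left(f'(1) - f'(0)\right) = \frac{h^2}{12}\), since \(f'(0) = 0\) and \(f'(1) = -2\); for \(n = 128\) that is about 5.086e-06, in line with the outputs shown below. midpoint_pi is a hypothetical helper, not part of cpi.py.

```python
import math

def midpoint_pi(n):
    """Serial midpoint-rule approximation of pi as the integral of 4/(1+x^2) on [0, 1]."""
    h = 1.0 / n
    return h * sum(4.0 / (1.0 + (h * (i - 0.5)) ** 2) for i in range(1, n + 1))

for n in (128, 512):
    pi = midpoint_pi(n)
    # Compare the actual error with the leading-order estimate h**2 / 12.
    print(f"{n:4d} intervals: error {pi - math.pi:.3e}, h^2/12 = {1 / (12 * n * n):.3e}")
```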

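The following cpi.py script computes such approximations using mpi4py.futures with a parallel task involving a collective reduction operation. The comments “Access intracommunicator and synchronize”, “Submit exactly one callable per worker”, and “Wait for all workers to finish” mark the lines corresponding to the rules discussed in Parallel tasks.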

cpi.py
    import math
    import sys
    from mpi4py.futures import MPIPoolExecutor, wait
    from mpi4py.futures import get_comm_workers


    def compute_pi(n):
        # Access intracommunicator and synchronize
        comm = get_comm_workers()
        comm.Barrier()

        rank = comm.Get_rank()
        size = comm.Get_size()

        # Local computation
        h = 1.0 / n
        s = 0.0
        for i in range(rank + 1, n + 1, size):
            x = h * (i - 0.5)
            s += 4.0 / (1.0 + x**2)
        pi_partial = s * h

        # Parallel reduce-to-all
        pi = comm.allreduce(pi_partial)

        # All workers return the same value
        return pi


    if __name__ == '__main__':
        n = int(sys.argv[1]) if len(sys.argv) > 1 else 256

        with MPIPoolExecutor() as executor:
            # Submit exactly one callable per worker
            P = executor.num_workers
            fs = [executor.submit(compute_pi, n) for _ in range(P)]

            # Wait for all workers to finish
            wait(fs)

            # Get result from the first future object.
            # In this particular example, due to using reduce-to-all,
            # all the other future objects hold the same result value.
            pi = fs[0].result()
            print(
                f"pi: {pi:.16f}, error: {abs(pi - math.pi):.3e}",
                f"({n:d} intervals, {P:d} workers)",
            )

To run in modern MPI-2 mode:

    $ env MPI4PY_FUTURES_MAX_WORKERS=4 mpiexec -n 1 python cpi.py 128
    pi: 3.1415977398528137, error: 5.086e-06 (128 intervals, 4 workers)
    $ env MPI4PY_FUTURES_MAX_WORKERS=8 mpiexec -n 1 python cpi.py 512
    pi: 3.1415929714812316, error: 3.179e-07 (512 intervals, 8 workers)

To run in legacy MPI-1 mode:

    $ mpiexec -n 5 python -m mpi4py.futures cpi.py 128
    pi: 3.1415977398528137, error: 5.086e-06 (128 intervals, 4 workers)
    $ mpiexec -n 9 python -m mpi4py.futures cpi.py 512
    pi: 3.1415929714812316, error: 3.179e-07 (512 intervals, 8 workers)

Citation

If mpi4py.futures has been significant to a project that leads to an academic publication, please acknowledge our work by citing the following article [mpi4py-futures]:

[mpi4py-futures]

M. Rogowski, S. Aseeri, D. Keyes, and L. Dalcin, mpi4py.futures: MPI-Based Asynchronous Task Execution for Python, IEEE Transactions on Parallel and Distributed Systems, 34(2):611-622, 2023. https://doi.org/10.1109/TPDS.2022.3225481