Tutorial
Warning
Under construction. Contributions very welcome!
Tip
Rolf Rabenseifner at HLRS developed a comprehensive MPI-3.1/4.0 course with slides and a large set of exercises including solutions. This material is available online for self-study. The slides and exercises show the C, Fortran, and Python (mpi4py) interfaces. For performance reasons, most Python exercises use NumPy arrays and communication routines involving buffer-like objects.
Tip
Victor Eijkhout at TACC authored the book Parallel Programming for Science and Engineering. This book is available online in PDF and HTML formats. The book covers parallel programming with MPI and OpenMP in C/C++ and Fortran, and MPI in Python using mpi4py.
MPI for Python supports convenient, pickle-based communication of generic Python objects as well as fast, near C-speed, direct array data communication of buffer-provider objects (e.g., NumPy arrays).
Communication of generic Python objects
You have to use methods with all-lowercase names, like Comm.send, Comm.recv, Comm.bcast, Comm.scatter, Comm.gather. An object to be sent is passed as a parameter to the communication call, and the received object is simply the return value.

The Comm.isend and Comm.irecv methods return Request instances; completion of these methods can be managed using the Request.test and Request.wait methods.

The Comm.recv and Comm.irecv methods may be passed a buffer object that can be repeatedly used to receive messages, avoiding internal memory allocation. This buffer must be sufficiently large to accommodate the transmitted messages; hence, any buffer passed to Comm.recv or Comm.irecv must be at least as long as the pickled data transmitted to the receiver.

Collective calls like Comm.scatter, Comm.gather, Comm.allgather, and Comm.alltoall expect a single value or a sequence of Comm.size elements at the root or all processes. They return a single value, a list of Comm.size elements, or None.
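Since any buffer passed to Comm.recv or Comm.irecv must be at least as long as the pickled message, the required size can be estimated with the pickle module alone. A minimal sketch, with no MPI involved and an arbitrary example payload:

```python
import pickle

# The sender pickles the object before transmission.
data = {'a': 7, 'b': 3.14}
payload = pickle.dumps(data)

# A receive buffer must hold at least len(payload) bytes;
# rounding up to a fixed capacity leaves headroom for similar messages.
buf = bytearray(max(1024, len(payload)))
buf[:len(payload)] = payload  # stand-in for the actual MPI transfer

assert len(buf) >= len(payload)
assert pickle.loads(bytes(buf[:len(payload)])) == data
```

In an MPI program, such a bytearray could then be handed to Comm.recv or Comm.irecv as the reusable receive buffer.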
Note
MPI for Python uses the highest protocol version available in the Python runtime (see the HIGHEST_PROTOCOL constant in the pickle module). The default protocol can be changed at import time by setting the MPI4PY_PICKLE_PROTOCOL environment variable, or at runtime by assigning a different value to the PROTOCOL attribute of the pickle object within the MPI module.

Communication of buffer-like objects
You have to use method names starting with an upper-case letter, like Comm.Send, Comm.Recv, Comm.Bcast, Comm.Scatter, Comm.Gather.

In general, buffer arguments to these calls must be explicitly specified by using a 2/3-list/tuple like [data, MPI.DOUBLE], or [data, count, MPI.DOUBLE] (the former one uses the byte-size of data and the extent of the MPI datatype to define count).

For vector collective communication operations like Comm.Scatterv and Comm.Gatherv, buffer arguments are specified as [data, count, displ, datatype], where count and displ are sequences of integral values.

Automatic MPI datatype discovery for NumPy/GPU arrays and PEP-3118 buffers is supported, but limited to basic C types (all C/C99-native signed/unsigned integral types and single/double precision real/complex floating types) and the availability of matching datatypes in the underlying MPI implementation. In this case, the buffer-provider object can be passed directly as a buffer argument, and the count and MPI datatype will be inferred.
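The count and displ sequences for Comm.Scatterv and Comm.Gatherv can be computed with plain integer arithmetic. A sketch of splitting n elements as evenly as possible across size ranks (no MPI required; the helper name is illustrative, not part of mpi4py):

```python
def counts_displs(n, size):
    """Per-rank element counts and starting offsets for Scatterv/Gatherv."""
    base, extra = divmod(n, size)
    # The first `extra` ranks get one element more than the rest.
    counts = [base + (1 if r < extra else 0) for r in range(size)]
    # Each displacement is the running sum of the preceding counts.
    displs = [sum(counts[:r]) for r in range(size)]
    return counts, displs

counts, displs = counts_displs(10, 4)
# counts == [3, 3, 2, 2], displs == [0, 3, 6, 8]
```

These sequences could then appear in a buffer specification such as [sendbuf, counts, displs, MPI.DOUBLE] passed to Comm.Scatterv at the root.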
If mpi4py is built against a GPU-aware MPI implementation, GPU arrays can be passed to upper-case methods as long as they have either the __dlpack__ and __dlpack_device__ methods or the __cuda_array_interface__ attribute that are compliant with the respective standard specifications. Moreover, only C-contiguous or Fortran-contiguous GPU arrays are supported. It is important to note that GPU buffers must be fully ready before any MPI routines operate on them to avoid race conditions. This can be ensured by using the synchronization API of your array library. mpi4py does not have access to any GPU-specific functionality and thus cannot perform this operation automatically for users.
Running Python scripts with MPI
Most MPI programs can be run with the command mpiexec. In practice, running Python programs looks like:
$ mpiexec -n 4 python script.py
to run the program with 4 processes.
Point-to-Point Communication
Python objects (pickle under the hood):

from mpi4py import MPI

comm = MPI.COMM_WORLD
rank = comm.Get_rank()

if rank == 0:
    data = {'a': 7, 'b': 3.14}
    comm.send(data, dest=1, tag=11)
elif rank == 1:
    data = comm.recv(source=0, tag=11)
Python objects with non-blocking communication:
from mpi4py import MPI

comm = MPI.COMM_WORLD
rank = comm.Get_rank()

if rank == 0:
    data = {'a': 7, 'b': 3.14}
    req = comm.isend(data, dest=1, tag=11)
    req.wait()
elif rank == 1:
    req = comm.irecv(source=0, tag=11)
    data = req.wait()
NumPy arrays (the fast way!):
from mpi4py import MPI
import numpy

comm = MPI.COMM_WORLD
rank = comm.Get_rank()

# passing MPI datatypes explicitly
if rank == 0:
    data = numpy.arange(1000, dtype='i')
    comm.Send([data, MPI.INT], dest=1, tag=77)
elif rank == 1:
    data = numpy.empty(1000, dtype='i')
    comm.Recv([data, MPI.INT], source=0, tag=77)

# automatic MPI datatype discovery
if rank == 0:
    data = numpy.arange(100, dtype=numpy.float64)
    comm.Send(data, dest=1, tag=13)
elif rank == 1:
    data = numpy.empty(100, dtype=numpy.float64)
    comm.Recv(data, source=0, tag=13)
Collective Communication
Broadcasting a Python dictionary:
from mpi4py import MPI

comm = MPI.COMM_WORLD
rank = comm.Get_rank()

if rank == 0:
    data = {'key1': [7, 2.72, 2+3j],
            'key2': ('abc', 'xyz')}
else:
    data = None
data = comm.bcast(data, root=0)
Scattering Python objects:
from mpi4py import MPI

comm = MPI.COMM_WORLD
size = comm.Get_size()
rank = comm.Get_rank()

if rank == 0:
    data = [(i+1)**2 for i in range(size)]
else:
    data = None
data = comm.scatter(data, root=0)
assert data == (rank+1)**2
Gathering Python objects:
from mpi4py import MPI

comm = MPI.COMM_WORLD
size = comm.Get_size()
rank = comm.Get_rank()

data = (rank+1)**2
data = comm.gather(data, root=0)
if rank == 0:
    for i in range(size):
        assert data[i] == (i+1)**2
else:
    assert data is None
Broadcasting a NumPy array:
from mpi4py import MPI
import numpy as np

comm = MPI.COMM_WORLD
rank = comm.Get_rank()

if rank == 0:
    data = np.arange(100, dtype='i')
else:
    data = np.empty(100, dtype='i')
comm.Bcast(data, root=0)
for i in range(100):
    assert data[i] == i
Scattering NumPy arrays:
from mpi4py import MPI
import numpy as np

comm = MPI.COMM_WORLD
size = comm.Get_size()
rank = comm.Get_rank()

sendbuf = None
if rank == 0:
    sendbuf = np.empty([size, 100], dtype='i')
    sendbuf.T[:, :] = range(size)
recvbuf = np.empty(100, dtype='i')
comm.Scatter(sendbuf, recvbuf, root=0)
assert np.allclose(recvbuf, rank)
Gathering NumPy arrays:
from mpi4py import MPI
import numpy as np

comm = MPI.COMM_WORLD
size = comm.Get_size()
rank = comm.Get_rank()

sendbuf = np.zeros(100, dtype='i') + rank
recvbuf = None
if rank == 0:
    recvbuf = np.empty([size, 100], dtype='i')
comm.Gather(sendbuf, recvbuf, root=0)
if rank == 0:
    for i in range(size):
        assert np.allclose(recvbuf[i, :], i)
Parallel matrix-vector product:
from mpi4py import MPI
import numpy

def matvec(comm, A, x):
    m = A.shape[0]  # local rows
    p = comm.Get_size()
    xg = numpy.zeros(m*p, dtype='d')
    comm.Allgather([x, MPI.DOUBLE], [xg, MPI.DOUBLE])
    y = numpy.dot(A, xg)
    return y
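The function above assumes every process holds the same number m of local rows of A and a conforming slice x of length m; the Allgather reconstructs the full vector on every process, so each local product is a slice of the serial result. A serial NumPy check of that block-row decomposition, with no MPI required and arbitrary illustrative sizes:

```python
import numpy as np

p, m = 4, 5  # "process" count and local rows per process
A_full = np.arange(p*m * p*m, dtype='d').reshape(p*m, p*m)
x_full = np.linspace(0.0, 1.0, p*m)

# Each "process" owns m rows of A; after the (simulated) Allgather it
# holds the full x and computes its local slice of y.
y_parts = [A_full[r*m:(r+1)*m, :] @ x_full for r in range(p)]
y = np.concatenate(y_parts)

assert np.allclose(y, A_full @ x_full)
```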
Input/Output (MPI-IO)
Collective I/O with NumPy arrays:
from mpi4py import MPI
import numpy as np

amode = MPI.MODE_WRONLY | MPI.MODE_CREATE
comm = MPI.COMM_WORLD
fh = MPI.File.Open(comm, "./datafile.contig", amode)

buffer = np.empty(10, dtype=np.intc)  # np.int was removed from NumPy; use a C int dtype
buffer[:] = comm.Get_rank()

offset = comm.Get_rank() * buffer.nbytes
fh.Write_at_all(offset, buffer)

fh.Close()
Non-contiguous Collective I/O with NumPy arrays and datatypes:
from mpi4py import MPI
import numpy as np

comm = MPI.COMM_WORLD
rank = comm.Get_rank()
size = comm.Get_size()
amode = MPI.MODE_WRONLY | MPI.MODE_CREATE

fh = MPI.File.Open(comm, "./datafile.noncontig", amode)

item_count = 10

buffer = np.empty(item_count, dtype='i')
buffer[:] = rank

filetype = MPI.INT.Create_vector(item_count, 1, size)
filetype.Commit()

displacement = MPI.INT.Get_size() * rank
fh.Set_view(displacement, filetype=filetype)

fh.Write_all(buffer)
filetype.Free()
fh.Close()
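With this view, rank r writes its item_count integers to file positions r, r + size, r + 2*size, and so on, so the ranks' data ends up interleaved. A sketch of the resulting layout, simulated with plain NumPy (no MPI required; size and item_count match the script above):

```python
import numpy as np

size, item_count = 4, 10
layout = np.empty(size * item_count, dtype='i')
for rank in range(size):
    # Each rank's file view starts at element offset `rank`
    # and strides by `size` elements, as in Create_vector above.
    layout[rank::size] = rank

# The file begins 0, 1, 2, 3, 0, 1, 2, 3, ...
assert list(layout[:size]) == list(range(size))
```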
Dynamic Process Management
Compute Pi - Master (or parent, or client) side:
#!/usr/bin/env python
from mpi4py import MPI
import numpy
import sys

comm = MPI.COMM_SELF.Spawn(sys.executable,
                           args=['cpi.py'],
                           maxprocs=5)

N = numpy.array(100, 'i')
comm.Bcast([N, MPI.INT], root=MPI.ROOT)
PI = numpy.array(0.0, 'd')
comm.Reduce(None, [PI, MPI.DOUBLE],
            op=MPI.SUM, root=MPI.ROOT)
print(PI)

comm.Disconnect()
Compute Pi - Worker (or child, or server) side:
#!/usr/bin/env python
from mpi4py import MPI
import numpy

comm = MPI.Comm.Get_parent()
size = comm.Get_size()
rank = comm.Get_rank()

N = numpy.array(0, dtype='i')
comm.Bcast([N, MPI.INT], root=0)
h = 1.0 / N
s = 0.0
for i in range(rank, N, size):
    x = h * (i + 0.5)
    s += 4.0 / (1.0 + x**2)
PI = numpy.array(s * h, dtype='d')
comm.Reduce([PI, MPI.DOUBLE], None,
            op=MPI.SUM, root=0)

comm.Disconnect()
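The worker's loop is a midpoint-rule quadrature of the integral of 4/(1+x^2) on [0, 1], which equals pi, with the N sample points split cyclically over the ranks. A serial sketch of the same quadrature (no MPI required), useful for checking the numerics:

```python
import math

N = 100
h = 1.0 / N
# Midpoint rule: sample at x = h*(i + 0.5) and scale the sum by h.
s = sum(4.0 / (1.0 + (h * (i + 0.5))**2) for i in range(N))
pi = s * h

assert abs(pi - math.pi) < 1e-4
```

In the parallel version, each rank accumulates the terms i = rank, rank+size, rank+2*size, ... and the Reduce with MPI.SUM recombines exactly this sum.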
GPU-aware MPI + Python GPU arrays
Reduce-to-all CuPy arrays:
from mpi4py import MPI
import cupy as cp

comm = MPI.COMM_WORLD
size = comm.Get_size()
rank = comm.Get_rank()

sendbuf = cp.arange(10, dtype='i')
recvbuf = cp.empty_like(sendbuf)
cp.cuda.get_current_stream().synchronize()
comm.Allreduce(sendbuf, recvbuf)

assert cp.allclose(recvbuf, sendbuf*size)
One-Sided Communication (RMA)
Read from (write to) the entire RMA window:
import numpy as np
from mpi4py import MPI
from mpi4py.util import dtlib

comm = MPI.COMM_WORLD
rank = comm.Get_rank()

datatype = MPI.FLOAT
np_dtype = dtlib.to_numpy_dtype(datatype)
itemsize = datatype.Get_size()

N = 10
win_size = N * itemsize if rank == 0 else 0
win = MPI.Win.Allocate(win_size, comm=comm)

buf = np.empty(N, dtype=np_dtype)
if rank == 0:
    buf.fill(42)
    win.Lock(rank=0)
    win.Put(buf, target_rank=0)
    win.Unlock(rank=0)
    comm.Barrier()
else:
    comm.Barrier()
    win.Lock(rank=0)
    win.Get(buf, target_rank=0)
    win.Unlock(rank=0)
    assert np.all(buf == 42)
Accessing a part of the RMA window using the target argument, which is defined as (offset, count, datatype):

import numpy as np
from mpi4py import MPI
from mpi4py.util import dtlib

comm = MPI.COMM_WORLD
rank = comm.Get_rank()

datatype = MPI.FLOAT
np_dtype = dtlib.to_numpy_dtype(datatype)
itemsize = datatype.Get_size()

N = comm.Get_size() + 1
win_size = N * itemsize if rank == 0 else 0
win = MPI.Win.Allocate(
    size=win_size,
    disp_unit=itemsize,
    comm=comm,
)
if rank == 0:
    mem = np.frombuffer(win, dtype=np_dtype)
    mem[:] = np.arange(len(mem), dtype=np_dtype)
comm.Barrier()

buf = np.zeros(3, dtype=np_dtype)
target = (rank, 2, datatype)
win.Lock(rank=0)
win.Get(buf, target_rank=0, target=target)
win.Unlock(rank=0)
assert np.all(buf == [rank, rank+1, 0])
Wrapping with SWIG
C source:
/* file: helloworld.c */
void sayhello(MPI_Comm comm)
{
  int size, rank;
  MPI_Comm_size(comm, &size);
  MPI_Comm_rank(comm, &rank);
  printf("Hello, World! "
         "I am process %d of %d.\n",
         rank, size);
}
SWIG interface file:
// file: helloworld.i
%module helloworld
%{
#include <mpi.h>
#include "helloworld.c"
%}

%include mpi4py/mpi4py.i
%mpi4py_typemap(Comm, MPI_Comm);

void sayhello(MPI_Comm comm);
Try it in the Python prompt:
>>> from mpi4py import MPI
>>> import helloworld
>>> helloworld.sayhello(MPI.COMM_WORLD)
Hello, World! I am process 0 of 1.
Wrapping with F2Py
Fortran 90 source:
! file: helloworld.f90
subroutine sayhello(comm)
  use mpi
  implicit none
  integer :: comm, rank, size, ierr
  call MPI_Comm_size(comm, size, ierr)
  call MPI_Comm_rank(comm, rank, ierr)
  print *, 'Hello, World! I am process ', rank, ' of ', size, '.'
end subroutine sayhello
Compiling example using f2py
$ f2py -c --f90exec=mpif90 helloworld.f90 -m helloworld
Try it in the Python prompt:
>>> from mpi4py import MPI
>>> import helloworld
>>> fcomm = MPI.COMM_WORLD.py2f()
>>> helloworld.sayhello(fcomm)
Hello, World! I am process 0 of 1.