Distributed Computing

Metaflow's foreach construct allows you to run tasks concurrently. In the case of foreach, tasks execute independently. This pattern works well when the workload is embarrassingly parallel, that is, tasks don't communicate with each other and they don't have to execute simultaneously.
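As a rough analogy in plain Python (this is not Metaflow's API), an embarrassingly parallel workload is a map over independent inputs followed by a join. The sketch below uses the standard library's `concurrent.futures`; the function names and the input list are hypothetical and chosen only for illustration.

```python
from concurrent.futures import ThreadPoolExecutor


def square(x):
    """An independent task: it needs no data from any other task."""
    return x * x


def fan_out_and_join(items):
    # Each item is processed on its own. Tasks may run in any order,
    # or at different times, without affecting the combined result.
    with ThreadPoolExecutor(max_workers=4) as pool:
        return list(pool.map(square, items))  # map() preserves input order


if __name__ == "__main__":
    print(fan_out_and_join([1, 2, 3, 4]))
```

Because no task waits for another, the same result is obtained whether the tasks run all at once or one after the other. That property does not hold for the workloads discussed next.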

There are other workloads, such as distributed training of large models, which require tasks to interact with each other. Metaflow provides another mechanism, the @parallel decorator, which orchestrates such inter-dependent tasks. In effect, the decorator launches an ephemeral compute cluster on the fly, as a part of a Metaflow flow, benefiting from Metaflow features like dependency management, versioning, and production deployments.

Typically, this pattern is used through one of the framework-specific decorators like @torchrun or @deepspeed, described below, which make it easy to use a particular framework for distributed training. If you need low-level access to the cluster, e.g. to use it with a framework that doesn't have a corresponding high-level decorator yet, see the documentation for low-level access at the end of this page.

info

To use distributed computing, follow the setup instructions here. If you need help getting started, contact Metaflow Slack.

High-level decorators

The easiest way to get started is to use one of the high-level decorators - see an overview in this blog post:

| Decorator Implementation | UX | Description | PyPI Release | Example |
| --- | --- | --- | --- | --- |
| @torchrun | Use current.torch.run to submit your torch.distributed program. No need to log into each node, call the code once in @step. | A torchrun command that runs @step function code on each node. Torch distributed is used under the hood to handle communication between nodes. | metaflow-torchrun | MinGPT |
| @deepspeed | Exposes current.deepspeed.run. Requires OpenSSH and OpenMPI installed in the Metaflow task container. | Forms an MPI cluster with passwordless SSH configured at task runtime (to reduce the risk of leaking private keys). Submits the Deepspeed program and runs it. | metaflow-deepspeed | Bert & Dolly |
| @metaflow_ray | Write a Ray program locally or call a script from the @step function; @metaflow_ray takes care of forming the Ray cluster. | Forms a Ray cluster dynamically. Runs the @step function code on the control task as Ray's "head node". | metaflow-ray | GPT-J & Distributed XGBoost |
| @tensorflow | Put TensorFlow code in a distributed strategy scope, and call it from the step function. | Runs the @step function code on each node. This means the user picks the appropriate strategy in their code. | metaflow-tensorflow | Keras Distributed |
| @mpi | Exposes current.mpi.cc, current.mpi.broadcast_file, current.mpi.run, current.mpi.exec. Cluster SSH config is handled automatically inside the decorator. Requires OpenSSH and an MPI implementation to be installed in the Metaflow task container. It was tested against OpenMPI, for which a sample Dockerfile is available here. | Forms an MPI cluster with passwordless SSH configured at task runtime. Users can submit an mpi4py program or compile, broadcast, and submit a C program. | metaflow-mpi | Libgrape |

info

Note that these decorators are not included in the metaflow package but they are implemented as Metaflow Extensions. You need to install them separately in your development environment, but they will get packaged automatically by Metaflow, so you don't need to include them in Docker images or @conda/@pypi. Also note that the extensions are not part of the stable Metaflow API, so they are subject to change.

tip

When running demanding training workloads, it is advisable to use the @checkpoint decorator to ensure that no progress is lost even if a task hits a spurious failure.
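The idea behind checkpointing can be sketched in plain Python. This is a generic illustration of saving and resuming progress, not the API of the @checkpoint decorator; the `train` function, the `fail_at` parameter, and the JSON file layout are all hypothetical.

```python
import json
import os
import tempfile


def train(total_epochs, ckpt_path, fail_at=None):
    """Run a toy training loop that resumes from ckpt_path if it exists.

    Returns the first epoch executed in this attempt.
    """
    start = 0
    if os.path.exists(ckpt_path):
        with open(ckpt_path) as f:
            start = json.load(f)["epoch"] + 1  # resume after last finished epoch
    for epoch in range(start, total_epochs):
        if fail_at is not None and epoch == fail_at:
            raise RuntimeError(f"simulated failure at epoch {epoch}")
        # ... one epoch of real training would go here ...
        with open(ckpt_path, "w") as f:
            json.dump({"epoch": epoch}, f)  # persist progress after each epoch
    return start


if __name__ == "__main__":
    path = os.path.join(tempfile.mkdtemp(), "ckpt.json")
    try:
        train(5, path, fail_at=3)  # first attempt fails partway through
    except RuntimeError as err:
        print(err)
    print("resumed at epoch", train(5, path))  # retry continues from the checkpoint
```

Without the saved state, the retry would start again from epoch 0. With it, only the epochs after the last completed one are repeated.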

Low-level access

Under the hood, Metaflow guarantees that you get a desired kind and number of compute nodes running simultaneously, so that they are able to communicate and coordinate amongst each other.
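To see why simultaneous execution matters, consider the following plain-Python analogy, in which threads stand in for nodes. It is not how Metaflow implements the guarantee, and `run_gang` is a hypothetical helper. Each worker blocks at a barrier until every worker has started, which can only succeed if all of them are running at the same time.

```python
import threading


def run_gang(num_workers):
    """Start num_workers threads that rendezvous before doing any work."""
    barrier = threading.Barrier(num_workers)
    arrivals = []
    lock = threading.Lock()

    def worker(rank):
        # Blocks until all num_workers threads have reached this point.
        # If workers ran one after another, this would time out.
        barrier.wait(timeout=5)
        with lock:
            arrivals.append(rank)

    threads = [threading.Thread(target=worker, args=(r,)) for r in range(num_workers)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return sorted(arrivals)


if __name__ == "__main__":
    print(run_gang(4))
```

Distributed training frameworks typically have a similar rendezvous step at startup, which is why all nodes need to be scheduled together rather than whenever capacity happens to become available.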

You can use this compute cluster to implement any distributed computing algorithms of your own. To illustrate this, consider a simple example that sets up a cluster of tasks that communicate with each other over MPI. Technically, MPI is not required - you could communicate with any protocol you want - but MPI is a popular choice.

MPI example

Let’s create a simple Hello World MPI program based on this example. The program identifies the main node (rank == 0), which sends a message to all worker nodes; each worker receives the message and prints it out. We use mpi4py as a Python wrapper for the MPI protocol.

First, let's create an MPI script, mpi_hello.py:

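import mpi4py
from mpi4py import MPI

if __name__ == "__main__":

    comm = MPI.COMM_WORLD
    rank = comm.Get_rank()  # this process's index in the cluster
    size = comm.Get_size()  # total number of MPI processes

    if rank == 0:
        # The main process sends one message to every worker.
        print(f"Cluster has {size} processes")
        for i in range(1, size):
            msg = "Main node says hi! 👋"
            comm.send(msg, dest=i)
    else:
        # Workers block until a message arrives, then print it.
        msg = comm.recv()
        print(f"👷 Worker node {rank} received message: {msg}")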

Next, let's create a flow that launches a cluster of four nodes, thanks to num_parallel=4, and runs the MPI script we defined above in the cluster, launching two worker processes on each node.

from metaflow import FlowSpec, step, batch, mpi, current

N_CPU = 2    # CPUs requested per node
N_NODES = 4  # number of nodes in the cluster


class MPI4PyFlow(FlowSpec):

    @step
    def start(self):
        # num_parallel launches N_NODES tasks that run at the same time.
        self.next(self.multinode, num_parallel=N_NODES)

    @batch(image="eddieob/mpi-base:1", cpu=N_CPU)
    @mpi
    @step
    def multinode(self):
        # Start N_CPU * N_NODES MPI processes in total across the cluster.
        current.mpi.exec(
            args=["-n", str(N_CPU * N_NODES), "--allow-run-as-root"],
            program="python mpi_hello.py",
        )
        self.next(self.join)

    @step
    def join(self, inputs):
        self.next(self.end)

    @step
    def end(self):
        pass


if __name__ == "__main__":
    MPI4PyFlow()

To run the flow, make sure your AWS Batch environment is configured to support multinode jobs. Then, install the MPI extension for Metaflow

pip install metaflow-mpi

and run the flow with

python mpiflow.py run

The example uses an image, eddieob/mpi-base, defined in this Dockerfile. The image includes MPI and ssh for communication. Note that Metaflow packages mpi_hello.py automatically, so it doesn't have to be included in the image.
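As a quick check of the numbers in this example: with N_NODES = 4 and N_CPU = 2, the `-n` argument evaluates to 8, so the script runs as ranks 0 through 7, with rank 0 acting as the main process and ranks 1 through 7 as workers. The pure-Python sketch below involves no MPI. It mimics the message pattern of mpi_hello.py with queues to show which lines to expect, and `simulate_hello` is a hypothetical helper. On a real cluster the order in which worker lines appear is not deterministic.

```python
import queue


def simulate_hello(size):
    """Mimic mpi_hello.py: rank 0 sends one message to each other rank."""
    inboxes = {rank: queue.Queue() for rank in range(1, size)}
    lines = [f"Cluster has {size} processes"]
    for dest in range(1, size):   # stands in for comm.send(msg, dest=i) on rank 0
        inboxes[dest].put("Main node says hi!")
    for rank in range(1, size):   # stands in for comm.recv() on each worker
        msg = inboxes[rank].get()
        lines.append(f"Worker node {rank} received message: {msg}")
    return lines


N_CPU, N_NODES = 2, 4

if __name__ == "__main__":
    for line in simulate_hello(N_CPU * N_NODES):
        print(line)
```

The sketch produces one "Cluster has ..." line followed by one line per worker, so eight lines in total for this configuration.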

