Ray Collective Communication Lib#

The Ray collective communication library (ray.util.collective) offers a set of native collective primitives for communication between distributed CPUs or GPUs.

Ray collective communication library

  • enables 10x more efficient out-of-band collective communication between Ray actor and task processes,

  • operates on both distributed CPUs and GPUs,

  • uses NCCL and GLOO as the optional high-performance communication backends,

  • is suitable for distributed ML programs on Ray.

Collective Primitives Support Matrix#

See below the current support matrix for all collective calls with different backends.

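| Backend        | gloo | gloo | nccl | nccl |
| Device         | CPU  | GPU  | CPU  | GPU  |
| send           |      |      |      |      |
| recv           |      |      |      |      |
| broadcast      |      |      |      |      |
| allreduce      |      |      |      |      |
| reduce         |      |      |      |      |
| allgather      |      |      |      |      |
| gather         |      |      |      |      |
| scatter        |      |      |      |      |
| reduce_scatter |      |      |      |      |
| all-to-all     |      |      |      |      |
| barrier        |      |      |      |      |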

Supported Tensor Types#

  • torch.Tensor

  • numpy.ndarray

  • cupy.ndarray

Usage#

Installation and Importing#

The Ray collective library is bundled with the released Ray wheel. Besides Ray, users need to install either pygloo or cupy to use collective communication with the GLOO or NCCL backend, respectively.

    pip install pygloo
    pip install cupy-cudaxxx  # replace xxx with the right cuda version in your environment

To use these APIs, import the collective package in your actor/task or driver code via:

    import ray.util.collective as col
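Before choosing a backend, it can help to check which of the two optional packages is importable in the current environment. The snippet below is a minimal sketch that uses only the standard library. The helper name `available_backends` and the `BACKEND_MODULES` mapping are illustrative, and the module names `pygloo` and `cupy` are assumed from the install commands above.

```python
import importlib.util

# Assumed mapping from backend name to the importable module that backs it.
BACKEND_MODULES = {"gloo": "pygloo", "nccl": "cupy"}


def available_backends():
    """Return {backend_name: bool} telling whether the backing module can be found."""
    return {
        backend: importlib.util.find_spec(module) is not None
        for backend, module in BACKEND_MODULES.items()
    }


for backend, found in available_backends().items():
    print(f"{backend}: {'installed' if found else 'missing'}")
```

This only checks that the module can be located. It does not verify that the CUDA version of the installed cupy wheel matches the machine.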

Initialization#

Collective functions operate on collective groups. A collective group contains a number of processes (in Ray, they are usually Ray-managed actors or tasks) that together enter the collective function calls. Before making collective calls, users need to statically declare a set of actors/tasks as a collective group.

Below is an example code snippet that uses the two APIs init_collective_group() and create_collective_group() to initialize collective groups among a few remote actors. Refer to the API References for detailed descriptions of the two APIs.

    import ray
    import ray.util.collective as collective

    import cupy as cp


    @ray.remote(num_gpus=1)
    class Worker:
        def __init__(self):
            self.send = cp.ones((4,), dtype=cp.float32)
            self.recv = cp.zeros((4,), dtype=cp.float32)

        def setup(self, world_size, rank):
            collective.init_collective_group(world_size, rank, "nccl", "default")
            return True

        def compute(self):
            collective.allreduce(self.send, "default")
            return self.send

        def destroy(self):
            collective.destroy_collective_group()


    # imperative
    num_workers = 2
    workers = []
    init_rets = []
    for i in range(num_workers):
        w = Worker.remote()
        workers.append(w)
        init_rets.append(w.setup.remote(num_workers, i))
    _ = ray.get(init_rets)
    results = ray.get([w.compute.remote() for w in workers])

    # declarative
    workers = []  # start from a fresh list so the group holds exactly num_workers actors
    for i in range(num_workers):
        w = Worker.remote()
        workers.append(w)
    _options = {
        "group_name": "default",  # must match the group name used in compute()
        "world_size": 2,
        "ranks": [0, 1],
        "backend": "nccl",
    }
    collective.create_collective_group(workers, **_options)
    results = ray.get([w.compute.remote() for w in workers])

Note that for the same set of actors/task processes, multiple collective groups can be constructed, with group_name as their unique identifier. This makes it possible to specify complex communication patterns between different (sub)sets of processes.
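One way to picture several groups over the same processes is as a plain mapping from group name to member list. The sketch below only builds the keyword arguments that one might pass to create_collective_group(); it does not call Ray. The group names, the four-worker layout, and the helper `group_options` are hypothetical.

```python
# Hypothetical layout: four workers (indices 0..3) and three overlapping groups.
GROUP_MEMBERS = {
    "all": [0, 1, 2, 3],
    "left": [0, 1],
    "right": [2, 3],
}


def group_options(group_name, members, backend="nccl"):
    """Build keyword arguments matching the documented create_collective_group() parameters."""
    return {
        "group_name": group_name,
        "world_size": len(members),
        # Ranks are local to each group and always start at 0.
        "ranks": list(range(len(members))),
        "backend": backend,
    }


plans = {name: group_options(name, members) for name, members in GROUP_MEMBERS.items()}
for name, options in plans.items():
    print(name, options)
```

With real actors, each plan would be applied to the corresponding sublist of actor handles, so that worker 2 is rank 2 in "all" but rank 0 in "right".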

Collective Communication#

Check the support matrix for the current status of supported collective calls and backends.

Note that the current collective communication APIs are imperative and exhibit the following behaviors:

  • All the collective APIs are synchronous blocking calls.

  • Since each API call only specifies a part of the collective communication, the API is expected to be called by each participating process of the (pre-declared) collective group. Once all the processes have made the call and rendezvoused with each other, the collective communication proceeds.

  • The APIs are imperative and the communication happens out-of-band: they need to be used inside the collective process (actor/task) code.
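The blocking rendezvous described in the list above can be imitated with threads and a barrier. This is a pure-Python illustration of the semantics, not Ray code: `ToyGroup` and `run_allreduce` are hypothetical names, plain lists stand in for tensors, and threads stand in for actor processes.

```python
import threading


class ToyGroup:
    """Stand-in for a collective group of `world_size` participants (illustration only)."""

    def __init__(self, world_size):
        self.world_size = world_size
        self._slots = [None] * world_size
        self._barrier = threading.Barrier(world_size)

    def allreduce(self, rank, buffer):
        # Each participant contributes only its own part of the collective call.
        self._slots[rank] = list(buffer)
        # Block until every rank has made the call (the rendezvous).
        self._barrier.wait()
        total = [sum(values) for values in zip(*self._slots)]
        # Second rendezvous so no slot is reused while another rank still reads it.
        self._barrier.wait()
        # Write the result in place, like the in-place style of the real API.
        buffer[:] = total


def run_allreduce(world_size=3):
    group = ToyGroup(world_size)
    buffers = [[float(rank)] * 4 for rank in range(world_size)]
    threads = [
        threading.Thread(target=group.allreduce, args=(rank, buffers[rank]))
        for rank in range(world_size)
    ]
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()
    return buffers


print(run_allreduce(3))
```

If one of the threads never called `allreduce`, the others would wait at the barrier indefinitely, which mirrors why every member of the group has to make the call.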

An example of using ray.util.collective.allreduce is below:

    import ray
    import cupy
    import ray.util.collective as col


    @ray.remote(num_gpus=1)
    class Worker:
        def __init__(self):
            self.buffer = cupy.ones((10,), dtype=cupy.float32)

        def compute(self):
            col.allreduce(self.buffer, "default")
            return self.buffer


    # Create two actors A and B and create a collective group following the previous example...
    A = Worker.remote()
    B = Worker.remote()

    # Invoke allreduce remotely
    ray.get([A.compute.remote(), B.compute.remote()])

Point-to-point Communication#

ray.util.collective also supports P2P send/recv communication between processes.

The send/recv calls exhibit the same behavior as the collective functions: they are synchronous blocking calls. A pair of send and recv must be called together on paired processes in order to specify the entire communication, and they must successfully rendezvous with each other to proceed. See the code example below:

    import ray
    import cupy
    import ray.util.collective as col


    @ray.remote(num_gpus=1)
    class Worker:
        def __init__(self):
            self.buffer = cupy.ones((10,), dtype=cupy.float32)

        def get_buffer(self):
            return self.buffer

        def do_send(self, target_rank=0):
            # this call is blocking
            col.send(self.buffer, target_rank)

        def do_recv(self, src_rank=0):
            # this call is blocking
            col.recv(self.buffer, src_rank)

        def do_allreduce(self):
            # this call is blocking as well
            col.allreduce(self.buffer)
            return self.buffer


    # Create two actors
    A = Worker.remote()
    B = Worker.remote()

    # Put A and B in a collective group
    col.create_collective_group([A, B], world_size=2, ranks=[0, 1], backend="nccl")

    # Let A send a message to B; a send/recv has to be specified once at each worker
    ray.get([A.do_send.remote(target_rank=1), B.do_recv.remote(src_rank=0)])

    # An anti-pattern: the following code will hang, because it doesn't instantiate the recv side call
    ray.get([A.do_send.remote(target_rank=1)])
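The pairing requirement and the hanging anti-pattern can be reproduced without GPUs by modeling a synchronous channel with two queues. This is a sketch of the behavior only: `ToyChannel`, `paired_transfer`, and `unpaired_send` are hypothetical names, and the timeout exists purely so the unpaired case returns instead of blocking forever.

```python
import queue
import threading


class ToyChannel:
    """One-directional rendezvous channel between two ranks (illustration only)."""

    def __init__(self):
        self._data = queue.Queue(maxsize=1)
        self._ack = queue.Queue(maxsize=1)

    def send(self, payload, timeout=None):
        self._data.put(list(payload))
        # Wait for the receiver's acknowledgement; with no matching recv this
        # blocks forever, or raises queue.Empty once the timeout expires.
        self._ack.get(timeout=timeout)

    def recv(self, buffer, timeout=None):
        # Fill the caller's buffer in place, then release the sender.
        buffer[:] = self._data.get(timeout=timeout)
        self._ack.put(True)


def paired_transfer():
    channel = ToyChannel()
    received = [0.0] * 4
    receiver = threading.Thread(target=channel.recv, args=(received,))
    receiver.start()
    channel.send([1.0] * 4)
    receiver.join()
    return received


def unpaired_send(timeout=0.2):
    channel = ToyChannel()
    try:
        channel.send([1.0] * 4, timeout=timeout)
    except queue.Empty:
        return "timed out"
    return "completed"


print(paired_transfer())
print(unpaired_send())
```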

Single-GPU and Multi-GPU Collective Primitives#

In many cluster setups, a machine has more than one GPU; effectively leveraging the GPU-GPU bandwidth, such as NVLINK, can significantly improve communication performance.

ray.util.collective supports multi-GPU collective calls, in which case a process (actor/task) manages more than one GPU (e.g., via ray.remote(num_gpus=4)). Using these multi-GPU collective functions is normally more performant than using the single-GPU collective APIs and spawning as many processes as there are GPUs. See the API references for the signatures of the multi-GPU collective APIs.

Note that all multi-GPU APIs have the following restrictions:

  • Only NCCL backend is supported.

  • Collective processes that make multi-GPU collective or P2P calls need to own the same number of GPU devices.

  • The input to a multi-GPU collective function is normally a list of tensors, each located on a different GPU device owned by the caller process.

An example using the multi-GPU collective APIs is provided below:

    import ray
    import ray.util.collective as collective

    import cupy as cp
    from cupy.cuda import Device


    @ray.remote(num_gpus=2)
    class Worker:
        def __init__(self):
            with Device(0):
                self.send1 = cp.ones((4,), dtype=cp.float32)
            with Device(1):
                self.send2 = cp.ones((4,), dtype=cp.float32) * 2
            with Device(0):
                self.recv1 = cp.ones((4,), dtype=cp.float32)
            with Device(1):
                self.recv2 = cp.ones((4,), dtype=cp.float32) * 2

        def setup(self, world_size, rank):
            self.rank = rank
            collective.init_collective_group(world_size, rank, "nccl", "177")
            return True

        def allreduce_call(self):
            collective.allreduce_multigpu([self.send1, self.send2], "177")
            return [self.send1, self.send2]

        def p2p_call(self):
            # Use the group initialized in setup(); no other group exists in this example.
            if self.rank == 0:
                collective.send_multigpu(self.send1 * 2, 1, 1, "177")
            else:
                collective.recv_multigpu(self.recv2, 0, 0, "177")
            return self.recv2


    # Note that the world size is 2 but there are 4 GPUs.
    num_workers = 2
    workers = []
    init_rets = []
    for i in range(num_workers):
        w = Worker.remote()
        workers.append(w)
        init_rets.append(w.setup.remote(num_workers, i))
    a = ray.get(init_rets)
    results = ray.get([w.allreduce_call.remote() for w in workers])
    results = ray.get([w.p2p_call.remote() for w in workers])
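To reason about what the multi-GPU allreduce in this example might return, the following pure-Python sketch mimics the arithmetic. It assumes that the reduction spans every tensor on every participating process (2 processes × 2 GPUs = 4 tensors here); that assumption is not spelled out in this document. The function name `allreduce_multigpu_sim` is hypothetical and plain lists stand in for GPU tensors.

```python
def allreduce_multigpu_sim(per_process_tensors):
    """Sum element-wise over all tensors of all processes.

    per_process_tensors[rank][gpu] is a list of floats. The result has the
    same nesting, with every tensor replaced by the global sum.
    """
    all_tensors = [tensor for tensors in per_process_tensors for tensor in tensors]
    total = [sum(values) for values in zip(*all_tensors)]
    return [[list(total) for _ in tensors] for tensors in per_process_tensors]


# Same initial values as the example: each worker holds ones on GPU 0 and twos on GPU 1.
workers = [[[1.0] * 4, [2.0] * 4] for _ in range(2)]
reduced = allreduce_multigpu_sim(workers)
print(reduced[0][0])
```

Under that assumption, every tensor ends up holding 1 + 2 + 1 + 2 = 6 in each position.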

More Resources#

The following links provide helpful resources on how to efficiently leverage the ray.util.collective library.

API References#

APIs exposed under the namespace ray.util.collective.

class ray.util.collective.collective.GroupManager

Use this class to manage the collective groups we created so far.

Each process will have an instance of GroupManager. Each process could belong to multiple collective groups. The membership information and other metadata are stored in the global _group_mgr object.

create_collective_group(backend, world_size, rank, group_name, gloo_timeout)

The entry to create new collective groups in the manager.

Put the registration and the group information into the manager metadata as well.

get_group_by_name(group_name)

Get the collective group handle by its name.

destroy_collective_group(group_name)

Group destructor.

ray.util.collective.collective.is_group_initialized(group_name)

Check if the group is initialized in this process by the group name.

ray.util.collective.collective.init_collective_group(world_size: int, rank: int, backend='nccl', group_name: str = 'default', gloo_timeout: int = 30000)

Initialize a collective group inside an actor process.

Parameters:
  • world_size – the total number of processes in the group.

  • rank – the rank of the current process.

  • backend – the CCL backend to use, NCCL or GLOO.

  • group_name – the name of the collective group.

Returns:

None

ray.util.collective.collective.create_collective_group(actors, world_size: int, ranks: List[int], backend='nccl', group_name: str = 'default', gloo_timeout: int = 30000)

Declare a list of actors as a collective group.

Note: This function should be called in a driver process.

Parameters:
  • actors – a list of actors to be set in a collective group.

  • world_size – the total number of processes in the group.

  • ranks (List[int]) – the rank of each actor.

  • backend – the CCL backend to use, NCCL or GLOO.

  • group_name – the name of the collective group.

Returns:

None

ray.util.collective.collective.destroy_collective_group(group_name: str = 'default') -> None

Destroy a collective group given its group name.

ray.util.collective.collective.get_rank(group_name: str = 'default') -> int

Return the rank of this process in the given group.

Parameters:

group_name – the name of the group to query

Returns:

The rank of this process in the named group, -1 if the group does not exist or the process does not belong to the group.

ray.util.collective.collective.get_collective_group_size(group_name: str = 'default') -> int

Return the size of the collective group with the given name.

Parameters:

group_name – the name of the group to query

Returns:

The world size of the collective group, -1 if the group does not exist or the process does not belong to the group.
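The -1 convention shared by get_rank and get_collective_group_size can be pictured with a small per-process registry. This is a hypothetical sketch, not the library's GroupManager: `ToyGroupRegistry` and its methods are illustrative names that only reproduce the documented return values.

```python
class ToyGroupRegistry:
    """Per-process record of group membership (illustration only)."""

    def __init__(self):
        # group_name -> (world_size, rank of this process in that group)
        self._groups = {}

    def register(self, group_name, world_size, rank):
        if not 0 <= rank < world_size:
            raise ValueError("rank must be in [0, world_size)")
        self._groups[group_name] = (world_size, rank)

    def get_rank(self, group_name="default"):
        entry = self._groups.get(group_name)
        return entry[1] if entry is not None else -1

    def get_group_size(self, group_name="default"):
        entry = self._groups.get(group_name)
        return entry[0] if entry is not None else -1


registry = ToyGroupRegistry()
registry.register("default", world_size=4, rank=2)
print(registry.get_rank("default"), registry.get_group_size("default"))
print(registry.get_rank("missing"), registry.get_group_size("missing"))
```

Callers can therefore treat a negative return value as "not a member" before issuing any collective call on that group.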

ray.util.collective.collective.allreduce(tensor, group_name: str = 'default', op=ReduceOp.SUM)

Collective allreduce the tensor across the group.

Parameters:
  • tensor – the tensor to be all-reduced on this process.

  • group_name – the collective group name to perform allreduce.

  • op – The reduce operation.

Returns:

None

ray.util.collective.collective.allreduce_multigpu(tensor_list: list, group_name: str = 'default', op=ReduceOp.SUM)

Collective allreduce a list of tensors across the group.

Parameters:
  • tensor_list (List[tensor]) – list of tensors to be allreduced, each on a GPU.

  • group_name – the collective group name to perform allreduce.

Returns:

None

ray.util.collective.collective.barrier(group_name: str = 'default')

Barrier all processes in the collective group.

Parameters:

group_name – the name of the group to barrier.

Returns:

None

ray.util.collective.collective.reduce(tensor, dst_rank: int = 0, group_name: str = 'default', op=ReduceOp.SUM)

Reduce the tensor across the group to the destination rank.

Parameters:
  • tensor – the tensor to be reduced on this process.

  • dst_rank – the rank of the destination process.

  • group_name – the collective group name to perform reduce.

  • op – The reduce operation.

Returns:

None
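The difference from allreduce is that only the destination rank ends up with the reduced values. The sketch below illustrates this with plain lists. It assumes that buffers on non-destination ranks are left untouched, which may depend on the backend. `reduce_sim` is a hypothetical name, and the `op` argument is an ordinary Python binary function standing in for ReduceOp.

```python
import operator
from functools import reduce as fold


def reduce_sim(buffers, dst_rank=0, op=operator.add):
    """Return per-rank buffers after reducing element-wise into `dst_rank`.

    buffers[rank] is that rank's list of numbers. Non-destination buffers are
    copied unchanged (an assumption of this sketch).
    """
    reduced = [fold(op, values) for values in zip(*buffers)]
    result = [list(buffer) for buffer in buffers]
    result[dst_rank] = reduced
    return result


buffers = [[1, 2], [3, 4], [5, 6]]
print(reduce_sim(buffers, dst_rank=1))          # sum lands on rank 1 only
print(reduce_sim(buffers, dst_rank=0, op=max))  # element-wise max lands on rank 0
```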

ray.util.collective.collective.reduce_multigpu(tensor_list: list, dst_rank: int = 0, dst_tensor: int = 0, group_name: str = 'default', op=ReduceOp.SUM)

Reduce the tensor across the group to the destination rank and destination tensor.

Parameters:
  • tensor_list – the list of tensors to be reduced on this process; each tensor located on a GPU.

  • dst_rank – the rank of the destination process.

  • dst_tensor – the index of GPU at the destination.

  • group_name – the collective group name to perform reduce.

  • op – The reduce operation.

Returns:

None

ray.util.collective.collective.broadcast(tensor, src_rank: int = 0, group_name: str = 'default')

Broadcast the tensor from a source process to all others.

Parameters:
  • tensor – the tensor to be broadcast (src) or received (dst).

  • src_rank – the rank of the source process.

  • group_name – the collective group name to perform broadcast.

Returns:

None

ray.util.collective.collective.broadcast_multigpu(tensor_list, src_rank: int = 0, src_tensor: int = 0, group_name: str = 'default')

Broadcast the tensor from a source GPU to all other GPUs.

Parameters:
  • tensor_list – the tensors to broadcast (src) or receive (dst).

  • src_rank – the rank of the source process.

  • src_tensor – the index of the source GPU on the source process.

  • group_name – the collective group name to perform broadcast.

Returns:

None

ray.util.collective.collective.allgather(tensor_list: list, tensor, group_name: str = 'default')

Allgather tensors from each process of the group into a list.

Parameters:
  • tensor_list – the results, stored as a list of tensors.

  • tensor – the tensor (to be gathered) in the current process.

  • group_name – the name of the collective group.

Returns:

None
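As a mental model of allgather, every rank contributes one tensor and every rank ends up with the full list. The sketch below assumes the list is ordered by rank and uses plain lists in place of tensors; `allgather_sim` is a hypothetical name.

```python
def allgather_sim(tensors):
    """tensors[rank] is the tensor contributed by `rank`.

    Returns tensor_lists, where tensor_lists[rank] is what that rank's
    `tensor_list` argument would hold afterwards: one entry per source rank.
    """
    world_size = len(tensors)
    return [
        [list(tensors[src]) for src in range(world_size)]
        for _ in range(world_size)
    ]


gathered = allgather_sim([[0, 0], [1, 1], [2, 2]])
print(gathered[0])
```

Each rank therefore needs a `tensor_list` with world_size entries allocated before the call, since the result is written into it.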

ray.util.collective.collective.allgather_multigpu(output_tensor_lists: list, input_tensor_list: list, group_name: str = 'default')

Allgather tensors from each GPU of the group into lists.

Parameters:
  • output_tensor_lists (List[List[tensor]]) – gathered results, whose shape must be num_gpus * world_size * shape(tensor).

  • input_tensor_list (List[tensor]) – a list of tensors, with shape num_gpus * shape(tensor).

  • group_name – the name of the collective group.

Returns:

None

ray.util.collective.collective.reducescatter(tensor, tensor_list: list, group_name: str = 'default', op=ReduceOp.SUM)

Reducescatter a list of tensors across the group.

Reduce the list of the tensors across each process in the group, then scatter the reduced list of tensors – one tensor for each process.

Parameters:
  • tensor – the resulting tensor on this process.

  • tensor_list – The list of tensors to be reduced and scattered.

  • group_name – the name of the collective group.

  • op – The reduce operation.

Returns:

None
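The two phases in the description (reduce, then scatter) can be traced with plain lists. In this sketch each rank supplies a list with one tensor per rank, slot i is summed across all ranks, and rank i receives that sum. The assumption that slot i goes to rank i follows the usual reduce-scatter convention and is not stated in this document; `reducescatter_sim` is a hypothetical name.

```python
def reducescatter_sim(tensor_lists):
    """tensor_lists[rank][i] is the i-th input tensor held by `rank`.

    Returns outputs, where outputs[rank] is the tensor that rank receives:
    the element-wise sum of slot `rank` over all source ranks.
    """
    world_size = len(tensor_lists)
    outputs = []
    for dst in range(world_size):
        # Collect slot `dst` from every source rank, then reduce it.
        slot = [tensor_lists[src][dst] for src in range(world_size)]
        outputs.append([sum(values) for values in zip(*slot)])
    return outputs


inputs = [
    [[1, 1], [2, 2]],      # rank 0 holds one tensor per destination rank
    [[10, 10], [20, 20]],  # rank 1 does the same
]
print(reducescatter_sim(inputs))
```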

ray.util.collective.collective.reducescatter_multigpu(output_tensor_list, input_tensor_lists, group_name: str = 'default', op=ReduceOp.SUM)

Reducescatter a list of tensors across all GPUs.

Parameters:
  • output_tensor_list – the resulting list of tensors, with shape: num_gpus * shape(tensor).

  • input_tensor_lists – the original tensors, with shape: num_gpus * world_size * shape(tensor).

  • group_name – the name of the collective group.

  • op – The reduce operation.

Returns:

None

ray.util.collective.collective.send(tensor, dst_rank: int, group_name: str = 'default')

Send a tensor to a remote process synchronously.

Parameters:
  • tensor – the tensor to send.

  • dst_rank – the rank of the destination process.

  • group_name – the name of the collective group.

Returns:

None

ray.util.collective.collective.send_multigpu(tensor, dst_rank: int, dst_gpu_index: int, group_name: str = 'default', n_elements: int = 0)

Send a tensor to a remote GPU synchronously.

The function assumes each process owns >1 GPUs, and the sender process and receiver process have an equal number of GPUs.

Parameters:
  • tensor – the tensor to send, located on a GPU.

  • dst_rank – the rank of the destination process.

  • dst_gpu_index – the destination gpu index.

  • group_name – the name of the collective group.

  • n_elements – if specified, send the next n elements from the starting address of tensor.

Returns:

None

ray.util.collective.collective.recv(tensor, src_rank: int, group_name: str = 'default')

Receive a tensor from a remote process synchronously.

Parameters:
  • tensor – the received tensor.

  • src_rank – the rank of the source process.

  • group_name – the name of the collective group.

Returns:

None

ray.util.collective.collective.recv_multigpu(tensor, src_rank: int, src_gpu_index: int, group_name: str = 'default', n_elements: int = 0)

Receive a tensor from a remote GPU synchronously.

The function assumes each process owns >1 GPUs, and the sender process and receiver process have an equal number of GPUs.

Parameters:
  • tensor – The received tensor, located on a GPU.

  • src_rank – The rank of the source process.

  • src_gpu_index – The index of the source GPU on the src process.

  • group_name – The name of the collective group.

Returns:

None

ray.util.collective.collective.synchronize(gpu_id: int)

Synchronize the current process to a given device.

Parameters:

gpu_id – the GPU device id to synchronize.

Returns:

None

ray.util.collective.collective.get_group_handle(group_name: str = 'default')

Check if the group is initialized and return the group handle.

Parameters:

group_name – the name of the collective group.

Returns:

The collective group handle.

