[Compiled Graph] Enhance Compile Graph with Multi-Device Support #51032
base: master
Conversation
@ruisearch42 Good day. Could you please review this PR? Thanks.
""" | ||
raise NotImplementedError | ||
return None |
If the method returns None, is it still necessary to mark it as an abstract method?
Keep this class abstract.
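For context, a minimal sketch of the pattern being discussed (using Python's standard abc module; names are illustrative, not the PR's exact code): marking the method abstract forces every subclass to override it, even when returning None would be a legal implementation.

```python
from abc import ABC, abstractmethod
from typing import Any, Optional

class Communicator(ABC):
    @abstractmethod
    def recv_stream(self) -> Optional[Any]:
        """Subclasses must override this, even if their implementation
        simply returns None."""
        raise NotImplementedError
```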
Hi @hipudding, is this ready for review? BTW, aDAG was renamed to Compiled Graph: https://docs.ray.io/en/latest/ray-core/compiled-graph/ray-compiled-graph.html
Yes, I think the main functionality of this PR is ready; I may make some minor adjustments later. Thanks for your review.
Thanks. I will review tomorrow!
Trying to better understand the PR
python/ray/dag/compiled_dag_node.py Outdated
with self._send_stream:
    return self._write()
Why don't we want to use a context manager?
This stream context manager is provided by cupy and only supports CUDA devices, so I removed the context manager and specified the stream in read and write.
But I think I could implement a stream context manager in Ray so that cupy is not needed. Which is better?
Done. Used torch StreamContext instead to avoid too many changes.
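A rough sketch of the kind of replacement described here (the helper name `_torch_module` is an assumption, not the PR's API): torch exposes a per-device `StreamContext`, so the stream can be selected without cupy.

```python
import torch

def _torch_module(device_type: str):
    # "cuda" -> torch.cuda; "npu" -> torch.npu (when torch_npu is installed).
    return getattr(torch, device_type)

def write_with_stream(device_type: str, stream, do_write):
    # torch.cuda.StreamContext(stream) plays the role cupy's ExternalStream
    # used to play, and the same pattern works for other torch device modules.
    with _torch_module(device_type).StreamContext(stream):
        return do_write()
```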
python/ray/dag/compiled_dag_node.py Outdated
else:
    future = ResolvedFuture(val)
self._intermediate_future = future

def reset_and_wait_intermediate_future(self) -> Any:
def reset_and_wait_intermediate_future(self, stream: Any = None) -> Any:
update doc for arg
Done. Reverted this part.
python/ray/dag/compiled_dag_node.py Outdated
self,
val: Any,
wrap_in_gpu_future: bool,
stream: Any = None,
update doc for stream
Done. Reverted this part.
class _DriverGroupHolder(Communicator):
    """
    Communicator place holder for Driver, Since driver may has no
    Accelerators, TorchDeviceManager cannot be used.
    """
Can you elaborate a bit on why this is needed?
I replaced all torch.cuda.xxx calls with TorchDeviceManager, including creating a _NcclGroup. But the driver keeps a communicator instance itself: it does not join the group, yet it needs to record some information such as the world size and the actors.
However, the driver may have no GPU (or other accelerator), so it would choose the wrong TorchDeviceManager (CPUTorchDeviceManager), and get_communicator would return None.
Since the driver does not join the group and uses the Communicator only to store some information, I created a hardware-independent class for the driver.
Comment updated.
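A minimal sketch of what such a hardware-independent holder could look like (names loosely follow the snippet above; this is an illustration, not the final code): it stores group metadata only and never performs collectives.

```python
from typing import List, Optional
import ray

class _DriverGroupHolder:
    """Holds communicator metadata for the driver, which never joins the group."""

    def __init__(self, world_size: int,
                 actor_handles: List["ray.actor.ActorHandle"]):
        self._world_size = world_size
        self._actor_handles = actor_handles

    def get_world_size(self) -> int:
        return self._world_size

    def get_actor_handles(self) -> List["ray.actor.ActorHandle"]:
        return self._actor_handles

    def get_self_rank(self) -> Optional[int]:
        # The driver has no rank because it does not participate in collectives.
        return None
```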
comm_id: int,
rank: Optional[int],
actor_handles: List["ray.actor.ActorHandle"],
acl_stream: Optional[Any],
What is acl_stream? Add doc for the args.
acl_stream is the raw stream on the Ascend NPU, just like cuda_stream on the GPU.
What about naming this stream torch_stream, since acl_stream and cuda_stream are both torch streams now?
Done. I changed it to npu_stream for better understanding.
def get_world_size(self) -> int:
    return self._world_size

def send(self, buf: "torch.Tensor", peer_rank: int) -> None:
Many methods in this class look like code duplication from _NcclGroup with minor changes. Should they be unified?
Yes, we should indeed reduce duplicate code. I will think about how to design it.
Indeed, these two files share logical similarities and contain a lot of duplicate code. However, there are several differences in the details. For example, torch.cuda.xxx needs to be replaced with torch.npu.xxx, hccl_group requires an additional StreamContext, and the set device operation must be performed before get_unique_id and creating the HCCL group. Therefore, I haven’t found a good way to merge these two files. Do you have any suggestions?
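One possible direction, sketched here as an assumption rather than a proposal from the thread: parameterize a shared base class by the torch device module and the backend name, so only the device-specific bits differ between the NCCL and HCCL variants.

```python
import torch

class _CollectiveGroupBase:
    # Subclasses override these two attributes.
    DEVICE_MODULE = torch.cuda   # e.g. torch.npu for an HCCL variant
    BACKEND = "nccl"             # e.g. "hccl"

    def __init__(self, device_index: int):
        # Ascend requires the device to be set before creating the group,
        # so the base class always does this first.
        self.DEVICE_MODULE.set_device(device_index)
        self._stream = self.DEVICE_MODULE.Stream()

    def send(self, tensor: "torch.Tensor", peer_rank: int) -> None:
        # Run the backend-specific send on the group's stream.
        with self.DEVICE_MODULE.StreamContext(self._stream):
            self._backend_send(tensor, peer_rank)

    def _backend_send(self, tensor: "torch.Tensor", peer_rank: int) -> None:
        raise NotImplementedError
```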
Hi @ruisearch42, could you please review this PR again? BTW, I fixed the test case failure in doc_test_cgraph_nccl. It was blocking this PR.
@liuxsh9 @Bye-legumes @noemotiovon Good day. Could you please review this PR? Thanks.
The overall architecture LGTM. Here are some minor suggestions.
return torch.device("cuda:0")
return generate_uid_id()
Could this method be directly inlined? Or alternatively, could the get_unique_id method in the base class be defined as str(uuid.uuid4())?
Fixed.
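What the suggestion amounts to, as a small sketch (assuming Python's standard uuid module):

```python
import uuid

class Communicator:
    def get_unique_id(self) -> str:
        # Default implementation in the base class, so device-specific
        # subclasses no longer need their own wrapper.
        return str(uuid.uuid4())
```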
def __init__(
    self,
    world_size: int,
    comm_id: int,
This parameter is unused.
Done.
The refactoring looks good to me! It seems that CG will easily be able to support more accelerators.
@ruisearch42 Hello, this PR is now ready for review. According to the issue #51574 description, this PR only includes modifications for multi-backend support and temporarily removes the HCCL-related logic; the HCCL logic will be implemented in a separate PR. My colleague has already completed a review, and the review comments have been fixed.
Thanks. Will take a look!
This commit introduces multi-device support in Compile Graph, extending beyond CUDA's NCCL to accommodate various accelerators. The key changes include:
1. Removed dependency on `cupy.cuda.ExternalStream`; replaced it with `torch.{device}.StreamContext` to support diverse hardware.
2. Replaced hardcoded `torch.cuda.xxx` calls with `AcceleratorRuntime`, enabling automatic detection and invocation of device-specific functions.
Signed-off-by: hipudding <huafengchun@gmail.com>
Thanks for this contribution! I'm still looking through the code, but regarding the API changes - can we make it so that you don't need to modify the Device enum to use a custom accelerator type? It would be better if we can only keep the NVIDIA backend in the main codebase and allow the user to provide a custom Communicator and AcceleratorRuntime. This would help maintainability of the project, as we also don't currently have a good way to test all different device types.
Thanks for the PR. Please address Stephanie's comment as well.
Assumes that `CUDA_VISIBLE_DEVICES` is set and is a
superset of the `ray.get_gpu_ids()`.

class AcceleratorRuntime:
These abstractions should be put into a different file rather than utils.py.
(CudaRuntime, or CpuRuntime).
"""
if cls._instance is None:
This is not thread-safe. Need to think a bit more carefully if we need locking, or just add a comment.
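If locking turns out to be needed, a double-checked-locking sketch like the following would keep the lazy initialization (the accessor name `get` is an assumption, not the PR's API):

```python
import threading

class AcceleratorRuntime:
    _instance = None
    _lock = threading.Lock()

    @classmethod
    def get(cls) -> "AcceleratorRuntime":
        if cls._instance is None:
            with cls._lock:
                # Re-check under the lock so only one thread constructs it.
                if cls._instance is None:
                    cls._instance = cls()
        return cls._instance
```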
Check if an accelerator communicator is available for the specified device.
If no device is provided, the function determines the default non-CPU device.

Args:
    device (Optional[Union["torch.device", str]]):
arg does not match function signature
def has_communicator(self) -> bool:
    """
    Check if an accelerator communicator is available for the specified device.
    If no device is provided, the function determines the default non-CPU device.
Can you please rephrase this to be clearer?
""" | ||
raise NotImplementedError | ||
def get_device_context(self, device): |
Can we have a type hint for the return value?
def get_unique_id(self) -> str:
    """
    Generates a unique identifier for communication purposes.
Why not call it generate_unique_id or create_unique_id?
def get_device_type(self) -> str:
    """
    Retrieves the device type for the accelerator.
If this is the device type from PyTorch, mention that.
def get_transport_name(self) -> str:
    raise NotImplementedError

def recv_stream(self):
    raise NotImplementedError

def send_stream(self):
    raise NotImplementedError
You don't need these, right? Also for other methods that already have `raise NotImplementedError` in the base class.
import torch

class CommunicatorInfoHolder(Communicator):
Can we improve the naming a bit? I'm thinking about CommunicatorMetadata, but it's still not accurate enough.
Yes, let's call this something different. How about CommunicatorHandle to match ActorHandle?
Assumes that `CUDA_VISIBLE_DEVICES` is set and is a
superset of the `ray.get_gpu_ids()`.

class AcceleratorRuntime:
How about AcceleratorContext?
Thanks @stephanie-wang @ruisearch42. I will fix the comment issues ASAP.
Does this mean that a backend connects new devices by registering a Communicator and an AcceleratorRuntime? I have considered integrating via with_tensor_transport(custom_Communicator), but this requires specifying the list of participating actors when creating the Communicator, and it would still need to provide custom accelerator runtime functions. However, I have a question for @stephanie-wang: where should the backend Communicator code be archived? Since this Communicator will only be used within Ray, would it be appropriate to place it in a separate folder (e.g., custom_communicators), so that user programs can import the required Communicator (or auto-select it)? The quality of these Communicators would be ensured by the backend vendors. For the Device class, we could set a unified custom backend name, such as customer_accelerator, and dynamically modify the device name when the AcceleratorRuntime loads, just like PyTorch's privateuse1 mechanism.
Yes, what I am imagining is to have some API call like
Ideally I would like to support CPU and CUDA runtimes out of the box and other Communicators can live outside of the Ray codebase for now. The problem with other accelerators is that right now we don't have a way to test them in CI.
Hmm, I'm not sure I understand this idea. Is it an alternative to the
Thanks for this contribution! At a high level, I think it makes a lot of sense to try to abstract out the AcceleratorRuntime.
However, I believe we need some iteration on the API calls. It would be useful if you could write a short doc that summarizes all of the APIs that need to be specialized to a specific device and shows how the rest of the code would call them. I think we'll be able to iterate faster on such a doc instead of the code. Happy to discuss synchronously too.
self._send_stream: Union["cp.cuda.Stream", nullcontext] = nullcontext()
self._recv_stream: Union["cp.cuda.Stream", nullcontext] = nullcontext()
Let's still keep a generic type to indicate that this is a context?
on when the future is resolved. If None, the current stream is used.
"""
import cupy as cp
from ray.experimental.channel.utils import AcceleratorRuntime
Let's import at top level. Importing cupy here was only necessary because we don't want a hard dependency on it.
if self._event is None:
    return
cp.cuda.runtime.eventDestroy(self._event.ptr)
Can you bring this code back? Without this, the CUDA event can get destroyed before the stream and it may segfault.
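For reference, a hedged sketch of the cleanup being asked for (the class and method names here are illustrative; only the eventDestroy call mirrors the snippet above): the raw CUDA event handle is freed explicitly via cupy's runtime bindings.

```python
import cupy as cp

class _GPUFuture:
    def __init__(self):
        self._event = cp.cuda.Event()

    def _destroy_event(self) -> None:
        if self._event is None:
            return
        # Free the underlying CUDA event before its stream is torn down.
        cp.cuda.runtime.eventDestroy(self._event.ptr)
        self._event = None
```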
Check if an accelerator communicator is available for the specified device.
If no device is provided, the function determines the default non-CPU device.
I am a bit confused about what this method is supposed to do. Why does it always return True for CUDA and False for CPU?
""" | ||
raise NotImplementedError | ||
def is_available(self) -> bool: |
It is a little confusing that the runtime can be created but not available. I guess this is meant to return whether the device is available for individual actors? Does it ever return anything different from len(get_devices()) > 0?
""" | ||
raise NotImplementedError | ||
def get_unique_id(self) -> str: |
Suggested change:
def get_unique_id(self) -> str:
def generate_communicator_id(self) -> str:
(Let's put communicator in the name)
The driver maintains a communicator instance but does not participate
in the group. However, it still needs to store certain information,
such as world size and actor details.
Since the driver may not have a GPU (or other accelerators), it could
mistakenly select the wrong `TorchDeviceManager`
(e.g., `CPUTorchDeviceManager`), causing `get_communicator`
to return `None`.
Because the driver does not actively join the group and only uses the
communicator for metadata storage, Introduced a hardware-independent
class specifically for the driver.
This comment is written in a bit of a confusing way because it is trying to justify why we need this class, so it only makes sense in the context of this PR.
Instead, can you just write what this class is responsible for?
def get_self_rank(self) -> Optional[int]:
    return None

def send(self, tensor: "torch.Tensor", peer_rank: int):
Should we remove these? They seem unnecessary from the driver's perspective?
return "cuda" | ||
class CpuRuntime(AcceleratorRuntime): |
I am a bit confused how the CpuRuntime fits in? Is it meant to be used as a default runtime? What happens if an actor requests 1 CPU and 1 GPU, will it have both a CpuRuntime and a CudaRuntime?
Thanks @stephanie-wang, I will update this PR soon. I will first write a document describing the overall work of the PR and the integration method for third-party backends. We'll discuss based on the document first, and then move on to the code implementation.
@stephanie-wang @ruisearch42 Good day. Here's the doc for this feature; feel free to add any comments on it.
Why are these changes needed?
This PR improves multi-device support in Compiled Graph. Compiled Graph significantly reduces tensor transmission latency by utilizing out-of-band communication, but currently this feature only supports CUDA's NCCL. Since Ray already supports multiple accelerators, it is necessary to extend Compiled Graph to support multiple devices as well.
This PR mainly introduces two key changes:
1. Removed the dependency on cupy.cuda.ExternalStream: since this library only supports CUDA devices, we replaced it with a more general stream context manager to accommodate various accelerators. The new implementation uses torch.{device}.StreamContext.
2. Replaced hardcoded torch.cuda.xxx calls with AcceleratorRuntime: this allows automatic detection of the accelerator type and invokes the appropriate device-specific functions (see the sketch after this list).
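To illustrate change 2 at a high level, here is a sketch under assumptions; the actual class and method names in the PR may differ. Device-specific calls go through a runtime object instead of hardcoded torch.cuda.* calls.

```python
import torch

class CudaRuntime:
    """Wraps the torch.cuda.* calls that used to be hardcoded."""

    def get_device_type(self) -> str:
        return "cuda"

    def get_devices(self):
        return [torch.device("cuda", i) for i in range(torch.cuda.device_count())]

    def current_stream(self):
        return torch.cuda.current_stream()

def detect_runtime():
    # Automatic detection: pick the CUDA runtime when a GPU is visible,
    # otherwise fall back to a CPU runtime (not shown).
    return CudaRuntime() if torch.cuda.is_available() else None
```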
Steps required to add a new backend for CG:
1. Implement the new backend's Communicator and AcceleratorRuntime (a rough sketch follows after this list).
2. Add the new device to the Device class and incorporate the automatic selection feature for the new device.
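A hypothetical sketch of step 1 for an Ascend backend (all names here are assumptions for illustration; the HCCL implementation itself is deferred to a separate PR):

```python
import torch
try:
    import torch_npu  # Ascend extension; registers the torch.npu module
except ImportError:
    torch_npu = None

class NpuRuntime:  # would derive from AcceleratorRuntime
    def get_device_type(self) -> str:
        return "npu"

    def get_device_context(self, device: "torch.device"):
        # torch_npu mirrors torch.cuda, including StreamContext.
        return torch.npu.StreamContext(torch.npu.current_stream())

class HcclGroup:  # would derive from Communicator
    def send(self, tensor: "torch.Tensor", peer_rank: int) -> None:
        ...  # backend-specific point-to-point send on the group's stream
```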
Related issue number
This PR is the main part of Task 2 in #51574.
It would be better to make the function name more general, such as changing requires_nccl to require_communicator. This is implemented in #51061.
Checks
- I've signed off every commit (by using the -s flag, i.e., git commit -s) in this PR.
- I've run scripts/format.sh to lint the changes in this PR.
- If I'm adding a method in Tune, I've added it in doc/source/tune/api/ under the corresponding .rst file.