[Compiled Graph] Enhance Compile Graph with Multi-Device Support #51032
Conversation
Force-pushed from f63223c to 27215f3
@ruisearch42 Good day. Could you please review this PR? Thanks.
""" | ||
raise NotImplementedError | ||
return None |
If the method returns None, is it still necessary to mark it as an abstract method?
Keep this class abstract.
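A small illustration of the point being discussed, using made-up names rather than the actual Ray interface: a method can stay marked as `@abstractmethod` even when its default body just returns `None`, which keeps the base class abstract while still giving subclasses a default to fall back on.

```python
from abc import ABC, abstractmethod
from typing import Optional

class BaseTransport(ABC):  # hypothetical name, not a Ray class
    @abstractmethod
    def get_stream(self) -> Optional[object]:
        # Default behavior: no device stream. Subclasses must still override
        # (or explicitly call super()) because the method is abstract.
        return None

class CpuTransport(BaseTransport):
    def get_stream(self) -> Optional[object]:
        return super().get_stream()  # reuse the default None

# BaseTransport() would raise TypeError: can't instantiate abstract class
print(CpuTransport().get_stream())  # -> None
```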
Force-pushed from a4d06c9 to 705a10d
Hi @hipudding, is this ready for review? BTW, aDAG was renamed to Compiled Graph: https://docs.ray.io/en/latest/ray-core/compiled-graph/ray-compiled-graph.html
hipudding commented Mar 5, 2025 (edited)
Yes, I think the main functionality of this PR is ready; I may make some minor adjustments later. Thanks for your review.
Thanks. I will review tomorrow!
Trying to better understand the PR
python/ray/dag/compiled_dag_node.py Outdated
with self._send_stream:
    return self._write()
Why don't we want to use context manager?
This stream context manager is provided by cupy and only supports CUDA devices, so I removed the context manager and specified the stream explicitly in read and write.
But I think I could implement a stream context manager in Ray so that cupy is not needed. Which is better?
Done. Used torch StreamContext instead to avoid too many changes.
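A minimal sketch of that approach (the helper name and usage here are assumptions, not the PR's actual code): pick the `torch.<device>.StreamContext` that matches the accelerator instead of `cupy.cuda.ExternalStream`, which is CUDA-only.

```python
import torch

def stream_ctx(stream, device_type: str = "cuda"):
    # Hypothetical helper: resolve torch.cuda / torch.npu (the latter is
    # provided by torch_npu) and return its StreamContext.
    device_mod = getattr(torch, device_type)
    return device_mod.StreamContext(stream)

# Usage sketch, assuming a CUDA device is available:
if torch.cuda.is_available():
    send_stream = torch.cuda.Stream()
    with stream_ctx(send_stream, "cuda"):
        x = torch.ones(4, device="cuda") * 2  # enqueued on send_stream
    torch.cuda.current_stream().wait_stream(send_stream)
```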
python/ray/dag/compiled_dag_node.py Outdated
else:
    future = ResolvedFuture(val)
self._intermediate_future = future
def reset_and_wait_intermediate_future(self) -> Any:
def reset_and_wait_intermediate_future(self, stream: Any = None) -> Any:
update doc for arg
Done. Reverted this part.
python/ray/dag/compiled_dag_node.py Outdated
self,
val: Any,
wrap_in_gpu_future: bool,
stream: Any = None,
update doc for stream
Done. Reverted this part.
class _DriverGroupHolder(Communicator):
    """
    Communicator placeholder for the driver. Since the driver may have no
    accelerators, TorchDeviceManager cannot be used.
    """
Can you elaborate a bit why this is needed?
I replaced all torch.cuda.xxx calls with TorchDeviceManager, including the creation of a _NcclGroup. But the driver keeps a communicator instance itself: it does not join the group, but it needs to record some information such as the world size and the actor handles.
However, the driver may have no GPU (or other accelerator), so it would choose the wrong TorchDeviceManager (CPUTorchDeviceManager), and get_communicator would return None.
Since the driver does not join the group and only uses the Communicator to store some information, I created a hardware-independent class for the driver.
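A rough sketch of what such a hardware-independent holder can look like (illustrative names and fields only, not the PR's actual class): it records group metadata on the driver without touching any device API.

```python
from typing import Any, List, Optional

class DriverGroupInfo:  # illustrative stand-in for the placeholder described above
    """Stores group metadata on the driver without joining the collective group."""

    def __init__(
        self,
        comm_id: Any,
        actor_handles: List[Any],
        rank: Optional[int] = None,  # the driver itself has no rank
    ):
        self._comm_id = comm_id
        self._actor_handles = actor_handles
        self._rank = rank

    def get_world_size(self) -> int:
        return len(self._actor_handles)

    def get_actor_handles(self) -> List[Any]:
        return self._actor_handles

    def get_rank(self, actor: Any) -> int:
        # Maps an actor handle to its rank; never needed for the driver itself.
        return self._actor_handles.index(actor)
```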
Comment updated.
comm_id: int,
rank: Optional[int],
actor_handles: List["ray.actor.ActorHandle"],
acl_stream: Optional[Any],
What is acl_stream? Please add docs for the args.
acl_stream is the raw stream on Ascend NPU, just like cuda_stream on GPU.
What about naming this stream torch_stream, since acl_stream and cuda_stream are both torch streams now?
Done. I changed it to npu_stream for better understanding.
def get_world_size(self) -> int:
    return self._world_size
def send(self, buf: "torch.Tensor", peer_rank: int) -> None:
Many methods in this class look like code duplicated from _NcclGroup with minor changes. Should they be unified?
Yes, we should indeed reduce duplicate code. I will think about how to design it.
Indeed, these two files share logical similarities and contain a lot of duplicate code. However, there are several differences in the details. For example, torch.cuda.xxx needs to be replaced with torch.npu.xxx, hccl_group requires an additional StreamContext, and the set device operation must be performed before get_unique_id and creating the HCCL group. Therefore, I haven’t found a good way to merge these two files. Do you have any suggestions?
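One possible direction, sketched here with made-up names rather than as a concrete proposal for the Ray code: push the device-specific pieces (torch module, the set_device-before-init ordering, the stream context) behind small hooks in a shared base class, so the NCCL and HCCL groups only fill in those hooks.

```python
from abc import ABC, abstractmethod
import contextlib

class _CommGroupBase(ABC):
    """Shared skeleton; device-specific subclasses fill in the hooks."""

    def __init__(self, world_size: int, rank: int, device_index: int):
        self._world_size = world_size
        self._rank = rank
        # Ascend needs set_device *before* get_unique_id / group creation,
        # so the base class always calls the hook first.
        self._set_device(device_index)
        self._comm = self._create_communicator()

    # Device-specific hooks:
    @abstractmethod
    def _set_device(self, index: int) -> None: ...

    @abstractmethod
    def _create_communicator(self): ...

    def _stream_ctx(self, stream):
        # Default: no extra stream context (plain NCCL path); an HCCL subclass overrides this.
        return contextlib.nullcontext()

    # Shared logic:
    def get_world_size(self) -> int:
        return self._world_size

    def send(self, buf, peer_rank: int) -> None:
        with self._stream_ctx(None):
            self._comm.send(buf, peer_rank)

# Toy subclass just to show the shape of the hooks:
class _FakeGroup(_CommGroupBase):
    def _set_device(self, index: int) -> None:
        print(f"set device {index} before creating the communicator")

    def _create_communicator(self):
        class _Comm:
            def send(self, buf, peer_rank):
                print(f"send {buf!r} to rank {peer_rank}")
        return _Comm()

_FakeGroup(world_size=2, rank=0, device_index=0).send("tensor", peer_rank=1)
```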
Force-pushed from 1e80d9f to d682702
hipudding commented Mar 7, 2025 (edited)
Hi @ruisearch42, could you please review this PR again? BTW, I fixed the test case failure in doc_test_cgraph_nccl, which was blocking this PR.
…-project#51032)

This PR improves multi-device support in Compiled Graph, which significantly reduces tensor transmission latency by utilizing out-of-band communication. Currently, this feature only supports CUDA's NCCL. Since Ray already supports multiple accelerators, it is necessary to extend Compiled Graph to support multi-device as well.

This PR mainly introduces two key changes:
1. Removed dependency on cupy.cuda.ExternalStream - since this library only supports CUDA devices, we replaced it with a more general stream context manager to accommodate various accelerators. The new implementation uses torch.{device}.StreamContext.
2. Replaced hardcoded torch.cuda.xxx calls with AcceleratorRuntime - this allows automatic detection of the accelerator type and invokes the appropriate device-specific functions.

```python
import ray
import torch
import torch_npu
from ray.dag import InputNode
from ray.experimental.channel.hccl_group import _HcclGroup
from ray.experimental.channel.accelerator_context import register_accelerator_context

@ray.remote
class TorchTensorWorker:
    def __init__(self):
        self.device = torch.device('npu:0')
        torch.npu.set_device(self.device)

    def send(self, shape, dtype, value: int):
        return torch.ones(shape, dtype=dtype, device=self.device) * value

    def recv(self, tensor):
        return (tensor[0].item(), tensor.shape, tensor.dtype)

register_accelerator_context('npu', _HcclGroup)

actor_cls = TorchTensorWorker.options(num_cpus=0, resources={'NPU': 1})
sender = actor_cls.remote()
receiver = actor_cls.remote()

with InputNode() as inp:
    dag = sender.send.bind(inp.shape, inp.dtype, inp[0])
    dag = dag.with_tensor_transport(transport='nccl')
    dag = receiver.recv.bind(dag)

shape = (10,)
dtype = torch.float16
compiled_dag = dag.experimental_compile()
for i in range(3):
    ref = compiled_dag.execute(i, shape=shape, dtype=dtype)
    assert ray.get(ref) == (i, shape, dtype)
print("Success")
```

This PR is the main part of Task 2 in ray-project#51574.
It would be better to make the function names more general, such as changing requires_nccl to require_communicator. This is implemented in ray-project#51061.

Signed-off-by: noemotiovon <757486878@qq.com>
Co-authored-by: noemotiovon <757486878@qq.com>
This commit fixes two issues:
1. Fixed the issue where with_tensor_transport would automatically select the device based on the environment, even when CUDA was explicitly specified.
2. Fixed an NCCL "invalid memory access" error that occurred when Ray Serve was used with vLLM PP > 1.

Signed-off-by: hipudding <huafengchun@gmail.com>
…ort (ray-project#51032)" (ray-project#53263)

This reverts commit 2c7f6d4.

`test_torch_tensor_transport_gpu` is [failing on postmerge](https://buildkite.com/ray-project/postmerge/builds/10332#0196fbb9-7c80-4513-96f7-0250e53fd671/177-959). It appears this test does not run on premerge.

Signed-off-by: Edward Oakes <ed.nmi.oakes@gmail.com>
Signed-off-by: Vicky Tsang <vtsang@amd.com>
This commit is a follow-up to ray-project#51032, which introduced multi-device support in the Compiled Graph by leveraging CUDA's NCCL backend for efficient out-of-band tensor communication. While the current implementation is NCCL-specific, the Compiled Graph runtime is now ready to support a broader range of device types and collective communication libraries.

To prepare for this generalization, this commit introduces the following changes:
1. Refactored NCCL-specific naming and interfaces
2. Established a pluggable communication backend interface

This refactor does not change the behavior of existing NCCL-based Compiled Graph execution, but lays the foundation for enabling collective communication across diverse hardware accelerators and runtime environments.

Signed-off-by: noemotiovon <757486878@qq.com>
## Why are these changes needed?

### Background

This PR is a follow-up to [#51032](#51032), which introduced multi-device support in the Compiled Graph by leveraging CUDA's NCCL backend for efficient out-of-band tensor communication. While the current implementation is tightly coupled with NCCL and CUDA, the Compiled Graph runtime is now ready to support a broader spectrum of device types and collective communication backends (e.g., HCCL, RCCL).

### What This PR Does

To enable extensibility and backend-agnostic design, this PR introduces the following core changes:

- Refactored NCCL-specific naming and APIs: NCCL-related modules, classes, and function names have been generalized to eliminate hardcoded CUDA/NCCL assumptions.
- Introduced a pluggable communication backend interface: a unified abstraction layer is added to decouple collective communication logic from any specific implementation. This makes it easier to support alternative collective libraries and device types in the future.

This refactor does not alter the existing behavior of NCCL-based Compiled Graph execution. All current workflows using CUDA+NCCL continue to function as before.

## Related issue number

#51574

## Checks

- [x] I've signed off every commit (by using the -s flag, i.e., `git commit -s`) in this PR.
- [x] I've run `scripts/format.sh` to lint the changes in this PR.
- [ ] I've included any doc changes needed for https://docs.ray.io/en/master/.
  - [ ] I've added any new APIs to the API Reference. For example, if I added a method in Tune, I've added it in `doc/source/tune/api/` under the corresponding `.rst` file.
- [ ] I've made sure the tests are passing. Note that there might be a few flaky tests, see the recent failures at https://flakey-tests.ray.io/
- Testing Strategy
  - [x] Unit tests
  - [ ] Release tests
  - [ ] This PR is not tested :(

---------

Signed-off-by: noemotiovon <757486878@qq.com>
Why are these changes needed?
This PR improves multi-device support in Compiled Graph, which significantly reduces tensor transmission latency by utilizing out-of-band communication. Currently, this feature only supports CUDA's NCCL. Since Ray already supports multiple accelerators, it is necessary to extend Compiled Graph to support multi-device as well.
This PR mainly introduces two key changes:
1. Removed dependency on cupy.cuda.ExternalStream – since this library only supports CUDA devices, we replaced it with a more general stream context manager to accommodate various accelerators. The new implementation uses torch.{device}.StreamContext.
2. Replaced hardcoded torch.cuda.xxx calls with AcceleratorRuntime – this allows automatic detection of the accelerator type and invokes the appropriate device-specific functions (a minimal sketch of this idea follows the list).
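A minimal sketch of the detection idea in item 2 (helper names here are assumptions, not the PR's actual AcceleratorRuntime API): resolve the device-specific torch module once, instead of hardcoding torch.cuda calls everywhere.

```python
import torch

def detect_torch_device_module():
    # Hypothetical helper illustrating the idea; not the PR's real interface.
    if torch.cuda.is_available():
        return "cuda", torch.cuda
    if hasattr(torch, "npu") and torch.npu.is_available():  # patched in by torch_npu
        return "npu", torch.npu
    return "cpu", None

device_type, device_mod = detect_torch_device_module()
if device_mod is not None:
    device_mod.set_device(0)         # torch.cuda.set_device / torch.npu.set_device
    stream = device_mod.Stream()     # backend-specific stream type
    with device_mod.stream(stream):  # backend-specific StreamContext wrapper
        pass                         # device work would be enqueued here
print(f"Using device backend: {device_type}")
```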
How to add a new backend for Compiled Graph? Here's an example for Ascend NPU:
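The example below is reproduced from the PR's commit message; it assumes a node with Ascend NPUs and the torch_npu package installed. Note that it keeps transport='nccl' and relies on the registered accelerator context to supply the _HcclGroup communicator.

```python
import ray
import torch
import torch_npu
from ray.dag import InputNode
# a custom Communicator implementation for HCCL
from ray.experimental.channel.hccl_group import _HcclGroup
from ray.experimental.channel.accelerator_context import register_accelerator_context

@ray.remote
class TorchTensorWorker:
    def __init__(self):
        self.device = torch.device('npu:0')
        torch.npu.set_device(self.device)

    def send(self, shape, dtype, value: int):
        return torch.ones(shape, dtype=dtype, device=self.device) * value

    def recv(self, tensor):
        return (tensor[0].item(), tensor.shape, tensor.dtype)

# globally register the accelerator context
register_accelerator_context('npu', _HcclGroup)

actor_cls = TorchTensorWorker.options(num_cpus=0, resources={'NPU': 1})
sender = actor_cls.remote()
receiver = actor_cls.remote()

with InputNode() as inp:
    dag = sender.send.bind(inp.shape, inp.dtype, inp[0])
    dag = dag.with_tensor_transport(transport='nccl')
    dag = receiver.recv.bind(dag)

shape = (10,)
dtype = torch.float16
compiled_dag = dag.experimental_compile()
for i in range(3):
    ref = compiled_dag.execute(i, shape=shape, dtype=dtype)
    assert ray.get(ref) == (i, shape, dtype)
print("Success")
```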
Related issue number
This PR is the main part of Task 2 in #51574.
It would be better to make the function names more general, such as changing requires_nccl to require_communicator. This is implemented in #51061.
Checks
- I've signed off every commit (by using the -s flag, i.e., `git commit -s`) in this PR.
- I've run `scripts/format.sh` to lint the changes in this PR.
- I've added any new APIs to the API Reference. For example, if I added a method in Tune, I've added it in `doc/source/tune/api/` under the corresponding `.rst` file.