(WIP) [core][compiled graphs] Unify code paths for NCCL P2P and collectives scheduling #48649
base: master
Conversation
Signed-off-by: Weixin Deng <weixin@cs.washington.edu>
Signed-off-by: Yuhan Ruan <andyubryh@gmail.com>
Looks great. Some more TODOs before an initial review as we discussed offline:
cc @dengwxn
@anyscalesam Could you help add a go badge to run more CI tests? Thanks!
This reverts commit 941cb73. Signed-off-by: Yuhan Ruan <andyubryh@gmail.com>
After your attempt and on second thought, I think this might not be the best way to separate NCCL and non-NCCL ops by introducing another …
As we discussed offline, we should remove all the …
First pass. Structure seems right. Will look into details later.
I think this can be made simpler. Try to think about how you can achieve the following:
- `_NCCLSendNode`/`_NCCLRecvNode` should have the same interface as `_CollectiveOperation`.
- If the above is done properly, I believe we can get rid of most of the parts that need to differentiate between send/recv/collective. I.e., there should be only one `requires_nccl` flag instead of three, and there should be only one kind of DAG op node, a `COMPUTE` node.
python/ray/dag/nccl_operation.py Outdated
```python
def __init__(self):
    # Task indices in a compiled DAG. The indices are appended
    # in topological order if there are dependencies among the tasks.
```
Relying on this can be error-prone.
TODO: If an actor sends to itself, the current schedule is correct because `task_idxs` contains `SEND` and `RECV` in order. The order is provided by the topological sort.
```python
def send(self, buf: "torch.Tensor", peer_rank: int) -> None:
    ...
    # TODO(rui): find a better approach
    self._send_stream.synchronize()
    import torch
    buf.record_stream(torch.cuda.ExternalStream(self._send_stream.ptr))
```
Maybe use something better than `record_stream`.
```python
    "Sending the result of an asynchronous NCCL operation across actors. "
    "This will block the CPU while waiting for the NCCL operation to finish."
)
value = value.wait(blocking=True)
```
To be improved: Sending GPU future across actors forces a blocking wait.
Related: Currently, overlapping is done by immediately launching NCCL operations when their inputs are ready. The downstream task that reads the NCCL operation's output will get a GPU future and wait on it.
Limitations:
- Futures can't be sent across actors, in which case waiting can result in worse performance than executing the DAG with `overlap_gpu_communication=False`.
- Even when futures are only read locally by the same actor, if the actor does multiple NCCL recv operations, they will be scheduled as the first operations in the execution schedule. The `recv_stream.synchronize()` call in `_NcclGroup.recv` blocks the CPU on the second recv operation. (TODO: determine whether the synchronization is still needed.)
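To make the blocking-wait limitation concrete, here is a minimal, hypothetical future abstraction; `threading.Event` stands in for the CUDA event a real GPU future would wrap, and none of these names are Ray's actual API:

```python
import threading

class GpuFuture:
    """Minimal sketch of a GPU future; illustrative only."""

    def __init__(self):
        self._done = threading.Event()  # stands in for a CUDA event
        self._value = None

    def set_result(self, value):
        self._value = value
        self._done.set()

    def wait(self, blocking: bool = True):
        if blocking:
            # The CPU-blocking wait that is forced whenever the future
            # must be materialized before crossing an actor boundary.
            self._done.wait()
        elif not self._done.is_set():
            return None  # non-blocking poll: result not ready yet
        return self._value
```

A downstream task on the same actor could poll with `blocking=False` and keep overlapping work, but an actor boundary forces `blocking=True`, serializing communication and compute.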
We observe that when futures are sent across actors, overlap is slightly slower than the non-overlap version. This is because: (1) there is no benefit from overlapping when futures are sent across actors; (2) the exact stream synchronization differs between the overlap and non-overlap paths.
Open questions: Do we need different schedules for overlap vs. non-overlap? We currently change the schedule and move P2P earlier in the overlap case. Do we actually need to sync before launching another NCCL read/write? When an actor has multiple NCCL reads, they are not perfectly overlapped, since there is a sync between each read.
Just reviewed compiled_dag_node for now, will review the rest soon.
```python
self.input_reader: Optional[ReaderInterface] = None
# NCCL P2P recv uses the NCCL channel instead of the input reader.
if not self.requires_nccl_read:
    self.input_reader = SynchronousReader(self.input_channels)
```
Instead of adding this extra logic here, it would be better if we can make sure `self.input_channels` is always empty for `self.requires_nccl_read` when reaching this line. Same for `self.output_channels` and `self.requires_nccl_write`.
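One way to enforce the suggested invariant is an explicit check before the reader/writer is constructed. `Task` below is a hypothetical stand-in for the compiled DAG task, not the real class:

```python
from dataclasses import dataclass, field

@dataclass
class Task:
    """Stand-in for a compiled DAG task (illustrative only)."""
    requires_nccl_read: bool = False
    requires_nccl_write: bool = False
    input_channels: list = field(default_factory=list)
    output_channels: list = field(default_factory=list)

def check_channel_invariants(task: Task) -> None:
    # An NCCL P2P read replaces the input reader entirely, so no
    # ordinary input channels should remain; likewise for writes.
    if task.requires_nccl_read:
        assert not task.input_channels, "NCCL-read task has input channels"
    if task.requires_nccl_write:
        assert not task.output_channels, "NCCL-write task has output channels"
```

With the invariant upheld at construction time, the `if not self.requires_nccl_read` branch becomes unnecessary: the reader would simply be built over an empty channel list.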
```python
"""
from ray.experimental.channel.common import ChannelContext

ctx = ChannelContext.get_current().serialization_context
```
Nice fix here! Can you separate out the fix for CUDA event destruction into a separate PR? This will be faster to merge and less likely to get reverted.
```python
def _read(self, overlap_gpu_communication: bool) -> bool:
    def exec_operation_with_contexts(
```
This naming is slightly confusing because it sounds like a `context` should be passed as an argument to this function. Maybe just `exec_operation`, and `_exec_operation_without_context` for the inner method.
```python
with _device_context_manager():
    with self._send_stream:
        return self._write()
return False
```
This looks much better!
```python
other_args_to_resolve={
    PARENT_CLASS_NODE_KEY: recv_actor_handle,
    P2P_OPERATION_KEY: send_node.nccl_op,
    BIND_INDEX_KEY: node._get_bind_index(),
```
Does this mean that the send and recv nodes will get the same bind index as the original DAG node? Will it be a problem that multiple nodes have the same bind index?
```python
executable_tasks.sort(
    # If the bind index is the same, there are P2P send/recv tasks.
    # The order is determined as follows:
    # 1. P2P recv tasks.
```
I think this is quite brittle and difficult to understand. It would be better if we can think of a way to make all of the bind indices for a given actor unique.
Either we need to compute the new bind indices when creating the CompiledDAG or we need a way to create the NCCL nodes while the user is creating the initial DAG.
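A sketch of the first option, recomputing unique per-actor bind indices when building the `CompiledDAG`. The data shapes and the recv-before-compute-before-send tie-break are illustrative assumptions, not the real implementation:

```python
def reassign_bind_indices(tasks):
    """Give every task on an actor a unique bind index.

    `tasks` is a list of (actor_id, original_bind_index, kind) tuples
    produced by DAG construction plus the inserted NCCL nodes. For equal
    original indices, recv sorts before compute, which sorts before send
    (mirroring the sort comment in the quoted snippet above).
    """
    kind_order = {"recv": 0, "compute": 1, "send": 2}
    by_actor = {}
    result = []
    for actor, idx, kind in sorted(
        tasks, key=lambda t: (t[0], t[1], kind_order[t[2]])
    ):
        new_idx = by_actor.get(actor, 0)  # next free index on this actor
        by_actor[actor] = new_idx + 1
        result.append((actor, new_idx, kind))
    return result
```

After this pass, a plain sort on the new bind index suffices, and the brittle multi-way tie-break disappears.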
Why are these changes needed?
This PR unifies the code paths for NCCL P2P and collectives. Previously, scheduling for NCCL operations was done by splitting each DAG node into three operations: `READ`, `COMPUTE`, and `WRITE`. This PR simplifies the logic by keeping only the compute node. To ensure scheduling still works, NCCL operations are converted into special types of system-created compute nodes. This PR also allows overlapping NCCL collectives with computation.
NCCL P2P Refactoring
Before this PR, compiling this DAG resulted in a `TorchTensorNcclChannel` from `foo` to `bar`.
This PR adds a `NcclSendNode` after `foo` and a `NcclRecvNode` before `bar`. The `TorchTensorNcclChannel` now connects the two added nodes. Since `foo` and the send node are on the same actor, the channel from `foo` to the send node is an `IntraProcessChannel`; the same holds on the recv side.
Multiple Receivers
In this case, the sender sends to two different receivers.


Only one `NcclSendNode` is created. One `NcclRecvNode` is created per receiver. Like before, there is only one `TorchTensorNcclChannel`.
Multiple Senders
The receiver receives from two senders.


One `NcclSendNode` is created per sender. One `NcclRecvNode` is created per argument on the receiver. There are two different `TorchTensorNcclChannel`s.
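The node-insertion rules above (one send node per sender, one recv node per receiving argument) can be sketched on a toy edge list. The representation and names are illustrative assumptions, not Ray's internals:

```python
def insert_p2p_nodes(edges, node_actor):
    """Reroute every cross-actor edge through send/recv nodes.

    `edges` is a list of (src, dst) node names; `node_actor` maps a
    node name to its actor. Returns the rewritten edges and the
    extended node->actor map.
    """
    actors = dict(node_actor)
    send_nodes = {}
    new_edges = []
    for src, dst in edges:
        if actors[src] == actors[dst]:
            new_edges.append((src, dst))  # same actor: leave as-is
            continue
        send = send_nodes.get(src)
        if send is None:
            # Only one send node per sender, shared by all receivers.
            send = send_nodes[src] = f"send({src})"
            actors[send] = actors[src]
            new_edges.append((src, send))  # IntraProcessChannel
        recv = f"recv({src})->{dst}"       # one recv node per argument
        actors[recv] = actors[dst]
        new_edges.append((send, recv))     # NCCL channel edge
        new_edges.append((recv, dst))      # IntraProcessChannel
    return new_edges, actors
```

For `foo -> bar1` and `foo -> bar2` on three actors, this yields one `send(foo)` node and two recv nodes, matching the multiple-receivers case described above.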
Overlap NCCL Collectives
This is done by prioritizing NCCL operations over non-NCCL operations during scheduling: if both NCCL and non-NCCL operations are ready to be added to the actors' execution schedules, the NCCL operations are always added first.
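The prioritization rule can be sketched with a small ready queue; this is an illustrative model only, with NCCL-ness reduced to a boolean per op:

```python
import heapq

def order_ready_ops(ready_ops):
    """Order simultaneously-ready ops: NCCL ops first, then non-NCCL,
    preserving arrival order within each class.

    `ready_ops` is a list of (name, is_nccl) pairs.
    """
    heap = [
        # (priority class, arrival index, name): NCCL ops get class 0.
        (0 if is_nccl else 1, i, name)
        for i, (name, is_nccl) in enumerate(ready_ops)
    ]
    heapq.heapify(heap)
    out = []
    while heap:
        _, _, name = heapq.heappop(heap)
        out.append(name)
    return out
```

Scheduling communication as early as possible gives the compute ops the best chance to overlap with in-flight NCCL transfers.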
Checks
- I've signed off every commit (`git commit -s`) in this PR.
- I've run `scripts/format.sh` to lint the changes in this PR.
- If I have added a method in Tune, I've added it in `doc/source/tune/api/` under the corresponding `.rst` file.