(WIP) [core][compiled graphs] Unify code paths for NCCL P2P and collectives scheduling #48649

Open
AndyUB wants to merge 145 commits into ray-project:master from AndyUB:union-dev-1105

Conversation

@AndyUB (Contributor) commented Nov 8, 2024 (edited)

Why are these changes needed?

This PR unifies the code paths for NCCL P2P and collectives. Before, scheduling for NCCL operations was done by splitting each DAG node into three operations: READ, COMPUTE, and WRITE. This PR simplifies the logic by keeping only the COMPUTE node. To ensure scheduling still works, NCCL operations are converted into special types of system-created compute nodes.

This PR also allows overlapping NCCL collectives with computation.
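
For illustration, the scheduling change can be sketched as follows. This is a minimal sketch with illustrative names (OpType, ScheduledOp), not Ray's actual internals: before, every DAG task expanded into READ/COMPUTE/WRITE operations; now each task is a single COMPUTE operation, and NCCL operations are simply system-created tasks whose COMPUTE op requires NCCL.

```python
from dataclasses import dataclass
from enum import Enum
from typing import List


class OpType(Enum):
    READ = "READ"
    COMPUTE = "COMPUTE"
    WRITE = "WRITE"


@dataclass
class ScheduledOp:
    task_idx: int
    op_type: OpType
    requires_nccl: bool = False


def expand_task_before(task_idx: int) -> List[ScheduledOp]:
    # Old behavior: every DAG task becomes three scheduled operations.
    return [ScheduledOp(task_idx, t) for t in (OpType.READ, OpType.COMPUTE, OpType.WRITE)]


def expand_task_after(task_idx: int, is_nccl_op: bool) -> List[ScheduledOp]:
    # New behavior: a single COMPUTE operation per task; NCCL send/recv and
    # collectives are system-created tasks whose COMPUTE op requires NCCL.
    return [ScheduledOp(task_idx, OpType.COMPUTE, requires_nccl=is_nccl_op)]
```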

NCCL P2P Refactoring

with InputNode() as inp:
    dag = actor1.foo.bind(inp)
    dag = dag.with_tensor_transport("nccl")
    dag = actor2.bar.bind(dag)

Before this PR, compiling this DAG results in a TorchTensorNcclChannel from foo to bar.

[image]

This PR adds a NcclSendNode after foo and a NcclRecvNode before bar. The TorchTensorNcclChannel now connects the two added nodes. Since foo and the send node are on the same actor, the channel from foo to the send node is an IntraProcessChannel; the same holds on the recv side.

[image]
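
A minimal sketch of the rewrite described above, using illustrative names (Node, insert_p2p_nodes) rather than Ray's internal classes and channels:

```python
from dataclasses import dataclass, field
from typing import List


@dataclass
class Node:
    name: str
    actor: str
    upstream: List["Node"] = field(default_factory=list)


def insert_p2p_nodes(sender: Node, receiver: Node) -> None:
    """Rewrite an NCCL-annotated edge sender -> receiver into
    sender -> NcclSend -> NcclRecv -> receiver.

    The send node lives on the sender's actor (so sender -> send node is an
    intra-process channel), the recv node lives on the receiver's actor, and
    the NCCL channel connects the two inserted nodes.
    """
    send_node = Node(f"NcclSend({sender.name})", actor=sender.actor, upstream=[sender])
    recv_node = Node(f"NcclRecv({receiver.name})", actor=receiver.actor, upstream=[send_node])
    # Replace the receiver's direct dependency on the sender with the recv node.
    receiver.upstream = [recv_node if u is sender else u for u in receiver.upstream]


# Example: foo on actor1 sends to bar on actor2.
foo = Node("foo", actor="actor1")
bar = Node("bar", actor="actor2", upstream=[foo])
insert_p2p_nodes(foo, bar)
assert [u.name for u in bar.upstream] == ["NcclRecv(bar)"]
```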

Multiple Receivers

with InputNode() as inp:
    dag = actor1.foo.bind(inp)
    dag = dag.with_tensor_transport("nccl")
    dag = MultiOutputNode([actor2.bar.bind(dag), actor3.baz.bind(dag)])

In this case, the sender sends to two different receivers.
[image]

Only one NcclSendNode is created. One NcclRecvNode is created per receiver. As before, there is only one TorchTensorNcclChannel.

[image]

Multiple Senders
with InputNode() as inp:
    branch1 = actor1.foo.bind(inp)
    branch1 = branch1.with_tensor_transport("nccl")
    branch2 = actor2.bar.bind(inp)
    branch2 = branch2.with_tensor_transport("nccl")
    dag = actor3.baz.bind(branch1, branch2)

The receiver receives from two senders.
[image]

One NcclSendNode is created per sender, and one NcclRecvNode is created per argument of the receiver. There are two different TorchTensorNcclChannels.

[image]

Overlap NCCL Collectives

This is done by prioritizing NCCL operations over non-NCCL operations during scheduling: whenever both NCCL and non-NCCL operations are ready to be added to the actors' execution schedules, the NCCL operations are always added first.
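
A small sketch of that priority rule, with illustrative data structures (not the actual scheduler):

```python
from typing import Dict, List, Set, Tuple

# Each op is (task_idx, requires_nccl); deps maps a task index to the task
# indices it waits on.
Op = Tuple[int, bool]


def build_schedule(ops: List[Op], deps: Dict[int, Set[int]]) -> List[Op]:
    remaining = {task_idx: requires_nccl for task_idx, requires_nccl in ops}
    done: Set[int] = set()
    schedule: List[Op] = []
    while remaining:
        ready = [idx for idx in remaining if deps.get(idx, set()) <= done]
        if not ready:
            raise ValueError("dependency cycle")
        # NCCL ops first (False sorts before True), then by task index so the
        # topological order stays deterministic.
        ready.sort(key=lambda idx: (not remaining[idx], idx))
        nxt = ready[0]
        schedule.append((nxt, remaining.pop(nxt)))
        done.add(nxt)
    return schedule


# A non-NCCL op (0) and an NCCL op (1) are both ready: the NCCL op goes first.
assert build_schedule([(0, False), (1, True)], {}) == [(1, True), (0, False)]
```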

Checks

  • I've signed off every commit (by using the -s flag, i.e., git commit -s) in this PR.
  • I've run scripts/format.sh to lint the changes in this PR.
  • I've included any doc changes needed for https://docs.ray.io/en/master/.
    • I've added any new APIs to the API Reference. For example, if I added a
      method in Tune, I've added it in doc/source/tune/api/ under the
      corresponding .rst file.
  • I've made sure the tests are passing. Note that there might be a few flaky tests; see the recent failures at https://flakey-tests.ray.io/
  • Testing Strategy
    • Unit tests
    • Release tests
    • This PR is not tested :(

dengwxn and others added 10 commits October 27, 2024 10:36
Signed-off-by: Weixin Deng <weixin@cs.washington.edu>
Signed-off-by: Weixin Deng <weixin@cs.washington.edu>
Signed-off-by: Weixin Deng <weixin@cs.washington.edu>
Signed-off-by: Weixin Deng <weixin@cs.washington.edu>
Signed-off-by: Yuhan Ruan <andyubryh@gmail.com>
Signed-off-by: Yuhan Ruan <andyubryh@gmail.com>
Signed-off-by: Yuhan Ruan <andyubryh@gmail.com>
Signed-off-by: Yuhan Ruan <andyubryh@gmail.com>
Signed-off-by: Yuhan Ruan <andyubryh@gmail.com>
Signed-off-by: Yuhan Ruan <andyubryh@gmail.com>
@dengwxn (Contributor)

Looks great. Some more TODOs before an initial review, as we discussed offline:

  1. Refactor all the [CL] and [TODO] markers in the code. They are mainly missing comments, unused code blocks, branches to be merged, variables and functions to be renamed, etc.
  2. Introduce a special op node for NCCL_Collective, similar to the current NCCL_READ and NCCL_WRITE, so that the COMPUTE node does not require NCCL.

cc @dengwxn

@dengwxn (Contributor)

@anyscalesam Could you help add a go badge to run more CI tests? Thanks!

@AndyUB marked this pull request as ready for review November 8, 2024 18:49
Signed-off-by: Yuhan Ruan <andyubryh@gmail.com>
This reverts commit 941cb73. Signed-off-by: Yuhan Ruan <andyubryh@gmail.com>
@dengwxn (Contributor)

> Introduce a special op node for NCCL_Collective, similar to the current NCCL_READ and NCCL_WRITE, so that the COMPUTE node does not require NCCL.

After your attempt and on second thought, I think introducing another NCCL_Collective op might not be the best way to separate NCCL and non-NCCL ops. We can skip this and see what others think.

@dengwxn (Contributor)

As we discussed offline, we should remove all the NCCL_* op nodes; instead, we should create system-level DAG nodes that do the NCCL read/write. We will refactor based on this.

Signed-off-by: Yuhan Ruan <andyubryh@gmail.com>
Signed-off-by: Yuhan Ruan <andyubryh@gmail.com>
Signed-off-by: Yuhan Ruan <andyubryh@gmail.com>
Signed-off-by: Yuhan Ruan <andyubryh@gmail.com>
@dengwxn (Contributor) left a comment

First pass. Structure seems right. Will look into details later.

@stephanie-wang (Contributor) left a comment

I think this can be made simpler. Try to think about how you can achieve the following:

  • _NCCLSendNode/_NCCLRecvNode should have the same interface as _CollectiveOperation.
  • If the above is done properly, I believe we can get rid of most of the parts that need to differentiate between send/recv/collective. I.e., there should be only one requires_nccl flag instead of three, and there should be only one kind of DAG op node, a COMPUTE node (see the sketch after this list).
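
A minimal sketch of what such a shared interface might look like (hypothetical names like NcclOperation/SendOperation, not Ray's actual classes):

```python
from abc import ABC, abstractmethod
from typing import Any, List


class NcclOperation(ABC):
    """Common interface for P2P send/recv and collective operations, so the
    compiled DAG only needs one requires_nccl flag on a COMPUTE op and never
    branches on the concrete operation kind."""

    @abstractmethod
    def participant_ranks(self) -> List[int]:
        """GPU ranks that take part in this operation."""

    @abstractmethod
    def execute(self, *args: Any) -> Any:
        """Run the NCCL call on the bound communicator and stream."""


class SendOperation(NcclOperation):
    def __init__(self, src_rank: int, dst_rank: int):
        self.src_rank, self.dst_rank = src_rank, dst_rank

    def participant_ranks(self) -> List[int]:
        return [self.src_rank, self.dst_rank]

    def execute(self, *args: Any) -> Any:
        # In a real implementation this would call the communicator's send().
        raise NotImplementedError
```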

@rkooo567 self-assigned this Nov 12, 2024
@stephanie-wang self-assigned this Nov 12, 2024
@jcotant1 added the core (Issues that should be addressed in Ray Core) label Nov 15, 2024
Signed-off-by: Yuhan Ruan <andyubryh@gmail.com>
Signed-off-by: Yuhan Ruan <andyubryh@gmail.com>
Signed-off-by: Yuhan Ruan <andyubryh@gmail.com>
Signed-off-by: Yuhan Ruan <andyubryh@gmail.com>
AndyUB and others added 20 commits February 13, 2025 15:20
Signed-off-by: Yuhan Ruan <andyubryh@gmail.com>
Signed-off-by: Weixin Deng <weixin@cs.washington.edu>
Signed-off-by: Yuhan Ruan <andyubryh@gmail.com>
Signed-off-by: Yuhan Ruan <andyubryh@gmail.com>
Signed-off-by: Yuhan Ruan <andyubryh@gmail.com>
Signed-off-by: Yuhan Ruan <andyubryh@gmail.com>
Signed-off-by: Yuhan Ruan <andyubryh@gmail.com>
Signed-off-by: Yuhan Ruan <andyubryh@gmail.com>
…uture before sending across actors Signed-off-by: Yuhan Ruan <andyubryh@gmail.com>
Signed-off-by: Yuhan Ruan <andyubryh@gmail.com>
Signed-off-by: Yuhan Ruan <andyubryh@gmail.com>
Signed-off-by: Yuhan Ruan <andyubryh@gmail.com>
Signed-off-by: Yuhan Ruan <andyubryh@gmail.com>
Signed-off-by: Yuhan Ruan <andyubryh@gmail.com>
…e P2P send/recv operations Signed-off-by: Yuhan Ruan <andyubryh@gmail.com>
Signed-off-by: Yuhan Ruan <andyubryh@gmail.com>
Signed-off-by: Yuhan Ruan <andyubryh@gmail.com>
Signed-off-by: Yuhan Ruan <andyubryh@gmail.com>
Signed-off-by: Yuhan Ruan <andyubryh@gmail.com>

def __init__(self):
    # Task indices in a compiled DAG. The indices are appended
    # in topological order if there are dependencies among the tasks.
@AndyUB (Contributor, Author)

Relying on this can be error-prone.

Contributor

TODO: If an actor sends to itself, the current schedule is correct because task_idxs contain SEND and RECV in order. The order is provided by the topological sort.

@@ -183,6 +185,10 @@ def send(self, buf: "torch.Tensor", peer_rank: int) -> None:
# TODO(rui): find a better approach
self._send_stream.synchronize()

import torch

# record_stream marks buf as in use by the send stream so its memory is not
# freed or reused until the work queued on that stream completes.
buf.record_stream(torch.cuda.ExternalStream(self._send_stream.ptr))
@AndyUB (Contributor, Author)

Maybe use something better than record_stream.

"Sending the result of an asynchronous NCCL operation across actors. "
"This will block the CPU while waiting for the NCCL operation to finish."
)
value = value.wait(blocking=True)
@AndyUB (Contributor, Author)

To be improved: sending a GPU future across actors forces a blocking wait.

@AndyUB (Contributor, Author) commented Feb 18, 2025 (edited)

Related: Currently, overlapping is done by immediately launching NCCL operations as soon as their inputs are ready. The downstream task that reads an NCCL operation's output gets a GPU future and waits on it (see the sketch after the limitations below).

Limitations:

  1. Futures can't be sent across actors; in that case, waiting can result in worse performance than executing the DAG with overlap_gpu_communication=False.
  2. Even when futures are only read locally by the same actor, if the actor performs multiple NCCL recv operations, they will be scheduled as the first operations in the execution schedule. The recv_stream.synchronize() call in _NcclGroup.recv blocks the CPU on the second recv operation. (TODO: Determine whether the synchronization is still needed.)
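
For illustration, a GPU future of the kind described above could be backed by a CUDA event. This is a sketch assuming PyTorch streams and events, not Ray's actual implementation:

```python
import torch


class GpuFuture:
    """Wraps a tensor produced on a communication stream."""

    def __init__(self, value: torch.Tensor, comm_stream: torch.cuda.Stream):
        self._value = value
        self._event = torch.cuda.Event()
        self._event.record(comm_stream)  # marks completion of the NCCL op

    def wait(self, blocking: bool = False) -> torch.Tensor:
        if blocking:
            # Blocks the CPU until the communication stream reaches the event
            # (what happens today when the result must cross actors).
            self._event.synchronize()
        else:
            # Only makes the current compute stream wait on the event; the CPU
            # keeps going, which is what enables compute/communication overlap.
            torch.cuda.current_stream().wait_event(self._event)
        return self._value
```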

Contributor

  1. We observe that when futures are sent across actors, overlap is slightly slower than the non-overlap version. This is because (1) there is no benefit from overlapping when futures are sent across actors, and (2) the exact stream synchronization differs between the overlap and non-overlap paths.

  2. Do we need different schedules for overlap vs. non-overlap? We change schedules and move P2P earlier in the overlap case.

  3. Do we actually need to sync before launching another NCCL read/write?

  4. When an actor has multiple NCCL reads, they are not perfectly overlapped, since there is a sync between consecutive NCCL reads.

Signed-off-by: Yuhan Ruan <andyubryh@gmail.com>
"Sending the result of an asynchronous NCCL operation across actors. "
"This will block the CPU while waiting for the NCCL operation to finish."
)
value = value.wait(blocking=True)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others.Learn more.

  1. We observe that when futures are sent across actors, overlap is slightly slower than non-overlap version. This is because: (1) There is no benefit from overlapping when futures are sent across actors; (2) The exact stream sync is different in overlap/non-overlap.

  2. Do we need different schedules for overlap/non-overlap? We change schedules and move P2P early in overlap.

  3. Do we actually need to sync before launching another NCCL read/write?

  4. When an actor have multiple NCCL reads, these NCCL reads are not perfectly overlapped since there is sync between each NCCL read.

AndyUB and others added 5 commits February 20, 2025 21:52
Signed-off-by: Yuhan Ruan <andyubryh@gmail.com>
Signed-off-by: Yuhan Ruan <andyubryh@gmail.com>
Signed-off-by: Weixin Deng <weixin@cs.washington.edu>
Signed-off-by: Weixin Deng <weixin@cs.washington.edu>
Signed-off-by: Weixin Deng <weixin@cs.washington.edu>
@stephanie-wang (Contributor) left a comment

Just reviewed compiled_dag_node for now, will review the rest soon.

self.input_reader: Optional[ReaderInterface] = None
# NCCL P2P recv uses the NCCL channel instead of the input reader.
if not self.requires_nccl_read:
    self.input_reader = SynchronousReader(self.input_channels)
Contributor

Instead of adding this extra logic here, it would be better if we can make sure self.input_channels is always empty when self.requires_nccl_read is set, before reaching this line. The same goes for self.output_channels and self.requires_nccl_write.

"""
from ray.experimental.channel.common import ChannelContext

ctx = ChannelContext.get_current().serialization_context
Contributor

Nice fix here! Can you separate out the fix for CUDA event destruction into a separate PR? This will be faster to merge and less likely to get reverted.


def _read(self, overlap_gpu_communication: bool) -> bool:
def exec_operation_with_contexts(
Contributor

This naming is slightly confusing because it sounds like context should be passed as an argument to this function. Maybe just exec_operation, and _exec_operation_without_context for the inner method.

with _device_context_manager():
    with self._send_stream:
        return self._write()
return False
Contributor

This looks much better!

other_args_to_resolve={
    PARENT_CLASS_NODE_KEY: recv_actor_handle,
    P2P_OPERATION_KEY: send_node.nccl_op,
    BIND_INDEX_KEY: node._get_bind_index(),
Contributor

Does this mean that the send and recv nodes will get the same bind index as the original DAG node? Will it be a problem that multiple nodes have the same bind index?

executable_tasks.sort(
    # If the bind index is the same, there are P2P send/recv tasks.
    # The order is determined as follows:
    # 1. P2P recv tasks.
Contributor

I think this is quite brittle and difficult to understand. It would be better if we can think of a way to make all of the bind indices for a given actor unique.

Either we need to compute the new bind indices when creating the CompiledDAG or we need a way to create the NCCL nodes while the user is creating the initial DAG.
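
One possible direction, sketched with hypothetical names: assign every task on an actor a unique, dense bind index at compile time, ordering an inserted recv node before (and a send node after) the original node it wraps, so the executable-task sort needs no tie-breaking rules.

```python
from typing import List, Tuple

# Order of task kinds that share one original bind index.
RECV, COMPUTE, SEND = 0, 1, 2


def renumber_bind_indices(tasks: List[Tuple[int, int]]) -> List[int]:
    """tasks: (original_bind_index, kind) pairs for a single actor.
    Returns a dense, unique bind index for each task."""
    order = sorted(range(len(tasks)), key=lambda i: tasks[i])
    new_index = [0] * len(tasks)
    for new_idx, old_pos in enumerate(order):
        new_index[old_pos] = new_idx
    return new_index


# foo's recv, foo itself, foo's send, then bar (original bind indices 0, 0, 0, 1).
assert renumber_bind_indices([(0, RECV), (0, COMPUTE), (0, SEND), (1, COMPUTE)]) == [0, 1, 2, 3]
```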

Signed-off-by: Yuhan Ruan <andyubryh@gmail.com>
Reviewers

@stephanie-wang requested changes
@dengwxn left review comments

Requested changes must be addressed to merge this pull request.

Labels

core (Issues that should be addressed in Ray Core), go (add ONLY when ready to merge, run all tests)

5 participants
@AndyUB, @dengwxn, @stephanie-wang, @rkooo567, @jcotant1
