- Notifications
You must be signed in to change notification settings - Fork6.2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to ourterms of service andprivacy statement. We’ll occasionally send you account related emails.
Already on GitHub?Sign in to your account
Dag bind order execution fix#48603
base:master
Are you sure you want to change the base?
Dag bind order execution fix#48603
Conversation
… execution order. Added unit tests for fix. See issueray-project#47159Signed-off-by: Cade Dillon <42156426+cadedillon@users.noreply.github.com>
Signed-off-by: Cade Dillon <42156426+cadedillon@users.noreply.github.com>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others.Learn more.
Couldn't think of a way that doesn't require substantial code change. I guess you can try:
- Modifying
DAGNode.execute
to dispatch actor tasks in bind index order. - Changing
ClassMethodNode._execute_impl
to respect bind index order when invoking the actor tasks.
1 seems easier to me. 2 might not be a good idea.
python/ray/dag/dag_node.py Outdated
"bind_index", float("inf") | ||
) | ||
if hasattr(child, "get_other_args_to_resolve") | ||
else float("inf"), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others.Learn more.
This would reorder all bound args. I think the order of bound args should not be changed. For example,
deftest_keep_original_order(shared_ray_instance):@ray.remoteclassActor:deffoo(self,input_data):returninput_data*2defbar(self,input_data):returninput_data/2defbaz(self,*inputs):print(inputs)returnsum(inputs)a=Actor.remote()withInputNode()asinp:x=a.foo.bind(inp)y=a.bar.bind(inp)z=a.baz.bind(1,2,3,x,y,inp)dag=MultiOutputNode([y,x,z])ray.get(dag.execute(4))
z
should print 1, 2, 3, x, y, inp. But the args are reordered as: x, y, 1, 2, 3, inp.
dag = MultiOutputNode([y, x]) | ||
assert ray.get(dag.execute(4)) == [8, 2] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others.Learn more.
x
would get 8 andy
would get 2. I would expect executing this dag gives me[2, 8]
as the result, same as the order specified inMultiOutputNode
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others.Learn more.
Hi Andy I had a follow up question about this. I've reworked my solution and I've gotten the functions to execute in bind order without inherently reordering the arguments to a node. However, I've run against the design decision about the ordering in which results should be returned to the user.
withInputNode()asinp:x=a.foo.bind(inp)y=a.bar.bind(inp)z=a.baz.bind(1,2,3,y,inp,x)dag=MultiOutputNode([y,x,z])# Execute the DAG with a sample input and observe the orderprint("Starting DAG execution to test task order...")result=ray.get(dag.execute(4))print("DAG execution result:",result)
In the way I wrote my first attempt, when this executes result will get [8, 2, 20] because foo returns 8, bar returns 2 and baz returns 20. Is this fundamentally the wrong approach? Should we always return these values in the position that they were passed in? This makes the most intuitive sense to me from a user perspective, but if that's the case then I'm not sure how to write unit tests for the ordering of functions that are executing remotely? Thanks for your help!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others.Learn more.
Yes, we should keep the same order that the values are passed in.
For unit tests, you can check that the order is as expected by modifying state internal to the actor, like keeping a list of tasks that were called.
…etween nodes to avoid reordering bound args.Signed-off-by: Cade Dillon <42156426+cadedillon@users.noreply.github.com>
Hi@stephanie-wang and@AndyUB, I have pushed an update to the fix which involves altering the flow of execution according to a topological sort rather than reordering arguments that are bound to a node. Hopefully this update behaves as expected. Thank you for your help with this! |
dep_graph.add_dependency(other_node, node) | ||
# Topologically sort the control dependencies for this layer of the dag | ||
ctrl_dependencies = dep_graph.topological_sort() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others.Learn more.
I believe the nodes here are only for the current DAG node's arguments, not the entire DAG? In that case, I'm not sure that topological sort of just the current args will necessarily work.
withInputNode()asinp:x1=a.foo.bind(inp)x2=a.bar.bind(inp)y1=b.baz.bind(x1)y2=c.baz.bind(x2)dag=MultiOutputNode([y2,y1])
In this case, y2 and y1 execute on different actors and they have the same bind index, so we may visit y2 then y1. Then since it does DFS, it should execute x2, y2, x1, y1. But we want to make sure to execute x1 before x2.
Topological sort can work, but you would need to modify how the recursion is done to not do DFS I believe. The simpler way is probably to add a control dependency during DAG declaration instead of execution time, by adding the previous DAG node submitted to the same actor to the nodes found byscanner.find_nodes
.
This pull request has been automatically marked as stale because it has not had recent activity. It will be closed in 14 days if no further activity occurs. Thank you for your contributions.
|
Why are these changes needed?
Tasks bound to a non-compiled DAG can currently execute in any order, which is inconsistent with the behavior of ray core and compiled DAGs. This can make debugging harder because the same DAG may behave differently after compilation.
This change enforces an execution order of tasks based on the order that they were bound to the DAG. In apply_recursive we add a conditional which checks the children of the current node for the existence of a bind index. Any child node having a bind index will trigger a sort of the _bounds_args by the bind index. Handles mixed child node types by deprioritizing nodes without a bind index over those that do.
Related issue number
Closes#47159
Checks
git commit -s
) in this PR.scripts/format.sh
to lint the changes in this PR.method in Tune, I've added it in
doc/source/tune/api/
under thecorresponding
.rst
file.