
[Core][Compiled Graph] Execute DAG on Actor's Main Thread #48608

Open

xslingcn wants to merge 4 commits into ray-project:master from xslingcn:dag-on-actor-thread

Conversation

@xslingcn commented Nov 6, 2024 (edited)

Why are these changes needed?

As mentioned in #46336, the current implementation executes all aDAGs in a background concurrency group _ray_system, while actors run in their own default concurrency group. This discrepancy blocks the DAG from accessing thread-local state within actors that was initialized prior to DAG execution. For example, consider the following code:

```python
import ray
import threading
from ray.dag import InputNode

@ray.remote
class MyActor:
    def __init__(self):
        # data local to actor default executor thread
        self.local_data = threading.local()
        self.local_data.seed = 42

    def compute(self, value):
        return value + self.local_data.seed

actor = MyActor.remote()
with InputNode() as inp:
    dag = actor.compute.bind(inp)

# DAG running in _ray_system group, no access to actor.local_data
compiled_dag = dag.experimental_compile()
print(ray.get(compiled_dag.execute(10)))
```

which will raise the error:

```
Traceback (most recent call last):
...
ray.exceptions.RayTaskError(AttributeError): ray::MyActor.__ray_call__() (pid=3136275, ip=172.0.0.1)
  File "/Code/ray/run_thread_local_dag.py", line 15, in compute
    return value + self.local_data.seed
AttributeError: '_thread._local' object has no attribute 'seed'
```

This PR makes the DAG execution loop run on the actor's default executor (ref), which ensures the DAG runs on the same thread as the actor. With this change, the example above produces the expected output.
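For context, here is a minimal sketch (illustrative only, not this PR's code) of why the executing thread matters: tasks submitted to different concurrency groups of the same actor run on different executor threads, so thread-local state written by one group is not visible to another. The group name "background" below is an arbitrary stand-in for _ray_system.

```python
import threading

import ray

@ray.remote(concurrency_groups={"background": 1})
class ThreadReporter:
    def default_thread(self):
        # Runs on the default concurrency group's executor thread.
        return threading.get_ident()

    @ray.method(concurrency_group="background")
    def background_thread(self):
        # Runs on the "background" group's executor thread.
        return threading.get_ident()

reporter = ThreadReporter.remote()
t_default = ray.get(reporter.default_thread.remote())
t_background = ray.get(reporter.background_thread.remote())
# Different threads, hence separate thread-local storage.
assert t_default != t_background
```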

The thread_name API discussed in the original issue will be implemented in a separate PR later.

Related issue number

#46336

Checks

  • I've signed off every commit (by using the -s flag, i.e., git commit -s) in this PR.
  • I've run scripts/format.sh to lint the changes in this PR.
  • I've included any doc changes needed for https://docs.ray.io/en/master/.
    • I've added any new APIs to the API Reference. For example, if I added a
      method in Tune, I've added it in doc/source/tune/api/ under the
      corresponding .rst file.
  • I've made sure the tests are passing. Note that there might be a few flaky tests, see the recent failures at https://flakey-tests.ray.io/
  • Testing Strategy
    • Unit tests
    • Release tests
    • This PR is not tested :(

@stephanie-wang (Contributor) commented

Thanks, @xslingcn! Can you add a test similar to the one that you have in the PR description? Also, I think there may be some issues since there are sometimes background system tasks that need to run on a different thread from the execution one, and now they may be blocked. I'll unblock the buildkite/premerge tests to run so you can see the failures.

@stephanie-wang self-assigned this Nov 12, 2024
@stephanie-wang added the "go (add ONLY when ready to merge, run all tests)" label Nov 12, 2024
@rkooo567 self-assigned this Nov 12, 2024
@jcotant1 added the "core (Issues that should be addressed in Ray Core)" and "compiled-graphs" labels Nov 15, 2024
@xslingcn (Author) commented Nov 16, 2024 (edited)

Thanks @stephanie-wang for reviewing this!

> Can you add a test similar to the one that you have in the PR description?

Done with 8f6be8f.
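For reference, a minimal sketch of what such a test might look like (hypothetical; the actual test is in 8f6be8f, and ray_start_regular is assumed to be the repo's usual pytest fixture):

```python
import threading

import ray
from ray.dag import InputNode

def test_dag_sees_actor_thread_local_state(ray_start_regular):
    @ray.remote
    class MyActor:
        def __init__(self):
            # State bound to the thread that runs the actor's tasks.
            self.local_data = threading.local()
            self.local_data.seed = 42

        def compute(self, value):
            return value + self.local_data.seed

    actor = MyActor.remote()
    with InputNode() as inp:
        dag = actor.compute.bind(inp)

    compiled_dag = dag.experimental_compile()
    # With the DAG loop on the actor's default executor thread, the
    # thread-local seed set in __init__ is visible to compute().
    assert ray.get(compiled_dag.execute(10)) == 52
    compiled_dag.teardown()
```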

> Also, I think there may be some issues since there are sometimes background system tasks that need to run on a different thread from the execution one, and now they may be blocked.

It seems that Ray now fails to tear down the DAGs, even after I moved the cancellation tasks to the _ray_system concurrency group (see the diff and test output below). Any hints on how to debug which tasks are blocking the thread?

```diff
diff --git a/python/ray/dag/compiled_dag_node.py b/python/ray/dag/compiled_dag_node.py
@@ -1940,7 +1938,9 @@ class CompiledDAG:
                     logger.info(f"Cancelling compiled worker on actor: {actor}")
                 # Cancel all actor loops in parallel.
                 cancel_refs = [
-                    actor.__ray_call__.remote(do_cancel_executable_tasks, tasks)
+                    actor.__ray_call__.options(
+                        concurrency_group="_ray_system"
+                    ).remote(do_cancel_executable_tasks, tasks)
                     for actor, tasks in outer.actor_to_executable_tasks.items()
                 ]
                 for cancel_ref in cancel_refs:
```
```
$ pytest -v -s python/ray/dag/tests/experimental/test_multi_node_dag.py
...
2024-11-16 10:34:10,738 INFO compiled_dag_node.py:1933 -- Tearing down compiled DAG
2024-11-16 10:34:10,738 INFO compiled_dag_node.py:1938 -- Cancelling compiled worker on actor: Actor(Actor, 95b3104e737fd143bb49c3a001000000)
2024-11-16 10:34:10,739 INFO compiled_dag_node.py:1938 -- Cancelling compiled worker on actor: Actor(Actor, f8bf38b6e3680bccb52e420a01000000)
2024-11-16 10:34:10,739 INFO compiled_dag_node.py:1938 -- Cancelling compiled worker on actor: Actor(Actor, 5080863784c80043a0018f8201000000)
2024-11-16 10:34:40,745 ERROR compiled_dag_node.py:1954 -- Error cancelling worker task
Traceback (most recent call last):
  File "/root/ray/python/ray/dag/compiled_dag_node.py", line 1948, in teardown
    ray.get(cancel_ref, timeout=30)
  File "/root/ray/python/ray/_private/auto_init_hook.py", line 21, in auto_init_wrapper
    return fn(*args, **kwargs)
  File "/root/ray/python/ray/_private/client_mode_hook.py", line 103, in wrapper
    return func(*args, **kwargs)
  File "/root/ray/python/ray/_private/worker.py", line 2755, in get
    values, debugger_breakpoint = worker.get_objects(object_refs, timeout=timeout)
  File "/root/ray/python/ray/_private/worker.py", line 882, in get_objects
    ] = self.core_worker.get_objects(
  File "python/ray/_raylet.pyx", line 3486, in ray._raylet.CoreWorker.get_objects
    check_status(op_status)
  File "python/ray/includes/common.pxi", line 81, in ray._raylet.check_status
    raise GetTimeoutError(message)
ray.exceptions.GetTimeoutError: Get timed out: some object(s) not ready.
```

@stephanie-wang (Contributor) commented

Can you try the ray stack command to get a stack trace of each actor?
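As an in-process complement (a hedged sketch, not from this PR), one could also dump every thread's Python stack from inside a stuck actor by submitting a helper like the hypothetical dump_thread_stacks below on a concurrency group that is still responsive:

```python
import sys
import threading
import traceback

def dump_thread_stacks(actor_self=None):
    # Hypothetical helper: print the current Python stack of every thread in
    # this process. The unused first parameter lets it be submitted through
    # actor.__ray_call__, which passes the actor instance as the first argument.
    frames = sys._current_frames()
    for thread in threading.enumerate():
        frame = frames.get(thread.ident)
        if frame is None:
            continue
        print(f"--- {thread.name} (ident={thread.ident}) ---")
        print("".join(traceback.format_stack(frame)))
```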

stale bot commented Jan 31, 2025

This pull request has been automatically marked as stale because it has not had recent activity. It will be closed in 14 days if no further activity occurs. Thank you for your contributions.

  • If you'd like to keep this open, just leave any comment, and the stale label will be removed.

stale bot added the "stale (The issue is stale. It will be closed within 7 days unless there are further conversation)" label Jan 31, 2025
