[core] Threaded actors get stuck forever if they receive two exit signals #51582
Conversation
This is a blocker of #51058.
src/ray/core_worker/core_worker.cc (outdated)
@@ -1167,6 +1160,13 @@ void CoreWorker::Exit(
    const rpc::WorkerExitType exit_type,
    const std::string &detail,
    const std::shared_ptr<LocalMemoryBuffer> &creation_task_exception_pb_bytes) {
  if (is_shutdown_) {
Since `Shutdown` is only called in the `shutdown` callback within `Exit`, promising to execute `Exit` at most once also ensures that `Shutdown` is executed at most once.
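For intuition, here is a minimal, self-contained sketch of that argument, using a hypothetical `Worker` class rather than the real `CoreWorker`: if `Exit` bails out on repeated calls and its shutdown callback is the only caller of `Shutdown`, then `Shutdown` also runs at most once. (The next comment points out that in the real codebase `Shutdown` has another caller, so the premise does not fully hold.)

```cpp
// Illustration only: a hypothetical Worker class, not Ray's CoreWorker.
#include <atomic>
#include <iostream>

class Worker {
 public:
  void Exit() {
    // Run the exit sequence at most once, even if Exit() is invoked again
    // by a second exit signal.
    if (already_exited_.exchange(true)) {
      std::cout << "second exit signal ignored\n";
      return;
    }
    std::cout << "draining tasks...\n";
    // In this sketch the shutdown callback is the only caller of Shutdown(),
    // so executing Exit() at most once also executes Shutdown() at most once.
    ShutdownCallback();
  }

 private:
  void ShutdownCallback() { Shutdown(); }
  void Shutdown() { std::cout << "Shutdown executed\n"; }

  std::atomic<bool> already_exited_{false};
};

int main() {
  Worker w;
  w.Exit();  // runs the full exit + shutdown sequence
  w.Exit();  // ignored
}
```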
MortalHappiness, Mar 22, 2025 (edited)
I don't think this is correct. `CoreWorker::Shutdown` is also called by `CoreWorkerProcessImpl::ShutdownDriver`, which is called in `CoreWorker.shutdown_driver` in `_raylet.pyx`.
ray/src/ray/core_worker/core_worker_process.cc, lines 337 to 349 in c6639d2:
void CoreWorkerProcessImpl::ShutdownDriver() {
  RAY_CHECK(options_.worker_type == WorkerType::DRIVER)
      << "The `Shutdown` interface is for driver only.";
  auto global_worker = GetCoreWorker();
  RAY_CHECK(global_worker);
  global_worker->Disconnect(/*exit_type*/ rpc::WorkerExitType::INTENDED_USER_EXIT,
                            /*exit_detail*/ "Shutdown by ray.shutdown().");
  global_worker->Shutdown();
  {
    auto write_locked = core_worker_.LockForWrite();
    write_locked.Get().reset();
  }
}
_raylet.pyx, lines 3047 to 3055 in c6639d2:
def shutdown_driver(self):
    # If it's a worker, the core worker process should have been
    # shutdown. So we can't call
    # `CCoreWorkerProcess.GetCoreWorker().GetWorkerType()` here.
    # Instead, we use the cached `is_driver` flag to test if it's a
    # driver.
    assert self.is_driver
    with nogil:
        CCoreWorkerProcess.Shutdown()
MortalHappiness, Mar 22, 2025 (edited)
Should also add a test to check the case that `ray.shutdown` is called.
kevin85421, Mar 22, 2025 (edited)
Good catch! I only checked the call in `core_worker.cc`. I'm surprised there's another call to `Shutdown` outside of `core_worker.cc`. Maybe we should consider unifying the code paths for both the core worker driver and the worker shutdown process.
I will use another flag instead of reusing `is_shutdown_`.
In the future, is there a way that we can better unify this shutdown sequence or is there a reason why we need separate public interfaces?
I haven't actually delved deeply into it, but I fully agree that we should have a single public function to terminate the core worker, whether it is a driver or a worker, as I said in #51582 (comment).
We can leave `Exit` as the only public interface to gracefully terminate a core worker process, and only `Exit` can call `Shutdown`.
Can you take it as a follow-up to investigate and fix it if it isn't too hard?
> Can you take it as a follow-up to investigate and fix it if it isn't too hard?

Sure! Created an issue to track the progress: #51642.
ping when ready for review
what's the testing plan for this change?
Sorry, I initially thought it would be tested in #51058. Let me add a test in this PR.
Added a test.
please help review @MortalHappiness @dayshah
MortalHappiness commented Mar 22, 2025 (edited)
Not sure whether the following reproduction script addresses the same issue as this one.

    import time
    import ray

    ray.init(address="auto")

    @ray.remote(concurrency_groups={"io": 1})
    class TestActor:
        def exit(self):
            ray.actor.exit_actor()

    actors = [TestActor.remote() for _ in range(50)]
    ray.get([a.__ray_ready__.remote() for a in actors])
    ray.wait([a.exit.remote() for a in actors], timeout=10.0)
    print("Sleeping")
    time.sleep(3600)

Run this script until the … I'm also wondering where the actor received the exit signal twice?
LGTM pending comment
src/ray/core_worker/core_worker.h (outdated)
/// Whether the `Exit` function has been called, to avoid executing the exit
/// process multiple times.
std::atomic<bool> is_exit_ = false;
Drop a comment about the relationship between this and `is_shutdown_`. In general, in cases where we have confusing duplicate behaviors, we should document them as clearly as possible to help future readers.
Added
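For readers outside this thread, here is a sketch of what such a relationship comment could look like. The member names follow this PR, but the comment text and the surrounding class are illustrative assumptions, not the merged code.

```cpp
// Illustration only: not the merged core_worker.h.
#include <atomic>

class CoreWorkerSketch {
  /// Whether `Exit` has started. Guards the graceful-exit path so that a
  /// threaded actor receiving two exit signals does not run the exit
  /// sequence (e.g. `task_receiver_->Stop()`) twice.
  std::atomic<bool> is_exit_{false};

  /// Whether `Shutdown` has been called. `Exit` reaches `Shutdown` through
  /// its shutdown callback, but `Shutdown` is also invoked directly by
  /// `CoreWorkerProcessImpl::ShutdownDriver`, which is why `is_exit_` does
  /// not simply reuse this flag.
  std::atomic<bool> is_shutdown_{false};
};

int main() { CoreWorkerSketch sketch; (void)sketch; }
```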
CI passes. cc @edoakes, would you mind merging this PR? The auto-merge was canceled when I synced with the master branch.
6b805b5 merged into master
"signal and is shutting down."; | ||
return; | ||
} | ||
is_exit_ = true; |
(Not familiar with the code, just a question out of curiosity.)
Is it possible that multiple threads check `is_exit_` and all of them pass through? If it's not supposed to be called in a multi-threaded environment, we don't need an atomic variable here. In a word, reading through the code, I think you should use CAS.
Ref: https://en.cppreference.com/w/cpp/atomic/atomic/compare_exchange
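To make the concern concrete: with a separate check and store (`if (is_exit_) return; is_exit_ = true;`), two threads can both read `false` before either writes `true`, and both proceed. A compare-and-swap performs the check and the update as one atomic step, so exactly one caller wins. Below is a minimal standalone sketch of that pattern; the `ExitOnce` and `RunExitSequence` names are made up for illustration and are not CoreWorker code.

```cpp
// Minimal sketch of the CAS-guarded "run at most once" pattern.
#include <atomic>
#include <iostream>
#include <thread>

std::atomic<bool> is_exit{false};

void RunExitSequence(int caller) {
  std::cout << "exit sequence executed by thread " << caller << "\n";
}

void ExitOnce(int caller) {
  // Racy version (two separate operations):
  //   if (is_exit) return;
  //   is_exit = true;
  // Both threads can read `false` before either stores `true`.
  //
  // CAS version: the check and the update happen as one atomic step,
  // so exactly one caller proceeds.
  bool expected = false;
  if (!is_exit.compare_exchange_strong(expected, true)) {
    return;  // another thread already started the exit sequence
  }
  RunExitSequence(caller);
}

int main() {
  std::thread t1(ExitOnce, 1);
  std::thread t2(ExitOnce, 2);
  t1.join();
  t2.join();
}
```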
…nals (ray-project#51582)

If a threaded actor receives exit signals twice as shown in the above screenshot, it will execute [task_receiver_->Stop()](https://github.com/ray-project/ray/blob/6bb9cef9257046ae31f78f6c52015a8ebf009f81/src/ray/core_worker/core_worker.cc#L1224) twice. However, the second call to `task_receiver_->Stop()` will get stuck forever when executing the [releaser](https://github.com/ray-project/ray/blob/6bb9cef9257046ae31f78f6c52015a8ebf009f81/src/ray/core_worker/transport/concurrency_group_manager.cc#L135).

### Reproduction

```sh
ray start --head --include-dashboard=True --num-cpus=1
# https://gist.github.com/kevin85421/7a42ac3693537c2148fa554065bb5223
python3 test.py
# Some actors are still ALIVE. If all actors are DEAD, increase the number of actors.
ray list actors
```

---------

Signed-off-by: kaihsun <kaihsun@anyscale.com>
Why are these changes needed?

If a threaded actor receives exit signals twice, as shown in the above screenshot, it will execute `task_receiver_->Stop()` twice. However, the second call to `task_receiver_->Stop()` will get stuck forever when executing the releaser.

Reproduction

Start a head node with `ray start --head --include-dashboard=True --num-cpus=1`, run the reproduction script from the gist linked in the commit message above, and then check `ray list actors`; some actors remain ALIVE (if all actors are DEAD, increase the number of actors).