[core] Threaded actors get stuck forever if they receive two exit signals #51582
Conversation
This is a blocker of #51058.
src/ray/core_worker/core_worker.cc Outdated
```cpp
@@ -1167,6 +1160,13 @@ void CoreWorker::Exit(
    const rpc::WorkerExitType exit_type,
    const std::string &detail,
    const std::shared_ptr<LocalMemoryBuffer> &creation_task_exception_pb_bytes) {
  if (is_shutdown_) {
```
Since `Shutdown` is only called in the `shutdown` callback within `Exit`, promising to execute `Exit` at most once also ensures that `Shutdown` is executed at most once.
I don't think this is correct. `CoreWorker::Shutdown` is also called by `CoreWorkerProcessImpl::ShutdownDriver`, which is called in `CoreWorker.shutdown_driver` in `_raylet.pyx`.
ray/src/ray/core_worker/core_worker_process.cc, lines 337 to 349 in c6639d2:

```cpp
void CoreWorkerProcessImpl::ShutdownDriver() {
  RAY_CHECK(options_.worker_type == WorkerType::DRIVER)
      << "The `Shutdown` interface is for driver only.";
  auto global_worker = GetCoreWorker();
  RAY_CHECK(global_worker);
  global_worker->Disconnect(/*exit_type*/ rpc::WorkerExitType::INTENDED_USER_EXIT,
                            /*exit_detail*/ "Shutdown by ray.shutdown().");
  global_worker->Shutdown();
  {
    auto write_locked = core_worker_.LockForWrite();
    write_locked.Get().reset();
  }
}
```
`_raylet.pyx`, lines 3047 to 3055 in c6639d2:

```python
def shutdown_driver(self):
    # If it's a worker, the core worker process should have been
    # shutdown. So we can't call
    # `CCoreWorkerProcess.GetCoreWorker().GetWorkerType()` here.
    # Instead, we use the cached `is_driver` flag to test if it's a
    # driver.
    assert self.is_driver
    with nogil:
        CCoreWorkerProcess.Shutdown()
```
Should also add a test to check the case that `ray.shutdown` is called.
Good catch! I only checked the call in `core_worker.cc`. I'm surprised there's another call to `Shutdown` outside of `core_worker.cc`. Maybe we should consider unifying the shutdown code paths for the core worker driver and the worker.
I will use another flag instead of reusing `is_shutdown_`.
In the future, is there a way that we can better unify this shutdown sequence or is there a reason why we need separate public interfaces?
I haven't actually delved deeply into it, but I fully agree that we should have a single public function to terminate the core worker, whether it is a driver or a worker, as I said in #51582 (comment).
We can leave `Exit` as the only public interface to gracefully terminate a core worker process, and only `Exit` can call `Shutdown`.
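For illustration only, here is a hedged sketch of what that unification might look like, reusing the `Exit` signature and the `ShutdownDriver` body quoted above; this is a hypothetical refactor, not what this PR or the current codebase does:

```cpp
// Hypothetical sketch: route the driver path through Exit so that Exit stays the
// single public entry point and Shutdown is only reached via Exit's shutdown callback.
void CoreWorkerProcessImpl::ShutdownDriver() {
  RAY_CHECK(options_.worker_type == WorkerType::DRIVER)
      << "The `Shutdown` interface is for driver only.";
  auto global_worker = GetCoreWorker();
  RAY_CHECK(global_worker);
  // Instead of calling Disconnect() + Shutdown() directly, let Exit run the
  // one-time exit sequence; its at-most-once guard then covers every caller.
  global_worker->Exit(rpc::WorkerExitType::INTENDED_USER_EXIT,
                      /*detail=*/"Shutdown by ray.shutdown().",
                      /*creation_task_exception_pb_bytes=*/nullptr);
  {
    auto write_locked = core_worker_.LockForWrite();
    write_locked.Get().reset();
  }
}
```

One caveat with such a refactor is that `Exit` runs a graceful shutdown sequence, so the driver teardown would need to make sure that sequence has completed before resetting `core_worker_`; that is exactly the kind of detail the follow-up issue would have to work out.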
Can you take it as a follow-up to investigate and fix it if it isn't too hard?
> Can you take it as a follow-up to investigate and fix it if it isn't too hard?

Sure! Created an issue to track the progress: #51642.
ping when ready for review
what's the testing plan for this change?
Sorry, I initially thought it would be tested in #51058. Let me add a test in this PR.
Added a test.
please help review @MortalHappiness / @dayshah
Not sure whether the following reproduction script addresses the same issue as this one.

```python
import time

import ray

ray.init(address="auto")


@ray.remote(concurrency_groups={"io": 1})
class TestActor:
    def exit(self):
        ray.actor.exit_actor()


actors = [TestActor.remote() for _ in range(50)]
ray.get([a.__ray_ready__.remote() for a in actors])
ray.wait([a.exit.remote() for a in actors], timeout=10.0)
print("Sleeping")
time.sleep(3600)
```

Run this script until the …

I'm also wondering where the actor received the exit signal twice?
LGTM pending comment
src/ray/core_worker/core_worker.h Outdated
```cpp
/// Whether the `Exit` function has been called, to avoid executing the exit
/// process multiple times.
std::atomic<bool> is_exit_ = false;
```
Drop a comment about the relationship between this and `is_shutdown_`. In general, in cases where we have confusing duplicate behaviors, we should document it as clearly as possible to help future readers.
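For illustration, a sketch of the kind of comment being requested, based on the call paths discussed earlier in this thread; the wording is hypothetical, not the text that was actually committed:

```cpp
/// Whether `Exit` has been called, to avoid running the exit sequence twice.
/// Relationship to `is_shutdown_`: `Exit` normally triggers `Shutdown` through its
/// shutdown callback (which sets `is_shutdown_`), but `Shutdown` can also be reached
/// directly via `CoreWorkerProcessImpl::ShutdownDriver`, so the two flags are kept
/// separate rather than reusing `is_shutdown_` here.
std::atomic<bool> is_exit_ = false;
```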
Added
CI passes. cc @edoakes, would you mind merging this PR? The auto-merge was canceled when I synced with the master branch.
"signal and is shutting down."; | ||
return; | ||
} | ||
is_exit_ = true; |
(Not familiar with the code, just a question out of curiosity.) Is it possible that multiple threads check `is_exit_` and all of them pass through? If it's not supposed to be called in a multi-threaded environment, we don't need an atomic variable here. In a word, reading through the code, I think you should use CAS.
Ref: https://en.cppreference.com/w/cpp/atomic/atomic/compare_exchange
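For context, a minimal sketch (not the actual Ray code) of what the CAS suggestion would look like: `compare_exchange_strong` makes the check and the set a single atomic step, so even if several threads call `Exit` concurrently, only one of them runs the exit sequence.

```cpp
#include <atomic>

std::atomic<bool> is_exit_{false};

void Exit(/* ... */) {
  bool expected = false;
  // Atomically flip is_exit_ from false to true. Only the first caller wins;
  // every later caller finds it already true and returns early, so the exit
  // sequence (e.g. task_receiver_->Stop()) runs at most once.
  if (!is_exit_.compare_exchange_strong(expected, true)) {
    return;
  }
  // ... one-time exit sequence ...
}
```

With a separate load followed by a separate store, two threads could both observe `false` before either one stores `true`, which is exactly the race this comment is asking about.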
Why are these changes needed?

If a threaded actor receives exit signals twice, as shown in the screenshot above, it will execute [task_receiver_->Stop()](https://github.com/ray-project/ray/blob/6bb9cef9257046ae31f78f6c52015a8ebf009f81/src/ray/core_worker/core_worker.cc#L1224) twice. However, the second call to `task_receiver_->Stop()` will get stuck forever when executing the [releaser](https://github.com/ray-project/ray/blob/6bb9cef9257046ae31f78f6c52015a8ebf009f81/src/ray/core_worker/transport/concurrency_group_manager.cc#L135).

Reproduction

```sh
ray start --head --include-dashboard=True --num-cpus=1
# https://gist.github.com/kevin85421/7a42ac3693537c2148fa554065bb5223
python3 test.py
# Some actors are still ALIVE. If all actors are DEAD, increase the number of actors.
ray list actors
```
Related issue number
Checks
- I've signed off every commit (`git commit -s`) in this PR.
- I've run `scripts/format.sh` to lint the changes in this PR.
- If I added a method in Tune, I've added it in `doc/source/tune/api/` under the corresponding `.rst` file.