[Core] Exit the Core Worker Early When an Error Is Received from the Plasma Store #53679
Conversation
Pull Request Overview
This PR addresses an issue during node shutdown where a broken pipe error from the plasma store causes unintended task failures by triggering an early exit of the core worker. Key changes include adding new overloads for StoreConn to identify core worker connections, updating WriteBuffer and ReadBuffer to trigger a quick shutdown on error, and propagating the core worker flag to PlasmaClient.
Reviewed Changes
Copilot reviewed 6 out of 6 changed files in this pull request and generated 1 comment.
Summary per file:
| File | Description |
|---|---|
| src/ray/object_manager/plasma/connection.h | Added core worker support and new buffer methods with error checking. |
| src/ray/object_manager/plasma/connection.cc | Implemented new methods that call ShutdownWorkerIfLocalRayletDisconnected. |
| src/ray/object_manager/plasma/client.h and client.cc | Propagated the core worker flag through PlasmaClient and its implementation. |
| src/ray/core_worker/store_provider/plasma_store_provider.cc | Updated instantiation of PlasmaClient to mark it as in a core worker. |
| src/ray/common/client_connection.h | Changed WriteBuffer and ReadBuffer to be virtual. |
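To make the overview above concrete, here is a small, self-contained sketch of the exit-on-connection-failure pattern it describes. This is only an illustration of the control flow under simplified assumptions: `Status`, `ReadFromStoreSocket`, and `ShutdownProcessIfConnectionError` are hypothetical stand-ins, not the actual Ray types or signatures.

```
#include <cstdlib>
#include <iostream>
#include <string>

// Simplified stand-in for ray::Status (illustration only).
struct Status {
  bool ok_ = true;
  std::string msg_;
  bool ok() const { return ok_; }
  const std::string &message() const { return msg_; }
};

// Stand-in for a buffer read on the plasma store socket that fails with a
// broken pipe once the local raylet has been killed.
Status ReadFromStoreSocket() { return Status{false, "Broken pipe"}; }

// Sketch of the idea: if the client is configured to exit on connection
// failure (i.e. it lives in a core worker) and the status is an error,
// terminate the process right away instead of letting the error surface as an
// application-level task failure.
void ShutdownProcessIfConnectionError(const Status &status,
                                      bool exit_on_connection_failure) {
  if (!status.ok() && exit_on_connection_failure) {
    std::cerr << "Connection to the plasma store failed because the local "
                 "raylet died. Terminating the process. Status: "
              << status.message() << std::endl;
    std::quick_exit(1);  // analogous to ray::QuickExit()
  }
}

int main() {
  Status status = ReadFromStoreSocket();
  ShutdownProcessIfConnectionError(status, /*exit_on_connection_failure=*/true);
  return 0;  // unreachable when the read failed and the flag is set
}
```

In the actual PR, the equivalent check sits in the StoreConn buffer read/write path, as the quoted diffs below show.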
| "local raylet is dead. Terminate the process. Status: " | ||
| << status; | ||
| ray::QuickExit(); | ||
| RAY_LOG(FATAL) << "Unreachable."; | 
    
      
    
Copilot AI commented on Jun 11, 2025:
The log statement after ray::QuickExit is unreachable; consider removing it to improve code clarity.
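As background on why a trailing statement like this sometimes appears after a process-exit call, here is a self-contained sketch (with a hypothetical `QuickExitSketch` helper, not the real `ray::QuickExit`): unless the exit helper is declared `[[noreturn]]`, compilers and linters may assume the call can return, which is the usual motivation for either a trailing fatal log or the annotation.

```
#include <cstdlib>
#include <iostream>

// Hypothetical stand-in for a quick-exit helper; the annotation tells the
// compiler this function never returns.
[[noreturn]] void QuickExitSketch() { std::quick_exit(1); }

int HandleFatalConnectionError() {
  std::cerr << "Local raylet died; terminating." << std::endl;
  QuickExitSketch();
  // Because QuickExitSketch is [[noreturn]], no trailing "unreachable" log or
  // return statement is needed on this path; without the annotation, some
  // compilers would warn about a missing return value here.
}

int main() { return HandleFatalConnectionError(); }
```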
@israbbani @codope please review
What is the expected behavior for the job? Why doesn't the owner retry the task on a different node, since this is a system error and not an application error?
You are almost right. The expected behavior is to let the job throw a system exception and let the owner retry. The problem is that, now, because the
Since this is urgent, I think this approach is okay as a very short-term fix. There are a few problems that I think need to be addressed in follow-ups:
- The control plane and data plane should not be mixed in deciding when shutdown happens. It should be the control plane's responsibility to handle the lifecycle. In this case, we should return a retriable exception (i.e. a system exception) to the owner of the task.
- The is_in_core_worker_ boolean breaks the class design. We should remove it once we remove all usage of the plasma client in the raylet.
```
  std::unordered_set<ObjectID> deletion_cache_;
  /// A mutex which protects this class.
  std::recursive_mutex client_mutex_;
  /// Whether the client is in a core worker.
```
I don't think the comment adds anything because the name is pretty self-describing.
```
  /// A mutex which protects this class.
  std::recursive_mutex client_mutex_;
  /// Whether the client is in a core worker.
  bool is_in_core_worker_;
```
I don't like the use of the boolean here. We should create a follow-up to do either of the following (a rough sketch of the first option is shown after this list):
- subclass the client to have one for core_worker and one for raylet
- or, better yet, delete the plasma client from the raylet b/c they're in the same process
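As a rough sketch of the first option (all class names here are hypothetical; none of them exist in Ray today), the connection-error policy becomes a virtual hook, so the core-worker flavor exits the process while the raylet flavor simply lets the error propagate:

```
#include <cstdlib>
#include <iostream>

// Simplified stand-in for ray::Status (illustration only).
struct Status {
  bool ok_ = true;
  bool ok() const { return ok_; }
};

// Hypothetical base class: the policy for connection errors is a virtual hook
// instead of an is_in_core_worker_ boolean.
class PlasmaClientBase {
 public:
  virtual ~PlasmaClientBase() = default;

 protected:
  // Called after every read/write on the store connection.
  virtual void OnConnectionError(const Status &status) = 0;
};

// Core-worker flavor: a failed connection means the local raylet is dead, so
// the whole process exits instead of surfacing a task-level error.
class CoreWorkerPlasmaClient : public PlasmaClientBase {
 protected:
  void OnConnectionError(const Status &status) override {
    if (!status.ok()) {
      std::cerr << "Local raylet died; terminating the worker." << std::endl;
      std::quick_exit(1);
    }
  }
};

// Raylet flavor: the store lives in the same process, so just let the error
// propagate to the caller.
class RayletPlasmaClient : public PlasmaClientBase {
 protected:
  void OnConnectionError(const Status &) override {}
};

int main() { return 0; }  // the classes above are for illustration only
```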
We should rename this to something that describes the feature better e.g. exit_on_connection_failure_. That way we're creating a weird feature, but not breaking encapsulation quite as explicitly
wow, really good idea Ibrahim!
(dumb joke, we were discussing offline)
Agreed, makes sense. This is added to make sure the impact of the behavior is limited to the core worker quick exit before we have a long-term fix. It makes sense to change it to a more general name, and I'll add a comment explaining how this feature is currently turned on.
```
void StoreConn::ShutdownWorkerIfLocalRayletDisconnected(const ray::Status &status) {
  // Here we don't explicitly check if the local raylet is dead, for the reasons that:
  // 1. If the connection is from a core worker, the local raylet should be on the other
  // side of the connection.
  // 2. On the raylet side, we never proactivately close the plasma store connection
  // even during shutdown. So any error from the raylet side should be a sign of raylet
  // death.
  if (is_in_core_worker_ && !status.ok()) {
    RAY_LOG(WARNING) << "The connection to the plasma store is failed because the "
                        "local raylet is dead. Terminate the process. Status: "
                     << status;
    ray::QuickExit();
    RAY_LOG(FATAL) << "Unreachable.";
```
Did you mean to leave this in? If so, we should make the error message clearer, e.g. telling the user that if they see this exception, it's a bug and they should file an issue.
```
  // 2. On the raylet side, we never proactivately close the plasma store connection
  // even during shutdown. So any error from the raylet side should be a sign of raylet
  // death.
  if (is_in_core_worker_ && !status.ok()) {
```
I would put the condition that is more likely to evaluate to false first:

```
if (!status.ok() && is_in_core_worker_)
```

```
  ray::Status ReadBuffer(const std::vector<boost::asio::mutable_buffer> &buffer) override;

 private:
  // Whether the client is in a core worker.
```
Comment isn't adding anything
It's also probably worth it to add a regression test that fails before this change and passes after this change, based on your repro script.
Totally agree. Will create follow-up tickets for fixing the issues.
I left a few nits around naming and comments. Please fix and 🚢
```
  // TODO(myan): For better error handling, we should: (1) In the mid-term, evaluate if
  // we should turn it on for the plasma client in other processes. (2) In the
  // long-term, consolidate the shutdown path between core worker and raylet to make the
  // shutdown procedure cleaner.
```
Based on my discussion with @edoakes, I think we need a short-to-medium-term followup to handle this outside of the PlasmaClient inside the CoreWorker calling code and propagating the right exception to the user. Can you update the comment and create a jira issue with the label oncall-followup for this please?
```
  // Shutdown the current process if the passed in status is not OK and the client is
  // configured to exit on failure.
  // @param status: The status to check.
  void ShutdownWorkerIfErrorStatus(const ray::Status &status);
```
Can we remove Worker from this to make this not look like we're special-casing the PlasmaClient for CoreWorker? Maybe ShutdownProcessIfConnectionError
Looks good for the short-term fix. As already pointed out in #53679 (review), we need a better separation of concerns. To follow up, I have left a suggestion below, if that makes sense.
```
  std::recursive_mutex client_mutex_;
  /// Whether the current process should exit when read or write to the connection fails.
  /// It should only be turned on when the plasma client is in a core worker.
  bool exit_on_connection_failure_;
```
Instead of modifying the low-level PlasmaClient, why can't we reuse the existing check_signals_ mechanism in CoreWorkerPlasmaStoreProvider, which is the abstraction layer between the core worker and the plasma store?
The get and wait paths in CoreWorkerPlasmaStoreProvider proactively check for interrupts via check_signals_. We can do the same for the other plasma operations.
CoreWorker::ExitIfParentRayletDies() already checks periodically whether the raylet process is alive. Then let the CoreWorker decide what to do based on the error propagated from CoreWorkerPlasmaStoreProvider (a simplified sketch of this idea follows).
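Below is a simplified, self-contained sketch of that alternative. The names `check_signals_` and `CoreWorkerPlasmaStoreProvider` come from the comment above; everything else (class names, signatures, the retry loop) is a hypothetical stand-in rather than Ray's actual API.

```
#include <functional>
#include <iostream>
#include <string>
#include <utility>

// Simplified stand-in for ray::Status (illustration only).
struct Status {
  bool ok_ = true;
  std::string msg_;
  static Status OK() { return {}; }
  static Status IOError(std::string m) { return {false, std::move(m)}; }
  bool ok() const { return ok_; }
  const std::string &message() const { return msg_; }
};

// Hypothetical provider-level wrapper: instead of exiting inside the
// low-level plasma client, a blocking operation periodically invokes a
// check_signals-style callback and propagates any error, leaving the decision
// (exit, retry, raise a system error) to the core worker.
class StoreProviderSketch {
 public:
  explicit StoreProviderSketch(std::function<Status()> check_signals)
      : check_signals_(std::move(check_signals)) {}

  Status GetWithSignalChecks() {
    for (int attempt = 0; attempt < 3; ++attempt) {
      Status signal_status = check_signals_();
      if (!signal_status.ok()) {
        return signal_status;  // propagate; the caller decides what to do
      }
      // ... issue the actual plasma request for this attempt here ...
    }
    return Status::OK();
  }

 private:
  std::function<Status()> check_signals_;
};

int main() {
  // The callback could, for example, report that the parent raylet died.
  StoreProviderSketch provider(
      [] { return Status::IOError("local raylet died"); });
  Status status = provider.GetWithSignalChecks();
  if (!status.ok()) {
    std::cerr << "Propagated to the core worker: " << status.message()
              << std::endl;
  }
  return 0;
}
```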
🚢
[Core] Exit the Core Worker Early When an Error Is Received from the Plasma Store (#53679)

What's the issue:
- During node shutdown, when the raylet is killed before its core workers and the tasks on those core workers read or write objects from the plasma store, a broken pipe error is returned. The tasks then fail with a Ray task error whose reason is the broken pipe, and the whole job fails.
- This is not the desired behavior: a task failure caused by node shutdown should be treated as a system failure, and the core worker shouldn't continue executing tasks when the raylet is down.

The PR makes the following change to mitigate the above issue:
- In the plasma store client, add logic to quick-exit the core worker when an error happens during a buffer read/write and the plasma store client is on the core worker side.

Tested the logic manually to verify the behavior:
- With the following test code:

```
import os
import signal
import time

import ray

ray.init()

@ray.remote(max_retries=2)
def test_task(obj_ref):
    time.sleep(1)
    raylet_pid = int(os.environ["RAY_RAYLET_PID"])
    os.kill(raylet_pid, signal.SIGKILL)
    ray.put(obj_ref)

a = ray.put([0] * 250000)
ray.get(test_task.remote(a))
```

- Without the change:

```
ray.exceptions.RayTaskError(OSError): ray::test_task() (pid=30681, ip=127.0.0.1)
  File "/Users/myan/ray-core-quickstart/test-tasks/test-tasks.py", line 18, in test_task
    ray.get(test_ref)
  File "python/ray/includes/common.pxi", line 93, in ray._raylet.check_status
    raise IOError(message)
OSError: Failed to read data from the socket: End of file
```

- With the change in the PR:

```
ray.exceptions.LocalRayletDiedError: The task's local raylet died. Check raylet.out for more information.
```

Signed-off-by: Mengjin Yan <[email protected]>
Co-authored-by: Ibrahim Rabbani <[email protected]>
Related issue number: N/A