Conversation

Contributor

@MengjinYan MengjinYan commented Jun 9, 2025

Why are these changes needed?

What's the issue:

  • During node shutdown, if the raylet is killed before its core workers while tasks on those workers are reading or writing objects in the plasma store, the tasks receive a broken pipe error. The tasks then fail with a Ray task error ("broken pipe") and the whole job fails.
  • This is not the desired behavior: a task failure caused by node shutdown should be treated as a system failure, and the core worker should not continue executing tasks when the raylet is down.

This PR makes a change to mitigate the issue above:

  • In the plasma store client, add logic to quick-exit the core worker when an error occurs while reading or writing a buffer and the plasma store client is on the core worker side.

Tested the logic manually to verify the behavior:

  • With the following test code:
import os
import signal
import time

import ray

ray.init()

@ray.remote(max_retries=2)
def test_task(obj_ref):
    time.sleep(1)
    raylet_pid = int(os.environ["RAY_RAYLET_PID"])
    os.kill(raylet_pid, signal.SIGKILL)
    ray.put(obj_ref)

a = ray.put([0] * 250000)
ray.get(test_task.remote(a))
  • Without the change:
ray.exceptions.RayTaskError(OSError): ray::test_task() (pid=30681, ip=127.0.0.1)
  File "/Users/myan/ray-core-quickstart/test-tasks/test-tasks.py", line 18, in test_task
    ray.get(test_ref)
  File "python/ray/includes/common.pxi", line 93, in ray._raylet.check_status
    raise IOError(message)
OSError: Failed to read data from the socket: End of file
  • With the change in the PR:
ray.exceptions.LocalRayletDiedError: The task's local raylet died. Check raylet.out for more information.

Related issue number

N/A

Checks

  • I've signed off every commit (by using the -s flag, i.e., git commit -s) in this PR.
  • I've run scripts/format.sh to lint the changes in this PR.
  • I've included any doc changes needed for https://docs.ray.io/en/master/.
    • I've added any new APIs to the API Reference. For example, if I added a
      method in Tune, I've added it in doc/source/tune/api/ under the
      corresponding .rst file.
  • I've made sure the tests are passing. Note that there might be a few flaky tests, see the recent failures at https://flakey-tests.ray.io/
  • Testing Strategy
    • Unit tests
    • Release tests
    • This PR is not tested :(

Signed-off-by: Mengjin Yan <[email protected]>
@MengjinYan MengjinYan changed the title [WIP] [Core] Exit the Core Worker Early When Broken Pipe Received from Plasma Store [Core] Exit the Core Worker Early When Error Received from Plasma Store Jun 10, 2025
@MengjinYan MengjinYan marked this pull request as ready for review June 11, 2025 00:05
Copilot AI review requested due to automatic review settings June 11, 2025 00:05
@MengjinYan MengjinYan added the go add ONLY when ready to merge, run all tests label Jun 11, 2025
Contributor

Copilot AI left a comment


Pull Request Overview

This PR addresses an issue during node shutdown where a broken pipe error from the plasma store causes unintended task failures; it does so by triggering an early exit of the core worker. Key changes include adding new overloads for StoreConn to identify core worker connections, updating WriteBuffer and ReadBuffer to trigger a quick shutdown on error, and propagating the core worker flag to PlasmaClient.

Reviewed Changes

Copilot reviewed 6 out of 6 changed files in this pull request and generated 1 comment.

File | Description
src/ray/object_manager/plasma/connection.h | Added core worker support and new buffer methods with error checking.
src/ray/object_manager/plasma/connection.cc | Implemented new methods that call ShutdownWorkerIfLocalRayletDisconnected.
src/ray/object_manager/plasma/client.h and client.cc | Propagated the core worker flag through PlasmaClient and its implementation.
src/ray/core_worker/store_provider/plasma_store_provider.cc | Updated instantiation of PlasmaClient to mark it as in a core worker.
src/ray/common/client_connection.h | Changed WriteBuffer and ReadBuffer to be virtual.

"local raylet is dead. Terminate the process. Status: "
<< status;
ray::QuickExit();
RAY_LOG(FATAL) << "Unreachable.";

Copilot AI Jun 11, 2025


The log statement after ray::QuickExit is unreachable; consider removing it to improve code clarity.

Suggested change
RAY_LOG(FATAL) << "Unreachable.";

@MengjinYan MengjinYan requested a review from edoakes June 11, 2025 17:13
@edoakes edoakes assigned israbbani and unassigned jjyao and edoakes Jun 11, 2025
Collaborator

edoakes commented Jun 11, 2025

@israbbani @codope please review

Contributor

israbbani commented Jun 11, 2025

The tasks then fail with a Ray task error ("broken pipe") and the whole job fails.

What is the expected behavior for the job? Why doesn't the owner retry the task on a different node, since this is a system error and not an application error?

@MengjinYan
Contributor Author

The tasks then fail with a Ray task error ("broken pipe") and the whole job fails.

What is the expected behavior for the job? Why doesn't the owner retry the task on a different node, since this is a system error and not an application error?

You are almost right. The expected behavior is for the job to throw a system exception and let the owner retry. The problem is that, because the get() call currently fails with a broken pipe, the failure surfaces as a RayTaskError, which the system does not retry by default.
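
To illustrate the retry split being described here, the following is a minimal sketch (not from this PR); the function names are hypothetical, and the behavior assumes Ray's defaults, where application exceptions are only retried with retry_exceptions=True while system failures (such as the worker process dying) are retried up to max_retries:

```
# Hypothetical sketch of Ray's default retry behavior (not from the PR).
import os

import ray

ray.init()

@ray.remote(max_retries=2)
def raises_app_error():
    # Surfaces to the caller as ray.exceptions.RayTaskError(OSError);
    # not retried by default because it is an application-level exception.
    raise OSError("broken pipe")

@ray.remote(max_retries=2)
def crashes_worker():
    # Kills the worker process abruptly; Ray treats this as a system failure
    # and retries the task up to max_retries before surfacing a system error.
    os._exit(1)

# ray.get(raises_app_error.remote())  # fails on the first attempt, no retry
# ray.get(crashes_worker.remote())    # retried, then fails with a system error
```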

Contributor

@israbbani israbbani left a comment


Since this is urgent, I think this approach is okay as a very short-term fix. There are a few problems that I think need to be addressed in followups:

  • The control plane and data plane should not be mixed in deciding when shutdown happens. It should be the control plane's responsibility to handle lifecycle. In this case, we should return a retriable exception (i.e. a system exception) to the owner of the task.
  • The is_in_core_worker boolean breaks the class design. We should remove it once we remove all usage of the plasma client in the raylet.

std::unordered_set<ObjectID> deletion_cache_;
/// A mutex which protects this class.
std::recursive_mutex client_mutex_;
/// Whether the client is in a core worker.
Contributor


I don't think the comment adds anything because the name is pretty self-describing.

/// A mutex which protects this class.
std::recursive_mutex client_mutex_;
/// Whether the client is in a core worker.
bool is_in_core_worker_;
Contributor


I don't like the use of the boolean here. We should create a follow-up to do either

  • subclass the client to have one for core_worker and one for raylet
  • or better yet delete the plasma client from the raylet b/c they're in the same process

Contributor


We should rename this to something that describes the feature better, e.g. exit_on_connection_failure_. That way we're still creating a weird feature, but not breaking encapsulation quite as explicitly.

Collaborator


wow, really good idea Ibrahim!

Collaborator


(dumb joke, we were discussing offline)

Contributor Author


Agreed, makes sense. This flag was added to make sure the impact of the behavior is limited to the core worker quick exit until we have a long-term fix. It makes sense to change it to a more general name, and I'll add a comment explaining how this feature is currently turned on.

Comment on lines 211 to 223
void StoreConn::ShutdownWorkerIfLocalRayletDisconnected(const ray::Status &status) {
// Here we don't explicitly check if the local raylet is dead, for the reasons that:
// 1. If the connection is from a core worker, the local raylet should be on the other
// side of the connection.
// 2. On the raylet side, we never proactively close the plasma store connection
// even during shutdown. So any error from the raylet side should be a sign of raylet
// death.
if (is_in_core_worker_ && !status.ok()) {
RAY_LOG(WARNING) << "The connection to the plasma store is failed because the "
"local raylet is dead. Terminate the process. Status: "
<< status;
ray::QuickExit();
RAY_LOG(FATAL) << "Unreachable.";
Contributor


Did you mean to leave this in? If so, we should make the error message clearer, e.g. telling the user that if they see this exception, it's a bug and they should file an issue.

// 2. On the raylet side, we never proactively close the plasma store connection
// even during shutdown. So any error from the raylet side should be a sign of raylet
// death.
if (is_in_core_worker_ && !status.ok()) {
Contributor


I would put the condition that is more likely to evaluate to false first:

  if (!status.ok() && is_in_core_worker_) 

ray::Status ReadBuffer(const std::vector<boost::asio::mutable_buffer> &buffer) override;

private:
// Whether the client is in a core worker.
Contributor


Comment isn't adding anything

@israbbani
Contributor

It's also probably worth adding a regression test, based on your repro script, that fails before this change and passes after it.
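
For reference, a rough sketch of what such a regression test could look like, derived from the repro script in the PR description (hypothetical; this is not the test actually added to the repo, and it assumes a fresh single-node cluster plus pytest):

```
# Hypothetical regression test based on the repro script in the PR description.
import os
import signal
import time

import pytest
import ray


@ray.remote(max_retries=2)
def kill_raylet_then_put(obj_ref):
    time.sleep(1)
    raylet_pid = int(os.environ["RAY_RAYLET_PID"])
    os.kill(raylet_pid, signal.SIGKILL)  # simulate the raylet dying mid-task
    ray.put(obj_ref)


def test_worker_exits_when_raylet_dies():
    ray.init()
    try:
        obj = ray.put([0] * 250000)
        # Before this change the failure surfaced as RayTaskError(OSError)
        # ("broken pipe"); with the change the worker quick-exits and the
        # owner sees a system-level error instead.
        with pytest.raises(ray.exceptions.LocalRayletDiedError):
            ray.get(kill_raylet_then_put.remote(obj))
    finally:
        ray.shutdown()
```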

@MengjinYan
Contributor Author

Since this is urgent, I think this approach is okay as a very short-term fix. There are a few problems that I think need to be addressed in followups:

  • The control plane and data plane should not be mixed in deciding when shutdown happens. It should be the control plane's responsibility to handle lifecycle. In this case, we should return a retriable exception (i.e. a system exception) to the owner of the task.
  • The is_in_core_worker boolean breaks the class design. We should remove it once we remove all usage of the plasma client in the raylet.

Totally agree. Will create followup tickets for fixing the issues.

MengjinYan and others added 4 commits June 11, 2025 12:52
Contributor

@israbbani israbbani left a comment


I left a few nits around naming and comments. Please fix and 🚢

Comment on lines 181 to 184
// TODO(myan): For better error handling, we should: (1) In the mid-term, evaluate if
// we should turn it on for the plasma client in other processes. (2) In the
// long-term, consolidate the shutdown path between core worker and raylet to make the
// shutdown procedure cleaner.
Contributor


Based on my discussion with @edoakes, I think we need a short-to-medium-term follow-up to handle this outside of the PlasmaClient, in the CoreWorker calling code, and to propagate the right exception to the user. Can you update the comment and create a Jira issue with the label oncall-followup for this, please?

// Shutdown the current process if the passed in status is not OK and the client is
// configured to exit on failure.
// @param status: The status to check.
void ShutdownWorkerIfErrorStatus(const ray::Status &status);
Contributor


Can we remove Worker from this to make this not look like we're special-casing the PlasmaClient for CoreWorker? Maybe ShutdownProcessIfConnectionError

MengjinYan and others added 3 commits June 11, 2025 18:19
Co-authored-by: Ibrahim Rabbani <[email protected]>
Signed-off-by: Mengjin Yan <[email protected]>
Co-authored-by: Ibrahim Rabbani <[email protected]>
Signed-off-by: Mengjin Yan <[email protected]>
Contributor

@codope codope left a comment


Looks good as a short-term fix. As already pointed out in #53679 (review), we need a better separation of concerns. As a follow-up, I have left a suggestion, if that makes sense.

std::recursive_mutex client_mutex_;
/// Whether the current process should exit when read or write to the connection fails.
/// It should only be turned on when the plasma client is in a core worker.
bool exit_on_connection_failure_;
Contributor


Instead of modifying the low-level PlasmaClient, why can't we reuse the existing check_signals_ mechanism in CoreWorkerPlasmaStoreProvider, which is the abstraction layer between the core worker and the plasma store?

The get and wait paths in CoreWorkerPlasmaStoreProvider proactively check for interrupts via check_signals_. We can do the same for other Plasma operations.

CoreWorker::ExitIfParentRayletDies() already checks periodically whether the raylet process is alive. Then let the CoreWorker decide what to do based on the error propagated from CoreWorkerPlasmaStoreProvider.

Signed-off-by: Mengjin Yan <[email protected]>
Contributor

@israbbani israbbani left a comment


🚢

@MengjinYan
Contributor Author

@edoakes @jjyao Can you help to take a look and merge this? Thanks!

@edoakes edoakes merged commit b35e67a into ray-project:master Jun 12, 2025
5 checks passed
elliot-barn pushed a commit that referenced this pull request Jun 18, 2025
bveeramani pushed a commit that referenced this pull request Jun 24, 2025
elliot-barn pushed a commit that referenced this pull request Jul 2, 2025