
Conversation

@codope
Contributor

@codope codope commented Aug 7, 2025

Why are these changes needed?

Workers crash with a fatal RAY_CHECK failure when the plasma store connection is broken during shutdown, causing the following error:

RAY_CHECK failed: PutInLocalPlasmaStore(object, object_id, true) Status not OK: IOError: Broken pipe

Stacktrace:

core_worker.cc:720 C  Check failed: PutInLocalPlasmaStore(object, object_id, true) Status not OK: IOError: Broken pipe 
*** StackTrace Information ***
/home/ray/anaconda3/lib/python3.11/site-packages/ray/_raylet.so(+0x141789a) [0x7924dd2c689a] ray::operator<<()
/home/ray/anaconda3/lib/python3.11/site-packages/ray/_raylet.so(_ZN3ray6RayLogD1Ev+0x479) [0x7924dd2c9319] ray::RayLog::~RayLog()
/home/ray/anaconda3/lib/python3.11/site-packages/ray/_raylet.so(+0x95cc8a) [0x7924dc80bc8a] ray::core::CoreWorker::CoreWorker()::{lambda()#13}::operator()()
/home/ray/anaconda3/lib/python3.11/site-packages/ray/_raylet.so(_ZN3ray4core11TaskManager27MarkTaskReturnObjectsFailedERKNS_17TaskSpecificationENS_3rpc9ErrorTypeEPKNS5_12RayErrorInfoERKN4absl12lts_2023080213flat_hash_setINS_8ObjectIDENSB_13hash_internal4HashISD_EESt8equal_toISD_ESaISD_EEE+0x679) [0x7924dc868f29] ray::core::TaskManager::MarkTaskReturnObjectsFailed()
/home/ray/anaconda3/lib/python3.11/site-packages/ray/_raylet.so(_ZN3ray4core11TaskManager15FailPendingTaskERKNS_6TaskIDENS_3rpc9ErrorTypeEPKNS_6StatusEPKNS5_12RayErrorInfoE+0x416) [0x7924dc86f186] ray::core::TaskManager::FailPendingTask()
/home/ray/anaconda3/lib/python3.11/site-packages/ray/_raylet.so(+0x9a90e6) [0x7924dc8580e6] ray::core::NormalTaskSubmitter::RequestNewWorkerIfNeeded()::{lambda()#1}::operator()()
/home/ray/anaconda3/lib/python3.11/site-packages/ray/_raylet.so(_ZN3ray3rpc14ClientCallImplINS0_23RequestWorkerLeaseReplyEE15OnReplyReceivedEv+0x68) [0x7924dc94aa48] ray::rpc::ClientCallImpl<>::OnReplyReceived()
/home/ray/anaconda3/lib/python3.11/site-packages/ray/_raylet.so(_ZNSt17_Function_handlerIFvvEZN3ray3rpc17ClientCallManager29PollEventsFromCompletionQueueEiEUlvE_E9_M_invokeERKSt9_Any_data+0x15) [0x7924dc79e285] std::_Function_handler<>::_M_invoke()
/home/ray/anaconda3/lib/python3.11/site-packages/ray/_raylet.so(+0xd9b4c8) [0x7924dcc4a4c8] EventTracker::RecordExecution()
/home/ray/anaconda3/lib/python3.11/site-packages/ray/_raylet.so(+0xd4648e) [0x7924dcbf548e] std::_Function_handler<>::_M_invoke()
/home/ray/anaconda3/lib/python3.11/site-packages/ray/_raylet.so(+0xd46906) [0x7924dcbf5906] boost::asio::detail::completion_handler<>::do_complete()
/home/ray/anaconda3/lib/python3.11/site-packages/ray/_raylet.so(+0x13f417b) [0x7924dd2a317b] boost::asio::detail::scheduler::do_run_one()
/home/ray/anaconda3/lib/python3.11/site-packages/ray/_raylet.so(+0x13f5af9) [0x7924dd2a4af9] boost::asio::detail::scheduler::run()
/home/ray/anaconda3/lib/python3.11/site-packages/ray/_raylet.so(+0x13f6202) [0x7924dd2a5202] boost::asio::io_context::run()
/home/ray/anaconda3/lib/python3.11/site-packages/ray/_raylet.so(_ZN3ray4core10CoreWorker12RunIOServiceEv+0x91) [0x7924dc793a61] ray::core::CoreWorker::RunIOService()
/home/ray/anaconda3/lib/python3.11/site-packages/ray/_raylet.so(+0xcba0b0) [0x7924dcb690b0] thread_proxy
/lib/x86_64-linux-gnu/libc.so.6(+0x94ac3) [0x7924dde71ac3]
/lib/x86_64-linux-gnu/libc.so.6(+0x126850) [0x7924ddf03850]

Stack trace flow:

  1. Task lease request fails -> NormalTaskSubmitter::RequestNewWorkerIfNeeded() callback.
  2. Triggers TaskManager::FailPendingTask() -> MarkTaskReturnObjectsFailed().
  3. System attempts to store error objects in plasma via put_in_local_plasma_callback_.
  4. Plasma connection is broken (raylet/plasma store already shut down).
  5. RAY_CHECK_OK() in the callback causes fatal crash instead of graceful handling.

Root Cause:

This is a shutdown ordering race condition:

  1. Raylet shuts down first: The raylet stops its IO context (main_service_.stop()) which closes plasma store connections.
  2. Worker still processes callbacks: Core worker continues processing pending callbacks on separate threads.
  3. Broken connection: When the callback tries to store error objects in plasma, the connection is already closed.
  4. Fatal crash: The RAY_CHECK_OK() treats this as an unexpected error and crashes the process.

Fix:

  1. Shutdown-aware plasma operations
  • Add CoreWorker::IsShuttingDown() method to check shutdown state.
  • Skip plasma operations entirely when shutdown is in progress.
  • Prevents attempting operations on already-closed connections.
  2. Targeted error handling for connection failures (see the sketch after this list)
  • Replace blanket RAY_CHECK_OK() with specific error type checking.
  • Handle connection errors (Broken pipe, Connection reset, Bad file descriptor) as warnings during shutdown scenarios.
  • Maintain RAY_CHECK_OK() for other error types to catch real issues.
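
Roughly, the two fixes above combine into something like the following in the put-in-plasma callback. This is a hedged sketch based on the description and the review excerpts later in this thread, not the literal diff; exact names and signatures may differ.

```cpp
// Sketch only: shutdown-aware put callback, assumed to live in the CoreWorker
// constructor context (so `this` is the CoreWorker). PutInLocalPlasmaStore and
// IsShuttingDown follow the wording above; the real diff may differ.
put_in_local_plasma_callback_ = [this](const RayObject &object,
                                       const ObjectID &object_id) {
  if (IsShuttingDown()) {
    // Fix 1: skip plasma operations entirely once shutdown has started.
    RAY_LOG(DEBUG) << "Skipping plasma put for " << object_id << " during shutdown.";
    return;
  }
  Status status = PutInLocalPlasmaStore(object, object_id, /*pin_object=*/true);
  if (status.IsIOError() && IsShuttingDown()) {
    // Fix 2: a connection error (broken pipe, connection reset, bad fd) is expected
    // if shutdown began after the check above; warn instead of crashing.
    RAY_LOG(WARNING) << "Failed to put error object " << object_id
                     << " in plasma store during shutdown: " << status.ToString();
  } else {
    // Any other failure still indicates a real problem and should crash loudly.
    RAY_CHECK_OK(status);
  }
};
```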

@codope codope requested a review from a team as a code owner August 7, 2025 15:57
Contributor

@gemini-code-assist gemini-code-assist bot left a comment

Summary of Changes

Hello @codope, I'm Gemini Code Assist! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

This pull request addresses a critical RAY_CHECK failure that occurs when Ray workers shut down, specifically due to a race condition where the plasma store connection breaks before all pending operations complete. The changes implement a more robust shutdown mechanism by allowing the core worker to detect its shutdown state and gracefully handle expected connection errors with the plasma store, preventing fatal crashes and improving system stability.

Highlights

  • Graceful Shutdown: Introduced a new IsShuttingDown() method to the CoreWorker class, allowing the system to detect if the worker is in the process of shutting down. This enables plasma store operations to be conditionally skipped during shutdown, preventing attempts to interact with an already-closed connection.
  • Robust Error Handling: Enhanced the error handling for PutInLocalPlasmaStore operations within the put_in_local_plasma_callback. Instead of a blanket RAY_CHECK_OK() that would crash on connection errors, specific IOError messages like 'Broken pipe', 'Connection reset', and 'Bad file descriptor' are now caught and logged as warnings during shutdown, ensuring graceful failure. Other error types still trigger a fatal RAY_CHECK.

@codope codope marked this pull request as draft August 7, 2025 15:58
@codope codope requested review from israbbani and jjyao August 7, 2025 15:58
Contributor

@gemini-code-assist gemini-code-assist bot left a comment

Code Review

An excellent fix for a tricky shutdown race condition. The changes are well-reasoned and the addition of IsShuttingDown() along with more granular error handling in the put_in_local_plasma_callback effectively addresses the RAY_CHECK failure. My main feedback is a suggestion to improve the maintainability of the error-checking logic. Overall, this is a solid contribution that improves the robustness of the worker shutdown process.

@codope codope force-pushed the ray-check_PutInLocalPlasmaStore branch from 5a1b4cc to e489752 August 8, 2025 12:26
@codope codope marked this pull request as ready for review August 8, 2025 12:33
@codope codope added the go add ONLY when ready to merge, run all tests label Aug 8, 2025
Collaborator

@edoakes edoakes left a comment

We likely have other similar edge conditions with the Raylet IPC client and other areas where we call the plasma client.

Not really sure the best way to handle this. In an ideal world, we'd propagate the error all the way back to the callsite and abort whatever operation is going on (which should be ok if we're in the shutdown procedure).

@edoakes
Collaborator

edoakes commented Aug 8, 2025

@dayshah @israbbani this is very much in the realm of the ongoing discussion about improving our status/error handling.

@codope codope force-pushed the ray-check_PutInLocalPlasmaStore branch 2 times, most recently from 2bf9a69 to 633b4f1 August 10, 2025 11:26
@dayshah
Contributor

dayshah commented Aug 10, 2025

@dayshah @israbbani this is very much in the realm of the ongoing discussion about improving our status/error handling.

@codope what other statuses can the put in plasma possibly return other than IOError? Can you just document that in the header of the function so we explicitly know what's being handled?

also we're just gonna have to refactor everything with the new statuses after we merge that anyway; I think the usage makes sense if that's the only error code it could return

@codope
Contributor Author

codope commented Aug 11, 2025

@dayshah @edoakes I've added documentation to PutInLocalPlasmaStore describing all expected statuses. The callsite now relies solely on the error-path check and tolerates only IOError when shutting down. If we later introduce more granular status codes, we can extend the switch at the callsite; for now, IOError is the only connection failure code we need to handle specially.
For the broader concern (other IPC/plasma sites), I propose a follow-up to audit and centralize shutdown-aware error handling, so we don’t duplicate logic across callsites.

@codope codope force-pushed the ray-check_PutInLocalPlasmaStore branch from 5d8f603 to 3552891 August 11, 2025 10:38
Comment on lines +1503 to +1511
/// Return status semantics:
/// - Status::OK(): The object was created (or already existed) and bookkeeping was
/// updated. Note: an internal ObjectExists from the plasma provider is treated
/// as OK and does not surface here.
/// - Status::ObjectStoreFull(): The local plasma store is out of memory (or out of
/// disk when spilling). The error message contains context and a short memory
/// report.
/// - Status::IOError(): IPC/connection failures while talking to the plasma store
/// (e.g., broken pipe/connection reset during shutdown, store not reachable).
Collaborator

nice!

Comment on lines 430 to 437
// During shutdown, plasma store connections can be closed causing IOError.
// Tolerate only IOError during shutdown to avoid masking real errors.
if (status.IsIOError() && core_worker->IsShuttingDown()) {
// Double-check shutdown state - this handles the race where shutdown
// began after our first check but before the plasma operation.
RAY_LOG(WARNING) << "Failed to put error object " << object_id
<< " in plasma store during shutdown: " << status.ToString();
Collaborator

This still feels a bit dangerous to me because the callsites will assume that the plasma operation succeeded, when it really silently failed. This might cause check failures in other places in the stack or other undefined behavior due to a broken invariant.

It's unclear to me that it is a significant improvement over what we have now, which is equally incorrect but at least a little bit more explicit.

I'm OK with it as a temporary solution, but only if we intend to follow up and propagate the status appropriately and handle it at the callsites.

Collaborator

For example, in the task manager this would likely mean catching the bad status and marking the relevant task as failed due to an unexpected error.

Contributor Author

I already tightened the current behavior to only tolerate IOError during shutdown and made the log explicit that the object was not stored. That said, I totally agree with the concern. Maybe we can add a counter/metric plasma_put_failed_due_to_shutdown so we can alert/observe this path.

To fully address this, I'll follow up by changing the PutInLocalPlasmaCallback to return Status and propagate that through TaskManager. For failures that are not (shutdown + IOError), TaskManager will mark the task failed with an unexpected/system error. This ensures callsites don't assume success on a failed plasma put. I'll also audit other Raylet IPC/plasma callsites for the same pattern.

Contributor Author

Actually, changing the PutInLocalPlasmaCallback to return Status and propagate that through TaskManager would probably be a small, contained refactor. So I can take it up in this PR itself. And auditing other Raylet IPC/plasma callsites for the same pattern could be a followup. What do you think?

Collaborator

That sounds good to me.

Contributor Author

Done the small refactor, and tracking the full audit in #55612

Contributor

This feels scary and kind of a gotcha deeper in the callstack. Would prefer just propagating the error instead of hiding it if shutdown.

Contributor Author

OK, I have revised this. The deep callback no longer swallows errors; it logs and returns the Status, and shutdown-specific handling is centralized in the TaskManager policy.

@MengjinYan
Contributor

We likely have other similar edge conditions with the Raylet IPC client and other areas where we call the plasma client.

Not really sure the best way to handle this. In an ideal world, we'd propagate the error all the way back to the callsite and abort whatever operation is going on (which should be ok if we're in the shutdown procedure).

The issue is very similar to one we fixed before: #53679. Both are cases where the worker cannot talk to the raylet after raylet shutdown, which caused the job to fail.

The followup item to that change is to pass the correct error back to the owner of the task so that the task lifecycle can be handled properly there. And I think this is probably something we should do here for the mid-term fix as well.

} else {
// For any other error, or IOError when not shutting down, this indicates
// a real problem that should crash the process.
RAY_CHECK_OK(status);
Contributor

This might divert focus a bit from the PR, but I'm wondering: in the error case here, from task execution's perspective, should we crash & fail the job, or should we throw a system error & let the task owner handle the task life cycle?

Contributor Author

We should not crash the worker. We should return a non-OK Status, have TaskManager mark the task attempt as failed (or retry if allowed), and surface a system error to the task owner. The worker keeps running and the owner handles the lifecycle.
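
For illustration, a hedged sketch of what that could look like at the callsite; FailOrRetryPendingTask and MapStatusToErrorType are the names used later in this thread and in the follow-up, not necessarily the exact code here:

```cpp
// Sketch only: inside a TaskManager method where task_id, error, and object_id are
// already in scope. Instead of RAY_CHECK_OK(status), surface a system error to the
// owner and keep the worker alive.
Status status = put_in_local_plasma_callback_(error, object_id);
if (!status.ok()) {
  // Mark this task attempt failed (or retry if allowed) with a mapped error type,
  // so the owner observes a system error rather than a crashed worker.
  FailOrRetryPendingTask(task_id, MapStatusToErrorType(status), &status);
}
```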

@codope codope force-pushed the ray-check_PutInLocalPlasmaStore branch from b9e8e3d to ad09c9a August 14, 2025 11:26
@ray-gardener ray-gardener bot added the core Issues that should be addressed in Ray Core label Aug 15, 2025
Contributor

@dayshah dayshah left a comment

How likely are these two plasma errors, and should we just fail everything and shut down if either of them happens anyway? If the object store is full and there's nowhere to spill / nothing to offload, or if there's an IPC failure, I feel like the current behavior is to just ungracefully crash, and the new behavior here should just be to fail everything and shut down gracefully if this operation fails?

There's no precedent for handling these things gracefully today, and I don't think users ever actually hit ObjectStoreFull or IOError on an IPC unless it's shutting down, so the goal here shouldn't be to try to handle it gracefully.

NodeID::FromBinary(request.worker_addr().node_id()),
/*store_in_plasma=*/store_in_plasma_ids.contains(object_id));
bool _direct_unused = false;
RAY_UNUSED(
Contributor

Here the bad status is going completely unused?

Contributor Author

Fixed; all HandleTaskReturn results are checked and acted upon (fail/abort/log), and the RAY_UNUSED is removed.

const rpc::ReturnObject &return_object,
const NodeID &worker_node_id,
bool store_in_plasma,
bool *direct_return_out) {
Contributor

Prefer StatusOr to out parameters

Contributor Author

Done; the method now returns StatusOr<bool> (the bool indicates a direct return) and has no out parameters.
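
For reference, a hedged before/after sketch of the signature change; parameter names are taken from the diff excerpt above, and the real declaration may differ slightly:

```cpp
// Before: "direct return" is reported through an out parameter.
Status HandleTaskReturn(const ObjectID &object_id,
                        const rpc::ReturnObject &return_object,
                        const NodeID &worker_node_id,
                        bool store_in_plasma,
                        bool *direct_return_out);

// After (as described above): StatusOr<bool>, where true means the value was
// returned directly via the in-memory store rather than stored in plasma.
StatusOr<bool> HandleTaskReturn(const ObjectID &object_id,
                                const rpc::ReturnObject &return_object,
                                const NodeID &worker_node_id,
                                bool store_in_plasma);
```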

/// as OK and does not surface here.
/// - Status::ObjectStoreFull(): The local plasma store is out of memory (or out of
/// disk when spilling). The error message contains context and a short memory
/// report.
Contributor

What is the standard behavior here if we're out of spill space (do we just crash everywhere)? How do we recover? Because this could result in all kinds of problems. Also, this is guaranteed to try to spill, right, since the usage is always that we're storing a primary copy through this code path?

Contributor Author

Spilling is attempted automatically on puts through this path:

  • Puts use CreateAndSpillIfNeeded:

    if (status.IsObjectStoreFull()) {
      StatusOr<std::string> memory_usage = GetMemoryUsage();
      RAY_CHECK_OK(memory_usage.status()) << "Unable to communicate with the Plasma Store.";
      RAY_LOG(ERROR) << "Failed to put object " << object_id
                     << " in object store because it "
                     << "is full. Object size is " << data_size << " bytes.\n"
                     << "Plasma store status:\n"
                     << memory_usage.value() << "\n---\n"
                     << "--- Tip: Use the `ray memory` command to list active objects "
                        "in the cluster."
                     << "\n---\n";
      // Replace the status with a more helpful error message.
      std::ostringstream message;
      message << "Failed to put object " << object_id << " in object store because it "
              << "is full. Object size is " << data_size << " bytes.";
      status = Status::ObjectStoreFull(message.str());
    } else if (status.IsObjectExists()) {

  • plasma-side queue returns transient/full while it triggers GC/spill, then falls back to disk, and only after grace/fallback fails does it surface OutOfDisk:

    auto spill_pending = spill_objects_callback_();
    if (spill_pending) {
      RAY_LOG(DEBUG) << "Reset grace period " << status << " " << spill_pending;
      oom_start_time_ns_ = -1;
      return Status::TransientObjectStoreFull("Waiting for objects to spill.");
    }
    if (now - oom_start_time_ns_ < grace_period_ns) {
      // We need a grace period since (1) global GC takes a bit of time to
      // kick in, and (2) there is a race between spilling finishing and space
      // actually freeing up in the object store.
      RAY_LOG(DEBUG) << "In grace period before fallback allocation / oom.";
      return Status::ObjectStoreFull("Waiting for grace period.");
    }
    // Trigger the fallback allocator.
    status = ProcessRequest(/*fallback_allocator=*/true, *request_it);

&direct);
if (!s.ok()) {
RAY_LOG(WARNING).WithField(object_id)
<< "Failed to handle dynamic task return: " << s;
Contributor

In most of these we're not actually doing anything with the bad status. This is pretty scary because a failed put can lead to a hanging cluster, which is one of the hardest things to debug. This is all already in the pretty complicated reconstruction or MarkTaskReturnObjectsFailed path. At least now the owner just crashes so it's obvious lol. We should at minimum fail the task if we're shutting down so we can put an error in the memory store to unblock things, and ideally should retry the operation if we ended up with a full obj store or some other unexpected transient error.

Contributor Author

We now propagate the real Status out of the callback, handle it centrally in TaskManager: bounded retries on TransientObjectStoreFull, immediate task failure on persistent errors (writing an error to the memory store to unblock waiters), and system-exit semantics for non-shutdown IPC errors.

failed_operations_.insert(object_id4);
}
ASSERT_EQ(tolerated_operations_.count(object_id4), 1);
}
Contributor

I feel like this test isn't actually testing much, and a higher-level test would be ideal where you can have real task lifecycles, e.g. we want to make sure we don't have a check failure if we fail a task or retry a task somewhere. Maybe even having a real memory store would be better, with the only thing you fake out being the plasma client.

Contributor Author

Added high-level tests that use a real CoreWorkerMemoryStore and only stub plasma puts, covering ObjectStoreFull, TransientObjectStoreFull (with retries), and dynamic-return put failure leading to immediate task failure.

@codope codope force-pushed the ray-check_PutInLocalPlasmaStore branch from ad09c9a to 136c43c August 18, 2025 16:00
@codope
Contributor Author

codope commented Aug 18, 2025

How likely are these two plasma errors, and should we just fail everything and shut down if either of them happens anyway? If the object store is full and there's nowhere to spill / nothing to offload, or if there's an IPC failure, I feel like the current behavior is to just ungracefully crash, and the new behavior here should just be to fail everything and shut down gracefully if this operation fails?

There's no precedent for handling these things gracefully today, and I don't think users ever actually hit ObjectStoreFull or IOError on an IPC unless it's shutting down, so the goal here shouldn't be to try to handle it gracefully.

ObjectStoreFull/OutOfDisk are possible in steady state (spilling/backlog/disk), so we fail the operation and surface the error to the caller; IPC IOError outside shutdown indicates backend death, so we trigger a graceful worker exit, while shutdown-time IOErrors don’t crash but still fail the task.

@codope
Contributor Author

codope commented Aug 18, 2025

@edoakes @dayshah @jjyao @MengjinYan As part of fixing the RAY_CHECK failure when the plasma store connection is broken, this PR standardizes error handling for plasma and Raylet IPC in the core worker (PutInLocalPlasmaCallback) and TaskManager, moving away from callsite tolerance/silent handling toward explicit propagation + centralized policy:

What changed

  • Propagate Status from the plasma put callback instead of swallowing it (no hidden "shutdown gotcha").
  • Centralize handling in TaskManager (rough sketch after this list):
    • TransientObjectStoreFull: bounded retry (3) with short inline backoff; on continued failure, fail task and write error to memory store to unblock waiters.
    • ObjectStoreFull/OutOfDisk: fail task (no crash), write error to memory store; this reflects documented, user-facing semantics for memory pressure.
    • IOError (IPC) outside shutdown: treat as fatal/system-exit; during shutdown, don’t crash but still fail the task and publish an error.
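
To make the policy concrete, here is an illustrative sketch of the flow in the bullets above; the callables are placeholders standing in for TaskManager internals, not actual Ray APIs, and the retry count/backoff are the values proposed above:

```cpp
#include <chrono>
#include <functional>
#include <thread>

#include "ray/common/status.h"  // ray::Status

// Illustrative only: not the literal diff. One initial attempt plus up to 3 retries
// for transient fullness; otherwise apply the fail/exit policy described above.
ray::Status StoreReturnWithPolicy(
    const std::function<ray::Status()> &put_in_plasma,
    const std::function<void(const ray::Status &)> &fail_task_and_unblock_waiters,
    const std::function<void(const ray::Status &)> &exit_worker_for_system_error,
    bool is_shutting_down) {
  constexpr int kMaxRetries = 3;
  ray::Status status;
  for (int attempt = 0; attempt <= kMaxRetries; ++attempt) {
    status = put_in_plasma();
    if (!status.IsTransientObjectStoreFull()) {
      break;
    }
    // Short inline backoff while spilling / GC frees space in the object store.
    std::this_thread::sleep_for(std::chrono::milliseconds(100));
  }
  if (status.ok()) {
    return status;
  }
  if (status.IsIOError() && !is_shutting_down) {
    // IPC failure outside shutdown implies the raylet/plasma store died:
    // treat it as fatal for this worker (graceful system exit).
    exit_worker_for_system_error(status);
  } else {
    // ObjectStoreFull/OutOfDisk, or IOError during shutdown: fail the task and
    // write an error object to the in-memory store so waiters are unblocked.
    fail_task_and_unblock_waiters(status);
  }
  return status;
}
```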

Why this approach

  • Prevents hangs: bad puts now deterministically result in error objects or exit -- no silent "success" assumptions that lead to stuck waiters.
  • Safety and observability: centralized handling gives one place to tune policy and metrics; callsite toleration is brittle and hard to reason about.
  • Matches current semantics: ObjectStoreFull in Ray already surfaces to users; IPC failures typically imply raylet/plasma death (outside shutdown) and should exit.
  • Minimal blast radius: retries are scoped, with a small cap, to handle short spill/GC grace windows without masking real capacity problems.

Open questions
To apply a similar pattern to other Raylet IPC touchpoints, there are some open questions where I'd like input (to converge on a standard):

  • Retry policy: Is "up to 3 retries" with a short inline backoff the right default for TransientObjectStoreFull? Should we use exponential backoff (and what base/ceiling), or gate with a RayConfig?
  • Escalation thresholds: If repeated ObjectStoreFull persists, do we want to escalate to a system-level signal (e.g., push user-facing advisory, trigger node-local diagnostics)?
  • IPC IOError classification: We treat non-shutdown IOErrors as fatal (graceful exit). Are there legitimate non-fatal IOErrors we should tolerate/retry?

Contributor

@MengjinYan MengjinYan left a comment

Thanks!

@edoakes edoakes merged commit dded833 into ray-project:master Aug 21, 2025
5 checks passed
Contributor

@dayshah dayshah left a comment

sorry got back to re-reviewing this after the hackathon

There are some spots where there seems to be no error handling; can you check it out @codope? cc @edoakes

// Do not proceed with normal completion. Mark task failed immediately.
Status st = direct_or.status();
FailOrRetryPendingTask(task_id,
rpc::ErrorType::WORKER_DIED,
Contributor

WORKER_DIED isn't the right error type here; worker died normally refers to the executor worker dying, but here something is going wrong on the owner, so the error type should be different.

Contributor Author

Fixed. We now consistently map Status → ErrorType via MapStatusToErrorType (IOError->LOCAL_RAYLET_DIED, ObjectStoreFull/Transient->OUT_OF_MEMORY, OutOfDisk->OUT_OF_DISK_ERROR; default->WORKER_DIED).
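
Roughly, the helper reads as follows; this is a sketch of the stated mapping, and the follow-up change may differ in exact placement or signature (it assumes the usual Status and generated rpc headers are included):

```cpp
// Sketch of MapStatusToErrorType as described above.
ray::rpc::ErrorType MapStatusToErrorType(const ray::Status &status) {
  if (status.IsIOError()) {
    return ray::rpc::ErrorType::LOCAL_RAYLET_DIED;
  }
  if (status.IsObjectStoreFull() || status.IsTransientObjectStoreFull()) {
    return ray::rpc::ErrorType::OUT_OF_MEMORY;
  }
  if (status.IsOutOfDisk()) {
    return ray::rpc::ErrorType::OUT_OF_DISK_ERROR;
  }
  return ray::rpc::ErrorType::WORKER_DIED;
}
```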

if (!res.ok()) {
RAY_LOG(WARNING).WithField(generator_return_id)
<< "Failed to handle generator return during app error propagation: "
<< res.status();
Contributor

This is a log-and-forget warning; if this is possible, it seems pretty bad, since a cluster could hang waiting on this return value. Or am I missing that something downstream is handling this?

Contributor

Maybe it makes sense for HandleTaskReturn to just handle all error types instead of just ObjectStoreFull, and not propagate them up, so we don't have to handle them at every upstream location.

Contributor Author

Addressed. No log-and-forget remains: on any HandleTaskReturn error we immediately call FailOrRetryPendingTask with the mapped error and stop, so waiters are unblocked. I recommend keeping HandleTaskReturn side-effect free (it already returns StatusOr) and centralizing failure policy in TaskManager callsites. To reduce duplication, I added a small helper (MapStatusToErrorType) and use it in the few places we call FailOrRetryPendingTask.

Status s = put_in_local_plasma_callback_(error, generator_return_id);
if (!s.ok()) {
RAY_LOG(WARNING).WithField(generator_return_id)
<< "Failed to put error object in plasma: " << s;
Contributor

This is a log-and-forget with no memory store put; it could lead to a hanging cluster.

Contributor Author

Fixed. We always write the error to the in-memory store first (unblocks), then best-effort plasma put with warning on failure.
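
In other words, a hedged sketch using the names from the excerpt above (the surrounding TaskManager code is elided, and the exact diff may differ):

```cpp
// Sketch of the revised ordering: memory store first (unblocks waiters), then a
// best-effort plasma put whose failure is only a warning.
RAY_UNUSED(in_memory_store_->Put(error, generator_return_id));  // waiters released here
Status s = put_in_local_plasma_callback_(error, generator_return_id);
if (!s.ok()) {
  // The error object already exists in the memory store, so a failed plasma put
  // can no longer hang the cluster; log and continue.
  RAY_LOG(WARNING).WithField(generator_return_id)
      << "Failed to put error object in plasma: " << s;
}
```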

@dayshah
Contributor

dayshah commented Aug 25, 2025

Also noticing that we're not actually marking the task as failed in all the situations where storing the output fails; this wouldn't totally make sense to a user...

edoakes added a commit to edoakes/ray that referenced this pull request Aug 28, 2025
edoakes added a commit to edoakes/ray that referenced this pull request Aug 28, 2025
…ore race condition (ray-project#55367)"

This reverts commit dded833.

Signed-off-by: Edward Oakes <[email protected]>
edoakes pushed a commit that referenced this pull request Sep 9, 2025
## Why are these changes needed?

Followup to
#55367 (review)

- map Status -> ErrorType via MapStatusToErrorType
(IOError->LOCAL_RAYLET_DIED, ObjectStoreFull/Transient->OUT_OF_MEMORY,
OutOfDisk->OUT_OF_DISK_ERROR; default->WORKER_DIED).
- no longer log‑and‑forget on any HandleTaskReturn error.
- For streaming generator returns we now always put the error into the
in‑memory store first (unblocks waiters) and only then best‑effort put
to plasma; failures there are just warnings.

---------

Signed-off-by: Sagar Sumit <[email protected]>
jugalshah291 pushed a commit to jugalshah291/ray_fork that referenced this pull request Sep 11, 2025
… condition (ray-project#55367)

ZacAttack pushed a commit to ZacAttack/ray that referenced this pull request Sep 24, 2025
dstrodtman pushed a commit that referenced this pull request Oct 6, 2025
… condition (#55367)

dstrodtman pushed a commit to dstrodtman/ray that referenced this pull request Oct 6, 2025
justinyeh1995 pushed a commit to justinyeh1995/ray that referenced this pull request Oct 20, 2025