[core] Fix RAY_CHECK failure during shutdown due to plasma store race condition #55367
Conversation
Summary of Changes
Hello @codope, I'm Gemini Code Assist! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!
This pull request addresses a critical RAY_CHECK failure that occurs when Ray workers shut down, specifically due to a race condition where the plasma store connection breaks before all pending operations complete. The changes implement a more robust shutdown mechanism by allowing the core worker to detect its shutdown state and gracefully handle expected connection errors with the plasma store, preventing fatal crashes and improving system stability.
Highlights
- Graceful Shutdown: Introduced a new `IsShuttingDown()` method to the `CoreWorker` class, allowing the system to detect if the worker is in the process of shutting down (a minimal sketch of such a check follows below). This enables plasma store operations to be conditionally skipped during shutdown, preventing attempts to interact with an already-closed connection.
- Robust Error Handling: Enhanced the error handling for `PutInLocalPlasmaStore` operations within the `put_in_local_plasma_callback`. Instead of a blanket `RAY_CHECK_OK()` that would crash on connection errors, specific `IOError` messages like 'Broken pipe', 'Connection reset', and 'Bad file descriptor' are now caught and logged as warnings during shutdown, ensuring graceful failure. Other error types still trigger a fatal `RAY_CHECK`.
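For context, here is a minimal sketch of what a shutdown-state check like this typically looks like; it is illustrative only (the class name, method names, and the use of a plain atomic flag are assumptions, not the actual `CoreWorker` implementation).

```cpp
// Minimal sketch of a shutdown-state check (illustrative; the real CoreWorker
// may use different naming and synchronization).
#include <atomic>

class CoreWorkerSketch {
 public:
  // Called once when graceful shutdown begins.
  void StartShutdown() { is_shutting_down_.store(true, std::memory_order_release); }

  // Queried by plasma-store code paths before issuing IPC operations.
  bool IsShuttingDown() const {
    return is_shutting_down_.load(std::memory_order_acquire);
  }

 private:
  std::atomic<bool> is_shutting_down_{false};
};
```

Plasma put paths can consult such a flag and tolerate connection errors only while it is set, as the diff snippets later in this thread show.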
Code Review
An excellent fix for a tricky shutdown race condition. The changes are well-reasoned and the addition of IsShuttingDown() along with more granular error handling in the put_in_local_plasma_callback effectively addresses the RAY_CHECK failure. My main feedback is a suggestion to improve the maintainability of the error-checking logic. Overall, this is a solid contribution that improves the robustness of the worker shutdown process.
Force-pushed from 5a1b4cc to e489752.
We likely have other similar edge conditions with the Raylet IPC client and in other areas where we call the plasma client.
Not really sure the best way to handle this. In an ideal world, we'd propagate the error all the way back to the callsite and abort whatever operation is going on (which should be ok if we're in the shutdown procedure).
@dayshah @israbbani this is very much in the realm of the ongoing discussion about improving our status/error handling.
Force-pushed from 2bf9a69 to 633b4f1.
@codope what other statuses can the put in plasma possibly return other than IOError? Can you just document that in the header of the function so we explicitly know what's being handled? Also, we're just gonna have to refactor everything with the new statuses after we merge that anyway. I think the usage makes sense if that's the only error code it could return.
@dayshah @edoakes I've added documentation to the function header.
Force-pushed from 5d8f603 to 3552891.
    /// Return status semantics:
    /// - Status::OK(): The object was created (or already existed) and bookkeeping was
    ///   updated. Note: an internal ObjectExists from the plasma provider is treated
    ///   as OK and does not surface here.
    /// - Status::ObjectStoreFull(): The local plasma store is out of memory (or out of
    ///   disk when spilling). The error message contains context and a short memory
    ///   report.
    /// - Status::IOError(): IPC/connection failures while talking to the plasma store
    ///   (e.g., broken pipe/connection reset during shutdown, store not reachable).
nice!
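To make the documented semantics concrete, here is a hedged sketch of how a caller might branch on these statuses; the function name, the shutdown flag, and the header paths are illustrative assumptions rather than the actual Ray call sites.

```cpp
#include "ray/common/status.h"   // ray::Status (header path approximate)
#include "ray/util/logging.h"    // RAY_LOG / RAY_CHECK_OK (header path approximate)

// Illustrative handler for the Status values documented above.
void HandlePlasmaPutStatus(const ray::Status &status, bool is_shutting_down) {
  if (status.ok()) {
    return;  // Object created (or already existed); bookkeeping updated.
  }
  if (status.IsObjectStoreFull()) {
    // Store (or disk, when spilling) is out of space; surface this to the
    // caller (e.g., fail or retry the task) rather than crashing.
    RAY_LOG(WARNING) << "Plasma store full: " << status.ToString();
    return;
  }
  if (status.IsIOError() && is_shutting_down) {
    // Expected during shutdown: the plasma connection may already be closed.
    RAY_LOG(WARNING) << "Plasma put failed during shutdown: " << status.ToString();
    return;
  }
  // Any other failure (including IOError outside shutdown) is a real problem.
  RAY_CHECK_OK(status);
}
```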
    // During shutdown, plasma store connections can be closed causing IOError.
    // Tolerate only IOError during shutdown to avoid masking real errors.
    if (status.IsIOError() && core_worker->IsShuttingDown()) {
      // Double-check shutdown state - this handles the race where shutdown
      // began after our first check but before the plasma operation.
      RAY_LOG(WARNING) << "Failed to put error object " << object_id
                       << " in plasma store during shutdown: " << status.ToString();
This still feels a bit dangerous to me because the callsites will assume that the plasma operation succeeded, when it really silently failed. This might cause check failures in other places in the stack or other undefined behavior due to a broken invariant.
It's unclear to me that it is a significant improvement over what we have now, which is equally incorrect but at least a little bit more explicit.
I'm OK with it as a temporary solution, but only if we intend to follow up and propagate the status appropriately and handle it at the callsites.
For example, in the task manager this would likely mean catching the bad status and marking the relevant task as failed due to an unexpected error.
I already tightened the current behavior to only tolerate IOError during shutdown and made the log explicit that the object was not stored. That said, I totally agree with the concern. Maybe we can add a counter/metric plasma_put_failed_due_to_shutdown so we can alert/observe this path.
To fully address this, I'll follow up by changing the PutInLocalPlasmaCallback to return Status and propagate that through TaskManager. For failures that are not (shutdown + IOError), TaskManager will mark the task failed with an unexpected/system error. This ensures callsites don't assume success on a failed plasma put. I'll also audit other Raylet IPC/plasma callsites for the same pattern.
Actually, changing the PutInLocalPlasmaCallback to return Status and propagate that through TaskManager would probably be a small, contained refactor. So I can take it up in this PR itself. And auditing other Raylet IPC/plasma callsites for the same pattern could be a followup. What do you think?
That sounds good to me.
Done the small refactor, and tracking full audit in #55612
This feels scary and kind of a gotcha deeper in the callstack. Would prefer just propagating the error instead of hiding it if shutdown.
Ok, I have revised this. The deep callback no longer swallows errors. It logs and returns the Status, and shutdown-specific handling is centralized in the TaskManager policy.
The issue is very similar to the one that we fixed before: #53679. Both are cases where the worker cannot talk to the raylet after raylet shutdown, which caused the job to fail. The followup item to that change is to pass the correct error back to the owner of the task so that the task lifecycle can be handled properly there. And I think this is probably something we should do here for the mid-term fix as well.
    } else {
      // For any other error, or IOError when not shutting down, this indicates
      // a real problem that should crash the process.
      RAY_CHECK_OK(status);
This might divert focus a bit from the PR. But I'm wondering, in the error case here, from task execution's perspective, should we crash and fail the job, or should we throw a system error and let the task owner handle the task lifecycle?
We should not crash the worker. We should return a non-OK Status, have TaskManager mark the task attempt as failed (or retry if allowed), and surface a system error to the task owner. The worker keeps running and the owner handles the lifecycle.
Force-pushed from b9e8e3d to ad09c9a.
How possible are these two plasma errors, and should we just fail everything and shut down if either of them happens anyway? If the object store is full and there's nowhere to spill / nothing to offload, or if there's an IPC failure, I feel like the current behavior is to just ungracefully crash, and the new behavior here should just be to fail everything and shut down gracefully if this operation fails?
There's no precedent for handling these things gracefully today, and I don't think users actually ever hit ObjectStoreFull or IOError on an IPC unless it's shutting down, so the goal here shouldn't be to try to handle it gracefully.
src/ray/core_worker/task_manager.cc
Outdated
    NodeID::FromBinary(request.worker_addr().node_id()),
    /*store_in_plasma=*/store_in_plasma_ids.contains(object_id));
    bool _direct_unused = false;
    RAY_UNUSED(
Here the bad status is going completely unused?
Fixed; all HandleTaskReturn results are checked and acted upon (fail/abort/log), and the RAY_UNUSED is removed.
src/ray/core_worker/task_manager.cc
Outdated
    const rpc::ReturnObject &return_object,
    const NodeID &worker_node_id,
    bool store_in_plasma,
    bool *direct_return_out) {
Prefer StatusOr to out parameters
Done; the method now returns StatusOr (bool indicates direct return) and has no out parameters.
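A hedged sketch of the StatusOr-over-out-parameter pattern being discussed; the functions below are simplified stand-ins (not the actual `HandleTaskReturn` declaration) and assume `ray::StatusOr` behaves like `absl::StatusOr` with `.ok()` and `.value()`.

```cpp
#include "ray/common/status.h"     // ray::Status (header path approximate)
#include "ray/common/status_or.h"  // ray::StatusOr (header path approximate)

// Stand-in for the refactored method: the bool payload means "the return value
// was stored directly in the in-memory store", and errors travel in the same
// return channel instead of through a bool* out-parameter.
ray::StatusOr<bool> HandleTaskReturnSketch(bool store_in_plasma,
                                           bool plasma_connection_ok) {
  if (store_in_plasma && !plasma_connection_ok) {
    return ray::Status::IOError("plasma connection closed");
  }
  return !store_in_plasma;  // true == direct (in-memory) return.
}

void CallerSketch() {
  auto direct_or = HandleTaskReturnSketch(/*store_in_plasma=*/true,
                                          /*plasma_connection_ok=*/false);
  if (!direct_or.ok()) {
    // The caller must decide what to do: fail the task attempt, retry, or escalate.
    return;
  }
  bool direct_return = direct_or.value();
  (void)direct_return;
}
```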
    ///   as OK and does not surface here.
    /// - Status::ObjectStoreFull(): The local plasma store is out of memory (or out of
    ///   disk when spilling). The error message contains context and a short memory
    ///   report.
What is the standard behavior here if we're out of spill space (do we just crash everywhere)? How do we recover, because this could result in all kinds of problems. Also, this is guaranteed to try to spill, right, because the usage is always that we're storing a primary copy through this code path?
Spilling is attempted automatically on puts through this path:

- Puts use CreateAndSpillIfNeeded (ray/src/ray/core_worker/store_provider/plasma_store_provider.cc, lines 146 to 163 in e0d8e6f):

      if (status.IsObjectStoreFull()) {
        StatusOr<std::string> memory_usage = GetMemoryUsage();
        RAY_CHECK_OK(memory_usage.status()) << "Unable to communicate with the Plasma Store.";
        RAY_LOG(ERROR) << "Failed to put object " << object_id << " in object store because it "
                       << "is full. Object size is " << data_size << " bytes.\n"
                       << "Plasma store status:\n"
                       << memory_usage.value() << "\n---\n"
                       << "--- Tip: Use the `ray memory` command to list active objects "
                          "in the cluster."
                       << "\n---\n";
        // Replace the status with a more helpful error message.
        std::ostringstream message;
        message << "Failed to put object " << object_id << " in object store because it "
                << "is full. Object size is " << data_size << " bytes.";
        status = Status::ObjectStoreFull(message.str());
      } else if (status.IsObjectExists()) {

- The plasma-side queue returns transient/full while it triggers GC/spill, then falls back to disk, and only after grace/fallback fails does it surface OutOfDisk (ray/src/ray/object_manager/plasma/create_request_queue.cc, lines 117 to 131 in e0d8e6f):

      auto spill_pending = spill_objects_callback_();
      if (spill_pending) {
        RAY_LOG(DEBUG) << "Reset grace period " << status << " " << spill_pending;
        oom_start_time_ns_ = -1;
        return Status::TransientObjectStoreFull("Waiting for objects to spill.");
      }
      if (now - oom_start_time_ns_ < grace_period_ns) {
        // We need a grace period since (1) global GC takes a bit of time to
        // kick in, and (2) there is a race between spilling finishing and space
        // actually freeing up in the object store.
        RAY_LOG(DEBUG) << "In grace period before fallback allocation / oom.";
        return Status::ObjectStoreFull("Waiting for grace period.");
      }
      // Trigger the fallback allocator.
      status = ProcessRequest(/*fallback_allocator=*/true, *request_it);
src/ray/core_worker/task_manager.cc
Outdated
    &direct);
    if (!s.ok()) {
      RAY_LOG(WARNING).WithField(object_id)
          << "Failed to handle dynamic task return: " << s;
In most of these we're not actually doing anything with the bad status. This is pretty scary because a failed put can lead to a hanging cluster, which is one of the hardest things to debug. This is all already in the pretty complicated reconstruction or MarkTaskReturnObjectsFailed path. At least now the owner just crashes so it's obvious lol. We should at minimum fail the task if we're shutting down so we can put an error in the memory store to unblock things, and ideally we should retry the operation if we ended up with a full obj store or some other unexpected transient error.
We now propagate the real Status out of the callback, handle it centrally in TaskManager: bounded retries on TransientObjectStoreFull, immediate task failure on persistent errors (writing an error to the memory store to unblock waiters), and system-exit semantics for non-shutdown IPC errors.
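For illustration, a hedged sketch of that central decision policy; the enum, function name, and retry bound are made up for this sketch, and only the branching rules (bounded retries on transient-full, worker exit on non-shutdown IPC errors, task failure otherwise) come from the discussion above.

```cpp
#include "ray/common/status.h"  // ray::Status (header path approximate)

// Illustrative outcome of a failed plasma put, as decided by a central policy.
enum class PutOutcome { kSucceeded, kRetry, kFailTask, kExitWorker };

PutOutcome DecidePlasmaPutOutcome(const ray::Status &status,
                                  bool is_shutting_down,
                                  int attempt,
                                  int max_transient_retries) {
  if (status.ok()) {
    return PutOutcome::kSucceeded;
  }
  if (status.IsTransientObjectStoreFull() && attempt < max_transient_retries) {
    // Bounded retries while spilling/GC frees space in the object store.
    return PutOutcome::kRetry;
  }
  if (status.IsIOError() && !is_shutting_down) {
    // IPC failure outside shutdown implies the local raylet/plasma backend is
    // gone: exit the worker gracefully instead of crashing on a RAY_CHECK.
    return PutOutcome::kExitWorker;
  }
  // Persistent errors (including shutdown-time IOError): fail the task and
  // write an error object to the in-memory store so waiters are unblocked.
  return PutOutcome::kFailTask;
}
```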
      failed_operations_.insert(object_id4);
    }
    ASSERT_EQ(tolerated_operations_.count(object_id4), 1);
    }
I feel like this test isn't actually testing much and a higher level test would be ideal where you can have real task lifecycles. e.g. we want to make sure we don't have a check failure if we fail a task or retry a task somewhere. Maybe even having a real memory store would be better and the only thing you fake out is the plasma client.
Added high-level tests that use a real CoreWorkerMemoryStore and only stub plasma puts, covering ObjectStoreFull, TransientObjectStoreFull (with retries), and dynamic-return put failure leading to immediate task failure.
[core] Fix RAY_CHECK failure during shutdown due to plasma store race condition (and follow-up commits). Signed-off-by: Sagar Sumit <[email protected]>
Force-pushed from ad09c9a to 136c43c.
ObjectStoreFull/OutOfDisk are possible in steady state (spilling/backlog/disk), so we fail the operation and surface the error to the caller; IPC IOError outside shutdown indicates backend death, so we trigger a graceful worker exit, while shutdown-time IOErrors don’t crash but still fail the task.
@edoakes @dayshah @jjyao @MengjinYan As part of fixing the
What changed
Why this approach
Open questions
Thanks!
    // Do not proceed with normal completion. Mark task failed immediately.
    Status st = direct_or.status();
    FailOrRetryPendingTask(task_id,
                           rpc::ErrorType::WORKER_DIED,
WORKER_DIED isn't the right error type here. Worker died normally refers to the executor worker dying, but here something wrong is happening on the owner, so the error type should be different.
Fixed. We now consistently map Status → ErrorType via MapStatusToErrorType (IOError->LOCAL_RAYLET_DIED, ObjectStoreFull/Transient->OUT_OF_MEMORY, OutOfDisk->OUT_OF_DISK_ERROR; default->WORKER_DIED).
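The mapping itself is stated above; below is a hedged sketch of what such a helper could look like. The function name suffix, signature, and header paths are assumptions and may differ from the PR's actual MapStatusToErrorType.

```cpp
#include "ray/common/status.h"        // ray::Status (header path approximate)
#include "ray/protobuf/common.pb.h"   // ray::rpc::ErrorType (header path approximate)

ray::rpc::ErrorType MapStatusToErrorTypeSketch(const ray::Status &status) {
  if (status.IsIOError()) {
    return ray::rpc::ErrorType::LOCAL_RAYLET_DIED;
  }
  if (status.IsObjectStoreFull() || status.IsTransientObjectStoreFull()) {
    return ray::rpc::ErrorType::OUT_OF_MEMORY;
  }
  if (status.IsOutOfDisk()) {
    return ray::rpc::ErrorType::OUT_OF_DISK_ERROR;
  }
  // Default for anything unexpected.
  return ray::rpc::ErrorType::WORKER_DIED;
}
```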
    if (!res.ok()) {
      RAY_LOG(WARNING).WithField(generator_return_id)
          << "Failed to handle generator return during app error propagation: "
          << res.status();
This is a log-and-forget warning. If this is possible, it seems pretty bad: a cluster could hang waiting on this return value. Or am I missing that something downstream is handling this?
Maybe it makes sense for HandleTaskReturn to just handle all error types instead of just ObjectStoreFull and not propagate up, so we don't have to handle it at every upstream location.
Addressed. No log-and-forget remains: on any HandleTaskReturn error we immediately FailOrRetryPendingTask with the mapped error and stop, so waiters are unblocked. I recommend keeping HandleTaskReturn side‑effect free (it already returns StatusOr) and centralizing failure policy in TaskManager callsites. To reduce duplication, I added a small helper (MapStatusToErrorType) and use it in the few places we call FailOrRetryPendingTask.
    Status s = put_in_local_plasma_callback_(error, generator_return_id);
    if (!s.ok()) {
      RAY_LOG(WARNING).WithField(generator_return_id)
          << "Failed to put error object in plasma: " << s;
This is a log-and-forget with no memory store put; it could lead to a hanging cluster.
Fixed. We always write the error to the in-memory store first (unblocks), then best-effort plasma put with warning on failure.
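For illustration, a hedged sketch of that ordering; the parameter types, the callback typedefs, and the header paths are assumptions rather than the exact Ray declarations.

```cpp
#include <functional>

#include "ray/common/id.h"          // ray::ObjectID (header path approximate)
#include "ray/common/ray_object.h"  // ray::RayObject (header path approximate)
#include "ray/common/status.h"      // ray::Status (header path approximate)
#include "ray/util/logging.h"       // RAY_LOG (header path approximate)

using PlasmaPutFn =
    std::function<ray::Status(const ray::RayObject &, const ray::ObjectID &)>;
using MemoryStorePutFn =
    std::function<void(const ray::RayObject &, const ray::ObjectID &)>;

void PutErrorUnblockingWaiters(const ray::RayObject &error,
                               const ray::ObjectID &generator_return_id,
                               const MemoryStorePutFn &memory_store_put,
                               const PlasmaPutFn &put_in_local_plasma) {
  // 1) Always write the error to the in-memory store first so waiters are
  //    unblocked even if the plasma store is unreachable.
  memory_store_put(error, generator_return_id);

  // 2) Best-effort plasma put; a failure here is a warning, not fatal.
  ray::Status s = put_in_local_plasma(error, generator_return_id);
  if (!s.ok()) {
    RAY_LOG(WARNING) << "Failed to put error object " << generator_return_id
                     << " in plasma: " << s;
  }
}
```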
Also noticing that we're not actually marking the task as failed in all the situations where storing the output fails; this wouldn't totally make sense to a user...
Commits that referenced this pull request:
- Revert "[core] Fix RAY_CHECK failure during shutdown due to plasma store race condition (ray-project#55367)". This reverts commit dded833. Signed-off-by: Edward Oakes <[email protected]>
- [core] Fix RAY_CHECK failure during shutdown due to plasma store race condition (#55367): cherry-picks of this PR's commit (full description below). Signed-off-by: Sagar Sumit <[email protected]>; jugalshah291 <[email protected]>; Douglas Strodtman <[email protected]>
- Follow-up to #55367 (review): map Status -> ErrorType via MapStatusToErrorType (IOError -> LOCAL_RAYLET_DIED, ObjectStoreFull/Transient -> OUT_OF_MEMORY, OutOfDisk -> OUT_OF_DISK_ERROR; default -> WORKER_DIED); no more log-and-forget on any HandleTaskReturn error; for streaming generator returns, the error is always put into the in-memory store first (unblocking waiters) and only then best-effort put to plasma, with failures logged as warnings. Signed-off-by: Sagar Sumit <[email protected]>; zac <[email protected]>; Douglas Strodtman <[email protected]>
Why are these changes needed?

Workers crash with a fatal `RAY_CHECK` failure when the plasma store connection is broken during shutdown, causing the following error:

    RAY_CHECK failed: PutInLocalPlasmaStore(object, object_id, true) Status not OK: IOError: Broken pipe

Stacktrace:

    core_worker.cc:720 C Check failed: PutInLocalPlasmaStore(object, object_id, true) Status not OK: IOError: Broken pipe
    *** StackTrace Information ***
    /home/ray/anaconda3/lib/python3.11/site-packages/ray/_raylet.so(+0x141789a) [0x7924dd2c689a] ray::operator<<()
    /home/ray/anaconda3/lib/python3.11/site-packages/ray/_raylet.so(_ZN3ray6RayLogD1Ev+0x479) [0x7924dd2c9319] ray::RayLog::~RayLog()
    /home/ray/anaconda3/lib/python3.11/site-packages/ray/_raylet.so(+0x95cc8a) [0x7924dc80bc8a] ray::core::CoreWorker::CoreWorker()::{lambda()#13}::operator()()
    /home/ray/anaconda3/lib/python3.11/site-packages/ray/_raylet.so(_ZN3ray4core11TaskManager27MarkTaskReturnObjectsFailedERKNS_17TaskSpecificationENS_3rpc9ErrorTypeEPKNS5_12RayErrorInfoERKN4absl12lts_2023080213flat_hash_setINS_8ObjectIDENSB_13hash_internal4HashISD_EESt8equal_toISD_ESaISD_EEE+0x679) [0x7924dc868f29] ray::core::TaskManager::MarkTaskReturnObjectsFailed()
    /home/ray/anaconda3/lib/python3.11/site-packages/ray/_raylet.so(_ZN3ray4core11TaskManager15FailPendingTaskERKNS_6TaskIDENS_3rpc9ErrorTypeEPKNS_6StatusEPKNS5_12RayErrorInfoE+0x416) [0x7924dc86f186] ray::core::TaskManager::FailPendingTask()
    /home/ray/anaconda3/lib/python3.11/site-packages/ray/_raylet.so(+0x9a90e6) [0x7924dc8580e6] ray::core::NormalTaskSubmitter::RequestNewWorkerIfNeeded()::{lambda()#1}::operator()()
    /home/ray/anaconda3/lib/python3.11/site-packages/ray/_raylet.so(_ZN3ray3rpc14ClientCallImplINS0_23RequestWorkerLeaseReplyEE15OnReplyReceivedEv+0x68) [0x7924dc94aa48] ray::rpc::ClientCallImpl<>::OnReplyReceived()
    /home/ray/anaconda3/lib/python3.11/site-packages/ray/_raylet.so(_ZNSt17_Function_handlerIFvvEZN3ray3rpc17ClientCallManager29PollEventsFromCompletionQueueEiEUlvE_E9_M_invokeERKSt9_Any_data+0x15) [0x7924dc79e285] std::_Function_handler<>::_M_invoke()
    /home/ray/anaconda3/lib/python3.11/site-packages/ray/_raylet.so(+0xd9b4c8) [0x7924dcc4a4c8] EventTracker::RecordExecution()
    /home/ray/anaconda3/lib/python3.11/site-packages/ray/_raylet.so(+0xd4648e) [0x7924dcbf548e] std::_Function_handler<>::_M_invoke()
    /home/ray/anaconda3/lib/python3.11/site-packages/ray/_raylet.so(+0xd46906) [0x7924dcbf5906] boost::asio::detail::completion_handler<>::do_complete()
    /home/ray/anaconda3/lib/python3.11/site-packages/ray/_raylet.so(+0x13f417b) [0x7924dd2a317b] boost::asio::detail::scheduler::do_run_one()
    /home/ray/anaconda3/lib/python3.11/site-packages/ray/_raylet.so(+0x13f5af9) [0x7924dd2a4af9] boost::asio::detail::scheduler::run()
    /home/ray/anaconda3/lib/python3.11/site-packages/ray/_raylet.so(+0x13f6202) [0x7924dd2a5202] boost::asio::io_context::run()
    /home/ray/anaconda3/lib/python3.11/site-packages/ray/_raylet.so(_ZN3ray4core10CoreWorker12RunIOServiceEv+0x91) [0x7924dc793a61] ray::core::CoreWorker::RunIOService()
    /home/ray/anaconda3/lib/python3.11/site-packages/ray/_raylet.so(+0xcba0b0) [0x7924dcb690b0] thread_proxy
    /lib/x86_64-linux-gnu/libc.so.6(+0x94ac3) [0x7924dde71ac3]
    /lib/x86_64-linux-gnu/libc.so.6(+0x126850) [0x7924ddf03850]

Stack trace flow:
1. Task lease request fails -> `NormalTaskSubmitter::RequestNewWorkerIfNeeded()` callback.
2. Triggers `TaskManager::FailPendingTask()` -> `MarkTaskReturnObjectsFailed()`.
3. System attempts to store error objects in plasma via `put_in_local_plasma_callback_`.
4. Plasma connection is broken (raylet/plasma store already shut down).
5. `RAY_CHECK_OK()` in the callback causes a fatal crash instead of graceful handling.

Root Cause:

This is a shutdown ordering race condition:
1. Raylet shuts down first: the raylet stops its IO context ([main_service_.stop()](https://github.com/ray-project/ray/blob/77c5475195e56a26891d88460973198391d20edf/src/ray/object_manager/plasma/store_runner.cc#L146)), which closes plasma store connections.
2. Worker still processes callbacks: the core worker continues processing pending callbacks on separate threads.
3. Broken connection: when the callback tries to store error objects in plasma, the connection is already closed.
4. Fatal crash: the `RAY_CHECK_OK()` treats this as an unexpected error and crashes the process.

Fix:
1. Shutdown-aware plasma operations
   - Add `CoreWorker::IsShuttingDown()` method to check shutdown state.
   - Skip plasma operations entirely when shutdown is in progress.
   - Prevents attempting operations on already-closed connections.
2. Targeted error handling for connection failures
   - Replace blanket `RAY_CHECK_OK()` with specific error type checking.
   - Handle connection errors (Broken pipe, Connection reset, Bad file descriptor) as warnings during shutdown scenarios.
   - Maintain `RAY_CHECK_OK()` for other error types to catch real issues.