From 9785231d9efad50ec0e59c47437a57c79e12cea6 Mon Sep 17 00:00:00 2001
From: Sagar Sumit
Date: Thu, 7 Aug 2025 21:16:23 +0530
Subject: [PATCH 1/7] [core] Fix RAY_CHECK failure during shutdown due to plasma store race condition

Signed-off-by: Sagar Sumit
---
 src/ray/core_worker/core_worker.cc         |  4 +++
 src/ray/core_worker/core_worker.h          |  6 +++++
 src/ray/core_worker/core_worker_process.cc | 31 ++++++++++++++++++++--
 3 files changed, 39 insertions(+), 2 deletions(-)

diff --git a/src/ray/core_worker/core_worker.cc b/src/ray/core_worker/core_worker.cc
index 53288a3c45d9..ed48b1fde915 100644
--- a/src/ray/core_worker/core_worker.cc
+++ b/src/ray/core_worker/core_worker.cc
@@ -539,6 +539,10 @@ void CoreWorker::Shutdown() {
   RAY_LOG(INFO) << "Core worker ready to be deallocated.";
 }
 
+bool CoreWorker::IsShuttingDown() const {
+  return is_shutdown_.load();
+}
+
 void CoreWorker::ConnectToRayletInternal() {
   // Tell the raylet the port that we are listening on.
   // NOTE: This also marks the worker as available in Raylet. We do this at the
diff --git a/src/ray/core_worker/core_worker.h b/src/ray/core_worker/core_worker.h
index 2299894a3956..d2b2f13e3e7f 100644
--- a/src/ray/core_worker/core_worker.h
+++ b/src/ray/core_worker/core_worker.h
@@ -246,6 +246,12 @@ class CoreWorker {
   ///
   void Shutdown();
 
+  /// Check if the core worker is currently shutting down.
+  /// This can be used to avoid operations that might fail during shutdown.
+  ///
+  /// \return true if shutdown has been initiated, false otherwise.
+  bool IsShuttingDown() const;
+
   /// Start receiving and executing tasks.
   void RunTaskExecutionLoop();
 
diff --git a/src/ray/core_worker/core_worker_process.cc b/src/ray/core_worker/core_worker_process.cc
index e8218c180a9e..801e524a231a 100644
--- a/src/ray/core_worker/core_worker_process.cc
+++ b/src/ray/core_worker/core_worker_process.cc
@@ -424,8 +424,35 @@ std::shared_ptr<CoreWorker> CoreWorkerProcessImpl::CreateCoreWorker(
       /*put_in_local_plasma_callback=*/
       [this](const RayObject &object, const ObjectID &object_id) {
         auto core_worker = GetCoreWorker();
-        RAY_CHECK_OK(
-            core_worker->PutInLocalPlasmaStore(object, object_id, /*pin_object=*/true));
+
+        // Check if the core worker is shutting down before attempting plasma operations.
+        // During shutdown, the plasma store connection may already be broken, so we
+        // should avoid plasma operations entirely.
+        if (core_worker->IsShuttingDown()) {
+          RAY_LOG(INFO) << "Skipping plasma store operation for error object " << object_id
+                        << " because core worker is shutting down.";
+          return;
+        }
+
+        auto status = core_worker->PutInLocalPlasmaStore(object, object_id, /*pin_object=*/true);
+        if (!status.ok()) {
+          if (status.IsIOError() &&
+              (status.message().find("Broken pipe") != std::string::npos ||
+               status.message().find("Connection reset") != std::string::npos ||
+               status.message().find("Bad file descriptor") != std::string::npos)) {
+            // This is likely a shutdown race where the plasma store
+            // connection was closed before we could complete the operation.
+            // Log as warning since this is expected during shutdown scenarios.
+            RAY_LOG(WARNING) << "Failed to put error object " << object_id
+                             << " in plasma store due to connection error (likely shutdown): "
+                             << status.ToString();
+          } else {
+            // For other types of errors, maintain the original
+            // behavior with RAY_CHECK_OK to catch real issues.
+ RAY_CHECK_OK(status) << "Failed to put error object " << object_id + << " in plasma store: " << status.ToString(); + } + } }, /* retry_task_callback= */ [this](TaskSpecification &spec, bool object_recovery, uint32_t delay_ms) { From cc9921f040ffebfb8440e19bb00f7fbb9a1e0217 Mon Sep 17 00:00:00 2001 From: Sagar Sumit Date: Sun, 10 Aug 2025 16:47:49 +0530 Subject: [PATCH 2/7] remove error msg based check and add test Signed-off-by: Sagar Sumit --- src/ray/core_worker/core_worker.cc | 4 +- src/ray/core_worker/core_worker_process.cc | 35 +++++----- .../core_worker/tests/task_manager_test.cc | 68 +++++++++++++++++++ 3 files changed, 85 insertions(+), 22 deletions(-) diff --git a/src/ray/core_worker/core_worker.cc b/src/ray/core_worker/core_worker.cc index ed48b1fde915..bdb62cd79918 100644 --- a/src/ray/core_worker/core_worker.cc +++ b/src/ray/core_worker/core_worker.cc @@ -539,9 +539,7 @@ void CoreWorker::Shutdown() { RAY_LOG(INFO) << "Core worker ready to be deallocated."; } -bool CoreWorker::IsShuttingDown() const { - return is_shutdown_.load(); -} +bool CoreWorker::IsShuttingDown() const { return is_shutdown_.load(); } void CoreWorker::ConnectToRayletInternal() { // Tell the raylet the port that we are listening on. diff --git a/src/ray/core_worker/core_worker_process.cc b/src/ray/core_worker/core_worker_process.cc index 801e524a231a..813a16a78530 100644 --- a/src/ray/core_worker/core_worker_process.cc +++ b/src/ray/core_worker/core_worker_process.cc @@ -424,33 +424,30 @@ std::shared_ptr CoreWorkerProcessImpl::CreateCoreWorker( /*put_in_local_plasma_callback=*/ [this](const RayObject &object, const ObjectID &object_id) { auto core_worker = GetCoreWorker(); - + // Check if the core worker is shutting down before attempting plasma operations. // During shutdown, the plasma store connection may already be broken, so we // should avoid plasma operations entirely. if (core_worker->IsShuttingDown()) { - RAY_LOG(INFO) << "Skipping plasma store operation for error object " << object_id - << " because core worker is shutting down."; + RAY_LOG(INFO) << "Skipping plasma store operation for error object " + << object_id << " because core worker is shutting down."; return; } - - auto status = core_worker->PutInLocalPlasmaStore(object, object_id, /*pin_object=*/true); + + auto status = + core_worker->PutInLocalPlasmaStore(object, object_id, /*pin_object=*/true); if (!status.ok()) { - if (status.IsIOError() && - (status.message().find("Broken pipe") != std::string::npos || - status.message().find("Connection reset") != std::string::npos || - status.message().find("Bad file descriptor") != std::string::npos)) { - // This is likely a shutdown race where the plasma store - // connection was closed before we could complete the operation. - // Log as warning since this is expected during shutdown scenarios. - RAY_LOG(WARNING) << "Failed to put error object " << object_id - << " in plasma store due to connection error (likely shutdown): " - << status.ToString(); + // During shutdown, plasma store connections can be closed causing IOError. + // We only tolerate IOError during shutdown to avoid masking real errors. + if (status.IsIOError() && core_worker->IsShuttingDown()) { + // Double-check shutdown state - this handles the race where shutdown + // began after our first check but before the plasma operation. 
+ RAY_LOG(WARNING) << "Failed to put error object " << object_id + << " in plasma store during shutdown: " << status.ToString(); } else { - // For other types of errors, maintain the original - // behavior with RAY_CHECK_OK to catch real issues. - RAY_CHECK_OK(status) << "Failed to put error object " << object_id - << " in plasma store: " << status.ToString(); + // For any other error, or IOError when not shutting down, this indicates + // a real problem that should crash the process. + RAY_CHECK_OK(status); } } }, diff --git a/src/ray/core_worker/tests/task_manager_test.cc b/src/ray/core_worker/tests/task_manager_test.cc index 3830d9821474..22cedb5a5e6c 100644 --- a/src/ray/core_worker/tests/task_manager_test.cc +++ b/src/ray/core_worker/tests/task_manager_test.cc @@ -2704,6 +2704,74 @@ TEST_F(TaskManagerTest, TestTaskRetriedOnNodePreemption) { // Cleanup manager_.FailPendingTask(spec.TaskId(), rpc::ErrorType::WORKER_DIED); } + +class PlasmaShutdownRaceTest : public ::testing::Test { + public: + PlasmaShutdownRaceTest() : is_shutting_down_(false) {} + + Status SimulatePlasmaCallback(const ObjectID &object_id, bool simulate_failure) { + if (is_shutting_down_) { + skipped_operations_.insert(object_id); + return Status::OK(); + } + + if (simulate_failure) { + auto status = Status::IOError("Broken pipe"); + if (status.IsIOError() && is_shutting_down_) { + tolerated_operations_.insert(object_id); + return Status::OK(); + } else { + failed_operations_.insert(object_id); + return status; + } + } + + successful_operations_.insert(object_id); + return Status::OK(); + } + + void SetShuttingDown(bool shutting_down) { is_shutting_down_ = shutting_down; } + + protected: + bool is_shutting_down_; + std::unordered_set skipped_operations_; + std::unordered_set tolerated_operations_; + std::unordered_set successful_operations_; + std::unordered_set failed_operations_; +}; + +// Test plasma callback behavior during shutdown to prevent RAY_CHECK crashes +TEST_F(PlasmaShutdownRaceTest, PlasmaCallbackHandlesShutdownRaceCondition) { + auto object_id = ObjectID::FromRandom(); + + SetShuttingDown(false); + ASSERT_TRUE(SimulatePlasmaCallback(object_id, false).ok()); + ASSERT_EQ(successful_operations_.count(object_id), 1); + + auto object_id2 = ObjectID::FromRandom(); + auto status = SimulatePlasmaCallback(object_id2, true); + ASSERT_FALSE(status.ok()); + ASSERT_TRUE(status.IsIOError()); + ASSERT_EQ(failed_operations_.count(object_id2), 1); + + auto object_id3 = ObjectID::FromRandom(); + SetShuttingDown(true); + ASSERT_TRUE(SimulatePlasmaCallback(object_id3, false).ok()); + ASSERT_EQ(skipped_operations_.count(object_id3), 1); + + auto object_id4 = ObjectID::FromRandom(); + SetShuttingDown(false); + auto status4 = Status::IOError("Broken pipe"); + SetShuttingDown(true); + + if (status4.IsIOError() && is_shutting_down_) { + tolerated_operations_.insert(object_id4); + } else { + failed_operations_.insert(object_id4); + } + ASSERT_EQ(tolerated_operations_.count(object_id4), 1); +} + } // namespace core } // namespace ray From a1c5f0463441a07cf5b4228dd140b4bd073c8f6b Mon Sep 17 00:00:00 2001 From: Sagar Sumit Date: Mon, 11 Aug 2025 08:46:04 +0000 Subject: [PATCH 3/7] doc errors and improve checks Signed-off-by: Sagar Sumit --- src/ray/core_worker/core_worker.h | 13 +++++++++++++ src/ray/core_worker/core_worker_process.cc | 11 +---------- 2 files changed, 14 insertions(+), 10 deletions(-) diff --git a/src/ray/core_worker/core_worker.h b/src/ray/core_worker/core_worker.h index d2b2f13e3e7f..b28f517f1fed 100644 --- 
a/src/ray/core_worker/core_worker.h +++ b/src/ray/core_worker/core_worker.h @@ -1499,6 +1499,19 @@ class CoreWorker { std::string *application_error); /// Put an object in the local plasma store. + /// + /// Return status semantics: + /// - Status::OK(): The object was created (or already existed) and bookkeeping was + /// updated. Note: an internal ObjectExists from the plasma provider is treated + /// as OK and does not surface here. + /// - Status::ObjectStoreFull(): The local plasma store is out of memory (or out of + /// disk when spilling). The error message contains context and a short memory + /// report. + /// - Status::IOError(): IPC/connection failures while talking to the plasma store + /// (e.g., broken pipe/connection reset during shutdown, store not reachable). + /// + /// Call sites that run during shutdown may choose to tolerate IOError specifically, + /// but should treat all other statuses as real failures. Status PutInLocalPlasmaStore(const RayObject &object, const ObjectID &object_id, bool pin_object); diff --git a/src/ray/core_worker/core_worker_process.cc b/src/ray/core_worker/core_worker_process.cc index 813a16a78530..740a6b5a0912 100644 --- a/src/ray/core_worker/core_worker_process.cc +++ b/src/ray/core_worker/core_worker_process.cc @@ -425,20 +425,11 @@ std::shared_ptr CoreWorkerProcessImpl::CreateCoreWorker( [this](const RayObject &object, const ObjectID &object_id) { auto core_worker = GetCoreWorker(); - // Check if the core worker is shutting down before attempting plasma operations. - // During shutdown, the plasma store connection may already be broken, so we - // should avoid plasma operations entirely. - if (core_worker->IsShuttingDown()) { - RAY_LOG(INFO) << "Skipping plasma store operation for error object " - << object_id << " because core worker is shutting down."; - return; - } - auto status = core_worker->PutInLocalPlasmaStore(object, object_id, /*pin_object=*/true); if (!status.ok()) { // During shutdown, plasma store connections can be closed causing IOError. - // We only tolerate IOError during shutdown to avoid masking real errors. + // Tolerate only IOError during shutdown to avoid masking real errors. if (status.IsIOError() && core_worker->IsShuttingDown()) { // Double-check shutdown state - this handles the race where shutdown // began after our first check but before the plasma operation. From c674e0d61873645de5d0966208e1e0ec550d9a35 Mon Sep 17 00:00:00 2001 From: Sagar Sumit Date: Thu, 14 Aug 2025 13:13:50 +0530 Subject: [PATCH 4/7] PutInLocalPlasma to return Status and errors no longer silently ignored Signed-off-by: Sagar Sumit --- src/ray/core_worker/core_worker_process.cc | 6 +- src/ray/core_worker/task_manager.cc | 98 +++++++++++++------ src/ray/core_worker/task_manager.h | 17 ++-- .../core_worker/tests/task_manager_test.cc | 1 + 4 files changed, 84 insertions(+), 38 deletions(-) diff --git a/src/ray/core_worker/core_worker_process.cc b/src/ray/core_worker/core_worker_process.cc index 740a6b5a0912..a7da727bb813 100644 --- a/src/ray/core_worker/core_worker_process.cc +++ b/src/ray/core_worker/core_worker_process.cc @@ -435,12 +435,12 @@ std::shared_ptr CoreWorkerProcessImpl::CreateCoreWorker( // began after our first check but before the plasma operation. RAY_LOG(WARNING) << "Failed to put error object " << object_id << " in plasma store during shutdown: " << status.ToString(); + return Status::OK(); } else { - // For any other error, or IOError when not shutting down, this indicates - // a real problem that should crash the process. 
- RAY_CHECK_OK(status); + return status; } } + return Status::OK(); }, /* retry_task_callback= */ [this](TaskSpecification &spec, bool object_recovery, uint32_t delay_ms) { diff --git a/src/ray/core_worker/task_manager.cc b/src/ray/core_worker/task_manager.cc index 0992f15ff2f9..1e8e23544e5d 100644 --- a/src/ray/core_worker/task_manager.cc +++ b/src/ray/core_worker/task_manager.cc @@ -533,10 +533,11 @@ size_t TaskManager::NumPendingTasks() const { return num_pending_tasks_; } -bool TaskManager::HandleTaskReturn(const ObjectID &object_id, - const rpc::ReturnObject &return_object, - const NodeID &worker_node_id, - bool store_in_plasma) { +Status TaskManager::HandleTaskReturn(const ObjectID &object_id, + const rpc::ReturnObject &return_object, + const NodeID &worker_node_id, + bool store_in_plasma, + bool *direct_return_out) { bool direct_return = false; reference_counter_.UpdateObjectSize(object_id, return_object.size()); RAY_LOG(DEBUG) << "Task return object " << object_id << " has size " @@ -579,7 +580,10 @@ bool TaskManager::HandleTaskReturn(const ObjectID &object_id, /*copy_data=*/false, tensor_transport.value_or(rpc::TensorTransport::OBJECT_STORE)); if (store_in_plasma) { - put_in_local_plasma_callback_(object, object_id); + Status s = put_in_local_plasma_callback_(object, object_id); + if (!s.ok()) { + return s; + } } else { in_memory_store_.Put(object, object_id); direct_return = true; @@ -595,7 +599,10 @@ bool TaskManager::HandleTaskReturn(const ObjectID &object_id, } reference_counter_.AddNestedObjectIds(object_id, nested_ids, owner_address); } - return direct_return; + if (direct_return_out != nullptr) { + *direct_return_out = direct_return; + } + return Status::OK(); } bool TaskManager::TryDelObjectRefStream(const ObjectID &generator_id) { @@ -813,10 +820,13 @@ bool TaskManager::HandleReportGeneratorItemReturns( } // When an object is reported, the object is ready to be fetched. reference_counter_.UpdateObjectPendingCreation(object_id, false); - HandleTaskReturn(object_id, - return_object, - NodeID::FromBinary(request.worker_addr().node_id()), - /*store_in_plasma=*/store_in_plasma_ids.contains(object_id)); + bool _direct_unused = false; + RAY_UNUSED( + HandleTaskReturn(object_id, + return_object, + NodeID::FromBinary(request.worker_addr().node_id()), + /*store_in_plasma=*/store_in_plasma_ids.contains(object_id), + &_direct_unused)); } // Handle backpressure if needed. 
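
At this point in the series, HandleTaskReturn reports plasma-put failures through its Status while the "returned directly by value" flag travels through an out-parameter (a later commit in the series folds both into StatusOr<bool>). A minimal sketch of the resulting call-site pattern, with illustrative variable names only:

    // Sketch of the intermediate HandleTaskReturn contract (Status + bool out-param).
    bool direct_return = false;
    Status s = HandleTaskReturn(object_id,
                                return_object,
                                worker_node_id,
                                /*store_in_plasma=*/true,
                                &direct_return);
    if (!s.ok()) {
      // The return value could not be stored (e.g. ObjectStoreFull or IOError);
      // the caller decides whether to fail the attempt, retry, or only log.
    } else if (direct_return) {
      // The value was placed in the in-memory store rather than plasma.
    }
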
@@ -900,23 +910,41 @@ void TaskManager::CompletePendingTask(const TaskID &task_id, reference_counter_.AddDynamicReturn(object_id, generator_id); dynamic_return_ids.push_back(object_id); } - if (!HandleTaskReturn(object_id, - return_object, - NodeID::FromBinary(worker_addr.node_id()), - store_in_plasma_ids.contains(object_id))) { - if (first_execution) { - dynamic_returns_in_plasma.push_back(object_id); - } + bool direct = false; + Status s = HandleTaskReturn(object_id, + return_object, + NodeID::FromBinary(worker_addr.node_id()), + store_in_plasma_ids.contains(object_id), + &direct); + if (!s.ok()) { + RAY_LOG(WARNING).WithField(object_id) + << "Failed to handle dynamic task return: " << s; + } else if (!direct && first_execution) { + dynamic_returns_in_plasma.push_back(object_id); } } } for (const auto &return_object : reply.return_objects()) { const auto object_id = ObjectID::FromBinary(return_object.object_id()); - if (HandleTaskReturn(object_id, - return_object, - NodeID::FromBinary(worker_addr.node_id()), - store_in_plasma_ids.contains(object_id))) { + bool direct = false; + Status s = HandleTaskReturn(object_id, + return_object, + NodeID::FromBinary(worker_addr.node_id()), + store_in_plasma_ids.contains(object_id), + &direct); + if (!s.ok()) { + RAY_LOG(WARNING).WithField(object_id) << "Failed to handle task return: " << s; + // If storing return in plasma failed, treat as system failure for this attempt. + // Do not proceed with normal completion. Mark task failed immediately. + FailOrRetryPendingTask(task_id, + rpc::ErrorType::WORKER_DIED, + &s, + /*ray_error_info=*/nullptr, + /*mark_task_object_failed=*/true, + /*fail_immediately=*/true); + return; + } else if (direct) { direct_return_ids.push_back(object_id); } } @@ -1040,10 +1068,12 @@ void TaskManager::CompletePendingTask(const TaskID &task_id, const auto generator_return_id = spec.StreamingGeneratorReturnId(i); RAY_CHECK_EQ(reply.return_objects_size(), 1); const auto &return_object = reply.return_objects(0); - HandleTaskReturn(generator_return_id, - return_object, - NodeID::FromBinary(worker_addr.node_id()), - store_in_plasma_ids.contains(generator_return_id)); + bool _direct_unused = false; + RAY_UNUSED(HandleTaskReturn(generator_return_id, + return_object, + NodeID::FromBinary(worker_addr.node_id()), + store_in_plasma_ids.contains(generator_return_id), + &_direct_unused)); } } } @@ -1455,7 +1485,11 @@ void TaskManager::MarkTaskReturnObjectsFailed( for (int i = 0; i < num_returns; i++) { const auto object_id = ObjectID::FromIndex(task_id, /*index=*/i + 1); if (store_in_plasma_ids.contains(object_id)) { - put_in_local_plasma_callback_(error, object_id); + Status s = put_in_local_plasma_callback_(error, object_id); + if (!s.ok()) { + RAY_LOG(WARNING).WithField(object_id) + << "Failed to put error object in plasma: " << s; + } } else { in_memory_store_.Put(error, object_id); } @@ -1463,7 +1497,11 @@ void TaskManager::MarkTaskReturnObjectsFailed( if (spec.ReturnsDynamic()) { for (const auto &dynamic_return_id : spec.DynamicReturnIds()) { if (store_in_plasma_ids.contains(dynamic_return_id)) { - put_in_local_plasma_callback_(error, dynamic_return_id); + Status s = put_in_local_plasma_callback_(error, dynamic_return_id); + if (!s.ok()) { + RAY_LOG(WARNING).WithField(dynamic_return_id) + << "Failed to put error object in plasma: " << s; + } } else { in_memory_store_.Put(error, dynamic_return_id); } @@ -1488,7 +1526,11 @@ void TaskManager::MarkTaskReturnObjectsFailed( for (size_t i = 0; i < num_streaming_generator_returns; i++) { 
const auto generator_return_id = spec.StreamingGeneratorReturnId(i); if (store_in_plasma_ids.contains(generator_return_id)) { - put_in_local_plasma_callback_(error, generator_return_id); + Status s = put_in_local_plasma_callback_(error, generator_return_id); + if (!s.ok()) { + RAY_LOG(WARNING).WithField(generator_return_id) + << "Failed to put error object in plasma: " << s; + } } else { in_memory_store_.Put(error, generator_return_id); } diff --git a/src/ray/core_worker/task_manager.h b/src/ray/core_worker/task_manager.h index 25f5c90717c9..f206f4de857a 100644 --- a/src/ray/core_worker/task_manager.h +++ b/src/ray/core_worker/task_manager.h @@ -14,6 +14,7 @@ #pragma once +#include #include #include #include @@ -25,6 +26,7 @@ #include "absl/container/flat_hash_map.h" #include "absl/synchronization/mutex.h" #include "ray/common/id.h" +#include "ray/common/status.h" #include "ray/core_worker/store_provider/memory_store/memory_store.h" #include "ray/core_worker/task_event_buffer.h" #include "ray/core_worker/task_manager_interface.h" @@ -42,7 +44,7 @@ class ActorManager; using TaskStatusCounter = CounterMap>; using PutInLocalPlasmaCallback = - std::function; + std::function; using RetryTaskCallback = std::function; using ReconstructObjectCallback = std::function; @@ -608,12 +610,13 @@ class TaskManager : public TaskManagerInterface { ABSL_LOCKS_EXCLUDED(mu_); /// Update nested ref count info and store the in-memory value for a task's - /// return object. Returns true if the task's return object was returned - /// directly by value. - bool HandleTaskReturn(const ObjectID &object_id, - const rpc::ReturnObject &return_object, - const NodeID &worker_node_id, - bool store_in_plasma) ABSL_LOCKS_EXCLUDED(mu_); + /// return object. On success, sets direct_return_out to true if the object's value + /// was returned directly by value (not stored in plasma). + Status HandleTaskReturn(const ObjectID &object_id, + const rpc::ReturnObject &return_object, + const NodeID &worker_node_id, + bool store_in_plasma, + bool *direct_return_out) ABSL_LOCKS_EXCLUDED(mu_); /// Remove a lineage reference to this object ID. This should be called /// whenever a task that depended on this object ID can no longer be retried. 
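
With PutInLocalPlasmaCallback now returning Status, every provider of the callback must return a value on all paths. A minimal conforming callback could look like this sketch (a test-style stand-in, not the production wiring in core_worker_process.cc):

    // Sketch: a Status-returning put callback, e.g. for a test double.
    PutInLocalPlasmaCallback put_callback =
        [](const RayObject &object, const ObjectID &object_id) -> Status {
          // A real implementation forwards to CoreWorker::PutInLocalPlasmaStore and
          // propagates its Status (OK, ObjectStoreFull, TransientObjectStoreFull,
          // or IOError).
          return Status::OK();
        };
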
diff --git a/src/ray/core_worker/tests/task_manager_test.cc b/src/ray/core_worker/tests/task_manager_test.cc index 22cedb5a5e6c..3b91fa19cc20 100644 --- a/src/ray/core_worker/tests/task_manager_test.cc +++ b/src/ray/core_worker/tests/task_manager_test.cc @@ -158,6 +158,7 @@ class TaskManagerTest : public ::testing::Test { *reference_counter_, [this](const RayObject &object, const ObjectID &object_id) { stored_in_plasma.insert(object_id); + return Status::OK(); }, [this](TaskSpecification &spec, bool object_recovery, uint32_t delay_ms) { num_retries_++; From 136c43c5d6ac6a1ed7ccbbe15972b8e438d62c96 Mon Sep 17 00:00:00 2001 From: Sagar Sumit Date: Mon, 18 Aug 2025 15:59:09 +0000 Subject: [PATCH 5/7] propagate status, update tests and minor refactor Signed-off-by: Sagar Sumit --- src/ray/core_worker/core_worker_process.cc | 15 +- src/ray/core_worker/task_manager.cc | 99 ++++++---- src/ray/core_worker/task_manager.h | 9 +- .../core_worker/tests/task_manager_test.cc | 185 ++++++++++++++++++ 4 files changed, 249 insertions(+), 59 deletions(-) diff --git a/src/ray/core_worker/core_worker_process.cc b/src/ray/core_worker/core_worker_process.cc index a7da727bb813..951489617a0d 100644 --- a/src/ray/core_worker/core_worker_process.cc +++ b/src/ray/core_worker/core_worker_process.cc @@ -424,21 +424,12 @@ std::shared_ptr CoreWorkerProcessImpl::CreateCoreWorker( /*put_in_local_plasma_callback=*/ [this](const RayObject &object, const ObjectID &object_id) { auto core_worker = GetCoreWorker(); - auto status = core_worker->PutInLocalPlasmaStore(object, object_id, /*pin_object=*/true); if (!status.ok()) { - // During shutdown, plasma store connections can be closed causing IOError. - // Tolerate only IOError during shutdown to avoid masking real errors. - if (status.IsIOError() && core_worker->IsShuttingDown()) { - // Double-check shutdown state - this handles the race where shutdown - // began after our first check but before the plasma operation. 
- RAY_LOG(WARNING) << "Failed to put error object " << object_id - << " in plasma store during shutdown: " << status.ToString(); - return Status::OK(); - } else { - return status; - } + RAY_LOG(WARNING).WithField(object_id) + << "Failed to put object in plasma store: " << status; + return status; } return Status::OK(); }, diff --git a/src/ray/core_worker/task_manager.cc b/src/ray/core_worker/task_manager.cc index 1e8e23544e5d..f20347773ef9 100644 --- a/src/ray/core_worker/task_manager.cc +++ b/src/ray/core_worker/task_manager.cc @@ -533,11 +533,10 @@ size_t TaskManager::NumPendingTasks() const { return num_pending_tasks_; } -Status TaskManager::HandleTaskReturn(const ObjectID &object_id, - const rpc::ReturnObject &return_object, - const NodeID &worker_node_id, - bool store_in_plasma, - bool *direct_return_out) { +StatusOr TaskManager::HandleTaskReturn(const ObjectID &object_id, + const rpc::ReturnObject &return_object, + const NodeID &worker_node_id, + bool store_in_plasma) { bool direct_return = false; reference_counter_.UpdateObjectSize(object_id, return_object.size()); RAY_LOG(DEBUG) << "Task return object " << object_id << " has size " @@ -581,6 +580,11 @@ Status TaskManager::HandleTaskReturn(const ObjectID &object_id, tensor_transport.value_or(rpc::TensorTransport::OBJECT_STORE)); if (store_in_plasma) { Status s = put_in_local_plasma_callback_(object, object_id); + int retry_count = 0; + while (!s.ok() && s.IsTransientObjectStoreFull() && retry_count < 3) { + retry_count++; + s = put_in_local_plasma_callback_(object, object_id); + } if (!s.ok()) { return s; } @@ -599,10 +603,7 @@ Status TaskManager::HandleTaskReturn(const ObjectID &object_id, } reference_counter_.AddNestedObjectIds(object_id, nested_ids, owner_address); } - if (direct_return_out != nullptr) { - *direct_return_out = direct_return; - } - return Status::OK(); + return direct_return; } bool TaskManager::TryDelObjectRefStream(const ObjectID &generator_id) { @@ -820,13 +821,15 @@ bool TaskManager::HandleReportGeneratorItemReturns( } // When an object is reported, the object is ready to be fetched. reference_counter_.UpdateObjectPendingCreation(object_id, false); - bool _direct_unused = false; - RAY_UNUSED( + StatusOr put_res = HandleTaskReturn(object_id, return_object, NodeID::FromBinary(request.worker_addr().node_id()), - /*store_in_plasma=*/store_in_plasma_ids.contains(object_id), - &_direct_unused)); + /*store_in_plasma=*/store_in_plasma_ids.contains(object_id)); + if (!put_res.ok()) { + RAY_LOG(WARNING).WithField(object_id) + << "Failed to handle streaming dynamic return: " << put_res.status(); + } } // Handle backpressure if needed. @@ -910,16 +913,24 @@ void TaskManager::CompletePendingTask(const TaskID &task_id, reference_counter_.AddDynamicReturn(object_id, generator_id); dynamic_return_ids.push_back(object_id); } - bool direct = false; - Status s = HandleTaskReturn(object_id, - return_object, - NodeID::FromBinary(worker_addr.node_id()), - store_in_plasma_ids.contains(object_id), - &direct); - if (!s.ok()) { + StatusOr direct_or = + HandleTaskReturn(object_id, + return_object, + NodeID::FromBinary(worker_addr.node_id()), + store_in_plasma_ids.contains(object_id)); + if (!direct_or.ok()) { RAY_LOG(WARNING).WithField(object_id) - << "Failed to handle dynamic task return: " << s; - } else if (!direct && first_execution) { + << "Failed to handle dynamic task return: " << direct_or.status(); + // Treat as system failure for this attempt and fail immediately to avoid hangs. 
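+          // Failing the attempt here writes error markers for the task's return
+          // objects (mark_task_object_failed=true), so callers blocked on them
+          // observe an error instead of waiting on a value that was never stored.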
+ Status st = direct_or.status(); + FailOrRetryPendingTask(task_id, + rpc::ErrorType::WORKER_DIED, + &st, + /*ray_error_info=*/nullptr, + /*mark_task_object_failed=*/true, + /*fail_immediately=*/true); + return; + } else if (!direct_or.value() && first_execution) { dynamic_returns_in_plasma.push_back(object_id); } } @@ -927,24 +938,24 @@ void TaskManager::CompletePendingTask(const TaskID &task_id, for (const auto &return_object : reply.return_objects()) { const auto object_id = ObjectID::FromBinary(return_object.object_id()); - bool direct = false; - Status s = HandleTaskReturn(object_id, - return_object, - NodeID::FromBinary(worker_addr.node_id()), - store_in_plasma_ids.contains(object_id), - &direct); - if (!s.ok()) { - RAY_LOG(WARNING).WithField(object_id) << "Failed to handle task return: " << s; + StatusOr direct_or = HandleTaskReturn(object_id, + return_object, + NodeID::FromBinary(worker_addr.node_id()), + store_in_plasma_ids.contains(object_id)); + if (!direct_or.ok()) { + RAY_LOG(WARNING).WithField(object_id) + << "Failed to handle task return: " << direct_or.status(); // If storing return in plasma failed, treat as system failure for this attempt. // Do not proceed with normal completion. Mark task failed immediately. + Status st = direct_or.status(); FailOrRetryPendingTask(task_id, rpc::ErrorType::WORKER_DIED, - &s, + &st, /*ray_error_info=*/nullptr, /*mark_task_object_failed=*/true, /*fail_immediately=*/true); return; - } else if (direct) { + } else if (direct_or.value()) { direct_return_ids.push_back(object_id); } } @@ -1068,12 +1079,16 @@ void TaskManager::CompletePendingTask(const TaskID &task_id, const auto generator_return_id = spec.StreamingGeneratorReturnId(i); RAY_CHECK_EQ(reply.return_objects_size(), 1); const auto &return_object = reply.return_objects(0); - bool _direct_unused = false; - RAY_UNUSED(HandleTaskReturn(generator_return_id, - return_object, - NodeID::FromBinary(worker_addr.node_id()), - store_in_plasma_ids.contains(generator_return_id), - &_direct_unused)); + StatusOr res = + HandleTaskReturn(generator_return_id, + return_object, + NodeID::FromBinary(worker_addr.node_id()), + store_in_plasma_ids.contains(generator_return_id)); + if (!res.ok()) { + RAY_LOG(WARNING).WithField(generator_return_id) + << "Failed to handle generator return during app error propagation: " + << res.status(); + } } } } @@ -1484,26 +1499,26 @@ void TaskManager::MarkTaskReturnObjectsFailed( int64_t num_returns = spec.NumReturns(); for (int i = 0; i < num_returns; i++) { const auto object_id = ObjectID::FromIndex(task_id, /*index=*/i + 1); + // Always place an error marker in local memory to unblock waiters quickly. + in_memory_store_.Put(error, object_id); + // Best-effort plasma put if the object was meant to be in plasma. 
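+    // If the plasma put fails we only log a warning: the in-memory error above has
+    // already unblocked any waiters, and crashing here would turn a shutdown-time
+    // connection error back into the RAY_CHECK failure this series removes.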
if (store_in_plasma_ids.contains(object_id)) { Status s = put_in_local_plasma_callback_(error, object_id); if (!s.ok()) { RAY_LOG(WARNING).WithField(object_id) << "Failed to put error object in plasma: " << s; } - } else { - in_memory_store_.Put(error, object_id); } } if (spec.ReturnsDynamic()) { for (const auto &dynamic_return_id : spec.DynamicReturnIds()) { + in_memory_store_.Put(error, dynamic_return_id); if (store_in_plasma_ids.contains(dynamic_return_id)) { Status s = put_in_local_plasma_callback_(error, dynamic_return_id); if (!s.ok()) { RAY_LOG(WARNING).WithField(dynamic_return_id) << "Failed to put error object in plasma: " << s; } - } else { - in_memory_store_.Put(error, dynamic_return_id); } } } diff --git a/src/ray/core_worker/task_manager.h b/src/ray/core_worker/task_manager.h index f206f4de857a..cde2d50fd856 100644 --- a/src/ray/core_worker/task_manager.h +++ b/src/ray/core_worker/task_manager.h @@ -612,11 +612,10 @@ class TaskManager : public TaskManagerInterface { /// Update nested ref count info and store the in-memory value for a task's /// return object. On success, sets direct_return_out to true if the object's value /// was returned directly by value (not stored in plasma). - Status HandleTaskReturn(const ObjectID &object_id, - const rpc::ReturnObject &return_object, - const NodeID &worker_node_id, - bool store_in_plasma, - bool *direct_return_out) ABSL_LOCKS_EXCLUDED(mu_); + StatusOr HandleTaskReturn(const ObjectID &object_id, + const rpc::ReturnObject &return_object, + const NodeID &worker_node_id, + bool store_in_plasma) ABSL_LOCKS_EXCLUDED(mu_); /// Remove a lineage reference to this object ID. This should be called /// whenever a task that depended on this object ID can no longer be retried. diff --git a/src/ray/core_worker/tests/task_manager_test.cc b/src/ray/core_worker/tests/task_manager_test.cc index 3b91fa19cc20..a3ebd6ce6550 100644 --- a/src/ray/core_worker/tests/task_manager_test.cc +++ b/src/ray/core_worker/tests/task_manager_test.cc @@ -1359,6 +1359,191 @@ TEST_F(TaskManagerLineageTest, TestResubmittedDynamicReturnsTaskFails) { ASSERT_EQ(stored_in_plasma.size(), 3); } +// High-level tests around plasma put failures and retries using a real memory store +TEST_F(TaskManagerTest, PlasmaPut_ObjectStoreFull_FailsTaskAndWritesError) { + auto local_ref_counter = std::make_shared( + addr_, + publisher_.get(), + subscriber_.get(), + /*is_node_dead=*/[this](const NodeID &) { return node_died_; }, + lineage_pinning_enabled_); + auto local_store = std::make_shared(io_context_.GetIoService(), + local_ref_counter.get()); + + TaskManager failing_mgr( + *local_store, + *local_ref_counter, + /*put_in_local_plasma_callback=*/ + [](const RayObject &, const ObjectID &) { + return Status::ObjectStoreFull("simulated"); + }, + [this](TaskSpecification &spec, bool object_recovery, uint32_t delay_ms) { + num_retries_++; + last_delay_ms_ = delay_ms; + last_object_recovery_ = object_recovery; + return Status::OK(); + }, + [this](const TaskSpecification &spec) { + return this->did_queue_generator_resubmit_; + }, + [](const JobID &, const std::string &, const std::string &, double) { + return Status::OK(); + }, + /*max_lineage_bytes*/ 1024 * 1024, + *task_event_buffer_mock_.get(), + [](const ActorID &) -> std::shared_ptr { + return nullptr; + }, + mock_gcs_client_); + + rpc::Address caller_address; + auto spec = CreateTaskHelper(1, {}); + failing_mgr.AddPendingTask(caller_address, spec, ""); + failing_mgr.MarkDependenciesResolved(spec.TaskId()); + 
failing_mgr.MarkTaskWaitingForExecution( + spec.TaskId(), NodeID::FromRandom(), WorkerID::FromRandom()); + + rpc::PushTaskReply reply; + auto return_object = reply.add_return_objects(); + auto return_id = spec.ReturnId(0); + return_object->set_object_id(return_id.Binary()); + return_object->set_in_plasma(true); + failing_mgr.CompletePendingTask( + spec.TaskId(), reply, rpc::Address(), /*app_err=*/false); + + ASSERT_FALSE(failing_mgr.IsTaskPending(spec.TaskId())); + std::vector> results; + WorkerContext ctx(WorkerType::WORKER, WorkerID::FromRandom(), JobID::FromInt(0)); + RAY_CHECK_OK(local_store->Get({return_id}, 1, 0, ctx, false, &results)); + ASSERT_EQ(results.size(), 1); + ASSERT_TRUE(results[0]->IsException()); +} + +TEST_F(TaskManagerTest, PlasmaPut_TransientFull_RetriesThenSucceeds) { + std::shared_ptr> attempts = std::make_shared>(0); + auto local_ref_counter = std::make_shared( + addr_, + publisher_.get(), + subscriber_.get(), + /*is_node_dead=*/[this](const NodeID &) { return node_died_; }, + lineage_pinning_enabled_); + auto local_store = std::make_shared(io_context_.GetIoService(), + local_ref_counter.get()); + TaskManager retry_mgr( + *local_store, + *local_ref_counter, + /*put_in_local_plasma_callback=*/ + [attempts](const RayObject &, const ObjectID &) { + int n = ++(*attempts); + if (n < 3) { + return Status::TransientObjectStoreFull("retry"); + } + return Status::OK(); + }, + [this](TaskSpecification &spec, bool object_recovery, uint32_t delay_ms) { + num_retries_++; + last_delay_ms_ = delay_ms; + last_object_recovery_ = object_recovery; + return Status::OK(); + }, + [this](const TaskSpecification &spec) { + return this->did_queue_generator_resubmit_; + }, + [](const JobID &, const std::string &, const std::string &, double) { + return Status::OK(); + }, + /*max_lineage_bytes*/ 1024 * 1024, + *task_event_buffer_mock_.get(), + [](const ActorID &) -> std::shared_ptr { + return nullptr; + }, + mock_gcs_client_); + + rpc::Address caller_address; + auto spec = CreateTaskHelper(1, {}); + retry_mgr.AddPendingTask(caller_address, spec, ""); + retry_mgr.MarkDependenciesResolved(spec.TaskId()); + retry_mgr.MarkTaskWaitingForExecution( + spec.TaskId(), NodeID::FromRandom(), WorkerID::FromRandom()); + + rpc::PushTaskReply reply; + auto return_object = reply.add_return_objects(); + auto return_id = spec.ReturnId(0); + return_object->set_object_id(return_id.Binary()); + return_object->set_in_plasma(true); + retry_mgr.CompletePendingTask(spec.TaskId(), reply, rpc::Address(), /*app_err=*/false); + + std::vector> results; + WorkerContext ctx(WorkerType::WORKER, WorkerID::FromRandom(), JobID::FromInt(0)); + RAY_CHECK_OK(local_store->Get({return_id}, 1, 0, ctx, false, &results)); + ASSERT_EQ(results.size(), 1); + ASSERT_TRUE(results[0]->IsInPlasmaError()); +} + +TEST_F(TaskManagerTest, DynamicReturn_PlasmaPutFailure_FailsTaskImmediately) { + bool first_fail_done = false; + auto local_ref_counter = std::make_shared( + addr_, + publisher_.get(), + subscriber_.get(), + /*is_node_dead=*/[this](const NodeID &) { return node_died_; }, + lineage_pinning_enabled_); + auto local_store = std::make_shared(io_context_.GetIoService(), + local_ref_counter.get()); + TaskManager dyn_mgr( + *local_store, + *local_ref_counter, + /*put_in_local_plasma_callback=*/ + [&first_fail_done](const RayObject &, const ObjectID &) { + if (!first_fail_done) { + first_fail_done = true; + return Status::IOError("broken pipe"); + } + return Status::OK(); + }, + [this](TaskSpecification &spec, bool object_recovery, uint32_t 
delay_ms) { + num_retries_++; + last_delay_ms_ = delay_ms; + last_object_recovery_ = object_recovery; + return Status::OK(); + }, + [this](const TaskSpecification &spec) { + return this->did_queue_generator_resubmit_; + }, + [](const JobID &, const std::string &, const std::string &, double) { + return Status::OK(); + }, + /*max_lineage_bytes*/ 1024 * 1024, + *task_event_buffer_mock_.get(), + [](const ActorID &) -> std::shared_ptr { + return nullptr; + }, + mock_gcs_client_); + + auto spec = CreateTaskHelper(1, {}, /*dynamic_returns=*/true); + dyn_mgr.AddPendingTask(addr_, spec, "", /*num_retries=*/0); + dyn_mgr.MarkDependenciesResolved(spec.TaskId()); + dyn_mgr.MarkTaskWaitingForExecution( + spec.TaskId(), NodeID::FromRandom(), WorkerID::FromRandom()); + + rpc::PushTaskReply reply; + auto generator_id = spec.ReturnId(0); + auto gen_obj = reply.add_return_objects(); + gen_obj->set_object_id(generator_id.Binary()); + auto data = GenerateRandomBuffer(); + gen_obj->set_data(data->Data(), data->Size()); + for (int i = 0; i < 2; i++) { + auto dyn_id = ObjectID::FromIndex(spec.TaskId(), i + 2); + auto dyn_obj = reply.add_dynamic_return_objects(); + dyn_obj->set_object_id(dyn_id.Binary()); + dyn_obj->set_data(data->Data(), data->Size()); + dyn_obj->set_in_plasma(true); + } + + dyn_mgr.CompletePendingTask(spec.TaskId(), reply, rpc::Address(), /*app_err=*/false); + ASSERT_FALSE(dyn_mgr.IsTaskPending(spec.TaskId())); +} + TEST_F(TaskManagerTest, TestObjectRefStreamCreateDelete) { /** * Test create and deletion of stream works. From ecc128da9f228e9af2f8b5e504a4541ee82321d5 Mon Sep 17 00:00:00 2001 From: Sagar Sumit Date: Thu, 21 Aug 2025 06:51:52 +0000 Subject: [PATCH 6/7] remove unused method and Map the status to a more specific error Signed-off-by: Sagar Sumit --- src/ray/core_worker/core_worker.cc | 2 -- src/ray/core_worker/core_worker.h | 6 ------ src/ray/core_worker/task_manager.cc | 11 ++++++++--- 3 files changed, 8 insertions(+), 11 deletions(-) diff --git a/src/ray/core_worker/core_worker.cc b/src/ray/core_worker/core_worker.cc index eae24f0a7e26..8efe6a7878bf 100644 --- a/src/ray/core_worker/core_worker.cc +++ b/src/ray/core_worker/core_worker.cc @@ -532,8 +532,6 @@ void CoreWorker::Shutdown() { /*force_shutdown=*/false, ShutdownReason::kGracefulExit, "ray.shutdown() called"); } -bool CoreWorker::IsShuttingDown() const { return is_shutdown_.load(); } - void CoreWorker::ConnectToRayletInternal() { // Tell the raylet the port that we are listening on. // NOTE: This also marks the worker as available in Raylet. We do this at the diff --git a/src/ray/core_worker/core_worker.h b/src/ray/core_worker/core_worker.h index 68956dc224bc..cffda26e8c85 100644 --- a/src/ray/core_worker/core_worker.h +++ b/src/ray/core_worker/core_worker.h @@ -247,12 +247,6 @@ class CoreWorker { /// void Shutdown(); - /// Check if the core worker is currently shutting down. - /// This can be used to avoid operations that might fail during shutdown. - /// - /// \return true if shutdown has been initiated, false otherwise. - bool IsShuttingDown() const; - /// Start receiving and executing tasks. 
   void RunTaskExecutionLoop();
 
diff --git a/src/ray/core_worker/task_manager.cc b/src/ray/core_worker/task_manager.cc
index c908c2bad2c3..75f84cd21ad4 100644
--- a/src/ray/core_worker/task_manager.cc
+++ b/src/ray/core_worker/task_manager.cc
@@ -921,12 +921,17 @@ void TaskManager::CompletePendingTask(const TaskID &task_id,
         if (!direct_or.ok()) {
           RAY_LOG(WARNING).WithField(object_id)
               << "Failed to handle dynamic task return: " << direct_or.status();
-          // Treat as system failure for this attempt and fail immediately to avoid hangs.
           Status st = direct_or.status();
+          rpc::ErrorType err_type = rpc::ErrorType::WORKER_DIED;
+          if (st.IsObjectStoreFull() || st.IsTransientObjectStoreFull()) {
+            err_type = rpc::ErrorType::OUT_OF_MEMORY;
+          }
+          rpc::RayErrorInfo err_info;
+          err_info.set_error_message(st.ToString());
           FailOrRetryPendingTask(task_id,
-                                 rpc::ErrorType::WORKER_DIED,
+                                 err_type,
                                  &st,
-                                 /*ray_error_info=*/nullptr,
+                                 /*ray_error_info=*/&err_info,
                                  /*mark_task_object_failed=*/true,
                                  /*fail_immediately=*/true);
           return;

From 02410782d02b75c5c293f16a0fb31f808a706edc Mon Sep 17 00:00:00 2001
From: Sagar Sumit
Date: Thu, 21 Aug 2025 17:31:19 +0530
Subject: [PATCH 7/7] rename shadowed local variable

Signed-off-by: Sagar Sumit
---
 src/ray/core_worker/core_worker_process.cc | 8 ++++----
 1 file changed, 4 insertions(+), 4 deletions(-)

diff --git a/src/ray/core_worker/core_worker_process.cc b/src/ray/core_worker/core_worker_process.cc
index 025f1b927576..783a184ca09f 100644
--- a/src/ray/core_worker/core_worker_process.cc
+++ b/src/ray/core_worker/core_worker_process.cc
@@ -423,12 +423,12 @@ std::shared_ptr<CoreWorker> CoreWorkerProcessImpl::CreateCoreWorker(
       /*put_in_local_plasma_callback=*/
       [this](const RayObject &object, const ObjectID &object_id) {
         auto core_worker = GetCoreWorker();
-        auto status =
+        auto put_status =
             core_worker->PutInLocalPlasmaStore(object, object_id, /*pin_object=*/true);
-        if (!status.ok()) {
+        if (!put_status.ok()) {
           RAY_LOG(WARNING).WithField(object_id)
-              << "Failed to put object in plasma store: " << status;
-          return status;
+              << "Failed to put object in plasma store: " << put_status;
+          return put_status;
         }
         return Status::OK();
       },
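
Taken together, the final revision classifies a failed plasma put before failing the task attempt: store-capacity problems surface as OUT_OF_MEMORY, and anything else (for example an IOError from a store connection closed during shutdown) is treated like a died worker for that attempt, without tripping a RAY_CHECK. A compact sketch of that mapping as a standalone helper (the helper name is illustrative; in the patch the logic is inlined in TaskManager::CompletePendingTask):

    #include "ray/common/status.h"
    // rpc::ErrorType comes from Ray's common protobuf definitions; the exact
    // include path is omitted here.

    namespace ray {
    namespace core {

    // Sketch: the classification applied before failing the task attempt.
    rpc::ErrorType ClassifyPlasmaPutFailure(const Status &s) {
      if (s.IsObjectStoreFull() || s.IsTransientObjectStoreFull()) {
        return rpc::ErrorType::OUT_OF_MEMORY;
      }
      return rpc::ErrorType::WORKER_DIED;
    }

    }  // namespace core
    }  // namespace ray

Mapping capacity errors to OUT_OF_MEMORY means an out-of-space store is reported to the application as a memory problem rather than as a spurious worker death, while an IOError still fails only the current attempt instead of crashing the worker process.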