[core] Fix RAY_CHECK failure during shutdown due to plasma store race condition #55367
Changes from all commits: 9785231, cc9921f, a1c5f04, c674e0d, 136c43c, b0c6b1b, c561af8, ecc128d, 0241078
@@ -1494,6 +1494,19 @@ class CoreWorker {
                std::string *application_error);

  /// Put an object in the local plasma store.
  ///
  /// Return status semantics:
  /// - Status::OK(): The object was created (or already existed) and bookkeeping was
  ///   updated. Note: an internal ObjectExists from the plasma provider is treated
  ///   as OK and does not surface here.
  /// - Status::ObjectStoreFull(): The local plasma store is out of memory (or out of
  ///   disk when spilling). The error message contains context and a short memory
  ///   report.
  /// - Status::IOError(): IPC/connection failures while talking to the plasma store
  ///   (e.g., broken pipe/connection reset during shutdown, store not reachable).
Comment on lines +1498 to +1506: nice!
  ///
  /// Call sites that run during shutdown may choose to tolerate IOError specifically,
  /// but should treat all other statuses as real failures.
  Status PutInLocalPlasmaStore(const RayObject &object,
                               const ObjectID &object_id,
                               bool pin_object);

@@ -533,10 +533,10 @@ size_t TaskManager::NumPendingTasks() const {
  return num_pending_tasks_;
}

bool TaskManager::HandleTaskReturn(const ObjectID &object_id,
                                   const rpc::ReturnObject &return_object,
                                   const NodeID &worker_node_id,
                                   bool store_in_plasma) {
StatusOr<bool> TaskManager::HandleTaskReturn(const ObjectID &object_id,
                                             const rpc::ReturnObject &return_object,
                                             const NodeID &worker_node_id,
                                             bool store_in_plasma) {
  bool direct_return = false;
  reference_counter_.UpdateObjectSize(object_id, return_object.size());
  RAY_LOG(DEBUG) << "Task return object " << object_id << " has size "
@@ -579,7 +579,15 @@ bool TaskManager::HandleTaskReturn(const ObjectID &object_id,
                          /*copy_data=*/false,
                          tensor_transport.value_or(rpc::TensorTransport::OBJECT_STORE));
    if (store_in_plasma) {
      put_in_local_plasma_callback_(object, object_id);
      Status s = put_in_local_plasma_callback_(object, object_id);
      int retry_count = 0;
      while (!s.ok() && s.IsTransientObjectStoreFull() && retry_count < 3) {
        retry_count++;
        s = put_in_local_plasma_callback_(object, object_id);
      }
      if (!s.ok()) {
        return s;
      }
    } else {
      in_memory_store_.Put(object, object_id);
      direct_return = true;
@@ -813,10 +821,15 @@ bool TaskManager::HandleReportGeneratorItemReturns(
    }
    // When an object is reported, the object is ready to be fetched.
    reference_counter_.UpdateObjectPendingCreation(object_id, false);
    HandleTaskReturn(object_id,
                     return_object,
                     NodeID::FromBinary(request.worker_addr().node_id()),
                     /*store_in_plasma=*/store_in_plasma_ids.contains(object_id));
    StatusOr<bool> put_res =
        HandleTaskReturn(object_id,
                         return_object,
                         NodeID::FromBinary(request.worker_addr().node_id()),
                         /*store_in_plasma=*/store_in_plasma_ids.contains(object_id));
    if (!put_res.ok()) {
      RAY_LOG(WARNING).WithField(object_id)
          << "Failed to handle streaming dynamic return: " << put_res.status();
    }
  }

  // Handle backpressure if needed.
@@ -900,23 +913,54 @@ void TaskManager::CompletePendingTask(const TaskID &task_id,
          reference_counter_.AddDynamicReturn(object_id, generator_id);
          dynamic_return_ids.push_back(object_id);
        }
        if (!HandleTaskReturn(object_id,
                              return_object,
                              NodeID::FromBinary(worker_addr.node_id()),
                              store_in_plasma_ids.contains(object_id))) {
          if (first_execution) {
            dynamic_returns_in_plasma.push_back(object_id);
        StatusOr<bool> direct_or =
            HandleTaskReturn(object_id,
                             return_object,
                             NodeID::FromBinary(worker_addr.node_id()),
                             store_in_plasma_ids.contains(object_id));
        if (!direct_or.ok()) {
          RAY_LOG(WARNING).WithField(object_id)
              << "Failed to handle dynamic task return: " << direct_or.status();
          Status st = direct_or.status();
          rpc::ErrorType err_type = rpc::ErrorType::WORKER_DIED;
          if (st.IsObjectStoreFull() || st.IsTransientObjectStoreFull()) {
            err_type = rpc::ErrorType::OUT_OF_MEMORY;
          }
          rpc::RayErrorInfo err_info;
          err_info.set_error_message(st.ToString());
          FailOrRetryPendingTask(task_id,
                                 err_type,
                                 &st,
                                 /*ray_error_info=*/&err_info,
                                 /*mark_task_object_failed=*/true,
                                 /*fail_immediately=*/true);
          return;
        } else if (!direct_or.value() && first_execution) {
          dynamic_returns_in_plasma.push_back(object_id);
        }
      }
    }

    for (const auto &return_object : reply.return_objects()) {
      const auto object_id = ObjectID::FromBinary(return_object.object_id());
      if (HandleTaskReturn(object_id,
                           return_object,
                           NodeID::FromBinary(worker_addr.node_id()),
                           store_in_plasma_ids.contains(object_id))) {
      StatusOr<bool> direct_or = HandleTaskReturn(object_id,
                                                  return_object,
                                                  NodeID::FromBinary(worker_addr.node_id()),
                                                  store_in_plasma_ids.contains(object_id));
      if (!direct_or.ok()) {
        RAY_LOG(WARNING).WithField(object_id)
            << "Failed to handle task return: " << direct_or.status();
        // If storing return in plasma failed, treat as system failure for this attempt.
        // Do not proceed with normal completion. Mark task failed immediately.
        Status st = direct_or.status();
        FailOrRetryPendingTask(task_id,
                               rpc::ErrorType::WORKER_DIED,
Review comment: WORKER_DIED isn't the right error type here. "Worker died" normally refers to the executor worker dying, but here something is going wrong on the owner, so the error type should be different.

Author reply: Fixed. We now consistently map Status → ErrorType via MapStatusToErrorType (IOError->LOCAL_RAYLET_DIED, ObjectStoreFull/Transient->OUT_OF_MEMORY, OutOfDisk->OUT_OF_DISK_ERROR; default->WORKER_DIED). See the illustrative sketch after this hunk.
                               &st,
                               /*ray_error_info=*/nullptr,
                               /*mark_task_object_failed=*/true,
                               /*fail_immediately=*/true);
        return;
      } else if (direct_or.value()) {
        direct_return_ids.push_back(object_id);
      }
    }
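As a reference for the mapping described in the review exchange above, here is a minimal sketch of what a MapStatusToErrorType helper could look like. The helper name comes from the author's reply; its exact signature and placement are assumptions, not verified against the merged code:

```cpp
// Illustrative sketch only: maps a failed plasma-put Status to the error type
// reported for the task attempt, following the mapping quoted in the reply.
rpc::ErrorType MapStatusToErrorType(const Status &status) {
  if (status.IsIOError()) {
    // The owner lost its IPC connection to the local raylet / plasma store.
    return rpc::ErrorType::LOCAL_RAYLET_DIED;
  }
  if (status.IsObjectStoreFull() || status.IsTransientObjectStoreFull()) {
    return rpc::ErrorType::OUT_OF_MEMORY;
  }
  if (status.IsOutOfDisk()) {
    return rpc::ErrorType::OUT_OF_DISK_ERROR;
  }
  // Default: fall back to the generic worker failure type.
  return rpc::ErrorType::WORKER_DIED;
}
```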
@@ -1040,10 +1084,16 @@ void TaskManager::CompletePendingTask(const TaskID &task_id,
        const auto generator_return_id = spec.StreamingGeneratorReturnId(i);
        RAY_CHECK_EQ(reply.return_objects_size(), 1);
        const auto &return_object = reply.return_objects(0);
        HandleTaskReturn(generator_return_id,
                         return_object,
                         NodeID::FromBinary(worker_addr.node_id()),
                         store_in_plasma_ids.contains(generator_return_id));
        StatusOr<bool> res =
            HandleTaskReturn(generator_return_id,
                             return_object,
                             NodeID::FromBinary(worker_addr.node_id()),
                             store_in_plasma_ids.contains(generator_return_id));
        if (!res.ok()) {
          RAY_LOG(WARNING).WithField(generator_return_id)
              << "Failed to handle generator return during app error propagation: "
              << res.status();
Review comment: This is a warning log-and-forget. If this can happen, it seems pretty bad: a cluster could hang waiting on this return value. Or am I missing that something downstream handles this?

Review comment: Maybe it makes sense for HandleTaskReturn to just handle all error types instead of just ObjectStoreFull and not propagate up, so we don't have to handle it at every upstream location.

Author reply: Addressed. No log-and-forget remains: on any HandleTaskReturn error we immediately FailOrRetryPendingTask with the mapped error and stop, so waiters are unblocked. I recommend keeping HandleTaskReturn side-effect free (it already returns StatusOr) and centralizing the failure policy in the TaskManager call sites. To reduce duplication, I added a small helper (MapStatusToErrorType) and use it in the few places we call FailOrRetryPendingTask.
        }
      }
    }
  }
@@ -1454,18 +1504,26 @@ void TaskManager::MarkTaskReturnObjectsFailed(
  int64_t num_returns = spec.NumReturns();
  for (int i = 0; i < num_returns; i++) {
    const auto object_id = ObjectID::FromIndex(task_id, /*index=*/i + 1);
    // Always place an error marker in local memory to unblock waiters quickly.
    in_memory_store_.Put(error, object_id);
    // Best-effort plasma put if the object was meant to be in plasma.
    if (store_in_plasma_ids.contains(object_id)) {
      put_in_local_plasma_callback_(error, object_id);
    } else {
      in_memory_store_.Put(error, object_id);
      Status s = put_in_local_plasma_callback_(error, object_id);
      if (!s.ok()) {
        RAY_LOG(WARNING).WithField(object_id)
            << "Failed to put error object in plasma: " << s;
      }
    }
  }
  if (spec.ReturnsDynamic()) {
    for (const auto &dynamic_return_id : spec.DynamicReturnIds()) {
      in_memory_store_.Put(error, dynamic_return_id);
      if (store_in_plasma_ids.contains(dynamic_return_id)) {
        put_in_local_plasma_callback_(error, dynamic_return_id);
      } else {
        in_memory_store_.Put(error, dynamic_return_id);
        Status s = put_in_local_plasma_callback_(error, dynamic_return_id);
        if (!s.ok()) {
          RAY_LOG(WARNING).WithField(dynamic_return_id)
              << "Failed to put error object in plasma: " << s;
        }
      }
    }
  }
@@ -1488,7 +1546,11 @@ void TaskManager::MarkTaskReturnObjectsFailed(
  for (size_t i = 0; i < num_streaming_generator_returns; i++) {
    const auto generator_return_id = spec.StreamingGeneratorReturnId(i);
    if (store_in_plasma_ids.contains(generator_return_id)) {
      put_in_local_plasma_callback_(error, generator_return_id);
      Status s = put_in_local_plasma_callback_(error, generator_return_id);
      if (!s.ok()) {
        RAY_LOG(WARNING).WithField(generator_return_id)
            << "Failed to put error object in plasma: " << s;
Review comment: This is a log-and-forget with no memory store put; it could lead to a hanging cluster.

Author reply: Fixed. We always write the error to the in-memory store first (which unblocks waiters), then do a best-effort plasma put with a warning on failure.
      }
    } else {
      in_memory_store_.Put(error, generator_return_id);
    }

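To summarize the pattern agreed on in the review replies above (unblock waiters first, then best-effort plasma put), here is a compact illustrative helper. The helper name and its free-standing form are hypothetical; in the PR this logic is inlined at each MarkTaskReturnObjectsFailed site, using the same TaskManager members shown in the diff:

```cpp
// Illustrative only: the failure-marking pattern described in the replies.
// Always publish the error marker to the in-memory store so callers blocked on
// this object are released, then attempt the plasma put on a best-effort basis.
void PutErrorMarkerBestEffort(const RayObject &error,
                              const ObjectID &object_id,
                              bool meant_for_plasma) {
  in_memory_store_.Put(error, object_id);
  if (meant_for_plasma) {
    Status s = put_in_local_plasma_callback_(error, object_id);
    if (!s.ok()) {
      // Tolerated: e.g. the plasma store is already gone during shutdown.
      RAY_LOG(WARNING).WithField(object_id)
          << "Failed to put error object in plasma: " << s;
    }
  }
}
```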
Review comment: What is the standard behavior here if we're out of spill space (do we just crash everywhere)? How do we recover? This could result in all kinds of problems. Also, this is guaranteed to try to spill, right, because the usage is always that we're storing a primary copy through this code path?

Author reply: Spilling is attempted automatically on puts through this path:
- Puts use CreateAndSpillIfNeeded (ray/src/ray/core_worker/store_provider/plasma_store_provider.cc, lines 146 to 163 in e0d8e6f).
- The plasma-side create request queue returns transient/full while it triggers GC/spill, then falls back to disk, and only after the grace period/fallback fails does it surface OutOfDisk (ray/src/ray/object_manager/plasma/create_request_queue.cc, lines 117 to 131 in e0d8e6f).
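For orientation, here is a simplified, caller-side sketch of the flow the reply describes. TryCreateInPlasma is a hypothetical stand-in for the actual create request; the real retry, backoff, and disk-fallback logic live in the files referenced above:

```cpp
#include <chrono>
#include <thread>

// Illustrative model only, not the code at the referenced lines.
// Transient fullness is retried while the store runs GC/spilling; a hard
// ObjectStoreFull or OutOfDisk only surfaces after spilling and the disk
// fallback have failed.
Status PutWithSpillRetrySketch(const RayObject &object, const ObjectID &object_id) {
  Status s = TryCreateInPlasma(object, object_id);  // hypothetical call
  while (s.IsTransientObjectStoreFull()) {
    // The store has triggered GC/spilling; back off briefly and retry.
    std::this_thread::sleep_for(std::chrono::milliseconds(10));
    s = TryCreateInPlasma(object, object_id);
  }
  // At this point s is OK, ObjectStoreFull (memory exhausted even after
  // spilling), OutOfDisk (disk fallback also failed), or IOError (connection
  // problems, e.g. during shutdown).
  return s;
}
```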