13 changes: 13 additions & 0 deletions src/ray/core_worker/core_worker.h
@@ -1494,6 +1494,19 @@ class CoreWorker {
std::string *application_error);

/// Put an object in the local plasma store.
///
/// Return status semantics:
/// - Status::OK(): The object was created (or already existed) and bookkeeping was
/// updated. Note: an internal ObjectExists from the plasma provider is treated
/// as OK and does not surface here.
/// - Status::ObjectStoreFull(): The local plasma store is out of memory (or out of
/// disk when spilling). The error message contains context and a short memory
/// report.
Contributor:
What is the standard behavior here if we're out of spill space (do we just crash everywhere)? How do we recover? This could result in all kinds of problems. Also, this is guaranteed to try to spill, right, since the usage is always that we're storing a primary copy through this code path?

Contributor (Author):
Spilling is attempted automatically on puts through this path:

  • Puts use CreateAndSpillIfNeeded:

    if (status.IsObjectStoreFull()) {
      StatusOr<std::string> memory_usage = GetMemoryUsage();
      RAY_CHECK_OK(memory_usage.status()) << "Unable to communicate with the Plasma Store.";
      RAY_LOG(ERROR) << "Failed to put object " << object_id
                     << " in object store because it "
                     << "is full. Object size is " << data_size << " bytes.\n"
                     << "Plasma store status:\n"
                     << memory_usage.value() << "\n---\n"
                     << "--- Tip: Use the `ray memory` command to list active objects "
                        "in the cluster."
                     << "\n---\n";
      // Replace the status with a more helpful error message.
      std::ostringstream message;
      message << "Failed to put object " << object_id << " in object store because it "
              << "is full. Object size is " << data_size << " bytes.";
      status = Status::ObjectStoreFull(message.str());
    } else if (status.IsObjectExists()) {

  • The plasma-side queue returns a transient/full status while it triggers GC and spilling, then falls back to disk, and only after the grace period and fallback allocation fail does it surface OutOfDisk:

    auto spill_pending = spill_objects_callback_();
    if (spill_pending) {
      RAY_LOG(DEBUG) << "Reset grace period " << status << " " << spill_pending;
      oom_start_time_ns_ = -1;
      return Status::TransientObjectStoreFull("Waiting for objects to spill.");
    }
    if (now - oom_start_time_ns_ < grace_period_ns) {
      // We need a grace period since (1) global GC takes a bit of time to
      // kick in, and (2) there is a race between spilling finishing and space
      // actually freeing up in the object store.
      RAY_LOG(DEBUG) << "In grace period before fallback allocation / oom.";
      return Status::ObjectStoreFull("Waiting for grace period.");
    }
    // Trigger the fallback allocator.
    status = ProcessRequest(/*fallback_allocator=*/true, *request_it);

/// - Status::IOError(): IPC/connection failures while talking to the plasma store
/// (e.g., broken pipe/connection reset during shutdown, store not reachable).
Comment on lines +1498 to +1506
Collaborator:
nice!

///
/// Call sites that run during shutdown may choose to tolerate IOError specifically,
/// but should treat all other statuses as real failures.
Status PutInLocalPlasmaStore(const RayObject &object,
const ObjectID &object_id,
bool pin_object);
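
For orientation (not part of the diff): a minimal sketch of how a call site could branch on the documented statuses above. PutOrExplain and the shutting_down flag are hypothetical, and the sketch assumes PutInLocalPlasmaStore is reachable from the caller; it relies only on the Status predicates named in the doc comment.

    // Hypothetical call-site wrapper: tolerate IOError only during shutdown,
    // treat ObjectStoreFull as a real failure, and accept OK (which already
    // covers the internal ObjectExists case).
    Status PutOrExplain(CoreWorker &worker,
                        const RayObject &object,
                        const ObjectID &object_id,
                        bool shutting_down) {
      Status s = worker.PutInLocalPlasmaStore(object, object_id, /*pin_object=*/true);
      if (s.ok()) {
        return s;
      }
      if (s.IsIOError() && shutting_down) {
        // Connection teardown while the worker exits; safe to ignore at this call site.
        RAY_LOG(DEBUG) << "Ignoring plasma IOError during shutdown: " << s;
        return Status::OK();
      }
      if (s.IsObjectStoreFull()) {
        // Out of memory (or disk when spilling); the message already carries a memory report.
        RAY_LOG(ERROR) << "Plasma store full while putting " << object_id << ": " << s;
      }
      return s;
    }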
10 changes: 8 additions & 2 deletions src/ray/core_worker/core_worker_process.cc
@@ -423,8 +423,14 @@ std::shared_ptr<CoreWorker> CoreWorkerProcessImpl::CreateCoreWorker(
/*put_in_local_plasma_callback=*/
[this](const RayObject &object, const ObjectID &object_id) {
auto core_worker = GetCoreWorker();
RAY_CHECK_OK(
core_worker->PutInLocalPlasmaStore(object, object_id, /*pin_object=*/true));
auto put_status =
core_worker->PutInLocalPlasmaStore(object, object_id, /*pin_object=*/true);
if (!put_status.ok()) {
RAY_LOG(WARNING).WithField(object_id)
<< "Failed to put object in plasma store: " << put_status;
return put_status;
}
return Status::OK();
},
/* retry_task_callback= */
[this](TaskSpecification &spec, bool object_recovery, uint32_t delay_ms) {
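The lambda above now returns the put status to TaskManager instead of crashing through RAY_CHECK_OK. A hedged sketch of a test-style stand-in with the same PutInLocalPlasmaCallback signature, useful for exercising both the failure and the success path; FakePlasmaFullOnce is purely illustrative and not part of the change.

    // Illustrative stand-in matching std::function<Status(const RayObject &, const ObjectID &)>:
    // fails the first put with ObjectStoreFull, then succeeds, so callers can verify
    // that the status is propagated rather than checked fatally.
    class FakePlasmaFullOnce {
     public:
      Status operator()(const RayObject &object, const ObjectID &object_id) {
        if (!failed_once_) {
          failed_once_ = true;
          return Status::ObjectStoreFull("simulated full store");
        }
        stored_ids_.push_back(object_id);
        return Status::OK();
      }

     private:
      bool failed_once_ = false;
      std::vector<ObjectID> stored_ids_;  // assumes <vector> is available
    };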
122 changes: 92 additions & 30 deletions src/ray/core_worker/task_manager.cc
@@ -533,10 +533,10 @@ size_t TaskManager::NumPendingTasks() const {
return num_pending_tasks_;
}

bool TaskManager::HandleTaskReturn(const ObjectID &object_id,
const rpc::ReturnObject &return_object,
const NodeID &worker_node_id,
bool store_in_plasma) {
StatusOr<bool> TaskManager::HandleTaskReturn(const ObjectID &object_id,
const rpc::ReturnObject &return_object,
const NodeID &worker_node_id,
bool store_in_plasma) {
bool direct_return = false;
reference_counter_.UpdateObjectSize(object_id, return_object.size());
RAY_LOG(DEBUG) << "Task return object " << object_id << " has size "
@@ -579,7 +579,15 @@ bool TaskManager::HandleTaskReturn(const ObjectID &object_id,
/*copy_data=*/false,
tensor_transport.value_or(rpc::TensorTransport::OBJECT_STORE));
if (store_in_plasma) {
put_in_local_plasma_callback_(object, object_id);
Status s = put_in_local_plasma_callback_(object, object_id);
int retry_count = 0;
while (!s.ok() && s.IsTransientObjectStoreFull() && retry_count < 3) {
retry_count++;
s = put_in_local_plasma_callback_(object, object_id);
}
if (!s.ok()) {
return s;
}
} else {
in_memory_store_.Put(object, object_id);
direct_return = true;
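
The loop above retries the plasma put a bounded number of times when the store reports TransientObjectStoreFull (i.e., spilling is still in flight). A hypothetical extraction of that policy into a helper, with a short pause between attempts to give spilling and the plasma grace period time to free space; PutWithRetry, the retry bound, and the sleep interval are assumptions for illustration (requires <chrono> and <thread>).

    // Hypothetical bounded-retry helper: retry only on TransientObjectStoreFull;
    // every other status (including ObjectStoreFull) is returned to the caller.
    Status PutWithRetry(const PutInLocalPlasmaCallback &put,
                        const RayObject &object,
                        const ObjectID &object_id) {
      constexpr int kMaxRetries = 3;  // matches the retry bound in the loop above
      Status s = put(object, object_id);
      for (int retry = 0; retry < kMaxRetries && s.IsTransientObjectStoreFull(); ++retry) {
        // Let spilling / the grace period make progress before retrying.
        std::this_thread::sleep_for(std::chrono::milliseconds(100));
        s = put(object, object_id);
      }
      return s;
    }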
@@ -813,10 +821,15 @@ bool TaskManager::HandleReportGeneratorItemReturns(
}
// When an object is reported, the object is ready to be fetched.
reference_counter_.UpdateObjectPendingCreation(object_id, false);
HandleTaskReturn(object_id,
return_object,
NodeID::FromBinary(request.worker_addr().node_id()),
/*store_in_plasma=*/store_in_plasma_ids.contains(object_id));
StatusOr<bool> put_res =
HandleTaskReturn(object_id,
return_object,
NodeID::FromBinary(request.worker_addr().node_id()),
/*store_in_plasma=*/store_in_plasma_ids.contains(object_id));
if (!put_res.ok()) {
RAY_LOG(WARNING).WithField(object_id)
<< "Failed to handle streaming dynamic return: " << put_res.status();
}
}

// Handle backpressure if needed.
@@ -900,23 +913,54 @@ void TaskManager::CompletePendingTask(const TaskID &task_id,
reference_counter_.AddDynamicReturn(object_id, generator_id);
dynamic_return_ids.push_back(object_id);
}
if (!HandleTaskReturn(object_id,
return_object,
NodeID::FromBinary(worker_addr.node_id()),
store_in_plasma_ids.contains(object_id))) {
if (first_execution) {
dynamic_returns_in_plasma.push_back(object_id);
StatusOr<bool> direct_or =
HandleTaskReturn(object_id,
return_object,
NodeID::FromBinary(worker_addr.node_id()),
store_in_plasma_ids.contains(object_id));
if (!direct_or.ok()) {
RAY_LOG(WARNING).WithField(object_id)
<< "Failed to handle dynamic task return: " << direct_or.status();
Status st = direct_or.status();
rpc::ErrorType err_type = rpc::ErrorType::WORKER_DIED;
if (st.IsObjectStoreFull() || st.IsTransientObjectStoreFull()) {
err_type = rpc::ErrorType::OUT_OF_MEMORY;
}
rpc::RayErrorInfo err_info;
err_info.set_error_message(st.ToString());
FailOrRetryPendingTask(task_id,
err_type,
&st,
/*ray_error_info=*/&err_info,
/*mark_task_object_failed=*/true,
/*fail_immediately=*/true);
return;
} else if (!direct_or.value() && first_execution) {
dynamic_returns_in_plasma.push_back(object_id);
}
}
}

for (const auto &return_object : reply.return_objects()) {
const auto object_id = ObjectID::FromBinary(return_object.object_id());
if (HandleTaskReturn(object_id,
return_object,
NodeID::FromBinary(worker_addr.node_id()),
store_in_plasma_ids.contains(object_id))) {
StatusOr<bool> direct_or = HandleTaskReturn(object_id,
return_object,
NodeID::FromBinary(worker_addr.node_id()),
store_in_plasma_ids.contains(object_id));
if (!direct_or.ok()) {
RAY_LOG(WARNING).WithField(object_id)
<< "Failed to handle task return: " << direct_or.status();
// If storing return in plasma failed, treat as system failure for this attempt.
// Do not proceed with normal completion. Mark task failed immediately.
Status st = direct_or.status();
FailOrRetryPendingTask(task_id,
rpc::ErrorType::WORKER_DIED,
Contributor:
WORKER_DIED isn't the right error type here. WORKER_DIED normally refers to the executor worker dying, but here something is going wrong on the owner, so the error type should be different.

Contributor (Author):
Fixed. We now consistently map Status → ErrorType via MapStatusToErrorType (IOError -> LOCAL_RAYLET_DIED, ObjectStoreFull/TransientObjectStoreFull -> OUT_OF_MEMORY, OutOfDisk -> OUT_OF_DISK_ERROR; everything else -> WORKER_DIED); see the sketch after this hunk.

&st,
/*ray_error_info=*/nullptr,
/*mark_task_object_failed=*/true,
/*fail_immediately=*/true);
return;
} else if (direct_or.value()) {
direct_return_ids.push_back(object_id);
}
}
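
The MapStatusToErrorType helper referenced in the thread above does not appear in this hunk; a minimal sketch of the mapping the reply describes (the signature, location, and the IsOutOfDisk() predicate name are assumptions that follow the existing Status::IsX() pattern):

    // Sketch of the Status -> rpc::ErrorType mapping described in the review thread.
    rpc::ErrorType MapStatusToErrorType(const Status &status) {
      if (status.IsIOError()) {
        // Lost the IPC connection to the local raylet / plasma store.
        return rpc::ErrorType::LOCAL_RAYLET_DIED;
      }
      if (status.IsObjectStoreFull() || status.IsTransientObjectStoreFull()) {
        return rpc::ErrorType::OUT_OF_MEMORY;
      }
      if (status.IsOutOfDisk()) {
        return rpc::ErrorType::OUT_OF_DISK_ERROR;
      }
      // Default for anything unexpected, preserving the previous behavior.
      return rpc::ErrorType::WORKER_DIED;
    }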
@@ -1040,10 +1084,16 @@ void TaskManager::CompletePendingTask(const TaskID &task_id,
const auto generator_return_id = spec.StreamingGeneratorReturnId(i);
RAY_CHECK_EQ(reply.return_objects_size(), 1);
const auto &return_object = reply.return_objects(0);
HandleTaskReturn(generator_return_id,
return_object,
NodeID::FromBinary(worker_addr.node_id()),
store_in_plasma_ids.contains(generator_return_id));
StatusOr<bool> res =
HandleTaskReturn(generator_return_id,
return_object,
NodeID::FromBinary(worker_addr.node_id()),
store_in_plasma_ids.contains(generator_return_id));
if (!res.ok()) {
RAY_LOG(WARNING).WithField(generator_return_id)
<< "Failed to handle generator return during app error propagation: "
<< res.status();
Contributor:
This is a warning-log-and-forget. If this is possible it seems pretty bad: a cluster could hang waiting on this return value. Or am I missing that something downstream handles this?

Contributor:
Maybe it makes sense for HandleTaskReturn to just handle all error types itself (not only ObjectStoreFull) and not propagate them up, so we don't have to handle them at every upstream call site.

Contributor (Author):
Addressed. No log-and-forget remains: on any HandleTaskReturn error we immediately call FailOrRetryPendingTask with the mapped error and stop, so waiters are unblocked (see the sketch after this hunk). I recommend keeping HandleTaskReturn side-effect free (it already returns StatusOr) and centralizing the failure policy at the TaskManager call sites. To reduce duplication, I added a small helper (MapStatusToErrorType) and use it in the few places where we call FailOrRetryPendingTask.

}
}
}
}
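
As the reply above describes, the warning-only handling shown in this hunk was replaced by a fail-fast path so that waiters are unblocked. A sketch of what that handling could look like at this call site, reusing the MapStatusToErrorType sketch above and mirroring the FailOrRetryPendingTask calls earlier in this diff:

    // Hypothetical fail-fast handling for a failed generator return put:
    // fail the task attempt instead of only logging.
    if (!res.ok()) {
      Status st = res.status();
      rpc::RayErrorInfo err_info;
      err_info.set_error_message(st.ToString());
      FailOrRetryPendingTask(task_id,
                             MapStatusToErrorType(st),
                             &st,
                             /*ray_error_info=*/&err_info,
                             /*mark_task_object_failed=*/true,
                             /*fail_immediately=*/true);
      return;
    }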
@@ -1454,18 +1504,26 @@ void TaskManager::MarkTaskReturnObjectsFailed(
int64_t num_returns = spec.NumReturns();
for (int i = 0; i < num_returns; i++) {
const auto object_id = ObjectID::FromIndex(task_id, /*index=*/i + 1);
// Always place an error marker in local memory to unblock waiters quickly.
in_memory_store_.Put(error, object_id);
// Best-effort plasma put if the object was meant to be in plasma.
if (store_in_plasma_ids.contains(object_id)) {
put_in_local_plasma_callback_(error, object_id);
} else {
in_memory_store_.Put(error, object_id);
Status s = put_in_local_plasma_callback_(error, object_id);
if (!s.ok()) {
RAY_LOG(WARNING).WithField(object_id)
<< "Failed to put error object in plasma: " << s;
}
}
}
if (spec.ReturnsDynamic()) {
for (const auto &dynamic_return_id : spec.DynamicReturnIds()) {
in_memory_store_.Put(error, dynamic_return_id);
if (store_in_plasma_ids.contains(dynamic_return_id)) {
put_in_local_plasma_callback_(error, dynamic_return_id);
} else {
in_memory_store_.Put(error, dynamic_return_id);
Status s = put_in_local_plasma_callback_(error, dynamic_return_id);
if (!s.ok()) {
RAY_LOG(WARNING).WithField(dynamic_return_id)
<< "Failed to put error object in plasma: " << s;
}
}
}
}
Expand All @@ -1488,7 +1546,11 @@ void TaskManager::MarkTaskReturnObjectsFailed(
for (size_t i = 0; i < num_streaming_generator_returns; i++) {
const auto generator_return_id = spec.StreamingGeneratorReturnId(i);
if (store_in_plasma_ids.contains(generator_return_id)) {
put_in_local_plasma_callback_(error, generator_return_id);
Status s = put_in_local_plasma_callback_(error, generator_return_id);
if (!s.ok()) {
RAY_LOG(WARNING).WithField(generator_return_id)
<< "Failed to put error object in plasma: " << s;
Contributor:
This is log-and-forget with no in-memory store put; it could lead to a hanging cluster.

Contributor (Author):
Fixed. We always write the error to the in-memory store first (which unblocks waiters), then do a best-effort plasma put with a warning on failure; see the sketch after this hunk.

}
} else {
in_memory_store_.Put(error, generator_return_id);
}
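
Per the reply above, the intended invariant is: write the error marker to the in-memory store first (so waiters are unblocked), then attempt the plasma put as best effort. A hypothetical helper capturing that order; PutErrorMarker is illustrative and not part of the diff.

    // Hypothetical helper: the in-memory put always happens first so waiters are
    // unblocked even if the plasma put fails; the plasma put stays best effort.
    void TaskManager::PutErrorMarker(const RayObject &error,
                                     const ObjectID &object_id,
                                     bool store_in_plasma) {
      in_memory_store_.Put(error, object_id);
      if (store_in_plasma) {
        Status s = put_in_local_plasma_callback_(error, object_id);
        if (!s.ok()) {
          RAY_LOG(WARNING).WithField(object_id)
              << "Failed to put error object in plasma: " << s;
        }
      }
    }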
16 changes: 9 additions & 7 deletions src/ray/core_worker/task_manager.h
@@ -14,6 +14,7 @@

#pragma once

#include <functional>
#include <memory>
#include <string>
#include <tuple>
@@ -25,6 +26,7 @@
#include "absl/container/flat_hash_map.h"
#include "absl/synchronization/mutex.h"
#include "ray/common/id.h"
#include "ray/common/status.h"
#include "ray/core_worker/store_provider/memory_store/memory_store.h"
#include "ray/core_worker/task_event_buffer.h"
#include "ray/core_worker/task_manager_interface.h"
@@ -42,7 +44,7 @@ class ActorManager;

using TaskStatusCounter = CounterMap<std::tuple<std::string, rpc::TaskStatus, bool>>;
using PutInLocalPlasmaCallback =
std::function<void(const RayObject &object, const ObjectID &object_id)>;
std::function<Status(const RayObject &object, const ObjectID &object_id)>;
using RetryTaskCallback =
std::function<void(TaskSpecification &spec, bool object_recovery, uint32_t delay_ms)>;
using ReconstructObjectCallback = std::function<void(const ObjectID &object_id)>;
@@ -608,12 +610,12 @@ class TaskManager : public TaskManagerInterface {
ABSL_LOCKS_EXCLUDED(mu_);

/// Update nested ref count info and store the in-memory value for a task's
/// return object. Returns true if the task's return object was returned
/// directly by value.
bool HandleTaskReturn(const ObjectID &object_id,
const rpc::ReturnObject &return_object,
const NodeID &worker_node_id,
bool store_in_plasma) ABSL_LOCKS_EXCLUDED(mu_);
/// return object. On success, the returned value is true if the object's value
/// was returned directly by value (not stored in plasma).
StatusOr<bool> HandleTaskReturn(const ObjectID &object_id,
const rpc::ReturnObject &return_object,
const NodeID &worker_node_id,
bool store_in_plasma) ABSL_LOCKS_EXCLUDED(mu_);

/// Remove a lineage reference to this object ID. This should be called
/// whenever a task that depended on this object ID can no longer be retried.