Skip to content

Commit 633b4f1

Browse files
committed
remove error msg based check and add test
Signed-off-by: Sagar Sumit <[email protected]>
1 parent c91a56e commit 633b4f1

File tree

3 files changed

+85
-22
lines changed

3 files changed

+85
-22
lines changed

src/ray/core_worker/core_worker.cc

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -538,9 +538,7 @@ void CoreWorker::Shutdown() {
538538
RAY_LOG(INFO) << "Core worker ready to be deallocated.";
539539
}
540540

541-
bool CoreWorker::IsShuttingDown() const {
542-
return is_shutdown_.load();
543-
}
541+
bool CoreWorker::IsShuttingDown() const { return is_shutdown_.load(); }
544542

545543
void CoreWorker::ConnectToRayletInternal() {
546544
// Tell the raylet the port that we are listening on.

src/ray/core_worker/core_worker_process.cc

Lines changed: 16 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -423,33 +423,30 @@ std::shared_ptr<CoreWorker> CoreWorkerProcessImpl::CreateCoreWorker(
423423
/*put_in_local_plasma_callback=*/
424424
[this](const RayObject &object, const ObjectID &object_id) {
425425
auto core_worker = GetCoreWorker();
426-
426+
427427
// Check if the core worker is shutting down before attempting plasma operations.
428428
// During shutdown, the plasma store connection may already be broken, so we
429429
// should avoid plasma operations entirely.
430430
if (core_worker->IsShuttingDown()) {
431-
RAY_LOG(INFO) << "Skipping plasma store operation for error object " << object_id
432-
<< " because core worker is shutting down.";
431+
RAY_LOG(INFO) << "Skipping plasma store operation for error object "
432+
<< object_id << " because core worker is shutting down.";
433433
return;
434434
}
435-
436-
auto status = core_worker->PutInLocalPlasmaStore(object, object_id, /*pin_object=*/true);
435+
436+
auto status =
437+
core_worker->PutInLocalPlasmaStore(object, object_id, /*pin_object=*/true);
437438
if (!status.ok()) {
438-
if (status.IsIOError() &&
439-
(status.message().find("Broken pipe") != std::string::npos ||
440-
status.message().find("Connection reset") != std::string::npos ||
441-
status.message().find("Bad file descriptor") != std::string::npos)) {
442-
// This is likely a shutdown race where the plasma store
443-
// connection was closed before we could complete the operation.
444-
// Log as warning since this is expected during shutdown scenarios.
445-
RAY_LOG(WARNING) << "Failed to put error object " << object_id
446-
<< " in plasma store due to connection error (likely shutdown): "
447-
<< status.ToString();
439+
// During shutdown, plasma store connections can be closed causing IOError.
440+
// We only tolerate IOError during shutdown to avoid masking real errors.
441+
if (status.IsIOError() && core_worker->IsShuttingDown()) {
442+
// Double-check shutdown state - this handles the race where shutdown
443+
// began after our first check but before the plasma operation.
444+
RAY_LOG(WARNING) << "Failed to put error object " << object_id
445+
<< " in plasma store during shutdown: " << status.ToString();
448446
} else {
449-
// For other types of errors, maintain the original
450-
// behavior with RAY_CHECK_OK to catch real issues.
451-
RAY_CHECK_OK(status) << "Failed to put error object " << object_id
452-
<< " in plasma store: " << status.ToString();
447+
// For any other error, or IOError when not shutting down, this indicates
448+
// a real problem that should crash the process.
449+
RAY_CHECK_OK(status);
453450
}
454451
}
455452
},

src/ray/core_worker/test/task_manager_test.cc

Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2692,6 +2692,74 @@ TEST_F(TaskManagerTest, TestTaskRetriedOnNodePreemption) {
26922692
// Cleanup
26932693
manager_.FailPendingTask(spec.TaskId(), rpc::ErrorType::WORKER_DIED);
26942694
}
2695+
2696+
class PlasmaShutdownRaceTest : public ::testing::Test {
2697+
public:
2698+
PlasmaShutdownRaceTest() : is_shutting_down_(false) {}
2699+
2700+
Status SimulatePlasmaCallback(const ObjectID &object_id, bool simulate_failure) {
2701+
if (is_shutting_down_) {
2702+
skipped_operations_.insert(object_id);
2703+
return Status::OK();
2704+
}
2705+
2706+
if (simulate_failure) {
2707+
auto status = Status::IOError("Broken pipe");
2708+
if (status.IsIOError() && is_shutting_down_) {
2709+
tolerated_operations_.insert(object_id);
2710+
return Status::OK();
2711+
} else {
2712+
failed_operations_.insert(object_id);
2713+
return status;
2714+
}
2715+
}
2716+
2717+
successful_operations_.insert(object_id);
2718+
return Status::OK();
2719+
}
2720+
2721+
void SetShuttingDown(bool shutting_down) { is_shutting_down_ = shutting_down; }
2722+
2723+
protected:
2724+
bool is_shutting_down_;
2725+
std::unordered_set<ObjectID> skipped_operations_;
2726+
std::unordered_set<ObjectID> tolerated_operations_;
2727+
std::unordered_set<ObjectID> successful_operations_;
2728+
std::unordered_set<ObjectID> failed_operations_;
2729+
};
2730+
2731+
// Test plasma callback behavior during shutdown to prevent RAY_CHECK crashes
2732+
TEST_F(PlasmaShutdownRaceTest, PlasmaCallbackHandlesShutdownRaceCondition) {
2733+
auto object_id = ObjectID::FromRandom();
2734+
2735+
SetShuttingDown(false);
2736+
ASSERT_TRUE(SimulatePlasmaCallback(object_id, false).ok());
2737+
ASSERT_EQ(successful_operations_.count(object_id), 1);
2738+
2739+
auto object_id2 = ObjectID::FromRandom();
2740+
auto status = SimulatePlasmaCallback(object_id2, true);
2741+
ASSERT_FALSE(status.ok());
2742+
ASSERT_TRUE(status.IsIOError());
2743+
ASSERT_EQ(failed_operations_.count(object_id2), 1);
2744+
2745+
auto object_id3 = ObjectID::FromRandom();
2746+
SetShuttingDown(true);
2747+
ASSERT_TRUE(SimulatePlasmaCallback(object_id3, false).ok());
2748+
ASSERT_EQ(skipped_operations_.count(object_id3), 1);
2749+
2750+
auto object_id4 = ObjectID::FromRandom();
2751+
SetShuttingDown(false);
2752+
auto status4 = Status::IOError("Broken pipe");
2753+
SetShuttingDown(true);
2754+
2755+
if (status4.IsIOError() && is_shutting_down_) {
2756+
tolerated_operations_.insert(object_id4);
2757+
} else {
2758+
failed_operations_.insert(object_id4);
2759+
}
2760+
ASSERT_EQ(tolerated_operations_.count(object_id4), 1);
2761+
}
2762+
26952763
} // namespace core
26962764
} // namespace ray
26972765

0 commit comments

Comments
 (0)