Skip to content

Commit e489752

Browse files
committed
[core] Fix RAY_CHECK failure during shutdown due to plasma store race condition
Signed-off-by: Sagar Sumit <[email protected]>
1 parent ddec976 commit e489752

File tree

3 files changed

+39
-2
lines changed

3 files changed

+39
-2
lines changed

src/ray/core_worker/core_worker.cc

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -534,6 +534,8 @@ void CoreWorker::Shutdown() {
534534
RAY_LOG(INFO) << "Core worker ready to be deallocated.";
535535
}
536536

537+
bool CoreWorker::IsShuttingDown() const { return is_shutdown_.load(); }
538+
537539
void CoreWorker::ConnectToRayletInternal() {
538540
// Tell the raylet the port that we are listening on.
539541
// NOTE: This also marks the worker as available in Raylet. We do this at the

src/ray/core_worker/core_worker.h

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -244,6 +244,12 @@ class CoreWorker {
244244
///
245245
void Shutdown();
246246

247+
/// Check if the core worker is currently shutting down.
248+
/// This can be used to avoid operations that might fail during shutdown.
249+
///
250+
/// \return true if shutdown has been initiated, false otherwise.
251+
bool IsShuttingDown() const;
252+
247253
/// Start receiving and executing tasks.
248254
void RunTaskExecutionLoop();
249255

src/ray/core_worker/core_worker_process.cc

Lines changed: 31 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -423,8 +423,37 @@ std::shared_ptr<CoreWorker> CoreWorkerProcessImpl::CreateCoreWorker(
423423
/*put_in_local_plasma_callback=*/
424424
[this](const RayObject &object, const ObjectID &object_id) {
425425
auto core_worker = GetCoreWorker();
426-
RAY_CHECK_OK(
427-
core_worker->PutInLocalPlasmaStore(object, object_id, /*pin_object=*/true));
426+
427+
// Check if the core worker is shutting down before attempting plasma operations.
428+
// During shutdown, the plasma store connection may already be broken, so we
429+
// should avoid plasma operations entirely.
430+
if (core_worker->IsShuttingDown()) {
431+
RAY_LOG(INFO) << "Skipping plasma store operation for error object "
432+
<< object_id << " because core worker is shutting down.";
433+
return;
434+
}
435+
436+
auto status =
437+
core_worker->PutInLocalPlasmaStore(object, object_id, /*pin_object=*/true);
438+
if (!status.ok()) {
439+
if (status.IsIOError() &&
440+
(status.message().find("Broken pipe") != std::string::npos ||
441+
status.message().find("Connection reset") != std::string::npos ||
442+
status.message().find("Bad file descriptor") != std::string::npos)) {
443+
// This is likely a shutdown race where the plasma store
444+
// connection was closed before we could complete the operation.
445+
// Log as warning since this is expected during shutdown scenarios.
446+
RAY_LOG(WARNING)
447+
<< "Failed to put error object " << object_id
448+
<< " in plasma store due to connection error (likely shutdown): "
449+
<< status.ToString();
450+
} else {
451+
// For other types of errors, maintain the original
452+
// behavior with RAY_CHECK_OK to catch real issues.
453+
RAY_CHECK_OK(status) << "Failed to put error object " << object_id
454+
<< " in plasma store: " << status.ToString();
455+
}
456+
}
428457
},
429458
/* retry_task_callback= */
430459
[this](TaskSpecification &spec, bool object_recovery, uint32_t delay_ms) {

0 commit comments

Comments
 (0)