From cdf88500de3935d24fb66ca1b2c26b63d7acd527 Mon Sep 17 00:00:00 2001 From: Joyee Cheung Date: Sat, 26 Apr 2025 23:10:12 +0200 Subject: [PATCH 1/4] test: remove deadlock workaround --- test/parallel/test-file-write-stream4.js | 7 +------ test/parallel/test-net-write-fully-async-buffer.js | 2 +- test/parallel/test-net-write-fully-async-hex-string.js | 2 +- 3 files changed, 3 insertions(+), 8 deletions(-) diff --git a/test/parallel/test-file-write-stream4.js b/test/parallel/test-file-write-stream4.js index e741cdd79036b4..6b3862fa714d7c 100644 --- a/test/parallel/test-file-write-stream4.js +++ b/test/parallel/test-file-write-stream4.js @@ -1,4 +1,3 @@ -// Flags: --expose-gc 'use strict'; // Test that 'close' emits once and not twice when `emitClose: true` is set. @@ -18,8 +17,4 @@ const fileWriteStream = fs.createWriteStream(filepath, { }); fileReadStream.pipe(fileWriteStream); -fileWriteStream.on('close', common.mustCall(() => { - // TODO(lpinca): Remove the forced GC when - // https://github.com/nodejs/node/issues/54918 is fixed. - globalThis.gc({ type: 'major' }); -})); +fileWriteStream.on('close', common.mustCall()); diff --git a/test/parallel/test-net-write-fully-async-buffer.js b/test/parallel/test-net-write-fully-async-buffer.js index 4dfb905d23b69e..93074c3c49d6b6 100644 --- a/test/parallel/test-net-write-fully-async-buffer.js +++ b/test/parallel/test-net-write-fully-async-buffer.js @@ -23,7 +23,7 @@ const server = net.createServer(common.mustCall(function(conn) { } while (conn.write(Buffer.from(data))); - globalThis.gc({ type: 'major' }); + globalThis.gc({ type: 'minor' }); // The buffer allocated above should still be alive. } diff --git a/test/parallel/test-net-write-fully-async-hex-string.js b/test/parallel/test-net-write-fully-async-hex-string.js index c1ebe7e68b534e..2719ad6b5b5f80 100644 --- a/test/parallel/test-net-write-fully-async-hex-string.js +++ b/test/parallel/test-net-write-fully-async-hex-string.js @@ -21,7 +21,7 @@ const server = net.createServer(common.mustCall(function(conn) { } while (conn.write(data, 'hex')); - globalThis.gc({ type: 'major' }); + globalThis.gc({ type: 'minor' }); // The buffer allocated inside the .write() call should still be alive. 
} From 466e5415e4f1cf96ee9f9094ab097c948c7fba60 Mon Sep 17 00:00:00 2001 From: Joyee Cheung Date: Fri, 2 May 2025 16:11:06 +0200 Subject: [PATCH 2/4] src: add more debug logs and comments in NodePlatform --- src/debug_utils.h | 2 + src/node_platform.cc | 144 +++++++++++++++++++++++++++++++++++++++---- src/node_platform.h | 25 +++++++- 3 files changed, 156 insertions(+), 15 deletions(-) diff --git a/src/debug_utils.h b/src/debug_utils.h index d4391ac987ba5b..7f073e1ea8b37a 100644 --- a/src/debug_utils.h +++ b/src/debug_utils.h @@ -55,6 +55,8 @@ void NODE_EXTERN_PRIVATE FWrite(FILE* file, const std::string& str); V(MKSNAPSHOT) \ V(SNAPSHOT_SERDES) \ V(PERMISSION_MODEL) \ + V(PLATFORM_MINIMAL) \ + V(PLATFORM_VERBOSE) \ V(QUIC) enum class DebugCategory : unsigned int { diff --git a/src/node_platform.cc b/src/node_platform.cc index 50f5d4ef02aabd..0a8da9894f5309 100644 --- a/src/node_platform.cc +++ b/src/node_platform.cc @@ -13,6 +13,7 @@ using v8::Isolate; using v8::Object; using v8::Platform; using v8::Task; +using v8::TaskPriority; namespace { @@ -22,8 +23,31 @@ struct PlatformWorkerData { ConditionVariable* platform_workers_ready; int* pending_platform_workers; int id; + PlatformDebugLogLevel debug_log_level; }; +const char* GetTaskPriorityName(TaskPriority priority) { + switch (priority) { + case TaskPriority::kUserBlocking: + return "UserBlocking"; + case TaskPriority::kUserVisible: + return "UserVisible"; + case TaskPriority::kBestEffort: + return "BestEffort"; + default: + return "Unknown"; + } +} + +static void PrintSourceLocation(const v8::SourceLocation& location) { + auto loc = location.ToString(); + if (!loc.empty()) { + fprintf(stderr, " %s\n", loc.c_str()); + } else { + fprintf(stderr, " <unknown>\n"); + } +} + static void PlatformWorkerThread(void* data) { uv_thread_setname("V8Worker"); std::unique_ptr<PlatformWorkerData> worker_data(static_cast<PlatformWorkerData*>(data)); @@ -40,8 +64,18 @@ static void PlatformWorkerThread(void* data) { worker_data->platform_workers_ready->Signal(lock); } + bool debug_log_enabled = + worker_data->debug_log_level != PlatformDebugLogLevel::kNone; + int id = worker_data->id; while (std::unique_ptr<Task> task = pending_worker_tasks->Lock().BlockingPop()) { + if (debug_log_enabled) { + fprintf(stderr, + "\nPlatformWorkerThread %d running task %p\n", + id, + task.get()); + fflush(stderr); + } task->Run(); pending_worker_tasks->Lock().NotifyOfCompletion(); } } @@ -75,6 +109,12 @@ class WorkerThreadsTaskRunner::DelayedTaskScheduler { void PostDelayedTask(std::unique_ptr<Task> task, double delay_in_seconds) { auto locked = tasks_.Lock(); + + // The delayed task scheduler is on its own thread with its own loop that + // runs the timers for the scheduled tasks to pop the original task back + // into the worker task queue. This first pushes the tasks that + // schedule the timers into the local task queue that will be flushed + // by the local event loop. locked.Push(std::make_unique<ScheduleTask>( this, std::move(task), delay_in_seconds)); uv_async_send(&flush_tasks_); } @@ -109,6 +149,9 @@ class WorkerThreadsTaskRunner::DelayedTaskScheduler { while (!tasks_to_run.empty()) { std::unique_ptr<Task> task = std::move(tasks_to_run.front()); tasks_to_run.pop(); + // This runs either the ScheduleTasks that schedule the timers to + // pop the tasks back into the worker task runner queue, or the + // StopTasks to stop the timers and drop all the pending tasks. task->Run(); } }
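The comment above describes a hand-rolled cross-thread timer: producers hand a task to the scheduler thread through a locked queue plus uv_async_send(), and the scheduler's own event loop turns each request into a one-shot uv_timer whose callback pushes the task into the shared worker queue. A reduced libuv sketch of that shape (ArmOneShot and OnTimer are illustrative names, not from the patch):

    #include <uv.h>

    // Runs on the scheduler thread when a timer fires; the real code pops
    // the task stashed in timer->data back into the worker task queue.
    static void OnTimer(uv_timer_t* timer) {
      uv_close(reinterpret_cast<uv_handle_t*>(timer), [](uv_handle_t* handle) {
        delete reinterpret_cast<uv_timer_t*>(handle);
      });
    }

    // Runs on the scheduler thread from the uv_async callback, once per
    // queued request, mirroring what ScheduleTask::Run() does in the patch.
    static void ArmOneShot(uv_loop_t* scheduler_loop, void* task,
                           uint64_t delay_ms) {
      uv_timer_t* timer = new uv_timer_t();
      timer->data = task;  // retrieved again in the timer callback
      uv_timer_init(scheduler_loop, timer);
      uv_timer_start(timer, OnTimer, delay_ms, 0);  // repeat=0: one-shot
    }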
@@ -136,9 +179,9 @@ class WorkerThreadsTaskRunner::DelayedTaskScheduler { ScheduleTask(DelayedTaskScheduler* scheduler, std::unique_ptr<Task> task, double delay_in_seconds) - : scheduler_(scheduler), - task_(std::move(task)), - delay_in_seconds_(delay_in_seconds) {} + : scheduler_(scheduler), + task_(std::move(task)), + delay_in_seconds_(delay_in_seconds) {} void Run() override { uint64_t delay_millis = llround(delay_in_seconds_ * 1000); @@ -173,15 +216,21 @@ class WorkerThreadsTaskRunner::DelayedTaskScheduler { } uv_sem_t ready_; + // The task queue in the worker thread task runner; we push the delayed task + // back into it when the timer expires. TaskQueue<Task>* pending_worker_tasks_; + + // Locally scheduled tasks to be popped into the worker task runner queue. + // It is flushed whenever the next closest timer expires. TaskQueue<Task> tasks_; uv_loop_t loop_; uv_async_t flush_tasks_; std::unordered_set<uv_timer_t*> timers_; }; -WorkerThreadsTaskRunner::WorkerThreadsTaskRunner(int thread_pool_size) { +WorkerThreadsTaskRunner::WorkerThreadsTaskRunner( + int thread_pool_size, PlatformDebugLogLevel debug_log_level) + : debug_log_level_(debug_log_level) { Mutex platform_workers_mutex; ConditionVariable platform_workers_ready; @@ -193,10 +242,13 @@ WorkerThreadsTaskRunner::WorkerThreadsTaskRunner(int thread_pool_size) { threads_.push_back(delayed_task_scheduler_->Start()); for (int i = 0; i < thread_pool_size; i++) { - PlatformWorkerData* worker_data = new PlatformWorkerData{ - &pending_worker_tasks_, &platform_workers_mutex, - &platform_workers_ready, &pending_platform_workers, i - }; + PlatformWorkerData* worker_data = + new PlatformWorkerData{&pending_worker_tasks_, + &platform_workers_mutex, + &platform_workers_ready, + &pending_platform_workers, + i, + debug_log_level_}; std::unique_ptr<uv_thread_t> t { new uv_thread_t() }; if (uv_thread_create(t.get(), PlatformWorkerThread, worker_data) != 0) { @@ -238,8 +290,8 @@ int WorkerThreadsTaskRunner::NumberOfWorkerThreads() const { } PerIsolatePlatformData::PerIsolatePlatformData( - Isolate* isolate, uv_loop_t* loop) - : isolate_(isolate), loop_(loop) { + Isolate* isolate, uv_loop_t* loop, PlatformDebugLogLevel debug_log_level) + : isolate_(isolate), loop_(loop), debug_log_level_(debug_log_level) { flush_tasks_ = new uv_async_t(); CHECK_EQ(0, uv_async_init(loop, flush_tasks_, FlushTasks)); flush_tasks_->data = static_cast<void*>(this); @@ -267,6 +319,15 @@ void PerIsolatePlatformData::PostTaskImpl(std::unique_ptr<Task> task, // the foreground task runner is being cleaned up by Shutdown(). In that // case, make sure we wait until the shutdown is completed (which leads // to flush_tasks_ == nullptr, and the task will be discarded).
+ if (debug_log_level_ != PlatformDebugLogLevel::kNone) { + fprintf(stderr, "\nPerIsolatePlatformData::PostTaskImpl %p", task.get()); + PrintSourceLocation(location); + if (debug_log_level_ == PlatformDebugLogLevel::kVerbose) { + DumpNativeBacktrace(stderr); + } + fflush(stderr); + } + auto locked = foreground_tasks_.Lock(); if (flush_tasks_ == nullptr) return; locked.Push(std::move(task)); @@ -277,6 +338,18 @@ void PerIsolatePlatformData::PostDelayedTaskImpl( std::unique_ptr<Task> task, double delay_in_seconds, const v8::SourceLocation& location) { + if (debug_log_level_ != PlatformDebugLogLevel::kNone) { + fprintf(stderr, + "\nPerIsolatePlatformData::PostDelayedTaskImpl %p %f", + task.get(), + delay_in_seconds); + PrintSourceLocation(location); + if (debug_log_level_ == PlatformDebugLogLevel::kVerbose) { + DumpNativeBacktrace(stderr); + } + fflush(stderr); + } + auto locked = foreground_delayed_tasks_.Lock(); if (flush_tasks_ == nullptr) return; std::unique_ptr<DelayedTask> delayed(new DelayedTask()); @@ -346,6 +419,16 @@ void PerIsolatePlatformData::DecreaseHandleCount() { NodePlatform::NodePlatform(int thread_pool_size, v8::TracingController* tracing_controller, v8::PageAllocator* page_allocator) { + if (per_process::enabled_debug_list.enabled( DebugCategory::PLATFORM_VERBOSE)) { + debug_log_level_ = PlatformDebugLogLevel::kVerbose; + } else if (per_process::enabled_debug_list.enabled( DebugCategory::PLATFORM_MINIMAL)) { + debug_log_level_ = PlatformDebugLogLevel::kMinimal; + } else { + debug_log_level_ = PlatformDebugLogLevel::kNone; + } + if (tracing_controller != nullptr) { tracing_controller_ = tracing_controller; } else { @@ -362,8 +445,8 @@ NodePlatform::NodePlatform(int thread_pool_size, DCHECK_EQ(GetTracingController(), tracing_controller_); thread_pool_size = GetActualThreadPoolSize(thread_pool_size); - worker_thread_task_runner_ = - std::make_shared<WorkerThreadsTaskRunner>(thread_pool_size); + worker_thread_task_runner_ = std::make_shared<WorkerThreadsTaskRunner>( + thread_pool_size, debug_log_level_); } NodePlatform::~NodePlatform() { @@ -372,7 +455,8 @@ NodePlatform::~NodePlatform() { void NodePlatform::RegisterIsolate(Isolate* isolate, uv_loop_t* loop) { Mutex::ScopedLock lock(per_isolate_mutex_); - auto delegate = std::make_shared<PerIsolatePlatformData>(isolate, loop); + auto delegate = + std::make_shared<PerIsolatePlatformData>(isolate, loop, debug_log_level_); IsolatePlatformDelegate* ptr = delegate.get(); auto insertion = per_isolate_.emplace( isolate, @@ -527,6 +611,17 @@ void NodePlatform::PostTaskOnWorkerThreadImpl( v8::TaskPriority priority, std::unique_ptr<Task> task, const v8::SourceLocation& location) { + if (debug_log_level_ != PlatformDebugLogLevel::kNone) { + fprintf(stderr, + "\nNodePlatform::PostTaskOnWorkerThreadImpl %s %p", + GetTaskPriorityName(priority), + task.get()); + PrintSourceLocation(location); + if (debug_log_level_ == PlatformDebugLogLevel::kVerbose) { + DumpNativeBacktrace(stderr); + } + fflush(stderr); + } worker_thread_task_runner_->PostTask(std::move(task)); } @@ -535,6 +630,18 @@ void NodePlatform::PostDelayedTaskOnWorkerThreadImpl( v8::TaskPriority priority, std::unique_ptr<Task> task, double delay_in_seconds, const v8::SourceLocation& location) { + if (debug_log_level_ != PlatformDebugLogLevel::kNone) { + fprintf(stderr, + "\nNodePlatform::PostDelayedTaskOnWorkerThreadImpl %s %p %f", + GetTaskPriorityName(priority), + task.get(), + delay_in_seconds); + PrintSourceLocation(location); + if (debug_log_level_ == PlatformDebugLogLevel::kVerbose) { + DumpNativeBacktrace(stderr); + } + fflush(stderr); + } worker_thread_task_runner_->PostDelayedTask(std::move(task), delay_in_seconds); } @@ -564,6 +671,17 @@ std::unique_ptr<v8::JobHandle> NodePlatform::CreateJobImpl( v8::TaskPriority priority, std::unique_ptr<v8::JobTask> job_task, const v8::SourceLocation& location) { + if (debug_log_level_ != PlatformDebugLogLevel::kNone) { + fprintf(stderr, + "\nNodePlatform::CreateJobImpl %s %p", + GetTaskPriorityName(priority), + job_task.get()); + PrintSourceLocation(location); + if (debug_log_level_ == PlatformDebugLogLevel::kVerbose) { + DumpNativeBacktrace(stderr); + } + fflush(stderr); + } return v8::platform::NewDefaultJobHandle( this, priority, std::move(job_task), NumberOfWorkerThreads()); } diff --git a/src/node_platform.h b/src/node_platform.h index 6462f06f6983b2..e9553236d45f87 100644 --- a/src/node_platform.h +++ b/src/node_platform.h @@ -61,13 +61,22 @@ struct DelayedTask { std::shared_ptr<PerIsolatePlatformData> platform_data; }; +enum class PlatformDebugLogLevel { + kNone = 0, + kMinimal = 1, + kVerbose = 2, +}; + // This acts as the foreground task runner for a given Isolate. class PerIsolatePlatformData : public IsolatePlatformDelegate, public v8::TaskRunner, public std::enable_shared_from_this<PerIsolatePlatformData> { public: - PerIsolatePlatformData(v8::Isolate* isolate, uv_loop_t* loop); + PerIsolatePlatformData( + v8::Isolate* isolate, + uv_loop_t* loop, + PlatformDebugLogLevel debug_log_level = PlatformDebugLogLevel::kNone); ~PerIsolatePlatformData() override; std::shared_ptr<v8::TaskRunner> GetForegroundTaskRunner() override; @@ -134,12 +143,14 @@ class PerIsolatePlatformData typedef std::unique_ptr<DelayedTask, std::function<void(DelayedTask*)>> DelayedTaskPointer; std::vector<DelayedTaskPointer> scheduled_delayed_tasks_; + PlatformDebugLogLevel debug_log_level_ = PlatformDebugLogLevel::kNone; }; // This acts as the single worker thread task runner for all Isolates. class WorkerThreadsTaskRunner { public: - explicit WorkerThreadsTaskRunner(int thread_pool_size); + explicit WorkerThreadsTaskRunner(int thread_pool_size, + PlatformDebugLogLevel debug_log_level); void PostTask(std::unique_ptr<v8::Task> task); void PostDelayedTask(std::unique_ptr<v8::Task> task, double delay_in_seconds); @@ -150,12 +161,21 @@ class WorkerThreadsTaskRunner { int NumberOfWorkerThreads() const; private: + // A queue shared by all threads. The consumers are the worker threads which + // take tasks from it to run in PlatformWorkerThread(). The producers can be + // any thread. Both the foreground thread and the worker threads can push + // tasks into the queue via v8::Platform::PostTaskOnWorkerThread() which + // eventually calls PostTask() on this class. When any thread calls + // v8::Platform::PostDelayedTaskOnWorkerThread(), the DelayedTaskScheduler + // thread will schedule a timer that pushes the delayed tasks back into this + // queue when the timer expires. TaskQueue<v8::Task> pending_worker_tasks_; class DelayedTaskScheduler; std::unique_ptr<DelayedTaskScheduler> delayed_task_scheduler_; std::vector<std::unique_ptr<uv_thread_t>> threads_; + PlatformDebugLogLevel debug_log_level_ = PlatformDebugLogLevel::kNone; }; class NodePlatform : public MultiIsolatePlatform { @@ -216,6 +236,7 @@ class NodePlatform : public MultiIsolatePlatform { v8::PageAllocator* page_allocator_; std::shared_ptr<WorkerThreadsTaskRunner> worker_thread_task_runner_; bool has_shut_down_ = false; + PlatformDebugLogLevel debug_log_level_ = PlatformDebugLogLevel::kNone; }; } // namespace node
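Both new categories plug into the existing DebugCategory machinery in debug_utils, so they should be selectable the same way as the other native debug categories, through the NODE_DEBUG_NATIVE environment variable (e.g. NODE_DEBUG_NATIVE=PLATFORM_VERBOSE). PLATFORM_MINIMAL prints the task headers and source locations, while PLATFORM_VERBOSE additionally dumps a native backtrace. Every logging site added by this patch repeats the same three-step sequence, which a hypothetical helper (LogTaskEvent is a made-up name, not in the patch) makes explicit:

    // Sketch only: the patch inlines this sequence at each call site.
    static void LogTaskEvent(PlatformDebugLogLevel level,
                             const char* header,
                             const v8::SourceLocation& location) {
      if (level == PlatformDebugLogLevel::kNone) return;
      fprintf(stderr, "\n%s", header);  // e.g. "PerIsolatePlatformData::PostTaskImpl 0x..."
      PrintSourceLocation(location);    // " path/to/caller.cc:123" or " <unknown>"
      if (level == PlatformDebugLogLevel::kVerbose) {
        DumpNativeBacktrace(stderr);    // extra detail only at PLATFORM_VERBOSE
      }
      fflush(stderr);
    }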
From d160455cd937664977cd77c7142a03ca7dc2246b Mon Sep 17 00:00:00 2001 From: Joyee Cheung Date: Fri, 2 May 2025 19:25:54 +0200 Subject: [PATCH 3/4] src: use priority queue to run worker tasks According to the documentation, the v8 tasks should be executed based on priority. Previously we always executed the tasks in FIFO order; this changes the NodePlatform implementation to execute higher-priority tasks first. The tasks used to schedule timers for the delayed tasks are still run in FIFO order, since priority is irrelevant for the timer-scheduling part, while the tasks unwrapped by the timer callbacks are ordered by priority.
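V8 declares the priorities in ascending order (kBestEffort < kUserVisible < kUserBlocking), so comparing entries with operator< yields a max-heap in which user-blocking work is popped first. A self-contained sketch of the ordering this patch relies on (the local Entry type stands in for the patch's TaskQueueEntry, and the local enum mirrors the order of v8::TaskPriority in v8-platform.h):

    #include <cstdio>
    #include <memory>
    #include <queue>
    #include <vector>

    enum class TaskPriority { kBestEffort, kUserVisible, kUserBlocking };

    struct Entry {
      const char* name;
      TaskPriority priority;
    };

    struct EntryCompare {
      bool operator()(const std::unique_ptr<Entry>& a,
                      const std::unique_ptr<Entry>& b) const {
        return a->priority < b->priority;  // higher priority is popped first
      }
    };

    int main() {
      std::priority_queue<std::unique_ptr<Entry>,
                          std::vector<std::unique_ptr<Entry>>,
                          EntryCompare> q;
      q.push(std::make_unique<Entry>(Entry{"best-effort", TaskPriority::kBestEffort}));
      q.push(std::make_unique<Entry>(Entry{"user-blocking", TaskPriority::kUserBlocking}));
      q.push(std::make_unique<Entry>(Entry{"user-visible", TaskPriority::kUserVisible}));
      while (!q.empty()) {
        printf("%s\n", q.top()->name);  // user-blocking, user-visible, best-effort
        q.pop();
      }
    }

Note that std::priority_queue is not stable, so ties between equal-priority tasks can pop in any order.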
--- src/node_platform.cc | 105 +++++++++++++++++++++++++++---------------- src/node_platform.h | 49 +++++++++++++++++--- 2 files changed, 109 insertions(+), 45 deletions(-) diff --git a/src/node_platform.cc b/src/node_platform.cc index 0a8da9894f5309..4320b74052ee73 100644 --- a/src/node_platform.cc +++ b/src/node_platform.cc @@ -18,7 +18,7 @@ using v8::TaskPriority; namespace { struct PlatformWorkerData { - TaskQueue<Task>* task_queue; + TaskQueue<TaskQueueEntry>* task_queue; Mutex* platform_workers_mutex; @@ -53,7 +53,7 @@ static void PlatformWorkerThread(void* data) { std::unique_ptr<PlatformWorkerData> worker_data(static_cast<PlatformWorkerData*>(data)); - TaskQueue<Task>* pending_worker_tasks = worker_data->task_queue; + TaskQueue<TaskQueueEntry>* pending_worker_tasks = worker_data->task_queue; TRACE_EVENT_METADATA1("__metadata", "thread_name", "name", "PlatformWorkerThread"); @@ -67,16 +67,17 @@ static void PlatformWorkerThread(void* data) { bool debug_log_enabled = worker_data->debug_log_level != PlatformDebugLogLevel::kNone; int id = worker_data->id; - while (std::unique_ptr<Task> task = + while (std::unique_ptr<TaskQueueEntry> entry = pending_worker_tasks->Lock().BlockingPop()) { if (debug_log_enabled) { fprintf(stderr, - "\nPlatformWorkerThread %d running task %p\n", + "\nPlatformWorkerThread %d running task %p %s\n", id, - task.get()); + entry->task.get(), + GetTaskPriorityName(entry->priority)); fflush(stderr); } - task->Run(); + entry->task->Run(); pending_worker_tasks->Lock().NotifyOfCompletion(); } } @@ -92,8 +93,8 @@ static int GetActualThreadPoolSize(int thread_pool_size) { class WorkerThreadsTaskRunner::DelayedTaskScheduler { public: - explicit DelayedTaskScheduler(TaskQueue<Task>* tasks) - : pending_worker_tasks_(tasks) {} + explicit DelayedTaskScheduler(TaskQueue<TaskQueueEntry>* tasks) + : pending_worker_tasks_(tasks) {} std::unique_ptr<uv_thread_t> Start() { auto start_thread = [](void* data) { @@ -107,16 +108,21 @@ class WorkerThreadsTaskRunner::DelayedTaskScheduler { return t; } - void PostDelayedTask(std::unique_ptr<Task> task, double delay_in_seconds) { + void PostDelayedTask(v8::TaskPriority priority, + std::unique_ptr<Task> task, + double delay_in_seconds) { auto locked = tasks_.Lock(); + auto entry = std::make_unique<TaskQueueEntry>(std::move(task), priority); + auto delayed = std::make_unique<ScheduleTask>( + this, std::move(entry), delay_in_seconds); + // The delayed task scheduler is on its own thread with its own loop that // runs the timers for the scheduled tasks to pop the original task back // into the worker task queue. This first pushes the tasks that // schedule the timers into the local task queue that will be flushed // by the local event loop.
- locked.Push(std::make_unique<ScheduleTask>( - this, std::move(task), delay_in_seconds)); + locked.Push(std::move(delayed)); uv_async_send(&flush_tasks_); } @@ -144,10 +150,12 @@ class WorkerThreadsTaskRunner::DelayedTaskScheduler { DelayedTaskScheduler* scheduler = ContainerOf(&DelayedTaskScheduler::loop_, flush_tasks->loop); - std::queue<std::unique_ptr<Task>> tasks_to_run = - scheduler->tasks_.Lock().PopAll(); + auto tasks_to_run = scheduler->tasks_.Lock().PopAll(); while (!tasks_to_run.empty()) { - std::unique_ptr<Task> task = std::move(tasks_to_run.front()); + // We have to use const_cast because std::priority_queue::top() does not + // return a movable item. + std::unique_ptr<Task> task = + std::move(const_cast<std::unique_ptr<Task>&>(tasks_to_run.top())); tasks_to_run.pop(); // This runs either the ScheduleTasks that schedule the timers to // pop the tasks back into the worker task runner queue, or the @@ -177,7 +185,7 @@ class WorkerThreadsTaskRunner::DelayedTaskScheduler { class ScheduleTask : public Task { public: ScheduleTask(DelayedTaskScheduler* scheduler, - std::unique_ptr<Task> task, + std::unique_ptr<TaskQueueEntry> task, double delay_in_seconds) : scheduler_(scheduler), task_(std::move(task)), @@ -194,7 +202,7 @@ class WorkerThreadsTaskRunner::DelayedTaskScheduler { private: DelayedTaskScheduler* scheduler_; - std::unique_ptr<Task> task_; + std::unique_ptr<TaskQueueEntry> task_; double delay_in_seconds_; }; @@ -205,20 +213,21 @@ class WorkerThreadsTaskRunner::DelayedTaskScheduler { scheduler->TakeTimerTask(timer)); } - std::unique_ptr<Task> TakeTimerTask(uv_timer_t* timer) { - std::unique_ptr<Task> task(static_cast<Task*>(timer->data)); + std::unique_ptr<TaskQueueEntry> TakeTimerTask(uv_timer_t* timer) { + std::unique_ptr<TaskQueueEntry> task_entry( + static_cast<TaskQueueEntry*>(timer->data)); uv_timer_stop(timer); uv_close(reinterpret_cast<uv_handle_t*>(timer), [](uv_handle_t* handle) { delete reinterpret_cast<uv_timer_t*>(handle); }); timers_.erase(timer); - return task; + return task_entry; } uv_sem_t ready_; // The task queue in the worker thread task runner; we push the delayed task // back into it when the timer expires. - TaskQueue<Task>* pending_worker_tasks_; + TaskQueue<TaskQueueEntry>* pending_worker_tasks_; // Locally scheduled tasks to be popped into the worker task runner queue. // It is flushed whenever the next closest timer expires. @@ -264,13 +273,20 @@ WorkerThreadsTaskRunner::WorkerThreadsTaskRunner( } } -void WorkerThreadsTaskRunner::PostTask(std::unique_ptr<Task> task) { - pending_worker_tasks_.Lock().Push(std::move(task)); +void WorkerThreadsTaskRunner::PostTask(v8::TaskPriority priority, + std::unique_ptr<Task> task, + const v8::SourceLocation& location) { + auto entry = std::make_unique<TaskQueueEntry>(std::move(task), priority); + pending_worker_tasks_.Lock().Push(std::move(entry)); } -void WorkerThreadsTaskRunner::PostDelayedTask(std::unique_ptr<Task> task, - double delay_in_seconds) { - delayed_task_scheduler_->PostDelayedTask(std::move(task), delay_in_seconds); +void WorkerThreadsTaskRunner::PostDelayedTask( + v8::TaskPriority priority, + std::unique_ptr<Task> task, + const v8::SourceLocation& location, + double delay_in_seconds) { + delayed_task_scheduler_->PostDelayedTask( + priority, std::move(task), delay_in_seconds); } void WorkerThreadsTaskRunner::BlockingDrain() { @@ -330,7 +346,9 @@ void PerIsolatePlatformData::PostTaskImpl(std::unique_ptr<Task> task, auto locked = foreground_tasks_.Lock(); if (flush_tasks_ == nullptr) return; - locked.Push(std::move(task)); + // All foreground tasks are treated as user blocking tasks.
+ locked.Push(std::make_unique<TaskQueueEntry>( + std::move(task), v8::TaskPriority::kUserBlocking)); uv_async_send(flush_tasks_); } @@ -356,6 +374,8 @@ void PerIsolatePlatformData::PostDelayedTaskImpl( delayed->task = std::move(task); delayed->platform_data = shared_from_this(); delayed->timeout = delay_in_seconds; + // All foreground tasks are treated as user blocking tasks. + delayed->priority = v8::TaskPriority::kUserBlocking; locked.Push(std::move(delayed)); uv_async_send(flush_tasks_); } @@ -562,11 +582,13 @@ void NodePlatform::DrainTasks(Isolate* isolate) { bool PerIsolatePlatformData::FlushForegroundTasksInternal() { bool did_work = false; - std::queue<std::unique_ptr<DelayedTask>> delayed_tasks_to_schedule = - foreground_delayed_tasks_.Lock().PopAll(); + auto delayed_tasks_to_schedule = foreground_delayed_tasks_.Lock().PopAll(); while (!delayed_tasks_to_schedule.empty()) { + // We have to use const_cast because std::priority_queue::top() does not + // return a movable item. std::unique_ptr<DelayedTask> delayed = - std::move(delayed_tasks_to_schedule.front()); + std::move(const_cast<std::unique_ptr<DelayedTask>&>( + delayed_tasks_to_schedule.top())); delayed_tasks_to_schedule.pop(); did_work = true; @@ -591,17 +613,20 @@ bool PerIsolatePlatformData::FlushForegroundTasksInternal() { }); } - std::queue<std::unique_ptr<Task>> tasks; + TaskQueue<TaskQueueEntry>::PriorityQueue tasks; { auto locked = foreground_tasks_.Lock(); tasks = locked.PopAll(); } while (!tasks.empty()) { - std::unique_ptr<Task> task = std::move(tasks.front()); + // We have to use const_cast because std::priority_queue::top() does not + // return a movable item. + std::unique_ptr<TaskQueueEntry> entry = + std::move(const_cast<std::unique_ptr<TaskQueueEntry>&>(tasks.top())); tasks.pop(); did_work = true; - RunForegroundTask(std::move(task)); + RunForegroundTask(std::move(entry->task)); } return did_work; @@ -622,7 +647,7 @@ void NodePlatform::PostTaskOnWorkerThreadImpl( } fflush(stderr); } - worker_thread_task_runner_->PostTask(std::move(task)); + worker_thread_task_runner_->PostTask(priority, std::move(task), location); } void NodePlatform::PostDelayedTaskOnWorkerThreadImpl( @@ -642,8 +667,8 @@ void NodePlatform::PostDelayedTaskOnWorkerThreadImpl( } fflush(stderr); } - worker_thread_task_runner_->PostDelayedTask(std::move(task), - delay_in_seconds); + worker_thread_task_runner_->PostDelayedTask( + priority, std::move(task), location, delay_in_seconds); } IsolatePlatformDelegate* NodePlatform::ForIsolate(Isolate* isolate) { @@ -742,7 +767,8 @@ std::unique_ptr<T> TaskQueue<T>::Locked::Pop() { if (queue_->task_queue_.empty()) { return std::unique_ptr<T>(nullptr); } - std::unique_ptr<T> result = std::move(queue_->task_queue_.front()); + std::unique_ptr<T> result = std::move( + const_cast<std::unique_ptr<T>&>(queue_->task_queue_.top())); queue_->task_queue_.pop(); return result; } @@ -755,7 +781,8 @@ std::unique_ptr<T> TaskQueue<T>::Locked::BlockingPop() { if (queue_->stopped_) { return std::unique_ptr<T>(nullptr); } - std::unique_ptr<T> result = std::move(queue_->task_queue_.front()); + std::unique_ptr<T> result = std::move( + const_cast<std::unique_ptr<T>&>(queue_->task_queue_.top())); queue_->task_queue_.pop(); return result; } @@ -781,8 +808,8 @@ void TaskQueue<T>::Locked::Stop() { } template <class T> -std::queue<std::unique_ptr<T>> TaskQueue<T>::Locked::PopAll() { - std::queue<std::unique_ptr<T>> result; +typename TaskQueue<T>::PriorityQueue TaskQueue<T>::Locked::PopAll() { + typename TaskQueue<T>::PriorityQueue result; result.swap(queue_->task_queue_); return result; } diff --git a/src/node_platform.h b/src/node_platform.h index e9553236d45f87..c751cdcda45c75 100644 --- a/src/node_platform.h +++ b/src/node_platform.h @@ -5,6 +5,7 @@ #include <functional> #include <queue> +#include <type_traits> #include <unordered_map> #include <vector> @@ -19,9 +20,32 @@ class NodePlatform; class IsolateData; class PerIsolatePlatformData; +template <typename T, typename = void> +struct has_priority : std::false_type {}; + +template <typename T> +struct has_priority<T, std::void_t<decltype(std::declval<T>().priority)>> + : std::true_type {}; + template <class T> class TaskQueue { public: + // If the entry type has a priority member, order the priority queue by + // that - higher priority first. Otherwise, all entries compare equal and + // no particular ordering is imposed. + struct EntryCompare { + bool operator()(const std::unique_ptr<T>& a, + const std::unique_ptr<T>& b) const { + if constexpr (has_priority<T>::value) { + return a->priority < b->priority; + } else { + return false; + } + } + }; + + using PriorityQueue = std::priority_queue<std::unique_ptr<T>, + std::vector<std::unique_ptr<T>>, + EntryCompare>; class Locked { public: void Push(std::unique_ptr<T> task); @@ -30,7 +54,7 @@ class TaskQueue { void NotifyOfCompletion(); void BlockingDrain(); void Stop(); - std::queue<std::unique_ptr<T>> PopAll(); + PriorityQueue PopAll(); private: friend class TaskQueue; @@ -51,11 +75,19 @@ class TaskQueue { ConditionVariable tasks_drained_; int outstanding_tasks_; bool stopped_; - std::queue<std::unique_ptr<T>> task_queue_; + PriorityQueue task_queue_; +}; + +struct TaskQueueEntry { + std::unique_ptr<v8::Task> task; + v8::TaskPriority priority; + TaskQueueEntry(std::unique_ptr<v8::Task> t, v8::TaskPriority p) + : task(std::move(t)), priority(p) {} }; struct DelayedTask { std::unique_ptr<v8::Task> task; + v8::TaskPriority priority; uv_timer_t timer; double timeout; std::shared_ptr<PerIsolatePlatformData> platform_data; @@ -136,7 +168,7 @@ class PerIsolatePlatformData // When acquiring locks for both task queues, lock foreground_tasks_ // first then foreground_delayed_tasks_ to avoid deadlocks. - TaskQueue<v8::Task> foreground_tasks_; + TaskQueue<TaskQueueEntry> foreground_tasks_; TaskQueue<DelayedTask> foreground_delayed_tasks_; // Use a custom deleter because libuv needs to close the handle first. @@ -152,8 +184,13 @@ class WorkerThreadsTaskRunner { explicit WorkerThreadsTaskRunner(int thread_pool_size, PlatformDebugLogLevel debug_log_level); - void PostTask(std::unique_ptr<v8::Task> task); - void PostDelayedTask(std::unique_ptr<v8::Task> task, double delay_in_seconds); + void PostTask(v8::TaskPriority priority, + std::unique_ptr<v8::Task> task, + const v8::SourceLocation& location); + void PostDelayedTask(v8::TaskPriority priority, + std::unique_ptr<v8::Task> task, + const v8::SourceLocation& location, + double delay_in_seconds); void BlockingDrain(); void Shutdown(); @@ -169,7 +206,7 @@ class WorkerThreadsTaskRunner { // v8::Platform::PostDelayedTaskOnWorkerThread(), the DelayedTaskScheduler // thread will schedule a timer that pushes the delayed tasks back into this // queue when the timer expires. - TaskQueue<v8::Task> pending_worker_tasks_; + TaskQueue<TaskQueueEntry> pending_worker_tasks_; class DelayedTaskScheduler; std::unique_ptr<DelayedTaskScheduler> delayed_task_scheduler_;
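The repeated const_cast in this patch works around a real limitation of std::priority_queue: top() only returns a const reference, so a move-only element type such as std::unique_ptr cannot be moved out directly. Casting away const immediately before pop() is the usual workaround; the moved-from slot holds a null pointer that pop() simply removes. A standalone sketch of the idiom (the default std::less here compares the underlying raw pointers, which is irrelevant to the point being shown):

    #include <memory>
    #include <queue>

    int main() {
      std::priority_queue<std::unique_ptr<int>> q;
      q.push(std::make_unique<int>(1));
      q.push(std::make_unique<int>(2));
      while (!q.empty()) {
        // top() is const-qualified; const_cast lets us steal the element
        // right before pop() removes the (now null) moved-from slot.
        std::unique_ptr<int> p =
            std::move(const_cast<std::unique_ptr<int>&>(q.top()));
        q.pop();
        // ... use *p ...
      }
    }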
From 3fff8ae3be9517ab22eee2ce0f2ff3e9f1624f1c Mon Sep 17 00:00:00 2001 From: Joyee Cheung Date: Fri, 2 May 2025 19:31:38 +0200 Subject: [PATCH 4/4] src: only block on user blocking worker tasks We should not be blocking on the worker tasks on the main thread in one go. Doing so leads to two problems: 1. If any of the worker tasks post another foreground task and wait for it to complete, and that foreground task is posted right after we flush the foreground task queue and before the foreground thread goes into sleep, we'll never be able to wake up to execute that foreground task, in turn the worker task will never complete, and we have a deadlock. 2. Worker tasks can be posted from any thread, not necessarily one associated with the current isolate, so we can end up blocking on a worker task that is associated with a completely unrelated isolate in the event loop. This is suboptimal. However, not blocking on the worker tasks at all can lead to the loss of some critical user-blocking worker tasks, e.g. wasm async compilation tasks, which should block the main thread until they are completed, as the documentation suggests. As a compromise, we currently only block on user-blocking tasks to reduce the chance of deadlocks while making sure that critical user-blocking tasks are not lost.
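The counting machinery behind this compromise is small: Push() only increments the outstanding counter for entries whose is_outstanding() is true (i.e. kUserBlocking priority), and BlockingDrain() waits until exactly that counter reaches zero. A reduced sketch of the pattern, with std primitives standing in for Node's Mutex and ConditionVariable wrappers (OutstandingTracker is an illustrative name, not from the patch):

    #include <condition_variable>
    #include <mutex>

    class OutstandingTracker {
     public:
      // Called while pushing; only user-blocking entries count as outstanding.
      void OnPush(bool outstanding) {
        std::lock_guard<std::mutex> lock(mutex_);
        if (outstanding) ++outstanding_tasks_;
      }
      // Called by a worker thread after running an outstanding entry.
      void OnOutstandingCompletion() {
        std::lock_guard<std::mutex> lock(mutex_);
        if (--outstanding_tasks_ == 0) drained_.notify_all();
      }
      // Called by the main thread; returns once no user-blocking work is left.
      void BlockingDrain() {
        std::unique_lock<std::mutex> lock(mutex_);
        drained_.wait(lock, [this] { return outstanding_tasks_ == 0; });
      }

     private:
      std::mutex mutex_;
      std::condition_variable drained_;
      int outstanding_tasks_ = 0;
    };

Best-effort and user-visible tasks never touch the counter, so the drain can only be held up by (or deadlock on) user-blocking work.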
--- src/node_platform.cc | 54 ++++++++++++++++++++++++++++++++++---------- src/node_platform.h | 9 +++++--- 2 files changed, 48 insertions(+), 15 deletions(-) diff --git a/src/node_platform.cc b/src/node_platform.cc index 4320b74052ee73..a4def82142d8b7 100644 --- a/src/node_platform.cc +++ b/src/node_platform.cc @@ -78,7 +78,10 @@ static void PlatformWorkerThread(void* data) { fflush(stderr); } entry->task->Run(); - pending_worker_tasks->Lock().NotifyOfCompletion(); + // See NodePlatform::DrainTasks(). + if (entry->is_outstanding()) { + pending_worker_tasks->Lock().NotifyOfOutstandingCompletion(); + } } } @@ -209,8 +212,10 @@ class WorkerThreadsTaskRunner::DelayedTaskScheduler { static void RunTask(uv_timer_t* timer) { DelayedTaskScheduler* scheduler = ContainerOf(&DelayedTaskScheduler::loop_, timer->loop); - scheduler->pending_worker_tasks_->Lock().Push( - scheduler->TakeTimerTask(timer)); + auto entry = scheduler->TakeTimerTask(timer); + bool is_outstanding = entry->is_outstanding(); + scheduler->pending_worker_tasks_->Lock().Push(std::move(entry), + is_outstanding); } std::unique_ptr<TaskQueueEntry> TakeTimerTask(uv_timer_t* timer) { @@ -277,7 +282,8 @@ void WorkerThreadsTaskRunner::PostTask(v8::TaskPriority priority, std::unique_ptr<Task> task, const v8::SourceLocation& location) { auto entry = std::make_unique<TaskQueueEntry>(std::move(task), priority); - pending_worker_tasks_.Lock().Push(std::move(entry)); + bool is_outstanding = entry->is_outstanding(); + pending_worker_tasks_.Lock().Push(std::move(entry), is_outstanding); } void WorkerThreadsTaskRunner::PostDelayedTask( @@ -574,7 +580,25 @@ void NodePlatform::DrainTasks(Isolate* isolate) { if (!per_isolate) return; do { - // Worker tasks aren't associated with an Isolate. + // FIXME(54918): we should not be blocking on the worker tasks on the + // main thread in one go. Doing so leads to two problems: + // 1. If any of the worker tasks post another foreground task and wait + // for it to complete, and that foreground task is posted right after + // we flush the foreground task queue and before the foreground thread + // goes into sleep, we'll never be able to wake up to execute that + // foreground task and in turn the worker task will never complete, and + // we have a deadlock. + // 2. Worker tasks can be posted from any thread, not necessarily associated + // with the current isolate, and we can be blocking on a worker task that + // is associated with a completely unrelated isolate in the event loop. + // This is suboptimal. + // + // However, not blocking on the worker tasks at all can lead to the loss of + // some critical user-blocking worker tasks, e.g. wasm async compilation + // tasks, which should block the main thread until they are completed, as + // the documentation suggests. As a compromise, we currently only block on + // user-blocking tasks to reduce the chance of deadlocks while making sure + // that critical user-blocking tasks are not lost.
worker_thread_task_runner_->BlockingDrain(); } while (per_isolate->FlushForegroundTasksInternal()); } @@ -748,16 +772,22 @@ v8::PageAllocator* NodePlatform::GetPageAllocator() { template <class T> TaskQueue<T>::TaskQueue() - : lock_(), tasks_available_(), tasks_drained_(), - outstanding_tasks_(0), stopped_(false), task_queue_() { } + : lock_(), + tasks_available_(), + outstanding_tasks_drained_(), + outstanding_tasks_(0), + stopped_(false), + task_queue_() {} template <class T> TaskQueue<T>::Locked::Locked(TaskQueue<T>* queue) : queue_(queue), lock_(queue->lock_) {} template <class T> -void TaskQueue<T>::Locked::Push(std::unique_ptr<T> task) { - queue_->outstanding_tasks_++; +void TaskQueue<T>::Locked::Push(std::unique_ptr<T> task, bool outstanding) { + if (outstanding) { + queue_->outstanding_tasks_++; + } queue_->task_queue_.push(std::move(task)); queue_->tasks_available_.Signal(lock_); } @@ -788,16 +818,16 @@ std::unique_ptr<T> TaskQueue<T>::Locked::BlockingPop() { } template <class T> -void TaskQueue<T>::Locked::NotifyOfCompletion() { +void TaskQueue<T>::Locked::NotifyOfOutstandingCompletion() { if (--queue_->outstanding_tasks_ == 0) { - queue_->tasks_drained_.Broadcast(lock_); + queue_->outstanding_tasks_drained_.Broadcast(lock_); } } template <class T> void TaskQueue<T>::Locked::BlockingDrain() { while (queue_->outstanding_tasks_ > 0) { - queue_->tasks_drained_.Wait(lock_); + queue_->outstanding_tasks_drained_.Wait(lock_); } } diff --git a/src/node_platform.h b/src/node_platform.h index c751cdcda45c75..cee61eecf1f864 100644 --- a/src/node_platform.h +++ b/src/node_platform.h @@ -48,10 +48,10 @@ class TaskQueue { class Locked { public: - void Push(std::unique_ptr<T> task); + void Push(std::unique_ptr<T> task, bool outstanding = false); std::unique_ptr<T> Pop(); std::unique_ptr<T> BlockingPop(); - void NotifyOfCompletion(); + void NotifyOfOutstandingCompletion(); void BlockingDrain(); void Stop(); PriorityQueue PopAll(); @@ -72,7 +72,7 @@ class TaskQueue { private: Mutex lock_; ConditionVariable tasks_available_; - ConditionVariable tasks_drained_; + ConditionVariable outstanding_tasks_drained_; int outstanding_tasks_; bool stopped_; PriorityQueue task_queue_; @@ -83,6 +83,9 @@ struct TaskQueueEntry { v8::TaskPriority priority; TaskQueueEntry(std::unique_ptr<v8::Task> t, v8::TaskPriority p) : task(std::move(t)), priority(p) {} + inline bool is_outstanding() const { + return priority == v8::TaskPriority::kUserBlocking; + } }; struct DelayedTask {
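A closing note on the template machinery from patch 3, which patch 4 builds on: has_priority is the standard void_t member-detection idiom. The primary template yields std::false_type; the partial specialization is picked only when decltype(std::declval<T>().priority) is well-formed. In this series that means TaskQueue<TaskQueueEntry> and TaskQueue<DelayedTask> are ordered by priority, while the DelayedTaskScheduler's internal TaskQueue<v8::Task> keeps the always-false comparator (the FIFO case the patch 3 commit message describes). A compile-time check of the trait in isolation (WithPriority and WithoutPriority are illustrative types, not from the patch):

    #include <type_traits>
    #include <utility>

    template <typename T, typename = void>
    struct has_priority : std::false_type {};

    template <typename T>
    struct has_priority<T, std::void_t<decltype(std::declval<T>().priority)>>
        : std::true_type {};

    struct WithPriority { int priority; };
    struct WithoutPriority { int weight; };

    static_assert(has_priority<WithPriority>::value,
                  "any accessible member named 'priority' is detected");
    static_assert(!has_priority<WithoutPriority>::value,
                  "SFINAE falls back to the primary template");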