Skip to content

Commit c4c2391

Browse files
joyeecheungaduh95
authored andcommitted
src: use priority queue to run worker tasks
According to the documentation, the v8 tasks should be executed based on priority. Previously we always execute the tasks in FIFO order, this changes the NodePlatform implementation to execute the higher priority tasks first. The tasks used to schedule timers for the delayed tasks are run in FIFO order since priority is irrelavent for the timer scheduling part while the tasks unwrapped by the timer callbacks are still ordered by priority. PR-URL: #58047 Refs: #47452 Refs: #54918 Reviewed-By: Stephen Belanger <[email protected]>
1 parent 3ac8c68 commit c4c2391

File tree

2 files changed

+108
-45
lines changed

2 files changed

+108
-45
lines changed

src/node_platform.cc

Lines changed: 66 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ using v8::TaskPriority;
1818
namespace {
1919

2020
struct PlatformWorkerData {
21-
TaskQueue<Task>* task_queue;
21+
TaskQueue<TaskQueueEntry>* task_queue;
2222
Mutex* platform_workers_mutex;
2323
ConditionVariable* platform_workers_ready;
2424
int* pending_platform_workers;
@@ -52,7 +52,7 @@ static void PlatformWorkerThread(void* data) {
5252
std::unique_ptr<PlatformWorkerData>
5353
worker_data(static_cast<PlatformWorkerData*>(data));
5454

55-
TaskQueue<Task>* pending_worker_tasks = worker_data->task_queue;
55+
TaskQueue<TaskQueueEntry>* pending_worker_tasks = worker_data->task_queue;
5656
TRACE_EVENT_METADATA1("__metadata", "thread_name", "name",
5757
"PlatformWorkerThread");
5858

@@ -66,16 +66,17 @@ static void PlatformWorkerThread(void* data) {
6666
bool debug_log_enabled =
6767
worker_data->debug_log_level != PlatformDebugLogLevel::kNone;
6868
int id = worker_data->id;
69-
while (std::unique_ptr<Task> task =
69+
while (std::unique_ptr<TaskQueueEntry> entry =
7070
pending_worker_tasks->Lock().BlockingPop()) {
7171
if (debug_log_enabled) {
7272
fprintf(stderr,
73-
"\nPlatformWorkerThread %d running task %p\n",
73+
"\nPlatformWorkerThread %d running task %p %s\n",
7474
id,
75-
task.get());
75+
entry->task.get(),
76+
GetTaskPriorityName(entry->priority));
7677
fflush(stderr);
7778
}
78-
task->Run();
79+
entry->task->Run();
7980
pending_worker_tasks->Lock().NotifyOfCompletion();
8081
}
8182
}
@@ -91,8 +92,8 @@ static int GetActualThreadPoolSize(int thread_pool_size) {
9192

9293
class WorkerThreadsTaskRunner::DelayedTaskScheduler {
9394
public:
94-
explicit DelayedTaskScheduler(TaskQueue<Task>* tasks)
95-
: pending_worker_tasks_(tasks) {}
95+
explicit DelayedTaskScheduler(TaskQueue<TaskQueueEntry>* tasks)
96+
: pending_worker_tasks_(tasks) {}
9697

9798
std::unique_ptr<uv_thread_t> Start() {
9899
auto start_thread = [](void* data) {
@@ -106,16 +107,21 @@ class WorkerThreadsTaskRunner::DelayedTaskScheduler {
106107
return t;
107108
}
108109

109-
void PostDelayedTask(std::unique_ptr<Task> task, double delay_in_seconds) {
110+
void PostDelayedTask(v8::TaskPriority priority,
111+
std::unique_ptr<Task> task,
112+
double delay_in_seconds) {
110113
auto locked = tasks_.Lock();
111114

115+
auto entry = std::make_unique<TaskQueueEntry>(std::move(task), priority);
116+
auto delayed = std::make_unique<ScheduleTask>(
117+
this, std::move(entry), delay_in_seconds);
118+
112119
// The delayed task scheuler is on is own thread with its own loop that
113120
// runs the timers for the scheduled tasks to pop the original task back
114121
// into the the worker task queue. This first pushes the tasks that
115122
// schedules the timers into the local task queue that will be flushed
116123
// by the local event loop.
117-
locked.Push(std::make_unique<ScheduleTask>(
118-
this, std::move(task), delay_in_seconds));
124+
locked.Push(std::move(delayed));
119125
uv_async_send(&flush_tasks_);
120126
}
121127

@@ -143,10 +149,12 @@ class WorkerThreadsTaskRunner::DelayedTaskScheduler {
143149
DelayedTaskScheduler* scheduler =
144150
ContainerOf(&DelayedTaskScheduler::loop_, flush_tasks->loop);
145151

146-
std::queue<std::unique_ptr<Task>> tasks_to_run =
147-
scheduler->tasks_.Lock().PopAll();
152+
auto tasks_to_run = scheduler->tasks_.Lock().PopAll();
148153
while (!tasks_to_run.empty()) {
149-
std::unique_ptr<Task> task = std::move(tasks_to_run.front());
154+
// We have to use const_cast because std::priority_queue::top() does not
155+
// return a movable item.
156+
std::unique_ptr<Task> task =
157+
std::move(const_cast<std::unique_ptr<Task>&>(tasks_to_run.top()));
150158
tasks_to_run.pop();
151159
// This runs either the ScheduleTasks that scheduels the timers to
152160
// pop the tasks back into the worker task runner queue, or the
@@ -176,7 +184,7 @@ class WorkerThreadsTaskRunner::DelayedTaskScheduler {
176184
class ScheduleTask : public Task {
177185
public:
178186
ScheduleTask(DelayedTaskScheduler* scheduler,
179-
std::unique_ptr<Task> task,
187+
std::unique_ptr<TaskQueueEntry> task,
180188
double delay_in_seconds)
181189
: scheduler_(scheduler),
182190
task_(std::move(task)),
@@ -193,7 +201,7 @@ class WorkerThreadsTaskRunner::DelayedTaskScheduler {
193201

194202
private:
195203
DelayedTaskScheduler* scheduler_;
196-
std::unique_ptr<Task> task_;
204+
std::unique_ptr<TaskQueueEntry> task_;
197205
double delay_in_seconds_;
198206
};
199207

@@ -204,20 +212,21 @@ class WorkerThreadsTaskRunner::DelayedTaskScheduler {
204212
scheduler->TakeTimerTask(timer));
205213
}
206214

207-
std::unique_ptr<Task> TakeTimerTask(uv_timer_t* timer) {
208-
std::unique_ptr<Task> task(static_cast<Task*>(timer->data));
215+
std::unique_ptr<TaskQueueEntry> TakeTimerTask(uv_timer_t* timer) {
216+
std::unique_ptr<TaskQueueEntry> task_entry(
217+
static_cast<TaskQueueEntry*>(timer->data));
209218
uv_timer_stop(timer);
210219
uv_close(reinterpret_cast<uv_handle_t*>(timer), [](uv_handle_t* handle) {
211220
delete reinterpret_cast<uv_timer_t*>(handle);
212221
});
213222
timers_.erase(timer);
214-
return task;
223+
return task_entry;
215224
}
216225

217226
uv_sem_t ready_;
218227
// Task queue in the worker thread task runner, we push the delayed task back
219228
// to it when the timer expires.
220-
TaskQueue<Task>* pending_worker_tasks_;
229+
TaskQueue<TaskQueueEntry>* pending_worker_tasks_;
221230

222231
// Locally scheduled tasks to be poped into the worker task runner queue.
223232
// It is flushed whenever the next closest timer expires.
@@ -263,13 +272,20 @@ WorkerThreadsTaskRunner::WorkerThreadsTaskRunner(
263272
}
264273
}
265274

266-
void WorkerThreadsTaskRunner::PostTask(std::unique_ptr<Task> task) {
267-
pending_worker_tasks_.Lock().Push(std::move(task));
275+
void WorkerThreadsTaskRunner::PostTask(v8::TaskPriority priority,
276+
std::unique_ptr<v8::Task> task,
277+
const v8::SourceLocation& location) {
278+
auto entry = std::make_unique<TaskQueueEntry>(std::move(task), priority);
279+
pending_worker_tasks_.Lock().Push(std::move(entry));
268280
}
269281

270-
void WorkerThreadsTaskRunner::PostDelayedTask(std::unique_ptr<Task> task,
271-
double delay_in_seconds) {
272-
delayed_task_scheduler_->PostDelayedTask(std::move(task), delay_in_seconds);
282+
void WorkerThreadsTaskRunner::PostDelayedTask(
283+
v8::TaskPriority priority,
284+
std::unique_ptr<v8::Task> task,
285+
const v8::SourceLocation& location,
286+
double delay_in_seconds) {
287+
delayed_task_scheduler_->PostDelayedTask(
288+
priority, std::move(task), delay_in_seconds);
273289
}
274290

275291
void WorkerThreadsTaskRunner::BlockingDrain() {
@@ -326,7 +342,9 @@ void PerIsolatePlatformData::PostTask(std::unique_ptr<Task> task) {
326342

327343
auto locked = foreground_tasks_.Lock();
328344
if (flush_tasks_ == nullptr) return;
329-
locked.Push(std::move(task));
345+
// All foreground tasks are treated as user blocking tasks.
346+
locked.Push(std::make_unique<TaskQueueEntry>(
347+
std::move(task), v8::TaskPriority::kUserBlocking));
330348
uv_async_send(flush_tasks_);
331349
}
332350

@@ -349,6 +367,8 @@ void PerIsolatePlatformData::PostDelayedTask(
349367
delayed->task = std::move(task);
350368
delayed->platform_data = shared_from_this();
351369
delayed->timeout = delay_in_seconds;
370+
// All foreground tasks are treated as user blocking tasks.
371+
delayed->priority = v8::TaskPriority::kUserBlocking;
352372
locked.Push(std::move(delayed));
353373
uv_async_send(flush_tasks_);
354374
}
@@ -553,11 +573,13 @@ void NodePlatform::DrainTasks(Isolate* isolate) {
553573
bool PerIsolatePlatformData::FlushForegroundTasksInternal() {
554574
bool did_work = false;
555575

556-
std::queue<std::unique_ptr<DelayedTask>> delayed_tasks_to_schedule =
557-
foreground_delayed_tasks_.Lock().PopAll();
576+
auto delayed_tasks_to_schedule = foreground_delayed_tasks_.Lock().PopAll();
558577
while (!delayed_tasks_to_schedule.empty()) {
578+
// We have to use const_cast because std::priority_queue::top() does not
579+
// return a movable item.
559580
std::unique_ptr<DelayedTask> delayed =
560-
std::move(delayed_tasks_to_schedule.front());
581+
std::move(const_cast<std::unique_ptr<DelayedTask>&>(
582+
delayed_tasks_to_schedule.top()));
561583
delayed_tasks_to_schedule.pop();
562584

563585
did_work = true;
@@ -582,17 +604,20 @@ bool PerIsolatePlatformData::FlushForegroundTasksInternal() {
582604
});
583605
}
584606

585-
std::queue<std::unique_ptr<Task>> tasks;
607+
TaskQueue<TaskQueueEntry>::PriorityQueue tasks;
586608
{
587609
auto locked = foreground_tasks_.Lock();
588610
tasks = locked.PopAll();
589611
}
590612

591613
while (!tasks.empty()) {
592-
std::unique_ptr<Task> task = std::move(tasks.front());
614+
// We have to use const_cast because std::priority_queue::top() does not
615+
// return a movable item.
616+
std::unique_ptr<TaskQueueEntry> entry =
617+
std::move(const_cast<std::unique_ptr<TaskQueueEntry>&>(tasks.top()));
593618
tasks.pop();
594619
did_work = true;
595-
RunForegroundTask(std::move(task));
620+
RunForegroundTask(std::move(entry->task));
596621
}
597622

598623
return did_work;
@@ -613,7 +638,7 @@ void NodePlatform::PostTaskOnWorkerThreadImpl(
613638
}
614639
fflush(stderr);
615640
}
616-
worker_thread_task_runner_->PostTask(std::move(task));
641+
worker_thread_task_runner_->PostTask(priority, std::move(task), location);
617642
}
618643

619644
void NodePlatform::PostDelayedTaskOnWorkerThreadImpl(
@@ -633,8 +658,8 @@ void NodePlatform::PostDelayedTaskOnWorkerThreadImpl(
633658
}
634659
fflush(stderr);
635660
}
636-
worker_thread_task_runner_->PostDelayedTask(std::move(task),
637-
delay_in_seconds);
661+
worker_thread_task_runner_->PostDelayedTask(
662+
priority, std::move(task), location, delay_in_seconds);
638663
}
639664

640665
IsolatePlatformDelegate* NodePlatform::ForIsolate(Isolate* isolate) {
@@ -733,7 +758,8 @@ std::unique_ptr<T> TaskQueue<T>::Locked::Pop() {
733758
if (queue_->task_queue_.empty()) {
734759
return std::unique_ptr<T>(nullptr);
735760
}
736-
std::unique_ptr<T> result = std::move(queue_->task_queue_.front());
761+
std::unique_ptr<T> result = std::move(
762+
std::move(const_cast<std::unique_ptr<T>&>(queue_->task_queue_.top())));
737763
queue_->task_queue_.pop();
738764
return result;
739765
}
@@ -746,7 +772,8 @@ std::unique_ptr<T> TaskQueue<T>::Locked::BlockingPop() {
746772
if (queue_->stopped_) {
747773
return std::unique_ptr<T>(nullptr);
748774
}
749-
std::unique_ptr<T> result = std::move(queue_->task_queue_.front());
775+
std::unique_ptr<T> result = std::move(
776+
std::move(const_cast<std::unique_ptr<T>&>(queue_->task_queue_.top())));
750777
queue_->task_queue_.pop();
751778
return result;
752779
}
@@ -772,8 +799,8 @@ void TaskQueue<T>::Locked::Stop() {
772799
}
773800

774801
template <class T>
775-
std::queue<std::unique_ptr<T>> TaskQueue<T>::Locked::PopAll() {
776-
std::queue<std::unique_ptr<T>> result;
802+
TaskQueue<T>::PriorityQueue TaskQueue<T>::Locked::PopAll() {
803+
TaskQueue<T>::PriorityQueue result;
777804
result.swap(queue_->task_queue_);
778805
return result;
779806
}

src/node_platform.h

Lines changed: 42 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
#if defined(NODE_WANT_INTERNALS) && NODE_WANT_INTERNALS
55

66
#include <queue>
7+
#include <type_traits>
78
#include <unordered_map>
89
#include <vector>
910
#include <functional>
@@ -19,9 +20,32 @@ class NodePlatform;
1920
class IsolateData;
2021
class PerIsolatePlatformData;
2122

23+
template <typename, typename = void>
24+
struct has_priority : std::false_type {};
25+
26+
template <typename T>
27+
struct has_priority<T, std::void_t<decltype(std::declval<T>().priority)>>
28+
: std::true_type {};
29+
2230
template <class T>
2331
class TaskQueue {
2432
public:
33+
// If the entry type has a priority memeber, order the priority queue by
34+
// that - higher priority first. Otherwise, maintain insertion order.
35+
struct EntryCompare {
36+
bool operator()(const std::unique_ptr<T>& a,
37+
const std::unique_ptr<T>& b) const {
38+
if constexpr (has_priority<T>::value) {
39+
return a->priority < b->priority;
40+
} else {
41+
return false;
42+
}
43+
}
44+
};
45+
46+
using PriorityQueue = std::priority_queue<std::unique_ptr<T>,
47+
std::vector<std::unique_ptr<T>>,
48+
EntryCompare>;
2549
class Locked {
2650
public:
2751
void Push(std::unique_ptr<T> task);
@@ -30,7 +54,7 @@ class TaskQueue {
3054
void NotifyOfCompletion();
3155
void BlockingDrain();
3256
void Stop();
33-
std::queue<std::unique_ptr<T>> PopAll();
57+
PriorityQueue PopAll();
3458

3559
private:
3660
friend class TaskQueue;
@@ -51,11 +75,19 @@ class TaskQueue {
5175
ConditionVariable tasks_drained_;
5276
int outstanding_tasks_;
5377
bool stopped_;
54-
std::queue<std::unique_ptr<T>> task_queue_;
78+
PriorityQueue task_queue_;
79+
};
80+
81+
struct TaskQueueEntry {
82+
std::unique_ptr<v8::Task> task;
83+
v8::TaskPriority priority;
84+
TaskQueueEntry(std::unique_ptr<v8::Task> t, v8::TaskPriority p)
85+
: task(std::move(t)), priority(p) {}
5586
};
5687

5788
struct DelayedTask {
5889
std::unique_ptr<v8::Task> task;
90+
v8::TaskPriority priority;
5991
uv_timer_t timer;
6092
double timeout;
6193
std::shared_ptr<PerIsolatePlatformData> platform_data;
@@ -128,7 +160,7 @@ class PerIsolatePlatformData :
128160

129161
// When acquiring locks for both task queues, lock foreground_tasks_
130162
// first then foreground_delayed_tasks_ to avoid deadlocks.
131-
TaskQueue<v8::Task> foreground_tasks_;
163+
TaskQueue<TaskQueueEntry> foreground_tasks_;
132164
TaskQueue<DelayedTask> foreground_delayed_tasks_;
133165

134166
// Use a custom deleter because libuv needs to close the handle first.
@@ -144,8 +176,12 @@ class WorkerThreadsTaskRunner {
144176
explicit WorkerThreadsTaskRunner(int thread_pool_size,
145177
PlatformDebugLogLevel debug_log_level);
146178

147-
void PostTask(std::unique_ptr<v8::Task> task);
148-
void PostDelayedTask(std::unique_ptr<v8::Task> task,
179+
void PostTask(v8::TaskPriority priority,
180+
std::unique_ptr<v8::Task> task,
181+
const v8::SourceLocation& location);
182+
void PostDelayedTask(v8::TaskPriority priority,
183+
std::unique_ptr<v8::Task> task,
184+
const v8::SourceLocation& location,
149185
double delay_in_seconds);
150186

151187
void BlockingDrain();
@@ -162,7 +198,7 @@ class WorkerThreadsTaskRunner {
162198
// v8::Platform::PostDelayedTaskOnWorkerThread(), the DelayedTaskScheduler
163199
// thread will schedule a timer that pushes the delayed tasks back into this
164200
// queue when the timer expires.
165-
TaskQueue<v8::Task> pending_worker_tasks_;
201+
TaskQueue<TaskQueueEntry> pending_worker_tasks_;
166202

167203
class DelayedTaskScheduler;
168204
std::unique_ptr<DelayedTaskScheduler> delayed_task_scheduler_;

0 commit comments

Comments
 (0)