Skip to content

Commit 79a45e2

Browse files
committed
src: add a threadsafe variant of SetImmediate()
Add a variant of `SetImmediate()` that can be called from any thread. This allows removing the `AsyncRequest` abstraction and replaces it with a more generic mechanism. PR-URL: #31386 Refs: openjs-foundation/summit#240 Reviewed-By: Gireesh Punathil <[email protected]> Reviewed-By: James M Snell <[email protected]> Reviewed-By: Colin Ihrig <[email protected]> Reviewed-By: Rich Trott <[email protected]>
1 parent 29693b5 commit 79a45e2

File tree

3 files changed

+43
-7
lines changed

3 files changed

+43
-7
lines changed

src/env-inl.h

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -748,13 +748,15 @@ Environment::NativeImmediateQueue::Shift() {
748748
if (!head_)
749749
tail_ = nullptr; // The queue is now empty.
750750
}
751+
size_--;
751752
return ret;
752753
}
753754

754755
void Environment::NativeImmediateQueue::Push(
755756
std::unique_ptr<Environment::NativeImmediateCallback> cb) {
756757
NativeImmediateCallback* prev_tail = tail_;
757758

759+
size_++;
758760
tail_ = cb.get();
759761
if (prev_tail != nullptr)
760762
prev_tail->set_next(std::move(cb));
@@ -774,6 +776,10 @@ void Environment::NativeImmediateQueue::ConcatMove(
774776
other.size_ = 0;
775777
}
776778

779+
size_t Environment::NativeImmediateQueue::size() const {
780+
return size_.load();
781+
}
782+
777783
template <typename Fn>
778784
void Environment::CreateImmediate(Fn&& cb, bool ref) {
779785
auto callback = std::make_unique<NativeImmediateCallbackImpl<Fn>>(
@@ -795,6 +801,17 @@ void Environment::SetUnrefImmediate(Fn&& cb) {
795801
CreateImmediate(std::move(cb), false);
796802
}
797803

804+
template <typename Fn>
805+
void Environment::SetImmediateThreadsafe(Fn&& cb) {
806+
auto callback = std::make_unique<NativeImmediateCallbackImpl<Fn>>(
807+
std::move(cb), false);
808+
{
809+
Mutex::ScopedLock lock(native_immediates_threadsafe_mutex_);
810+
native_immediates_threadsafe_.Push(std::move(callback));
811+
}
812+
uv_async_send(&task_queues_async_);
813+
}
814+
798815
Environment::NativeImmediateCallback::NativeImmediateCallback(bool refed)
799816
: refed_(refed) {}
800817

@@ -1164,7 +1181,7 @@ void Environment::RemoveCleanupHook(void (*fn)(void*), void* arg) {
11641181
inline void Environment::RegisterFinalizationGroupForCleanup(
11651182
v8::Local<v8::FinalizationGroup> group) {
11661183
cleanup_finalization_groups_.emplace_back(isolate(), group);
1167-
uv_async_send(&cleanup_finalization_groups_async_);
1184+
uv_async_send(&task_queues_async_);
11681185
}
11691186

11701187
size_t CleanupHookCallback::Hash::operator()(

src/env.cc

Lines changed: 15 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -460,15 +460,16 @@ void Environment::InitializeLibuv(bool start_profiler_idle_notifier) {
460460
uv_check_init(event_loop(), &idle_check_handle_);
461461
uv_async_init(
462462
event_loop(),
463-
&cleanup_finalization_groups_async_,
463+
&task_queues_async_,
464464
[](uv_async_t* async) {
465465
Environment* env = ContainerOf(
466-
&Environment::cleanup_finalization_groups_async_, async);
466+
&Environment::task_queues_async_, async);
467467
env->CleanupFinalizationGroups();
468+
env->RunAndClearNativeImmediates();
468469
});
469470
uv_unref(reinterpret_cast<uv_handle_t*>(&idle_prepare_handle_));
470471
uv_unref(reinterpret_cast<uv_handle_t*>(&idle_check_handle_));
471-
uv_unref(reinterpret_cast<uv_handle_t*>(&cleanup_finalization_groups_async_));
472+
uv_unref(reinterpret_cast<uv_handle_t*>(&task_queues_async_));
472473

473474
thread_stopper()->Install(
474475
this, static_cast<void*>(this), [](uv_async_t* handle) {
@@ -532,7 +533,7 @@ void Environment::RegisterHandleCleanups() {
532533
close_and_finish,
533534
nullptr);
534535
RegisterHandleCleanup(
535-
reinterpret_cast<uv_handle_t*>(&cleanup_finalization_groups_async_),
536+
reinterpret_cast<uv_handle_t*>(&task_queues_async_),
536537
close_and_finish,
537538
nullptr);
538539
}
@@ -663,6 +664,15 @@ void Environment::RunAndClearNativeImmediates(bool only_refed) {
663664
"RunAndClearNativeImmediates", this);
664665
size_t ref_count = 0;
665666

667+
// It is safe to check .size() first, because there is a causal relationship
668+
// between pushes to the threadsafe and this function being called.
669+
// For the common case, it's worth checking the size first before establishing
670+
// a mutex lock.
671+
if (native_immediates_threadsafe_.size() > 0) {
672+
Mutex::ScopedLock lock(native_immediates_threadsafe_mutex_);
673+
native_immediates_.ConcatMove(std::move(native_immediates_threadsafe_));
674+
}
675+
666676
NativeImmediateQueue queue;
667677
queue.ConcatMove(std::move(native_immediates_));
668678

@@ -1084,7 +1094,7 @@ void Environment::CleanupFinalizationGroups() {
10841094
if (try_catch.HasCaught() && !try_catch.HasTerminated())
10851095
errors::TriggerUncaughtException(isolate(), try_catch);
10861096
// Re-schedule the execution of the remainder of the queue.
1087-
uv_async_send(&cleanup_finalization_groups_async_);
1097+
uv_async_send(&task_queues_async_);
10881098
return;
10891099
}
10901100
}

src/env.h

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1199,6 +1199,9 @@ class Environment : public MemoryRetainer {
11991199
inline void SetImmediate(Fn&& cb);
12001200
template <typename Fn>
12011201
inline void SetUnrefImmediate(Fn&& cb);
1202+
template <typename Fn>
1203+
// This behaves like SetImmediate() but can be called from any thread.
1204+
inline void SetImmediateThreadsafe(Fn&& cb);
12021205
// This needs to be available for the JS-land setImmediate().
12031206
void ToggleImmediateRef(bool ref);
12041207

@@ -1284,7 +1287,7 @@ class Environment : public MemoryRetainer {
12841287
uv_idle_t immediate_idle_handle_;
12851288
uv_prepare_t idle_prepare_handle_;
12861289
uv_check_t idle_check_handle_;
1287-
uv_async_t cleanup_finalization_groups_async_;
1290+
uv_async_t task_queues_async_;
12881291
bool profiler_idle_notifier_started_ = false;
12891292

12901293
AsyncHooks async_hooks_;
@@ -1436,12 +1439,18 @@ class Environment : public MemoryRetainer {
14361439
// 'other' afterwards.
14371440
inline void ConcatMove(NativeImmediateQueue&& other);
14381441

1442+
// size() is atomic and may be called from any thread.
1443+
inline size_t size() const;
1444+
14391445
private:
1446+
std::atomic<size_t> size_ {0};
14401447
std::unique_ptr<NativeImmediateCallback> head_;
14411448
NativeImmediateCallback* tail_ = nullptr;
14421449
};
14431450

14441451
NativeImmediateQueue native_immediates_;
1452+
Mutex native_immediates_threadsafe_mutex_;
1453+
NativeImmediateQueue native_immediates_threadsafe_;
14451454

14461455
void RunAndClearNativeImmediates(bool only_refed = false);
14471456
static void CheckImmediate(uv_check_t* handle);

0 commit comments

Comments
 (0)