1212#include " tracing/traced_value.h"
1313#include " util-inl.h"
1414
15+ #include < atomic>
1516#include < memory>
1617
1718struct node_napi_env__ : public napi_env__ {
@@ -137,6 +138,7 @@ class ThreadSafeFunction : public node::AsyncResource {
137138 *v8::String::Utf8Value (env_->isolate, name)),
138139 thread_count(thread_count_),
139140 is_closing(false ),
141+ dispatch_state(kDispatchIdle ),
140142 context(context_),
141143 max_queue_size(max_queue_size_),
142144 env(env_),
@@ -176,10 +178,8 @@ class ThreadSafeFunction : public node::AsyncResource {
176178 return napi_closing;
177179 }
178180 } else {
179- if (uv_async_send (&async) != 0 ) {
180- return napi_generic_failure;
181- }
182181 queue.push (data);
182+ Send ();
183183 return napi_ok;
184184 }
185185 }
@@ -211,9 +211,7 @@ class ThreadSafeFunction : public node::AsyncResource {
211211 if (is_closing && max_queue_size > 0 ) {
212212 cond->Signal (lock);
213213 }
214- if (uv_async_send (&async) != 0 ) {
215- return napi_generic_failure;
216- }
214+ Send ();
217215 }
218216 }
219217
@@ -238,7 +236,6 @@ class ThreadSafeFunction : public node::AsyncResource {
238236 cond = std::make_unique<node::ConditionVariable>();
239237 }
240238 if (max_queue_size == 0 || cond) {
241- CHECK_EQ (0 , uv_idle_init (loop, &idle));
242239 return napi_ok;
243240 }
244241
@@ -263,21 +260,46 @@ class ThreadSafeFunction : public node::AsyncResource {
263260
264261 napi_status Unref () {
265262 uv_unref (reinterpret_cast <uv_handle_t *>(&async));
266- uv_unref (reinterpret_cast <uv_handle_t *>(&idle));
267263
268264 return napi_ok;
269265 }
270266
271267 napi_status Ref () {
272268 uv_ref (reinterpret_cast <uv_handle_t *>(&async));
273- uv_ref (reinterpret_cast <uv_handle_t *>(&idle));
274269
275270 return napi_ok;
276271 }
277272
278- void DispatchOne () {
273+ inline void * Context () {
274+ return context;
275+ }
276+
277+ protected:
278+ void Dispatch () {
279+ bool has_more = true ;
280+
281+ // Limit maximum synchronous iteration count to prevent event loop
282+ // starvation. See `src/node_messaging.cc` for an inspiration.
283+ unsigned int iterations_left = kMaxIterationCount ;
284+ while (has_more && --iterations_left != 0 ) {
285+ dispatch_state = kDispatchRunning ;
286+ has_more = DispatchOne ();
287+
288+ // Send() was called while we were executing the JS function
289+ if (dispatch_state.exchange (kDispatchIdle ) != kDispatchRunning ) {
290+ has_more = true ;
291+ }
292+ }
293+
294+ if (has_more) {
295+ Send ();
296+ }
297+ }
298+
299+ bool DispatchOne () {
279300 void * data = nullptr ;
280301 bool popped_value = false ;
302+ bool has_more = false ;
281303
282304 {
283305 node::Mutex::ScopedLock lock (this ->mutex );
@@ -302,9 +324,9 @@ class ThreadSafeFunction : public node::AsyncResource {
302324 cond->Signal (lock);
303325 }
304326 CloseHandlesAndMaybeDelete ();
305- } else {
306- CHECK_EQ (0 , uv_idle_stop (&idle));
307327 }
328+ } else {
329+ has_more = true ;
308330 }
309331 }
310332 }
@@ -322,6 +344,8 @@ class ThreadSafeFunction : public node::AsyncResource {
322344 call_js_cb (env, js_callback, context, data);
323345 });
324346 }
347+
348+ return has_more;
325349 }
326350
327351 void Finalize () {
@@ -335,10 +359,6 @@ class ThreadSafeFunction : public node::AsyncResource {
335359 EmptyQueueAndDelete ();
336360 }
337361
338- inline void * Context () {
339- return context;
340- }
341-
342362 void CloseHandlesAndMaybeDelete (bool set_closing = false ) {
343363 v8::HandleScope scope (env->isolate );
344364 if (set_closing) {
@@ -358,18 +378,20 @@ class ThreadSafeFunction : public node::AsyncResource {
358378 ThreadSafeFunction* ts_fn =
359379 node::ContainerOf (&ThreadSafeFunction::async,
360380 reinterpret_cast <uv_async_t *>(handle));
361- v8::HandleScope scope (ts_fn->env ->isolate );
362- ts_fn->env ->node_env ()->CloseHandle (
363- reinterpret_cast <uv_handle_t *>(&ts_fn->idle ),
364- [](uv_handle_t * handle) -> void {
365- ThreadSafeFunction* ts_fn =
366- node::ContainerOf (&ThreadSafeFunction::idle,
367- reinterpret_cast <uv_idle_t *>(handle));
368- ts_fn->Finalize ();
369- });
381+ ts_fn->Finalize ();
370382 });
371383 }
372384
385+ void Send () {
386+ // Ask currently running Dispatch() to make one more iteration
387+ unsigned char current_state = dispatch_state.fetch_or (kDispatchPending );
388+ if ((current_state & kDispatchRunning ) == kDispatchRunning ) {
389+ return ;
390+ }
391+
392+ CHECK_EQ (0 , uv_async_send (&async));
393+ }
394+
373395 // Default way of calling into JavaScript. Used when ThreadSafeFunction is
374396 // without a call_js_cb_.
375397 static void CallJs (napi_env env, napi_value cb, void * context, void * data) {
@@ -393,16 +415,10 @@ class ThreadSafeFunction : public node::AsyncResource {
393415 }
394416 }
395417
396- static void IdleCb (uv_idle_t * idle) {
397- ThreadSafeFunction* ts_fn =
398- node::ContainerOf (&ThreadSafeFunction::idle, idle);
399- ts_fn->DispatchOne ();
400- }
401-
402418 static void AsyncCb (uv_async_t * async) {
403419 ThreadSafeFunction* ts_fn =
404420 node::ContainerOf (&ThreadSafeFunction::async, async);
405- CHECK_EQ ( 0 , uv_idle_start (& ts_fn->idle , IdleCb) );
421+ ts_fn->Dispatch ( );
406422 }
407423
408424 static void Cleanup (void * data) {
@@ -411,14 +427,20 @@ class ThreadSafeFunction : public node::AsyncResource {
411427 }
412428
413429 private:
430+ static const unsigned char kDispatchIdle = 0 ;
431+ static const unsigned char kDispatchRunning = 1 << 0 ;
432+ static const unsigned char kDispatchPending = 1 << 1 ;
433+
434+ static const unsigned int kMaxIterationCount = 1000 ;
435+
414436 // These are variables protected by the mutex.
415437 node::Mutex mutex;
416438 std::unique_ptr<node::ConditionVariable> cond;
417439 std::queue<void *> queue;
418440 uv_async_t async;
419- uv_idle_t idle;
420441 size_t thread_count;
421442 bool is_closing;
443+ std::atomic_uchar dispatch_state;
422444
423445 // These are variables set once, upon creation, and then never again, which
424446 // means we don't need the mutex to read them.
0 commit comments