@@ -35,7 +35,7 @@ const {
3535const { exitCodes : { kUnsettledTopLevelAwait } } = internalBinding ( 'errors' ) ;
3636const { URL } = require ( 'internal/url' ) ;
3737const { canParse : URLCanParse } = internalBinding ( 'url' ) ;
38- const { receiveMessageOnPort, isMainThread } = require ( 'worker_threads' ) ;
38+ const { receiveMessageOnPort } = require ( 'worker_threads' ) ;
3939const {
4040 isAnyArrayBuffer,
4141 isArrayBufferView,
@@ -482,8 +482,6 @@ class HooksProxy {
482482 */
483483 #worker;
484484
485- #portToHooksThread;
486-
487485 /**
488486 * The last notification ID received from the worker. This is used to detect
489487 * if the worker has already sent a notification before putting the main
@@ -501,38 +499,26 @@ class HooksProxy {
501499 #isReady = false ;
502500
503501 constructor ( ) {
504- const { InternalWorker, hooksPort } = require ( 'internal/worker' ) ;
502+ const { InternalWorker } = require ( 'internal/worker' ) ;
503+ MessageChannel ??= require ( 'internal/worker/io' ) . MessageChannel ;
504+
505505 const lock = new SharedArrayBuffer ( SHARED_MEMORY_BYTE_LENGTH ) ;
506506 this . #lock = new Int32Array ( lock ) ;
507507
508- if ( isMainThread ) {
509- // Main thread is the only one that creates the internal single hooks worker
510- this . #worker = new InternalWorker ( loaderWorkerId , {
511- stderr : false ,
512- stdin : false ,
513- stdout : false ,
514- trackUnmanagedFds : false ,
515- workerData : {
516- lock,
517- } ,
518- } ) ;
519- this . #worker. unref ( ) ; // ! Allows the process to eventually exit.
520- this . #worker. on ( 'exit' , process . exit ) ;
521- this . #portToHooksThread = this . #worker;
522- } else {
523- this . #portToHooksThread = hooksPort ;
524- }
508+ this . #worker = new InternalWorker ( loaderWorkerId , {
509+ stderr : false ,
510+ stdin : false ,
511+ stdout : false ,
512+ trackUnmanagedFds : false ,
513+ workerData : {
514+ lock,
515+ } ,
516+ } ) ;
517+ this . #worker. unref ( ) ; // ! Allows the process to eventually exit.
518+ this . #worker. on ( 'exit' , process . exit ) ;
525519 }
526520
527521 waitForWorker ( ) {
528- // There is one Hooks instance for each worker thread. But only one of these Hooks instances
529- // has an InternalWorker. That was the Hooks instance created for the main thread.
530- // It means for all Hooks instances that are not on the main thread => they are ready because they
531- // delegate to the single InternalWorker anyway.
532- if ( ! isMainThread ) {
533- return ;
534- }
535-
536522 if ( ! this . #isReady) {
537523 const { kIsOnline } = require ( 'internal/worker' ) ;
538524 if ( ! this . #worker[ kIsOnline ] ) {
@@ -549,37 +535,6 @@ class HooksProxy {
549535 }
550536 }
551537
552- #postMessageToWorker( method , type , transferList , args ) {
553- this . waitForWorker ( ) ;
554-
555- MessageChannel ??= require ( 'internal/worker/io' ) . MessageChannel ;
556-
557- const {
558- port1 : fromHooksThread ,
559- port2 : toHooksThread ,
560- } = new MessageChannel ( ) ;
561-
562- // Pass work to the worker.
563- debug ( `post ${ type } message to worker` , { method, args, transferList } ) ;
564- const usedTransferList = [ toHooksThread ] ;
565- if ( transferList ) {
566- ArrayPrototypePushApply ( usedTransferList , transferList ) ;
567- }
568-
569- this . #portToHooksThread. postMessage (
570- {
571- __proto__ : null ,
572- args,
573- lock : this . #lock,
574- method,
575- port : toHooksThread ,
576- } ,
577- usedTransferList ,
578- ) ;
579-
580- return fromHooksThread ;
581- }
582-
583538 /**
584539 * Invoke a remote method asynchronously.
585540 * @param {string } method Method to invoke
@@ -588,7 +543,22 @@ class HooksProxy {
588543 * @returns {Promise<any> }
589544 */
590545 async makeAsyncRequest ( method , transferList , ...args ) {
591- const fromHooksThread = this . #postMessageToWorker( method , 'Async' , transferList , args ) ;
546+ this . waitForWorker ( ) ;
547+
548+ MessageChannel ??= require ( 'internal/worker/io' ) . MessageChannel ;
549+ const asyncCommChannel = new MessageChannel ( ) ;
550+
551+ // Pass work to the worker.
552+ debug ( 'post async message to worker' , { method, args, transferList } ) ;
553+ const finalTransferList = [ asyncCommChannel . port2 ] ;
554+ if ( transferList ) {
555+ ArrayPrototypePushApply ( finalTransferList , transferList ) ;
556+ }
557+ this . #worker. postMessage ( {
558+ __proto__ : null ,
559+ method, args,
560+ port : asyncCommChannel . port2 ,
561+ } , finalTransferList ) ;
592562
593563 if ( this . #numberOfPendingAsyncResponses++ === 0 ) {
594564 // On the next lines, the main thread will await a response from the worker thread that might
@@ -597,11 +567,7 @@ class HooksProxy {
597567 // However we want to keep the process alive until the worker thread responds (or until the
598568 // event loop of the worker thread is also empty), so we ref the worker until we get all the
599569 // responses back.
600- if ( this . #worker) {
601- this . #worker. ref ( ) ;
602- } else {
603- this . #portToHooksThread. ref ( ) ;
604- }
570+ this . #worker. ref ( ) ;
605571 }
606572
607573 let response ;
@@ -610,26 +576,18 @@ class HooksProxy {
610576 await AtomicsWaitAsync ( this . #lock, WORKER_TO_MAIN_THREAD_NOTIFICATION , this . #workerNotificationLastId) . value ;
611577 this . #workerNotificationLastId = AtomicsLoad ( this . #lock, WORKER_TO_MAIN_THREAD_NOTIFICATION ) ;
612578
613- response = receiveMessageOnPort ( fromHooksThread ) ;
579+ response = receiveMessageOnPort ( asyncCommChannel . port1 ) ;
614580 } while ( response == null ) ;
615581 debug ( 'got async response from worker' , { method, args } , this . #lock) ;
616582
617583 if ( -- this . #numberOfPendingAsyncResponses === 0 ) {
618584 // We got all the responses from the worker, its job is done (until next time).
619- if ( this . #worker) {
620- this . #worker. unref ( ) ;
621- } else {
622- this . #portToHooksThread. unref ( ) ;
623- }
624- }
625-
626- if ( response . message . status === 'exit' ) {
627- process . exit ( response . message . body ) ;
585+ this . #worker. unref ( ) ;
628586 }
629587
630- fromHooksThread . close ( ) ;
631-
632- return this . #unwrapMessage ( response ) ;
588+ const body = this . #unwrapMessage ( response ) ;
589+ asyncCommChannel . port1 . close ( ) ;
590+ return body ;
633591 }
634592
635593 /**
@@ -640,7 +598,11 @@ class HooksProxy {
640598 * @returns {any }
641599 */
642600 makeSyncRequest ( method , transferList , ...args ) {
643- const fromHooksThread = this . #postMessageToWorker( method , 'Sync' , transferList , args ) ;
601+ this . waitForWorker ( ) ;
602+
603+ // Pass work to the worker.
604+ debug ( 'post sync message to worker' , { method, args, transferList } ) ;
605+ this . #worker. postMessage ( { __proto__ : null , method, args } , transferList ) ;
644606
645607 let response ;
646608 do {
@@ -649,17 +611,14 @@ class HooksProxy {
649611 AtomicsWait ( this . #lock, WORKER_TO_MAIN_THREAD_NOTIFICATION , this . #workerNotificationLastId) ;
650612 this . #workerNotificationLastId = AtomicsLoad ( this . #lock, WORKER_TO_MAIN_THREAD_NOTIFICATION ) ;
651613
652- response = receiveMessageOnPort ( fromHooksThread ) ;
614+ response = this . #worker . receiveMessageSync ( ) ;
653615 } while ( response == null ) ;
654616 debug ( 'got sync response from worker' , { method, args } ) ;
655617 if ( response . message . status === 'never-settle' ) {
656618 process . exit ( kUnsettledTopLevelAwait ) ;
657619 } else if ( response . message . status === 'exit' ) {
658620 process . exit ( response . message . body ) ;
659621 }
660-
661- fromHooksThread . close ( ) ;
662-
663622 return this . #unwrapMessage( response ) ;
664623 }
665624
0 commit comments