66from  multiprocessing  import  Manager 
77from  queue  import  Empty  as  QueueEmpty 
88from  queue  import  Queue 
9+ from  threading  import  Event 
910from  typing  import  (
1011    Any ,
1112    Generic ,
@@ -126,7 +127,7 @@ async def run(
126127            ) as  executor ,
127128        ):
128129            requests_iter : Optional [Iterator [Any ]] =  None 
129-             futures , queues  =  await  self ._start_processes (
130+             futures , queues ,  stop_event  =  await  self ._start_processes (
130131                manager , executor , scheduling_strategy 
131132            )
132133            run_info , requests_iter , times_iter  =  self ._run_setup (
@@ -178,7 +179,7 @@ async def run(
178179                run_info = run_info ,
179180            )
180181
181-             await  self ._stop_processes (futures , queues . requests )
182+             await  self ._stop_processes (futures , stop_event )
182183
183184    async  def  _start_processes (
184185        self ,
@@ -188,6 +189,7 @@ async def _start_processes(
188189    ) ->  tuple [
189190        list [asyncio .Future ],
190191        MPQueues [RequestT , ResponseT ],
192+         Event ,
191193    ]:
192194        await  self .worker .prepare_multiprocessing ()
193195        queues : MPQueues [RequestT , ResponseT ] =  MPQueues (
@@ -197,6 +199,7 @@ async def _start_processes(
197199            times = manager .Queue (maxsize = scheduling_strategy .processing_requests_limit ),
198200            responses = manager .Queue (),
199201        )
202+         stop_event  =  manager .Event ()
200203
201204        num_processes  =  min (
202205            scheduling_strategy .processes_limit ,
@@ -226,6 +229,7 @@ async def _start_processes(
226229                    executor ,
227230                    self .worker .process_loop_asynchronous ,
228231                    queues ,
232+                     stop_event ,
229233                    False ,  # TODO: Make configurable 
230234                    requests_limit ,
231235                    id_ ,
@@ -234,7 +238,7 @@ async def _start_processes(
234238
235239        await  asyncio .sleep (0.1 )  # give time for processes to start 
236240
237-         return  futures , queues 
241+         return  futures , queues ,  stop_event 
238242
239243    def  _run_setup (
240244        self ,
@@ -369,10 +373,9 @@ def _check_result_ready(
369373    async  def  _stop_processes (
370374        self ,
371375        futures : list [asyncio .Future ],
372-         requests_queue :  Queue [ RequestSession [ RequestT ,  ResponseT ]] ,
376+         stop_event :  Event ,
373377    ):
374-         # FIXME: Need new method for stopping workers 
375-         for  _  in  futures :
376-             requests_queue .put (None )
378+         # stop all processes 
379+         stop_event .set ()
377380
378381        await  asyncio .gather (* futures )
0 commit comments