1717from  loguru  import  logger 
1818
1919from  guidellm .config  import  settings 
20- from  guidellm .request .session  import  RequestSession 
2120from  guidellm .request .types  import  (
2221    RequestT ,
2322    ResponseT ,
2726    SchedulerRequestResult ,
2827    SchedulerResult ,
2928    SchedulerRunInfo ,
30-     WorkerProcessRequestTime ,
29+     WorkerProcessRequest ,
3130    WorkerProcessResult ,
3231)
3332from  guidellm .scheduler .strategy  import  SchedulingStrategy 
@@ -127,10 +126,14 @@ async def run(
127126            ) as  executor ,
128127        ):
129128            requests_iter : Optional [Iterator [Any ]] =  None 
129+             # TODO: Configurable delay and move somewhere more appropriate 
130+             scheduling_strategy .start_time  =  (
131+                 time .time ()
132+             )  # Add a small delay to allow processes to start 
130133            futures , queues , stop_event  =  await  self ._start_processes (
131134                manager , executor , scheduling_strategy 
132135            )
133-             run_info , requests_iter ,  times_iter  =  self ._run_setup (
136+             run_info , requests_iter  =  self ._run_setup (
134137                futures , scheduling_strategy , max_number , max_duration 
135138            )
136139            yield  SchedulerResult (
@@ -147,17 +150,16 @@ async def run(
147150
148151                    if  (
149152                        requests_iter  is  None 
150-                         and  run_info .completed_requests  >=  run_info .created_requests 
153+                         # FIXME: Need new way to handle max requests 
154+                         # and run_info.completed_requests >= run_info.created_requests 
151155                    ):
152156                        # we've exhausted all requests we've wanted to run 
153157                        # and yielded all responses 
154158                        break 
155159
156160                    requests_iter  =  self ._add_requests (
157161                        requests_iter ,
158-                         times_iter ,
159162                        queues .requests ,
160-                         queues .times ,
161163                        run_info ,
162164                    )
163165                    await  asyncio .sleep (0 )  # enable requests to start 
@@ -196,7 +198,6 @@ async def _start_processes(
196198            requests = manager .Queue (
197199                maxsize = scheduling_strategy .processing_requests_limit 
198200            ),
199-             times = manager .Queue (maxsize = scheduling_strategy .processing_requests_limit ),
200201            responses = manager .Queue (),
201202        )
202203        stop_event  =  manager .Event ()
@@ -229,10 +230,12 @@ async def _start_processes(
229230                    executor ,
230231                    self .worker .process_loop_asynchronous ,
231232                    queues ,
233+                     scheduling_strategy ,
232234                    stop_event ,
233235                    False ,  # TODO: Make configurable 
234236                    requests_limit ,
235237                    id_ ,
238+                     num_processes ,
236239                )
237240            )
238241
@@ -246,11 +249,9 @@ def _run_setup(
246249        scheduling_strategy : SchedulingStrategy ,
247250        max_number : Optional [int ],
248251        max_duration : Optional [float ],
249-     ) ->  tuple [SchedulerRunInfo , Iterator [Any ],  Iterator [ float ] ]:
252+     ) ->  tuple [SchedulerRunInfo , Iterator [Any ]]:
250253        requests_iter  =  iter (self .request_loader )
251-         start_time  =  time .time ()
252-         times_iter  =  iter (scheduling_strategy .request_times ())
253-         end_time  =  time .time () +  (max_duration  or  math .inf )
254+         end_time  =  scheduling_strategy .start_time  +  (max_duration  or  math .inf )
254255        end_number  =  max_number  or  math .inf 
255256
256257        try :
@@ -268,21 +269,19 @@ def _run_setup(
268269            )
269270
270271        info  =  SchedulerRunInfo (
271-             start_time = start_time ,
272+             start_time = scheduling_strategy . start_time ,
272273            end_time = end_time ,
273274            end_number = end_number ,
274275            processes = len (processes ),
275276            strategy = scheduling_strategy ,
276277        )
277278
278-         return  info , requests_iter ,  times_iter 
279+         return  info , requests_iter 
279280
280281    def  _add_requests (
281282        self ,
282283        requests_iter : Optional [Iterator [Any ]],
283-         times_iter : Iterator [float ],
284-         requests_queue : Queue [RequestSession [RequestT , ResponseT ]],
285-         times_queue : Queue [WorkerProcessRequestTime ],
284+         requests_queue : Queue [WorkerProcessRequest [RequestT , ResponseT ]],
286285        run_info : SchedulerRunInfo ,
287286    ) ->  Optional [Iterator [Any ]]:
288287        if  requests_iter  is  not   None :
@@ -296,24 +295,20 @@ def _add_requests(
296295                    if  run_info .created_requests  >=  run_info .end_number :
297296                        raise  StopIteration 
298297
298+                     if  time .time () >=  run_info .end_time :
299+                         raise  StopIteration 
300+ 
299301                    session  =  next (requests_iter )
300-                     requests_queue .put (session )
301-                     for  _  in  range (len (session )):
302-                         if  (
303-                             request_time  :=  next (times_iter )
304-                         ) >=  run_info .end_time  or  time .time () >=  run_info .end_time :
305-                             raise  StopIteration 
306- 
307-                         work_req  =  WorkerProcessRequestTime (
308-                             start_time = request_time ,
309-                             timeout_time = run_info .end_time ,
310-                             queued_time = time .time (),
311-                         )
312-                         times_queue .put (work_req )
313- 
314-                         run_info .created_requests  +=  1 
315-                         run_info .queued_requests  +=  1 
316-                         added_count  +=  1 
302+                     work_req  =  WorkerProcessRequest (
303+                         session = session ,
304+                         timeout_time = run_info .end_time ,
305+                         queued_time = time .time (),
306+                     )
307+                     requests_queue .put (work_req )
308+ 
309+                     run_info .created_requests  +=  len (session )
310+                     run_info .queued_requests  +=  len (session )
311+                     added_count  +=  len (session )
317312            except  StopIteration :
318313                # we've reached the limit number, limit time, or exhausted the requests 
319314                # set to None to stop adding more and tell the loop no more requests 
0 commit comments