@@ -40,6 +40,7 @@ def executor_queue(mock_dist):
4040 max_beam_width = 1 ,
4141 max_num_active_requests = 16 ,
4242 enable_iter_perf_stats = True ,
43+ batch_wait_timeout = 0.0 ,
4344 is_disaggregated = False )
4445
4546
@@ -52,6 +53,7 @@ def integration_queue(mock_dist):
5253 max_beam_width = 2 ,
5354 max_num_active_requests = 8 ,
5455 enable_iter_perf_stats = True ,
56+ batch_wait_timeout = 0.0 ,
5557 is_disaggregated = False )
5658
5759
@@ -215,6 +217,75 @@ def test_get_from_request_queue_with_timeout(executor_queue):
215217 assert elapsed < 0.2 # Should finish within timeout
216218
217219
220+ def test_get_from_request_queue_async_behavior (executor_queue ):
221+ """Test asynchronous behavior where requests arrive over time."""
222+ import threading
223+
224+ def add_requests_after_delay (delay , num_requests ):
225+ """Helper function to add requests after a delay."""
226+ time .sleep (delay )
227+ for i in range (num_requests ):
228+ item = RequestQueueItem (i + 10 , Mock ())
229+ executor_queue .request_queue .put (item )
230+
231+ # Test 1: Without batch_wait_timeout (should only get initial requests)
232+ executor_queue .batch_wait_timeout = 0.0
233+
234+ initial_requests = 3
235+ for i in range (initial_requests ):
236+ item = RequestQueueItem (i , Mock ())
237+ executor_queue .request_queue .put (item )
238+
239+ thread = threading .Thread (target = add_requests_after_delay , args = (0.05 , 2 ))
240+ thread .start ()
241+
242+ # Get requests immediately - should only get the initial ones
243+ start_time = time .time ()
244+ items = executor_queue ._get_from_request_queue (None )
245+ elapsed = time .time () - start_time
246+
247+ assert len (items ) == initial_requests
248+ assert elapsed < 0.1
249+ assert all (item .id < 10 for item in items )
250+
251+ thread .join ()
252+
253+ # Test 2: With batch_wait_timeout (should wait and get all requests)
254+ executor_queue .batch_wait_timeout = 0.2
255+
256+ # Clear the queue and add initial requests again
257+ while not executor_queue .request_queue .empty ():
258+ try :
259+ executor_queue .request_queue .get_nowait ()
260+ except queue .Empty :
261+ break
262+
263+ initial_requests = 2
264+ for i in range (initial_requests ):
265+ item = RequestQueueItem (i + 20 , Mock ())
266+ executor_queue .request_queue .put (item )
267+
268+ thread = threading .Thread (target = add_requests_after_delay , args = (0.05 , 3 ))
269+ thread .start ()
270+
271+ # Get requests with batch_wait_timeout - should wait and get all
272+ start_time = time .time ()
273+ items = executor_queue ._get_from_request_queue (None )
274+ elapsed = time .time () - start_time
275+
276+ # Should wait and return all requests
277+ assert len (items ) == initial_requests + 3
278+ assert elapsed >= 0.05
279+ assert elapsed < 0.3
280+
281+ initial_ids = {item .id for item in items if 20 <= item .id < 30 }
282+ delayed_ids = {item .id for item in items if 10 <= item .id < 20 }
283+ assert len (initial_ids ) == initial_requests
284+ assert len (delayed_ids ) == 3
285+
286+ thread .join ()
287+
288+
218289def test_get_from_waiting_queue (executor_queue ):
219290 """Test getting items from waiting queue."""
220291 # Add items to waiting queue
@@ -371,6 +442,7 @@ def attention_dp_queue(mock_dist_attention_dp):
371442 max_beam_width = 2 ,
372443 max_num_active_requests = 8 ,
373444 enable_iter_perf_stats = True ,
445+ batch_wait_timeout = 0.0 ,
374446 is_disaggregated = False )
375447 # Initialize all_ranks_num_active_requests
376448 return queue
0 commit comments