29
29
)
30
30
31
31
import numpy as np
32
- from pydantic import Field , computed_field , field_serializer , field_validator
32
+ from pydantic import (
33
+ Field ,
34
+ NonNegativeFloat ,
35
+ PositiveFloat ,
36
+ PositiveInt ,
37
+ computed_field ,
38
+ field_serializer ,
39
+ field_validator ,
40
+ )
33
41
42
+ from guidellm import settings
34
43
from guidellm .scheduler import (
35
44
AsyncConstantStrategy ,
36
45
AsyncPoissonStrategy ,
@@ -86,7 +95,7 @@ def __pydantic_schema_base_type__(cls) -> type[Profile]:
86
95
def create (
87
96
cls ,
88
97
rate_type : str ,
89
- rate : float | int | list [float | int ] | None ,
98
+ rate : list [float ] | None ,
90
99
random_seed : int = 42 ,
91
100
** kwargs : Any ,
92
101
) -> Profile :
@@ -112,7 +121,7 @@ def create(
112
121
def resolve_args (
113
122
cls ,
114
123
rate_type : str ,
115
- rate : float | int | list [float , int ] | None ,
124
+ rate : list [float ] | None ,
116
125
random_seed : int ,
117
126
** kwargs : Any ,
118
127
) -> dict [str , Any ]:
@@ -265,7 +274,7 @@ class SynchronousProfile(Profile):
265
274
def resolve_args (
266
275
cls ,
267
276
rate_type : str ,
268
- rate : float | int | list [float , int ] | None ,
277
+ rate : list [float ] | None ,
269
278
random_seed : int ,
270
279
** kwargs : Any ,
271
280
) -> dict [str , Any ]:
@@ -316,24 +325,22 @@ class ConcurrentProfile(Profile):
316
325
"""Fixed-concurrency strategy execution profile with configurable stream counts."""
317
326
318
327
type_ : Literal ["concurrent" ] = "concurrent" # type: ignore[assignment]
319
- streams : int | list [int ] = Field (
328
+ streams : list [PositiveInt ] = Field (
320
329
description = "Number of concurrent streams for request scheduling" ,
321
- gt = 0 ,
322
330
)
323
- startup_duration : float = Field (
331
+ startup_duration : NonNegativeFloat = Field (
324
332
default = 0.0 ,
325
333
description = (
326
334
"Duration in seconds for distributing startup requests "
327
335
"before completion-based timing"
328
336
),
329
- ge = 0 ,
330
337
)
331
338
332
339
@classmethod
333
340
def resolve_args (
334
341
cls ,
335
342
rate_type : str ,
336
- rate : float | int | list [float , int ] | None ,
343
+ rate : list [float ] | None ,
337
344
random_seed : int ,
338
345
** kwargs : Any ,
339
346
) -> dict [str , Any ]:
@@ -348,14 +355,13 @@ def resolve_args(
348
355
:raises ValueError: If rate is None.
349
356
"""
350
357
_ = (rate_type , random_seed ) # unused
351
- kwargs ["streams" ] = rate
358
+ kwargs ["streams" ] = [ int ( r ) for r in rate ] if rate else None
352
359
return kwargs
353
360
354
361
@property
355
362
def strategy_types (self ) -> list [StrategyType ]:
356
363
"""Get concurrent strategy types for each configured stream count."""
357
- num_strategies = len (self .streams ) if isinstance (self .streams , list ) else 1
358
- return [self .type_ ] * num_strategies
364
+ return [self .type_ ] * len (self .streams )
359
365
360
366
def next_strategy (
361
367
self ,
@@ -370,13 +376,12 @@ def next_strategy(
370
376
:return: ConcurrentStrategy with next stream count, or None if complete.
371
377
"""
372
378
_ = (prev_strategy , prev_benchmark ) # unused
373
- streams = self .streams if isinstance (self .streams , list ) else [self .streams ]
374
379
375
- if len (self .completed_strategies ) >= len (streams ):
380
+ if len (self .completed_strategies ) >= len (self . streams ):
376
381
return None
377
382
378
383
return ConcurrentStrategy (
379
- streams = streams [len (self .completed_strategies )],
384
+ streams = self . streams [len (self .completed_strategies )],
380
385
startup_duration = self .startup_duration ,
381
386
)
382
387
@@ -388,25 +393,22 @@ class ThroughputProfile(Profile):
388
393
"""
389
394
390
395
type_ : Literal ["throughput" ] = "throughput" # type: ignore[assignment]
391
- max_concurrency : int | None = Field (
396
+ max_concurrency : PositiveInt | None = Field (
392
397
default = None ,
393
398
description = "Maximum number of concurrent requests to schedule" ,
394
- gt = 0 ,
395
399
)
396
- startup_duration : float = Field (
397
- default = 0.0 ,
400
+ startup_duration : NonNegativeFloat = Field (
398
401
description = (
399
402
"Duration in seconds for distributing startup requests "
400
403
"before full throughput scheduling"
401
404
),
402
- ge = 0 ,
403
405
)
404
406
405
407
@classmethod
406
408
def resolve_args (
407
409
cls ,
408
410
rate_type : str ,
409
- rate : float | int | list [float , int ] | None ,
411
+ rate : list [float ] | None ,
410
412
random_seed : int ,
411
413
** kwargs : Any ,
412
414
) -> dict [str , Any ]:
@@ -422,8 +424,8 @@ def resolve_args(
422
424
_ = (rate_type , random_seed ) # unused
423
425
# Remap rate to max_concurrency, strip out random_seed
424
426
kwargs .pop ("random_seed" , None )
425
- if rate is not None :
426
- kwargs ["max_concurrency" ] = rate
427
+ if rate is not None and len ( rate ) > 0 :
428
+ kwargs ["max_concurrency" ] = rate [ 0 ]
427
429
return kwargs
428
430
429
431
@property
@@ -463,22 +465,19 @@ class AsyncProfile(Profile):
463
465
strategy_type : Literal ["constant" , "poisson" ] = Field (
464
466
description = "Type of asynchronous strategy pattern to use" ,
465
467
)
466
- rate : float | list [float ] = Field (
468
+ rate : list [PositiveFloat ] = Field (
467
469
description = "Request scheduling rate in requests per second" ,
468
- gt = 0 ,
469
470
)
470
- startup_duration : float = Field (
471
+ startup_duration : NonNegativeFloat = Field (
471
472
default = 0.0 ,
472
473
description = (
473
474
"Duration in seconds for distributing startup requests "
474
475
"to converge quickly to desired rate"
475
476
),
476
- ge = 0 ,
477
477
)
478
- max_concurrency : int | None = Field (
478
+ max_concurrency : PositiveInt | None = Field (
479
479
default = None ,
480
480
description = "Maximum number of concurrent requests to schedule" ,
481
- gt = 0 ,
482
481
)
483
482
random_seed : int = Field (
484
483
default = 42 ,
@@ -489,7 +488,7 @@ class AsyncProfile(Profile):
489
488
def resolve_args (
490
489
cls ,
491
490
rate_type : str ,
492
- rate : float | int | list [float , int ] | None ,
491
+ rate : list [float ] | None ,
493
492
random_seed : int ,
494
493
** kwargs : Any ,
495
494
) -> dict [str , Any ]:
@@ -523,7 +522,7 @@ def resolve_args(
523
522
@property
524
523
def strategy_types (self ) -> list [StrategyType ]:
525
524
"""Get async strategy types for each configured rate."""
526
- num_strategies = len (self .rate ) if isinstance ( self . rate , list ) else 1
525
+ num_strategies = len (self .rate )
527
526
return [self .strategy_type ] * num_strategies
528
527
529
528
def next_strategy (
@@ -541,12 +540,11 @@ def next_strategy(
541
540
:raises ValueError: If strategy_type is neither 'constant' nor 'poisson'.
542
541
"""
543
542
_ = (prev_strategy , prev_benchmark ) # unused
544
- rate = self .rate if isinstance (self .rate , list ) else [self .rate ]
545
543
546
- if len (self .completed_strategies ) >= len (rate ):
544
+ if len (self .completed_strategies ) >= len (self . rate ):
547
545
return None
548
546
549
- current_rate = rate [len (self .completed_strategies )]
547
+ current_rate = self . rate [len (self .completed_strategies )]
550
548
551
549
if self .strategy_type == "constant" :
552
550
return AsyncConstantStrategy (
@@ -577,18 +575,16 @@ class SweepProfile(Profile):
577
575
ge = 2 ,
578
576
)
579
577
strategy_type : Literal ["constant" , "poisson" ] = "constant"
580
- startup_duration : float = Field (
578
+ startup_duration : NonNegativeFloat = Field (
581
579
default = 0.0 ,
582
580
description = (
583
581
"Duration in seconds for distributing startup requests "
584
582
"to converge quickly to desired rate"
585
583
),
586
- ge = 0 ,
587
584
)
588
- max_concurrency : int | None = Field (
585
+ max_concurrency : PositiveInt | None = Field (
589
586
default = None ,
590
587
description = "Maximum number of concurrent requests to schedule" ,
591
- gt = 0 ,
592
588
)
593
589
random_seed : int = Field (
594
590
default = 42 ,
@@ -615,7 +611,7 @@ class SweepProfile(Profile):
615
611
def resolve_args (
616
612
cls ,
617
613
rate_type : str ,
618
- rate : float | int | list [float , int ] | None ,
614
+ rate : list [float ] | None ,
619
615
random_seed : int ,
620
616
** kwargs : Any ,
621
617
) -> dict [str , Any ]:
@@ -628,7 +624,8 @@ def resolve_args(
628
624
:param kwargs: Additional arguments to pass through.
629
625
:return: Dictionary of resolved arguments.
630
626
"""
631
- kwargs ["sweep_size" ] = kwargs .get ("sweep_size" , rate )
627
+ sweep_size_from_rate = int (rate [0 ]) if rate else settings .default_sweep_number
628
+ kwargs ["sweep_size" ] = kwargs .get ("sweep_size" , sweep_size_from_rate )
632
629
kwargs ["random_seed" ] = random_seed
633
630
if rate_type in ["constant" , "poisson" ]:
634
631
kwargs ["strategy_type" ] = rate_type
0 commit comments