From 485bfedadae2c02be534b5dde2c41e1fa7b86dab Mon Sep 17 00:00:00 2001 From: Seiji Eicher Date: Mon, 14 Jul 2025 16:37:42 -0700 Subject: [PATCH 01/11] Add add_logger API to AsyncLLM Signed-off-by: Seiji Eicher --- tests/v1/metrics/test_ray_metrics.py | 4 ++-- vllm/v1/engine/async_llm.py | 19 +++++++++++++++++++ 2 files changed, 21 insertions(+), 2 deletions(-) diff --git a/tests/v1/metrics/test_ray_metrics.py b/tests/v1/metrics/test_ray_metrics.py index 0898ae65e7cd..e9bf3d9727fa 100644 --- a/tests/v1/metrics/test_ray_metrics.py +++ b/tests/v1/metrics/test_ray_metrics.py @@ -43,8 +43,8 @@ async def run(self): disable_log_stats=False, ) - engine = AsyncLLM.from_engine_args( - engine_args, stat_loggers=[RayPrometheusStatLogger]) + engine = AsyncLLM.from_engine_args(engine_args) + await engine.add_logger(RayPrometheusStatLogger) for i, prompt in enumerate(example_prompts): results = engine.generate( diff --git a/vllm/v1/engine/async_llm.py b/vllm/v1/engine/async_llm.py index 3754570dfaaa..97f84bc24ccc 100644 --- a/vllm/v1/engine/async_llm.py +++ b/vllm/v1/engine/async_llm.py @@ -608,6 +608,25 @@ async def collective_rpc(self, return await self.engine_core.collective_rpc_async( method, timeout, args, kwargs) + async def add_logger(self, logger_factory: StatLoggerFactory) -> None: + if not self.log_stats: + raise RuntimeError( + "Stat logging is disabled. Set `disable_log_stats=False` " + "argument to enable.") + + engine_num = self.vllm_config.parallel_config.data_parallel_size + if len(self.stat_loggers) == 0: + self.stat_loggers = [[] for _ in range(engine_num)] + + logger_type = type(logger_factory) + for logger in self.stat_loggers[0]: + if type(logger) is logger_type: + raise KeyError( + f"Logger with type {logger_type} already exists.") + + for i, logger_list in enumerate(self.stat_loggers): + logger_list.append(logger_factory(self.vllm_config, i)) + @property def is_running(self) -> bool: # Is None before the loop is started. From ba96afc6534a917b46e5f1a6c2c0b253f429d2d3 Mon Sep 17 00:00:00 2001 From: Seiji Eicher Date: Tue, 15 Jul 2025 17:05:21 -0700 Subject: [PATCH 02/11] Simplify by avoiding duplicate logger checks Signed-off-by: Seiji Eicher --- tests/v1/metrics/test_engine_logger_apis.py | 29 +++++++++++++++++++++ tests/v1/metrics/test_ray_metrics.py | 4 +-- vllm/v1/engine/async_llm.py | 6 ----- 3 files changed, 31 insertions(+), 8 deletions(-) create mode 100644 tests/v1/metrics/test_engine_logger_apis.py diff --git a/tests/v1/metrics/test_engine_logger_apis.py b/tests/v1/metrics/test_engine_logger_apis.py new file mode 100644 index 000000000000..c633a2d4a4f5 --- /dev/null +++ b/tests/v1/metrics/test_engine_logger_apis.py @@ -0,0 +1,29 @@ +# SPDX-License-Identifier: Apache-2.0 +# SPDX-FileCopyrightText: Copyright contributors to the vLLM project +import pytest + +from vllm.v1.engine.async_llm import AsyncEngineArgs, AsyncLLM +from vllm.v1.metrics.loggers import PrometheusStatLogger + + +@pytest.mark.asyncio +async def test_async_llm_add_logger(): + # Minimal model config for test + model_name = "distilbert/distilgpt2" + dtype = "half" + engine_args = AsyncEngineArgs( + model=model_name, + dtype=dtype, + disable_log_stats=False, + enforce_eager=True, + ) + + # Force empty list to avoid default loggers + engine = AsyncLLM.from_engine_args(engine_args, stat_loggers=[]) + + # Add PrometheusStatLogger and verify no exception is raised + await engine.add_logger(PrometheusStatLogger) + + # Verify that logger is present in the first DP rank + assert len(engine.stat_loggers[0]) == 1 + assert isinstance(engine.stat_loggers[0][0], PrometheusStatLogger) \ No newline at end of file diff --git a/tests/v1/metrics/test_ray_metrics.py b/tests/v1/metrics/test_ray_metrics.py index e9bf3d9727fa..0898ae65e7cd 100644 --- a/tests/v1/metrics/test_ray_metrics.py +++ b/tests/v1/metrics/test_ray_metrics.py @@ -43,8 +43,8 @@ async def run(self): disable_log_stats=False, ) - engine = AsyncLLM.from_engine_args(engine_args) - await engine.add_logger(RayPrometheusStatLogger) + engine = AsyncLLM.from_engine_args( + engine_args, stat_loggers=[RayPrometheusStatLogger]) for i, prompt in enumerate(example_prompts): results = engine.generate( diff --git a/vllm/v1/engine/async_llm.py b/vllm/v1/engine/async_llm.py index 97f84bc24ccc..30e01a43903e 100644 --- a/vllm/v1/engine/async_llm.py +++ b/vllm/v1/engine/async_llm.py @@ -618,12 +618,6 @@ async def add_logger(self, logger_factory: StatLoggerFactory) -> None: if len(self.stat_loggers) == 0: self.stat_loggers = [[] for _ in range(engine_num)] - logger_type = type(logger_factory) - for logger in self.stat_loggers[0]: - if type(logger) is logger_type: - raise KeyError( - f"Logger with type {logger_type} already exists.") - for i, logger_list in enumerate(self.stat_loggers): logger_list.append(logger_factory(self.vllm_config, i)) From 82a3a09e351f81f073a72dee080e51f0b02a2943 Mon Sep 17 00:00:00 2001 From: Seiji Eicher Date: Tue, 22 Jul 2025 14:16:29 -0700 Subject: [PATCH 03/11] Adapt to #21257 Signed-off-by: Seiji Eicher --- tests/v1/metrics/test_engine_logger_apis.py | 49 ++++++++------ vllm/v1/engine/async_llm.py | 28 ++++---- vllm/v1/metrics/loggers.py | 71 +++++++++++++++------ 3 files changed, 95 insertions(+), 53 deletions(-) diff --git a/tests/v1/metrics/test_engine_logger_apis.py b/tests/v1/metrics/test_engine_logger_apis.py index c633a2d4a4f5..a37bab2ce54a 100644 --- a/tests/v1/metrics/test_engine_logger_apis.py +++ b/tests/v1/metrics/test_engine_logger_apis.py @@ -3,27 +3,34 @@ import pytest from vllm.v1.engine.async_llm import AsyncEngineArgs, AsyncLLM -from vllm.v1.metrics.loggers import PrometheusStatLogger +from vllm.v1.metrics.ray_wrappers import RayPrometheusStatLogger + +DEFAULT_ENGINE_ARGS = AsyncEngineArgs( + model="distilbert/distilgpt2", + dtype="half", + disable_log_stats=False, + enforce_eager=True, +) @pytest.mark.asyncio -async def test_async_llm_add_logger(): - # Minimal model config for test - model_name = "distilbert/distilgpt2" - dtype = "half" - engine_args = AsyncEngineArgs( - model=model_name, - dtype=dtype, - disable_log_stats=False, - enforce_eager=True, - ) - - # Force empty list to avoid default loggers - engine = AsyncLLM.from_engine_args(engine_args, stat_loggers=[]) - - # Add PrometheusStatLogger and verify no exception is raised - await engine.add_logger(PrometheusStatLogger) - - # Verify that logger is present in the first DP rank - assert len(engine.stat_loggers[0]) == 1 - assert isinstance(engine.stat_loggers[0][0], PrometheusStatLogger) \ No newline at end of file +async def test_async_llm_replace_default_loggers(): + # Empty stat_loggers removes default loggers + engine = AsyncLLM.from_engine_args(DEFAULT_ENGINE_ARGS, stat_loggers=[]) + await engine.add_logger(RayPrometheusStatLogger) + + # Verify that only this logger is present in shared loggers + assert len(engine.logger_manager.shared_loggers) == 1 + assert isinstance(engine.logger_manager.shared_loggers[0], + RayPrometheusStatLogger) + + +@pytest.mark.asyncio +async def test_async_llm_add_to_default_loggers(): + # Start with default loggers, including PrometheusStatLogger + engine = AsyncLLM.from_engine_args(DEFAULT_ENGINE_ARGS) + + # Add another PrometheusStatLogger subclass + await engine.add_logger(RayPrometheusStatLogger) + + assert len(engine.logger_manager.shared_loggers) == 2 diff --git a/vllm/v1/engine/async_llm.py b/vllm/v1/engine/async_llm.py index dee79206d7f6..f046e1749730 100644 --- a/vllm/v1/engine/async_llm.py +++ b/vllm/v1/engine/async_llm.py @@ -36,7 +36,8 @@ from vllm.v1.engine.parallel_sampling import ParentRequest from vllm.v1.engine.processor import Processor from vllm.v1.executor.abstract import Executor -from vllm.v1.metrics.loggers import StatLoggerFactory, StatLoggerManager +from vllm.v1.metrics.loggers import (DpSharedStatLoggerFactory, + StatLoggerFactory, StatLoggerManager) from vllm.v1.metrics.prometheus import shutdown_prometheus from vllm.v1.metrics.stats import IterationStats @@ -55,7 +56,8 @@ def __init__( use_cached_outputs: bool = False, log_requests: bool = True, start_engine_loop: bool = True, - stat_loggers: Optional[list[StatLoggerFactory]] = None, + stat_loggers: Optional[list[Union[StatLoggerFactory, + DpSharedStatLoggerFactory]]] = None, client_addresses: Optional[dict[str, str]] = None, client_index: int = 0, ) -> None: @@ -144,7 +146,8 @@ def from_vllm_config( vllm_config: VllmConfig, start_engine_loop: bool = True, usage_context: UsageContext = UsageContext.ENGINE_CONTEXT, - stat_loggers: Optional[list[StatLoggerFactory]] = None, + stat_loggers: Optional[list[Union[StatLoggerFactory, + DpSharedStatLoggerFactory]]] = None, disable_log_requests: bool = False, disable_log_stats: bool = False, client_addresses: Optional[dict[str, str]] = None, @@ -176,7 +179,8 @@ def from_engine_args( engine_args: AsyncEngineArgs, start_engine_loop: bool = True, usage_context: UsageContext = UsageContext.ENGINE_CONTEXT, - stat_loggers: Optional[list[StatLoggerFactory]] = None, + stat_loggers: Optional[list[Union[StatLoggerFactory, + DpSharedStatLoggerFactory]]] = None, ) -> "AsyncLLM": """Create an AsyncLLM from the EngineArgs.""" @@ -596,19 +600,17 @@ async def collective_rpc(self, return await self.engine_core.collective_rpc_async( method, timeout, args, kwargs) - async def add_logger(self, logger_factory: StatLoggerFactory) -> None: - if not self.log_stats: + async def add_logger( + self, logger_factory: Union[StatLoggerFactory, + DpSharedStatLoggerFactory] + ) -> None: + if self.logger_manager is None: raise RuntimeError( "Stat logging is disabled. Set `disable_log_stats=False` " - "argument to enable.") + "engine argument to enable.") - engine_num = self.vllm_config.parallel_config.data_parallel_size - if len(self.stat_loggers) == 0: - self.stat_loggers = [[] for _ in range(engine_num)] + self.logger_manager.add_logger(logger_factory) - for i, logger_list in enumerate(self.stat_loggers): - logger_list.append(logger_factory(self.vllm_config, i)) - async def wait_for_requests_to_drain(self, drain_timeout: int = 300): """Wait for all requests to be drained.""" start_time = time.time() diff --git a/vllm/v1/metrics/loggers.py b/vllm/v1/metrics/loggers.py index 7f2556bab5a4..e2cbb33ac78c 100644 --- a/vllm/v1/metrics/loggers.py +++ b/vllm/v1/metrics/loggers.py @@ -20,6 +20,8 @@ logger = init_logger(__name__) StatLoggerFactory = Callable[[VllmConfig, int], "StatLoggerBase"] +DpSharedStatLoggerFactory = Callable[[VllmConfig, Optional[list[int]]], + "PrometheusStatLogger"] class StatLoggerBase(ABC): @@ -633,37 +635,67 @@ def __init__( self, vllm_config: VllmConfig, engine_idxs: Optional[list[int]] = None, - custom_stat_loggers: Optional[list[StatLoggerFactory]] = None, + custom_stat_loggers: Optional[list[Union[ + StatLoggerFactory, DpSharedStatLoggerFactory]]] = None, ): + """ + Initializes the StatLoggerManager. + + Args: + vllm_config (VllmConfig): The configuration object for vLLM. + engine_idxs (Optional[list[int]]): List of engine indices. If None, + defaults to [0]. + custom_stat_loggers (Optional[list[Union[ + StatLoggerFactory, DpSharedStatLoggerFactory + ]]]): + Optional list of custom stat logger factories to use. If None, + default loggers are used. + """ self.engine_idxs = engine_idxs if engine_idxs else [0] + self.vllm_config = vllm_config - factories: list[StatLoggerFactory] + factories: list[StatLoggerFactory] = [] + shared_logger_factories: list[DpSharedStatLoggerFactory] = [] if custom_stat_loggers is not None: - factories = custom_stat_loggers + for factory in custom_stat_loggers: + if isinstance(factory, type) and issubclass( + factory, PrometheusStatLogger): + shared_logger_factories.append(factory) # type: ignore + else: + factories.append(factory) # type: ignore else: - factories = [] if logger.isEnabledFor(logging.INFO): factories.append(LoggingStatLogger) + shared_logger_factories.append(PrometheusStatLogger) + + self.shared_loggers = [] + if len(shared_logger_factories) > 0: + for factory in shared_logger_factories: + self.shared_loggers.append(factory(vllm_config, engine_idxs)) + # engine_idx: StatLogger self.per_engine_logger_dict: dict[int, list[StatLoggerBase]] = {} - prometheus_factory = PrometheusStatLogger for engine_idx in self.engine_idxs: loggers: list[StatLoggerBase] = [] for logger_factory in factories: - # If we get a custom prometheus logger, use that - # instead. This is typically used for the ray case. - if (isinstance(logger_factory, type) - and issubclass(logger_factory, PrometheusStatLogger)): - prometheus_factory = logger_factory - continue loggers.append(logger_factory(vllm_config, engine_idx)) # type: ignore self.per_engine_logger_dict[engine_idx] = loggers - # For Prometheus, need to share the metrics between EngineCores. - # Each EngineCore's metrics are expressed as a unique label. - self.prometheus_logger = prometheus_factory(vllm_config, engine_idxs) + def add_logger( + self, logger_factory: Union[StatLoggerFactory, + DpSharedStatLoggerFactory] + ) -> None: + if (isinstance(logger_factory, type) + and issubclass(logger_factory, PrometheusStatLogger)): + self.shared_loggers.append( + logger_factory(self.vllm_config, + self.engine_idxs)) # type: ignore + else: + for engine_idx, logger_list in self.per_engine_logger_dict.items(): + logger_list.append(logger_factory(self.vllm_config, + engine_idx)) # type: ignore def record( self, @@ -678,8 +710,8 @@ def record( for logger in per_engine_loggers: logger.record(scheduler_stats, iteration_stats, engine_idx) - self.prometheus_logger.record(scheduler_stats, iteration_stats, - engine_idx) + for logger in self.shared_loggers: + logger.record(scheduler_stats, iteration_stats, engine_idx) def log(self): for per_engine_loggers in self.per_engine_logger_dict.values(): @@ -687,8 +719,9 @@ def log(self): logger.log() def log_engine_initialized(self): - self.prometheus_logger.log_engine_initialized() + for shared_logger in self.shared_loggers: + shared_logger.log_engine_initialized() for per_engine_loggers in self.per_engine_logger_dict.values(): - for logger in per_engine_loggers: - logger.log_engine_initialized() + for per_engine_logger in per_engine_loggers: + per_engine_logger.log_engine_initialized() From c5aa929f350aff20d01da1faf3e0959820f9aaae Mon Sep 17 00:00:00 2001 From: Seiji Eicher Date: Tue, 12 Aug 2025 15:27:19 -0700 Subject: [PATCH 04/11] Clean revert to main Signed-off-by: Seiji Eicher --- vllm/v1/engine/async_llm.py | 27 ++++---------- vllm/v1/metrics/loggers.py | 71 ++++++++++--------------------------- 2 files changed, 26 insertions(+), 72 deletions(-) diff --git a/vllm/v1/engine/async_llm.py b/vllm/v1/engine/async_llm.py index dd41cff08509..45f450291ab6 100644 --- a/vllm/v1/engine/async_llm.py +++ b/vllm/v1/engine/async_llm.py @@ -27,7 +27,7 @@ from vllm.transformers_utils.tokenizer import AnyTokenizer from vllm.transformers_utils.tokenizer_group import init_tokenizer_from_configs from vllm.usage.usage_lib import UsageContext -from vllm.utils import Device, cancel_task_threadsafe, cdiv, deprecate_kwargs +from vllm.utils import Device, cdiv, deprecate_kwargs from vllm.v1.engine import EngineCoreRequest from vllm.v1.engine.core_client import EngineCoreClient from vllm.v1.engine.exceptions import EngineDeadError, EngineGenerateError @@ -36,8 +36,7 @@ from vllm.v1.engine.parallel_sampling import ParentRequest from vllm.v1.engine.processor import Processor from vllm.v1.executor.abstract import Executor -from vllm.v1.metrics.loggers import (DpSharedStatLoggerFactory, - StatLoggerFactory, StatLoggerManager) +from vllm.v1.metrics.loggers import StatLoggerFactory, StatLoggerManager from vllm.v1.metrics.prometheus import shutdown_prometheus from vllm.v1.metrics.stats import IterationStats @@ -56,8 +55,7 @@ def __init__( use_cached_outputs: bool = False, log_requests: bool = True, start_engine_loop: bool = True, - stat_loggers: Optional[list[Union[StatLoggerFactory, - DpSharedStatLoggerFactory]]] = None, + stat_loggers: Optional[list[StatLoggerFactory]] = None, client_addresses: Optional[dict[str, str]] = None, client_count: int = 1, client_index: int = 0, @@ -191,8 +189,7 @@ def from_engine_args( engine_args: AsyncEngineArgs, start_engine_loop: bool = True, usage_context: UsageContext = UsageContext.ENGINE_CONTEXT, - stat_loggers: Optional[list[Union[StatLoggerFactory, - DpSharedStatLoggerFactory]]] = None, + stat_loggers: Optional[list[StatLoggerFactory]] = None, ) -> "AsyncLLM": """Create an AsyncLLM from the EngineArgs.""" @@ -222,7 +219,8 @@ def shutdown(self): if engine_core := getattr(self, "engine_core", None): engine_core.shutdown() - cancel_task_threadsafe(getattr(self, "output_handler", None)) + if handler := getattr(self, "output_handler", None): + handler.cancel() async def get_supported_tasks(self) -> tuple[SupportedTask, ...]: return await self.engine_core.get_supported_tasks_async() @@ -568,7 +566,7 @@ async def stop_profile(self) -> None: await self.engine_core.profile_async(False) async def reset_mm_cache(self) -> None: - self.processor.mm_registry.reset_processor_cache(self.model_config) + self.processor.mm_registry.reset_processor_cache() self.processor.mm_input_cache_client.reset() await self.engine_core.reset_mm_cache_async() @@ -614,17 +612,6 @@ async def collective_rpc(self, return await self.engine_core.collective_rpc_async( method, timeout, args, kwargs) - async def add_logger( - self, logger_factory: Union[StatLoggerFactory, - DpSharedStatLoggerFactory] - ) -> None: - if self.logger_manager is None: - raise RuntimeError( - "Stat logging is disabled. Set `disable_log_stats=False` " - "engine argument to enable.") - - self.logger_manager.add_logger(logger_factory) - async def wait_for_requests_to_drain(self, drain_timeout: int = 300): """Wait for all requests to be drained.""" start_time = time.time() diff --git a/vllm/v1/metrics/loggers.py b/vllm/v1/metrics/loggers.py index 1a220314ec88..3b0616952bab 100644 --- a/vllm/v1/metrics/loggers.py +++ b/vllm/v1/metrics/loggers.py @@ -19,8 +19,6 @@ logger = init_logger(__name__) StatLoggerFactory = Callable[[VllmConfig, int], "StatLoggerBase"] -DpSharedStatLoggerFactory = Callable[[VllmConfig, Optional[list[int]]], - "PrometheusStatLogger"] class StatLoggerBase(ABC): @@ -636,67 +634,37 @@ def __init__( self, vllm_config: VllmConfig, engine_idxs: Optional[list[int]] = None, - custom_stat_loggers: Optional[list[Union[ - StatLoggerFactory, DpSharedStatLoggerFactory]]] = None, + custom_stat_loggers: Optional[list[StatLoggerFactory]] = None, ): - """ - Initializes the StatLoggerManager. - - Args: - vllm_config (VllmConfig): The configuration object for vLLM. - engine_idxs (Optional[list[int]]): List of engine indices. If None, - defaults to [0]. - custom_stat_loggers (Optional[list[Union[ - StatLoggerFactory, DpSharedStatLoggerFactory - ]]]): - Optional list of custom stat logger factories to use. If None, - default loggers are used. - """ self.engine_idxs = engine_idxs if engine_idxs else [0] - self.vllm_config = vllm_config - factories: list[StatLoggerFactory] = [] - shared_logger_factories: list[DpSharedStatLoggerFactory] = [] + factories: list[StatLoggerFactory] if custom_stat_loggers is not None: - for factory in custom_stat_loggers: - if isinstance(factory, type) and issubclass( - factory, PrometheusStatLogger): - shared_logger_factories.append(factory) # type: ignore - else: - factories.append(factory) # type: ignore + factories = custom_stat_loggers else: + factories = [] if logger.isEnabledFor(logging.INFO): factories.append(LoggingStatLogger) - shared_logger_factories.append(PrometheusStatLogger) - - self.shared_loggers = [] - if len(shared_logger_factories) > 0: - for factory in shared_logger_factories: - self.shared_loggers.append(factory(vllm_config, engine_idxs)) - # engine_idx: StatLogger self.per_engine_logger_dict: dict[int, list[StatLoggerBase]] = {} + prometheus_factory = PrometheusStatLogger for engine_idx in self.engine_idxs: loggers: list[StatLoggerBase] = [] for logger_factory in factories: + # If we get a custom prometheus logger, use that + # instead. This is typically used for the ray case. + if (isinstance(logger_factory, type) + and issubclass(logger_factory, PrometheusStatLogger)): + prometheus_factory = logger_factory + continue loggers.append(logger_factory(vllm_config, engine_idx)) # type: ignore self.per_engine_logger_dict[engine_idx] = loggers - def add_logger( - self, logger_factory: Union[StatLoggerFactory, - DpSharedStatLoggerFactory] - ) -> None: - if (isinstance(logger_factory, type) - and issubclass(logger_factory, PrometheusStatLogger)): - self.shared_loggers.append( - logger_factory(self.vllm_config, - self.engine_idxs)) # type: ignore - else: - for engine_idx, logger_list in self.per_engine_logger_dict.items(): - logger_list.append(logger_factory(self.vllm_config, - engine_idx)) # type: ignore + # For Prometheus, need to share the metrics between EngineCores. + # Each EngineCore's metrics are expressed as a unique label. + self.prometheus_logger = prometheus_factory(vllm_config, engine_idxs) def record( self, @@ -711,8 +679,8 @@ def record( for logger in per_engine_loggers: logger.record(scheduler_stats, iteration_stats, engine_idx) - for logger in self.shared_loggers: - logger.record(scheduler_stats, iteration_stats, engine_idx) + self.prometheus_logger.record(scheduler_stats, iteration_stats, + engine_idx) def log(self): for per_engine_loggers in self.per_engine_logger_dict.values(): @@ -720,9 +688,8 @@ def log(self): logger.log() def log_engine_initialized(self): - for shared_logger in self.shared_loggers: - shared_logger.log_engine_initialized() + self.prometheus_logger.log_engine_initialized() for per_engine_loggers in self.per_engine_logger_dict.values(): - for per_engine_logger in per_engine_loggers: - per_engine_logger.log_engine_initialized() + for logger in per_engine_loggers: + logger.log_engine_initialized() From 75254ec7bcf60fecbc6fb8ba0f251da1627f9633 Mon Sep 17 00:00:00 2001 From: Seiji Eicher Date: Tue, 12 Aug 2025 16:57:26 -0700 Subject: [PATCH 05/11] disable_log_stats=True disables default loggers only Signed-off-by: Seiji Eicher --- tests/v1/metrics/test_engine_logger_apis.py | 81 +++++++++++++++------ vllm/v1/engine/async_llm.py | 9 ++- vllm/v1/metrics/loggers.py | 48 ++++++------ 3 files changed, 94 insertions(+), 44 deletions(-) diff --git a/tests/v1/metrics/test_engine_logger_apis.py b/tests/v1/metrics/test_engine_logger_apis.py index a37bab2ce54a..f4291da95f56 100644 --- a/tests/v1/metrics/test_engine_logger_apis.py +++ b/tests/v1/metrics/test_engine_logger_apis.py @@ -1,36 +1,75 @@ # SPDX-License-Identifier: Apache-2.0 # SPDX-FileCopyrightText: Copyright contributors to the vLLM project +import copy + import pytest from vllm.v1.engine.async_llm import AsyncEngineArgs, AsyncLLM from vllm.v1.metrics.ray_wrappers import RayPrometheusStatLogger -DEFAULT_ENGINE_ARGS = AsyncEngineArgs( - model="distilbert/distilgpt2", - dtype="half", - disable_log_stats=False, - enforce_eager=True, -) + +@pytest.fixture +def log_stats_enabled_engine_args(): + """ + Shared fixture providing common AsyncEngineArgs configuration + used across multiple tests. + """ + return AsyncEngineArgs( + model="distilbert/distilgpt2", + dtype="half", + disable_log_stats=False, + enforce_eager=True, + ) + + +@pytest.fixture +def default_dp_shared_loggers_len(log_stats_enabled_engine_args): + """ + Fixture to provide the length of the default dp_shared_loggers + for AsyncLLM with no custom stat loggers. + """ + engine = AsyncLLM.from_engine_args(log_stats_enabled_engine_args, + stat_loggers=[]) + length = len(engine.logger_manager.dp_shared_loggers) + engine.shutdown() + return length @pytest.mark.asyncio -async def test_async_llm_replace_default_loggers(): - # Empty stat_loggers removes default loggers - engine = AsyncLLM.from_engine_args(DEFAULT_ENGINE_ARGS, stat_loggers=[]) - await engine.add_logger(RayPrometheusStatLogger) - - # Verify that only this logger is present in shared loggers - assert len(engine.logger_manager.shared_loggers) == 1 - assert isinstance(engine.logger_manager.shared_loggers[0], - RayPrometheusStatLogger) +async def test_async_llm_replace_default_loggers( + log_stats_enabled_engine_args, default_dp_shared_loggers_len): + """ + The default stats loggers should be used regardless of whether additional + custom ones are added. + """ + + engine = AsyncLLM.from_engine_args(log_stats_enabled_engine_args, + stat_loggers=[RayPrometheusStatLogger]) + assert len(engine.logger_manager.dp_shared_loggers + ) == default_dp_shared_loggers_len + 1 + engine.shutdown() @pytest.mark.asyncio -async def test_async_llm_add_to_default_loggers(): - # Start with default loggers, including PrometheusStatLogger - engine = AsyncLLM.from_engine_args(DEFAULT_ENGINE_ARGS) +async def test_async_llm_add_to_default_loggers(log_stats_enabled_engine_args): + """ + It's still possible to use custom stat loggers exclusively by passing + disable_log_stats=True in addition to a list of custom stat loggers. + """ + # Create engine_args with disable_log_stats=True for this test + disabled_log_engine_args = copy.deepcopy(log_stats_enabled_engine_args) + disabled_log_engine_args.disable_log_stats = True + + # Disable default loggers whilst passing a custom stat logger + engine = AsyncLLM.from_engine_args(disabled_log_engine_args, + stat_loggers=[RayPrometheusStatLogger]) + + # Only RayPrometheusStatLogger is available + assert len(engine.logger_manager.dp_shared_loggers) == 1 + assert isinstance(engine.logger_manager.dp_shared_loggers[0], + RayPrometheusStatLogger) - # Add another PrometheusStatLogger subclass - await engine.add_logger(RayPrometheusStatLogger) + # log_stats is still True, since custom stat loggers are used + assert engine.log_stats - assert len(engine.logger_manager.shared_loggers) == 2 + engine.shutdown() diff --git a/vllm/v1/engine/async_llm.py b/vllm/v1/engine/async_llm.py index 45f450291ab6..7c35fdc456b2 100644 --- a/vllm/v1/engine/async_llm.py +++ b/vllm/v1/engine/async_llm.py @@ -93,7 +93,13 @@ def __init__( self.model_config = vllm_config.model_config self.vllm_config = vllm_config self.log_requests = log_requests - self.log_stats = log_stats + + self.log_stats = log_stats or (stat_loggers is not None) + disable_default_loggers = not log_stats and (stat_loggers is not None) + if disable_default_loggers: + logger.info( + "AsyncLLM created with log_stats=False and non-empty custom " + "logger list; enabling logging without default stat loggers") if self.model_config.skip_tokenizer_init: self.tokenizer = None @@ -132,6 +138,7 @@ def __init__( vllm_config=vllm_config, engine_idxs=self.engine_core.engine_ranks_managed, custom_stat_loggers=stat_loggers, + enable_default_loggers=not disable_default_loggers, ) self.logger_manager.log_engine_initialized() diff --git a/vllm/v1/metrics/loggers.py b/vllm/v1/metrics/loggers.py index 3b0616952bab..80f42177914f 100644 --- a/vllm/v1/metrics/loggers.py +++ b/vllm/v1/metrics/loggers.py @@ -1,7 +1,6 @@ # SPDX-License-Identifier: Apache-2.0 # SPDX-FileCopyrightText: Copyright contributors to the vLLM project -import logging import time from abc import ABC, abstractmethod from typing import Callable, Optional, Union @@ -635,37 +634,41 @@ def __init__( vllm_config: VllmConfig, engine_idxs: Optional[list[int]] = None, custom_stat_loggers: Optional[list[StatLoggerFactory]] = None, + enable_default_loggers: bool = True, ): self.engine_idxs = engine_idxs if engine_idxs else [0] - factories: list[StatLoggerFactory] + factories: list[StatLoggerFactory] = [] if custom_stat_loggers is not None: - factories = custom_stat_loggers - else: - factories = [] - if logger.isEnabledFor(logging.INFO): - factories.append(LoggingStatLogger) + factories.extend(custom_stat_loggers) + + if enable_default_loggers: + factories.append(LoggingStatLogger) + + # For Prometheus, need to share the metrics between EngineCores. + # Each EngineCore's metrics are expressed as a unique label. + self.dp_shared_loggers = [] + if enable_default_loggers: + self.dp_shared_loggers.append( + PrometheusStatLogger(vllm_config, engine_idxs)) # engine_idx: StatLogger self.per_engine_logger_dict: dict[int, list[StatLoggerBase]] = {} - prometheus_factory = PrometheusStatLogger for engine_idx in self.engine_idxs: loggers: list[StatLoggerBase] = [] for logger_factory in factories: - # If we get a custom prometheus logger, use that - # instead. This is typically used for the ray case. + # If we get a custom prometheus logger, add that to the shared + # DP logger list. This is typically used for the ray case. if (isinstance(logger_factory, type) and issubclass(logger_factory, PrometheusStatLogger)): - prometheus_factory = logger_factory - continue - loggers.append(logger_factory(vllm_config, - engine_idx)) # type: ignore + self.dp_shared_loggers.append( + logger_factory(vllm_config, + engine_idxs)) # type: ignore + else: + loggers.append(logger_factory(vllm_config, + engine_idx)) # type: ignore self.per_engine_logger_dict[engine_idx] = loggers - # For Prometheus, need to share the metrics between EngineCores. - # Each EngineCore's metrics are expressed as a unique label. - self.prometheus_logger = prometheus_factory(vllm_config, engine_idxs) - def record( self, scheduler_stats: Optional[SchedulerStats], @@ -679,8 +682,8 @@ def record( for logger in per_engine_loggers: logger.record(scheduler_stats, iteration_stats, engine_idx) - self.prometheus_logger.record(scheduler_stats, iteration_stats, - engine_idx) + for logger in self.dp_shared_loggers: + logger.record(scheduler_stats, iteration_stats, engine_idx) def log(self): for per_engine_loggers in self.per_engine_logger_dict.values(): @@ -688,8 +691,9 @@ def log(self): logger.log() def log_engine_initialized(self): - self.prometheus_logger.log_engine_initialized() - for per_engine_loggers in self.per_engine_logger_dict.values(): for logger in per_engine_loggers: logger.log_engine_initialized() + + for logger in self.dp_shared_loggers: + logger.log_engine_initialized() From d2176241c559e1978d492cf25983b0024cf62223 Mon Sep 17 00:00:00 2001 From: Seiji Eicher Date: Tue, 12 Aug 2025 17:02:07 -0700 Subject: [PATCH 06/11] Revert excess changes to async_llm.py and loggers.py Signed-off-by: Seiji Eicher --- vllm/v1/engine/async_llm.py | 7 +++---- vllm/v1/metrics/loggers.py | 3 ++- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/vllm/v1/engine/async_llm.py b/vllm/v1/engine/async_llm.py index 7c35fdc456b2..cf5d16dc1cb0 100644 --- a/vllm/v1/engine/async_llm.py +++ b/vllm/v1/engine/async_llm.py @@ -27,7 +27,7 @@ from vllm.transformers_utils.tokenizer import AnyTokenizer from vllm.transformers_utils.tokenizer_group import init_tokenizer_from_configs from vllm.usage.usage_lib import UsageContext -from vllm.utils import Device, cdiv, deprecate_kwargs +from vllm.utils import Device, cancel_task_threadsafe, cdiv, deprecate_kwargs from vllm.v1.engine import EngineCoreRequest from vllm.v1.engine.core_client import EngineCoreClient from vllm.v1.engine.exceptions import EngineDeadError, EngineGenerateError @@ -226,8 +226,7 @@ def shutdown(self): if engine_core := getattr(self, "engine_core", None): engine_core.shutdown() - if handler := getattr(self, "output_handler", None): - handler.cancel() + cancel_task_threadsafe(getattr(self, "output_handler", None)) async def get_supported_tasks(self) -> tuple[SupportedTask, ...]: return await self.engine_core.get_supported_tasks_async() @@ -573,7 +572,7 @@ async def stop_profile(self) -> None: await self.engine_core.profile_async(False) async def reset_mm_cache(self) -> None: - self.processor.mm_registry.reset_processor_cache() + self.processor.mm_registry.reset_processor_cache(self.model_config) self.processor.mm_input_cache_client.reset() await self.engine_core.reset_mm_cache_async() diff --git a/vllm/v1/metrics/loggers.py b/vllm/v1/metrics/loggers.py index 80f42177914f..7f2d94127994 100644 --- a/vllm/v1/metrics/loggers.py +++ b/vllm/v1/metrics/loggers.py @@ -1,6 +1,7 @@ # SPDX-License-Identifier: Apache-2.0 # SPDX-FileCopyrightText: Copyright contributors to the vLLM project +import logging import time from abc import ABC, abstractmethod from typing import Callable, Optional, Union @@ -642,7 +643,7 @@ def __init__( if custom_stat_loggers is not None: factories.extend(custom_stat_loggers) - if enable_default_loggers: + if enable_default_loggers and logger.isEnabledFor(logging.INFO): factories.append(LoggingStatLogger) # For Prometheus, need to share the metrics between EngineCores. From 95b9811f74787fdd7e85ffc0ef3136cffb30497c Mon Sep 17 00:00:00 2001 From: Seiji Eicher <58963096+eicherseiji@users.noreply.github.com> Date: Tue, 12 Aug 2025 17:46:29 -0700 Subject: [PATCH 07/11] Apply suggestions from code review Co-authored-by: Nick Hill Signed-off-by: Seiji Eicher <58963096+eicherseiji@users.noreply.github.com> --- vllm/v1/engine/async_llm.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/vllm/v1/engine/async_llm.py b/vllm/v1/engine/async_llm.py index cf5d16dc1cb0..a676f977aa15 100644 --- a/vllm/v1/engine/async_llm.py +++ b/vllm/v1/engine/async_llm.py @@ -95,8 +95,7 @@ def __init__( self.log_requests = log_requests self.log_stats = log_stats or (stat_loggers is not None) - disable_default_loggers = not log_stats and (stat_loggers is not None) - if disable_default_loggers: + if self.log_stats and not log_stats: logger.info( "AsyncLLM created with log_stats=False and non-empty custom " "logger list; enabling logging without default stat loggers") @@ -138,7 +137,7 @@ def __init__( vllm_config=vllm_config, engine_idxs=self.engine_core.engine_ranks_managed, custom_stat_loggers=stat_loggers, - enable_default_loggers=not disable_default_loggers, + enable_default_loggers=log_stats, ) self.logger_manager.log_engine_initialized() From 04caee2e2d2d61e552d1499ef42b468cd8f62ab9 Mon Sep 17 00:00:00 2001 From: Seiji Eicher <58963096+eicherseiji@users.noreply.github.com> Date: Tue, 2 Sep 2025 13:22:53 -0700 Subject: [PATCH 08/11] Apply suggestions from code review Co-authored-by: Nick Hill Signed-off-by: Seiji Eicher <58963096+eicherseiji@users.noreply.github.com> --- vllm/v1/engine/async_llm.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/vllm/v1/engine/async_llm.py b/vllm/v1/engine/async_llm.py index a676f977aa15..ea7a5d627827 100644 --- a/vllm/v1/engine/async_llm.py +++ b/vllm/v1/engine/async_llm.py @@ -95,7 +95,7 @@ def __init__( self.log_requests = log_requests self.log_stats = log_stats or (stat_loggers is not None) - if self.log_stats and not log_stats: + if not log_stats and stats_loggers is not None: logger.info( "AsyncLLM created with log_stats=False and non-empty custom " "logger list; enabling logging without default stat loggers") From 716f71347de9a1b9d5bfe320bf9f8961c2e73e8e Mon Sep 17 00:00:00 2001 From: Seiji Eicher Date: Tue, 2 Sep 2025 13:38:44 -0700 Subject: [PATCH 09/11] Restore self.prometheus_logger Signed-off-by: Seiji Eicher --- vllm/v1/engine/async_llm.py | 2 +- vllm/v1/metrics/loggers.py | 28 +++++++++++++++------------- 2 files changed, 16 insertions(+), 14 deletions(-) diff --git a/vllm/v1/engine/async_llm.py b/vllm/v1/engine/async_llm.py index ea7a5d627827..b513d7c3e18e 100644 --- a/vllm/v1/engine/async_llm.py +++ b/vllm/v1/engine/async_llm.py @@ -95,7 +95,7 @@ def __init__( self.log_requests = log_requests self.log_stats = log_stats or (stat_loggers is not None) - if not log_stats and stats_loggers is not None: + if not log_stats and stat_loggers is not None: logger.info( "AsyncLLM created with log_stats=False and non-empty custom " "logger list; enabling logging without default stat loggers") diff --git a/vllm/v1/metrics/loggers.py b/vllm/v1/metrics/loggers.py index 7f2d94127994..23dab1b1350e 100644 --- a/vllm/v1/metrics/loggers.py +++ b/vllm/v1/metrics/loggers.py @@ -655,21 +655,24 @@ def __init__( # engine_idx: StatLogger self.per_engine_logger_dict: dict[int, list[StatLoggerBase]] = {} + prometheus_factory = PrometheusStatLogger for engine_idx in self.engine_idxs: loggers: list[StatLoggerBase] = [] for logger_factory in factories: - # If we get a custom prometheus logger, add that to the shared - # DP logger list. This is typically used for the ray case. + # If we get a custom prometheus logger, use that + # instead. This is typically used for the ray case. if (isinstance(logger_factory, type) and issubclass(logger_factory, PrometheusStatLogger)): - self.dp_shared_loggers.append( - logger_factory(vllm_config, - engine_idxs)) # type: ignore - else: - loggers.append(logger_factory(vllm_config, - engine_idx)) # type: ignore + prometheus_factory = logger_factory + continue + loggers.append(logger_factory(vllm_config, + engine_idx)) # type: ignore self.per_engine_logger_dict[engine_idx] = loggers + # For Prometheus, need to share the metrics between EngineCores. + # Each EngineCore's metrics are expressed as a unique label. + self.prometheus_logger = prometheus_factory(vllm_config, engine_idxs) + def record( self, scheduler_stats: Optional[SchedulerStats], @@ -683,8 +686,8 @@ def record( for logger in per_engine_loggers: logger.record(scheduler_stats, iteration_stats, engine_idx) - for logger in self.dp_shared_loggers: - logger.record(scheduler_stats, iteration_stats, engine_idx) + self.prometheus_logger.record(scheduler_stats, iteration_stats, + engine_idx) def log(self): for per_engine_loggers in self.per_engine_logger_dict.values(): @@ -692,9 +695,8 @@ def log(self): logger.log() def log_engine_initialized(self): + self.prometheus_logger.log_engine_initialized() + for per_engine_loggers in self.per_engine_logger_dict.values(): for logger in per_engine_loggers: logger.log_engine_initialized() - - for logger in self.dp_shared_loggers: - logger.log_engine_initialized() From f4661dc6d0729a649ab71cafafb07f9c3803214c Mon Sep 17 00:00:00 2001 From: Seiji Eicher Date: Wed, 3 Sep 2025 10:58:37 -0700 Subject: [PATCH 10/11] Remove remaining reference to dp_shared_loggers and update tests Signed-off-by: Seiji Eicher --- tests/v1/metrics/test_engine_logger_apis.py | 56 ++++++++++++--------- vllm/v1/metrics/loggers.py | 7 --- 2 files changed, 32 insertions(+), 31 deletions(-) diff --git a/tests/v1/metrics/test_engine_logger_apis.py b/tests/v1/metrics/test_engine_logger_apis.py index f4291da95f56..e6a4d0a2a2e8 100644 --- a/tests/v1/metrics/test_engine_logger_apis.py +++ b/tests/v1/metrics/test_engine_logger_apis.py @@ -8,6 +8,29 @@ from vllm.v1.metrics.ray_wrappers import RayPrometheusStatLogger +class DummyStatLogger: + """ + A dummy stat logger for testing purposes. + Implements the minimal interface expected by StatLoggerManager. + """ + + def __init__(self, vllm_config, engine_idx): + self.vllm_config = vllm_config + self.engine_idx = engine_idx + self.recorded = [] + self.logged = False + self.engine_initialized = False + + def record(self, scheduler_stats, iteration_stats, engine_idx): + self.recorded.append((scheduler_stats, iteration_stats, engine_idx)) + + def log(self): + self.logged = True + + def log_engine_initialized(self): + self.engine_initialized = True + + @pytest.fixture def log_stats_enabled_engine_args(): """ @@ -22,31 +45,17 @@ def log_stats_enabled_engine_args(): ) -@pytest.fixture -def default_dp_shared_loggers_len(log_stats_enabled_engine_args): - """ - Fixture to provide the length of the default dp_shared_loggers - for AsyncLLM with no custom stat loggers. - """ - engine = AsyncLLM.from_engine_args(log_stats_enabled_engine_args, - stat_loggers=[]) - length = len(engine.logger_manager.dp_shared_loggers) - engine.shutdown() - return length - - @pytest.mark.asyncio async def test_async_llm_replace_default_loggers( - log_stats_enabled_engine_args, default_dp_shared_loggers_len): + log_stats_enabled_engine_args): """ - The default stats loggers should be used regardless of whether additional - custom ones are added. + RayPrometheusStatLogger should replace the default PrometheusStatLogger """ engine = AsyncLLM.from_engine_args(log_stats_enabled_engine_args, stat_loggers=[RayPrometheusStatLogger]) - assert len(engine.logger_manager.dp_shared_loggers - ) == default_dp_shared_loggers_len + 1 + assert isinstance(engine.logger_manager.prometheus_logger, + RayPrometheusStatLogger) engine.shutdown() @@ -60,14 +69,13 @@ async def test_async_llm_add_to_default_loggers(log_stats_enabled_engine_args): disabled_log_engine_args = copy.deepcopy(log_stats_enabled_engine_args) disabled_log_engine_args.disable_log_stats = True - # Disable default loggers whilst passing a custom stat logger + # Disable default loggers; pass custom stat logger to the constructor engine = AsyncLLM.from_engine_args(disabled_log_engine_args, - stat_loggers=[RayPrometheusStatLogger]) + stat_loggers=[DummyStatLogger]) - # Only RayPrometheusStatLogger is available - assert len(engine.logger_manager.dp_shared_loggers) == 1 - assert isinstance(engine.logger_manager.dp_shared_loggers[0], - RayPrometheusStatLogger) + assert len(engine.logger_manager.per_engine_logger_dict[0]) == 1 + assert isinstance(engine.logger_manager.per_engine_logger_dict[0][0], + DummyStatLogger) # log_stats is still True, since custom stat loggers are used assert engine.log_stats diff --git a/vllm/v1/metrics/loggers.py b/vllm/v1/metrics/loggers.py index 23dab1b1350e..4b508385604a 100644 --- a/vllm/v1/metrics/loggers.py +++ b/vllm/v1/metrics/loggers.py @@ -646,13 +646,6 @@ def __init__( if enable_default_loggers and logger.isEnabledFor(logging.INFO): factories.append(LoggingStatLogger) - # For Prometheus, need to share the metrics between EngineCores. - # Each EngineCore's metrics are expressed as a unique label. - self.dp_shared_loggers = [] - if enable_default_loggers: - self.dp_shared_loggers.append( - PrometheusStatLogger(vllm_config, engine_idxs)) - # engine_idx: StatLogger self.per_engine_logger_dict: dict[int, list[StatLoggerBase]] = {} prometheus_factory = PrometheusStatLogger From 95eae5d38d5bd0c66893b4f3e5de925b22397f83 Mon Sep 17 00:00:00 2001 From: Seiji Eicher Date: Wed, 3 Sep 2025 17:40:46 -0700 Subject: [PATCH 11/11] Update test_customize_loggers with new behavior Signed-off-by: Seiji Eicher --- tests/v1/engine/test_async_llm.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/tests/v1/engine/test_async_llm.py b/tests/v1/engine/test_async_llm.py index df04a14af70c..aca546600d0b 100644 --- a/tests/v1/engine/test_async_llm.py +++ b/tests/v1/engine/test_async_llm.py @@ -393,7 +393,7 @@ def __init__(self, vllm_config: VllmConfig, engine_index: int = 0): async def test_customize_loggers(monkeypatch): """Test that we can customize the loggers. If a customized logger is provided at the init, it should - be used directly. + be added to the default loggers. """ with monkeypatch.context() as m, ExitStack() as after: @@ -410,7 +410,8 @@ async def test_customize_loggers(monkeypatch): stat_loggers = engine.logger_manager.per_engine_logger_dict assert len(stat_loggers) == 1 - assert len(stat_loggers[0]) == 1 + assert len( + stat_loggers[0]) == 2 # LoggingStatLogger + MockLoggingStatLogger stat_loggers[0][0].log.assert_called_once()