@@ -2,15 +2,19 @@
2 | 2 | # SPDX-FileCopyrightText: Copyright contributors to the vLLM project |
3 | 3 |
4 | 4 | import asyncio |
| 5 | +import time |
5 | 6 | from collections.abc import Iterable |
6 | 7 | from dataclasses import dataclass |
7 | 8 | from typing import Any, Optional, Union, cast |
8 | 9 |
9 | 10 | import torch |
10 | 11 |
| 12 | +from vllm.config import ObservabilityConfig |
11 | 13 | from vllm.outputs import (CompletionOutput, PoolingOutput, |
12 | 14 | PoolingRequestOutput, RequestOutput) |
13 | 15 | from vllm.sampling_params import RequestOutputKind |
| 16 | +from vllm.tracing import (SpanAttributes, SpanKind, extract_trace_context, |
| 17 | + init_tracer) |
14 | 18 | from vllm.transformers_utils.tokenizer import AnyTokenizer |
15 | 19 | from vllm.transformers_utils.tokenizer_group import TokenizerGroup |
16 | 20 | from vllm.v1.engine import EngineCoreOutput, EngineCoreRequest, FinishReason |
@@ -274,16 +278,26 @@ def _new_pooling_output( |
274 | 278 | class OutputProcessor: |
275 | 279 | """Process EngineCoreOutputs into RequestOutputs.""" |
276 | 280 |
277 | | - def __init__( |
278 | | - self, |
279 | | - tokenizer: TokenizerGroup, |
280 | | - log_stats: bool, |
281 | | - ): |
| 281 | + def __init__(self, |
| 282 | + tokenizer: TokenizerGroup, |
| 283 | + log_stats: bool, |
| 284 | + observability_config: Optional[ObservabilityConfig] = None): |
282 | 285 | self.log_stats = log_stats |
283 | 286 | self.tokenizer = tokenizer |
284 | 287 | self.request_states: dict[str, RequestState] = {} |
285 | 288 | self.parent_requests: dict[str, ParentRequest] = {} |
286 | 289 | self.lora_states = LoRARequestStates() |
| 290 | + self.observability_config = observability_config |
| 291 | + |
| 292 | + self.tracer = None |
| 293 | + if (self.observability_config is not None |
| 294 | + and self.observability_config.otlp_traces_endpoint): |
| 295 | + self.tracer = init_tracer( |
| 296 | + "vllm.llm_engine", |
| 297 | + self.observability_config.otlp_traces_endpoint) |
| 298 | + |
| 299 | + def is_tracing_enabled(self) -> bool: |
| 300 | + return self.tracer is not None |
287 | 301 |
288 | 302 | def get_num_unfinished_requests(self): |
289 | 303 | return len(self.request_states) |
@@ -440,6 +454,65 @@ def process_outputs( |
440 | 454 | reqs_to_abort=reqs_to_abort, |
441 | 455 | ) |
442 | 456 |
| 457 | + def do_tracing(self, engine_core_output: EngineCoreOutput, |
| 458 | + req_state: RequestState, |
| 459 | + iteration_stats: Optional[IterationStats]): |
| 460 | + if (engine_core_output.finish_reason is None or iteration_stats is None |
| 461 | + or req_state is None or req_state.stats is None |
| 462 | + or self.tracer is None): |
| 463 | + return |
| 464 | + arrival_time_nano_seconds = int(req_state.stats.arrival_time * 1e9) |
| 465 | + |
| 466 | + trace_context = extract_trace_context(engine_core_output.trace_headers) |
| 467 | + with self.tracer.start_as_current_span( |
| 468 | + "llm_request", |
| 469 | + kind=SpanKind.SERVER, |
| 470 | + context=trace_context, |
| 471 | + start_time=arrival_time_nano_seconds) as span: |
| 472 | + metrics = req_state.stats |
| 473 | + ttft = metrics.first_token_ts - metrics.arrival_time |
| 474 | + e2e_time = time.time() - metrics.arrival_time |
| 475 | + # Queued interval is from first QUEUED event to first SCHEDULED |
| 476 | + queued_time = metrics.scheduled_ts - metrics.queued_ts |
| 477 | + |
| 478 | + # Prefill interval is from first SCHEDULED to first NEW_TOKEN |
| 479 | + # Any preemptions during prefill are included in the interval
| 480 | + prefill_time = metrics.first_token_ts - metrics.scheduled_ts |
| 481 | + |
| 482 | + # Decode interval is from first NEW_TOKEN to last NEW_TOKEN |
| 483 | + # Any preemptions during decode are included |
| 484 | + decode_time = metrics.last_token_ts - metrics.first_token_ts |
| 485 | + |
| 486 | + # Inference interval is from first SCHEDULED to last NEW_TOKEN |
| 487 | + # Any preemptions during prefill or decode are included |
| 488 | + inference_time = metrics.last_token_ts - metrics.scheduled_ts |
| 489 | + span.set_attribute(SpanAttributes.GEN_AI_RESPONSE_MODEL, |
| 490 | + self.tokenizer.tokenizer_id) |
| 491 | + span.set_attribute(SpanAttributes.GEN_AI_REQUEST_ID, |
| 492 | + req_state.request_id) |
| 493 | + span.set_attribute(SpanAttributes.GEN_AI_REQUEST_MAX_TOKENS, |
| 494 | + req_state.max_tokens_param) |
| 495 | + span.set_attribute(SpanAttributes.GEN_AI_USAGE_PROMPT_TOKENS, |
| 496 | + len(req_state.prompt_token_ids)) |
| 497 | + span.set_attribute(SpanAttributes.GEN_AI_USAGE_COMPLETION_TOKENS, |
| 498 | + metrics.num_generation_tokens) |
| 501 | + span.set_attribute( |
| 502 | + SpanAttributes.GEN_AI_LATENCY_TIME_TO_FIRST_TOKEN, ttft) |
| 503 | + span.set_attribute(SpanAttributes.GEN_AI_LATENCY_E2E, e2e_time) |
| 504 | + span.set_attribute(SpanAttributes.GEN_AI_LATENCY_TIME_IN_QUEUE, |
| 505 | + queued_time) |
| 506 | + span.set_attribute( |
| 507 | + SpanAttributes.GEN_AI_LATENCY_TIME_IN_MODEL_PREFILL, |
| 508 | + prefill_time) |
| 509 | + span.set_attribute( |
| 510 | + SpanAttributes.GEN_AI_LATENCY_TIME_IN_MODEL_DECODE, |
| 511 | + decode_time) |
| 512 | + span.set_attribute( |
| 513 | + SpanAttributes.GEN_AI_LATENCY_TIME_IN_MODEL_INFERENCE, |
| 514 | + inference_time) |
| 515 | + |
443 | 516 | def _update_stats_from_output(self, req_state: RequestState, |
444 | 517 | engine_core_output: EngineCoreOutput, |
445 | 518 | engine_core_timestamp: Optional[float], |
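
For reference, here is a minimal sketch of how the new tracing hook would be wired up from the caller's side. The module path, the OTLP endpoint value, and the tokenizer_group placeholder are illustrative assumptions, not taken from this diff; only ObservabilityConfig, the extended OutputProcessor constructor, and is_tracing_enabled() come from the change above.

from vllm.config import ObservabilityConfig
from vllm.v1.engine.output_processor import OutputProcessor  # module path assumed

# Placeholder: an existing TokenizerGroup for the served model
# (construction omitted; depends on the deployment).
tokenizer_group = ...

# Assumed local OTLP collector; any reachable endpoint would do.
observability_config = ObservabilityConfig(
    otlp_traces_endpoint="http://localhost:4317")

processor = OutputProcessor(tokenizer=tokenizer_group,
                            log_stats=True,
                            observability_config=observability_config)

# A span is emitted per finished request only when an endpoint is configured.
assert processor.is_tracing_enabled()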