Commit eada6ed
Author: 皓聪 (committed)

feat: otel trace impl

Signed-off-by: Zhang Haotong <[email protected]>
1 parent 767879e · commit eada6ed

File tree: 12 files changed, +316 −5 lines

requirements-dev.txt

Lines changed: 6 additions & 0 deletions
@@ -31,3 +31,9 @@ ruff==0.9.4
 lm_eval[api]==0.4.8
 docstring_parser
 genai-perf==0.0.13
+typing_extensions
+# observation.tracing
+opentelemetry-sdk
+opentelemetry-api
+opentelemetry-exporter-otlp
+opentelemetry-semantic-conventions-ai

requirements.txt

Lines changed: 5 additions & 0 deletions
@@ -63,6 +63,11 @@ meson
 ninja
 etcd3
 blake3
+# observation.tracing
+opentelemetry-sdk
+opentelemetry-api
+opentelemetry-exporter-otlp
+opentelemetry-semantic-conventions-ai
 soundfile
 triton==3.3.1; platform_machine == "x86_64"
 tiktoken

tensorrt_llm/_utils.py

Lines changed: 15 additions & 1 deletion
@@ -28,7 +28,8 @@
 from enum import EnumMeta
 from functools import lru_cache, partial, wraps
 from pathlib import Path
-from typing import Any, Dict, List, Optional, Sequence, Union
+from typing import Any, Callable, Dict, List, Optional, Sequence, Union
+from typing_extensions import ParamSpec

 import numpy as np
 import nvtx
@@ -1127,3 +1128,16 @@ def set_prometheus_multiproc_dir() -> object:
     os.environ["PROMETHEUS_MULTIPROC_DIR"] = prometheus_multiproc_dir.name
     logger.info(
         f"PROMETHEUS_MULTIPROC_DIR: {os.environ['PROMETHEUS_MULTIPROC_DIR']}")
+
+
+P = ParamSpec("P")
+
+
+# From: https://stackoverflow.com/a/4104188/2749989
+def run_once(f: Callable[P, None]) -> Callable[P, None]:
+
+    def wrapper(*args: P.args, **kwargs: P.kwargs) -> None:
+        if not wrapper.has_run:  # type: ignore[attr-defined]
+            wrapper.has_run = True  # type: ignore[attr-defined]
+            return f(*args, **kwargs)
+
+    wrapper.has_run = False  # type: ignore[attr-defined]
+    return wrapper
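For reference, a minimal usage sketch of the new `run_once` helper (the caller name below is hypothetical); repeated calls execute the wrapped function only the first time:

```python
from tensorrt_llm._utils import run_once


@run_once
def _announce_tracing() -> None:
    # Body runs only on the first call; later calls are no-ops.
    print("tracing initialized")


_announce_tracing()  # prints "tracing initialized"
_announce_tracing()  # does nothing
```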

tensorrt_llm/commands/serve.py

Lines changed: 13 additions & 2 deletions
@@ -86,6 +86,7 @@ def get_llm_args(model: str,
                  trust_remote_code: bool = False,
                  reasoning_parser: Optional[str] = None,
                  fail_fast_on_attention_window_too_large: bool = False,
+                 otlp_traces_endpoint: Optional[str] = None,
                  **llm_args_extra_dict: Any):

     if gpus_per_node is None:
@@ -148,6 +149,7 @@ def get_llm_args(model: str,
         reasoning_parser,
         "fail_fast_on_attention_window_too_large":
         fail_fast_on_attention_window_too_large,
+        "otlp_traces_endpoint": otlp_traces_endpoint,
     }

     return llm_args, llm_args_extra_dict
@@ -285,6 +287,13 @@ def launch_server(host: str,
     help=
     "Exit with runtime error when attention window is too large to fit even a single sequence in the KV cache."
 )
+@click.option(
+    "--otlp_traces_endpoint",
+    type=str,
+    default=None,
+    help=
+    "Target URL to which OpenTelemetry traces will be sent."
+)
 def serve(model: str, tokenizer: Optional[str], host: str, port: int,
           log_level: str, backend: str, max_beam_width: int,
           max_batch_size: int, max_num_tokens: int, max_seq_len: int,
@@ -295,7 +304,8 @@ def serve(model: str, tokenizer: Optional[str], host: str, port: int,
           extra_llm_api_options: Optional[str], reasoning_parser: Optional[str],
           metadata_server_config_file: Optional[str],
           server_role: Optional[str],
-          fail_fast_on_attention_window_too_large: bool):
+          fail_fast_on_attention_window_too_large: bool,
+          otlp_traces_endpoint: Optional[str]):
     """Running an OpenAI API compatible server

     MODEL: model name | HF checkpoint path | TensorRT engine path
@@ -321,7 +331,8 @@ def serve(model: str, tokenizer: Optional[str], host: str, port: int,
         trust_remote_code=trust_remote_code,
         reasoning_parser=reasoning_parser,
         fail_fast_on_attention_window_too_large=
-        fail_fast_on_attention_window_too_large)
+        fail_fast_on_attention_window_too_large,
+        otlp_traces_endpoint=otlp_traces_endpoint)

     llm_args_extra_dict = {}
     if extra_llm_api_options is not None:
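A short sketch of how the new option reaches the LLM arguments (model name and collector URL are placeholders, and the remaining `get_llm_args` parameters are assumed to keep their defaults; `trtllm-serve` would supply the endpoint through the new `--otlp_traces_endpoint` click option):

```python
from tensorrt_llm.commands.serve import get_llm_args

# Placeholder values; point the endpoint at an OTLP-capable collector.
llm_args, llm_args_extra_dict = get_llm_args(
    model="TinyLlama/TinyLlama-1.1B-Chat-v1.0",
    otlp_traces_endpoint="http://localhost:4317",
)
print(llm_args["otlp_traces_endpoint"])  # "http://localhost:4317"
```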

tensorrt_llm/executor/executor.py

Lines changed: 3 additions & 0 deletions
@@ -1,4 +1,5 @@
 import atexit
+from collections.abc import Mapping
 import faulthandler
 import multiprocessing
 import platform
@@ -119,6 +120,7 @@ def generate_async(
         streaming: bool = False,
         kv_cache_retention_config: Optional[KvCacheRetentionConfig] = None,
         disaggregated_params: Optional[DisaggregatedParams] = None,
+        trace_headers: Optional[Mapping[str, str]] = None,
         postproc_params: Optional[PostprocParams] = None,
         multimodal_params: Optional[MultimodalParams] = None,
         scheduling_params: Optional[SchedulingParams] = None,
@@ -144,6 +146,7 @@ def generate_async(
             streaming=streaming,
             kv_cache_retention_config=kv_cache_retention_config,
             disaggregated_params=disaggregated_params,
+            trace_headers=trace_headers,
             multimodal_params=multimodal_params,
             scheduling_params=scheduling_params)
         result = self.submit(request)

tensorrt_llm/executor/request.py

Lines changed: 3 additions & 0 deletions
@@ -1,3 +1,4 @@
+from collections.abc import Mapping
 import os
 from dataclasses import dataclass
 from typing import List, Optional, Union
@@ -94,6 +95,7 @@ def __init__(
         streaming: bool = False,
         kv_cache_retention_config: Optional[KvCacheRetentionConfig] = None,
         disaggregated_params: Optional[DisaggregatedParams] = None,
+        trace_headers: Optional[Mapping[str, str]] = None,
         postproc_params: Optional[PostprocParams] = None,
         multimodal_params: Optional[MultimodalParams] = None,
         scheduling_params: Optional[SchedulingParams] = None,
@@ -121,6 +123,7 @@ def __init__(
         self.kv_cache_retention_config = kv_cache_retention_config
         self.id: Optional[int] = None
         self.disaggregated_params = disaggregated_params
+        self.trace_headers = trace_headers
         self.scheduling_params = scheduling_params

     def set_id(self, id):

tensorrt_llm/executor/result.py

Lines changed: 69 additions & 0 deletions
@@ -1,4 +1,5 @@
 import asyncio
+from collections.abc import Mapping
 import json
 import weakref
 from dataclasses import dataclass, field
@@ -7,6 +8,7 @@
                     Optional, TypeAlias, Union)
 from weakref import WeakMethod

+from tensorrt_llm.llmapi.otel_tracing import SpanAttributes, SpanKind, extract_trace_context, global_otlp_tracer
 import torch
 import torch.nn.functional as F

@@ -160,6 +162,7 @@ def __init__(self,
         self.decoding_iter = 0
         self._done = False
         self.metrics_dict = {}
+        self.trace_headers = None

         if has_event_loop():
             self.aqueue = AsyncQueue()
@@ -288,6 +291,7 @@ def _handle_sequence(self,
             raise ValueError(
                 f"Unknown finish reason: {finish_reasons[src_idx]}")
         self.record_stats(output, req_perf_metrics_dict)
+        self.do_tracing(output, req_perf_metrics_dict)

     @nvtx_range_debug("handle_response",
                       color="red",
@@ -388,6 +392,70 @@ def record_stats(self,
             metrics_stats.update(processed_metrics_stat)
         self.metrics_dict = metrics_stats

+    def do_tracing(
+        self,
+        output: CompletionOutput,
+        req_perf_metrics_dict: Optional[dict[str, float]] = None,
+    ):
+        if not global_otlp_tracer():
+            return
+
+        metrics_dict = self.metrics_dict
+        if not metrics_dict:
+            # Insufficient request metrics available; trace generation aborted.
+            return
+
+        trace_context = extract_trace_context(self.trace_headers)
+        sampling_params = self.sampling_params
+        with global_otlp_tracer().start_as_current_span(
+            "llm_request",
+            kind=SpanKind.SERVER,
+            context=trace_context,
+            start_time=int(
+                req_perf_metrics_dict.get(RequestEventTiming.ARRIVAL_TIME, 0)
+            ),
+        ) as span:
+
+            def safe_set_attr(span, attr, value):
+                if value is not None:
+                    span.set_attribute(attr, value)
+
+            e2e_time = metrics_dict.get(SupportedMetricNames.E2E, -1)
+            safe_set_attr(
+                span,
+                SpanAttributes.GEN_AI_REQUEST_TEMPERATURE,
+                sampling_params.temperature,
+            )
+            safe_set_attr(
+                span, SpanAttributes.GEN_AI_REQUEST_TOP_P, sampling_params.top_p
+            )
+            safe_set_attr(
+                span,
+                SpanAttributes.GEN_AI_REQUEST_MAX_TOKENS,
+                sampling_params.max_tokens,
+            )
+            safe_set_attr(span, SpanAttributes.GEN_AI_REQUEST_N, sampling_params.n)
+            safe_set_attr(
+                span,
+                SpanAttributes.GEN_AI_USAGE_PROMPT_TOKENS,
+                self.postproc_params.postproc_args.num_prompt_tokens,
+            )
+            safe_set_attr(
+                span, SpanAttributes.GEN_AI_USAGE_COMPLETION_TOKENS, output.length
+            )
+            safe_set_attr(
+                span,
+                SpanAttributes.GEN_AI_LATENCY_TIME_TO_FIRST_TOKEN,
+                metrics_dict.get(SupportedMetricNames.TTFT, -1),
+            )
+            safe_set_attr(span, SpanAttributes.GEN_AI_LATENCY_E2E, e2e_time)
+            safe_set_attr(span, SpanAttributes.GEN_AI_REQUEST_ID, self.id)
+            safe_set_attr(
+                span,
+                SpanAttributes.GEN_AI_LATENCY_TIME_IN_QUEUE,
+                metrics_dict.get(SupportedMetricNames.REQUEST_QUEUE_TIME, -1),
+            )
+

 class DetokenizedGenerationResultBase(GenerationResultBase):
     ''' The base class for the generation result with detokenization support. '''
@@ -498,6 +566,7 @@ def __init__(
         self.disaggregated_params = disaggregated_params
         # minimal sampling params needed for logprob calculation
         self._logprob_params = logprob_params
+        self.trace_headers = generation_request.trace_headers

         # for aborting the request
         self._executor: Optional[weakref.ReferenceType[
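`tensorrt_llm/llmapi/otel_tracing.py` is among the 12 changed files but is not shown in this excerpt. A plausible sketch of the two helpers `do_tracing` relies on, based on the standard OpenTelemetry Python API (the module-level `_global_tracer` name is an assumption):

```python
# Plausible sketch only; the real otel_tracing.py is not shown in this diff.
from typing import Mapping, Optional

from opentelemetry import trace
from opentelemetry.context.context import Context
from opentelemetry.propagate import extract

_global_tracer: Optional[trace.Tracer] = None  # set by init_tracer()


def global_otlp_tracer() -> Optional[trace.Tracer]:
    # Returns the process-wide tracer if tracing was initialized, else None,
    # which makes do_tracing() a no-op when no OTLP endpoint is configured.
    return _global_tracer


def extract_trace_context(
        headers: Optional[Mapping[str, str]]) -> Optional[Context]:
    # Rebuilds the parent trace context from W3C propagation headers
    # (e.g. "traceparent"), so the llm_request span joins the caller's trace.
    return extract(headers) if headers else None
```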

tensorrt_llm/llmapi/llm.py

Lines changed: 11 additions & 0 deletions
@@ -1,4 +1,5 @@
 import atexit
+from collections.abc import Mapping
 import json
 import os
 import shutil
@@ -15,6 +16,7 @@
 from tensorrt_llm.inputs.data import TextPrompt
 from tensorrt_llm.inputs.multimodal import MultimodalParams
 from tensorrt_llm.inputs.registry import DefaultInputProcessor
+from tensorrt_llm.llmapi.otel_tracing import init_tracer

 from .._utils import nvtx_range_debug
 from ..bindings import executor as tllm
@@ -209,6 +211,13 @@ def __init__(self,
                 self.mpi_session.shutdown()
             raise

+        try:
+            if self.args.otlp_traces_endpoint:
+                init_tracer("trt.llm", self.args.otlp_traces_endpoint)
+                logger.info(f"Initialized OTLP tracer, endpoint: {self.args.otlp_traces_endpoint}")
+        except Exception as e:
+            logger.error(f"Failed to initialize OTLP tracer: {e}")
+
         exception_handler.register(self, 'shutdown')
         atexit.register(LLM._shutdown_wrapper, weakref.ref(self))
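`init_tracer` also lives in the unshown otel_tracing.py. A minimal sketch of what such a setup typically looks like with the OpenTelemetry SDK packages added by the new requirements (service-name handling and the return value are assumptions):

```python
# Assumed wiring only; the actual init_tracer in otel_tracing.py may differ.
from opentelemetry import trace
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
from opentelemetry.sdk.resources import Resource
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor


def init_tracer(instrumenting_module_name: str, otlp_traces_endpoint: str):
    # Export spans in batches to the configured OTLP collector endpoint.
    provider = TracerProvider(
        resource=Resource.create({"service.name": instrumenting_module_name}))
    provider.add_span_processor(
        BatchSpanProcessor(OTLPSpanExporter(endpoint=otlp_traces_endpoint)))
    trace.set_tracer_provider(provider)
    return trace.get_tracer(instrumenting_module_name)
```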

@@ -313,6 +322,7 @@ def generate_async(
         streaming: bool = False,
         kv_cache_retention_config: Optional[KvCacheRetentionConfig] = None,
         disaggregated_params: Optional[DisaggregatedParams] = None,
+        trace_headers: Optional[Mapping[str, str]] = None,
         _postproc_params: Optional[PostprocParams] = None,
         scheduling_params: Optional[SchedulingParams] = None,
     ) -> RequestOutput:
@@ -431,6 +441,7 @@ def generate_async(
             streaming=streaming,
             kv_cache_retention_config=kv_cache_retention_config,
             disaggregated_params=disaggregated_params,
+            trace_headers=trace_headers,
             postproc_params=_postproc_params,
             multimodal_params=multimodal_params,
             scheduling_params=scheduling_params,
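Taken together, a hypothetical end-to-end use of the new hooks from the LLM API (model path, endpoint, and the traceparent value are placeholders):

```python
from tensorrt_llm import LLM, SamplingParams

# otlp_traces_endpoint enables span export; without it tracing is a no-op.
llm = LLM(model="TinyLlama/TinyLlama-1.1B-Chat-v1.0",
          otlp_traces_endpoint="http://localhost:4317")

# Forwarding W3C trace-context headers from an upstream caller lets the
# resulting "llm_request" span join the caller's trace.
future = llm.generate_async(
    "Hello, world",
    sampling_params=SamplingParams(max_tokens=16),
    trace_headers={
        "traceparent":
        "00-0af7651916cd43dd8448eb211c80319c-b7ad6b7169203331-01"
    },
)
print(future.result().outputs[0].text)
```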

tensorrt_llm/llmapi/llm_args.py

Lines changed: 5 additions & 0 deletions
@@ -1310,6 +1310,11 @@ class BaseLlmArgs(StrictBaseModel):
         json_schema_extra={"type": "Optional[MpiSession]"},
         exclude=True,
         alias="_mpi_session")
+
+    otlp_traces_endpoint: Optional[str] = Field(
+        default=None,
+        description="Target URL to which OpenTelemetry traces will be sent.",
+        alias="otlp_traces_endpoint")

     backend: Optional[str] = Field(
         default=None,
