
Commit 21205b8

Author: 皓聪 (committed)

feat: otel trace impl

1 parent: bb6ba8c

File tree

11 files changed: +281 −6 lines

requirements-dev.txt

Lines changed: 6 additions & 0 deletions

@@ -31,3 +31,9 @@ ruff==0.9.4
 lm_eval[api]==0.4.8
 docstring_parser
 genai-perf==0.0.13
+typing_extensions
+# observation.tracing
+opentelemetry-sdk
+opentelemetry-api
+opentelemetry-exporter-otlp
+opentelemetry-semantic-conventions-ai

requirements.txt

Lines changed: 6 additions & 0 deletions

@@ -59,3 +59,9 @@ ninja
 etcd3
 blake3
 llguidance==0.7.29
+typing_extensions
+# observation.tracing
+opentelemetry-sdk
+opentelemetry-api
+opentelemetry-exporter-otlp
+opentelemetry-semantic-conventions-ai

tensorrt_llm/commands/serve.py

Lines changed: 13 additions & 2 deletions

@@ -84,6 +84,7 @@ def get_llm_args(model: str,
                  num_postprocess_workers: int = 0,
                  trust_remote_code: bool = False,
                  reasoning_parser: Optional[str] = None,
+                 otlp_traces_endpoint: Optional[str] = None,
                  **llm_args_extra_dict: Any):
 
     if gpus_per_node is None:
@@ -125,6 +126,7 @@ def get_llm_args(model: str,
         "num_postprocess_workers": num_postprocess_workers,
         "postprocess_tokenizer_dir": tokenizer or model,
         "reasoning_parser": reasoning_parser,
+        "otlp_traces_endpoint": otlp_traces_endpoint,
     }
 
     return llm_args, llm_args_extra_dict
@@ -249,6 +251,13 @@ def launch_server(host: str,
     default=None,
     help="Server role. Specify this value only if running in disaggregated mode."
 )
+@click.option(
+    "--otlp_traces_endpoint",
+    type=str,
+    default=None,
+    help=
+    "Target URL to which OpenTelemetry traces will be sent."
+)
 def serve(model: str, tokenizer: Optional[str], host: str, port: int,
           log_level: str, backend: str, max_beam_width: int,
           max_batch_size: int, max_num_tokens: int, max_seq_len: int,
@@ -258,7 +267,8 @@ def serve(model: str, tokenizer: Optional[str], host: str, port: int,
           num_postprocess_workers: int, trust_remote_code: bool,
           extra_llm_api_options: Optional[str], reasoning_parser: Optional[str],
           metadata_server_config_file: Optional[str],
-          server_role: Optional[str]):
+          server_role: Optional[str],
+          otlp_traces_endpoint: Optional[str]):
     """Running an OpenAI API compatible server
 
     MODEL: model name | HF checkpoint path | TensorRT engine path
@@ -281,7 +291,8 @@ def serve(model: str, tokenizer: Optional[str], host: str, port: int,
         free_gpu_memory_fraction=kv_cache_free_gpu_memory_fraction,
         num_postprocess_workers=num_postprocess_workers,
         trust_remote_code=trust_remote_code,
-        reasoning_parser=reasoning_parser)
+        reasoning_parser=reasoning_parser,
+        otlp_traces_endpoint=otlp_traces_endpoint)
 
     llm_args_extra_dict = {}
     if extra_llm_api_options is not None:
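Usage note: with this option registered, the trace endpoint can be supplied at launch time, for example trtllm-serve <model> --otlp_traces_endpoint http://localhost:4317 (assuming the usual trtllm-serve entry point and an OTLP/gRPC collector listening on the conventional port 4317); get_llm_args above then forwards the value into the LLM arguments.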

tensorrt_llm/executor/executor.py

Lines changed: 4 additions & 1 deletion

@@ -1,4 +1,5 @@
 import atexit
+from collections.abc import Mapping
 import faulthandler
 import multiprocessing
 import platform
@@ -122,6 +123,7 @@ def generate_async(
             mrope_config: Optional[dict] = None,
             kv_cache_retention_config: Optional[KvCacheRetentionConfig] = None,
             disaggregated_params: Optional[DisaggregatedParams] = None,
+            trace_headers: Optional[Mapping[str, str]] = None,
             postproc_params: Optional[PostprocParams] = None,
             return_perf_metrics: Optional[bool] = False) -> GenerationResult:
         """Generate output for the given prompt token ids in the asynchronous mode.
@@ -149,7 +151,8 @@ def generate_async(
                 mrope_config=mrope_config,
                 kv_cache_retention_config=kv_cache_retention_config,
                 disaggregated_params=disaggregated_params,
-                return_perf_metrics=return_perf_metrics))
+                return_perf_metrics=return_perf_metrics,
+                trace_headers=trace_headers))
         return result
 
     def generate(

tensorrt_llm/executor/request.py

Lines changed: 3 additions & 0 deletions

@@ -1,3 +1,4 @@
+from collections.abc import Mapping
 import os
 from dataclasses import dataclass
 from typing import List, Optional, Union
@@ -87,6 +88,7 @@ def __init__(
         mrope_config: Optional[dict] = None,
         kv_cache_retention_config: Optional[KvCacheRetentionConfig] = None,
         disaggregated_params: Optional[DisaggregatedParams] = None,
+        trace_headers: Optional[Mapping[str, str]] = None,
         postproc_params: Optional[PostprocParams] = None,
         return_perf_metrics: Optional[bool] = False,
     ):
@@ -114,6 +116,7 @@ def __init__(
         self.id: Optional[int] = None
         self.disaggregated_params = disaggregated_params
         self.return_perf_metrics = return_perf_metrics
+        self.trace_headers = trace_headers
 
     def set_id(self, id):
         assert self.id is None, f"Request ID is already set: {self.id}"

tensorrt_llm/executor/result.py

Lines changed: 69 additions & 0 deletions

@@ -1,4 +1,5 @@
 import asyncio
+from collections.abc import Mapping
 import json
 import weakref
 from dataclasses import dataclass, field
@@ -7,6 +8,7 @@
                     Optional, TypeAlias, Union)
 from weakref import WeakMethod
 
+from tensorrt_llm.llmapi.otel_tracing import SpanAttributes, SpanKind, extract_trace_context, global_otlp_tracer
 import torch
 import torch.nn.functional as F
 
@@ -160,6 +162,7 @@ def __init__(self,
         self.decoding_iter = 0
         self._done = False
         self.metrics_dict = {}
+        self.trace_headers = None
 
         if has_event_loop():
             self.aqueue = AsyncQueue()
@@ -284,6 +287,7 @@ def _handle_sequence(self,
             raise ValueError(
                 f"Unknown finish reason: {finish_reasons[src_idx]}")
         self.record_stats(output, req_perf_metrics_dict)
+        self.do_tracing(output, req_perf_metrics_dict)
 
     @nvtx_range_debug("handle_response",
                       color="red",
@@ -375,6 +379,70 @@ def record_stats(self,
             metrics_stats.update(processed_metrics_stat)
         self.metrics_dict = metrics_stats
 
+    def do_tracing(
+        self,
+        output: CompletionOutput,
+        req_perf_metrics_dict: Optional[dict[str, float]] = None,
+    ):
+        if not global_otlp_tracer():
+            return
+
+        metrics_dict = self.metrics_dict
+        if not metrics_dict:
+            # Insufficient request metrics available; trace generation aborted.
+            return
+
+        trace_context = extract_trace_context(self.trace_headers)
+        sampling_params = self.sampling_params
+        with global_otlp_tracer().start_as_current_span(
+            "llm_request",
+            kind=SpanKind.SERVER,
+            context=trace_context,
+            start_time=int(
+                req_perf_metrics_dict.get(RequestEventTiming.ARRIVAL_TIME, 0)
+            ),
+        ) as span:
+
+            def safe_set_attr(span, attr, value):
+                if value is not None:
+                    span.set_attribute(attr, value)
+
+            e2e_time = metrics_dict.get(SupportedMetricNames.E2E, -1)
+            safe_set_attr(
+                span,
+                SpanAttributes.GEN_AI_REQUEST_TEMPERATURE,
+                sampling_params.temperature,
+            )
+            safe_set_attr(
+                span, SpanAttributes.GEN_AI_REQUEST_TOP_P, sampling_params.top_p
+            )
+            safe_set_attr(
+                span,
+                SpanAttributes.GEN_AI_REQUEST_MAX_TOKENS,
+                sampling_params.max_tokens,
+            )
+            safe_set_attr(span, SpanAttributes.GEN_AI_REQUEST_N, sampling_params.n)
+            safe_set_attr(
+                span,
+                SpanAttributes.GEN_AI_USAGE_PROMPT_TOKENS,
+                self.postproc_params.postproc_args.num_prompt_tokens,
+            )
+            safe_set_attr(
+                span, SpanAttributes.GEN_AI_USAGE_COMPLETION_TOKENS, output.length
+            )
+            safe_set_attr(
+                span,
+                SpanAttributes.GEN_AI_LATENCY_TIME_TO_FIRST_TOKEN,
+                metrics_dict.get(SupportedMetricNames.TTFT, -1),
+            )
+            safe_set_attr(span, SpanAttributes.GEN_AI_LATENCY_E2E, e2e_time)
+            safe_set_attr(span, SpanAttributes.GEN_AI_REQUEST_ID, self.id)
+            safe_set_attr(
+                span,
+                SpanAttributes.GEN_AI_LATENCY_TIME_IN_QUEUE,
+                metrics_dict.get(SupportedMetricNames.REQUEST_QUEUE_TIME, -1),
+            )
 
 class DetokenizedGenerationResultBase(GenerationResultBase):
     ''' The base class for the generation result with detokenization support. '''
@@ -462,6 +530,7 @@ def __init__(
         self.disaggregated_params = disaggregated_params
         # minimal sampling params needed for logprob calculation
         self._logprob_params = logprob_params
+        self.trace_headers = generation_request.trace_headers
 
         # for aborting the request
         self._executor: Optional[weakref.ReferenceType[
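The do_tracing method above calls into tensorrt_llm.llmapi.otel_tracing, a new module in this commit that is not shown in this excerpt. The following is only a minimal sketch of what such a module could look like on top of the OpenTelemetry SDK packages added to the requirements; the function signatures and attribute names are inferred from the call sites above and are assumptions, not the commit's actual implementation.

# Hypothetical sketch of an otel_tracing helper module (assumption: the real
# module differs in details). It wires an OTLP exporter into a global tracer
# and exposes the helpers used by do_tracing() above.
from collections.abc import Mapping
from typing import Optional

from opentelemetry import trace
from opentelemetry.context.context import Context
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
from opentelemetry.propagate import extract
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.trace import SpanKind, Tracer  # re-exported for callers

_global_tracer: Optional[Tracer] = None


class SpanAttributes:
    # Illustrative GenAI attribute names chosen to match the call sites above.
    GEN_AI_REQUEST_TEMPERATURE = "gen_ai.request.temperature"
    GEN_AI_REQUEST_TOP_P = "gen_ai.request.top_p"
    GEN_AI_REQUEST_MAX_TOKENS = "gen_ai.request.max_tokens"
    GEN_AI_REQUEST_N = "gen_ai.request.n"
    GEN_AI_REQUEST_ID = "gen_ai.request.id"
    GEN_AI_USAGE_PROMPT_TOKENS = "gen_ai.usage.prompt_tokens"
    GEN_AI_USAGE_COMPLETION_TOKENS = "gen_ai.usage.completion_tokens"
    GEN_AI_LATENCY_TIME_TO_FIRST_TOKEN = "gen_ai.latency.time_to_first_token"
    GEN_AI_LATENCY_E2E = "gen_ai.latency.e2e"
    GEN_AI_LATENCY_TIME_IN_QUEUE = "gen_ai.latency.time_in_queue"


def init_tracer(instrumenting_module_name: str, otlp_traces_endpoint: str) -> None:
    """Create a TracerProvider that exports spans to the given OTLP endpoint."""
    global _global_tracer
    provider = TracerProvider()
    provider.add_span_processor(
        BatchSpanProcessor(OTLPSpanExporter(endpoint=otlp_traces_endpoint)))
    trace.set_tracer_provider(provider)
    _global_tracer = trace.get_tracer(instrumenting_module_name)


def global_otlp_tracer() -> Optional[Tracer]:
    """Return the tracer created by init_tracer(), or None if tracing is disabled."""
    return _global_tracer


def extract_trace_context(
        headers: Optional[Mapping[str, str]]) -> Optional[Context]:
    """Turn W3C trace headers (traceparent/tracestate) into an OTel Context."""
    return extract(headers) if headers else None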

tensorrt_llm/llmapi/llm.py

Lines changed: 11 additions & 0 deletions

@@ -1,4 +1,5 @@
 import atexit
+from collections.abc import Mapping
 import json
 import os
 import shutil
@@ -14,6 +15,7 @@
 
 from tensorrt_llm.inputs.data import TextPrompt
 from tensorrt_llm.inputs.registry import DefaultInputProcessor
+from tensorrt_llm.llmapi.otel_tracing import init_tracer
 
 from .._utils import nvtx_range_debug
 from ..bindings import executor as tllm
@@ -207,6 +209,13 @@ def __init__(self,
             self.mpi_session.shutdown()
             raise
 
+        try:
+            if self.args.otlp_traces_endpoint:
+                init_tracer("trt.llm", self.args.otlp_traces_endpoint)
+                logger.info(f"Initialized OTLP tracer successfully, endpoint: {self.args.otlp_traces_endpoint}")
+        except Exception as e:
+            logger.error(f"Failed to initialize OTLP tracer: {e}")
+
         exception_handler.register(self, 'shutdown')
         atexit.register(LLM._shutdown_wrapper, weakref.ref(self))
 
@@ -305,6 +314,7 @@ def generate_async(
         streaming: bool = False,
         kv_cache_retention_config: Optional[KvCacheRetentionConfig] = None,
         disaggregated_params: Optional[DisaggregatedParams] = None,
+        trace_headers: Optional[Mapping[str, str]] = None,
         _postproc_params: Optional[PostprocParams] = None,
     ) -> RequestOutput:
         """Generate output for the given prompt in the asynchronous mode.
@@ -413,6 +423,7 @@ def generate_async(
             mrope_config=mrope_config,
             kv_cache_retention_config=kv_cache_retention_config,
             disaggregated_params=disaggregated_params,
+            trace_headers=trace_headers,
            postproc_params=_postproc_params,
             return_perf_metrics=self.args.return_perf_metrics,
         )
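With trace_headers plumbed through generate_async, a serving frontend can attach the caller's W3C trace context to a generation request so the "llm_request" span joins the caller's trace. A minimal caller-side sketch, assuming an existing LLM instance named llm; the traceparent value is a placeholder:

# Hypothetical caller-side usage: forward the incoming request's W3C trace
# headers (assumes an existing `llm` instance; the traceparent is a placeholder).
from tensorrt_llm import SamplingParams

incoming_headers = {
    "traceparent": "00-0af7651916cd43dd8448eb211c80319c-b7ad6b7169203331-01",
}

result = llm.generate_async(
    "Hello, my name is",
    sampling_params=SamplingParams(max_tokens=32),
    trace_headers=incoming_headers,  # new parameter added in this commit
)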

tensorrt_llm/llmapi/llm_args.py

Lines changed: 5 additions & 0 deletions

@@ -1001,6 +1001,11 @@ class BaseLlmArgs(BaseModel):
         json_schema_extra={"type": "Optional[MpiSession]"},
         exclude=True,
         alias="_mpi_session")
+
+    otlp_traces_endpoint: Optional[str] = Field(
+        default=None,
+        description="Target URL to which OpenTelemetry traces will be sent.",
+        alias="otlp_traces_endpoint")
 
     backend: Optional[str] = Field(
         default=None,
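Because otlp_traces_endpoint is now a BaseLlmArgs field, tracing can also be enabled from the Python API rather than the CLI. A minimal sketch, assuming the LLM constructor forwards this keyword into the llm args and an OTLP/gRPC collector is listening on the conventional port 4317; the model path is a placeholder:

from tensorrt_llm import LLM

# Hypothetical usage: point the exporter at a local OTLP/gRPC collector
# (model path and endpoint are placeholders).
llm = LLM(
    model="/path/to/model",
    otlp_traces_endpoint="http://localhost:4317",
)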
