From 15bc6ecc38d7baf75a1f4f3334a308c6983e26b0 Mon Sep 17 00:00:00 2001
From: richardhuo-nv
Date: Wed, 3 Sep 2025 05:14:54 +0000
Subject: [PATCH 1/3] add llm args to the connector api

Signed-off-by: richardhuo-nv
---
 examples/llm-api/llm_kv_cache_connector.py           | 12 ++++++------
 tensorrt_llm/_torch/pyexecutor/kv_cache_connector.py |  8 ++++++--
 .../_torch/pyexecutor/py_executor_creator.py         |  5 +++--
 3 files changed, 15 insertions(+), 10 deletions(-)

diff --git a/examples/llm-api/llm_kv_cache_connector.py b/examples/llm-api/llm_kv_cache_connector.py
index 599fab6f9ac..a16716496ca 100644
--- a/examples/llm-api/llm_kv_cache_connector.py
+++ b/examples/llm-api/llm_kv_cache_connector.py
@@ -15,7 +15,7 @@
 from tensorrt_llm._torch.pyexecutor.kv_cache_connector import (
     KvCacheConnectorScheduler, KvCacheConnectorWorker, SchedulerOutput)
 from tensorrt_llm.bindings.internal.batch_manager import LlmRequest
-from tensorrt_llm.llmapi.llm_args import KvCacheConnectorConfig
+from tensorrt_llm.llmapi.llm_args import KvCacheConnectorConfig, TorchLlmArgs
 
 # This is a simple example of the use of the KV cache connector.
 # It persists KV cache contents into a folder, and can load them back on subsequent runs.
@@ -33,8 +33,8 @@ class PersistentKvCacheConnectorMetadata:
 
 class PersistentKvCacheConnectorWorker(KvCacheConnectorWorker):
 
-    def __init__(self):
-        super().__init__()
+    def __init__(self, llm_args: TorchLlmArgs):
+        super().__init__(llm_args)
 
         self.kv_cache_tensor = None
 
@@ -80,10 +80,10 @@ def get_finished(
 
 class PersistentKvCacheConnectorLeader(KvCacheConnectorScheduler):
 
-    def __init__(self, tokens_per_block):
-        super().__init__()
+    def __init__(self, llm_args: TorchLlmArgs, tokens_per_block: int):
+        super().__init__(llm_args, tokens_per_block)
 
-        self.block_size = tokens_per_block
+        self.block_size = self._tokens_per_block
         self.pending_loads = {}
 
         self.cache_folder = os.environ.get(CONNECTOR_CACHE_FOLDER_KEY,
diff --git a/tensorrt_llm/_torch/pyexecutor/kv_cache_connector.py b/tensorrt_llm/_torch/pyexecutor/kv_cache_connector.py
index 9bec793a8c4..55416f34dda 100644
--- a/tensorrt_llm/_torch/pyexecutor/kv_cache_connector.py
+++ b/tensorrt_llm/_torch/pyexecutor/kv_cache_connector.py
@@ -47,6 +47,7 @@
 from tensorrt_llm.bindings.internal.batch_manager import \
     KvCacheConnectorManager as KvCacheConnectorManagerCpp
 from tensorrt_llm.bindings.internal.batch_manager import LlmRequest
+from tensorrt_llm.llmapi.llm_args import TorchLlmArgs
 
 from .scheduler import ScheduledRequests
 
@@ -80,7 +81,8 @@ class SchedulerOutput:
 
 class KvCacheConnectorWorker(ABC):
 
-    def __init__(self):
+    def __init__(self, llm_args: TorchLlmArgs):
+        self._llm_args = llm_args
         self._metadata = None
 
         super().__init__()
@@ -160,7 +162,9 @@ def get_finished(
 
 class KvCacheConnectorScheduler(ABC):
 
-    def __init__(self):
+    def __init__(self, llm_args: TorchLlmArgs, tokens_per_block: int):
+        self._llm_args = llm_args
+        self._tokens_per_block = tokens_per_block
         super().__init__()
 
     @abstractmethod
diff --git a/tensorrt_llm/_torch/pyexecutor/py_executor_creator.py b/tensorrt_llm/_torch/pyexecutor/py_executor_creator.py
index 5a7502f844f..c518a993440 100644
--- a/tensorrt_llm/_torch/pyexecutor/py_executor_creator.py
+++ b/tensorrt_llm/_torch/pyexecutor/py_executor_creator.py
@@ -445,11 +445,12 @@ def drafting_loop_wrapper(model):
         # In this case, the worker may be dependent on the scheduler, or vice-versa.
         # To deal with cases like this, we instantiate them both concurrently.
         with ThreadPoolExecutor(max_workers=2) as executor:
-            connector_worker_task = executor.submit(worker_cls)
+            connector_worker_task = executor.submit(worker_cls, llm_args)
 
             if scheduler_cls is not None and rank == 0:
                 connector_scheduler_task = executor.submit(
-                    scheduler_cls, executor_config.tokens_per_block)
+                    scheduler_cls, llm_args,
+                    executor_config.tokens_per_block)
                 connector_scheduler = connector_scheduler_task.result()
             else:
                 connector_scheduler = None

From 539f09a177248612738ea0f0da66eb538de8ceb1 Mon Sep 17 00:00:00 2001
From: richardhuo-nv
Date: Wed, 3 Sep 2025 05:57:22 +0000
Subject: [PATCH 2/3] add tokens_per_block to the connector workers

Signed-off-by: richardhuo-nv
---
 examples/llm-api/llm_kv_cache_connector.py            | 4 ++--
 tensorrt_llm/_torch/pyexecutor/kv_cache_connector.py  | 3 ++-
 tensorrt_llm/_torch/pyexecutor/py_executor_creator.py | 3 ++-
 3 files changed, 6 insertions(+), 4 deletions(-)

diff --git a/examples/llm-api/llm_kv_cache_connector.py b/examples/llm-api/llm_kv_cache_connector.py
index a16716496ca..cee3d213510 100644
--- a/examples/llm-api/llm_kv_cache_connector.py
+++ b/examples/llm-api/llm_kv_cache_connector.py
@@ -33,8 +33,8 @@ class PersistentKvCacheConnectorMetadata:
 
 class PersistentKvCacheConnectorWorker(KvCacheConnectorWorker):
 
-    def __init__(self, llm_args: TorchLlmArgs):
-        super().__init__(llm_args)
+    def __init__(self, llm_args: TorchLlmArgs, tokens_per_block: int):
+        super().__init__(llm_args, tokens_per_block)
 
         self.kv_cache_tensor = None
 
diff --git a/tensorrt_llm/_torch/pyexecutor/kv_cache_connector.py b/tensorrt_llm/_torch/pyexecutor/kv_cache_connector.py
index 55416f34dda..0a220a9af84 100644
--- a/tensorrt_llm/_torch/pyexecutor/kv_cache_connector.py
+++ b/tensorrt_llm/_torch/pyexecutor/kv_cache_connector.py
@@ -81,8 +81,9 @@ class SchedulerOutput:
 
 class KvCacheConnectorWorker(ABC):
 
-    def __init__(self, llm_args: TorchLlmArgs):
+    def __init__(self, llm_args: TorchLlmArgs, tokens_per_block: int):
         self._llm_args = llm_args
+        self._tokens_per_block = tokens_per_block
         self._metadata = None
 
         super().__init__()
diff --git a/tensorrt_llm/_torch/pyexecutor/py_executor_creator.py b/tensorrt_llm/_torch/pyexecutor/py_executor_creator.py
index c518a993440..31656595831 100644
--- a/tensorrt_llm/_torch/pyexecutor/py_executor_creator.py
+++ b/tensorrt_llm/_torch/pyexecutor/py_executor_creator.py
@@ -445,7 +445,8 @@ def drafting_loop_wrapper(model):
         # In this case, the worker may be dependent on the scheduler, or vice-versa.
         # To deal with cases like this, we instantiate them both concurrently.
         with ThreadPoolExecutor(max_workers=2) as executor:
-            connector_worker_task = executor.submit(worker_cls, llm_args)
+            connector_worker_task = executor.submit(
+                worker_cls, llm_args, executor_config.tokens_per_block)
 
             if scheduler_cls is not None and rank == 0:
                 connector_scheduler_task = executor.submit(

From 440af5fcfe99849485058638d970ef2305887068 Mon Sep 17 00:00:00 2001
From: richardhuo-nv
Date: Sun, 7 Sep 2025 19:36:22 -0700
Subject: [PATCH 3/3] use tokens_per_block from kv_cache_config in TorchLlmArgs

Signed-off-by: richardhuo-nv
---
 examples/llm-api/llm_kv_cache_connector.py            | 10 +++++-----
 tensorrt_llm/_torch/pyexecutor/kv_cache_connector.py  |  6 ++----
 tensorrt_llm/_torch/pyexecutor/py_executor_creator.py |  6 ++----
 3 files changed, 9 insertions(+), 13 deletions(-)

diff --git a/examples/llm-api/llm_kv_cache_connector.py b/examples/llm-api/llm_kv_cache_connector.py
index cee3d213510..1eac9a9cd98 100644
--- a/examples/llm-api/llm_kv_cache_connector.py
+++ b/examples/llm-api/llm_kv_cache_connector.py
@@ -33,8 +33,8 @@ class PersistentKvCacheConnectorMetadata:
 
 class PersistentKvCacheConnectorWorker(KvCacheConnectorWorker):
 
-    def __init__(self, llm_args: TorchLlmArgs, tokens_per_block: int):
-        super().__init__(llm_args, tokens_per_block)
+    def __init__(self, llm_args: TorchLlmArgs):
+        super().__init__(llm_args)
 
         self.kv_cache_tensor = None
 
@@ -80,10 +80,10 @@ def get_finished(
 
 class PersistentKvCacheConnectorLeader(KvCacheConnectorScheduler):
 
-    def __init__(self, llm_args: TorchLlmArgs, tokens_per_block: int):
-        super().__init__(llm_args, tokens_per_block)
+    def __init__(self, llm_args: TorchLlmArgs):
+        super().__init__(llm_args)
 
-        self.block_size = self._tokens_per_block
+        self.block_size = self._llm_args.kv_cache_config.tokens_per_block
         self.pending_loads = {}
 
         self.cache_folder = os.environ.get(CONNECTOR_CACHE_FOLDER_KEY,
diff --git a/tensorrt_llm/_torch/pyexecutor/kv_cache_connector.py b/tensorrt_llm/_torch/pyexecutor/kv_cache_connector.py
index 0a220a9af84..813b36112fa 100644
--- a/tensorrt_llm/_torch/pyexecutor/kv_cache_connector.py
+++ b/tensorrt_llm/_torch/pyexecutor/kv_cache_connector.py
@@ -81,9 +81,8 @@ class SchedulerOutput:
 
 class KvCacheConnectorWorker(ABC):
 
-    def __init__(self, llm_args: TorchLlmArgs, tokens_per_block: int):
+    def __init__(self, llm_args: TorchLlmArgs):
         self._llm_args = llm_args
-        self._tokens_per_block = tokens_per_block
         self._metadata = None
 
         super().__init__()
@@ -163,9 +162,8 @@ def get_finished(
 
 class KvCacheConnectorScheduler(ABC):
 
-    def __init__(self, llm_args: TorchLlmArgs, tokens_per_block: int):
+    def __init__(self, llm_args: TorchLlmArgs):
         self._llm_args = llm_args
-        self._tokens_per_block = tokens_per_block
         super().__init__()
 
     @abstractmethod
diff --git a/tensorrt_llm/_torch/pyexecutor/py_executor_creator.py b/tensorrt_llm/_torch/pyexecutor/py_executor_creator.py
index 31656595831..2ec87a1c1c5 100644
--- a/tensorrt_llm/_torch/pyexecutor/py_executor_creator.py
+++ b/tensorrt_llm/_torch/pyexecutor/py_executor_creator.py
@@ -445,13 +445,11 @@ def drafting_loop_wrapper(model):
         # In this case, the worker may be dependent on the scheduler, or vice-versa.
         # To deal with cases like this, we instantiate them both concurrently.
         with ThreadPoolExecutor(max_workers=2) as executor:
-            connector_worker_task = executor.submit(
-                worker_cls, llm_args, executor_config.tokens_per_block)
+            connector_worker_task = executor.submit(worker_cls, llm_args)
 
             if scheduler_cls is not None and rank == 0:
                 connector_scheduler_task = executor.submit(
-                    scheduler_cls, llm_args,
-                    executor_config.tokens_per_block)
+                    scheduler_cls, llm_args)
                 connector_scheduler = connector_scheduler_task.result()
             else:
                 connector_scheduler = None
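
Usage sketch (reviewer note, not part of the patches): after this series, both
connector classes are constructed with only the TorchLlmArgs instance, and the
block size is read from the KV cache config it carries rather than passed as a
separate tokens_per_block argument. The subclass names below are hypothetical,
and the remaining abstract methods (e.g. the get_finished() hooks visible in
the diff context) are elided for brevity:

    from tensorrt_llm._torch.pyexecutor.kv_cache_connector import (
        KvCacheConnectorScheduler, KvCacheConnectorWorker)
    from tensorrt_llm.llmapi.llm_args import TorchLlmArgs


    class MyConnectorWorker(KvCacheConnectorWorker):

        def __init__(self, llm_args: TorchLlmArgs):
            # After PATCH 3/3 the worker receives only llm_args; the executor
            # creator instantiates it as executor.submit(worker_cls, llm_args).
            super().__init__(llm_args)


    class MyConnectorScheduler(KvCacheConnectorScheduler):

        def __init__(self, llm_args: TorchLlmArgs):
            super().__init__(llm_args)
            # tokens_per_block now comes from the KV cache config inside
            # llm_args, mirroring PersistentKvCacheConnectorLeader above.
            self.block_size = self._llm_args.kv_cache_config.tokens_per_block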