From 3f31168d3b539ecad7fc9c12b0119126586b1f9e Mon Sep 17 00:00:00 2001
From: Jialin Ouyang
Date: Fri, 7 Nov 2025 10:08:14 -0800
Subject: [PATCH 1/3] Replace list[list[int]] with list[np.ndarray]

Signed-off-by: Jialin Ouyang
---
 vllm/v1/core/sched/scheduler.py        |  4 ++--
 vllm/v1/outputs.py                     |  3 ++-
 vllm/v1/sample/rejection_sampler.py    |  8 +++-----
 vllm/v1/spec_decode/eagle.py           |  7 +++----
 vllm/v1/spec_decode/ngram_proposer.py  |  4 ++--
 vllm/v1/spec_decode/suffix_decoding.py | 18 ++++++++++--------
 vllm/v1/worker/gpu_model_runner.py     | 25 +++++++++++++++----------
 7 files changed, 37 insertions(+), 32 deletions(-)

diff --git a/vllm/v1/core/sched/scheduler.py b/vllm/v1/core/sched/scheduler.py
index f558306e3b2f..b7364a0af130 100644
--- a/vllm/v1/core/sched/scheduler.py
+++ b/vllm/v1/core/sched/scheduler.py
@@ -939,8 +939,8 @@ def update_from_output(
                 continue

             req_index = model_runner_output.req_id_to_index[req_id]
-            generated_token_ids = (
-                sampled_token_ids[req_index] if sampled_token_ids else []
+            generated_token_ids: list[int] = (
+                sampled_token_ids[req_index].tolist() if sampled_token_ids else []
             )

             scheduled_spec_token_ids = (
diff --git a/vllm/v1/outputs.py b/vllm/v1/outputs.py
index e7122ba33968..7eb0371ca814 100644
--- a/vllm/v1/outputs.py
+++ b/vllm/v1/outputs.py
@@ -5,6 +5,7 @@
 from dataclasses import dataclass, field
 from typing import TYPE_CHECKING, NamedTuple

+import numpy as np
 import torch

 if TYPE_CHECKING:
@@ -141,7 +142,7 @@ class ModelRunnerOutput:
     # num_generated_tokens is the number of tokens
     # generated in the current step. It can be different for
     # each request due to speculative/jump decoding.
-    sampled_token_ids: list[list[int]]
+    sampled_token_ids: list[np.ndarray]

     # [num_reqs, max_num_logprobs + 1]
     # [num_reqs, max_num_logprobs + 1]
diff --git a/vllm/v1/sample/rejection_sampler.py b/vllm/v1/sample/rejection_sampler.py
index 926305d25f56..f31a0cddda9a 100644
--- a/vllm/v1/sample/rejection_sampler.py
+++ b/vllm/v1/sample/rejection_sampler.py
@@ -3,6 +3,7 @@

 from dataclasses import replace

+import numpy as np
 import torch
 import torch.nn as nn

@@ -204,7 +205,7 @@ def _get_logprobs_tensors(
     def parse_output(
         output_token_ids: torch.Tensor,
         vocab_size: int,
-    ) -> list[list[int]]:
+    ) -> list[np.ndarray]:
         """Parse the output of the rejection sampler.
         Args:
             output_token_ids: The sampled token IDs in shape
@@ -220,10 +221,7 @@ def parse_output(
         valid_mask = (output_token_ids_np != PLACEHOLDER_TOKEN_ID) & (
             output_token_ids_np < vocab_size
         )
-        outputs = [
-            row[valid_mask[i]].tolist() for i, row in enumerate(output_token_ids_np)
-        ]
-        return outputs
+        return [row[valid_mask[i]] for i, row in enumerate(output_token_ids_np)]

     def apply_logits_processors(
         self,
diff --git a/vllm/v1/spec_decode/eagle.py b/vllm/v1/spec_decode/eagle.py
index 75a4140fd655..0e2e101a799a 100644
--- a/vllm/v1/spec_decode/eagle.py
+++ b/vllm/v1/spec_decode/eagle.py
@@ -482,7 +482,7 @@ def propose(

     def prepare_next_token_ids_cpu(
         self,
-        sampled_token_ids: list[list[int]],
+        sampled_token_ids: list[np.ndarray],
         requests: dict[str, CachedRequestState],
         gpu_input_batch: InputBatch,
         num_scheduled_tokens: dict[str, int],
@@ -497,7 +497,7 @@ def prepare_next_token_ids_cpu(
         req_ids = gpu_input_batch.req_ids
         next_token_ids: list[int] = []
         for i, token_ids in enumerate(sampled_token_ids):
-            if token_ids:
+            if token_ids.shape[0] > 0:
                 # Common case.
                 next_token_id = token_ids[-1]
             else:
@@ -508,10 +508,9 @@ def prepare_next_token_ids_cpu(
                 seq_len = req_state.num_computed_tokens + num_scheduled_tokens[req_id]
                 next_token_id = req_state.get_token_id(seq_len)
             next_token_ids.append(next_token_id)
-        next_token_ids = torch.tensor(
+        return torch.tensor(
             next_token_ids, dtype=torch.int32, device=self.input_ids.device
         )
-        return next_token_ids

     def prepare_next_token_ids_padded(
         self,
diff --git a/vllm/v1/spec_decode/ngram_proposer.py b/vllm/v1/spec_decode/ngram_proposer.py
index e2f83cb24aa9..f0f870684b6c 100644
--- a/vllm/v1/spec_decode/ngram_proposer.py
+++ b/vllm/v1/spec_decode/ngram_proposer.py
@@ -131,7 +131,7 @@ def batch_propose(

     def propose(
         self,
-        sampled_token_ids: list[list[int]],
+        sampled_token_ids: list[np.ndarray],
         req_ids: list[str],
         num_tokens_no_spec: np.ndarray,
         token_ids_cpu: np.ndarray,
@@ -140,7 +140,7 @@ def propose(
         # find which requests need ngram proposals
         valid_ngram_requests = []
         for i, sampled_ids in enumerate(sampled_token_ids):
-            num_sampled_ids = len(sampled_ids)
+            num_sampled_ids = sampled_ids.shape[0]
             if not num_sampled_ids:
                 # Skip speculative decoding.
                 continue
diff --git a/vllm/v1/spec_decode/suffix_decoding.py b/vllm/v1/spec_decode/suffix_decoding.py
index 049e335db325..85f0c632a9af 100644
--- a/vllm/v1/spec_decode/suffix_decoding.py
+++ b/vllm/v1/spec_decode/suffix_decoding.py
@@ -1,5 +1,7 @@
 # SPDX-License-Identifier: Apache-2.0
 # SPDX-FileCopyrightText: Copyright contributors to the vLLM project

+import numpy as np
+
 from vllm.config import VllmConfig
 from vllm.v1.worker.gpu_input_batch import InputBatch
@@ -32,31 +34,31 @@ def __init__(self, vllm_config: VllmConfig):
     def propose(
         self,
         input_batch: InputBatch,
-        sampled_token_ids: list[list[int]],
-    ) -> list[list[int]]:
+        sampled_token_ids: list[np.ndarray],
+    ) -> list[np.ndarray]:
         """
         Propose speculative tokens for each request in the input batch.
         Suffix Decoding will speculate a dynamic number of tokens for each request
         every decoding step, so each entry in the returned list may have different
         lengths.
         """
-        draft_token_ids: list[list[int]] = []
+        draft_token_ids: list[np.ndarray] = []
         for i, sampled_ids in enumerate(sampled_token_ids):
             if not sampled_ids:
                 # Skip speculative decoding for partial prefills.
-                draft_token_ids.append([])
+                draft_token_ids.append(np.array([]))
                 continue

             # Skip requests that require sampling parameters that are not
             # supported with speculative decoding.
             req_id = input_batch.req_ids[i]
             if req_id in input_batch.spec_decode_unsupported_reqs:
-                draft_token_ids.append([])
+                draft_token_ids.append(np.array([]))
                 continue

             num_tokens = input_batch.num_tokens_no_spec[i]
             if num_tokens >= self.max_model_len:
                 # Skip requests that have already reached the max model length.
-                draft_token_ids.append([])
+                draft_token_ids.append(np.array([]))
                 continue

             index = input_batch.req_id_to_index[req_id]
@@ -70,7 +72,7 @@ def propose(
                 self.suffix_cache.start_request(req_id, prompt_token_ids)

             # Append the newly sampled ids to the suffix cache for this request.
-            self.suffix_cache.add_active_response(req_id, sampled_ids)
+            self.suffix_cache.add_active_response(req_id, sampled_ids.tolist())

             # Suffix decoding only uses the most recent tokens up to max_tree_depth, so
             # we extract the pattern from the end of the input.
@@ -86,7 +88,7 @@ def propose(
                 min_token_prob=self.min_token_prob,
             )

-            draft_token_ids.append(draft.token_ids)
+            draft_token_ids.append(np.array(draft.token_ids))

         # Stop requests that were not seen in the input batch.
         for req_id in (
diff --git a/vllm/v1/worker/gpu_model_runner.py b/vllm/v1/worker/gpu_model_runner.py
index 91c8efc17feb..32ac187f494e 100644
--- a/vllm/v1/worker/gpu_model_runner.py
+++ b/vllm/v1/worker/gpu_model_runner.py
@@ -2276,7 +2276,7 @@ def _bookkeeping_sync(
     ) -> tuple[
         dict[str, int],
         LogprobsLists | None,
-        list[list[int]],
+        list[np.ndarray],
         dict[str, LogprobsTensors | None],
         list[str],
         dict[str, int],
@@ -2302,6 +2302,7 @@ def _bookkeeping_sync(
         num_sampled_tokens = sampler_output.sampled_token_ids.shape[0]
         sampled_token_ids = sampler_output.sampled_token_ids
         invalid_req_indices = []
+        valid_sampled_token_ids: list[np.ndarray]
         if not self.use_async_scheduling:
             # Get the valid generated tokens.
             max_gen_len = sampled_token_ids.shape[-1]
@@ -2316,7 +2317,7 @@ def _bookkeeping_sync(
             )
             # Mask out the sampled tokens that should not be sampled.
             for i in discard_sampled_tokens_req_indices:
-                valid_sampled_token_ids[int(i)].clear()
+                valid_sampled_token_ids[int(i)] = np.array([])
         else:
             valid_sampled_token_ids = []
             invalid_req_indices = discard_sampled_tokens_req_indices.tolist()
@@ -2345,11 +2346,13 @@ def _bookkeeping_sync(
         )
         for req_idx in range(num_sampled_tokens):
             if self.use_async_scheduling:
-                sampled_ids = [-1] if req_idx not in invalid_req_indices_set else None
+                sampled_ids = (
+                    np.array([-1]) if req_idx not in invalid_req_indices_set else None
+                )
             else:
                 sampled_ids = valid_sampled_token_ids[req_idx]

-            num_sampled_ids: int = len(sampled_ids) if sampled_ids else 0
+            num_sampled_ids: int = 0 if sampled_ids is None else sampled_ids.shape[0]

             if cu_num_accepted_tokens is not None:
                 cu_num_accepted_tokens.append(
@@ -2658,7 +2661,9 @@ def sample_tokens(
         with record_function_or_nullcontext("Sample"):
             sampler_output = self._sample(logits, spec_decode_metadata)

-        def propose_draft_token_ids(sampled_token_ids):
+        def propose_draft_token_ids(
+            sampled_token_ids: torch.Tensor | list[np.ndarray],
+        ) -> None:
             assert spec_decode_common_attn_metadata is not None
             with record_function_or_nullcontext("Draft"):
                 self._draft_token_ids = self.propose_draft_token_ids(
@@ -2773,14 +2778,14 @@ def take_draft_token_ids(self) -> DraftTokenIds | None:
     def propose_draft_token_ids(
         self,
         scheduler_output: "SchedulerOutput",
-        sampled_token_ids: torch.Tensor | list[list[int]],
+        sampled_token_ids: torch.Tensor | list[np.ndarray],
         sampling_metadata: SamplingMetadata,
         hidden_states: torch.Tensor,
         sample_hidden_states: torch.Tensor,
         aux_hidden_states: list[torch.Tensor] | None,
         spec_decode_metadata: SpecDecodeMetadata | None,
         common_attn_metadata: CommonAttentionMetadata,
-    ) -> list[list[int]] | torch.Tensor:
+    ) -> torch.Tensor | list[np.ndarray]:
         num_scheduled_tokens = scheduler_output.total_num_scheduled_tokens
         if self.speculative_config.method == "ngram":
             assert isinstance(sampled_token_ids, list)
@@ -2812,7 +2817,7 @@ def propose_draft_token_ids(
             for num_draft, tokens in zip(
                 spec_decode_metadata.num_draft_tokens, sampled_token_ids
             ):
-                indices.append(offset + len(tokens) - 1)
+                indices.append(offset + tokens.shape[0] - 1)
                 offset += num_draft + 1
             indices = torch.tensor(indices, device=self.device)
             hidden_states = sample_hidden_states[indices]
@@ -4772,7 +4777,7 @@ def get_kv_cache_spec(self) -> dict[str, KVCacheSpec]:

         return kv_cache_spec

-    def _to_list(self, sampled_token_ids: torch.Tensor) -> list[list[int]]:
+    def _to_list(self, sampled_token_ids: torch.Tensor) -> list[np.ndarray]:
         # This is a short term mitigation for issue mentioned in
         # https://github.com/vllm-project/vllm/issues/22754.
         # `tolist` would trigger a cuda wise stream sync, which
@@ -4785,4 +4790,4 @@ def _to_list(self, sampled_token_ids: torch.Tensor) -> list[list[int]]:
         pinned.copy_(sampled_token_ids, non_blocking=True)
         self.transfer_event.record()
         self.transfer_event.synchronize()
-        return pinned.tolist()
+        return [row for row in pinned]

From 3492b2cc9d49422e40201a2e080de38474ae6359 Mon Sep 17 00:00:00 2001
From: Jialin Ouyang
Date: Fri, 7 Nov 2025 15:57:54 -0800
Subject: [PATCH 2/3] DEBUG LOGGING

Signed-off-by: Jialin Ouyang
---
 vllm/v1/core/sched/scheduler.py | 5 ++++-
 1 file changed, 4 insertions(+), 1 deletion(-)

diff --git a/vllm/v1/core/sched/scheduler.py b/vllm/v1/core/sched/scheduler.py
index b7364a0af130..f2d6151c31cd 100644
--- a/vllm/v1/core/sched/scheduler.py
+++ b/vllm/v1/core/sched/scheduler.py
@@ -895,6 +895,9 @@ def update_from_output(
         model_runner_output: ModelRunnerOutput,
     ) -> dict[int, EngineCoreOutputs]:
         sampled_token_ids = model_runner_output.sampled_token_ids
+        logger.info(f"===Jialin {len(model_runner_output.sampled_token_ids)=}")
+        if len(model_runner_output.sampled_token_ids) > 0:
+            logger.info(f"===Jialin {type(model_runner_output.sampled_token_ids[0])=}")
         logprobs = model_runner_output.logprobs
         prompt_logprobs_dict = model_runner_output.prompt_logprobs_dict
         num_scheduled_tokens = scheduler_output.num_scheduled_tokens
@@ -940,7 +943,7 @@ def update_from_output(

             req_index = model_runner_output.req_id_to_index[req_id]
             generated_token_ids: list[int] = (
-                sampled_token_ids[req_index].tolist() if sampled_token_ids else []
+                sampled_token_ids[req_index] if sampled_token_ids else []
             )

             scheduled_spec_token_ids = (

From 52b9d7a910385b893be82718080355f8617ebc20 Mon Sep 17 00:00:00 2001
From: Jialin Ouyang
Date: Fri, 7 Nov 2025 17:24:02 -0800
Subject: [PATCH 3/3] apply same changes to async scheduling

Signed-off-by: Jialin Ouyang
---
 vllm/v1/core/sched/scheduler.py    | 5 +----
 vllm/v1/worker/gpu_model_runner.py | 8 +++++---
 2 files changed, 6 insertions(+), 7 deletions(-)

diff --git a/vllm/v1/core/sched/scheduler.py b/vllm/v1/core/sched/scheduler.py
index f2d6151c31cd..b7364a0af130 100644
--- a/vllm/v1/core/sched/scheduler.py
+++ b/vllm/v1/core/sched/scheduler.py
@@ -895,9 +895,6 @@ def update_from_output(
         model_runner_output: ModelRunnerOutput,
     ) -> dict[int, EngineCoreOutputs]:
         sampled_token_ids = model_runner_output.sampled_token_ids
-        logger.info(f"===Jialin {len(model_runner_output.sampled_token_ids)=}")
-        if len(model_runner_output.sampled_token_ids) > 0:
-            logger.info(f"===Jialin {type(model_runner_output.sampled_token_ids[0])=}")
         logprobs = model_runner_output.logprobs
         prompt_logprobs_dict = model_runner_output.prompt_logprobs_dict
         num_scheduled_tokens = scheduler_output.num_scheduled_tokens
@@ -943,7 +940,7 @@ def update_from_output(

             req_index = model_runner_output.req_id_to_index[req_id]
             generated_token_ids: list[int] = (
-                sampled_token_ids[req_index] if sampled_token_ids else []
+                sampled_token_ids[req_index].tolist() if sampled_token_ids else []
             )

             scheduled_spec_token_ids = (
diff --git a/vllm/v1/worker/gpu_model_runner.py b/vllm/v1/worker/gpu_model_runner.py
index 32ac187f494e..03b25718d540 100644
--- a/vllm/v1/worker/gpu_model_runner.py
+++ b/vllm/v1/worker/gpu_model_runner.py
@@ -207,9 +207,11 @@ def get_output(self) -> ModelRunnerOutput:
         del self._logprobs_tensors
         del self._sampled_token_ids

-        valid_sampled_token_ids = self.sampled_token_ids_cpu.tolist()
+        valid_sampled_token_ids: list[np.ndarray] = [
+            row for row in self.sampled_token_ids_cpu.numpy()
+        ]
         for i in self._invalid_req_indices:
-            valid_sampled_token_ids[i].clear()
+            valid_sampled_token_ids[i] = np.array([])

         output = self._model_runner_output
         output.sampled_token_ids = valid_sampled_token_ids
@@ -4790,4 +4792,4 @@ def _to_list(self, sampled_token_ids: torch.Tensor) -> list[np.ndarray]:
         pinned.copy_(sampled_token_ids, non_blocking=True)
         self.transfer_event.record()
         self.transfer_event.synchronize()
-        return [row for row in pinned]
+        return [row for row in pinned.numpy()]
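
Note: the pattern the series converges on in _to_list (patch 3) is to stage the sampled-token tensor in pinned host memory, wait on a dedicated transfer event rather than a full stream sync, and hand out per-request numpy rows, deferring .tolist() to the scheduler, which is the only consumer that needs Python ints. The sketch below is a minimal, self-contained illustration of that idea, not the vLLM implementation; the function name, the shape assumptions, and the ad-hoc CUDA event are illustrative assumptions.

# Minimal sketch (illustrative, not the vLLM code): convert a
# [num_reqs, max_gen_len] tensor of sampled token ids into per-request
# numpy rows without materializing a Python list for every token.
import numpy as np
import torch


def sampled_ids_to_rows(sampled_token_ids: torch.Tensor) -> list[np.ndarray]:
    # Stage the tensor in (optionally pinned) host memory so the
    # device-to-host copy can run asynchronously.
    pinned = torch.empty(
        sampled_token_ids.shape,
        dtype=sampled_token_ids.dtype,
        device="cpu",
        pin_memory=torch.cuda.is_available(),
    )
    pinned.copy_(sampled_token_ids, non_blocking=True)
    if sampled_token_ids.is_cuda:
        # Wait only for this copy via a CUDA event instead of syncing the
        # whole stream.
        done = torch.cuda.Event()
        done.record()
        done.synchronize()
    # numpy() shares memory with the host tensor, so each row is a cheap view.
    return [row for row in pinned.numpy()]


# Consumer side (e.g. the scheduler) converts to Python ints only where needed:
# generated_token_ids: list[int] = rows[req_index].tolist() if rows else []

Iterating the 2-D array yields row views into the host buffer, so no per-token Python objects are created until a consumer explicitly calls .tolist() on its own row.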