Commit 01e6683
Fixes
1 parent 105e3c3 commit 01e6683

2 files changed, +22 -20 lines changed


tests/async_engine/api_server_async_engine.py

Lines changed: 5 additions & 4 deletions
@@ -1,5 +1,5 @@
 """vllm.entrypoints.api_server with some extra logging for testing."""
-from typing import Any, Dict
+from typing import Any, Dict, Iterable

 import uvicorn
 from fastapi.responses import JSONResponse, Response
@@ -18,9 +18,10 @@ def __init__(self, *args, **kwargs):
         super().__init__(*args, **kwargs)
         self._num_aborts = 0

-    async def abort(self, request_id: str) -> None:
-        await super().abort(request_id)
-        self._num_aborts += 1
+    async def _engine_abort(self, request_ids: Iterable[str]):
+        ids = list(request_ids)
+        self._num_aborts += len(ids)
+        await super()._engine_abort(ids)

     def testing_stats(self) -> Dict[str, Any]:
         return {"num_aborted_requests": self._num_aborts}

vllm/engine/async_llm_engine.py

Lines changed: 17 additions & 16 deletions
@@ -82,9 +82,10 @@ def put(self, item: Union[RequestOutput, EmbeddingRequestOutput,
         self._queue.put_nowait(item)

     def finish(self, cancelled: bool = False) -> None:
-        self._finished = True
-        self._queue.put_nowait(
-            asyncio.CancelledError if cancelled else STOP_ITERATION)
+        if not self._finished:
+            self._finished = True
+            self._queue.put_nowait(
+                asyncio.CancelledError if cancelled else STOP_ITERATION)

     @property
     def finished(self) -> bool:
@@ -147,10 +148,11 @@ def process_request_output(self,
         # while the output was generated
         if (stream := self._request_streams.get(request_id)) is not None:
             stream.put(request_output)
-        if request_output.finished:
-            if verbose:
-                logger.info("Finished request %s.", request_id)
-            self.abort_request(request_id)
+            if request_output.finished:
+                stream.finish()
+
+        if verbose and request_output.finished:
+            logger.info("Finished request %s.", request_id)

     def process_exception(self,
                           request_id: str,
@@ -198,12 +200,9 @@ def abort_request(self,

         self._finished_requests.put_nowait(request_id)

-        if request_id not in self._request_streams or self._request_streams[
-                request_id].finished:
-            # The request has already finished or been aborted.
-            return
-
-        self._request_streams[request_id].finish(cancelled=cancelled)
+        stream = self._request_streams.get(request_id, None)
+        if stream is not None:
+            stream.finish(cancelled=cancelled)

     def get_new_and_finished_requests(self) -> Tuple[List[Dict], Set[str]]:
         """Get the new requests and finished requests to be
@@ -682,7 +681,9 @@ async def run_engine_loop(self):
                 raise
             await asyncio.sleep(0)

-    def add_request(
+    # This method does not need to be async, but kept that way
+    # for backwards compatibility.
+    async def add_request(
         self,
         request_id: str,
         inputs: PromptInputs,
@@ -787,7 +788,7 @@ async def generate(
         >>> # Process and return the final output
         >>> ...
         """
-        async for output in self.add_request(
+        async for output in await self.add_request(
             request_id,
             inputs,
             sampling_params,
@@ -865,7 +866,7 @@ async def encode(
         >>> # Process and return the final output
         >>> ...
         """
-        async for output in self.add_request(
+        async for output in await self.add_request(
             request_id,
             inputs,
             pooling_params,
