feat: Request Cancellation unary request support #3004
Conversation
Walkthrough

Updates cancellation handling and logging across example middleware, Python tests, the Rust LLM migration operator, and the runtime network pipeline. Introduces a shared stream-end error constant, refines retry and context propagation via AsyncEngineContext, adjusts end-of-stream behavior on cancellation, and expands fault-tolerance tests to cover remote prefill and long-prompt scenarios.
Sequence Diagram(s)

```mermaid
sequenceDiagram
    autonumber
    actor Client
    participant RetryManager
    participant EngineCtx as AsyncEngineContext
    participant Network as Network Pipeline
    Client->>RetryManager: new_stream(request)
    RetryManager->>EngineCtx: context.id(), link_child(...)
    RetryManager->>Network: create stream (async)
    loop Streaming
        Network-->>RetryManager: next chunk | Err
        alt Chunk
            RetryManager-->>Client: yield chunk
        else Err == STREAM_ERR_MSG
            Note over RetryManager,Network: Stream ended early
            RetryManager-->>Client: terminate stream
        else Err == NoResponders
            RetryManager->>Network: log + retry create stream
            alt Recreated
                Note over RetryManager: continue
            else Failed
                RetryManager-->>Client: propagate error/end
            end
        else Other Err
            RetryManager-->>Client: propagate error/end
        end
    end
```
```mermaid
sequenceDiagram
    autonumber
    participant Egress as Egress AddressedRouter
    participant Ctx as AsyncEngineContext (clone)
    participant Ingress as Ingress PushHandler
    Egress->>Ctx: check is_stopped()
    alt Stopped (cancellation)
        Egress-->>Ingress: None (graceful end)
        Note right of Egress: debug: canceled
    else Not complete_final
        Egress-->>Ingress: Err(STREAM_ERR_MSG)
        Note right of Egress: debug: unexpected early end
    else complete_final
        Egress-->>Ingress: None (normal end)
    end
    Ingress->>Ingress: if Err && msg == STREAM_ERR_MSG<br/>warn + set send_complete_final=false<br/>break
```
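The egress branch in the second diagram reduces to a small decision function. As an illustration only (the real logic lives in Rust in addressed_router.rs; `is_stopped` and `complete_final` here simply mirror the diagram's labels), a Python model of the three-way end-of-stream choice:

```python
from enum import Enum, auto

class StreamEnd(Enum):
    GRACEFUL = auto()  # cancellation observed: end quietly with None
    ERROR = auto()     # stream died before generation finished
    NORMAL = auto()    # generation completed, normal end

STREAM_ERR_MSG = "Stream ended before generation completed"

def classify_stream_end(is_stopped: bool, complete_final: bool) -> StreamEnd:
    """Model of the egress end-of-stream branch from the sequence diagram."""
    if is_stopped:
        # Cancelled: emit None and debug-log "canceled"
        return StreamEnd.GRACEFUL
    if not complete_final:
        # Unexpected early end: emit Err(STREAM_ERR_MSG)
        return StreamEnd.ERROR
    # Normal completion: emit None
    return StreamEnd.NORMAL
```

The key property the diagram encodes is that cancellation is checked first, so a cancelled request never surfaces as a spurious stream error downstream.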
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~60 minutes
Pre-merge checks: ✅ 3 passed.
Actionable comments posted: 1
🧹 Nitpick comments (11)
examples/custom_backend/cancellation/middle_server.py (1)
42-48: Propagate backend stream naturally; consider structured logging

The simplified async-for correctly lets cancellation/errors propagate. Minor: switch prints to logging for consistency with the rest of the stack.

```diff
-import asyncio
+import asyncio
+import logging
+
+logger = logging.getLogger(__name__)
@@
-    async for response in stream:
-        data = response.data()
-        print(f"Middle server: Forwarding response {data}")
-        yield data
+    async for response in stream:
+        data = response.data()
+        logger.info("Middle server: Forwarding response %s", data)
+        yield data
-
-    print("Middle server: Backend stream ended")
+    logger.info("Middle server: Backend stream ended")
```

lib/bindings/python/tests/test_cancellation/conftest.py (1)
114-116: Avoid fixed sleeps; wait on a condition to reduce flakiness

Sleeping 100 ms can be racy under load. Prefer a short bounded poll for propagation.

```diff
-    # Give time for cancellation to propagate before returning
-    await asyncio.sleep(0.1)
+    # Give time for cancellation to propagate before returning (bounded poll)
+    for _ in range(10):
+        await asyncio.sleep(0.01)
+        if context.is_stopped() or context.is_killed():
+            break
```

lib/runtime/src/pipeline/network.rs (1)
41-43: Centralized message added; prefer a typed error over string matching

The shared constant is useful, but downstream string-equality checks are brittle. Consider a structured error code (enum) surfaced via MaybeError/FinishReason to detect end-of-stream reliably across crates.

```diff
-// Define stream error message constant
-pub const STREAM_ERR_MSG: &str = "Stream ended before generation completed";
+/// Sentinel message indicating the transport observed a response stream that ended
+/// before the LLM generation completed (e.g., client cancel or network cut).
+/// NOTE: Prefer migrating to a typed error code in MaybeError/FinishReason to avoid
+/// string comparisons across crate boundaries.
+pub const STREAM_ERR_MSG: &str = "Stream ended before generation completed";
```

tests/fault_tolerance/test_request_cancellation.py (8)
245-249: Configurable long-prompt path is useful; consider env-tunable size

Hard-coding an 8k multiplier can stress CI. Make it env-tunable to scale with resources. Current signature:

```python
def send_request_and_cancel(
    request_type: str = "completion",
    timeout: int | float = 1,
    use_long_prompt: bool = False,
):
```

Additional change outside this hunk:

```diff
+LONG_PROMPT_FACTOR = int(os.getenv("DYN_LONG_PROMPT_FACTOR", "8000"))
@@
-    if use_long_prompt:
-        prompt += " Make sure it is" + " long" * 8000 + "!"
+    if use_long_prompt:
+        prompt += " Make sure it is " + ("long " * LONG_PROMPT_FACTOR) + "!"
```
254-256: Long prompt construction — trim leading space and avoid extra allocs (nit)

Minor readability/alloc nit; the env-based suggestion above also addresses this.

```diff
-    if use_long_prompt:
-        prompt += " Make sure it is" + " long" * 8000 + "!"
+    if use_long_prompt:
+        prompt += " Make sure it is " + ("long " * LONG_PROMPT_FACTOR) + "!"
```
341-346: Pattern selection per mode — good; add robustness to trailing metadata

endswith is fine; consider stripping trailing punctuation like '.' if logs add suffixes.

```python
cancellation_pattern = (
    f"Aborted Remote Prefill Request ID: {request_id}"
    if assert_cancel_at_prefill
    else f"Aborted Request ID: {request_id}"
)
```

Optional robustness (outside hunk):

```python
clean_line = strip_ansi_codes(line).strip().rstrip(".")
```
355-371: Prefill log checks — good addition; consider offsetting to reduce scan time

Since prefill logs can be large, track an offset like worker/frontend to avoid rescanning.

```diff
-    prefill_worker_log_content = read_log_content(prefill_worker_process._log_path)
+    prefill_worker_log_content = read_log_content(prefill_worker_process._log_path)
+    # Optionally: maintain an offset per prefill process similar to worker/frontend.
```
452-454: Replace fixed sleeps with bounded polling on logs

Fixed sleeps cause flakes; poll for patterns with a timeout.

```diff
-    # TODO: Need to wait for the next token to generate before seeing the
-    # cancellation on the logs. DIS-625
-    time.sleep(0.5)
+    wait_for_log_pattern(worker._log_path, r"Aborted (Remote Prefill )?Request ID:", timeout=5)
```

Additional helper outside hunks:

```python
import re
import time

def wait_for_log_pattern(path: str, pattern: str, timeout: float = 5.0) -> None:
    deadline = time.time() + timeout
    regex = re.compile(pattern)
    last_len = 0
    while time.time() < deadline:
        content = read_log_content(path)
        chunk = content[last_len:]
        if any(regex.search(strip_ansi_codes(line).strip()) for line in chunk.splitlines()):
            return
        last_len = len(content)
        time.sleep(0.1)
    pytest.fail(f"Timed out waiting for pattern: {pattern}")
```
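For completeness, the suggested helper can be exercised standalone. In this sketch, `read_log_content` and `strip_ansi_codes` are stubbed with minimal stand-ins, and `pytest.fail` is replaced by raising `TimeoutError` so the snippet runs without pytest:

```python
import re
import time

def read_log_content(path: str) -> str:
    # Minimal stand-in for the test suite's log reader
    with open(path, "r", encoding="utf-8") as f:
        return f.read()

def strip_ansi_codes(line: str) -> str:
    # Minimal stand-in: drop SGR color escape sequences
    return re.sub(r"\x1b\[[0-9;]*m", "", line)

def wait_for_log_pattern(path: str, pattern: str, timeout: float = 5.0) -> None:
    """Poll a log file until a regex matches a (new) line, or time out."""
    deadline = time.time() + timeout
    regex = re.compile(pattern)
    last_len = 0
    while time.time() < deadline:
        content = read_log_content(path)
        chunk = content[last_len:]  # only scan bytes appended since last poll
        if any(regex.search(strip_ansi_codes(line).strip()) for line in chunk.splitlines()):
            return
        last_len = len(content)
        time.sleep(0.05)
    raise TimeoutError(f"Timed out waiting for pattern: {pattern}")
```

The offset (`last_len`) keeps each poll cheap even for large logs, which is the same rationale as the offset suggestion in the prefill-log nitpick above.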
507-509: Bounded polling over sleep (same rationale as above)

Avoid fixed 0.5 s waits; reuse wait_for_log_pattern.

```diff
-    # TODO: Need to wait for the next token to generate before seeing the
-    # cancellation on the logs. DIS-625
-    time.sleep(0.5)
+    wait_for_log_pattern(decode_worker._log_path, r"Aborted Request ID:", timeout=5)
```
547-550: Frontend readiness: replace 2 s sleep with health/model readiness poll

Replace the static sleep with an explicit readiness check to reduce flakes.

```diff
-    # TODO: Why the model is not immediately available at the frontend after
-    # health check returns success.
-    time.sleep(2)
+    wait_for_log_pattern(frontend._log_path, r"Router.+ready|Models endpoint ready", timeout=15)
```
559-563: Polling vs fixed 3 s wait for prefill-cancel evidence

Use the helper to wait for the "New Prefill Request ID" and "Aborted Prefill Request ID" lines instead of sleeping.

```diff
-    # TODO: Need to wait for prefill to generate first token before seeing
-    # the cancellation on the logs. DIS-625
-    time.sleep(3)
+    wait_for_log_pattern(prefill_worker._log_path, r"New Prefill Request ID:", timeout=30)
```
📜 Review details
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (10)
- examples/custom_backend/cancellation/middle_server.py (1 hunks)
- lib/bindings/python/tests/test_cancellation/conftest.py (1 hunks)
- lib/bindings/python/tests/test_cancellation/test_client_context_cancel.py (1 hunks)
- lib/bindings/python/tests/test_cancellation/test_example.py (1 hunks)
- lib/llm/src/migration.rs (11 hunks)
- lib/runtime/src/pipeline/network.rs (1 hunks)
- lib/runtime/src/pipeline/network/egress/addressed_router.rs (2 hunks)
- lib/runtime/src/pipeline/network/egress/push_router.rs (3 hunks)
- lib/runtime/src/pipeline/network/ingress/push_handler.rs (1 hunks)
- tests/fault_tolerance/test_request_cancellation.py (8 hunks)
🧰 Additional context used
🧠 Learnings (4)
📓 Common learnings
Learnt from: kthui
PR: ai-dynamo/dynamo#2500
File: lib/llm/src/migration.rs:58-77
Timestamp: 2025-08-27T17:56:14.690Z
Learning: In lib/llm/src/migration.rs, the cancellation visibility in the Migration operator is intentionally one-way - it checks engine_ctx.is_stopped()/is_killed() to stop pulling from streams but doesn't link newly created streams as child contexts to the parent. This is a conscious architectural decision with plans for future enhancement.
📚 Learning: 2025-06-13T22:32:05.022Z
Learnt from: kthui
PR: ai-dynamo/dynamo#1424
File: lib/runtime/src/pipeline/network/egress/push_router.rs:204-209
Timestamp: 2025-06-13T22:32:05.022Z
Learning: In async-nats, the "no responders" error is represented as async_nats::client::RequestErrorKind::NoResponders, not async_nats::Error::NoResponders. Use err.downcast_ref::<async_nats::client::RequestError>() and then check request_err.kind() against RequestErrorKind::NoResponders.
Applied to files:
lib/runtime/src/pipeline/network/egress/push_router.rs
📚 Learning: 2025-06-13T22:32:05.022Z
Learnt from: kthui
PR: ai-dynamo/dynamo#1424
File: lib/runtime/src/pipeline/network/egress/push_router.rs:204-209
Timestamp: 2025-06-13T22:32:05.022Z
Learning: In async-nats, the "no responders" error is represented as async_nats::error::RequestErrorKind::NoResponders. Use err.downcast_ref::<async_nats::error::RequestError>() and then check req_err.kind() against RequestErrorKind::NoResponders to handle this error properly.
Applied to files:
lib/runtime/src/pipeline/network/egress/push_router.rs
📚 Learning: 2025-08-27T17:56:14.690Z
Learnt from: kthui
PR: ai-dynamo/dynamo#2500
File: lib/llm/src/migration.rs:58-77
Timestamp: 2025-08-27T17:56:14.690Z
Learning: In lib/llm/src/migration.rs, the cancellation visibility in the Migration operator is intentionally one-way - it checks engine_ctx.is_stopped()/is_killed() to stop pulling from streams but doesn't link newly created streams as child contexts to the parent. This is a conscious architectural decision with plans for future enhancement.
Applied to files:
lib/runtime/src/pipeline/network/egress/addressed_router.rs
lib/llm/src/migration.rs
🧬 Code graph analysis (5)
lib/runtime/src/pipeline/network/egress/push_router.rs (4)
- lib/llm/src/migration.rs (1): err (138-138)
- lib/llm/src/protocols/common/llm_backend.rs (1): err (160-166)
- lib/runtime/src/protocols/annotated.rs (1): err (147-158)
- lib/runtime/src/protocols/maybe_error.rs (2): err (23-23), err (49-51)

lib/runtime/src/pipeline/network/egress/addressed_router.rs (3)
- lib/llm/src/protocols/common/llm_backend.rs (1): from_err (156-158)
- lib/runtime/src/protocols/annotated.rs (1): from_err (143-145)
- lib/runtime/src/protocols/maybe_error.rs (2): from_err (20-20), from_err (44-48)

tests/fault_tolerance/test_request_cancellation.py (1)
- tests/fault_tolerance/test_vllm_health_check.py (5): download_model (126-158), DynamoFrontendProcess (20-45), DynamoWorkerProcess (48-123), get_pid (43-45), get_pid (103-105)

lib/runtime/src/pipeline/network/ingress/push_handler.rs (3)
- lib/llm/src/protocols/common/llm_backend.rs (1): err (160-166)
- lib/runtime/src/protocols/annotated.rs (1): err (147-158)
- lib/runtime/src/protocols/maybe_error.rs (2): err (23-23), err (49-51)

lib/llm/src/migration.rs (3)
- lib/llm/src/http/client.rs (3): context (316-318), context (352-354), with_id (84-92)
- lib/runtime/src/engine.rs (6): context (175-175), context (251-253), context (268-270), context (274-276), context (449-452), context (464-466)
- lib/runtime/src/pipeline/context.rs (3): context (226-228), context (310-312), with_id (69-76)
🔇 Additional comments (20)
lib/bindings/python/tests/test_cancellation/test_client_context_cancel.py (1)
44-46: Good assertion strengthening stream-end expectation

Asserting exactly three items (0, 1, 2) tightens the contract and matches the new cancellation flow.
lib/bindings/python/tests/test_cancellation/test_example.py (1)
142-144: Assertion updated to reflect forwarding of response 2 — looks good

Matches the new middle-server behavior of forwarding through the cancellation point.
tests/fault_tolerance/test_request_cancellation.py (8)
175-177: Timeout type widened — good improvement

Allowing float timeouts is helpful for fine-grained cancellation timing.
210-211: Chat completion request: float timeout support — LGTM

Signature change is consistent with completions and stream use.
313-314: Prefill-cancel flag addition improves specificity

The flag makes verification unambiguous for remote prefill vs decode. Nice.
353-353: Clearer failure message — LGTM

Better diagnostics on test failures.
372-385: Explicit prefill-cancel verification — LGTM

Differentiating prefill vs decode abort lines makes the test precise.
500-506: Decode-worker test messaging — LGTM

Clarifies which worker is expected to log cancellation.
556-556: Very short client timeout (0.1 s) may be too aggressive on CI

Consider 0.25-0.5 s to avoid transport-level timeouts masking cancellation behavior.

```diff
-    send_request_and_cancel("completion", timeout=0.1, use_long_prompt=True)
+    send_request_and_cancel("completion", timeout=0.25, use_long_prompt=True)
```

Would you like me to push a commit to bump this and deflake?
563-569: End-to-end prefill cancel verification — LGTM

The assertions fully exercise the remote prefill path.
lib/llm/src/migration.rs (5)
61-61: LGTM! Context handling improved.

The migration from a string-based context ID to `Arc<dyn AsyncEngineContext>` properly integrates with the broader cancellation support changes, enabling better request lifecycle management.
63-68: LGTM! Async closure simplifies stream creation.

The conversion to an async closure eliminates complex lifetime management and improves readability while maintaining the same functionality.
111-121: Improved stream disconnection detection.

The new error detection logic properly uses the centralized `STREAM_ERR_MSG` constant and correctly handles stream disconnection scenarios with appropriate logging and retry behavior.
134-135: Proper context linking for child requests.

The new context propagation correctly establishes parent-child relationships between contexts, enabling proper cancellation propagation through the request hierarchy.
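As a toy model of the parent-child linking described above (this is not the actual AsyncEngineContext API; names and semantics here are illustrative only), the intended propagation behavior can be sketched as:

```python
class Context:
    """Toy model of parent-child cancellation linking."""

    def __init__(self) -> None:
        self._stopped = False
        self._children: list["Context"] = []

    def link_child(self, child: "Context") -> None:
        self._children.append(child)
        if self._stopped:
            # A child linked after cancellation still observes it
            child.stop_generating()

    def stop_generating(self) -> None:
        # Cancellation cascades down the request hierarchy
        self._stopped = True
        for child in self._children:
            child.stop_generating()

    def is_stopped(self) -> bool:
        return self._stopped
```

The late-link branch matters for retries: a stream recreated after the client cancels should immediately see the stopped state rather than run to completion.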
498-499: Test updates correctly reflect the new API.

All test cases have been properly updated to use `Arc<Controller>`-wrapped contexts instead of string IDs, maintaining test coverage while adapting to the new context-based API.

Also applies to: 537-538, 577-578, 618-619, 645-646, 698-699
lib/runtime/src/pipeline/network/egress/push_router.rs (2)
234-240: Stream error detection aligns with centralized error handling.

The error detection logic correctly uses the shared `STREAM_ERR_MSG` constant with proper debug-level logging. The string comparison approach is consistent with the overall error handling strategy.
250-252: Appropriate debug logging for fault detection.

Adding debug logging before reporting the instance down improves observability of fault detection behavior.
lib/runtime/src/pipeline/network/egress/addressed_router.rs (3)
95-95: Context clone enables proper cancellation handling.

Creating a separate clone of the engine context allows for independent end-of-stream detection without affecting the original context's state.
225-232: Proper graceful termination for cancelled requests.

The new logic correctly distinguishes between normal cancellation (`is_stopped()`) and unexpected stream termination, providing appropriate handling for each case with clear debug logging.
234-236: Consistent error message for unexpected stream termination.

Using the centralized `STREAM_ERR_MSG` constant ensures consistent error messaging across the codebase for stream termination scenarios.
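Tying back to the earlier nitpick about brittle string comparisons: a typed classification could replace equality checks against STREAM_ERR_MSG entirely. A hypothetical Python stand-in for the Rust enum idea (none of these names exist in the codebase; they only sketch the shape of the suggestion):

```python
from dataclasses import dataclass, field
from enum import Enum, auto

class StreamEndKind(Enum):
    COMPLETE = auto()      # generation finished normally
    CANCELLED = auto()     # client cancelled; graceful end
    DISCONNECTED = auto()  # transport died before generation completed

@dataclass
class StreamEndError(Exception):
    kind: StreamEndKind
    detail: str = field(default="")

def should_retry(err: Exception) -> bool:
    # Match on a typed kind rather than comparing error message strings
    return isinstance(err, StreamEndError) and err.kind is StreamEndKind.DISCONNECTED
```

With a kind carried on the error, callers across crate boundaries branch on an enum variant instead of a shared sentinel string, so renaming the message can never silently break retry logic.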
whoisj left a comment:
LGTM, but I do have a couple of questions.
FYI: the next PR, #3102, comes after this one is merged.
keivenchang left a comment:
This PR has already been reviewed and updated quite a bit, so I don't have anything major to add except small refactor comments.
Overview:
Support for unary request cancellation.
Details:
Where should the reviewer start?
Start with the Rust changes, and then move into vLLM test, and finally the unit test and examples.
Related Issues:
N/A