Skip to content

Conversation

@kthui
Copy link
Contributor

@kthui kthui commented Sep 10, 2025

Overview:

Support for unary request cancellation.

Details:

  • Enable unary request cancellation in Rust.
  • Enable vLLM prefill request cancellation test.
  • Cover reading response after cancelling, in both unit test and examples.
  • Fix context.rs Controller async stopped() and killed() implementation.

Where should the reviewer start?

Start with the Rust changes, and then move into vLLM test, and finally the unit test and examples.

Related Issues: (use one of the action keywords Closes / Fixes / Resolves / Relates to)

N/A

Summary by CodeRabbit

  • Bug Fixes

    • Improved reliability of request cancellation during streaming, ensuring graceful termination when canceled.
    • Standardized error messaging for unexpected stream endings.
    • Removed error swallowing in example middleware; added clearer end-of-stream logging.
  • Tests

    • Expanded coverage for cancellation, including long-prompt and remote prefill scenarios.
    • Updated expectations to reflect new logging and stream completion behavior.
    • Test helpers now support fractional timeouts for finer control.

@kthui kthui self-assigned this Sep 10, 2025
@copy-pr-bot
Copy link

copy-pr-bot bot commented Sep 10, 2025

This pull request requires additional validation before any workflows can run on NVIDIA's runners.

Pull request vetters can view their responsibilities here.

Contributors can view more details about this message here.

@github-actions github-actions bot added the feat label Sep 10, 2025
@kthui kthui force-pushed the jacky-ft-cancel-unary branch 2 times, most recently from b767b9b to 0d21669 Compare September 10, 2025 23:42
@kthui kthui marked this pull request as ready for review September 11, 2025 03:10
@coderabbitai
Copy link
Contributor

coderabbitai bot commented Sep 11, 2025

Walkthrough

Updates cancellation handling and logging across example middleware, Python tests, Rust LLM migration, and runtime network pipeline. Introduces a shared stream-end error constant, refines retry and context propagation via AsyncEngineContext, adjusts end-of-stream behavior on cancellation, and expands fault-tolerance tests to cover remote prefill and long-prompt scenarios.

Changes

Cohort / File(s) Summary
Example middleware cancellation
examples/custom_backend/cancellation/middle_server.py
Removed ValueError special-casing; now forwards backend stream responses directly; logs “Middle server: Backend stream ended” after stream completion.
Python bindings cancellation tests
lib/bindings/python/tests/test_cancellation/conftest.py, lib/bindings/python/tests/test_cancellation/test_client_context_cancel.py, lib/bindings/python/tests/test_cancellation/test_example.py
Adds brief delay to allow cancellation propagation; lets client stream finish naturally and asserts three responses; updates middle-server expectation to “Forwarding response 2”.
LLM migration retry/context flow
lib/llm/src/migration.rs
Switches from context_id String to Arc; links child contexts; simplifies async unfold streaming; uses network::STREAM_ERR_MSG; refines NoResponders retry handling; updates tests to pass context trait objects.
Runtime network pipeline
lib/runtime/src/pipeline/network.rs, lib/runtime/src/pipeline/network/egress/addressed_router.rs, lib/runtime/src/pipeline/network/egress/push_router.rs, lib/runtime/src/pipeline/network/ingress/push_handler.rs
Adds public STREAM_ERR_MSG; egress addressed_router: gracefully end stream on stopped context; error on unexpected early end using shared message; adds debug logs. Egress push_router and ingress push_handler: consume shared STREAM_ERR_MSG and streamline checks; add debug/warn logs and NoResponders handling.
Fault-tolerance cancellation tests
tests/fault_tolerance/test_request_cancellation.py
Supports float timeouts; optional ultra-long prompt; expanded verification for remote prefill cancellation (detect/abort patterns); adds vLLM prefill test path; refined log assertions and flow.

Sequence Diagram(s)

sequenceDiagram
  autonumber
  actor Client
  participant RetryManager
  participant EngineCtx as AsyncEngineContext
  participant Network as Network Pipeline

  Client->>RetryManager: new_stream(request)
  RetryManager->>EngineCtx: context.id(), link_child(...)
  RetryManager->>Network: create stream (async)
  loop Streaming
    Network-->>RetryManager: next chunk | Err
    alt Chunk
      RetryManager-->>Client: yield chunk
    else Err == STREAM_ERR_MSG
      Note over RetryManager,Network: Stream ended early
      RetryManager-->>Client: terminate stream
    else Err == NoResponders
      RetryManager->>Network: log + retry create stream
      alt Recreated
        Note over RetryManager: continue
      else Failed
        RetryManager-->>Client: propagate error/end
      end
    else Other Err
      RetryManager-->>Client: propagate error/end
    end
  end
Loading
sequenceDiagram
  autonumber
  participant Egress as Egress AddressedRouter
  participant Ctx as AsyncEngineContext (clone)
  participant Ingress as Ingress PushHandler

  Egress->>Ctx: check is_stopped()
  alt Stopped (cancellation)
    Egress-->>Ingress: None (graceful end)
    Note right of Egress: debug: canceled
  else Not complete_final
    Egress-->>Ingress: Err(STREAM_ERR_MSG)
    Note right of Egress: debug: unexpected early end
  else complete_final
    Egress-->>Ingress: None (normal end)
  end

  Ingress->>Ingress: if Err && msg == STREAM_ERR_MSG<br/>warn + set send_complete_final=false<br/>break
Loading

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~60 minutes

Possibly related PRs

Poem

I thump my paw—cancel, stop!
Streams now end with a tidy plop.
Contexts link, retries flow,
Shared messages tell us when to go.
Long prompts fade, logs align—
Bunny approves: all by design. 🐇✨

Tip

👮 Agentic pre-merge checks are now available in preview!

Pro plan users can now enable pre-merge checks in their settings to enforce checklists before merging PRs.

  • Built-in checks – Quickly apply ready-made checks to enforce title conventions, require pull request descriptions that follow templates, validate linked issues for compliance, and more.
  • Custom agentic checks – Define your own rules using CodeRabbit’s advanced agentic capabilities to enforce organization-specific policies and workflows. For example, you can instruct CodeRabbit’s agent to verify that API documentation is updated whenever API schema files are modified in a PR. Note: Upto 5 custom checks are currently allowed during the preview period. Pricing for this feature will be announced in a few weeks.

Please see the documentation for more information.

Example:

reviews:
  pre_merge_checks:
    custom_checks:
      - name: "Undocumented Breaking Changes"
        mode: "warning"
        instructions: |
          Pass/fail criteria: All breaking changes to public APIs, CLI flags, environment variables, configuration keys, database schemas, or HTTP/GraphQL endpoints must be documented in the "Breaking Change" section of the PR description and in CHANGELOG.md. Exclude purely internal or private changes (e.g., code not exported from package entry points or explicitly marked as internal).

Please share your feedback with us on this Discord post.


Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

Pre-merge checks

✅ Passed checks (3 passed)
Check name Status Explanation
Title Check ✅ Passed The title ("feat: Request Cancellation unary request support") directly describes the primary change—adding unary request cancellation support—and aligns with the PR objectives and diffs, so it communicates the main intent clearly despite slightly awkward word order.
Docstring Coverage ✅ Passed Docstring coverage is 100.00% which is sufficient. The required threshold is 80.00%.
Description Check ✅ Passed The PR description follows the repository template and includes the required sections (Overview, Details, Where should the reviewer start, and Related Issues) and clearly states the main objectives: enabling unary request cancellation in Rust, adding vLLM prefill cancellation tests, covering reading responses after cancellation, and fixing Controller stopped/killed behavior. It also gives a recommended review order. Because the change set touches multiple subsystems (Rust migration, network pipeline, Python bindings/tests, and fault-tolerance tests) the description is acceptable but could be more helpful by calling out the most important files and any public API/signature changes for reviewers.

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

🧹 Nitpick comments (11)
examples/custom_backend/cancellation/middle_server.py (1)

42-48: Propagate backend stream naturally; consider structured logging

The simplified async-for correctly lets cancellation/errors propagate. Minor: switch prints to logging for consistency with the rest of the stack.

-import asyncio
+import asyncio
+import logging
+logger = logging.getLogger(__name__)
@@
-        async for response in stream:
-            data = response.data()
-            print(f"Middle server: Forwarding response {data}")
-            yield data
+        async for response in stream:
+            data = response.data()
+            logger.info("Middle server: Forwarding response %s", data)
+            yield data
-
-        print("Middle server: Backend stream ended")
+        logger.info("Middle server: Backend stream ended")
lib/bindings/python/tests/test_cancellation/conftest.py (1)

114-116: Avoid fixed sleeps; wait on a condition to reduce flakiness

Sleeping 100ms can be racy under load. Prefer a short bounded poll for propagation.

-        # Give time for cancellation to propagate before returning
-        await asyncio.sleep(0.1)
+        # Give time for cancellation to propagate before returning (bounded poll)
+        for _ in range(10):
+            await asyncio.sleep(0.01)
+            if context.is_stopped() or context.is_killed():
+                break
lib/runtime/src/pipeline/network.rs (1)

41-43: Centralized message added; prefer a typed error over string matching

The shared constant is useful, but downstream string-equality checks are brittle. Consider a structured error code (enum) surfaced via MaybeError/FinishReason to detect EOS reliably across crates.

-// Define stream error message constant
-pub const STREAM_ERR_MSG: &str = "Stream ended before generation completed";
+/// Sentinel message indicating the transport observed a response stream that ended
+/// before the LLM generation completed (e.g., client cancel or network cut).
+/// NOTE: Prefer migrating to a typed error code in MaybeError/FinishReason to avoid
+/// string comparisons across crate boundaries.
+pub const STREAM_ERR_MSG: &str = "Stream ended before generation completed";
tests/fault_tolerance/test_request_cancellation.py (8)

245-249: Configurable long-prompt path is useful; consider env-tunable size

Hard-coding an 8k multiplier can stress CI. Make it env-tunable to scale with resources.

-def send_request_and_cancel(
+def send_request_and_cancel(
     request_type: str = "completion",
     timeout: int | float = 1,
-    use_long_prompt: bool = False,
+    use_long_prompt: bool = False,
 ):

Additional change outside this hunk:

+LONG_PROMPT_FACTOR = int(os.getenv("DYN_LONG_PROMPT_FACTOR", "8000"))
@@
-    if use_long_prompt:
-        prompt += " Make sure it is" + " long" * 8000 + "!"
+    if use_long_prompt:
+        prompt += " Make sure it is " + ("long " * LONG_PROMPT_FACTOR) + "!"

254-256: Long prompt construction — trim leading space and avoid extra allocs (nit)

Minor readability/alloc nit; the env-based suggestion above also addresses this.

-    if use_long_prompt:
-        prompt += " Make sure it is" + " long" * 8000 + "!"
+    if use_long_prompt:
+        prompt += " Make sure it is " + ("long " * LONG_PROMPT_FACTOR) + "!"

341-346: Pattern selection per mode — good; add robustness to trailing metadata

Endswith is fine; consider stripping trailing punctuation like '.' if logs add suffixes.

-    cancellation_pattern = (
+    cancellation_pattern = (
         f"Aborted Remote Prefill Request ID: {request_id}"
         if assert_cancel_at_prefill
         else f"Aborted Request ID: {request_id}"
     )

Optional robustness (outside hunk):

clean_line = strip_ansi_codes(line).strip().rstrip(".")

355-371: Prefill log checks — good addition; consider offsetting to reduce scan time

Since prefill logs can be large, track an offset like worker/frontend to avoid rescanning.

-        prefill_worker_log_content = read_log_content(prefill_worker_process._log_path)
+        prefill_worker_log_content = read_log_content(prefill_worker_process._log_path)
+        # Optionally: maintain an offset per prefill process similar to worker/frontend.

452-454: Replace fixed sleeps with bounded polling on logs

Fixed sleeps cause flakes; poll for patterns with a timeout.

-                # TODO: Need to wait for the next token to generate before seeing the
-                #       cancellation on the logs. DIS-625
-                time.sleep(0.5)
+                wait_for_log_pattern(worker._log_path, r"Aborted (Remote Prefill )?Request ID:", timeout=5)

Additional helper outside hunks:

import time, re
def wait_for_log_pattern(path: str, pattern: str, timeout: float = 5.0) -> None:
    deadline = time.time() + timeout
    regex = re.compile(pattern)
    last_len = 0
    while time.time() < deadline:
        content = read_log_content(path)
        chunk = content[last_len:]
        if any(regex.search(strip_ansi_codes(l).strip()) for l in chunk.splitlines()):
            return
        last_len = len(content)
        time.sleep(0.1)
    pytest.fail(f"Timed out waiting for pattern: {pattern}")

507-509: Bounded polling over sleep (same rationale as above)

Avoid fixed 0.5s waits; reuse wait_for_log_pattern.

-                # TODO: Need to wait for the next token to generate before seeing the
-                #       cancellation on the logs. DIS-625
-                time.sleep(0.5)
+                wait_for_log_pattern(decode_worker._log_path, r"Aborted Request ID:", timeout=5)

547-550: Frontend readiness: replace 2s sleep with health/model readiness poll

Replace static sleep with an explicit readiness check to reduce flakes.

-                # TODO: Why the model is not immediately available at the frontend after
-                #       health check returns success.
-                time.sleep(2)
+                wait_for_log_pattern(frontend._log_path, r"Router.+ready|Models endpoint ready", timeout=15)

559-563: Polling vs fixed 3s wait for prefill-cancel evidence

Use the helper to wait for the “New Prefill Request ID” and “Aborted Prefill Request ID” lines instead of sleeping.

-                # TODO: Need to wait for prefill to generate first token before seeing
-                #       the cancellation on the logs. DIS-625
-                time.sleep(3)
+                wait_for_log_pattern(prefill_worker._log_path, r"New Prefill Request ID:", timeout=30)
📜 Review details

Configuration used: Path: .coderabbit.yaml

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between dcee4db and 0d21669.

📒 Files selected for processing (10)
  • examples/custom_backend/cancellation/middle_server.py (1 hunks)
  • lib/bindings/python/tests/test_cancellation/conftest.py (1 hunks)
  • lib/bindings/python/tests/test_cancellation/test_client_context_cancel.py (1 hunks)
  • lib/bindings/python/tests/test_cancellation/test_example.py (1 hunks)
  • lib/llm/src/migration.rs (11 hunks)
  • lib/runtime/src/pipeline/network.rs (1 hunks)
  • lib/runtime/src/pipeline/network/egress/addressed_router.rs (2 hunks)
  • lib/runtime/src/pipeline/network/egress/push_router.rs (3 hunks)
  • lib/runtime/src/pipeline/network/ingress/push_handler.rs (1 hunks)
  • tests/fault_tolerance/test_request_cancellation.py (8 hunks)
🧰 Additional context used
🧠 Learnings (4)
📓 Common learnings
Learnt from: kthui
PR: ai-dynamo/dynamo#2500
File: lib/llm/src/migration.rs:58-77
Timestamp: 2025-08-27T17:56:14.690Z
Learning: In lib/llm/src/migration.rs, the cancellation visibility in the Migration operator is intentionally one-way - it checks engine_ctx.is_stopped()/is_killed() to stop pulling from streams but doesn't link newly created streams as child contexts to the parent. This is a conscious architectural decision with plans for future enhancement.
📚 Learning: 2025-06-13T22:32:05.022Z
Learnt from: kthui
PR: ai-dynamo/dynamo#1424
File: lib/runtime/src/pipeline/network/egress/push_router.rs:204-209
Timestamp: 2025-06-13T22:32:05.022Z
Learning: In async-nats, the "no responders" error is represented as async_nats::client::RequestErrorKind::NoResponders, not async_nats::Error::NoResponders. Use err.downcast_ref::<async_nats::client::RequestError>() and then check request_err.kind() against RequestErrorKind::NoResponders.

Applied to files:

  • lib/runtime/src/pipeline/network/egress/push_router.rs
📚 Learning: 2025-06-13T22:32:05.022Z
Learnt from: kthui
PR: ai-dynamo/dynamo#1424
File: lib/runtime/src/pipeline/network/egress/push_router.rs:204-209
Timestamp: 2025-06-13T22:32:05.022Z
Learning: In async-nats, the "no responders" error is represented as async_nats::error::RequestErrorKind::NoResponders. Use err.downcast_ref::<async_nats::error::RequestError>() and then check req_err.kind() against RequestErrorKind::NoResponders to handle this error properly.

Applied to files:

  • lib/runtime/src/pipeline/network/egress/push_router.rs
📚 Learning: 2025-08-27T17:56:14.690Z
Learnt from: kthui
PR: ai-dynamo/dynamo#2500
File: lib/llm/src/migration.rs:58-77
Timestamp: 2025-08-27T17:56:14.690Z
Learning: In lib/llm/src/migration.rs, the cancellation visibility in the Migration operator is intentionally one-way - it checks engine_ctx.is_stopped()/is_killed() to stop pulling from streams but doesn't link newly created streams as child contexts to the parent. This is a conscious architectural decision with plans for future enhancement.

Applied to files:

  • lib/runtime/src/pipeline/network/egress/addressed_router.rs
  • lib/llm/src/migration.rs
🧬 Code graph analysis (5)
lib/runtime/src/pipeline/network/egress/push_router.rs (4)
lib/llm/src/migration.rs (1)
  • err (138-138)
lib/llm/src/protocols/common/llm_backend.rs (1)
  • err (160-166)
lib/runtime/src/protocols/annotated.rs (1)
  • err (147-158)
lib/runtime/src/protocols/maybe_error.rs (2)
  • err (23-23)
  • err (49-51)
lib/runtime/src/pipeline/network/egress/addressed_router.rs (3)
lib/llm/src/protocols/common/llm_backend.rs (1)
  • from_err (156-158)
lib/runtime/src/protocols/annotated.rs (1)
  • from_err (143-145)
lib/runtime/src/protocols/maybe_error.rs (2)
  • from_err (20-20)
  • from_err (44-48)
tests/fault_tolerance/test_request_cancellation.py (1)
tests/fault_tolerance/test_vllm_health_check.py (5)
  • download_model (126-158)
  • DynamoFrontendProcess (20-45)
  • DynamoWorkerProcess (48-123)
  • get_pid (43-45)
  • get_pid (103-105)
lib/runtime/src/pipeline/network/ingress/push_handler.rs (3)
lib/llm/src/protocols/common/llm_backend.rs (1)
  • err (160-166)
lib/runtime/src/protocols/annotated.rs (1)
  • err (147-158)
lib/runtime/src/protocols/maybe_error.rs (2)
  • err (23-23)
  • err (49-51)
lib/llm/src/migration.rs (3)
lib/llm/src/http/client.rs (3)
  • context (316-318)
  • context (352-354)
  • with_id (84-92)
lib/runtime/src/engine.rs (6)
  • context (175-175)
  • context (251-253)
  • context (268-270)
  • context (274-276)
  • context (449-452)
  • context (464-466)
lib/runtime/src/pipeline/context.rs (3)
  • context (226-228)
  • context (310-312)
  • with_id (69-76)
🔇 Additional comments (20)
lib/bindings/python/tests/test_cancellation/test_client_context_cancel.py (1)

44-46: Good assertion strengthening stream-end expectation

Asserting exactly three items (0,1,2) tightens the contract and matches the new cancellation flow.

lib/bindings/python/tests/test_cancellation/test_example.py (1)

142-144: Assertion updated to reflect forwarding of response 2 — looks good

Matches the new middle-server behavior of forwarding through cancellation point.

tests/fault_tolerance/test_request_cancellation.py (8)

175-177: Timeout type widened — good improvement

Allowing float timeouts is helpful for fine-grained cancellation timing.


210-211: Chat completion request: float timeout support — LGTM

Signature change is consistent with completions and stream use.


313-314: Prefill-cancel flag addition improves specificity

The flag makes verification unambiguous for remote prefill vs decode. Nice.


353-353: Clearer failure message — LGTM

Better diagnostics on test failures.


372-385: Explicit prefill-cancel verification — LGTM

Differentiating prefill vs decode abort lines makes the test precise.


500-506: Decode-worker test messaging — LGTM

Clarifies which worker is expected to log cancellation.


556-556: Very short client timeout (0.1s) may be too aggressive on CI

Consider 0.25–0.5s to avoid transport-level timeouts masking cancellation behavior.

-                send_request_and_cancel("completion", timeout=0.1, use_long_prompt=True)
+                send_request_and_cancel("completion", timeout=0.25, use_long_prompt=True)

Would you like me to push a commit to bump this and deflake?


563-569: End-to-end prefill cancel verification — LGTM

The assertions fully exercise the remote prefill path.

lib/llm/src/migration.rs (5)

61-61: LGTM! Context handling improved.

The migration from string-based context ID to Arc<dyn AsyncEngineContext> properly integrates with the broader cancellation support changes, enabling better request lifecycle management.


63-68: LGTM! Async closure simplifies stream creation.

The conversion to an async closure eliminates complex lifetime management and improves readability while maintaining the same functionality.


111-121: Improved stream disconnection detection.

The new error detection logic properly uses the centralized STREAM_ERR_MSG constant and correctly handles stream disconnection scenarios with appropriate logging and retry behavior.


134-135: Proper context linking for child requests.

The new context propagation correctly establishes parent-child relationships between contexts, enabling proper cancellation propagation through the request hierarchy.


498-499: Test updates correctly reflect the new API.

All test cases have been properly updated to use Arc<Controller> wrapped contexts instead of string IDs, maintaining test coverage while adapting to the new context-based API.

Also applies to: 537-538, 577-578, 618-619, 645-646, 698-699

lib/runtime/src/pipeline/network/egress/push_router.rs (2)

234-240: Stream error detection aligns with centralized error handling.

The error detection logic correctly uses the shared STREAM_ERR_MSG constant with proper debug-level logging. The string comparison approach is consistent with the overall error handling strategy.


250-252: Appropriate debug logging for fault detection.

Adding debug logging before reporting the instance down improves observability of fault detection behavior.

lib/runtime/src/pipeline/network/egress/addressed_router.rs (3)

95-95: Context clone enables proper cancellation handling.

Creating a separate clone of the engine context allows for independent end-of-stream detection without affecting the original context's state.


225-232: Proper graceful termination for cancelled requests.

The new logic correctly distinguishes between normal cancellation (is_stopped()) and unexpected stream termination, providing appropriate handling for each case with clear debug logging.


234-236: Consistent error message for unexpected stream termination.

Using the centralized STREAM_ERR_MSG constant ensures consistent error messaging across the codebase for stream termination scenarios.

@kthui kthui force-pushed the jacky-ft-cancel-unary branch from 0d21669 to eac476d Compare September 16, 2025 01:09
Copy link
Collaborator

@whoisj whoisj left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM, but I do have a couple of questions.

@kthui
Copy link
Contributor Author

kthui commented Sep 18, 2025

FYI, the next PR #3102 after this is merged.

Copy link
Contributor

@keivenchang keivenchang left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see this PR is already been reviewed and uploaded quite a bit and don't have anything major to add except small refactor comments.

@kthui kthui merged commit a8fd127 into main Sep 19, 2025
16 of 18 checks passed
@kthui kthui deleted the jacky-ft-cancel-unary branch September 19, 2025 05:26
hhzhang16 pushed a commit that referenced this pull request Sep 19, 2025
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

8 participants