- 
                Notifications
    You must be signed in to change notification settings 
- Fork 663
feat: sglang integration of dynamo event manager #1323
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: sglang integration of dynamo event manager #1323
Conversation
| 👋 Hi faradawn! Thank you for contributing to ai-dynamo/dynamo. Just a reminder: The  🚀 | 
| WalkthroughThe condition verifying the  Changes
 Sequence Diagram(s)sequenceDiagram
    participant Client
    participant Frontend
    participant Router
    participant SGLangWorker
    Client->>Frontend: Send request
    Frontend->>Router: Forward request (generate)
    Router->>SGLangWorker: Route request based on cache overlap and metrics
    SGLangWorker-->>Router: Response with worker ID and prefix hit rate
    Router-->>Frontend: Yield best worker info
    Frontend-->>Client: Return response
Possibly related PRs
 Poem
 Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. 🪧 TipsChatThere are 3 ways to chat with CodeRabbit: 
 SupportNeed help? Create a ticket on our support page for assistance with any issues or questions. Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments. CodeRabbit Commands (Invoked using PR comments)
 Other keywords and placeholders
 Documentation and Community
 | 
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 4
🧹 Nitpick comments (3)
container/deps/sglang/sglang_v0.4.6-dynamo-kv-disagg-patch.patch (3)
43-49: Consider using SGLang-specific environment variable names.The code uses
VLLM_*environment variables in an SGLang context, which could cause confusion for users. Consider usingSGLANG_*prefixed variables or making this configurable.- namespace = namespace or os.getenv("VLLM_KV_NAMESPACE") or "default" - component = component or os.getenv("VLLM_KV_COMPONENT") or "sglang" - worker_id = worker_id if worker_id is not None else int( - os.getenv("VLLM_WORKER_ID", "0")) - lib_path = lib_path or os.getenv("VLLM_KV_CAPI_PATH") + namespace = namespace or os.getenv("SGLANG_KV_NAMESPACE") or "default" + component = component or os.getenv("SGLANG_KV_COMPONENT") or "sglang" + worker_id = worker_id if worker_id is not None else int( + os.getenv("SGLANG_WORKER_ID", "0")) + lib_path = lib_path or os.getenv("SGLANG_KV_CAPI_PATH")
156-159: Consider logging ignored events for debugging.Silently ignoring non-KVEventBatch events could mask configuration errors or unexpected usage patterns. Consider adding debug logging.
def publish(self, events: EventBatch) -> None: # type: ignore[override] # Only handle KVEventBatch; ignore others quietly if not isinstance(events, KVEventBatch): + logger.debug("Ignoring non-KVEventBatch event: %s", type(events).__name__) return
171-175: Document the design decision for AllBlocksCleared handling.The comment explains why
AllBlocksClearedevents are ignored, but this could be a source of confusion. Consider making this behavior more explicit.elif isinstance(ev, AllBlocksCleared): - # We could broadcast each removed block, but Radix clears by - # resetting cache; upstream workers will clear too. For now - # no-op. - continue + # AllBlocksCleared events are intentionally ignored because: + # 1. RadixCache clears by resetting the entire cache + # 2. Upstream workers will clear their caches independently + # 3. Broadcasting individual block removals would be inefficient + logger.debug("Ignoring AllBlocksCleared event (intentional no-op)") + continue
📜 Review details
Configuration used: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (1)
- container/deps/sglang/sglang_v0.4.6-dynamo-kv-disagg-patch.patch(1 hunks)
⏰ Context from checks skipped due to timeout of 90000ms (1)
- GitHub Check: Build and Test - vllm
🔇 Additional comments (1)
container/deps/sglang/sglang_v0.4.6-dynamo-kv-disagg-patch.patch (1)
182-182:❓ Verification inconclusive
Verify registration key consistency across the codebase.
Ensure that the registration key "dynamo" is consistently used and documented throughout the codebase for proper event publisher discovery.
🏁 Script executed:
#!/bin/bash # Description: Search for references to "dynamo" event publisher key to ensure consistency # Search for any configuration or documentation mentioning "dynamo" publisher rg -i "dynamo.*publisher|publisher.*dynamo" --type py # Search for event publisher factory usage to understand expected keys rg "EventPublisherFactory" --type py -A 3 -B 3Length of output: 1322
🏁 Script executed:
#!/usr/bin/env bash # Locate the patch file patch_file=$(fd sglang_v0.4.6-dynamo-kv-disagg-patch.patch -t f) echo "Found patch file: $patch_file" # Show context around the inserted publisher registration (lines 160–220) sed -n '160,220p' "$patch_file"Length of output: 1348
🏁 Script executed:
#!/usr/bin/env bash # Find definition of EventPublisherFactory rg -n "class EventPublisherFactory" --type py # Find any usage of EventPublisherFactory.get or lookup in registry rg -n "EventPublisherFactory" --type py -A3 -B3Length of output: 95
Confirm new publisher registration in sglang’s EventPublisherFactory
The patch adds the following at the end of
container/deps/sglang/sglang_v0.4.6-dynamo-kv-disagg-patch.patch:+# Register new publisher +EventPublisherFactory._registry["dynamo"] = DynamoEventPublisherPlease manually verify that:
- The
EventPublisherFactoryclass in the sglang codebase defines an internal_registryand uses it for publisher lookup.- The key
"dynamo"exactly matches the lookup name used in methods likegetorcreate.- Any related documentation (README, code comments, configs) includes and explains the
"dynamo"registration key.
| + if lib_path is None: | ||
| + raise RuntimeError("VLLM_KV_CAPI_PATH environment variable not set") | ||
| + | ||
| + self._lib = CDLL(lib_path) | 
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Add security validation for library path and improve error handling.
Loading arbitrary C libraries from environment variables poses security risks. Additionally, the CDLL loading should have proper error handling.
         if lib_path is None:
             raise RuntimeError("VLLM_KV_CAPI_PATH environment variable not set")
+        # Validate library path exists and is readable
+        if not os.path.isfile(lib_path):
+            raise RuntimeError(f"C-API library not found: {lib_path}")
+        if not os.access(lib_path, os.R_OK):
+            raise RuntimeError(f"C-API library not readable: {lib_path}")
+
-        self._lib = CDLL(lib_path)
+        try:
+            self._lib = CDLL(lib_path)
+        except OSError as e:
+            raise RuntimeError(f"Failed to load C-API library {lib_path}: {e}")📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| + if lib_path is None: | |
| + raise RuntimeError("VLLM_KV_CAPI_PATH environment variable not set") | |
| + | |
| + self._lib = CDLL(lib_path) | |
| if lib_path is None: | |
| raise RuntimeError("VLLM_KV_CAPI_PATH environment variable not set") | |
| # Validate library path exists and is readable | |
| if not os.path.isfile(lib_path): | |
| raise RuntimeError(f"C-API library not found: {lib_path}") | |
| if not os.access(lib_path, os.R_OK): | |
| raise RuntimeError(f"C-API library not readable: {lib_path}") | |
| try: | |
| self._lib = CDLL(lib_path) | |
| except OSError as e: | |
| raise RuntimeError(f"Failed to load C-API library {lib_path}: {e}") | 
🤖 Prompt for AI Agents
In container/deps/sglang/sglang_v0.4.6-dynamo-kv-disagg-patch.patch around lines
48 to 51, add validation to ensure the library path from the environment
variable is safe and expected before loading it with CDLL. Also, wrap the CDLL
loading in a try-except block to catch and handle any exceptions, raising a
clear error if loading fails. This prevents security risks from arbitrary paths
and improves error handling.
| + num_blocks = 1 # SGLang RadixCache emits one block at a time | ||
| + | ||
| + token_arr = (c_uint32 * len(token_ids))(*token_ids) | ||
| + num_block_tokens_arr = (c_size_t * 1)(len(token_ids)) | ||
| + block_hash_arr = (c_uint64 * 1)(block_hashes[0]) | 
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fix potential index error and validate assumptions.
The code assumes block_hashes has at least one element without validation, and hardcodes num_blocks = 1 based on a comment about SGLang RadixCache behavior.
     def publish_stored(
         self,
         token_ids: List[int],
         block_hashes: List[int],
         parent_hash: Optional[int],
         block_size: int,
         lora_id: Optional[int] = None,
     ) -> None:
-        num_blocks = 1  # SGLang RadixCache emits one block at a time
+        if not block_hashes:
+            raise ValueError("block_hashes cannot be empty")
+        
+        num_blocks = 1  # SGLang RadixCache emits one block at a time
+        if len(block_hashes) != 1:
+            logger.warning("Expected exactly 1 block hash, got %d. Using first hash only.", len(block_hashes))
         token_arr = (c_uint32 * len(token_ids))(*token_ids)
         num_block_tokens_arr = (c_size_t * 1)(len(token_ids))
         block_hash_arr = (c_uint64 * 1)(block_hashes[0])📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| + num_blocks = 1 # SGLang RadixCache emits one block at a time | |
| + | |
| + token_arr = (c_uint32 * len(token_ids))(*token_ids) | |
| + num_block_tokens_arr = (c_size_t * 1)(len(token_ids)) | |
| + block_hash_arr = (c_uint64 * 1)(block_hashes[0]) | |
| def publish_stored( | |
| self, | |
| token_ids: List[int], | |
| block_hashes: List[int], | |
| parent_hash: Optional[int], | |
| block_size: int, | |
| lora_id: Optional[int] = None, | |
| ) -> None: | |
| if not block_hashes: | |
| raise ValueError("block_hashes cannot be empty") | |
| num_blocks = 1 # SGLang RadixCache emits one block at a time | |
| if len(block_hashes) != 1: | |
| logger.warning( | |
| "Expected exactly 1 block hash, got %d. Using first hash only.", | |
| len(block_hashes), | |
| ) | |
| token_arr = (c_uint32 * len(token_ids))(*token_ids) | |
| num_block_tokens_arr = (c_size_t * 1)(len(token_ids)) | |
| block_hash_arr = (c_uint64 * 1)(block_hashes[0]) | 
🤖 Prompt for AI Agents
In container/deps/sglang/sglang_v0.4.6-dynamo-kv-disagg-patch.patch around lines
107 to 111, the code assumes block_hashes has at least one element and hardcodes
num_blocks to 1 without validation. Add a check to ensure block_hashes is not
empty before accessing block_hashes[0], and validate or dynamically determine
num_blocks instead of hardcoding it to 1 to prevent potential index errors and
incorrect assumptions.
| + # block_size (tokens per block) must match allocator; default 16. | ||
| + from .dynamo_event_manager import KVCacheEventManager | ||
| + | 
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🛠️ Refactor suggestion
Move import to module level to avoid potential issues.
Importing modules inside __init__ methods is generally not recommended as it can cause circular import issues and makes dependencies less clear.
+from .dynamo_event_manager import KVCacheEventManager
+
 # Add DynamoEventPublisher
 class DynamoEventPublisher(EventPublisher):
     """Publisher that forwards events to Dynamo via the C-API bindings."""
     def __init__(self, kv_block_size: int = 16):
         # block_size (tokens per block) must match allocator; default 16.
-        from .dynamo_event_manager import KVCacheEventManager
         self._mgr = KVCacheEventManager(kv_block_size=kv_block_size)📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| + # block_size (tokens per block) must match allocator; default 16. | |
| + from .dynamo_event_manager import KVCacheEventManager | |
| + | |
| from .dynamo_event_manager import KVCacheEventManager | |
| # Add DynamoEventPublisher | |
| class DynamoEventPublisher(EventPublisher): | |
| """Publisher that forwards events to Dynamo via the C-API bindings.""" | |
| def __init__(self, kv_block_size: int = 16): | |
| # block_size (tokens per block) must match allocator; default 16. | |
| self._mgr = KVCacheEventManager(kv_block_size=kv_block_size) | 
🤖 Prompt for AI Agents
In container/deps/sglang/sglang_v0.4.6-dynamo-kv-disagg-patch.patch around lines
152 to 154, the import of KVCacheEventManager is done inside a method or
function. Move this import statement to the top of the module file, outside of
any functions or classes, to avoid potential circular import issues and improve
clarity of dependencies.
| + for ev in events.events: | ||
| + if isinstance(ev, BlockStored): | ||
| + self._mgr.publish_stored( | ||
| + token_ids=ev.token_ids, | ||
| + block_hashes=ev.block_hashes, | ||
| + parent_hash=ev.parent_block_hash, | ||
| + block_size=ev.block_size, | ||
| + lora_id=ev.lora_id, | ||
| + ) | ||
| + elif isinstance(ev, BlockRemoved): | ||
| + self._mgr.publish_removed(ev.block_hashes) | ||
| + elif isinstance(ev, AllBlocksCleared): | ||
| + # We could broadcast each removed block, but Radix clears by | ||
| + # resetting cache; upstream workers will clear too. For now | ||
| + # no-op. | 
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🛠️ Refactor suggestion
Add validation for event data integrity.
The event handling forwards data to the C-API without validation. Consider adding basic checks to prevent potential crashes or invalid data.
         for ev in events.events:
             if isinstance(ev, BlockStored):
+                if not ev.token_ids:
+                    logger.warning("Skipping BlockStored event with empty token_ids")
+                    continue
+                if not ev.block_hashes:
+                    logger.warning("Skipping BlockStored event with empty block_hashes")
+                    continue
                 self._mgr.publish_stored(
                     token_ids=ev.token_ids,
                     block_hashes=ev.block_hashes,
                     parent_hash=ev.parent_block_hash,
                     block_size=ev.block_size,
                     lora_id=ev.lora_id,
                 )
             elif isinstance(ev, BlockRemoved):
+                if not ev.block_hashes:
+                    logger.warning("Skipping BlockRemoved event with empty block_hashes")
+                    continue
                 self._mgr.publish_removed(ev.block_hashes)📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| + for ev in events.events: | |
| + if isinstance(ev, BlockStored): | |
| + self._mgr.publish_stored( | |
| + token_ids=ev.token_ids, | |
| + block_hashes=ev.block_hashes, | |
| + parent_hash=ev.parent_block_hash, | |
| + block_size=ev.block_size, | |
| + lora_id=ev.lora_id, | |
| + ) | |
| + elif isinstance(ev, BlockRemoved): | |
| + self._mgr.publish_removed(ev.block_hashes) | |
| + elif isinstance(ev, AllBlocksCleared): | |
| + # We could broadcast each removed block, but Radix clears by | |
| + # resetting cache; upstream workers will clear too. For now | |
| + # no-op. | |
| for ev in events.events: | |
| if isinstance(ev, BlockStored): | |
| if not ev.token_ids: | |
| logger.warning("Skipping BlockStored event with empty token_ids") | |
| continue | |
| if not ev.block_hashes: | |
| logger.warning("Skipping BlockStored event with empty block_hashes") | |
| continue | |
| self._mgr.publish_stored( | |
| token_ids=ev.token_ids, | |
| block_hashes=ev.block_hashes, | |
| parent_hash=ev.parent_block_hash, | |
| block_size=ev.block_size, | |
| lora_id=ev.lora_id, | |
| ) | |
| elif isinstance(ev, BlockRemoved): | |
| if not ev.block_hashes: | |
| logger.warning("Skipping BlockRemoved event with empty block_hashes") | |
| continue | |
| self._mgr.publish_removed(ev.block_hashes) | |
| elif isinstance(ev, AllBlocksCleared): | |
| # We could broadcast each removed block, but Radix clears by | |
| # resetting cache; upstream workers will clear too. For now | |
| # no-op. | 
🤖 Prompt for AI Agents
In container/deps/sglang/sglang_v0.4.6-dynamo-kv-disagg-patch.patch around lines
161 to 175, the event handling code forwards event data directly to the C-API
without validating the integrity of the data. To fix this, add basic validation
checks for required fields in each event type before calling the publish
methods. For example, verify that token_ids and block_hashes are not empty or
None, and that block_size is a positive integer. If validation fails, handle the
error gracefully, such as logging a warning and skipping the publish call to
prevent crashes or invalid data propagation.
| Hi @faradawn - thanks for this PR. Have you taken a look at the following? 
 I see what you're trying to do but I don't think we need to maintain a patch given those 2 PRs above. Given this info however - the current kv event emitting logic in SGL has a bug 
 Let me know if you have any thoughts here. It's on my TODO to figure out a solution for this. Happy to discuss and see if we can parallelize work here! | 
| Hi @ishandhanani , here are two ideas: Idea 1. Fix SGlang event publisher. Just before an event is pushed onto kv_event_queue, slice the node's value into fixed-size chunks of page_size and emit 1 BlockStored event per chunk. Idea 2: Fix Dynamo listener. Break a large block into chucks of page_size and simulate arrival of each page. You have more experience and I'd love to discuss and execute any plan. | 
| We don't want to continue using the C path, we should add a note that its deprecated for now. We have a python KVEventPublisher that is easier to use, we should use that. Also we want to avoid patches and instead commit mainline. That is what @ishandhanani is pointing out that there is work already committed so that we can pull events without needing patches. | 
| 
 I think option 1 is better. I think we want events emitted from SGLang to have a consistent block size. This allow us to maintain approximately the same tree just with our prefix tree having fixed block sizes. Do you want to take a stab at the PR to SGLang for this @faradawn? | 
| @ishandhanani sounds good! Is the ZMQ for kv event only used by Dynamo? I can take a look and create a PR to SGLang. | 
| 
 Great! I pinged you in the discord in case we wanted to discuss a bit more synchronously. At the moment yes - it is only used by Dynamo. However - any other framework that wants to build their own router based on ZMQ events could in theory use it. | 
| PR created for SGLang: sgl-project/sglang#6824. WIll keep you updated! | 
| If SGLang would accept it we could embed our own Publisher directly. Then we can skip the whole zmq stuff and publish directly from each process to NATS. For data parallel we have made a few small changes to vLLM that we should try to mirror in SGLang | 
| 
 Nice - let me take a look | 
…n/dynamo into sglang-integration-event-manager
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 2
🔭 Outside diff range comments (2)
examples/sglang/components/frontend.py (1)
20-29:⚠️ Potential issueRemove now-unused
SGLangWorkerimport to unblock CI
SGLangWorkeris no longer referenced after switching the dependency toRouter, and the Ruff step already failed once for the same issue. Deleting the import keeps the import section clean and prevents the next lint/format cycle from rewriting this file again.-from components.worker import SGLangWorker -from fastapi import FastAPI +from fastapi import FastAPI🧰 Tools
🪛 Ruff (0.11.9)
20-20:
components.worker.SGLangWorkerimported but unusedRemove unused import:
components.worker.SGLangWorker(F401)
🪛 Pylint (3.3.7)
[refactor] 24-24: Use 'from dynamo import sdk' instead
(R0402)
examples/sglang/components/worker.py (1)
71-99:⚠️ Potential issueKV-publisher is initialised against the decode worker, not this worker
comp_ns, comp_nameis reassigned toSGLangDecodeWorker.dynamo_address()(lines 81-83).
Lines 94-97 then createcomponent_ref = runtime.namespace(comp_ns).component(comp_name), which therefore points to the decode worker, while the KV-events you want originate from the prefill/aggregate worker (this component).Create a separate pair of variables or call
self.__class__.dynamo_address()again when wiring up the publisher:- comp_ns, comp_name = SGLangDecodeWorker.dynamo_address() + comp_ns_dec, comp_name_dec = SGLangDecodeWorker.dynamo_address() self.decode_client = ( - await runtime.namespace(comp_ns) - .component(comp_name) + await runtime.namespace(comp_ns_dec) + .component(comp_name_dec) .endpoint("generate") .client() ) ... - component_ref = runtime.namespace(comp_ns).component(comp_name) + comp_ns_self, comp_name_self = self.__class__.dynamo_address() + component_ref = runtime.namespace(comp_ns_self).component(comp_name_self)🧰 Tools
🪛 Pylint (3.3.7)
[error] 71-71: Class 'SGLangWorker' has no 'dynamo_address' member
(E1101)
🧹 Nitpick comments (4)
examples/sglang/utils/protocol.py (2)
22-24: Narrow the type ofsampling_params
sampling_paramsis currently typed as a plaindict, which loses all static-type information and IDE assistance. If the structure is not fixed yet, at least annotate it asdict[str, Any](or introduce a dedicatedSamplingParamsmodel) so downstream code can rely on proper typing.-from pydantic import BaseModel, Field +from typing import Any +from pydantic import BaseModel, Field ... - sampling_params: dict + sampling_params: dict[str, Any]🧰 Tools
🪛 Pylint (3.3.7)
[refactor] 22-22: Too few public methods (0/2)
(R0903)
27-28: PreferTokenIdTypefor consistency
tokens: list[int]diverges from the alias declared above (TokenIdType = int). Using the alias keeps future refactors (e.g., moving tonumpy.uint32) mechanical:-class Tokens(BaseModel): - tokens: list[int] +class Tokens(BaseModel): + tokens: list[TokenIdType]🧰 Tools
🪛 Pylint (3.3.7)
[refactor] 27-27: Too few public methods (0/2)
(R0903)
examples/sglang/components/kv_router.py (2)
168-226:_cost_functionis doing too much – extract helpers18 local variables and mixed concerns (score conversion, normalisation, logging, final selection) make this block hard to reason about and unit-test. Consider:
_convert_block_scores()– blocks ➜ token hit-rate
_merge_metrics()– buildworker_metrics&max_waiting
_compute_logits()– linear combination & tie-breakingThis keeps each piece ≤ 30 LOC and removes the Pylint R0914 hit.
🧰 Tools
🪛 Pylint (3.3.7)
[refactor] 168-168: Too many local variables (18/15)
(R0914)
228-235: Empty logits fall back to""– propagate a valid worker insteadReturning an empty string forces the caller to add fallback logic. A safer default is a random available worker (keeps throughput) or raising so the caller can retry.
- return "", 0.0 + return random.choice(worker_ids), 0.0
📜 Review details
Configuration used: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (5)
- examples/sglang/components/frontend.py(2 hunks)
- examples/sglang/components/kv_router.py(1 hunks)
- examples/sglang/components/worker.py(3 hunks)
- examples/sglang/utils/protocol.py(1 hunks)
- examples/sglang/utils/sglang.py(2 hunks)
🧰 Additional context used
🧬 Code Graph Analysis (1)
examples/sglang/components/kv_router.py (6)
examples/sglang/components/worker.py (3)
SGLangWorker(56-216)
async_init(68-119)
generate(154-195)examples/sglang/utils/protocol.py (1)
Tokens(27-28)deploy/sdk/src/dynamo/sdk/lib/decorators.py (1)
async_on_start(95-99)deploy/sdk/src/dynamo/sdk/core/lib.py (2)
depends(121-146)
service(88-118)deploy/sdk/src/dynamo/sdk/lib/config.py (1)
as_args(81-132)lib/bindings/python/rust/llm/kv.rs (1)
get_metrics(405-428)
🪛 GitHub Actions: Pre Merge Validation of (ai-dynamo/dynamo/refs/pull/1323/merge) by faradawn.
examples/sglang/components/frontend.py
[error] 1-1: isort formatting failed and modified this file to fix import order.
[error] 17-25: ruff linting failed and modified this file to fix an unused import error.
examples/sglang/components/worker.py
[error] 1-1: isort formatting failed and modified this file to fix import order.
[error] 1-1: black formatting failed and reformatted this file to fix code style issues.
examples/sglang/utils/protocol.py
[error] 1-1: black formatting failed and reformatted this file to fix code style issues.
examples/sglang/utils/sglang.py
[error] 1-1: black formatting failed and reformatted this file to fix code style issues.
examples/sglang/components/kv_router.py
[error] 1-1: isort formatting failed and modified this file to fix import order.
[error] 1-1: black formatting failed and reformatted this file to fix code style issues.
🪛 Pylint (3.3.7)
examples/sglang/utils/protocol.py
[refactor] 22-22: Too few public methods (0/2)
(R0903)
[refactor] 27-27: Too few public methods (0/2)
(R0903)
[refactor] 31-31: Too few public methods (0/2)
(R0903)
[refactor] 35-35: Too few public methods (0/2)
(R0903)
[refactor] 39-39: Too few public methods (0/2)
(R0903)
examples/sglang/utils/sglang.py
[error] 20-20: No name 'srt' in module 'sglang'
(E0611)
examples/sglang/components/kv_router.py
[refactor] 55-55: Too few public methods (0/2)
(R0903)
[refactor] 168-168: Too many local variables (18/15)
(R0914)
⏰ Context from checks skipped due to timeout of 90000ms (4)
- GitHub Check: pre-merge-rust (.)
- GitHub Check: pre-merge-rust (lib/runtime/examples)
- GitHub Check: pre-merge-rust (lib/bindings/python)
- GitHub Check: Build and Test - vllm
🔇 Additional comments (1)
examples/sglang/utils/sglang.py (1)
20-21: Verify import pathsglang.srt.server_argsStatic analysis flags
No name 'srt' in module 'sglang'. Ifsrtis an internal sub-package introduced downstream this is fine, but it is easy to mis-spell (srtvssrv/rst). Please re-check the actual package path in the SGLang wheel used in production.🧰 Tools
🪛 Pylint (3.3.7)
[error] 20-20: No name 'srt' in module 'sglang'
(E0611)
        
          
                examples/sglang/components/worker.py
              
                Outdated
          
        
      | # Derive KV block size from engine args, falling back to 16 | ||
| kv_block_size = getattr( | ||
| self.engine_args, "page_size", getattr(self.engine_args, "block_size", 16) | ||
| ) | ||
|  | ||
| zmq_config = ZmqKvEventPublisherConfig( | 
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🛠️ Refactor suggestion
page_size/block_size may arrive as string
argparse leaves CLI values as str when they come from the service config.
A manual int() cast guards against accidental string propagation to Rust bindings:
-            kv_block_size = getattr(
-                self.engine_args, "page_size", getattr(self.engine_args, "block_size", 16)
-            )
+            kv_block_size = int(
+                getattr(
+                    self.engine_args,
+                    "page_size",
+                    getattr(self.engine_args, "block_size", 16),
+                )
+            )📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| # Derive KV block size from engine args, falling back to 16 | |
| kv_block_size = getattr( | |
| self.engine_args, "page_size", getattr(self.engine_args, "block_size", 16) | |
| ) | |
| zmq_config = ZmqKvEventPublisherConfig( | |
| # Derive KV block size from engine args, falling back to 16 | |
| kv_block_size = int( | |
| getattr( | |
| self.engine_args, | |
| "page_size", | |
| getattr(self.engine_args, "block_size", 16), | |
| ) | |
| ) | |
| zmq_config = ZmqKvEventPublisherConfig( | 
🤖 Prompt for AI Agents
In examples/sglang/components/worker.py around lines 98 to 103, the
kv_block_size is derived from engine_args.page_size or engine_args.block_size,
but these values may be strings due to argparse behavior. To fix this,
explicitly cast the retrieved value to int to ensure kv_block_size is always an
integer before passing it to Rust bindings.
| "--custom-router", | ||
| type=bool, | ||
| default=False, | ||
| help="Whether to use custom router or not", | ||
| ) | ||
| parser.add_argument( | ||
| "--router", | ||
| type=str, | 
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Boolean CLI flag should use action="store_true"
type=bool expects the user to pass an explicit value (--custom-router true), which is error-prone and breaks argparse’s “flag” semantics.  Replace with action="store_true" (and drop the default):
-    parser.add_argument(
-        "--custom-router",
-        type=bool,
-        default=False,
-        help="Whether to use custom router or not",
-    )
+    parser.add_argument(
+        "--custom-router",
+        action="store_true",
+        help="Enable custom router",
+    )📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| "--custom-router", | |
| type=bool, | |
| default=False, | |
| help="Whether to use custom router or not", | |
| ) | |
| parser.add_argument( | |
| "--router", | |
| type=str, | |
| parser.add_argument( | |
| "--custom-router", | |
| action="store_true", | |
| help="Enable custom router", | |
| ) | |
| parser.add_argument( | |
| "--router", | |
| type=str, | 
🤖 Prompt for AI Agents
In examples/sglang/components/kv_router.py around lines 90 to 97, the boolean
CLI flag for --custom-router incorrectly uses type=bool, requiring an explicit
value and breaking flag semantics. Change the argument to use
action="store_true" instead and remove the default value to make it a proper
flag that sets the value to True when present.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 4
♻️ Duplicate comments (1)
examples/sglang/components/kv_router.py (1)
92-96: Boolean flag should useaction="store_true"
Same issue flagged previously; usingtype=boolforces the caller to type--custom-router true. Switch to the canonical argparse style:- parser.add_argument( - "--custom-router", - type=bool, - default=False, - help="Whether to use custom router or not", - ) + parser.add_argument( + "--custom-router", + action="store_true", + help="Enable custom router", + )
🧹 Nitpick comments (4)
examples/sglang/utils/sglang.py (1)
16-25: Minor: initialise logging once
logging.getLogger(__name__)is fine, but withoutbasicConfignothing is emitted when this module is imported directly (common in unit tests). Consider adding a one-liner:logging.basicConfig(level=logging.INFO, format="%(levelname)s %(name)s: %(message)s")or require the embedding application to configure logging explicitly.
🧰 Tools
🪛 Pylint (3.3.7)
[error] 20-20: No name 'srt' in module 'sglang'
(E0611)
examples/sglang/utils/protocol.py (1)
22-42:Tokensmodel name is plural but field is singularFor consistency with other request/response models consider renaming either the class or the field:
-class Tokens(BaseModel): - tokens: list[int] +class Tokens(BaseModel): + values: list[int](or rename class to
TokenList). This avoids.tokens.tokens-style attribute chains in downstream code.🧰 Tools
🪛 Pylint (3.3.7)
[refactor] 22-22: Too few public methods (0/2)
(R0903)
[refactor] 27-27: Too few public methods (0/2)
(R0903)
[refactor] 31-31: Too few public methods (0/2)
(R0903)
[refactor] 35-35: Too few public methods (0/2)
(R0903)
[refactor] 39-39: Too few public methods (0/2)
(R0903)
examples/sglang/components/kv_router.py (2)
35-35: Left-over debug comment
# faradawn helloappears to be an accidental artefact – remove to keep the codebase clean.
171-232: Method exceeds reasonable complexity (18 local vars)Splitting
_cost_functioninto helpers (_translate_scores,_normalise_metrics,_combine_logits) will improve readability and ease unit-testing.🧰 Tools
🪛 Pylint (3.3.7)
[refactor] 171-171: Too many local variables (18/15)
(R0914)
📜 Review details
Configuration used: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (10)
- container/Dockerfile.sglang(1 hunks)
- examples/sglang/README.md(1 hunks)
- examples/sglang/components/frontend.py(3 hunks)
- examples/sglang/components/kv_router.py(1 hunks)
- examples/sglang/components/worker.py(3 hunks)
- examples/sglang/configs/agg_router.yaml(1 hunks)
- examples/sglang/graphs/agg_router.py(1 hunks)
- examples/sglang/utils/protocol.py(1 hunks)
- examples/sglang/utils/sglang.py(2 hunks)
- lib/bindings/python/README.md(1 hunks)
✅ Files skipped from review due to trivial changes (5)
- examples/sglang/README.md
- lib/bindings/python/README.md
- examples/sglang/graphs/agg_router.py
- container/Dockerfile.sglang
- examples/sglang/configs/agg_router.yaml
🚧 Files skipped from review as they are similar to previous changes (2)
- examples/sglang/components/frontend.py
- examples/sglang/components/worker.py
🧰 Additional context used
🧬 Code Graph Analysis (1)
examples/sglang/utils/sglang.py (1)
deploy/sdk/src/dynamo/sdk/cli/utils.py (1)
reserve_free_port(63-102)
🪛 Pylint (3.3.7)
examples/sglang/components/kv_router.py
[refactor] 57-57: Too few public methods (0/2)
(R0903)
[refactor] 171-171: Too many local variables (18/15)
(R0914)
examples/sglang/utils/protocol.py
[refactor] 22-22: Too few public methods (0/2)
(R0903)
[refactor] 27-27: Too few public methods (0/2)
(R0903)
[refactor] 31-31: Too few public methods (0/2)
(R0903)
[refactor] 35-35: Too few public methods (0/2)
(R0903)
[refactor] 39-39: Too few public methods (0/2)
(R0903)
examples/sglang/utils/sglang.py
[error] 20-20: No name 'srt' in module 'sglang'
(E0611)
🪛 GitHub Actions: Pre Merge Validation of (ai-dynamo/dynamo/refs/pull/1323/merge) by faradawn.
examples/sglang/components/kv_router.py
[error] 1-1: isort failed and modified this file to fix import sorting issues.
[error] 1-1: black formatting check failed and reformatted this file.
examples/sglang/utils/protocol.py
[error] 1-1: black formatting check failed and reformatted this file.
examples/sglang/utils/sglang.py
[error] 1-1: black formatting check failed and reformatted this file.
⏰ Context from checks skipped due to timeout of 90000ms (4)
- GitHub Check: Build and Test - vllm
- GitHub Check: pre-merge-rust (lib/runtime/examples)
- GitHub Check: pre-merge-rust (lib/bindings/python)
- GitHub Check: pre-merge-rust (.)
🔇 Additional comments (2)
examples/sglang/utils/protocol.py (1)
1-84: Black re-formatted this file – please commit the resultPre-merge pipeline indicates Black auto-formatted the file. Commit those changes (or run
make fmt) to unblock CI.🧰 Tools
🪛 Pylint (3.3.7)
[refactor] 22-22: Too few public methods (0/2)
(R0903)
[refactor] 27-27: Too few public methods (0/2)
(R0903)
[refactor] 31-31: Too few public methods (0/2)
(R0903)
[refactor] 35-35: Too few public methods (0/2)
(R0903)
[refactor] 39-39: Too few public methods (0/2)
(R0903)
[refactor] 45-45: Too few public methods (0/2)
(R0903)
[refactor] 53-53: Too few public methods (0/2)
(R0903)
[refactor] 68-68: Too few public methods (0/2)
(R0903)
[refactor] 77-77: Too few public methods (0/2)
(R0903)
🪛 GitHub Actions: Pre Merge Validation of (ai-dynamo/dynamo/refs/pull/1323/merge) by faradawn.
[error] 1-1: black formatting check failed and reformatted this file.
examples/sglang/components/kv_router.py (1)
1-314: Formatting & import-order failures block CICI ran
isortandblackand rewrote this file. Please commit the formatted version (or run:isort examples/sglang/components/kv_router.py black examples/sglang/components/kv_router.py).
🧰 Tools
🪛 Pylint (3.3.7)
[refactor] 57-57: Too few public methods (0/2)
(R0903)
[refactor] 171-171: Too many local variables (18/15)
(R0914)
🪛 GitHub Actions: Pre Merge Validation of (ai-dynamo/dynamo/refs/pull/1323/merge) by faradawn.
[error] 1-1: isort failed and modified this file to fix import sorting issues.
[error] 1-1: black formatting check failed and reformatted this file.
| import logging | ||
| from argparse import Namespace | ||
|  | ||
| from sglang.srt.server_args import ServerArgs | 
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Broken import – sglang.srt.server_args does not exist
Static-analysis already flags this; in the published sglang wheel the object lives at sglang.server_args.  Loading this file will therefore crash on worker start-up.
-from sglang.srt.server_args import ServerArgs
+from sglang.server_args import ServerArgs📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| from sglang.srt.server_args import ServerArgs | |
| -from sglang.srt.server_args import ServerArgs | |
| +from sglang.server_args import ServerArgs | 
🧰 Tools
🪛 Pylint (3.3.7)
[error] 20-20: No name 'srt' in module 'sglang'
(E0611)
🤖 Prompt for AI Agents
In examples/sglang/utils/sglang.py at line 20, the import statement incorrectly
references sglang.srt.server_args, which does not exist. Update the import to
use sglang.server_args instead to match the actual module location and prevent
import errors on worker start-up.
| from components.worker import SGLangWorker | ||
| from utils.protocol import Tokens | ||
|  | 
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Import path may break outside examples package
from components.worker import SGLangWorker relies on the current working directory being examples/sglang.  Use an absolute import rooted at the project package (e.g. examples.sglang.components.worker) or add .__init__.py files to turn folders into packages.
🤖 Prompt for AI Agents
In examples/sglang/components/kv_router.py around lines 45 to 47, the import
statement uses a relative path that depends on the current working directory,
which can break outside the examples package. Change the import to use an
absolute import rooted at the project package, such as importing SGLangWorker
from examples.sglang.components.worker. Alternatively, ensure all relevant
directories have __init__.py files to make them proper packages and support
relative imports.
| def _cost_function( | ||
| self, | ||
| scores: OverlapScores | None, | ||
| metrics: AggregatedMetrics | None, | ||
| token_length: int, | ||
| ) -> Tuple[WorkerId, float]: | ||
| """Compute the best worker given scores/metrics. | ||
| Returns a tuple of ``(worker_id, estimated_prefix_hit_rate)``. | ||
| """ | ||
| logger.info("=== Cost function called") | ||
|  | ||
| worker_scores: dict[WorkerId, float] = {} | ||
| if scores: | ||
| for worker_id, score in scores.scores.items(): | ||
| # score is in *blocks*; convert to *tokens* hit-rate | ||
| worker_scores[worker_id] = ( | ||
| score * self.indexer.block_size() / token_length | ||
| ) | ||
| else: | ||
| logger.warning("Cannot get KV scores") | ||
|  | ||
| worker_metrics: dict[WorkerId, dict[str, float]] = {} | ||
| max_waiting = 0.0 | ||
| if metrics: | ||
| for endpoint in metrics.endpoints: | ||
| worker_id = endpoint.worker_id | ||
| worker_metrics[worker_id] = { | ||
| key: getattr(endpoint, key, self.default_metrics[key]) | ||
| for key in self.default_metrics.keys() | ||
| } | ||
| max_waiting = max( | ||
| max_waiting, worker_metrics[worker_id]["num_requests_waiting"] | ||
| ) | ||
| else: | ||
| logger.warning("Cannot get metrics") | ||
|  | ||
| # Consider *all* workers, even if metrics/scores missing | ||
| worker_ids = self.workers_client.instance_ids() | ||
|  | ||
| worker_logits: dict[WorkerId, float] = {} | ||
| for worker_id in worker_ids: | ||
| score = worker_scores.get(worker_id, 0.0) | ||
| metrics_dict = worker_metrics.get(worker_id, self.default_metrics) | ||
| gpu_cache_usage = metrics_dict["gpu_cache_usage_perc"] | ||
| normalized_waiting = ( | ||
| metrics_dict["num_requests_waiting"] / max_waiting | ||
| if max_waiting > 0 else 0.0 | ||
| ) | ||
|  | ||
| # Simple linear combination (higher is better) | ||
| worker_logits[worker_id] = 2 * score - gpu_cache_usage - normalized_waiting | ||
| logger.info( | ||
| "Formula for %s: %.3f = 2.0 * %.3f - %.3f - %.3f", | ||
| worker_id, | ||
| worker_logits[worker_id], | ||
| score, | ||
| gpu_cache_usage, | ||
| normalized_waiting, | ||
| ) | ||
|  | ||
| if not worker_logits or not any(worker_logits.values()): | 
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🛠️ Refactor suggestion
Cost function mixes incomparable scales – results may be biased
gpu_cache_usage_perc appears to be a value in [0,100] while score is a fraction 0-1 (after block → token normalisation).  Subtracting the raw percentage heavily outweighs the 2×cache-hit bonus, effectively turning the formula into “pick whoever uses 0 % GPU cache”.
Either:
- Convert gpu_cache_usage_percto[0,1]by dividing by 100, or
- Rescale all terms/weights based on empirical ranges (e.g. weight hit-rate by 100 as well).
Example fix:
-            gpu_cache_usage = metrics_dict["gpu_cache_usage_perc"]
+            gpu_cache_usage = metrics_dict["gpu_cache_usage_perc"] / 100.0and revisit the coefficients.
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| def _cost_function( | |
| self, | |
| scores: OverlapScores | None, | |
| metrics: AggregatedMetrics | None, | |
| token_length: int, | |
| ) -> Tuple[WorkerId, float]: | |
| """Compute the best worker given scores/metrics. | |
| Returns a tuple of ``(worker_id, estimated_prefix_hit_rate)``. | |
| """ | |
| logger.info("=== Cost function called") | |
| worker_scores: dict[WorkerId, float] = {} | |
| if scores: | |
| for worker_id, score in scores.scores.items(): | |
| # score is in *blocks*; convert to *tokens* hit-rate | |
| worker_scores[worker_id] = ( | |
| score * self.indexer.block_size() / token_length | |
| ) | |
| else: | |
| logger.warning("Cannot get KV scores") | |
| worker_metrics: dict[WorkerId, dict[str, float]] = {} | |
| max_waiting = 0.0 | |
| if metrics: | |
| for endpoint in metrics.endpoints: | |
| worker_id = endpoint.worker_id | |
| worker_metrics[worker_id] = { | |
| key: getattr(endpoint, key, self.default_metrics[key]) | |
| for key in self.default_metrics.keys() | |
| } | |
| max_waiting = max( | |
| max_waiting, worker_metrics[worker_id]["num_requests_waiting"] | |
| ) | |
| else: | |
| logger.warning("Cannot get metrics") | |
| # Consider *all* workers, even if metrics/scores missing | |
| worker_ids = self.workers_client.instance_ids() | |
| worker_logits: dict[WorkerId, float] = {} | |
| for worker_id in worker_ids: | |
| score = worker_scores.get(worker_id, 0.0) | |
| metrics_dict = worker_metrics.get(worker_id, self.default_metrics) | |
| gpu_cache_usage = metrics_dict["gpu_cache_usage_perc"] | |
| normalized_waiting = ( | |
| metrics_dict["num_requests_waiting"] / max_waiting | |
| if max_waiting > 0 else 0.0 | |
| ) | |
| # Simple linear combination (higher is better) | |
| worker_logits[worker_id] = 2 * score - gpu_cache_usage - normalized_waiting | |
| logger.info( | |
| "Formula for %s: %.3f = 2.0 * %.3f - %.3f - %.3f", | |
| worker_id, | |
| worker_logits[worker_id], | |
| score, | |
| gpu_cache_usage, | |
| normalized_waiting, | |
| ) | |
| if not worker_logits or not any(worker_logits.values()): | |
| for worker_id in worker_ids: | |
| score = worker_scores.get(worker_id, 0.0) | |
| metrics_dict = worker_metrics.get(worker_id, self.default_metrics) | |
| - gpu_cache_usage = metrics_dict["gpu_cache_usage_perc"] | |
| + gpu_cache_usage = metrics_dict["gpu_cache_usage_perc"] / 100.0 | |
| normalized_waiting = ( | |
| metrics_dict["num_requests_waiting"] / max_waiting | |
| if max_waiting > 0 else 0.0 | |
| ) | |
| # Simple linear combination (higher is better) | |
| worker_logits[worker_id] = 2 * score - gpu_cache_usage - normalized_waiting | |
| logger.info( | |
| "Formula for %s: %.3f = 2.0 * %.3f - %.3f - %.3f", | |
| worker_id, | |
| worker_logits[worker_id], | |
| score, | |
| gpu_cache_usage, | |
| normalized_waiting, | |
| ) | 
🧰 Tools
🪛 Pylint (3.3.7)
[refactor] 171-171: Too many local variables (18/15)
(R0914)
🤖 Prompt for AI Agents
In examples/sglang/components/kv_router.py between lines 171 and 232, the cost
function subtracts gpu_cache_usage_perc directly, which is on a 0-100 scale,
from score values on a 0-1 scale, causing bias. Fix this by normalizing
gpu_cache_usage_perc to a 0-1 scale by dividing it by 100 before using it in the
formula. After this, revisit and adjust the coefficients in the linear
combination to ensure balanced weighting of all terms.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 3
♻️ Duplicate comments (5)
examples/sglang/components/worker.py (1)
99-101:page_size/block_sizemay arrive as string
argparseleaves CLI values asstrwhen they come from the service config.
A manualint()cast guards against accidental string propagation to Rust bindings:- kv_block_size = getattr( - self.engine_args, "page_size", getattr(self.engine_args, "block_size", 64) - ) + kv_block_size = int( + getattr( + self.engine_args, + "page_size", + getattr(self.engine_args, "block_size", 64), + ) + )examples/sglang/components/kv_router.py (4)
37-56: Fix import formatting to resolve pipeline failure.The pipeline indicates that import sorting failed. Please run
isortto fix the import order:isort examples/sglang/components/kv_router.py black examples/sglang/components/kv_router.py
45-47: Import path may break outsideexamplespackage
from components.worker import SGLangWorkerrelies on the current working directory beingexamples/sglang. Use an absolute import rooted at the project package (e.g.examples.sglang.components.worker) or add__init__.pyfiles to turn folders into packages.
92-96: Boolean CLI flag should useaction="store_true"
type=boolexpects the user to pass an explicit value (--custom-router true), which is error-prone and breaksargparse's "flag" semantics. Replace withaction="store_true":- parser.add_argument( - "--custom-router", - type=bool, - default=False, - help="Whether to use custom router or not", - ) + parser.add_argument( + "--custom-router", + action="store_true", + help="Enable custom router", + )
169-230: Fix code formatting and consider refactoring complex method.The pipeline indicates that Black formatting failed. Please run
blackto fix code style issues throughout the file.Additionally, the
_cost_functionmethod has too many local variables (18/15 per Pylint), making it difficult to read and maintain.black examples/sglang/components/kv_router.pyConsider refactoring
_cost_functionby extracting helper methods:
_process_overlap_scores()to handle score conversion
_process_worker_metrics()to handle metrics aggregation
_compute_worker_logits()to handle the scoring logicCost function mixes incomparable scales – results may be biased
gpu_cache_usage_percappears to be a value in[0,100]whilescoreis a fraction0-1(after block → token normalisation). Subtracting the raw percentage heavily outweighs the 2×cache-hit bonus, effectively turning the formula into "pick whoever uses 0 % GPU cache".Either:
- Convert
gpu_cache_usage_percto[0,1]by dividing by 100, or- Rescale all terms/weights based on empirical ranges
Example fix:
- gpu_cache_usage = metrics_dict["gpu_cache_usage_perc"] + gpu_cache_usage = metrics_dict["gpu_cache_usage_perc"] / 100.0🧰 Tools
🪛 Pylint (3.3.7)
[refactor] 169-169: Too many local variables (18/15)
(R0914)
🧹 Nitpick comments (2)
examples/sglang/components/frontend.py (1)
49-57: Simplify by removing unnecessary else after return.def get_http_binary_path(): """Find the HTTP binary path in SDK or fallback to 'http' command.""" sdk_path = Path(sdk.__file__) binary_path = sdk_path.parent / "cli/bin/http" if not binary_path.exists(): return "http" - else: - return str(binary_path) + return str(binary_path)🧰 Tools
🪛 Pylint (3.3.7)
[refactor] 53-56: Unnecessary "else" after "return", remove the "else" and de-indent the code inside it
(R1705)
examples/sglang/components/kv_router.py (1)
35-35: Remove unnecessary comment.-# faradawn hello
📜 Review details
Configuration used: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (6)
- examples/sglang/components/frontend.py(5 hunks)
- examples/sglang/components/kv_router.py(1 hunks)
- examples/sglang/components/worker.py(5 hunks)
- examples/sglang/configs/agg_router.yaml(1 hunks)
- launch/dynamo-run/src/subprocess/sglang_inc.py(1 hunks)
- lib/bindings/python/README.md(1 hunks)
🚧 Files skipped from review as they are similar to previous changes (3)
- lib/bindings/python/README.md
- launch/dynamo-run/src/subprocess/sglang_inc.py
- examples/sglang/configs/agg_router.yaml
🧰 Additional context used
🧬 Code Graph Analysis (1)
examples/sglang/components/frontend.py (4)
examples/sglang/components/worker.py (2)
SGLangWorker(56-218)
generate(154-197)examples/sglang/components/kv_router.py (2)
Router(118-313)
generate(279-313)examples/sglang/utils/protocol.py (4)
Tokens(27-28)
PreprocessedRequest(68-74)
StopConditions(45-50)
SamplingOptions(53-65)deploy/sdk/src/dynamo/sdk/lib/decorators.py (3)
async_on_start(95-99)
endpoint(68-92)
on_shutdown(102-106)
🪛 GitHub Actions: Pre Merge Validation of (ai-dynamo/dynamo/refs/pull/1323/merge) by faradawn.
examples/sglang/components/worker.py
[error] 1-1: isort formatting failed and modified the file. Please run isort to fix import sorting.
[error] 1-1: Black formatting failed and reformatted the file. Please run 'black' to fix code style issues.
examples/sglang/components/frontend.py
[error] 1-1: isort formatting failed and modified the file. Please run isort to fix import sorting.
[error] 1-1: Black formatting failed and reformatted the file. Please run 'black' to fix code style issues.
[error] 19-37: Ruff linting errors found and fixed in this file. Please run 'ruff --fix' to apply fixes.
examples/sglang/components/kv_router.py
[error] 1-1: isort formatting failed and modified the file. Please run isort to fix import sorting.
[error] 1-1: Black formatting failed and reformatted the file. Please run 'black' to fix code style issues.
🪛 Ruff (0.11.9)
examples/sglang/components/frontend.py
25-25: utils.protocol.Tokens imported but unused
Remove unused import
(F401)
25-25: utils.protocol.StopConditions imported but unused
Remove unused import
(F401)
25-25: utils.protocol.SamplingOptions imported but unused
Remove unused import
(F401)
26-26: transformers.AutoTokenizer imported but unused
Remove unused import: transformers.AutoTokenizer
(F401)
27-27: json imported but unused
Remove unused import: json
(F401)
30-30: dynamo.sdk.async_on_start imported but unused
Remove unused import
(F401)
30-30: dynamo.sdk.dynamo_context imported but unused
Remove unused import
(F401)
🪛 Pylint (3.3.7)
examples/sglang/components/frontend.py
[refactor] 29-29: Use 'from dynamo import sdk' instead
(R0402)
[refactor] 53-56: Unnecessary "else" after "return", remove the "else" and de-indent the code inside it
(R1705)
examples/sglang/components/kv_router.py
[refactor] 57-57: Too few public methods (0/2)
(R0903)
[refactor] 169-169: Too many local variables (18/15)
(R0914)
⏰ Context from checks skipped due to timeout of 90000ms (4)
- GitHub Check: Build and Test - vllm
- GitHub Check: pre-merge-rust (lib/bindings/python)
- GitHub Check: pre-merge-rust (.)
- GitHub Check: pre-merge-rust (lib/runtime/examples)
| from dynamo.llm import ( | ||
| ModelType, | ||
| register_llm, | ||
| ZmqKvEventPublisher, | ||
| ZmqKvEventPublisherConfig, | ||
| ) | 
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fix import formatting issues.
The pipeline indicates that isort and Black formatting failed. Please run these commands to fix the formatting:
isort examples/sglang/components/worker.py
black examples/sglang/components/worker.py🤖 Prompt for AI Agents
In examples/sglang/components/worker.py around lines 38 to 43, the import
statements have formatting issues flagged by isort and Black. To fix this, run
the commands "isort examples/sglang/components/worker.py" followed by "black
examples/sglang/components/worker.py" to automatically sort and format the
imports according to the project's style guidelines.
…g-integration-event-manager
| Progress update: Router Workflow Problem: need a "Preprocessed request" (token id, etc) to pass into SGLang. But dynamo-run couldn't seem to do the translation. Solution: dynamo-run is able to preprocess the request. Just need to reset etcd to route to the correct endpoint. | 
| Dynamo router is working Client side:  | 
| Closing as this is now a duplicate of #1605 | 
Overview:
When SGLang is launched without an explicit page_size, it silently falls back to 1, producing variable-sized “blocks” that Dynamo discards.
This patch guarantees the two systems stay aligned.
Details:
first, In sglang_inc.py. When building ServerArgs we now set page_size = config.kv_block_size if it is positive, otherwise we inject a safe default of 16 tokens. This prevents SGLang from defaulting to page_size=1.
second, In SGLang-side, in a companion change, _record_store_event() now slices prompts into page_size chunks before emitting BlockStored events so every published block is exactly kv_block_size tokens. Link to PR: sgl-project/sglang#6824
This eliminates dropped KV-cache events caused by size mismatches.
Ensures cache-aware routing works out-of-the-box even when the user forgets --kv-block-size.
Keeps behaviour consistent across vLLM, TRT-LLM and SGLang back-ends.
Where should the reviewer start?
Related Issues: (use one of the action keywords Closes / Fixes / Resolves / Relates to)
Summary by CodeRabbit
Summary by CodeRabbit
New Features
Bug Fixes
page_sizeis properly set with a default value when configuration is missing or invalid, preventing unintended behavior.Chores