Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
5a21312
add patch for sglang to call dynamo event manager
faradawn Jun 2, 2025
e699408
Merge branch 'main' into sglang-integration-event-manager
faradawn Jun 5, 2025
5dbf090
add default page size to avoid sglang having page size of 1
faradawn Jun 5, 2025
ace979b
Merge branch 'sglang-integration-event-manager' of github.com:faradaw…
faradawn Jun 5, 2025
f61a7c1
remove old patch
faradawn Jun 5, 2025
cfa1cb9
Merge branch 'main' of https://github.com/ai-dynamo/dynamo into sglan…
faradawn Jun 7, 2025
53ea353
1st draft of router
faradawn Jun 10, 2025
3c5781a
Merge branch 'ai-dynamo:main' into sglang-integration-event-manager
faradawn Jun 10, 2025
077ead1
got sglang router graph working
faradawn Jun 13, 2025
7f6f2f1
update sglang router example in readme
faradawn Jun 15, 2025
8507146
fix bindings readme
faradawn Jun 15, 2025
39fd160
update sglang docker commit
faradawn Jun 15, 2025
ab379c6
Merge branch 'main' of https://github.com/faradawn/dynamo into sglang…
faradawn Jun 15, 2025
e5b2cdc
made sglang router example working
faradawn Jun 15, 2025
127eac6
Merge branch 'main' of https://github.com/ai-dynamo/dynamo into sglan…
faradawn Jun 15, 2025
69bb71e
fixing input pasing to sglang
faradawn Jun 15, 2025
aac3b8f
redo frontend small change
faradawn Jun 16, 2025
45e7294
raw sglang frontend
faradawn Jun 16, 2025
29846bf
processing parsing preprocessed errro
faradawn Jun 16, 2025
8f821e1
Merge branch 'main' of https://github.com/ai-dynamo/dynamo into sglan…
faradawn Jun 16, 2025
e55aa27
Merge branch 'ai-dynamo:main' into sglang-integration-event-manager
faradawn Jun 20, 2025
bde922b
Merge branch 'sglang-integration-event-manager' of https://github.com…
faradawn Jun 20, 2025
a818bfb
revert to main branch, frontend and worker working
faradawn Jun 20, 2025
e2817e9
revert readme
faradawn Jun 20, 2025
9e669c7
add kv zmq publisher to worker, it is working
faradawn Jun 21, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions container/Dockerfile.sglang
Original file line number Diff line number Diff line change
Expand Up @@ -135,8 +135,10 @@ RUN if [ "$ARCH" = "arm64" ]; then \

# Install sglang
# Once either 0.4.6post6 or 0.4.7 is released, we can switch back to using the published version
# This commit references a fix to add DP attention based routing along with other perf fixes https://github.com/sgl-project/sglang/pull/6884
ARG SGLANG_COMMIT="f1569876d54dd3b6601f5280f12652e9fbb1375c"
# This commit references multiple perf fixes for DP attention and NIXL https://github.com/sgl-project/sglang/pull/6780
# 6/2(ishan) - moving to ToT for performance purposes
# 6/12(faradawn) - this commit fixes ZMQ KV event publisher inside sglang
ARG SGLANG_COMMIT="777688b8929c877e4e28c2eac208d776abe4c3af"
RUN --mount=type=cache,target=/root/.cache/uv \
git clone https://github.com/sgl-project/sglang.git && \
cd sglang && \
Expand Down
1 change: 1 addition & 0 deletions examples/sglang/components/frontend.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ def start_ingress_and_processor(self):
f"out={endpoint}",
"--http-port",
str(self.frontend_config.port),
"--router-mode", "kv",
],
stdout=None,
stderr=None,
Expand Down
88 changes: 85 additions & 3 deletions examples/sglang/components/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,13 @@
from utils.protocol import DisaggPreprocessedRequest, PreprocessedRequest
from utils.sglang import parse_sglang_args

from dynamo.llm import ModelType, register_llm
from dynamo.llm import (
ModelType,
register_llm,
WorkerMetricsPublisher,
ZmqKvEventPublisher,
ZmqKvEventPublisherConfig,
)
from dynamo.sdk import async_on_start, depends, dynamo_context, endpoint, service

logger = logging.getLogger(__name__)
Expand All @@ -55,21 +61,77 @@ def __init__(self):
class_name = self.__class__.__name__
self.engine_args = parse_sglang_args(class_name, "")
self.engine = sgl.Engine(server_args=self.engine_args)

# Initialize metrics publisher
self.metrics_publisher = WorkerMetricsPublisher()

logger.info("=== new file SGLangWorker initialized with kv publisher")

logger.info("SGLangWorker initialized")
def _update_metrics(self):
    """Publish a placeholder load snapshot through ``self.metrics_publisher``.

    Real engine statistics are not wired in yet (see the TODOs below): the
    slot counts and block totals are fixed constants, and the KV / cache
    figures are drawn from ``random``, so the published numbers are not a
    measurement of the engine.
    """
    # Resolve the bound method first, mirroring the original call expression,
    # which looks up ``publish`` before evaluating any argument.
    publish = self.metrics_publisher.publish

    # TODO: Get actual metrics from SGLang engine
    # For now, publish reasonable default values to keep KV router happy.
    # The mapping is evaluated top to bottom, so the three random draws occur
    # in the same order as the keyword arguments they feed.
    placeholder_snapshot = {
        "request_active_slots": 1,  # Assume 1 active request during processing
        "request_total_slots": 100,
        "kv_active_blocks": random.randint(0, 500),  # Random for now
        "kv_total_blocks": 1000,
        "num_requests_waiting": 0,  # TODO: get from engine queue
        "gpu_cache_usage_perc": random.uniform(0.1, 0.8),  # Random for now
        "gpu_prefix_cache_hit_rate": random.uniform(0.0, 0.5),  # Random for now
    }
    publish(**placeholder_snapshot)

async def create_metrics_publisher_endpoint(self):
    """Expose the metrics publisher's endpoint on this worker's own component.

    The component handle is taken from ``dynamo_context["component"]``
    instead of being constructed anew. Per the original author's note, that
    instance already owns the primary lease, and handing a freshly created
    ``Component`` to ``create_endpoint`` was observed to hang while waiting
    for the lease. The resulting endpoint is described by the author as the
    `load_metrics` endpoint polled by the KV-router.
    """
    leased_component = dynamo_context["component"]
    logger.info("=== me Creating metrics publisher endpoint with primary lease")
    publisher = self.metrics_publisher
    await publisher.create_endpoint(leased_component)
    logger.info("=== me Metrics publisher endpoint created")

@async_on_start
async def async_init(self):
runtime = dynamo_context["runtime"]
logger.info("Registering LLM for discovery")
logger.info("=== worker async_initRegistering LLM for discovery")
comp_ns, comp_name = SGLangWorker.dynamo_address() # type: ignore
endpoint = runtime.namespace(comp_ns).component(comp_name).endpoint("generate")
component = runtime.namespace(comp_ns).component(comp_name)

logger.info(f"=== new file register llm with kv block size 16, endpoint={endpoint}, model_path={self.engine_args.model_path}, served_model_name={self.engine_args.served_model_name}")
await register_llm(
ModelType.Backend,
endpoint,
self.engine_args.model_path,
self.engine_args.served_model_name,
kv_cache_block_size=self.engine_args.page_size,
)

logger.info(f"=== worker publishing initial metrics")

self.metrics_publisher.publish(
request_active_slots=0,
request_total_slots=1024, # TODO: get from SGLang engine config
kv_active_blocks=0,
kv_total_blocks=1024, # TODO: get from SGLang engine config
num_requests_waiting=0,
gpu_cache_usage_perc=0.0,
gpu_prefix_cache_hit_rate=0.0,
)

logger.info(f"=== worker donepublishing initial metrics")

# Create metrics publisher endpoint for KV router discovery
task = asyncio.create_task(self.create_metrics_publisher_endpoint())
task.add_done_callback(
lambda _: logger.info("=== worker metrics publisher endpoint created")
)

if self.engine_args.disaggregation_mode:
self.bootstrap_host, self.bootstrap_port = self._get_bootstrap_info()
comp_ns, comp_name = SGLangDecodeWorker.dynamo_address() # type: ignore
Expand All @@ -80,6 +142,22 @@ async def async_init(self):
.client()
)

# Configure ZMQ KV Event Publisher to relay KV events from SGLang to NATS
# The worker_id is set to the current lease ID so that the router can
# attribute KV-events to the correct backend worker.
zmq_config = ZmqKvEventPublisherConfig(
worker_id=endpoint.lease_id(),
kv_block_size=self.engine_args.page_size, # Keep in sync with register_llm above
)

# Keep a reference on the instance to avoid the publisher being garbage-collected.
self._kv_event_publisher = ZmqKvEventPublisher(
component=component,
config=zmq_config,
)

logger.info("=== worker ZMQ publisher created")

def _get_bootstrap_info(self):
"""
Bootstrap info is stored in the worker's tokenizer manager. We use it to
Expand Down Expand Up @@ -114,6 +192,10 @@ def _build_sampling_params(self, request: PreprocessedRequest) -> dict:

@endpoint()
async def generate(self, request: PreprocessedRequest):
# Update metrics for KV router
logger.info(f"=== worker generate request={request}")
self._update_metrics()

# TODO: maintain a mapping from SGLang's Output struct to LLMEngineOutput
sampling_params = self._build_sampling_params(request)

Expand Down
1 change: 1 addition & 0 deletions examples/sglang/configs/agg.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ SGLangWorker:
model-path: deepseek-ai/DeepSeek-R1-Distill-Llama-8B
served-model-name: deepseek-ai/DeepSeek-R1-Distill-Llama-8B
tp: 1
page-size: 16
trust-remote-code: true
skip-tokenizer-init: true
ServiceArgs:
Expand Down
2 changes: 1 addition & 1 deletion examples/sglang/utils/protocol.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,4 +60,4 @@ class DisaggPreprocessedRequest(BaseModel):
bootstrap_host: str
bootstrap_port: int
bootstrap_room: int
data_parallel_rank: Optional[int] = None
data_parallel_rank: Optional[int] = None
2 changes: 1 addition & 1 deletion examples/sglang/utils/sglang.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,4 +48,4 @@ def _reserve_disaggregation_bootstrap_port():
machine to avoid collisions.
"""
with reserve_free_port() as port:
return port
return port
2 changes: 1 addition & 1 deletion launch/dynamo-run/src/subprocess/sglang_inc.py
Original file line number Diff line number Diff line change
Expand Up @@ -279,4 +279,4 @@ def cmd_line_args():

if __name__ == "__main__":
uvloop.install()
asyncio.run(worker())
asyncio.run(worker())
Loading