Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 6 additions & 7 deletions launch/dynamo-run/src/subprocess/vllm_v1_inc.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,10 @@
# Can also be used standalone: `python3 vllm_v1_inc.py` - lots of optional command-line parameters

# Setup checklist:
# - We are in a virtualenv with vllm installed. Must be newer than v0.9.0 (currently pre-release)
# 1f079540db5f1080a2f61a730da50d3009934c5a - this commit is working for me
# - We are in a virtualenv with vllm installed. V1 is compatible with v0.9.0
# Steps:
# git clone https://github.com/vllm-project/vllm.git
# cd vllm && git checkout 1f079540db5f1080a2f61a730da50d3009934c5a
# cd vllm && git checkout v0.9.0
# uv pip uninstall ai-dynamo-vllm
# VLLM_USE_PRECOMPILED=1 uv pip install --editable .

Expand All @@ -34,10 +33,10 @@
from vllm.v1.metrics.stats import IterationStats, SchedulerStats

from dynamo.llm import (
KvEventPublisherFromZmq,
KvEventPublisherFromZmqConfig,
KvMetricsPublisher,
ModelType,
ZmqKvEventPublisher,
ZmqKvEventPublisherConfig,
register_llm,
)
from dynamo.runtime import Component, DistributedRuntime, dynamo_worker
Expand Down Expand Up @@ -248,11 +247,11 @@ async def init(runtime: DistributedRuntime, config: Config):

logger.info("VllmWorker has been initialized")

zmq_config = KvEventPublisherFromZmqConfig(
zmq_config = ZmqKvEventPublisherConfig(
worker_id=endpoint.lease_id(), kv_block_size=engine_args.block_size
)

_ = KvEventPublisherFromZmq(component=component, config=zmq_config)
_ = ZmqKvEventPublisher(component=component, config=zmq_config)

handler = RequestHandler(component, engine_client, default_sampling_params)

Expand Down
4 changes: 2 additions & 2 deletions lib/bindings/python/rust/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,8 @@ fn _core(m: &Bound<'_, PyModule>) -> PyResult<()> {
m.add_class::<llm::kv::AggregatedMetrics>()?;
m.add_class::<llm::kv::KvMetricsAggregator>()?;
m.add_class::<llm::kv::KvEventPublisher>()?;
m.add_class::<llm::kv::KvEventPublisherFromZmq>()?;
m.add_class::<llm::kv::KvEventPublisherFromZmqConfig>()?;
m.add_class::<llm::kv::ZmqKvEventPublisher>()?;
m.add_class::<llm::kv::ZmqKvEventPublisherConfig>()?;
m.add_class::<llm::kv::KvRecorder>()?;
m.add_class::<llm::nats::NatsQueue>()?;
m.add_class::<http::HttpService>()?;
Expand Down
14 changes: 7 additions & 7 deletions lib/bindings/python/rust/llm/kv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ impl KvMetricsPublisher {

#[pyclass]
#[derive(Clone)]
pub struct KvEventPublisherFromZmqConfig {
pub struct ZmqKvEventPublisherConfig {
#[pyo3(get, set)]
pub worker_id: i64,
#[pyo3(get, set)]
Expand All @@ -140,7 +140,7 @@ pub struct KvEventPublisherFromZmqConfig {
}

#[pymethods]
impl KvEventPublisherFromZmqConfig {
impl ZmqKvEventPublisherConfig {
#[new]
#[pyo3(signature = (
worker_id,
Expand All @@ -164,16 +164,16 @@ impl KvEventPublisherFromZmqConfig {
}

#[pyclass]
pub(crate) struct KvEventPublisherFromZmq {
inner: llm_rs::kv_router::publisher::KvEventPublisherFromZmq,
pub(crate) struct ZmqKvEventPublisher {
inner: llm_rs::kv_router::publisher::ZmqKvEventPublisher,
}

#[pymethods]
impl KvEventPublisherFromZmq {
impl ZmqKvEventPublisher {
#[new]
fn new(component: Component, config: KvEventPublisherFromZmqConfig) -> PyResult<Self> {
fn new(component: Component, config: ZmqKvEventPublisherConfig) -> PyResult<Self> {
let mut inner =
llm_rs::kv_router::publisher::KvEventPublisherFromZmq::new(config.kv_block_size);
llm_rs::kv_router::publisher::ZmqKvEventPublisher::new(config.kv_block_size);
inner.start_background_task(
component.inner,
config.worker_id,
Expand Down
10 changes: 5 additions & 5 deletions lib/bindings/python/src/dynamo/_core.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -579,7 +579,7 @@ class KvEventPublisher:
"""
...

class KvEventPublisherFromZmqConfig:
class ZmqKvEventPublisherConfig:
def __init__(
self,
worker_id: int,
Expand All @@ -588,7 +588,7 @@ class KvEventPublisherFromZmqConfig:
zmq_topic: str = ""
) -> None:
"""
Configuration for the KvEventPublisherFromZmq.
Configuration for the ZmqKvEventPublisher.

:param worker_id: The worker ID.
:param kv_block_size: The block size for the key-value store.
Expand All @@ -597,10 +597,10 @@ class KvEventPublisherFromZmqConfig:
"""
...

class KvEventPublisherFromZmq:
def __init__(self, component: Component, config: KvEventPublisherFromZmqConfig) -> None:
class ZmqKvEventPublisher:
def __init__(self, component: Component, config: ZmqKvEventPublisherConfig) -> None:
"""
Initializes a new KvEventPublisherFromZmq instance.
Initializes a new ZmqKvEventPublisher instance.

:param component: The component to be used.
:param config: Configuration for the event publisher.
Expand Down
4 changes: 2 additions & 2 deletions lib/bindings/python/src/dynamo/llm/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,15 +25,15 @@
from dynamo._core import HttpError as HttpError
from dynamo._core import HttpService as HttpService
from dynamo._core import KvEventPublisher as KvEventPublisher
from dynamo._core import KvEventPublisherFromZmq as KvEventPublisherFromZmq
from dynamo._core import KvEventPublisherFromZmqConfig as KvEventPublisherFromZmqConfig
from dynamo._core import KvIndexer as KvIndexer
from dynamo._core import KvMetricsAggregator as KvMetricsAggregator
from dynamo._core import KvMetricsPublisher as KvMetricsPublisher
from dynamo._core import KvRecorder as KvRecorder
from dynamo._core import KvRouter as KvRouter
from dynamo._core import ModelType as ModelType
from dynamo._core import OverlapScores as OverlapScores
from dynamo._core import ZmqKvEventPublisher as ZmqKvEventPublisher
from dynamo._core import ZmqKvEventPublisherConfig as ZmqKvEventPublisherConfig
from dynamo._core import register_llm as register_llm

try:
Expand Down
4 changes: 3 additions & 1 deletion lib/llm/src/kv_router/protocols.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,9 @@ pub struct WorkerSelectionResult {

#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct ForwardPassMetrics {
pub data_parallel_rank: Option<u32>, // backwards compatible
// https://lmsys.org/blog/2024-12-04-sglang-v0-4/#data-parallelism-attention-for-deepseek-models
// Data parallel ranks are semi-independent, so we need to track metrics at the DP level
pub data_parallel_rank: Option<u32>, // Optional for backwards compatibility
pub request_active_slots: u64,
pub request_total_slots: u64,
pub kv_active_blocks: u64,
Expand Down
4 changes: 2 additions & 2 deletions lib/llm/src/kv_router/publisher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,15 +92,15 @@ fn start_publish_task(
// For more info on zmq: https://zeromq.org/
// This publisher reads those events and publishes them to NATS
// The indexer will get the events from NATS and put them in the global prefix tree.
pub struct KvEventPublisherFromZmq {
pub struct ZmqKvEventPublisher {
kv_block_size: usize,
processor_handle: Option<tokio::task::JoinHandle<()>>,
zmq_handle: Option<tokio::task::JoinHandle<()>>,
zmq_token: Option<dynamo_runtime::CancellationToken>,
warning_count: Arc<AtomicU32>,
}

impl KvEventPublisherFromZmq {
impl ZmqKvEventPublisher {
pub fn new(kv_block_size: usize) -> Self {
Self {
kv_block_size,
Expand Down
Loading