feat: Utilities for distributed leader-worker barriers #1429
Walkthrough
A new distributed leader-worker barrier synchronization mechanism using etcd has been introduced. This includes a new feature flag
Sequence Diagram(s)
sequenceDiagram
    participant Leader as LeaderBarrier
    participant Etcd as etcd
    participant Worker as WorkerBarrier
    Leader->>Etcd: Publish barrier data under barrier_id
    loop For each worker
        Worker->>Etcd: Wait for barrier data
        Worker->>Etcd: Register worker under barrier_id/workers/worker_id
    end
    Etcd-->>Leader: Notify when all workers have registered
    Leader->>Etcd: Signal completion or abort
    Worker->>Etcd: Wait for completion or abort signal
    Etcd-->>Worker: Return barrier result or error
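The flow in the diagram can be sketched against an in-memory map instead of etcd. This is only an illustration under stated assumptions: `FakeKv`, `run_barrier`, the `expected` parameter, and the `data`, `complete` and `abort` key names are hypothetical and not taken from the PR; only the `barrier_id/workers/worker_id` layout comes from the diagram. The steps run sequentially in one thread, so the "wait" steps collapse into plain reads.

```rust
use std::collections::BTreeMap;

/// Hypothetical in-memory stand-in for etcd (not the PR's transport).
#[derive(Default)]
struct FakeKv {
    entries: BTreeMap<String, String>,
}

impl FakeKv {
    /// Create-only put: fails if the key already exists.
    fn create(&mut self, key: &str, value: &str) -> Result<(), String> {
        if self.entries.contains_key(key) {
            return Err(format!("key already exists: {key}"));
        }
        self.entries.insert(key.to_string(), value.to_string());
        Ok(())
    }

    fn get(&self, key: &str) -> Option<&str> {
        self.entries.get(key).map(String::as_str)
    }

    /// Number of keys stored under a prefix.
    fn count_prefix(&self, prefix: &str) -> usize {
        self.entries.keys().filter(|k| k.starts_with(prefix)).count()
    }
}

/// Walks through the diagram's steps for one barrier and returns the
/// signal the leader wrote ("complete" or "abort").
fn run_barrier(
    kv: &mut FakeKv,
    barrier_id: &str,
    workers: &[&str],
    expected: usize,
    payload: &str,
) -> Result<String, String> {
    // Leader publishes the barrier data under barrier_id.
    kv.create(&format!("{barrier_id}/data"), payload)?;

    // Each worker reads the data, then registers under workers/<id>.
    for worker_id in workers {
        kv.get(&format!("{barrier_id}/data"))
            .ok_or("barrier data missing")?;
        kv.create(&format!("{barrier_id}/workers/{worker_id}"), "")?;
    }

    // Leader checks the registration count and signals the outcome.
    // A count mismatch stands in for a timeout in this sketch.
    let registered = kv.count_prefix(&format!("{barrier_id}/workers/"));
    let signal = if registered == expected { "complete" } else { "abort" };
    kv.create(&format!("{barrier_id}/{signal}"), "")?;
    Ok(signal.to_string())
}
```

Because registration is a create-only write, a duplicate worker ID or a reused barrier ID surfaces as an error rather than silently overwriting an earlier entry.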
Actionable comments posted: 2
🧹 Nitpick comments (3)
lib/runtime/src/utils.rs (1)
18-19: Consider feature-gating the new module
`leader_worker_barrier` depends on the `etcd` transport. Compiling the crate without an ETCD backend will now always pull those dependencies in.
If “no-etcd” builds are still a supported target, guard the module behind the same feature that includes the transport, e.g. `#[cfg(feature = "etcd")] pub mod leader_worker_barrier;`, and re-export it conditionally from the root lib.rs.
lib/runtime/src/utils/leader_worker_barrier.rs (2)
81-90: Unchecked UTF-8 conversion may panic
`kv.key_str().unwrap()` will panic if the key contains non-UTF-8 bytes. Prefer graceful handling (or propagate an error):
- let key = kv.key_str().unwrap().to_string();
+ let key = String::from_utf8_lossy(kv.key()).into_owned();
120-123: Avoid `unwrap()` in library code
`serde_json::to_vec(&()).unwrap()` cannot fail today, but `expect("serialize unit")` at least documents the invariant, and proper error propagation avoids a panic altogether if upstream behaviour changes.
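For the UTF-8 comment on lines 81-90, the two non-panicking options can be compared using only the standard library. This is a sketch, not the PR's code: `key_strict`, `key_lossy` and `InvalidKey` are hypothetical names, and the raw byte slice stands in for whatever the etcd client returns for a key.

```rust
/// Hypothetical error type for this sketch.
#[derive(Debug, PartialEq)]
struct InvalidKey {
    /// Number of leading bytes that were valid UTF-8.
    valid_up_to: usize,
}

/// Strict decoding: returns an error instead of panicking on bad bytes.
fn key_strict(raw: &[u8]) -> Result<String, InvalidKey> {
    std::str::from_utf8(raw)
        .map(str::to_owned)
        .map_err(|e| InvalidKey { valid_up_to: e.valid_up_to() })
}

/// Lossy decoding: invalid sequences become U+FFFD, never fails.
fn key_lossy(raw: &[u8]) -> String {
    String::from_utf8_lossy(raw).into_owned()
}
```

The lossy variant never fails but can make two distinct byte keys decode to the same string, so propagating an error may be the safer choice when keys are later compared or counted.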
📜 Review details
Configuration used: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (3)
- lib/runtime/Cargo.toml(1 hunks)
- lib/runtime/src/utils.rs(1 hunks)
- lib/runtime/src/utils/leader_worker_barrier.rs(1 hunks)
⏰ Context from checks skipped due to timeout of 90000ms (4)
- GitHub Check: pre-merge-rust (lib/runtime/examples)
- GitHub Check: pre-merge-rust (lib/bindings/python)
- GitHub Check: pre-merge-rust (.)
- GitHub Check: Build and Test - vllm
🔇 Additional comments (2)
lib/runtime/Cargo.toml (1)
30-31: Feature naming & CI gating check
`testing-etcd` is introduced correctly, but make sure:
- CI pipelines activate this feature only when an ETCD instance is available; otherwise tests will silently be skipped.
- If you ever merge a non-test feature set that doesn’t pull in `etcd-client`, consider wrapping this feature behind a wider `etcd`/`distributed` feature to avoid unconditional compilation.
No code change required, just double-check the pipeline configuration.
lib/runtime/src/utils/leader_worker_barrier.rs (1)
314-323: Race condition when `complete` appears before worker key
If the leader finishes extremely quickly, `complete` may be written before the worker registration succeeds, yielding `AlreadyCompleted`.
Document this behaviour or consider retrying registration until either:
- registration succeeds, or
- the barrier is already completed with a matching worker set.
5fa1d45 to b9ae957
Actionable comments posted: 1
♻️ Duplicate comments (1)
lib/runtime/src/utils/leader_worker_barrier.rs (1)
105-112: Do not collapse every `kv_create` failure into `BarrierIdNotUnique`.
The TODO is still unresolved: all ETCD errors are mapped to `BarrierIdNotUnique`, masking network/auth failures and making troubleshooting impossible. Please inspect the returned error and convert only “already-exists” cases, propagating everything else as `EtcdError` (or a dedicated variant).
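A minimal sketch of the mapping requested above, assuming the create call can distinguish an existing key from other failures. `KvCreateError` and `map_create_error` are hypothetical, and `BarrierError` is a simplified stand-in for the PR's error enum; the actual error type returned by `kv_create` is not shown in this review.

```rust
/// Hypothetical errors a create-only put could return.
#[derive(Debug)]
enum KvCreateError {
    /// The key was already present.
    AlreadyExists,
    /// Anything else: network, auth, timeouts, and so on.
    Transport(String),
}

/// Simplified stand-in for the barrier error enum.
#[derive(Debug, PartialEq)]
enum BarrierError {
    BarrierIdNotUnique,
    EtcdError(String),
}

/// Only the "already exists" case becomes `BarrierIdNotUnique`;
/// every other failure keeps its message for troubleshooting.
fn map_create_error(err: KvCreateError) -> BarrierError {
    match err {
        KvCreateError::AlreadyExists => BarrierError::BarrierIdNotUnique,
        KvCreateError::Transport(msg) => BarrierError::EtcdError(msg),
    }
}
```

Matching exhaustively rather than using a catch-all arm means any error case added later has to be mapped explicitly.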
🧹 Nitpick comments (2)
lib/runtime/src/utils/leader_worker_barrier.rs (2)
81-85: Avoid `unwrap()` on possibly non-UTF-8 ETCD keys.
`kv.key_str().unwrap()` will panic if a rogue client writes non-UTF-8 keys under the same prefix. Convert the error into `EtcdError` instead:
- let key = kv.key_str().unwrap().to_string();
+ let key = kv
+     .key_str()
+     .map(|s| s.to_owned())
+     .map_err(|e| LeaderWorkerBarrierError::EtcdError(e.into()))?;
60-67: Use `tokio::time::timeout` instead of re-creating a long sleep each loop.
Recomputing `remaining_time` and spawning a full-length sleep on every iteration is wasteful and makes the select harder to read. A cleaner pattern:
- tokio::select! {
-     Some(watch_event) = rx.recv() => { … }
-     _ = tokio::time::sleep(remaining_time) => { /* timeout */ }
- }
+ let recv = tokio::time::timeout(remaining_time, rx.recv());
+ match recv.await {
+     Ok(Some(watch_event)) => handle_watch_event(watch_event, &mut data)?,
+     Ok(None) => return Err(LeaderWorkerBarrierError::EtcdError(anyhow::anyhow!("watch closed"))),
+     Err(_) => { /* timed out, loop and check elapsed */ },
+ }
Improves readability and removes the needless cancellation of the sleep future on every event.
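The same "receive with a bounded wait" shape can be shown without an async runtime. The sketch below is a synchronous analogue built on `std::sync::mpsc::Receiver::recv_timeout`, not the `tokio::time::timeout` call itself; `wait_for_count` and `WaitError` are hypothetical names, and the channel of `String` keys stands in for the etcd watch stream.

```rust
use std::sync::mpsc::{Receiver, RecvTimeoutError};
use std::time::{Duration, Instant};

/// Hypothetical error type for this sketch.
#[derive(Debug, PartialEq)]
enum WaitError {
    Timeout,
    WatchClosed,
}

/// Collects keys until exactly `expected` have arrived or the overall
/// deadline passes. Each receive waits only for the time remaining.
fn wait_for_count(
    rx: &Receiver<String>,
    expected: usize,
    timeout: Duration,
) -> Result<Vec<String>, WaitError> {
    let deadline = Instant::now() + timeout;
    let mut seen = Vec::new();
    while seen.len() != expected {
        // Time left until the deadline; zero once it has passed.
        let remaining = deadline.saturating_duration_since(Instant::now());
        match rx.recv_timeout(remaining) {
            Ok(key) => seen.push(key),
            Err(RecvTimeoutError::Timeout) => return Err(WaitError::Timeout),
            Err(RecvTimeoutError::Disconnected) => return Err(WaitError::WatchClosed),
        }
    }
    Ok(seen)
}
```

A single deadline computed up front keeps the total wait bounded no matter how many events arrive, and a closed channel is reported separately from a timeout.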
✅ Files skipped from review due to trivial changes (1)
- lib/runtime/src/utils.rs
🚧 Files skipped from review as they are similar to previous changes (1)
- lib/runtime/Cargo.toml
🧰 Additional context used
🧠 Learnings (1)
lib/runtime/src/utils/leader_worker_barrier.rs (1)
Learnt from: jthomson04
PR: ai-dynamo/dynamo#1429
File: lib/runtime/src/utils/leader_worker_barrier.rs:69-72
Timestamp: 2025-06-08T03:12:03.964Z
Learning: In the leader-worker barrier implementation in lib/runtime/src/utils/leader_worker_barrier.rs, the `wait_for_key_count` function correctly uses exact equality (`==`) instead of greater-than-or-equal (`>=`) because worker IDs must be unique (enforced by etcd create-only operations), ensuring exactly the expected number of workers can register.
Summary by CodeRabbit