feat: FT downed worker instance tracking and skipping #1424
Conversation
Force-pushed from c76b633 to e939769.
* Add instance inhibition to Client
* [PoC] Retry a failed request if the worker is unreachable
* Remove request data copy
* Refactor instance inhibit and restart algorithm
Force-pushed from 1ef7325 to e4d5b0a.
added comments. let's discuss and we can work toward approval. we don't have to address everything in this PR.
Force-pushed from 12bab91 to 54f3b07.
Walkthrough: The changes introduce fault-tolerant instance selection to the client and push router components.
Sequence Diagram(s):

```mermaid
sequenceDiagram
    participant Caller
    participant PushRouter
    participant Client
    participant Etcd
    participant Instance
    Caller->>PushRouter: send_request(request)
    PushRouter->>Client: instances_avail()
    Client->>Etcd: fetch_instances()
    Etcd-->>Client: [instances]
    Client-->>PushRouter: [filtered_instances]
    PushRouter->>PushRouter: select_instance()
    PushRouter->>Instance: send(request)
    alt No responders error
        PushRouter->>Client: report_instance_down(instance_id)
    end
    Instance-->>PushRouter: response
    PushRouter-->>Caller: response
```
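The flow in the diagram can be sketched as a minimal std-only model. This is purely illustrative: the struct shape, ids as `u64`, and the simulated transport error are assumptions; only the names (`instances_avail`, `report_instance_down`) follow the diagram, not the actual crate API.

```rust
use std::collections::HashSet;

// Hypothetical minimal model of the diagram's flow.
struct Client {
    instances: Vec<u64>,     // instance ids as fetched from etcd
    inhibited: HashSet<u64>, // ids reported down since the last etcd update
}

impl Client {
    // Return only instances not currently inhibited.
    fn instances_avail(&self) -> Vec<u64> {
        self.instances
            .iter()
            .copied()
            .filter(|id| !self.inhibited.contains(id))
            .collect()
    }

    // Record a failed instance so later requests skip it.
    fn report_instance_down(&mut self, id: u64) {
        self.inhibited.insert(id);
    }
}

fn send_request(client: &mut Client, request: &str) -> Result<String, String> {
    let avail = client.instances_avail();
    let Some(&target) = avail.first() else {
        return Err("no instances available".into());
    };
    // Simulate a "no responders" transport error for instance 1.
    if target == 1 {
        client.report_instance_down(target);
        return Err(format!("no responders for instance {target}"));
    }
    Ok(format!("instance {target} handled {request}"))
}
```

The first failed send inhibits the instance, so the next send selects a different one without waiting for etcd.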
Actionable comments posted: 2
🧹 Nitpick comments (2)
lib/runtime/src/component/client.rs (2)
137-169: Consider `Instant` + configurable TTL & separate cleanup pass

`instances_avail`:

- Uses `SystemTime` (wall-clock). Clock jumps can incorrectly expire or extend inhibitions. `std::time::Instant` is monotonic and better suited.
- Hard-codes `ETCD_LEASE_TTL = 10s`. Making this configurable (field or const generic) eases tuning per deployment.
- Mutates `inhibited` while iterating with `filter_map`, which is fine but slightly obscures intent. Doing a short upfront retain of fresh entries, then filtering, is clearer and keeps mutation separate from the read path.

Optional but recommended:

```rust
const DEFAULT_TTL: Duration = Duration::from_secs(10);

pub async fn instances_avail(&self) -> Vec<Instance> {
    let now = Instant::now();
    let mut inhibited = self.instance_inhibited.lock().await;
    // purge stale entries first
    inhibited.retain(|_, ts| now.duration_since(*ts) <= DEFAULT_TTL);
    self.instances()
        .into_iter()
        .filter(|inst| !inhibited.contains_key(&inst.id()))
        .collect()
}
```

You'd store `Instant` timestamps in the map: `inhibited.insert(instance_id, Instant::now());`. This tightens correctness and slightly improves readability.
178-182: Minor: Avoid silent overwrite when reporting the same instance rapidly

`inhibited.insert(instance_id, now)` overwrites any existing timestamp, potentially resetting the TTL indefinitely if the same failing instance keeps returning quickly. If the goal is to keep the earliest failure time, prefer `entry().or_insert(now)`; if the goal is to extend the TTL, keep as-is. Clarify intent with a comment or adjust accordingly.
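The two insertion policies can be illustrated side by side. Free functions stand in for the method here, and the `HashMap<u64, Instant>` field type is an assumption based on the review comment:

```rust
use std::collections::HashMap;
use std::time::Instant;

// Policy 1: keep the earliest failure time, so the TTL is measured
// from the first failure and cannot be reset by repeated reports.
fn record_first_failure(inhibited: &mut HashMap<u64, Instant>, instance_id: u64) {
    inhibited.entry(instance_id).or_insert_with(Instant::now);
}

// Policy 2: refresh the timestamp on every report, extending the TTL
// each time the instance fails again (the overwrite behavior noted above).
fn record_latest_failure(inhibited: &mut HashMap<u64, Instant>, instance_id: u64) {
    inhibited.insert(instance_id, Instant::now());
}
```

With policy 1, a worker that keeps failing is re-admitted once its original TTL lapses; with policy 2, it stays inhibited as long as failures keep arriving.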
📜 Review details
Configuration used: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (2)
- lib/runtime/src/component/client.rs (5 hunks)
- lib/runtime/src/pipeline/network/egress/push_router.rs (5 hunks)
⏰ Context from checks skipped due to timeout of 90000ms (4)
- GitHub Check: pre-merge-rust (.)
- GitHub Check: pre-merge-rust (lib/bindings/python)
- GitHub Check: pre-merge-rust (lib/runtime/examples)
- GitHub Check: Build and Test - vllm
Generally, looks good. Couple questions:
synced offline. I think we can merge now and continue working on suggested optimizations
Update the list of active instances in a background thread, and use [arc-swap](https://crates.io/crates/arc-swap) to make the new list active. This is broadly similar to updating an atomic pointer. This makes the router's `client.random(request)` and `client.round_robin(request)` lock-free and usually wait-free. Lock was introduced here: #1424
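The suggested pattern can be sketched without the external crate. This std-only stand-in uses an `RwLock` around an `Arc` that a background thread swaps wholesale; with the `arc-swap` crate the `load` below becomes a lock-free `ArcSwap::load`. All names here are hypothetical:

```rust
use std::sync::{Arc, RwLock};

// Snapshot of active instances, replaced atomically as a whole.
struct ActiveInstances {
    current: RwLock<Arc<Vec<u64>>>,
}

impl ActiveInstances {
    fn new(ids: Vec<u64>) -> Self {
        Self { current: RwLock::new(Arc::new(ids)) }
    }

    // Read path: clone the Arc (cheap) and release the lock immediately,
    // so routing never iterates while holding the lock. With arc-swap
    // this read takes no lock at all.
    fn load(&self) -> Arc<Vec<u64>> {
        self.current.read().unwrap().clone()
    }

    // Write path, called from the background refresh thread: build the
    // new list off to the side, then swap it in as one pointer update.
    fn store(&self, ids: Vec<u64>) {
        *self.current.write().unwrap() = Arc::new(ids);
    }
}
```

Readers that already cloned the old `Arc` keep a consistent snapshot; the old list is freed once the last reader drops it.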
Overview:
Track unreachable instances at the router level, and skip the downed instances for future requests before ETCD can discover the failure.
Details:
The goal of this change is to drastically reduce the number of request failures, due to downed instance(s), before the availability can be updated from ETCD lease.
Test setup:
The Client sends a request to the Processor every 0.1 seconds, for a total of 100 requests over 10 seconds. The Processor round-robins the received requests to the 2 Workers. Each Worker returns a response to the Processor immediately after receiving a request, and the Processor likewise responds to the Client.
To demonstrate the reduced number of request failures, 1 of the 2 Workers is stopped while the Client is sending requests.
Before the change, the number of failed requests from the Processor back to the Client is 32 / 100, because the router does not know one of the Workers has stopped before ETCD is able to update via its lease.
After the change, the number of failed requests is reduced to 1 / 100, because the failed worker is temporarily removed from the list of workers that join the round-robin after its first failure on a request.
The Processor Worker Client example is from this commit.
Note: The actual number of failures before the change may vary depending on when the worker is stopped during the 10-second test and how quickly ETCD can figure out the worker is stopped through its lease. The number of failures after the change should always be 1, because it only takes 1 failure to stop new requests from routing to the stopped worker until ETCD can update.
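The test scenario above can be modeled in a few lines. This is a sketch, not the PR's code: the worker ids, the request count, and the inhibition bookkeeping are simplified stand-ins that only reproduce the arithmetic of the description:

```rust
use std::collections::HashSet;

// Round-robin over two workers; inhibit a worker after its first
// failure so subsequent requests skip it (as etcd has not yet
// noticed the dropped lease).
fn run_requests(total: usize, downed_worker: usize) -> usize {
    let workers = [0usize, 1];
    let mut inhibited: HashSet<usize> = HashSet::new();
    let mut next = 0usize;
    let mut failures = 0;
    for _ in 0..total {
        // Select round-robin among workers not currently inhibited.
        let avail: Vec<usize> = workers
            .iter()
            .copied()
            .filter(|w| !inhibited.contains(w))
            .collect();
        let target = avail[next % avail.len()];
        next += 1;
        if target == downed_worker {
            failures += 1;            // first request to the downed worker fails
            inhibited.insert(target); // later requests skip it
        }
    }
    failures
}
```

With 100 requests and one worker down, exactly one request fails; every later request routes to the surviving worker, matching the 1 / 100 result reported above.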
Where should the reviewer start?
Start with client.rs to see how the downed instances are tracked in a hash map, then move to the updates in push_router.rs for down detection and skipping in the routing algorithms.
Related Issues: (use one of the action keywords Closes / Fixes / Resolves / Relates to)
N/A