-
Notifications
You must be signed in to change notification settings - Fork 663
feat: use atomic transactions when creating etcd kv #2044
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
WalkthroughThe Changes
Sequence Diagram(s)sequenceDiagram
participant Client1 as Client 1
participant ClientN as Client N
participant EtcdBucket
participant etcd
Client1->>EtcdBucket: create(key, value)
ClientN->>EtcdBucket: create(key, value)
EtcdBucket->>etcd: txn (if key.version == 0) put(key, value)
etcd-->>EtcdBucket: txn response (success for one, fail for others)
EtcdBucket-->>Client1: Created(1) or Exists(version)
EtcdBucket-->>ClientN: Created(1) or Exists(version)
Estimated code review effort2 (~15 minutes) Poem
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. 🪧 TipsChatThere are 3 ways to chat with CodeRabbit:
SupportNeed help? Create a ticket on our support page for assistance with any issues or questions. Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments. CodeRabbit Commands (Invoked using PR comments)
Other keywords and placeholders
Documentation and Community
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 0
🧹 Nitpick comments (2)
lib/runtime/src/storage/key_value_store/etcd.rs (2)
157-199: Implementation correctly uses atomic transactions to prevent race conditions.The refactored implementation properly addresses the race condition by using etcd's transaction API to atomically check and create keys. The logic is sound and handles both success and existence cases appropriately.
Consider improving the error message to provide more context:
- Err(StorageError::EtcdError( - "Unexpected transaction response".to_string(), - )) + Err(StorageError::EtcdError( + format!("Unexpected transaction response for key: {}", k), + ))
283-287: Unnecessary mutex wrapping for the bucket.The
KeyValueBuckettrait already requiresSend + Sync, so wrapping it inArc<Mutex<>>adds unnecessary synchronization overhead. You can useArcdirectly:- let bucket = Arc::new(tokio::sync::Mutex::new( - storage - .get_or_create_bucket("test_concurrent_bucket", None) - .await?, - )); + let bucket = Arc::from( + storage + .get_or_create_bucket("test_concurrent_bucket", None) + .await?, + );And update the usage sites to remove
.lock().await:- let result = bucket_clone - .lock() - .await - .insert(key_clone, value_clone, 0) - .await; + let result = bucket_clone + .insert(key_clone, value_clone, 0) + .await;
📜 Review details
Configuration used: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (1)
lib/runtime/src/storage/key_value_store/etcd.rs(3 hunks)
🧠 Learnings (2)
📓 Common learnings
Learnt from: PeaBrane
PR: ai-dynamo/dynamo#1285
File: lib/llm/src/kv_router/scheduler.rs:260-266
Timestamp: 2025-05-30T06:34:12.785Z
Learning: In the KV router scheduler code, PeaBrane prefers fail-fast behavior over silent failure handling. When accessing worker metrics data that could be out-of-bounds (like dp_rank indexing), explicit panics are preferred over graceful degradation with continue statements to ensure data integrity issues are caught early.
lib/runtime/src/storage/key_value_store/etcd.rs (6)
Learnt from: jthomson04
PR: #1429
File: lib/runtime/src/utils/leader_worker_barrier.rs:69-72
Timestamp: 2025-06-08T03:12:03.985Z
Learning: In the leader-worker barrier implementation in lib/runtime/src/utils/leader_worker_barrier.rs, the wait_for_key_count function correctly uses exact equality (==) instead of greater-than-or-equal (>=) because worker IDs must be unique (enforced by etcd create-only operations), ensuring exactly the expected number of workers can register.
Learnt from: grahamking
PR: #1962
File: lib/runtime/src/component/client.rs:270-273
Timestamp: 2025-07-16T12:41:12.543Z
Learning: In lib/runtime/src/component/client.rs, the current mutex usage in get_or_create_dynamic_instance_source is temporary while evaluating whether the mutex can be dropped entirely. The code currently has a race condition between try_lock and lock().await, but this is acknowledged as an interim state during the performance optimization process.
Learnt from: alec-flowers
PR: #1181
File: lib/llm/src/kv_router/publisher.rs:379-425
Timestamp: 2025-05-29T00:02:35.018Z
Learning: In lib/llm/src/kv_router/publisher.rs, the functions create_stored_blocks and create_stored_block_from_parts are correctly implemented and not problematic duplications of existing functionality elsewhere in the codebase.
Learnt from: ryanolson
PR: #1919
File: lib/runtime/src/engine.rs:168-168
Timestamp: 2025-07-14T21:25:56.930Z
Learning: The AsyncEngineContextProvider trait in lib/runtime/src/engine.rs was intentionally changed from Send + Sync + Debug to Send + Debug because the Sync bound was overly constraining. The trait should only require Send + Debug as designed.
Learnt from: kthui
PR: #1424
File: lib/runtime/src/pipeline/network/egress/push_router.rs:204-209
Timestamp: 2025-06-13T22:07:24.843Z
Learning: The codebase uses async-nats version 0.40, not the older nats crate. Error handling should use async_nats::error::Error variants, not nats::Error variants.
Learnt from: ryanolson
PR: #1093
File: lib/llm/src/block_manager/block/registry.rs:98-122
Timestamp: 2025-05-29T06:20:12.901Z
Learning: In lib/llm/src/block_manager/block/registry.rs, the background task spawned for handling unregister notifications uses detached concurrency by design. The JoinHandle is intentionally not stored as this represents a reasonable architectural tradeoff for a long-running cleanup task.
🧰 Additional context used
🧠 Learnings (2)
📓 Common learnings
Learnt from: PeaBrane
PR: ai-dynamo/dynamo#1285
File: lib/llm/src/kv_router/scheduler.rs:260-266
Timestamp: 2025-05-30T06:34:12.785Z
Learning: In the KV router scheduler code, PeaBrane prefers fail-fast behavior over silent failure handling. When accessing worker metrics data that could be out-of-bounds (like dp_rank indexing), explicit panics are preferred over graceful degradation with continue statements to ensure data integrity issues are caught early.
lib/runtime/src/storage/key_value_store/etcd.rs (6)
Learnt from: jthomson04
PR: #1429
File: lib/runtime/src/utils/leader_worker_barrier.rs:69-72
Timestamp: 2025-06-08T03:12:03.985Z
Learning: In the leader-worker barrier implementation in lib/runtime/src/utils/leader_worker_barrier.rs, the wait_for_key_count function correctly uses exact equality (==) instead of greater-than-or-equal (>=) because worker IDs must be unique (enforced by etcd create-only operations), ensuring exactly the expected number of workers can register.
Learnt from: grahamking
PR: #1962
File: lib/runtime/src/component/client.rs:270-273
Timestamp: 2025-07-16T12:41:12.543Z
Learning: In lib/runtime/src/component/client.rs, the current mutex usage in get_or_create_dynamic_instance_source is temporary while evaluating whether the mutex can be dropped entirely. The code currently has a race condition between try_lock and lock().await, but this is acknowledged as an interim state during the performance optimization process.
Learnt from: alec-flowers
PR: #1181
File: lib/llm/src/kv_router/publisher.rs:379-425
Timestamp: 2025-05-29T00:02:35.018Z
Learning: In lib/llm/src/kv_router/publisher.rs, the functions create_stored_blocks and create_stored_block_from_parts are correctly implemented and not problematic duplications of existing functionality elsewhere in the codebase.
Learnt from: ryanolson
PR: #1919
File: lib/runtime/src/engine.rs:168-168
Timestamp: 2025-07-14T21:25:56.930Z
Learning: The AsyncEngineContextProvider trait in lib/runtime/src/engine.rs was intentionally changed from Send + Sync + Debug to Send + Debug because the Sync bound was overly constraining. The trait should only require Send + Debug as designed.
Learnt from: kthui
PR: #1424
File: lib/runtime/src/pipeline/network/egress/push_router.rs:204-209
Timestamp: 2025-06-13T22:07:24.843Z
Learning: The codebase uses async-nats version 0.40, not the older nats crate. Error handling should use async_nats::error::Error variants, not nats::Error variants.
Learnt from: ryanolson
PR: #1093
File: lib/llm/src/block_manager/block/registry.rs:98-122
Timestamp: 2025-05-29T06:20:12.901Z
Learning: In lib/llm/src/block_manager/block/registry.rs, the background task spawned for handling unregister notifications uses detached concurrency by design. The JoinHandle is intentionally not stored as this represents a reasonable architectural tradeoff for a long-running cleanup task.
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (4)
- GitHub Check: Build and Test - vllm
- GitHub Check: pre-merge-rust (lib/bindings/python)
- GitHub Check: pre-merge-rust (.)
- GitHub Check: pre-merge-rust (lib/runtime/examples)
🔇 Additional comments (3)
lib/runtime/src/storage/key_value_store/etcd.rs (3)
23-23: LGTM!The transaction-related imports are necessary for the atomic implementation.
258-259: Why is the feature flag commented out?The commented
#[cfg(feature = "integration")]suggests this test requires external dependencies (etcd). Should this be an integration test gated behind the feature flag?
368-387: Comprehensive test assertions validate the atomic behavior.The test correctly verifies that:
- Exactly one worker creates the key
- All other workers observe the key exists
- All operations complete successfully
- The key is properly stored in etcd
This provides strong evidence that the race condition has been eliminated.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Very nice!
Overview:
As titled, closes #1908
Previously, there was a race window between checking the existence of an entry and creating one. Using atomic transactions should address this.
Added a test to confirm (10 concurrent workers)
Summary by CodeRabbit
Refactor
Tests