Skip to content

Conversation

@PeaBrane
Copy link
Contributor

@PeaBrane PeaBrane commented Jul 22, 2025

Overview:

As titled, closes #1908

Previously, there was a race window between checking the existence of an entry and creating one. Using atomic transactions should address this.

Added a test to confirm (10 concurrent workers)

Summary by CodeRabbit

  • Refactor

    • Improved the reliability of key creation by making the operation atomic, ensuring only one creation succeeds when multiple requests occur simultaneously.
  • Tests

    • Added tests to verify correct behavior when multiple concurrent attempts are made to create the same key, ensuring only one creation is successful and others detect the key already exists.

@coderabbitai
Copy link
Contributor

coderabbitai bot commented Jul 22, 2025

Walkthrough

The EtcdBucket::create method was refactored to use an atomic etcd transaction for key creation, ensuring only one concurrent creator succeeds. A new test module was added to verify correct behavior under concurrent creation attempts. No changes were made to public APIs or other methods.

Changes

File(s) Change Summary
lib/runtime/src/storage/key_value_store/etcd.rs Refactored create to use atomic etcd transaction; added concurrent_create_tests test module for concurrency correctness.

Sequence Diagram(s)

sequenceDiagram
    participant Client1 as Client 1
    participant ClientN as Client N
    participant EtcdBucket
    participant etcd

    Client1->>EtcdBucket: create(key, value)
    ClientN->>EtcdBucket: create(key, value)
    EtcdBucket->>etcd: txn (if key.version == 0) put(key, value)
    etcd-->>EtcdBucket: txn response (success for one, fail for others)
    EtcdBucket-->>Client1: Created(1) or Exists(version)
    EtcdBucket-->>ClientN: Created(1) or Exists(version)
Loading

Estimated code review effort

2 (~15 minutes)

Poem

In the warren of keys, where the data bunnies play,
Atomic hops now keep the race at bay.
Only one rabbit claims the carrot, the rest must wait—
With a transactional leap, we synchronize fate!
🥕

Test burrows checked, all safe and sound—
No more race conditions to be found!


Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share
🪧 Tips

Chat

There are 3 ways to chat with CodeRabbit:

  • Review comments: Directly reply to a review comment made by CodeRabbit. Example:
    • I pushed a fix in commit <commit_id>, please review it.
    • Explain this complex logic.
    • Open a follow-up GitHub issue for this discussion.
  • Files and specific lines of code (under the "Files changed" tab): Tag @coderabbitai in a new review comment at the desired location with your query. Examples:
    • @coderabbitai explain this code block.
    • @coderabbitai modularize this function.
  • PR comments: Tag @coderabbitai in a new PR comment to ask questions about the PR branch. For the best results, please provide a very specific query, as very limited context is provided in this mode. Examples:
    • @coderabbitai gather interesting stats about this repository and render them as a table. Additionally, render a pie chart showing the language distribution in the codebase.
    • @coderabbitai read src/utils.ts and explain its main purpose.
    • @coderabbitai read the files in the src/scheduler package and generate a class diagram using mermaid and a README in the markdown format.
    • @coderabbitai help me debug CodeRabbit configuration file.

Support

Need help? Create a ticket on our support page for assistance with any issues or questions.

Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments.

CodeRabbit Commands (Invoked using PR comments)

  • @coderabbitai pause to pause the reviews on a PR.
  • @coderabbitai resume to resume the paused reviews.
  • @coderabbitai review to trigger an incremental review. This is useful when automatic reviews are disabled for the repository.
  • @coderabbitai full review to do a full review from scratch and review all the files again.
  • @coderabbitai summary to regenerate the summary of the PR.
  • @coderabbitai generate sequence diagram to generate a sequence diagram of the changes in this PR.
  • @coderabbitai resolve resolve all the CodeRabbit review comments.
  • @coderabbitai configuration to show the current CodeRabbit configuration for the repository.
  • @coderabbitai help to get help.

Other keywords and placeholders

  • Add @coderabbitai ignore anywhere in the PR description to prevent this PR from being reviewed.
  • Add @coderabbitai summary to generate the high-level summary at a specific location in the PR description.
  • Add @coderabbitai anywhere in the PR title to generate the title automatically.

Documentation and Community

  • Visit our Documentation for detailed information on how to use CodeRabbit.
  • Join our Discord Community to get help, request features, and share feedback.
  • Follow us on X/Twitter for updates and announcements.

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 0

🧹 Nitpick comments (2)
lib/runtime/src/storage/key_value_store/etcd.rs (2)

157-199: Implementation correctly uses atomic transactions to prevent race conditions.

The refactored implementation properly addresses the race condition by using etcd's transaction API to atomically check and create keys. The logic is sound and handles both success and existence cases appropriately.

Consider improving the error message to provide more context:

-        Err(StorageError::EtcdError(
-            "Unexpected transaction response".to_string(),
-        ))
+        Err(StorageError::EtcdError(
+            format!("Unexpected transaction response for key: {}", k),
+        ))

283-287: Unnecessary mutex wrapping for the bucket.

The KeyValueBucket trait already requires Send + Sync, so wrapping it in Arc<Mutex<>> adds unnecessary synchronization overhead. You can use Arc directly:

-        let bucket = Arc::new(tokio::sync::Mutex::new(
-            storage
-                .get_or_create_bucket("test_concurrent_bucket", None)
-                .await?,
-        ));
+        let bucket = Arc::from(
+            storage
+                .get_or_create_bucket("test_concurrent_bucket", None)
+                .await?,
+        );

And update the usage sites to remove .lock().await:

-                let result = bucket_clone
-                    .lock()
-                    .await
-                    .insert(key_clone, value_clone, 0)
-                    .await;
+                let result = bucket_clone
+                    .insert(key_clone, value_clone, 0)
+                    .await;
📜 Review details

Configuration used: .coderabbit.yaml
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 4449f3d and 31828ec.

📒 Files selected for processing (1)
  • lib/runtime/src/storage/key_value_store/etcd.rs (3 hunks)
🧠 Learnings (2)
📓 Common learnings
Learnt from: PeaBrane
PR: ai-dynamo/dynamo#1285
File: lib/llm/src/kv_router/scheduler.rs:260-266
Timestamp: 2025-05-30T06:34:12.785Z
Learning: In the KV router scheduler code, PeaBrane prefers fail-fast behavior over silent failure handling. When accessing worker metrics data that could be out-of-bounds (like dp_rank indexing), explicit panics are preferred over graceful degradation with continue statements to ensure data integrity issues are caught early.
lib/runtime/src/storage/key_value_store/etcd.rs (6)

Learnt from: jthomson04
PR: #1429
File: lib/runtime/src/utils/leader_worker_barrier.rs:69-72
Timestamp: 2025-06-08T03:12:03.985Z
Learning: In the leader-worker barrier implementation in lib/runtime/src/utils/leader_worker_barrier.rs, the wait_for_key_count function correctly uses exact equality (==) instead of greater-than-or-equal (>=) because worker IDs must be unique (enforced by etcd create-only operations), ensuring exactly the expected number of workers can register.

Learnt from: grahamking
PR: #1962
File: lib/runtime/src/component/client.rs:270-273
Timestamp: 2025-07-16T12:41:12.543Z
Learning: In lib/runtime/src/component/client.rs, the current mutex usage in get_or_create_dynamic_instance_source is temporary while evaluating whether the mutex can be dropped entirely. The code currently has a race condition between try_lock and lock().await, but this is acknowledged as an interim state during the performance optimization process.

Learnt from: alec-flowers
PR: #1181
File: lib/llm/src/kv_router/publisher.rs:379-425
Timestamp: 2025-05-29T00:02:35.018Z
Learning: In lib/llm/src/kv_router/publisher.rs, the functions create_stored_blocks and create_stored_block_from_parts are correctly implemented and not problematic duplications of existing functionality elsewhere in the codebase.

Learnt from: ryanolson
PR: #1919
File: lib/runtime/src/engine.rs:168-168
Timestamp: 2025-07-14T21:25:56.930Z
Learning: The AsyncEngineContextProvider trait in lib/runtime/src/engine.rs was intentionally changed from Send + Sync + Debug to Send + Debug because the Sync bound was overly constraining. The trait should only require Send + Debug as designed.

Learnt from: kthui
PR: #1424
File: lib/runtime/src/pipeline/network/egress/push_router.rs:204-209
Timestamp: 2025-06-13T22:07:24.843Z
Learning: The codebase uses async-nats version 0.40, not the older nats crate. Error handling should use async_nats::error::Error variants, not nats::Error variants.

Learnt from: ryanolson
PR: #1093
File: lib/llm/src/block_manager/block/registry.rs:98-122
Timestamp: 2025-05-29T06:20:12.901Z
Learning: In lib/llm/src/block_manager/block/registry.rs, the background task spawned for handling unregister notifications uses detached concurrency by design. The JoinHandle is intentionally not stored as this represents a reasonable architectural tradeoff for a long-running cleanup task.

🧰 Additional context used
🧠 Learnings (2)
📓 Common learnings
Learnt from: PeaBrane
PR: ai-dynamo/dynamo#1285
File: lib/llm/src/kv_router/scheduler.rs:260-266
Timestamp: 2025-05-30T06:34:12.785Z
Learning: In the KV router scheduler code, PeaBrane prefers fail-fast behavior over silent failure handling. When accessing worker metrics data that could be out-of-bounds (like dp_rank indexing), explicit panics are preferred over graceful degradation with continue statements to ensure data integrity issues are caught early.
lib/runtime/src/storage/key_value_store/etcd.rs (6)

Learnt from: jthomson04
PR: #1429
File: lib/runtime/src/utils/leader_worker_barrier.rs:69-72
Timestamp: 2025-06-08T03:12:03.985Z
Learning: In the leader-worker barrier implementation in lib/runtime/src/utils/leader_worker_barrier.rs, the wait_for_key_count function correctly uses exact equality (==) instead of greater-than-or-equal (>=) because worker IDs must be unique (enforced by etcd create-only operations), ensuring exactly the expected number of workers can register.

Learnt from: grahamking
PR: #1962
File: lib/runtime/src/component/client.rs:270-273
Timestamp: 2025-07-16T12:41:12.543Z
Learning: In lib/runtime/src/component/client.rs, the current mutex usage in get_or_create_dynamic_instance_source is temporary while evaluating whether the mutex can be dropped entirely. The code currently has a race condition between try_lock and lock().await, but this is acknowledged as an interim state during the performance optimization process.

Learnt from: alec-flowers
PR: #1181
File: lib/llm/src/kv_router/publisher.rs:379-425
Timestamp: 2025-05-29T00:02:35.018Z
Learning: In lib/llm/src/kv_router/publisher.rs, the functions create_stored_blocks and create_stored_block_from_parts are correctly implemented and not problematic duplications of existing functionality elsewhere in the codebase.

Learnt from: ryanolson
PR: #1919
File: lib/runtime/src/engine.rs:168-168
Timestamp: 2025-07-14T21:25:56.930Z
Learning: The AsyncEngineContextProvider trait in lib/runtime/src/engine.rs was intentionally changed from Send + Sync + Debug to Send + Debug because the Sync bound was overly constraining. The trait should only require Send + Debug as designed.

Learnt from: kthui
PR: #1424
File: lib/runtime/src/pipeline/network/egress/push_router.rs:204-209
Timestamp: 2025-06-13T22:07:24.843Z
Learning: The codebase uses async-nats version 0.40, not the older nats crate. Error handling should use async_nats::error::Error variants, not nats::Error variants.

Learnt from: ryanolson
PR: #1093
File: lib/llm/src/block_manager/block/registry.rs:98-122
Timestamp: 2025-05-29T06:20:12.901Z
Learning: In lib/llm/src/block_manager/block/registry.rs, the background task spawned for handling unregister notifications uses detached concurrency by design. The JoinHandle is intentionally not stored as this represents a reasonable architectural tradeoff for a long-running cleanup task.

⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (4)
  • GitHub Check: Build and Test - vllm
  • GitHub Check: pre-merge-rust (lib/bindings/python)
  • GitHub Check: pre-merge-rust (.)
  • GitHub Check: pre-merge-rust (lib/runtime/examples)
🔇 Additional comments (3)
lib/runtime/src/storage/key_value_store/etcd.rs (3)

23-23: LGTM!

The transaction-related imports are necessary for the atomic implementation.


258-259: Why is the feature flag commented out?

The commented #[cfg(feature = "integration")] suggests this test requires external dependencies (etcd). Should this be an integration test gated behind the feature flag?


368-387: Comprehensive test assertions validate the atomic behavior.

The test correctly verifies that:

  1. Exactly one worker creates the key
  2. All other workers observe the key exists
  3. All operations complete successfully
  4. The key is properly stored in etcd

This provides strong evidence that the race condition has been eliminated.

@PeaBrane PeaBrane self-assigned this Jul 22, 2025
@PeaBrane PeaBrane changed the title feat: use atomic transactions when creating EtcdBucket feat: use atomic transactions when creating etcd kv Jul 22, 2025
Copy link
Contributor

@grahamking grahamking left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Very nice!

@PeaBrane PeaBrane merged commit 7882693 into main Jul 22, 2025
12 of 13 checks passed
@PeaBrane PeaBrane deleted the rupei/etcd-bucket-atomic branch July 22, 2025 15:50
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

[BUG]: occasionally race condition during vllm v1 engine inits

3 participants