
Conversation

@MasterPtato
Contributor

No description provided.

@vercel

vercel bot commented Oct 25, 2025

The latest updates on your projects. Learn more about Vercel for GitHub.

3 Skipped Deployments
Project Deployment Preview Comments Updated (UTC)
rivet-cloud Ignored Ignored Oct 27, 2025 11:59pm
rivet-inspector Ignored Ignored Preview Oct 27, 2025 11:59pm
rivet-site Ignored Ignored Preview Oct 27, 2025 11:59pm

Contributor Author

MasterPtato commented Oct 25, 2025


How to use the Graphite Merge Queue

Add the label merge-queue to this PR to add it to the merge queue.

You must have a Graphite account in order to use the merge queue. Sign up using this link.

An organization admin has enabled the Graphite Merge Queue in this repository.

Please do not merge from GitHub as this will restart CI on PRs being processed by the merge queue.

This stack of pull requests is managed by Graphite. Learn more about stacking.

@claude

claude bot commented Oct 25, 2025

Pull Request Review: Improve Reconnection with Sleeping and Tunnel

Summary

This PR improves WebSocket reconnection reliability, particularly for sleeping actors and tunneled connections. The changes introduce message persistence, better reconnection handling, and synchronization improvements.

Positive Changes ✅

Architecture Improvements

  1. Concurrent HashMap (scc): Switching from Mutex<HashMap> to scc::HashMap in shared_state.rs is excellent for reducing lock contention in concurrent scenarios.

  2. WebSocket Handle Simplification: The refactoring of WebSocketHandle to eagerly accept connections (lines 23-31 in websocket_handle.rs) removes complex state machine logic and race conditions. Much cleaner!

  3. Pending Message Persistence: Storing pending messages by request ID (rather than by message ID) and replaying them on reconnection (send_reclaimed_messages) is a solid approach for handling actor sleep/wake cycles; a rough sketch follows this list.

  4. Unique Request IDs: Using stable unique_request_id across retries (line 1794 in proxy_service.rs) ensures messages can be correlated correctly.

  5. Event Buffering in TypeScript: The WebSocketTunnelAdapter buffering events until listeners are attached is good defensive programming.
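
To make items 1 and 3 concrete, here is a rough, hypothetical sketch of the pattern: pending messages keyed by a stable request ID in an scc::HashMap, buffered until acked, and replayed after a reconnect. The shapes and method names (the PendingMessage fields, track, replay_pending) are simplified stand-ins, not the PR's actual API.

use scc::HashMap as ConcurrentHashMap;
use uuid::Uuid;

#[derive(Clone)]
struct PendingMessage {
    message_id: Uuid,
    payload: Vec<u8>, // stand-in for protocol::ToClientTunnelMessage
}

struct SharedState {
    // Keyed by the stable request ID so retries and reconnects hit the same entry.
    pending_messages: ConcurrentHashMap<Uuid, Vec<PendingMessage>>,
}

impl SharedState {
    // Record a message before sending so it can be replayed if the actor sleeps.
    async fn track(&self, request_id: Uuid, msg: PendingMessage) {
        let mut entry = self
            .pending_messages
            .entry_async(request_id)
            .await
            .or_insert_with(Vec::new);
        entry.get_mut().push(msg);
    }

    // On reconnection, clone whatever is still unacknowledged and resend it.
    async fn replay_pending(&self, request_id: Uuid) -> Vec<PendingMessage> {
        match self.pending_messages.get_async(&request_id).await {
            Some(entry) => entry.get().clone(),
            None => Vec::new(),
        }
    }
}

Keying by the stable request ID (item 4) rather than by message ID is what lets a reconnected tunnel fetch everything still outstanding for a request in a single lookup.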

Critical Issues 🔴

1. Incorrect Into Implementation (shared_state.rs:34-37)

impl Into<protocol::ToClientTunnelMessage> for PendingMessage {
    fn into(self) -> protocol::ToClientTunnelMessage {
        self.payload
    }
}

Problem: This trait implementation is never used in the codebase, and even if it were, the conversion should be expressed as From rather than Into (the Rust convention is to implement From and get Into for free).

Fix: Remove this implementation entirely, or if needed, use:

impl From<PendingMessage> for protocol::ToClientTunnelMessage {
    fn from(msg: PendingMessage) -> Self {
        msg.payload
    }
}

2. Race Condition in Ack Handling (shared_state.rs:196-206)

if let Some(mut pending_msgs) = self.pending_messages.get_async(&msg.request_id).await {
    pending_msgs.retain(|m| m.payload.message_id != msg.message_id);
}

Problem: The retain mutates the value through the guard returned by get_async, and whether that mutation actually lands in the map depends on the guard's semantics. With scc::HashMap, you need to either:

  • use update_async to modify the entry atomically, or
  • confirm that get_async returns an entry guard that gives in-place mutable access, so the retain takes effect.

Verification needed: Confirm that scc::HashMap::get_async() returns a guard whose mutations are applied in place. If not, acks won't actually remove pending messages, causing memory leaks and incorrect replay behavior.
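
If the guard does not apply mutations in place, one alternative is to run the retain inside update_async so the change happens against the value stored in the map. This is a minimal sketch assuming scc's update_async closure API and reusing the simplified SharedState/PendingMessage shapes from the sketch under Architecture Improvements:

impl SharedState {
    // Hypothetical ack handler: retain() runs inside update_async, so the removal
    // is applied to the entry stored in the map rather than to a detached copy.
    async fn handle_ack(&self, request_id: Uuid, message_id: Uuid) {
        let _ = self
            .pending_messages
            .update_async(&request_id, |_key, pending| {
                pending.retain(|m| m.message_id != message_id);
            })
            .await;
    }
}

This sidesteps the question of guard semantics entirely, since the closure runs against the value held by the map.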

3. Hardcoded Backoff (proxy_service.rs:1853-1857)

let backoff = ProxyService::calculate_backoff(attempts, initial_interval);
let backoff = Duration::from_millis(100);  // ⚠️ Immediately overwrites!

Problem: Line 1857 completely overwrites the calculated backoff, making it always 100ms regardless of attempt count.

Fix: Either use the calculated backoff OR use a fixed 100ms, but remove one of these lines.
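
For context, a generic exponential backoff helper looks roughly like the sketch below; it is purely illustrative, and ProxyService::calculate_backoff may use a different formula or cap.

use std::time::Duration;

// Illustrative only: initial_interval doubled per attempt, capped at 30 seconds.
fn calculate_backoff(attempts: u32, initial_interval: Duration) -> Duration {
    let cap = Duration::from_secs(30);
    initial_interval
        .saturating_mul(2u32.saturating_pow(attempts))
        .min(cap)
}

Whichever behavior is intended, the retry loop should sleep on one deliberately chosen value instead of computing a backoff and immediately discarding it.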

4. ACK Timeout Mismatch (Critical!)

  • Runner (TypeScript): MESSAGE_ACK_TIMEOUT = 5000 (5 seconds) - tunnel.ts:10
  • Gateway (Rust): MESSAGE_ACK_TIMEOUT = 30000 (30 seconds) - shared_state.rs:17

Problem: Runner will timeout and reject requests after 5 seconds, but gateway will keep messages pending for 30 seconds. This mismatch can cause:

  • Premature request failures on the runner side
  • Unnecessary pending message accumulation on gateway side
  • Inconsistent behavior under load

Fix: Align these values. Consider making 30 seconds standard for both, or making it configurable.

5. Debug Logging Left in Production (shared_state.rs:152)

tracing::info!(l=?pending_msgs_by_req_id.len(), message_id=?Uuid::from_bytes(payload.message_id), request_id=?Uuid::from_bytes(payload.request_id), "new msg -----------");

Problem: Multiple debug log statements with "-------" separators (lines 152, 196) are using tracing::info! instead of tracing::debug!.

Fix: Change to tracing::debug! and remove the separator dashes to follow CLAUDE.md logging conventions.
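
For example, a cleaned-up version of the statement quoted above might read as follows (the wording follows the rename suggested in item 11 below and is only a suggestion):

tracing::debug!(
    pending = pending_msgs_by_req_id.len(),
    message_id = ?Uuid::from_bytes(payload.message_id),
    request_id = ?Uuid::from_bytes(payload.request_id),
    "sending new message"
);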

Moderate Issues 🟡

6. Unnecessary Async Blocks Removed

The removal of redundant async blocks in epoxy/src/ops/kv/*.rs is good, but verify this doesn't change the instrumentation span behavior.

7. Missing Error Context

In proxy_service.rs:1799, the context message is good:

.context("failed initiating websocket handle")?;

However, similar error paths around line 2015 lack context. Be consistent.

8. Potential Message Reordering

When send_reclaimed_messages replays pending messages (shared_state.rs:268-298), there's no guarantee they'll be delivered in order relative to new messages being sent concurrently. Consider if this is acceptable for your use case.

9. MAX_PENDING_MSGS_PER_REQ Limit (shared_state.rs:154-161)

Setting the limit to 1024 is reasonable, but when it is hit, you remove ALL pending messages for that request:

self.pending_messages.remove_async(&request_id).await;

This might be too aggressive - consider closing the connection more gracefully.

Minor Issues / Suggestions 🟢

10. WebSocket Receiver Mutex Contention (websocket_handle.rs:43-45)

pub fn recv(&self) -> Arc<Mutex<WebSocketReceiver>> {
    self.ws_rx.clone()
}

Returning Arc<Mutex<T>> means callers need to lock. Consider if this API could be improved by making recv take &mut self or by using a channel.
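
A rough sketch of the channel-based alternative, with hypothetical names and a stand-in frame type rather than the PR's actual WebSocketReceiver:

use tokio::sync::mpsc;

// Stand-in for the frames coming off the websocket.
enum WsFrame {
    Text(String),
    Binary(Vec<u8>),
}

struct WebSocketHandle {
    // A background task owns the real receiver and forwards frames here, so
    // callers consume an mpsc receiver instead of locking an Arc<Mutex<_>>.
    rx: mpsc::Receiver<WsFrame>,
}

impl WebSocketHandle {
    async fn recv(&mut self) -> Option<WsFrame> {
        self.rx.recv().await
    }
}

The trade-off is one extra task and channel hop per connection, but receive-side locking disappears from the public API.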

11. Logging Message Improvements

Per CLAUDE.md conventions:

  • Line 152: "new msg" → "sending new message"
  • Line 196: "ack" → "received ack"
  • Line 306: "tunnel sub closed" → "tunnel subscription closed"

12. Test Coverage

The test in actor_sleep.ts is good but only tests 10 iterations. Consider:

  • Testing boundary conditions (e.g., sleeping exactly at MESSAGE_ACK_TIMEOUT)
  • Testing concurrent WebSocket connections
  • Testing what happens when the pending message limit is reached

13. TypeScript Logging

Multiple debug logs in websocket-tunnel-adapter.ts and tunnel.ts use console-style messages. Consider if these should use structured logging like the Rust side.

Security Considerations 🔒

  1. DoS via Pending Messages: With 1024 max pending messages per request, a malicious client could potentially accumulate many slow requests. The limit helps, but consider rate limiting request creation too.

  2. WebSocket Protocol Validation: Ensure the rivet_actor.{actor_id} protocol extraction is properly validated against injection attacks.
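
To illustrate point 2, a minimal validation sketch for the subprotocol value, assuming the actor ID is a UUID (the exact format the PR expects may differ):

use uuid::Uuid;

// Accepts values like "rivet_actor.<uuid>" and rejects everything else, so
// arbitrary client-supplied strings never reach downstream lookups unparsed.
fn parse_actor_protocol(protocol: &str) -> Option<Uuid> {
    let actor_id = protocol.strip_prefix("rivet_actor.")?;
    Uuid::parse_str(actor_id).ok()
}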

Performance Considerations ⚡

  1. SCC HashMap: Good choice for high concurrency. Verify it's being used correctly with the async API.

  2. Message Cloning: In send_reclaimed_messages (line 288), messages are cloned via payload.clone(). For large messages, this could be expensive. Consider if you need a reference-counted approach.
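
A hedged sketch of the reference-counted variant mentioned in point 2, assuming messages are immutable once queued; the field types are simplified stand-ins:

use std::sync::Arc;
use uuid::Uuid;

// Wrapping the payload in Arc turns each replay clone into a pointer copy;
// only the per-message bookkeeping is duplicated, not the message bytes.
struct PendingMessage {
    message_id: Uuid,
    payload: Arc<Vec<u8>>, // stand-in for Arc<protocol::ToClientTunnelMessage>
}

fn collect_for_replay(pending: &[PendingMessage]) -> Vec<Arc<Vec<u8>>> {
    pending.iter().map(|m| Arc::clone(&m.payload)).collect()
}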

Testing Recommendations 🧪

  1. Add integration test for message replay after actor wake
  2. Test ACK timeout scenarios on both runner and gateway
  3. Test the MAX_PENDING_MSGS_PER_REQ limit
  4. Test concurrent WebSocket reconnections
  5. Test behavior when messages arrive out of order

Final Recommendations

Before merging:

  1. ✅ Fix the ACK timeout mismatch (issue 4 above)
  2. ✅ Fix the hardcoded backoff override (issue 3 above)
  3. ✅ Verify the scc::HashMap ack removal actually works (issue 2 above)
  4. ✅ Clean up debug logging (issue 5 above)

Nice to have:

Overall Assessment

This is a well-thought-out improvement to WebSocket reconnection reliability. The core architecture changes (concurrent HashMap, message persistence, stable request IDs) are sound. The main concerns are around configuration mismatches and potential race conditions that need verification before merge.

Recommendation: Request changes for critical issues #2, #3, and #4, then approve after verification.

@MasterPtato force-pushed the 10-24-fix_improve_reconnection_with_sleeping_and_tunnel branch from 61ae6af to 049fe1f on October 27, 2025 at 23:59
@claude

claude bot commented Oct 28, 2025

Code Review: Improve reconnection with sleeping and tunnel

Thanks for this PR! I've reviewed the changes focusing on code quality, best practices, potential bugs, performance, security, and test coverage. Here's my feedback:

Overall Assessment

This PR makes significant improvements to WebSocket reconnection handling and tunnel message reliability. The changes are generally well-architected, but there are several areas that need attention.


Major Issues

1. Data Race and Concurrency Issues in shared_state.rs

Location: engine/packages/pegboard-gateway/src/shared_state.rs:145-163

The code modifies pending_msgs_by_req_id while holding a mutable reference from the scc::HashMap:

let mut pending_msgs_by_req_id = self
    .pending_messages
    .entry_async(request_id)
    .await
    .or_insert_with(Vec::new);
let pending_msgs_by_req_id = pending_msgs_by_req_id.get_mut();

if pending_msgs_by_req_id.len() >= MAX_PENDING_MSGS_PER_REQ {
    self.pending_messages.remove_async(&request_id).await;  // ❌ Potential issue
    // ...
}

Issue: You're calling remove_async() while still holding a mutable reference to the entry. This could lead to undefined behavior or panics. The entry reference should be dropped before calling remove_async().

Recommendation:

if pending_msgs_by_req_id.len() >= MAX_PENDING_MSGS_PER_REQ {
    drop(pending_msgs_by_req_id); // Drop the entry reference first
    self.pending_messages.remove_async(&request_id).await;
    return Err(WebsocketPendingLimitReached { limit: MAX_PENDING_MSGS_PER_REQ }.build());
}

2. Debug Logging in Production Code

Locations: Multiple files with debug statements

The code contains several verbose debug/info logging statements that appear to be debugging artifacts:

  • shared_state.rs:152 - tracing::info!(l=?pending_msgs_by_req_id.len(), message_id=?Uuid::from_bytes(payload.message_id), request_id=?Uuid::from_bytes(payload.request_id), "new msg -----------");
  • shared_state.rs:196 - tracing::info!(message_id=?Uuid::from_bytes(msg.message_id), request_id=?Uuid::from_bytes(msg.request_id), "ack -----------");
  • Multiple "------------ " prefixed logs in TypeScript files

Issue: These look like temporary debugging statements with informal formatting (e.g., "------------ ", "-----------").

Recommendation: Remove or downgrade to tracing::debug! with more professional formatting before merging.

3. ACK Timing Logic Asymmetry

Location: tunnel.ts:286-299 and shared_state.rs:195-206

The WebSocket message ACK is conditionally sent based on whether the message is "unhandled":

let unhandled = await this.#handleWebSocketMessage(
    message.requestId,
    message.messageKind.val,
);

if (!unhandled) {
    this.#sendAck(message.requestId, message.messageId);
}

Issue: The _handleMessage method returns true when the adapter doesn't exist, meaning no ACK is sent. However, this creates a subtle edge case where messages sent during a brief window (after WS close but before cleanup) won't be ACKed, causing unnecessary retries.

Recommendation: Consider ACKing even "unhandled" messages to prevent retry storms, or add explicit handling for this edge case.

4. Missing Error Handling for send_reclaimed_messages

Location: lib.rs:249-252

self.shared_state
    .send_reclaimed_messages(request_id)
    .await?;

Issue: If send_reclaimed_messages fails after the WebSocket handshake succeeds, the connection is aborted. However, the client has already been notified that the connection succeeded. This could leave the client in a bad state.

Recommendation: Add better error recovery - perhaps close the WebSocket with an appropriate error code rather than returning early.


Medium Issues

5. MESSAGE_ACK_TIMEOUT Mismatch

Locations:

  • Rust: shared_state.rs:17 - const MESSAGE_ACK_TIMEOUT: Duration = Duration::from_secs(30);
  • TypeScript: tunnel.ts:10 - const MESSAGE_ACK_TIMEOUT = 5000; // 5 seconds

Issue: The timeout values are significantly different (30s vs 5s), which could lead to inconsistent behavior.

Recommendation: Align these timeouts or document why they differ.

6. WebSocketHandle State Management Simplification

Location: websocket_handle.rs:22-46

The refactor removes the state machine pattern in favor of always accepting the websocket immediately. While simpler, this changes the behavior significantly.

Previous: Lazy acceptance (accept on first use)
Current: Eager acceptance (accept immediately in constructor)

Concern: This means websockets are always accepted before any validation can occur in the caller. Ensure this is the intended behavior and doesn't introduce security issues.

7. Into Implementation Should Use From

Location: shared_state.rs:34-37

impl Into<protocol::ToClientTunnelMessage> for PendingMessage {
    fn into(self) -> protocol::ToClientTunnelMessage {
        self.payload
    }
}

Issue: The Rust API guidelines recommend implementing From instead of Into, since implementing From automatically provides Into through the standard library's blanket impl.

Recommendation:

impl From<PendingMessage> for protocol::ToClientTunnelMessage {
    fn from(msg: PendingMessage) -> Self {
        msg.payload
    }
}

Minor Issues

8. Unnecessary Async Block Removal

Locations: get_local.rs:32-40, get_optimistic.rs:55-74, etc.

The removal of inner async blocks looks correct but ensure these don't change instrumentation behavior in subtle ways.

9. Hardcoded Backoff Value

Location: proxy_service.rs:1857

let backoff = Duration::from_millis(100);

Issue: This appears to override any exponential backoff calculation (referenced but not shown). The comment mentions initial_interval but uses a hardcoded value.

Recommendation: Either use the calculated backoff or document why it's hardcoded.

10. Clone Before Checking Conditions

Location: shared_state.rs:111-124

Multiple clones happen before checking if the request is in flight. Consider checking first to avoid unnecessary work.


Performance Considerations

11. Lock-Free HashMap Usage

Location: shared_state.rs:4

Good choice using scc::HashMap for lock-free concurrent access. However, frequent get_async() calls followed by mutations could still cause contention under high load; consider batching operations where possible.

12. Message Retention Strategy

Location: shared_state.rs:154-161

The code limits pending messages to 1024 per request and aborts the connection when exceeded.

Issue: This is a hard limit that could be hit legitimately under slow network conditions or burst traffic.

Recommendation: Consider:

  • Making this configurable (a sketch follows this list)
  • Adding metrics to monitor how often this limit is hit
  • Implementing a more graceful degradation strategy
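
As a sketch of what the configurable form could look like (the struct name and default are illustrative, not the PR's actual config surface):

// Illustrative: lift the hardcoded constant into a config struct with a default,
// so operators can tune the limit without rebuilding the gateway.
#[derive(Clone)]
pub struct GatewayConfig {
    pub max_pending_msgs_per_req: usize,
}

impl Default for GatewayConfig {
    fn default() -> Self {
        Self {
            max_pending_msgs_per_req: 1024,
        }
    }
}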

Security Concerns

13. DOS via Pending Message Accumulation

Location: shared_state.rs:154

While the MAX_PENDING_MSGS_PER_REQ limit helps, there's no global limit on total pending messages across all requests.

Issue: An attacker could open many connections and accumulate up to 1024 messages each, potentially exhausting memory.

Recommendation: Add a global pending message limit or at least add monitoring/alerting.
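
One possible shape for such a global cap, using a process-wide atomic counter; the constant, names, and memory ordering are illustrative only:

use std::sync::atomic::{AtomicUsize, Ordering};

// Illustrative global budget across all requests (64 * 1024 is arbitrary).
const MAX_PENDING_MSGS_GLOBAL: usize = 64 * 1024;
static GLOBAL_PENDING_MSGS: AtomicUsize = AtomicUsize::new(0);

// Returns true if the new message fits under the global budget.
fn try_reserve_pending_slot() -> bool {
    GLOBAL_PENDING_MSGS
        .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |n| {
            (n < MAX_PENDING_MSGS_GLOBAL).then_some(n + 1)
        })
        .is_ok()
}

// Call when a message is acked or its request is cleaned up.
fn release_pending_slot() {
    GLOBAL_PENDING_MSGS.fetch_sub(1, Ordering::SeqCst);
}

A reservation like this could sit alongside the existing per-request check, so either limit can reject a new message.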


Test Coverage

14. Missing Test Coverage

The PR modifies critical connection handling logic but doesn't include new tests.

Needed Tests:

  • Reconnection scenarios with pending messages
  • Message reclamation after reconnection
  • Pending message limit enforcement
  • ACK timeout handling
  • Concurrent request handling with the new lock-free HashMap

Recommendation: Add integration tests covering the reconnection flow, especially testing the message reclamation logic.


Positive Observations

  1. Good error handling: New error type WebsocketPendingLimitReached follows the project's error convention
  2. Logging improvements: Generally better structured logging with UUIDs
  3. Concurrency upgrade: Moving from Mutex<HashMap> to scc::HashMap is a smart performance improvement
  4. Unique request ID: Using a stable unique_request_id for WebSocket reconnection is a solid approach
  5. Event buffering: The TypeScript WebSocketTunnelAdapter event buffering (lines 22-26) elegantly handles race conditions

Recommendations Summary

Before merging:

  1. Fix the data race in shared_state.rs:155 (remove_async is called while still holding the entry reference)
  2. Remove or clean up debug logging statements
  3. Add tests for reconnection scenarios
  4. Align MESSAGE_ACK_TIMEOUT values or document difference
  5. Consider adding global pending message limits

Nice to have:
6. Change the Into implementation to From
7. Review ACK timing for unhandled messages
8. Add error recovery for send_reclaimed_messages
9. Make message limits configurable


Overall, this is a solid improvement to the reconnection logic! The main concerns are around the data race and production readiness of the debug logging. Once those are addressed, this should be good to merge.

@pkg-pr-new

pkg-pr-new bot commented Oct 28, 2025

More templates

@rivetkit/actor

pnpm add https://pkg.pr.new/rivet-dev/rivet/@rivetkit/actor@3261

@rivetkit/cloudflare-workers

pnpm add https://pkg.pr.new/rivet-dev/rivet/@rivetkit/cloudflare-workers@3261

@rivetkit/core

pnpm add https://pkg.pr.new/rivet-dev/rivet/@rivetkit/core@3261

@rivetkit/db

pnpm add https://pkg.pr.new/rivet-dev/rivet/@rivetkit/db@3261

@rivetkit/framework-base

pnpm add https://pkg.pr.new/rivet-dev/rivet/@rivetkit/framework-base@3261

@rivetkit/next-js

pnpm add https://pkg.pr.new/rivet-dev/rivet/@rivetkit/next-js@3261

@rivetkit/react

pnpm add https://pkg.pr.new/rivet-dev/rivet/@rivetkit/react@3261

rivetkit

pnpm add https://pkg.pr.new/rivet-dev/rivet/rivetkit@3261

@rivetkit/sql-loader

pnpm add https://pkg.pr.new/rivet-dev/rivet/@rivetkit/sql-loader@3261

@rivetkit/engine-runner

pnpm add https://pkg.pr.new/rivet-dev/rivet/@rivetkit/engine-runner@3261

@rivetkit/engine-runner-protocol

pnpm add https://pkg.pr.new/rivet-dev/rivet/@rivetkit/engine-runner-protocol@3261

commit: 049fe1f
