Skip to content

Commit 7df3c37

Browse files
committed
fix(guard): fix ws close codes
1 parent 7a59909 commit 7df3c37

File tree

2 files changed

+46
-0
lines changed

2 files changed

+46
-0
lines changed

packages/core/guard/core/src/proxy_service.rs

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ pub const X_FORWARDED_FOR: HeaderName = HeaderName::from_static("x-forwarded-for
3737
pub const X_RIVET_ERROR: HeaderName = HeaderName::from_static("x-rivet-error");
3838
const ROUTE_CACHE_TTL: Duration = Duration::from_secs(60 * 10); // 10 minutes
3939
const PROXY_STATE_CACHE_TTL: Duration = Duration::from_secs(60 * 60); // 1 hour
40+
const WEBSOCKET_CLOSE_LINGER: Duration = Duration::from_millis(100); // Keep TCP connection open briefly after WebSocket close
4041

4142
/// Response body type that can handle both streaming and buffered responses
4243
#[derive(Debug)]
@@ -1799,6 +1800,12 @@ impl ProxyService {
17991800
})))
18001801
.await?;
18011802

1803+
// Flush to ensure close frame is sent
1804+
ws_handle.flush().await?;
1805+
1806+
// Keep TCP connection open briefly to allow client to process close
1807+
tokio::time::sleep(WEBSOCKET_CLOSE_LINGER).await;
1808+
18021809
break;
18031810
}
18041811
Err(err) => {
@@ -1811,6 +1818,12 @@ impl ProxyService {
18111818
)))
18121819
.await?;
18131820

1821+
// Flush to ensure close frame is sent
1822+
ws_handle.flush().await?;
1823+
1824+
// Keep TCP connection open briefly to allow client to process close
1825+
tokio::time::sleep(WEBSOCKET_CLOSE_LINGER).await;
1826+
18141827
break;
18151828
} else {
18161829
let backoff = ProxyService::calculate_backoff(
@@ -1841,6 +1854,12 @@ impl ProxyService {
18411854
),
18421855
)))
18431856
.await?;
1857+
1858+
// Flush to ensure close frame is sent
1859+
ws_handle.flush().await?;
1860+
1861+
// Keep TCP connection open briefly to allow client to process close
1862+
tokio::time::sleep(WEBSOCKET_CLOSE_LINGER).await;
18441863
}
18451864
Ok(ResolveRouteOutput::Target(_)) => {
18461865
ws_handle
@@ -1850,6 +1869,13 @@ impl ProxyService {
18501869
),
18511870
)))
18521871
.await?;
1872+
1873+
// Flush to ensure close frame is sent
1874+
ws_handle.flush().await?;
1875+
1876+
// Keep TCP connection open briefly to allow client to process close
1877+
tokio::time::sleep(WEBSOCKET_CLOSE_LINGER).await;
1878+
18531879
break;
18541880
}
18551881
Err(err) => {
@@ -1858,6 +1884,13 @@ impl ProxyService {
18581884
err_to_close_frame(err),
18591885
)))
18601886
.await?;
1887+
1888+
// Flush to ensure close frame is sent
1889+
ws_handle.flush().await?;
1890+
1891+
// Keep TCP connection open briefly to allow client to process close
1892+
tokio::time::sleep(WEBSOCKET_CLOSE_LINGER).await;
1893+
18611894
break;
18621895
}
18631896
}

packages/core/guard/core/src/websocket_handle.rs

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,19 @@ impl WebSocketHandleInner {
8383
}
8484
}
8585

86+
pub async fn flush(&self) -> Result<()> {
87+
let mut state = self.state.lock().await;
88+
match &mut *state {
89+
WebSocketState::Unaccepted { .. } | WebSocketState::Accepting => {
90+
bail!("websocket has not been accepted");
91+
}
92+
WebSocketState::Split { ws_tx } => {
93+
ws_tx.flush().await?;
94+
Ok(())
95+
}
96+
}
97+
}
98+
8699
async fn accept_inner(state: &mut WebSocketState) -> Result<WebSocketReceiver> {
87100
if !matches!(*state, WebSocketState::Unaccepted { .. }) {
88101
bail!("websocket already accepted")

0 commit comments

Comments
 (0)