diff --git a/src/client.rs b/src/client.rs index 5dd0b0f87..4147e8a46 100644 --- a/src/client.rs +++ b/src/client.rs @@ -326,6 +326,10 @@ pub struct Builder { /// Maximum number of locally reset streams to keep at a time. reset_stream_max: usize, + /// Maximum number of remotely reset streams to allow in the pending + /// accept queue. + pending_accept_reset_stream_max: usize, + /// Initial `Settings` frame to send as part of the handshake. settings: Settings, @@ -634,6 +638,7 @@ impl Builder { max_send_buffer_size: proto::DEFAULT_MAX_SEND_BUFFER_SIZE, reset_stream_duration: Duration::from_secs(proto::DEFAULT_RESET_STREAM_SECS), reset_stream_max: proto::DEFAULT_RESET_STREAM_MAX, + pending_accept_reset_stream_max: proto::DEFAULT_REMOTE_RESET_STREAM_MAX, initial_target_connection_window_size: None, initial_max_send_streams: usize::MAX, settings: Default::default(), @@ -966,6 +971,49 @@ impl Builder { self } + /// Sets the maximum number of pending-accept remotely-reset streams. + /// + /// Streams that have been received by the peer, but not accepted by the + /// user, can also receive a RST_STREAM. This is a legitimate pattern: one + /// could send a request and then shortly after, realize it is not needed, + /// sending a CANCEL. + /// + /// However, since those streams are now "closed", they don't count towards + /// the max concurrent streams. So, they will sit in the accept queue, + /// using memory. + /// + /// When the number of remotely-reset streams sitting in the pending-accept + /// queue reaches this maximum value, a connection error with the code of + /// `ENHANCE_YOUR_CALM` will be sent to the peer, and returned by the + /// `Future`. + /// + /// The default value is currently 20, but could change. 
+ /// + /// # Examples + /// + /// ``` + /// # use tokio::io::{AsyncRead, AsyncWrite}; + /// # use h2::client::*; + /// # use bytes::Bytes; + /// # + /// # async fn doc(my_io: T) + /// # -> Result<((SendRequest, Connection)), h2::Error> + /// # { + /// // `client_fut` is a future representing the completion of the HTTP/2 + /// // handshake. + /// let client_fut = Builder::new() + /// .max_pending_accept_reset_streams(100) + /// .handshake(my_io); + /// # client_fut.await + /// # } + /// # + /// # pub fn main() {} + /// ``` + pub fn max_pending_accept_reset_streams(&mut self, max: usize) -> &mut Self { + self.pending_accept_reset_stream_max = max; + self + } + /// Sets the maximum send buffer size per stream. /// /// Once a stream has buffered up to (or over) the maximum, the stream's @@ -1209,6 +1257,7 @@ where max_send_buffer_size: builder.max_send_buffer_size, reset_stream_duration: builder.reset_stream_duration, reset_stream_max: builder.reset_stream_max, + remote_reset_stream_max: builder.pending_accept_reset_stream_max, settings: builder.settings.clone(), }, ); diff --git a/src/proto/connection.rs b/src/proto/connection.rs index 59883cf33..619973df8 100644 --- a/src/proto/connection.rs +++ b/src/proto/connection.rs @@ -80,6 +80,7 @@ pub(crate) struct Config { pub max_send_buffer_size: usize, pub reset_stream_duration: Duration, pub reset_stream_max: usize, + pub remote_reset_stream_max: usize, pub settings: frame::Settings, } @@ -118,6 +119,7 @@ where .unwrap_or(false), local_reset_duration: config.reset_stream_duration, local_reset_max: config.reset_stream_max, + remote_reset_max: config.remote_reset_stream_max, remote_init_window_sz: DEFAULT_INITIAL_WINDOW_SIZE, remote_max_initiated: config .settings @@ -172,6 +174,11 @@ where self.inner.streams.max_recv_streams() } + #[cfg(feature = "unstable")] + pub fn num_wired_streams(&self) -> usize { + self.inner.streams.num_wired_streams() + } + /// Returns `Ready` when the connection is ready to receive a frame. 
/// /// Returns `Error` as this may raise errors that are caused by delayed diff --git a/src/proto/mod.rs b/src/proto/mod.rs index 5ec7bf992..d71ee9c42 100644 --- a/src/proto/mod.rs +++ b/src/proto/mod.rs @@ -31,6 +31,7 @@ pub type WindowSize = u32; // Constants pub const MAX_WINDOW_SIZE: WindowSize = (1 << 31) - 1; +pub const DEFAULT_REMOTE_RESET_STREAM_MAX: usize = 20; pub const DEFAULT_RESET_STREAM_MAX: usize = 10; pub const DEFAULT_RESET_STREAM_SECS: u64 = 30; pub const DEFAULT_MAX_SEND_BUFFER_SIZE: usize = 1024 * 400; diff --git a/src/proto/streams/counts.rs b/src/proto/streams/counts.rs index 70dfc7851..6a5aa9ccd 100644 --- a/src/proto/streams/counts.rs +++ b/src/proto/streams/counts.rs @@ -21,10 +21,16 @@ pub(super) struct Counts { num_recv_streams: usize, /// Maximum number of pending locally reset streams - max_reset_streams: usize, + max_local_reset_streams: usize, /// Current number of pending locally reset streams - num_reset_streams: usize, + num_local_reset_streams: usize, + + /// Max number of "pending accept" streams that were remotely reset + max_remote_reset_streams: usize, + + /// Current number of "pending accept" streams that were remotely reset + num_remote_reset_streams: usize, } impl Counts { @@ -36,8 +42,10 @@ impl Counts { num_send_streams: 0, max_recv_streams: config.remote_max_initiated.unwrap_or(usize::MAX), num_recv_streams: 0, - max_reset_streams: config.local_reset_max, - num_reset_streams: 0, + max_local_reset_streams: config.local_reset_max, + num_local_reset_streams: 0, + max_remote_reset_streams: config.remote_reset_max, + num_remote_reset_streams: 0, } } @@ -90,7 +98,7 @@ impl Counts { /// Returns true if the number of pending reset streams can be incremented. pub fn can_inc_num_reset_streams(&self) -> bool { - self.max_reset_streams > self.num_reset_streams + self.max_local_reset_streams > self.num_local_reset_streams } /// Increments the number of pending reset streams. 
@@ -101,7 +109,34 @@ impl Counts { pub fn inc_num_reset_streams(&mut self) { assert!(self.can_inc_num_reset_streams()); - self.num_reset_streams += 1; + self.num_local_reset_streams += 1; + } + + pub(crate) fn max_remote_reset_streams(&self) -> usize { + self.max_remote_reset_streams + } + + /// Returns true if the number of pending REMOTE reset streams can be + /// incremented. + pub(crate) fn can_inc_num_remote_reset_streams(&self) -> bool { + self.max_remote_reset_streams > self.num_remote_reset_streams + } + + /// Increments the number of pending REMOTE reset streams. + /// + /// # Panics + /// + /// Panics on failure as this should have been validated before hand. + pub(crate) fn inc_num_remote_reset_streams(&mut self) { + assert!(self.can_inc_num_remote_reset_streams()); + + self.num_remote_reset_streams += 1; + } + + pub(crate) fn dec_num_remote_reset_streams(&mut self) { + assert!(self.num_remote_reset_streams > 0); + + self.num_remote_reset_streams -= 1; } pub fn apply_remote_settings(&mut self, settings: &frame::Settings) { @@ -194,8 +229,8 @@ impl Counts { } fn dec_num_reset_streams(&mut self) { - assert!(self.num_reset_streams > 0); - self.num_reset_streams -= 1; + assert!(self.num_local_reset_streams > 0); + self.num_local_reset_streams -= 1; } } diff --git a/src/proto/streams/mod.rs b/src/proto/streams/mod.rs index 0ff8131c1..fbe32c7b0 100644 --- a/src/proto/streams/mod.rs +++ b/src/proto/streams/mod.rs @@ -60,6 +60,10 @@ pub struct Config { /// Maximum number of locally reset streams to keep at a time pub local_reset_max: usize, + /// Maximum number of remotely reset "pending accept" streams to keep at a + /// time. Going over this number results in a connection error. 
+ pub remote_reset_max: usize, + /// Initial window size of remote initiated streams pub remote_init_window_sz: WindowSize, diff --git a/src/proto/streams/recv.rs b/src/proto/streams/recv.rs index 497efc9bc..0fe2bdd57 100644 --- a/src/proto/streams/recv.rs +++ b/src/proto/streams/recv.rs @@ -741,12 +741,39 @@ impl Recv { } /// Handle remote sending an explicit RST_STREAM. - pub fn recv_reset(&mut self, frame: frame::Reset, stream: &mut Stream) { + pub fn recv_reset( + &mut self, + frame: frame::Reset, + stream: &mut Stream, + counts: &mut Counts, + ) -> Result<(), Error> { + // Reseting a stream that the user hasn't accepted is possible, + // but should be done with care. These streams will continue + // to take up memory in the accept queue, but will no longer be + // counted as "concurrent" streams. + // + // So, we have a separate limit for these. + // + // See https://github.com/hyperium/hyper/issues/2877 + if stream.is_pending_accept { + if counts.can_inc_num_remote_reset_streams() { + counts.inc_num_remote_reset_streams(); + } else { + tracing::warn!( + "recv_reset; remotely-reset pending-accept streams reached limit ({:?})", + counts.max_remote_reset_streams(), + ); + return Err(Error::library_go_away(Reason::ENHANCE_YOUR_CALM)); + } + } + // Notify the stream stream.state.recv_reset(frame, stream.is_pending_send); stream.notify_send(); stream.notify_recv(); + + Ok(()) } /// Handle a connection-level error @@ -1033,7 +1060,6 @@ impl Recv { cx: &Context, stream: &mut Stream, ) -> Poll>> { - // TODO: Return error when the stream is reset match stream.pending_recv.pop_front(&mut self.buffer) { Some(Event::Data(payload)) => Poll::Ready(Some(Ok(payload))), Some(event) => { diff --git a/src/proto/streams/state.rs b/src/proto/streams/state.rs index db37831f8..b9612addc 100644 --- a/src/proto/streams/state.rs +++ b/src/proto/streams/state.rs @@ -360,6 +360,13 @@ impl State { } } + pub fn is_remote_reset(&self) -> bool { + match self.inner { + 
Closed(Cause::Error(ref e)) => !e.is_local(), + _ => false, + } + } + /// Returns true if the stream is already reset. pub fn is_reset(&self) -> bool { match self.inner { diff --git a/src/proto/streams/streams.rs b/src/proto/streams/streams.rs index e1a2e3fe7..dbaebfa7a 100644 --- a/src/proto/streams/streams.rs +++ b/src/proto/streams/streams.rs @@ -140,6 +140,12 @@ where // TODO: ideally, OpaqueStreamRefs::new would do this, but we're holding // the lock, so it can't. me.refs += 1; + + // Pending-accepted remotely-reset streams are counted. + if stream.state.is_remote_reset() { + me.counts.dec_num_remote_reset_streams(); + } + StreamRef { opaque: OpaqueStreamRef::new(self.inner.clone(), stream), send_buffer: self.send_buffer.clone(), @@ -601,7 +607,7 @@ impl Inner { let actions = &mut self.actions; self.counts.transition(stream, |counts, stream| { - actions.recv.recv_reset(frame, stream); + actions.recv.recv_reset(frame, stream, counts)?; actions.send.handle_error(send_buffer, stream, counts); assert!(stream.state.is_closed()); Ok(()) diff --git a/src/server.rs b/src/server.rs index bb3d3cf86..f1f4cf470 100644 --- a/src/server.rs +++ b/src/server.rs @@ -240,6 +240,10 @@ pub struct Builder { /// Maximum number of locally reset streams to keep at a time. reset_stream_max: usize, + /// Maximum number of remotely reset streams to allow in the pending + /// accept queue. + pending_accept_reset_stream_max: usize, + /// Initial `Settings` frame to send as part of the handshake. settings: Settings, @@ -576,6 +580,13 @@ where pub fn max_concurrent_recv_streams(&self) -> usize { self.connection.max_recv_streams() } + + // Could disappear at anytime. 
+ #[doc(hidden)] + #[cfg(feature = "unstable")] + pub fn num_wired_streams(&self) -> usize { + self.connection.num_wired_streams() + } } #[cfg(feature = "stream")] @@ -635,6 +646,7 @@ impl Builder { Builder { reset_stream_duration: Duration::from_secs(proto::DEFAULT_RESET_STREAM_SECS), reset_stream_max: proto::DEFAULT_RESET_STREAM_MAX, + pending_accept_reset_stream_max: proto::DEFAULT_REMOTE_RESET_STREAM_MAX, settings: Settings::default(), initial_target_connection_window_size: None, max_send_buffer_size: proto::DEFAULT_MAX_SEND_BUFFER_SIZE, @@ -875,6 +887,49 @@ impl Builder { self } + /// Sets the maximum number of pending-accept remotely-reset streams. + /// + /// Streams that have been received by the peer, but not accepted by the + /// user, can also receive a RST_STREAM. This is a legitimate pattern: one + /// could send a request and then shortly after, realize it is not needed, + /// sending a CANCEL. + /// + /// However, since those streams are now "closed", they don't count towards + /// the max concurrent streams. So, they will sit in the accept queue, + /// using memory. + /// + /// When the number of remotely-reset streams sitting in the pending-accept + /// queue reaches this maximum value, a connection error with the code of + /// `ENHANCE_YOUR_CALM` will be sent to the peer, and returned by the + /// `Future`. + /// + /// The default value is currently 20, but could change. + /// + /// # Examples + /// + /// + /// ``` + /// # use tokio::io::{AsyncRead, AsyncWrite}; + /// # use h2::server::*; + /// # + /// # fn doc(my_io: T) + /// # -> Handshake + /// # { + /// // `server_fut` is a future representing the completion of the HTTP/2 + /// // handshake. 
+ /// let server_fut = Builder::new() + /// .max_pending_accept_reset_streams(100) + /// .handshake(my_io); + /// # server_fut + /// # } + /// # + /// # pub fn main() {} + /// ``` + pub fn max_pending_accept_reset_streams(&mut self, max: usize) -> &mut Self { + self.pending_accept_reset_stream_max = max; + self + } + /// Sets the maximum send buffer size per stream. /// /// Once a stream has buffered up to (or over) the maximum, the stream's @@ -1305,6 +1360,7 @@ where max_send_buffer_size: self.builder.max_send_buffer_size, reset_stream_duration: self.builder.reset_stream_duration, reset_stream_max: self.builder.reset_stream_max, + remote_reset_stream_max: self.builder.pending_accept_reset_stream_max, settings: self.builder.settings.clone(), }, ); diff --git a/tests/h2-support/src/frames.rs b/tests/h2-support/src/frames.rs index 862e0c629..bc4e2e708 100644 --- a/tests/h2-support/src/frames.rs +++ b/tests/h2-support/src/frames.rs @@ -297,6 +297,10 @@ impl Mock { self.reason(frame::Reason::FRAME_SIZE_ERROR) } + pub fn calm(self) -> Self { + self.reason(frame::Reason::ENHANCE_YOUR_CALM) + } + pub fn no_error(self) -> Self { self.reason(frame::Reason::NO_ERROR) } diff --git a/tests/h2-tests/tests/stream_states.rs b/tests/h2-tests/tests/stream_states.rs index 9f348d5f2..5d86f7b47 100644 --- a/tests/h2-tests/tests/stream_states.rs +++ b/tests/h2-tests/tests/stream_states.rs @@ -1,6 +1,6 @@ #![deny(warnings)] -use futures::future::{join, join3, lazy, try_join}; +use futures::future::{join, join3, lazy, poll_fn, try_join}; use futures::{FutureExt, StreamExt, TryStreamExt}; use h2_support::prelude::*; use h2_support::util::yield_once; @@ -194,6 +194,47 @@ async fn closed_streams_are_released() { join(srv, h2).await; } +#[tokio::test] +async fn reset_streams_dont_grow_memory_continuously() { + //h2_support::trace_init!(); + let (io, mut client) = mock::new(); + + const N: u32 = 50; + const MAX: usize = 20; + + let client = async move { + let settings = 
client.assert_server_handshake().await; + assert_default_settings!(settings); + for n in (1..(N * 2)).step_by(2) { + client + .send_frame(frames::headers(n).request("GET", "https://a.b/").eos()) + .await; + client.send_frame(frames::reset(n).protocol_error()).await; + } + tokio::time::timeout( + std::time::Duration::from_secs(1), + client.recv_frame(frames::go_away((MAX * 2 + 1) as u32).calm()), + ) + .await + .expect("client goaway"); + }; + + let srv = async move { + let mut srv = server::Builder::new() + .max_pending_accept_reset_streams(MAX) + .handshake::<_, Bytes>(io) + .await + .expect("handshake"); + + poll_fn(|cx| srv.poll_closed(cx)) + .await + .expect_err("server should error"); + // specifically, not 50; + assert_eq!(21, srv.num_wired_streams()); + }; + join(srv, client).await; +} + #[tokio::test] async fn errors_if_recv_frame_exceeds_max_frame_size() { h2_support::trace_init!();