diff --git a/.github/workflows/CI.yml b/.github/workflows/CI.yml index 14e4a3149..437e6e088 100644 --- a/.github/workflows/CI.yml +++ b/.github/workflows/CI.yml @@ -31,7 +31,6 @@ jobs: strategy: matrix: rust: - - nightly - beta - stable steps: @@ -61,12 +60,12 @@ jobs: run: ./ci/h2spec.sh if: matrix.rust == 'stable' - clippy_check: - runs-on: ubuntu-latest - steps: - - uses: actions/checkout@v4 - - name: Run Clippy - run: cargo clippy --all-targets --all-features + #clippy_check: + # runs-on: ubuntu-latest + # steps: + # - uses: actions/checkout@v4 + # - name: Run Clippy + # run: cargo clippy --all-targets --all-features msrv: name: Check MSRV diff --git a/CHANGELOG.md b/CHANGELOG.md index 00d69725a..f5dc74672 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,19 @@ +# 0.3.26 (April 3, 2024) + +* Limit number of CONTINUATION frames for misbehaving connections. + +# 0.3.25 (March 15, 2024) + +* Improve performance decoding many headers. + +# 0.3.24 (January 17, 2024) + +* Limit error resets for misbehaving connections. + +# 0.3.23 (January 10, 2024) + +* Backport fix from 0.4.1 for stream capacity assignment. + # 0.3.22 (November 15, 2023) * Add `header_table_size(usize)` option to client and server builders. diff --git a/Cargo.toml b/Cargo.toml index c413e56ce..a84beebc2 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -5,7 +5,7 @@ name = "h2" # - html_root_url. # - Update CHANGELOG.md. # - Create git tag -version = "0.3.22" +version = "0.3.26" license = "MIT" authors = [ "Carl Lerche ", diff --git a/src/client.rs b/src/client.rs index 35cfc1414..29dd0ef2c 100644 --- a/src/client.rs +++ b/src/client.rs @@ -336,6 +336,12 @@ pub struct Builder { /// The stream ID of the first (lowest) stream. Subsequent streams will use /// monotonically increasing stream IDs. stream_id: StreamId, + + /// Maximum number of locally reset streams due to protocol error across + /// the lifetime of the connection. + /// + /// When this gets exceeded, we issue GOAWAYs. + local_max_error_reset_streams: Option, } #[derive(Debug)] @@ -645,6 +651,7 @@ impl Builder { initial_max_send_streams: usize::MAX, settings: Default::default(), stream_id: 1.into(), + local_max_error_reset_streams: Some(proto::DEFAULT_LOCAL_RESET_COUNT_MAX), } } @@ -973,6 +980,23 @@ impl Builder { self } + /// Sets the maximum number of local resets due to protocol errors made by the remote end. + /// + /// Invalid frames and many other protocol errors will lead to resets being generated for those streams. + /// Too many of these often indicate a malicious client, and there are attacks which can abuse this to DOS servers. + /// This limit protects against these DOS attacks by limiting the amount of resets we can be forced to generate. + /// + /// When the number of local resets exceeds this threshold, the client will close the connection. + /// + /// If you really want to disable this, supply [`Option::None`] here. + /// Disabling this is not recommended and may expose you to DOS attacks. + /// + /// The default value is currently 1024, but could change. + pub fn max_local_error_reset_streams(&mut self, max: Option) -> &mut Self { + self.local_max_error_reset_streams = max; + self + } + /// Sets the maximum number of pending-accept remotely-reset streams. /// /// Streams that have been received by the peer, but not accepted by the @@ -1293,6 +1317,7 @@ where reset_stream_duration: builder.reset_stream_duration, reset_stream_max: builder.reset_stream_max, remote_reset_stream_max: builder.pending_accept_reset_stream_max, + local_error_reset_streams_max: builder.local_max_error_reset_streams, settings: builder.settings.clone(), }, ); @@ -1606,9 +1631,11 @@ impl proto::Peer for Peer { proto::DynPeer::Client } + /* fn is_server() -> bool { false } + */ fn convert_poll_message( pseudo: Pseudo, diff --git a/src/codec/framed_read.rs b/src/codec/framed_read.rs index 3b0030d93..9270a8635 100644 --- a/src/codec/framed_read.rs +++ b/src/codec/framed_read.rs @@ -30,6 +30,8 @@ pub struct FramedRead { max_header_list_size: usize, + max_continuation_frames: usize, + partial: Option, } @@ -41,6 +43,8 @@ struct Partial { /// Partial header payload buf: BytesMut, + + continuation_frames_count: usize, } #[derive(Debug)] @@ -51,10 +55,14 @@ enum Continuable { impl FramedRead { pub fn new(inner: InnerFramedRead) -> FramedRead { + let max_header_list_size = DEFAULT_SETTINGS_MAX_HEADER_LIST_SIZE; + let max_continuation_frames = + calc_max_continuation_frames(max_header_list_size, inner.decoder().max_frame_length()); FramedRead { inner, hpack: hpack::Decoder::new(DEFAULT_SETTINGS_HEADER_TABLE_SIZE), - max_header_list_size: DEFAULT_SETTINGS_MAX_HEADER_LIST_SIZE, + max_header_list_size, + max_continuation_frames, partial: None, } } @@ -68,7 +76,6 @@ impl FramedRead { } /// Returns the current max frame size setting - #[cfg(feature = "unstable")] #[inline] pub fn max_frame_size(&self) -> usize { self.inner.decoder().max_frame_length() @@ -80,13 +87,17 @@ impl FramedRead { #[inline] pub fn set_max_frame_size(&mut self, val: usize) { assert!(DEFAULT_MAX_FRAME_SIZE as usize <= val && val <= MAX_MAX_FRAME_SIZE as usize); - self.inner.decoder_mut().set_max_frame_length(val) + self.inner.decoder_mut().set_max_frame_length(val); + // Update max CONTINUATION frames too, since its based on this + self.max_continuation_frames = calc_max_continuation_frames(self.max_header_list_size, val); } /// Update the max header list size setting. #[inline] pub fn set_max_header_list_size(&mut self, val: usize) { self.max_header_list_size = val; + // Update max CONTINUATION frames too, since its based on this + self.max_continuation_frames = calc_max_continuation_frames(val, self.max_frame_size()); } /// Update the header table size setting. @@ -96,12 +107,22 @@ impl FramedRead { } } +fn calc_max_continuation_frames(header_max: usize, frame_max: usize) -> usize { + // At least this many frames needed to use max header list size + let min_frames_for_list = (header_max / frame_max).max(1); + // Some padding for imperfectly packed frames + // 25% without floats + let padding = min_frames_for_list >> 2; + min_frames_for_list.saturating_add(padding).max(5) +} + /// Decodes a frame. /// /// This method is intentionally de-generified and outlined because it is very large. fn decode_frame( hpack: &mut hpack::Decoder, max_header_list_size: usize, + max_continuation_frames: usize, partial_inout: &mut Option, mut bytes: BytesMut, ) -> Result, Error> { @@ -169,6 +190,7 @@ fn decode_frame( *partial_inout = Some(Partial { frame: Continuable::$frame(frame), buf: payload, + continuation_frames_count: 0, }); return Ok(None); @@ -273,6 +295,22 @@ fn decode_frame( return Err(Error::library_go_away(Reason::PROTOCOL_ERROR)); } + // Check for CONTINUATION flood + if is_end_headers { + partial.continuation_frames_count = 0; + } else { + let cnt = partial.continuation_frames_count + 1; + if cnt > max_continuation_frames { + tracing::debug!("too_many_continuations, max = {}", max_continuation_frames); + return Err(Error::library_go_away_data( + Reason::ENHANCE_YOUR_CALM, + "too_many_continuations", + )); + } else { + partial.continuation_frames_count = cnt; + } + } + // Extend the buf if partial.buf.is_empty() { partial.buf = bytes.split_off(frame::HEADER_LEN); @@ -354,9 +392,16 @@ where ref mut hpack, max_header_list_size, ref mut partial, + max_continuation_frames, .. } = *self; - if let Some(frame) = decode_frame(hpack, max_header_list_size, partial, bytes)? { + if let Some(frame) = decode_frame( + hpack, + max_header_list_size, + max_continuation_frames, + partial, + bytes, + )? { tracing::debug!(?frame, "received"); return Poll::Ready(Some(Ok(frame))); } diff --git a/src/error.rs b/src/error.rs index eb2b2acbc..96a471bcb 100644 --- a/src/error.rs +++ b/src/error.rs @@ -25,6 +25,7 @@ pub struct Error { #[derive(Debug)] enum Kind { /// A RST_STREAM frame was received or sent. + #[allow(dead_code)] Reset(StreamId, Reason, Initiator), /// A GO_AWAY frame was received or sent. diff --git a/src/frame/headers.rs b/src/frame/headers.rs index 9d5c8cefe..fb8d6b146 100644 --- a/src/frame/headers.rs +++ b/src/frame/headers.rs @@ -12,6 +12,7 @@ use std::fmt; use std::io::Cursor; type EncodeBuf<'a> = bytes::buf::Limit<&'a mut BytesMut>; + /// Header frame /// /// This could be either a request or a response. @@ -87,6 +88,9 @@ struct HeaderBlock { /// The decoded header fields fields: HeaderMap, + /// Precomputed size of all of our header fields, for perf reasons + field_size: usize, + /// Set to true if decoding went over the max header list size. is_over_size: bool, @@ -115,6 +119,7 @@ impl Headers { stream_id, stream_dep: None, header_block: HeaderBlock { + field_size: calculate_headermap_size(&fields), fields, is_over_size: false, pseudo, @@ -131,6 +136,7 @@ impl Headers { stream_id, stream_dep: None, header_block: HeaderBlock { + field_size: calculate_headermap_size(&fields), fields, is_over_size: false, pseudo: Pseudo::default(), @@ -196,6 +202,7 @@ impl Headers { stream_dep, header_block: HeaderBlock { fields: HeaderMap::new(), + field_size: 0, is_over_size: false, pseudo: Pseudo::default(), }, @@ -350,6 +357,7 @@ impl PushPromise { PushPromise { flags: PushPromiseFlag::default(), header_block: HeaderBlock { + field_size: calculate_headermap_size(&fields), fields, is_over_size: false, pseudo, @@ -441,6 +449,7 @@ impl PushPromise { flags, header_block: HeaderBlock { fields: HeaderMap::new(), + field_size: 0, is_over_size: false, pseudo: Pseudo::default(), }, @@ -892,6 +901,8 @@ impl HeaderBlock { headers_size += decoded_header_size(name.as_str().len(), value.len()); if headers_size < max_header_list_size { + self.field_size += + decoded_header_size(name.as_str().len(), value.len()); self.fields.append(name, value); } else if !self.is_over_size { tracing::trace!("load_hpack; header list size over max"); @@ -958,14 +969,16 @@ impl HeaderBlock { + pseudo_size!(status) + pseudo_size!(authority) + pseudo_size!(path) - + self - .fields - .iter() - .map(|(name, value)| decoded_header_size(name.as_str().len(), value.len())) - .sum::() + + self.field_size } } +fn calculate_headermap_size(map: &HeaderMap) -> usize { + map.iter() + .map(|(name, value)| decoded_header_size(name.as_str().len(), value.len())) + .sum::() +} + fn decoded_header_size(name: usize, value: usize) -> usize { name + value + 32 } @@ -974,8 +987,6 @@ fn decoded_header_size(name: usize, value: usize) -> usize { mod test { use std::iter::FromIterator; - use http::HeaderValue; - use super::*; use crate::frame; use crate::hpack::{huffman, Encoder}; diff --git a/src/hpack/decoder.rs b/src/hpack/decoder.rs index 960cbb143..e48976c36 100644 --- a/src/hpack/decoder.rs +++ b/src/hpack/decoder.rs @@ -829,7 +829,6 @@ pub fn get_static(idx: usize) -> Header { #[cfg(test)] mod test { use super::*; - use crate::hpack::Header; #[test] fn test_peek_u8() { diff --git a/src/hpack/encoder.rs b/src/hpack/encoder.rs index d121a2aaf..bd49056f6 100644 --- a/src/hpack/encoder.rs +++ b/src/hpack/encoder.rs @@ -299,7 +299,6 @@ fn position(buf: &BytesMut) -> usize { #[cfg(test)] mod test { use super::*; - use crate::hpack::Header; use http::*; #[test] diff --git a/src/lib.rs b/src/lib.rs index a1fde6eb4..91fa322f7 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -78,7 +78,6 @@ //! [`server::handshake`]: server/fn.handshake.html //! [`client::handshake`]: client/fn.handshake.html -#![doc(html_root_url = "https://docs.rs/h2/0.3.22")] #![deny( missing_debug_implementations, missing_docs, diff --git a/src/proto/connection.rs b/src/proto/connection.rs index 637fac358..1fef38408 100644 --- a/src/proto/connection.rs +++ b/src/proto/connection.rs @@ -1,18 +1,18 @@ use crate::codec::UserError; use crate::frame::{Reason, StreamId}; -use crate::{client, frame, server}; +use crate::{client, server}; use crate::frame::DEFAULT_INITIAL_WINDOW_SIZE; use crate::proto::*; -use bytes::{Buf, Bytes}; +use bytes::Bytes; use futures_core::Stream; use std::io; use std::marker::PhantomData; use std::pin::Pin; use std::task::{Context, Poll}; use std::time::Duration; -use tokio::io::{AsyncRead, AsyncWrite}; +use tokio::io::AsyncRead; /// An H2 connection #[derive(Debug)] @@ -81,6 +81,7 @@ pub(crate) struct Config { pub reset_stream_duration: Duration, pub reset_stream_max: usize, pub remote_reset_stream_max: usize, + pub local_error_reset_streams_max: Option, pub settings: frame::Settings, } @@ -125,6 +126,7 @@ where .settings .max_concurrent_streams() .map(|max| max as usize), + local_max_error_reset_streams: config.local_error_reset_streams_max, } } let streams = Streams::new(streams_config(&config)); diff --git a/src/proto/mod.rs b/src/proto/mod.rs index 567d03060..560927598 100644 --- a/src/proto/mod.rs +++ b/src/proto/mod.rs @@ -32,6 +32,7 @@ pub type WindowSize = u32; // Constants pub const MAX_WINDOW_SIZE: WindowSize = (1 << 31) - 1; // i32::MAX as u32 pub const DEFAULT_REMOTE_RESET_STREAM_MAX: usize = 20; +pub const DEFAULT_LOCAL_RESET_COUNT_MAX: usize = 1024; pub const DEFAULT_RESET_STREAM_MAX: usize = 10; pub const DEFAULT_RESET_STREAM_SECS: u64 = 30; pub const DEFAULT_MAX_SEND_BUFFER_SIZE: usize = 1024 * 400; diff --git a/src/proto/peer.rs b/src/proto/peer.rs index d62d9e24e..cbe7fb289 100644 --- a/src/proto/peer.rs +++ b/src/proto/peer.rs @@ -14,7 +14,7 @@ pub(crate) trait Peer { fn r#dyn() -> Dyn; - fn is_server() -> bool; + //fn is_server() -> bool; fn convert_poll_message( pseudo: Pseudo, @@ -22,10 +22,12 @@ pub(crate) trait Peer { stream_id: StreamId, ) -> Result; + /* fn is_local_init(id: StreamId) -> bool { assert!(!id.is_zero()); Self::is_server() == id.is_server_initiated() } + */ } /// A dynamic representation of `Peer`. diff --git a/src/proto/settings.rs b/src/proto/settings.rs index 28065cc68..93949d4f5 100644 --- a/src/proto/settings.rs +++ b/src/proto/settings.rs @@ -1,6 +1,5 @@ use crate::codec::UserError; use crate::error::Reason; -use crate::frame; use crate::proto::*; use std::task::{Context, Poll}; diff --git a/src/proto/streams/counts.rs b/src/proto/streams/counts.rs index add1312e5..710d42c8d 100644 --- a/src/proto/streams/counts.rs +++ b/src/proto/streams/counts.rs @@ -31,6 +31,16 @@ pub(super) struct Counts { /// Current number of "pending accept" streams that were remotely reset num_remote_reset_streams: usize, + + /// Maximum number of locally reset streams due to protocol error across + /// the lifetime of the connection. + /// + /// When this gets exceeded, we issue GOAWAYs. + max_local_error_reset_streams: Option, + + /// Total number of locally reset streams due to protocol error across the + /// lifetime of the connection. + num_local_error_reset_streams: usize, } impl Counts { @@ -46,6 +56,8 @@ impl Counts { num_local_reset_streams: 0, max_remote_reset_streams: config.remote_reset_max, num_remote_reset_streams: 0, + max_local_error_reset_streams: config.local_max_error_reset_streams, + num_local_error_reset_streams: 0, } } @@ -66,6 +78,26 @@ impl Counts { self.num_send_streams != 0 || self.num_recv_streams != 0 } + /// Returns true if we can issue another local reset due to protocol error. + pub fn can_inc_num_local_error_resets(&self) -> bool { + if let Some(max) = self.max_local_error_reset_streams { + max > self.num_local_error_reset_streams + } else { + true + } + } + + pub fn inc_num_local_error_resets(&mut self) { + assert!(self.can_inc_num_local_error_resets()); + + // Increment the number of remote initiated streams + self.num_local_error_reset_streams += 1; + } + + pub(crate) fn max_local_error_resets(&self) -> Option { + self.max_local_error_reset_streams + } + /// Returns true if the receive stream concurrency can be incremented pub fn can_inc_num_recv_streams(&self) -> bool { self.max_recv_streams > self.num_recv_streams diff --git a/src/proto/streams/mod.rs b/src/proto/streams/mod.rs index fbe32c7b0..b347442af 100644 --- a/src/proto/streams/mod.rs +++ b/src/proto/streams/mod.rs @@ -69,4 +69,10 @@ pub struct Config { /// Maximum number of remote initiated streams pub remote_max_initiated: Option, + + /// Maximum number of locally reset streams due to protocol error across + /// the lifetime of the connection. + /// + /// When this gets exceeded, we issue GOAWAYs. + pub local_max_error_reset_streams: Option, } diff --git a/src/proto/streams/prioritize.rs b/src/proto/streams/prioritize.rs index 3196049a4..14b37e223 100644 --- a/src/proto/streams/prioritize.rs +++ b/src/proto/streams/prioritize.rs @@ -1,12 +1,12 @@ use super::store::Resolve; use super::*; -use crate::frame::{Reason, StreamId}; +use crate::frame::Reason; use crate::codec::UserError; use crate::codec::UserError::*; -use bytes::buf::{Buf, Take}; +use bytes::buf::Take; use std::{ cmp::{self, Ordering}, fmt, io, mem, @@ -184,7 +184,15 @@ impl Prioritize { stream.requested_send_capacity = cmp::min(stream.buffered_send_data, WindowSize::MAX as usize) as WindowSize; - self.try_assign_capacity(stream); + // `try_assign_capacity` will queue the stream to `pending_capacity` if the capcaity + // cannot be assigned at the time it is called. + // + // Streams over the max concurrent count will still call `send_data` so we should be + // careful not to put it into `pending_capacity` as it will starve the connection + // capacity for other streams + if !stream.is_pending_open { + self.try_assign_capacity(stream); + } } if frame.is_end_stream() { @@ -522,6 +530,7 @@ impl Prioritize { loop { if let Some(mut stream) = self.pop_pending_open(store, counts) { self.pending_send.push_front(&mut stream); + self.try_assign_capacity(&mut stream); } match self.pop_frame(buffer, store, max_frame_len, counts) { diff --git a/src/proto/streams/recv.rs b/src/proto/streams/recv.rs index 0063942a4..71ec7901c 100644 --- a/src/proto/streams/recv.rs +++ b/src/proto/streams/recv.rs @@ -1,14 +1,14 @@ use super::*; use crate::codec::UserError; -use crate::frame::{self, PushPromiseHeaderError, Reason, DEFAULT_INITIAL_WINDOW_SIZE}; -use crate::proto::{self, Error}; +use crate::frame::{PushPromiseHeaderError, Reason, DEFAULT_INITIAL_WINDOW_SIZE}; +use crate::proto; use http::{HeaderMap, Request, Response}; use std::cmp::Ordering; use std::io; use std::task::{Context, Poll, Waker}; -use std::time::{Duration, Instant}; +use std::time::Instant; #[derive(Debug)] pub(super) struct Recv { diff --git a/src/proto/streams/store.rs b/src/proto/streams/store.rs index 67b377b12..35fd6f25e 100644 --- a/src/proto/streams/store.rs +++ b/src/proto/streams/store.rs @@ -127,6 +127,7 @@ impl Store { } } + #[allow(clippy::blocks_in_conditions)] pub(crate) fn for_each(&mut self, mut f: F) where F: FnMut(Ptr), diff --git a/src/proto/streams/streams.rs b/src/proto/streams/streams.rs index 274bf4553..7c00cd517 100644 --- a/src/proto/streams/streams.rs +++ b/src/proto/streams/streams.rs @@ -1542,10 +1542,24 @@ impl Actions { ) -> Result<(), Error> { if let Err(Error::Reset(stream_id, reason, initiator)) = res { debug_assert_eq!(stream_id, stream.id); - // Reset the stream. - self.send - .send_reset(reason, initiator, buffer, stream, counts, &mut self.task); - Ok(()) + + if counts.can_inc_num_local_error_resets() { + counts.inc_num_local_error_resets(); + + // Reset the stream. + self.send + .send_reset(reason, initiator, buffer, stream, counts, &mut self.task); + Ok(()) + } else { + tracing::warn!( + "reset_on_recv_stream_err; locally-reset streams reached limit ({:?})", + counts.max_local_error_resets().unwrap(), + ); + Err(Error::library_go_away_data( + Reason::ENHANCE_YOUR_CALM, + "too_many_internal_resets", + )) + } } else { res } diff --git a/src/server.rs b/src/server.rs index bb20adc5d..65d2d8301 100644 --- a/src/server.rs +++ b/src/server.rs @@ -252,6 +252,12 @@ pub struct Builder { /// Maximum amount of bytes to "buffer" for writing per stream. max_send_buffer_size: usize, + + /// Maximum number of locally reset streams due to protocol error across + /// the lifetime of the connection. + /// + /// When this gets exceeded, we issue GOAWAYs. + local_max_error_reset_streams: Option, } /// Send a response back to the client @@ -650,6 +656,8 @@ impl Builder { settings: Settings::default(), initial_target_connection_window_size: None, max_send_buffer_size: proto::DEFAULT_MAX_SEND_BUFFER_SIZE, + + local_max_error_reset_streams: Some(proto::DEFAULT_LOCAL_RESET_COUNT_MAX), } } @@ -887,6 +895,24 @@ impl Builder { self } + /// Sets the maximum number of local resets due to protocol errors made by the remote end. + /// + /// Invalid frames and many other protocol errors will lead to resets being generated for those streams. + /// Too many of these often indicate a malicious client, and there are attacks which can abuse this to DOS servers. + /// This limit protects against these DOS attacks by limiting the amount of resets we can be forced to generate. + /// + /// When the number of local resets exceeds this threshold, the server will issue GOAWAYs with an error code of + /// `ENHANCE_YOUR_CALM` to the client. + /// + /// If you really want to disable this, supply [`Option::None`] here. + /// Disabling this is not recommended and may expose you to DOS attacks. + /// + /// The default value is currently 1024, but could change. + pub fn max_local_error_reset_streams(&mut self, max: Option) -> &mut Self { + self.local_max_error_reset_streams = max; + self + } + /// Sets the maximum number of pending-accept remotely-reset streams. /// /// Streams that have been received by the peer, but not accepted by the @@ -1361,6 +1387,9 @@ where reset_stream_duration: self.builder.reset_stream_duration, reset_stream_max: self.builder.reset_stream_max, remote_reset_stream_max: self.builder.pending_accept_reset_stream_max, + local_error_reset_streams_max: self + .builder + .local_max_error_reset_streams, settings: self.builder.settings.clone(), }, ); @@ -1472,9 +1501,11 @@ impl proto::Peer for Peer { const NAME: &'static str = "Server"; + /* fn is_server() -> bool { true } + */ fn r#dyn() -> proto::DynPeer { proto::DynPeer::Server @@ -1525,7 +1556,7 @@ impl proto::Peer for Peer { // A request translated from HTTP/1 must not include the :authority // header - if let Some(authority) = pseudo.authority { + if let Some(authority) = pseudo.authority.filter(|_| false) { let maybe_authority = uri::Authority::from_maybe_shared(authority.clone().into_inner()); parts.authority = Some(maybe_authority.or_else(|why| { malformed!( diff --git a/tests/h2-support/src/frames.rs b/tests/h2-support/src/frames.rs index a76dd3b60..858bf770b 100644 --- a/tests/h2-support/src/frames.rs +++ b/tests/h2-support/src/frames.rs @@ -2,7 +2,7 @@ use std::convert::TryInto; use std::fmt; use bytes::Bytes; -use http::{self, HeaderMap, StatusCode}; +use http::{HeaderMap, StatusCode}; use h2::{ ext::Protocol, diff --git a/tests/h2-support/src/mock.rs b/tests/h2-support/src/mock.rs index 18d084841..30824943c 100644 --- a/tests/h2-support/src/mock.rs +++ b/tests/h2-support/src/mock.rs @@ -2,7 +2,7 @@ use crate::SendFrame; use h2::frame::{self, Frame}; use h2::proto::Error; -use h2::{self, SendError}; +use h2::SendError; use futures::future::poll_fn; use futures::{ready, Stream, StreamExt}; diff --git a/tests/h2-support/src/util.rs b/tests/h2-support/src/util.rs index aa7fb2c54..02b6450d0 100644 --- a/tests/h2-support/src/util.rs +++ b/tests/h2-support/src/util.rs @@ -1,5 +1,3 @@ -use h2; - use bytes::{BufMut, Bytes}; use futures::ready; use std::future::Future; diff --git a/tests/h2-tests/tests/hammer.rs b/tests/h2-tests/tests/hammer.rs index a5cba3dfa..4b5d04341 100644 --- a/tests/h2-tests/tests/hammer.rs +++ b/tests/h2-tests/tests/hammer.rs @@ -8,7 +8,6 @@ use std::{ atomic::{AtomicUsize, Ordering}, Arc, }, - thread, }; use tokio::net::{TcpListener, TcpStream}; diff --git a/tests/h2-tests/tests/prioritization.rs b/tests/h2-tests/tests/prioritization.rs index 7c2681068..11d2c2ccf 100644 --- a/tests/h2-tests/tests/prioritization.rs +++ b/tests/h2-tests/tests/prioritization.rs @@ -1,5 +1,6 @@ -use futures::future::join; -use futures::{FutureExt, StreamExt}; +use futures::future::{join, select}; +use futures::{pin_mut, FutureExt, StreamExt}; + use h2_support::prelude::*; use h2_support::DEFAULT_WINDOW_SIZE; use std::task::Context; @@ -408,3 +409,95 @@ async fn send_data_receive_window_update() { join(mock, h2).await; } + +#[tokio::test] +async fn stream_count_over_max_stream_limit_does_not_starve_capacity() { + use tokio::sync::oneshot; + + h2_support::trace_init!(); + + let (io, mut srv) = mock::new(); + + let (tx, rx) = oneshot::channel(); + + let srv = async move { + let _ = srv + .assert_client_handshake_with_settings( + frames::settings() + // super tiny server + .max_concurrent_streams(1), + ) + .await; + srv.recv_frame(frames::headers(1).request("POST", "http://example.com/")) + .await; + + srv.recv_frame(frames::data(1, vec![0; 16384])).await; + srv.recv_frame(frames::data(1, vec![0; 16384])).await; + srv.recv_frame(frames::data(1, vec![0; 16384])).await; + srv.recv_frame(frames::data(1, vec![0; 16383]).eos()).await; + srv.send_frame(frames::headers(1).response(200).eos()).await; + + // All of these connection capacities should be assigned to stream 3 + srv.send_frame(frames::window_update(0, 16384)).await; + srv.send_frame(frames::window_update(0, 16384)).await; + srv.send_frame(frames::window_update(0, 16384)).await; + srv.send_frame(frames::window_update(0, 16383)).await; + + // StreamId(3) should be able to send all of its request with the conn capacity + srv.recv_frame(frames::headers(3).request("POST", "http://example.com/")) + .await; + srv.recv_frame(frames::data(3, vec![0; 16384])).await; + srv.recv_frame(frames::data(3, vec![0; 16384])).await; + srv.recv_frame(frames::data(3, vec![0; 16384])).await; + srv.recv_frame(frames::data(3, vec![0; 16383]).eos()).await; + srv.send_frame(frames::headers(3).response(200).eos()).await; + + // Then all the future stream is guaranteed to be send-able by induction + tx.send(()).unwrap(); + }; + + fn request() -> Request<()> { + Request::builder() + .method(Method::POST) + .uri("http://example.com/") + .body(()) + .unwrap() + } + + let client = async move { + let (mut client, mut conn) = client::Builder::new() + .handshake::<_, Bytes>(io) + .await + .expect("handshake"); + + let (req1, mut send1) = client.send_request(request(), false).unwrap(); + let (req2, mut send2) = client.send_request(request(), false).unwrap(); + + // Use up the connection window. + send1.send_data(vec![0; 65535].into(), true).unwrap(); + // Queue up for more connection window. + send2.send_data(vec![0; 65535].into(), true).unwrap(); + + // Queue up more pending open streams + for _ in 0..5 { + let (_, mut send) = client.send_request(request(), false).unwrap(); + send.send_data(vec![0; 65535].into(), true).unwrap(); + } + + let response = conn.drive(req1).await.unwrap(); + assert_eq!(response.status(), StatusCode::OK); + + let response = conn.drive(req2).await.unwrap(); + assert_eq!(response.status(), StatusCode::OK); + + let _ = rx.await; + }; + + let task = join(srv, client); + pin_mut!(task); + + let t = tokio::time::sleep(Duration::from_secs(5)).map(|_| panic!("time out")); + pin_mut!(t); + + select(task, t).await; +} diff --git a/tests/h2-tests/tests/server.rs b/tests/h2-tests/tests/server.rs index 6075c7dcf..7f0eb5456 100644 --- a/tests/h2-tests/tests/server.rs +++ b/tests/h2-tests/tests/server.rs @@ -1,6 +1,6 @@ #![deny(warnings)] -use futures::future::{join, poll_fn}; +use futures::future::join; use futures::StreamExt; use h2_support::prelude::*; use tokio::io::AsyncWriteExt; @@ -883,6 +883,55 @@ async fn too_big_headers_sends_reset_after_431_if_not_eos() { join(client, srv).await; } +#[tokio::test] +async fn too_many_continuation_frames_sends_goaway() { + h2_support::trace_init!(); + let (io, mut client) = mock::new(); + + let client = async move { + let settings = client.assert_server_handshake().await; + assert_frame_eq(settings, frames::settings().max_header_list_size(1024 * 32)); + + // the mock impl automatically splits into CONTINUATION frames if the + // headers are too big for one frame. So without a max header list size + // set, we'll send a bunch of headers that will eventually get nuked. + client + .send_frame( + frames::headers(1) + .request("GET", "https://example.com/") + .field("a".repeat(10_000), "b".repeat(10_000)) + .field("c".repeat(10_000), "d".repeat(10_000)) + .field("e".repeat(10_000), "f".repeat(10_000)) + .field("g".repeat(10_000), "h".repeat(10_000)) + .field("i".repeat(10_000), "j".repeat(10_000)) + .field("k".repeat(10_000), "l".repeat(10_000)) + .field("m".repeat(10_000), "n".repeat(10_000)) + .field("o".repeat(10_000), "p".repeat(10_000)) + .field("y".repeat(10_000), "z".repeat(10_000)), + ) + .await; + client + .recv_frame(frames::go_away(0).calm().data("too_many_continuations")) + .await; + }; + + let srv = async move { + let mut srv = server::Builder::new() + // should mean ~3 continuation + .max_header_list_size(1024 * 32) + .handshake::<_, Bytes>(io) + .await + .expect("handshake"); + + let err = srv.next().await.unwrap().expect_err("server"); + assert!(err.is_go_away()); + assert!(err.is_library()); + assert_eq!(err.reason(), Some(Reason::ENHANCE_YOUR_CALM)); + }; + + join(client, srv).await; +} + #[tokio::test] async fn pending_accept_recv_illegal_content_length_data() { h2_support::trace_init!(); diff --git a/tests/h2-tests/tests/stream_states.rs b/tests/h2-tests/tests/stream_states.rs index 16d113132..05a96a0f5 100644 --- a/tests/h2-tests/tests/stream_states.rs +++ b/tests/h2-tests/tests/stream_states.rs @@ -1,6 +1,6 @@ #![deny(warnings)] -use futures::future::{join, join3, lazy, poll_fn, try_join}; +use futures::future::{join, join3, lazy, try_join}; use futures::{FutureExt, StreamExt, TryStreamExt}; use h2_support::prelude::*; use h2_support::util::yield_once;