From 35a918b4322bea2ba43aeff5434f97b91e15c1ff Mon Sep 17 00:00:00 2001 From: Sean McArthur Date: Mon, 6 Dec 2021 15:14:51 -0800 Subject: [PATCH 1/5] Add max send buffer per stream option --- src/client.rs | 16 ++++++++++++++++ src/proto/connection.rs | 2 ++ src/proto/streams/mod.rs | 3 +++ src/proto/streams/send.rs | 6 +++++- src/server.rs | 16 ++++++++++++++++ 5 files changed, 42 insertions(+), 1 deletion(-) diff --git a/src/client.rs b/src/client.rs index 3a818a582..29f500e01 100644 --- a/src/client.rs +++ b/src/client.rs @@ -320,6 +320,9 @@ pub struct Builder { /// Initial target window size for new connections. initial_target_connection_window_size: Option, + /// Maximum amount of bytes to "buffer" for writing per stream. + max_send_buffer_size: usize, + /// Maximum number of locally reset streams to keep at a time. reset_stream_max: usize, @@ -628,6 +631,7 @@ impl Builder { /// ``` pub fn new() -> Builder { Builder { + max_send_buffer_size: 1024 * 1024, reset_stream_duration: Duration::from_secs(proto::DEFAULT_RESET_STREAM_SECS), reset_stream_max: proto::DEFAULT_RESET_STREAM_MAX, initial_target_connection_window_size: None, @@ -962,6 +966,17 @@ impl Builder { self } + /// Sets the maximum send buffer size per stream. + /// + /// Once a stream has buffered up to (or over) the maximum, the stream's + /// flow control will not "poll" additional capacity. Once bytes for the + /// stream have been written to the connection, the send buffer capacity + /// will be freed up again. + pub fn max_send_buffer_size(&mut self, max: usize) -> &mut Self { + self.max_send_buffer_size = max; + self + } + /// Enables or disables server push promises. /// /// This value is included in the initial SETTINGS handshake. When set, the @@ -1184,6 +1199,7 @@ where proto::Config { next_stream_id: builder.stream_id, initial_max_send_streams: builder.initial_max_send_streams, + max_send_buffer_size: builder.max_send_buffer_size, reset_stream_duration: builder.reset_stream_duration, reset_stream_max: builder.reset_stream_max, settings: builder.settings.clone(), diff --git a/src/proto/connection.rs b/src/proto/connection.rs index d1b8b5125..cd011a1d5 100644 --- a/src/proto/connection.rs +++ b/src/proto/connection.rs @@ -77,6 +77,7 @@ struct DynConnection<'a, B: Buf = Bytes> { pub(crate) struct Config { pub next_stream_id: StreamId, pub initial_max_send_streams: usize, + pub max_send_buffer_size: usize, pub reset_stream_duration: Duration, pub reset_stream_max: usize, pub settings: frame::Settings, @@ -108,6 +109,7 @@ where .initial_window_size() .unwrap_or(DEFAULT_INITIAL_WINDOW_SIZE), initial_max_send_streams: config.initial_max_send_streams, + local_max_buffer_size: config.max_send_buffer_size, local_next_stream_id: config.next_stream_id, local_push_enabled: config.settings.is_push_enabled().unwrap_or(true), extended_connect_protocol_enabled: config diff --git a/src/proto/streams/mod.rs b/src/proto/streams/mod.rs index 0fd61a29a..de2a2c85a 100644 --- a/src/proto/streams/mod.rs +++ b/src/proto/streams/mod.rs @@ -41,6 +41,9 @@ pub struct Config { /// MAX_CONCURRENT_STREAMS specified in the frame. pub initial_max_send_streams: usize, + /// Max amount of DATA bytes to buffer per stream. + pub local_max_buffer_size: usize, + /// The stream ID to start the next local stream with pub local_next_stream_id: StreamId, diff --git a/src/proto/streams/send.rs b/src/proto/streams/send.rs index e3fcf6d32..006528dcf 100644 --- a/src/proto/streams/send.rs +++ b/src/proto/streams/send.rs @@ -28,6 +28,9 @@ pub(super) struct Send { /// > the identified last stream. max_stream_id: StreamId, + /// The maximum amount of bytes a stream should buffer. + max_buffer_size: usize, + /// Initial window size of locally initiated streams init_window_sz: WindowSize, @@ -52,6 +55,7 @@ impl Send { pub fn new(config: &Config) -> Self { Send { init_window_sz: config.remote_init_window_sz, + max_buffer_size: config.local_max_buffer_size, max_stream_id: StreamId::MAX, next_stream_id: Ok(config.local_next_stream_id), prioritize: Prioritize::new(config), @@ -339,7 +343,7 @@ impl Send { if available as usize <= buffered { 0 } else { - available - buffered as WindowSize + available.min(self.max_buffer_size as WindowSize) - buffered as WindowSize } } diff --git a/src/server.rs b/src/server.rs index 1eb40312c..d4b3c6a79 100644 --- a/src/server.rs +++ b/src/server.rs @@ -245,6 +245,9 @@ pub struct Builder { /// Initial target window size for new connections. initial_target_connection_window_size: Option, + + /// Maximum amount of bytes to "buffer" for writing per stream. + max_send_buffer_size: usize, } /// Send a response back to the client @@ -633,6 +636,7 @@ impl Builder { reset_stream_max: proto::DEFAULT_RESET_STREAM_MAX, settings: Settings::default(), initial_target_connection_window_size: None, + max_send_buffer_size: 1024 * 1024, } } @@ -870,6 +874,17 @@ impl Builder { self } + /// Sets the maximum send buffer size per stream. + /// + /// Once a stream has buffered up to (or over) the maximum, the stream's + /// flow control will not "poll" additional capacity. Once bytes for the + /// stream have been written to the connection, the send buffer capacity + /// will be freed up again. + pub fn max_send_buffer_size(&mut self, max: usize) -> &mut Self { + self.max_send_buffer_size = max; + self + } + /// Sets the maximum number of concurrent locally reset streams. /// /// When a stream is explicitly reset by either calling @@ -1290,6 +1305,7 @@ where next_stream_id: 2.into(), // Server does not need to locally initiate any streams initial_max_send_streams: 0, + max_send_buffer_size: self.builder.max_send_buffer_size, reset_stream_duration: self.builder.reset_stream_duration, reset_stream_max: self.builder.reset_stream_max, settings: self.builder.settings.clone(), From b610a3954a4f84842807285ee79c7370548b4ab1 Mon Sep 17 00:00:00 2001 From: Sean McArthur Date: Tue, 7 Dec 2021 17:10:07 -0800 Subject: [PATCH 2/5] catch sub overflow in capacity() --- src/proto/streams/send.rs | 10 ++++- tests/h2-tests/tests/flow_control.rs | 57 ++++++++++++++++++++++++++++ 2 files changed, 66 insertions(+), 1 deletion(-) diff --git a/src/proto/streams/send.rs b/src/proto/streams/send.rs index 006528dcf..92cb27d48 100644 --- a/src/proto/streams/send.rs +++ b/src/proto/streams/send.rs @@ -340,10 +340,18 @@ impl Send { let available = stream.send_flow.available().as_size(); let buffered = stream.buffered_send_data; + tracing::info!( + ?available, + ?buffered, + max_buffer_size = self.max_buffer_size + ); + if available as usize <= buffered { 0 } else { - available.min(self.max_buffer_size as WindowSize) - buffered as WindowSize + available + .min(self.max_buffer_size as WindowSize) + .saturating_sub(buffered as WindowSize) } } diff --git a/tests/h2-tests/tests/flow_control.rs b/tests/h2-tests/tests/flow_control.rs index e7d630808..1a6018f73 100644 --- a/tests/h2-tests/tests/flow_control.rs +++ b/tests/h2-tests/tests/flow_control.rs @@ -1611,3 +1611,60 @@ async fn poll_capacity_after_send_data_and_reserve() { join(srv, h2).await; } + +#[tokio::test] +async fn max_send_buffer_size_overflow() { + h2_support::trace_init!(); + let (io, mut srv) = mock::new(); + + let srv = async move { + let settings = srv.assert_client_handshake().await; + assert_default_settings!(settings); + srv.recv_frame(frames::headers(1).request("POST", "https://www.example.com/")) + .await; + srv.send_frame(frames::headers(1).response(200).eos()).await; + srv.recv_frame(frames::data(1, &[0; 10][..])).await; + srv.recv_frame(frames::data(1, &[][..]).eos()).await; + }; + + let client = async move { + let (mut client, mut conn) = client::Builder::new() + .max_send_buffer_size(5) + .handshake::<_, Bytes>(io) + .await + .unwrap(); + let request = Request::builder() + .method(Method::POST) + .uri("https://www.example.com/") + .body(()) + .unwrap(); + + let (response, mut stream) = client.send_request(request, false).unwrap(); + + let response = conn.drive(response).await.unwrap(); + assert_eq!(response.status(), StatusCode::OK); + + assert_eq!(stream.capacity(), 0); + stream.reserve_capacity(10); + assert_eq!( + stream.capacity(), + 5, + "polled capacity not over max buffer size" + ); + + stream.send_data([0; 10][..].into(), false).unwrap(); + + stream.reserve_capacity(15); + assert_eq!( + stream.capacity(), + 0, + "now with buffered over the max, don't overflow" + ); + stream.send_data([0; 0][..].into(), true).unwrap(); + + // Wait for the connection to close + conn.await.unwrap(); + }; + + join(srv, client).await; +} From 980f23e278545a183da5123aac1127a321dad2af Mon Sep 17 00:00:00 2001 From: Anthony Ramine Date: Wed, 8 Dec 2021 10:24:09 +0100 Subject: [PATCH 3/5] Fix computation in Send::capacity --- src/proto/streams/send.rs | 10 ++-------- 1 file changed, 2 insertions(+), 8 deletions(-) diff --git a/src/proto/streams/send.rs b/src/proto/streams/send.rs index 92cb27d48..f86c02382 100644 --- a/src/proto/streams/send.rs +++ b/src/proto/streams/send.rs @@ -337,7 +337,7 @@ impl Send { /// Current available stream send capacity pub fn capacity(&self, stream: &mut store::Ptr) -> WindowSize { - let available = stream.send_flow.available().as_size(); + let available = stream.send_flow.available().as_size() as usize; let buffered = stream.buffered_send_data; tracing::info!( @@ -346,13 +346,7 @@ impl Send { max_buffer_size = self.max_buffer_size ); - if available as usize <= buffered { - 0 - } else { - available - .min(self.max_buffer_size as WindowSize) - .saturating_sub(buffered as WindowSize) - } + available.min(self.max_buffer_size).saturating_sub(buffered) as WindowSize } pub fn poll_reset( From aa715015560ac9c765fd06996353766e0d51f40b Mon Sep 17 00:00:00 2001 From: Sean McArthur Date: Wed, 8 Dec 2021 08:24:39 -0800 Subject: [PATCH 4/5] remove stray log --- src/proto/streams/send.rs | 6 ------ 1 file changed, 6 deletions(-) diff --git a/src/proto/streams/send.rs b/src/proto/streams/send.rs index f86c02382..b7230030e 100644 --- a/src/proto/streams/send.rs +++ b/src/proto/streams/send.rs @@ -340,12 +340,6 @@ impl Send { let available = stream.send_flow.available().as_size() as usize; let buffered = stream.buffered_send_data; - tracing::info!( - ?available, - ?buffered, - max_buffer_size = self.max_buffer_size - ); - available.min(self.max_buffer_size).saturating_sub(buffered) as WindowSize } From 1945c027f9a51fb79372e84a35109aeafa4fedca Mon Sep 17 00:00:00 2001 From: Sean McArthur Date: Wed, 8 Dec 2021 08:29:29 -0800 Subject: [PATCH 5/5] add asserts, adjust default --- src/client.rs | 9 ++++++++- src/proto/mod.rs | 1 + src/server.rs | 9 ++++++++- 3 files changed, 17 insertions(+), 2 deletions(-) diff --git a/src/client.rs b/src/client.rs index 29f500e01..d4ec3b906 100644 --- a/src/client.rs +++ b/src/client.rs @@ -631,7 +631,7 @@ impl Builder { /// ``` pub fn new() -> Builder { Builder { - max_send_buffer_size: 1024 * 1024, + max_send_buffer_size: proto::DEFAULT_MAX_SEND_BUFFER_SIZE, reset_stream_duration: Duration::from_secs(proto::DEFAULT_RESET_STREAM_SECS), reset_stream_max: proto::DEFAULT_RESET_STREAM_MAX, initial_target_connection_window_size: None, @@ -972,7 +972,14 @@ impl Builder { /// flow control will not "poll" additional capacity. Once bytes for the /// stream have been written to the connection, the send buffer capacity /// will be freed up again. + /// + /// The default is currently ~400MB, but may change. + /// + /// # Panics + /// + /// This function panics if `max` is larger than `u32::MAX`. pub fn max_send_buffer_size(&mut self, max: usize) -> &mut Self { + assert!(max <= std::u32::MAX as usize); self.max_send_buffer_size = max; self } diff --git a/src/proto/mod.rs b/src/proto/mod.rs index d505e77f3..5ec7bf992 100644 --- a/src/proto/mod.rs +++ b/src/proto/mod.rs @@ -33,3 +33,4 @@ pub type WindowSize = u32; pub const MAX_WINDOW_SIZE: WindowSize = (1 << 31) - 1; pub const DEFAULT_RESET_STREAM_MAX: usize = 10; pub const DEFAULT_RESET_STREAM_SECS: u64 = 30; +pub const DEFAULT_MAX_SEND_BUFFER_SIZE: usize = 1024 * 400; diff --git a/src/server.rs b/src/server.rs index d4b3c6a79..87c300083 100644 --- a/src/server.rs +++ b/src/server.rs @@ -636,7 +636,7 @@ impl Builder { reset_stream_max: proto::DEFAULT_RESET_STREAM_MAX, settings: Settings::default(), initial_target_connection_window_size: None, - max_send_buffer_size: 1024 * 1024, + max_send_buffer_size: proto::DEFAULT_MAX_SEND_BUFFER_SIZE, } } @@ -880,7 +880,14 @@ impl Builder { /// flow control will not "poll" additional capacity. Once bytes for the /// stream have been written to the connection, the send buffer capacity /// will be freed up again. + /// + /// The default is currently ~400MB, but may change. + /// + /// # Panics + /// + /// This function panics if `max` is larger than `u32::MAX`. pub fn max_send_buffer_size(&mut self, max: usize) -> &mut Self { + assert!(max <= std::u32::MAX as usize); self.max_send_buffer_size = max; self }