Skip to content

Commit fda3317

Browse files
drcaramelsyrup authored and gumpt committed
Always drain v1 request body before session reuse
Previously, the pingora-proxy filter implementations were responsible for ensuring that HTTP/1.1 streams were safe for reuse and contained no remaining body bytes to drain. Draining body bytes is now always done as part of finishing the downstream v1 session, before reuse is attempted, together with a new configurable total body drain timeout.
1 parent a1baa60 commit fda3317

File tree

4 files changed

+109
-15
lines changed

4 files changed

+109
-15
lines changed

.bleep

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
c8dbb31df87f456082be681af9987172ee8c9954
1+
2513fe1ff4219fbb218dfd1fe7ffb4bbfb96aa91

pingora-core/src/protocols/http/server.rs

Lines changed: 17 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -111,12 +111,9 @@ impl Session {
111111
/// This is useful for making streams reusable (in particular for HTTP/1.1) after returning an
112112
/// error before the whole body has been read.
113113
pub async fn drain_request_body(&mut self) -> Result<()> {
114-
loop {
115-
match self.read_request_body().await {
116-
Ok(Some(_)) => { /* continue to drain */ }
117-
Ok(None) => return Ok(()), // done
118-
Err(e) => return Err(e),
119-
}
114+
// Delegate to the protocol-specific session; each implementation applies
// its own `total_drain_timeout` when one has been configured.
match self {
115+
Self::H1(s) => s.drain_request_body().await,
116+
Self::H2(s) => s.drain_request_body().await,
120117
}
121118
}
122119

@@ -177,7 +174,7 @@ impl Session {
177174
Self::H1(mut s) => {
178175
// need to flush body due to buffering
179176
s.finish_body().await?;
180-
Ok(s.reuse().await)
177+
s.reuse().await
181178
}
182179
Self::H2(mut s) => {
183180
s.finish()?;
@@ -225,6 +222,19 @@ impl Session {
225222
}
226223
}
227224

225+
/// Sets the total drain timeout, which will be applied while discarding the
226+
/// request body using `drain_request_body`.
227+
///
228+
/// For HTTP/1.1, reusing a session requires ensuring that the request body
229+
/// is consumed. If the timeout is exceeded, the caller should give up on
230+
/// trying to reuse the session.
231+
pub fn set_total_drain_timeout(&mut self, timeout: Duration) {
232+
// Forward the value to whichever protocol-specific session this wraps.
match self {
233+
Self::H1(s) => s.set_total_drain_timeout(timeout),
234+
Self::H2(s) => s.set_total_drain_timeout(timeout),
235+
}
236+
}
237+
228238
/// Sets the minimum downstream send rate in bytes per second. This
229239
/// is used to calculate a write timeout in seconds based on the size
230240
/// of the buffer being written. If a `min_send_rate` is configured it

pingora-core/src/protocols/http/v1/server.rs

Lines changed: 52 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,8 @@ pub struct HttpSession {
6262
keepalive_timeout: KeepaliveStatus,
6363
read_timeout: Option<Duration>,
6464
write_timeout: Option<Duration>,
65+
/// How long to wait to make downstream session reusable, if body needs to be drained.
66+
total_drain_timeout: Option<Duration>,
6567
/// A copy of the response that is already written to the client
6668
response_written: Option<Box<ResponseHeader>>,
6769
/// The parsed request header
@@ -106,6 +108,7 @@ impl HttpSession {
106108
request_header: None,
107109
read_timeout: None,
108110
write_timeout: None,
111+
total_drain_timeout: None,
109112
body_bytes_sent: 0,
110113
body_bytes_read: 0,
111114
retry_buffer: None,
@@ -399,6 +402,30 @@ impl HttpSession {
399402
}
400403
}
401404

405+
/// Reads and discards request body data by calling `read_body_bytes` in a loop.
///
/// Returns `Ok(())` once `read_body_bytes` yields `Ok(None)`, or the first error
/// it produces. This helper applies no overall deadline itself; the caller
/// `drain_request_body` wraps it in `total_drain_timeout` when one is set.
async fn do_drain_request_body(&mut self) -> Result<()> {
406+
loop {
407+
match self.read_body_bytes().await {
408+
Ok(Some(_)) => { /* continue to drain */ }
409+
Ok(None) => return Ok(()), // done
410+
Err(e) => return Err(e),
411+
}
412+
}
413+
}
414+
415+
/// Drain the request body. `Ok(())` when there is no (more) body to read.
///
/// Returns immediately when `is_body_done()` reports that nothing is left to
/// read. Otherwise the remaining body is read and discarded. If
/// `total_drain_timeout` is set, the whole drain is bounded by it and a
/// `ReadTimedout` error is returned when it expires; if it is `None`, the drain
/// runs until the body ends or a read fails.
416+
pub async fn drain_request_body(&mut self) -> Result<()> {
417+
if self.is_body_done() {
418+
return Ok(());
419+
}
420+
match self.total_drain_timeout {
421+
Some(t) => match timeout(t, self.do_drain_request_body()).await {
422+
// the drain finished (successfully or not) before the deadline
Ok(res) => res,
423+
// the deadline was hit first: report it as a read timeout
Err(_) => Error::e_explain(ReadTimedout, format!("draining body, timeout: {t:?}")),
424+
},
425+
None => self.do_drain_request_body().await,
426+
}
427+
}
428+
402429
/// Whether there is no (more) body that needs to be read.
403430
pub fn is_body_done(&mut self) -> bool {
404431
self.init_body_reader();
@@ -862,6 +889,18 @@ impl HttpSession {
862889
self.write_timeout = Some(timeout);
863890
}
864891

892+
/// Sets the total drain timeout. For HTTP/1.1, reusing a session requires
893+
/// ensuring that the request body is consumed. This `timeout` will be used
894+
/// to determine how long to wait for the entirety of the downstream request
895+
/// body to finish after the upstream response is completed to return the
896+
/// session to the reuse pool. If the timeout is exceeded, we will give up
897+
/// on trying to reuse the session.
898+
///
899+
/// Note that the downstream read timeout still applies between body byte reads.
///
/// The value is stored in `total_drain_timeout`, which `drain_request_body`
/// reads. The field starts out as `None`, in which case draining has no
/// overall deadline.
900+
pub fn set_total_drain_timeout(&mut self, timeout: Duration) {
901+
self.total_drain_timeout = Some(timeout);
902+
}
903+
865904
/// Sets the minimum downstream send rate in bytes per second. This
866905
/// is used to calculate a write timeout in seconds based on the size
867906
/// of the buffer being written. If a `min_send_rate` is configured it
@@ -911,19 +950,25 @@ impl HttpSession {
911950
}
912951

913952
/// Consume `self`, if the connection can be reused, the underlying stream will be returned
914-
/// to be fed to the next [`Self::new()`]. The next session can just call [`Self::read_request()`].
953+
/// to be fed to the next [`Self::new()`]. This drains any remaining request body if it hasn't
954+
/// yet been read and the stream is reusable.
955+
///
956+
/// The next session can just call [`Self::read_request()`].
957+
///
915958
/// If the connection cannot be reused, the underlying stream will be closed and `None` will be
916-
/// returned.
917-
pub async fn reuse(mut self) -> Option<Stream> {
918-
// TODO: this function is unnecessarily slow for keepalive case
919-
// because that case does not need async
959+
/// returned. If there was an error while draining any remaining request body that error will
960+
/// be returned.
961+
pub async fn reuse(mut self) -> Result<Option<Stream>> {
920962
match self.keepalive_timeout {
921963
// Keepalive is off: shut the connection down and hand nothing back.
KeepaliveStatus::Off => {
922964
debug!("HTTP shutdown connection");
923965
self.shutdown().await;
924-
None
966+
Ok(None)
967+
}
968+
_ => {
969+
// Any other keepalive status: discard the unread request body first.
// `?` propagates a drain failure (including a drain timeout), in which
// case the stream is not handed back to the caller.
self.drain_request_body().await?;
970+
Ok(Some(self.underlying_stream))
925971
}
926-
_ => Some(self.underlying_stream),
927972
}
928973
}
929974

pingora-core/src/protocols/http/v2/server.rs

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,9 @@ use http::uri::PathAndQuery;
2424
use http::{header, HeaderMap, Response};
2525
use log::{debug, warn};
2626
use pingora_http::{RequestHeader, ResponseHeader};
27+
use pingora_timeout::timeout;
2728
use std::sync::Arc;
29+
use std::time::Duration;
2830

2931
use crate::protocols::http::body_buffer::FixedBuffer;
3032
use crate::protocols::http::date::get_cached_date;
@@ -99,6 +101,8 @@ pub struct HttpSession {
99101
retry_buffer: Option<FixedBuffer>,
100102
// digest to record underlying connection info
101103
digest: Arc<Digest>,
104+
// How long to wait when draining (discarding) request body
105+
total_drain_timeout: Option<Duration>,
102106
}
103107

104108
impl HttpSession {
@@ -138,6 +142,7 @@ impl HttpSession {
138142
body_sent: 0,
139143
retry_buffer: None,
140144
digest,
145+
total_drain_timeout: None,
141146
}
142147
}))
143148
}
@@ -178,6 +183,40 @@ impl HttpSession {
178183
Ok(data)
179184
}
180185

186+
/// Reads and discards request body data by calling `read_body_bytes` in a loop.
///
/// Returns `Ok(())` once `read_body_bytes` yields `Ok(None)`, or the first error
/// it produces. This helper applies no overall deadline itself; the caller
/// `drain_request_body` wraps it in `total_drain_timeout` when one is set.
async fn do_drain_request_body(&mut self) -> Result<()> {
187+
loop {
188+
match self.read_body_bytes().await {
189+
Ok(Some(_)) => { /* continue to drain */ }
190+
Ok(None) => return Ok(()), // done
191+
Err(e) => return Err(e),
192+
}
193+
}
194+
}
195+
196+
/// Drain the request body. `Ok(())` when there is no (more) body to read.
///
/// Returns immediately when `is_body_done()` reports that nothing is left to
/// read. Otherwise the remaining body is read and discarded. If
/// `total_drain_timeout` is set, the whole drain is bounded by it and a
/// `ReadTimedout` error is returned when it expires; if it is `None`, the drain
/// runs until the body ends or a read fails.
197+
// NOTE for h2 it may be worth allowing cancellation of the stream via reset.
198+
pub async fn drain_request_body(&mut self) -> Result<()> {
199+
if self.is_body_done() {
200+
return Ok(());
201+
}
202+
match self.total_drain_timeout {
203+
Some(t) => match timeout(t, self.do_drain_request_body()).await {
204+
// the drain finished (successfully or not) before the deadline
Ok(res) => res,
205+
// the deadline was hit first: report it as a read timeout
Err(_) => Error::e_explain(
206+
ErrorType::ReadTimedout,
207+
format!("draining body, timeout: {t:?}"),
208+
),
209+
},
210+
None => self.do_drain_request_body().await,
211+
}
212+
}
213+
214+
/// Sets the total drain timeout. This `timeout` will be used while draining
215+
/// the request body.
///
/// The value is stored in `total_drain_timeout`, which `drain_request_body`
/// reads. The field starts out as `None`, in which case draining has no
/// overall deadline.
216+
pub fn set_total_drain_timeout(&mut self, timeout: Duration) {
217+
self.total_drain_timeout = Some(timeout);
218+
}
219+
181220
// the write_* don't have timeouts because the actual writing happens on the connection
182221
// not here.
183222

0 commit comments

Comments
 (0)