diff --git a/Cargo.toml b/Cargo.toml index c81763e..cb702d4 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -34,6 +34,7 @@ socket2 = { version = ">=0.5.9, <0.7", optional = true, features = ["all"] } tracing = { version = "0.1", default-features = false, features = ["std"], optional = true } tokio = { version = "1", optional = true, default-features = false } tower-service = { version = "0.3", optional = true } +scopeguard = "1.2.0" [dev-dependencies] hyper = { version = "1.4.0", features = ["full"] } diff --git a/src/client/legacy/client.rs b/src/client/legacy/client.rs index e396520..9d02ed2 100644 --- a/src/client/legacy/client.rs +++ b/src/client/legacy/client.rs @@ -25,6 +25,7 @@ use super::connect::HttpConnector; use super::connect::{Alpn, Connect, Connected, Connection}; use super::pool::{self, Ver}; +use crate::client::legacy::pool::{EventHandler, PoolEvent}; use crate::common::future::poll_fn; use crate::common::{lazy as hyper_lazy, timer, Exec, Lazy, SyncWrapper}; @@ -62,7 +63,7 @@ pub struct Error { } #[derive(Debug)] -enum ErrorKind { +pub enum ErrorKind { Canceled, ChannelClosed, Connect, @@ -89,8 +90,9 @@ macro_rules! e { }; } -// We might change this... :shrug: -type PoolKey = (http::uri::Scheme, http::uri::Authority); +/// PoolKey is a tuple of Scheme and Authority of Uri, used to +/// identify a connection pool for a specific destination. +pub type PoolKey = (http::uri::Scheme, http::uri::Authority); enum TrySendError { Retryable { @@ -247,7 +249,7 @@ where let uri = req.uri().clone(); loop { - req = match self.try_send_request(req, pool_key.clone()).await { + req = match self.try_send_request(req, &pool_key).await { Ok(resp) => return Ok(resp), Err(TrySendError::Nope(err)) => return Err(err), Err(TrySendError::Retryable { @@ -275,7 +277,7 @@ where async fn try_send_request( &self, mut req: Request, - pool_key: PoolKey, + pool_key: &PoolKey, ) -> Result, TrySendError> { let mut pooled = self .connection_for(pool_key) @@ -368,10 +370,10 @@ where async fn connection_for( &self, - pool_key: PoolKey, + pool_key: &PoolKey, ) -> Result, PoolKey>, Error> { loop { - match self.one_connection_for(pool_key.clone()).await { + match self.one_connection_for(pool_key).await { Ok(pooled) => return Ok(pooled), Err(ClientConnectError::Normal(err)) => return Err(err), Err(ClientConnectError::CheckoutIsClosed(reason)) => { @@ -391,7 +393,7 @@ where async fn one_connection_for( &self, - pool_key: PoolKey, + pool_key: &PoolKey, ) -> Result, PoolKey>, ClientConnectError> { // Return a single connection if pooling is not enabled if !self.pool.is_enabled() { @@ -484,7 +486,7 @@ where #[cfg(any(feature = "http1", feature = "http2"))] fn connect_to( &self, - pool_key: PoolKey, + pool_key: &PoolKey, ) -> impl Lazy, PoolKey>, Error>> + Send + Unpin { let executor = self.exec.clone(); @@ -496,7 +498,8 @@ where let ver = self.config.ver; let is_ver_h2 = ver == Ver::Http2; let connector = self.connector.clone(); - let dst = domain_as_uri(pool_key.clone()); + let dst = domain_as_uri(pool_key); + let pool_key = pool_key.clone(); hyper_lazy(move || { // Try to take a "connecting lock". // @@ -512,11 +515,49 @@ where return Either::Right(future::err(canceled)); } }; + + let on_event_error = pool.on_event.clone(); + let pool_key_cloned = pool_key.clone(); Either::Left( connector .connect(super::connect::sealed::Internal, dst) + .map_err(move |err| { + let err_box: Box = err.into(); + let mut source_err: Option<&dyn StdError> = Some(err_box.as_ref()); + let (mut io_error_kind, mut elapsed_error) = (None, false); + + while let Some(current_err) = source_err { + use std::io; + if let Some(io_err) = current_err.downcast_ref::() + { + io_error_kind = Some(io_err.kind()); + } else if current_err.is::() { + elapsed_error = true; + } + + source_err = current_err.source(); + } + + if let Some(ref handler) = on_event_error { + let is_timeout = + elapsed_error || matches!(io_error_kind, Some(std::io::ErrorKind::TimedOut)); + if is_timeout { + handler.notify(PoolEvent::ConnectionTimeout, &[&pool_key]); + } else { + handler.notify(PoolEvent::ConnectionError, &[&pool_key]); + } + } + // If the connection failed, we need to notify the event connection error. + + e!(Connect, err_box) + }) .map_err(|src| e!(Connect, src)) .and_then(move |io| { + // increment the total connection count for this pool key + if let Some(ref handler) = pool.on_event { + handler.notify(PoolEvent::NewConnection, &[&pool_key_cloned]); + } + let connected = io.connected(); // If ALPN is h2 and we aren't http2_only already, // then we need to convert our pool checkout into @@ -542,6 +583,13 @@ where let is_h2 = is_ver_h2 || connected.alpn == Alpn::H2; Either::Left(Box::pin(async move { + use scopeguard::{guard, ScopeGuard}; + let guard = guard((), |_| { + // increment the destroy connection count for this pool key (if still armed) + if let Some(ref handler) = pool.on_event { + handler.notify(PoolEvent::ConnectionError, &[&pool_key_cloned]); + } + }); let tx = if is_h2 { #[cfg(feature = "http2")] { let (mut tx, conn) = @@ -653,6 +701,9 @@ where } }; + // “defuse” the guard... + ScopeGuard::into_inner(guard); + Ok(pool.pooled( connecting, PoolClient { @@ -932,10 +983,10 @@ fn authority_form(uri: &mut Uri) { } fn extract_domain(uri: &mut Uri, is_http_connect: bool) -> Result { - let uri_clone = uri.clone(); - match (uri_clone.scheme(), uri_clone.authority()) { + match (uri.scheme(), uri.authority()) { (Some(scheme), Some(auth)) => Ok((scheme.clone(), auth.clone())), (None, Some(auth)) if is_http_connect => { + let auth = auth.clone(); let scheme = match auth.port_u16() { Some(443) => { set_scheme(uri, Scheme::HTTPS); @@ -946,7 +997,7 @@ fn extract_domain(uri: &mut Uri, is_http_connect: bool) -> Result { debug!("Client requires absolute-form URIs, received: {:?}", uri); @@ -955,10 +1006,10 @@ fn extract_domain(uri: &mut Uri, is_http_connect: bool) -> Result Uri { +fn domain_as_uri((scheme, auth): &PoolKey) -> Uri { http::uri::Builder::new() - .scheme(scheme) - .authority(auth) + .scheme(scheme.clone()) + .authority(auth.clone()) .path_and_query("/") .build() .expect("domain is valid Uri") @@ -1021,6 +1072,7 @@ pub struct Builder { h2_builder: hyper::client::conn::http2::Builder, pool_config: pool::Config, pool_timer: Option, + event_handler: Option, } impl Builder { @@ -1046,6 +1098,7 @@ impl Builder { max_idle_per_host: usize::MAX, }, pool_timer: None, + event_handler: None, } } /// Set an optional timeout for idle sockets being kept-alive. @@ -1486,6 +1539,21 @@ impl Builder { self } + /// Sets the maximum number of HTTP2 concurrent streams. + /// + /// See the documentation of [`h2::client::Builder::max_concurrent_streams`] for more + /// details. + /// + /// The default value is determined by the `h2` crate. + /// + /// [`h2::client::Builder::max_concurrent_streams`]: https://docs.rs/h2/client/struct.Builder.html#method.max_concurrent_streams + #[cfg(feature = "http2")] + #[cfg_attr(docsrs, doc(cfg(feature = "http2")))] + pub fn http2_max_concurrent_streams(&mut self, max: u32) -> &mut Self { + self.h2_builder.max_concurrent_streams(max); + self + } + /// Sets the maximum number of HTTP2 concurrent locally reset streams. /// /// See the documentation of [`h2::client::Builder::max_concurrent_reset_streams`] for more @@ -1539,6 +1607,13 @@ impl Builder { self } + /// Set a handler to be used for pool event notifications. + /// + pub fn pool_event_handler(&mut self, on_event: EventHandler) -> &mut Self { + self.event_handler = Some(on_event); + self + } + /// Set whether to retry requests that get disrupted before ever starting /// to write. /// @@ -1591,6 +1666,7 @@ impl Builder { { let exec = self.exec.clone(); let timer = self.pool_timer.clone(); + let on_event = self.event_handler.clone(); Client { config: self.client_config, exec: exec.clone(), @@ -1599,7 +1675,7 @@ impl Builder { #[cfg(feature = "http2")] h2_builder: self.h2_builder.clone(), connector, - pool: pool::Pool::new(self.pool_config, exec, timer), + pool: pool::Pool::new(self.pool_config, exec, timer, on_event), } } } diff --git a/src/client/legacy/mod.rs b/src/client/legacy/mod.rs index 1649ae7..1d1b445 100644 --- a/src/client/legacy/mod.rs +++ b/src/client/legacy/mod.rs @@ -1,7 +1,7 @@ #[cfg(any(feature = "http1", feature = "http2"))] mod client; #[cfg(any(feature = "http1", feature = "http2"))] -pub use client::{Builder, Client, Error, ResponseFuture}; +pub use client::{Builder, Client, Error, PoolKey, ResponseFuture}; pub mod connect; #[doc(hidden)] diff --git a/src/client/legacy/pool.rs b/src/client/legacy/pool.rs index e0d6f2f..9ec6b04 100644 --- a/src/client/legacy/pool.rs +++ b/src/client/legacy/pool.rs @@ -1,5 +1,6 @@ #![allow(dead_code)] +use std::any::Any; use std::collections::{HashMap, HashSet, VecDeque}; use std::convert::Infallible; use std::error::Error as StdError; @@ -19,13 +20,57 @@ use tracing::{debug, trace}; use hyper::rt::Timer as _; +use crate::client::legacy::PoolKey; use crate::common::{exec, exec::Exec, timer::Timer}; +#[derive(Clone, Debug, PartialEq, Eq, Hash)] +pub enum PoolEvent { + /// A new connection was created. + NewConnection, + /// A connection was closed. + ConnectionClosed, + /// A connection was closed. + IdleConnectionClosed, + /// Connection error. + ConnectionError, + /// Connection timeout. + ConnectionTimeout, +} + +type EventFunc = dyn Fn(PoolEvent, &dyn Any, &[&PoolKey]) + Send + Sync + 'static; + +#[derive(Clone)] +// This is used to notify the user of events in the pool. +pub struct EventHandler { + pub on_event: Arc, + tag: Arc, +} + +impl EventHandler { + pub fn new(update: F, tag: T) -> Self + where + F: Fn(PoolEvent, &dyn Any, &[&PoolKey]) + Send + Sync + 'static, + T: Any + Send + Sync + 'static, + { + EventHandler { + on_event: Arc::new(update), + tag: Arc::new(tag), + } + } + + pub fn notify(&self, event: PoolEvent, keys: &[&PoolKey]) { + (self.on_event)(event, self.tag.as_ref(), keys); + } +} + // FIXME: allow() required due to `impl Trait` leaking types to this lint #[allow(missing_debug_implementations)] pub struct Pool { // If the pool is disabled, this is None. inner: Option>>>, + // Notification handler for events in the pool. + // If the handler is provided, notification are available even when the pool is not enabled. + pub on_event: Option>, } // Before using a pooled connection, make sure the sender is not dead. @@ -42,9 +87,18 @@ pub trait Poolable: Unpin + Send + Sized + 'static { fn can_share(&self) -> bool; } -pub trait Key: Eq + Hash + Clone + Debug + Unpin + Send + 'static {} +pub trait Key: Eq + Hash + Clone + Debug + Send + Unpin + 'static { + fn as_any(&self) -> &dyn Any; +} -impl Key for T where T: Eq + Hash + Clone + Debug + Unpin + Send + 'static {} +impl Key for T +where + T: Eq + Hash + Clone + Debug + Send + Unpin + 'static, +{ + fn as_any(&self) -> &dyn Any { + self + } +} /// A marker to identify what version a pooled connection is. #[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)] @@ -74,8 +128,7 @@ pub enum Reservation { /// Simple type alias in case the key type needs to be adjusted. // pub type Key = (http::uri::Scheme, http::uri::Authority); //Arc; - -struct PoolInner { +struct PoolInner { // A flag that a connection is being established, and the connection // should be shared. This prevents making multiple HTTP/2 connections // to the same host. @@ -100,6 +153,7 @@ struct PoolInner { exec: Exec, timer: Option, timeout: Option, + on_event: Option>, } // This is because `Weak::new()` *allocates* space for `T`, even if it @@ -119,13 +173,19 @@ impl Config { } impl Pool { - pub fn new(config: Config, executor: E, timer: Option) -> Pool + pub fn new( + config: Config, + executor: E, + timer: Option, + handler: Option, + ) -> Pool where E: hyper::rt::Executor + Send + Sync + Clone + 'static, M: hyper::rt::Timer + Send + Sync + Clone + 'static, { let exec = Exec::new(executor); let timer = timer.map(|t| Timer::new(t)); + let handler = handler.map(Arc::new); let inner = if config.is_enabled() { Some(Arc::new(Mutex::new(PoolInner { connecting: HashSet::new(), @@ -136,12 +196,16 @@ impl Pool { exec, timer, timeout: config.idle_timeout, + on_event: handler.clone(), }))) } else { None }; - Pool { inner } + Pool { + inner, + on_event: handler, + } } pub(crate) fn is_enabled(&self) -> bool { @@ -455,7 +519,7 @@ impl PoolInner { } } -impl PoolInner { +impl PoolInner { /// Any `FutureResponse`s that were created will have made a `Checkout`, /// and possibly inserted into the pool that it is waiting for an idle /// connection. If a user ever dropped that future, we need to clean out @@ -479,17 +543,19 @@ impl PoolInner { let now = Instant::now(); //self.last_idle_check_at = now; - + let mut removed_keys = vec![]; self.idle.retain(|key, values| { values.retain(|entry| { if !entry.value.is_open() { trace!("idle interval evicting closed for {:?}", key); + removed_keys.push(key.clone()); return false; } // Avoid `Instant::sub` to avoid issues like rust-lang/rust#86470. if now.saturating_duration_since(entry.idle_at) > dur { trace!("idle interval evicting expired for {:?}", key); + removed_keys.push(key.clone()); return false; } @@ -500,13 +566,33 @@ impl PoolInner { // returning false evicts this key/val !values.is_empty() }); + + if let Some(ref handler) = self.on_event { + let key_refs = downcast_refs::(&removed_keys); + handler.notify(PoolEvent::IdleConnectionClosed, &key_refs); + } + } +} + +fn downcast_refs(keys: &[K]) -> Vec<&T> +where + K: Key, + T: 'static, +{ + let mut out = Vec::with_capacity(keys.len()); + for k in keys { + if let Some(k) = k.as_any().downcast_ref::() { + out.push(k); + } } + out } impl Clone for Pool { fn clone(&self) -> Pool { Pool { inner: self.inner.clone(), + on_event: self.on_event.clone(), } } } @@ -554,15 +640,15 @@ impl DerefMut for Pooled { impl Drop for Pooled { fn drop(&mut self) { if let Some(value) = self.value.take() { - if !value.is_open() { - // If we *already* know the connection is done here, - // it shouldn't be re-inserted back into the pool. - return; - } - if let Some(pool) = self.pool.upgrade() { if let Ok(mut inner) = pool.lock() { - inner.put(self.key.clone(), value, &pool); + if value.is_open() { + inner.put(self.key.clone(), value, &pool); + } else if let Some(ref handler) = inner.on_event { + if let Some(key) = self.key.as_any().downcast_ref::() { + handler.notify(PoolEvent::ConnectionClosed, &[key]); + } + } } } else if !value.can_share() { trace!("pool dropped, dropping pooled ({:?})", self.key); @@ -886,6 +972,7 @@ mod tests { }, TokioExecutor::new(), Option::::None, + None, ); pool.no_timer(); pool @@ -984,6 +1071,7 @@ mod tests { }, TokioExecutor::new(), Some(TokioTimer::new()), + None ); let key = host_key("foo");