From 66775d661bda844e000ba913d6d2a00caf801c1a Mon Sep 17 00:00:00 2001 From: Hanna Kruppe Date: Sat, 10 May 2025 20:11:11 +0200 Subject: [PATCH] refactor(lib): drop futures-util except in ffi Make hyper usable for h1/h2 and client/server without this heavyweight dependency. It's about 17k lines of code and takes up to 1.7 seconds to compile on my machine, but hyper is only using a tiny fraction of it. Larger applications probably still pull in futures-util by other means, but it's no longer as unavoidable as in the early days of the ecosystem. To remove futures-util without raising MSRV, I took these steps: * When futures-util just re-exports something from its dependencies, use it directly from the source. * Inline trivial helpers like `poll_unpin` that "only" communicate intent a little better but don't save any significant amount of code. * Refactor the h2 client code to avoid `StreamFuture` for the "Client has been dropped" detection -- just poll the mpsc channel directly. * Implement a couple of small helpers from scratch when they're straightforward and fit on one screen each. The majority of this is polyfills for standard library APIs that would require a higher MSRV. * Use `AtomicWaker` from the `atomic-waker` crate, a separately published copy of the futures-util type of the same name. While the two crates are owned by different organizations (smol-rs vs. rust-lang), it's mostly the same people maintaining both copies. The uses of future-util in hyper's tests/benches/examples and in the `ffi` module seem much harder to remove entirely, so I did not touch those modules at all. --- Cargo.toml | 11 ++++++---- src/body/incoming.rs | 4 ++-- src/client/conn/http1.rs | 6 +++--- src/client/conn/http2.rs | 4 ++-- src/client/dispatch.rs | 3 +-- src/common/either.rs | 46 ++++++++++++++++++++++++++++++++++++++++ src/common/future.rs | 30 ++++++++++++++++++++++++++ src/common/mod.rs | 7 ++++++ src/common/task.rs | 36 +++++++++++++++++++++++++++++++ src/common/time.rs | 2 +- src/common/watch.rs | 2 +- src/proto/h1/conn.rs | 2 +- src/proto/h1/decode.rs | 2 +- src/proto/h1/dispatch.rs | 2 +- src/proto/h1/io.rs | 2 +- src/proto/h2/client.rs | 23 +++++++++----------- src/proto/h2/mod.rs | 2 +- src/proto/h2/server.rs | 2 +- src/server/conn/http1.rs | 4 ++-- src/server/conn/http2.rs | 2 +- 20 files changed, 155 insertions(+), 37 deletions(-) create mode 100644 src/common/either.rs create mode 100644 src/common/future.rs diff --git a/Cargo.toml b/Cargo.toml index c97f581508..3034cfeb57 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -27,14 +27,17 @@ tokio = { version = "1", features = ["sync"] } # Optional +atomic-waker = { version = "1.1.2", optional = true } futures-channel = { version = "0.3", optional = true } -futures-util = { version = "0.3", default-features = false, optional = true } +futures-core = { version = "0.3.31", optional = true } +futures-util = { version = "0.3", default-features = false, features = ["alloc"], optional = true } h2 = { version = "0.4.2", optional = true } http-body-util = { version = "0.1", optional = true } httparse = { version = "1.9", optional = true } httpdate = { version = "1.0", optional = true } itoa = { version = "1", optional = true } pin-project-lite = { version = "0.2.4", optional = true } +pin-utils = { version = "0.1", optional = true } # TODO: replace with std::pin::pin! once MSRV >= 1.68 smallvec = { version = "1.12", features = ["const_generics", "const_new"], optional = true } tracing = { version = "0.1", default-features = false, features = ["std"], optional = true } want = { version = "0.3", optional = true } @@ -77,15 +80,15 @@ full = [ ] # HTTP versions -http1 = ["dep:futures-channel", "dep:futures-util", "dep:httparse", "dep:itoa"] -http2 = ["dep:futures-channel", "dep:futures-util", "dep:h2"] +http1 = ["dep:atomic-waker", "dep:futures-channel", "dep:futures-core", "dep:httparse", "dep:itoa", "dep:pin-utils"] +http2 = ["dep:futures-channel", "dep:futures-core", "dep:h2"] # Client/Server client = ["dep:want", "dep:pin-project-lite", "dep:smallvec"] server = ["dep:httpdate", "dep:pin-project-lite", "dep:smallvec"] # C-API support (currently unstable (no semver)) -ffi = ["dep:http-body-util", "futures-util?/alloc"] +ffi = ["dep:http-body-util", "futures-util"] capi = [] # Utilize tracing (currently unstable) diff --git a/src/body/incoming.rs b/src/body/incoming.rs index dcdc440524..64ee5001a9 100644 --- a/src/body/incoming.rs +++ b/src/body/incoming.rs @@ -11,9 +11,9 @@ use futures_channel::{mpsc, oneshot}; any(feature = "http1", feature = "http2"), any(feature = "client", feature = "server") ))] -use futures_util::ready; +use futures_core::ready; #[cfg(all(feature = "http1", any(feature = "client", feature = "server")))] -use futures_util::{stream::FusedStream, Stream}; // for mpsc::Receiver +use futures_core::{stream::FusedStream, Stream}; // for mpsc::Receiver #[cfg(all(feature = "http1", any(feature = "client", feature = "server")))] use http::HeaderMap; use http_body::{Body, Frame, SizeHint}; diff --git a/src/client/conn/http1.rs b/src/client/conn/http1.rs index ecfe6eb8fb..85c1fdc665 100644 --- a/src/client/conn/http1.rs +++ b/src/client/conn/http1.rs @@ -8,7 +8,7 @@ use std::task::{Context, Poll}; use crate::rt::{Read, Write}; use bytes::Bytes; -use futures_util::ready; +use futures_core::ready; use http::{Request, Response}; use httparse::ParserConfig; @@ -92,7 +92,7 @@ where /// instead run `into_parts`. This is a convenience wrapper over `poll_without_shutdown`. pub async fn without_shutdown(self) -> crate::Result> { let mut conn = Some(self); - futures_util::future::poll_fn(move |cx| -> Poll>> { + crate::common::future::poll_fn(move |cx| -> Poll>> { ready!(conn.as_mut().unwrap().poll_without_shutdown(cx))?; Poll::Ready(Ok(conn.take().unwrap().into_parts())) }) @@ -148,7 +148,7 @@ impl SendRequest { /// /// If the associated connection is closed, this returns an Error. pub async fn ready(&mut self) -> crate::Result<()> { - futures_util::future::poll_fn(|cx| self.poll_ready(cx)).await + crate::common::future::poll_fn(|cx| self.poll_ready(cx)).await } /// Checks if the connection is currently ready to send a request. diff --git a/src/client/conn/http2.rs b/src/client/conn/http2.rs index 3db28957b6..356d909dd9 100644 --- a/src/client/conn/http2.rs +++ b/src/client/conn/http2.rs @@ -10,7 +10,7 @@ use std::task::{Context, Poll}; use std::time::Duration; use crate::rt::{Read, Write}; -use futures_util::ready; +use futures_core::ready; use http::{Request, Response}; use super::super::dispatch::{self, TrySendError}; @@ -99,7 +99,7 @@ impl SendRequest { /// /// If the associated connection is closed, this returns an Error. pub async fn ready(&mut self) -> crate::Result<()> { - futures_util::future::poll_fn(|cx| self.poll_ready(cx)).await + crate::common::future::poll_fn(|cx| self.poll_ready(cx)).await } /// Checks if the connection is currently ready to send a request. diff --git a/src/client/dispatch.rs b/src/client/dispatch.rs index d3e9415de2..412f7af52c 100644 --- a/src/client/dispatch.rs +++ b/src/client/dispatch.rs @@ -199,8 +199,7 @@ impl Receiver { #[cfg(feature = "http1")] pub(crate) fn try_recv(&mut self) -> Option<(T, Callback)> { - use futures_util::FutureExt; - match self.inner.recv().now_or_never() { + match crate::common::task::now_or_never(self.inner.recv()) { Some(Some(mut env)) => env.0.take(), _ => None, } diff --git a/src/common/either.rs b/src/common/either.rs new file mode 100644 index 0000000000..0412aa775c --- /dev/null +++ b/src/common/either.rs @@ -0,0 +1,46 @@ +use pin_project_lite::pin_project; +use std::{ + future::Future, + pin::Pin, + task::{Context, Poll}, +}; + +pin_project! { + /// One of two possible futures that have the same output type. + #[project = EitherProj] + pub(crate) enum Either { + Left { + #[pin] + fut: F1 + }, + Right { + #[pin] + fut: F2, + }, + } +} + +impl Either { + pub(crate) fn left(fut: F1) -> Self { + Either::Left { fut } + } + + pub(crate) fn right(fut: F2) -> Self { + Either::Right { fut } + } +} + +impl Future for Either +where + F1: Future, + F2: Future, +{ + type Output = F1::Output; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + match self.project() { + EitherProj::Left { fut } => fut.poll(cx), + EitherProj::Right { fut } => fut.poll(cx), + } + } +} diff --git a/src/common/future.rs b/src/common/future.rs new file mode 100644 index 0000000000..c2e921cb7a --- /dev/null +++ b/src/common/future.rs @@ -0,0 +1,30 @@ +use std::{ + future::Future, + pin::Pin, + task::{Context, Poll}, +}; + +// TODO: replace with `std::future::poll_fn` once MSRV >= 1.64 +pub(crate) fn poll_fn(f: F) -> PollFn +where + F: FnMut(&mut Context<'_>) -> Poll, +{ + PollFn { f } +} + +pub(crate) struct PollFn { + f: F, +} + +impl Unpin for PollFn {} + +impl Future for PollFn +where + F: FnMut(&mut Context<'_>) -> Poll, +{ + type Output = T; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + (self.as_mut().f)(cx) + } +} diff --git a/src/common/mod.rs b/src/common/mod.rs index a0c71385cb..4b73437203 100644 --- a/src/common/mod.rs +++ b/src/common/mod.rs @@ -2,6 +2,13 @@ pub(crate) mod buf; #[cfg(all(feature = "server", any(feature = "http1", feature = "http2")))] pub(crate) mod date; +#[cfg(all(feature = "client", feature = "http2"))] +pub(crate) mod either; +#[cfg(any( + all(feature = "client", any(feature = "http1", feature = "http2")), + all(feature = "server", feature = "http1"), +))] +pub(crate) mod future; pub(crate) mod io; #[cfg(all(any(feature = "client", feature = "server"), feature = "http1"))] pub(crate) mod task; diff --git a/src/common/task.rs b/src/common/task.rs index 41671b1453..90a0883465 100644 --- a/src/common/task.rs +++ b/src/common/task.rs @@ -1,4 +1,6 @@ use std::task::{Context, Poll}; +#[cfg(feature = "client")] +use std::task::{RawWaker, RawWakerVTable, Waker}; /// A function to help "yield" a future, such that it is re-scheduled immediately. /// @@ -7,3 +9,37 @@ pub(crate) fn yield_now(cx: &mut Context<'_>) -> Poll cx.waker().wake_by_ref(); Poll::Pending } + +// TODO: replace with `std::task::Waker::noop()` once MSRV >= 1.85 +#[cfg(feature = "client")] +fn noop_waker() -> Waker { + const NOOP_RAW_WAKER: RawWaker = RawWaker::new(std::ptr::null(), &NOOP_VTABLE); + const NOOP_VTABLE: RawWakerVTable = RawWakerVTable::new( + // `clone` returns the same noop waker again + |_: *const ()| NOOP_RAW_WAKER, + // `wake`, `wake_by_ref`, and `drop` do nothing + |_: *const ()| {}, + |_: *const ()| {}, + |_: *const ()| {}, + ); + + // SAFETY: all functions in the vtable are safe to call, and Waker's safety does not require + // them to actually do anything. + unsafe { Waker::from_raw(NOOP_RAW_WAKER) } +} + +/// Poll the future once and return `Some` if it is ready, else `None`. +/// +/// If the future wasn't ready, it future likely can't be driven to completion any more: the polling +/// uses a no-op waker, so knowledge of what the pending future was waiting for is lost. +#[cfg(feature = "client")] +pub(crate) fn now_or_never(fut: F) -> Option { + let waker = noop_waker(); + let mut cx = Context::from_waker(&waker); + // TODO: replace with std::pin::pin! and drop pin-utils once MSRV >= 1.68 + pin_utils::pin_mut!(fut); + match fut.poll(&mut cx) { + Poll::Ready(res) => Some(res), + Poll::Pending => None, + } +} diff --git a/src/common/time.rs b/src/common/time.rs index a8d3cc9c85..8055cb364b 100644 --- a/src/common/time.rs +++ b/src/common/time.rs @@ -40,7 +40,7 @@ impl Time { } } - #[cfg(feature = "http1")] + #[cfg(all(feature = "server", feature = "http1"))] pub(crate) fn sleep_until(&self, deadline: Instant) -> Pin> { match *self { Time::Empty => { diff --git a/src/common/watch.rs b/src/common/watch.rs index ba17d551cb..81acd5df70 100644 --- a/src/common/watch.rs +++ b/src/common/watch.rs @@ -4,7 +4,7 @@ //! - The consumer is only notified if the value is different. //! - The value `0` is reserved for closed. -use futures_util::task::AtomicWaker; +use atomic_waker::AtomicWaker; use std::sync::{ atomic::{AtomicUsize, Ordering}, Arc, diff --git a/src/proto/h1/conn.rs b/src/proto/h1/conn.rs index bea8faa221..7c168e005e 100644 --- a/src/proto/h1/conn.rs +++ b/src/proto/h1/conn.rs @@ -10,7 +10,7 @@ use std::time::{Duration, Instant}; use crate::rt::{Read, Write}; use bytes::{Buf, Bytes}; -use futures_util::ready; +use futures_core::ready; use http::header::{HeaderValue, CONNECTION, TE}; use http::{HeaderMap, Method, Version}; use http_body::Frame; diff --git a/src/proto/h1/decode.rs b/src/proto/h1/decode.rs index dd293e1228..bcc43313d1 100644 --- a/src/proto/h1/decode.rs +++ b/src/proto/h1/decode.rs @@ -4,7 +4,7 @@ use std::io; use std::task::{Context, Poll}; use bytes::{BufMut, Bytes, BytesMut}; -use futures_util::ready; +use futures_core::ready; use http::{HeaderMap, HeaderName, HeaderValue}; use http_body::Frame; diff --git a/src/proto/h1/dispatch.rs b/src/proto/h1/dispatch.rs index 4d921a3b83..5daeb5ebf6 100644 --- a/src/proto/h1/dispatch.rs +++ b/src/proto/h1/dispatch.rs @@ -8,7 +8,7 @@ use std::{ use crate::rt::{Read, Write}; use bytes::{Buf, Bytes}; -use futures_util::ready; +use futures_core::ready; use http::Request; use super::{Http1Transaction, Wants}; diff --git a/src/proto/h1/io.rs b/src/proto/h1/io.rs index d5afba683a..6a30d73c84 100644 --- a/src/proto/h1/io.rs +++ b/src/proto/h1/io.rs @@ -6,7 +6,7 @@ use std::task::{Context, Poll}; use crate::rt::{Read, ReadBuf, Write}; use bytes::{Buf, BufMut, Bytes, BytesMut}; -use futures_util::ready; +use futures_core::ready; use super::{Http1Transaction, ParseContext, ParsedMessage}; use crate::common::buf::BufList; diff --git a/src/proto/h2/client.rs b/src/proto/h2/client.rs index faf36dde7f..3860a5afaf 100644 --- a/src/proto/h2/client.rs +++ b/src/proto/h2/client.rs @@ -11,9 +11,7 @@ use crate::rt::{Read, Write}; use bytes::Bytes; use futures_channel::mpsc::{Receiver, Sender}; use futures_channel::{mpsc, oneshot}; -use futures_util::future::{Either, FusedFuture, FutureExt as _}; -use futures_util::ready; -use futures_util::stream::{StreamExt as _, StreamFuture}; +use futures_core::{ready, FusedFuture, FusedStream, Stream}; use h2::client::{Builder, Connection, SendRequest}; use h2::SendStream; use http::{Method, StatusCode}; @@ -23,6 +21,7 @@ use super::ping::{Ponger, Recorder}; use super::{ping, H2Upgraded, PipeToSendStream, SendBuf}; use crate::body::{Body, Incoming as IncomingBody}; use crate::client::dispatch::{Callback, SendWhen, TrySendError}; +use crate::common::either::Either; use crate::common::io::Compat; use crate::common::time::Time; use crate::ext::Protocol; @@ -164,11 +163,9 @@ where // 'Client' has been dropped. This is to get around a bug // in h2 where dropping all SendRequests won't notify a // parked Connection. - let (conn_drop_ref, rx) = mpsc::channel(1); + let (conn_drop_ref, conn_drop_rx) = mpsc::channel(1); let (cancel_tx, conn_eof) = oneshot::channel(); - let conn_drop_rx = rx.into_future(); - let ping_config = new_ping_config(config); let (conn, ping) = if ping_config.is_enabled() { @@ -176,9 +173,9 @@ where let (recorder, ponger) = ping::channel(pp, ping_config, timer); let conn: Conn<_, B> = Conn::new(ponger, conn); - (Either::Left(conn), recorder) + (Either::left(conn), recorder) } else { - (Either::Right(conn), ping::disabled()) + (Either::right(conn), ping::disabled()) }; let conn: ConnMapErr = ConnMapErr { conn, @@ -305,7 +302,7 @@ pin_project! { T: Unpin, { #[pin] - drop_rx: StreamFuture>, + drop_rx: Receiver, #[pin] cancel_tx: Option>, #[pin] @@ -320,7 +317,7 @@ where { fn new( conn: ConnMapErr, - drop_rx: StreamFuture>, + drop_rx: Receiver, cancel_tx: oneshot::Sender, ) -> Self { Self { @@ -341,12 +338,12 @@ where fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { let mut this = self.project(); - if !this.conn.is_terminated() && this.conn.poll_unpin(cx).is_ready() { + if !this.conn.is_terminated() && Pin::new(&mut this.conn).poll(cx).is_ready() { // ok or err, the `conn` has finished. return Poll::Ready(()); } - if !this.drop_rx.is_terminated() && this.drop_rx.poll_unpin(cx).is_ready() { + if !this.drop_rx.is_terminated() && Pin::new(&mut this.drop_rx).poll_next(cx).is_ready() { // mpsc has been dropped, hopefully polling // the connection some more should start shutdown // and then close. @@ -468,7 +465,7 @@ where fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> std::task::Poll { let mut this = self.project(); - match this.pipe.poll_unpin(cx) { + match Pin::new(&mut this.pipe).poll(cx) { Poll::Ready(result) => { if let Err(_e) = result { debug!("client request body error: {}", _e); diff --git a/src/proto/h2/mod.rs b/src/proto/h2/mod.rs index adb6de87f9..73bda4c224 100644 --- a/src/proto/h2/mod.rs +++ b/src/proto/h2/mod.rs @@ -6,7 +6,7 @@ use std::pin::Pin; use std::task::{Context, Poll}; use bytes::{Buf, Bytes}; -use futures_util::ready; +use futures_core::ready; use h2::{Reason, RecvStream, SendStream}; use http::header::{HeaderName, CONNECTION, TE, TRANSFER_ENCODING, UPGRADE}; use http::HeaderMap; diff --git a/src/proto/h2/server.rs b/src/proto/h2/server.rs index a8a20dd68e..7995b349bf 100644 --- a/src/proto/h2/server.rs +++ b/src/proto/h2/server.rs @@ -5,7 +5,7 @@ use std::task::{Context, Poll}; use std::time::Duration; use bytes::Bytes; -use futures_util::ready; +use futures_core::ready; use h2::server::{Connection, Handshake, SendResponse}; use h2::{Reason, RecvStream}; use http::{Method, Request}; diff --git a/src/server/conn/http1.rs b/src/server/conn/http1.rs index af703018c5..881c29a4be 100644 --- a/src/server/conn/http1.rs +++ b/src/server/conn/http1.rs @@ -11,7 +11,7 @@ use std::time::Duration; use crate::rt::{Read, Write}; use crate::upgrade::Upgraded; use bytes::Bytes; -use futures_util::ready; +use futures_core::ready; use crate::body::{Body, Incoming as IncomingBody}; use crate::proto; @@ -179,7 +179,7 @@ where /// This errors if the underlying connection protocol is not HTTP/1. pub fn without_shutdown(self) -> impl Future>> { let mut zelf = Some(self); - futures_util::future::poll_fn(move |cx| { + crate::common::future::poll_fn(move |cx| { ready!(zelf.as_mut().unwrap().conn.poll_without_shutdown(cx))?; Poll::Ready(Ok(zelf.take().unwrap().into_parts())) }) diff --git a/src/server/conn/http2.rs b/src/server/conn/http2.rs index e0d61c13a6..0452109c3c 100644 --- a/src/server/conn/http2.rs +++ b/src/server/conn/http2.rs @@ -9,7 +9,7 @@ use std::task::{Context, Poll}; use std::time::Duration; use crate::rt::{Read, Write}; -use futures_util::ready; +use futures_core::ready; use pin_project_lite::pin_project; use crate::body::{Body, Incoming as IncomingBody};