Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 7 additions & 4 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,17 @@ tokio = { version = "1", features = ["sync"] }

# Optional

atomic-waker = { version = "1.1.2", optional = true }
futures-channel = { version = "0.3", optional = true }
futures-util = { version = "0.3", default-features = false, optional = true }
futures-core = { version = "0.3.31", optional = true }
futures-util = { version = "0.3", default-features = false, features = ["alloc"], optional = true }
h2 = { version = "0.4.2", optional = true }
http-body-util = { version = "0.1", optional = true }
httparse = { version = "1.9", optional = true }
httpdate = { version = "1.0", optional = true }
itoa = { version = "1", optional = true }
pin-project-lite = { version = "0.2.4", optional = true }
pin-utils = { version = "0.1", optional = true } # TODO: replace with std::pin::pin! once MSRV >= 1.68
smallvec = { version = "1.12", features = ["const_generics", "const_new"], optional = true }
tracing = { version = "0.1", default-features = false, features = ["std"], optional = true }
want = { version = "0.3", optional = true }
Expand Down Expand Up @@ -77,15 +80,15 @@ full = [
]

# HTTP versions
http1 = ["dep:futures-channel", "dep:futures-util", "dep:httparse", "dep:itoa"]
http2 = ["dep:futures-channel", "dep:futures-util", "dep:h2"]
http1 = ["dep:atomic-waker", "dep:futures-channel", "dep:futures-core", "dep:httparse", "dep:itoa", "dep:pin-utils"]
http2 = ["dep:futures-channel", "dep:futures-core", "dep:h2"]

# Client/Server
client = ["dep:want", "dep:pin-project-lite", "dep:smallvec"]
server = ["dep:httpdate", "dep:pin-project-lite", "dep:smallvec"]

# C-API support (currently unstable (no semver))
ffi = ["dep:http-body-util", "futures-util?/alloc"]
ffi = ["dep:http-body-util", "futures-util"]
capi = []

# Utilize tracing (currently unstable)
Expand Down
4 changes: 2 additions & 2 deletions src/body/incoming.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,9 @@ use futures_channel::{mpsc, oneshot};
any(feature = "http1", feature = "http2"),
any(feature = "client", feature = "server")
))]
use futures_util::ready;
use futures_core::ready;
#[cfg(all(feature = "http1", any(feature = "client", feature = "server")))]
use futures_util::{stream::FusedStream, Stream}; // for mpsc::Receiver
use futures_core::{stream::FusedStream, Stream}; // for mpsc::Receiver
#[cfg(all(feature = "http1", any(feature = "client", feature = "server")))]
use http::HeaderMap;
use http_body::{Body, Frame, SizeHint};
Expand Down
6 changes: 3 additions & 3 deletions src/client/conn/http1.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use std::task::{Context, Poll};

use crate::rt::{Read, Write};
use bytes::Bytes;
use futures_util::ready;
use futures_core::ready;
use http::{Request, Response};
use httparse::ParserConfig;

Expand Down Expand Up @@ -92,7 +92,7 @@ where
/// instead run `into_parts`. This is a convenience wrapper over `poll_without_shutdown`.
pub async fn without_shutdown(self) -> crate::Result<Parts<T>> {
let mut conn = Some(self);
futures_util::future::poll_fn(move |cx| -> Poll<crate::Result<Parts<T>>> {
crate::common::future::poll_fn(move |cx| -> Poll<crate::Result<Parts<T>>> {
ready!(conn.as_mut().unwrap().poll_without_shutdown(cx))?;
Poll::Ready(Ok(conn.take().unwrap().into_parts()))
})
Expand Down Expand Up @@ -148,7 +148,7 @@ impl<B> SendRequest<B> {
///
/// If the associated connection is closed, this returns an Error.
pub async fn ready(&mut self) -> crate::Result<()> {
futures_util::future::poll_fn(|cx| self.poll_ready(cx)).await
crate::common::future::poll_fn(|cx| self.poll_ready(cx)).await
}

/// Checks if the connection is currently ready to send a request.
Expand Down
4 changes: 2 additions & 2 deletions src/client/conn/http2.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use std::task::{Context, Poll};
use std::time::Duration;

use crate::rt::{Read, Write};
use futures_util::ready;
use futures_core::ready;
use http::{Request, Response};

use super::super::dispatch::{self, TrySendError};
Expand Down Expand Up @@ -99,7 +99,7 @@ impl<B> SendRequest<B> {
///
/// If the associated connection is closed, this returns an Error.
pub async fn ready(&mut self) -> crate::Result<()> {
futures_util::future::poll_fn(|cx| self.poll_ready(cx)).await
crate::common::future::poll_fn(|cx| self.poll_ready(cx)).await
}

/// Checks if the connection is currently ready to send a request.
Expand Down
3 changes: 1 addition & 2 deletions src/client/dispatch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -199,8 +199,7 @@ impl<T, U> Receiver<T, U> {

#[cfg(feature = "http1")]
pub(crate) fn try_recv(&mut self) -> Option<(T, Callback<T, U>)> {
use futures_util::FutureExt;
match self.inner.recv().now_or_never() {
match crate::common::task::now_or_never(self.inner.recv()) {
Some(Some(mut env)) => env.0.take(),
_ => None,
}
Expand Down
46 changes: 46 additions & 0 deletions src/common/either.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
use pin_project_lite::pin_project;
use std::{
future::Future,
pin::Pin,
task::{Context, Poll},
};

pin_project! {
/// One of two possible futures that have the same output type.
#[project = EitherProj]
pub(crate) enum Either<F1, F2> {
Left {
#[pin]
fut: F1
},
Right {
#[pin]
fut: F2,
},
}
}

impl<F1, F2> Either<F1, F2> {
pub(crate) fn left(fut: F1) -> Self {
Either::Left { fut }
}

pub(crate) fn right(fut: F2) -> Self {
Either::Right { fut }
}
}

impl<F1, F2> Future for Either<F1, F2>
where
F1: Future,
F2: Future<Output = F1::Output>,
{
type Output = F1::Output;

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
match self.project() {
EitherProj::Left { fut } => fut.poll(cx),
EitherProj::Right { fut } => fut.poll(cx),
}
}
}
30 changes: 30 additions & 0 deletions src/common/future.rs
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not a fan of all these polyfills but I included them for now to show it's possible (and what it costs) to do without MSRV bump. I'd also be happy to bump the MSRV if that's acceptable.

Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
use std::{
future::Future,
pin::Pin,
task::{Context, Poll},
};

// TODO: replace with `std::future::poll_fn` once MSRV >= 1.64
pub(crate) fn poll_fn<T, F>(f: F) -> PollFn<F>
where
F: FnMut(&mut Context<'_>) -> Poll<T>,
{
PollFn { f }
}

pub(crate) struct PollFn<F> {
f: F,
}

impl<F> Unpin for PollFn<F> {}

impl<T, F> Future for PollFn<F>
where
F: FnMut(&mut Context<'_>) -> Poll<T>,
{
type Output = T;

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
(self.as_mut().f)(cx)
}
}
7 changes: 7 additions & 0 deletions src/common/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,13 @@
pub(crate) mod buf;
#[cfg(all(feature = "server", any(feature = "http1", feature = "http2")))]
pub(crate) mod date;
#[cfg(all(feature = "client", feature = "http2"))]
pub(crate) mod either;
#[cfg(any(
all(feature = "client", any(feature = "http1", feature = "http2")),
all(feature = "server", feature = "http1"),
))]
pub(crate) mod future;
pub(crate) mod io;
#[cfg(all(any(feature = "client", feature = "server"), feature = "http1"))]
pub(crate) mod task;
Expand Down
36 changes: 36 additions & 0 deletions src/common/task.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
use std::task::{Context, Poll};
#[cfg(feature = "client")]
use std::task::{RawWaker, RawWakerVTable, Waker};

/// A function to help "yield" a future, such that it is re-scheduled immediately.
///
Expand All @@ -7,3 +9,37 @@ pub(crate) fn yield_now(cx: &mut Context<'_>) -> Poll<std::convert::Infallible>
cx.waker().wake_by_ref();
Poll::Pending
}

// TODO: replace with `std::task::Waker::noop()` once MSRV >= 1.85
#[cfg(feature = "client")]
fn noop_waker() -> Waker {
const NOOP_RAW_WAKER: RawWaker = RawWaker::new(std::ptr::null(), &NOOP_VTABLE);
const NOOP_VTABLE: RawWakerVTable = RawWakerVTable::new(
// `clone` returns the same noop waker again
|_: *const ()| NOOP_RAW_WAKER,
// `wake`, `wake_by_ref`, and `drop` do nothing
|_: *const ()| {},
|_: *const ()| {},
|_: *const ()| {},
);

// SAFETY: all functions in the vtable are safe to call, and Waker's safety does not require
// them to actually do anything.
unsafe { Waker::from_raw(NOOP_RAW_WAKER) }
}

/// Poll the future once and return `Some` if it is ready, else `None`.
///
/// If the future wasn't ready, it future likely can't be driven to completion any more: the polling
/// uses a no-op waker, so knowledge of what the pending future was waiting for is lost.
#[cfg(feature = "client")]
pub(crate) fn now_or_never<F: std::future::Future>(fut: F) -> Option<F::Output> {
let waker = noop_waker();
let mut cx = Context::from_waker(&waker);
// TODO: replace with std::pin::pin! and drop pin-utils once MSRV >= 1.68
pin_utils::pin_mut!(fut);
match fut.poll(&mut cx) {
Poll::Ready(res) => Some(res),
Poll::Pending => None,
}
}
2 changes: 1 addition & 1 deletion src/common/time.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ impl Time {
}
}

#[cfg(feature = "http1")]
#[cfg(all(feature = "server", feature = "http1"))]
pub(crate) fn sleep_until(&self, deadline: Instant) -> Pin<Box<dyn Sleep>> {
match *self {
Time::Empty => {
Expand Down
2 changes: 1 addition & 1 deletion src/common/watch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
//! - The consumer is only notified if the value is different.
//! - The value `0` is reserved for closed.

use futures_util::task::AtomicWaker;
use atomic_waker::AtomicWaker;
use std::sync::{
atomic::{AtomicUsize, Ordering},
Arc,
Expand Down
2 changes: 1 addition & 1 deletion src/proto/h1/conn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use std::time::{Duration, Instant};

use crate::rt::{Read, Write};
use bytes::{Buf, Bytes};
use futures_util::ready;
use futures_core::ready;
use http::header::{HeaderValue, CONNECTION, TE};
use http::{HeaderMap, Method, Version};
use http_body::Frame;
Expand Down
2 changes: 1 addition & 1 deletion src/proto/h1/decode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use std::io;
use std::task::{Context, Poll};

use bytes::{BufMut, Bytes, BytesMut};
use futures_util::ready;
use futures_core::ready;
use http::{HeaderMap, HeaderName, HeaderValue};
use http_body::Frame;

Expand Down
2 changes: 1 addition & 1 deletion src/proto/h1/dispatch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use std::{

use crate::rt::{Read, Write};
use bytes::{Buf, Bytes};
use futures_util::ready;
use futures_core::ready;
use http::Request;

use super::{Http1Transaction, Wants};
Expand Down
2 changes: 1 addition & 1 deletion src/proto/h1/io.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use std::task::{Context, Poll};

use crate::rt::{Read, ReadBuf, Write};
use bytes::{Buf, BufMut, Bytes, BytesMut};
use futures_util::ready;
use futures_core::ready;

use super::{Http1Transaction, ParseContext, ParsedMessage};
use crate::common::buf::BufList;
Expand Down
23 changes: 10 additions & 13 deletions src/proto/h2/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,7 @@ use crate::rt::{Read, Write};
use bytes::Bytes;
use futures_channel::mpsc::{Receiver, Sender};
use futures_channel::{mpsc, oneshot};
use futures_util::future::{Either, FusedFuture, FutureExt as _};
use futures_util::ready;
use futures_util::stream::{StreamExt as _, StreamFuture};
use futures_core::{ready, FusedFuture, FusedStream, Stream};
use h2::client::{Builder, Connection, SendRequest};
use h2::SendStream;
use http::{Method, StatusCode};
Expand All @@ -23,6 +21,7 @@ use super::ping::{Ponger, Recorder};
use super::{ping, H2Upgraded, PipeToSendStream, SendBuf};
use crate::body::{Body, Incoming as IncomingBody};
use crate::client::dispatch::{Callback, SendWhen, TrySendError};
use crate::common::either::Either;
use crate::common::io::Compat;
use crate::common::time::Time;
use crate::ext::Protocol;
Expand Down Expand Up @@ -164,21 +163,19 @@ where
// 'Client' has been dropped. This is to get around a bug
// in h2 where dropping all SendRequests won't notify a
// parked Connection.
let (conn_drop_ref, rx) = mpsc::channel(1);
let (conn_drop_ref, conn_drop_rx) = mpsc::channel(1);
let (cancel_tx, conn_eof) = oneshot::channel();

let conn_drop_rx = rx.into_future();

let ping_config = new_ping_config(config);

let (conn, ping) = if ping_config.is_enabled() {
let pp = conn.ping_pong().expect("conn.ping_pong");
let (recorder, ponger) = ping::channel(pp, ping_config, timer);

let conn: Conn<_, B> = Conn::new(ponger, conn);
(Either::Left(conn), recorder)
(Either::left(conn), recorder)
} else {
(Either::Right(conn), ping::disabled())
(Either::right(conn), ping::disabled())
};
let conn: ConnMapErr<T, B> = ConnMapErr {
conn,
Expand Down Expand Up @@ -305,7 +302,7 @@ pin_project! {
T: Unpin,
{
#[pin]
drop_rx: StreamFuture<Receiver<Infallible>>,
drop_rx: Receiver<Infallible>,
#[pin]
cancel_tx: Option<oneshot::Sender<Infallible>>,
#[pin]
Expand All @@ -320,7 +317,7 @@ where
{
fn new(
conn: ConnMapErr<T, B>,
drop_rx: StreamFuture<Receiver<Infallible>>,
drop_rx: Receiver<Infallible>,
cancel_tx: oneshot::Sender<Infallible>,
) -> Self {
Self {
Expand All @@ -341,12 +338,12 @@ where
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let mut this = self.project();

if !this.conn.is_terminated() && this.conn.poll_unpin(cx).is_ready() {
if !this.conn.is_terminated() && Pin::new(&mut this.conn).poll(cx).is_ready() {
// ok or err, the `conn` has finished.
return Poll::Ready(());
}

if !this.drop_rx.is_terminated() && this.drop_rx.poll_unpin(cx).is_ready() {
if !this.drop_rx.is_terminated() && Pin::new(&mut this.drop_rx).poll_next(cx).is_ready() {
// mpsc has been dropped, hopefully polling
// the connection some more should start shutdown
// and then close.
Expand Down Expand Up @@ -468,7 +465,7 @@ where
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> std::task::Poll<Self::Output> {
let mut this = self.project();

match this.pipe.poll_unpin(cx) {
match Pin::new(&mut this.pipe).poll(cx) {
Poll::Ready(result) => {
if let Err(_e) = result {
debug!("client request body error: {}", _e);
Expand Down
2 changes: 1 addition & 1 deletion src/proto/h2/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use std::pin::Pin;
use std::task::{Context, Poll};

use bytes::{Buf, Bytes};
use futures_util::ready;
use futures_core::ready;
use h2::{Reason, RecvStream, SendStream};
use http::header::{HeaderName, CONNECTION, TE, TRANSFER_ENCODING, UPGRADE};
use http::HeaderMap;
Expand Down
2 changes: 1 addition & 1 deletion src/proto/h2/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use std::task::{Context, Poll};
use std::time::Duration;

use bytes::Bytes;
use futures_util::ready;
use futures_core::ready;
use h2::server::{Connection, Handshake, SendResponse};
use h2::{Reason, RecvStream};
use http::{Method, Request};
Expand Down
Loading
Loading