Skip to content

Commit b3e5506

Browse files
committed
feat(server): introduce Accept trait
The `Accept` trait is used by the server types to asynchronously accept incoming connections. This replaces the previous usage of `Stream`. BREAKING CHANGE: Passing a `Stream` to `Server::builder` or `Http::serve_incoming` must be changed to pass an `Accept` instead. The `stream` optional feature can be enabled, and the a stream can be converted using `hyper::server::accept::from_stream`.
1 parent 0867ad5 commit b3e5506

File tree

7 files changed

+189
-71
lines changed

7 files changed

+189
-71
lines changed

Cargo.toml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,11 @@ runtime = [
6969
"tokio-net",
7070
"tokio-timer",
7171
]
72+
73+
# unstable features
74+
stream = []
75+
76+
# internal features used in CI
7277
nightly = []
7378
__internal_flaky_tests = []
7479
__internal_happy_eyeballs_tests = []

src/server/accept.rs

Lines changed: 99 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,99 @@
1+
//! The `Accept` trait and supporting types.
2+
//!
3+
//! This module contains:
4+
//!
5+
//! - The [`Accept`](Accept) trait used to asynchronously accept incoming
6+
//! connections.
7+
//! - Utilities like `poll_fn` to ease creating a custom `Accept`.
8+
9+
#[cfg(feature = "stream")]
10+
use futures_core::Stream;
11+
12+
use crate::common::{Pin, task::{self, Poll}};
13+
14+
/// Asynchronously accept incoming connections.
15+
pub trait Accept {
16+
/// The connection type that can be accepted.
17+
type Conn;
18+
/// The error type that can occur when accepting a connection.
19+
type Error;
20+
21+
/// Poll to accept the next connection.
22+
fn poll_accept(self: Pin<&mut Self>, cx: &mut task::Context<'_>)
23+
-> Poll<Option<Result<Self::Conn, Self::Error>>>;
24+
}
25+
26+
/// Create an `Accept` with a polling function.
27+
///
28+
/// # Example
29+
///
30+
/// ```
31+
/// use std::task::Poll;
32+
/// use hyper::server::{accept, Server};
33+
///
34+
/// # let mock_conn = ();
35+
/// // If we created some mocked connection...
36+
/// let mut conn = Some(mock_conn);
37+
///
38+
/// // And accept just the mocked conn once...
39+
/// let once = accept::poll_fn(move |cx| {
40+
/// Poll::Ready(conn.take().map(Ok::<_, ()>))
41+
/// });
42+
///
43+
/// let builder = Server::builder(once);
44+
/// ```
45+
pub fn poll_fn<F, IO, E>(func: F) -> impl Accept<Conn = IO, Error = E>
46+
where
47+
F: FnMut(&mut task::Context<'_>) -> Poll<Option<Result<IO, E>>>,
48+
{
49+
struct PollFn<F>(F);
50+
51+
impl<F, IO, E> Accept for PollFn<F>
52+
where
53+
F: FnMut(&mut task::Context<'_>) -> Poll<Option<Result<IO, E>>>,
54+
{
55+
type Conn = IO;
56+
type Error = E;
57+
fn poll_accept(self: Pin<&mut Self>, cx: &mut task::Context<'_>)
58+
-> Poll<Option<Result<Self::Conn, Self::Error>>>
59+
{
60+
unsafe {
61+
(self.get_unchecked_mut().0)(cx)
62+
}
63+
}
64+
}
65+
66+
PollFn(func)
67+
}
68+
69+
/// Adapt a `Stream` of incoming connections into an `Accept`.
70+
///
71+
/// # Unstable
72+
///
73+
/// This function requires enabling the unstable `stream` feature in your
74+
/// `Cargo.toml`.
75+
#[cfg(feature = "stream")]
76+
pub fn from_stream<S, IO, E>(stream: S) -> impl Accept<Conn = IO, Error = E>
77+
where
78+
S: Stream<Item = Result<IO, E>>,
79+
{
80+
struct FromStream<S>(S);
81+
82+
impl<S, IO, E> Accept for FromStream<S>
83+
where
84+
S: Stream<Item = Result<IO, E>>,
85+
{
86+
type Conn = IO;
87+
type Error = E;
88+
fn poll_accept(self: Pin<&mut Self>, cx: &mut task::Context<'_>)
89+
-> Poll<Option<Result<Self::Conn, Self::Error>>>
90+
{
91+
unsafe {
92+
Pin::new_unchecked(&mut self.get_unchecked_mut().0)
93+
.poll_next(cx)
94+
}
95+
}
96+
}
97+
98+
FromStream(stream)
99+
}

src/server/conn.rs

Lines changed: 43 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ use crate::error::{Kind, Parse};
2828
use crate::proto;
2929
use crate::service::{MakeServiceRef, Service};
3030
use crate::upgrade::Upgraded;
31+
use super::Accept;
3132

3233
pub(super) use self::spawn_all::NoopWatcher;
3334
use self::spawn_all::NewSvcTask;
@@ -403,13 +404,10 @@ impl<E> Http<E> {
403404
}
404405
}
405406

406-
/// Bind the provided `addr` with the default `Handle` and return [`Serve`](Serve).
407-
///
408-
/// This method will bind the `addr` provided with a new TCP listener ready
409-
/// to accept connections. Each connection will be processed with the
410-
/// `make_service` object provided, creating a new service per
411-
/// connection.
412407
#[cfg(feature = "runtime")]
408+
#[doc(hidden)]
409+
#[deprecated]
410+
#[allow(deprecated)]
413411
pub fn serve_addr<S, Bd>(&self, addr: &SocketAddr, make_service: S) -> crate::Result<Serve<AddrIncoming, S, E>>
414412
where
415413
S: MakeServiceRef<
@@ -428,13 +426,10 @@ impl<E> Http<E> {
428426
Ok(self.serve_incoming(incoming, make_service))
429427
}
430428

431-
/// Bind the provided `addr` with the `Handle` and return a [`Serve`](Serve)
432-
///
433-
/// This method will bind the `addr` provided with a new TCP listener ready
434-
/// to accept connections. Each connection will be processed with the
435-
/// `make_service` object provided, creating a new service per
436-
/// connection.
437429
#[cfg(feature = "runtime")]
430+
#[doc(hidden)]
431+
#[deprecated]
432+
#[allow(deprecated)]
438433
pub fn serve_addr_handle<S, Bd>(&self, addr: &SocketAddr, handle: &Handle, make_service: S) -> crate::Result<Serve<AddrIncoming, S, E>>
439434
where
440435
S: MakeServiceRef<
@@ -453,10 +448,11 @@ impl<E> Http<E> {
453448
Ok(self.serve_incoming(incoming, make_service))
454449
}
455450

456-
/// Bind the provided stream of incoming IO objects with a `MakeService`.
451+
#[doc(hidden)]
452+
#[deprecated]
457453
pub fn serve_incoming<I, IO, IE, S, Bd>(&self, incoming: I, make_service: S) -> Serve<I, S, E>
458454
where
459-
I: Stream<Item = Result<IO, IE>>,
455+
I: Accept<Conn=IO, Error=IE>,
460456
IE: Into<Box<dyn StdError + Send + Sync>>,
461457
IO: AsyncRead + AsyncWrite + Unpin,
462458
S: MakeServiceRef<
@@ -678,13 +674,6 @@ where
678674
// ===== impl Serve =====
679675

680676
impl<I, S, E> Serve<I, S, E> {
681-
/// Spawn all incoming connections onto the executor in `Http`.
682-
pub(super) fn spawn_all(self) -> SpawnAll<I, S, E> {
683-
SpawnAll {
684-
serve: self,
685-
}
686-
}
687-
688677
/// Get a reference to the incoming stream.
689678
#[inline]
690679
pub fn incoming_ref(&self) -> &I {
@@ -696,22 +685,28 @@ impl<I, S, E> Serve<I, S, E> {
696685
pub fn incoming_mut(&mut self) -> &mut I {
697686
&mut self.incoming
698687
}
688+
689+
/// Spawn all incoming connections onto the executor in `Http`.
690+
pub(super) fn spawn_all(self) -> SpawnAll<I, S, E> {
691+
SpawnAll {
692+
serve: self,
693+
}
694+
}
699695
}
700696

701-
impl<I, IO, IE, S, B, E> Stream for Serve<I, S, E>
697+
698+
699+
700+
impl<I, IO, IE, S, B, E> Serve<I, S, E>
702701
where
703-
I: Stream<Item = Result<IO, IE>>,
702+
I: Accept<Conn=IO, Error=IE>,
704703
IO: AsyncRead + AsyncWrite + Unpin,
705704
IE: Into<Box<dyn StdError + Send + Sync>>,
706705
S: MakeServiceRef<IO, Body, ResBody=B>,
707-
//S::Error2: Into<Box<StdError + Send + Sync>>,
708-
//SME: Into<Box<StdError + Send + Sync>>,
709706
B: Payload,
710707
E: H2Exec<<S::Service as Service<Body>>::Future, B>,
711708
{
712-
type Item = crate::Result<Connecting<IO, S::Future, E>>;
713-
714-
fn poll_next(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Option<Self::Item>> {
709+
fn poll_next_(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Option<crate::Result<Connecting<IO, S::Future, E>>>> {
715710
match ready!(self.project().make_service.poll_ready_ref(cx)) {
716711
Ok(()) => (),
717712
Err(e) => {
@@ -720,7 +715,7 @@ where
720715
}
721716
}
722717

723-
if let Some(item) = ready!(self.project().incoming.poll_next(cx)) {
718+
if let Some(item) = ready!(self.project().incoming.poll_accept(cx)) {
724719
let io = item.map_err(crate::Error::new_accept)?;
725720
let new_fut = self.project().make_service.make_service_ref(&io);
726721
Poll::Ready(Some(Ok(Connecting {
@@ -734,6 +729,23 @@ where
734729
}
735730
}
736731

732+
// deprecated
733+
impl<I, IO, IE, S, B, E> Stream for Serve<I, S, E>
734+
where
735+
I: Accept<Conn=IO, Error=IE>,
736+
IO: AsyncRead + AsyncWrite + Unpin,
737+
IE: Into<Box<dyn StdError + Send + Sync>>,
738+
S: MakeServiceRef<IO, Body, ResBody=B>,
739+
B: Payload,
740+
E: H2Exec<<S::Service as Service<Body>>::Future, B>,
741+
{
742+
type Item = crate::Result<Connecting<IO, S::Future, E>>;
743+
744+
fn poll_next(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Option<Self::Item>> {
745+
self.poll_next_(cx)
746+
}
747+
}
748+
737749
// ===== impl Connecting =====
738750

739751

@@ -772,7 +784,7 @@ impl<I, S, E> SpawnAll<I, S, E> {
772784

773785
impl<I, IO, IE, S, B, E> SpawnAll<I, S, E>
774786
where
775-
I: Stream<Item=Result<IO, IE>>,
787+
I: Accept<Conn=IO, Error=IE>,
776788
IE: Into<Box<dyn StdError + Send + Sync>>,
777789
IO: AsyncRead + AsyncWrite + Unpin + Send + 'static,
778790
S: MakeServiceRef<
@@ -789,7 +801,7 @@ where
789801
W: Watcher<IO, S::Service, E>,
790802
{
791803
loop {
792-
if let Some(connecting) = ready!(self.project().serve.poll_next(cx)?) {
804+
if let Some(connecting) = ready!(self.project().serve.poll_next_(cx)?) {
793805
let fut = NewSvcTask::new(connecting, watcher.clone());
794806
self.project().serve.project().protocol.exec.execute_new_svc(fut)?;
795807
} else {

src/server/mod.rs

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@
4848
//! # fn main() {}
4949
//! ```
5050
51+
pub mod accept;
5152
pub mod conn;
5253
mod shutdown;
5354
#[cfg(feature = "runtime")] mod tcp;
@@ -58,14 +59,14 @@ use std::fmt;
5859

5960
#[cfg(feature = "runtime")] use std::time::Duration;
6061

61-
use futures_core::Stream;
6262
use tokio_io::{AsyncRead, AsyncWrite};
6363
use pin_project::pin_project;
6464

6565
use crate::body::{Body, Payload};
6666
use crate::common::exec::{Exec, H2Exec, NewSvcExec};
6767
use crate::common::{Future, Pin, Poll, Unpin, task};
6868
use crate::service::{MakeServiceRef, Service};
69+
use self::accept::Accept;
6970
// Renamed `Http` as `Http_` for now so that people upgrading don't see an
7071
// error that `hyper::server::Http` is private...
7172
use self::conn::{Http as Http_, NoopWatcher, SpawnAll};
@@ -143,7 +144,7 @@ impl<S> Server<AddrIncoming, S> {
143144

144145
impl<I, IO, IE, S, E, B> Server<I, S, E>
145146
where
146-
I: Stream<Item=Result<IO, IE>>,
147+
I: Accept<Conn=IO, Error=IE>,
147148
IE: Into<Box<dyn StdError + Send + Sync>>,
148149
IO: AsyncRead + AsyncWrite + Unpin + Send + 'static,
149150
S: MakeServiceRef<IO, Body, ResBody=B>,
@@ -200,7 +201,7 @@ where
200201

201202
impl<I, IO, IE, S, B, E> Future for Server<I, S, E>
202203
where
203-
I: Stream<Item=Result<IO, IE>>,
204+
I: Accept<Conn=IO, Error=IE>,
204205
IE: Into<Box<dyn StdError + Send + Sync>>,
205206
IO: AsyncRead + AsyncWrite + Unpin + Send + 'static,
206207
S: MakeServiceRef<IO, Body, ResBody=B>,
@@ -380,17 +381,17 @@ impl<I, E> Builder<I, E> {
380381
/// // Finally, spawn `server` onto an Executor...
381382
/// # }
382383
/// ```
383-
pub fn serve<S, B, IO, IE>(self, new_service: S) -> Server<I, S, E>
384+
pub fn serve<S, B>(self, new_service: S) -> Server<I, S, E>
384385
where
385-
I: Stream<Item=Result<IO, IE>>,
386-
IE: Into<Box<dyn StdError + Send + Sync>>,
387-
IO: AsyncRead + AsyncWrite + Unpin + Send + 'static,
388-
S: MakeServiceRef<IO, Body, ResBody=B>,
386+
I: Accept,
387+
I::Error: Into<Box<dyn StdError + Send + Sync>>,
388+
I::Conn: AsyncRead + AsyncWrite + Unpin + Send + 'static,
389+
S: MakeServiceRef<I::Conn, Body, ResBody=B>,
389390
S::Error: Into<Box<dyn StdError + Send + Sync>>,
390391
S::Service: 'static,
391392
B: Payload,
392393
B::Data: Unpin,
393-
E: NewSvcExec<IO, S::Future, S::Service, E, NoopWatcher>,
394+
E: NewSvcExec<I::Conn, S::Future, S::Service, E, NoopWatcher>,
394395
E: H2Exec<<S::Service as Service<Body>>::Future, B>,
395396
{
396397
let serve = self.protocol.serve_incoming(self.incoming, new_service);

src/server/shutdown.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
use std::error::Error as StdError;
22

3-
use futures_core::Stream;
43
use tokio_io::{AsyncRead, AsyncWrite};
54
use pin_project::{pin_project, project};
65

@@ -9,6 +8,7 @@ use crate::common::drain::{self, Draining, Signal, Watch, Watching};
98
use crate::common::exec::{H2Exec, NewSvcExec};
109
use crate::common::{Future, Pin, Poll, Unpin, task};
1110
use crate::service::{MakeServiceRef, Service};
11+
use super::Accept;
1212
use super::conn::{SpawnAll, UpgradeableConnection, Watcher};
1313

1414
#[allow(missing_debug_implementations)]
@@ -46,7 +46,7 @@ impl<I, S, F, E> Graceful<I, S, F, E> {
4646

4747
impl<I, IO, IE, S, B, F, E> Future for Graceful<I, S, F, E>
4848
where
49-
I: Stream<Item=Result<IO, IE>>,
49+
I: Accept<Conn=IO, Error=IE>,
5050
IE: Into<Box<dyn StdError + Send + Sync>>,
5151
IO: AsyncRead + AsyncWrite + Unpin + Send + 'static,
5252
S: MakeServiceRef<IO, Body, ResBody=B>,

0 commit comments

Comments
 (0)