Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ futures-core-preview = { version = "=0.3.0-alpha.18" }
futures-channel-preview = { version = "=0.3.0-alpha.18" }
futures-util-preview = { version = "=0.3.0-alpha.18" }
http = "0.1.15"
http-body = { git = "https://github.com/hyperium/http-body" }
http-body = "0.2.0-alpha.1"
httparse = "1.0"
h2 = { git = "https://github.com/hyperium/h2" }
iovec = "0.1"
Expand All @@ -38,6 +38,7 @@ pin-utils = "=0.1.0-alpha.4"
time = "0.1"
tokio = { version = "=0.2.0-alpha.4", optional = true, default-features = false, features = ["rt-full"] }
tower-service = "=0.3.0-alpha.1"
tower-make = { version = "0.1.0-alpha.2", features = ['io'] }
tokio-executor = { version = "=0.2.0-alpha.4", features = ["blocking"] }
tokio-io = "=0.2.0-alpha.4"
tokio-sync = "=0.2.0-alpha.4"
Expand Down
26 changes: 26 additions & 0 deletions examples/tower_client.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@

use hyper::client::service::{Connect, Service, MakeService};
use hyper::client::conn::Builder;
use hyper::client::connect::HttpConnector;
use hyper::{Body, Request};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
pretty_env_logger::init();

let mut mk_svc = Connect::new(HttpConnector::new(), Builder::new());

let uri = "http://127.0.0.1:8080".parse::<http::Uri>()?;


let mut svc = mk_svc.make_service(uri.clone()).await?;

let body = Body::empty();

let req = Request::get(uri).body(body)?;
let res = svc.call(req).await?;

println!("RESPONSE={:?}", res);

Ok(())
}
18 changes: 11 additions & 7 deletions src/client/conn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ use std::sync::Arc;
use bytes::Bytes;
use futures_util::future::{self, Either, FutureExt as _};
use tokio_io::{AsyncRead, AsyncWrite};
use tower_service::Service;

use crate::body::Payload;
use crate::common::{Exec, Future, Pin, Poll, task};
Expand Down Expand Up @@ -242,18 +243,21 @@ where
}
}

/* TODO(0.12.0): when we change from tokio-service to tower.
impl<T, B> Service for SendRequest<T, B> {
type Request = Request<B>;
type Response = Response;
type Error = ::Error;
impl<B> Service<Request<B>> for SendRequest<B>
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am thinking we should rename this to Connection<B> and Background<T, B> for the old Connection.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm undecided. Is there an issue with the current naming?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I feel that Service1<Service2<Connection<Request>>> seems much better than Service1<Service2<SendRequest<Request>>> thats really it.

where
B: Payload + 'static, {
type Response = Response<Body>;
type Error = crate::Error;
type Future = ResponseFuture;

fn call(&self, req: Self::Request) -> Self::Future {
fn poll_ready(&mut self, cx: &mut task::Context<'_>) -> Poll<Result<(), Self::Error>> {
self.poll_ready(cx)
}

fn call(&mut self, req: Request<B>) -> Self::Future {
self.send_request(req)
}
}
*/

impl<B> fmt::Debug for SendRequest<B> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
Expand Down
60 changes: 59 additions & 1 deletion src/client/connect/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@ use std::mem;
use std::net::{IpAddr, SocketAddr};
use std::time::Duration;

use http::uri::Scheme;
use http::uri::{Scheme, Uri};
use futures_util::{TryFutureExt, FutureExt};
use net2::TcpBuilder;
use tokio_net::driver::Handle;
use tokio_net::tcp::TcpStream;
Expand Down Expand Up @@ -248,6 +249,63 @@ where
}
}

impl<R> tower_service::Service<Uri> for HttpConnector<R>
where
R: Resolve + Clone + Send + Sync + 'static,
R::Future: Send,
{
type Response = TcpStream;
type Error = io::Error;
type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send + 'static>>;

fn poll_ready(&mut self, _cx: &mut task::Context<'_>) -> Poll<Result<(), Self::Error>> {
Ok(()).into()
}

fn call(&mut self, dst: Uri) -> Self::Future {
// TODO: return error here
let dst = Destination::try_from_uri(dst).unwrap();

trace!(
"Http::connect; scheme={}, host={}, port={:?}",
dst.scheme(),
dst.host(),
dst.port(),
);

if self.enforce_http {
if dst.uri.scheme_part() != Some(&Scheme::HTTP) {
return invalid_url::<R>(InvalidUrl::NotHttp, &self.handle).map_ok(|(s, _)| s).boxed();
}
} else if dst.uri.scheme_part().is_none() {
return invalid_url::<R>(InvalidUrl::MissingScheme, &self.handle).map_ok(|(s, _)| s).boxed();
}

let host = match dst.uri.host() {
Some(s) => s,
None => return invalid_url::<R>(InvalidUrl::MissingAuthority, &self.handle).map_ok(|(s, _)| s).boxed(),
};
let port = match dst.uri.port_part() {
Some(port) => port.as_u16(),
None => if dst.uri.scheme_part() == Some(&Scheme::HTTPS) { 443 } else { 80 },
};

let fut = HttpConnecting {
state: State::Lazy(self.resolver.clone(), host.into(), self.local_address),
handle: self.handle.clone(),
happy_eyeballs_timeout: self.happy_eyeballs_timeout,
keep_alive_timeout: self.keep_alive_timeout,
nodelay: self.nodelay,
port,
reuse_address: self.reuse_address,
send_buffer_size: self.send_buffer_size,
recv_buffer_size: self.recv_buffer_size,
};

fut.map_ok(|(s, _)| s).boxed()
}
}

impl HttpInfo {
/// Get the remote address of the transport used.
pub fn remote_addr(&self) -> SocketAddr {
Expand Down
1 change: 1 addition & 0 deletions src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ pub mod conn;
pub mod connect;
pub(crate) mod dispatch;
mod pool;
pub mod service;
#[cfg(test)]
mod tests;

Expand Down
85 changes: 85 additions & 0 deletions src/client/service.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
//! Utilities used to interact with the Tower ecosystem.
//!
//! This module provides exports of `Service`, `MakeService` and `Connect` which
//! all provide hook-ins into the Tower ecosystem.

use super::conn::{SendRequest, Builder};
use std::marker::PhantomData;
use crate::{common::{Poll, task, Pin}, body::Payload};
use std::future::Future;
use std::error::Error as StdError;
use tower_make::MakeConnection;

pub use tower_service::Service;
pub use tower_make::MakeService;

/// Creates a connection via `SendRequest`.
///
/// This accepts a `hyper::client::conn::Builder` and provides
/// a `MakeService` implementation to create connections from some
/// target `T`.
#[derive(Debug)]
pub struct Connect<C, B, T> {
inner: C,
builder: Builder,
_pd: PhantomData<fn(T, B)>
}

impl<C, B, T> Connect<C, B, T> {
/// Create a new `Connect` with some inner connector `C` and a connection
/// builder.
pub fn new(inner: C, builder: Builder) -> Self {
Self {
inner,
builder,
_pd: PhantomData
}
}
}

impl<C, B, T> Service<T> for Connect<C, B, T>
where
C: MakeConnection<T>,
C::Connection: Unpin + Send + 'static,
C::Future: Send + 'static,
C::Error: Into<Box<dyn StdError + Send + Sync>> + Send,
B: Payload + Unpin + 'static,
B::Data: Unpin,
{
type Response = SendRequest<B>;
type Error = crate::Error;
type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send + 'static>>;

fn poll_ready(&mut self, cx: &mut task::Context<'_>) -> Poll<Result<(), Self::Error>> {
self.inner.poll_ready(cx).map_err(|e| crate::Error::new(crate::error::Kind::Connect).with(e.into()))
}

fn call(&mut self, req: T) -> Self::Future {
let builder = self.builder.clone();
let io = self.inner.make_connection(req);

let fut = async move {
match io.await {
Ok(io) => {
match builder.handshake(io).await {
Ok((sr, conn)) => {
builder.exec.execute(async move {
if let Err(e) = conn.await {
debug!("connection error: {:?}", e);
}
})?;
Ok(sr)
},
Err(e) => Err(e)
}
},
Err(e) => {
let err = crate::Error::new(crate::error::Kind::Connect).with(e.into());
Err(err)
}
}
};

Box::pin(fut)
}
}