Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 34 additions & 20 deletions src/client/connect.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use std::error::Error as StdError;
use std::fmt;
use std::io;
use std::mem;
use std::rc::Rc;
//use std::net::SocketAddr;

use futures::{Future, Poll, Async};
Expand All @@ -11,7 +12,7 @@ use tokio::net::{TcpStream, TcpStreamNew};
use tokio_service::Service;
use Uri;

use super::dns;
use super::dns::{self, DnsService};

/// A connector creates an Io to a remote address..
///
Expand Down Expand Up @@ -42,21 +43,34 @@ where T: Service<Request=Uri, Error=io::Error> + 'static,

/// A connector for the `http` scheme.
#[derive(Clone)]
pub struct HttpConnector {
dns: dns::Dns,
pub struct HttpConnector<D=dns::DnsPool> {
dns: Rc<D>,
enforce_http: bool,
handle: Handle,
}

impl HttpConnector {

/// Construct a new HttpConnector.
///
/// Takes number of DNS worker threads.
#[inline]
pub fn new(threads: usize, handle: &Handle) -> HttpConnector {
HttpConnector {
dns: dns::Dns::new(threads),
dns: Rc::new(dns::DnsPool::new(threads)),
enforce_http: true,
handle: handle.clone(),
}
}
}

impl<D> HttpConnector<D> {
/// Construct a new HttpConnector.
///
/// Takes a `DnsService`.
#[inline]
pub fn with_dns(dns: D, handle: &Handle) -> HttpConnector<D> {
HttpConnector {
dns: Rc::new(dns),
enforce_http: true,
handle: handle.clone(),
}
Expand All @@ -71,19 +85,19 @@ impl HttpConnector {
}
}

impl fmt::Debug for HttpConnector {
impl<D> fmt::Debug for HttpConnector<D> {
#[inline]
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
f.debug_struct("HttpConnector")
.finish()
}
}

impl Service for HttpConnector {
impl<D: DnsService> Service for HttpConnector<D> {
type Request = Uri;
type Response = TcpStream;
type Error = io::Error;
type Future = HttpConnecting;
type Future = HttpConnecting<D>;

fn call(&self, uri: Uri) -> Self::Future {
debug!("Http::connect({:?})", uri);
Expand Down Expand Up @@ -117,7 +131,7 @@ impl Service for HttpConnector {
}

#[inline]
fn invalid_url(err: InvalidUrl, handle: &Handle) -> HttpConnecting {
fn invalid_url<D: DnsService>(err: InvalidUrl, handle: &Handle) -> HttpConnecting<D> {
HttpConnecting {
state: State::Error(Some(io::Error::new(io::ErrorKind::InvalidInput, err))),
handle: handle.clone(),
Expand Down Expand Up @@ -148,19 +162,19 @@ impl StdError for InvalidUrl {
}

/// A Future representing work to connect to a URL.
pub struct HttpConnecting {
state: State,
pub struct HttpConnecting<D: DnsService = dns::DnsPool> {
state: State<D>,
handle: Handle,
}

enum State {
Lazy(dns::Dns, String, u16),
Resolving(dns::Query),
Connecting(ConnectingTcp),
enum State<D: DnsService> {
Lazy(Rc<D>, String, u16),
Resolving(<D as DnsService>::Future),
Connecting(ConnectingTcp<D>),
Error(Option<io::Error>),
}

impl Future for HttpConnecting {
impl<D: DnsService> Future for HttpConnecting<D> {
type Item = TcpStream;
type Error = io::Error;

Expand All @@ -177,7 +191,7 @@ impl Future for HttpConnecting {
Async::NotReady => return Ok(Async::NotReady),
Async::Ready(addrs) => {
state = State::Connecting(ConnectingTcp {
addrs: addrs,
addrs: addrs.into_iter(),
current: None,
})
}
Expand All @@ -197,12 +211,12 @@ impl fmt::Debug for HttpConnecting {
}
}

struct ConnectingTcp {
addrs: dns::IpAddrs,
struct ConnectingTcp<D: DnsService> {
addrs: <D::Response as IntoIterator>::IntoIter,
current: Option<TcpStreamNew>,
}

impl ConnectingTcp {
impl<D: DnsService> ConnectingTcp<D> {
// not a Future, since passing a &Handle to poll
fn poll(&mut self, handle: &Handle) -> Poll<TcpStream, io::Error> {
let mut err = None;
Expand Down
56 changes: 47 additions & 9 deletions src/client/dns.rs
Original file line number Diff line number Diff line change
@@ -1,30 +1,61 @@
use std::fmt;
use std::io;
use std::net::{SocketAddr, ToSocketAddrs};
use std::vec;

use ::futures::{Future, Poll};
use ::futures_cpupool::{CpuPool, CpuFuture};

/// Trait for resolving a host and port into a set of ip addresses.
pub trait DnsService {
/// Response representing a set of ip addresses.
type Response: IntoIterator<Item=SocketAddr>;
/// Future being returned from `resolve`.
type Future: Future<Item=Self::Response, Error=io::Error>;

/// Resolves a host and port into a set of ip addresses.
fn resolve(&self, host: String, port: u16) -> Self::Future;
}

#[derive(Clone)]
pub struct Dns {
/// An implementation of a dns service using a thread pool.
pub struct DnsPool {
pool: CpuPool,
}

impl Dns {
pub fn new(threads: usize) -> Dns {
Dns {
impl DnsPool {
/// Create a new dns pool given a number of worker threads.
pub fn new(threads: usize) -> DnsPool {
DnsPool {
pool: CpuPool::new(threads)
}
}
}

impl fmt::Debug for DnsPool {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
f.pad("DnsPool")
}
}

pub fn resolve(&self, host: String, port: u16) -> Query {
Query(self.pool.spawn_fn(move || work(host, port)))
impl DnsService for DnsPool {
type Response = IpAddrs;
type Future = DnsQuery;
fn resolve(&self, host: String, port: u16) -> Self::Future {
DnsQuery(self.pool.spawn_fn(move || work(host, port)))
}
}

pub struct Query(CpuFuture<IpAddrs, io::Error>);
/// A `Future` that will resolve to `IpAddrs`.
pub struct DnsQuery(CpuFuture<IpAddrs, io::Error>);

impl Future for Query {
impl fmt::Debug for DnsQuery {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
f.pad("DnsQuery")
}
}

impl Future for DnsQuery {
type Item = IpAddrs;
type Error = io::Error;

Expand All @@ -33,10 +64,17 @@ impl Future for Query {
}
}

/// A set of ip addresses.
pub struct IpAddrs {
iter: vec::IntoIter<SocketAddr>,
}

impl fmt::Debug for IpAddrs {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
f.pad("IpAddrs")
}
}

impl Iterator for IpAddrs {
type Item = SocketAddr;
#[inline]
Expand All @@ -45,7 +83,7 @@ impl Iterator for IpAddrs {
}
}

pub type Answer = io::Result<IpAddrs>;
type Answer = io::Result<IpAddrs>;

fn work(hostname: String, port: u16) -> Answer {
debug!("resolve {:?}:{:?}", hostname, port);
Expand Down
1 change: 1 addition & 0 deletions src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ use uri::{self, Uri};
pub use http::response::Response;
pub use http::request::Request;
pub use self::connect::{HttpConnector, Connect};
pub use self::dns::{DnsPool, DnsQuery, DnsService, IpAddrs};

mod connect;
mod dns;
Expand Down