Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 9 additions & 1 deletion examples/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,13 @@ log = "0.4.11"
env_logger = "0.8.2"
futures = "0.3.8"
clap = "2.33.3"
async-trait = "0.1.42"

[dev-dependencies.rsocket_rust]
path = "../rsocket"

[dev-dependencies.rsocket_rust_transport_tcp]
path = "../rsocket-transport-tcp"
features = ["tls"]

[dev-dependencies.rsocket_rust_transport_websocket]
path = "../rsocket-transport-websocket"
Expand All @@ -41,3 +41,11 @@ path = "cli.rs"
[[example]]
name = "qps"
path = "qps.rs"

[[example]]
name = "tls-server"
path = "tls/server.rs"

[[example]]
name = "tls-client"
path = "tls/client.rs"
31 changes: 31 additions & 0 deletions examples/tls/cert.pem
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
-----BEGIN CERTIFICATE-----
MIIFUDCCAzigAwIBAgIJAJjWP27hY7PeMA0GCSqGSIb3DQEBCwUAMEUxCzAJBgNV
BAYTAkFVMRMwEQYDVQQIDApTb21lLVN0YXRlMSEwHwYDVQQKDBhJbnRlcm5ldCBX
aWRnaXRzIFB0eSBMdGQwHhcNMjEwMTA1MDMwODUzWhcNMjIwMTA1MDMwODUzWjBa
MQswCQYDVQQGEwJBVTETMBEGA1UECAwKU29tZS1TdGF0ZTEhMB8GA1UECgwYSW50
ZXJuZXQgV2lkZ2l0cyBQdHkgTHRkMRMwEQYDVQQDDApmb29iYXIuY29tMIICIjAN
BgkqhkiG9w0BAQEFAAOCAg8AMIICCgKCAgEAtv10GWc14Kyu94jmT/a52HEdTf9f
gSFc9fTiGIcvsHwGe5HU5tMEEpVmVQqMeWHMgIunHpAVbmu9dhZ/r256QiR+bU8o
Zzv6ShJ6gdYAv/j2MEu7sHFpFHWEaZmPgqbREHioerS8AMJwNLcFF47EKUsKlbJy
Yn7rsfDl1Vir3I5l2VOORmjNOap/++CNJf39oGN8yx6+6YUT+lcUY2GkI8BkXLRI
ITTYOnAsjnwtk9k2sHQaCMfmlQYr5FCDTi2A4MEWfBecJ8Logbt+E02ZaaMj5pSW
Q4oqpZUEzhYhUgNUahy1Gfeso9BZsFj056dn8qiaa61tV1vtFsnk2bHEU9HTu/NA
TwTLsRhWVqfOA04zT2V9OPcaO+shiMsY7JzivLFnBBsPeTC43e5Js1OZSeRr6Dbq
8jJ61p+f5DbBRHUFl3oZ0pz+8xYpusDHHzVPqE5izKS3fz2FNyK+OjJwEsNmSpNc
X4+g3YeZj3Oq4fs/vGu3y1IZq9wIR1pBSRosW+SOpPd9PBWhbF43fUa0k2kSI7Xc
BkDQbEWxY0jEY4vzvNOA2C5A3eg8DzvDpjGE13GeSzk3DsObwmugHWZaz/TK0KeI
7LWDCHkUBizublZ+htUHDZxTU5mXWhaU2tU/udz+4rPVymns8RZ+Omm3uOC4PuQo
g+ezrcaZwyZptv0CAwEAAaMuMCwwEwYDVR0lBAwwCgYIKwYBBQUHAwEwFQYDVR0R
BA4wDIIKZm9vYmFyLmNvbTANBgkqhkiG9w0BAQsFAAOCAgEAKj6H77fLp5jL/oEJ
qglaOaM4OHU4FbQ3mTPLTjeVrvcnoxavigZC++0FwEkNm9FEfA8fDDNsVgK4CTof
Jdsq10hkHVE44Y5WLJha4o50dvUczj0k4wV5BaZ0cC2aYwdRXVkSYVNa90951pDq
FBy75ltWUWnQ8tibU4tZPgg2sbMzWkcRJ5bJfmEFVbB930x6ofllVZx2h9wZERBp
otgtJzQ1+P+MZ5By2nF59gpKURbRyS06y5emuhs/7UF2E/ETvMTiyBgcRiASYqGO
2bcxvZ6J6zo93D2gEAUKrP2QVsE0o3oTtc3N5ix1lXegdi9AM4AGnzb8uDkjyJHn
J9ibW1pWPYey2DjArbcq1uSKzXtC+YVEdS7k6X91ksYrFZgjAporyhJbqISq+sJO
1gaTXu1VRNRhFVfJQjNVfA0ar+RHcx5xetdJKRglekGqGcBQfTFMNhLcGfuY16pe
qRzF9gwCqnX0c+tIl32IxCS4g6Pj/LBn3qCGE4sycWUz6+mwjGopFWK2gBGpj4M0
Dvnv+c72QQKmijtpQX6tSn75MBhtG9hZD8jPpMyfIFfkhx71StZ4j1O+rvXQs7go
Nb9qJOug/jfSrDQl9D5FSlRaqX19fm7JxiK7+fSnh/WlMmBQfn/j/lJ/sSDcyAbb
KrQM79kml5Luw0QXo7yLtoYD93A=
-----END CERTIFICATE-----
35 changes: 35 additions & 0 deletions examples/tls/client.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
#[macro_use]
extern crate log;

use rsocket_rust::prelude::*;
use rsocket_rust::Result;
use rsocket_rust_transport_tcp::tokio_native_tls::{native_tls, TlsConnector};
use rsocket_rust_transport_tcp::TlsClientTransport;

#[tokio::main]
async fn main() -> Result<()> {
env_logger::builder().format_timestamp_millis().init();

let pem = include_bytes!("cert.pem");
let cert = native_tls::Certificate::from_pem(pem)?;
let cx = native_tls::TlsConnector::builder()
.add_root_certificate(cert)
.build()?;
let cx = TlsConnector::from(cx);
let cli = RSocketFactory::connect()
.transport(TlsClientTransport::new(
"foobar.com".into(),
"127.0.0.1:4444".parse()?,
cx,
))
.start()
.await?;
let res = cli
.request_response(Payload::builder().set_data_utf8("hello").build())
.await?;
info!("response: {:?}", res);

cli.wait_for_close().await;

Ok(())
}
Binary file added examples/tls/identity.p12
Binary file not shown.
27 changes: 27 additions & 0 deletions examples/tls/server.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
#[macro_use]
extern crate log;

use rsocket_rust::prelude::*;
use rsocket_rust::utils::EchoRSocket;
use rsocket_rust::Result;
use rsocket_rust_transport_tcp::tokio_native_tls::{native_tls, TlsAcceptor};
use rsocket_rust_transport_tcp::TlsServerTransport;

#[tokio::main]
async fn main() -> Result<()> {
env_logger::builder().format_timestamp_millis().init();

let der = include_bytes!("identity.p12");
let cert = native_tls::Identity::from_pkcs12(der, "mypass")?;
RSocketFactory::receive()
.acceptor(Box::new(|setup, _socket| {
info!("connection established: {:?}", setup);
Ok(Box::new(EchoRSocket))
}))
.transport(TlsServerTransport::new(
"127.0.0.1:4444".parse()?,
TlsAcceptor::from(native_tls::TlsAcceptor::builder(cert).build()?),
))
.serve()
.await
}
9 changes: 8 additions & 1 deletion rsocket-transport-tcp/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,14 @@ repository = "https://github.com/rsocket/rsocket-rust"
homepage = "https://github.com/rsocket/rsocket-rust"
description = "TCP RSocket transport implementation."

[features]
default = []
tls = ["tokio-native-tls"]

[dependencies]
log = "0.4.11"
futures = "0.3.8"
bytes = "0.6.0"
async-trait = "0.1.42"

[dependencies.rsocket_rust]
path = "../rsocket"
Expand All @@ -28,3 +31,7 @@ features = [ "rt", "rt-multi-thread", "net", "sync", "stream", "io-util", "macro
version = "0.5.1"
default-features = false
features = ["codec"]

[dependencies.tokio-native-tls]
optional = true
version = "0.2.0"
4 changes: 4 additions & 0 deletions rsocket-transport-tcp/src/client/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
mod tcp;
#[cfg(feature = "tls")]
mod tls;
mod uds;

pub use tcp::TcpClientTransport;
#[cfg(feature = "tls")]
pub use tls::TlsClientTransport;
pub use uds::UnixClientTransport;
2 changes: 1 addition & 1 deletion rsocket-transport-tcp/src/client/tcp.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use crate::{connection::TcpConnection, misc::parse_tcp_addr};
use async_trait::async_trait;
use rsocket_rust::async_trait;
use rsocket_rust::{error::RSocketError, transport::Transport, Result};
use std::net::SocketAddr;
use tokio::net::TcpStream;
Expand Down
50 changes: 50 additions & 0 deletions rsocket-transport-tcp/src/client/tls.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
use crate::connection::TlsConnection;
use rsocket_rust::async_trait;
use rsocket_rust::{error::RSocketError, transport::Transport, Result};
use std::net::SocketAddr;
use tokio::net::TcpStream;
use tokio_native_tls::{TlsConnector, TlsStream};

#[derive(Debug)]
enum Connector {
Direct(TlsStream<TcpStream>),
Lazy(String, SocketAddr, TlsConnector),
}

pub struct TlsClientTransport {
connector: Connector,
}

impl TlsClientTransport {
pub fn new(domain: String, addr: SocketAddr, connector: TlsConnector) -> Self {
Self {
connector: Connector::Lazy(domain, addr, connector),
}
}
}

#[async_trait]
impl Transport for TlsClientTransport {
type Conn = TlsConnection;

async fn connect(self) -> Result<Self::Conn> {
match self.connector {
Connector::Direct(stream) => Ok(TlsConnection::from(stream)),
Connector::Lazy(domain, addr, cx) => match TcpStream::connect(addr).await {
Ok(stream) => match cx.connect(&domain, stream).await {
Ok(stream) => Ok(TlsConnection::from(stream)),
Err(e) => Err(RSocketError::Other(e.into()).into()),
},
Err(e) => Err(RSocketError::IO(e).into()),
},
}
}
}

impl From<TlsStream<TcpStream>> for TlsClientTransport {
fn from(stream: TlsStream<TcpStream>) -> Self {
Self {
connector: Connector::Direct(stream),
}
}
}
2 changes: 1 addition & 1 deletion rsocket-transport-tcp/src/client/uds.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use crate::connection::UnixConnection;
use crate::misc::parse_uds_addr;
use async_trait::async_trait;
use rsocket_rust::async_trait;
use rsocket_rust::{error::RSocketError, transport::Transport, Result};
use tokio::net::UnixStream;

Expand Down
4 changes: 4 additions & 0 deletions rsocket-transport-tcp/src/connection/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
mod codec;
mod tcp;
#[cfg(feature = "tls")]
mod tls;
mod uds;

pub use tcp::TcpConnection;
#[cfg(feature = "tls")]
pub use tls::TlsConnection;
pub use uds::UnixConnection;
2 changes: 1 addition & 1 deletion rsocket-transport-tcp/src/connection/tcp.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use super::codec::LengthBasedFrameCodec;
use async_trait::async_trait;
use futures::stream::{SplitSink, SplitStream};
use futures::{SinkExt, StreamExt};
use rsocket_rust::async_trait;
use rsocket_rust::frame::Frame;
use rsocket_rust::transport::{Connection, Reader, Writer};
use rsocket_rust::{error::RSocketError, Result};
Expand Down
64 changes: 64 additions & 0 deletions rsocket-transport-tcp/src/connection/tls.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
use super::codec::LengthBasedFrameCodec;
use futures::stream::{SplitSink, SplitStream};
use futures::{SinkExt, StreamExt};
use rsocket_rust::async_trait;
use rsocket_rust::frame::Frame;
use rsocket_rust::transport::{Connection, Reader, Writer};
use rsocket_rust::{error::RSocketError, Result};
use tokio::net::TcpStream;
use tokio_native_tls::TlsStream;
use tokio_util::codec::Framed;

#[derive(Debug)]
pub struct TlsConnection {
stream: TlsStream<TcpStream>,
}

struct InnerWriter {
sink: SplitSink<Framed<TlsStream<TcpStream>, LengthBasedFrameCodec>, Frame>,
}

struct InnerReader {
stream: SplitStream<Framed<TlsStream<TcpStream>, LengthBasedFrameCodec>>,
}

#[async_trait]
impl Writer for InnerWriter {
async fn write(&mut self, frame: Frame) -> Result<()> {
match self.sink.send(frame).await {
Ok(()) => Ok(()),
Err(e) => Err(RSocketError::IO(e).into()),
}
}
}

#[async_trait]
impl Reader for InnerReader {
async fn read(&mut self) -> Option<Result<Frame>> {
self.stream
.next()
.await
.map(|next| next.map_err(|e| RSocketError::IO(e).into()))
}
}

impl Connection for TlsConnection {
fn split(
self,
) -> (
Box<dyn Writer + Send + Unpin>,
Box<dyn Reader + Send + Unpin>,
) {
let (sink, stream) = Framed::new(self.stream, LengthBasedFrameCodec).split();
(
Box::new(InnerWriter { sink }),
Box::new(InnerReader { stream }),
)
}
}

impl From<TlsStream<TcpStream>> for TlsConnection {
fn from(stream: TlsStream<TcpStream>) -> Self {
Self { stream }
}
}
2 changes: 1 addition & 1 deletion rsocket-transport-tcp/src/connection/uds.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use super::codec::LengthBasedFrameCodec;
use async_trait::async_trait;
use futures::stream::{SplitSink, SplitStream};
use futures::{SinkExt, StreamExt};
use rsocket_rust::async_trait;
use rsocket_rust::frame::Frame;
use rsocket_rust::transport::{Connection, Reader, Writer};
use rsocket_rust::{error::RSocketError, Result};
Expand Down
9 changes: 9 additions & 0 deletions rsocket-transport-tcp/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,3 +11,12 @@ mod server;
pub use client::{TcpClientTransport, UnixClientTransport};
pub use connection::{TcpConnection, UnixConnection};
pub use server::{TcpServerTransport, UnixServerTransport};

#[cfg(feature = "tls")]
pub use client::TlsClientTransport;
#[cfg(feature = "tls")]
pub use connection::TlsConnection;
#[cfg(feature = "tls")]
pub use server::TlsServerTransport;
#[cfg(feature = "tls")]
pub use tokio_native_tls;
5 changes: 5 additions & 0 deletions rsocket-transport-tcp/src/server/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,10 @@
mod tcp;

#[cfg(feature = "tls")]
mod tls;
mod uds;

pub use tcp::TcpServerTransport;
#[cfg(feature = "tls")]
pub use tls::TlsServerTransport;
pub use uds::UnixServerTransport;
7 changes: 2 additions & 5 deletions rsocket-transport-tcp/src/server/tcp.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use crate::{client::TcpClientTransport, misc::parse_tcp_addr};
use async_trait::async_trait;
use rsocket_rust::async_trait;
use rsocket_rust::{error::RSocketError, transport::ServerTransport, Result};
use std::net::SocketAddr;
use tokio::net::TcpListener;
Expand Down Expand Up @@ -40,10 +40,7 @@ impl ServerTransport for TcpServerTransport {
async fn next(&mut self) -> Option<Result<Self::Item>> {
match self.listener.as_mut() {
Some(listener) => match listener.accept().await {
Ok((socket, _)) => {
let tp = TcpClientTransport::from(socket);
Some(Ok(tp))
}
Ok((socket, _)) => Some(Ok(TcpClientTransport::from(socket))),
Err(e) => Some(Err(RSocketError::IO(e).into())),
},
None => None,
Expand Down
Loading