Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion examples/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -48,4 +48,4 @@ path = "tls/server.rs"

[[example]]
name = "tls-client"
path = "tls/client.rs"
path = "tls/client.rs"
3 changes: 0 additions & 3 deletions rsocket-test/tests/test_clients.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,6 @@ fn test_websocket() {
exec_request_response(&cli).await;
exec_request_stream(&cli).await;
exec_request_channel(&cli).await;
cli.close();
});
}

Expand Down Expand Up @@ -120,7 +119,6 @@ fn test_tcp() {
exec_request_response(&cli).await;
exec_request_stream(&cli).await;
exec_request_channel(&cli).await;
cli.close();
});
}

Expand Down Expand Up @@ -176,7 +174,6 @@ fn test_unix() {
exec_request_response(&cli).await;
exec_request_stream(&cli).await;
exec_request_channel(&cli).await;
cli.close();
});
}

Expand Down
50 changes: 8 additions & 42 deletions rsocket-transport-tcp/src/connection/tcp.rs
Original file line number Diff line number Diff line change
@@ -1,57 +1,23 @@
use super::codec::LengthBasedFrameCodec;
use futures::stream::{SplitSink, SplitStream};
use futures::{SinkExt, StreamExt};
use rsocket_rust::async_trait;
use rsocket_rust::frame::Frame;
use rsocket_rust::transport::{Connection, Reader, Writer};
use rsocket_rust::{error::RSocketError, Result};
use tokio::net::TcpStream;
use tokio_util::codec::Framed;

use rsocket_rust::error::RSocketError;
use rsocket_rust::transport::{Connection, FrameSink, FrameStream};

use super::codec::LengthBasedFrameCodec;

#[derive(Debug)]
pub struct TcpConnection {
stream: TcpStream,
}

struct InnerWriter {
sink: SplitSink<Framed<TcpStream, LengthBasedFrameCodec>, Frame>,
}

struct InnerReader {
stream: SplitStream<Framed<TcpStream, LengthBasedFrameCodec>>,
}

#[async_trait]
impl Writer for InnerWriter {
async fn write(&mut self, frame: Frame) -> Result<()> {
match self.sink.send(frame).await {
Ok(()) => Ok(()),
Err(e) => Err(RSocketError::IO(e).into()),
}
}
}

#[async_trait]
impl Reader for InnerReader {
async fn read(&mut self) -> Option<Result<Frame>> {
self.stream
.next()
.await
.map(|next| next.map_err(|e| RSocketError::IO(e).into()))
}
}

impl Connection for TcpConnection {
fn split(
self,
) -> (
Box<dyn Writer + Send + Unpin>,
Box<dyn Reader + Send + Unpin>,
) {
fn split(self) -> (Box<FrameSink>, Box<FrameStream>) {
let (sink, stream) = Framed::new(self.stream, LengthBasedFrameCodec).split();
(
Box::new(InnerWriter { sink }),
Box::new(InnerReader { stream }),
Box::new(sink.sink_map_err(|e| RSocketError::Other(e.into()))),
Box::new(stream.map(|next| next.map_err(|e| RSocketError::Other(e.into())))),
)
}
}
Expand Down
46 changes: 5 additions & 41 deletions rsocket-transport-tcp/src/connection/tls.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,7 @@
use super::codec::LengthBasedFrameCodec;
use futures::stream::{SplitSink, SplitStream};
use futures::{SinkExt, StreamExt};
use rsocket_rust::async_trait;
use rsocket_rust::frame::Frame;
use rsocket_rust::transport::{Connection, Reader, Writer};
use rsocket_rust::{error::RSocketError, Result};
use rsocket_rust::error::RSocketError;
use rsocket_rust::transport::{Connection, FrameSink, FrameStream};
use tokio::net::TcpStream;
use tokio_native_tls::TlsStream;
use tokio_util::codec::Framed;
Expand All @@ -14,45 +11,12 @@ pub struct TlsConnection {
stream: TlsStream<TcpStream>,
}

struct InnerWriter {
sink: SplitSink<Framed<TlsStream<TcpStream>, LengthBasedFrameCodec>, Frame>,
}

struct InnerReader {
stream: SplitStream<Framed<TlsStream<TcpStream>, LengthBasedFrameCodec>>,
}

#[async_trait]
impl Writer for InnerWriter {
async fn write(&mut self, frame: Frame) -> Result<()> {
match self.sink.send(frame).await {
Ok(()) => Ok(()),
Err(e) => Err(RSocketError::IO(e).into()),
}
}
}

#[async_trait]
impl Reader for InnerReader {
async fn read(&mut self) -> Option<Result<Frame>> {
self.stream
.next()
.await
.map(|next| next.map_err(|e| RSocketError::IO(e).into()))
}
}

impl Connection for TlsConnection {
fn split(
self,
) -> (
Box<dyn Writer + Send + Unpin>,
Box<dyn Reader + Send + Unpin>,
) {
fn split(self) -> (Box<FrameSink>, Box<FrameStream>) {
let (sink, stream) = Framed::new(self.stream, LengthBasedFrameCodec).split();
(
Box::new(InnerWriter { sink }),
Box::new(InnerReader { stream }),
Box::new(sink.sink_map_err(|e| RSocketError::Other(e.into()))),
Box::new(stream.map(|it| it.map_err(|e| RSocketError::Other(e.into())))),
)
}
}
Expand Down
46 changes: 5 additions & 41 deletions rsocket-transport-tcp/src/connection/uds.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,7 @@
use super::codec::LengthBasedFrameCodec;
use futures::stream::{SplitSink, SplitStream};
use futures::{SinkExt, StreamExt};
use rsocket_rust::async_trait;
use rsocket_rust::frame::Frame;
use rsocket_rust::transport::{Connection, Reader, Writer};
use rsocket_rust::{error::RSocketError, Result};
use rsocket_rust::error::RSocketError;
use rsocket_rust::transport::{Connection, FrameSink, FrameStream};
use tokio::net::UnixStream;
use tokio_util::codec::Framed;

Expand All @@ -13,49 +10,16 @@ pub struct UnixConnection {
stream: UnixStream,
}

struct InnerWriter {
sink: SplitSink<Framed<UnixStream, LengthBasedFrameCodec>, Frame>,
}

struct InnerReader {
stream: SplitStream<Framed<UnixStream, LengthBasedFrameCodec>>,
}

impl Connection for UnixConnection {
fn split(
self,
) -> (
Box<dyn Writer + Send + Unpin>,
Box<dyn Reader + Send + Unpin>,
) {
fn split(self) -> (Box<FrameSink>, Box<FrameStream>) {
let (sink, stream) = Framed::new(self.stream, LengthBasedFrameCodec).split();
(
Box::new(InnerWriter { sink }),
Box::new(InnerReader { stream }),
Box::new(sink.sink_map_err(|e| RSocketError::Other(e.into()))),
Box::new(stream.map(|it| it.map_err(|e| RSocketError::Other(e.into())))),
)
}
}

#[async_trait]
impl Writer for InnerWriter {
async fn write(&mut self, frame: Frame) -> Result<()> {
match self.sink.send(frame).await {
Ok(()) => Ok(()),
Err(e) => Err(RSocketError::IO(e).into()),
}
}
}

#[async_trait]
impl Reader for InnerReader {
async fn read(&mut self) -> Option<Result<Frame>> {
self.stream
.next()
.await
.map(|it| it.map_err(|e| RSocketError::IO(e).into()))
}
}

impl From<UnixStream> for UnixConnection {
fn from(stream: UnixStream) -> UnixConnection {
UnixConnection { stream }
Expand Down
5 changes: 3 additions & 2 deletions rsocket-transport-wasm/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,13 @@ description = "WASM Websocket RSocket transport implementation."
[dependencies]
bytes = "0.6.0"
wasm-bindgen-futures = "0.4.19"
futures-channel = "0.3.8"
futures-util = "0.3.8"
futures-channel = "0.3.9"
futures-util = "0.3.9"
js-sys = "0.3.46"
serde = "1.0.118"
serde_derive = "1.0.118"
async-trait = "0.1.42"
log = "0.4.11"

[dependencies.rsocket_rust]
path = "../rsocket"
Expand Down
14 changes: 2 additions & 12 deletions rsocket-transport-wasm/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,6 @@ use wasm_bindgen::prelude::*;
use wasm_bindgen::JsCast;
use web_sys::{ErrorEvent, Event, FileReader, MessageEvent, ProgressEvent, WebSocket};

macro_rules! console_log {
($($t:tt)*) => (log(&format_args!($($t)*).to_string()))
}

#[wasm_bindgen]
extern "C" {
#[wasm_bindgen(js_namespace = console)]
fn log(s: &str);
}

pub struct WebsocketClientTransport {
url: String,
}
Expand Down Expand Up @@ -104,15 +94,15 @@ impl Transport for WebsocketClientTransport {

// on error
let on_error = Closure::wrap(Box::new(move |e: ErrorEvent| {
console_log!("websocket error: {}", e.message());
log::error!("websocket error: {}", e.message());
})
as Box<dyn FnMut(ErrorEvent)>);
ws.set_onerror(Some(on_error.as_ref().unchecked_ref()));
on_error.forget();

// on_close
let on_close = Closure::once(Box::new(move |_e: Event| {
console_log!("websocket closed");
log::info!("websocket closed");
}) as Box<dyn FnMut(Event)>);
ws.set_onclose(Some(on_close.as_ref().unchecked_ref()));
on_close.forget();
Expand Down
41 changes: 5 additions & 36 deletions rsocket-transport-wasm/src/connection.rs
Original file line number Diff line number Diff line change
@@ -1,56 +1,25 @@
use async_trait::async_trait;
use futures_channel::mpsc;
use futures_util::{SinkExt, StreamExt};
use rsocket_rust::transport::{Connection, Reader, Writer};
use rsocket_rust::{error::RSocketError, frame::Frame, Result};
use rsocket_rust::transport::{Connection, FrameSink, FrameStream};
use rsocket_rust::{error::RSocketError, frame::Frame};

#[derive(Debug)]
pub struct WebsocketConnection {
rx: mpsc::Receiver<Frame>,
tx: mpsc::Sender<Frame>,
}

struct InnerWriter {
tx: mpsc::Sender<Frame>,
}

struct InnerReader {
rx: mpsc::Receiver<Frame>,
}

impl WebsocketConnection {
pub(crate) fn new(tx: mpsc::Sender<Frame>, rx: mpsc::Receiver<Frame>) -> WebsocketConnection {
WebsocketConnection { rx, tx }
}
}

#[async_trait]
impl Writer for InnerWriter {
async fn write(&mut self, frame: Frame) -> Result<()> {
match self.tx.send(frame).await {
Ok(()) => Ok(()),
Err(e) => Err(RSocketError::Other(e.into()).into()),
}
}
}

#[async_trait]
impl Reader for InnerReader {
async fn read(&mut self) -> Option<Result<Frame>> {
self.rx.next().await.map(|frame| Ok(frame))
}
}

impl Connection for WebsocketConnection {
fn split(
self,
) -> (
Box<dyn Writer + Send + Unpin>,
Box<dyn Reader + Send + Unpin>,
) {
fn split(self) -> (Box<FrameSink>, Box<FrameStream>) {
(
Box::new(InnerWriter { tx: self.tx }),
Box::new(InnerReader { rx: self.rx }),
Box::new(self.tx.sink_map_err(|e| RSocketError::Other(e.into()))),
Box::new(self.rx.map(|it| Ok(it))),
)
}
}
Loading