diff --git a/examples/Cargo.toml b/examples/Cargo.toml index 483cc67..4d48ada 100644 --- a/examples/Cargo.toml +++ b/examples/Cargo.toml @@ -48,4 +48,4 @@ path = "tls/server.rs" [[example]] name = "tls-client" -path = "tls/client.rs" \ No newline at end of file +path = "tls/client.rs" diff --git a/rsocket-test/tests/test_clients.rs b/rsocket-test/tests/test_clients.rs index f782398..50d99c6 100644 --- a/rsocket-test/tests/test_clients.rs +++ b/rsocket-test/tests/test_clients.rs @@ -74,7 +74,6 @@ fn test_websocket() { exec_request_response(&cli).await; exec_request_stream(&cli).await; exec_request_channel(&cli).await; - cli.close(); }); } @@ -120,7 +119,6 @@ fn test_tcp() { exec_request_response(&cli).await; exec_request_stream(&cli).await; exec_request_channel(&cli).await; - cli.close(); }); } @@ -176,7 +174,6 @@ fn test_unix() { exec_request_response(&cli).await; exec_request_stream(&cli).await; exec_request_channel(&cli).await; - cli.close(); }); } diff --git a/rsocket-transport-tcp/src/connection/tcp.rs b/rsocket-transport-tcp/src/connection/tcp.rs index 00252f3..6b0b4ce 100644 --- a/rsocket-transport-tcp/src/connection/tcp.rs +++ b/rsocket-transport-tcp/src/connection/tcp.rs @@ -1,57 +1,23 @@ -use super::codec::LengthBasedFrameCodec; -use futures::stream::{SplitSink, SplitStream}; use futures::{SinkExt, StreamExt}; -use rsocket_rust::async_trait; -use rsocket_rust::frame::Frame; -use rsocket_rust::transport::{Connection, Reader, Writer}; -use rsocket_rust::{error::RSocketError, Result}; use tokio::net::TcpStream; use tokio_util::codec::Framed; +use rsocket_rust::error::RSocketError; +use rsocket_rust::transport::{Connection, FrameSink, FrameStream}; + +use super::codec::LengthBasedFrameCodec; + #[derive(Debug)] pub struct TcpConnection { stream: TcpStream, } -struct InnerWriter { - sink: SplitSink, Frame>, -} - -struct InnerReader { - stream: SplitStream>, -} - -#[async_trait] -impl Writer for InnerWriter { - async fn write(&mut self, frame: Frame) -> Result<()> { - match self.sink.send(frame).await { - Ok(()) => Ok(()), - Err(e) => Err(RSocketError::IO(e).into()), - } - } -} - -#[async_trait] -impl Reader for InnerReader { - async fn read(&mut self) -> Option> { - self.stream - .next() - .await - .map(|next| next.map_err(|e| RSocketError::IO(e).into())) - } -} - impl Connection for TcpConnection { - fn split( - self, - ) -> ( - Box, - Box, - ) { + fn split(self) -> (Box, Box) { let (sink, stream) = Framed::new(self.stream, LengthBasedFrameCodec).split(); ( - Box::new(InnerWriter { sink }), - Box::new(InnerReader { stream }), + Box::new(sink.sink_map_err(|e| RSocketError::Other(e.into()))), + Box::new(stream.map(|next| next.map_err(|e| RSocketError::Other(e.into())))), ) } } diff --git a/rsocket-transport-tcp/src/connection/tls.rs b/rsocket-transport-tcp/src/connection/tls.rs index b2041c7..5d9f032 100644 --- a/rsocket-transport-tcp/src/connection/tls.rs +++ b/rsocket-transport-tcp/src/connection/tls.rs @@ -1,10 +1,7 @@ use super::codec::LengthBasedFrameCodec; -use futures::stream::{SplitSink, SplitStream}; use futures::{SinkExt, StreamExt}; -use rsocket_rust::async_trait; -use rsocket_rust::frame::Frame; -use rsocket_rust::transport::{Connection, Reader, Writer}; -use rsocket_rust::{error::RSocketError, Result}; +use rsocket_rust::error::RSocketError; +use rsocket_rust::transport::{Connection, FrameSink, FrameStream}; use tokio::net::TcpStream; use tokio_native_tls::TlsStream; use tokio_util::codec::Framed; @@ -14,45 +11,12 @@ pub struct TlsConnection { stream: TlsStream, } -struct InnerWriter { - sink: SplitSink, LengthBasedFrameCodec>, Frame>, -} - -struct InnerReader { - stream: SplitStream, LengthBasedFrameCodec>>, -} - -#[async_trait] -impl Writer for InnerWriter { - async fn write(&mut self, frame: Frame) -> Result<()> { - match self.sink.send(frame).await { - Ok(()) => Ok(()), - Err(e) => Err(RSocketError::IO(e).into()), - } - } -} - -#[async_trait] -impl Reader for InnerReader { - async fn read(&mut self) -> Option> { - self.stream - .next() - .await - .map(|next| next.map_err(|e| RSocketError::IO(e).into())) - } -} - impl Connection for TlsConnection { - fn split( - self, - ) -> ( - Box, - Box, - ) { + fn split(self) -> (Box, Box) { let (sink, stream) = Framed::new(self.stream, LengthBasedFrameCodec).split(); ( - Box::new(InnerWriter { sink }), - Box::new(InnerReader { stream }), + Box::new(sink.sink_map_err(|e| RSocketError::Other(e.into()))), + Box::new(stream.map(|it| it.map_err(|e| RSocketError::Other(e.into())))), ) } } diff --git a/rsocket-transport-tcp/src/connection/uds.rs b/rsocket-transport-tcp/src/connection/uds.rs index f62e831..8b454d3 100644 --- a/rsocket-transport-tcp/src/connection/uds.rs +++ b/rsocket-transport-tcp/src/connection/uds.rs @@ -1,10 +1,7 @@ use super::codec::LengthBasedFrameCodec; -use futures::stream::{SplitSink, SplitStream}; use futures::{SinkExt, StreamExt}; -use rsocket_rust::async_trait; -use rsocket_rust::frame::Frame; -use rsocket_rust::transport::{Connection, Reader, Writer}; -use rsocket_rust::{error::RSocketError, Result}; +use rsocket_rust::error::RSocketError; +use rsocket_rust::transport::{Connection, FrameSink, FrameStream}; use tokio::net::UnixStream; use tokio_util::codec::Framed; @@ -13,49 +10,16 @@ pub struct UnixConnection { stream: UnixStream, } -struct InnerWriter { - sink: SplitSink, Frame>, -} - -struct InnerReader { - stream: SplitStream>, -} - impl Connection for UnixConnection { - fn split( - self, - ) -> ( - Box, - Box, - ) { + fn split(self) -> (Box, Box) { let (sink, stream) = Framed::new(self.stream, LengthBasedFrameCodec).split(); ( - Box::new(InnerWriter { sink }), - Box::new(InnerReader { stream }), + Box::new(sink.sink_map_err(|e| RSocketError::Other(e.into()))), + Box::new(stream.map(|it| it.map_err(|e| RSocketError::Other(e.into())))), ) } } -#[async_trait] -impl Writer for InnerWriter { - async fn write(&mut self, frame: Frame) -> Result<()> { - match self.sink.send(frame).await { - Ok(()) => Ok(()), - Err(e) => Err(RSocketError::IO(e).into()), - } - } -} - -#[async_trait] -impl Reader for InnerReader { - async fn read(&mut self) -> Option> { - self.stream - .next() - .await - .map(|it| it.map_err(|e| RSocketError::IO(e).into())) - } -} - impl From for UnixConnection { fn from(stream: UnixStream) -> UnixConnection { UnixConnection { stream } diff --git a/rsocket-transport-wasm/Cargo.toml b/rsocket-transport-wasm/Cargo.toml index ad5fb1d..84d8274 100644 --- a/rsocket-transport-wasm/Cargo.toml +++ b/rsocket-transport-wasm/Cargo.toml @@ -12,12 +12,13 @@ description = "WASM Websocket RSocket transport implementation." [dependencies] bytes = "0.6.0" wasm-bindgen-futures = "0.4.19" -futures-channel = "0.3.8" -futures-util = "0.3.8" +futures-channel = "0.3.9" +futures-util = "0.3.9" js-sys = "0.3.46" serde = "1.0.118" serde_derive = "1.0.118" async-trait = "0.1.42" +log = "0.4.11" [dependencies.rsocket_rust] path = "../rsocket" diff --git a/rsocket-transport-wasm/src/client.rs b/rsocket-transport-wasm/src/client.rs index 45a6c06..62431d9 100644 --- a/rsocket-transport-wasm/src/client.rs +++ b/rsocket-transport-wasm/src/client.rs @@ -15,16 +15,6 @@ use wasm_bindgen::prelude::*; use wasm_bindgen::JsCast; use web_sys::{ErrorEvent, Event, FileReader, MessageEvent, ProgressEvent, WebSocket}; -macro_rules! console_log { - ($($t:tt)*) => (log(&format_args!($($t)*).to_string())) -} - -#[wasm_bindgen] -extern "C" { - #[wasm_bindgen(js_namespace = console)] - fn log(s: &str); -} - pub struct WebsocketClientTransport { url: String, } @@ -104,7 +94,7 @@ impl Transport for WebsocketClientTransport { // on error let on_error = Closure::wrap(Box::new(move |e: ErrorEvent| { - console_log!("websocket error: {}", e.message()); + log::error!("websocket error: {}", e.message()); }) as Box); ws.set_onerror(Some(on_error.as_ref().unchecked_ref())); @@ -112,7 +102,7 @@ impl Transport for WebsocketClientTransport { // on_close let on_close = Closure::once(Box::new(move |_e: Event| { - console_log!("websocket closed"); + log::info!("websocket closed"); }) as Box); ws.set_onclose(Some(on_close.as_ref().unchecked_ref())); on_close.forget(); diff --git a/rsocket-transport-wasm/src/connection.rs b/rsocket-transport-wasm/src/connection.rs index a93f921..bff9d11 100644 --- a/rsocket-transport-wasm/src/connection.rs +++ b/rsocket-transport-wasm/src/connection.rs @@ -1,8 +1,7 @@ -use async_trait::async_trait; use futures_channel::mpsc; use futures_util::{SinkExt, StreamExt}; -use rsocket_rust::transport::{Connection, Reader, Writer}; -use rsocket_rust::{error::RSocketError, frame::Frame, Result}; +use rsocket_rust::transport::{Connection, FrameSink, FrameStream}; +use rsocket_rust::{error::RSocketError, frame::Frame}; #[derive(Debug)] pub struct WebsocketConnection { @@ -10,47 +9,17 @@ pub struct WebsocketConnection { tx: mpsc::Sender, } -struct InnerWriter { - tx: mpsc::Sender, -} - -struct InnerReader { - rx: mpsc::Receiver, -} - impl WebsocketConnection { pub(crate) fn new(tx: mpsc::Sender, rx: mpsc::Receiver) -> WebsocketConnection { WebsocketConnection { rx, tx } } } -#[async_trait] -impl Writer for InnerWriter { - async fn write(&mut self, frame: Frame) -> Result<()> { - match self.tx.send(frame).await { - Ok(()) => Ok(()), - Err(e) => Err(RSocketError::Other(e.into()).into()), - } - } -} - -#[async_trait] -impl Reader for InnerReader { - async fn read(&mut self) -> Option> { - self.rx.next().await.map(|frame| Ok(frame)) - } -} - impl Connection for WebsocketConnection { - fn split( - self, - ) -> ( - Box, - Box, - ) { + fn split(self) -> (Box, Box) { ( - Box::new(InnerWriter { tx: self.tx }), - Box::new(InnerReader { rx: self.rx }), + Box::new(self.tx.sink_map_err(|e| RSocketError::Other(e.into()))), + Box::new(self.rx.map(|it| Ok(it))), ) } } diff --git a/rsocket-transport-websocket/src/connection.rs b/rsocket-transport-websocket/src/connection.rs index 52fbec9..eff31d2 100644 --- a/rsocket-transport-websocket/src/connection.rs +++ b/rsocket-transport-websocket/src/connection.rs @@ -1,79 +1,81 @@ use bytes::{BufMut, BytesMut}; -use futures::stream::{SplitSink, SplitStream}; -use futures::{SinkExt, StreamExt}; +use futures::stream::SplitSink; +use futures::{Sink, SinkExt, StreamExt}; use rsocket_rust::{ - async_trait, error::RSocketError, frame::Frame, - transport::{Connection, Reader, Writer}, + transport::{Connection, FrameSink, FrameStream}, utils::Writeable, - Result, }; +use std::result::Result; use tokio::net::TcpStream; -use tokio_tungstenite::{tungstenite::Message, WebSocketStream}; +use tokio_tungstenite::{ + tungstenite::{Error as WsError, Message}, + WebSocketStream, +}; #[derive(Debug)] pub struct WebsocketConnection { stream: WebSocketStream, } -struct InnerWriter { - sink: SplitSink, Message>, -} - -struct InnerReader { - stream: SplitStream>, -} - impl WebsocketConnection { pub(crate) fn new(stream: WebSocketStream) -> WebsocketConnection { WebsocketConnection { stream } } } -impl Connection for WebsocketConnection { - fn split( - self, - ) -> ( - Box, - Box, - ) { - let (sink, stream) = self.stream.split(); - ( - Box::new(InnerWriter { sink }), - Box::new(InnerReader { stream }), - ) +struct InnerSink(SplitSink, Message>); + +impl Sink for InnerSink { + type Error = WsError; + + fn poll_ready( + mut self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + self.as_mut().0.poll_ready_unpin(cx) + } + + fn start_send(mut self: std::pin::Pin<&mut Self>, item: Frame) -> Result<(), Self::Error> { + let mut b = BytesMut::new(); + item.write_to(&mut b); + let msg = Message::binary(b.to_vec()); + self.as_mut().0.start_send_unpin(msg) + } + + fn poll_flush( + mut self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + self.as_mut().0.poll_flush_unpin(cx) } -} -#[async_trait] -impl Writer for InnerWriter { - async fn write(&mut self, frame: Frame) -> Result<()> { - let mut bf = BytesMut::new(); - frame.write_to(&mut bf); - let msg = Message::binary(bf.to_vec()); - match self.sink.send(msg).await { - Ok(()) => Ok(()), - Err(e) => Err(RSocketError::Other(e.into()).into()), - } + fn poll_close( + mut self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + self.as_mut().0.poll_close_unpin(cx) } } -#[async_trait] -impl Reader for InnerReader { - async fn read(&mut self) -> Option> { - match self.stream.next().await { - Some(Ok(msg)) => { - let raw = msg.into_data(); - let mut bf = BytesMut::new(); - bf.put_slice(&raw[..]); - match Frame::decode(&mut bf) { - Ok(frame) => Some(Ok(frame)), - Err(e) => Some(Err(e)), +impl Connection for WebsocketConnection { + fn split(self) -> (Box, Box) { + let (sink, stream) = self.stream.split(); + ( + Box::new(InnerSink(sink).sink_map_err(|e| RSocketError::Other(e.into()))), + Box::new(stream.map(|it| match it { + Ok(msg) => { + let raw = msg.into_data(); + let mut bf = BytesMut::new(); + bf.put_slice(&raw[..]); + match Frame::decode(&mut bf) { + Ok(frame) => Ok(frame), + Err(e) => Err(RSocketError::Other(e.into())), + } } - } - Some(Err(e)) => Some(Err(RSocketError::Other(e.into()).into())), - None => None, - } + Err(e) => Err(RSocketError::Other(e.into())), + })), + ) } } diff --git a/rsocket/src/core/client.rs b/rsocket/src/core/client.rs index da7c686..667d1e4 100644 --- a/rsocket/src/core/client.rs +++ b/rsocket/src/core/client.rs @@ -1,20 +1,22 @@ +use std::marker::PhantomData; +use std::net::SocketAddr; +use std::pin::Pin; +use std::sync::Arc; +use std::time::Duration; + +use async_trait::async_trait; +use futures::{future, FutureExt, Sink, SinkExt, Stream, StreamExt}; +use tokio::sync::{mpsc, Mutex, Notify}; + use crate::error::{RSocketError, ERR_CONN_CLOSED}; use crate::frame::{self, Frame}; use crate::payload::{Payload, SetupPayload, SetupPayloadBuilder}; use crate::runtime; use crate::spi::{ClientResponder, Flux, RSocket}; use crate::transport::{ - self, Acceptor, Connection, DuplexSocket, Reader, Splitter, Transport, Writer, + self, Acceptor, Connection, DuplexSocket, FrameSink, FrameStream, Splitter, Transport, }; use crate::Result; -use async_trait::async_trait; -use futures::{future, FutureExt, SinkExt, StreamExt}; -use std::marker::PhantomData; -use std::net::SocketAddr; -use std::pin::Pin; -use std::sync::Arc; -use std::time::Duration; -use tokio::sync::{mpsc, Mutex, Notify}; #[derive(Clone)] pub struct Client { @@ -120,7 +122,7 @@ where Some(Splitter::new(self.mtu)) }; - let (snd_tx, mut snd_rx) = mpsc::channel::(super::CHANNEL_SIZE); + let (snd_tx, mut snd_rx) = mpsc::unbounded_channel::(); let cloned_snd_tx = snd_tx.clone(); let mut socket = DuplexSocket::new(1, snd_tx, splitter).await; @@ -144,7 +146,7 @@ where break; } } - if let Err(e) = (&mut sink).write(frame).await { + if let Err(e) = sink.send(frame).await { error!("write frame failed: {}", e); break; } @@ -154,7 +156,7 @@ where // keepalive let keepalive_frame = frame::Keepalive::builder(0, Frame::FLAG_RESPOND).build(); - if let Err(e) = (&mut sink).write(keepalive_frame).await { + if let Err(e) = sink.send(keepalive_frame).await { error!("write frame failed: {}", e); break; } @@ -167,12 +169,15 @@ where let closer = self.closer.take(); let closed = Arc::new(Notify::new()); let closed_clone = closed.clone(); + + let (read_tx, mut read_rx) = mpsc::unbounded_channel::(); + runtime::spawn(async move { - while let Some(next) = stream.read().await { + while let Some(next) = stream.next().await { match next { Ok(frame) => { - if let Err(e) = cloned_socket.dispatch(frame, &acceptor).await { - error!("dispatch frame failed: {}", e); + if let Err(e) = read_tx.send(frame) { + error!("read next frame failed: {}", e); break; } } @@ -182,12 +187,21 @@ where } } } + }); + + runtime::spawn(async move { + while let Some(next) = read_rx.next().await { + if let Err(e) = cloned_socket.dispatch(next, &acceptor).await { + error!("dispatch frame failed: {}", e); + break; + } + } // workaround: send a notify frame that the connection has been closed. let close_frame = frame::Error::builder(0, 0) .set_code(ERR_CONN_CLOSED) .build(); - if let Err(_) = cloned_snd_tx.send(close_frame).await { + if let Err(_) = cloned_snd_tx.send(close_frame) { debug!("send close notify frame failed!"); } @@ -213,10 +227,6 @@ impl Client { pub async fn wait_for_close(self) { self.closed.notified().await } - - pub fn close(self) { - // TODO: support close - } } #[async_trait] diff --git a/rsocket/src/core/mod.rs b/rsocket/src/core/mod.rs index c778404..9730794 100644 --- a/rsocket/src/core/mod.rs +++ b/rsocket/src/core/mod.rs @@ -2,8 +2,6 @@ mod client; mod factory; mod server; -pub(crate) const CHANNEL_SIZE: usize = 32; - pub use client::{Client, ClientBuilder}; pub use factory::RSocketFactory; pub use server::ServerBuilder; diff --git a/rsocket/src/core/server.rs b/rsocket/src/core/server.rs index 999a9b3..9d48c8c 100644 --- a/rsocket/src/core/server.rs +++ b/rsocket/src/core/server.rs @@ -8,6 +8,7 @@ use crate::transport::{ }; use crate::utils::EmptyRSocket; use crate::Result; +use futures::{SinkExt, StreamExt}; use std::error::Error; use std::future::Future; use std::marker::PhantomData; @@ -111,34 +112,45 @@ where }; // Init duplex socket. - let (snd_tx, mut snd_rx) = mpsc::channel::(super::CHANNEL_SIZE); + let (snd_tx, mut snd_rx) = mpsc::unbounded_channel::(); let mut socket = DuplexSocket::new(0, snd_tx, splitter).await; // Begin loop for writing frames. runtime::spawn(async move { while let Some(frame) = snd_rx.recv().await { - if let Err(e) = writer.write(frame).await { + if let Err(e) = writer.send(frame).await { error!("write frame failed: {}", e); break; } } }); - loop { - match reader.read().await { - Some(Ok(frame)) => { - if let Err(e) = socket.dispatch(frame, &acceptor).await { - error!("dispatch incoming frame failed: {}", e); + let (read_tx, mut read_rx) = mpsc::unbounded_channel::(); + + runtime::spawn(async move { + loop { + match reader.next().await { + Some(Ok(frame)) => { + if let Err(e) = read_tx.send(frame) { + error!("read next frame failed: {}", e); + break; + } + } + Some(Err(e)) => { + error!("read next frame failed: {}", e); + break; + } + None => { break; } } - Some(Err(e)) => { - error!("read next frame failed: {}", e); - break; - } - None => { - break; - } + } + }); + + while let Some(frame) = read_rx.next().await { + if let Err(e) = socket.dispatch(frame, &acceptor).await { + error!("dispatch incoming frame failed: {}", e); + break; } } diff --git a/rsocket/src/transport/socket.rs b/rsocket/src/transport/socket.rs index 007a37b..b3e3b6b 100644 --- a/rsocket/src/transport/socket.rs +++ b/rsocket/src/transport/socket.rs @@ -22,7 +22,7 @@ use tokio::sync::{mpsc, oneshot, RwLock}; pub(crate) struct DuplexSocket { seq: StreamID, responder: Responder, - tx: mpsc::Sender, + tx: mpsc::UnboundedSender, handlers: Arc>, canceller: mpsc::Sender, splitter: Option, @@ -45,7 +45,7 @@ enum Handler { impl DuplexSocket { pub(crate) async fn new( first_stream_id: u32, - tx: mpsc::Sender, + tx: mpsc::UnboundedSender, splitter: Option, ) -> DuplexSocket { let (canceller_tx, canceller_rx) = mpsc::channel::(32); @@ -85,7 +85,7 @@ impl DuplexSocket { if let Some(b) = m { bu = bu.set_metadata(b); } - self.tx.send(bu.build()).await.expect("Send setup failed"); + self.tx.send(bu.build()).expect("Send setup failed"); } #[inline] @@ -127,7 +127,7 @@ impl DuplexSocket { .set_code(error::ERR_REJECT_SETUP) .set_data(Bytes::from(errmsg)) .build(); - self.tx.send(sending).await.expect("Reject setup failed"); + self.tx.send(sending).expect("Reject setup failed"); return; } } @@ -416,7 +416,7 @@ impl DuplexSocket { .set_code(error::ERR_APPLICATION) .set_data(Bytes::from("TODO: should be error details")) .build(); - if let Err(e) = tx.send(sending).await { + if let Err(e) = tx.send(sending) { error!("respond REQUEST_RESPONSE failed: {}", e); } } @@ -442,13 +442,12 @@ impl DuplexSocket { .set_code(error::ERR_APPLICATION) .set_data(Bytes::from(format!("{}", e))) .build(); - tx.send(sending).await.expect("Send stream response failed"); + tx.send(sending).expect("Send stream response failed"); } }; } let complete = frame::Payload::builder(sid, Frame::FLAG_COMPLETE).build(); tx.send(complete) - .await .expect("Send stream complete response failed"); }); } @@ -466,7 +465,7 @@ impl DuplexSocket { // TODO: support custom RequestN. let request_n = frame::RequestN::builder(sid, 0).build(); - if let Err(e) = tx.send(request_n).await { + if let Err(e) = tx.send(request_n) { error!("respond REQUEST_N failed: {}", e); } @@ -488,10 +487,10 @@ impl DuplexSocket { .set_data(Bytes::from(format!("{}", e))) .build(), }; - tx.send(sending).await.expect("Send failed!"); + tx.send(sending).expect("Send failed!"); } let complete = frame::Payload::builder(sid, Frame::FLAG_COMPLETE).build(); - if let Err(e) = tx.send(complete).await { + if let Err(e) = tx.send(complete) { error!("complete REQUEST_CHANNEL failed: {}", e); } }); @@ -511,7 +510,7 @@ impl DuplexSocket { if let Some(b) = data { sending = sending.set_data(b); } - if let Err(e) = self.tx.send(sending.build()).await { + if let Err(e) = self.tx.send(sending.build()) { error!("respond KEEPALIVE failed: {}", e); } } @@ -519,7 +518,7 @@ impl DuplexSocket { #[inline] async fn try_send_channel( splitter: &Option, - tx: &mut mpsc::Sender, + tx: &mut mpsc::UnboundedSender, sid: u32, res: Payload, flag: u16, @@ -541,7 +540,7 @@ impl DuplexSocket { .build() }; // send frame - if let Err(e) = tx.send(sending).await { + if let Err(e) = tx.send(sending) { error!("send request_channel failed: {}", e); return; } @@ -562,7 +561,7 @@ impl DuplexSocket { .build() }; // send frame - if let Err(e) = tx.send(sending).await { + if let Err(e) = tx.send(sending) { error!("send request_channel failed: {}", e); } } @@ -570,7 +569,7 @@ impl DuplexSocket { let sending = frame::RequestChannel::builder(sid, flag) .set_all(res.split()) .build(); - if let Err(e) = tx.send(sending).await { + if let Err(e) = tx.send(sending) { error!("send request_channel failed: {}", e); } } @@ -578,9 +577,9 @@ impl DuplexSocket { } #[inline] - async fn try_send_complete(tx: &mut mpsc::Sender, sid: u32, flag: u16) { + async fn try_send_complete(tx: &mut mpsc::UnboundedSender, sid: u32, flag: u16) { let sending = frame::Payload::builder(sid, flag).build(); - if let Err(e) = tx.send(sending).await { + if let Err(e) = tx.send(sending) { error!("respond failed: {}", e); } } @@ -588,7 +587,7 @@ impl DuplexSocket { #[inline] async fn try_send_payload( splitter: &Option, - tx: &mut mpsc::Sender, + tx: &mut mpsc::UnboundedSender, sid: u32, res: Payload, flag: u16, @@ -609,7 +608,7 @@ impl DuplexSocket { .build() }; // send frame - if let Err(e) = tx.send(sending).await { + if let Err(e) = tx.send(sending) { error!("send payload failed: {}", e); return; } @@ -626,7 +625,7 @@ impl DuplexSocket { .build() }; // send frame - if let Err(e) = tx.send(sending).await { + if let Err(e) = tx.send(sending) { error!("send payload failed: {}", e); } } @@ -634,7 +633,7 @@ impl DuplexSocket { let sending = frame::Payload::builder(sid, flag) .set_all(res.split()) .build(); - if let Err(e) = tx.send(sending).await { + if let Err(e) = tx.send(sending) { error!("respond failed: {}", e); } } @@ -652,11 +651,8 @@ impl RSocket for DuplexSocket { if let Some(b) = m { bu = bu.set_metadata(b); } - tx.send(bu.build()).await?; + tx.send(bu.build())?; Ok(()) - // if let Err(e) = tx.send(bu.build()).await { - // error!("send metadata_push failed: {}", e); - // } } async fn fire_and_forget(&self, req: Payload) -> Result<()> { @@ -682,7 +678,7 @@ impl RSocket for DuplexSocket { .build() }; // send frame - tx.send(sending).await?; + tx.send(sending)?; } prev = Some(next); cuts += 1; @@ -700,13 +696,13 @@ impl RSocket for DuplexSocket { .build() }; // send frame - tx.send(sending).await?; + tx.send(sending)?; } None => { let sending = frame::RequestFNF::builder(sid, 0) .set_all(req.split()) .build(); - tx.send(sending).await?; + tx.send(sending)?; } } Ok(()) @@ -741,7 +737,7 @@ impl RSocket for DuplexSocket { .build() }; // send frame - if let Err(e) = sender.send(sending).await { + if let Err(e) = sender.send(sending) { error!("send request_response failed: {}", e); return; } @@ -762,7 +758,7 @@ impl RSocket for DuplexSocket { .build() }; // send frame - if let Err(e) = sender.send(sending).await { + if let Err(e) = sender.send(sending) { error!("send request_response failed: {}", e); } } @@ -772,7 +768,7 @@ impl RSocket for DuplexSocket { .set_all(req.split()) .build(); // send frame - if let Err(e) = sender.send(sending).await { + if let Err(e) = sender.send(sending) { error!("send request_response failed: {}", e); } } @@ -812,7 +808,7 @@ impl RSocket for DuplexSocket { .build() }; // send frame - if let Err(e) = tx.send(sending).await { + if let Err(e) = tx.send(sending) { error!("send request_stream failed: {}", e); return; } @@ -833,7 +829,7 @@ impl RSocket for DuplexSocket { .build() }; // send frame - if let Err(e) = tx.send(sending).await { + if let Err(e) = tx.send(sending) { error!("send request_stream failed: {}", e); } } @@ -841,7 +837,7 @@ impl RSocket for DuplexSocket { let sending = frame::RequestStream::builder(sid, 0) .set_all(input.split()) .build(); - if let Err(e) = tx.send(sending).await { + if let Err(e) = tx.send(sending) { error!("send request_stream failed: {}", e); } } @@ -877,14 +873,14 @@ impl RSocket for DuplexSocket { .set_code(error::ERR_APPLICATION) .set_data(Bytes::from(format!("{}", e))) .build(); - if let Err(e) = tx.send(sending).await { + if let Err(e) = tx.send(sending) { error!("send REQUEST_CHANNEL failed: {}", e); } } }; } let sending = frame::Payload::builder(sid, Frame::FLAG_COMPLETE).build(); - if let Err(e) = tx.send(sending).await { + if let Err(e) = tx.send(sending) { error!("complete REQUEST_CHANNEL failed: {}", e); } }); diff --git a/rsocket/src/transport/spi.rs b/rsocket/src/transport/spi.rs index 6778277..dfd5b01 100644 --- a/rsocket/src/transport/spi.rs +++ b/rsocket/src/transport/spi.rs @@ -1,40 +1,31 @@ -use crate::frame::Frame; -use crate::payload::SetupPayload; -use crate::spi::{ClientResponder, RSocket, ServerResponder}; -use crate::{Error, Result}; -use async_trait::async_trait; -use futures::channel::{mpsc, oneshot}; -use futures::sink::Sink; -use futures::stream::Stream; use std::future::Future; +use std::io::Error as IOError; use std::marker::Unpin; use std::pin::Pin; +use std::result::Result as StdResult; use std::sync::Arc; + +use async_trait::async_trait; +use futures::channel::{mpsc, oneshot}; +use futures::{Sink, Stream}; use tokio::sync::Notify; +use crate::payload::SetupPayload; +use crate::spi::{ClientResponder, RSocket, ServerResponder}; +use crate::{error::RSocketError, frame::Frame}; +use crate::{Error, Result}; + #[derive(Clone)] pub(crate) enum Acceptor { Simple(Arc), Generate(Arc), } -#[async_trait] -pub trait Reader { - async fn read(&mut self) -> Option>; -} - -#[async_trait] -pub trait Writer { - async fn write(&mut self, frame: Frame) -> Result<()>; -} +pub type FrameSink = dyn Sink + Send + Unpin; +pub type FrameStream = dyn Stream> + Send + Unpin; pub trait Connection { - fn split( - self, - ) -> ( - Box, - Box, - ); + fn split(self) -> (Box, Box); } #[async_trait] @@ -44,19 +35,6 @@ pub trait Transport { async fn connect(self) -> Result; } -#[async_trait] -pub trait ServerTransportOld { - type Item; - - async fn start( - self, - starter: Option>, - acceptor: Box Result<()> + Send + Sync>, - ) -> Result<()> - where - Self::Item: Transport + Sized; -} - #[async_trait] pub trait ServerTransport { type Item: Transport;