Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 7 additions & 3 deletions rsocket/src/core/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use crate::payload::{Payload, SetupPayload, SetupPayloadBuilder};
use crate::runtime;
use crate::spi::{ClientResponder, Flux, RSocket};
use crate::transport::{
self, Acceptor, Connection, DuplexSocket, FrameSink, FrameStream, Splitter, Transport,
self, Connection, DuplexSocket, FrameSink, FrameStream, Splitter, Transport,
};
use crate::Result;

Expand Down Expand Up @@ -127,7 +127,11 @@ where
let mut socket = DuplexSocket::new(1, snd_tx, splitter).await;

let mut cloned_socket = socket.clone();
let acceptor: Option<Acceptor> = self.responder.map(|it| Acceptor::Simple(Arc::new(it)));

if let Some(f) = self.responder {
let responder = f();
socket.bind_responder(responder).await;
}

let conn = tp.connect().await?;
let (mut sink, mut stream) = conn.split();
Expand Down Expand Up @@ -191,7 +195,7 @@ where

runtime::spawn(async move {
while let Some(next) = read_rx.next().await {
if let Err(e) = cloned_socket.dispatch(next, &acceptor).await {
if let Err(e) = cloned_socket.dispatch(next, None).await {
error!("dispatch frame failed: {}", e);
break;
}
Expand Down
11 changes: 5 additions & 6 deletions rsocket/src/core/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,7 @@ use crate::frame::{self, Frame};
use crate::payload::SetupPayload;
use crate::runtime;
use crate::spi::{RSocket, ServerResponder};
use crate::transport::{
Acceptor, Connection, DuplexSocket, ServerTransport, Splitter, Transport, MIN_MTU,
};
use crate::transport::{Connection, DuplexSocket, ServerTransport, Splitter, Transport, MIN_MTU};
use crate::utils::EmptyRSocket;
use crate::Result;
use futures::{SinkExt, StreamExt};
Expand Down Expand Up @@ -70,7 +68,7 @@ where
{
pub async fn serve(mut self) -> Result<()> {
let mut server_transport = self.transport.take().expect("missing transport");
let acceptor = self.on_setup.map(|v| Acceptor::Generate(Arc::new(v)));
// let acceptor = self.on_setup.map(|v| Acceptor::Generate(Arc::new(v)));

let mtu = self.mtu;

Expand All @@ -80,6 +78,7 @@ where
invoke();
}

let acceptor = Arc::new(self.on_setup);
while let Some(next) = server_transport.next().await {
match next {
Ok(tp) => {
Expand All @@ -99,7 +98,7 @@ where
}

#[inline]
async fn on_transport(mtu: usize, tp: C, acceptor: Option<Acceptor>) -> Result<()> {
async fn on_transport(mtu: usize, tp: C, acceptor: Arc<Option<ServerResponder>>) -> Result<()> {
// Establish connection.
let conn = tp.connect().await?;
let (mut writer, mut reader) = conn.split();
Expand Down Expand Up @@ -148,7 +147,7 @@ where
});

while let Some(frame) = read_rx.next().await {
if let Err(e) = socket.dispatch(frame, &acceptor).await {
if let Err(e) = socket.dispatch(frame, acceptor.as_ref().as_ref()).await {
error!("dispatch incoming frame failed: {}", e);
break;
}
Expand Down
2 changes: 2 additions & 0 deletions rsocket/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,8 @@ pub use async_stream::stream;
/// A re-export of [`async-trait`](https://docs.rs/async-trait) for use with RSocket trait implementation.
pub use async_trait::async_trait;

#[macro_use]
extern crate anyhow;
#[macro_use]
extern crate log;
#[macro_use]
Expand Down
20 changes: 10 additions & 10 deletions rsocket/src/transport/socket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use super::spi::*;
use crate::error::{self, RSocketError};
use crate::frame::{self, Body, Frame};
use crate::payload::{Payload, SetupPayload};
use crate::spi::{Flux, RSocket};
use crate::spi::{Flux, RSocket, ServerResponder};
use crate::utils::EmptyRSocket;
use crate::{runtime, Result};
use async_stream::stream;
Expand Down Expand Up @@ -103,7 +103,7 @@ impl DuplexSocket {
pub(crate) async fn dispatch(
&mut self,
frame: Frame,
acceptor: &Option<Acceptor>,
acceptor: Option<&ServerResponder>,
) -> Result<()> {
if let Some(frame) = self.join_frame(frame).await {
self.process_once(frame, acceptor).await;
Expand All @@ -112,7 +112,7 @@ impl DuplexSocket {
}

#[inline]
async fn process_once(&mut self, msg: Frame, acceptor: &Option<Acceptor>) {
async fn process_once(&mut self, msg: Frame, acceptor: Option<&ServerResponder>) {
let sid = msg.get_stream_id();
let flag = msg.get_flag();
debug_frame(false, &msg);
Expand Down Expand Up @@ -343,10 +343,14 @@ impl DuplexSocket {
}
}

pub(crate) async fn bind_responder(&self, responder: Box<dyn RSocket>) {
self.responder.set(responder).await;
}

#[inline]
async fn on_setup(
&self,
acceptor: &Option<Acceptor>,
acceptor: Option<&ServerResponder>,
sid: u32,
flag: u16,
setup: SetupPayload,
Expand All @@ -356,11 +360,7 @@ impl DuplexSocket {
self.responder.set(Box::new(EmptyRSocket)).await;
Ok(())
}
Some(Acceptor::Simple(gen)) => {
self.responder.set(gen()).await;
Ok(())
}
Some(Acceptor::Generate(gen)) => match gen(setup, Box::new(self.clone())) {
Some(gen) => match gen(setup, Box::new(self.clone())) {
Ok(it) => {
self.responder.set(it).await;
Ok(())
Expand Down Expand Up @@ -414,7 +414,7 @@ impl DuplexSocket {
Err(e) => {
let sending = frame::Error::builder(sid, 0)
.set_code(error::ERR_APPLICATION)
.set_data(Bytes::from("TODO: should be error details"))
.set_data(Bytes::from(e.to_string()))
.build();
if let Err(e) = tx.send(sending) {
error!("respond REQUEST_RESPONSE failed: {}", e);
Expand Down
6 changes: 0 additions & 6 deletions rsocket/src/transport/spi.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,6 @@ use crate::spi::{ClientResponder, RSocket, ServerResponder};
use crate::{error::RSocketError, frame::Frame};
use crate::{Error, Result};

#[derive(Clone)]
pub(crate) enum Acceptor {
Simple(Arc<ClientResponder>),
Generate(Arc<ServerResponder>),
}

pub type FrameSink = dyn Sink<Frame, Error = RSocketError> + Send + Unpin;
pub type FrameStream = dyn Stream<Item = StdResult<Frame, RSocketError>> + Send + Unpin;

Expand Down
14 changes: 9 additions & 5 deletions rsocket/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,23 +60,27 @@ pub(crate) struct EmptyRSocket;
#[async_trait]
impl RSocket for EmptyRSocket {
async fn metadata_push(&self, _req: Payload) -> Result<()> {
Err(RSocketError::ApplicationException("UNIMPLEMENT".into()).into())
Err(anyhow!("UNIMPLEMENT"))
}

async fn fire_and_forget(&self, _req: Payload) -> Result<()> {
Err(RSocketError::ApplicationException("UNIMPLEMENT".into()).into())
Err(anyhow!("UNIMPLEMENT"))
}

async fn request_response(&self, _req: Payload) -> Result<Option<Payload>> {
Err(RSocketError::ApplicationException("UNIMPLEMENT".into()).into())
Err(anyhow!("UNIMPLEMENT"))
}

fn request_stream(&self, _req: Payload) -> Flux<Result<Payload>> {
Box::pin(futures::stream::empty())
Box::pin(stream! {
yield Err(anyhow!("UNIMPLEMENT"));
})
}

fn request_channel(&self, _reqs: Flux<Result<Payload>>) -> Flux<Result<Payload>> {
Box::pin(futures::stream::empty())
Box::pin(stream! {
yield Err(anyhow!("UNIMPLEMENT"));
})
}
}

Expand Down