diff --git a/rsocket/src/core/client.rs b/rsocket/src/core/client.rs index 810502a..65157ef 100644 --- a/rsocket/src/core/client.rs +++ b/rsocket/src/core/client.rs @@ -22,6 +22,7 @@ use crate::Result; pub struct Client { closed: Arc, socket: DuplexSocket, + closing: mpsc::Sender<()>, } pub struct ClientBuilder { @@ -171,28 +172,41 @@ where // begin read loop let closer = self.closer.take(); - let closed = Arc::new(Notify::new()); - let closed_clone = closed.clone(); + let close_notify = Arc::new(Notify::new()); + let close_notify_clone = close_notify.clone(); + let (closing, mut closing_rx) = mpsc::channel::<()>(1); let (read_tx, mut read_rx) = mpsc::unbounded_channel::(); + // read frames from stream, then writes into channel runtime::spawn(async move { - while let Some(next) = stream.next().await { - match next { - Ok(frame) => { - if let Err(e) = read_tx.send(frame) { - error!("read next frame failed: {}", e); - break; + loop { + tokio::select! { + res = stream.next() => { + match res { + Some(next) => match next { + Ok(frame) => { + if let Err(e) = read_tx.send(frame) { + error!("forward frame failed: {}", e); + break; + } + } + Err(e) => { + error!("read frame failed: {}", e); + break; + } + } + None => break, } } - Err(e) => { - error!("read next frame failed: {}", e); - break; + _ = closing_rx.recv() => { + break } } } }); + // process frames runtime::spawn(async move { while let Some(next) = read_rx.recv().await { if let Err(e) = cloned_socket.dispatch(next, None).await { @@ -205,12 +219,12 @@ where let close_frame = frame::Error::builder(0, 0) .set_code(ERR_CONN_CLOSED) .build(); - if let Err(_) = cloned_snd_tx.send(close_frame) { - debug!("send close notify frame failed!"); + if let Err(e) = cloned_snd_tx.send(close_frame) { + debug!("send close notify frame failed: {}", e); } // notify client closed - closed_clone.notify_one(); + close_notify_clone.notify_one(); // invoke on_close handler if let Some(mut invoke) = closer { @@ -219,13 +233,18 @@ where }); socket.setup(setup).await; - Ok(Client::new(socket, closed)) + + Ok(Client::new(socket, close_notify, closing)) } } impl Client { - fn new(socket: DuplexSocket, closed: Arc) -> Client { - Client { socket, closed } + fn new(socket: DuplexSocket, closed: Arc, closing: mpsc::Sender<()>) -> Client { + Client { + socket, + closed, + closing, + } } pub async fn wait_for_close(self) { diff --git a/rsocket/src/core/server.rs b/rsocket/src/core/server.rs index deb4dfb..3b39fb5 100644 --- a/rsocket/src/core/server.rs +++ b/rsocket/src/core/server.rs @@ -133,12 +133,12 @@ where match reader.next().await { Some(Ok(frame)) => { if let Err(e) = read_tx.send(frame) { - error!("read next frame failed: {}", e); + error!("forward frame failed: {}", e); break; } } Some(Err(e)) => { - error!("read next frame failed: {}", e); + error!("read frame failed: {}", e); break; } None => { @@ -150,7 +150,7 @@ where while let Some(frame) = read_rx.recv().await { if let Err(e) = socket.dispatch(frame, acceptor.as_ref().as_ref()).await { - error!("dispatch incoming frame failed: {}", e); + error!("dispatch frame failed: {}", e); break; } }