11use super :: codec:: LengthBasedFrameCodec ;
2- use futures:: stream:: { SplitSink , SplitStream } ;
32use futures:: { SinkExt , StreamExt } ;
4- use rsocket_rust:: async_trait;
5- use rsocket_rust:: frame:: Frame ;
6- use rsocket_rust:: transport:: { Connection , Reader , Writer } ;
7- use rsocket_rust:: { error:: RSocketError , Result } ;
3+ use rsocket_rust:: error:: RSocketError ;
4+ use rsocket_rust:: transport:: { Connection , FrameSink , FrameStream } ;
85use tokio:: net:: UnixStream ;
96use tokio_util:: codec:: Framed ;
107
@@ -13,49 +10,16 @@ pub struct UnixConnection {
1310 stream : UnixStream ,
1411}
1512
16- struct InnerWriter {
17- sink : SplitSink < Framed < UnixStream , LengthBasedFrameCodec > , Frame > ,
18- }
19-
20- struct InnerReader {
21- stream : SplitStream < Framed < UnixStream , LengthBasedFrameCodec > > ,
22- }
23-
2413impl Connection for UnixConnection {
25- fn split (
26- self ,
27- ) -> (
28- Box < dyn Writer + Send + Unpin > ,
29- Box < dyn Reader + Send + Unpin > ,
30- ) {
14+ fn split ( self ) -> ( Box < FrameSink > , Box < FrameStream > ) {
3115 let ( sink, stream) = Framed :: new ( self . stream , LengthBasedFrameCodec ) . split ( ) ;
3216 (
33- Box :: new ( InnerWriter { sink } ) ,
34- Box :: new ( InnerReader { stream } ) ,
17+ Box :: new ( sink. sink_map_err ( |e| RSocketError :: Other ( e . into ( ) ) ) ) ,
18+ Box :: new ( stream. map ( |it| it . map_err ( |e| RSocketError :: Other ( e . into ( ) ) ) ) ) ,
3519 )
3620 }
3721}
3822
39- #[ async_trait]
40- impl Writer for InnerWriter {
41- async fn write ( & mut self , frame : Frame ) -> Result < ( ) > {
42- match self . sink . send ( frame) . await {
43- Ok ( ( ) ) => Ok ( ( ) ) ,
44- Err ( e) => Err ( RSocketError :: IO ( e) . into ( ) ) ,
45- }
46- }
47- }
48-
49- #[ async_trait]
50- impl Reader for InnerReader {
51- async fn read ( & mut self ) -> Option < Result < Frame > > {
52- self . stream
53- . next ( )
54- . await
55- . map ( |it| it. map_err ( |e| RSocketError :: IO ( e) . into ( ) ) )
56- }
57- }
58-
5923impl From < UnixStream > for UnixConnection {
6024 fn from ( stream : UnixStream ) -> UnixConnection {
6125 UnixConnection { stream }
0 commit comments