7474// happens-before semantics required for the acquire / release semantics used
7575// by the queue structure.
7676
77+ use std:: mem:: Pin ;
78+ use std:: marker:: Unpin ;
7779use std:: fmt;
7880use std:: error:: Error ;
7981use std:: any:: Any ;
@@ -84,8 +86,7 @@ use std::thread;
8486use std:: usize;
8587
8688use futures_core:: task:: { self , Waker } ;
87- use futures_core:: { Async , Poll , Stream } ;
88- use futures_core:: never:: Never ;
89+ use futures_core:: { Poll , Stream } ;
8990
9091use mpsc:: queue:: { Queue , PopResult } ;
9192
@@ -109,6 +110,9 @@ pub struct Sender<T> {
109110 maybe_parked : bool ,
110111}
111112
113+ // Safe because we treat the `T` opaquely
114+ unsafe impl < T > Unpin for Sender < T > { }
115+
112116/// The transmission end of an unbounded mpsc channel.
113117///
114118/// This value is created by the [`unbounded`](unbounded) function.
@@ -126,12 +130,18 @@ pub struct Receiver<T> {
126130 inner : Arc < Inner < T > > ,
127131}
128132
133+ // Safe because we treat the `T` opaquely
134+ unsafe impl < T > Unpin for Receiver < T > { }
135+
129136/// The receiving end of an unbounded mpsc channel.
130137///
131138/// This value is created by the [`unbounded`](unbounded) function.
132139#[ derive( Debug ) ]
133140pub struct UnboundedReceiver < T > ( Receiver < T > ) ;
134141
142+ // Safe because we treat the `T` opaquely
143+ unsafe impl < T > Unpin for UnboundedReceiver < T > { }
144+
135145/// The error type for [`Sender`s](Sender) used as `Sink`s.
136146#[ derive( Clone , Debug , PartialEq , Eq ) ]
137147pub struct SendError {
@@ -511,14 +521,14 @@ impl<T> Sender<T> {
511521 Ok ( ( ) )
512522 }
513523
514- fn poll_ready_nb ( & self ) -> Poll < ( ) , SendError > {
524+ fn poll_ready_nb ( & self ) -> Poll < Result < ( ) , SendError > > {
515525 let state = decode_state ( self . inner . state . load ( SeqCst ) ) ;
516526 if state. is_open {
517- Ok ( Async :: Ready ( ( ) ) )
527+ Poll :: Ready ( Ok ( ( ) ) )
518528 } else {
519- Err ( SendError {
529+ Poll :: Ready ( Err ( SendError {
520530 kind : SendErrorKind :: Full ,
521- } )
531+ } ) )
522532 }
523533 }
524534
@@ -637,15 +647,15 @@ impl<T> Sender<T> {
637647 /// - `Ok(Async::Pending)` if the channel may not have
638648 /// capacity, in which case the current task is queued to be notified once capacity is available;
639649 /// - `Err(SendError)` if the receiver has been dropped.
640- pub fn poll_ready ( & mut self , cx : & mut task:: Context ) -> Poll < ( ) , SendError > {
650+ pub fn poll_ready ( & mut self , cx : & mut task:: Context ) -> Poll < Result < ( ) , SendError > > {
641651 let state = decode_state ( self . inner . state . load ( SeqCst ) ) ;
642652 if !state. is_open {
643- return Err ( SendError {
653+ return Poll :: Ready ( Err ( SendError {
644654 kind : SendErrorKind :: Disconnected ,
645- } ) ;
655+ } ) ) ;
646656 }
647657
648- Ok ( self . poll_unparked ( Some ( cx) ) )
658+ self . poll_unparked ( Some ( cx) ) . map ( Ok )
649659 }
650660
651661 /// Returns whether this channel is closed without needing a context.
@@ -662,7 +672,7 @@ impl<T> Sender<T> {
662672 let _ = self . do_send_nb ( None ) ;
663673 }
664674
665- fn poll_unparked ( & mut self , cx : Option < & mut task:: Context > ) -> Async < ( ) > {
675+ fn poll_unparked ( & mut self , cx : Option < & mut task:: Context > ) -> Poll < ( ) > {
666676 // First check the `maybe_parked` variable. This avoids acquiring the
667677 // lock in most cases
668678 if self . maybe_parked {
@@ -671,7 +681,7 @@ impl<T> Sender<T> {
671681
672682 if !task. is_parked {
673683 self . maybe_parked = false ;
674- return Async :: Ready ( ( ) )
684+ return Poll :: Ready ( ( ) )
675685 }
676686
677687 // At this point, an unpark request is pending, so there will be an
@@ -682,16 +692,16 @@ impl<T> Sender<T> {
682692 // task
683693 task. task = cx. map ( |cx| cx. waker ( ) . clone ( ) ) ;
684694
685- Async :: Pending
695+ Poll :: Pending
686696 } else {
687- Async :: Ready ( ( ) )
697+ Poll :: Ready ( ( ) )
688698 }
689699 }
690700}
691701
692702impl < T > UnboundedSender < T > {
693703 /// Check if the channel is ready to receive a message.
694- pub fn poll_ready ( & self , _: & mut task:: Context ) -> Poll < ( ) , SendError > {
704+ pub fn poll_ready ( & self , _: & mut task:: Context ) -> Poll < Result < ( ) , SendError > > {
695705 self . 0 . poll_ready_nb ( )
696706 }
697707
@@ -832,14 +842,14 @@ impl<T> Receiver<T> {
832842 /// no longer empty.
833843 pub fn try_next ( & mut self ) -> Result < Option < T > , TryRecvError > {
834844 match self . next_message ( ) {
835- Async :: Ready ( msg) => {
845+ Poll :: Ready ( msg) => {
836846 Ok ( msg)
837847 } ,
838- Async :: Pending => Err ( TryRecvError { _inner : ( ) } ) ,
848+ Poll :: Pending => Err ( TryRecvError { _inner : ( ) } ) ,
839849 }
840850 }
841851
842- fn next_message ( & mut self ) -> Async < Option < T > > {
852+ fn next_message ( & mut self ) -> Poll < Option < T > > {
843853 // Pop off a message
844854 loop {
845855 match unsafe { self . inner . message_queue . pop ( ) } {
@@ -851,11 +861,11 @@ impl<T> Receiver<T> {
851861 // Decrement number of messages
852862 self . dec_num_messages ( ) ;
853863
854- return Async :: Ready ( msg) ;
864+ return Poll :: Ready ( msg) ;
855865 }
856866 PopResult :: Empty => {
857867 // The queue is empty, return Pending
858- return Async :: Pending ;
868+ return Poll :: Pending ;
859869 }
860870 PopResult :: Inconsistent => {
861871 // Inconsistent means that there will be a message to pop
@@ -937,27 +947,26 @@ impl<T> Receiver<T> {
937947
938948impl < T > Stream for Receiver < T > {
939949 type Item = T ;
940- type Error = Never ;
941950
942- fn poll_next ( & mut self , cx : & mut task:: Context ) -> Poll < Option < Self :: Item > , Self :: Error > {
951+ fn poll_next ( mut self : Pin < Self > , cx : & mut task:: Context ) -> Poll < Option < T > > {
943952 loop {
944953 // Try to read a message off of the message queue.
945954 let msg = match self . next_message ( ) {
946- Async :: Ready ( msg) => msg,
947- Async :: Pending => {
955+ Poll :: Ready ( msg) => msg,
956+ Poll :: Pending => {
948957 // There are no messages to read, in this case, attempt to
949958 // park. The act of parking will verify that the channel is
950959 // still empty after the park operation has completed.
951960 match self . try_park ( cx) {
952961 TryPark :: Parked => {
953962 // The task was parked, and the channel is still
954963 // empty, return Pending.
955- return Ok ( Async :: Pending ) ;
964+ return Poll :: Pending ;
956965 }
957966 TryPark :: Closed => {
958967 // The channel is closed, there will be no further
959968 // messages.
960- return Ok ( Async :: Ready ( None ) ) ;
969+ return Poll :: Ready ( None ) ;
961970 }
962971 TryPark :: NotEmpty => {
963972 // A message has been sent while attempting to
@@ -969,7 +978,7 @@ impl<T> Stream for Receiver<T> {
969978 }
970979 } ;
971980 // Return the message
972- return Ok ( Async :: Ready ( msg) ) ;
981+ return Poll :: Ready ( msg) ;
973982 }
974983 }
975984}
@@ -1005,10 +1014,9 @@ impl<T> UnboundedReceiver<T> {
10051014
10061015impl < T > Stream for UnboundedReceiver < T > {
10071016 type Item = T ;
1008- type Error = Never ;
10091017
1010- fn poll_next ( & mut self , cx : & mut task:: Context ) -> Poll < Option < Self :: Item > , Self :: Error > {
1011- self . 0 . poll_next ( cx)
1018+ fn poll_next ( mut self : Pin < Self > , cx : & mut task:: Context ) -> Poll < Option < T > > {
1019+ Pin :: new ( & mut self . 0 ) . poll_next ( cx)
10121020 }
10131021}
10141022
0 commit comments