11//! `Future`- and `Stream`-backed timers APIs.
22
3- use super :: sys:: * ;
4- use crate :: callback:: Timeout ;
3+ use crate :: callback:: { Timeout , Interval } ;
54
6- use futures_channel:: mpsc;
5+ use futures_channel:: { oneshot , mpsc} ;
76use futures_core:: stream:: Stream ;
87use std:: future:: Future ;
98use std:: pin:: Pin ;
10- use std:: task:: { Poll , Context , Waker } ;
11- use std:: sync:: { Arc , Mutex } ;
12- use wasm_bindgen:: JsCast ;
13- use wasm_bindgen:: prelude:: * ;
9+ use std:: task:: { Poll , Context } ;
1410
1511/// A scheduled timeout as a `Future`.
1612///
@@ -45,15 +41,7 @@ use wasm_bindgen::prelude::*;
4541#[ must_use = "futures do nothing unless polled or spawned" ]
4642pub struct TimeoutFuture {
4743 inner : Timeout ,
48- state : Arc < Mutex < TimeoutFutureState > > ,
49- }
50-
51- /// A state machine for the timeout future.
52- #[ derive( Debug ) ]
53- enum TimeoutFutureState {
54- Init ,
55- Polled ( Waker ) ,
56- Complete ,
44+ rx : oneshot:: Receiver < ( ) > ,
5745}
5846
5947impl TimeoutFuture {
@@ -75,37 +63,23 @@ impl TimeoutFuture {
7563 /// });
7664 /// ```
7765 pub fn new ( millis : u32 ) -> TimeoutFuture {
78- let state = Arc :: new ( Mutex :: new ( TimeoutFutureState :: Init ) ) ;
79- let state_ref = Arc :: downgrade ( & state) ;
66+ let ( tx, rx) = oneshot:: channel ( ) ;
8067 let inner = Timeout :: new ( millis, move || {
81- let state = match state_ref. upgrade ( ) {
82- Some ( s) => s,
83- None => return
84- } ;
85- let mut state = state. lock ( ) . expect ( "mutex should not be poisoned" ) ;
86- match & * state {
87- TimeoutFutureState :: Polled ( waker) => {
88- waker. wake_by_ref ( ) ;
89- }
90- _ => ( )
91- }
92- ( * state) = TimeoutFutureState :: Complete ;
68+ // if the receiver was dropped we do nothing.
69+ let _ = tx. send ( ( ) ) ;
9370 } ) ;
94- TimeoutFuture { inner, state }
71+ TimeoutFuture { inner, rx }
9572 }
9673}
9774
9875impl Future for TimeoutFuture {
9976 type Output = ( ) ;
10077
101- fn poll ( self : Pin < & mut Self > , cx : & mut Context ) -> Poll < Self :: Output > {
102- let mut state = self . state . lock ( ) . unwrap ( ) ;
103- match * state {
104- TimeoutFutureState :: Init | TimeoutFutureState :: Polled ( _) => {
105- ( * state) = TimeoutFutureState :: Polled ( cx. waker ( ) . clone ( ) ) ;
106- Poll :: Pending
107- }
108- TimeoutFutureState :: Complete => Poll :: Ready ( ( ) ) ,
78+ fn poll ( mut self : Pin < & mut Self > , cx : & mut Context ) -> Poll < Self :: Output > {
79+ match Future :: poll ( Pin :: new ( & mut self . rx ) , cx) {
80+ Poll :: Pending => Poll :: Pending ,
81+ Poll :: Ready ( Ok ( t) ) => Poll :: Ready ( t) ,
82+ Poll :: Ready ( Err ( e) ) => panic ! ( "{}" , e) ,
10983 }
11084 }
11185}
@@ -120,10 +94,8 @@ impl Future for TimeoutFuture {
12094#[ derive( Debug ) ]
12195#[ must_use = "streams do nothing unless polled or spawned" ]
12296pub struct IntervalStream {
123- millis : u32 ,
124- id : Option < i32 > ,
125- closure : Closure < dyn FnMut ( ) > ,
126- inner : mpsc:: UnboundedReceiver < ( ) > ,
97+ receiver : mpsc:: UnboundedReceiver < ( ) > ,
98+ inner : Interval ,
12799}
128100
129101impl IntervalStream {
@@ -136,8 +108,8 @@ impl IntervalStream {
136108 /// # Example
137109 ///
138110 /// ```no_run
139- /// use gloo_timers::future::IntervalStream;
140111 /// use futures_util::stream::StreamExt;
112+ /// use gloo_timers::future::IntervalStream;
141113 /// use wasm_bindgen_futures::spawn_local;
142114 ///
143115 /// spawn_local(async {
@@ -148,38 +120,19 @@ impl IntervalStream {
148120 /// ```
149121 pub fn new ( millis : u32 ) -> IntervalStream {
150122 let ( sender, receiver) = mpsc:: unbounded ( ) ;
151- let closure = Closure :: wrap ( Box :: new ( move || {
152- sender. unbounded_send ( ( ) ) . unwrap ( ) ;
153- } ) as Box < dyn FnMut ( ) > ) ;
154-
155- IntervalStream {
156- millis,
157- id : None ,
158- closure,
159- inner : receiver,
160- }
161- }
162- }
123+ let inner = Interval :: new ( millis, move || {
124+ // if the receiver was dropped we do nothing.
125+ let _ = sender. unbounded_send ( ( ) ) ;
126+ } ) ;
163127
164- impl Drop for IntervalStream {
165- fn drop ( & mut self ) {
166- if let Some ( id) = self . id {
167- clear_interval ( id) ;
168- }
128+ IntervalStream { receiver, inner }
169129 }
170130}
171131
172132impl Stream for IntervalStream {
173133 type Item = ( ) ;
174134
175135 fn poll_next ( mut self : Pin < & mut Self > , cx : & mut Context ) -> Poll < Option < Self :: Item > > {
176- if self . id . is_none ( ) {
177- self . id = Some ( set_interval (
178- self . closure . as_ref ( ) . unchecked_ref :: < js_sys:: Function > ( ) ,
179- self . millis as i32 ,
180- ) ) ;
181- }
182-
183- Pin :: new ( & mut self . inner ) . poll_next ( cx)
136+ Stream :: poll_next ( Pin :: new ( & mut self . receiver ) , cx)
184137 }
185138}
0 commit comments