@@ -39,11 +39,10 @@ impl Scheduler for Box<dyn Scheduler> {
3939/// Allocation-free activation tracker.
4040#[ derive( Debug ) ]
4141pub struct Activations {
42- clean : usize ,
43- /// `(offset, length)`
44- bounds : Vec < ( usize , usize ) > ,
45- slices : Vec < usize > ,
46- buffer : Vec < usize > ,
42+ /// Current activations that are being served.
43+ current : ActivationsBuffer ,
44+ /// Upcoming activations that may soon be served.
45+ pending : ActivationsBuffer ,
4746
4847 // Inter-thread activations.
4948 tx : Sender < Vec < usize > > ,
@@ -60,10 +59,8 @@ impl Activations {
6059 pub fn new ( timer : Instant ) -> Self {
6160 let ( tx, rx) = crossbeam_channel:: unbounded ( ) ;
6261 Self {
63- clean : 0 ,
64- bounds : Vec :: new ( ) ,
65- slices : Vec :: new ( ) ,
66- buffer : Vec :: new ( ) ,
62+ current : ActivationsBuffer :: new ( ) ,
63+ pending : ActivationsBuffer :: new ( ) ,
6764 tx,
6865 rx,
6966 timer,
@@ -73,8 +70,7 @@ impl Activations {
7370
7471 /// Activates the task addressed by `path`.
7572 pub fn activate ( & mut self , path : & [ usize ] ) {
76- self . bounds . push ( ( self . slices . len ( ) , path. len ( ) ) ) ;
77- self . slices . extend ( path) ;
73+ self . pending . activate ( path) ;
7874 }
7975
8076 /// Schedules a future activation for the task addressed by `path`.
@@ -89,6 +85,20 @@ impl Activations {
8985 }
9086 }
9187
88+ /// Reactivates the task addressed by `path`, ideally within `delay`.
89+ ///
90+ /// The task may be activated before `delay`, in which case the task should reactivate itself if
91+ /// it requires further reactivation, as this specific `delay` may no longer be in effect.
92+ pub fn activate_within ( & mut self , path : & [ usize ] , delay : Duration ) {
93+ if delay == Duration :: new ( 0 , 0 ) {
94+ self . activate ( path)
95+ }
96+ else {
97+ let moment = self . timer . elapsed ( ) + delay;
98+ self . pending . activate_by ( path, moment)
99+ }
100+ }
101+
92102 /// Discards the current active set and presents the next active set.
93103 pub fn advance ( & mut self ) {
94104
@@ -104,37 +114,135 @@ impl Activations {
104114 self . activate ( & path[ ..] ) ;
105115 }
106116
107- self . bounds . drain ( .. self . clean ) ;
117+ self . current . clear ( ) ;
118+ self . pending . compact ( ) ;
119+ self . pending . extract_through ( & mut self . current , now) ;
120+
121+ }
122+
123+ /// Maps a function across activated paths.
124+ pub fn map_active ( & self , logic : impl Fn ( & [ usize ] ) ) {
125+ self . current . map_active ( logic)
126+ }
127+
128+ /// Sets as active any symbols that follow `path`.
129+ pub fn for_extensions ( & self , path : & [ usize ] , action : impl FnMut ( usize ) ) {
130+ self . current . for_extensions ( path, action)
131+ }
132+
133+ /// Constructs a thread-safe `SyncActivations` handle to this activator.
134+ pub fn sync ( & self ) -> SyncActivations {
135+ SyncActivations {
136+ tx : self . tx . clone ( ) ,
137+ thread : std:: thread:: current ( ) ,
138+ }
139+ }
140+
141+ /// Time until next scheduled event.
142+ ///
143+ /// This method should be used before putting a worker thread to sleep, as it
144+ /// indicates the amount of time before the thread should be unparked for the
145+ /// next scheduled activation.
146+ pub fn empty_for ( & self ) -> Option < Duration > {
147+ if !self . current . is_empty ( ) || !self . pending . is_empty ( ) {
148+ Some ( Duration :: new ( 0 , 0 ) )
149+ }
150+ else {
151+ self . queue . peek ( ) . map ( |Reverse ( ( t, _a) ) | {
152+ let elapsed = self . timer . elapsed ( ) ;
153+ if t < & elapsed { Duration :: new ( 0 , 0 ) }
154+ else { * t - elapsed }
155+ } )
156+ }
157+ }
158+ }
159+
160+ /// Manages delayed activations for path-named tasks.
161+ #[ derive( Debug ) ]
162+ pub struct ActivationsBuffer {
163+ /// `(offset, length)`, and an elapsed timer duration.
164+ /// The zero duration can be used to indicate "immediately".
165+ bounds : Vec < ( usize , usize , Duration ) > ,
166+ /// Elements of path slices.
167+ slices : Vec < usize > ,
168+ /// A spare buffer to copy into.
169+ buffer : Vec < usize > ,
170+ }
171+
172+ impl ActivationsBuffer {
173+ /// Creates a new activation tracker.
174+ pub fn new ( ) -> Self {
175+ Self {
176+ bounds : Vec :: new ( ) ,
177+ slices : Vec :: new ( ) ,
178+ buffer : Vec :: new ( ) ,
179+ }
180+ }
181+
182+ fn clear ( & mut self ) {
183+ self . bounds . clear ( ) ;
184+ self . slices . clear ( ) ;
185+ self . buffer . clear ( ) ;
186+ }
187+
188+ fn is_empty ( & self ) -> bool {
189+ self . bounds . is_empty ( )
190+ }
191+
192+ /// Activates the task addressed by `path`.
193+ pub fn activate ( & mut self , path : & [ usize ] ) {
194+ self . activate_by ( path, Duration :: new ( 0 , 0 ) ) ;
195+ }
196+
197+ /// Activates the task addressed by `path`.
198+ pub fn activate_by ( & mut self , path : & [ usize ] , duration : Duration ) {
199+ self . bounds . push ( ( self . slices . len ( ) , path. len ( ) , duration) ) ;
200+ self . slices . extend ( path) ;
201+ }
202+
203+ /// Orders activations by their path, and retains only each's most immediate duration.
204+ pub fn compact ( & mut self ) {
108205
109206 { // Scoped, to allow borrow to drop.
110207 let slices = & self . slices [ ..] ;
111- self . bounds . sort_by_key ( |x| & slices[ x. 0 .. ( x. 0 + x. 1 ) ] ) ;
208+ // Sort slices by path, and within each path by duration.
209+ self . bounds . sort_by_key ( |x| ( & slices[ x. 0 .. ( x. 0 + x. 1 ) ] , x. 2 ) ) ;
210+ // Deduplicate by path, retaining the least duration.
112211 self . bounds . dedup_by_key ( |x| & slices[ x. 0 .. ( x. 0 + x. 1 ) ] ) ;
113212 }
114213
115214 // Compact the slices.
116215 self . buffer . clear ( ) ;
117- for ( offset, length) in self . bounds . iter_mut ( ) {
216+ for ( offset, length, _duration ) in self . bounds . iter_mut ( ) {
118217 self . buffer . extend ( & self . slices [ * offset .. ( * offset + * length) ] ) ;
119218 * offset = self . buffer . len ( ) - * length;
120219 }
121220 :: std:: mem:: swap ( & mut self . buffer , & mut self . slices ) ;
221+ }
122222
123- self . clean = self . bounds . len ( ) ;
223+ /// Extracts all activations less or equal to `threshold` into `other`.
224+ pub fn extract_through ( & mut self , other : & mut Self , threshold : Duration ) {
225+ for ( offset, length, duration) in self . bounds . iter_mut ( ) {
226+ if * duration <= threshold {
227+ other. activate_by ( & self . slices [ * offset .. ( * offset + * length) ] , * duration) ;
228+ }
229+ }
230+ self . bounds . retain ( |( _off, _len, duration) | * duration > threshold) ;
231+ // Could `self.compact()` here, but it will happen as part of future work.
124232 }
125233
126234 /// Maps a function across activated paths.
127235 pub fn map_active ( & self , logic : impl Fn ( & [ usize ] ) ) {
128- for ( offset, length) in self . bounds . iter ( ) {
236+ for ( offset, length, _duration ) in self . bounds . iter ( ) {
129237 logic ( & self . slices [ * offset .. ( * offset + * length) ] ) ;
130238 }
131239 }
132-
240+
133241 /// Sets as active any symbols that follow `path`.
134242 pub fn for_extensions ( & self , path : & [ usize ] , mut action : impl FnMut ( usize ) ) {
135243
136244 let position =
137- self . bounds [ .. self . clean ]
245+ self . bounds
138246 . binary_search_by_key ( & path, |x| & self . slices [ x. 0 .. ( x. 0 + x. 1 ) ] ) ;
139247 let position = match position {
140248 Ok ( x) => x,
@@ -146,7 +254,7 @@ impl Activations {
146254 . iter ( )
147255 . cloned ( )
148256 . skip ( position)
149- . map ( |( offset, length) | & self . slices [ offset .. ( offset + length) ] )
257+ . map ( |( offset, length, _ ) | & self . slices [ offset .. ( offset + length) ] )
150258 . take_while ( |x| x. starts_with ( path) )
151259 . for_each ( |x| {
152260 // push non-empty, non-duplicate extensions.
@@ -158,32 +266,6 @@ impl Activations {
158266 }
159267 } ) ;
160268 }
161-
162- /// Constructs a thread-safe `SyncActivations` handle to this activator.
163- pub fn sync ( & self ) -> SyncActivations {
164- SyncActivations {
165- tx : self . tx . clone ( ) ,
166- thread : std:: thread:: current ( ) ,
167- }
168- }
169-
170- /// Time until next scheduled event.
171- ///
172- /// This method should be used before putting a worker thread to sleep, as it
173- /// indicates the amount of time before the thread should be unparked for the
174- /// next scheduled activation.
175- pub fn empty_for ( & self ) -> Option < Duration > {
176- if !self . bounds . is_empty ( ) {
177- Some ( Duration :: new ( 0 , 0 ) )
178- }
179- else {
180- self . queue . peek ( ) . map ( |Reverse ( ( t, _a) ) | {
181- let elapsed = self . timer . elapsed ( ) ;
182- if t < & elapsed { Duration :: new ( 0 , 0 ) }
183- else { * t - elapsed }
184- } )
185- }
186- }
187269}
188270
189271/// A thread-safe handle to an `Activations`.
0 commit comments