diff --git a/timely/src/progress/frontier.rs b/timely/src/progress/frontier.rs index ea52ba63e..53699dfb8 100644 --- a/timely/src/progress/frontier.rs +++ b/timely/src/progress/frontier.rs @@ -308,14 +308,9 @@ impl ::std::iter::IntoIterator for Antichain { /// The `MutableAntichain` implementation is done with the intent that updates to it are done in batches, /// and it is acceptable to rebuild the frontier from scratch when a batch of updates change it. This means /// that it can be expensive to maintain a large number of counts and change few elements near the frontier. -/// -/// There is an `update_dirty` method for single updates that leave the `MutableAntichain` in a dirty state, -/// but I strongly recommend against using them unless you must (on part of timely progress tracking seems -/// to be greatly simplified by access to this) #[derive(Clone, Debug, Abomonation, Serialize, Deserialize)] pub struct MutableAntichain { - dirty: usize, - updates: Vec<(T, i64)>, + updates: ChangeBatch, frontier: Vec, changes: ChangeBatch, } @@ -334,8 +329,7 @@ impl MutableAntichain { #[inline] pub fn new() -> MutableAntichain { MutableAntichain { - dirty: 0, - updates: Vec::new(), + updates: ChangeBatch::new(), frontier: Vec::new(), changes: ChangeBatch::new(), } @@ -354,21 +348,11 @@ impl MutableAntichain { ///``` #[inline] pub fn clear(&mut self) { - self.dirty = 0; self.updates.clear(); self.frontier.clear(); self.changes.clear(); } - /// This method deletes the contents. Unlike `clear` it records doing so. - pub fn empty(&mut self) { - for (_, diff) in self.updates.iter_mut() { - *diff = 0; - } - - self.dirty = self.updates.len(); - } - /// Reveals the minimal elements with positive count. /// /// # Examples @@ -381,7 +365,6 @@ impl MutableAntichain { ///``` #[inline] pub fn frontier(&self) -> AntichainRef<'_, T> { - debug_assert_eq!(self.dirty, 0); AntichainRef::new(&self.frontier) } @@ -396,13 +379,12 @@ impl MutableAntichain { /// assert!(frontier.frontier() == AntichainRef::new(&[0u64])); ///``` #[inline] - pub fn new_bottom(bottom: T) -> MutableAntichain + pub fn new_bottom(bottom: T) -> MutableAntichain where - T: Clone, + T: Ord+Clone, { MutableAntichain { - dirty: 0, - updates: vec![(bottom.clone(), 1)], + updates: ChangeBatch::new_from(bottom.clone(), 1), frontier: vec![bottom], changes: ChangeBatch::new(), } @@ -420,7 +402,6 @@ impl MutableAntichain { ///``` #[inline] pub fn is_empty(&self) -> bool { - debug_assert_eq!(self.dirty, 0); self.frontier.is_empty() } @@ -441,7 +422,6 @@ impl MutableAntichain { where T: PartialOrder, { - debug_assert_eq!(self.dirty, 0); self.frontier().less_than(time) } @@ -462,22 +442,9 @@ impl MutableAntichain { where T: PartialOrder, { - debug_assert_eq!(self.dirty, 0); self.frontier().less_equal(time) } - /// Allows a single-element push, but dirties the antichain and prevents inspection until cleaned. - /// - /// At the moment inspection is prevented via panic, so best be careful (this should probably be fixed). - /// It is *very* important if you want to use this method that very soon afterwards you call something - /// akin to `update_iter`, perhaps with a `None` argument if you have no more data, as this method will - /// tidy up the internal representation. - #[inline] - pub fn update_dirty(&mut self, time: T, delta: i64) { - self.updates.push((time, delta)); - self.dirty += 1; - } - /// Applies updates to the antichain and enumerates any changes. /// /// # Examples @@ -502,32 +469,20 @@ impl MutableAntichain { { let updates = updates.into_iter(); - // Attempt to pre-allocate for the new updates - let (min, max) = updates.size_hint(); - self.updates.reserve(max.unwrap_or(min)); - - for (time, delta) in updates { - self.updates.push((time, delta)); - self.dirty += 1; - } - // track whether a rebuild is needed. let mut rebuild_required = false; + for (time, delta) in updates { - // determine if recently pushed data requires rebuilding the frontier. - // note: this may be required even with an empty iterator, due to dirty data in self.updates. - while self.dirty > 0 && !rebuild_required { - - let time = &self.updates[self.updates.len() - self.dirty].0; - let delta = self.updates[self.updates.len() - self.dirty].1; - - let beyond_frontier = self.frontier.iter().any(|f| f.less_than(time)); - let before_frontier = !self.frontier.iter().any(|f| f.less_equal(time)); - rebuild_required = rebuild_required || !(beyond_frontier || (delta < 0 && before_frontier)); + // If we do not yet require a rebuild, test whether we might require one + // and set the flag in that case. + if !rebuild_required { + let beyond_frontier = self.frontier.iter().any(|f| f.less_than(&time)); + let before_frontier = !self.frontier.iter().any(|f| f.less_equal(&time)); + rebuild_required = !(beyond_frontier || (delta < 0 && before_frontier)); + } - self.dirty -= 1; + self.updates.update(time, delta); } - self.dirty = 0; if rebuild_required { self.rebuild() @@ -535,7 +490,7 @@ impl MutableAntichain { self.changes.drain() } - /// Sorts and consolidates `self.updates` and applies `action` to any frontier changes. + /// Rebuilds `self.frontier` from `self.updates`. /// /// This method is meant to be used for bulk updates to the frontier, and does more work than one might do /// for single updates, but is meant to be an efficient way to process multiple updates together. This is @@ -544,19 +499,6 @@ impl MutableAntichain { where T: Clone + PartialOrder + Ord, { - - // sort and consolidate updates; retain non-zero accumulations. - if !self.updates.is_empty() { - self.updates.sort_by(|x,y| x.0.cmp(&y.0)); - for i in 0 .. self.updates.len() - 1 { - if self.updates[i].0 == self.updates[i+1].0 { - self.updates[i+1].1 += self.updates[i].1; - self.updates[i].1 = 0; - } - } - self.updates.retain(|x| x.1 != 0); - } - for time in self.frontier.drain(..) { self.changes.update(time, -1); } @@ -580,6 +522,7 @@ impl MutableAntichain { T: Ord, { self.updates + .unstable_internal_updates() .iter() .filter(|td| td.0.eq(query_time)) .map(|td| td.1) @@ -804,4 +747,25 @@ mod tests { assert!(!hashed.contains(&Antichain::from(vec![Elem('c', 3)]))); assert!(!hashed.contains(&Antichain::from(vec![]))); } + + #[test] + fn mutable_compaction() { + let mut mutable = MutableAntichain::new(); + mutable.update_iter(Some((7, 1))); + mutable.update_iter(Some((7, 1))); + mutable.update_iter(Some((7, 1))); + mutable.update_iter(Some((7, 1))); + mutable.update_iter(Some((7, 1))); + mutable.update_iter(Some((7, 1))); + mutable.update_iter(Some((8, 1))); + mutable.update_iter(Some((8, 1))); + mutable.update_iter(Some((8, 1))); + mutable.update_iter(Some((8, 1))); + mutable.update_iter(Some((8, 1))); + for _ in 0 .. 1000 { + mutable.update_iter(Some((9, 1))); + mutable.update_iter(Some((9, -1))); + } + assert!(mutable.updates.unstable_internal_updates().len() <= 32); + } }