Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
110 changes: 37 additions & 73 deletions timely/src/progress/frontier.rs
Original file line number Diff line number Diff line change
Expand Up @@ -308,14 +308,9 @@ impl<T> ::std::iter::IntoIterator for Antichain<T> {
/// The `MutableAntichain` implementation is done with the intent that updates to it are done in batches,
/// and it is acceptable to rebuild the frontier from scratch when a batch of updates change it. This means
/// that it can be expensive to maintain a large number of counts and change few elements near the frontier.
///
/// There is an `update_dirty` method for single updates that leave the `MutableAntichain` in a dirty state,
/// but I strongly recommend against using them unless you must (on part of timely progress tracking seems
/// to be greatly simplified by access to this)
#[derive(Clone, Debug, Abomonation, Serialize, Deserialize)]
pub struct MutableAntichain<T> {
dirty: usize,
updates: Vec<(T, i64)>,
updates: ChangeBatch<T>,
frontier: Vec<T>,
changes: ChangeBatch<T>,
}
Expand All @@ -334,8 +329,7 @@ impl<T> MutableAntichain<T> {
#[inline]
pub fn new() -> MutableAntichain<T> {
MutableAntichain {
dirty: 0,
updates: Vec::new(),
updates: ChangeBatch::new(),
frontier: Vec::new(),
changes: ChangeBatch::new(),
}
Expand All @@ -354,21 +348,11 @@ impl<T> MutableAntichain<T> {
///```
#[inline]
pub fn clear(&mut self) {
self.dirty = 0;
self.updates.clear();
self.frontier.clear();
self.changes.clear();
}

/// This method deletes the contents. Unlike `clear` it records doing so.
pub fn empty(&mut self) {
for (_, diff) in self.updates.iter_mut() {
*diff = 0;
}

self.dirty = self.updates.len();
}

/// Reveals the minimal elements with positive count.
///
/// # Examples
Expand All @@ -381,7 +365,6 @@ impl<T> MutableAntichain<T> {
///```
#[inline]
pub fn frontier(&self) -> AntichainRef<'_, T> {
debug_assert_eq!(self.dirty, 0);
AntichainRef::new(&self.frontier)
}

Expand All @@ -396,13 +379,12 @@ impl<T> MutableAntichain<T> {
/// assert!(frontier.frontier() == AntichainRef::new(&[0u64]));
///```
#[inline]
pub fn new_bottom(bottom: T) -> MutableAntichain<T>
pub fn new_bottom(bottom: T) -> MutableAntichain<T>
where
T: Clone,
T: Ord+Clone,
{
MutableAntichain {
dirty: 0,
updates: vec![(bottom.clone(), 1)],
updates: ChangeBatch::new_from(bottom.clone(), 1),
frontier: vec![bottom],
changes: ChangeBatch::new(),
}
Expand All @@ -420,7 +402,6 @@ impl<T> MutableAntichain<T> {
///```
#[inline]
pub fn is_empty(&self) -> bool {
debug_assert_eq!(self.dirty, 0);
self.frontier.is_empty()
}

Expand All @@ -441,7 +422,6 @@ impl<T> MutableAntichain<T> {
where
T: PartialOrder,
{
debug_assert_eq!(self.dirty, 0);
self.frontier().less_than(time)
}

Expand All @@ -462,22 +442,9 @@ impl<T> MutableAntichain<T> {
where
T: PartialOrder,
{
debug_assert_eq!(self.dirty, 0);
self.frontier().less_equal(time)
}

/// Allows a single-element push, but dirties the antichain and prevents inspection until cleaned.
///
/// At the moment inspection is prevented via panic, so best be careful (this should probably be fixed).
/// It is *very* important if you want to use this method that very soon afterwards you call something
/// akin to `update_iter`, perhaps with a `None` argument if you have no more data, as this method will
/// tidy up the internal representation.
#[inline]
pub fn update_dirty(&mut self, time: T, delta: i64) {
self.updates.push((time, delta));
self.dirty += 1;
}

/// Applies updates to the antichain and enumerates any changes.
///
/// # Examples
Expand All @@ -502,40 +469,28 @@ impl<T> MutableAntichain<T> {
{
let updates = updates.into_iter();

// Attempt to pre-allocate for the new updates
let (min, max) = updates.size_hint();
self.updates.reserve(max.unwrap_or(min));

for (time, delta) in updates {
self.updates.push((time, delta));
self.dirty += 1;
}

// track whether a rebuild is needed.
let mut rebuild_required = false;
for (time, delta) in updates {

// determine if recently pushed data requires rebuilding the frontier.
// note: this may be required even with an empty iterator, due to dirty data in self.updates.
while self.dirty > 0 && !rebuild_required {

let time = &self.updates[self.updates.len() - self.dirty].0;
let delta = self.updates[self.updates.len() - self.dirty].1;

let beyond_frontier = self.frontier.iter().any(|f| f.less_than(time));
let before_frontier = !self.frontier.iter().any(|f| f.less_equal(time));
rebuild_required = rebuild_required || !(beyond_frontier || (delta < 0 && before_frontier));
// If we do not yet require a rebuild, test whether we might require one
// and set the flag in that case.
if !rebuild_required {
let beyond_frontier = self.frontier.iter().any(|f| f.less_than(&time));
let before_frontier = !self.frontier.iter().any(|f| f.less_equal(&time));
rebuild_required = !(beyond_frontier || (delta < 0 && before_frontier));
}

self.dirty -= 1;
self.updates.update(time, delta);
}
self.dirty = 0;

if rebuild_required {
self.rebuild()
}
self.changes.drain()
}

/// Sorts and consolidates `self.updates` and applies `action` to any frontier changes.
/// Rebuilds `self.frontier` from `self.updates`.
///
/// This method is meant to be used for bulk updates to the frontier, and does more work than one might do
/// for single updates, but is meant to be an efficient way to process multiple updates together. This is
Expand All @@ -544,19 +499,6 @@ impl<T> MutableAntichain<T> {
where
T: Clone + PartialOrder + Ord,
{

// sort and consolidate updates; retain non-zero accumulations.
if !self.updates.is_empty() {
self.updates.sort_by(|x,y| x.0.cmp(&y.0));
for i in 0 .. self.updates.len() - 1 {
if self.updates[i].0 == self.updates[i+1].0 {
self.updates[i+1].1 += self.updates[i].1;
self.updates[i].1 = 0;
}
}
self.updates.retain(|x| x.1 != 0);
}

for time in self.frontier.drain(..) {
self.changes.update(time, -1);
}
Expand All @@ -580,6 +522,7 @@ impl<T> MutableAntichain<T> {
T: Ord,
{
self.updates
.unstable_internal_updates()
.iter()
.filter(|td| td.0.eq(query_time))
.map(|td| td.1)
Expand Down Expand Up @@ -804,4 +747,25 @@ mod tests {
assert!(!hashed.contains(&Antichain::from(vec![Elem('c', 3)])));
assert!(!hashed.contains(&Antichain::from(vec![])));
}

#[test]
fn mutable_compaction() {
let mut mutable = MutableAntichain::new();
mutable.update_iter(Some((7, 1)));
mutable.update_iter(Some((7, 1)));
mutable.update_iter(Some((7, 1)));
mutable.update_iter(Some((7, 1)));
mutable.update_iter(Some((7, 1)));
mutable.update_iter(Some((7, 1)));
mutable.update_iter(Some((8, 1)));
mutable.update_iter(Some((8, 1)));
mutable.update_iter(Some((8, 1)));
mutable.update_iter(Some((8, 1)));
mutable.update_iter(Some((8, 1)));
for _ in 0 .. 1000 {
mutable.update_iter(Some((9, 1)));
mutable.update_iter(Some((9, -1)));
}
assert!(mutable.updates.unstable_internal_updates().len() <= 32);
}
}