Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 5 additions & 4 deletions examples/cursors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ use std::fmt::Debug;
use std::collections::BTreeMap;

use timely::dataflow::operators::probe::Handle;
use timely::progress::frontier::AntichainRef;

use differential_dataflow::input::Input;
use differential_dataflow::operators::arrange::ArrangeByKey;
Expand Down Expand Up @@ -77,8 +78,8 @@ fn main() {
graph.close();
for i in 1..rounds + 1 {
/* Advance the trace frontier to enable trace compaction. */
graph_trace.distinguish_since(&[i]);
graph_trace.advance_by(&[i]);
graph_trace.distinguish_since(AntichainRef::new(&[i]));
graph_trace.advance_by(AntichainRef::new(&[i]));
worker.step_while(|| probe.less_than(&i));
dump_cursor(i, worker.index(), &mut graph_trace);
}
Expand All @@ -92,8 +93,8 @@ fn main() {
}
graph.advance_to(i);
graph.flush();
graph_trace.distinguish_since(&[i]);
graph_trace.advance_by(&[i]);
graph_trace.distinguish_since(AntichainRef::new(&[i]));
graph_trace.advance_by(AntichainRef::new(&[i]));
worker.step_while(|| probe.less_than(graph.time()));
dump_cursor(i, worker.index(), &mut graph_trace);
}
Expand Down
84 changes: 73 additions & 11 deletions src/lattice.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,10 @@
//! `Lattice` trait, and all reasoning in operators are done it terms of `Lattice` methods.

use timely::order::PartialOrder;
use timely::progress::{Timestamp, Antichain};
use timely::progress::{Antichain, frontier::AntichainRef};

/// A bounded partially ordered type supporting joins and meets.
pub trait Lattice : PartialOrder+Timestamp {
pub trait Lattice : PartialOrder {

/// The smallest element greater than or equal to both arguments.
///
Expand Down Expand Up @@ -118,17 +118,19 @@ pub trait Lattice : PartialOrder+Timestamp {
/// # use differential_dataflow::lattice::Lattice;
/// # fn main() {
///
/// use timely::progress::frontier::{Antichain, AntichainRef};
///
/// let time = Product::new(3, 7);
/// let mut advanced = Product::new(3, 7);
/// let frontier = vec![Product::new(4, 8), Product::new(5, 3)];
/// advanced.advance_by(&frontier[..]);
/// let frontier = Antichain::from(vec![Product::new(4, 8), Product::new(5, 3)]);
/// advanced.advance_by(frontier.borrow());
///
/// // `time` and `advanced` are indistinguishable to elements >= an element of `frontier`
/// for i in 0 .. 10 {
/// for j in 0 .. 10 {
/// let test = Product::new(i, j);
/// // for `test` in the future of `frontier` ..
/// if frontier.iter().any(|t| t.less_equal(&test)) {
/// if frontier.less_equal(&test) {
/// assert_eq!(time.less_equal(&test), advanced.less_equal(&test));
/// }
/// }
Expand All @@ -138,10 +140,11 @@ pub trait Lattice : PartialOrder+Timestamp {
/// # }
/// ```
#[inline]
fn advance_by(&mut self, frontier: &[Self]) where Self: Sized {
if let Some(first) = frontier.get(0) {
fn advance_by(&mut self, frontier: AntichainRef<Self>) where Self: Sized {
let mut iter = frontier.iter();
if let Some(first) = iter.next() {
let mut result = self.join(first);
for f in &frontier[1..] {
for f in iter {
result.meet_assign(&self.join(f));
}
*self = result;
Expand Down Expand Up @@ -194,8 +197,11 @@ implement_lattice!(i16, 0);
implement_lattice!(i8, 0);
implement_lattice!((), ());

/// Given two slices representing minimal antichains,
/// returns the "smallest" minimal antichain "greater or equal" to them.
/// Returns the "smallest" minimal antichain "greater or equal" to both inputs.
///
/// This method is primarily meant for cases where one cannot use the methods
/// of `Antichain`'s `PartialOrder` implementation, such as when one has only
/// references rather than owned antichains.
///
/// # Examples
///
Expand All @@ -211,7 +217,7 @@ implement_lattice!((), ());
/// let f1 = &[Product::new(3, 7), Product::new(5, 6)];
/// let f2 = &[Product::new(4, 6)];
/// let join = antichain_join(f1, f2);
/// assert_eq!(join.elements(), &[Product::new(4, 7), Product::new(5, 6)]);
/// assert_eq!(&*join.elements(), &[Product::new(4, 7), Product::new(5, 6)]);
/// # }
/// ```
pub fn antichain_join<T: Lattice>(one: &[T], other: &[T]) -> Antichain<T> {
Expand All @@ -223,3 +229,59 @@ pub fn antichain_join<T: Lattice>(one: &[T], other: &[T]) -> Antichain<T> {
}
upper
}

/// Returns the "greatest" minimal antichain "less or equal" to both inputs.
///
/// This method is primarily meant for cases where one cannot use the methods
/// of `Antichain`'s `PartialOrder` implementation, such as when one has only
/// references rather than owned antichains.
///
/// # Examples
///
/// ```
/// # extern crate timely;
/// # extern crate differential_dataflow;
/// # use timely::PartialOrder;
/// # use timely::order::Product;
/// # use differential_dataflow::lattice::Lattice;
/// # use differential_dataflow::lattice::antichain_meet;
/// # fn main() {
///
/// let f1 = &[Product::new(3, 7), Product::new(5, 6)];
/// let f2 = &[Product::new(4, 6)];
/// let meet = antichain_meet(f1, f2);
/// assert_eq!(&*meet.elements(), &[Product::new(3, 7), Product::new(4, 6)]);
/// # }
/// ```
pub fn antichain_meet<T: Lattice+Clone>(one: &[T], other: &[T]) -> Antichain<T> {
let mut upper = Antichain::new();
for time1 in one {
upper.insert(time1.clone());
}
for time2 in other {
upper.insert(time2.clone());
}
upper
}

impl<T: Lattice+Clone> Lattice for Antichain<T> {
fn join(&self, other: &Self) -> Self {
let mut upper = Antichain::new();
for time1 in self.elements().iter() {
for time2 in other.elements().iter() {
upper.insert(time1.join(time2));
}
}
upper
}
fn meet(&self, other: &Self) -> Self {
let mut upper = Antichain::new();
for time1 in self.elements().iter() {
upper.insert(time1.clone());
}
for time2 in other.elements().iter() {
upper.insert(time2.clone());
}
upper
}
}
54 changes: 29 additions & 25 deletions src/operators/arrange/agent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use std::collections::VecDeque;
use timely::dataflow::Scope;
use timely::dataflow::operators::generic::source;
use timely::progress::Timestamp;
use timely::progress::{Antichain, frontier::AntichainRef};
use timely::dataflow::operators::CapabilitySet;

use lattice::Lattice;
Expand All @@ -33,8 +34,8 @@ where
{
trace: Rc<RefCell<TraceBox<Tr>>>,
queues: Weak<RefCell<Vec<TraceAgentQueueWriter<Tr>>>>,
advance: Vec<Tr::Time>,
through: Vec<Tr::Time>,
advance: Antichain<Tr::Time>,
through: Antichain<Tr::Time>,

operator: ::timely::dataflow::operators::generic::OperatorInfo,
logging: Option<::logging::Logger>,
Expand All @@ -53,23 +54,23 @@ where
type Batch = Tr::Batch;
type Cursor = Tr::Cursor;

fn advance_by(&mut self, frontier: &[Tr::Time]) {
self.trace.borrow_mut().adjust_advance_frontier(&self.advance[..], frontier);
fn advance_by(&mut self, frontier: AntichainRef<Tr::Time>) {
self.trace.borrow_mut().adjust_advance_frontier(self.advance.borrow(), frontier);
self.advance.clear();
self.advance.extend(frontier.iter().cloned());
}
fn advance_frontier(&mut self) -> &[Tr::Time] {
&self.advance[..]
fn advance_frontier(&mut self) -> AntichainRef<Tr::Time> {
self.advance.borrow()
}
fn distinguish_since(&mut self, frontier: &[Tr::Time]) {
self.trace.borrow_mut().adjust_through_frontier(&self.through[..], frontier);
fn distinguish_since(&mut self, frontier: AntichainRef<Tr::Time>) {
self.trace.borrow_mut().adjust_through_frontier(self.through.borrow(), frontier);
self.through.clear();
self.through.extend(frontier.iter().cloned());
}
fn distinguish_frontier(&mut self) -> &[Tr::Time] {
&self.through[..]
fn distinguish_frontier(&mut self) -> AntichainRef<Tr::Time> {
self.through.borrow()
}
fn cursor_through(&mut self, frontier: &[Tr::Time]) -> Option<(Tr::Cursor, <Tr::Cursor as Cursor<Tr::Key, Tr::Val, Tr::Time, Tr::R>>::Storage)> {
fn cursor_through(&mut self, frontier: AntichainRef<Tr::Time>) -> Option<(Tr::Cursor, <Tr::Cursor as Cursor<Tr::Key, Tr::Val, Tr::Time, Tr::R>>::Storage)> {
self.trace.borrow_mut().trace.cursor_through(frontier)
}
fn map_batches<F: FnMut(&Self::Batch)>(&mut self, f: F) { self.trace.borrow_mut().trace.map_batches(f) }
Expand Down Expand Up @@ -98,8 +99,8 @@ where
let reader = TraceAgent {
trace: trace.clone(),
queues: Rc::downgrade(&queues),
advance: trace.borrow().advance_frontiers.frontier().to_vec(),
through: trace.borrow().through_frontiers.frontier().to_vec(),
advance: trace.borrow().advance_frontiers.frontier().to_owned(),
through: trace.borrow().through_frontiers.frontier().to_owned(),
operator,
logging,
};
Expand Down Expand Up @@ -130,7 +131,7 @@ where
.trace
.map_batches(|batch| {
new_queue.push_back(TraceReplayInstruction::Batch(batch.clone(), Some(<Tr::Time as Timestamp>::minimum())));
upper = Some(batch.upper().to_vec());
upper = Some(batch.upper().clone());
});

if let Some(upper) = upper {
Expand Down Expand Up @@ -318,7 +319,7 @@ where
for instruction in borrow.drain(..) {
match instruction {
TraceReplayInstruction::Frontier(frontier) => {
capabilities.downgrade(&frontier[..]);
capabilities.downgrade(&frontier.borrow()[..]);
},
TraceReplayInstruction::Batch(batch, hint) => {
if let Some(time) = hint {
Expand Down Expand Up @@ -349,6 +350,7 @@ where
/// extern crate differential_dataflow;
///
/// use timely::Configuration;
/// use timely::progress::frontier::AntichainRef;
/// use timely::dataflow::ProbeHandle;
/// use timely::dataflow::operators::Probe;
/// use timely::dataflow::operators::Inspect;
Expand Down Expand Up @@ -380,7 +382,7 @@ where
/// handle.remove(1); handle.advance_to(4); handle.flush(); worker.step();
/// handle.insert(0); handle.advance_to(5); handle.flush(); worker.step();
///
/// trace.advance_by(&[5]);
/// trace.advance_by(AntichainRef::new(&[5]));
///
/// // create a second dataflow
/// let mut shutdown = worker.dataflow(|scope| {
Expand Down Expand Up @@ -416,19 +418,19 @@ where
Tr: TraceReader,
{
// This frontier describes our only guarantee on the compaction frontier.
let frontier = self.advance_frontier().to_vec();
let frontier = self.advance_frontier().to_owned();
self.import_frontier_core(scope, name, frontier)
}

/// Import a trace advanced to a specific frontier.
pub fn import_frontier_core<G>(&mut self, scope: &G, name: &str, frontier:Vec<Tr::Time>) -> (Arranged<G, TraceFrontier<TraceAgent<Tr>>>, ShutdownButton<CapabilitySet<Tr::Time>>)
pub fn import_frontier_core<G>(&mut self, scope: &G, name: &str, frontier: Antichain<Tr::Time>) -> (Arranged<G, TraceFrontier<TraceAgent<Tr>>>, ShutdownButton<CapabilitySet<Tr::Time>>)
where
G: Scope<Timestamp=Tr::Time>,
Tr::Time: Timestamp+ Lattice+Ord+Clone+'static,
Tr: TraceReader,
{
let trace = self.clone();
let trace = TraceFrontier::make_from(trace, &frontier[..]);
let trace = TraceFrontier::make_from(trace, frontier.borrow());

let mut shutdown_button = None;

Expand Down Expand Up @@ -456,13 +458,13 @@ where
for instruction in borrow.drain(..) {
match instruction {
TraceReplayInstruction::Frontier(frontier) => {
capabilities.downgrade(&frontier[..]);
capabilities.downgrade(&frontier.borrow()[..]);
},
TraceReplayInstruction::Batch(batch, hint) => {
if let Some(time) = hint {
if !batch.is_empty() {
let delayed = capabilities.delayed(&time);
output.session(&delayed).give(BatchFrontier::make_from(batch, &frontier[..]));
output.session(&delayed).give(BatchFrontier::make_from(batch, frontier.borrow()));
}
}
}
Expand Down Expand Up @@ -530,8 +532,9 @@ where
}

// increase counts for wrapped `TraceBox`.
self.trace.borrow_mut().adjust_advance_frontier(&[], &self.advance[..]);
self.trace.borrow_mut().adjust_through_frontier(&[], &self.through[..]);
let empty_frontier = Antichain::new();
self.trace.borrow_mut().adjust_advance_frontier(empty_frontier.borrow(), self.advance.borrow());
self.trace.borrow_mut().adjust_through_frontier(empty_frontier.borrow(), self.through.borrow());

TraceAgent {
trace: self.trace.clone(),
Expand All @@ -558,7 +561,8 @@ where
}

// decrement borrow counts to remove all holds
self.trace.borrow_mut().adjust_advance_frontier(&self.advance[..], &[]);
self.trace.borrow_mut().adjust_through_frontier(&self.through[..], &[]);
let empty_frontier = Antichain::new();
self.trace.borrow_mut().adjust_advance_frontier(self.advance.borrow(), empty_frontier.borrow());
self.trace.borrow_mut().adjust_through_frontier(self.through.borrow(), empty_frontier.borrow());
}
}
15 changes: 8 additions & 7 deletions src/operators/arrange/arrangement.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use timely::dataflow::{Scope, Stream};
use timely::dataflow::operators::generic::Operator;
use timely::dataflow::channels::pact::{ParallelizationContract, Pipeline, Exchange};
use timely::progress::Timestamp;
use timely::progress::frontier::Antichain;
use timely::progress::{Antichain, frontier::AntichainRef};
use timely::dataflow::operators::Capability;

use timely_sort::Unsigned;
Expand Down Expand Up @@ -263,7 +263,7 @@ where

let mut trace = Some(self.trace.clone());
// release `distinguish_since` capability.
trace.as_mut().unwrap().distinguish_since(&[]);
trace.as_mut().unwrap().distinguish_since(Antichain::new().borrow());

let mut stash = Vec::new();
let mut capability: Option<Capability<G::Timestamp>> = None;
Expand Down Expand Up @@ -390,13 +390,14 @@ where
}

// Determine new frontier on queries that may be issued.
// TODO: This code looks very suspect; explain better or fix.
let frontier = [
capability.as_ref().map(|c| c.time().clone()),
input1.frontier().frontier().get(0).cloned(),
].into_iter().cloned().filter_map(|t| t).min();

if let Some(frontier) = frontier {
trace.as_mut().map(|t| t.advance_by(&[frontier]));
trace.as_mut().map(|t| t.advance_by(AntichainRef::new(&[frontier])));
}
else {
trace = None;
Expand Down Expand Up @@ -617,7 +618,7 @@ where
}

// Extract updates not in advance of `upper`.
let batch = batcher.seal(upper.elements());
let batch = batcher.seal(upper.clone());

writer.insert(batch.clone(), Some(capability.time().clone()));

Expand All @@ -632,7 +633,7 @@ where
// in messages with new capabilities.

let mut new_capabilities = Antichain::new();
for time in batcher.frontier() {
for time in batcher.frontier().iter() {
if let Some(capability) = capabilities.elements().iter().find(|c| c.time().less_equal(time)) {
new_capabilities.insert(capability.delayed(time));
}
Expand All @@ -645,8 +646,8 @@ where
}
else {
// Announce progress updates, even without data.
let _batch = batcher.seal(&input.frontier().frontier()[..]);
writer.seal(&input.frontier().frontier());
let _batch = batcher.seal(input.frontier().frontier().to_owned());
writer.seal(input.frontier().frontier().to_owned());
}

input_frontier.clear();
Expand Down
Loading