diff --git a/advent_of_code_2017/src/bin/day_07.rs b/advent_of_code_2017/src/bin/day_07.rs index 5836836f9..be4146b32 100644 --- a/advent_of_code_2017/src/bin/day_07.rs +++ b/advent_of_code_2017/src/bin/day_07.rs @@ -3,7 +3,7 @@ extern crate differential_dataflow; // taken from: https://adventofcode.com/2017/day/6 -use differential_dataflow::Collection; +use differential_dataflow::VecCollection; use differential_dataflow::input::Input; use differential_dataflow::operators::*; @@ -1074,7 +1074,7 @@ tvhftq (35)"; let index = worker.index(); let peers = worker.peers(); - let worker_input = + let worker_input = input .split('\n') .enumerate() @@ -1102,7 +1102,7 @@ tvhftq (35)"; let weights = input.explode(|(name,weight,_)| Some((name, weight as isize))); let parents = input.flat_map(|(name,_,links)| links.into_iter().map(move |link| (link,name.to_string()))); - let total_weights: Collection<_,String> = weights + let total_weights: VecCollection<_,String> = weights .iterate(|inner| { parents.enter(&inner.scope()) .semijoin(&inner) diff --git a/advent_of_code_2017/src/bin/day_08.rs b/advent_of_code_2017/src/bin/day_08.rs index 95a0efb2d..64627fba6 100644 --- a/advent_of_code_2017/src/bin/day_08.rs +++ b/advent_of_code_2017/src/bin/day_08.rs @@ -3,7 +3,7 @@ extern crate differential_dataflow; // taken from: https://adventofcode.com/2017/day/8 -// use differential_dataflow::Collection; +// use differential_dataflow::VecCollection; use differential_dataflow::input::Input; use differential_dataflow::operators::*; use differential_dataflow::algorithms::prefix_sum::PrefixSum; @@ -1068,7 +1068,7 @@ wui inc -120 if i > -2038"; let edits = data.map(|(pos, line)| { // Each line has a read location with a condition, and a write location with an operation. - // E.g., + // E.g., // // kxm dec 986 if xbh <= -2515 // obc inc 63 if wui >= 2800 diff --git a/advent_of_code_2017/src/bin/day_09.rs b/advent_of_code_2017/src/bin/day_09.rs index 058ef7291..8ec779552 100644 --- a/advent_of_code_2017/src/bin/day_09.rs +++ b/advent_of_code_2017/src/bin/day_09.rs @@ -20,7 +20,7 @@ fn main() { let index = worker.index(); let peers = worker.peers(); - let worker_input = + let worker_input = input .chars() .enumerate() @@ -37,7 +37,7 @@ fn main() { // { next_invalid, garbage } // // where the first bool indicates that the next character should be ignored, - // and the second bool indicates that we are in a garbage scope. We will + // and the second bool indicates that we are in a garbage scope. We will // encode this as the values 0 .. 4, where // // 0: valid, non-garbage @@ -48,7 +48,7 @@ fn main() { // Each character initially describes a substring of length one, but we will // build up the state transition for larger substrings, iteratively. - let transitions = + let transitions = input .map(|(pos, character)| (pos, match character { @@ -95,7 +95,7 @@ fn main() { } /// Accumulate data in `collection` into all powers-of-two intervals containing them. -fn pp_aggregate(collection: Collection, combine: F) -> Collection +fn pp_aggregate(collection: VecCollection, combine: F) -> VecCollection where G: Scope, D: Data, @@ -105,7 +105,7 @@ where let unit_ranges = collection.map(|(index, data)| ((index, 0), data)); unit_ranges - .iterate(|ranges| + .iterate(|ranges| // Each available range, of size less than usize::max_value(), advertises itself as the range // twice as large, aligned to integer multiples of its size. Each range, which may contain at @@ -126,10 +126,10 @@ where /// Produces the accumulated values at each of the `usize` locations in `aggregates` (and others). fn pp_broadcast( - ranges: Collection, + ranges: VecCollection, seed: B, zero: D, - combine: F) -> Collection + combine: F) -> VecCollection where G: Scope, D: Data, @@ -137,7 +137,7 @@ where F: Fn(&B, &D) -> B + 'static, { // Each range proposes an empty first child, to provide for its second child if it has no sibling. - // This is important if we want to reconstruct + // This is important if we want to reconstruct let zero_ranges = ranges .map(move |((pos, log),_)| ((pos, if log > 0 { log - 1 } else { 0 }), zero.clone())) @@ -145,7 +145,7 @@ where let aggregates = ranges.concat(&zero_ranges); - let init_state = + let init_state = Some(((0, seed), Default::default(), 1)) .to_stream(&mut aggregates.scope()) .as_collection(); diff --git a/advent_of_code_2017/src/bin/day_10.rs b/advent_of_code_2017/src/bin/day_10.rs index 8f776bb59..fb998f636 100644 --- a/advent_of_code_2017/src/bin/day_10.rs +++ b/advent_of_code_2017/src/bin/day_10.rs @@ -3,7 +3,7 @@ extern crate differential_dataflow; // taken from: https://adventofcode.com/2017/day/8 -use differential_dataflow::Collection; +use differential_dataflow::VecCollection; use differential_dataflow::input::Input; use differential_dataflow::operators::*; @@ -61,4 +61,4 @@ fn knot_step+Clone>(iter: I, rounds: usize) -> Vec { } state -} \ No newline at end of file +} diff --git a/advent_of_code_2017/src/bin/day_14.rs b/advent_of_code_2017/src/bin/day_14.rs index 9ba81f2e6..e6dd4f958 100644 --- a/advent_of_code_2017/src/bin/day_14.rs +++ b/advent_of_code_2017/src/bin/day_14.rs @@ -3,7 +3,7 @@ extern crate differential_dataflow; // taken from: https://adventofcode.com/2017/day/8 -use differential_dataflow::Collection; +use differential_dataflow::VecCollection; use differential_dataflow::input::Input; use differential_dataflow::operators::*; @@ -31,8 +31,8 @@ fn main() { // println!("{:?}: \t{:?}", row, hash); (0 .. 128) - .map(move |col| - if hash[col/8] & (1 << (7-(col % 8))) != 0 { + .map(move |col| + if hash[col/8] & (1 << (7-(col % 8))) != 0 { (row, col, '#') } else { @@ -92,4 +92,4 @@ fn knot_step+Clone>(iter: I, rounds: usize) -> Vec { } state -} \ No newline at end of file +} diff --git a/advent_of_code_2017/src/bin/day_15.rs b/advent_of_code_2017/src/bin/day_15.rs index 0173f70b5..a8c2a8d79 100644 --- a/advent_of_code_2017/src/bin/day_15.rs +++ b/advent_of_code_2017/src/bin/day_15.rs @@ -3,7 +3,7 @@ extern crate differential_dataflow; // taken from: https://adventofcode.com/2017/day/8 -use differential_dataflow::Collection; +use differential_dataflow::VecCollection; use differential_dataflow::input::Input; use differential_dataflow::operators::*; @@ -43,4 +43,4 @@ fn main() { println!("part2: {:?}", equal); -} \ No newline at end of file +} diff --git a/differential-dataflow/examples/bfs.rs b/differential-dataflow/examples/bfs.rs index 037ace6ac..b95540825 100644 --- a/differential-dataflow/examples/bfs.rs +++ b/differential-dataflow/examples/bfs.rs @@ -4,7 +4,7 @@ use timely::dataflow::*; use timely::dataflow::operators::probe::Handle; use differential_dataflow::input::Input; -use differential_dataflow::Collection; +use differential_dataflow::VecCollection; use differential_dataflow::operators::*; use differential_dataflow::lattice::Lattice; @@ -91,9 +91,9 @@ fn main() { } // returns pairs (n, s) indicating node n can be reached from a root in s steps. -fn bfs(edges: &Collection, roots: &Collection) -> Collection +fn bfs(edges: &VecCollection, roots: &VecCollection) -> VecCollection where - G: Scope, + G: Scope, { // initialize roots as reaching themselves at distance 0 let nodes = roots.map(|x| (x, 0)); @@ -108,4 +108,4 @@ where .concat(&nodes) .reduce(|_, s, t| t.push((*s[0].0, 1))) }) -} \ No newline at end of file +} diff --git a/differential-dataflow/examples/dynamic.rs b/differential-dataflow/examples/dynamic.rs index e83de369f..c63dec185 100644 --- a/differential-dataflow/examples/dynamic.rs +++ b/differential-dataflow/examples/dynamic.rs @@ -4,7 +4,7 @@ use timely::dataflow::*; use timely::dataflow::operators::probe::Handle; use differential_dataflow::input::Input; -use differential_dataflow::Collection; +use differential_dataflow::VecCollection; use differential_dataflow::operators::*; use differential_dataflow::lattice::Lattice; @@ -91,9 +91,9 @@ fn main() { } // returns pairs (n, s) indicating node n can be reached from a root in s steps. -fn bfs(edges: &Collection, roots: &Collection) -> Collection +fn bfs(edges: &VecCollection, roots: &VecCollection) -> VecCollection where - G: Scope, + G: Scope, { use timely::order::Product; use iterate::Variable; @@ -115,7 +115,7 @@ where let inner = feedback_summary::(1, 1); let label = Variable::new_from(nodes.clone(), Product { outer: Default::default(), inner }); - let next = + let next = label .join_map(&edges, |_k,l,d| (*d, l+1)) .concat(&nodes) @@ -130,4 +130,4 @@ where .leave() }) -} \ No newline at end of file +} diff --git a/differential-dataflow/examples/graspan.rs b/differential-dataflow/examples/graspan.rs index 5d1e4f95e..8886e21e9 100644 --- a/differential-dataflow/examples/graspan.rs +++ b/differential-dataflow/examples/graspan.rs @@ -8,11 +8,11 @@ use timely::order::Product; use timely::dataflow::Scope; use timely::dataflow::scopes::ScopeParent; -use differential_dataflow::Collection; +use differential_dataflow::VecCollection; use differential_dataflow::lattice::Lattice; use differential_dataflow::input::{Input, InputSession}; use differential_dataflow::operators::arrange::{ArrangeByKey, ArrangeBySelf}; -use differential_dataflow::operators::iterate::Variable; +use differential_dataflow::operators::iterate::VecVariable; use differential_dataflow::operators::Threshold; type Node = usize; @@ -83,16 +83,16 @@ type Arrange = Arranged::Tim /// An edge variable provides arranged representations of its contents, even before they are /// completely defined, in support of recursively defined productions. pub struct EdgeVariable> { - variable: Variable, - current: Collection, + variable: VecVariable, + current: VecCollection, forward: Option>, reverse: Option>, } impl> EdgeVariable { /// Creates a new variable initialized with `source`. - pub fn from(source: &Collection, step: ::Summary) -> Self { - let variable = Variable::new(&mut source.scope(), step); + pub fn from(source: &VecCollection, step: ::Summary) -> Self { + let variable = VecVariable::new(&mut source.scope(), step); EdgeVariable { variable: variable, current: source.clone(), @@ -101,7 +101,7 @@ impl> EdgeVariable { } } /// Concatenates `production` into the definition of the variable. - pub fn add_production(&mut self, production: &Collection) { + pub fn add_production(&mut self, production: &VecCollection) { self.current = self.current.concat(production); } /// Finalizes the variable, connecting its recursive definition. @@ -153,7 +153,7 @@ impl Query { /// Creates a dataflow implementing the query, and returns input and trace handles. pub fn render_in(&self, scope: &mut G) -> IndexMap> - where + where G: Scope, { // Create new input (handle, stream) pairs diff --git a/differential-dataflow/examples/interpreted.rs b/differential-dataflow/examples/interpreted.rs index 87aab361b..89d8ac046 100644 --- a/differential-dataflow/examples/interpreted.rs +++ b/differential-dataflow/examples/interpreted.rs @@ -2,7 +2,7 @@ use std::hash::Hash; use timely::dataflow::*; use timely::dataflow::operators::*; -use differential_dataflow::Collection; +use differential_dataflow::VecCollection; use differential_dataflow::lattice::Lattice; use differential_dataflow::operators::*; @@ -35,13 +35,13 @@ fn main() { println!("loaded {} nodes, {} edges", nodes, edges.len()); worker.dataflow::<(),_,_>(|scope| { - interpret(&Collection::new(edges.to_stream(scope)), &[(0,2), (1,2)]); + interpret(&VecCollection::new(edges.to_stream(scope)), &[(0,2), (1,2)]); }); }).unwrap(); } -fn interpret(edges: &Collection, relations: &[(usize, usize)]) -> Collection> +fn interpret(edges: &VecCollection, relations: &[(usize, usize)]) -> VecCollection> where G: Scope, { @@ -103,4 +103,4 @@ where } results -} \ No newline at end of file +} diff --git a/differential-dataflow/examples/iterate_container.rs b/differential-dataflow/examples/iterate_container.rs index df29fd009..9c941a8cc 100644 --- a/differential-dataflow/examples/iterate_container.rs +++ b/differential-dataflow/examples/iterate_container.rs @@ -52,12 +52,12 @@ fn main() { timely::example(|scope| { let numbers = scope.new_collection_from(1 .. 10u32).1; - let numbers: Collection<_, u32, isize, _> = wrap(&numbers.inner).as_collection(); + let numbers: Collection<_, _> = wrap(&numbers.inner).as_collection(); scope.iterative::(|nested| { let summary = Product::new(Default::default(), 1); let variable = Variable::new_from(numbers.enter(nested), summary); - let mapped: Collection<_, u32, isize, _> = variable.inner.unary(Pipeline, "Map", |_,_| { + let mapped: Collection<_, _> = variable.inner.unary(Pipeline, "Map", |_,_| { |input, output| { input.for_each(|time, data| { let mut session = output.session(&time); diff --git a/differential-dataflow/examples/monoid-bfs.rs b/differential-dataflow/examples/monoid-bfs.rs index 394b14e69..87127c2ed 100644 --- a/differential-dataflow/examples/monoid-bfs.rs +++ b/differential-dataflow/examples/monoid-bfs.rs @@ -5,7 +5,7 @@ use timely::dataflow::*; use timely::dataflow::operators::probe::Handle; use differential_dataflow::input::Input; -use differential_dataflow::Collection; +use differential_dataflow::VecCollection; use differential_dataflow::operators::*; use differential_dataflow::lattice::Lattice; @@ -123,7 +123,7 @@ fn main() { } // returns pairs (n, s) indicating node n can be reached from a root in s steps. -fn bfs(edges: &Collection, roots: &Collection) -> Collection +fn bfs(edges: &VecCollection, roots: &VecCollection) -> VecCollection where G: Scope, { diff --git a/differential-dataflow/examples/pagerank.rs b/differential-dataflow/examples/pagerank.rs index 107e34092..d72bc6712 100644 --- a/differential-dataflow/examples/pagerank.rs +++ b/differential-dataflow/examples/pagerank.rs @@ -1,7 +1,7 @@ use timely::order::Product; use timely::dataflow::{*, operators::Filter}; -use differential_dataflow::Collection; +use differential_dataflow::VecCollection; use differential_dataflow::lattice::Lattice; use differential_dataflow::operators::{*, iterate::Variable}; use differential_dataflow::input::InputSession; @@ -77,7 +77,7 @@ fn main() { // Returns a weighted collection in which the weight of each node is proportional // to its PageRank in the input graph `edges`. -fn pagerank(iters: Iter, edges: &Collection) -> Collection +fn pagerank(iters: Iter, edges: &VecCollection) -> VecCollection where G: Scope, { diff --git a/differential-dataflow/examples/progress.rs b/differential-dataflow/examples/progress.rs index 8ee1d6f29..8be8cb7a4 100644 --- a/differential-dataflow/examples/progress.rs +++ b/differential-dataflow/examples/progress.rs @@ -5,7 +5,7 @@ use timely::dataflow::*; use timely::dataflow::operators::probe::Handle; use differential_dataflow::input::Input; -use differential_dataflow::Collection; +use differential_dataflow::VecCollection; use differential_dataflow::operators::*; use differential_dataflow::lattice::Lattice; @@ -115,10 +115,10 @@ fn main() { /// The computation to determine this, and to maintain it as times change, is an iterative /// computation that propagates times and maintains the minimal elements at each location. fn frontier( - nodes: Collection, - edges: Collection, - times: Collection, -) -> Collection + nodes: VecCollection, + edges: VecCollection, + times: VecCollection, +) -> VecCollection where G: Scope, T: Timestamp, @@ -126,7 +126,7 @@ where // Translate node and edge transitions into a common Location to Location edge with an associated Summary. let nodes = nodes.map(|(target, source, summary)| (Location::from(target), (Location::from(source), summary))); let edges = edges.map(|(source, target)| (Location::from(source), (Location::from(target), Default::default()))); - let transitions: Collection = nodes.concat(&edges); + let transitions: VecCollection = nodes.concat(&edges); times .iterate(|reach| { @@ -149,9 +149,9 @@ where /// Summary paths from locations to operator zero inputs. fn summarize( - nodes: Collection, - edges: Collection, -) -> Collection + nodes: VecCollection, + edges: VecCollection, +) -> VecCollection where G: Scope, T: Timestamp, @@ -166,7 +166,7 @@ where // Retain node connections along "default" timestamp summaries. let nodes = nodes.map(|(target, source, summary)| (Location::from(source), (Location::from(target), summary))); let edges = edges.map(|(source, target)| (Location::from(target), (Location::from(source), Default::default()))); - let transitions: Collection = nodes.concat(&edges); + let transitions: VecCollection = nodes.concat(&edges); zero_inputs .iterate(|summaries| { @@ -192,9 +192,9 @@ where /// Identifies cycles along paths that do not increment timestamps. fn find_cycles( - nodes: Collection, - edges: Collection, -) -> Collection + nodes: VecCollection, + edges: VecCollection, +) -> VecCollection where G: Scope, T: Timestamp, @@ -209,7 +209,7 @@ where } }); let edges = edges.map(|(source, target)| (Location::from(source), Location::from(target))); - let transitions: Collection = nodes.concat(&edges); + let transitions: VecCollection = nodes.concat(&edges); // Repeatedly restrict to locations with an incoming path. transitions @@ -223,4 +223,4 @@ where .semijoin(&active) }) .consolidate() -} \ No newline at end of file +} diff --git a/differential-dataflow/examples/stackoverflow.rs b/differential-dataflow/examples/stackoverflow.rs index ab64c51a0..a3a9bd260 100644 --- a/differential-dataflow/examples/stackoverflow.rs +++ b/differential-dataflow/examples/stackoverflow.rs @@ -5,7 +5,7 @@ use timely::dataflow::*; use timely::dataflow::operators::probe::Handle; use differential_dataflow::input::InputSession; -use differential_dataflow::Collection; +use differential_dataflow::VecCollection; use differential_dataflow::operators::*; use differential_dataflow::lattice::Lattice; @@ -105,7 +105,7 @@ fn main() { } // returns pairs (n, s) indicating node n can be reached from a root in s steps. -fn bfs(edges: &Collection, roots: &Collection) -> Collection +fn bfs(edges: &VecCollection, roots: &VecCollection) -> VecCollection where G: Scope, { @@ -122,4 +122,4 @@ where .concat(&nodes) .reduce(|_, s, t| t.push((*s[0].0, 1))) }) -} \ No newline at end of file +} diff --git a/differential-dataflow/src/algorithms/graphs/bfs.rs b/differential-dataflow/src/algorithms/graphs/bfs.rs index 9e792e082..39703891b 100644 --- a/differential-dataflow/src/algorithms/graphs/bfs.rs +++ b/differential-dataflow/src/algorithms/graphs/bfs.rs @@ -4,12 +4,12 @@ use std::hash::Hash; use timely::dataflow::*; -use crate::{Collection, ExchangeData}; +use crate::{VecCollection, ExchangeData}; use crate::operators::*; use crate::lattice::Lattice; /// Returns pairs (node, dist) indicating distance of each node from a root. -pub fn bfs(edges: &Collection, roots: &Collection) -> Collection +pub fn bfs(edges: &VecCollection, roots: &VecCollection) -> VecCollection where G: Scope, N: ExchangeData+Hash, @@ -23,7 +23,7 @@ use crate::trace::TraceReader; use crate::operators::arrange::Arranged; /// Returns pairs (node, dist) indicating distance of each node from a root. -pub fn bfs_arranged(edges: &Arranged, roots: &Collection) -> Collection +pub fn bfs_arranged(edges: &Arranged, roots: &VecCollection) -> VecCollection where G: Scope, N: ExchangeData+Hash, @@ -42,4 +42,4 @@ where .concat(&nodes) .reduce(|_, s, t| t.push((*s[0].0, 1))) }) -} \ No newline at end of file +} diff --git a/differential-dataflow/src/algorithms/graphs/bijkstra.rs b/differential-dataflow/src/algorithms/graphs/bijkstra.rs index 171563c4e..3464e4daf 100644 --- a/differential-dataflow/src/algorithms/graphs/bijkstra.rs +++ b/differential-dataflow/src/algorithms/graphs/bijkstra.rs @@ -5,7 +5,7 @@ use std::hash::Hash; use timely::order::Product; use timely::dataflow::*; -use crate::{Collection, ExchangeData}; +use crate::{VecCollection, ExchangeData}; use crate::operators::*; use crate::lattice::Lattice; use crate::operators::iterate::Variable; @@ -20,7 +20,7 @@ use crate::operators::iterate::Variable; /// Goals that cannot reach from the source to the target are relatively expensive, as /// the entire graph must be explored to confirm this. A graph connectivity pre-filter /// could be good insurance here. -pub fn bidijkstra(edges: &Collection, goals: &Collection) -> Collection +pub fn bidijkstra(edges: &VecCollection, goals: &VecCollection) -> VecCollection where G: Scope, N: ExchangeData+Hash, @@ -38,8 +38,8 @@ use crate::operators::arrange::Arranged; pub fn bidijkstra_arranged( forward: &Arranged, reverse: &Arranged, - goals: &Collection -) -> Collection + goals: &VecCollection +) -> VecCollection where G: Scope, N: ExchangeData+Hash, diff --git a/differential-dataflow/src/algorithms/graphs/propagate.rs b/differential-dataflow/src/algorithms/graphs/propagate.rs index 1ba7ad746..4d569a151 100644 --- a/differential-dataflow/src/algorithms/graphs/propagate.rs +++ b/differential-dataflow/src/algorithms/graphs/propagate.rs @@ -4,7 +4,7 @@ use std::hash::Hash; use timely::dataflow::*; -use crate::{Collection, ExchangeData}; +use crate::{VecCollection, ExchangeData}; use crate::lattice::Lattice; use crate::difference::{Abelian, Multiply}; use crate::operators::arrange::arrangement::ArrangeByKey; @@ -14,7 +14,7 @@ use crate::operators::arrange::arrangement::ArrangeByKey; /// This algorithm naively propagates all labels at once, much like standard label propagation. /// To more carefully control the label propagation, consider `propagate_core` which supports a /// method to limit the introduction of labels. -pub fn propagate(edges: &Collection, nodes: &Collection) -> Collection +pub fn propagate(edges: &VecCollection, nodes: &VecCollection) -> VecCollection where G: Scope, N: ExchangeData+Hash, @@ -31,7 +31,7 @@ where /// This algorithm naively propagates all labels at once, much like standard label propagation. /// To more carefully control the label propagation, consider `propagate_core` which supports a /// method to limit the introduction of labels. -pub fn propagate_at(edges: &Collection, nodes: &Collection, logic: F) -> Collection +pub fn propagate_at(edges: &VecCollection, nodes: &VecCollection, logic: F) -> VecCollection where G: Scope, N: ExchangeData+Hash, @@ -51,8 +51,8 @@ use crate::operators::arrange::arrangement::Arranged; /// /// This variant takes a pre-arranged edge collection, to facilitate re-use, and allows /// a method `logic` to specify the rounds in which we introduce various labels. The output -/// of `logic should be a number in the interval [0,64], -pub fn propagate_core(edges: &Arranged, nodes: &Collection, logic: F) -> Collection +/// of `logic should be a number in the interval \[0,64\], +pub fn propagate_core(edges: &Arranged, nodes: &VecCollection, logic: F) -> VecCollection where G: Scope, N: ExchangeData+Hash, @@ -96,7 +96,7 @@ where .concat(&nodes) .reduce_abelian::<_,ValBuilder<_,_,_,_>,ValSpine<_,_,_,_>>("Propagate", |_, s, t| t.push((s[0].0.clone(), R::from(1_i8)))); - let propagate: Collection<_, (N, L), R> = + let propagate: VecCollection<_, (N, L), R> = labels .join_core(&edges, |_k, l: &L, d| Some((d.clone(), l.clone()))); diff --git a/differential-dataflow/src/algorithms/graphs/scc.rs b/differential-dataflow/src/algorithms/graphs/scc.rs index 3c784060c..7d570b8c9 100644 --- a/differential-dataflow/src/algorithms/graphs/scc.rs +++ b/differential-dataflow/src/algorithms/graphs/scc.rs @@ -5,7 +5,7 @@ use std::hash::Hash; use timely::dataflow::*; -use crate::{Collection, ExchangeData}; +use crate::{VecCollection, ExchangeData}; use crate::operators::*; use crate::lattice::Lattice; use crate::difference::{Abelian, Multiply}; @@ -13,7 +13,7 @@ use crate::difference::{Abelian, Multiply}; use super::propagate::propagate; /// Iteratively removes nodes with no in-edges. -pub fn trim(graph: &Collection) -> Collection +pub fn trim(graph: &VecCollection) -> VecCollection where G: Scope, N: ExchangeData + Hash, @@ -33,7 +33,7 @@ where } /// Returns the subset of edges in the same strongly connected component. -pub fn strongly_connected(graph: &Collection) -> Collection +pub fn strongly_connected(graph: &VecCollection) -> VecCollection where G: Scope, N: ExchangeData + Hash, @@ -48,8 +48,8 @@ where }) } -fn trim_edges(cycle: &Collection, edges: &Collection) - -> Collection +fn trim_edges(cycle: &VecCollection, edges: &VecCollection) + -> VecCollection where G: Scope, N: ExchangeData + Hash, diff --git a/differential-dataflow/src/algorithms/graphs/sequential.rs b/differential-dataflow/src/algorithms/graphs/sequential.rs index 9ee8d52ad..598fab128 100644 --- a/differential-dataflow/src/algorithms/graphs/sequential.rs +++ b/differential-dataflow/src/algorithms/graphs/sequential.rs @@ -4,12 +4,12 @@ use std::hash::Hash; use timely::dataflow::*; -use crate::{Collection, ExchangeData}; +use crate::{VecCollection, ExchangeData}; use crate::lattice::Lattice; use crate::operators::*; use crate::hashable::Hashable; -fn _color(edges: &Collection) -> Collection)> +fn _color(edges: &VecCollection) -> VecCollection)> where G: Scope, N: ExchangeData+Hash, @@ -40,9 +40,9 @@ where /// fired, and we apply `logic` to the new state of lower neighbors and /// the old state (input) of higher neighbors. pub fn sequence( - state: &Collection, - edges: &Collection, - logic: F) -> Collection)> + state: &VecCollection, + edges: &VecCollection, + logic: F) -> VecCollection)> where G: Scope, N: ExchangeData+Hashable, diff --git a/differential-dataflow/src/algorithms/identifiers.rs b/differential-dataflow/src/algorithms/identifiers.rs index f5f17522e..88ab1fcf7 100644 --- a/differential-dataflow/src/algorithms/identifiers.rs +++ b/differential-dataflow/src/algorithms/identifiers.rs @@ -2,7 +2,7 @@ use timely::dataflow::Scope; -use crate::{Collection, ExchangeData, Hashable}; +use crate::{VecCollection, ExchangeData, Hashable}; use crate::lattice::Lattice; use crate::operators::*; use crate::difference::Abelian; @@ -28,16 +28,16 @@ pub trait Identifiers { /// .assert_empty(); /// }); /// ``` - fn identifiers(&self) -> Collection; + fn identifiers(&self) -> VecCollection; } -impl Identifiers for Collection +impl Identifiers for VecCollection where G: Scope, D: ExchangeData + ::std::hash::Hash, R: ExchangeData + Abelian, { - fn identifiers(&self) -> Collection { + fn identifiers(&self) -> VecCollection { // The design here is that we iteratively develop a collection // of pairs (round, record), where each pair is a proposal that @@ -138,4 +138,4 @@ mod tests { .assert_empty(); }); } -} \ No newline at end of file +} diff --git a/differential-dataflow/src/algorithms/prefix_sum.rs b/differential-dataflow/src/algorithms/prefix_sum.rs index 57f8d1e7b..e479c8229 100644 --- a/differential-dataflow/src/algorithms/prefix_sum.rs +++ b/differential-dataflow/src/algorithms/prefix_sum.rs @@ -2,7 +2,7 @@ use timely::dataflow::Scope; -use crate::{Collection, ExchangeData}; +use crate::{VecCollection, ExchangeData}; use crate::lattice::Lattice; use crate::operators::*; @@ -16,10 +16,10 @@ pub trait PrefixSum { fn prefix_sum(&self, zero: D, combine: F) -> Self where F: Fn(&K,&D,&D)->D + 'static; /// Determine the prefix sum at each element of `location`. - fn prefix_sum_at(&self, locations: Collection, zero: D, combine: F) -> Self where F: Fn(&K,&D,&D)->D + 'static; + fn prefix_sum_at(&self, locations: VecCollection, zero: D, combine: F) -> Self where F: Fn(&K,&D,&D)->D + 'static; } -impl PrefixSum for Collection +impl PrefixSum for VecCollection where G: Scope, K: ExchangeData + ::std::hash::Hash, @@ -29,18 +29,18 @@ where self.prefix_sum_at(self.map(|(x,_)| x), zero, combine) } - fn prefix_sum_at(&self, locations: Collection, zero: D, combine: F) -> Self where F: Fn(&K,&D,&D)->D + 'static { + fn prefix_sum_at(&self, locations: VecCollection, zero: D, combine: F) -> Self where F: Fn(&K,&D,&D)->D + 'static { let combine1 = ::std::rc::Rc::new(combine); let combine2 = combine1.clone(); - let ranges = aggregate(self.clone(), move |k,x,y| (*combine1)(k,x,y)); + let ranges = aggregate(self.clone(), move |k,x,y| (*combine1)(k,x,y)); broadcast(ranges, locations, zero, move |k,x,y| (*combine2)(k,x,y)) } } /// Accumulate data in `collection` into all powers-of-two intervals containing them. -pub fn aggregate(collection: Collection, combine: F) -> Collection +pub fn aggregate(collection: VecCollection, combine: F) -> VecCollection where G: Scope, K: ExchangeData + ::std::hash::Hash, @@ -72,10 +72,10 @@ where /// Produces the accumulated values at each of the `usize` locations in `queries`. pub fn broadcast( - ranges: Collection, - queries: Collection, + ranges: VecCollection, + queries: VecCollection, zero: D, - combine: F) -> Collection + combine: F) -> VecCollection where G: Scope, K: ExchangeData + ::std::hash::Hash, @@ -145,4 +145,4 @@ where .distinct() }) .semijoin(&queries) -} \ No newline at end of file +} diff --git a/differential-dataflow/src/collection.rs b/differential-dataflow/src/collection.rs index 8236b7257..2694e39a9 100644 --- a/differential-dataflow/src/collection.rs +++ b/differential-dataflow/src/collection.rs @@ -22,7 +22,7 @@ use crate::difference::{Semigroup, Abelian, Multiply}; use crate::lattice::Lattice; use crate::hashable::Hashable; -/// A mutable collection of values of type `D` +/// An evolving collection of values of type `D`, backed by Rust `Vec` types as containers. /// /// The `Collection` type is the core abstraction in differential dataflow programs. As you write your /// differential dataflow computation, you write as if the collection is a static dataset to which you @@ -31,14 +31,23 @@ use crate::hashable::Hashable; /// propagate changes through your functional computation and report the corresponding changes to the /// output collections. /// -/// Each collection has three generic parameters. The parameter `G` is for the scope in which the +/// Each vec collection has three generic parameters. The parameter `G` is for the scope in which the /// collection exists; as you write more complicated programs you may wish to introduce nested scopes /// (e.g. for iteration) and this parameter tracks the scope (for timely dataflow's benefit). The `D` /// parameter is the type of data in your collection, for example `String`, or `(u32, Vec>)`. /// The `R` parameter represents the types of changes that the data undergo, and is most commonly (and /// defaults to) `isize`, representing changes to the occurrence count of each record. +/// +/// This type definition instantiates the [`Collection`] type with a `Vec<(D, G::Timestamp, R)>`. +pub type VecCollection = Collection::Timestamp, R)>>; + +/// An evolving collection represented by a stream of abstract containers. +/// +/// The containers purport to reperesent changes to a collection, and they must implement various traits +/// in order to expose some of this functionality (e.g. negation, timestamp manipulation). Other actions +/// on the containers, and streams of containers, are left to the container implementor to describe. #[derive(Clone)] -pub struct Collection::Timestamp, R)>> { +pub struct Collection { /// The underlying timely dataflow stream. /// /// This field is exposed to support direct timely dataflow manipulation when required, but it is @@ -48,11 +57,9 @@ pub struct Collection::Ti /// the timely-dataflow sense. If this invariant is not upheld, differential operators may behave /// unexpectedly. pub inner: timely::dataflow::StreamCore, - /// Phantom data for unreferenced `D` and `R` types. - phantom: std::marker::PhantomData<(D, R)>, } -impl Collection { +impl Collection { /// Creates a new Collection from a timely dataflow stream. /// /// This method seems to be rarely used, with the `as_collection` method on streams being a more @@ -62,11 +69,9 @@ impl Collection { /// /// This stream should satisfy the timestamp invariant as documented on [Collection]; this /// method does not check it. - pub fn new(stream: StreamCore) -> Collection { - Collection { inner: stream, phantom: std::marker::PhantomData } - } + pub fn new(stream: StreamCore) -> Self { Self { inner: stream } } } -impl Collection { +impl Collection { /// Creates a new collection accumulating the contents of the two collections. /// /// Despite the name, differential dataflow collections are unordered. This method is so named because the @@ -128,7 +133,7 @@ impl Collection { /// /// This method is a specialization of `enter` to the case where the nested scope is a region. /// It removes the need for an operator that adjusts the timestamp. - pub fn enter_region<'a>(&self, child: &Child<'a, G, ::Timestamp>) -> Collection::Timestamp>, D, R, C> { + pub fn enter_region<'a>(&self, child: &Child<'a, G, ::Timestamp>) -> Collection::Timestamp>, C> { self.inner .enter(child) .as_collection() @@ -204,7 +209,7 @@ impl Collection { /// .assert_eq(&evens); /// }); /// ``` - pub fn negate(&self) -> Collection where C: containers::Negate { + pub fn negate(&self) -> Self where C: containers::Negate { use timely::dataflow::channels::pact::Pipeline; self.inner .unary(Pipeline, "Negate", move |_,_| move |input, output| { @@ -233,7 +238,7 @@ impl Collection { /// data.assert_eq(&result); /// }); /// ``` - pub fn enter<'a, T>(&self, child: &Child<'a, G, T>) -> Collection, D, R, ::Timestamp, T>>::InnerContainer> + pub fn enter<'a, T>(&self, child: &Child<'a, G, T>) -> Collection, ::Timestamp, T>>::InnerContainer> where C: containers::Enter<::Timestamp, T, InnerContainer: Container>, T: Refines<::Timestamp>, @@ -280,7 +285,7 @@ impl Collection { } } -impl Collection { +impl VecCollection { /// Creates a new collection by applying the supplied function to each input element. /// /// # Examples @@ -295,7 +300,7 @@ impl Collection { /// .assert_empty(); /// }); /// ``` - pub fn map(&self, mut logic: L) -> Collection + pub fn map(&self, mut logic: L) -> VecCollection where D2: Data, L: FnMut(D) -> D2 + 'static, @@ -322,7 +327,7 @@ impl Collection { /// .assert_empty(); /// }); /// ``` - pub fn map_in_place(&self, mut logic: L) -> Collection + pub fn map_in_place(&self, mut logic: L) -> VecCollection where L: FnMut(&mut D) + 'static, { @@ -346,7 +351,7 @@ impl Collection { /// .flat_map(|x| 0 .. x); /// }); /// ``` - pub fn flat_map(&self, mut logic: L) -> Collection + pub fn flat_map(&self, mut logic: L) -> VecCollection where G::Timestamp: Clone, I: IntoIterator, @@ -370,7 +375,7 @@ impl Collection { /// .assert_empty(); /// }); /// ``` - pub fn filter(&self, mut logic: L) -> Collection + pub fn filter(&self, mut logic: L) -> VecCollection where L: FnMut(&D) -> bool + 'static, { @@ -398,7 +403,7 @@ impl Collection { /// x1.assert_eq(&x2); /// }); /// ``` - pub fn explode(&self, mut logic: L) -> Collection>::Output> + pub fn explode(&self, mut logic: L) -> VecCollection>::Output> where D2: Data, R2: Semigroup+Multiply, @@ -432,7 +437,7 @@ impl Collection { /// ); /// }); /// ``` - pub fn join_function(&self, mut logic: L) -> Collection>::Output> + pub fn join_function(&self, mut logic: L) -> VecCollection>::Output> where G::Timestamp: Lattice, D2: Data, @@ -467,7 +472,7 @@ impl Collection { /// data.assert_eq(&result); /// }); /// ``` - pub fn enter_at<'a, T, F>(&self, child: &Iterative<'a, G, T>, mut initial: F) -> Collection, D, R> + pub fn enter_at<'a, T, F>(&self, child: &Iterative<'a, G, T>, mut initial: F) -> VecCollection, D, R> where T: Timestamp+Hash, F: FnMut(&D) -> T + Clone + 'static, @@ -489,7 +494,7 @@ impl Collection { /// ordered, they should have the same order or compare equal once `func` is applied to them (this /// is because we advance the timely capability with the same logic, and it must remain `less_equal` /// to all of the data timestamps). - pub fn delay(&self, func: F) -> Collection + pub fn delay(&self, func: F) -> VecCollection where F: FnMut(&G::Timestamp) -> G::Timestamp + Clone + 'static, { @@ -525,7 +530,7 @@ impl Collection { /// .inspect(|x| println!("error: {:?}", x)); /// }); /// ``` - pub fn inspect(&self, func: F) -> Collection + pub fn inspect(&self, func: F) -> VecCollection where F: FnMut(&(D, G::Timestamp, R))+'static, { @@ -551,7 +556,7 @@ impl Collection { /// .inspect_batch(|t,xs| println!("errors @ {:?}: {:?}", t, xs)); /// }); /// ``` - pub fn inspect_batch(&self, mut func: F) -> Collection + pub fn inspect_batch(&self, mut func: F) -> VecCollection where F: FnMut(&G::Timestamp, &[(D, G::Timestamp, R)])+'static, { @@ -594,7 +599,7 @@ use timely::dataflow::scopes::ScopeParent; use timely::progress::timestamp::Refines; /// Methods requiring a nested scope. -impl<'a, G: Scope, T: Timestamp, D: Clone+'static, R: Clone+'static, C: Container> Collection, D, R, C> +impl<'a, G: Scope, T: Timestamp, C: Container> Collection, C> where C: containers::Leave, T: Refines<::Timestamp>, @@ -619,7 +624,7 @@ where /// data.assert_eq(&result); /// }); /// ``` - pub fn leave(&self) -> Collection>::OuterContainer> { + pub fn leave(&self) -> Collection>::OuterContainer> { use timely::dataflow::channels::pact::Pipeline; self.inner .leave() @@ -631,13 +636,13 @@ where } /// Methods requiring a region as the scope. -impl Collection, D, R, C> +impl Collection, C> { /// Returns the value of a Collection from a nested region to its containing scope. /// /// This method is a specialization of `leave` to the case that of a nested region. /// It removes the need for an operator that adjusts the timestamp. - pub fn leave_region(&self) -> Collection { + pub fn leave_region(&self) -> Collection { self.inner .leave() .as_collection() @@ -645,7 +650,7 @@ impl Collection, D } /// Methods requiring an Abelian difference, to support negation. -impl, D: Clone+'static, R: Abelian+'static> Collection { +impl, D: Clone+'static, R: Abelian+'static> VecCollection { /// Assert if the collections are ever different. /// /// Because this is a dataflow fragment, the test is only applied as the computation is run. If the computation @@ -682,18 +687,18 @@ impl, D: Clone+'static, R: Abelian+'static> Collection } /// Conversion to a differential dataflow Collection. -pub trait AsCollection { +pub trait AsCollection { /// Converts the type to a differential dataflow collection. - fn as_collection(&self) -> Collection; + fn as_collection(&self) -> Collection; } -impl AsCollection for StreamCore { +impl AsCollection for StreamCore { /// Converts the type to a differential dataflow collection. /// /// By calling this method, you guarantee that the timestamp invariant (as documented on /// [Collection]) is upheld. This method will not check it. - fn as_collection(&self) -> Collection { - Collection::::new(self.clone()) + fn as_collection(&self) -> Collection { + Collection::::new(self.clone()) } } @@ -718,13 +723,11 @@ impl AsCollection for StreamCore { /// .assert_eq(&data); /// }); /// ``` -pub fn concatenate(scope: &mut G, iterator: I) -> Collection +pub fn concatenate(scope: &mut G, iterator: I) -> Collection where G: Scope, - D: Data, - R: Semigroup + 'static, C: Container, - I: IntoIterator>, + I: IntoIterator>, { scope .concatenate(iterator.into_iter().map(|x| x.inner)) diff --git a/differential-dataflow/src/dynamic/mod.rs b/differential-dataflow/src/dynamic/mod.rs index 85636a1bb..95cbc77a5 100644 --- a/differential-dataflow/src/dynamic/mod.rs +++ b/differential-dataflow/src/dynamic/mod.rs @@ -1,15 +1,15 @@ //! Types and operators for dynamically scoped iterative dataflows. -//! +//! //! Scopes in timely dataflow are expressed statically, as part of the type system. //! This affords many efficiencies, as well as type-driven reassurance of correctness. //! However, there are times you need scopes whose organization is discovered only at runtime. //! Naiad and Materialize are examples: the latter taking arbitrary SQL into iterative dataflows. -//! -//! This module provides a timestamp type `Pointstamp` that can represent an update with an +//! +//! This module provides a timestamp type `Pointstamp` that can represent an update with an //! unboundedly long sequence of some `T: Timestamp`, ordered by the product order by which times -//! in iterative dataflows are ordered. The module also provides methods for manipulating these +//! in iterative dataflows are ordered. The module also provides methods for manipulating these //! timestamps to emulate the movement of update streams in to, within, and out of iterative scopes. -//! +//! pub mod pointstamp; @@ -21,12 +21,12 @@ use timely::dataflow::channels::pact::Pipeline; use timely::progress::Antichain; use crate::difference::Semigroup; -use crate::{Collection, Data}; +use crate::{VecCollection, Data}; use crate::collection::AsCollection; use crate::dynamic::pointstamp::PointStamp; use crate::dynamic::pointstamp::PointStampSummary; -impl Collection +impl VecCollection where G: Scope>>, D: Data, @@ -67,7 +67,7 @@ where } /// Produces the summary for a feedback operator at `level`, applying `summary` to that coordinate. -pub fn feedback_summary(level: usize, summary: T::Summary) -> PointStampSummary +pub fn feedback_summary(level: usize, summary: T::Summary) -> PointStampSummary where T: Timestamp+Default, { diff --git a/differential-dataflow/src/input.rs b/differential-dataflow/src/input.rs index f39402fc3..ae0f69eb7 100644 --- a/differential-dataflow/src/input.rs +++ b/differential-dataflow/src/input.rs @@ -13,7 +13,7 @@ use timely::dataflow::scopes::ScopeParent; use crate::Data; use crate::difference::Semigroup; -use crate::collection::{Collection, AsCollection}; +use crate::collection::{VecCollection, AsCollection}; /// Create a new collection and input handle to control the collection. pub trait Input : TimelyInput { @@ -41,7 +41,7 @@ pub trait Input : TimelyInput { /// /// }).unwrap(); /// ``` - fn new_collection(&mut self) -> (InputSession<::Timestamp, D, R>, Collection) + fn new_collection(&mut self) -> (InputSession<::Timestamp, D, R>, VecCollection) where D: Data, R: Semigroup+'static; /// Create a new collection and input handle from initial data. /// @@ -67,7 +67,7 @@ pub trait Input : TimelyInput { /// /// }).unwrap(); /// ``` - fn new_collection_from(&mut self, data: I) -> (InputSession<::Timestamp, I::Item, isize>, Collection) + fn new_collection_from(&mut self, data: I) -> (InputSession<::Timestamp, I::Item, isize>, VecCollection) where I: IntoIterator + 'static; /// Create a new collection and input handle from initial data. /// @@ -93,24 +93,24 @@ pub trait Input : TimelyInput { /// /// }).unwrap(); /// ``` - fn new_collection_from_raw(&mut self, data: I) -> (InputSession<::Timestamp, D, R>, Collection) + fn new_collection_from_raw(&mut self, data: I) -> (InputSession<::Timestamp, D, R>, VecCollection) where I: IntoIterator::Timestamp,R)>+'static, D: Data, R: Semigroup+'static; } use crate::lattice::Lattice; impl Input for G where ::Timestamp: Lattice { - fn new_collection(&mut self) -> (InputSession<::Timestamp, D, R>, Collection) + fn new_collection(&mut self) -> (InputSession<::Timestamp, D, R>, VecCollection) where D: Data, R: Semigroup+'static, { let (handle, stream) = self.new_input(); (InputSession::from(handle), stream.as_collection()) } - fn new_collection_from(&mut self, data: I) -> (InputSession<::Timestamp, I::Item, isize>, Collection) + fn new_collection_from(&mut self, data: I) -> (InputSession<::Timestamp, I::Item, isize>, VecCollection) where I: IntoIterator+'static, I::Item: Data { self.new_collection_from_raw(data.into_iter().map(|d| (d, ::minimum(), 1))) } - fn new_collection_from_raw(&mut self, data: I) -> (InputSession<::Timestamp, D, R>, Collection) + fn new_collection_from_raw(&mut self, data: I) -> (InputSession<::Timestamp, D, R>, VecCollection) where D: Data, R: Semigroup+'static, @@ -198,7 +198,7 @@ impl InputSession { impl InputSession { /// Introduces a handle as collection. - pub fn to_collection(&mut self, scope: &mut G) -> Collection + pub fn to_collection(&mut self, scope: &mut G) -> VecCollection where G: ScopeParent, { diff --git a/differential-dataflow/src/lib.rs b/differential-dataflow/src/lib.rs index 1431634e4..a2a265664 100644 --- a/differential-dataflow/src/lib.rs +++ b/differential-dataflow/src/lib.rs @@ -76,7 +76,7 @@ use std::fmt::Debug; -pub use collection::{Collection, AsCollection}; +pub use collection::{AsCollection, Collection, VecCollection}; pub use hashable::Hashable; pub use difference::Abelian as Diff; diff --git a/differential-dataflow/src/operators/arrange/arrangement.rs b/differential-dataflow/src/operators/arrange/arrangement.rs index f8c58f6e4..e04d6a0f0 100644 --- a/differential-dataflow/src/operators/arrange/arrangement.rs +++ b/differential-dataflow/src/operators/arrange/arrangement.rs @@ -26,7 +26,7 @@ use timely::progress::Timestamp; use timely::progress::Antichain; use timely::dataflow::operators::Capability; -use crate::{Data, ExchangeData, Collection, AsCollection, Hashable}; +use crate::{Data, ExchangeData, VecCollection, AsCollection, Hashable}; use crate::difference::Semigroup; use crate::lattice::Lattice; use crate::trace::{self, Trace, TraceReader, BatchReader, Batcher, Builder, Cursor}; @@ -136,7 +136,7 @@ where /// The underlying `Stream>` is a much more efficient way to access the data, /// and this method should only be used when the data need to be transformed or exchanged, rather than /// supplied as arguments to an operator using the same key-value structure. - pub fn as_collection(&self, mut logic: L) -> Collection + pub fn as_collection(&self, mut logic: L) -> VecCollection where L: FnMut(Tr::Key<'_>, Tr::Val<'_>) -> D+'static, { @@ -147,7 +147,7 @@ where /// /// The supplied logic may produce an iterator over output values, allowing either /// filtering or flat mapping as part of the extraction. - pub fn flat_map_ref(&self, logic: L) -> Collection + pub fn flat_map_ref(&self, logic: L) -> VecCollection where I: IntoIterator, L: FnMut(Tr::Key<'_>, Tr::Val<'_>) -> I+'static, @@ -162,7 +162,7 @@ where /// /// This method exists for streams of batches without the corresponding arrangement. /// If you have the arrangement, its `flat_map_ref` method is equivalent to this. - pub fn flat_map_batches(stream: &Stream, mut logic: L) -> Collection + pub fn flat_map_batches(stream: &Stream, mut logic: L) -> VecCollection where I: IntoIterator, L: FnMut(Tr::Key<'_>, Tr::Val<'_>) -> I+'static, @@ -200,7 +200,7 @@ where T1: TraceReader + Clone + 'static, { /// A direct implementation of the `JoinCore::join_core` method. - pub fn join_core(&self, other: &Arranged, mut result: L) -> Collection>::Output> + pub fn join_core(&self, other: &Arranged, mut result: L) -> VecCollection>::Output> where T2: for<'a> TraceReader=T1::Key<'a>,Time=T1::Time>+Clone+'static, T1::Diff: Multiply, @@ -215,7 +215,7 @@ where self.join_core_internal_unsafe(other, result) } /// A direct implementation of the `JoinCore::join_core_internal_unsafe` method. - pub fn join_core_internal_unsafe (&self, other: &Arranged, mut result: L) -> Collection + pub fn join_core_internal_unsafe (&self, other: &Arranged, mut result: L) -> VecCollection where T2: for<'a> TraceReader=T1::Key<'a>, Time=T1::Time>+Clone+'static, D: Data, @@ -328,7 +328,7 @@ where ; } -impl Arrange> for Collection +impl Arrange> for VecCollection where G: Scope, K: ExchangeData + Hashable, @@ -505,7 +505,7 @@ where Arranged { stream, trace: reader.unwrap() } } -impl Arrange> for Collection +impl Arrange> for VecCollection where G: Scope, { @@ -540,7 +540,7 @@ where fn arrange_by_key_named(&self, name: &str) -> Arranged>>; } -impl ArrangeByKey for Collection +impl ArrangeByKey for VecCollection where G: Scope, { @@ -574,7 +574,7 @@ where } -impl ArrangeBySelf for Collection +impl ArrangeBySelf for VecCollection where G: Scope, { diff --git a/differential-dataflow/src/operators/arrange/upsert.rs b/differential-dataflow/src/operators/arrange/upsert.rs index 55e65d232..e5c17e91e 100644 --- a/differential-dataflow/src/operators/arrange/upsert.rs +++ b/differential-dataflow/src/operators/arrange/upsert.rs @@ -119,7 +119,7 @@ use super::TraceAgent; /// Arrange data from a stream of keyed upserts. /// -/// The input should be a stream of timestamped pairs of Key and Option. +/// The input should be a stream of timestamped pairs of `Key` and `Option`. /// The contents of the collection are defined key-by-key, where each optional /// value in sequence either replaces or removes the existing value, should it /// exist. diff --git a/differential-dataflow/src/operators/consolidate.rs b/differential-dataflow/src/operators/consolidate.rs index 00ad3f4b9..ef7adef89 100644 --- a/differential-dataflow/src/operators/consolidate.rs +++ b/differential-dataflow/src/operators/consolidate.rs @@ -8,7 +8,7 @@ use timely::dataflow::Scope; -use crate::{Collection, ExchangeData, Hashable}; +use crate::{VecCollection, ExchangeData, Hashable}; use crate::consolidation::ConsolidatingContainerBuilder; use crate::difference::Semigroup; @@ -17,7 +17,7 @@ use crate::lattice::Lattice; use crate::trace::{Batcher, Builder}; /// Methods which require data be arrangeable. -impl Collection +impl VecCollection where G: Scope, D: ExchangeData+Hashable, diff --git a/differential-dataflow/src/operators/count.rs b/differential-dataflow/src/operators/count.rs index 3380a3028..a554f13b5 100644 --- a/differential-dataflow/src/operators/count.rs +++ b/differential-dataflow/src/operators/count.rs @@ -6,7 +6,7 @@ use timely::dataflow::operators::Operator; use timely::dataflow::channels::pact::Pipeline; use crate::lattice::Lattice; -use crate::{ExchangeData, Collection}; +use crate::{ExchangeData, VecCollection}; use crate::difference::{IsZero, Semigroup}; use crate::hashable::Hashable; use crate::collection::AsCollection; @@ -30,7 +30,7 @@ pub trait CountTotal, K: ExchangeDat /// .count_total(); /// }); /// ``` - fn count_total(&self) -> Collection { + fn count_total(&self) -> VecCollection { self.count_total_core() } @@ -39,14 +39,14 @@ pub trait CountTotal, K: ExchangeDat /// This method allows `count_total` to produce collections whose difference /// type is something other than an `isize` integer, for example perhaps an /// `i32`. - fn count_total_core + 'static>(&self) -> Collection; + fn count_total_core + 'static>(&self) -> VecCollection; } -impl CountTotal for Collection +impl CountTotal for VecCollection where G: Scope, { - fn count_total_core + 'static>(&self) -> Collection { + fn count_total_core + 'static>(&self) -> VecCollection { self.arrange_by_self_named("Arrange: CountTotal") .count_total_core() } @@ -63,7 +63,7 @@ where >+Clone+'static, K: ExchangeData, { - fn count_total_core + 'static>(&self) -> Collection { + fn count_total_core + 'static>(&self) -> VecCollection { let mut trace = self.trace.clone(); diff --git a/differential-dataflow/src/operators/iterate.rs b/differential-dataflow/src/operators/iterate.rs index 57651e68e..5dc11042a 100644 --- a/differential-dataflow/src/operators/iterate.rs +++ b/differential-dataflow/src/operators/iterate.rs @@ -42,7 +42,7 @@ use timely::dataflow::scopes::child::Iterative; use timely::dataflow::operators::{Feedback, ConnectLoop}; use timely::dataflow::operators::feedback::Handle; -use crate::{Data, Collection}; +use crate::{Data, VecCollection, Collection}; use crate::difference::{Semigroup, Abelian}; use crate::lattice::Lattice; @@ -71,15 +71,15 @@ pub trait Iterate, D: Data, R: Semigroup> { /// }); /// }); /// ``` - fn iterate(&self, logic: F) -> Collection + fn iterate(&self, logic: F) -> VecCollection where - for<'a> F: FnOnce(&Collection, D, R>)->Collection, D, R>; + for<'a> F: FnOnce(&VecCollection, D, R>)->VecCollection, D, R>; } -impl, D: Ord+Data+Debug, R: Abelian+'static> Iterate for Collection { - fn iterate(&self, logic: F) -> Collection +impl, D: Ord+Data+Debug, R: Abelian+'static> Iterate for VecCollection { + fn iterate(&self, logic: F) -> VecCollection where - for<'a> F: FnOnce(&Collection, D, R>)->Collection, D, R>, + for<'a> F: FnOnce(&VecCollection, D, R>)->VecCollection, D, R>, { self.inner.scope().scoped("Iterate", |subgraph| { // create a new variable, apply logic, bind variable, return. @@ -97,9 +97,9 @@ impl, D: Ord+Data+Debug, R: Abelian+'static> Iterat } impl, D: Ord+Data+Debug, R: Semigroup+'static> Iterate for G { - fn iterate(&self, logic: F) -> Collection + fn iterate(&self, logic: F) -> VecCollection where - for<'a> F: FnOnce(&Collection, D, R>)->Collection, D, R>, + for<'a> F: FnOnce(&VecCollection, D, R>)->VecCollection, D, R>, { // TODO: This makes me think we have the wrong ownership pattern here. let mut clone = self.clone(); @@ -151,20 +151,21 @@ impl, D: Ord+Data+Debug, R: Semigroup+'static> Iter /// }); /// }) /// ``` -pub struct Variable::Timestamp, R)>> +pub struct Variable where G: Scope, - D: Data, - R: Abelian + 'static, C: Container, { - collection: Collection, + collection: Collection, feedback: Handle, - source: Option>, + source: Option>, step: ::Summary, } -impl Variable +/// A `Variable` specialized to a vector container of update triples (data, time, diff). +pub type VecVariable = Variable::Timestamp, R)>>; + +impl Variable where G: Scope, C: crate::collection::containers::Negate + crate::collection::containers::ResultsIn<::Summary>, @@ -175,14 +176,14 @@ where /// be used whenever the variable has an empty input. pub fn new(scope: &mut G, step: ::Summary) -> Self { let (feedback, updates) = scope.feedback(step.clone()); - let collection = Collection::::new(updates); + let collection = Collection::::new(updates); Self { collection, feedback, source: None, step } } /// Creates a new `Variable` from a supplied `source` stream. - pub fn new_from(source: Collection, step: ::Summary) -> Self { + pub fn new_from(source: Collection, step: ::Summary) -> Self { let (feedback, updates) = source.inner.scope().feedback(step.clone()); - let collection = Collection::::new(updates).concat(&source); + let collection = Collection::::new(updates).concat(&source); Variable { collection, feedback, source: Some(source), step } } @@ -190,7 +191,7 @@ where /// /// This method binds the `Variable` to be equal to the supplied collection, /// which may be recursively defined in terms of the variable itself. - pub fn set(self, result: &Collection) -> Collection { + pub fn set(self, result: &Collection) -> Collection { let mut in_result = result.clone(); if let Some(source) = &self.source { in_result = in_result.concat(&source.negate()); @@ -207,7 +208,7 @@ where /// /// This behavior can also be achieved by using `new` to create an empty initial /// collection, and then using `self.set(self.concat(result))`. - pub fn set_concat(self, result: &Collection) -> Collection { + pub fn set_concat(self, result: &Collection) -> Collection { let step = self.step; result .results_in(step) @@ -218,8 +219,8 @@ where } } -impl, D: Data, R: Abelian, C: Container> Deref for Variable { - type Target = Collection; +impl, C: Container> Deref for Variable { + type Target = Collection; fn deref(&self) -> &Self::Target { &self.collection } @@ -231,19 +232,17 @@ impl, D: Data, R: Abelian, C: Container> Deref for /// that do not implement `Abelian` and only implement `Semigroup`. This means /// that it can be used in settings where the difference type does not support /// negation. -pub struct SemigroupVariable::Timestamp, R)>> +pub struct SemigroupVariable where G: Scope, - D: Data, - R: Semigroup + 'static, C: Container, { - collection: Collection, + collection: Collection, feedback: Handle, step: ::Summary, } -impl SemigroupVariable +impl SemigroupVariable where G: Scope, C: crate::collection::containers::ResultsIn<::Summary>, @@ -251,12 +250,12 @@ where /// Creates a new initially empty `SemigroupVariable`. pub fn new(scope: &mut G, step: ::Summary) -> Self { let (feedback, updates) = scope.feedback(step.clone()); - let collection = Collection::::new(updates); + let collection = Collection::::new(updates); SemigroupVariable { collection, feedback, step } } /// Adds a new source of data to `self`. - pub fn set(self, result: &Collection) -> Collection { + pub fn set(self, result: &Collection) -> Collection { let step = self.step; result .results_in(step) @@ -267,8 +266,8 @@ where } } -impl Deref for SemigroupVariable where G::Timestamp: Lattice { - type Target = Collection; +impl Deref for SemigroupVariable where G::Timestamp: Lattice { + type Target = Collection; fn deref(&self) -> &Self::Target { &self.collection } diff --git a/differential-dataflow/src/operators/join.rs b/differential-dataflow/src/operators/join.rs index 6392ea868..0a72976c9 100644 --- a/differential-dataflow/src/operators/join.rs +++ b/differential-dataflow/src/operators/join.rs @@ -18,7 +18,7 @@ use timely::dataflow::operators::Capability; use timely::dataflow::channels::pushers::tee::Tee; use crate::hashable::Hashable; -use crate::{Data, ExchangeData, Collection}; +use crate::{Data, ExchangeData, VecCollection}; use crate::difference::{Semigroup, Abelian, Multiply}; use crate::lattice::Lattice; use crate::operators::arrange::{Arranged, ArrangeByKey, ArrangeBySelf}; @@ -50,7 +50,7 @@ pub trait Join { /// .assert_eq(&z); /// }); /// ``` - fn join(&self, other: &Collection) -> Collection>::Output> + fn join(&self, other: &VecCollection) -> VecCollection>::Output> where K: ExchangeData, V2: ExchangeData, @@ -78,7 +78,7 @@ pub trait Join { /// .assert_eq(&z); /// }); /// ``` - fn join_map(&self, other: &Collection, logic: L) -> Collection>::Output> + fn join_map(&self, other: &VecCollection, logic: L) -> VecCollection>::Output> where K: ExchangeData, V2: ExchangeData, R2: ExchangeData+Semigroup, R: Multiply, D: Data, L: FnMut(&K, &V, &V2)->D+'static; /// Matches pairs `(key, val)` and `key` based on `key`, producing the former with frequencies multiplied. @@ -103,7 +103,7 @@ pub trait Join { /// .assert_eq(&z); /// }); /// ``` - fn semijoin(&self, other: &Collection) -> Collection>::Output> + fn semijoin(&self, other: &VecCollection) -> VecCollection>::Output> where K: ExchangeData, R2: ExchangeData+Semigroup, R: Multiply; /// Subtracts the semijoin with `other` from `self`. @@ -132,32 +132,32 @@ pub trait Join { /// .assert_eq(&z); /// }); /// ``` - fn antijoin(&self, other: &Collection) -> Collection + fn antijoin(&self, other: &VecCollection) -> VecCollection where K: ExchangeData, R2: ExchangeData+Semigroup, R: Multiply, R: Abelian+'static; } -impl Join for Collection +impl Join for VecCollection where G: Scope, K: ExchangeData+Hashable, V: ExchangeData, R: ExchangeData+Semigroup, { - fn join_map(&self, other: &Collection, mut logic: L) -> Collection>::Output> + fn join_map(&self, other: &VecCollection, mut logic: L) -> VecCollection>::Output> where R: Multiply, L: FnMut(&K, &V, &V2)->D+'static { let arranged1 = self.arrange_by_key(); let arranged2 = other.arrange_by_key(); arranged1.join_core(&arranged2, move |k,v1,v2| Some(logic(k,v1,v2))) } - fn semijoin(&self, other: &Collection) -> Collection>::Output> + fn semijoin(&self, other: &VecCollection) -> VecCollection>::Output> where R: Multiply { let arranged1 = self.arrange_by_key(); let arranged2 = other.arrange_by_self(); arranged1.join_core(&arranged2, |k,v,_| Some((k.clone(), v.clone()))) } - fn antijoin(&self, other: &Collection) -> Collection + fn antijoin(&self, other: &VecCollection) -> VecCollection where R: Multiply, R: Abelian+'static { self.concat(&self.semijoin(other).negate()) } @@ -170,7 +170,7 @@ where K: ExchangeData+Hashable, V: Data + 'static, { - fn join_map(&self, other: &Collection, mut logic: L) -> Collection>::Output> + fn join_map(&self, other: &VecCollection, mut logic: L) -> VecCollection>::Output> where Tr::Diff: Multiply, L: for<'a> FnMut(Tr::Key<'a>, Tr::Val<'a>, &V2)->D+'static, @@ -179,13 +179,13 @@ where self.join_core(&arranged2, move |k,v1,v2| Some(logic(k,v1,v2))) } - fn semijoin(&self, other: &Collection) -> Collection>::Output> + fn semijoin(&self, other: &VecCollection) -> VecCollection>::Output> where Tr::Diff: Multiply { let arranged2 = other.arrange_by_self(); self.join_core(&arranged2, |k,v,_| Some((k.clone(), v.clone()))) } - fn antijoin(&self, other: &Collection) -> Collection + fn antijoin(&self, other: &VecCollection) -> VecCollection where Tr::Diff: Multiply, Tr::Diff: Abelian+'static { self.as_collection(|k,v| (k.clone(), v.clone())) .concat(&self.semijoin(other).negate()) @@ -229,7 +229,7 @@ pub trait JoinCore, K: 'static + ?Sized, V: 'st /// .assert_eq(&z); /// }); /// ``` - fn join_core (&self, stream2: &Arranged, result: L) -> Collection>::Output> + fn join_core (&self, stream2: &Arranged, result: L) -> VecCollection>::Output> where Tr2: for<'a> TraceReader=&'a K, Time=G::Timestamp>+Clone+'static, R: Multiply, @@ -270,7 +270,7 @@ pub trait JoinCore, K: 'static + ?Sized, V: 'st /// .assert_eq(&z); /// }); /// ``` - fn join_core_internal_unsafe (&self, stream2: &Arranged, result: L) -> Collection + fn join_core_internal_unsafe (&self, stream2: &Arranged, result: L) -> VecCollection where Tr2: for<'a> TraceReader=&'a K, Time=G::Timestamp>+Clone+'static, D: Data, @@ -281,14 +281,14 @@ pub trait JoinCore, K: 'static + ?Sized, V: 'st } -impl JoinCore for Collection +impl JoinCore for VecCollection where G: Scope, K: ExchangeData+Hashable, V: ExchangeData, R: ExchangeData+Semigroup, { - fn join_core (&self, stream2: &Arranged, result: L) -> Collection>::Output> + fn join_core (&self, stream2: &Arranged, result: L) -> VecCollection>::Output> where Tr2: for<'a> TraceReader=&'a K, Time=G::Timestamp>+Clone+'static, R: Multiply, @@ -299,7 +299,7 @@ where .join_core(stream2, result) } - fn join_core_internal_unsafe (&self, stream2: &Arranged, result: L) -> Collection + fn join_core_internal_unsafe (&self, stream2: &Arranged, result: L) -> VecCollection where Tr2: for<'a> TraceReader=&'a K, Time=G::Timestamp>+Clone+'static, I: IntoIterator, diff --git a/differential-dataflow/src/operators/reduce.rs b/differential-dataflow/src/operators/reduce.rs index b573ac8f6..1079e14d8 100644 --- a/differential-dataflow/src/operators/reduce.rs +++ b/differential-dataflow/src/operators/reduce.rs @@ -7,7 +7,7 @@ use timely::container::PushInto; use crate::hashable::Hashable; -use crate::{Data, ExchangeData, Collection}; +use crate::{Data, ExchangeData, VecCollection}; use crate::difference::{Semigroup, Abelian}; use timely::order::PartialOrder; @@ -57,24 +57,24 @@ pub trait Reduce, K: Data, V: Data, R: Semigrou /// }); /// }); /// ``` - fn reduce(&self, logic: L) -> Collection + fn reduce(&self, logic: L) -> VecCollection where L: FnMut(&K, &[(&V, R)], &mut Vec<(V2, R2)>)+'static { self.reduce_named("Reduce", logic) } /// As `reduce` with the ability to name the operator. - fn reduce_named(&self, name: &str, logic: L) -> Collection + fn reduce_named(&self, name: &str, logic: L) -> VecCollection where L: FnMut(&K, &[(&V, R)], &mut Vec<(V2, R2)>)+'static; } -impl Reduce for Collection +impl Reduce for VecCollection where G: Scope, K: ExchangeData+Hashable, V: ExchangeData, R: ExchangeData+Semigroup, { - fn reduce_named(&self, name: &str, logic: L) -> Collection + fn reduce_named(&self, name: &str, logic: L) -> VecCollection where L: FnMut(&K, &[(&V, R)], &mut Vec<(V2, R2)>)+'static { self.arrange_by_key_named(&format!("Arrange: {}", name)) .reduce_named(name, logic) @@ -86,7 +86,7 @@ where G: Scope, T1: for<'a> TraceReader=&'a K, KeyOwn = K, Val<'a>=&'a V, Diff=R>+Clone+'static, { - fn reduce_named(&self, name: &str, logic: L) -> Collection + fn reduce_named(&self, name: &str, logic: L) -> VecCollection where L: FnMut(&K, &[(&V, R)], &mut Vec<(V2, R2)>)+'static { self.reduce_abelian::<_,ValBuilder<_,_,_,_>,ValSpine>(name, logic) .as_collection(|k,v| (k.clone(), v.clone())) @@ -114,12 +114,12 @@ pub trait Threshold, K: Data, R1: Semigroup> { /// .threshold(|_,c| c % 2); /// }); /// ``` - fn thresholdR2+'static>(&self, thresh: F) -> Collection { + fn thresholdR2+'static>(&self, thresh: F) -> VecCollection { self.threshold_named("Threshold", thresh) } /// A `threshold` with the ability to name the operator. - fn threshold_namedR2+'static>(&self, name: &str, thresh: F) -> Collection; + fn threshold_namedR2+'static>(&self, name: &str, thresh: F) -> VecCollection; /// Reduces the collection to one occurrence of each distinct element. /// @@ -136,7 +136,7 @@ pub trait Threshold, K: Data, R1: Semigroup> { /// .distinct(); /// }); /// ``` - fn distinct(&self) -> Collection { + fn distinct(&self) -> VecCollection { self.distinct_core() } @@ -145,13 +145,13 @@ pub trait Threshold, K: Data, R1: Semigroup> { /// This method allows `distinct` to produce collections whose difference /// type is something other than an `isize` integer, for example perhaps an /// `i32`. - fn distinct_core>(&self) -> Collection { + fn distinct_core>(&self) -> VecCollection { self.threshold_named("Distinct", |_,_| R2::from(1i8)) } } -impl, K: ExchangeData+Hashable, R1: ExchangeData+Semigroup> Threshold for Collection { - fn threshold_namedR2+'static>(&self, name: &str, thresh: F) -> Collection { +impl, K: ExchangeData+Hashable, R1: ExchangeData+Semigroup> Threshold for VecCollection { + fn threshold_namedR2+'static>(&self, name: &str, thresh: F) -> VecCollection { self.arrange_by_self_named(&format!("Arrange: {}", name)) .threshold_named(name, thresh) } @@ -162,7 +162,7 @@ where G: Scope, T1: for<'a> TraceReader=&'a K, KeyOwn = K, Val<'a>=&'a (), Diff=R1>+Clone+'static, { - fn threshold_namedR2+'static>(&self, name: &str, mut thresh: F) -> Collection { + fn threshold_namedR2+'static>(&self, name: &str, mut thresh: F) -> VecCollection { self.reduce_abelian::<_,KeyBuilder,KeySpine>(name, move |k,s,t| t.push(((), thresh(k, &s[0].1)))) .as_collection(|k,_| k.clone()) } @@ -185,7 +185,7 @@ pub trait Count, K: Data, R: Semigroup> { /// .count(); /// }); /// ``` - fn count(&self) -> Collection { + fn count(&self) -> VecCollection { self.count_core() } @@ -194,11 +194,11 @@ pub trait Count, K: Data, R: Semigroup> { /// This method allows `count` to produce collections whose difference /// type is something other than an `isize` integer, for example perhaps an /// `i32`. - fn count_core + 'static>(&self) -> Collection; + fn count_core + 'static>(&self) -> VecCollection; } -impl, K: ExchangeData+Hashable, R: ExchangeData+Semigroup> Count for Collection { - fn count_core + 'static>(&self) -> Collection { +impl, K: ExchangeData+Hashable, R: ExchangeData+Semigroup> Count for VecCollection { + fn count_core + 'static>(&self) -> VecCollection { self.arrange_by_self_named("Arrange: Count") .count_core() } @@ -209,7 +209,7 @@ where G: Scope, T1: for<'a> TraceReader=&'a K, KeyOwn = K, Val<'a>=&'a (), Diff=R>+Clone+'static, { - fn count_core + 'static>(&self) -> Collection { + fn count_core + 'static>(&self) -> VecCollection { self.reduce_abelian::<_,ValBuilder,ValSpine>("Count", |_k,s,t| t.push((s[0].1.clone(), R2::from(1i8)))) .as_collection(|k,c| (k.clone(), c.clone())) } @@ -281,7 +281,7 @@ pub trait ReduceCore, K: ToOwned + ?Sized, V: D ; } -impl ReduceCore for Collection +impl ReduceCore for VecCollection where G: Scope, G::Timestamp: Lattice+Ord, diff --git a/differential-dataflow/src/operators/threshold.rs b/differential-dataflow/src/operators/threshold.rs index df4f110cc..74a73bba5 100644 --- a/differential-dataflow/src/operators/threshold.rs +++ b/differential-dataflow/src/operators/threshold.rs @@ -9,7 +9,7 @@ use timely::dataflow::operators::Operator; use timely::dataflow::channels::pact::Pipeline; use crate::lattice::Lattice; -use crate::{ExchangeData, Collection}; +use crate::{ExchangeData, VecCollection}; use crate::difference::{Semigroup, Abelian}; use crate::hashable::Hashable; use crate::collection::AsCollection; @@ -19,7 +19,7 @@ use crate::trace::{BatchReader, Cursor, TraceReader}; /// Extension trait for the `distinct` differential dataflow method. pub trait ThresholdTotal, K: ExchangeData, R: ExchangeData+Semigroup> { /// Reduces the collection to one occurrence of each distinct element. - fn threshold_semigroup(&self, thresh: F) -> Collection + fn threshold_semigroup(&self, thresh: F) -> VecCollection where R2: Semigroup+'static, F: FnMut(&K,&R,Option<&R>)->Option+'static, @@ -39,7 +39,7 @@ pub trait ThresholdTotal, K: Exchang /// .threshold_total(|_,c| c % 2); /// }); /// ``` - fn threshold_totalR2+'static>(&self, mut thresh: F) -> Collection { + fn threshold_totalR2+'static>(&self, mut thresh: F) -> VecCollection { self.threshold_semigroup(move |key, new, old| { let mut new = thresh(key, new); if let Some(old) = old { @@ -69,7 +69,7 @@ pub trait ThresholdTotal, K: Exchang /// .distinct_total(); /// }); /// ``` - fn distinct_total(&self) -> Collection { + fn distinct_total(&self) -> VecCollection { self.distinct_total_core() } @@ -78,17 +78,17 @@ pub trait ThresholdTotal, K: Exchang /// This method allows `distinct` to produce collections whose difference /// type is something other than an `isize` integer, for example perhaps an /// `i32`. - fn distinct_total_core+'static>(&self) -> Collection { + fn distinct_total_core+'static>(&self) -> VecCollection { self.threshold_total(|_,_| R2::from(1i8)) } } -impl ThresholdTotal for Collection +impl ThresholdTotal for VecCollection where G: Scope, { - fn threshold_semigroup(&self, thresh: F) -> Collection + fn threshold_semigroup(&self, thresh: F) -> VecCollection where R2: Semigroup+'static, F: FnMut(&K,&R,Option<&R>)->Option+'static, @@ -109,7 +109,7 @@ where >+Clone+'static, K: ExchangeData, { - fn threshold_semigroup(&self, mut thresh: F) -> Collection + fn threshold_semigroup(&self, mut thresh: F) -> VecCollection where R2: Semigroup+'static, F: for<'a> FnMut(T1::Key<'a>,&T1::Diff,Option<&T1::Diff>)->Option+'static, diff --git a/differential-dataflow/src/trace/implementations/chainless_batcher.rs b/differential-dataflow/src/trace/implementations/chainless_batcher.rs index 2129c2c84..b5df2dae1 100644 --- a/differential-dataflow/src/trace/implementations/chainless_batcher.rs +++ b/differential-dataflow/src/trace/implementations/chainless_batcher.rs @@ -1,14 +1,4 @@ //! A `Batcher` implementation based on merge sort. -//! -//! The `MergeBatcher` requires support from two types, a "chunker" and a "merger". -//! The chunker receives input batches and consolidates them, producing sorted output -//! "chunks" that are fully consolidated (no adjacent updates can be accumulated). -//! The merger implements the [`Merger`] trait, and provides hooks for manipulating -//! sorted "chains" of chunks as needed by the merge batcher: merging chunks and also -//! splitting them apart based on time. -//! -//! Implementations of `MergeBatcher` can be instantiated through the choice of both -//! the chunker and the merger, provided their respective output and input types align. use timely::progress::frontier::AntichainRef; use timely::progress::{frontier::Antichain, Timestamp}; diff --git a/differential-dataflow/src/trace/implementations/merge_batcher.rs b/differential-dataflow/src/trace/implementations/merge_batcher.rs index ac3c05720..3ef79f2df 100644 --- a/differential-dataflow/src/trace/implementations/merge_batcher.rs +++ b/differential-dataflow/src/trace/implementations/merge_batcher.rs @@ -292,7 +292,7 @@ pub mod container { /// A merger for arbitrary containers. /// - /// `MC` is a [`Container`] that implements [`MergerChunk`]. + /// `MC` is a `Container` that implements [`MergerChunk`]. /// `CQ` is a [`ContainerQueue`] supporting `MC`. pub struct ContainerMerger { _marker: PhantomData<(MC, CQ)>, diff --git a/differential-dataflow/tests/bfs.rs b/differential-dataflow/tests/bfs.rs index 509564a35..3678823b0 100644 --- a/differential-dataflow/tests/bfs.rs +++ b/differential-dataflow/tests/bfs.rs @@ -9,7 +9,7 @@ use timely::dataflow::operators::Capture; use timely::dataflow::operators::capture::Extract; use differential_dataflow::input::Input; -use differential_dataflow::Collection; +use differential_dataflow::VecCollection; use differential_dataflow::operators::*; use differential_dataflow::lattice::Lattice; @@ -202,7 +202,7 @@ fn bfs_differential( } // returns pairs (n, s) indicating node n can be reached from a root in s steps. -fn bfs(edges: &Collection, roots: &Collection) -> Collection +fn bfs(edges: &VecCollection, roots: &VecCollection) -> VecCollection where G: Scope, { diff --git a/differential-dataflow/tests/scc.rs b/differential-dataflow/tests/scc.rs index 5aeba6a72..a0c4fee7d 100644 --- a/differential-dataflow/tests/scc.rs +++ b/differential-dataflow/tests/scc.rs @@ -12,7 +12,7 @@ use timely::dataflow::operators::Capture; use timely::dataflow::operators::capture::Extract; use differential_dataflow::input::Input; -use differential_dataflow::Collection; +use differential_dataflow::VecCollection; use differential_dataflow::operators::*; use differential_dataflow::lattice::Lattice; @@ -215,7 +215,7 @@ fn scc_differential( .collect() } -fn _strongly_connected(graph: &Collection) -> Collection +fn _strongly_connected(graph: &VecCollection) -> VecCollection where G: Scope, { @@ -226,7 +226,7 @@ where }) } -fn _trim_edges(cycle: &Collection, edges: &Collection) -> Collection +fn _trim_edges(cycle: &VecCollection, edges: &VecCollection) -> VecCollection where G: Scope, { @@ -243,7 +243,7 @@ where .map(|((x1,x2),_)| (x2,x1)) } -fn _reachability(edges: &Collection, nodes: &Collection) -> Collection +fn _reachability(edges: &VecCollection, nodes: &VecCollection) -> VecCollection where G: Scope, { diff --git a/dogsdogsdogs/examples/ngo.rs b/dogsdogsdogs/examples/ngo.rs index ccec6040d..d5291b678 100644 --- a/dogsdogsdogs/examples/ngo.rs +++ b/dogsdogsdogs/examples/ngo.rs @@ -2,7 +2,7 @@ use std::hash::Hash; use timely::dataflow::*; use timely::dataflow::operators::*; -use differential_dataflow::Collection; +use differential_dataflow::VecCollection; use differential_dataflow::lattice::Lattice; use differential_dataflow::operators::*; @@ -35,13 +35,13 @@ fn main() { println!("loaded {} nodes, {} edges", nodes, edges.len()); worker.dataflow::<(),_,_>(|scope| { - triangles(&Collection::new(edges.to_stream(scope))).inner.count().inspect(|x| println!("{:?}", x)); + triangles(&VecCollection::new(edges.to_stream(scope))).inner.count().inspect(|x| println!("{:?}", x)); }); }).unwrap(); } -fn triangles(edges: &Collection) -> Collection +fn triangles(edges: &VecCollection) -> VecCollection where G: Scope, { diff --git a/dogsdogsdogs/src/calculus.rs b/dogsdogsdogs/src/calculus.rs index 952a1b0d2..babd9e81a 100644 --- a/dogsdogsdogs/src/calculus.rs +++ b/dogsdogsdogs/src/calculus.rs @@ -15,29 +15,29 @@ use timely::dataflow::Scope; use timely::dataflow::scopes::Child; use timely::dataflow::operators::{Filter, Map}; -use differential_dataflow::{AsCollection, Collection, Data}; +use differential_dataflow::{AsCollection, VecCollection, Data}; use differential_dataflow::difference::Abelian; use crate::altneu::AltNeu; /// Produce a collection containing the changes at the moments they happen. pub trait Differentiate { - fn differentiate<'a>(&self, child: &Child<'a, G, AltNeu>) -> Collection>, D, R>; + fn differentiate<'a>(&self, child: &Child<'a, G, AltNeu>) -> VecCollection>, D, R>; } /// Collect instantaneous changes back in to a collection. pub trait Integrate { - fn integrate(&self) -> Collection; + fn integrate(&self) -> VecCollection; } -impl Differentiate for Collection +impl Differentiate for VecCollection where G: Scope, D: Data, R: Abelian + 'static, { // For each (data, Alt(time), diff) we add a (data, Neu(time), -diff). - fn differentiate<'a>(&self, child: &Child<'a, G, AltNeu>) -> Collection>, D, R> { + fn differentiate<'a>(&self, child: &Child<'a, G, AltNeu>) -> VecCollection>, D, R> { self.enter(child) .inner .flat_map(|(data, time, diff)| { @@ -51,14 +51,14 @@ where } } -impl<'a, G, D, R> Integrate for Collection>, D, R> +impl<'a, G, D, R> Integrate for VecCollection>, D, R> where G: Scope, D: Data, R: Abelian + 'static, { // We discard each `neu` variant and strip off the `alt` wrapper. - fn integrate(&self) -> Collection { + fn integrate(&self) -> VecCollection { self.inner .filter(|(_d,t,_r)| !t.neu) .as_collection() diff --git a/dogsdogsdogs/src/lib.rs b/dogsdogsdogs/src/lib.rs index c0b93b099..4984d8f2a 100644 --- a/dogsdogsdogs/src/lib.rs +++ b/dogsdogsdogs/src/lib.rs @@ -5,7 +5,7 @@ use timely::progress::Timestamp; use timely::dataflow::operators::Partition; use timely::dataflow::operators::Concatenate; -use differential_dataflow::{ExchangeData, Collection, AsCollection}; +use differential_dataflow::{ExchangeData, VecCollection, AsCollection}; use differential_dataflow::operators::Threshold; use differential_dataflow::difference::{Monoid, Multiply}; use differential_dataflow::lattice::Lattice; @@ -28,31 +28,31 @@ pub trait PrefixExtender> { /// The type to be produced as extension. type Extension; /// Annotates prefixes with the number of extensions the relation would propose. - fn count(&mut self, prefixes: &Collection, index: usize) -> Collection; + fn count(&mut self, prefixes: &VecCollection, index: usize) -> VecCollection; /// Extends each prefix with corresponding extensions. - fn propose(&mut self, prefixes: &Collection) -> Collection; + fn propose(&mut self, prefixes: &VecCollection) -> VecCollection; /// Restricts proposed extensions by those the extender would have proposed. - fn validate(&mut self, extensions: &Collection) -> Collection; + fn validate(&mut self, extensions: &VecCollection) -> VecCollection; } pub trait ProposeExtensionMethod> { - fn propose_using>(&self, extender: &mut PE) -> Collection; - fn extend(&self, extenders: &mut [&mut dyn PrefixExtender]) -> Collection; + fn propose_using>(&self, extender: &mut PE) -> VecCollection; + fn extend(&self, extenders: &mut [&mut dyn PrefixExtender]) -> VecCollection; } -impl ProposeExtensionMethod for Collection +impl ProposeExtensionMethod for VecCollection where G: Scope, P: ExchangeData+Ord, R: Monoid+Multiply+'static, { - fn propose_using(&self, extender: &mut PE) -> Collection + fn propose_using(&self, extender: &mut PE) -> VecCollection where PE: PrefixExtender { extender.propose(self) } - fn extend(&self, extenders: &mut [&mut dyn PrefixExtender]) -> Collection + fn extend(&self, extenders: &mut [&mut dyn PrefixExtender]) -> VecCollection where E: ExchangeData+Ord { @@ -84,11 +84,11 @@ where } pub trait ValidateExtensionMethod, P, E> { - fn validate_using>(&self, extender: &mut PE) -> Collection; + fn validate_using>(&self, extender: &mut PE) -> VecCollection; } -impl, P, E> ValidateExtensionMethod for Collection { - fn validate_using>(&self, extender: &mut PE) -> Collection { +impl, P, E> ValidateExtensionMethod for VecCollection { + fn validate_using>(&self, extender: &mut PE) -> VecCollection { extender.validate(self) } } @@ -139,7 +139,7 @@ where R: Monoid+Multiply+ExchangeData, { - pub fn index>(collection: &Collection) -> Self { + pub fn index>(collection: &VecCollection) -> Self { // We need to count the number of (k, v) pairs and not rely on the given Monoid R and its binary addition operation. // counts and validate can share the base arrangement let arranged = collection.arrange_by_self(); @@ -191,17 +191,17 @@ where type Prefix = P; type Extension = V; - fn count(&mut self, prefixes: &Collection, index: usize) -> Collection { + fn count(&mut self, prefixes: &VecCollection, index: usize) -> VecCollection { let counts = self.indices.count_trace.import(&prefixes.scope()); operators::count::count(prefixes, counts, self.key_selector.clone(), index) } - fn propose(&mut self, prefixes: &Collection) -> Collection { + fn propose(&mut self, prefixes: &VecCollection) -> VecCollection { let propose = self.indices.propose_trace.import(&prefixes.scope()); operators::propose::propose(prefixes, propose, self.key_selector.clone()) } - fn validate(&mut self, extensions: &Collection) -> Collection { + fn validate(&mut self, extensions: &VecCollection) -> VecCollection { let validate = self.indices.validate_trace.import(&extensions.scope()); operators::validate::validate(extensions, validate, self.key_selector.clone()) } diff --git a/dogsdogsdogs/src/operators/count.rs b/dogsdogsdogs/src/operators/count.rs index 2633d0b62..2772e241b 100644 --- a/dogsdogsdogs/src/operators/count.rs +++ b/dogsdogsdogs/src/operators/count.rs @@ -1,6 +1,6 @@ use timely::dataflow::Scope; -use differential_dataflow::{ExchangeData, Collection, Hashable}; +use differential_dataflow::{ExchangeData, VecCollection, Hashable}; use differential_dataflow::difference::{Semigroup, Monoid, Multiply}; use differential_dataflow::operators::arrange::Arranged; use differential_dataflow::trace::TraceReader; @@ -12,11 +12,11 @@ use differential_dataflow::trace::TraceReader; /// associated count in `arrangement`. If the found count is less than `count`, /// the `count` and `index` fields are overwritten with their new values. pub fn count( - prefixes: &Collection, + prefixes: &VecCollection, arrangement: Arranged, key_selector: F, index: usize, -) -> Collection +) -> VecCollection where G: Scope, Tr: TraceReader+Clone+'static, diff --git a/dogsdogsdogs/src/operators/half_join.rs b/dogsdogsdogs/src/operators/half_join.rs index d7d9d68d6..adeb0ca4f 100644 --- a/dogsdogsdogs/src/operators/half_join.rs +++ b/dogsdogsdogs/src/operators/half_join.rs @@ -44,7 +44,7 @@ use timely::dataflow::operators::Operator; use timely::progress::Antichain; use timely::progress::frontier::AntichainRef; -use differential_dataflow::{ExchangeData, Collection, AsCollection, Hashable}; +use differential_dataflow::{ExchangeData, VecCollection, AsCollection, Hashable}; use differential_dataflow::difference::{Monoid, Semigroup}; use differential_dataflow::lattice::Lattice; use differential_dataflow::operators::arrange::Arranged; @@ -75,12 +75,12 @@ use differential_dataflow::trace::implementations::BatchContainer; /// once out of the "delta flow region", the updates will be `delay`d to the /// times specified in the payloads. pub fn half_join( - stream: &Collection, + stream: &VecCollection, arrangement: Arranged, frontier_func: FF, comparison: CF, mut output_func: S, -) -> Collection>::Output> +) -> VecCollection>::Output> where G: Scope, K: Hashable + ExchangeData, @@ -144,7 +144,7 @@ type SessionFor<'a, G, CB> = /// records. Note this is not the number of *output* records, owing mainly to /// the number of matched records being easiest to record with low overhead. pub fn half_join_internal_unsafe( - stream: &Collection, + stream: &VecCollection, mut arrangement: Arranged, frontier_func: FF, comparison: CF, diff --git a/dogsdogsdogs/src/operators/lookup_map.rs b/dogsdogsdogs/src/operators/lookup_map.rs index 6eb7582eb..c2bf040cd 100644 --- a/dogsdogsdogs/src/operators/lookup_map.rs +++ b/dogsdogsdogs/src/operators/lookup_map.rs @@ -6,7 +6,7 @@ use timely::dataflow::channels::pact::{Pipeline, Exchange}; use timely::dataflow::operators::Operator; use timely::progress::Antichain; -use differential_dataflow::{ExchangeData, Collection, AsCollection, Hashable}; +use differential_dataflow::{ExchangeData, VecCollection, AsCollection, Hashable}; use differential_dataflow::difference::{IsZero, Semigroup, Monoid}; use differential_dataflow::operators::arrange::Arranged; use differential_dataflow::trace::{Cursor, TraceReader}; @@ -18,14 +18,14 @@ use differential_dataflow::trace::implementations::BatchContainer; /// key with `key_selector` and then proposes all pair af the prefix /// and values associated with the key in `arrangement`. pub fn lookup_map( - prefixes: &Collection, + prefixes: &VecCollection, mut arrangement: Arranged, key_selector: F, mut output_func: S, supplied_key0: K, supplied_key1: K, supplied_key2: K, -) -> Collection +) -> VecCollection where G: Scope, Tr: for<'a> TraceReader< diff --git a/dogsdogsdogs/src/operators/propose.rs b/dogsdogsdogs/src/operators/propose.rs index b519ecf47..feeb2eb44 100644 --- a/dogsdogsdogs/src/operators/propose.rs +++ b/dogsdogsdogs/src/operators/propose.rs @@ -1,6 +1,6 @@ use timely::dataflow::Scope; -use differential_dataflow::{ExchangeData, Collection, Hashable}; +use differential_dataflow::{ExchangeData, VecCollection, Hashable}; use differential_dataflow::difference::{Semigroup, Monoid, Multiply}; use differential_dataflow::operators::arrange::Arranged; use differential_dataflow::trace::TraceReader; @@ -14,10 +14,10 @@ use differential_dataflow::trace::TraceReader; /// `arrangement` undergoes. More complicated patterns are also appropriate, as in the case /// of delta queries. pub fn propose( - prefixes: &Collection, + prefixes: &VecCollection, arrangement: Arranged, key_selector: F, -) -> Collection +) -> VecCollection where G: Scope, Tr: for<'a> TraceReader< @@ -47,10 +47,10 @@ where /// prefixes by the number of matches in `arrangement`. This can be useful to /// avoid the need to prepare an arrangement of distinct extensions. pub fn propose_distinct( - prefixes: &Collection, + prefixes: &VecCollection, arrangement: Arranged, key_selector: F, -) -> Collection +) -> VecCollection where G: Scope, Tr: for<'a> TraceReader< diff --git a/dogsdogsdogs/src/operators/validate.rs b/dogsdogsdogs/src/operators/validate.rs index 9c111d252..3fd3b3b69 100644 --- a/dogsdogsdogs/src/operators/validate.rs +++ b/dogsdogsdogs/src/operators/validate.rs @@ -2,7 +2,7 @@ use std::hash::Hash; use timely::dataflow::Scope; -use differential_dataflow::{ExchangeData, Collection}; +use differential_dataflow::{ExchangeData, VecCollection}; use differential_dataflow::difference::{Semigroup, Monoid, Multiply}; use differential_dataflow::operators::arrange::Arranged; use differential_dataflow::trace::TraceReader; @@ -13,10 +13,10 @@ use differential_dataflow::trace::TraceReader; /// key with `key_selector` and then proposes all pair af the prefix /// and values associated with the key in `arrangement`. pub fn validate( - extensions: &Collection, + extensions: &VecCollection, arrangement: Arranged, key_selector: F, -) -> Collection +) -> VecCollection where G: Scope, Tr: for<'a> TraceReader< diff --git a/doop/src/main.rs b/doop/src/main.rs index c0f0f03df..a64a71e7c 100644 --- a/doop/src/main.rs +++ b/doop/src/main.rs @@ -7,11 +7,11 @@ use std::cell::RefCell; use timely::dataflow::{Scope, ProbeHandle}; use timely::dataflow::scopes::child::Iterative as Child; -use differential_dataflow::{AsCollection, Collection, Hashable}; +use differential_dataflow::{AsCollection, VecCollection, Hashable}; use differential_dataflow::ExchangeData as Data; use differential_dataflow::lattice::Lattice; use differential_dataflow::input::Input; -use differential_dataflow::operators::iterate::Variable; +use differential_dataflow::operators::iterate::VecVariable; use differential_dataflow::operators::{Threshold, Join, JoinCore}; use differential_dataflow::operators::arrange::ArrangeByKey; @@ -56,8 +56,8 @@ type MethodInvocation = Instruction; /// Set-valued collection. pub struct Relation<'a, G: Scope, D: Data+Hashable> where G::Timestamp : Lattice { - variable: Variable, D, Diff>, - current: Collection, D, Diff>, + variable: VecVariable, D, Diff>, + current: VecCollection, D, Diff>, } impl<'a, G: Scope, D: Data+Hashable> Relation<'a, G, D> where G::Timestamp : Lattice { @@ -66,16 +66,16 @@ impl<'a, G: Scope, D: Data+Hashable> Relation<'a, G, D> where G::Timestamp : Lat Self::new_from(&::timely::dataflow::operators::generic::operator::empty(scope).as_collection()) } /// Creates a new variable initialized with `source`. - pub fn new_from(source: &Collection, D, Diff>) -> Self { + pub fn new_from(source: &VecCollection, D, Diff>) -> Self { use ::timely::order::Product; - let variable = Variable::new_from(source.clone(), Product::new(Default::default(), 1)); + let variable = VecVariable::new_from(source.clone(), Product::new(Default::default(), 1)); Relation { variable: variable, current: source.clone(), } } /// Concatenates `production` into the definition of the variable. - pub fn add_production(&mut self, production: &Collection, D, Diff>) { + pub fn add_production(&mut self, production: &VecCollection, D, Diff>) { self.current = self.current.concat(production); } /// Finalizes the variable, connecting its recursive definition. @@ -89,8 +89,8 @@ impl<'a, G: Scope, D: Data+Hashable> Relation<'a, G, D> where G::Timestamp : Lat impl<'a, G: Scope, D: Data+Hashable> ::std::ops::Deref for Relation<'a, G, D> where G::Timestamp : Lattice { - type Target = Collection, D, Diff>; - fn deref(&self) -> &Collection, D, Diff> { + type Target = VecCollection, D, Diff>; + fn deref(&self) -> &VecCollection, D, Diff> { &self.variable } } @@ -283,7 +283,7 @@ fn main() { let (input1, ClassType) = scope.new_collection_from_raw(load1(index, &prefix, "ClassType.facts", interner.clone())); inputs.0.push(input1); let (input2, ArrayType) = scope.new_collection_from_raw(load1(index, &prefix, "ArrayType.facts", interner.clone())); inputs.0.push(input2); let (input3, InterfaceType) = scope.new_collection_from_raw(load1(index, &prefix, "InterfaceType.facts", interner.clone())); inputs.0.push(input3); - // let Var_DeclaringMethod: Collection<_,(Symbol, Symbol)> = scope.new_collection_from_raw(load2(index, &prefix, "Var-DeclaringMethod.facts", interner.clone())).1; + // let Var_DeclaringMethod: VecCollection<_,(Symbol, Symbol)> = scope.new_collection_from_raw(load2(index, &prefix, "Var-DeclaringMethod.facts", interner.clone())).1; let (input4, ApplicationClass) = scope.new_collection_from_raw(load1(index, &prefix, "ApplicationClass.facts", interner.clone())); inputs.0.push(input4); let (input5, ThisVar) = scope.new_collection_from_raw(load2(index, &prefix, "ThisVar.facts", interner.clone())); inputs.1.push(input5); @@ -317,14 +317,14 @@ fn main() { let (input31, ActualParam) = scope.new_collection_from_raw(load3(index, &prefix, "ActualParam.facts", interner.clone())); inputs.2.push(input31); // Main schema - let isType: Collection<_,Type> = + let isType: VecCollection<_,Type> = ClassType .concat(&ArrayType) .concat(&InterfaceType) .concat(&ApplicationClass) .concat(&_NormalHeap.map(|(_id,ty)| ty)); - let isReferenceType: Collection<_,ReferenceType> = + let isReferenceType: VecCollection<_,ReferenceType> = ClassType .concat(&ArrayType) .concat(&InterfaceType) @@ -350,11 +350,11 @@ fn main() { .concat(&scope.new_collection_from_raw(Some(((temp2.clone(), temp1b.clone()), 0, 1))).1); // NOTE: Unused - // let MainMethodArgArray: Collection<_,HeapAllocation> = scope.new_collection_from_raw(Some(temp3.clone())).1; + // let MainMethodArgArray: VecCollection<_,HeapAllocation> = scope.new_collection_from_raw(Some(temp3.clone())).1; // NOTE: Unused - // let MainMethodArgArrayContent: Collection<_,HeapAllocation> = scope.new_collection_from_raw(Some(temp2.clone())).1; + // let MainMethodArgArrayContent: VecCollection<_,HeapAllocation> = scope.new_collection_from_raw(Some(temp2.clone())).1; - let Instruction_Method = //: Collection<_,(Instruction, Method)> = + let Instruction_Method = //: VecCollection<_,(Instruction, Method)> = _AssignHeapAllocation.map(|x| (x.0, x.4)) .concat(&_AssignLocal.map(|x| (x.0, x.4))) .concat(&_AssignCast.map(|x| (x.0, x.5))) @@ -374,7 +374,7 @@ fn main() { let isVirtualMethodInvocation_Insn = _VirtualMethodInvocation.map(|x| x.0); let isStaticMethodInvocation_Insn = _StaticMethodInvocation.map(|x| x.0); - let FieldInstruction_Signature: Collection<_,(FieldInstruction, Field)> = + let FieldInstruction_Signature: VecCollection<_,(FieldInstruction, Field)> = _StoreInstanceField.map(|x| (x.0, x.4)) .concat(&_LoadInstanceField.map(|x| (x.0, x.4))) .concat(&_StoreStaticField.map(|x| (x.0, x.3))) @@ -392,7 +392,7 @@ fn main() { let StoreArrayIndex_From = _StoreArrayIndex.map(|x| (x.0, x.2)); let StoreArrayIndex_Base = _StoreArrayIndex.map(|x| (x.0, x.3)); - let AssignInstruction_To: Collection<_,(AssignInstruction, Var)> = + let AssignInstruction_To: VecCollection<_,(AssignInstruction, Var)> = _AssignHeapAllocation.map(|x| (x.0, x.3)) .concat(&_AssignLocal.map(|x| (x.0, x.3))) .concat(&_AssignCast.map(|x| (x.0, x.3))); @@ -403,7 +403,7 @@ fn main() { let AssignHeapAllocation_Heap = _AssignHeapAllocation.map(|x| (x.0, x.2)); let ReturnNonvoid_Var = _Return.map(|x| (x.0, x.2)); - let MethodInvocation_Method: Collection<_,(MethodInvocation, Method)> = + let MethodInvocation_Method: VecCollection<_,(MethodInvocation, Method)> = _StaticMethodInvocation.map(|x| (x.0, x.2)) .concat(&_SpecialMethodInvocation.map(|x| (x.0, x.2))) .concat(&_VirtualMethodInvocation.map(|x| (x.0, x.2))); @@ -420,7 +420,7 @@ fn main() { // LoadInstanceField_Base(?insn, ?base), // FieldInstruction_Signature(?insn, ?sig), // LoadInstanceField_To(?insn, ?to). - let LoadInstanceField: Collection<_,(Var, Field, Var, Method)> = + let LoadInstanceField: VecCollection<_,(Var, Field, Var, Method)> = Instruction_Method .join(&LoadInstanceField_Base) .join(&FieldInstruction_Signature) @@ -432,7 +432,7 @@ fn main() { // StoreInstanceField_From(?insn, ?from), // StoreInstanceField_Base(?insn, ?base), // FieldInstruction_Signature(?insn, ?sig). - let StoreInstanceField: Collection<_,(Var, Var, Field, Method)> = + let StoreInstanceField: VecCollection<_,(Var, Var, Field, Method)> = Instruction_Method .join(&StoreInstanceField_From) .join(&StoreInstanceField_Base) @@ -443,7 +443,7 @@ fn main() { // Instruction_Method(?insn, ?inmethod), // FieldInstruction_Signature(?insn, ?sig), // LoadStaticField_To(?insn, ?to). - let LoadStaticField: Collection<_,(Field, Var, Method)> = + let LoadStaticField: VecCollection<_,(Field, Var, Method)> = Instruction_Method .join(&FieldInstruction_Signature) .join(&LoadStaticField_To) @@ -453,7 +453,7 @@ fn main() { // Instruction_Method(?insn, ?inmethod), // StoreStaticField_From(?insn, ?from), // FieldInstruction_Signature(?insn, ?sig). - let StoreStaticField: Collection<_,(Var, Field, Method)> = + let StoreStaticField: VecCollection<_,(Var, Field, Method)> = Instruction_Method .join(&StoreStaticField_From) .join(&FieldInstruction_Signature) @@ -463,7 +463,7 @@ fn main() { // Instruction_Method(?insn, ?inmethod), // LoadArrayIndex_Base(?insn, ?base), // LoadArrayIndex_To(?insn, ?to). - let LoadArrayIndex: Collection<_,(Var, Var, Method)> = + let LoadArrayIndex: VecCollection<_,(Var, Var, Method)> = Instruction_Method .join(&LoadArrayIndex_Base) .join(&LoadArrayIndex_To) @@ -473,7 +473,7 @@ fn main() { // Instruction_Method(?insn, ?inmethod), // StoreArrayIndex_From(?insn, ?from), // StoreArrayIndex_Base(?insn, ?base). - let StoreArrayIndex: Collection<_,(Var, Var, Method)> = + let StoreArrayIndex: VecCollection<_,(Var, Var, Method)> = Instruction_Method .join(&StoreArrayIndex_From) .join(&StoreArrayIndex_Base) @@ -484,7 +484,7 @@ fn main() { // AssignCast_From(?insn, ?from), // AssignInstruction_To(?insn, ?to), // AssignCast_Type(?insn, ?type). - let AssignCast: Collection<_,(Type, Var, Var, Method)> = + let AssignCast: VecCollection<_,(Type, Var, Var, Method)> = Instruction_Method .join(&AssignCast_From) .join(&AssignInstruction_To) @@ -495,7 +495,7 @@ fn main() { // AssignInstruction_To(?insn, ?to), // Instruction_Method(?insn, ?inmethod), // AssignLocal_From(?insn, ?from). - let AssignLocal: Collection<_,(Var, Var, Method)> = + let AssignLocal: VecCollection<_,(Var, Var, Method)> = Instruction_Method .join(&AssignInstruction_To) .join(&AssignLocal_From) @@ -505,7 +505,7 @@ fn main() { // Instruction_Method(?insn, ?inmethod), // AssignHeapAllocation_Heap(?insn, ?heap), // AssignInstruction_To(?insn, ?to). - let AssignHeapAllocation: Collection<_,(HeapAllocation, Var, Method)> = + let AssignHeapAllocation: VecCollection<_,(HeapAllocation, Var, Method)> = Instruction_Method .join(&AssignHeapAllocation_Heap) .join(&AssignInstruction_To) @@ -514,7 +514,7 @@ fn main() { // ReturnVar(?var, ?method) :- // Instruction_Method(?insn, ?method), // ReturnNonvoid_Var(?insn, ?var). - let ReturnVar: Collection<_,(Var, Method)> = + let ReturnVar: VecCollection<_,(Var, Method)> = Instruction_Method .join(&ReturnNonvoid_Var) .map(|(_insn, (inmethod, var))| (var, inmethod)); @@ -523,7 +523,7 @@ fn main() { // isStaticMethodInvocation_Insn(?invocation), // Instruction_Method(?invocation, ?inmethod), // MethodInvocation_Method(?invocation, ?signature). - let StaticMethodInvocation: Collection<_,(MethodInvocation, Method, Method)> = + let StaticMethodInvocation: VecCollection<_,(MethodInvocation, Method, Method)> = Instruction_Method .semijoin(&isStaticMethodInvocation_Insn) .join(&MethodInvocation_Method) diff --git a/experiments/src/bin/deals-interactive.rs b/experiments/src/bin/deals-interactive.rs index 368a4ae5c..da053de68 100644 --- a/experiments/src/bin/deals-interactive.rs +++ b/experiments/src/bin/deals-interactive.rs @@ -5,7 +5,7 @@ use timely::dataflow::*; use timely::WorkerConfig; use differential_dataflow::input::Input; -use differential_dataflow::Collection; +use differential_dataflow::VecCollection; use differential_dataflow::operators::*; use differential_dataflow::operators::arrange::ArrangeByKey; use differential_dataflow::lattice::Lattice; @@ -207,10 +207,10 @@ fn main() { fn interactive( edges: &Arrange, - tc_1: Collection, - tc_2: Collection, - sg_x: Collection -) -> Collection + tc_1: VecCollection, + tc_2: VecCollection, + sg_x: VecCollection +) -> VecCollection where G::Timestamp: Lattice{ // descendants of tc_1: @@ -283,4 +283,4 @@ where G::Timestamp: Lattice{ .semijoin(&sg_x); query1.concat(&query2).concat(&query3).map(|(q,_)| q) -} \ No newline at end of file +} diff --git a/experiments/src/bin/deals.rs b/experiments/src/bin/deals.rs index f9776d17a..7730a449d 100644 --- a/experiments/src/bin/deals.rs +++ b/experiments/src/bin/deals.rs @@ -3,7 +3,7 @@ use std::time::Instant; use timely::dataflow::*; use differential_dataflow::input::Input; -use differential_dataflow::Collection; +use differential_dataflow::VecCollection; use differential_dataflow::operators::*; use differential_dataflow::trace::implementations::{ValSpine, KeySpine, KeyBatcher, KeyBuilder, ValBatcher, ValBuilder}; @@ -83,7 +83,7 @@ fn main() { use timely::order::Product; // returns pairs (n, s) indicating node n can be reached from a root in s steps. -fn tc>(edges: &EdgeArranged) -> Collection { +fn tc>(edges: &EdgeArranged) -> VecCollection { // repeatedly update minimal distances each node can be reached from each root edges.stream.scope().iterative::(|scope| { @@ -108,7 +108,7 @@ fn tc>(edges: &EdgeArranged) -> C } // returns pairs (n, s) indicating node n can be reached from a root in s steps. -fn sg>(edges: &EdgeArranged) -> Collection { +fn sg>(edges: &EdgeArranged) -> VecCollection { let peers = edges.join_core(&edges, |_,&x,&y| Some((x,y))).filter(|&(x,y)| x != y); diff --git a/experiments/src/bin/graphs-interactive-alt.rs b/experiments/src/bin/graphs-interactive-alt.rs index 18dc0923f..c9bfe590a 100644 --- a/experiments/src/bin/graphs-interactive-alt.rs +++ b/experiments/src/bin/graphs-interactive-alt.rs @@ -7,7 +7,7 @@ use timely::dataflow::operators::probe::Handle; use timely::order::Product; use differential_dataflow::input::Input; -use differential_dataflow::Collection; +use differential_dataflow::VecCollection; use differential_dataflow::operators::*; use differential_dataflow::lattice::Lattice; use differential_dataflow::operators::iterate::Variable; @@ -266,7 +266,7 @@ type Arrange = Arranged( forward_graph: &Arrange, reverse_graph: &Arrange, - goals: &Collection) -> Collection + goals: &VecCollection) -> VecCollection where G::Timestamp: Lattice+Ord { let sources = goals.map(|(x,_)| x); @@ -293,7 +293,7 @@ where G::Timestamp: Lattice+Ord { fn _bidijkstra( forward_graph: &Arrange, reverse_graph: &Arrange, - goals: &Collection) -> Collection + goals: &VecCollection) -> VecCollection where G::Timestamp: Lattice+Ord { goals.scope().iterative::(|inner| { @@ -363,7 +363,7 @@ where G::Timestamp: Lattice+Ord { } -fn connected_components(graph: &Arrange) -> Collection +fn connected_components(graph: &Arrange) -> VecCollection where G::Timestamp: Lattice { // each edge (x,y) means that we need at least a label for the min of x and y. diff --git a/experiments/src/bin/graphs-interactive-neu-zwei.rs b/experiments/src/bin/graphs-interactive-neu-zwei.rs index dc877d27a..39d009413 100644 --- a/experiments/src/bin/graphs-interactive-neu-zwei.rs +++ b/experiments/src/bin/graphs-interactive-neu-zwei.rs @@ -4,7 +4,7 @@ use timely::dataflow::*; use timely::dataflow::operators::probe::Handle; use differential_dataflow::input::Input; -use differential_dataflow::Collection; +use differential_dataflow::VecCollection; use differential_dataflow::operators::*; use differential_dataflow::lattice::Lattice; // use differential_dataflow::operators::iterate::Variable; @@ -236,7 +236,7 @@ type Arrange = Arranged( forward_graph: &Arrange, reverse_graph: &Arrange, - goals: &Collection) -> Collection + goals: &VecCollection) -> VecCollection where G::Timestamp: Lattice+Ord { let sources = goals.map(|(x,_)| x); @@ -263,8 +263,8 @@ where G::Timestamp: Lattice+Ord { // fn bidijkstra( // forward_graph: &Arrange, // reverse_graph: &Arrange, -// goals: &Collection, -// bound: u64) -> Collection +// goals: &VecCollection, +// bound: u64) -> VecCollection // where G::Timestamp: Lattice+Ord { // goals.scope().scoped(|inner| { @@ -331,4 +331,4 @@ where G::Timestamp: Lattice+Ord { // reached.leave() // }) -// } \ No newline at end of file +// } diff --git a/experiments/src/bin/graphs-interactive-neu.rs b/experiments/src/bin/graphs-interactive-neu.rs index 6210113ed..dc28a6c51 100644 --- a/experiments/src/bin/graphs-interactive-neu.rs +++ b/experiments/src/bin/graphs-interactive-neu.rs @@ -7,7 +7,7 @@ use timely::dataflow::operators::probe::Handle; use timely::order::Product; use differential_dataflow::input::Input; -use differential_dataflow::Collection; +use differential_dataflow::VecCollection; use differential_dataflow::operators::*; use differential_dataflow::lattice::Lattice; use differential_dataflow::operators::iterate::Variable; @@ -300,7 +300,7 @@ type Arrange = Arranged( forward_graph: &Arrange, reverse_graph: &Arrange, - goals: &Collection) -> Collection + goals: &VecCollection) -> VecCollection where G::Timestamp: Lattice+Ord { let sources = goals.map(|(x,_)| x); @@ -327,7 +327,7 @@ where G::Timestamp: Lattice+Ord { fn _bidijkstra( forward_graph: &Arrange, reverse_graph: &Arrange, - goals: &Collection) -> Collection + goals: &VecCollection) -> VecCollection where G::Timestamp: Lattice+Ord { goals.scope().iterative::(|inner| { diff --git a/experiments/src/bin/graphs-interactive.rs b/experiments/src/bin/graphs-interactive.rs index ba96779ac..c6d0ad36d 100644 --- a/experiments/src/bin/graphs-interactive.rs +++ b/experiments/src/bin/graphs-interactive.rs @@ -5,7 +5,7 @@ use timely::dataflow::operators::probe::Handle; use timely::order::Product; use differential_dataflow::input::Input; -use differential_dataflow::Collection; +use differential_dataflow::VecCollection; use differential_dataflow::operators::*; use differential_dataflow::lattice::Lattice; use differential_dataflow::operators::iterate::Variable; @@ -204,7 +204,7 @@ type Arrange = Arranged( forward_graph: &Arrange, reverse_graph: &Arrange, - goals: &Collection) -> Collection + goals: &VecCollection) -> VecCollection where G::Timestamp: Lattice+Ord { let sources = goals.map(|(x,_)| x); @@ -231,7 +231,7 @@ where G::Timestamp: Lattice+Ord { fn _bidijkstra( forward_graph: &Arrange, reverse_graph: &Arrange, - goals: &Collection) -> Collection + goals: &VecCollection) -> VecCollection where G::Timestamp: Lattice+Ord { goals.scope().iterative::(|inner| { @@ -298,4 +298,4 @@ where G::Timestamp: Lattice+Ord { reached.leave() }) -} \ No newline at end of file +} diff --git a/experiments/src/bin/graphs-static.rs b/experiments/src/bin/graphs-static.rs index e57136acc..1f764e81a 100644 --- a/experiments/src/bin/graphs-static.rs +++ b/experiments/src/bin/graphs-static.rs @@ -4,7 +4,7 @@ use timely::order::Product; use timely::dataflow::operators::ToStream; use differential_dataflow::input::Input; -use differential_dataflow::Collection; +use differential_dataflow::VecCollection; use differential_dataflow::operators::*; use differential_dataflow::operators::arrange::ArrangeByKey; use differential_dataflow::operators::arrange::ArrangeBySelf; @@ -110,8 +110,8 @@ type TraceHandle = TraceAgent; fn reach> ( graph: &mut TraceHandle, - roots: Collection -) -> Collection { + roots: VecCollection +) -> VecCollection { let graph = graph.import(&roots.scope()); @@ -135,8 +135,8 @@ fn reach> ( fn bfs> ( graph: &mut TraceHandle, - roots: Collection -) -> Collection { + roots: VecCollection +) -> VecCollection { let graph = graph.import(&roots.scope()); let roots = roots.map(|r| (r,0)); @@ -161,7 +161,7 @@ fn connected_components>( scope: &mut G, forward: &mut TraceHandle, reverse: &mut TraceHandle, -) -> Collection { +) -> VecCollection { let forward = forward.import(scope); let reverse = reverse.import(scope); diff --git a/experiments/src/bin/graphs.rs b/experiments/src/bin/graphs.rs index e1b11404a..be4676628 100644 --- a/experiments/src/bin/graphs.rs +++ b/experiments/src/bin/graphs.rs @@ -3,7 +3,7 @@ use rand::{Rng, SeedableRng, StdRng}; use timely::dataflow::*; use differential_dataflow::input::Input; -use differential_dataflow::Collection; +use differential_dataflow::VecCollection; use differential_dataflow::operators::*; use differential_dataflow::operators::arrange::ArrangeByKey; use differential_dataflow::operators::arrange::ArrangeBySelf; @@ -92,8 +92,8 @@ type TraceHandle = TraceAgent; fn reach> ( graph: &mut TraceHandle, - roots: Collection -) -> Collection { + roots: VecCollection +) -> VecCollection { let graph = graph.import(&roots.scope()); @@ -114,8 +114,8 @@ fn reach> ( fn bfs> ( graph: &mut TraceHandle, - roots: Collection -) -> Collection { + roots: VecCollection +) -> VecCollection { let graph = graph.import(&roots.scope()); let roots = roots.map(|r| (r,0)); @@ -133,7 +133,7 @@ fn bfs> ( // fn connected_components>( // graph: &mut TraceHandle -// ) -> Collection { +// ) -> VecCollection { // // each edge (x,y) means that we need at least a label for the min of x and y. // let nodes = diff --git a/experiments/src/bin/graspan2.rs b/experiments/src/bin/graspan2.rs index 042e9f486..dd5563d7b 100644 --- a/experiments/src/bin/graspan2.rs +++ b/experiments/src/bin/graspan2.rs @@ -6,7 +6,7 @@ use timely::order::Product; use differential_dataflow::operators::iterate::SemigroupVariable; -use differential_dataflow::Collection; +use differential_dataflow::VecCollection; use differential_dataflow::input::Input; use differential_dataflow::operators::*; use differential_dataflow::operators::arrange::Arrange; @@ -92,13 +92,13 @@ fn unoptimized() { ; // MA(a,b) <- D(x,a),VA(x,y),D(y,b) - let memory_alias_next: Collection<_,_,Present> = + let memory_alias_next: VecCollection<_,_,Present> = value_alias_next .join_core(&dereference, |_x,&y,&a| Some((y,a))) .arrange::, ValBuilder<_,_,_,_>, ValSpine<_,_,_,_>>() .join_core(&dereference, |_y,&a,&b| Some((a,b))); - let memory_alias_next: Collection<_,_,Present> = + let memory_alias_next: VecCollection<_,_,Present> = memory_alias_next .arrange::, KeyBuilder<_,_,_>, KeySpine<_,_,_>>() // .distinct_total_core::() diff --git a/interactive/src/plan/concat.rs b/interactive/src/plan/concat.rs index c688a1015..4c27941bb 100644 --- a/interactive/src/plan/concat.rs +++ b/interactive/src/plan/concat.rs @@ -22,7 +22,7 @@ impl Render for Concat { fn render>( &self, scope: &mut S, - arrangements: &mut TraceManager) -> Collection, Diff> + arrangements: &mut TraceManager) -> VecCollection, Diff> { use timely::dataflow::operators::Concatenate; use differential_dataflow::AsCollection; diff --git a/interactive/src/plan/filter.rs b/interactive/src/plan/filter.rs index f942e7af9..ba508f151 100644 --- a/interactive/src/plan/filter.rs +++ b/interactive/src/plan/filter.rs @@ -4,7 +4,7 @@ use std::hash::Hash; use serde::{Deserialize, Serialize}; use timely::dataflow::Scope; -use differential_dataflow::{Collection, ExchangeData}; +use differential_dataflow::{VecCollection, ExchangeData}; use crate::plan::{Plan, Render}; use crate::{TraceManager, Time, Diff, Datum}; @@ -87,13 +87,13 @@ impl Render for Filter { fn render>( &self, scope: &mut S, - collections: &mut std::collections::HashMap, Collection, Diff>>, + collections: &mut std::collections::HashMap, VecCollection, Diff>>, arrangements: &mut TraceManager, - ) -> Collection, Diff> + ) -> VecCollection, Diff> { let predicate = self.predicate.clone(); self.plan .render(scope, collections, arrangements) .filter(move |tuple| predicate.satisfied(tuple)) } -} \ No newline at end of file +} diff --git a/interactive/src/plan/join.rs b/interactive/src/plan/join.rs index 4fecefa52..61696b43e 100644 --- a/interactive/src/plan/join.rs +++ b/interactive/src/plan/join.rs @@ -5,7 +5,7 @@ use serde::{Deserialize, Serialize}; use timely::dataflow::Scope; -use differential_dataflow::{Collection, ExchangeData}; +use differential_dataflow::{VecCollection, ExchangeData}; use crate::plan::{Plan, Render}; use crate::{TraceManager, Time, Diff, Datum}; @@ -29,9 +29,9 @@ impl Render for Join { fn render>( &self, scope: &mut S, - collections: &mut std::collections::HashMap, Collection, Diff>>, + collections: &mut std::collections::HashMap, VecCollection, Diff>>, arrangements: &mut TraceManager, - ) -> Collection, Diff> + ) -> VecCollection, Diff> { use differential_dataflow::operators::arrange::ArrangeByKey; diff --git a/interactive/src/plan/map.rs b/interactive/src/plan/map.rs index 876d97435..20647ce81 100644 --- a/interactive/src/plan/map.rs +++ b/interactive/src/plan/map.rs @@ -5,7 +5,7 @@ use serde::{Deserialize, Serialize}; use timely::dataflow::Scope; -use differential_dataflow::{Collection, ExchangeData}; +use differential_dataflow::{VecCollection, ExchangeData}; use crate::plan::{Plan, Render}; use crate::{TraceManager, Time, Diff, Datum}; @@ -28,9 +28,9 @@ impl Render for Map { fn render>( &self, scope: &mut S, - collections: &mut std::collections::HashMap, Collection, Diff>>, + collections: &mut std::collections::HashMap, VecCollection, Diff>>, arrangements: &mut TraceManager, - ) -> Collection, Diff> + ) -> VecCollection, Diff> { let expressions = self.expressions.clone(); diff --git a/interactive/src/plan/mod.rs b/interactive/src/plan/mod.rs index a8cbadf2e..e0ae29f6c 100644 --- a/interactive/src/plan/mod.rs +++ b/interactive/src/plan/mod.rs @@ -4,7 +4,7 @@ use std::hash::Hash; use serde::{Deserialize, Serialize}; use timely::dataflow::Scope; -use differential_dataflow::{Collection, ExchangeData}; +use differential_dataflow::{VecCollection, ExchangeData}; use crate::{TraceManager, Time, Diff}; @@ -35,9 +35,9 @@ pub trait Render : Sized { fn render>( &self, scope: &mut S, - collections: &mut std::collections::HashMap, Collection, Diff>>, + collections: &mut std::collections::HashMap, VecCollection, Diff>>, arrangements: &mut TraceManager, - ) -> Collection, Diff>; + ) -> VecCollection, Diff>; } /// Possible query plan types. @@ -145,9 +145,9 @@ impl Render for Plan { fn render>( &self, scope: &mut S, - collections: &mut std::collections::HashMap, Collection, Diff>>, + collections: &mut std::collections::HashMap, VecCollection, Diff>>, arrangements: &mut TraceManager, - ) -> Collection, Diff> + ) -> VecCollection, Diff> { if collections.get(self).is_none() { diff --git a/interactive/src/plan/sfw.rs b/interactive/src/plan/sfw.rs index 9709d4375..feca49b1a 100644 --- a/interactive/src/plan/sfw.rs +++ b/interactive/src/plan/sfw.rs @@ -31,7 +31,7 @@ use timely::dataflow::Scope; use differential_dataflow::operators::arrange::{ArrangeBySelf, ArrangeByKey}; -use differential_dataflow::{Collection, ExchangeData}; +use differential_dataflow::{VecCollection, ExchangeData}; use crate::plan::{Plan, Render}; use crate::{TraceManager, Time, Diff, Datum}; @@ -70,9 +70,9 @@ impl Render for MultiwayJoin { fn render>( &self, scope: &mut S, - collections: &mut std::collections::HashMap, Collection, Diff>>, + collections: &mut std::collections::HashMap, VecCollection, Diff>>, arrangements: &mut TraceManager, - ) -> Collection, Diff> + ) -> VecCollection, Diff> { // The idea here is the following: // diff --git a/mdbook/src/chapter_2/chapter_2_1.md b/mdbook/src/chapter_2/chapter_2_1.md index 6a3ac46bd..320210f21 100644 --- a/mdbook/src/chapter_2/chapter_2_1.md +++ b/mdbook/src/chapter_2/chapter_2_1.md @@ -8,10 +8,10 @@ As an example, our example program used `map` to reverse the pairs of identifier # extern crate timely; # extern crate differential_dataflow; # use timely::dataflow::Scope; -# use differential_dataflow::Collection; +# use differential_dataflow::VecCollection; # use differential_dataflow::lattice::Lattice; # use differential_dataflow::operators::Join; -# fn example(manages: &Collection) +# fn example(manages: &VecCollection) # where G::Timestamp: Lattice # { manages @@ -27,9 +27,9 @@ If instead we had just written # extern crate timely; # extern crate differential_dataflow; # use timely::dataflow::Scope; -# use differential_dataflow::Collection; +# use differential_dataflow::VecCollection; # use differential_dataflow::lattice::Lattice; -# fn example(manages: &Collection) +# fn example(manages: &VecCollection) # where G::Timestamp: Lattice # { manages diff --git a/mdbook/src/chapter_2/chapter_2_2.md b/mdbook/src/chapter_2/chapter_2_2.md index 014984279..ee99cfdb9 100644 --- a/mdbook/src/chapter_2/chapter_2_2.md +++ b/mdbook/src/chapter_2/chapter_2_2.md @@ -8,9 +8,9 @@ As an example, we might select out those management relation where the manager h # extern crate timely; # extern crate differential_dataflow; # use timely::dataflow::Scope; -# use differential_dataflow::Collection; +# use differential_dataflow::VecCollection; # use differential_dataflow::lattice::Lattice; -# fn example(manages: &Collection) +# fn example(manages: &VecCollection) # where G::Timestamp: Lattice # { manages diff --git a/mdbook/src/chapter_2/chapter_2_3.md b/mdbook/src/chapter_2/chapter_2_3.md index 16f3c7679..0f58a16c1 100644 --- a/mdbook/src/chapter_2/chapter_2_3.md +++ b/mdbook/src/chapter_2/chapter_2_3.md @@ -8,9 +8,9 @@ For example, we might form the symmetric "management relation" by concatenating # extern crate timely; # extern crate differential_dataflow; # use timely::dataflow::Scope; -# use differential_dataflow::Collection; +# use differential_dataflow::VecCollection; # use differential_dataflow::lattice::Lattice; -# fn example(manages: &Collection) +# fn example(manages: &VecCollection) # where G::Timestamp: Lattice # { manages diff --git a/mdbook/src/chapter_2/chapter_2_4.md b/mdbook/src/chapter_2/chapter_2_4.md index b9149792e..5f002bb28 100644 --- a/mdbook/src/chapter_2/chapter_2_4.md +++ b/mdbook/src/chapter_2/chapter_2_4.md @@ -10,10 +10,10 @@ As an example, if we were to inspect # extern crate timely; # extern crate differential_dataflow; # use timely::dataflow::Scope; -# use differential_dataflow::Collection; +# use differential_dataflow::VecCollection; # use differential_dataflow::lattice::Lattice; # use differential_dataflow::operators::Reduce; -# fn example(manages: &Collection) +# fn example(manages: &VecCollection) # where G::Timestamp: Lattice # { manages @@ -36,9 +36,9 @@ However, by introducing `consolidate` # extern crate timely; # extern crate differential_dataflow; # use timely::dataflow::Scope; -# use differential_dataflow::Collection; +# use differential_dataflow::VecCollection; # use differential_dataflow::lattice::Lattice; -# fn example(manages: &Collection) +# fn example(manages: &VecCollection) # where G::Timestamp: Lattice # { manages diff --git a/mdbook/src/chapter_2/chapter_2_5.md b/mdbook/src/chapter_2/chapter_2_5.md index 85065e927..508c30398 100644 --- a/mdbook/src/chapter_2/chapter_2_5.md +++ b/mdbook/src/chapter_2/chapter_2_5.md @@ -8,10 +8,10 @@ Our example from earlier uses a join to match up pairs `(m2, m1)` and `(m1, p)` # extern crate timely; # extern crate differential_dataflow; # use timely::dataflow::Scope; -# use differential_dataflow::Collection; +# use differential_dataflow::VecCollection; # use differential_dataflow::operators::Join; # use differential_dataflow::lattice::Lattice; -# fn example(manages: &Collection) +# fn example(manages: &VecCollection) # where G::Timestamp: Lattice # { manages diff --git a/mdbook/src/chapter_2/chapter_2_6.md b/mdbook/src/chapter_2/chapter_2_6.md index 10ff885b1..f966f5002 100644 --- a/mdbook/src/chapter_2/chapter_2_6.md +++ b/mdbook/src/chapter_2/chapter_2_6.md @@ -8,10 +8,10 @@ For example, to produce for each manager their managee with the lowest identifie # extern crate timely; # extern crate differential_dataflow; # use timely::dataflow::Scope; -# use differential_dataflow::Collection; +# use differential_dataflow::VecCollection; # use differential_dataflow::lattice::Lattice; # use differential_dataflow::operators::Reduce; -# fn example(manages: &Collection) +# fn example(manages: &VecCollection) # where G::Timestamp: Lattice # { manages diff --git a/mdbook/src/chapter_2/chapter_2_7.md b/mdbook/src/chapter_2/chapter_2_7.md index d079d7924..0099cf820 100644 --- a/mdbook/src/chapter_2/chapter_2_7.md +++ b/mdbook/src/chapter_2/chapter_2_7.md @@ -8,10 +8,10 @@ As an example, we can take our `manages` relation and determine for all employee # extern crate timely; # extern crate differential_dataflow; # use timely::dataflow::Scope; -# use differential_dataflow::Collection; +# use differential_dataflow::VecCollection; # use differential_dataflow::operators::{Join, Iterate, Threshold}; # use differential_dataflow::lattice::Lattice; -# fn example(manages: &Collection) +# fn example(manages: &VecCollection) # where G::Timestamp: Lattice # { manages // transitive contains (manager, person) for many hops. @@ -44,11 +44,11 @@ In the example above, we could rewrite # extern crate timely; # extern crate differential_dataflow; # use timely::dataflow::Scope; -# use differential_dataflow::Collection; +# use differential_dataflow::VecCollection; # use differential_dataflow::operators::{Join, Threshold}; -# use differential_dataflow::operators::{Iterate, iterate::Variable}; +# use differential_dataflow::operators::{Iterate, iterate::VecVariable}; # use differential_dataflow::lattice::Lattice; -# fn example(manages: &Collection) +# fn example(manages: &VecCollection) # where G::Timestamp: Lattice # { manages // transitive contains (manager, person) for many hops. @@ -86,19 +86,19 @@ As an example, the implementation of the `iterate` operator looks something like # use timely::dataflow::Scope; # use timely::dataflow::scopes::Child; # use timely::progress::Antichain; -# use differential_dataflow::Collection; -# use differential_dataflow::operators::{Iterate, iterate::Variable}; +# use differential_dataflow::VecCollection; +# use differential_dataflow::operators::{Iterate, iterate::VecVariable}; # use differential_dataflow::lattice::Lattice; -# fn logic<'a, G: Scope>(variable: &Variable, (u64, u64), isize>) -> Collection, (u64, u64)> +# fn logic<'a, G: Scope>(variable: &VecVariable, (u64, u64), isize>) -> VecCollection, (u64, u64)> # where G::Timestamp: Lattice # { # (*variable).clone() # } -# fn example<'a, G: Scope>(collection: &Collection) //, logic: impl Fn(&Variable, (u64, u64), isize>) -> Collection, (u64, u64)>) +# fn example<'a, G: Scope>(collection: &VecCollection) //, logic: impl Fn(&VecVariable, (u64, u64), isize>) -> VecCollection, (u64, u64)>) # where G::Timestamp: Lattice # { collection.scope().scoped("inner", |subgraph| { - let variable = Variable::new_from(collection.enter(subgraph), 1); + let variable = VecVariable::new_from(collection.enter(subgraph), 1); let result = logic(&variable); variable.set(&result); result.leave() diff --git a/tpchlike/src/lib.rs b/tpchlike/src/lib.rs index b323596bd..197b41997 100644 --- a/tpchlike/src/lib.rs +++ b/tpchlike/src/lib.rs @@ -10,7 +10,7 @@ use std::rc::Rc; use timely::dataflow::*; use timely::dataflow::operators::CapabilitySet; -use differential_dataflow::Collection; +use differential_dataflow::VecCollection; use differential_dataflow::operators::arrange::ArrangeByKey; use differential_dataflow::operators::arrange::ShutdownButton; @@ -66,27 +66,27 @@ impl InputHandles { } pub struct Collections { - customers: Collection, - lineitems: Collection, isize>, - nations: Collection, - orders: Collection, - parts: Collection, - partsupps: Collection, - regions: Collection, - suppliers: Collection, + customers: VecCollection, + lineitems: VecCollection, isize>, + nations: VecCollection, + orders: VecCollection, + parts: VecCollection, + partsupps: VecCollection, + regions: VecCollection, + suppliers: VecCollection, used: [bool; 8], } impl Collections { pub fn new( - customers: Collection, - lineitems: Collection, isize>, - nations: Collection, - orders: Collection, - parts: Collection, - partsupps: Collection, - regions: Collection, - suppliers: Collection, + customers: VecCollection, + lineitems: VecCollection, isize>, + nations: VecCollection, + orders: VecCollection, + parts: VecCollection, + partsupps: VecCollection, + regions: VecCollection, + suppliers: VecCollection, ) -> Self { Collections { @@ -102,14 +102,14 @@ impl Collections { } } - pub fn customers(&mut self) -> &Collection { self.used[0] = true; &self.customers } - pub fn lineitems(&mut self) -> &Collection, isize> { self.used[1] = true; &self.lineitems } - pub fn nations(&mut self) -> &Collection { self.used[2] = true; &self.nations } - pub fn orders(&mut self) -> &Collection { self.used[3] = true; &self.orders } - pub fn parts(&mut self) -> &Collection { self.used[4] = true; &self.parts } - pub fn partsupps(&mut self) -> &Collection { self.used[5] = true; &self.partsupps } - pub fn regions(&mut self) -> &Collection { self.used[6] = true; &self.regions } - pub fn suppliers(&mut self) -> &Collection { self.used[7] = true; &self.suppliers } + pub fn customers(&mut self) -> &VecCollection { self.used[0] = true; &self.customers } + pub fn lineitems(&mut self) -> &VecCollection, isize> { self.used[1] = true; &self.lineitems } + pub fn nations(&mut self) -> &VecCollection { self.used[2] = true; &self.nations } + pub fn orders(&mut self) -> &VecCollection { self.used[3] = true; &self.orders } + pub fn parts(&mut self) -> &VecCollection { self.used[4] = true; &self.parts } + pub fn partsupps(&mut self) -> &VecCollection { self.used[5] = true; &self.partsupps } + pub fn regions(&mut self) -> &VecCollection { self.used[6] = true; &self.regions } + pub fn suppliers(&mut self) -> &VecCollection { self.used[7] = true; &self.suppliers } pub fn used(&self) -> [bool; 8] { self.used } } @@ -268,7 +268,7 @@ impl Experiment { buttons: Vec::new(), } } - pub fn lineitem>(&mut self, scope: &mut G) -> Collection, isize> { + pub fn lineitem>(&mut self, scope: &mut G) -> VecCollection, isize> { use timely::dataflow::operators::Input; use differential_dataflow::AsCollection; scope.input_from(&mut self.lineitem).as_collection() @@ -278,4 +278,4 @@ impl Experiment { for mut button in self.buttons.drain(..) { button.press(); } self.token } -} \ No newline at end of file +}