Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 22 additions & 22 deletions mdbook/src/chapter_2/chapter_2_4.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,12 @@ fn main() {
.unary(Pipeline, "increment", |capability, info| {

move |input, output| {
while let Some((time, data)) = input.next() {
input.for_each_time(|time, data| {
let mut session = output.session(&time);
for datum in data.drain(..) {
for datum in data.flat_map(|d| d.drain(..)) {
session.give(datum + 1);
}
}
});
}
})
.container::<Vec<_>>();
Expand Down Expand Up @@ -136,15 +136,15 @@ fn main() {
let mut maximum = 0; // define this here; use in the closure

move |input, output| {
while let Some((time, data)) = input.next() {
input.for_each_time(|time, data| {
let mut session = output.session(&time);
for datum in data.drain(..) {
for datum in data.flat_map(|d| d.drain(..)) {
if datum > maximum {
session.give(datum + 1);
maximum = datum;
}
}
}
});
}
})
.container::<Vec<_>>();
Expand Down Expand Up @@ -187,21 +187,21 @@ fn main() {
let mut notificator = FrontierNotificator::default();
let mut stash = HashMap::new();

move |input1, input2, output| {
while let Some((time, data)) = input1.next() {
move |(input1, frontier1), (input2, frontier2), output| {
input1.for_each_time(|time, data| {
stash.entry(time.time().clone())
.or_insert(Vec::new())
.push(std::mem::take(data));
.extend(data.map(std::mem::take));
notificator.notify_at(time.retain());
}
while let Some((time, data)) = input2.next() {
});
input2.for_each_time(|time, data| {
stash.entry(time.time().clone())
.or_insert(Vec::new())
.push(std::mem::take(data));
.extend(data.map(std::mem::take));
notificator.notify_at(time.retain());
}
});

notificator.for_each(&[input1.frontier(), input2.frontier()], |time, notificator| {
notificator.for_each(&[frontier1, frontier2], |time, notificator| {
let mut session = output.session(&time);
if let Some(list) = stash.remove(time.time()) {
for mut vector in list.into_iter() {
Expand Down Expand Up @@ -237,21 +237,21 @@ fn main() {

let mut stash = HashMap::new();

move |input1, input2, output| {
move |(input1, frontier1), (input2, frontier2), output| {

while let Some((time, data)) = input1.next() {
input1.for_each_time(|time, data| {
stash.entry(time.retain())
.or_insert(Vec::new())
.push(std::mem::take(data));
}
while let Some((time, data)) = input2.next() {
.extend(data.map(std::mem::take));
});
input2.for_each_time(|time, data| {
stash.entry(time.retain())
.or_insert(Vec::new())
.push(std::mem::take(data));
}
.extend(data.map(std::mem::take));
});

// consider sending everything in `stash`.
let frontiers = &[input1.frontier(), input2.frontier()];
let frontiers = &[frontier1, frontier2];
for (time, list) in stash.iter_mut() {
// if neither input can produce data at `time`, ship `list`.
if frontiers.iter().all(|f| !f.less_equal(time.time())) {
Expand Down
10 changes: 5 additions & 5 deletions mdbook/src/chapter_2/chapter_2_5.md
Original file line number Diff line number Diff line change
Expand Up @@ -206,18 +206,18 @@ As before, I'm just going to show you the new code, which now lives just after `
let mut counts = HashMap::new();
let mut buffer = Vec::new();

move |input, output| {
move |(input, frontier), output| {

// for each input batch, stash it at `time`.
while let Some((time, data)) = input.next() {
input.for_each_time(|time, data| {
queues.entry(time.retain())
.or_insert(Vec::new())
.extend(std::mem::take(data));
}
.extend(data.flat_map(|d| d.drain(..)));
});

// enable each stashed time if ready.
for (time, vals) in queues.iter_mut() {
if !input.frontier().less_equal(time.time()) {
if !frontier.less_equal(time.time()) {
let vals = std::mem::replace(vals, Vec::new());
buffer.push((time.clone(), vals));
}
Expand Down
8 changes: 4 additions & 4 deletions mdbook/src/chapter_4/chapter_4_3.md
Original file line number Diff line number Diff line change
Expand Up @@ -78,20 +78,20 @@ fn main() {
// Buffer records until all prior timestamps have completed.
.binary_frontier(&cycle, Pipeline, Pipeline, "Buffer", move |capability, info| {

move |input1, input2, output| {
move |(input1, frontier1), (input2, frontier2), output| {

// Stash received data.
input1.for_each(|time, data| {
input1.for_each_time(|time, data| {
stash.entry(time.retain())
.or_insert(Vec::new())
.extend(data.drain(..));
.extend(data.flat_map(|d| d.drain(..)));
});

// Consider sending stashed data.
for (time, data) in stash.iter_mut() {
// Only send data once the probe is not less than the time.
// That is, once we have finished all strictly prior work.
if !input2.frontier().less_than(time.time()) {
if !frontier2.less_than(time.time()) {
output.session(&time).give_iterator(data.drain(..));
}
}
Expand Down
8 changes: 4 additions & 4 deletions timely/examples/bfs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,19 +53,19 @@ fn main() {
move |input1, input2, output, notify| {

// receive edges, start to sort them
input1.for_each(|time, data| {
input1.for_each_time(|time, data| {
notify.notify_at(time.retain());
edge_list.push(std::mem::take(data));
edge_list.extend(data.map(std::mem::take));
});

// receive (node, worker) pairs, note any new ones.
input2.for_each(|time, data| {
input2.for_each_time(|time, data| {
node_lists.entry(*time.time())
.or_insert_with(|| {
notify.notify_at(time.retain());
Vec::new()
})
.push(std::mem::take(data));
.extend(data.map(std::mem::take));
});

notify.for_each(|time, _num, _notify| {
Expand Down
25 changes: 14 additions & 11 deletions timely/examples/columnar.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,14 +44,16 @@ fn main() {
"Split",
|_cap, _info| {
move |input, output| {
while let Some((time, data)) = input.next() {
input.for_each_time(|time, data| {
let mut session = output.session(&time);
for wordcount in data.borrow().into_index_iter().flat_map(|wordcount| {
wordcount.text.split_whitespace().map(move |text| WordCountReference { text, diff: wordcount.diff })
}) {
session.give(wordcount);
for data in data {
for wordcount in data.borrow().into_index_iter().flat_map(|wordcount| {
wordcount.text.split_whitespace().map(move |text| WordCountReference { text, diff: wordcount.diff })
}) {
session.give(wordcount);
}
}
}
});
}
},
)
Expand All @@ -63,16 +65,17 @@ fn main() {
let mut queues = HashMap::new();
let mut counts = HashMap::new();

move |input, output| {
while let Some((time, data)) = input.next() {
move |(input, frontier), output| {
input.for_each_time(|time, data| {
queues
.entry(time.retain())
.or_insert(Vec::new())
.push(std::mem::take(data));
}
.extend(data.map(std::mem::take));

});

for (key, val) in queues.iter_mut() {
if !input.frontier().less_equal(key.time()) {
if !frontier.less_equal(key.time()) {
let mut session = output.session(key);
for batch in val.drain(..) {
for wordcount in batch.borrow().into_index_iter() {
Expand Down
14 changes: 8 additions & 6 deletions timely/examples/distinct.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,18 +18,20 @@ fn main() {
scope.input_from(&mut input)
.unary(Exchange::new(|x| *x), "Distinct", move |_, _|
move |input, output| {
input.for_each(|time, data| {
input.for_each_time(|time, data| {
let counts =
counts_by_time
.entry(*time.time())
.or_insert(HashMap::new());
let mut session = output.session(&time);
for &datum in data.iter() {
let count = counts.entry(datum).or_insert(0);
if *count == 0 {
session.give(datum);
for data in data {
for &datum in data.iter() {
let count = counts.entry(datum).or_insert(0);
if *count == 0 {
session.give(datum);
}
*count += 1;
}
*count += 1;
}
})
})
Expand Down
8 changes: 4 additions & 4 deletions timely/examples/hashjoin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,9 @@ fn main() {
move |input1, input2, output| {

// Drain first input, check second map, update first map.
input1.for_each(|time, data| {
input1.for_each_time(|time, data| {
let mut session = output.session(&time);
for (key, val1) in data.drain(..) {
for (key, val1) in data.flat_map(|d| d.drain(..)) {
if let Some(values) = map2.get(&key) {
for val2 in values.iter() {
session.give((val1, *val2));
Expand All @@ -54,9 +54,9 @@ fn main() {
});

// Drain second input, check first map, update second map.
input2.for_each(|time, data| {
input2.for_each_time(|time, data| {
let mut session = output.session(&time);
for (key, val2) in data.drain(..) {
for (key, val2) in data.flat_map(|d| d.drain(..)) {
if let Some(values) = map1.get(&key) {
for val1 in values.iter() {
session.give((*val1, val2));
Expand Down
14 changes: 8 additions & 6 deletions timely/examples/pagerank.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,19 +41,21 @@ fn main() {

let timer = ::std::time::Instant::now();

move |input1, input2, output| {
move |(input1, frontier1), (input2, frontier2), output| {

// hold on to edge changes until it is time.
input1.for_each(|time, data| {
edge_stash.entry(time.retain()).or_default().append(data);
input1.for_each_time(|time, data| {
let entry = edge_stash.entry(time.retain()).or_default();
data.for_each(|data| entry.append(data));
});

// hold on to rank changes until it is time.
input2.for_each(|time, data| {
rank_stash.entry(time.retain()).or_default().append(data);
input2.for_each_time(|time, data| {
let entry = rank_stash.entry(time.retain()).or_default();
data.for_each(|data| entry.append(data));
});

let frontiers = &[input1.frontier(), input2.frontier()];
let frontiers = &[frontier1, frontier2];

for (time, edge_changes) in edge_stash.iter_mut() {
if frontiers.iter().all(|f| !f.less_equal(time)) {
Expand Down
7 changes: 3 additions & 4 deletions timely/examples/unionfind.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,10 +61,9 @@ impl<G: Scope> UnionFind for Stream<G, (usize, usize)> {

move |input, output| {

while let Some((time, data)) = input.next() {

input.for_each_time(|time, data| {
let mut session = output.session(&time);
for &(mut x, mut y) in data.iter() {
for &mut (mut x, mut y) in data.flatten() {

// grow arrays if required.
let m = ::std::cmp::max(x, y);
Expand All @@ -86,7 +85,7 @@ impl<G: Scope> UnionFind for Stream<G, (usize, usize)> {
}
}
}
}
});
}
})
}
Expand Down
10 changes: 5 additions & 5 deletions timely/examples/wordcount.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,15 +29,15 @@ fn main() {
let mut queues = HashMap::new();
let mut counts = HashMap::new();

move |input, output| {
while let Some((time, data)) = input.next() {
move |(input, frontier), output| {
input.for_each_time(|time, data| {
queues.entry(time.retain())
.or_insert(Vec::new())
.push(std::mem::take(data));
}
.extend(data.map(std::mem::take));
});

for (key, val) in queues.iter_mut() {
if !input.frontier().less_equal(key.time()) {
if !frontier.less_equal(key.time()) {
let mut session = output.session(key);
for mut batch in val.drain(..) {
for (word, diff) in batch.drain(..) {
Expand Down
9 changes: 6 additions & 3 deletions timely/src/dataflow/channels/pushers/buffer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ impl<T, CB: ContainerBuilder, P: Push<Message<T, CB::Container>>> Buffer<T, CB,
Message::push_at(container, time, &mut self.pusher);
}
}

/// An internal implementation of push that should only be called by sessions.
#[inline]
fn push_internal<D>(&mut self, item: D) where CB: PushInto<D> {
Expand All @@ -138,11 +138,14 @@ where
T: Eq + Clone + 'a,
P: Push<Message<T, CB::Container>> + 'a,
{
/// Provide a container at the time specified by the [Session]. Maintains FIFO order with
/// previously pushed data.
/// Provide a container at the time specified by the [Session].
pub fn give_container(&mut self, container: &mut CB::Container) {
self.buffer.give_container(container)
}
/// Provide multiple containers at the time specifid by the [Session].
pub fn give_containers<'b>(&mut self, containers: impl Iterator<Item = &'b mut CB::Container>) {
for container in containers { self.buffer.give_container(container); }
}
}

impl<'a, T, CB, P> Session<'a, T, CB, P>
Expand Down
4 changes: 2 additions & 2 deletions timely/src/dataflow/operators/aggregation/aggregate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,9 +79,9 @@ impl<S: Scope<Timestamp: ::std::hash::Hash>, K: ExchangeData+Hash+Eq, V: Exchang
self.unary_notify(Exchange::new(move |(k, _)| hash(k)), "Aggregate", vec![], move |input, output, notificator| {

// read each input, fold into aggregates
input.for_each(|time, data| {
input.for_each_time(|time, data| {
let agg_time = aggregates.entry(time.time().clone()).or_insert_with(HashMap::new);
for (key, val) in data.drain(..) {
for (key, val) in data.flat_map(|d| d.drain(..)) {
let agg = agg_time.entry(key.clone()).or_insert_with(Default::default);
fold(&key, val, agg);
}
Expand Down
6 changes: 3 additions & 3 deletions timely/src/dataflow/operators/aggregation/state_machine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,17 +84,17 @@ impl<S: Scope, K: ExchangeData+Hash+Eq, V: ExchangeData> StateMachine<S, K, V> f
});

// stash each input and request a notification when ready
input.for_each(|time, data| {
input.for_each_time(|time, data| {

// stash if not time yet
if notificator.frontier(0).less_than(time.time()) {
pending.entry(time.time().clone()).or_insert_with(Vec::new).append(data);
for data in data { pending.entry(time.time().clone()).or_insert_with(Vec::new).append(data); }
notificator.notify_at(time.retain());
}
else {
// else we can process immediately
let mut session = output.session(&time);
for (key, val) in data.drain(..) {
for (key, val) in data.flat_map(|d| d.drain(..)) {
let (remove, output) = {
let state = states.entry(key.clone()).or_insert_with(Default::default);
fold(&key, val, state)
Expand Down
Loading
Loading