Skip to content

Commit c27a14b

Browse files
committed
Present owned OutputBuilderSession
1 parent 9b5d5b9 commit c27a14b

32 files changed

+81
-84
lines changed

mdbook/src/chapter_2/chapter_2_4.md

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ fn main() {
1919
.to_stream(scope)
2020
.unary(Pipeline, "increment", |capability, info| {
2121

22-
move |input, output| {
22+
move |input, mut output| {
2323
input.for_each_time(|time, data| {
2424
let mut session = output.session(&time);
2525
for datum in data.flat_map(|d| d.drain(..)) {
@@ -81,7 +81,7 @@ fn main() {
8181
let activator = scope.activator_for(info.address);
8282

8383
let mut cap = Some(capability);
84-
move |output| {
84+
move |mut output| {
8585

8686
let mut done = false;
8787
if let Some(cap) = cap.as_mut() {
@@ -135,7 +135,7 @@ fn main() {
135135

136136
let mut maximum = 0; // define this here; use in the closure
137137

138-
move |input, output| {
138+
move |input, mut output| {
139139
input.for_each_time(|time, data| {
140140
let mut session = output.session(&time);
141141
for datum in data.flat_map(|d| d.drain(..)) {
@@ -187,7 +187,7 @@ fn main() {
187187
let mut notificator = FrontierNotificator::default();
188188
let mut stash = HashMap::new();
189189

190-
move |(input1, frontier1), (input2, frontier2), output| {
190+
move |(input1, frontier1), (input2, frontier2), mut output| {
191191
input1.for_each_time(|time, data| {
192192
stash.entry(time.time().clone())
193193
.or_insert(Vec::new())
@@ -237,7 +237,7 @@ fn main() {
237237

238238
let mut stash = HashMap::new();
239239

240-
move |(input1, frontier1), (input2, frontier2), output| {
240+
move |(input1, frontier1), (input2, frontier2), mut output| {
241241

242242
input1.for_each_time(|time, data| {
243243
stash.entry(time.retain())

mdbook/src/chapter_2/chapter_2_5.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -206,7 +206,7 @@ As before, I'm just going to show you the new code, which now lives just after `
206206
let mut counts = HashMap::new();
207207
let mut buffer = Vec::new();
208208

209-
move |(input, frontier), output| {
209+
move |(input, frontier), mut output| {
210210

211211
// for each input batch, stash it at `time`.
212212
input.for_each_time(|time, data| {

mdbook/src/chapter_4/chapter_4_3.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,7 @@ fn main() {
7878
// Buffer records until all prior timestamps have completed.
7979
.binary_frontier(&cycle, Pipeline, Pipeline, "Buffer", move |capability, info| {
8080
81-
move |(input1, frontier1), (input2, frontier2), output| {
81+
move |(input1, frontier1), (input2, frontier2), mut output| {
8282
8383
// Stash received data.
8484
input1.for_each_time(|time, data| {

timely/examples/bfs.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ fn main() {
5050
Exchange::new(|x: &(u32, u32)| u64::from(x.0)),
5151
"BFS",
5252
vec![],
53-
move |input1, input2, output, notify| {
53+
move |input1, input2, mut output, notify| {
5454

5555
// receive edges, start to sort them
5656
input1.for_each_time(|time, data| {

timely/examples/columnar.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ fn main() {
4343
Pipeline,
4444
"Split",
4545
|_cap, _info| {
46-
move |input, output| {
46+
move |input, mut output| {
4747
input.for_each_time(|time, data| {
4848
let mut session = output.session(&time);
4949
for data in data {
@@ -65,7 +65,7 @@ fn main() {
6565
let mut queues = HashMap::new();
6666
let mut counts = HashMap::new();
6767

68-
move |(input, frontier), output| {
68+
move |(input, frontier), mut output| {
6969
input.for_each_time(|time, data| {
7070
queues
7171
.entry(time.retain())

timely/examples/distinct.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ fn main() {
1717
let mut counts_by_time = HashMap::new();
1818
scope.input_from(&mut input)
1919
.unary(Exchange::new(|x| *x), "Distinct", move |_, _|
20-
move |input, output| {
20+
move |input, mut output| {
2121
input.for_each_time(|time, data| {
2222
let counts =
2323
counts_by_time

timely/examples/hashjoin.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ fn main() {
3737
let mut map1 = HashMap::<u64, Vec<u64>>::new();
3838
let mut map2 = HashMap::<u64, Vec<u64>>::new();
3939

40-
move |input1, input2, output| {
40+
move |input1, input2, mut output| {
4141

4242
// Drain first input, check second map, update first map.
4343
input1.for_each_time(|time, data| {

timely/examples/pagerank.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ fn main() {
4141

4242
let timer = ::std::time::Instant::now();
4343

44-
move |(input1, frontier1), (input2, frontier2), output| {
44+
move |(input1, frontier1), (input2, frontier2), mut output| {
4545

4646
// hold on to edge changes until it is time.
4747
input1.for_each_time(|time, data| {

timely/examples/unionfind.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ impl<G: Scope> UnionFind for Stream<G, (usize, usize)> {
5959
let mut roots = vec![]; // u32 works, and is smaller than uint/u64
6060
let mut ranks = vec![]; // u8 should be large enough (n < 2^256)
6161

62-
move |input, output| {
62+
move |input, mut output| {
6363

6464
input.for_each_time(|time, data| {
6565
let mut session = output.session(&time);

timely/examples/wordcount.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ fn main() {
2929
let mut queues = HashMap::new();
3030
let mut counts = HashMap::new();
3131

32-
move |(input, frontier), output| {
32+
move |(input, frontier), mut output| {
3333
input.for_each_time(|time, data| {
3434
queues.entry(time.retain())
3535
.or_insert(Vec::new())

0 commit comments

Comments
 (0)