Skip to content

Commit 89b5e56

Browse files
committed
Fix #395: Add next_count returning the notification count
Signed-off-by: Moritz Hoffmann <[email protected]>
1 parent 209af99 commit 89b5e56

File tree

1 file changed

+25
-9
lines changed

1 file changed

+25
-9
lines changed

timely/src/dataflow/operators/generic/notificator.rs

Lines changed: 25 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -57,14 +57,15 @@ impl<'a, T: Timestamp> Notificator<'a, T> {
5757
///
5858
/// timely::example(|scope| {
5959
/// (0..10).to_stream(scope)
60-
/// .unary_notify(Pipeline, "example", Vec::new(), |input, output, notificator| {
60+
/// .unary_notify(Pipeline, "example", Some(0), |input, output, notificator| {
6161
/// input.for_each(|cap, data| {
6262
/// output.session(&cap).give_vec(&mut data.replace(Vec::new()));
6363
/// let time = cap.time().clone() + 1;
6464
/// notificator.notify_at(cap.delayed(&time));
6565
/// });
66-
/// notificator.for_each(|cap,_,_| {
67-
/// println!("done with time: {:?}", cap.time());
66+
/// notificator.for_each(|cap, count, _| {
67+
/// println!("done with time: {:?}, requested {} times", cap.time(), count);
68+
/// assert!(*cap.time() == 0 && count == 2 || count == 1);
6869
/// });
6970
/// });
7071
/// });
@@ -99,7 +100,7 @@ impl<'a, T: Timestamp> Iterator for Notificator<'a, T> {
99100
/// timestamp.
100101
#[inline]
101102
fn next(&mut self) -> Option<(Capability<T>, u64)> {
102-
self.inner.next(self.frontiers).map(|x| (x,1))
103+
self.inner.next_count(self.frontiers)
103104
}
104105
}
105106

@@ -333,22 +334,37 @@ impl<T: Timestamp> FrontierNotificator<T> {
333334
}
334335
}
335336

336-
/// Returns the next available capability with respect to the supplied frontiers, if one exists.
337+
/// Returns the next available capability with respect to the supplied frontiers, if one exists,
338+
/// and the count of how many instances are found.
337339
///
338340
/// In the interest of efficiency, this method may yield capabilities in decreasing order, in certain
339341
/// circumstances. If you want to iterate through capabilities with an in-order guarantee, either (i)
340-
/// use `for_each`
342+
/// use `for_each`, or (ii) call `make_available` first.
341343
#[inline]
342-
pub fn next<'a>(&mut self, frontiers: &'a [&'a MutableAntichain<T>]) -> Option<Capability<T>> {
344+
pub fn next_count<'a>(&mut self, frontiers: &'a [&'a MutableAntichain<T>]) -> Option<(Capability<T>, u64)> {
343345
if self.available.is_empty() {
344346
self.make_available(frontiers);
345347
}
346348
self.available.pop().map(|front| {
347-
while self.available.peek() == Some(&front) { self.available.pop(); }
348-
front.element
349+
let mut count = 1;
350+
while self.available.peek() == Some(&front) {
351+
self.available.pop();
352+
count += 1;
353+
}
354+
(front.element, count)
349355
})
350356
}
351357

358+
/// Returns the next available capability with respect to the supplied frontiers, if one exists.
359+
///
360+
/// In the interest of efficiency, this method may yield capabilities in decreasing order, in certain
361+
/// circumstances. If you want to iterate through capabilities with an in-order guarantee, either (i)
362+
/// use `for_each`, or (ii) call `make_available` first.
363+
#[inline]
364+
pub fn next<'a>(&mut self, frontiers: &'a [&'a MutableAntichain<T>]) -> Option<Capability<T>> {
365+
self.next_count(frontiers).map(|(cap, _)| cap)
366+
}
367+
352368
/// Repeatedly calls `logic` till exhaustion of the notifications made available by inspecting
353369
/// the frontiers.
354370
///

0 commit comments

Comments
 (0)