diff --git a/timely/src/dataflow/channels/pullers/counter.rs b/timely/src/dataflow/channels/pullers/counter.rs index 04a189da5..54ec211a2 100644 --- a/timely/src/dataflow/channels/pullers/counter.rs +++ b/timely/src/dataflow/channels/pullers/counter.rs @@ -33,10 +33,15 @@ impl>> Counter>> Counter { /// Allocates a new `Counter` from a boxed puller. pub fn new(pullable: P) -> Self { + Self::new_with_consumed(pullable, Rc::new(RefCell::new(ChangeBatch::new()))) + } + + /// Allocates a new [Counter] from a puller and a shared consumed handle. + pub fn new_with_consumed(pullable: P, consumed: Rc>>) -> Self { Counter { phantom: ::std::marker::PhantomData, pullable, - consumed: Rc::new(RefCell::new(ChangeBatch::new())), + consumed, } } /// A references to shared changes in counts, for cloning or draining. diff --git a/timely/src/dataflow/channels/pushers/tee.rs b/timely/src/dataflow/channels/pushers/tee.rs index 1ec05e4e7..fb3a918ab 100644 --- a/timely/src/dataflow/channels/pushers/tee.rs +++ b/timely/src/dataflow/channels/pushers/tee.rs @@ -6,10 +6,11 @@ use std::rc::Rc; use crate::dataflow::channels::{BundleCore, Message}; -use crate::communication::Push; use crate::{Container, Data}; +use crate::communication::Push; +use crate::communication::message::RefOrMut; -type PushList = Rc>>>>>; +type PushList = Rc>>, Box, &mut D)>)>>>; /// Wraps a shared list of `Box` to forward pushes to. Owned by `Stream`. pub struct TeeCore { @@ -26,18 +27,33 @@ impl Push> for TeeCore { let mut pushers = self.shared.borrow_mut(); if let Some(message) = message { for index in 1..pushers.len() { - self.buffer.clone_from(&message.data); - Message::push_at(&mut self.buffer, message.time.clone(), &mut pushers[index-1]); + (pushers[index-1].1)(&message.time, RefOrMut::Ref(&message.data), &mut self.buffer); + if !self.buffer.is_empty() { + Message::push_at(&mut self.buffer, message.time.clone(), &mut pushers[index-1].0); + } } } else { for index in 1..pushers.len() { - pushers[index-1].push(&mut None); + pushers[index-1].0.push(&mut None); } } if pushers.len() > 0 { let last = pushers.len() - 1; - pushers[last].push(message); + if let Some(message) = message { + let message = message.as_ref_or_mut(); + let time = message.time.clone(); + let data = match message { + RefOrMut::Ref(message) => RefOrMut::Ref(&message.data), + RefOrMut::Mut(message) => RefOrMut::Mut(&mut message.data) + }; + (pushers[last].1)(&time, data, &mut self.buffer); + if !self.buffer.is_empty() { + Message::push_at(&mut self.buffer, time, &mut pushers[last].0); + } + } else { + pushers[last].0.push(message); + } } } } @@ -88,9 +104,14 @@ pub struct TeeHelper { } impl TeeHelper { - /// Adds a new `Push` implementor to the list of recipients shared with a `Stream`. - pub fn add_pusher>+'static>(&self, pusher: P) { - self.shared.borrow_mut().push(Box::new(pusher)); + /// Adds a new `Push` implementor to the list of recipients shared with a `Stream`, with an + /// associated `filter` which will be applied before pushing data.. + pub fn add_pusher(&self, pusher: P, filter: F) + where + P: Push> + 'static, + F: FnMut(&T, RefOrMut, &mut D) + 'static + { + self.shared.borrow_mut().push((Box::new(pusher), Box::new(filter))); } } diff --git a/timely/src/dataflow/operators/enterleave.rs b/timely/src/dataflow/operators/enterleave.rs index bd9b1f86a..d2aed2301 100644 --- a/timely/src/dataflow/operators/enterleave.rs +++ b/timely/src/dataflow/operators/enterleave.rs @@ -99,7 +99,7 @@ impl, C: Data+Container> Enter> Leave let output = scope.subgraph.borrow_mut().new_output(); let (targets, registrar) = TeeCore::::new(); let channel_id = scope.clone().new_identifier(); - self.connect_to(Target::new(0, output.port), EgressNub { targets, phantom: PhantomData }, channel_id); + self.connect_to(Target::new(0, output.port), EgressNub { targets, phantom: PhantomData }, channel_id, |_, data, buffer| data.swap(buffer)); StreamCore::new( output, diff --git a/timely/src/dataflow/operators/filter.rs b/timely/src/dataflow/operators/filter.rs index 7034a9df0..0bf727216 100644 --- a/timely/src/dataflow/operators/filter.rs +++ b/timely/src/dataflow/operators/filter.rs @@ -1,9 +1,10 @@ //! Filters a stream by a predicate. use crate::Data; +use crate::communication::message::RefOrMut; use crate::dataflow::channels::pact::Pipeline; use crate::dataflow::{Stream, Scope}; -use crate::dataflow::operators::generic::operator::Operator; +use crate::dataflow::operators::generic::builder_rc::OperatorBuilder; /// Extension trait for filtering. pub trait Filter { @@ -24,15 +25,48 @@ pub trait Filter { impl Filter for Stream { fn filterbool+'static>(&self, mut predicate: P) -> Stream { - let mut vector = Vec::new(); - self.unary(Pipeline, "Filter", move |_,_| move |input, output| { - input.for_each(|time, data| { - data.swap(&mut vector); - vector.retain(|x| predicate(x)); - if !vector.is_empty() { - output.session(&time).give_vec(&mut vector); + let mut builder = OperatorBuilder::new("Filter".to_owned(), self.scope()); + + let filter = move |data: RefOrMut>, buffer: &mut Vec| { + match data { + RefOrMut::Ref(data) => { + buffer.extend(data.into_iter().filter(|x| predicate(*x)).cloned()) + }, + RefOrMut::Mut(data) => { + data.retain(|x| predicate(x)); + std::mem::swap(buffer, data); } - }); - }) + } + }; + let mut input = builder.new_input_filter(self, Pipeline, vec![], filter); + let (mut output, stream) = builder.new_output(); + builder.set_notify(false); + + builder.build(|_capabilities| { + let mut vector = Vec::new(); + move |_frontiers| { + let mut output_handle = output.activate(); + input.for_each(|time, data| { + data.swap(&mut vector); + output_handle.session(&time).give_vec(&mut vector); + }) + } + }); + + stream + } +} + +#[cfg(test)] +mod test { + use crate::dataflow::operators::{Filter, Inspect, ToStream}; + + #[test] + fn test_filter_example() { + crate::example(|scope| { + (0..10).to_stream(scope) + .filter(|x| *x % 2 == 0) + .inspect(|x| println!("seen: {:?}", x)); + }); } } diff --git a/timely/src/dataflow/operators/generic/builder_raw.rs b/timely/src/dataflow/operators/generic/builder_raw.rs index 8e97492af..6c90661a2 100644 --- a/timely/src/dataflow/operators/generic/builder_raw.rs +++ b/timely/src/dataflow/operators/generic/builder_raw.rs @@ -8,6 +8,8 @@ use std::default::Default; use std::rc::Rc; use std::cell::RefCell; +use crate::communication::message::RefOrMut; + use crate::scheduling::{Schedule, Activations}; use crate::progress::{Source, Target}; @@ -116,12 +118,22 @@ impl OperatorBuilder { pub fn new_input_connection(&mut self, stream: &StreamCore, pact: P, connection: Vec::Summary>>) -> P::Puller where P: ParallelizationContractCore { + self.new_input_filter(stream, pact, connection, |_, data, buffer| data.swap(buffer)) + } + + /// Adds a new input to a generic operator builder, returning the `Pull` implementor to use. + /// `filter` gives clients the opportunity to filter data before transferring over an edge. + pub fn new_input_filter(&mut self, stream: &StreamCore, pact: P, connection: Vec::Summary>>, filter: F) -> P::Puller + where + P: ParallelizationContractCore, + F: FnMut(&G::Timestamp, RefOrMut, &mut D)+'static + { let channel_id = self.scope.new_identifier(); let logging = self.scope.logging(); let (sender, receiver) = pact.connect(&mut self.scope, channel_id, &self.address[..], logging); let target = Target::new(self.index, self.shape.inputs); - stream.connect_to(target, sender, channel_id); + stream.connect_to(target, sender, channel_id, filter); self.shape.inputs += 1; assert_eq!(self.shape.outputs, connection.len()); diff --git a/timely/src/dataflow/operators/generic/builder_rc.rs b/timely/src/dataflow/operators/generic/builder_rc.rs index cc505e7ee..d67d8ccb9 100644 --- a/timely/src/dataflow/operators/generic/builder_rc.rs +++ b/timely/src/dataflow/operators/generic/builder_rc.rs @@ -4,6 +4,8 @@ use std::rc::Rc; use std::cell::RefCell; use std::default::Default; +use crate::communication::message::RefOrMut; + use crate::progress::{ChangeBatch, Timestamp}; use crate::progress::operate::SharedProgress; use crate::progress::frontier::{Antichain, MutableAntichain}; @@ -75,10 +77,40 @@ impl OperatorBuilder { pub fn new_input_connection(&mut self, stream: &StreamCore, pact: P, connection: Vec::Summary>>) -> InputHandleCore where P: ParallelizationContractCore { + self.new_input_filter(stream, pact, connection, |data, buffer| data.swap(buffer)) + } - let puller = self.builder.new_input_connection(stream, pact, connection); + /// Adds a new input with connection information to a generic operator builder, returning the `Pull` implementor to use. + /// + /// The `connection` parameter contains promises made by the operator for each of the existing *outputs*, that any timestamp + /// appearing at the input, any output timestamp will be greater than or equal to the input timestamp subjected to a `Summary` + /// greater or equal to some element of the corresponding antichain in `connection`. + /// + /// Commonly the connections are either the unit summary, indicating the same timestamp might be produced as output, or an empty + /// antichain indicating that there is no connection from the input to the output. + /// + /// The `filter` parameter gets a chance to process data on the output of the upstream operator. + /// It receives a [RefOrMut] to the input data and a mutable vector to append its output. A + /// no-op implementation would simply swap the data, i.e., `|data, buffer| data.swap(buffer)`. + pub fn new_input_filter(&mut self, stream: &StreamCore, pact: P, connection: Vec::Summary>>, mut filter: F) -> InputHandleCore + where + P: ParallelizationContractCore, + F: FnMut(RefOrMut, &mut D) + 'static + { + + // The data that's filtered on the output of the upstream operator won't show up in the + // pull counter's input. Hence, we need to announce that the messages were consumed to + // ensure the invariants of progress tracking. + let consumed = Rc::new(RefCell::new(ChangeBatch::new())); + let consumed_input = Rc::clone(&consumed); + let puller = self.builder.new_input_filter(stream, pact, connection, move |time, data, buffer| { + let len = data.len(); + filter(data, buffer); + let delta = len - buffer.len(); + consumed_input.borrow_mut().update(time.clone(), delta as i64); + }); - let input = PullCounter::new(puller); + let input = PullCounter::new_with_consumed(puller, consumed); self.frontier.push(MutableAntichain::new()); self.consumed.push(input.consumed().clone()); diff --git a/timely/src/dataflow/stream.rs b/timely/src/dataflow/stream.rs index aa5e48601..55813f52f 100644 --- a/timely/src/dataflow/stream.rs +++ b/timely/src/dataflow/stream.rs @@ -7,6 +7,7 @@ use crate::progress::{Source, Target}; use crate::communication::Push; +use crate::communication::message::RefOrMut; use crate::dataflow::Scope; use crate::dataflow::channels::pushers::tee::TeeHelper; use crate::dataflow::channels::BundleCore; @@ -37,7 +38,11 @@ impl StreamCore { /// /// The destination is described both by a `Target`, for progress tracking information, and a `P: Push` where the /// records should actually be sent. The identifier is unique to the edge and is used only for logging purposes. - pub fn connect_to>+'static>(&self, target: Target, pusher: P, identifier: usize) { + pub fn connect_to(&self, target: Target, pusher: P, identifier: usize, filter: F) + where + P: Push> + 'static, + F: FnMut(&S::Timestamp, RefOrMut, &mut D) + 'static + { let mut logging = self.scope().logging(); logging.as_mut().map(|l| l.log(crate::logging::ChannelsEvent { @@ -48,7 +53,7 @@ impl StreamCore { })); self.scope.add_edge(self.name, target); - self.ports.add_pusher(pusher); + self.ports.add_pusher(pusher, filter); } /// Allocates a `Stream` from a supplied `Source` name and rendezvous point. pub fn new(source: Source, output: TeeHelper, scope: S) -> Self {