Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion timely/src/dataflow/channels/pullers/counter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,15 @@ impl<T:Ord+Clone+'static, D: Container, P: Pull<BundleCore<T, D>>> Counter<T, D,
impl<T:Ord+Clone+'static, D, P: Pull<BundleCore<T, D>>> Counter<T, D, P> {
/// Allocates a new `Counter` from a boxed puller.
pub fn new(pullable: P) -> Self {
Self::new_with_consumed(pullable, Rc::new(RefCell::new(ChangeBatch::new())))
}

/// Allocates a new [Counter] from a puller and a shared consumed handle.
pub fn new_with_consumed(pullable: P, consumed: Rc<RefCell<ChangeBatch<T>>>) -> Self {
Counter {
phantom: ::std::marker::PhantomData,
pullable,
consumed: Rc::new(RefCell::new(ChangeBatch::new())),
consumed,
}
}
/// A references to shared changes in counts, for cloning or draining.
Expand Down
39 changes: 30 additions & 9 deletions timely/src/dataflow/channels/pushers/tee.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,11 @@ use std::rc::Rc;

use crate::dataflow::channels::{BundleCore, Message};

use crate::communication::Push;
use crate::{Container, Data};
use crate::communication::Push;
use crate::communication::message::RefOrMut;

type PushList<T, D> = Rc<RefCell<Vec<Box<dyn Push<BundleCore<T, D>>>>>>;
type PushList<T, D> = Rc<RefCell<Vec<(Box<dyn Push<BundleCore<T, D>>>, Box<dyn FnMut(&T, RefOrMut<D>, &mut D)>)>>>;

/// Wraps a shared list of `Box<Push>` to forward pushes to. Owned by `Stream`.
pub struct TeeCore<T, D> {
Expand All @@ -26,18 +27,33 @@ impl<T: Data, D: Container> Push<BundleCore<T, D>> for TeeCore<T, D> {
let mut pushers = self.shared.borrow_mut();
if let Some(message) = message {
for index in 1..pushers.len() {
self.buffer.clone_from(&message.data);
Message::push_at(&mut self.buffer, message.time.clone(), &mut pushers[index-1]);
(pushers[index-1].1)(&message.time, RefOrMut::Ref(&message.data), &mut self.buffer);
if !self.buffer.is_empty() {
Message::push_at(&mut self.buffer, message.time.clone(), &mut pushers[index-1].0);
}
}
}
else {
for index in 1..pushers.len() {
pushers[index-1].push(&mut None);
pushers[index-1].0.push(&mut None);
}
}
if pushers.len() > 0 {
let last = pushers.len() - 1;
pushers[last].push(message);
if let Some(message) = message {
let message = message.as_ref_or_mut();
let time = message.time.clone();
let data = match message {
RefOrMut::Ref(message) => RefOrMut::Ref(&message.data),
RefOrMut::Mut(message) => RefOrMut::Mut(&mut message.data)
};
(pushers[last].1)(&time, data, &mut self.buffer);
if !self.buffer.is_empty() {
Message::push_at(&mut self.buffer, time, &mut pushers[last].0);
}
} else {
pushers[last].0.push(message);
}
}
}
}
Expand Down Expand Up @@ -88,9 +104,14 @@ pub struct TeeHelper<T, D> {
}

impl<T, D> TeeHelper<T, D> {
/// Adds a new `Push` implementor to the list of recipients shared with a `Stream`.
pub fn add_pusher<P: Push<BundleCore<T, D>>+'static>(&self, pusher: P) {
self.shared.borrow_mut().push(Box::new(pusher));
/// Adds a new `Push` implementor to the list of recipients shared with a `Stream`, with an
/// associated `filter` which will be applied before pushing data..
pub fn add_pusher<P, F>(&self, pusher: P, filter: F)
where
P: Push<BundleCore<T, D>> + 'static,
F: FnMut(&T, RefOrMut<D>, &mut D) + 'static
{
self.shared.borrow_mut().push((Box::new(pusher), Box::new(filter)));
}
}

Expand Down
4 changes: 2 additions & 2 deletions timely/src/dataflow/operators/enterleave.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ impl<G: Scope, T: Timestamp+Refines<G::Timestamp>, C: Data+Container> Enter<G, T
let input = scope.subgraph.borrow_mut().new_input(produced);

let channel_id = scope.clone().new_identifier();
self.connect_to(input, ingress, channel_id);
self.connect_to(input, ingress, channel_id, |_, data, buffer| data.swap(buffer));
StreamCore::new(Source::new(0, input.port), registrar, scope.clone())
}
}
Expand Down Expand Up @@ -131,7 +131,7 @@ impl<'a, G: Scope, D: Clone+Container, T: Timestamp+Refines<G::Timestamp>> Leave
let output = scope.subgraph.borrow_mut().new_output();
let (targets, registrar) = TeeCore::<G::Timestamp, D>::new();
let channel_id = scope.clone().new_identifier();
self.connect_to(Target::new(0, output.port), EgressNub { targets, phantom: PhantomData }, channel_id);
self.connect_to(Target::new(0, output.port), EgressNub { targets, phantom: PhantomData }, channel_id, |_, data, buffer| data.swap(buffer));

StreamCore::new(
output,
Expand Down
54 changes: 44 additions & 10 deletions timely/src/dataflow/operators/filter.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
//! Filters a stream by a predicate.

use crate::Data;
use crate::communication::message::RefOrMut;
use crate::dataflow::channels::pact::Pipeline;
use crate::dataflow::{Stream, Scope};
use crate::dataflow::operators::generic::operator::Operator;
use crate::dataflow::operators::generic::builder_rc::OperatorBuilder;

/// Extension trait for filtering.
pub trait Filter<D: Data> {
Expand All @@ -24,15 +25,48 @@ pub trait Filter<D: Data> {

impl<G: Scope, D: Data> Filter<D> for Stream<G, D> {
fn filter<P: FnMut(&D)->bool+'static>(&self, mut predicate: P) -> Stream<G, D> {
let mut vector = Vec::new();
self.unary(Pipeline, "Filter", move |_,_| move |input, output| {
input.for_each(|time, data| {
data.swap(&mut vector);
vector.retain(|x| predicate(x));
if !vector.is_empty() {
output.session(&time).give_vec(&mut vector);
let mut builder = OperatorBuilder::new("Filter".to_owned(), self.scope());

let filter = move |data: RefOrMut<Vec<D>>, buffer: &mut Vec<D>| {
match data {
RefOrMut::Ref(data) => {
buffer.extend(data.into_iter().filter(|x| predicate(*x)).cloned())
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fairly unrelated, but it'd be nice if operators could pass on references instead of requiring cloning, that could remove a lot of fairly redundant clones

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This might do what you suggested: #471

},
RefOrMut::Mut(data) => {
data.retain(|x| predicate(x));
std::mem::swap(buffer, data);
}
});
})
}
};
let mut input = builder.new_input_filter(self, Pipeline, vec![], filter);
let (mut output, stream) = builder.new_output();
builder.set_notify(false);

builder.build(|_capabilities| {
let mut vector = Vec::new();
move |_frontiers| {
let mut output_handle = output.activate();
input.for_each(|time, data| {
data.swap(&mut vector);
output_handle.session(&time).give_vec(&mut vector);
})
}
});

stream
}
}

#[cfg(test)]
mod test {
use crate::dataflow::operators::{Filter, Inspect, ToStream};

#[test]
fn test_filter_example() {
crate::example(|scope| {
(0..10).to_stream(scope)
.filter(|x| *x % 2 == 0)
.inspect(|x| println!("seen: {:?}", x));
});
}
}
14 changes: 13 additions & 1 deletion timely/src/dataflow/operators/generic/builder_raw.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ use std::default::Default;
use std::rc::Rc;
use std::cell::RefCell;

use crate::communication::message::RefOrMut;

use crate::scheduling::{Schedule, Activations};

use crate::progress::{Source, Target};
Expand Down Expand Up @@ -116,12 +118,22 @@ impl<G: Scope> OperatorBuilder<G> {
pub fn new_input_connection<D: Container, P>(&mut self, stream: &StreamCore<G, D>, pact: P, connection: Vec<Antichain<<G::Timestamp as Timestamp>::Summary>>) -> P::Puller
where
P: ParallelizationContractCore<G::Timestamp, D> {
self.new_input_filter(stream, pact, connection, |_, data, buffer| data.swap(buffer))
}

/// Adds a new input to a generic operator builder, returning the `Pull` implementor to use.
/// `filter` gives clients the opportunity to filter data before transferring over an edge.
pub fn new_input_filter<D: Container, P, F>(&mut self, stream: &StreamCore<G, D>, pact: P, connection: Vec<Antichain<<G::Timestamp as Timestamp>::Summary>>, filter: F) -> P::Puller
where
P: ParallelizationContractCore<G::Timestamp, D>,
F: FnMut(&G::Timestamp, RefOrMut<D>, &mut D)+'static
{

let channel_id = self.scope.new_identifier();
let logging = self.scope.logging();
let (sender, receiver) = pact.connect(&mut self.scope, channel_id, &self.address[..], logging);
let target = Target::new(self.index, self.shape.inputs);
stream.connect_to(target, sender, channel_id);
stream.connect_to(target, sender, channel_id, filter);

self.shape.inputs += 1;
assert_eq!(self.shape.outputs, connection.len());
Expand Down
36 changes: 34 additions & 2 deletions timely/src/dataflow/operators/generic/builder_rc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ use std::rc::Rc;
use std::cell::RefCell;
use std::default::Default;

use crate::communication::message::RefOrMut;

use crate::progress::{ChangeBatch, Timestamp};
use crate::progress::operate::SharedProgress;
use crate::progress::frontier::{Antichain, MutableAntichain};
Expand Down Expand Up @@ -75,10 +77,40 @@ impl<G: Scope> OperatorBuilder<G> {
pub fn new_input_connection<D: Container, P>(&mut self, stream: &StreamCore<G, D>, pact: P, connection: Vec<Antichain<<G::Timestamp as Timestamp>::Summary>>) -> InputHandleCore<G::Timestamp, D, P::Puller>
where
P: ParallelizationContractCore<G::Timestamp, D> {
self.new_input_filter(stream, pact, connection, |data, buffer| data.swap(buffer))
}

let puller = self.builder.new_input_connection(stream, pact, connection);
/// Adds a new input with connection information to a generic operator builder, returning the `Pull` implementor to use.
///
/// The `connection` parameter contains promises made by the operator for each of the existing *outputs*, that any timestamp
/// appearing at the input, any output timestamp will be greater than or equal to the input timestamp subjected to a `Summary`
/// greater or equal to some element of the corresponding antichain in `connection`.
///
/// Commonly the connections are either the unit summary, indicating the same timestamp might be produced as output, or an empty
/// antichain indicating that there is no connection from the input to the output.
///
/// The `filter` parameter gets a chance to process data on the output of the upstream operator.
/// It receives a [RefOrMut] to the input data and a mutable vector to append its output. A
/// no-op implementation would simply swap the data, i.e., `|data, buffer| data.swap(buffer)`.
pub fn new_input_filter<D: Container, P, F>(&mut self, stream: &StreamCore<G, D>, pact: P, connection: Vec<Antichain<<G::Timestamp as Timestamp>::Summary>>, mut filter: F) -> InputHandleCore<G::Timestamp, D, P::Puller>
where
P: ParallelizationContractCore<G::Timestamp, D>,
F: FnMut(RefOrMut<D>, &mut D) + 'static
{

// The data that's filtered on the output of the upstream operator won't show up in the
// pull counter's input. Hence, we need to announce that the messages were consumed to
// ensure the invariants of progress tracking.
let consumed = Rc::new(RefCell::new(ChangeBatch::new()));
let consumed_input = Rc::clone(&consumed);
let puller = self.builder.new_input_filter(stream, pact, connection, move |time, data, buffer| {
let len = data.len();
filter(data, buffer);
let delta = len - buffer.len();
consumed_input.borrow_mut().update(time.clone(), delta as i64);
});

let input = PullCounter::new(puller);
let input = PullCounter::new_with_consumed(puller, consumed);
self.frontier.push(MutableAntichain::new());
self.consumed.push(input.consumed().clone());

Expand Down
9 changes: 7 additions & 2 deletions timely/src/dataflow/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
use crate::progress::{Source, Target};

use crate::communication::Push;
use crate::communication::message::RefOrMut;
use crate::dataflow::Scope;
use crate::dataflow::channels::pushers::tee::TeeHelper;
use crate::dataflow::channels::BundleCore;
Expand Down Expand Up @@ -37,7 +38,11 @@ impl<S: Scope, D: Container> StreamCore<S, D> {
///
/// The destination is described both by a `Target`, for progress tracking information, and a `P: Push` where the
/// records should actually be sent. The identifier is unique to the edge and is used only for logging purposes.
pub fn connect_to<P: Push<BundleCore<S::Timestamp, D>>+'static>(&self, target: Target, pusher: P, identifier: usize) {
pub fn connect_to<P, F>(&self, target: Target, pusher: P, identifier: usize, filter: F)
where
P: Push<BundleCore<S::Timestamp, D>> + 'static,
F: FnMut(&S::Timestamp, RefOrMut<D>, &mut D) + 'static
{

let mut logging = self.scope().logging();
logging.as_mut().map(|l| l.log(crate::logging::ChannelsEvent {
Expand All @@ -48,7 +53,7 @@ impl<S: Scope, D: Container> StreamCore<S, D> {
}));

self.scope.add_edge(self.name, target);
self.ports.add_pusher(pusher);
self.ports.add_pusher(pusher, filter);
}
/// Allocates a `Stream` from a supplied `Source` name and rendezvous point.
pub fn new(source: Source, output: TeeHelper<S::Timestamp, D>, scope: S) -> Self {
Expand Down