diff --git a/timely/src/dataflow/operators/mod.rs b/timely/src/dataflow/operators/mod.rs index b6add3d3d..6f63f099b 100644 --- a/timely/src/dataflow/operators/mod.rs +++ b/timely/src/dataflow/operators/mod.rs @@ -51,6 +51,7 @@ pub mod to_stream; pub mod capture; pub mod branch; pub mod ok_err; +pub mod rc; pub mod result; pub mod aggregation; diff --git a/timely/src/dataflow/operators/rc.rs b/timely/src/dataflow/operators/rc.rs new file mode 100644 index 000000000..eaae55093 --- /dev/null +++ b/timely/src/dataflow/operators/rc.rs @@ -0,0 +1,82 @@ +//! Shared containers + +use crate::dataflow::channels::pact::Pipeline; +use crate::dataflow::operators::Operator; +use crate::dataflow::{Scope, StreamCore}; +use crate::Container; +use std::rc::Rc; + +/// Convert a stream into a stream of shared containers +pub trait SharedStream { + /// Convert a stream into a stream of shared data + /// + /// # Examples + /// ``` + /// use timely::dataflow::operators::{ToStream, Inspect}; + /// use timely::dataflow::operators::rc::SharedStream; + /// + /// timely::example(|scope| { + /// (0..10).to_stream(scope) + /// .shared() + /// .inspect(|x| println!("seen: {:?}", x)); + /// }); + /// ``` + fn shared(&self) -> StreamCore>; +} + +impl SharedStream for StreamCore { + fn shared(&self) -> StreamCore> { + let mut container = Default::default(); + self.unary(Pipeline, "Shared", move |_, _| { + move |input, output| { + input.for_each(|time, data| { + data.swap(&mut container); + output + .session(&time) + .give_container(&mut Rc::new(std::mem::take(&mut container))); + }); + } + }) + } +} + +#[cfg(test)] +mod test { + use crate::dataflow::channels::pact::Pipeline; + use crate::dataflow::operators::capture::Extract; + use crate::dataflow::operators::rc::SharedStream; + use crate::dataflow::operators::{Capture, Concatenate, Operator, ToStream}; + + #[test] + fn test_shared() { + let output = crate::example(|scope| { + let shared = vec![Ok(0), Err(())].to_stream(scope).shared(); + scope + .concatenate([ + shared.unary(Pipeline, "read shared 1", |_, _| { + let mut container = Default::default(); + move |input, output| { + input.for_each(|time, data| { + data.swap(&mut container); + output.session(&time).give(container.as_ptr() as usize); + }); + } + }), + shared.unary(Pipeline, "read shared 2", |_, _| { + let mut container = Default::default(); + move |input, output| { + input.for_each(|time, data| { + data.swap(&mut container); + output.session(&time).give(container.as_ptr() as usize); + }); + } + }), + ]) + .capture() + }); + let output = &mut output.extract()[0].1; + output.sort(); + output.dedup(); + assert_eq!(output.len(), 1); + } +}