diff --git a/timely/src/dataflow/operators/mod.rs b/timely/src/dataflow/operators/mod.rs index 0f07a4fb8..2c4bbc99e 100644 --- a/timely/src/dataflow/operators/mod.rs +++ b/timely/src/dataflow/operators/mod.rs @@ -26,6 +26,7 @@ pub use self::to_stream::ToStream; pub use self::capture::Capture; pub use self::branch::{Branch, BranchWhen}; pub use self::ok_err::OkErr; +pub use self::result::ResultStream; pub use self::generic::Operator; pub use self::generic::{Notificator, FrontierNotificator}; @@ -51,6 +52,7 @@ pub mod to_stream; pub mod capture; pub mod branch; pub mod ok_err; +pub mod result; pub mod aggregation; pub mod generic; diff --git a/timely/src/dataflow/operators/result.rs b/timely/src/dataflow/operators/result.rs new file mode 100644 index 000000000..3607f9f8e --- /dev/null +++ b/timely/src/dataflow/operators/result.rs @@ -0,0 +1,187 @@ +//! Extension methods for `Stream` containing `Result`s. + +use crate::Data; +use crate::dataflow::operators::Map; +use crate::dataflow::{Scope, Stream}; + +/// Extension trait for `Stream`. +pub trait ResultStream { + /// Returns a new instance of `self` containing only `ok` records. + /// + /// # Examples + /// ``` + /// use timely::dataflow::operators::{ToStream, Inspect, ResultStream}; + /// + /// timely::example(|scope| { + /// vec![Ok(0), Err(())].to_stream(scope) + /// .ok() + /// .inspect(|x| println!("seen: {:?}", x)); + /// }); + /// ``` + fn ok(&self) -> Stream; + + /// Returns a new instance of `self` containing only `err` records. + /// + /// # Examples + /// ``` + /// use timely::dataflow::operators::{ToStream, Inspect, ResultStream}; + /// + /// timely::example(|scope| { + /// vec![Ok(0), Err(())].to_stream(scope) + /// .err() + /// .inspect(|x| println!("seen: {:?}", x)); + /// }); + /// ``` + fn err(&self) -> Stream; + + /// Returns a new instance of `self` applying `logic` on all `Ok` records. + /// + /// # Examples + /// ``` + /// use timely::dataflow::operators::{ToStream, Inspect, ResultStream}; + /// + /// timely::example(|scope| { + /// vec![Ok(0), Err(())].to_stream(scope) + /// .map_ok(|x| x + 1) + /// .inspect(|x| println!("seen: {:?}", x)); + /// }); + /// ``` + fn map_ok T2 + 'static>(&self, logic: L) -> Stream>; + + /// Returns a new instance of `self` applying `logic` on all `Err` records. + /// + /// # Examples + /// ``` + /// use timely::dataflow::operators::{ToStream, Inspect, ResultStream}; + /// + /// timely::example(|scope| { + /// vec![Ok(0), Err(())].to_stream(scope) + /// .map_err(|_| 1) + /// .inspect(|x| println!("seen: {:?}", x)); + /// }); + /// ``` + fn map_err E2 + 'static>(&self, logic: L) -> Stream>; + + /// Returns a new instance of `self` applying `logic` on all `Ok` records, passes through `Err` + /// records. + /// + /// # Examples + /// ``` + /// use timely::dataflow::operators::{ToStream, Inspect, ResultStream}; + /// + /// timely::example(|scope| { + /// vec![Ok(0), Err(())].to_stream(scope) + /// .and_then(|x| Ok(1 + 1)) + /// .inspect(|x| println!("seen: {:?}", x)); + /// }); + /// ``` + fn and_then Result + 'static>( + &self, + logic: L, + ) -> Stream>; + + /// Returns a new instance of `self` applying `logic` on all `Ok` records. + /// + /// # Examples + /// ``` + /// use timely::dataflow::operators::{ToStream, Inspect, ResultStream}; + /// + /// timely::example(|scope| { + /// vec![Ok(1), Err(())].to_stream(scope) + /// .unwrap_or_else(|_| 0) + /// .inspect(|x| println!("seen: {:?}", x)); + /// }); + /// ``` + fn unwrap_or_else T + 'static>(&self, logic: L) -> Stream; +} + +impl ResultStream for Stream> { + fn ok(&self) -> Stream { + self.flat_map(Result::ok) + } + + fn err(&self) -> Stream { + self.flat_map(Result::err) + } + + fn map_ok T2 + 'static>(&self, mut logic: L) -> Stream> { + self.map(move |r| r.map(|x| logic(x))) + } + + fn map_err E2 + 'static>(&self, mut logic: L) -> Stream> { + self.map(move |r| r.map_err(|x| logic(x))) + } + + fn and_then Result + 'static>(&self, mut logic: L) -> Stream> { + self.map(move |r| r.and_then(|x| logic(x))) + } + + fn unwrap_or_else T + 'static>(&self, mut logic: L) -> Stream { + self.map(move |r| r.unwrap_or_else(|err| logic(err))) + } +} + +#[cfg(test)] +mod tests { + use crate::dataflow::operators::{ToStream, ResultStream, Capture, capture::Extract}; + + #[test] + fn test_ok() { + let output = crate::example(|scope| { + vec![Ok(0), Err(())].to_stream(scope) + .ok() + .capture() + }); + assert_eq!(output.extract()[0].1, vec![0]); + } + + #[test] + fn test_err() { + let output = crate::example(|scope| { + vec![Ok(0), Err(())].to_stream(scope) + .err() + .capture() + }); + assert_eq!(output.extract()[0].1, vec![()]); + } + + #[test] + fn test_map_ok() { + let output = crate::example(|scope| { + vec![Ok(0), Err(())].to_stream(scope) + .map_ok(|_| 10) + .capture() + }); + assert_eq!(output.extract()[0].1, vec![Ok(10), Err(())]); + } + + #[test] + fn test_map_err() { + let output = crate::example(|scope| { + vec![Ok(0), Err(())].to_stream(scope) + .map_err(|_| 10) + .capture() + }); + assert_eq!(output.extract()[0].1, vec![Ok(0), Err(10)]); + } + + #[test] + fn test_and_then() { + let output = crate::example(|scope| { + vec![Ok(0), Err(())].to_stream(scope) + .and_then(|_| Ok(1)) + .capture() + }); + assert_eq!(output.extract()[0].1, vec![Ok(1), Err(())]); + } + + #[test] + fn test_unwrap_or_else() { + let output = crate::example(|scope| { + vec![Ok(0), Err(())].to_stream(scope) + .unwrap_or_else(|_| 10) + .capture() + }); + assert_eq!(output.extract()[0].1, vec![0, 10]); + } +}