diff --git a/timely/Cargo.toml b/timely/Cargo.toml index 1cde81540..b8bdd5f91 100644 --- a/timely/Cargo.toml +++ b/timely/Cargo.toml @@ -30,6 +30,7 @@ timely_bytes = { path = "../bytes", version = "0.11" } timely_logging = { path = "../logging", version = "0.11" } timely_communication = { path = "../communication", version = "0.11" } crossbeam-channel = "0.5.0" +futures-util = "0.3" [dev-dependencies] timely_sort="0.1.6" diff --git a/timely/src/dataflow/operators/capability.rs b/timely/src/dataflow/operators/capability.rs index 24bb466e3..9c0e959b1 100644 --- a/timely/src/dataflow/operators/capability.rs +++ b/timely/src/dataflow/operators/capability.rs @@ -371,10 +371,14 @@ impl CapabilitySet { /// Downgrades the set of capabilities to correspond with the times in `frontier`. /// /// This method panics if any element of `frontier` is not greater or equal to some element of `self.elements`. - pub fn downgrade(&mut self, frontier: &[T]) { + pub fn downgrade(&mut self, frontier: F) + where + B: std::borrow::Borrow, + F: IntoIterator, + { let count = self.elements.len(); - for time in frontier.iter() { - let capability = self.delayed(time); + for time in frontier.into_iter() { + let capability = self.delayed(time.borrow()); self.elements.push(capability); } self.elements.drain(..count); diff --git a/timely/src/dataflow/operators/mod.rs b/timely/src/dataflow/operators/mod.rs index 2c4bbc99e..7b418df46 100644 --- a/timely/src/dataflow/operators/mod.rs +++ b/timely/src/dataflow/operators/mod.rs @@ -22,7 +22,7 @@ pub use self::delay::Delay; pub use self::exchange::Exchange; pub use self::broadcast::Broadcast; pub use self::probe::Probe; -pub use self::to_stream::ToStream; +pub use self::to_stream::{ToStream, ToStreamAsync, Event}; pub use self::capture::Capture; pub use self::branch::{Branch, BranchWhen}; pub use self::ok_err::OkErr; diff --git a/timely/src/dataflow/operators/to_stream.rs b/timely/src/dataflow/operators/to_stream.rs index acb9eddc8..cfb20e70f 100644 --- a/timely/src/dataflow/operators/to_stream.rs +++ b/timely/src/dataflow/operators/to_stream.rs @@ -1,11 +1,15 @@ //! Conversion to the `Stream` type from iterators. -use crate::progress::Timestamp; +use std::pin::Pin; +use std::sync::Arc; +use std::task::{Context, Poll}; -use crate::Data; use crate::dataflow::channels::Message; use crate::dataflow::operators::generic::operator::source; -use crate::dataflow::{Stream, Scope}; +use crate::dataflow::operators::CapabilitySet; +use crate::dataflow::{Scope, Stream}; +use crate::progress::Timestamp; +use crate::Data; /// Converts to a timely `Stream`. pub trait ToStream { @@ -56,3 +60,82 @@ impl ToStream for I where I:: }) } } + +/// Data and progress events of the native stream. +pub enum Event { + /// Indicates that timestamps have advanced to frontier F + Progress(F), + /// Indicates that event D happened at time T + Message(F::Item, D), +} + +/// Converts to a timely `Stream`. +pub trait ToStreamAsync { + /// Converts a [native `Stream`](futures_util::stream::Stream) of [`Event`s](Event) into a [timely + /// `Stream`](crate::dataflow::Stream). + /// + /// # Examples + /// + /// ``` + /// use futures_util::stream; + /// + /// use timely::dataflow::operators::{Capture, Event, ToStream, ToStreamAsync}; + /// use timely::dataflow::operators::capture::Extract; + /// + /// let native_stream = stream::iter(vec![ + /// Event::Message(0, 0), + /// Event::Message(0, 1), + /// Event::Message(0, 2), + /// Event::Progress(Some(0)), + /// ]); + /// + /// let native_stream = Box::pin(native_stream); + /// + /// let (data1, data2) = timely::example(|scope| { + /// let data1 = native_stream.to_stream(scope).capture(); + /// let data2 = vec![0,1,2].to_stream(scope).capture(); + /// + /// (data1, data2) + /// }); + /// + /// assert_eq!(data1.extract(), data2.extract()); + /// ``` + fn to_stream>(self: Pin>, scope: &S) -> Stream; +} + +impl ToStreamAsync for I +where + D: Data, + T: Timestamp, + F: IntoIterator, + I: futures_util::stream::Stream> + ?Sized + 'static, +{ + fn to_stream>(mut self: Pin>, scope: &S) -> Stream { + source(scope, "ToStreamAsync", move |capability, info| { + let activator = Arc::new(scope.sync_activator_for(&info.address[..])); + + let mut cap_set = CapabilitySet::from_elem(capability); + + move |output| { + let waker = futures_util::task::waker_ref(&activator); + let mut context = Context::from_waker(&waker); + + // Consume all the ready items of the source_stream and issue them to the operator + while let Poll::Ready(item) = self.as_mut().poll_next(&mut context) { + match item { + Some(Event::Progress(time)) => { + cap_set.downgrade(time); + } + Some(Event::Message(time, data)) => { + output.session(&cap_set.delayed(&time)).give(data); + } + None => { + cap_set.downgrade(&[]); + break; + } + } + } + } + }) + } +} diff --git a/timely/src/scheduling/activate.rs b/timely/src/scheduling/activate.rs index 68631bf61..844c03091 100644 --- a/timely/src/scheduling/activate.rs +++ b/timely/src/scheduling/activate.rs @@ -1,12 +1,14 @@ //! Parking and unparking timely fibers. use std::rc::Rc; +use std::sync::Arc; use std::cell::RefCell; use std::thread::Thread; use std::collections::BinaryHeap; use std::time::{Duration, Instant}; use std::cmp::Reverse; use crossbeam_channel::{Sender, Receiver}; +use futures_util::task::ArcWake; /// Methods required to act as a timely scheduler. /// @@ -268,6 +270,12 @@ impl SyncActivator { } } +impl ArcWake for SyncActivator { + fn wake_by_ref(arc_self: &Arc) { + arc_self.activate().unwrap(); + } +} + /// The error returned when activation fails across thread boundaries because /// the receiving end has hung up. #[derive(Debug)]