diff --git a/container/src/flatcontainer.rs b/container/src/flatcontainer.rs index 53f13ed39..026eef5b9 100644 --- a/container/src/flatcontainer.rs +++ b/container/src/flatcontainer.rs @@ -47,4 +47,4 @@ impl> PushInto> for T { fn push_into(self, target: &mut FlatStack) { target.copy(self); } -} \ No newline at end of file +} diff --git a/container/src/lib.rs b/container/src/lib.rs index 61a4b32b9..dc44f8483 100644 --- a/container/src/lib.rs +++ b/container/src/lib.rs @@ -2,6 +2,8 @@ #![forbid(missing_docs)] +use std::collections::VecDeque; + pub mod columnation; pub mod flatcontainer; @@ -63,12 +65,6 @@ pub trait PushInto { /// A type that has the necessary infrastructure to push elements, without specifying how pushing /// itself works. For this, pushable types should implement [`PushInto`]. -// TODO: Reconsider this interface because it assumes -// * Containers have a capacity -// * Push presents single elements. -// * Instead of testing `len == cap`, we could have a `is_full` to test that we might -// not be able to absorb more data. -// * Example: A FlatStack with optimized offsets and deduplication can absorb many elements without reallocation. What does capacity mean in this context? pub trait PushContainer: Container { /// Push `item` into self #[inline] @@ -83,6 +79,112 @@ pub trait PushContainer: Container { fn reserve(&mut self, additional: usize); } +/// A type that can build containers from items. +/// +/// An implementation needs to absorb elements, and later reveal equivalent information +/// chunked into individual containers, but is free to change the data representation to +/// better fit the properties of the container. +/// +/// The owner extracts data in two ways. The opportunistic [`Self::extract`] method returns +/// any ready data, but doesn't need to produce partial outputs. In contrast, [`Self::finish`] +/// needs to produce all outputs, even partial ones. +/// +/// For example, a consolidating builder can aggregate differences in-place, but it has +/// to ensure that it preserves the intended information. +/// +/// The trait does not prescribe any specific ordering guarantees, and each implementation can +/// decide to represent a `push`/`push_container` order for `extract` and `finish`, or not. +// TODO: Consider adding `push_iterator` to receive an iterator of data. +pub trait ContainerBuilder: Default + 'static { + /// The container type we're building. + type Container: Container; + /// Add an item to a container. + fn push>(&mut self, item: T) where Self::Container: PushContainer; + /// Push a pre-built container. + fn push_container(&mut self, container: &mut Self::Container); + /// Extract assembled containers, potentially leaving unfinished data behind. + fn extract(&mut self) -> Option<&mut Self::Container>; + /// Extract assembled containers and any unfinished data. + fn finish(&mut self) -> Option<&mut Self::Container>; +} + +/// A default container builder that uses length and preferred capacity to chunk data. +/// +/// Maintains FIFO order. +#[derive(Default, Debug)] +pub struct CapacityContainerBuilder{ + /// Container that we're writing to. + current: C, + /// Emtpy allocation. + empty: Option, + /// Completed containers pending to be sent. + pending: VecDeque, +} + +impl ContainerBuilder for CapacityContainerBuilder { + type Container = C; + + #[inline] + fn push>(&mut self, item: T) where C: PushContainer { + if self.current.capacity() == 0 { + self.current = self.empty.take().unwrap_or_default(); + // Discard any non-uniform capacity container. + if self.current.capacity() != C::preferred_capacity() { + self.current = C::default(); + } + // Protect against non-emptied containers. + self.current.clear(); + } + // Ensure capacity + if self.current.capacity() < C::preferred_capacity() { + self.current.reserve(C::preferred_capacity() - self.current.len()); + } + + // Push item + self.current.push(item); + + // Maybe flush + if self.current.len() == self.current.capacity() { + self.pending.push_back(std::mem::take(&mut self.current)); + } + } + + #[inline] + fn push_container(&mut self, container: &mut Self::Container) { + if !container.is_empty() { + // Flush to maintain FIFO ordering. + if self.current.len() > 0 { + self.pending.push_back(std::mem::take(&mut self.current)); + } + + let mut empty = self.empty.take().unwrap_or_default(); + // Ideally, we'd discard non-uniformly sized containers, but we don't have + // access to `len`/`capacity` of the container. + empty.clear(); + + self.pending.push_back(std::mem::replace(container, empty)); + } + } + + #[inline] + fn extract(&mut self) -> Option<&mut C> { + if let Some(container) = self.pending.pop_front() { + self.empty = Some(container); + self.empty.as_mut() + } else { + None + } + } + + #[inline] + fn finish(&mut self) -> Option<&mut C> { + if self.current.len() > 0 { + self.pending.push_back(std::mem::take(&mut self.current)); + } + self.extract() + } +} + impl Container for Vec { type ItemRef<'a> = &'a T where T: 'a; type Item<'a> = T where T: 'a; diff --git a/mdbook/src/chapter_2/chapter_2_4.md b/mdbook/src/chapter_2/chapter_2_4.md index 87f815556..839fc0b8f 100644 --- a/mdbook/src/chapter_2/chapter_2_4.md +++ b/mdbook/src/chapter_2/chapter_2_4.md @@ -209,7 +209,7 @@ fn main() { let mut session = output.session(&time); if let Some(list) = stash.remove(time.time()) { for mut vector in list.into_iter() { - session.give_vec(&mut vector); + session.give_container(&mut vector); } } }); @@ -261,7 +261,7 @@ fn main() { if frontiers.iter().all(|f| !f.less_equal(time.time())) { let mut session = output.session(&time); for mut vector in list.drain(..) { - session.give_vec(&mut vector); + session.give_container(&mut vector); } } } diff --git a/timely/examples/barrier.rs b/timely/examples/barrier.rs index f1d4c2a48..2f1c99b32 100644 --- a/timely/examples/barrier.rs +++ b/timely/examples/barrier.rs @@ -3,6 +3,7 @@ extern crate timely; use timely::dataflow::channels::pact::Pipeline; use timely::dataflow::operators::{Feedback, ConnectLoop}; use timely::dataflow::operators::generic::operator::Operator; +use timely::container::CapacityContainerBuilder; fn main() { @@ -12,7 +13,7 @@ fn main() { worker.dataflow(move |scope| { let (handle, stream) = scope.feedback::>(1); - stream.unary_notify( + stream.unary_notify::, _, _>( Pipeline, "Barrier", vec![0], diff --git a/timely/src/dataflow/channels/pushers/buffer.rs b/timely/src/dataflow/channels/pushers/buffer.rs index 5262d36c9..507b18d23 100644 --- a/timely/src/dataflow/channels/pushers/buffer.rs +++ b/timely/src/dataflow/channels/pushers/buffer.rs @@ -2,194 +2,224 @@ //! with the performance of batched sends. use crate::communication::Push; -use crate::container::{PushContainer, PushInto}; +use crate::container::{ContainerBuilder, CapacityContainerBuilder, PushContainer, PushInto}; use crate::dataflow::channels::{Bundle, Message}; use crate::dataflow::operators::Capability; use crate::progress::Timestamp; -use crate::{Container, Data}; +use crate::Container; /// Buffers data sent at the same time, for efficient communication. /// /// The `Buffer` type should be used by calling `session` with a time, which checks whether /// data must be flushed and creates a `Session` object which allows sending at the given time. #[derive(Debug)] -pub struct Buffer>> { - /// the currently open time, if it is open +pub struct Buffer { + /// The currently open time, if it is open. time: Option, - /// a buffer for records, to send at self.time - buffer: C, + /// A builder for containers, to send at `self.time`. + builder: CB, + /// The pusher to send data downstream. pusher: P, } -impl>> Buffer where T: Eq+Clone { - +impl Buffer { /// Creates a new `Buffer`. pub fn new(pusher: P) -> Self { Self { time: None, - buffer: Default::default(), + builder: Default::default(), pusher, } } + /// Returns a reference to the inner `P: Push` type. + /// + /// This is currently used internally, and should not be used without some care. + pub fn inner(&mut self) -> &mut P { &mut self.pusher } + + /// Access the builder. Immutable access to prevent races with flushing + /// the underlying buffer. + pub fn builder(&self) -> &CB { + &self.builder + } +} + +impl>> Buffer, P> where T: Eq+Clone { /// Returns a `Session`, which accepts data to send at the associated time - pub fn session(&mut self, time: &T) -> Session { + #[inline] + pub fn session(&mut self, time: &T) -> Session, P> { + self.session_with_builder(time) + } + + /// Allocates a new `AutoflushSession` which flushes itself on drop. + #[inline] + pub fn autoflush_session(&mut self, cap: Capability) -> AutoflushSession, P> where T: Timestamp { + self.autoflush_session_with_builder(cap) + } +} + +impl>> Buffer where T: Eq+Clone { + /// Returns a `Session`, which accepts data to send at the associated time + pub fn session_with_builder(&mut self, time: &T) -> Session { if let Some(true) = self.time.as_ref().map(|x| x != time) { self.flush(); } self.time = Some(time.clone()); Session { buffer: self } } + /// Allocates a new `AutoflushSession` which flushes itself on drop. - pub fn autoflush_session(&mut self, cap: Capability) -> AutoflushSessionCore where T: Timestamp { + pub fn autoflush_session_with_builder(&mut self, cap: Capability) -> AutoflushSession where T: Timestamp { if let Some(true) = self.time.as_ref().map(|x| x != cap.time()) { self.flush(); } self.time = Some(cap.time().clone()); - AutoflushSessionCore { + AutoflushSession { buffer: self, _capability: cap, } } +} - /// Returns a reference to the inner `P: Push` type. - /// - /// This is currently used internally, and should not be used without some care. - pub fn inner(&mut self) -> &mut P { &mut self.pusher } - +impl>> Buffer where T: Eq+Clone { /// Flushes all data and pushes a `None` to `self.pusher`, indicating a flush. pub fn cease(&mut self) { self.flush(); self.pusher.push(&mut None); } - /// moves the contents of - fn flush(&mut self) { - if !self.buffer.is_empty() { + /// Extract pending data from the builder, but not forcing a flush. + #[inline] + fn extract(&mut self) { + while let Some(container) = self.builder.extract() { let time = self.time.as_ref().unwrap().clone(); - Message::push_at(&mut self.buffer, time, &mut self.pusher); + Message::push_at(container, time, &mut self.pusher); } } - // Gives an entire container at a specific time. - fn give_container(&mut self, vector: &mut C) { - if !vector.is_empty() { - // flush to ensure fifo-ness - self.flush(); - - let time = self.time.as_ref().expect("Buffer::give_container(): time is None.").clone(); - Message::push_at(vector, time, &mut self.pusher); + /// Flush the builder, forcing all its contents to be written. + #[inline] + fn flush(&mut self) { + while let Some(container) = self.builder.finish() { + let time = self.time.as_ref().unwrap().clone(); + Message::push_at(container, time, &mut self.pusher); } } -} -impl>> Buffer where T: Eq+Clone { - // internal method for use by `Session`. - #[inline] - fn give>(&mut self, data: D) { - if self.buffer.capacity() < C::preferred_capacity() { - let to_reserve = C::preferred_capacity() - self.buffer.capacity(); - self.buffer.reserve(to_reserve); - } - self.buffer.push(data); - if self.buffer.len() >= C::preferred_capacity() { - self.flush(); + /// Gives an entire container at the current time. + fn give_container(&mut self, container: &mut CB::Container) { + if !container.is_empty() { + self.builder.push_container(container); + self.extract(); } } } -impl>>> Buffer, P> where T: Eq+Clone { - // Gives an entire message at a specific time. - fn give_vec(&mut self, vector: &mut Vec) { - // flush to ensure fifo-ness - self.flush(); - - let time = self.time.as_ref().expect("Buffer::give_vec(): time is None.").clone(); - Message::push_at(vector, time, &mut self.pusher); +impl>> Buffer +where + T: Eq+Clone, + CB::Container: PushContainer, +{ + // Push a single item into the builder. Internal method for use by `Session`. + #[inline] + fn give>(&mut self, data: D) { + self.builder.push(data); + self.extract(); } } - /// An output session for sending records at a specified time. /// /// The `Session` struct provides the user-facing interface to an operator output, namely /// the `Buffer` type. A `Session` wraps a session of output at a specified time, and /// avoids what would otherwise be a constant cost of checking timestamp equality. -pub struct Session<'a, T, C: Container, P: Push>+'a> where T: Eq+Clone+'a, C: 'a { - buffer: &'a mut Buffer, +pub struct Session<'a, T, CB, P> { + buffer: &'a mut Buffer, } -impl<'a, T, C: Container, P: Push>+'a> Session<'a, T, C, P> where T: Eq+Clone+'a, C: 'a { +impl<'a, T, CB, P> Session<'a, T, CB, P> +where + T: Eq + Clone + 'a, + CB: ContainerBuilder + 'a, + P: Push> + 'a +{ /// Provide a container at the time specified by the [Session]. - pub fn give_container(&mut self, container: &mut C) { + pub fn give_container(&mut self, container: &mut CB::Container) { self.buffer.give_container(container) } + + /// Access the builder. Immutable access to prevent races with flushing + /// the underlying buffer. + pub fn builder(&self) -> &CB { + self.buffer.builder() + } } -impl<'a, T, C, P: Push>+'a> Session<'a, T, C, P> +impl<'a, T, CB, P: Push>+'a> Session<'a, T, CB, P> where - T: Eq+Clone+'a, - C: 'a + PushContainer, + T: Eq + Clone + 'a, + CB: ContainerBuilder + 'a, + CB::Container: PushContainer, { /// Provides one record at the time specified by the `Session`. #[inline] - pub fn give>(&mut self, data: D) { + pub fn give>(&mut self, data: D) { self.buffer.give(data); } + /// Provides an iterator of records at the time specified by the `Session`. #[inline] - pub fn give_iterator, D: PushInto>(&mut self, iter: I) { + pub fn give_iterator(&mut self, iter: I) + where + I: Iterator, + D: PushInto, + { for item in iter { self.give(item); } } } -impl<'a, T, D: Data, P: Push>>+'a> Session<'a, T, Vec, P> where T: Eq+Clone+'a, D: 'a { - /// Provides a fully formed `Content` message for senders which can use this type. - /// - /// The `Content` type is the backing memory for communication in timely, and it can - /// often be more efficient to reuse this memory rather than have timely allocate - /// new backing memory. - #[inline] - pub fn give_vec(&mut self, message: &mut Vec) { - if !message.is_empty() { - self.buffer.give_vec(message); - } - } -} - /// A session which will flush itself when dropped. -pub struct AutoflushSessionCore<'a, T: Timestamp, C: Container, P: Push>+'a> where - T: Eq+Clone+'a, C: 'a { +pub struct AutoflushSession<'a, T, CB, P> +where + T: Timestamp + 'a, + CB: ContainerBuilder + 'a, + P: Push> + 'a, +{ /// A reference to the underlying buffer. - buffer: &'a mut Buffer, + buffer: &'a mut Buffer, /// The capability being used to send the data. _capability: Capability, } -/// Auto-flush session specialized to vector-based containers. -pub type AutoflushSession<'a, T, D, P> = AutoflushSessionCore<'a, T, Vec, P>; - -impl<'a, T: Timestamp, D: Data, P: Push>>+'a> AutoflushSessionCore<'a, T, Vec, P> where T: Eq+Clone+'a, D: 'a { +impl<'a, T, CB, P> AutoflushSession<'a, T, CB, P> +where + T: Timestamp + 'a, + CB: ContainerBuilder + 'a, + P: Push> + 'a, +{ /// Transmits a single record. #[inline] - pub fn give(&mut self, data: D) { + pub fn give>(&mut self, data: D) where CB::Container: PushContainer { self.buffer.give(data); } /// Transmits records produced by an iterator. #[inline] - pub fn give_iterator>(&mut self, iter: I) { + pub fn give_iterator(&mut self, iter: I) + where + I: Iterator, + D: PushInto, + CB::Container: PushContainer, + { for item in iter { self.give(item); } } - /// Transmits a pre-packed batch of data. - #[inline] - pub fn give_content(&mut self, message: &mut Vec) { - if !message.is_empty() { - self.buffer.give_vec(message); - } - } } -impl<'a, T: Timestamp, C: Container, P: Push>+'a> Drop for AutoflushSessionCore<'a, T, C, P> where T: Eq+Clone+'a, C: 'a { +impl<'a, T, CB, P> Drop for AutoflushSession<'a, T, CB, P> +where + T: Timestamp + 'a, + CB: ContainerBuilder + 'a, + P: Push> + 'a, +{ fn drop(&mut self) { self.buffer.cease(); } diff --git a/timely/src/dataflow/operators/core/feedback.rs b/timely/src/dataflow/operators/core/feedback.rs index 0481bbb3c..c7d579d63 100644 --- a/timely/src/dataflow/operators/core/feedback.rs +++ b/timely/src/dataflow/operators/core/feedback.rs @@ -1,17 +1,16 @@ //! Create cycles in a timely dataflow graph. use crate::Container; - -use crate::progress::{Timestamp, PathSummary}; -use crate::progress::frontier::Antichain; -use crate::order::Product; - -use crate::dataflow::channels::pushers::Tee; +use crate::container::CapacityContainerBuilder; use crate::dataflow::channels::pact::Pipeline; -use crate::dataflow::{StreamCore, Scope}; -use crate::dataflow::scopes::child::Iterative; -use crate::dataflow::operators::generic::builder_rc::OperatorBuilder; +use crate::dataflow::channels::pushers::Tee; use crate::dataflow::operators::generic::OutputWrapper; +use crate::dataflow::operators::generic::builder_rc::OperatorBuilder; +use crate::dataflow::scopes::child::Iterative; +use crate::dataflow::{StreamCore, Scope}; +use crate::order::Product; +use crate::progress::frontier::Antichain; +use crate::progress::{Timestamp, PathSummary}; /// Creates a `StreamCore` and a `Handle` to later bind the source of that `StreamCore`. pub trait Feedback { @@ -137,5 +136,5 @@ impl ConnectLoop for StreamCore { pub struct Handle { builder: OperatorBuilder, summary: ::Summary, - output: OutputWrapper>, + output: OutputWrapper, Tee>, } diff --git a/timely/src/dataflow/operators/core/filter.rs b/timely/src/dataflow/operators/core/filter.rs index c6585162a..ca1ff3385 100644 --- a/timely/src/dataflow/operators/core/filter.rs +++ b/timely/src/dataflow/operators/core/filter.rs @@ -1,6 +1,5 @@ //! Filters a stream by a predicate. -use timely_container::{Container, PushContainer, PushInto}; - +use crate::container::{Container, PushContainer, PushInto}; use crate::dataflow::channels::pact::Pipeline; use crate::dataflow::{Scope, StreamCore}; use crate::dataflow::operators::generic::operator::Operator; diff --git a/timely/src/dataflow/operators/core/input.rs b/timely/src/dataflow/operators/core/input.rs index 2b8ed58dc..f929d045e 100644 --- a/timely/src/dataflow/operators/core/input.rs +++ b/timely/src/dataflow/operators/core/input.rs @@ -3,7 +3,7 @@ use std::rc::Rc; use std::cell::RefCell; -use timely_container::{PushContainer, PushInto}; +use crate::container::{PushContainer, PushInto}; use crate::scheduling::{Schedule, Activator}; diff --git a/timely/src/dataflow/operators/core/map.rs b/timely/src/dataflow/operators/core/map.rs index 2b52826dc..86ac39ec9 100644 --- a/timely/src/dataflow/operators/core/map.rs +++ b/timely/src/dataflow/operators/core/map.rs @@ -1,7 +1,6 @@ //! Extension methods for `StreamCore` based on record-by-record transformation. -use timely_container::{Container, PushContainer, PushInto}; - +use crate::container::{Container, PushContainer, PushInto}; use crate::dataflow::{Scope, StreamCore}; use crate::dataflow::channels::pact::Pipeline; use crate::dataflow::operators::generic::operator::Operator; diff --git a/timely/src/dataflow/operators/core/ok_err.rs b/timely/src/dataflow/operators/core/ok_err.rs index c49b0513f..2a046f4af 100644 --- a/timely/src/dataflow/operators/core/ok_err.rs +++ b/timely/src/dataflow/operators/core/ok_err.rs @@ -1,7 +1,6 @@ //! Operators that separate one stream into two streams based on some condition -use timely_container::{Container, PushContainer, PushInto}; - +use crate::container::{Container, PushContainer, PushInto}; use crate::dataflow::channels::pact::Pipeline; use crate::dataflow::operators::generic::builder_rc::OperatorBuilder; use crate::dataflow::{Scope, StreamCore}; diff --git a/timely/src/dataflow/operators/core/unordered_input.rs b/timely/src/dataflow/operators/core/unordered_input.rs index 61c42daa5..c513cc922 100644 --- a/timely/src/dataflow/operators/core/unordered_input.rs +++ b/timely/src/dataflow/operators/core/unordered_input.rs @@ -3,6 +3,7 @@ use std::rc::Rc; use std::cell::RefCell; use crate::Container; +use crate::container::{ContainerBuilder, CapacityContainerBuilder}; use crate::scheduling::{Schedule, ActivateOnDrop}; @@ -12,7 +13,7 @@ use crate::progress::Source; use crate::progress::ChangeBatch; use crate::dataflow::channels::pushers::{Counter, Tee}; -use crate::dataflow::channels::pushers::buffer::{Buffer as PushBuffer, AutoflushSessionCore}; +use crate::dataflow::channels::pushers::buffer::{Buffer as PushBuffer, AutoflushSession}; use crate::dataflow::operators::{ActivateCapability, Capability}; @@ -56,7 +57,9 @@ pub trait UnorderedInput { /// // create and capture the unordered input. /// let (mut input, mut cap) = worker.dataflow::(|scope| { /// let (input, stream) = scope.new_unordered_input(); - /// stream.capture_into(send); + /// stream + /// .container::>() + /// .capture_into(send); /// input /// }); /// @@ -73,13 +76,13 @@ pub trait UnorderedInput { /// assert_eq!(extract[i], (i, vec![i])); /// } /// ``` - fn new_unordered_input(&mut self) -> ((UnorderedHandle, ActivateCapability), StreamCore); + fn new_unordered_input(&mut self) -> ((UnorderedHandle, ActivateCapability), StreamCore); } impl UnorderedInput for G { - fn new_unordered_input(&mut self) -> ((UnorderedHandle, ActivateCapability), StreamCore) { + fn new_unordered_input(&mut self) -> ((UnorderedHandle, ActivateCapability), StreamCore) { - let (output, registrar) = Tee::::new(); + let (output, registrar) = Tee::::new(); let internal = Rc::new(RefCell::new(ChangeBatch::new())); // let produced = Rc::new(RefCell::new(ChangeBatch::new())); let cap = Capability::new(G::Timestamp::minimum(), internal.clone()); @@ -145,19 +148,28 @@ impl Operate for UnorderedOperator { /// A handle to an input [StreamCore], used to introduce data to a timely dataflow computation. #[derive(Debug)] -pub struct UnorderedHandle { - buffer: PushBuffer>>, +pub struct UnorderedHandle { + buffer: PushBuffer>>, } -impl UnorderedHandle { - fn new(pusher: Counter>) -> UnorderedHandle { +impl UnorderedHandle { + fn new(pusher: Counter>) -> UnorderedHandle { UnorderedHandle { buffer: PushBuffer::new(pusher), } } /// Allocates a new automatically flushing session based on the supplied capability. - pub fn session<'b>(&'b mut self, cap: ActivateCapability) -> ActivateOnDrop>>> { - ActivateOnDrop::new(self.buffer.autoflush_session(cap.capability.clone()), cap.address.clone(), cap.activations.clone()) + #[inline] + pub fn session_with_builder(&mut self, cap: ActivateCapability) -> ActivateOnDrop>>> { + ActivateOnDrop::new(self.buffer.autoflush_session_with_builder(cap.capability.clone()), cap.address.clone(), cap.activations.clone()) + } +} + +impl UnorderedHandle> { + /// Allocates a new automatically flushing session based on the supplied capability. + #[inline] + pub fn session(&mut self, cap: ActivateCapability) -> ActivateOnDrop, Counter>>> { + self.session_with_builder(cap) } } diff --git a/timely/src/dataflow/operators/delay.rs b/timely/src/dataflow/operators/delay.rs index ddda2faaf..a3cc4c354 100644 --- a/timely/src/dataflow/operators/delay.rs +++ b/timely/src/dataflow/operators/delay.rs @@ -140,7 +140,7 @@ impl Delay for Stream { notificator.for_each(|time,_,_| { if let Some(mut datas) = elements.remove(&time) { for mut data in datas.drain(..) { - output.session(&time).give_vec(&mut data); + output.session(&time).give_container(&mut data); } } }); diff --git a/timely/src/dataflow/operators/filter.rs b/timely/src/dataflow/operators/filter.rs index 7034a9df0..f1720eca2 100644 --- a/timely/src/dataflow/operators/filter.rs +++ b/timely/src/dataflow/operators/filter.rs @@ -30,7 +30,7 @@ impl Filter for Stream { data.swap(&mut vector); vector.retain(|x| predicate(x)); if !vector.is_empty() { - output.session(&time).give_vec(&mut vector); + output.session(&time).give_container(&mut vector); } }); }) diff --git a/timely/src/dataflow/operators/generic/builder_rc.rs b/timely/src/dataflow/operators/generic/builder_rc.rs index 3d2ebbe29..c266b0051 100644 --- a/timely/src/dataflow/operators/generic/builder_rc.rs +++ b/timely/src/dataflow/operators/generic/builder_rc.rs @@ -9,6 +9,7 @@ use crate::progress::operate::SharedProgress; use crate::progress::frontier::{Antichain, MutableAntichain}; use crate::Container; +use crate::container::ContainerBuilder; use crate::dataflow::{Scope, StreamCore}; use crate::dataflow::channels::pushers::Tee; use crate::dataflow::channels::pushers::Counter as PushCounter; @@ -92,7 +93,7 @@ impl OperatorBuilder { } /// Adds a new output to a generic operator builder, returning the `Push` implementor to use. - pub fn new_output(&mut self) -> (OutputWrapper>, StreamCore) { + pub fn new_output(&mut self) -> (OutputWrapper>, StreamCore) { let connection = vec![Antichain::from_elem(Default::default()); self.builder.shape().inputs()]; self.new_output_connection(connection) } @@ -105,7 +106,13 @@ impl OperatorBuilder { /// /// Commonly the connections are either the unit summary, indicating the same timestamp might be produced as output, or an empty /// antichain indicating that there is no connection from the input to the output. - pub fn new_output_connection(&mut self, connection: Vec::Summary>>) -> (OutputWrapper>, StreamCore) { + pub fn new_output_connection( + &mut self, + connection: Vec::Summary>> + ) -> ( + OutputWrapper>, + StreamCore + ) { let (tee, stream) = self.builder.new_output_connection(connection.clone()); @@ -218,6 +225,7 @@ impl OperatorBuilder { #[cfg(test)] mod tests { + use crate::container::CapacityContainerBuilder; #[test] #[should_panic] @@ -233,8 +241,8 @@ mod tests { let mut builder = OperatorBuilder::new("Failure".to_owned(), scope.clone()); // let mut input = builder.new_input(stream, Pipeline); - let (mut output1, _stream1) = builder.new_output::>(); - let (mut output2, _stream2) = builder.new_output::>(); + let (mut output1, _stream1) = builder.new_output::>>(); + let (mut output2, _stream2) = builder.new_output::>>(); builder.build(move |capabilities| { move |_frontiers| { @@ -263,8 +271,8 @@ mod tests { let mut builder = OperatorBuilder::new("Failure".to_owned(), scope.clone()); // let mut input = builder.new_input(stream, Pipeline); - let (mut output1, _stream1) = builder.new_output::>(); - let (mut output2, _stream2) = builder.new_output::>(); + let (mut output1, _stream1) = builder.new_output::>>(); + let (mut output2, _stream2) = builder.new_output::>>(); builder.build(move |mut capabilities| { move |_frontiers| { diff --git a/timely/src/dataflow/operators/generic/handles.rs b/timely/src/dataflow/operators/generic/handles.rs index ee4059bb4..7e63434f3 100644 --- a/timely/src/dataflow/operators/generic/handles.rs +++ b/timely/src/dataflow/operators/generic/handles.rs @@ -16,6 +16,7 @@ use crate::dataflow::channels::pushers::buffer::{Buffer, Session}; use crate::dataflow::channels::Bundle; use crate::communication::{Push, Pull, message::RefOrMut}; use crate::Container; +use crate::container::{ContainerBuilder, CapacityContainerBuilder}; use crate::logging::TimelyLogger as Logger; use crate::dataflow::operators::InputCapability; @@ -81,7 +82,7 @@ impl<'a, T: Timestamp, C: Container, P: Pull>> InputHandleCore>+'a> FrontieredInputHa /// (0..10).to_stream(scope) /// .unary(Pipeline, "example", |_cap,_info| |input, output| { /// input.for_each(|cap, data| { - /// output.session(&cap).give_vec(&mut data.replace(Vec::new())); + /// output.session(&cap).give_container(&mut data.replace(Vec::new())); /// }); /// }); /// }); @@ -172,14 +173,14 @@ pub fn new_input_handle>>( /// than with an `OutputHandle`, whose methods ensure that capabilities are used and that the /// pusher is flushed (via the `cease` method) once it is no longer used. #[derive(Debug)] -pub struct OutputWrapper>> { - push_buffer: Buffer>, +pub struct OutputWrapper>> { + push_buffer: Buffer>, internal_buffer: Rc>>, } -impl>> OutputWrapper { +impl>> OutputWrapper { /// Creates a new output wrapper from a push buffer. - pub fn new(push_buffer: Buffer>, internal_buffer: Rc>>) -> Self { + pub fn new(push_buffer: Buffer>, internal_buffer: Rc>>) -> Self { OutputWrapper { push_buffer, internal_buffer, @@ -189,7 +190,7 @@ impl>> OutputWrapper { /// /// This method ensures that the only access to the push buffer is through the `OutputHandle` /// type which ensures the use of capabilities, and which calls `cease` when it is dropped. - pub fn activate(&mut self) -> OutputHandleCore { + pub fn activate(&mut self) -> OutputHandleCore { OutputHandleCore { push_buffer: &mut self.push_buffer, internal_buffer: &self.internal_buffer, @@ -197,17 +198,46 @@ impl>> OutputWrapper { } } - /// Handle to an operator's output stream. -pub struct OutputHandleCore<'a, T: Timestamp, C: Container+'a, P: Push>+'a> { - push_buffer: &'a mut Buffer>, +pub struct OutputHandleCore<'a, T: Timestamp, CB: ContainerBuilder+'a, P: Push>+'a> { + push_buffer: &'a mut Buffer>, internal_buffer: &'a Rc>>, } /// Handle specialized to `Vec`-based container. -pub type OutputHandle<'a, T, D, P> = OutputHandleCore<'a, T, Vec, P>; +pub type OutputHandle<'a, T, D, P> = OutputHandleCore<'a, T, CapacityContainerBuilder>, P>; + +impl<'a, T: Timestamp, CB: ContainerBuilder, P: Push>> OutputHandleCore<'a, T, CB, P> { + /// Obtains a session that can send data at the timestamp associated with capability `cap`. + /// + /// In order to send data at a future timestamp, obtain a capability for the new timestamp + /// first, as show in the example. + /// + /// # Examples + /// ``` + /// use timely::dataflow::operators::ToStream; + /// use timely::dataflow::operators::generic::Operator; + /// use timely::dataflow::channels::pact::Pipeline; + /// use timely::container::CapacityContainerBuilder; + /// + /// timely::example(|scope| { + /// (0..10).to_stream(scope) + /// .unary::, _, _, _>(Pipeline, "example", |_cap, _info| |input, output| { + /// input.for_each(|cap, data| { + /// let time = cap.time().clone() + 1; + /// output.session_with_builder(&cap.delayed(&time)) + /// .give_container(&mut data.replace(Vec::new())); + /// }); + /// }); + /// }); + /// ``` + pub fn session_with_builder<'b, CT: CapabilityTrait>(&'b mut self, cap: &'b CT) -> Session<'b, T, CB, PushCounter> where 'a: 'b { + assert!(cap.valid_for_output(&self.internal_buffer), "Attempted to open output session with invalid capability"); + self.push_buffer.session_with_builder(cap.time()) + } +} -impl<'a, T: Timestamp, C: Container, P: Push>> OutputHandleCore<'a, T, C, P> { +impl<'a, T: Timestamp, C: Container, P: Push>> OutputHandleCore<'a, T, CapacityContainerBuilder, P> { /// Obtains a session that can send data at the timestamp associated with capability `cap`. /// /// In order to send data at a future timestamp, obtain a capability for the new timestamp @@ -225,14 +255,14 @@ impl<'a, T: Timestamp, C: Container, P: Push>> OutputHandleCore<'a, /// input.for_each(|cap, data| { /// let time = cap.time().clone() + 1; /// output.session(&cap.delayed(&time)) - /// .give_vec(&mut data.replace(Vec::new())); + /// .give_container(&mut data.replace(Vec::new())); /// }); /// }); /// }); /// ``` - pub fn session<'b, CT: CapabilityTrait>(&'b mut self, cap: &'b CT) -> Session<'b, T, C, PushCounter> where 'a: 'b { - assert!(cap.valid_for_output(&self.internal_buffer), "Attempted to open output session with invalid capability"); - self.push_buffer.session(cap.time()) + #[inline] + pub fn session<'b, CT: CapabilityTrait>(&'b mut self, cap: &'b CT) -> Session<'b, T, CapacityContainerBuilder, PushCounter> where 'a: 'b { + self.session_with_builder(cap) } /// Flushes all pending data and indicate that no more data immediately follows. @@ -241,7 +271,7 @@ impl<'a, T: Timestamp, C: Container, P: Push>> OutputHandleCore<'a, } } -impl<'a, T: Timestamp, C: Container, P: Push>> Drop for OutputHandleCore<'a, T, C, P> { +impl<'a, T: Timestamp, CB: ContainerBuilder, P: Push>> Drop for OutputHandleCore<'a, T, CB, P> { fn drop(&mut self) { self.push_buffer.cease(); } diff --git a/timely/src/dataflow/operators/generic/notificator.rs b/timely/src/dataflow/operators/generic/notificator.rs index 72a4be2f9..e585c7c5d 100644 --- a/timely/src/dataflow/operators/generic/notificator.rs +++ b/timely/src/dataflow/operators/generic/notificator.rs @@ -59,7 +59,7 @@ impl<'a, T: Timestamp> Notificator<'a, T> { /// (0..10).to_stream(scope) /// .unary_notify(Pipeline, "example", Some(0), |input, output, notificator| { /// input.for_each(|cap, data| { - /// output.session(&cap).give_vec(&mut data.replace(Vec::new())); + /// output.session(&cap).give_container(&mut data.replace(Vec::new())); /// let time = cap.time().clone() + 1; /// notificator.notify_at(cap.delayed(&time)); /// }); @@ -275,7 +275,7 @@ impl FrontierNotificator { /// let mut notificator = FrontierNotificator::new(); /// move |input, output| { /// input.for_each(|cap, data| { - /// output.session(&cap).give_vec(&mut data.replace(Vec::new())); + /// output.session(&cap).give_container(&mut data.replace(Vec::new())); /// let time = cap.time().clone() + 1; /// notificator.notify_at(cap.delayed(&time)); /// }); @@ -405,7 +405,7 @@ impl FrontierNotificator { /// let mut notificator = FrontierNotificator::new(); /// move |input, output| { /// input.for_each(|cap, data| { - /// output.session(&cap).give_vec(&mut data.replace(Vec::new())); + /// output.session(&cap).give_container(&mut data.replace(Vec::new())); /// let time = cap.time().clone() + 1; /// notificator.notify_at(cap.delayed(&time)); /// assert_eq!(notificator.pending().filter(|t| t.0.time() == &time).count(), 1); diff --git a/timely/src/dataflow/operators/generic/operator.rs b/timely/src/dataflow/operators/generic/operator.rs index 24ce5b376..d6dd92c69 100644 --- a/timely/src/dataflow/operators/generic/operator.rs +++ b/timely/src/dataflow/operators/generic/operator.rs @@ -13,6 +13,7 @@ use super::builder_rc::OperatorBuilder; use crate::dataflow::operators::generic::OperatorInfo; use crate::dataflow::operators::generic::notificator::{Notificator, FrontierNotificator}; use crate::Container; +use crate::container::{ContainerBuilder, CapacityContainerBuilder}; /// Methods to construct generic streaming and blocking operators. pub trait Operator { @@ -56,12 +57,12 @@ pub trait Operator { /// }); /// } /// ``` - fn unary_frontier(&self, pact: P, name: &str, constructor: B) -> StreamCore + fn unary_frontier(&self, pact: P, name: &str, constructor: B) -> StreamCore where - C2: Container, + CB: ContainerBuilder, B: FnOnce(Capability, OperatorInfo) -> L, L: FnMut(&mut FrontieredInputHandleCore, - &mut OutputHandleCore>)+'static, + &mut OutputHandleCore>)+'static, P: ParallelizationContract; /// Creates a new dataflow operator that partitions its input stream by a parallelization @@ -83,7 +84,7 @@ pub trait Operator { /// .unary_notify(Pipeline, "example", None, move |input, output, notificator| { /// input.for_each(|time, data| { /// data.swap(&mut vector); - /// output.session(&time).give_vec(&mut vector); + /// output.session(&time).give_container(&mut vector); /// notificator.notify_at(time.retain()); /// }); /// notificator.for_each(|time, _cnt, _not| { @@ -93,12 +94,12 @@ pub trait Operator { /// }); /// } /// ``` - fn unary_notify, - &mut OutputHandleCore>, + &mut OutputHandleCore>, &mut Notificator)+'static, P: ParallelizationContract> - (&self, pact: P, name: &str, init: impl IntoIterator, logic: L) -> StreamCore; + (&self, pact: P, name: &str, init: impl IntoIterator, logic: L) -> StreamCore; /// Creates a new dataflow operator that partitions its input stream by a parallelization /// strategy `pact`, and repeatedly invokes `logic`, the function returned by the function passed as `constructor`. @@ -122,18 +123,18 @@ pub trait Operator { /// } /// while let Some((time, data)) = input.next() { /// data.swap(&mut vector); - /// output.session(&time).give_vec(&mut vector); + /// output.session(&time).give_container(&mut vector); /// } /// } /// }); /// }); /// ``` - fn unary(&self, pact: P, name: &str, constructor: B) -> StreamCore + fn unary(&self, pact: P, name: &str, constructor: B) -> StreamCore where - C2: Container, + CB: ContainerBuilder, B: FnOnce(Capability, OperatorInfo) -> L, L: FnMut(&mut InputHandleCore, - &mut OutputHandleCore>)+'static, + &mut OutputHandleCore>)+'static, P: ParallelizationContract; /// Creates a new dataflow operator that partitions its input streams by a parallelization @@ -188,14 +189,14 @@ pub trait Operator { /// } /// }).unwrap(); /// ``` - fn binary_frontier(&self, other: &StreamCore, pact1: P1, pact2: P2, name: &str, constructor: B) -> StreamCore + fn binary_frontier(&self, other: &StreamCore, pact1: P1, pact2: P2, name: &str, constructor: B) -> StreamCore where C2: Container, - C3: Container, + CB: ContainerBuilder, B: FnOnce(Capability, OperatorInfo) -> L, L: FnMut(&mut FrontieredInputHandleCore, &mut FrontieredInputHandleCore, - &mut OutputHandleCore>)+'static, + &mut OutputHandleCore>)+'static, P1: ParallelizationContract, P2: ParallelizationContract; @@ -220,12 +221,12 @@ pub trait Operator { /// in1.binary_notify(&in2, Pipeline, Pipeline, "example", None, move |input1, input2, output, notificator| { /// input1.for_each(|time, data| { /// data.swap(&mut vector1); - /// output.session(&time).give_vec(&mut vector1); + /// output.session(&time).give_container(&mut vector1); /// notificator.notify_at(time.retain()); /// }); /// input2.for_each(|time, data| { /// data.swap(&mut vector2); - /// output.session(&time).give_vec(&mut vector2); + /// output.session(&time).give_container(&mut vector2); /// notificator.notify_at(time.retain()); /// }); /// notificator.for_each(|time, _cnt, _not| { @@ -245,14 +246,14 @@ pub trait Operator { /// }).unwrap(); /// ``` fn binary_notify, &mut InputHandleCore, - &mut OutputHandleCore>, + &mut OutputHandleCore>, &mut Notificator)+'static, P1: ParallelizationContract, P2: ParallelizationContract> - (&self, other: &StreamCore, pact1: P1, pact2: P2, name: &str, init: impl IntoIterator, logic: L) -> StreamCore; + (&self, other: &StreamCore, pact1: P1, pact2: P2, name: &str, init: impl IntoIterator, logic: L) -> StreamCore; /// Creates a new dataflow operator that partitions its input streams by a parallelization /// strategy `pact`, and repeatedly invokes `logic`, the function returned by the function passed as `constructor`. @@ -278,24 +279,24 @@ pub trait Operator { /// } /// while let Some((time, data)) = input1.next() { /// data.swap(&mut vector1); - /// output.session(&time).give_vec(&mut vector1); + /// output.session(&time).give_container(&mut vector1); /// } /// while let Some((time, data)) = input2.next() { /// data.swap(&mut vector2); - /// output.session(&time).give_vec(&mut vector2); + /// output.session(&time).give_container(&mut vector2); /// } /// } /// }).inspect(|x| println!("{:?}", x)); /// }); /// ``` - fn binary(&self, other: &StreamCore, pact1: P1, pact2: P2, name: &str, constructor: B) -> StreamCore + fn binary(&self, other: &StreamCore, pact1: P1, pact2: P2, name: &str, constructor: B) -> StreamCore where C2: Container, - C3: Container, + CB: ContainerBuilder, B: FnOnce(Capability, OperatorInfo) -> L, L: FnMut(&mut InputHandleCore, &mut InputHandleCore, - &mut OutputHandleCore>)+'static, + &mut OutputHandleCore>)+'static, P1: ParallelizationContract, P2: ParallelizationContract; @@ -330,12 +331,12 @@ pub trait Operator { impl Operator for StreamCore { - fn unary_frontier(&self, pact: P, name: &str, constructor: B) -> StreamCore + fn unary_frontier(&self, pact: P, name: &str, constructor: B) -> StreamCore where - C2: Container, + CB: ContainerBuilder, B: FnOnce(Capability, OperatorInfo) -> L, L: FnMut(&mut FrontieredInputHandleCore, - &mut OutputHandleCore>)+'static, + &mut OutputHandleCore>)+'static, P: ParallelizationContract { let mut builder = OperatorBuilder::new(name.to_owned(), self.scope()); @@ -358,12 +359,12 @@ impl Operator for StreamCore { stream } - fn unary_notify, - &mut OutputHandleCore>, + &mut OutputHandleCore>, &mut Notificator)+'static, P: ParallelizationContract> - (&self, pact: P, name: &str, init: impl IntoIterator, mut logic: L) -> StreamCore { + (&self, pact: P, name: &str, init: impl IntoIterator, mut logic: L) -> StreamCore { self.unary_frontier(pact, name, move |capability, _info| { let mut notificator = FrontierNotificator::new(); @@ -380,12 +381,12 @@ impl Operator for StreamCore { }) } - fn unary(&self, pact: P, name: &str, constructor: B) -> StreamCore + fn unary(&self, pact: P, name: &str, constructor: B) -> StreamCore where - C2: Container, + CB: ContainerBuilder, B: FnOnce(Capability, OperatorInfo) -> L, L: FnMut(&mut InputHandleCore, - &mut OutputHandleCore>)+'static, + &mut OutputHandleCore>)+'static, P: ParallelizationContract { let mut builder = OperatorBuilder::new(name.to_owned(), self.scope()); @@ -408,14 +409,14 @@ impl Operator for StreamCore { stream } - fn binary_frontier(&self, other: &StreamCore, pact1: P1, pact2: P2, name: &str, constructor: B) -> StreamCore + fn binary_frontier(&self, other: &StreamCore, pact1: P1, pact2: P2, name: &str, constructor: B) -> StreamCore where C2: Container, - C3: Container, + CB: ContainerBuilder, B: FnOnce(Capability, OperatorInfo) -> L, L: FnMut(&mut FrontieredInputHandleCore, &mut FrontieredInputHandleCore, - &mut OutputHandleCore>)+'static, + &mut OutputHandleCore>)+'static, P1: ParallelizationContract, P2: ParallelizationContract { @@ -442,14 +443,14 @@ impl Operator for StreamCore { } fn binary_notify, &mut InputHandleCore, - &mut OutputHandleCore>, + &mut OutputHandleCore>, &mut Notificator)+'static, P1: ParallelizationContract, P2: ParallelizationContract> - (&self, other: &StreamCore, pact1: P1, pact2: P2, name: &str, init: impl IntoIterator, mut logic: L) -> StreamCore { + (&self, other: &StreamCore, pact1: P1, pact2: P2, name: &str, init: impl IntoIterator, mut logic: L) -> StreamCore { self.binary_frontier(other, pact1, pact2, name, |capability, _info| { let mut notificator = FrontierNotificator::new(); @@ -468,14 +469,14 @@ impl Operator for StreamCore { } - fn binary(&self, other: &StreamCore, pact1: P1, pact2: P2, name: &str, constructor: B) -> StreamCore + fn binary(&self, other: &StreamCore, pact1: P1, pact2: P2, name: &str, constructor: B) -> StreamCore where C2: Container, - C3: Container, + CB: ContainerBuilder, B: FnOnce(Capability, OperatorInfo) -> L, L: FnMut(&mut InputHandleCore, &mut InputHandleCore, - &mut OutputHandleCore>)+'static, + &mut OutputHandleCore>)+'static, P1: ParallelizationContract, P2: ParallelizationContract { @@ -559,11 +560,11 @@ impl Operator for StreamCore { /// .inspect(|x| println!("number: {:?}", x)); /// }); /// ``` -pub fn source(scope: &G, name: &str, constructor: B) -> StreamCore +pub fn source(scope: &G, name: &str, constructor: B) -> StreamCore where - C: Container, + CB: ContainerBuilder, B: FnOnce(Capability, OperatorInfo) -> L, - L: FnMut(&mut OutputHandleCore>)+'static { + L: FnMut(&mut OutputHandleCore>)+'static { let mut builder = OperatorBuilder::new(name.to_owned(), scope.clone()); let operator_info = builder.operator_info(); @@ -604,7 +605,7 @@ where /// }); /// ``` pub fn empty(scope: &G) -> StreamCore { - source(scope, "Empty", |_capability, _info| |_output| { + source::<_, CapacityContainerBuilder, _, _>(scope, "Empty", |_capability, _info| |_output| { // drop capability, do nothing }) } diff --git a/timely/src/dataflow/operators/map.rs b/timely/src/dataflow/operators/map.rs index 9786d4ec5..ae1c43c5d 100644 --- a/timely/src/dataflow/operators/map.rs +++ b/timely/src/dataflow/operators/map.rs @@ -58,7 +58,7 @@ impl Map for Stream { input.for_each(|time, data| { data.swap(&mut vector); for datum in vector.iter_mut() { logic(datum); } - output.session(&time).give_vec(&mut vector); + output.session(&time).give_container(&mut vector); }) }) } diff --git a/timely/src/dataflow/operators/unordered_input.rs b/timely/src/dataflow/operators/unordered_input.rs index f35d6ff06..76fb4329d 100644 --- a/timely/src/dataflow/operators/unordered_input.rs +++ b/timely/src/dataflow/operators/unordered_input.rs @@ -2,6 +2,7 @@ use crate::Data; +use crate::container::CapacityContainerBuilder; use crate::dataflow::operators::{ActivateCapability}; use crate::dataflow::operators::core::{UnorderedInput as UnorderedInputCore, UnorderedHandle as UnorderedHandleCore}; use crate::dataflow::{Stream, Scope}; @@ -72,4 +73,4 @@ impl UnorderedInput for G { } /// An unordered handle specialized to vectors. -pub type UnorderedHandle = UnorderedHandleCore>; +pub type UnorderedHandle = UnorderedHandleCore>>; diff --git a/timely/tests/barrier.rs b/timely/tests/barrier.rs index 200a37a78..6180e26bb 100644 --- a/timely/tests/barrier.rs +++ b/timely/tests/barrier.rs @@ -4,6 +4,7 @@ use timely::{Config, CommunicationConfig, WorkerConfig}; use timely::dataflow::channels::pact::Pipeline; use timely::dataflow::operators::{Feedback, ConnectLoop}; use timely::dataflow::operators::generic::operator::Operator; +use timely::container::CapacityContainerBuilder; #[test] fn barrier_sync_1w() { barrier_sync_helper(CommunicationConfig::Thread); } #[test] fn barrier_sync_2w() { barrier_sync_helper(CommunicationConfig::Process(2)); } @@ -18,7 +19,7 @@ fn barrier_sync_helper(comm_config: ::timely::CommunicationConfig) { timely::execute(config, move |worker| { worker.dataflow(move |scope| { let (handle, stream) = scope.feedback::>(1); - stream.unary_notify( + stream.unary_notify::, _, _>( Pipeline, "Barrier", vec![0, 1],