From 20ac38cc95d655607f9301ad1e9aa22195c5b40e Mon Sep 17 00:00:00 2001 From: Frank McSherry Date: Thu, 18 Sep 2025 16:19:21 -0400 Subject: [PATCH 1/5] Pivot container builders above capability checking --- timely/src/dataflow/channels/pushers/mod.rs | 6 + .../src/dataflow/channels/pushers/progress.rs | 67 +++++++ timely/src/dataflow/operators/branch.rs | 15 +- timely/src/dataflow/operators/capability.rs | 3 +- timely/src/dataflow/operators/core/concat.rs | 4 +- .../src/dataflow/operators/core/feedback.rs | 11 +- timely/src/dataflow/operators/core/ok_err.rs | 8 +- .../src/dataflow/operators/core/partition.rs | 50 +++-- .../dataflow/operators/generic/builder_rc.rs | 36 ++-- .../src/dataflow/operators/generic/handles.rs | 186 +++++++++--------- timely/src/dataflow/operators/generic/mod.rs | 2 +- .../dataflow/operators/generic/operator.rs | 44 +++-- timely/src/dataflow/operators/mod.rs | 2 +- timely/tests/shape_scaling.rs | 10 +- 14 files changed, 256 insertions(+), 188 deletions(-) create mode 100644 timely/src/dataflow/channels/pushers/progress.rs diff --git a/timely/src/dataflow/channels/pushers/mod.rs b/timely/src/dataflow/channels/pushers/mod.rs index 295d033ca..e1c49e59f 100644 --- a/timely/src/dataflow/channels/pushers/mod.rs +++ b/timely/src/dataflow/channels/pushers/mod.rs @@ -6,3 +6,9 @@ pub mod tee; pub mod exchange; pub mod counter; pub mod buffer; +pub mod progress; + +/// An output pusher which validates capabilities, records progress, and tees output. +pub type Output = progress::Progress>>; +/// An output session that will flush the output when dropped. +pub type OutputSession<'a, T, C> = progress::ProgressSession<'a, T, C, counter::Counter>>; diff --git a/timely/src/dataflow/channels/pushers/progress.rs b/timely/src/dataflow/channels/pushers/progress.rs new file mode 100644 index 000000000..618aedd04 --- /dev/null +++ b/timely/src/dataflow/channels/pushers/progress.rs @@ -0,0 +1,67 @@ +//! A wrapper that allows containers to be sent by validating capabilities. + +use std::rc::Rc; +use std::cell::RefCell; + +use crate::progress::{ChangeBatch, Timestamp}; +use crate::dataflow::channels::Message; +use crate::dataflow::operators::CapabilityTrait; +use crate::communication::Push; +use crate::Container; + +/// A wrapper that allows containers to be sent by validating capabilities. +#[derive(Debug)] +pub struct Progress { + pushee: P, + internal: Rc>>, + port: usize, +} + +impl Progress { + /// Ships a container using a provided capability. + /// + /// On return, the container may hold undefined contents and should be cleared before it is reused. + #[inline] pub fn give>(&mut self, capability: &CT, container: &mut C) where P: Push> { + debug_assert!(self.valid(capability), "Attempted to open output session with invalid capability"); + if !container.is_empty() { Message::push_at(container, capability.time().clone(), &mut self.pushee); } + } + /// Activates a `Progress` into a `ProgressSession` which will flush when dropped. + pub fn activate<'a, C>(&'a mut self) -> ProgressSession<'a, T, C, P> where P: Push> { + ProgressSession { + borrow: self, + marker: std::marker::PhantomData, + } + } + /// Determines if the capability is valid for this output. + pub fn valid>(&self, capability: &CT) -> bool { + capability.valid_for_output(&self.internal, self.port) + } +} + +impl Progress where T : Ord+Clone+'static { + /// Allocates a new `Progress` from a pushee and capability validation information. 
+ pub fn new(pushee: P, internal: Rc>>, port: usize) -> Self { + Self { pushee, internal, port } + } +} + +/// A session that provides access to a `Progress` but will flush when dropped. +/// +/// The type of the container `C` must be known, as long as the flushing action requires a specific `Push` implementation. +pub struct ProgressSession<'a, T: Timestamp, C, P: Push>> { + borrow: &'a mut Progress, + marker: std::marker::PhantomData, +} + +impl<'a, T: Timestamp, C, P: Push>> std::ops::Deref for ProgressSession<'a, T, C, P> { + type Target = Progress; + fn deref(&self) -> &Self::Target { self.borrow } +} + +impl<'a, T: Timestamp, C, P: Push>> std::ops::DerefMut for ProgressSession<'a, T, C, P> { + fn deref_mut(&mut self) -> &mut Self::Target { self.borrow } +} + +impl<'a, T: Timestamp, C, P: Push>> Drop for ProgressSession<'a, T, C, P> { + fn drop(&mut self) { self.borrow.pushee.push(&mut None); } +} diff --git a/timely/src/dataflow/operators/branch.rs b/timely/src/dataflow/operators/branch.rs index 42752ad93..2ee9919c1 100644 --- a/timely/src/dataflow/operators/branch.rs +++ b/timely/src/dataflow/operators/branch.rs @@ -1,6 +1,7 @@ //! Operators that separate one stream into two streams based on some condition use crate::dataflow::channels::pact::Pipeline; +use crate::dataflow::operators::generic::OutputBuilder; use crate::dataflow::operators::generic::builder_rc::OperatorBuilder; use crate::dataflow::{Scope, Stream, StreamCore}; use crate::{Container, Data}; @@ -43,8 +44,11 @@ impl Branch for Stream { builder.set_notify(false); let mut input = builder.new_input(self, Pipeline); - let (mut output1, stream1) = builder.new_output(); - let (mut output2, stream2) = builder.new_output(); + let (output1, stream1) = builder.new_output(); + let (output2, stream2) = builder.new_output(); + + let mut output1 = OutputBuilder::from(output1); + let mut output2 = OutputBuilder::from(output2); builder.build(move |_| { move |_frontiers| { @@ -99,8 +103,11 @@ impl BranchWhen for StreamCore { builder.set_notify(false); let mut input = builder.new_input(self, Pipeline); - let (mut output1, stream1) = builder.new_output(); - let (mut output2, stream2) = builder.new_output(); + let (output1, stream1) = builder.new_output(); + let (output2, stream2) = builder.new_output(); + + let mut output1 = OutputBuilder::from(output1); + let mut output2 = OutputBuilder::from(output2); builder.build(move |_| { diff --git a/timely/src/dataflow/operators/capability.rs b/timely/src/dataflow/operators/capability.rs index 8a1deaa57..af81dba5d 100644 --- a/timely/src/dataflow/operators/capability.rs +++ b/timely/src/dataflow/operators/capability.rs @@ -37,6 +37,7 @@ use crate::dataflow::channels::pullers::counter::ConsumedGuard; pub trait CapabilityTrait { /// The timestamp associated with the capability. fn time(&self) -> &T; + /// Validates that the capability is valid for a specific internal buffer and output port. fn valid_for_output(&self, query_buffer: &Rc>>, port: usize) -> bool; } @@ -229,7 +230,7 @@ type CapabilityUpdates = Rc>>>>>; /// An capability of an input port. /// -/// Holding onto this capability will implicitly holds onto a capability for all the outputs +/// Holding onto this capability will implicitly hold on to a capability for all the outputs /// ports this input is connected to, after the connection summaries have been applied. 
/// /// This input capability supplies a `retain_for_output(self)` method which consumes the input diff --git a/timely/src/dataflow/operators/core/concat.rs b/timely/src/dataflow/operators/core/concat.rs index 431353941..84c9bc16e 100644 --- a/timely/src/dataflow/operators/core/concat.rs +++ b/timely/src/dataflow/operators/core/concat.rs @@ -84,8 +84,8 @@ impl Concatenate for G { move |_frontier| { let mut output = output.activate(); for handle in handles.iter_mut() { - handle.for_each_time(|time, data| { - output.session(&time).give_containers(data); + handle.for_each(|time, data| { + output.give(&time, data); }) } } diff --git a/timely/src/dataflow/operators/core/feedback.rs b/timely/src/dataflow/operators/core/feedback.rs index 529b65766..f5a03cedd 100644 --- a/timely/src/dataflow/operators/core/feedback.rs +++ b/timely/src/dataflow/operators/core/feedback.rs @@ -1,10 +1,7 @@ //! Create cycles in a timely dataflow graph. use crate::Container; -use crate::container::CapacityContainerBuilder; use crate::dataflow::channels::pact::Pipeline; -use crate::dataflow::channels::pushers::Tee; -use crate::dataflow::operators::generic::OutputWrapper; use crate::dataflow::operators::generic::builder_rc::OperatorBuilder; use crate::dataflow::scopes::child::Iterative; use crate::dataflow::{StreamCore, Scope}; @@ -118,12 +115,10 @@ impl ConnectLoop for StreamCore { builder.build(move |_capability| move |_frontier| { let mut output = output.activate(); - input.activate().for_each_time(|cap, data| { + input.for_each(|cap, data| { if let Some(new_time) = summary.results_in(cap.time()) { let new_cap = cap.delayed(&new_time); - output - .session(&new_cap) - .give_containers(data); + output.give(&new_cap, data); } }); }); @@ -135,5 +130,5 @@ impl ConnectLoop for StreamCore { pub struct Handle { builder: OperatorBuilder, summary: ::Summary, - output: OutputWrapper, Tee>, + output: crate::dataflow::channels::pushers::Output, } diff --git a/timely/src/dataflow/operators/core/ok_err.rs b/timely/src/dataflow/operators/core/ok_err.rs index 6b50a42c5..32a0d1fd2 100644 --- a/timely/src/dataflow/operators/core/ok_err.rs +++ b/timely/src/dataflow/operators/core/ok_err.rs @@ -4,6 +4,7 @@ use crate::Container; use crate::container::{DrainContainer, SizableContainer, PushInto}; use crate::dataflow::channels::pact::Pipeline; use crate::dataflow::operators::generic::builder_rc::OperatorBuilder; +use crate::dataflow::operators::generic::OutputBuilder; use crate::dataflow::{Scope, StreamCore}; /// Extension trait for `Stream`. @@ -53,8 +54,11 @@ impl OkErr for StreamCore { builder.set_notify(false); let mut input = builder.new_input(self, Pipeline); - let (mut output1, stream1) = builder.new_output(); - let (mut output2, stream2) = builder.new_output(); + let (output1, stream1) = builder.new_output(); + let (output2, stream2) = builder.new_output(); + + let mut output1 = OutputBuilder::from(output1); + let mut output2 = OutputBuilder::from(output2); builder.build(move |_| { move |_frontiers| { diff --git a/timely/src/dataflow/operators/core/partition.rs b/timely/src/dataflow/operators/core/partition.rs index 7602b4892..608e79ff1 100644 --- a/timely/src/dataflow/operators/core/partition.rs +++ b/timely/src/dataflow/operators/core/partition.rs @@ -1,9 +1,9 @@ //! Partition a stream of records into multiple streams. 
+use std::collections::BTreeMap; use crate::container::{DrainContainer, ContainerBuilder, PushInto}; use crate::dataflow::channels::pact::Pipeline; use crate::dataflow::operators::generic::builder_rc::OperatorBuilder; -use crate::dataflow::operators::InputCapability; use crate::dataflow::{Scope, StreamCore}; use crate::Container; @@ -46,43 +46,41 @@ impl Partition for StreamCore(); + let (output, stream) = builder.new_output::(); outputs.push(output); streams.push(stream); } + let mut caps = BTreeMap::default(); + builder.build(move |_| { - let mut todo = vec![]; + let mut todo: BTreeMap<_,Vec<_>> = Default::default(); move |_frontiers| { let mut handles = outputs.iter_mut().map(|o| o.activate()).collect::>(); - - // The capability associated with each session in `sessions`. - let mut sessions_cap: Option> = None; - let mut sessions = vec![]; - + let mut parts = BTreeMap::>::default(); while let Some((cap, data)) = input.next() { - todo.push((cap, std::mem::take(data))); + todo.entry(cap.time().clone()).or_default().push(std::mem::take(data)); + caps.insert(cap.time().clone(), cap); } - todo.sort_unstable_by(|a, b| a.0.cmp(&b.0)); - for (cap, mut data) in todo.drain(..) { - if sessions_cap.as_ref().map_or(true, |s_cap| s_cap.time() != cap.time()) { - sessions = handles.iter_mut().map(|h| (None, Some(h))).collect(); - sessions_cap = Some(cap); + while let Some((time, dataz)) = todo.pop_first() { + let cap = caps.remove(&time).unwrap(); + for mut data in dataz.into_iter() { + for datum in data.drain() { + let (part, datum) = route(datum); + parts.entry(part).or_default().push(datum); + } } - for datum in data.drain() { - let (part, datum2) = route(datum); - - let session = match sessions[part as usize] { - (Some(ref mut s), _) => s, - (ref mut session_slot, ref mut handle) => { - let handle = handle.take().unwrap(); - let session = handle.session_with_builder(sessions_cap.as_ref().unwrap()); - session_slot.insert(session) - } - }; - session.give(datum2); + while let Some((part, data)) = parts.pop_first() { + for datum in data.into_iter() { + c_build.push_into(datum); + } + while let Some(container) = c_build.finish() { + handles[part as usize].give(&cap, container); + } } } } diff --git a/timely/src/dataflow/operators/generic/builder_rc.rs b/timely/src/dataflow/operators/generic/builder_rc.rs index fee122014..08d499266 100644 --- a/timely/src/dataflow/operators/generic/builder_rc.rs +++ b/timely/src/dataflow/operators/generic/builder_rc.rs @@ -9,15 +9,13 @@ use crate::progress::operate::SharedProgress; use crate::progress::frontier::{Antichain, MutableAntichain}; use crate::Container; -use crate::container::ContainerBuilder; use crate::dataflow::{Scope, StreamCore}; -use crate::dataflow::channels::pushers::Tee; use crate::dataflow::channels::pushers::Counter as PushCounter; -use crate::dataflow::channels::pushers::buffer::Buffer as PushBuffer; +use crate::dataflow::channels::pushers; use crate::dataflow::channels::pact::ParallelizationContract; use crate::dataflow::channels::pullers::Counter as PullCounter; use crate::dataflow::operators::capability::Capability; -use crate::dataflow::operators::generic::handles::{InputHandleCore, new_input_handle, OutputWrapper}; +use crate::dataflow::operators::generic::handles::{InputHandleCore, new_input_handle}; use crate::dataflow::operators::generic::operator_info::OperatorInfo; use crate::dataflow::operators::generic::builder_raw::OperatorShape; use crate::progress::operate::PortConnectivity; @@ -90,7 +88,7 @@ impl OperatorBuilder { } /// Adds a new 
output to a generic operator builder, returning the `Push` implementor to use. - pub fn new_output(&mut self) -> (OutputWrapper>, StreamCore) { + pub fn new_output(&mut self) -> (pushers::Output, StreamCore) { let connection = (0..self.builder.shape().inputs()).map(|i| (i, Antichain::from_elem(Default::default()))); self.new_output_connection(connection) } @@ -103,9 +101,9 @@ impl OperatorBuilder { /// /// Commonly the connections are either the unit summary, indicating the same timestamp might be produced as output, or an empty /// antichain indicating that there is no connection from the input to the output. - pub fn new_output_connection(&mut self, connection: I) -> ( - OutputWrapper>, - StreamCore + pub fn new_output_connection(&mut self, connection: I) -> ( + pushers::Output, + StreamCore, ) where I: IntoIterator::Summary>)> + Clone, @@ -116,14 +114,14 @@ impl OperatorBuilder { let internal = Rc::new(RefCell::new(ChangeBatch::new())); self.internal.borrow_mut().push(Rc::clone(&internal)); - let mut buffer = PushBuffer::new(PushCounter::new(tee)); - self.produced.push(Rc::clone(buffer.inner().produced())); + let counter = PushCounter::new(tee); + self.produced.push(Rc::clone(counter.produced())); for (input, entry) in connection { self.summaries[input].borrow_mut().add_port(new_output, entry); } - (OutputWrapper::new(buffer, internal, new_output), stream) + (pushers::Output::new(counter, internal, new_output), stream) } /// Creates an operator implementation from supplied logic constructor. @@ -222,7 +220,7 @@ impl OperatorBuilder { #[cfg(test)] mod tests { - use crate::container::CapacityContainerBuilder; + use crate::dataflow::operators::generic::OutputBuilder; #[test] #[should_panic] @@ -237,9 +235,10 @@ mod tests { let mut builder = OperatorBuilder::new("Failure".to_owned(), scope.clone()); - // let mut input = builder.new_input(stream, Pipeline); - let (mut output1, _stream1) = builder.new_output::>>(); - let (mut output2, _stream2) = builder.new_output::>>(); + let (output1, _stream1) = builder.new_output::>(); + let (output2, _stream2) = builder.new_output::>(); + let mut output1 = OutputBuilder::from(output1); + let mut output2 = OutputBuilder::from(output2); builder.build(move |capabilities| { move |_frontiers| { @@ -267,9 +266,10 @@ mod tests { let mut builder = OperatorBuilder::new("Failure".to_owned(), scope.clone()); - // let mut input = builder.new_input(stream, Pipeline); - let (mut output1, _stream1) = builder.new_output::>>(); - let (mut output2, _stream2) = builder.new_output::>>(); + let (output1, _stream1) = builder.new_output::>(); + let (output2, _stream2) = builder.new_output::>(); + let mut output1 = OutputBuilder::from(output1); + let mut output2 = OutputBuilder::from(output2); builder.build(move |mut capabilities| { move |_frontiers| { diff --git a/timely/src/dataflow/operators/generic/handles.rs b/timely/src/dataflow/operators/generic/handles.rs index ba7330dcb..9befc0b0b 100644 --- a/timely/src/dataflow/operators/generic/handles.rs +++ b/timely/src/dataflow/operators/generic/handles.rs @@ -11,12 +11,10 @@ use crate::progress::Timestamp; use crate::progress::ChangeBatch; use crate::progress::operate::PortConnectivity; use crate::dataflow::channels::pullers::Counter as PullCounter; -use crate::dataflow::channels::pushers::Counter as PushCounter; -use crate::dataflow::channels::pushers::buffer::{Buffer, Session}; use crate::dataflow::channels::Message; -use crate::communication::{Push, Pull}; +use crate::communication::Pull; use crate::{Container, Accountable}; 
-use crate::container::{ContainerBuilder, CapacityContainerBuilder}; +use crate::container::{ContainerBuilder, CapacityContainerBuilder, PushInto}; use crate::dataflow::operators::InputCapability; use crate::dataflow::operators::capability::CapabilityTrait; @@ -116,116 +114,108 @@ pub fn new_input_handle>>( } } -/// An owned instance of an output buffer which ensures certain API use. -/// -/// An `OutputWrapper` exists to prevent anyone from using the wrapped buffer in any way other -/// than with an `OutputHandle`, whose methods ensure that capabilities are used and that the -/// pusher is flushed (via the `cease` method) once it is no longer used. -#[derive(Debug)] -pub struct OutputWrapper>> { - push_buffer: Buffer>, - internal_buffer: Rc>>, - port: usize, +/// An owning pair of output pusher and container builder. +pub struct OutputBuilder { + output: crate::dataflow::channels::pushers::Output, + builder: CB, } -impl>> OutputWrapper { - /// Creates a new output wrapper from a push buffer. - pub fn new(push_buffer: Buffer>, internal_buffer: Rc>>, port: usize) -> Self { - OutputWrapper { - push_buffer, - internal_buffer, - port, - } +impl OutputBuilder { + /// Constructs an output builder from an output and a default container builder. + pub fn from(output: crate::dataflow::channels::pushers::Output) -> Self { + Self { output, builder: CB::default() } } - /// Borrows the push buffer into a handle, which can be used to send records. - /// - /// This method ensures that the only access to the push buffer is through the `OutputHandle` - /// type which ensures the use of capabilities, and which calls `cease` when it is dropped. - pub fn activate(&mut self) -> OutputHandleCore<'_, T, CB, P> { - OutputHandleCore { - push_buffer: &mut self.push_buffer, - internal_buffer: &self.internal_buffer, - port: self.port, + /// An activated output buffer for building containers. + pub fn activate<'a>(&'a mut self) -> OutputBuffer<'a, T, CB> { + OutputBuffer { + session: self.output.activate(), + builder: &mut self.builder, } } } -/// Handle to an operator's output stream. -pub struct OutputHandleCore<'a, T: Timestamp, CB: ContainerBuilder+'a, P: Push>+'a> { - push_buffer: &'a mut Buffer>, - internal_buffer: &'a Rc>>, - port: usize, +/// A wrapper around a live output session, with a container builder to buffer. +pub struct OutputBuffer<'a, T: Timestamp, CB: ContainerBuilder> { + session: crate::dataflow::channels::pushers::OutputSession<'a, T, CB::Container>, + builder: &'a mut CB, } -/// Handle specialized to `Vec`-based container. -pub type OutputHandle<'a, T, D, P> = OutputHandleCore<'a, T, CapacityContainerBuilder>, P>; - -impl<'a, T: Timestamp, CB: ContainerBuilder, P: Push>> OutputHandleCore<'a, T, CB, P> { - /// Obtains a session that can send data at the timestamp associated with capability `cap`. - /// - /// In order to send data at a future timestamp, obtain a capability for the new timestamp - /// first, as show in the example. +impl<'a, T: Timestamp, CB: ContainerBuilder> OutputBuffer<'a, T, CB> { + /// A container-building session associated with a capability. 
/// - /// # Examples - /// ``` - /// use timely::dataflow::operators::ToStream; - /// use timely::dataflow::operators::generic::Operator; - /// use timely::dataflow::channels::pact::Pipeline; - /// use timely::container::CapacityContainerBuilder; - /// - /// timely::example(|scope| { - /// (0..10).to_stream(scope) - /// .unary::, _, _, _>(Pipeline, "example", |_cap, _info| |input, output| { - /// input.for_each_time(|cap, data| { - /// let time = cap.time().clone() + 1; - /// output.session_with_builder(&cap.delayed(&time)) - /// .give_containers(data); - /// }); - /// }); - /// }); - /// ``` - pub fn session_with_builder<'b, CT: CapabilityTrait>(&'b mut self, cap: &'b CT) -> Session<'b, T, CB, PushCounter> where 'a: 'b { - debug_assert!(cap.valid_for_output(self.internal_buffer, self.port), "Attempted to open output session with invalid capability"); - self.push_buffer.session_with_builder(cap.time()) - } - - /// Flushes all pending data and indicate that no more data immediately follows. - pub fn cease(&mut self) { - self.push_buffer.cease(); + /// This method is the prefered way of sending records that must be accumulated into a container, + /// as it avoid the recurring overhead of capability validation. + pub fn session_with_builder<'b, CT: CapabilityTrait>(&'b mut self, capability: &'b CT) -> Session<'a, 'b, T, CB, CT> where 'a: 'b { + debug_assert!(self.session.valid(capability)); + Session { + buffer: self, + capability, + } } } -impl<'a, T: Timestamp, C: Container, P: Push>> OutputHandleCore<'a, T, CapacityContainerBuilder, P> { - /// Obtains a session that can send data at the timestamp associated with capability `cap`. - /// - /// In order to send data at a future timestamp, obtain a capability for the new timestamp - /// first, as show in the example. +impl<'a, T: Timestamp, C: Container> OutputBuffer<'a, T, CapacityContainerBuilder> { + /// A container-building session associated with a capability. /// - /// # Examples - /// ``` - /// use timely::dataflow::operators::ToStream; - /// use timely::dataflow::operators::generic::Operator; - /// use timely::dataflow::channels::pact::Pipeline; - /// - /// timely::example(|scope| { - /// (0..10).to_stream(scope) - /// .unary(Pipeline, "example", |_cap, _info| |input, output| { - /// input.for_each_time(|cap, data| { - /// let time = cap.time().clone() + 1; - /// output.session(&cap.delayed(&time)) - /// .give_containers(data); - /// }); - /// }); - /// }); - /// ``` - #[inline] - pub fn session<'b, CT: CapabilityTrait>(&'b mut self, cap: &'b CT) -> Session<'b, T, CapacityContainerBuilder, PushCounter> where 'a: 'b { - self.session_with_builder(cap) + /// This method is the prefered way of sending records that must be accumulated into a container, + /// as it avoid the recurring overhead of capability validation. + pub fn session<'b, CT: CapabilityTrait>(&'b mut self, capability: &'b CT) -> Session<'a, 'b, T, CapacityContainerBuilder, CT> where 'a: 'b { + debug_assert!(self.session.valid(capability)); + Session { + buffer: self, + capability, + } } } -impl>> Drop for OutputHandleCore<'_, T, CB, P> { - fn drop(&mut self) { - self.push_buffer.cease(); +/// An active output building session, which accepts items and builds containers. 
+pub struct Session<'a: 'b, 'b, T: Timestamp, CB: ContainerBuilder, CT: CapabilityTrait> { + buffer: &'b mut OutputBuffer<'a, T, CB>, + capability: &'b CT, +} + +impl<'a: 'b, 'b, T: Timestamp, CB: ContainerBuilder, CT: CapabilityTrait> Session<'a, 'b, T, CB, CT> { + + /// Provides one record at the time specified by the `Session`. + #[inline] + pub fn give(&mut self, data: D) where CB: PushInto { + self.buffer.builder.push_into(data); + self.extract_and_send(); + } + + /// Provides an iterator of records at the time specified by the `Session`. + #[inline] + pub fn give_iterator(&mut self, iter: I) + where + I: Iterator, + CB: PushInto, + { + for item in iter { self.buffer.builder.push_into(item); } + self.extract_and_send(); } + /// Provide a container at the time specified by the [Session]. + pub fn give_container(&mut self, container: &mut CB::Container) { + self.buffer.session.give(&self.capability, container); + } + /// Provide multiple containers at the time specifid by the [Session]. + pub fn give_containers<'c>(&mut self, containers: impl Iterator) { + for container in containers { self.buffer.session.give(&self.capability, container); } + } + + /// Extracts built containers and sends them. + pub fn extract_and_send(&mut self) { + while let Some(container) = self.buffer.builder.extract() { + self.buffer.session.give(&self.capability, container); + } + } + /// Finalizes containers and sends them. + pub fn flush(&mut self) { + while let Some(container) = self.buffer.builder.finish() { + self.buffer.session.give(&self.capability, container); + } + } +} + +impl<'a: 'b, 'b, T: Timestamp, CB: ContainerBuilder, CT: CapabilityTrait> Drop for Session<'a, 'b, T, CB, CT> { + fn drop(&mut self) { self.flush() } } diff --git a/timely/src/dataflow/operators/generic/mod.rs b/timely/src/dataflow/operators/generic/mod.rs index 0a9bf72c1..dd0742523 100644 --- a/timely/src/dataflow/operators/generic/mod.rs +++ b/timely/src/dataflow/operators/generic/mod.rs @@ -8,7 +8,7 @@ mod handles; mod notificator; mod operator_info; -pub use self::handles::{InputHandleCore, OutputHandle, OutputHandleCore, OutputWrapper}; +pub use self::handles::{InputHandleCore, OutputBuilder, OutputBuffer, Session}; pub use self::notificator::{Notificator, FrontierNotificator}; pub use self::operator::{Operator, source}; diff --git a/timely/src/dataflow/operators/generic/operator.rs b/timely/src/dataflow/operators/generic/operator.rs index 5599e81eb..b55ca38a2 100644 --- a/timely/src/dataflow/operators/generic/operator.rs +++ b/timely/src/dataflow/operators/generic/operator.rs @@ -2,10 +2,9 @@ //! Methods to construct generic streaming and blocking unary operators. 
use crate::progress::frontier::MutableAntichain; -use crate::dataflow::channels::pushers::Tee; use crate::dataflow::channels::pact::ParallelizationContract; -use crate::dataflow::operators::generic::handles::{InputSession, OutputHandleCore}; +use crate::dataflow::operators::generic::handles::{InputSession, OutputBuffer, OutputBuilder}; use crate::dataflow::operators::capability::Capability; use crate::dataflow::{Scope, StreamCore}; @@ -59,7 +58,7 @@ pub trait Operator { CB: ContainerBuilder, B: FnOnce(Capability, OperatorInfo) -> L, L: FnMut((InputSession<'_, G::Timestamp, C1, P::Puller>, &MutableAntichain), - &mut OutputHandleCore>)+'static, + &mut OutputBuffer<'_, G::Timestamp, CB>)+'static, P: ParallelizationContract; /// Creates a new dataflow operator that partitions its input stream by a parallelization @@ -89,7 +88,7 @@ pub trait Operator { /// ``` fn unary_notify, - &mut OutputHandleCore>, + &mut OutputBuffer<'_, G::Timestamp, CB>, &mut Notificator)+'static, P: ParallelizationContract> (&self, pact: P, name: &str, init: impl IntoIterator, logic: L) -> StreamCore; @@ -125,7 +124,7 @@ pub trait Operator { CB: ContainerBuilder, B: FnOnce(Capability, OperatorInfo) -> L, L: FnMut(InputSession<'_, G::Timestamp, C1, P::Puller>, - &mut OutputHandleCore>)+'static, + &mut OutputBuffer)+'static, P: ParallelizationContract; /// Creates a new dataflow operator that partitions its input streams by a parallelization @@ -183,7 +182,7 @@ pub trait Operator { B: FnOnce(Capability, OperatorInfo) -> L, L: FnMut((InputSession<'_, G::Timestamp, C1, P1::Puller>, &MutableAntichain), (InputSession<'_, G::Timestamp, C2, P2::Puller>, &MutableAntichain), - &mut OutputHandleCore>)+'static, + &mut OutputBuffer<'_, G::Timestamp, CB>)+'static, P1: ParallelizationContract, P2: ParallelizationContract; @@ -232,7 +231,7 @@ pub trait Operator { CB: ContainerBuilder, L: FnMut(InputSession<'_, G::Timestamp, C1, P1::Puller>, InputSession<'_, G::Timestamp, C2, P2::Puller>, - &mut OutputHandleCore>, + &mut OutputBuffer<'_, G::Timestamp, CB>, &mut Notificator)+'static, P1: ParallelizationContract, P2: ParallelizationContract> @@ -271,7 +270,7 @@ pub trait Operator { B: FnOnce(Capability, OperatorInfo) -> L, L: FnMut(InputSession<'_, G::Timestamp, C1, P1::Puller>, InputSession<'_, G::Timestamp, C2, P2::Puller>, - &mut OutputHandleCore>)+'static, + &mut OutputBuffer<'_, G::Timestamp, CB>)+'static, P1: ParallelizationContract, P2: ParallelizationContract; @@ -311,14 +310,15 @@ impl Operator for StreamCore { CB: ContainerBuilder, B: FnOnce(Capability, OperatorInfo) -> L, L: FnMut((InputSession<'_, G::Timestamp, C1, P::Puller>, &MutableAntichain), - &mut OutputHandleCore>)+'static, + &mut OutputBuffer<'_, G::Timestamp, CB>)+'static, P: ParallelizationContract { let mut builder = OperatorBuilder::new(name.to_owned(), self.scope()); let operator_info = builder.operator_info(); let mut input = builder.new_input(self, pact); - let (mut output, stream) = builder.new_output(); + let (output, stream) = builder.new_output(); + let mut output = OutputBuilder::from(output); builder.build(move |mut capabilities| { // `capabilities` should be a single-element vector. 
@@ -335,7 +335,7 @@ impl Operator for StreamCore { fn unary_notify, - &mut OutputHandleCore>, + &mut OutputBuffer<'_, G::Timestamp, CB>, &mut Notificator)+'static, P: ParallelizationContract> (&self, pact: P, name: &str, init: impl IntoIterator, mut logic: L) -> StreamCore { @@ -359,14 +359,15 @@ impl Operator for StreamCore { CB: ContainerBuilder, B: FnOnce(Capability, OperatorInfo) -> L, L: FnMut(InputSession<'_, G::Timestamp, C1, P::Puller>, - &mut OutputHandleCore>)+'static, + &mut OutputBuffer<'_, G::Timestamp, CB>)+'static, P: ParallelizationContract { let mut builder = OperatorBuilder::new(name.to_owned(), self.scope()); let operator_info = builder.operator_info(); let mut input = builder.new_input(self, pact); - let (mut output, stream) = builder.new_output(); + let (output, stream) = builder.new_output(); + let mut output = OutputBuilder::from(output); builder.set_notify(false); builder.build(move |mut capabilities| { @@ -386,7 +387,7 @@ impl Operator for StreamCore { B: FnOnce(Capability, OperatorInfo) -> L, L: FnMut((InputSession<'_, G::Timestamp, C1, P1::Puller>, &MutableAntichain), (InputSession<'_, G::Timestamp, C2, P2::Puller>, &MutableAntichain), - &mut OutputHandleCore>)+'static, + &mut OutputBuffer<'_, G::Timestamp, CB>)+'static, P1: ParallelizationContract, P2: ParallelizationContract { @@ -395,7 +396,8 @@ impl Operator for StreamCore { let mut input1 = builder.new_input(self, pact1); let mut input2 = builder.new_input(other, pact2); - let (mut output, stream) = builder.new_output(); + let (output, stream) = builder.new_output(); + let mut output = OutputBuilder::from(output); builder.build(move |mut capabilities| { // `capabilities` should be a single-element vector. @@ -414,7 +416,7 @@ impl Operator for StreamCore { CB: ContainerBuilder, L: FnMut(InputSession<'_, G::Timestamp, C1, P1::Puller>, InputSession<'_, G::Timestamp, C2, P2::Puller>, - &mut OutputHandleCore>, + &mut OutputBuffer<'_, G::Timestamp, CB>, &mut Notificator)+'static, P1: ParallelizationContract, P2: ParallelizationContract> @@ -443,7 +445,7 @@ impl Operator for StreamCore { B: FnOnce(Capability, OperatorInfo) -> L, L: FnMut(InputSession<'_, G::Timestamp, C1, P1::Puller>, InputSession<'_, G::Timestamp, C2, P2::Puller>, - &mut OutputHandleCore>)+'static, + &mut OutputBuffer<'_, G::Timestamp, CB>)+'static, P1: ParallelizationContract, P2: ParallelizationContract { @@ -452,7 +454,8 @@ impl Operator for StreamCore { let mut input1 = builder.new_input(self, pact1); let mut input2 = builder.new_input(other, pact2); - let (mut output, stream) = builder.new_output(); + let (output, stream) = builder.new_output(); + let mut output = OutputBuilder::from(output); builder.set_notify(false); builder.build(move |mut capabilities| { @@ -530,12 +533,13 @@ pub fn source(scope: &G, name: &str, constructor: B) -> Stre where CB: ContainerBuilder, B: FnOnce(Capability, OperatorInfo) -> L, - L: FnMut(&mut OutputHandleCore>)+'static { + L: FnMut(&mut OutputBuffer<'_, G::Timestamp, CB>)+'static { let mut builder = OperatorBuilder::new(name.to_owned(), scope.clone()); let operator_info = builder.operator_info(); - let (mut output, stream) = builder.new_output(); + let (output, stream) = builder.new_output(); + let mut output = OutputBuilder::from(output); builder.set_notify(false); builder.build(move |mut capabilities| { diff --git a/timely/src/dataflow/operators/mod.rs b/timely/src/dataflow/operators/mod.rs index 2644e7ce0..f6d212a58 100644 --- a/timely/src/dataflow/operators/mod.rs +++ b/timely/src/dataflow/operators/mod.rs 
@@ -59,4 +59,4 @@ pub mod count; // keep "mint" module-private mod capability; -pub use self::capability::{ActivateCapability, Capability, InputCapability, CapabilitySet, DowngradeError}; +pub use self::capability::{ActivateCapability, Capability, CapabilityTrait, InputCapability, CapabilitySet, DowngradeError}; diff --git a/timely/tests/shape_scaling.rs b/timely/tests/shape_scaling.rs index 75cbf141d..b6c136755 100644 --- a/timely/tests/shape_scaling.rs +++ b/timely/tests/shape_scaling.rs @@ -25,8 +25,7 @@ fn operator_scaling(scale: u64) { let mut handles = Vec::with_capacity(parts.len()); let mut outputs = Vec::with_capacity(parts.len()); for (index, part) in parts.into_iter().enumerate() { - use timely::container::CapacityContainerBuilder; - let (output, stream) = builder.new_output_connection::>,_>([]); + let (output, stream) = builder.new_output_connection::,_>([]); use timely::progress::Antichain; let connectivity = [(index, Antichain::from_elem(Default::default()))]; handles.push((builder.new_input_connection(&part, Pipeline, connectivity), output)); @@ -37,11 +36,8 @@ fn operator_scaling(scale: u64) { move |_frontiers| { for (input, output) in handles.iter_mut() { let mut output = output.activate(); - input.activate().for_each_time(|time, data| { - let mut output = output.session_with_builder(&time); - for datum in data.flat_map(|d| d.drain(..)) { - output.give(datum); - } + input.for_each(|time, data| { + output.give(&time, data); }); } } From 388df20034c4a910a3062798e817ec2947d61cc9 Mon Sep 17 00:00:00 2001 From: Frank McSherry Date: Thu, 18 Sep 2025 18:11:34 -0400 Subject: [PATCH 2/5] Remove pushers::Buffer --- timely/examples/unordered_input.rs | 3 +- .../src/dataflow/channels/pushers/buffer.rs | 255 ------------------ .../src/dataflow/channels/pushers/counter.rs | 8 + timely/src/dataflow/channels/pushers/mod.rs | 1 - timely/src/dataflow/operators/capability.rs | 2 +- .../dataflow/operators/core/capture/replay.rs | 12 +- timely/src/dataflow/operators/core/probe.rs | 10 +- .../operators/core/unordered_input.rs | 41 ++- .../src/dataflow/operators/generic/handles.rs | 15 +- .../src/dataflow/operators/unordered_input.rs | 2 +- 10 files changed, 44 insertions(+), 305 deletions(-) delete mode 100644 timely/src/dataflow/channels/pushers/buffer.rs diff --git a/timely/examples/unordered_input.rs b/timely/examples/unordered_input.rs index 4df15e91b..da469dce5 100644 --- a/timely/examples/unordered_input.rs +++ b/timely/examples/unordered_input.rs @@ -1,6 +1,5 @@ use timely::dataflow::operators::*; use timely::Config; -// use timely::progress::timestamp::RootTimestamp; fn main() { timely::execute(Config::thread(), |worker| { @@ -11,7 +10,7 @@ fn main() { }); for round in 0..10 { - input.session(cap.clone()).give(round); + input.activate().session(&cap).give(round); cap = cap.delayed(&(round + 1)); worker.step(); } diff --git a/timely/src/dataflow/channels/pushers/buffer.rs b/timely/src/dataflow/channels/pushers/buffer.rs deleted file mode 100644 index d9e69755f..000000000 --- a/timely/src/dataflow/channels/pushers/buffer.rs +++ /dev/null @@ -1,255 +0,0 @@ -//! Buffering and session mechanisms to provide the appearance of record-at-a-time sending, -//! with the performance of batched sends. 
- -use crate::communication::Push; -use crate::container::{ContainerBuilder, CapacityContainerBuilder, PushInto}; -use crate::dataflow::channels::Message; -use crate::dataflow::operators::Capability; -use crate::progress::Timestamp; -use crate::{Container, Accountable}; - -/// Buffers data sent at the same time, for efficient communication. -/// -/// The `Buffer` type should be used by calling `session` with a time, which checks whether -/// data must be flushed and creates a `Session` object which allows sending at the given time. -#[derive(Debug)] -pub struct Buffer { - /// The currently open time, if it is open. - time: Option, - /// A builder for containers, to send at `self.time`. - builder: CB, - /// The pusher to send data downstream. - pusher: P, -} - -impl Buffer { - /// Creates a new `Buffer`. - pub fn new(pusher: P) -> Self { - Self { - time: None, - builder: Default::default(), - pusher, - } - } - - /// Returns a reference to the inner `P: Push` type. - /// - /// This is currently used internally, and should not be used without some care. - pub fn inner(&mut self) -> &mut P { &mut self.pusher } - - /// Access the builder. Immutable access to prevent races with flushing - /// the underlying buffer. - pub fn builder(&self) -> &CB { - &self.builder - } -} - -impl>> Buffer, P> where T: Eq+Clone { - /// Returns a `Session`, which accepts data to send at the associated time - #[inline] - pub fn session(&mut self, time: &T) -> Session<'_, T, CapacityContainerBuilder, P> { - self.session_with_builder(time) - } - - /// Allocates a new `AutoflushSession` which flushes itself on drop. - #[inline] - pub fn autoflush_session(&mut self, cap: Capability) -> AutoflushSession<'_, T, CapacityContainerBuilder, P> where T: Timestamp { - self.autoflush_session_with_builder(cap) - } -} - -impl>> Buffer where T: Eq+Clone { - /// Returns a `Session`, which accepts data to send at the associated time - pub fn session_with_builder(&mut self, time: &T) -> Session<'_, T, CB, P> { - if let Some(true) = self.time.as_ref().map(|x| x != time) { self.flush(); } - self.time = Some(time.clone()); - Session { buffer: self } - } - - /// Allocates a new `AutoflushSession` which flushes itself on drop. - pub fn autoflush_session_with_builder(&mut self, cap: Capability) -> AutoflushSession<'_, T, CB, P> where T: Timestamp { - if let Some(true) = self.time.as_ref().map(|x| x != cap.time()) { self.flush(); } - self.time = Some(cap.time().clone()); - AutoflushSession { - buffer: self, - _capability: cap, - } - } -} - -impl>> Buffer where T: Eq+Clone { - /// Flushes all data and pushes a `None` to `self.pusher`, indicating a flush. - pub fn cease(&mut self) { - self.flush(); - self.builder.relax(); - self.pusher.push(&mut None); - } - - /// Extract pending data from the builder, but not forcing a flush. - #[inline] - fn extract_and_send(&mut self) { - while let Some(container) = self.builder.extract() { - let time = self.time.as_ref().unwrap().clone(); - Message::push_at(container, time, &mut self.pusher); - } - } - - /// Flush the builder, forcing all its contents to be written. - #[inline] - fn flush(&mut self) { - while let Some(container) = self.builder.finish() { - let time = self.time.as_ref().unwrap().clone(); - Message::push_at(container, time, &mut self.pusher); - } - } - - /// Gives an entire container at the current time. Maintains FIFO order with previously pushed - /// data. Only intended to be used through [`Session::give_container`]. 
- // TODO: This method could exist without a container builder, but we can't express this as a - // buffer always requires a container builder. We could expose the buffer's underlying pusher - // directly, but this would bypass the buffer's time tracking. - fn give_container(&mut self, container: &mut CB::Container) { - if !container.is_empty() { - self.flush(); - let time = self.time.as_ref().unwrap().clone(); - Message::push_at(container, time, &mut self.pusher); - } - } - - /// An internal implementation of push that should only be called by sessions. - #[inline] - fn push_internal(&mut self, item: D) where CB: PushInto { - self.builder.push_into(item); - self.extract_and_send(); - } -} - -/// An output session for sending records at a specified time. -/// -/// The `Session` struct provides the user-facing interface to an operator output, namely -/// the `Buffer` type. A `Session` wraps a session of output at a specified time, and -/// avoids what would otherwise be a constant cost of checking timestamp equality. -pub struct Session<'a, T, CB, P> { - buffer: &'a mut Buffer, -} - -impl<'a, T, CB: ContainerBuilder, P> Session<'a, T, CB, P> -where - T: Eq + Clone + 'a, - P: Push> + 'a, -{ - /// Provide a container at the time specified by the [Session]. - pub fn give_container(&mut self, container: &mut CB::Container) { - self.buffer.give_container(container) - } - /// Provide multiple containers at the time specifid by the [Session]. - pub fn give_containers<'b>(&mut self, containers: impl Iterator) { - for container in containers { self.buffer.give_container(container); } - } -} - -impl<'a, T, CB, P> Session<'a, T, CB, P> -where - T: Eq + Clone + 'a, - CB: ContainerBuilder + 'a, - P: Push> + 'a -{ - /// Access the builder. Immutable access to prevent races with flushing - /// the underlying buffer. - pub fn builder(&self) -> &CB { - self.buffer.builder() - } - - /// Provides one record at the time specified by the `Session`. - #[inline] - pub fn give(&mut self, data: D) where CB: PushInto { - self.push_into(data); - } - - /// Provides an iterator of records at the time specified by the `Session`. - #[inline] - pub fn give_iterator(&mut self, iter: I) - where - I: Iterator, - CB: PushInto, - { - for item in iter { - self.push_into(item); - } - } -} - -impl<'a, T, CB, P, D> PushInto for Session<'a, T, CB, P> -where - T: Eq + Clone + 'a, - CB: ContainerBuilder + PushInto + 'a, - P: Push> + 'a, -{ - #[inline] - fn push_into(&mut self, item: D) { - self.buffer.push_internal(item); - } -} - -/// A session which will flush itself when dropped. -pub struct AutoflushSession<'a, T, CB, P> -where - T: Timestamp + 'a, - CB: ContainerBuilder + 'a, - P: Push> + 'a, -{ - /// A reference to the underlying buffer. - buffer: &'a mut Buffer, - /// The capability being used to send the data. - _capability: Capability, -} - -impl<'a, T, CB, P> AutoflushSession<'a, T, CB, P> -where - T: Timestamp + 'a, - CB: ContainerBuilder + 'a, - P: Push> + 'a, -{ - /// Transmits a single record. - #[inline] - pub fn give(&mut self, data: D) - where - CB: PushInto, - { - self.push_into(data); - } - - /// Transmits records produced by an iterator. 
- #[inline] - pub fn give_iterator(&mut self, iter: I) - where - I: Iterator, - CB: PushInto, - { - for item in iter { - self.push_into(item); - } - } -} -impl<'a, T, CB, P, D> PushInto for AutoflushSession<'a, T, CB, P> -where - T: Timestamp + 'a, - CB: ContainerBuilder + PushInto + 'a, - P: Push> + 'a, -{ - #[inline] - fn push_into(&mut self, item: D) { - self.buffer.push_internal(item); - } -} - -impl<'a, T, CB, P> Drop for AutoflushSession<'a, T, CB, P> -where - T: Timestamp + 'a, - CB: ContainerBuilder + 'a, - P: Push> + 'a, -{ - fn drop(&mut self) { - self.buffer.cease(); - } -} diff --git a/timely/src/dataflow/channels/pushers/counter.rs b/timely/src/dataflow/channels/pushers/counter.rs index 8718c8acd..249569937 100644 --- a/timely/src/dataflow/channels/pushers/counter.rs +++ b/timely/src/dataflow/channels/pushers/counter.rs @@ -42,4 +42,12 @@ impl Counter where T : Ord+Clone+'static { pub fn produced(&self) -> &Rc>> { &self.produced } + /// Ships a time and a container. + /// + /// This is not a validated capability, and this method should not be used without great care. + /// Ideally, users would not have direct access to a `Counter`, and preventing this is the way + /// to uphold invariants. + #[inline] pub fn give(&mut self, time: T, container: &mut C) where P: Push> { + if !container.is_empty() { Message::push_at(container, time, &mut self.pushee); } + } } diff --git a/timely/src/dataflow/channels/pushers/mod.rs b/timely/src/dataflow/channels/pushers/mod.rs index e1c49e59f..d980db003 100644 --- a/timely/src/dataflow/channels/pushers/mod.rs +++ b/timely/src/dataflow/channels/pushers/mod.rs @@ -5,7 +5,6 @@ pub use self::counter::Counter; pub mod tee; pub mod exchange; pub mod counter; -pub mod buffer; pub mod progress; /// An output pusher which validates capabilities, records progress, and tees output. diff --git a/timely/src/dataflow/operators/capability.rs b/timely/src/dataflow/operators/capability.rs index af81dba5d..f984a5339 100644 --- a/timely/src/dataflow/operators/capability.rs +++ b/timely/src/dataflow/operators/capability.rs @@ -342,7 +342,7 @@ impl Debug for InputCapability { } } -/// Capability that activates on drop. +/// Capability that activates on drop or downgrade. 
#[derive(Clone, Debug)] pub struct ActivateCapability { pub(crate) capability: Capability, diff --git a/timely/src/dataflow/operators/core/capture/replay.rs b/timely/src/dataflow/operators/core/capture/replay.rs index 41ff50d12..45ad46869 100644 --- a/timely/src/dataflow/operators/core/capture/replay.rs +++ b/timely/src/dataflow/operators/core/capture/replay.rs @@ -40,7 +40,6 @@ use crate::dataflow::{Scope, StreamCore}; use crate::dataflow::channels::pushers::Counter as PushCounter; -use crate::dataflow::channels::pushers::buffer::Buffer as PushBuffer; use crate::dataflow::operators::generic::builder_raw::OperatorBuilder; use crate::progress::Timestamp; @@ -76,7 +75,7 @@ where let (targets, stream) = builder.new_output(); - let mut output = PushBuffer::new(PushCounter::new(targets)); + let mut output = PushCounter::new(targets); let mut event_streams = self.into_iter().collect::>(); let mut started = false; let mut allocation: C = Default::default(); @@ -100,14 +99,14 @@ where progress.internals[0].extend(vec.into_iter()); }, Owned(Event::Messages(time, mut data)) => { - output.session(&time).give_container(&mut data); + output.give(time.clone(), &mut data); } Borrowed(Event::Progress(vec)) => { progress.internals[0].extend(vec.iter().cloned()); }, Borrowed(Event::Messages(time, data)) => { allocation.clone_from(data); - output.session(time).give_container(&mut allocation); + output.give(time.clone(), &mut allocation); } } } @@ -118,8 +117,9 @@ where activator.activate_after(delay); } - output.cease(); - output.inner().produced().borrow_mut().drain_into(&mut progress.produceds[0]); + use timely_communication::Push; + output.push(&mut None); + output.produced().borrow_mut().drain_into(&mut progress.produceds[0]); false } diff --git a/timely/src/dataflow/operators/core/probe.rs b/timely/src/dataflow/operators/core/probe.rs index 324061991..f639c4966 100644 --- a/timely/src/dataflow/operators/core/probe.rs +++ b/timely/src/dataflow/operators/core/probe.rs @@ -6,7 +6,6 @@ use std::cell::RefCell; use crate::progress::Timestamp; use crate::progress::frontier::{AntichainRef, MutableAntichain}; use crate::dataflow::channels::pushers::Counter as PushCounter; -use crate::dataflow::channels::pushers::buffer::Buffer as PushBuffer; use crate::dataflow::channels::pact::Pipeline; use crate::dataflow::channels::pullers::Counter as PullCounter; use crate::dataflow::operators::generic::builder_raw::OperatorBuilder; @@ -92,7 +91,7 @@ impl Probe for StreamCore { let mut builder = OperatorBuilder::new("Probe".to_owned(), self.scope()); let mut input = PullCounter::new(builder.new_input(self, Pipeline)); let (tee, stream) = builder.new_output(); - let mut output = PushBuffer::new(PushCounter::new(tee)); + let mut output = PushCounter::new(tee); let shared_frontier = Rc::downgrade(&handle.frontier); let mut started = false; @@ -115,13 +114,14 @@ impl Probe for StreamCore { while let Some(message) = input.next() { let time = &message.time; let data = &mut message.data; - output.session(time).give_container(data); + output.give(time.clone(), data); } - output.cease(); + use timely_communication::Push; + output.push(&mut None); // extract what we know about progress from the input and output adapters. 
input.consumed().borrow_mut().drain_into(&mut progress.consumeds[0]); - output.inner().produced().borrow_mut().drain_into(&mut progress.produceds[0]); + output.produced().borrow_mut().drain_into(&mut progress.produceds[0]); false }, diff --git a/timely/src/dataflow/operators/core/unordered_input.rs b/timely/src/dataflow/operators/core/unordered_input.rs index a06633c26..f99dc299f 100644 --- a/timely/src/dataflow/operators/core/unordered_input.rs +++ b/timely/src/dataflow/operators/core/unordered_input.rs @@ -3,8 +3,7 @@ use std::rc::Rc; use std::cell::RefCell; -use crate::Container; -use crate::container::{ContainerBuilder, CapacityContainerBuilder}; +use crate::container::ContainerBuilder; use crate::scheduling::{Schedule, ActivateOnDrop}; @@ -12,13 +11,13 @@ use crate::progress::{Operate, operate::SharedProgress, Timestamp}; use crate::progress::Source; use crate::progress::ChangeBatch; use crate::progress::operate::Connectivity; -use crate::dataflow::channels::pushers::{Counter, Tee}; -use crate::dataflow::channels::pushers::buffer::{Buffer as PushBuffer, AutoflushSession}; - +use crate::dataflow::channels::pushers::{Counter, Tee, Output}; +use crate::dataflow::operators::generic::{OutputBuilder, OutputBuffer}; use crate::dataflow::operators::{ActivateCapability, Capability}; - use crate::dataflow::{Scope, StreamCore}; +use crate::scheduling::Activations; + /// Create a new `Stream` and `Handle` through which to supply input. pub trait UnorderedInput { /// Create a new capability-based [StreamCore] and [UnorderedHandle] through which to supply input. This @@ -65,7 +64,7 @@ pub trait UnorderedInput { /// /// // feed values 0..10 at times 0..10. /// for round in 0..10 { - /// input.session(cap.clone()).give(round); + /// input.activate().session(&cap).give(round); /// cap = cap.delayed(&(round + 1)); /// worker.step(); /// } @@ -88,6 +87,7 @@ impl UnorderedInput for G { let cap = Capability::new(G::Timestamp::minimum(), Rc::clone(&internal)); let counter = Counter::new(output); let produced = Rc::clone(counter.produced()); + let counter = Output::new(counter, Rc::clone(&internal), 0); let peers = self.peers(); let index = self.allocate_operator_index(); @@ -95,7 +95,7 @@ impl UnorderedInput for G { let cap = ActivateCapability::new(cap, Rc::clone(&address), self.activations()); - let helper = UnorderedHandle::new(counter); + let helper = UnorderedHandle::new(counter, Rc::clone(&address), self.activations()); self.add_operator_with_index(Box::new(UnorderedOperator { name: "UnorderedInput".to_owned(), @@ -146,29 +146,24 @@ impl Operate for UnorderedOperator { } /// A handle to an input [StreamCore], used to introduce data to a timely dataflow computation. -#[derive(Debug)] pub struct UnorderedHandle { - buffer: PushBuffer>>, + output: OutputBuilder, + address: Rc<[usize]>, + activations: Rc>, } impl UnorderedHandle { - fn new(pusher: Counter>) -> UnorderedHandle { - UnorderedHandle { - buffer: PushBuffer::new(pusher), + fn new(output: Output, address: Rc<[usize]>, activations: Rc>) -> Self { + Self { + output: OutputBuilder::from(output), + address, + activations, } } /// Allocates a new automatically flushing session based on the supplied capability. 
#[inline] - pub fn session_with_builder(&mut self, cap: ActivateCapability) -> ActivateOnDrop>>> { - ActivateOnDrop::new(self.buffer.autoflush_session_with_builder(cap.capability.clone()), Rc::clone(&cap.address), Rc::clone(&cap.activations)) - } -} - -impl UnorderedHandle> { - /// Allocates a new automatically flushing session based on the supplied capability. - #[inline] - pub fn session(&mut self, cap: ActivateCapability) -> ActivateOnDrop, Counter>>> { - self.session_with_builder(cap) + pub fn activate(&mut self) -> ActivateOnDrop> { + ActivateOnDrop::new(self.output.activate(), Rc::clone(&self.address), Rc::clone(&self.activations)) } } diff --git a/timely/src/dataflow/operators/generic/handles.rs b/timely/src/dataflow/operators/generic/handles.rs index 9befc0b0b..bc116e0cd 100644 --- a/timely/src/dataflow/operators/generic/handles.rs +++ b/timely/src/dataflow/operators/generic/handles.rs @@ -177,28 +177,21 @@ pub struct Session<'a: 'b, 'b, T: Timestamp, CB: ContainerBuilder, CT: Capabilit impl<'a: 'b, 'b, T: Timestamp, CB: ContainerBuilder, CT: CapabilityTrait> Session<'a, 'b, T, CB, CT> { /// Provides one record at the time specified by the `Session`. - #[inline] - pub fn give(&mut self, data: D) where CB: PushInto { + #[inline] pub fn give(&mut self, data: D) where CB: PushInto { self.buffer.builder.push_into(data); self.extract_and_send(); } - /// Provides an iterator of records at the time specified by the `Session`. - #[inline] - pub fn give_iterator(&mut self, iter: I) - where - I: Iterator, - CB: PushInto, - { + #[inline] pub fn give_iterator(&mut self, iter: I) where I: Iterator, CB: PushInto { for item in iter { self.buffer.builder.push_into(item); } self.extract_and_send(); } /// Provide a container at the time specified by the [Session]. - pub fn give_container(&mut self, container: &mut CB::Container) { + #[inline] pub fn give_container(&mut self, container: &mut CB::Container) { self.buffer.session.give(&self.capability, container); } /// Provide multiple containers at the time specifid by the [Session]. - pub fn give_containers<'c>(&mut self, containers: impl Iterator) { + #[inline] pub fn give_containers<'c>(&mut self, containers: impl Iterator) { for container in containers { self.buffer.session.give(&self.capability, container); } } diff --git a/timely/src/dataflow/operators/unordered_input.rs b/timely/src/dataflow/operators/unordered_input.rs index 76fb4329d..6f6b8af48 100644 --- a/timely/src/dataflow/operators/unordered_input.rs +++ b/timely/src/dataflow/operators/unordered_input.rs @@ -51,7 +51,7 @@ pub trait UnorderedInput { /// /// // feed values 0..10 at times 0..10. 
/// for round in 0..10 { - /// input.session(cap.clone()).give(round); + /// input.activate().session(&cap).give(round); /// cap = cap.delayed(&(round + 1)); /// worker.step(); /// } From 9b5d5b9485ea7299de8d91d030f0f0b940d3aad2 Mon Sep 17 00:00:00 2001 From: Frank McSherry Date: Fri, 19 Sep 2025 10:10:00 -0400 Subject: [PATCH 3/5] Respond to feedback --- timely/src/dataflow/channels/pushers/exchange.rs | 2 +- timely/src/dataflow/channels/pushers/progress.rs | 2 +- timely/src/dataflow/channels/pushers/tee.rs | 2 +- timely/src/dataflow/operators/core/capture/replay.rs | 2 +- timely/src/dataflow/operators/core/partition.rs | 3 +++ timely/src/dataflow/operators/core/probe.rs | 2 +- 6 files changed, 8 insertions(+), 5 deletions(-) diff --git a/timely/src/dataflow/channels/pushers/exchange.rs b/timely/src/dataflow/channels/pushers/exchange.rs index 81e168c32..023b725a7 100644 --- a/timely/src/dataflow/channels/pushers/exchange.rs +++ b/timely/src/dataflow/channels/pushers/exchange.rs @@ -141,7 +141,7 @@ where } self.distributor.relax(); for index in 0..self.pushers.len() { - self.pushers[index].push(&mut None); + self.pushers[index].done(); } } } diff --git a/timely/src/dataflow/channels/pushers/progress.rs b/timely/src/dataflow/channels/pushers/progress.rs index 618aedd04..71c7993bf 100644 --- a/timely/src/dataflow/channels/pushers/progress.rs +++ b/timely/src/dataflow/channels/pushers/progress.rs @@ -63,5 +63,5 @@ impl<'a, T: Timestamp, C, P: Push>> std::ops::DerefMut for Progres } impl<'a, T: Timestamp, C, P: Push>> Drop for ProgressSession<'a, T, C, P> { - fn drop(&mut self) { self.borrow.pushee.push(&mut None); } + fn drop(&mut self) { self.borrow.pushee.done(); } } diff --git a/timely/src/dataflow/channels/pushers/tee.rs b/timely/src/dataflow/channels/pushers/tee.rs index df2e2090d..14e2c07ef 100644 --- a/timely/src/dataflow/channels/pushers/tee.rs +++ b/timely/src/dataflow/channels/pushers/tee.rs @@ -29,7 +29,7 @@ impl Push> for Tee { } else { for index in 1..pushers.len() { - pushers[index-1].push(&mut None); + pushers[index-1].done(); } } if !pushers.is_empty() { diff --git a/timely/src/dataflow/operators/core/capture/replay.rs b/timely/src/dataflow/operators/core/capture/replay.rs index 45ad46869..cbb120e1a 100644 --- a/timely/src/dataflow/operators/core/capture/replay.rs +++ b/timely/src/dataflow/operators/core/capture/replay.rs @@ -118,7 +118,7 @@ where } use timely_communication::Push; - output.push(&mut None); + output.done(); output.produced().borrow_mut().drain_into(&mut progress.produceds[0]); false diff --git a/timely/src/dataflow/operators/core/partition.rs b/timely/src/dataflow/operators/core/partition.rs index 608e79ff1..2891f2e0d 100644 --- a/timely/src/dataflow/operators/core/partition.rs +++ b/timely/src/dataflow/operators/core/partition.rs @@ -77,6 +77,9 @@ impl Partition for StreamCore Probe for StreamCore { output.give(time.clone(), data); } use timely_communication::Push; - output.push(&mut None); + output.done(); // extract what we know about progress from the input and output adapters. 
     input.consumed().borrow_mut().drain_into(&mut progress.consumeds[0]);

From 4f79d547c14dfb5d1817a603631d57756c4cc679 Mon Sep 17 00:00:00 2001
From: Frank McSherry
Date: Sun, 21 Sep 2025 08:40:44 -0400
Subject: [PATCH 4/5] Rename to OutputBuilderSession

---
 .../operators/core/unordered_input.rs         |  4 +--
 .../src/dataflow/operators/generic/handles.rs | 12 ++++----
 timely/src/dataflow/operators/generic/mod.rs  |  2 +-
 .../dataflow/operators/generic/operator.rs    | 28 +++++++++----------
 4 files changed, 23 insertions(+), 23 deletions(-)

diff --git a/timely/src/dataflow/operators/core/unordered_input.rs b/timely/src/dataflow/operators/core/unordered_input.rs
index f99dc299f..013bc9123 100644
--- a/timely/src/dataflow/operators/core/unordered_input.rs
+++ b/timely/src/dataflow/operators/core/unordered_input.rs
@@ -12,7 +12,7 @@ use crate::progress::Source;
 use crate::progress::ChangeBatch;
 use crate::progress::operate::Connectivity;
 use crate::dataflow::channels::pushers::{Counter, Tee, Output};
-use crate::dataflow::operators::generic::{OutputBuilder, OutputBuffer};
+use crate::dataflow::operators::generic::{OutputBuilder, OutputBuilderSession};
 use crate::dataflow::operators::{ActivateCapability, Capability};
 use crate::dataflow::{Scope, StreamCore};
 
@@ -163,7 +163,7 @@ impl UnorderedHandle {
     /// Allocates a new automatically flushing session based on the supplied capability.
     #[inline]
-    pub fn activate(&mut self) -> ActivateOnDrop> {
+    pub fn activate(&mut self) -> ActivateOnDrop> {
         ActivateOnDrop::new(self.output.activate(), Rc::clone(&self.address), Rc::clone(&self.activations))
     }
 }
diff --git a/timely/src/dataflow/operators/generic/handles.rs b/timely/src/dataflow/operators/generic/handles.rs
index bc116e0cd..c6fa7a5b1 100644
--- a/timely/src/dataflow/operators/generic/handles.rs
+++ b/timely/src/dataflow/operators/generic/handles.rs
@@ -126,8 +126,8 @@ impl OutputBuilder {
         Self { output, builder: CB::default() }
     }
     /// An activated output buffer for building containers.
-    pub fn activate<'a>(&'a mut self) -> OutputBuffer<'a, T, CB> {
-        OutputBuffer {
+    pub fn activate<'a>(&'a mut self) -> OutputBuilderSession<'a, T, CB> {
+        OutputBuilderSession {
             session: self.output.activate(),
             builder: &mut self.builder,
         }
@@ -135,12 +135,12 @@ impl OutputBuilder {
 }
 
 /// A wrapper around a live output session, with a container builder to buffer.
-pub struct OutputBuffer<'a, T: Timestamp, CB: ContainerBuilder> {
+pub struct OutputBuilderSession<'a, T: Timestamp, CB: ContainerBuilder> {
     session: crate::dataflow::channels::pushers::OutputSession<'a, T, CB::Container>,
     builder: &'a mut CB,
 }
 
-impl<'a, T: Timestamp, CB: ContainerBuilder> OutputBuffer<'a, T, CB> {
+impl<'a, T: Timestamp, CB: ContainerBuilder> OutputBuilderSession<'a, T, CB> {
     /// A container-building session associated with a capability.
     ///
     /// This method is the preferred way of sending records that must be accumulated into a container,
@@ -154,7 +154,7 @@ impl<'a, T: Timestamp, CB: ContainerBuilder> OutputBuffer<'a, T, CB> {
     }
 }
 
-impl<'a, T: Timestamp, C: Container> OutputBuffer<'a, T, CapacityContainerBuilder> {
+impl<'a, T: Timestamp, C: Container> OutputBuilderSession<'a, T, CapacityContainerBuilder> {
     /// A container-building session associated with a capability.
     ///
     /// This method is the preferred way of sending records that must be accumulated into a container,
@@ -170,7 +170,7 @@ impl<'a, T: Timestamp, C: Container> OutputBuffer<'a, T, CapacityContainerBuilde
 
 /// An active output building session, which accepts items and builds containers.
pub struct Session<'a: 'b, 'b, T: Timestamp, CB: ContainerBuilder, CT: CapabilityTrait> { - buffer: &'b mut OutputBuffer<'a, T, CB>, + buffer: &'b mut OutputBuilderSession<'a, T, CB>, capability: &'b CT, } diff --git a/timely/src/dataflow/operators/generic/mod.rs b/timely/src/dataflow/operators/generic/mod.rs index dd0742523..59a4c1a33 100644 --- a/timely/src/dataflow/operators/generic/mod.rs +++ b/timely/src/dataflow/operators/generic/mod.rs @@ -8,7 +8,7 @@ mod handles; mod notificator; mod operator_info; -pub use self::handles::{InputHandleCore, OutputBuilder, OutputBuffer, Session}; +pub use self::handles::{InputHandleCore, OutputBuilder, OutputBuilderSession, Session}; pub use self::notificator::{Notificator, FrontierNotificator}; pub use self::operator::{Operator, source}; diff --git a/timely/src/dataflow/operators/generic/operator.rs b/timely/src/dataflow/operators/generic/operator.rs index b55ca38a2..72df038a9 100644 --- a/timely/src/dataflow/operators/generic/operator.rs +++ b/timely/src/dataflow/operators/generic/operator.rs @@ -4,7 +4,7 @@ use crate::progress::frontier::MutableAntichain; use crate::dataflow::channels::pact::ParallelizationContract; -use crate::dataflow::operators::generic::handles::{InputSession, OutputBuffer, OutputBuilder}; +use crate::dataflow::operators::generic::handles::{InputSession, OutputBuilderSession, OutputBuilder}; use crate::dataflow::operators::capability::Capability; use crate::dataflow::{Scope, StreamCore}; @@ -58,7 +58,7 @@ pub trait Operator { CB: ContainerBuilder, B: FnOnce(Capability, OperatorInfo) -> L, L: FnMut((InputSession<'_, G::Timestamp, C1, P::Puller>, &MutableAntichain), - &mut OutputBuffer<'_, G::Timestamp, CB>)+'static, + &mut OutputBuilderSession<'_, G::Timestamp, CB>)+'static, P: ParallelizationContract; /// Creates a new dataflow operator that partitions its input stream by a parallelization @@ -88,7 +88,7 @@ pub trait Operator { /// ``` fn unary_notify, - &mut OutputBuffer<'_, G::Timestamp, CB>, + &mut OutputBuilderSession<'_, G::Timestamp, CB>, &mut Notificator)+'static, P: ParallelizationContract> (&self, pact: P, name: &str, init: impl IntoIterator, logic: L) -> StreamCore; @@ -124,7 +124,7 @@ pub trait Operator { CB: ContainerBuilder, B: FnOnce(Capability, OperatorInfo) -> L, L: FnMut(InputSession<'_, G::Timestamp, C1, P::Puller>, - &mut OutputBuffer)+'static, + &mut OutputBuilderSession)+'static, P: ParallelizationContract; /// Creates a new dataflow operator that partitions its input streams by a parallelization @@ -182,7 +182,7 @@ pub trait Operator { B: FnOnce(Capability, OperatorInfo) -> L, L: FnMut((InputSession<'_, G::Timestamp, C1, P1::Puller>, &MutableAntichain), (InputSession<'_, G::Timestamp, C2, P2::Puller>, &MutableAntichain), - &mut OutputBuffer<'_, G::Timestamp, CB>)+'static, + &mut OutputBuilderSession<'_, G::Timestamp, CB>)+'static, P1: ParallelizationContract, P2: ParallelizationContract; @@ -231,7 +231,7 @@ pub trait Operator { CB: ContainerBuilder, L: FnMut(InputSession<'_, G::Timestamp, C1, P1::Puller>, InputSession<'_, G::Timestamp, C2, P2::Puller>, - &mut OutputBuffer<'_, G::Timestamp, CB>, + &mut OutputBuilderSession<'_, G::Timestamp, CB>, &mut Notificator)+'static, P1: ParallelizationContract, P2: ParallelizationContract> @@ -270,7 +270,7 @@ pub trait Operator { B: FnOnce(Capability, OperatorInfo) -> L, L: FnMut(InputSession<'_, G::Timestamp, C1, P1::Puller>, InputSession<'_, G::Timestamp, C2, P2::Puller>, - &mut OutputBuffer<'_, G::Timestamp, CB>)+'static, + &mut OutputBuilderSession<'_, 
G::Timestamp, CB>)+'static, P1: ParallelizationContract, P2: ParallelizationContract; @@ -310,7 +310,7 @@ impl Operator for StreamCore { CB: ContainerBuilder, B: FnOnce(Capability, OperatorInfo) -> L, L: FnMut((InputSession<'_, G::Timestamp, C1, P::Puller>, &MutableAntichain), - &mut OutputBuffer<'_, G::Timestamp, CB>)+'static, + &mut OutputBuilderSession<'_, G::Timestamp, CB>)+'static, P: ParallelizationContract { let mut builder = OperatorBuilder::new(name.to_owned(), self.scope()); @@ -335,7 +335,7 @@ impl Operator for StreamCore { fn unary_notify, - &mut OutputBuffer<'_, G::Timestamp, CB>, + &mut OutputBuilderSession<'_, G::Timestamp, CB>, &mut Notificator)+'static, P: ParallelizationContract> (&self, pact: P, name: &str, init: impl IntoIterator, mut logic: L) -> StreamCore { @@ -359,7 +359,7 @@ impl Operator for StreamCore { CB: ContainerBuilder, B: FnOnce(Capability, OperatorInfo) -> L, L: FnMut(InputSession<'_, G::Timestamp, C1, P::Puller>, - &mut OutputBuffer<'_, G::Timestamp, CB>)+'static, + &mut OutputBuilderSession<'_, G::Timestamp, CB>)+'static, P: ParallelizationContract { let mut builder = OperatorBuilder::new(name.to_owned(), self.scope()); @@ -387,7 +387,7 @@ impl Operator for StreamCore { B: FnOnce(Capability, OperatorInfo) -> L, L: FnMut((InputSession<'_, G::Timestamp, C1, P1::Puller>, &MutableAntichain), (InputSession<'_, G::Timestamp, C2, P2::Puller>, &MutableAntichain), - &mut OutputBuffer<'_, G::Timestamp, CB>)+'static, + &mut OutputBuilderSession<'_, G::Timestamp, CB>)+'static, P1: ParallelizationContract, P2: ParallelizationContract { @@ -416,7 +416,7 @@ impl Operator for StreamCore { CB: ContainerBuilder, L: FnMut(InputSession<'_, G::Timestamp, C1, P1::Puller>, InputSession<'_, G::Timestamp, C2, P2::Puller>, - &mut OutputBuffer<'_, G::Timestamp, CB>, + &mut OutputBuilderSession<'_, G::Timestamp, CB>, &mut Notificator)+'static, P1: ParallelizationContract, P2: ParallelizationContract> @@ -445,7 +445,7 @@ impl Operator for StreamCore { B: FnOnce(Capability, OperatorInfo) -> L, L: FnMut(InputSession<'_, G::Timestamp, C1, P1::Puller>, InputSession<'_, G::Timestamp, C2, P2::Puller>, - &mut OutputBuffer<'_, G::Timestamp, CB>)+'static, + &mut OutputBuilderSession<'_, G::Timestamp, CB>)+'static, P1: ParallelizationContract, P2: ParallelizationContract { @@ -533,7 +533,7 @@ pub fn source(scope: &G, name: &str, constructor: B) -> Stre where CB: ContainerBuilder, B: FnOnce(Capability, OperatorInfo) -> L, - L: FnMut(&mut OutputBuffer<'_, G::Timestamp, CB>)+'static { + L: FnMut(&mut OutputBuilderSession<'_, G::Timestamp, CB>)+'static { let mut builder = OperatorBuilder::new(name.to_owned(), scope.clone()); let operator_info = builder.operator_info(); From fbb7ba46a0b0a565358e7b6cb2e7b611f649f9d9 Mon Sep 17 00:00:00 2001 From: Frank McSherry Date: Sun, 21 Sep 2025 08:40:52 -0400 Subject: [PATCH 5/5] Improve partition.rs --- .../src/dataflow/operators/core/partition.rs | 31 +++++++------------ 1 file changed, 11 insertions(+), 20 deletions(-) diff --git a/timely/src/dataflow/operators/core/partition.rs b/timely/src/dataflow/operators/core/partition.rs index 2891f2e0d..49254df5a 100644 --- a/timely/src/dataflow/operators/core/partition.rs +++ b/timely/src/dataflow/operators/core/partition.rs @@ -54,38 +54,29 @@ impl Partition for StreamCore> = Default::default(); move |_frontiers| { let mut handles = outputs.iter_mut().map(|o| o.activate()).collect::>(); - let mut parts = BTreeMap::>::default(); - while let Some((cap, data)) = input.next() { - 
todo.entry(cap.time().clone()).or_default().push(std::mem::take(data));
-                caps.insert(cap.time().clone(), cap);
-            }
-
-            while let Some((time, dataz)) = todo.pop_first() {
-                let cap = caps.remove(&time).unwrap();
-                for mut data in dataz.into_iter() {
-                    for datum in data.drain() {
-                        let (part, datum) = route(datum);
-                        parts.entry(part).or_default().push(datum);
-                    }
+            let mut targets = BTreeMap::<u64, Vec<_>>::default();
+            input.for_each_time(|time, data| {
+                // Sort data by intended output.
+                for datum in data.flat_map(|d| d.drain()) {
+                    let (part, datum) = route(datum);
+                    targets.entry(part).or_default().push(datum);
                 }
-
-                while let Some((part, data)) = parts.pop_first() {
+                // Form each intended output into a container and ship.
+                while let Some((part, data)) = targets.pop_first() {
                     for datum in data.into_iter() {
                         c_build.push_into(datum);
                         while let Some(container) = c_build.extract() {
-                            handles[part as usize].give(&cap, container);
+                            handles[part as usize].give(&time, container);
                         }
                     }
                     while let Some(container) = c_build.finish() {
-                        handles[part as usize].give(&cap, container);
+                        handles[part as usize].give(&time, container);
                     }
                 }
-            }
+            });
         }
     });
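
For reference, the caller-facing `Partition` API is not intended to change with this last patch; only the internal batching now goes through the container builder. A minimal usage sketch, assuming the existing `Partition` and `Inspect` re-exports in `timely::dataflow::operators` (as in the trait's longstanding doc example); the stream contents and routing closure are illustrative, not code from this patch:

    use timely::dataflow::operators::{ToStream, Partition, Inspect};

    timely::example(|scope| {
        // Route each record to one of three output streams by value.
        let streams = (0..10u64)
            .to_stream(scope)
            .partition(3, |x| (x % 3, x));

        // Observe what lands in each part.
        for (index, stream) in streams.into_iter().enumerate() {
            stream.inspect(move |x| println!("seen in part {}: {:?}", index, x));
        }
    });

The route closure still returns a `(u64, datum)` pair, and the `u64` part indexes the returned vector of streams, matching the `handles[part as usize]` lookup in the rewritten operator body above.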