-
Notifications
You must be signed in to change notification settings - Fork 288
Container builder #562
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Container builder #562
Conversation
Signed-off-by: Moritz Hoffmann <[email protected]>
Signed-off-by: Moritz Hoffmann <[email protected]>
Signed-off-by: Moritz Hoffmann <[email protected]>
Signed-off-by: Moritz Hoffmann <[email protected]>
Signed-off-by: Moritz Hoffmann <[email protected]>
TimelyDataflow/differential-dataflow#478 shows how to integrate this with differential. |
Oh, if we want to have the return-position impl trait, we'd need to move to Rust 1.75. This, or refactoring to use explicit types both work for me. |
Signed-off-by: Moritz Hoffmann <[email protected]>
Signed-off-by: Moritz Hoffmann <[email protected]>
Signed-off-by: Moritz Hoffmann <[email protected]>
/// | ||
/// For example, a consolidating builder can aggregate differences in-place, but it has | ||
/// to ensure that it preserves the intended information. | ||
pub trait ContainerBuilder: Default + 'static { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Leave a comment to explain FIFO vs. not.
#[inline] | ||
fn push<T: PushInto<Self::Container>>(&mut self, item: T) where C: PushContainer { | ||
if self.current.capacity() == 0 { | ||
self.current = self.empty.take().unwrap_or_default(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Consider discarding oddly-sized empty
containers.
Clear empty
.
container/src/lib.rs
Outdated
|
||
#[inline] | ||
fn push_container(&mut self, container: &mut Self::Container) { | ||
if self.current.len() > 0 { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Leave a comment that it flushes in support of FIFO.
container/src/lib.rs
Outdated
if self.current.len() > 0 { | ||
self.pending.push_back(std::mem::take(&mut self.current)); | ||
} | ||
self.pending.push_back(std::mem::replace(container, self.empty.take().unwrap_or_default())); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Empty protection against non-empty containers.
/// Returns a `Session`, which accepts data to send at the associated time | ||
pub fn session(&mut self, time: &T) -> Session<T, C, P> { | ||
pub fn session(&mut self, time: &T) -> Session<T, CapacityContainerBuilder<C>, P> { | ||
if let Some(true) = self.time.as_ref().map(|x| x != time) { self.flush(); } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This should just call session_with_builder
. Same for auto flushing thingy.
self.buffer.push(data); | ||
if self.buffer.len() >= C::preferred_capacity() { | ||
self.flush(); | ||
// Gives an entire container at a specific time. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fix comment :)
|
||
impl<T: Timestamp, C: Container> UnorderedHandle<T, CapacityContainerBuilder<C>> { | ||
/// Allocates a new automatically flushing session based on the supplied capability. | ||
pub fn session(&mut self, cap: ActivateCapability<T>) -> ActivateOnDrop<AutoflushSession<T, CapacityContainerBuilder<C>, Counter<T, C, Tee<T, C>>>> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Call session_with_builder
.
#[derive(Debug)] | ||
pub struct OutputWrapper<T: Timestamp, C: Container, P: Push<Bundle<T, C>>> { | ||
push_buffer: Buffer<T, C, PushCounter<T, C, P>>, | ||
pub struct OutputWrapper<T: Timestamp, B: ContainerBuilder, P: Push<Bundle<T, B::Container>>> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Replace B by CB, here and below.
/// ``` | ||
pub fn session<'b, CT: CapabilityTrait<T>>(&'b mut self, cap: &'b CT) -> Session<'b, T, C, PushCounter<T, C, P>> where 'a: 'b { | ||
pub fn session<'b, CT: CapabilityTrait<T>>(&'b mut self, cap: &'b CT) -> Session<'b, T, CapacityContainerBuilder<C>, PushCounter<T, C, P>> where 'a: 'b { | ||
assert!(cap.valid_for_output(&self.internal_buffer), "Attempted to open output session with invalid capability"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Call session_with_builder
.
Signed-off-by: Moritz Hoffmann <[email protected]>
Signed-off-by: Moritz Hoffmann <[email protected]>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks good, thank you!
Introduce a
ContainerBuilder
that sits on the outputs of operators and gets a chance to process data as produced by the operator. While the PR breaks many APIs, it tries to preserve compatibility for all users ofoutput.session
, which we hard code to a builder that's based on the length and capacity of a container. In other places, we need a type annotation, specifically when there is no call tosession
, or just a call tosession_with_builder
, which is the equivalent tosession
but doesn't force a specific builder.The PR also replaces
AutoflushSession
withAutoflushSessionCore
, and renames the latter to avoidCore
. All uses were internal and easy to fix, which should apply to external crates, too.This is compatible with differential to offer in-place consolidation.
Do not look commit-by-commit as the history represents a worklog instead of a grouping of changes. Readers are encouraged to first look at the
timely-container
library.