antiguru
Member

Introduce a ContainerBuilder that sits on the outputs of operators and gets a chance to process data as the operator produces it. The PR breaks many APIs, but it tries to preserve compatibility for all users of output.session, which is hard-coded to a builder based on the length and capacity of a container. Elsewhere a type annotation is needed, specifically when there is no call to session, or only a call to session_with_builder, which is equivalent to session but doesn't force a specific builder.
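To make the idea of a capacity-based builder concrete, here is a minimal, self-contained sketch. It is not the code from this PR: the trait below is a simplified stand-in, and `CapacityBuilder`, `CAPACITY`, and the `Vec`-only specialisation are assumptions made for illustration.

```rust
use std::collections::VecDeque;

/// Simplified, hypothetical stand-in for a container builder trait.
trait ContainerBuilder: Default {
    type Container: Default;
    /// Returns a completed container if one is ready.
    fn extract(&mut self) -> Option<Self::Container>;
    /// Flushes partial contents and returns the next remaining container.
    fn finish(&mut self) -> Option<Self::Container>;
}

/// Illustrative capacity; a real builder would derive this from the container type.
const CAPACITY: usize = 4;

/// Hypothetical builder that ships a `Vec<T>` once it holds `CAPACITY` items.
struct CapacityBuilder<T> {
    current: Vec<T>,
    pending: VecDeque<Vec<T>>,
}

impl<T> Default for CapacityBuilder<T> {
    fn default() -> Self {
        Self { current: Vec::new(), pending: VecDeque::new() }
    }
}

impl<T> CapacityBuilder<T> {
    /// Buffers one item and moves the buffer to `pending` once it is full.
    fn push(&mut self, item: T) {
        if self.current.capacity() == 0 {
            self.current = Vec::with_capacity(CAPACITY);
        }
        self.current.push(item);
        if self.current.len() >= CAPACITY {
            self.pending.push_back(std::mem::take(&mut self.current));
        }
    }
}

impl<T> ContainerBuilder for CapacityBuilder<T> {
    type Container = Vec<T>;

    fn extract(&mut self) -> Option<Vec<T>> {
        self.pending.pop_front()
    }

    fn finish(&mut self) -> Option<Vec<T>> {
        if !self.current.is_empty() {
            self.pending.push_back(std::mem::take(&mut self.current));
        }
        self.pending.pop_front()
    }
}

fn main() {
    let mut builder = CapacityBuilder::default();
    for i in 0..6 {
        builder.push(i);
    }
    // One full container is ready; the remaining two items only appear on finish.
    assert_eq!(builder.extract(), Some(vec![0, 1, 2, 3]));
    assert_eq!(builder.extract(), None);
    assert_eq!(builder.finish(), Some(vec![4, 5]));
    assert_eq!(builder.finish(), None);
}
```

The split between `extract` and `finish` in this sketch lets the caller drain full containers eagerly while deferring partial ones until a flush.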

The PR also replaces AutoflushSession with AutoflushSessionCore and renames the latter to drop the Core suffix. All uses were internal and easy to fix; the same should hold for external crates.

This is compatible with differential and lets it offer in-place consolidation.
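As a rough illustration of what in-place consolidation in a builder could look like, the sketch below buffers `(data, diff)` updates and sums the diffs of equal data before shipping. It is not differential's implementation: `ConsolidatingBuilder`, `consolidate`, the `limit` parameter, and the "ship when at least half full" policy are all assumptions.

```rust
/// Sorts updates by data, sums the diffs of equal data in place, and drops zeros.
fn consolidate<T: Ord>(updates: &mut Vec<(T, i64)>) {
    updates.sort_by(|a, b| a.0.cmp(&b.0));
    let mut write = 0;
    for read in 0..updates.len() {
        if write > 0 && updates[write - 1].0 == updates[read].0 {
            // Same data as the last kept entry: accumulate the diff.
            let diff = updates[read].1;
            updates[write - 1].1 += diff;
        } else {
            // New data: move it into the next kept slot.
            updates.swap(write, read);
            write += 1;
        }
    }
    updates.truncate(write);
    updates.retain(|update| update.1 != 0);
}

/// Hypothetical builder that consolidates its buffer before shipping it.
struct ConsolidatingBuilder<T: Ord> {
    current: Vec<(T, i64)>,
    limit: usize,
}

impl<T: Ord> ConsolidatingBuilder<T> {
    fn new(limit: usize) -> Self {
        Self { current: Vec::new(), limit }
    }

    /// Buffers one update. Once `limit` is reached the buffer is consolidated,
    /// and it is shipped only if it is still at least half full afterwards.
    fn push(&mut self, update: (T, i64)) -> Option<Vec<(T, i64)>> {
        self.current.push(update);
        if self.current.len() >= self.limit {
            consolidate(&mut self.current);
            if self.current.len() >= self.limit / 2 {
                return Some(std::mem::take(&mut self.current));
            }
        }
        None
    }

    /// Consolidates and returns whatever is left in the buffer.
    fn finish(&mut self) -> Vec<(T, i64)> {
        consolidate(&mut self.current);
        std::mem::take(&mut self.current)
    }
}

fn main() {
    let mut builder = ConsolidatingBuilder::new(4);
    assert!(builder.push(("a", 1)).is_none());
    assert!(builder.push(("b", 2)).is_none());
    assert!(builder.push(("a", -1)).is_none());
    // The fourth push triggers consolidation; only ("b", 5) survives, so nothing ships yet.
    assert!(builder.push(("b", 3)).is_none());
    assert_eq!(builder.finish(), vec![("b", 5)]);
}
```

Such a builder must still preserve the intended information: here the net diff per datum is unchanged, only the representation shrinks.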

Do not review commit by commit: the history is a worklog rather than a logical grouping of changes. Readers are encouraged to look at the timely-container library first.

Signed-off-by: Moritz Hoffmann <[email protected]>
@antiguru antiguru requested a review from frankmcsherry April 28, 2024 20:04
@antiguru
Member Author

TimelyDataflow/differential-dataflow#478 shows how to integrate this with differential.

@antiguru
Member Author

Oh, if we want to use return-position impl Trait, we'd need to move to Rust 1.75. Either that or refactoring to use explicit types works for me.
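For context, the feature in question is presumably return-position `impl Trait` in trait methods, which was stabilized in Rust 1.75. The sketch below shows the shape of such a method; the `Batches` trait and `Pending` type are hypothetical and not part of this PR.

```rust
/// Hypothetical trait whose method hides the concrete iterator type.
trait Batches {
    /// Yields the size of each pending batch.
    fn sizes(&self) -> impl Iterator<Item = usize>;
}

struct Pending(Vec<Vec<u32>>);

impl Batches for Pending {
    fn sizes(&self) -> impl Iterator<Item = usize> {
        self.0.iter().map(|batch| batch.len())
    }
}

fn main() {
    let pending = Pending(vec![vec![1, 2], vec![3]]);
    let sizes: Vec<usize> = pending.sizes().collect();
    assert_eq!(sizes, vec![2, 1]);
}
```

On older compilers the alternative is to name the return type explicitly, for example through an associated type or a boxed iterator.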

///
/// For example, a consolidating builder can aggregate differences in-place, but it has
/// to ensure that it preserves the intended information.
pub trait ContainerBuilder: Default + 'static {
Member Author

Leave a comment to explain FIFO vs. not.

#[inline]
fn push<T: PushInto<Self::Container>>(&mut self, item: T) where C: PushContainer {
    if self.current.capacity() == 0 {
        self.current = self.empty.take().unwrap_or_default();
Member Author

Consider discarding oddly-sized empty containers.

Clear empty.


#[inline]
fn push_container(&mut self, container: &mut Self::Container) {
    if self.current.len() > 0 {
Member Author

Leave a comment that it flushes in support of FIFO.

if self.current.len() > 0 {
    self.pending.push_back(std::mem::take(&mut self.current));
}
self.pending.push_back(std::mem::replace(container, self.empty.take().unwrap_or_default()));
Member Author

Empty protection against non-empty containers.
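The two review notes on `push_container` (flush first to keep FIFO order, and guard against a non-empty spare container) can be sketched as follows. This is an illustration under assumptions, not the merged code: `FifoBuilder` is a hypothetical type specialised to `Vec<u32>`.

```rust
use std::collections::VecDeque;

/// Hypothetical builder that keeps containers in first-in, first-out order.
#[derive(Default)]
struct FifoBuilder {
    current: Vec<u32>,
    empty: Option<Vec<u32>>,
    pending: VecDeque<Vec<u32>>,
}

impl FifoBuilder {
    /// Buffers a single item in the partially filled container.
    fn push(&mut self, item: u32) {
        self.current.push(item);
    }

    /// Accepts a whole container and leaves an empty replacement in its place.
    fn push_container(&mut self, container: &mut Vec<u32>) {
        // Flush the partial buffer first so earlier items stay ahead of `container` (FIFO).
        if !self.current.is_empty() {
            self.pending.push_back(std::mem::take(&mut self.current));
        }
        // Reuse a spare allocation if there is one, clearing it in case it is not empty.
        let mut replacement = self.empty.take().unwrap_or_default();
        replacement.clear();
        self.pending.push_back(std::mem::replace(container, replacement));
    }

    /// Returns the oldest completed container, if any.
    fn extract(&mut self) -> Option<Vec<u32>> {
        self.pending.pop_front()
    }
}

fn main() {
    let mut builder = FifoBuilder::default();
    builder.push(1);
    builder.push(2);

    let mut container = vec![3, 4];
    builder.push_container(&mut container);

    // Items pushed individually come out before the container pushed afterwards.
    assert_eq!(builder.extract(), Some(vec![1, 2]));
    assert_eq!(builder.extract(), Some(vec![3, 4]));
    assert!(container.is_empty());
}
```

Without the initial flush, the whole container would overtake the items already sitting in `current`.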

 /// Returns a `Session`, which accepts data to send at the associated time
-pub fn session(&mut self, time: &T) -> Session<T, C, P> {
+pub fn session(&mut self, time: &T) -> Session<T, CapacityContainerBuilder<C>, P> {
     if let Some(true) = self.time.as_ref().map(|x| x != time) { self.flush(); }
Member Author

This should just call session_with_builder. Same for auto flushing thingy.

self.buffer.push(data);
if self.buffer.len() >= C::preferred_capacity() {
self.flush();
// Gives an entire container at a specific time.
Member Author

Fix comment :)


impl<T: Timestamp, C: Container> UnorderedHandle<T, CapacityContainerBuilder<C>> {
    /// Allocates a new automatically flushing session based on the supplied capability.
    pub fn session(&mut self, cap: ActivateCapability<T>) -> ActivateOnDrop<AutoflushSession<T, CapacityContainerBuilder<C>, Counter<T, C, Tee<T, C>>>> {
Member Author

Call session_with_builder.

 #[derive(Debug)]
-pub struct OutputWrapper<T: Timestamp, C: Container, P: Push<Bundle<T, C>>> {
-    push_buffer: Buffer<T, C, PushCounter<T, C, P>>,
+pub struct OutputWrapper<T: Timestamp, B: ContainerBuilder, P: Push<Bundle<T, B::Container>>> {
Member Author

Replace B by CB, here and below.

 /// ```
-pub fn session<'b, CT: CapabilityTrait<T>>(&'b mut self, cap: &'b CT) -> Session<'b, T, C, PushCounter<T, C, P>> where 'a: 'b {
+pub fn session<'b, CT: CapabilityTrait<T>>(&'b mut self, cap: &'b CT) -> Session<'b, T, CapacityContainerBuilder<C>, PushCounter<T, C, P>> where 'a: 'b {
     assert!(cap.valid_for_output(&self.internal_buffer), "Attempted to open output session with invalid capability");
Member Author

Call session_with_builder.

Member

@frankmcsherry frankmcsherry left a comment

Looks good, thank you!

@antiguru antiguru merged commit d892ee5 into TimelyDataflow:master Apr 30, 2024
@antiguru antiguru deleted the container_builder branch April 30, 2024 21:25
@github-actions github-actions bot mentioned this pull request Oct 29, 2024