
Conversation

@antiguru
Member

Preview of Timely transferring general Container types, but without the Container::Allocation variant.

This introduces a requirement of Container: Default to enable the following pattern:

let mut container = Default::default();
// ...
input.for_each(|time, data| {
    data.swap(&mut container);
    // ...
});

Previously, container would have been initialized to Vec::new(), which corresponds to the default element for Vec. The allocation variant of Container avoided this requirement by externalizing a possible allocation into an Option<Container::Allocation>.
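
For illustration, a minimal sketch of why the Default bound suffices here (assuming only the Container: Default bound, nothing else from this PR): the operator can take ownership of the incoming container and leave a default-constructed, empty one in its place, so no spare Allocation needs to be threaded through.

fn take_container<C: Default>(incoming: &mut C) -> C {
    // Leaves `C::default()` behind; the old design would instead have
    // externalized a spare `Option<Container::Allocation>` for this.
    std::mem::take(incoming)
}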

It seems to me that adding the default requirement is acceptable.

@antiguru antiguru marked this pull request as ready for review October 24, 2021 17:04
@antiguru antiguru force-pushed the container_stream_rework branch 3 times, most recently from c23a05f to 98d19ab Compare November 3, 2021 10:30
@antiguru antiguru force-pushed the container_stream_rework branch from 98d19ab to 6e223e1 Compare November 15, 2021 22:48
@antiguru antiguru force-pushed the container_stream_rework branch from e3d2a72 to 18713cb Compare November 16, 2021 20:36
@antiguru
Member Author

This PR provides a good basis for further experimentation. I tried implementing a word-count-like experiment to show the benefit of the changes, and here I'd like to capture what was difficult along the way:

  • Exchange is currently only defined for Vec<T>. This is a major obstacle, as it's crucial that data gets distributed. We have two options: either let clients define their own exchange implementation for their container type, or make the current exchange generic so that more containers can benefit from the existing infrastructure.
  • It's unclear to me where the EfficientDataTransfer trait should live. In Timely, a common pattern is to not rely on the exponential growth of vectors but rather to allocate a constant capacity and only use that capacity, without reallocations. My idea for the trait was to give implementing types a hint about when it would be a good time to make sure they're allocated and can absorb some capacity (which they define) of elements, possibly without reallocation. A rough sketch follows this list.
  • There are several places in Timely where buffer sizes are calculated based on the size of elements, which feels awkward. We might want to promote this to be part of the container definition.
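
A rough sketch of the hint trait I have in mind (the exact shape is up for discussion; preferred_capacity is an assumption, while ensure_allocated mirrors the call discussed further below):

pub trait EfficientDataTransfer {
    /// The number of elements this type would like to absorb per batch.
    fn preferred_capacity(&self) -> usize;
    /// Hint that the type should make sure it holds enough allocation to
    /// absorb `preferred_capacity` elements, ideally without reallocating.
    fn ensure_allocated(&mut self);
}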

Additional traits

I took the current Exchange implementation and tried to add a minimal set of traits to encapsulate its requirements on exchanged types.

  • Containers with a content type. Currently, Timely containers do not communicate what data they store.
    pub trait ItemContainer: Container {
        /// The type of contained item.
        type Item: 'static;
    }
    
  • Pushing data on an existing container:
    pub trait PushContainer: ItemContainer {
        fn push(&mut self, item: Self::Item);
    }
    
  • Extracting data from a container. This is a trait that should be implemented for &mut C, shown here with an implementation for vectors. A small sketch combining these traits follows the list.
    pub trait DrainContainer {
        type Item;
        type Drain: Iterator<Item=Self::Item>;
        fn drain(self) -> Self::Drain;
    }
    impl<'a, T> DrainContainer for &'a mut Vec<T> {
        type Item = T;
        type Drain = ::std::vec::Drain<'a, Self::Item>;
        fn drain(self) -> Self::Drain {
            Vec::drain(self, ..)
        }
    }
    
  • The EfficientDataTransfer::ensure_allocated() call always happens before a call to Vec::push, so we could instead switch to PushContainer::push and let the type itself handle the allocation.
  • Currently, containers only have a length. We could add a capacity to Container or PushContainer to enable the exchange to determine when a container is full.
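
As a sanity check that these traits compose, here is a minimal sketch (not part of this PR) of a generic helper that moves items between containers; it assumes PushContainer is implemented for the container in question, alongside the DrainContainer impl shown above:

fn move_items<C>(source: &mut C, target: &mut C)
where
    C: PushContainer,
    for<'a> &'a mut C: DrainContainer<Item = C::Item>,
{
    // Drain owned items out of `source` and push them into `target`.
    for item in source.drain() {
        target.push(item);
    }
}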

Ownership

The traits are defined to return owned data. This limits flexibility for the backing container types. For example, the ColumnStack container only provides references to its contents to avoid reallocations. With the above traits, the DrainContainer needs to clone the contents, which would then immediately be copied into a new ColumnStack. The API is not wide enough to accommodate passing out a reference and copying references into a receiving container. We could reuse the RefOrMut pattern here.
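
For context, the RefOrMut pattern is roughly an enum over shared or exclusive access, so a consumer can clone in the shared case or move data out in the exclusive case. A simplified sketch (Timely's actual type carries more machinery):

pub enum RefOrMut<'a, T> {
    /// Shared access: the consumer must clone to obtain owned data.
    Ref(&'a T),
    /// Exclusive access: the consumer may take data, e.g. by swapping.
    Mut(&'a mut T),
}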

@antiguru antiguru force-pushed the container_stream_rework branch from 18713cb to d5291a5 Compare November 30, 2021 16:14
@antiguru antiguru force-pushed the container_stream_rework branch 3 times, most recently from 24031cc to 9cf6c3b Compare December 10, 2021 16:21
@antiguru
Member Author

I restructured the traits around exchanging data. Previously I had two traits, one to extract data and one to push data, but this proved inadequate for containers that only return references to items instead of owned data. The new approach is to encapsulate the logic to exchange data within a PushPartitioned trait:

pub trait PushPartitioned: Container {
    /// Partition and push this container.
    ///
    /// Drain all elements from `self`, and use the function `hash` to determine which `buffer` to
    /// append an element to. Call `flush` with an index and a buffer to send the data downstream.
    fn push_partitioned<H, F>(&mut self, buffers: &mut Vec<Self>, hash: H, flush: F)
    where
        H: FnMut(&Self::Item) -> usize,
        F: FnMut(usize, &mut Self);
}

Here is a sample implementation for Vec<T>:

impl<T: Clone + 'static> PushPartitioned for Vec<T> {
    fn push_partitioned<H, F>(&mut self, buffers: &mut Vec<Self>, mut hash: H, mut flush: F)
    where
        H: FnMut(&Self::Item) -> usize,
        F: FnMut(usize, &mut Self),
    {
        for datum in self.drain(..) {
            let index = hash(&datum);
            buffers[index].push(datum);
            if buffers[index].len() == buffers[index].capacity() {
                flush(index, &mut buffers[index]);
            }
        }
    }
}

(The actual implementation also ensures the buffers are allocated.) This seems simpler to implement, has fewer type constraints, and offers more flexibility.
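
To illustrate how an exchange-style pusher might drive this trait, here is a rough sketch (assuming the trait and Vec impl above; the routing function, buffer capacity, and the println! standing in for handing data to a pusher are illustrative only):

fn exchange_batch(batch: &mut Vec<u64>, buffers: &mut Vec<Vec<u64>>, peers: usize) {
    // One buffer per peer, each with some fixed capacity.
    buffers.resize_with(peers, || Vec::with_capacity(1024));
    batch.push_partitioned(
        buffers,
        |datum| (*datum as usize) % peers,
        |index, buffer| {
            // A real exchange would hand `buffer` to the pusher for `index`;
            // here we only report and clear it.
            println!("flushing {} records to worker {}", buffer.len(), index);
            buffer.clear();
        },
    );
}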

@antiguru
Member Author

We also include a TimelyStack type, which is mostly a copy-paste of ColumnStack from the columnation crate. The idea is that we can add features to the TimelyStack independently of the columnation crate, which should really only serve as a library providing the Columnation and region abstractions. The ColumnStack is more of an example.

@antiguru
Member Author

Ownership of data revisited

I'd like to analyze an area of dataflow programming that Timely traditionally hasn't focused on much. Currently, all data in Timely is stored in vectors, which allow moving owned data out, for example when draining. Additionally, vectors are constructed from owned data. Many of the operator-level abstractions in Timely are based on this fact and offer little flexibility to adjust to different settings.

Region-allocated data, as offered by the columnation crate, manages ownership of the allocations that back types. For this reason it only grants immutable access to data, because otherwise it couldn't enforce the integrity of its region allocation. When adding data to a region, its contents are reconstructed within the region, which doesn't involve an ownership transfer or move. Both properties run counter to what Timely currently assumes.
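
To make the access pattern concrete, an illustrative sketch (the trait and method names are assumptions, not the columnation crate's actual API): data enters by being copied from a reference, and reading only ever yields references.

trait RegionBacked<T> {
    /// Reconstruct `item` inside the region by copying from a reference;
    /// no ownership is transferred in.
    fn copy(&mut self, item: &T);
    /// Read-only access; items can never be moved back out of the region.
    fn get(&self, index: usize) -> &T;
}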

Let's look at a concrete example. The Map trait offers the following function:

fn map<D2: Data, L: FnMut(D) -> D2 + 'static>(&self, logic: L) -> Stream<S, D2>

The map function takes a generic function, which receives owned Ds and produces owned D2s. It doesn't offer the flexibility to work with references, which we'd like to have for some containers.

From an API perspective this is unfortunate because references and owned data offer different semantics, and it's hard to build an interface around accepting both while providing equivalent performance. To me, it currently seems we have the following options:

  • Provide compatibility with the current traits, such as the Map trait. We can achieve this by extracting references from region-allocated data, cloning the data, and passing it to whatever closure we need to run. The closure returns owned data, which we copy into the region allocation and then drop. We add two allocations. If we're lucky, the optimizer realizes this and doesn't actually create the clones, but it's not an ideal design. (A rough sketch of this approach follows the list.)
  • Deliberately don't provide compatibility with the current API for anything but vector-based containers. This makes it difficult for clients to switch to a different container.
  • Add a new API layer that offers more flexibility by providing RefOrMut (or a fictional RefOrOwned) to closures, and a mechanism to produce data. I haven't figured out what the mechanism to produce data would look like.
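
A rough sketch of the first option (illustrative only, not this PR's code): run an owned-data closure over a container that only hands out references, paying one clone per input and one copy per output.

fn map_via_clone<D, D2, L>(input: &[D], output: &mut Vec<D2>, mut logic: L)
where
    D: Clone,
    L: FnMut(D) -> D2,
{
    for datum in input {
        // First extra cost: clone out of the region-backed input.
        let owned = datum.clone();
        // Second cost: the owned result is moved (or, for a region-backed
        // output, copied) into the output container.
        output.push(logic(owned));
    }
}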

The good news is that the changes contained in this PR don't seem to conflict with any of the proposals I've noted down.

@frankmcsherry
Member

Provide compatibility with the current traits, such as the Map trait.

This sounds most appealing to me, of course. :D

I think at the moment we have several existing operators we might like to use, like map, that work "just fine" if we use the same reference-based infra (i.e. RefOrMut, as per abomonated data, not causing distributed timely to melt or anything). If folks want to tap in and provide even more specialized implementations (e.g. a container to container map) that sounds great, we should support that, but I think it is fine to expect a person to write their own operator implementation at that point.

If the InputHandle::next iterator provides containers, and containers are obliged to implement to_ref_or_mut(), would this be a step in the right direction? Right now we have RefOrMut<Vec<D>> hard-wired into that method return, but generalizing it slightly to just a moment before that happens seems .. doable? I don't want to say "easy" without knowing, but doable.

@antiguru
Member Author

antiguru commented Dec 14, 2021

I came up with a MapRef trait that seems to be roughly in line with what Map currently does, but works for both vector- and columnation-based containers:

/// Extension trait for [StreamCore] to map elements by reference.
pub trait MapRef<S, C, D, CO>
    where
        S: Scope,
        C: Container<Item=D>,
        D: Data,
        CO: Container,
{
    /// Observes each element of the stream and yields new elements.
    ///
    /// # Examples
    /// ```
    /// use timely::container::columnation::TimelyStack;
    /// use timely::dataflow::operators::{ToStreamCore, MapRef, InspectCore};
    ///
    /// timely::example(|scope| {
    ///     Some((0..10).collect::<TimelyStack<_>>())
    ///         .to_stream_core(scope)
    ///         .map_ref(|x, session| session.copy(&(*x + 1)))
    ///         .inspect_container(|x| println!("seen: {:?}", x));
    /// });
    /// ```
    fn map_ref<L>(&self, logic: L) -> StreamCore<S, CO>
        where
            L: FnMut(RefOrMut<D>, &mut Session<S::Timestamp, CO, CounterCore<S::Timestamp, CO, TeeCore<S::Timestamp, CO>>>) + 'static;
}

The closure receives a RefOrMut<D> and a session to write its outputs to. This means that the only specialization is in writing the data to the session. (give is only available for Vec<T> containers, and will remain so, because otherwise Rust's type inference can't determine the expected container type in many cases, and we'd break backwards compatibility.) See 4e41e3f for related changes.

@antiguru
Member Author

I cleaned up this PR into three commits:

  • Container abstraction
  • Timely generalization
  • TimelyStack type

We could extract the last one to a separate PR if that's easier.

Introduces a container abstraction to enable generic data on dataflow
edges. A container describes its contents, has a length and a capacity,
and can be cleared. Containers implementing the PushPartitioned trait are
suitable for exchanging data across Timely workers.

Signed-off-by: Moritz Hoffmann <[email protected]>
Add *Core variants of core Timely infrastructure that is generic in the
data passed along dataflow edges.

Signed-off-by: Moritz Hoffmann <[email protected]>
This is mostly a copy of ColumnStack plus a few implementations. We'd like
to keep the columnation library mostly untouched and hence have our own
variant within Timely.

Signed-off-by: Moritz Hoffmann <[email protected]>
@antiguru antiguru force-pushed the container_stream_rework branch from b1ca0aa to 6015eff Compare December 16, 2021 10:17
Member Author

@antiguru antiguru left a comment


In this review, I point out places where the buffer allocation strategy would be altered. While in the past we'd try to send default-capacity buffers even if they weren't full, we might send non-default-capacity buffers in the current version of this PR. We should probably come to a conclusion on whether this is acceptable or whether the current behavior should be restored.

  if let Some(message) = message {
      for index in 1..pushers.len() {
-         self.buffer.extend_from_slice(&message.data);
+         self.buffer.clone_from(&message.data);
Member Author


Change of behavior: In the past, all buffers had at least default capacity. After this change, the buffers passed around might be oddly sized.

Comment on lines -98 to +101
- Event::Messages(ref time, ref data) => {
-     output.session(time).give_iterator(data.iter().cloned());
+ EventCore::Messages(ref time, data) => {
+     allocation.clone_from(data);
+     output.session(time).give_container(&mut allocation);
Member Author


Change of behavior: In the past, event data was sorted into new buffers, potentially many if the data would exceed the size of a single buffer. Now, it'll clone and pass whatever it received, which will likely result in non-default capacity buffers passed around.

Comment on lines -277 to +355
- self.buffer2.extend_from_slice(&self.buffer1[..]);
+ self.buffer2.clone_from(&self.buffer1);
Member Author


Another place of potentially non-default capacity buffers.

Comment on lines -326 to +419
- self.buffer2.extend_from_slice(&buffer[..]);
+ self.buffer2.clone_from(&buffer);
Member Author


Another place of potentially non-default capacity buffers.

Member

@frankmcsherry frankmcsherry left a comment


I think this looks good to land, and to start experimenting with! Thanks for the effort and the patience.

Comment on lines +19 to +21
pub trait Container: Default + Clone + 'static {
    /// The type of elements this container holds.
    type Item;
Member


I was expecting Deref here (and perhaps binding Item). I guess maybe it isn't needed? Will continue reading to see, but interesting that Container does not provide access to the contents.
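
One reading of that expectation (illustrative only, not something this PR adds): if containers also dereferenced to a slice of their items, generic code could inspect contents without knowing the concrete container type.

use std::ops::Deref;

fn count_matching<C, T>(container: &C, predicate: impl Fn(&T) -> bool) -> usize
where
    C: Deref<Target = [T]>,
{
    // Auto-deref gives a slice view of the container's contents.
    container.iter().filter(|&item| predicate(item)).count()
}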

@frankmcsherry frankmcsherry merged commit 84ff8bf into TimelyDataflow:master Jan 14, 2022
@antiguru antiguru deleted the container_stream_rework branch January 19, 2023 01:44
antiguru added a commit to antiguru/timely-dataflow that referenced this pull request Mar 7, 2023
This fixes a regression introduced by TimelyDataflow#426 where BufferCore::give now
lacked the inline annotation. In pingpong, this changes the runtime as
follows:

Without inline:
target/release/examples/pingpong 1 500000000 -w1  2.45s user 0.00s system 99% cpu 2.446 total

With inline:
target/release/examples/pingpong 1 500000000 -w1  1.75s user 0.00s system 99% cpu 1.755 total

Signed-off-by: Moritz Hoffmann <[email protected]>
@antiguru antiguru mentioned this pull request Mar 7, 2023
frankmcsherry pushed a commit that referenced this pull request Mar 7, 2023
@antiguru antiguru mentioned this pull request Mar 17, 2023
@github-actions github-actions bot mentioned this pull request Oct 29, 2024