Container stream (without Allocation) #426
This PR has a good basis for further experimentation. I tried implementing a word-count-like experiment to show the benefit of the changes, and here I'd like to capture what was difficult on the way:
Additional traits
I took the current
Ownership
The traits are defined to return owned data. This limits flexibility for the backing container types. For example, the
I restructured the traits around exchanging data. In the past, I had two traits to extract data and push data, but this proved to be inadequate for containers only returning references to items instead of owned data. The new approach is to encapsulate the logic to exchange data within a Here is a sample implementation for (The actual implementation ensures allocated buffers) This seems to be simpler to implement, has fewer type constraints and offers more flexibility.
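To make the idea concrete, here is a rough, standalone sketch of a single trait that encapsulates the exchange logic for a `Vec`. The trait name `PushPartitioned` is taken from the commit message further down; the method signature, the `DEFAULT_CAPACITY` constant, and the `demo` helper are assumptions made for illustration and are not necessarily what the PR contains.

```rust
/// Assumed default buffer capacity, kept tiny so the example is easy to follow.
const DEFAULT_CAPACITY: usize = 4;

/// Hypothetical trait: move the contents of `self` into per-target buffers.
trait PushPartitioned: Sized {
    type Item;

    /// `index` picks the target buffer for each item,
    /// `flush` is called whenever a target buffer is full.
    fn push_partitioned<I, F>(&mut self, buffers: &mut [Self], index: I, flush: F)
    where
        I: FnMut(&Self::Item) -> usize,
        F: FnMut(usize, &mut Self);
}

impl<T> PushPartitioned for Vec<T> {
    type Item = T;

    fn push_partitioned<I, F>(&mut self, buffers: &mut [Self], mut index: I, mut flush: F)
    where
        I: FnMut(&T) -> usize,
        F: FnMut(usize, &mut Self),
    {
        for datum in self.drain(..) {
            let target = index(&datum);
            let buffer = &mut buffers[target];
            // Make sure the target buffer is allocated before pushing.
            if buffer.capacity() < DEFAULT_CAPACITY {
                buffer.reserve(DEFAULT_CAPACITY - buffer.len());
            }
            buffer.push(datum);
            if buffer.len() >= DEFAULT_CAPACITY {
                flush(target, buffer);
            }
        }
    }
}

/// Partition 0..10 by parity; returns the flushed batches and the leftovers.
fn demo() -> (Vec<(usize, Vec<u64>)>, Vec<Vec<u64>>) {
    let mut input: Vec<u64> = (0..10).collect();
    let mut buffers: Vec<Vec<u64>> = vec![Vec::new(), Vec::new()];
    let mut flushed = Vec::new();
    input.push_partitioned(
        &mut buffers,
        |x| (*x % 2) as usize,
        |target, buffer| flushed.push((target, std::mem::take(buffer))),
    );
    (flushed, buffers)
}

fn main() {
    let (flushed, rest) = demo();
    println!("flushed: {:?}, remaining: {:?}", flushed, rest);
}
```

Because the container itself decides how items are moved, an implementation for a container that only hands out references could copy items into the target buffers instead of draining them.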
We also include a |
Ownership of data revisited
I'd like to analyze an area of dataflow programming that Timely traditionally didn't focus on much. Currently, all data in Timely is stored in vectors, which allow moving owned data, for example when draining. Additionally, vectors are constructed from owned data. Many of the operator-level abstractions in Timely are based on this fact, and offer little flexibility to adjust to different settings. Region-allocated data as offered by the Let's look at a concrete example. The The From an API perspective this is unfortunate because references and owned data offer different semantics, and it's hard to build an interface around accepting both while providing equivalent performance. To me, it currently seems we have the following options:
The good news is that the changes contained in this PR don't seem to conflict with any of the proposals I've noted down. |
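As a small illustration of the owned-versus-reference tension described above, the sketch below contrasts a `Vec<String>`, which can hand out owned items when drained, with a region-style container that can only lend `&str` slices. `StrRegion` and both helper functions are hypothetical and exist only for this example; they are not types from Timely or from any region-allocation crate.

```rust
/// Hypothetical region-style container: all strings live in one byte buffer.
struct StrRegion {
    bytes: String,
    /// End offset of each stored string within `bytes`.
    ends: Vec<usize>,
}

impl StrRegion {
    fn new() -> Self {
        StrRegion { bytes: String::new(), ends: Vec::new() }
    }

    /// Copies the string into the region; no per-item allocation.
    fn push(&mut self, s: &str) {
        self.bytes.push_str(s);
        self.ends.push(self.bytes.len());
    }

    /// Items can only be borrowed, never moved out.
    fn iter(&self) -> impl Iterator<Item = &str> + '_ {
        let mut start = 0;
        self.ends.iter().map(move |&end| {
            let s = &self.bytes[start..end];
            start = end;
            s
        })
    }
}

/// Consumer written against owned data: drains `String`s out of a vector.
fn total_len_owned(mut data: Vec<String>) -> usize {
    data.drain(..).map(|s: String| s.len()).sum()
}

/// Consumer written against borrowed data: sees `&str` only.
fn total_len_borrowed(data: &StrRegion) -> usize {
    data.iter().map(|s: &str| s.len()).sum()
}

fn main() {
    let owned = vec!["timely".to_string(), "dataflow".to_string()];
    let mut region = StrRegion::new();
    region.push("timely");
    region.push("dataflow");
    println!("{} {}", total_len_owned(owned), total_len_borrowed(&region));
}
```

The two consumers compute the same thing but need different signatures, which is the API difficulty the comment points at.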
This sounds most appealing to me, of course. :D I think at the moment we have several existing operators we might like to use, like If the |
I came up with a The closure receives a |
I cleaned up this PR into three commits:
We could extract the last one to a separate PR if that's easier. |
Introduces a container abstraction to enable generic data on dataflow edges. A container describes its contents, has a length and capacity, can be cleared. Containers implementing the PushPartitioned trait are suitable for exchanging across Timely workers. Signed-off-by: Moritz Hoffmann <[email protected]>
Add *Core variants of core Timely infrastructure that is generic in the data passed along dataflow edges. Signed-off-by: Moritz Hoffmann <[email protected]>
This is mostly a copy of ColumnStack plus a few implementations. We'd like to keep the columnar library mostly untouched and hence have our own variant within Timely. Signed-off-by: Moritz Hoffmann <[email protected]>
In this review, I point out places where the buffer allocation strategy would be altered. In the past, we'd try to send default-capacity buffers even if they weren't full; in the current version of this PR, we might send non-default-capacity buffers. We should probably decide whether this is acceptable or whether the previous behavior should be restored.
| if let Some(message) = message { | ||
| for index in 1..pushers.len() { | ||
| self.buffer.extend_from_slice(&message.data); | ||
| self.buffer.clone_from(&message.data); |
Change of behavior: In the past, all buffers had at least default capacity. After this change, the buffers passed around might be oddly sized.
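A minimal sketch of why the capacities can differ, assuming the old code path copied into a buffer that was already allocated with a default capacity, while the new one may `clone_from` into a buffer that was never allocated. `DEFAULT_CAPACITY` and both function names are made up for this example.

```rust
/// Assumed default capacity; the real value in Timely may differ.
const DEFAULT_CAPACITY: usize = 1024;

/// Old-style copy: the target buffer is preallocated, so its capacity
/// is at least the default no matter how few items arrive.
fn copy_into_preallocated(data: &[u64]) -> Vec<u64> {
    let mut buffer = Vec::with_capacity(DEFAULT_CAPACITY);
    buffer.extend_from_slice(data);
    buffer
}

/// New-style copy: `clone_from` into a fresh vector only needs to
/// allocate enough room for the cloned items.
fn copy_with_clone_from(data: &Vec<u64>) -> Vec<u64> {
    let mut buffer = Vec::new();
    buffer.clone_from(data);
    buffer
}

fn main() {
    let data = vec![1, 2, 3];
    let old = copy_into_preallocated(&data);
    let new = copy_with_clone_from(&data);
    // The contents agree, the capacities usually do not.
    println!("old capacity: {}, new capacity: {}", old.capacity(), new.capacity());
}
```

If the target of `clone_from` already has enough capacity, the existing allocation is reused, so the odd sizes only show up for buffers that start out unallocated or smaller than the incoming data.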
| Event::Messages(ref time, ref data) => { | ||
| output.session(time).give_iterator(data.iter().cloned()); | ||
| EventCore::Messages(ref time, data) => { | ||
| allocation.clone_from(data); | ||
| output.session(time).give_container(&mut allocation); |
Change of behavior: In the past, event data was sorted into new buffers, potentially many if the data would exceed the size of a single buffer. Now, it'll clone and pass whatever it received, which will likely result in non-default capacity buffers passed around.
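The difference between the two replay strategies can be sketched without Timely. Below, `rechunk` stands in for the old behavior (items are sorted into fresh buffers of a fixed capacity) and `pass_through` for the new one (the received data is cloned as a whole). Both function names and the capacity parameter are assumptions for illustration only.

```rust
/// Old-style: split the data into buffers of at most `capacity` items,
/// each allocated with that capacity. `capacity` must be non-zero.
fn rechunk<T: Clone>(data: &[T], capacity: usize) -> Vec<Vec<T>> {
    data.chunks(capacity)
        .map(|chunk| {
            let mut buffer = Vec::with_capacity(capacity);
            buffer.extend_from_slice(chunk);
            buffer
        })
        .collect()
}

/// New-style: clone whatever was received into a single allocation,
/// regardless of how large or small it is.
fn pass_through<T: Clone>(data: &Vec<T>, allocation: &mut Vec<T>) {
    allocation.clone_from(data);
}

fn main() {
    let data = vec![1, 2, 3, 4, 5];
    let chunks = rechunk(&data, 2);
    let mut allocation = Vec::new();
    pass_through(&data, &mut allocation);
    println!("{} buffers vs. one buffer of {} items", chunks.len(), allocation.len());
}
```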
| self.buffer2.extend_from_slice(&self.buffer1[..]); | ||
| self.buffer2.clone_from(&self.buffer1); |
Another place of potentially non-default capacity buffers.
| self.buffer2.extend_from_slice(&buffer[..]); | ||
| self.buffer2.clone_from(&buffer); |
Another place of potentially non-default capacity buffers.
I think this looks good to land, and to start experimenting with! Thanks for the effort and the patience.
| pub trait Container: Default + Clone + 'static { | ||
| /// The type of elements this container holds. | ||
| type Item; |
I was expecting Deref here (and perhaps binding Item). I guess maybe it isn't needed? Will continue reading to see, but interesting that Container does not provide access to the contents.
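For reference, a container trait along the lines of the commit message (contents described by an item type, a length, a capacity, and a way to clear) could look roughly like the following. Only the trait bounds and `type Item` are taken from the diff above; the method names and the `recycle` helper are assumptions, and the real trait in the PR may differ.

```rust
/// Sketch of a container abstraction without access to the contents.
trait Container: Default + Clone + 'static {
    /// The type of elements this container holds.
    type Item;

    /// Number of elements currently stored.
    fn len(&self) -> usize;

    /// Whether the container holds no elements.
    fn is_empty(&self) -> bool {
        self.len() == 0
    }

    /// Number of elements that fit without reallocating.
    fn capacity(&self) -> usize;

    /// Remove all elements, keeping the allocation where possible.
    fn clear(&mut self);
}

impl<T: Clone + 'static> Container for Vec<T> {
    type Item = T;

    fn len(&self) -> usize {
        Vec::len(self)
    }

    fn capacity(&self) -> usize {
        Vec::capacity(self)
    }

    fn clear(&mut self) {
        Vec::clear(self)
    }
}

/// Generic code can manage buffers without ever looking at the items.
fn recycle<C: Container>(container: &mut C) -> usize {
    let count = container.len();
    container.clear();
    count
}

fn main() {
    let mut data = vec![1, 2, 3];
    let cleared = recycle(&mut data);
    println!("cleared {} items, capacity still {}", cleared, Container::capacity(&data));
}
```

This shows why `Deref` is not strictly needed at this level: infrastructure that only moves, counts, and recycles buffers never has to read individual items.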
This fixes a regression introduced by TimelyDataflow#426 where BufferCore::give now lacked the inline annotation. In pingpong, this changes the runtime as follows:
Without inline: target/release/examples/pingpong 1 500000000 -w1 2.45s user 0.00s system 99% cpu 2.446 total
With inline: target/release/examples/pingpong 1 500000000 -w1 1.75s user 0.00s system 99% cpu 1.755 total
Signed-off-by: Moritz Hoffmann <[email protected]>
Preview of Timely transferring general Container types, but without the Container::Allocation variant.
This introduces a requirement of Container: Default to enable the following pattern:
Previously, container would have been initialized to Vec::new(), which corresponds to the default element for Vec. The allocation variant of Container avoided this requirement by externalizing a possible allocation into an Option<Container::Allocation>.
It seems to me that adding the default requirement is acceptable.
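A rough, Timely-free sketch of the kind of pattern a `Default` bound enables: an operator keeps one container around, created via `Default::default()`, and swaps it with each incoming batch. The `drain_batches` function and its parameters are hypothetical stand-ins for an operator's input loop, not the PR's actual code.

```rust
/// Process every incoming batch by swapping it into a reusable container.
/// `C: Default` is what lets generic code create the initial container.
fn drain_batches<C: Default>(incoming: &mut [C], mut logic: impl FnMut(&mut C)) {
    // For `Vec<T>` this is the same as `Vec::new()`.
    let mut container = C::default();
    for batch in incoming.iter_mut() {
        // Take the batch's contents, leaving our previous container behind.
        std::mem::swap(&mut container, batch);
        logic(&mut container);
    }
}

fn main() {
    let mut incoming = vec![vec![1u64, 2], vec![3]];
    let mut total = 0;
    drain_batches(&mut incoming, |container: &mut Vec<u64>| {
        total += container.iter().sum::<u64>();
        container.clear();
    });
    println!("total = {}", total);
}
```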