Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion container/src/flatcontainer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,4 +47,4 @@ impl<R: Region + Clone + 'static, T: CopyOnto<R>> PushInto<FlatStack<R>> for T {
fn push_into(self, target: &mut FlatStack<R>) {
target.copy(self);
}
}
}
114 changes: 108 additions & 6 deletions container/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

#![forbid(missing_docs)]

use std::collections::VecDeque;

pub mod columnation;
pub mod flatcontainer;

Expand Down Expand Up @@ -63,12 +65,6 @@ pub trait PushInto<C> {

/// A type that has the necessary infrastructure to push elements, without specifying how pushing
/// itself works. For this, pushable types should implement [`PushInto`].
// TODO: Reconsider this interface because it assumes
// * Containers have a capacity
// * Push presents single elements.
// * Instead of testing `len == cap`, we could have a `is_full` to test that we might
// not be able to absorb more data.
// * Example: A FlatStack with optimized offsets and deduplication can absorb many elements without reallocation. What does capacity mean in this context?
pub trait PushContainer: Container {
/// Push `item` into self
#[inline]
Expand All @@ -83,6 +79,112 @@ pub trait PushContainer: Container {
fn reserve(&mut self, additional: usize);
}

/// A type that can build containers from items.
///
/// An implementation needs to absorb elements, and later reveal equivalent information
/// chunked into individual containers, but is free to change the data representation to
/// better fit the properties of the container.
///
/// The owner extracts data in two ways. The opportunistic [`Self::extract`] method returns
/// any ready data, but doesn't need to produce partial outputs. In contrast, [`Self::finish`]
/// needs to produce all outputs, even partial ones.
///
/// For example, a consolidating builder can aggregate differences in-place, but it has
/// to ensure that it preserves the intended information.
///
/// The trait does not prescribe any specific ordering guarantees, and each implementation can
/// decide to represent a `push`/`push_container` order for `extract` and `finish`, or not.
// TODO: Consider adding `push_iterator` to receive an iterator of data.
pub trait ContainerBuilder: Default + 'static {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Leave a comment to explain FIFO vs. not.

/// The container type we're building.
type Container: Container;
/// Add an item to a container.
fn push<T: PushInto<Self::Container>>(&mut self, item: T) where Self::Container: PushContainer;
/// Push a pre-built container.
fn push_container(&mut self, container: &mut Self::Container);
/// Extract assembled containers, potentially leaving unfinished data behind.
fn extract(&mut self) -> Option<&mut Self::Container>;
/// Extract assembled containers and any unfinished data.
fn finish(&mut self) -> Option<&mut Self::Container>;
}

/// A default container builder that uses length and preferred capacity to chunk data.
///
/// Maintains FIFO order.
#[derive(Default, Debug)]
pub struct CapacityContainerBuilder<C>{
/// Container that we're writing to.
current: C,
/// Emtpy allocation.
empty: Option<C>,
/// Completed containers pending to be sent.
pending: VecDeque<C>,
}

impl<C: Container> ContainerBuilder for CapacityContainerBuilder<C> {
type Container = C;

#[inline]
fn push<T: PushInto<Self::Container>>(&mut self, item: T) where C: PushContainer {
if self.current.capacity() == 0 {
self.current = self.empty.take().unwrap_or_default();
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider discarding oddly-sized empty containers.

Clear empty.

// Discard any non-uniform capacity container.
if self.current.capacity() != C::preferred_capacity() {
self.current = C::default();
}
// Protect against non-emptied containers.
self.current.clear();
}
// Ensure capacity
if self.current.capacity() < C::preferred_capacity() {
self.current.reserve(C::preferred_capacity() - self.current.len());
}

// Push item
self.current.push(item);

// Maybe flush
if self.current.len() == self.current.capacity() {
self.pending.push_back(std::mem::take(&mut self.current));
}
}

#[inline]
fn push_container(&mut self, container: &mut Self::Container) {
if !container.is_empty() {
// Flush to maintain FIFO ordering.
if self.current.len() > 0 {
self.pending.push_back(std::mem::take(&mut self.current));
}

let mut empty = self.empty.take().unwrap_or_default();
// Ideally, we'd discard non-uniformly sized containers, but we don't have
// access to `len`/`capacity` of the container.
empty.clear();

self.pending.push_back(std::mem::replace(container, empty));
}
}

#[inline]
fn extract(&mut self) -> Option<&mut C> {
if let Some(container) = self.pending.pop_front() {
self.empty = Some(container);
self.empty.as_mut()
} else {
None
}
}

#[inline]
fn finish(&mut self) -> Option<&mut C> {
if self.current.len() > 0 {
self.pending.push_back(std::mem::take(&mut self.current));
}
self.extract()
}
}

impl<T: Clone + 'static> Container for Vec<T> {
type ItemRef<'a> = &'a T where T: 'a;
type Item<'a> = T where T: 'a;
Expand Down
4 changes: 2 additions & 2 deletions mdbook/src/chapter_2/chapter_2_4.md
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,7 @@ fn main() {
let mut session = output.session(&time);
if let Some(list) = stash.remove(time.time()) {
for mut vector in list.into_iter() {
session.give_vec(&mut vector);
session.give_container(&mut vector);
}
}
});
Expand Down Expand Up @@ -261,7 +261,7 @@ fn main() {
if frontiers.iter().all(|f| !f.less_equal(time.time())) {
let mut session = output.session(&time);
for mut vector in list.drain(..) {
session.give_vec(&mut vector);
session.give_container(&mut vector);
}
}
}
Expand Down
3 changes: 2 additions & 1 deletion timely/examples/barrier.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ extern crate timely;
use timely::dataflow::channels::pact::Pipeline;
use timely::dataflow::operators::{Feedback, ConnectLoop};
use timely::dataflow::operators::generic::operator::Operator;
use timely::container::CapacityContainerBuilder;

fn main() {

Expand All @@ -12,7 +13,7 @@ fn main() {

worker.dataflow(move |scope| {
let (handle, stream) = scope.feedback::<Vec<usize>>(1);
stream.unary_notify(
stream.unary_notify::<CapacityContainerBuilder<_>, _, _>(
Pipeline,
"Barrier",
vec![0],
Expand Down
Loading