Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
79 changes: 22 additions & 57 deletions container/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

use std::collections::VecDeque;

/// An type containing a number of records accounted for by progress tracking.
/// A type containing a number of records accounted for by progress tracking.
///
/// The object stores a number of updates and thus is able to describe it count
/// (`update_count()`) and whether it is empty (`is_empty()`). It is empty if the
Expand All @@ -19,23 +19,10 @@ pub trait Accountable {
fn record_count(&self) -> i64;

/// Determine if this contains any updates, corresponding to `update_count() == 0`.
/// It is a correctness error for this to by anything other than `self.record_count() == 0`.
/// It is a correctness error for this to be anything other than `self.record_count() == 0`.
#[inline] fn is_empty(&self) -> bool { self.record_count() == 0 }
}

/// A container that allows iteration morally equivalent to [`IntoIterator`].
///
/// Iterating the container presents items in an implementation-specific order.
/// The container's contents are not changed.
pub trait IterContainer {
/// The type of elements when reading non-destructively from the container.
type ItemRef<'a> where Self: 'a;
/// Iterator type when reading from the container.
type Iter<'a>: Iterator<Item=Self::ItemRef<'a>> where Self: 'a;
/// Returns an iterator that reads the contents of this container.
fn iter(&self) -> Self::Iter<'_>;
}

/// A container that can drain itself.
///
/// Draining the container presents items in an implementation-specific order.
Expand Down Expand Up @@ -191,14 +178,6 @@ impl<T> Accountable for Vec<T> {
#[inline] fn is_empty(&self) -> bool { Vec::is_empty(self) }
}

impl<T> IterContainer for Vec<T> {
type ItemRef<'a> = &'a T where T: 'a;
type Iter<'a> = std::slice::Iter<'a, T> where Self: 'a;
#[inline] fn iter(&self) -> Self::Iter<'_> {
self.as_slice().iter()
}
}

impl<T> DrainContainer for Vec<T> {
type Item<'a> = T where T: 'a;
type DrainIter<'a> = std::vec::Drain<'a, T> where Self: 'a;
Expand Down Expand Up @@ -246,46 +225,32 @@ impl<T: Clone> PushInto<&&T> for Vec<T> {
}

mod rc {
use std::ops::Deref;
use std::rc::Rc;

use crate::{IterContainer, DrainContainer};

impl<T: crate::Accountable> crate::Accountable for Rc<T> {
#[inline] fn record_count(&self) -> i64 { std::ops::Deref::deref(self).record_count() }
#[inline] fn is_empty(&self) -> bool { std::ops::Deref::deref(self).is_empty() }
}
impl<T: IterContainer> IterContainer for Rc<T> {
type ItemRef<'a> = T::ItemRef<'a> where Self: 'a;
type Iter<'a> = T::Iter<'a> where Self: 'a;
#[inline] fn iter(&self) -> Self::Iter<'_> { self.deref().iter() }
impl<T: crate::Accountable> crate::Accountable for std::rc::Rc<T> {
#[inline] fn record_count(&self) -> i64 { self.as_ref().record_count() }
#[inline] fn is_empty(&self) -> bool { self.as_ref().is_empty() }
}
impl<T: IterContainer> DrainContainer for Rc<T> {
type Item<'a> = T::ItemRef<'a> where Self: 'a;
type DrainIter<'a> = T::Iter<'a> where Self: 'a;
#[inline] fn drain(&mut self) -> Self::DrainIter<'_> { self.iter() }
impl<T> crate::DrainContainer for std::rc::Rc<T>
where
for<'a> &'a T: IntoIterator
{
type Item<'a> = <&'a T as IntoIterator>::Item where Self: 'a;
type DrainIter<'a> = <&'a T as IntoIterator>::IntoIter where Self: 'a;
#[inline] fn drain(&mut self) -> Self::DrainIter<'_> { self.into_iter() }
}
}

mod arc {
use std::ops::Deref;
use std::sync::Arc;

use crate::{IterContainer, DrainContainer};

impl<T: crate::Accountable> crate::Accountable for Arc<T> {
#[inline] fn record_count(&self) -> i64 { std::ops::Deref::deref(self).record_count() }
#[inline] fn is_empty(&self) -> bool { std::ops::Deref::deref(self).is_empty() }
}
impl<T: IterContainer> IterContainer for Arc<T> {
type ItemRef<'a> = T::ItemRef<'a> where Self: 'a;
type Iter<'a> = T::Iter<'a> where Self: 'a;
#[inline] fn iter(&self) -> Self::Iter<'_> { self.deref().iter() }
impl<T: crate::Accountable> crate::Accountable for std::sync::Arc<T> {
#[inline] fn record_count(&self) -> i64 { self.as_ref().record_count() }
#[inline] fn is_empty(&self) -> bool { self.as_ref().is_empty() }
}
impl<T: IterContainer> DrainContainer for Arc<T> {
type Item<'a> = T::ItemRef<'a> where Self: 'a;
type DrainIter<'a> = T::Iter<'a> where Self: 'a;
#[inline] fn drain(&mut self) -> Self::DrainIter<'_> { self.iter() }
impl<T> crate::DrainContainer for std::sync::Arc<T>
where
for<'a> &'a T: IntoIterator
{
type Item<'a> = <&'a T as IntoIterator>::Item where Self: 'a;
type DrainIter<'a> = <&'a T as IntoIterator>::IntoIter where Self: 'a;
#[inline] fn drain(&mut self) -> Self::DrainIter<'_> { self.into_iter() }
}
}

Expand Down
29 changes: 18 additions & 11 deletions timely/examples/columnar.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,12 @@

use std::collections::HashMap;

use timely::container::{IterContainer, CapacityContainerBuilder};
use columnar::Index;
use timely::Accountable;
use timely::container::CapacityContainerBuilder;
use timely::dataflow::channels::pact::{ExchangeCore, Pipeline};
use timely::dataflow::InputHandleCore;
use timely::dataflow::operators::{Inspect, Operator, Probe};
use timely::dataflow::operators::{InspectCore, Operator, Probe};
use timely::dataflow::ProbeHandle;

// Creates `WordCountContainer` and `WordCountReference` structs,
Expand Down Expand Up @@ -44,7 +46,7 @@ fn main() {
move |input, output| {
while let Some((time, data)) = input.next() {
let mut session = output.session(&time);
for wordcount in data.iter().flat_map(|wordcount| {
for wordcount in data.borrow().into_index_iter().flat_map(|wordcount| {
wordcount.text.split_whitespace().map(move |text| WordCountReference { text, diff: wordcount.diff })
}) {
session.give(wordcount);
Expand Down Expand Up @@ -73,7 +75,7 @@ fn main() {
if !input.frontier().less_equal(key.time()) {
let mut session = output.session(key);
for batch in val.drain(..) {
for wordcount in batch.iter() {
for wordcount in batch.borrow().into_index_iter() {
let total =
if let Some(count) = counts.get_mut(wordcount.text) {
*count += wordcount.diff;
Expand All @@ -94,7 +96,17 @@ fn main() {
},
)
.container::<Container>()
.inspect(|x| println!("seen: {:?}", x))
.inspect_container(|x| {
match x {
Ok((time, data)) => {
println!("seen at: {:?}\t{:?} records", time, data.record_count());
for wc in data.borrow().into_index_iter() {
println!(" {}: {}", wc.text, wc.diff);
}
},
Err(frontier) => println!("frontier advanced to {:?}", frontier),
}
})
.probe_with(&probe);
});

Expand Down Expand Up @@ -167,7 +179,7 @@ mod container {

impl<C: columnar::ContainerBytes> Column<C> {
/// Borrows the contents no matter their representation.
#[inline(always)] fn borrow(&self) -> C::Borrowed<'_> {
#[inline(always)] pub fn borrow(&self) -> C::Borrowed<'_> {
match self {
Column::Typed(t) => t.borrow(),
Column::Bytes(b) => <C::Borrowed<'_> as FromBytes>::from_bytes(&mut Indexed::decode(bytemuck::cast_slice(b))),
Expand All @@ -180,11 +192,6 @@ mod container {
#[inline] fn record_count(&self) -> i64 { i64::try_from(self.borrow().len()).unwrap() }
#[inline] fn is_empty(&self) -> bool { self.borrow().is_empty() }
}
impl<C: columnar::ContainerBytes> timely::container::IterContainer for Column<C> {
type ItemRef<'a> = C::Ref<'a>;
type Iter<'a> = IterOwn<C::Borrowed<'a>>;
fn iter<'a>(&'a self) -> Self::Iter<'a> { self.borrow().into_index_iter() }
}
impl<C: columnar::ContainerBytes> timely::container::DrainContainer for Column<C> {
type Item<'a> = C::Ref<'a>;
type DrainIter<'a> = IterOwn<C::Borrowed<'a>>;
Expand Down
4 changes: 2 additions & 2 deletions timely/src/dataflow/operators/core/input.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ pub trait Input : Scope {
/// ```
/// use std::rc::Rc;
/// use timely::*;
/// use timely::dataflow::operators::core::{Input, Inspect};
/// use timely::dataflow::operators::core::{Input, InspectCore};
/// use timely::container::CapacityContainerBuilder;
///
/// // construct and execute a timely dataflow
Expand All @@ -84,7 +84,7 @@ pub trait Input : Scope {
/// // add an input and base computation off of it
/// let mut input = worker.dataflow(|scope| {
/// let (input, stream) = scope.new_input_with_builder::<CapacityContainerBuilder<Rc<Vec<_>>>>();
/// stream.inspect(|x| println!("hello {:?}", x));
/// stream.inspect_container(|x| println!("hello {:?}", x));
/// input
/// });
///
Expand Down
19 changes: 12 additions & 7 deletions timely/src/dataflow/operators/core/inspect.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,15 @@
//! Extension trait and implementation for observing and action on streamed data.

use crate::Container;
use crate::container::IterContainer;
use crate::dataflow::channels::pact::Pipeline;
use crate::dataflow::{Scope, StreamCore};
use crate::dataflow::operators::generic::Operator;

/// Methods to inspect records and batches of records on a stream.
pub trait Inspect<G: Scope, C: IterContainer>: InspectCore<G, C> + Sized {
pub trait Inspect<G: Scope, C>: InspectCore<G, C> + Sized
where
for<'a> &'a C: IntoIterator,
{
/// Runs a supplied closure on each observed data element.
///
/// # Examples
Expand All @@ -21,10 +23,10 @@ pub trait Inspect<G: Scope, C: IterContainer>: InspectCore<G, C> + Sized {
/// ```
fn inspect<F>(&self, mut func: F) -> Self
where
F: for<'a> FnMut(C::ItemRef<'a>) + 'static,
F: for<'a> FnMut(<&'a C as IntoIterator>::Item) + 'static,
{
self.inspect_batch(move |_, data| {
for datum in data.iter() { func(datum); }
for datum in data.into_iter() { func(datum); }
})
}

Expand All @@ -41,10 +43,10 @@ pub trait Inspect<G: Scope, C: IterContainer>: InspectCore<G, C> + Sized {
/// ```
fn inspect_time<F>(&self, mut func: F) -> Self
where
F: for<'a> FnMut(&G::Timestamp, C::ItemRef<'a>) + 'static,
F: for<'a> FnMut(&G::Timestamp, <&'a C as IntoIterator>::Item) + 'static,
{
self.inspect_batch(move |time, data| {
for datum in data.iter() {
for datum in data.into_iter() {
func(time, datum);
}
})
Expand Down Expand Up @@ -91,7 +93,10 @@ pub trait Inspect<G: Scope, C: IterContainer>: InspectCore<G, C> + Sized {
fn inspect_core<F>(&self, func: F) -> Self where F: FnMut(Result<(&G::Timestamp, &C), &[G::Timestamp]>)+'static;
}

impl<G: Scope, C: Container + IterContainer> Inspect<G, C> for StreamCore<G, C> {
impl<G: Scope, C: Container> Inspect<G, C> for StreamCore<G, C>
where
for<'a> &'a C: IntoIterator,
{
fn inspect_core<F>(&self, func: F) -> Self where F: FnMut(Result<(&G::Timestamp, &C), &[G::Timestamp]>) + 'static {
self.inspect_container(func)
}
Expand Down
7 changes: 4 additions & 3 deletions timely/src/dataflow/operators/core/rc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,13 @@ pub trait SharedStream<S: Scope, C> {
///
/// # Examples
/// ```
/// use timely::dataflow::operators::{ToStream, Inspect};
/// use timely::dataflow::operators::{ToStream, InspectCore};
/// use timely::dataflow::operators::rc::SharedStream;
///
/// timely::example(|scope| {
/// (0..10).to_stream(scope)
/// .shared()
/// .inspect(|x| println!("seen: {:?}", x));
/// .inspect_container(|x| println!("seen: {:?}", x));
/// });
/// ```
fn shared(&self) -> StreamCore<S, Rc<C>>;
Expand All @@ -43,12 +43,13 @@ mod test {
use crate::dataflow::channels::pact::Pipeline;
use crate::dataflow::operators::capture::Extract;
use crate::dataflow::operators::rc::SharedStream;
use crate::dataflow::operators::{Capture, Concatenate, Operator, ToStream};
use crate::dataflow::operators::{Capture, Concatenate, InspectCore, Operator, ToStream};

#[test]
fn test_shared() {
let output = crate::example(|scope| {
let shared = vec![Ok(0), Err(())].to_stream(scope).container::<Vec<_>>().shared();
let shared = shared.inspect_container(|x| println!("seen: {x:?}"));
scope
.concatenate([
shared.unary(Pipeline, "read shared 1", |_, _| {
Expand Down
Loading