Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions timely/examples/unordered_input.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
use timely::dataflow::operators::*;
use timely::Config;
// use timely::progress::timestamp::RootTimestamp;

fn main() {
timely::execute(Config::thread(), |worker| {
Expand All @@ -11,7 +10,7 @@ fn main() {
});

for round in 0..10 {
input.session(cap.clone()).give(round);
input.activate().session(&cap).give(round);
cap = cap.delayed(&(round + 1));
worker.step();
}
Expand Down
255 changes: 0 additions & 255 deletions timely/src/dataflow/channels/pushers/buffer.rs

This file was deleted.

8 changes: 8 additions & 0 deletions timely/src/dataflow/channels/pushers/counter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,4 +42,12 @@ impl<T, P> Counter<T, P> where T : Ord+Clone+'static {
pub fn produced(&self) -> &Rc<RefCell<ChangeBatch<T>>> {
&self.produced
}
/// Ships a time and a container.
///
/// This is not a validated capability, and this method should not be used without great care.
/// Ideally, users would not have direct access to a `Counter`, and preventing this is the way
/// to uphold invariants.
#[inline] pub fn give<C: crate::Container>(&mut self, time: T, container: &mut C) where P: Push<Message<T, C>> {
if !container.is_empty() { Message::push_at(container, time, &mut self.pushee); }
}
}
2 changes: 1 addition & 1 deletion timely/src/dataflow/channels/pushers/exchange.rs
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ where
}
self.distributor.relax();
for index in 0..self.pushers.len() {
self.pushers[index].push(&mut None);
self.pushers[index].done();
}
}
}
Expand Down
7 changes: 6 additions & 1 deletion timely/src/dataflow/channels/pushers/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,9 @@ pub use self::counter::Counter;
pub mod tee;
pub mod exchange;
pub mod counter;
pub mod buffer;
pub mod progress;

/// An output pusher which validates capabilities, records progress, and tees output.
pub type Output<T, C> = progress::Progress<T, counter::Counter<T, tee::Tee<T, C>>>;
/// An output session that will flush the output when dropped.
pub type OutputSession<'a, T, C> = progress::ProgressSession<'a, T, C, counter::Counter<T, tee::Tee<T, C>>>;
67 changes: 67 additions & 0 deletions timely/src/dataflow/channels/pushers/progress.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
//! A wrapper that allows containers to be sent by validating capabilities.

use std::rc::Rc;
use std::cell::RefCell;

use crate::progress::{ChangeBatch, Timestamp};
use crate::dataflow::channels::Message;
use crate::dataflow::operators::CapabilityTrait;
use crate::communication::Push;
use crate::Container;

/// A wrapper that allows containers to be sent by validating capabilities.
#[derive(Debug)]
pub struct Progress<T, P> {
pushee: P,
internal: Rc<RefCell<ChangeBatch<T>>>,
port: usize,
}

impl<T: Timestamp, P> Progress<T, P> {
/// Ships a container using a provided capability.
///
/// On return, the container may hold undefined contents and should be cleared before it is reused.
#[inline] pub fn give<C: Container, CT: CapabilityTrait<T>>(&mut self, capability: &CT, container: &mut C) where P: Push<Message<T, C>> {
debug_assert!(self.valid(capability), "Attempted to open output session with invalid capability");
if !container.is_empty() { Message::push_at(container, capability.time().clone(), &mut self.pushee); }
}
/// Activates a `Progress` into a `ProgressSession` which will flush when dropped.
pub fn activate<'a, C>(&'a mut self) -> ProgressSession<'a, T, C, P> where P: Push<Message<T, C>> {
ProgressSession {
borrow: self,
marker: std::marker::PhantomData,
}
}
/// Determines if the capability is valid for this output.
pub fn valid<CT: CapabilityTrait<T>>(&self, capability: &CT) -> bool {
capability.valid_for_output(&self.internal, self.port)
}
}

impl<T, P> Progress<T, P> where T : Ord+Clone+'static {
/// Allocates a new `Progress` from a pushee and capability validation information.
pub fn new(pushee: P, internal: Rc<RefCell<ChangeBatch<T>>>, port: usize) -> Self {
Self { pushee, internal, port }
}
}

/// A session that provides access to a `Progress` but will flush when dropped.
///
/// The type of the container `C` must be known, as long as the flushing action requires a specific `Push` implementation.
pub struct ProgressSession<'a, T: Timestamp, C, P: Push<Message<T, C>>> {
borrow: &'a mut Progress<T, P>,
marker: std::marker::PhantomData<C>,
}

impl<'a, T: Timestamp, C, P: Push<Message<T, C>>> std::ops::Deref for ProgressSession<'a, T, C, P> {
type Target = Progress<T, P>;
fn deref(&self) -> &Self::Target { self.borrow }
}

impl<'a, T: Timestamp, C, P: Push<Message<T, C>>> std::ops::DerefMut for ProgressSession<'a, T, C, P> {
fn deref_mut(&mut self) -> &mut Self::Target { self.borrow }
}

impl<'a, T: Timestamp, C, P: Push<Message<T, C>>> Drop for ProgressSession<'a, T, C, P> {
fn drop(&mut self) { self.borrow.pushee.done(); }
}
2 changes: 1 addition & 1 deletion timely/src/dataflow/channels/pushers/tee.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ impl<T: Data, C: Container> Push<Message<T, C>> for Tee<T, C> {
}
else {
for index in 1..pushers.len() {
pushers[index-1].push(&mut None);
pushers[index-1].done();
}
}
if !pushers.is_empty() {
Expand Down
Loading
Loading