Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions timely/src/dataflow/channels/pact.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,8 @@ impl<T: 'static, D: 'static> ParallelizationContract<T, D> for Pipeline {
}

/// An exchange between multiple observers by data
pub struct Exchange<D, F: Fn(&D)->u64+'static> { hash_func: F, phantom: PhantomData<D>, }
impl<D, F: Fn(&D)->u64> Exchange<D, F> {
pub struct Exchange<D, F: FnMut(&D)->u64+'static> { hash_func: F, phantom: PhantomData<D>, }
impl<D, F: FnMut(&D)->u64> Exchange<D, F> {
/// Allocates a new `Exchange` pact from a distribution function.
pub fn new(func: F) -> Exchange<D, F> {
Exchange {
Expand All @@ -55,12 +55,12 @@ impl<D, F: Fn(&D)->u64> Exchange<D, F> {
}

// Exchange uses a `Box<Pushable>` because it cannot know what type of pushable will return from the allocator.
impl<T: Eq+Data+Clone, D: Data+Clone, F: Fn(&D)->u64+'static> ParallelizationContract<T, D> for Exchange<D, F> {
impl<T: Eq+Data+Clone, D: Data+Clone, F: FnMut(&D)->u64+'static> ParallelizationContract<T, D> for Exchange<D, F> {
// TODO: The closure in the type prevents us from naming it.
// Could specialize `ExchangePusher` to a time-free version.
type Pusher = Box<Push<Bundle<T, D>>>;
type Puller = Box<Pull<Bundle<T, D>>>;
fn connect<A: AsWorker>(self, allocator: &mut A, identifier: usize, address: &[usize], logging: Option<Logger>) -> (Self::Pusher, Self::Puller) {
fn connect<A: AsWorker>(mut self, allocator: &mut A, identifier: usize, address: &[usize], logging: Option<Logger>) -> (Self::Pusher, Self::Puller) {
let (senders, receiver) = allocator.allocate::<Message<T, D>>(identifier, address);
let senders = senders.into_iter().enumerate().map(|(i,x)| LogPusher::new(x, allocator.index(), i, identifier, logging.clone())).collect::<Vec<_>>();
(Box::new(ExchangePusher::new(senders, move |_, d| (self.hash_func)(d))), Box::new(LogPuller::new(receiver, allocator.index(), identifier, logging.clone())))
Expand Down
6 changes: 3 additions & 3 deletions timely/src/dataflow/channels/pushers/exchange.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,14 @@ use crate::dataflow::channels::{Bundle, Message};

// TODO : Software write combining
/// Distributes records among target pushees according to a distribution function.
pub struct Exchange<T, D, P: Push<Bundle<T, D>>, H: Fn(&T, &D) -> u64> {
pub struct Exchange<T, D, P: Push<Bundle<T, D>>, H: FnMut(&T, &D) -> u64> {
pushers: Vec<P>,
buffers: Vec<Vec<D>>,
current: Option<T>,
hash_func: H,
}

impl<T: Clone, D, P: Push<Bundle<T, D>>, H: Fn(&T, &D)->u64> Exchange<T, D, P, H> {
impl<T: Clone, D, P: Push<Bundle<T, D>>, H: FnMut(&T, &D)->u64> Exchange<T, D, P, H> {
/// Allocates a new `Exchange` from a supplied set of pushers and a distribution function.
pub fn new(pushers: Vec<P>, key: H) -> Exchange<T, D, P, H> {
let mut buffers = vec![];
Expand All @@ -37,7 +37,7 @@ impl<T: Clone, D, P: Push<Bundle<T, D>>, H: Fn(&T, &D)->u64> Exchange<T, D, P,
}
}

impl<T: Eq+Data, D: Data, P: Push<Bundle<T, D>>, H: Fn(&T, &D)->u64> Push<Bundle<T, D>> for Exchange<T, D, P, H> {
impl<T: Eq+Data, D: Data, P: Push<Bundle<T, D>>, H: FnMut(&T, &D)->u64> Push<Bundle<T, D>> for Exchange<T, D, P, H> {
#[inline(never)]
fn push(&mut self, message: &mut Option<Bundle<T, D>>) {
// if only one pusher, no exchange
Expand Down
23 changes: 8 additions & 15 deletions timely/src/logging.rs
Original file line number Diff line number Diff line change
Expand Up @@ -198,29 +198,22 @@ pub struct InputEvent {
pub start_stop: StartStop,
}

#[derive(Serialize, Deserialize, Abomonation, Debug, Clone, Hash, Eq, PartialEq, Ord, PartialOrd)]
/// Input logic start/stop
pub struct ParkEvent {
/// True when activity begins, false when it stops
pub event: ParkUnpark
}

impl ParkEvent {
/// Creates a new park event from the supplied duration.
pub fn park(duration: Option<Duration>) -> Self { ParkEvent { event: ParkUnpark::Park(duration) } }
/// Creates a new unpark event.
pub fn unpark() -> Self { ParkEvent { event: ParkUnpark::Unpark } }
}

/// Records the starting and stopping of an operator.
#[derive(Serialize, Deserialize, Abomonation, Debug, Clone, Hash, PartialEq, Eq, Ord, PartialOrd)]
pub enum ParkUnpark {
pub enum ParkEvent {
/// Worker parks.
Park(Option<Duration>),
/// Worker unparks.
Unpark,
}

impl ParkEvent {
/// Creates a new park event from the supplied duration.
pub fn park(duration: Option<Duration>) -> Self { ParkEvent::Park(duration) }
/// Creates a new unpark event.
pub fn unpark() -> Self { ParkEvent::Unpark }
}

#[derive(Serialize, Deserialize, Debug, Clone, Abomonation, Hash, Eq, PartialEq, Ord, PartialOrd)]
/// An event in a timely worker
pub enum TimelyEvent {
Expand Down