From 3e0eb5e49e673ba3d00a890ae51d36d67cbb389d Mon Sep 17 00:00:00 2001 From: Frank McSherry Date: Sun, 19 May 2019 11:06:34 -0400 Subject: [PATCH 1/2] simplify park --- timely/src/logging.rs | 23 ++++++++--------------- 1 file changed, 8 insertions(+), 15 deletions(-) diff --git a/timely/src/logging.rs b/timely/src/logging.rs index adb8ed620..00e648b85 100644 --- a/timely/src/logging.rs +++ b/timely/src/logging.rs @@ -198,29 +198,22 @@ pub struct InputEvent { pub start_stop: StartStop, } -#[derive(Serialize, Deserialize, Abomonation, Debug, Clone, Hash, Eq, PartialEq, Ord, PartialOrd)] -/// Input logic start/stop -pub struct ParkEvent { - /// True when activity begins, false when it stops - pub event: ParkUnpark -} - -impl ParkEvent { - /// Creates a new park event from the supplied duration. - pub fn park(duration: Option) -> Self { ParkEvent { event: ParkUnpark::Park(duration) } } - /// Creates a new unpark event. - pub fn unpark() -> Self { ParkEvent { event: ParkUnpark::Unpark } } -} - /// Records the starting and stopping of an operator. #[derive(Serialize, Deserialize, Abomonation, Debug, Clone, Hash, PartialEq, Eq, Ord, PartialOrd)] -pub enum ParkUnpark { +pub enum ParkEvent { /// Worker parks. Park(Option), /// Worker unparks. Unpark, } +impl ParkEvent { + /// Creates a new park event from the supplied duration. + pub fn park(duration: Option) -> Self { ParkEvent::Park(duration) } + /// Creates a new unpark event. + pub fn unpark() -> Self { ParkEvent::Unpark } +} + #[derive(Serialize, Deserialize, Debug, Clone, Abomonation, Hash, Eq, PartialEq, Ord, PartialOrd)] /// An event in a timely worker pub enum TimelyEvent { From 37474012e1906b901ae333a3d78537448138acf8 Mon Sep 17 00:00:00 2001 From: Frank McSherry Date: Sun, 26 May 2019 19:33:19 -0400 Subject: [PATCH 2/2] Mutable Exchange --- timely/src/dataflow/channels/pact.rs | 8 ++++---- timely/src/dataflow/channels/pushers/exchange.rs | 6 +++--- 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/timely/src/dataflow/channels/pact.rs b/timely/src/dataflow/channels/pact.rs index 0f0146d90..6a99d31d3 100644 --- a/timely/src/dataflow/channels/pact.rs +++ b/timely/src/dataflow/channels/pact.rs @@ -43,8 +43,8 @@ impl ParallelizationContract for Pipeline { } /// An exchange between multiple observers by data -pub struct Exchangeu64+'static> { hash_func: F, phantom: PhantomData, } -implu64> Exchange { +pub struct Exchangeu64+'static> { hash_func: F, phantom: PhantomData, } +implu64> Exchange { /// Allocates a new `Exchange` pact from a distribution function. pub fn new(func: F) -> Exchange { Exchange { @@ -55,12 +55,12 @@ implu64> Exchange { } // Exchange uses a `Box` because it cannot know what type of pushable will return from the allocator. -implu64+'static> ParallelizationContract for Exchange { +implu64+'static> ParallelizationContract for Exchange { // TODO: The closure in the type prevents us from naming it. // Could specialize `ExchangePusher` to a time-free version. type Pusher = Box>>; type Puller = Box>>; - fn connect(self, allocator: &mut A, identifier: usize, address: &[usize], logging: Option) -> (Self::Pusher, Self::Puller) { + fn connect(mut self, allocator: &mut A, identifier: usize, address: &[usize], logging: Option) -> (Self::Pusher, Self::Puller) { let (senders, receiver) = allocator.allocate::>(identifier, address); let senders = senders.into_iter().enumerate().map(|(i,x)| LogPusher::new(x, allocator.index(), i, identifier, logging.clone())).collect::>(); (Box::new(ExchangePusher::new(senders, move |_, d| (self.hash_func)(d))), Box::new(LogPuller::new(receiver, allocator.index(), identifier, logging.clone()))) diff --git a/timely/src/dataflow/channels/pushers/exchange.rs b/timely/src/dataflow/channels/pushers/exchange.rs index d0477858d..5b37a8438 100644 --- a/timely/src/dataflow/channels/pushers/exchange.rs +++ b/timely/src/dataflow/channels/pushers/exchange.rs @@ -6,14 +6,14 @@ use crate::dataflow::channels::{Bundle, Message}; // TODO : Software write combining /// Distributes records among target pushees according to a distribution function. -pub struct Exchange>, H: Fn(&T, &D) -> u64> { +pub struct Exchange>, H: FnMut(&T, &D) -> u64> { pushers: Vec

, buffers: Vec>, current: Option, hash_func: H, } -impl>, H: Fn(&T, &D)->u64> Exchange { +impl>, H: FnMut(&T, &D)->u64> Exchange { /// Allocates a new `Exchange` from a supplied set of pushers and a distribution function. pub fn new(pushers: Vec

, key: H) -> Exchange { let mut buffers = vec![]; @@ -37,7 +37,7 @@ impl>, H: Fn(&T, &D)->u64> Exchange>, H: Fn(&T, &D)->u64> Push> for Exchange { +impl>, H: FnMut(&T, &D)->u64> Push> for Exchange { #[inline(never)] fn push(&mut self, message: &mut Option>) { // if only one pusher, no exchange