From 44a4248aa71c485a0fe84801f5ccbc361fa479c3 Mon Sep 17 00:00:00 2001 From: Petros Angelatos Date: Wed, 16 Mar 2022 14:14:53 +0100 Subject: [PATCH 1/2] exchange: remove one layer of boxing We were previously forced to use a boxed trait object because we were unable to name the type of the constructed closure. This patch fixes it by introducing a custom `ParallelizationHasher` trait that the `ParallelizationContractCore` expects with a blanket implementation for all closures taking two arguments. This is to maintain backwards compatibility with any code that passes a closure directly to `ExchangeCore::new`. Then a wrapper type `DataHasher` is introduced that allows us to both specialize the `ParallelizationHasher` implementation for single argument closures and at the same time name the type which is what we need to remove one layer of boxed trait objects. Signed-off-by: Petros Angelatos --- timely/src/dataflow/channels/pact.rs | 13 +++---- .../src/dataflow/channels/pushers/exchange.rs | 38 ++++++++++++++++--- timely/src/dataflow/channels/pushers/mod.rs | 2 +- 3 files changed, 40 insertions(+), 13 deletions(-) diff --git a/timely/src/dataflow/channels/pact.rs b/timely/src/dataflow/channels/pact.rs index 6ef17ab31..9433f4b6f 100644 --- a/timely/src/dataflow/channels/pact.rs +++ b/timely/src/dataflow/channels/pact.rs @@ -15,7 +15,7 @@ use crate::communication::allocator::thread::{ThreadPusher, ThreadPuller}; use crate::Container; use crate::worker::AsWorker; -use crate::dataflow::channels::pushers::Exchange as ExchangePusher; +use crate::dataflow::channels::pushers::{Exchange as ExchangePusher, DataHasher}; use super::{BundleCore, Message}; use crate::logging::{TimelyLogger as Logger, MessagesEvent}; @@ -73,14 +73,13 @@ implu64+'static> ParallelizationC where C: Data + Container + PushPartitioned, { - // TODO: The closure in the type prevents us from naming it. - // Could specialize `ExchangePusher` to a time-free version. - type Pusher = Box>>; - type Puller = Box>>; - fn connect(mut self, allocator: &mut A, identifier: usize, address: &[usize], logging: Option) -> (Self::Pusher, Self::Puller) { + type Pusher = ExchangePusher>>>, DataHasher>; + type Puller = LogPuller>>>; + + fn connect(self, allocator: &mut A, identifier: usize, address: &[usize], logging: Option) -> (Self::Pusher, Self::Puller) { let (senders, receiver) = allocator.allocate::>(identifier, address); let senders = senders.into_iter().enumerate().map(|(i,x)| LogPusher::new(x, allocator.index(), i, identifier, logging.clone())).collect::>(); - (Box::new(ExchangePusher::new(senders, move |_, d| (self.hash_func)(d))), Box::new(LogPuller::new(receiver, allocator.index(), identifier, logging.clone()))) + (ExchangePusher::new(senders, DataHasher::new(self.hash_func)), LogPuller::new(receiver, allocator.index(), identifier, logging.clone())) } } diff --git a/timely/src/dataflow/channels/pushers/exchange.rs b/timely/src/dataflow/channels/pushers/exchange.rs index 6ddca7332..f3f41953a 100644 --- a/timely/src/dataflow/channels/pushers/exchange.rs +++ b/timely/src/dataflow/channels/pushers/exchange.rs @@ -5,9 +5,37 @@ use crate::{Container, Data}; use crate::communication::Push; use crate::dataflow::channels::{BundleCore, Message}; +/// Hashes a pair of time and data in order to distribute records +pub trait ParallelizationHasher { + /// Computes the distribution function + fn hash(&mut self, time: &T, data: &D) -> u64; +} + +impl u64> ParallelizationHasher for F { + fn hash(&mut self, time: &T, data: &D) -> u64 { + (self)(time, data) + } +} + +/// A ParallizationContract hasher that only consideres the data +pub struct DataHasher(F); + +impl u64> ParallelizationHasher for DataHasher { + fn hash(&mut self, _time: &T, data: &D) -> u64 { + (self.0)(data) + } +} + +impl DataHasher { + /// Construct a new data hasher with the given function + pub fn new(hash_func: F) -> Self { + Self(hash_func) + } +} + // TODO : Software write combining /// Distributes records among target pushees according to a distribution function. -pub struct Exchange>, H: FnMut(&T, &D) -> u64> { +pub struct Exchange>, H: ParallelizationHasher> { pushers: Vec

, buffers: Vec, current: Option, @@ -15,7 +43,7 @@ pub struct Exchange>, H: FnMut(&T, phantom: std::marker::PhantomData, } -impl>, H: FnMut(&T, &D)->u64> Exchange { +impl>, H: ParallelizationHasher> Exchange { /// Allocates a new `Exchange` from a supplied set of pushers and a distribution function. pub fn new(pushers: Vec

, key: H) -> Exchange { let mut buffers = vec![]; @@ -40,7 +68,7 @@ impl>, H: FnMut(&T, &D } } -impl>, H: FnMut(&T, &D)->u64> Push> for Exchange +impl>, H: ParallelizationHasher> Push> for Exchange where C: PushPartitioned { @@ -72,7 +100,7 @@ where let pushers = &mut self.pushers; data.push_partitioned( &mut self.buffers, - move |datum| ((hash_func)(time, datum) & mask) as usize, + move |datum| (hash_func.hash(time, datum) & mask) as usize, |index, buffer| { Message::push_at(buffer, time.clone(), &mut pushers[index]); } @@ -84,7 +112,7 @@ where let pushers = &mut self.pushers; data.push_partitioned( &mut self.buffers, - move |datum| ((hash_func)(time, datum) % num_pushers) as usize, + move |datum| (hash_func.hash(time, datum) % num_pushers) as usize, |index, buffer| { Message::push_at(buffer, time.clone(), &mut pushers[index]); } diff --git a/timely/src/dataflow/channels/pushers/mod.rs b/timely/src/dataflow/channels/pushers/mod.rs index 2ab6d4f9f..04d13f5f9 100644 --- a/timely/src/dataflow/channels/pushers/mod.rs +++ b/timely/src/dataflow/channels/pushers/mod.rs @@ -1,5 +1,5 @@ pub use self::tee::{Tee, TeeCore, TeeHelper}; -pub use self::exchange::Exchange; +pub use self::exchange::{Exchange, DataHasher}; pub use self::counter::{Counter, CounterCore}; pub mod tee; From d8e60c949bf4e831419589a62406b1d32e938ffb Mon Sep 17 00:00:00 2001 From: Petros Angelatos Date: Wed, 16 Mar 2022 17:24:01 +0100 Subject: [PATCH 2/2] exchange: disallow exchanging based on time Signed-off-by: Petros Angelatos --- timely/src/dataflow/channels/pact.rs | 6 +-- .../src/dataflow/channels/pushers/exchange.rs | 38 +++---------------- timely/src/dataflow/channels/pushers/mod.rs | 2 +- 3 files changed, 9 insertions(+), 37 deletions(-) diff --git a/timely/src/dataflow/channels/pact.rs b/timely/src/dataflow/channels/pact.rs index 9433f4b6f..976023a19 100644 --- a/timely/src/dataflow/channels/pact.rs +++ b/timely/src/dataflow/channels/pact.rs @@ -15,7 +15,7 @@ use crate::communication::allocator::thread::{ThreadPusher, ThreadPuller}; use crate::Container; use crate::worker::AsWorker; -use crate::dataflow::channels::pushers::{Exchange as ExchangePusher, DataHasher}; +use crate::dataflow::channels::pushers::Exchange as ExchangePusher; use super::{BundleCore, Message}; use crate::logging::{TimelyLogger as Logger, MessagesEvent}; @@ -73,13 +73,13 @@ implu64+'static> ParallelizationC where C: Data + Container + PushPartitioned, { - type Pusher = ExchangePusher>>>, DataHasher>; + type Pusher = ExchangePusher>>>, F>; type Puller = LogPuller>>>; fn connect(self, allocator: &mut A, identifier: usize, address: &[usize], logging: Option) -> (Self::Pusher, Self::Puller) { let (senders, receiver) = allocator.allocate::>(identifier, address); let senders = senders.into_iter().enumerate().map(|(i,x)| LogPusher::new(x, allocator.index(), i, identifier, logging.clone())).collect::>(); - (ExchangePusher::new(senders, DataHasher::new(self.hash_func)), LogPuller::new(receiver, allocator.index(), identifier, logging.clone())) + (ExchangePusher::new(senders, self.hash_func), LogPuller::new(receiver, allocator.index(), identifier, logging.clone())) } } diff --git a/timely/src/dataflow/channels/pushers/exchange.rs b/timely/src/dataflow/channels/pushers/exchange.rs index f3f41953a..9ea271d31 100644 --- a/timely/src/dataflow/channels/pushers/exchange.rs +++ b/timely/src/dataflow/channels/pushers/exchange.rs @@ -5,37 +5,9 @@ use crate::{Container, Data}; use crate::communication::Push; use crate::dataflow::channels::{BundleCore, Message}; -/// Hashes a pair of time and data in order to distribute records -pub trait ParallelizationHasher { - /// Computes the distribution function - fn hash(&mut self, time: &T, data: &D) -> u64; -} - -impl u64> ParallelizationHasher for F { - fn hash(&mut self, time: &T, data: &D) -> u64 { - (self)(time, data) - } -} - -/// A ParallizationContract hasher that only consideres the data -pub struct DataHasher(F); - -impl u64> ParallelizationHasher for DataHasher { - fn hash(&mut self, _time: &T, data: &D) -> u64 { - (self.0)(data) - } -} - -impl DataHasher { - /// Construct a new data hasher with the given function - pub fn new(hash_func: F) -> Self { - Self(hash_func) - } -} - // TODO : Software write combining /// Distributes records among target pushees according to a distribution function. -pub struct Exchange>, H: ParallelizationHasher> { +pub struct Exchange>, H: FnMut(&D) -> u64> { pushers: Vec

, buffers: Vec, current: Option, @@ -43,7 +15,7 @@ pub struct Exchange>, H: Paralleliz phantom: std::marker::PhantomData, } -impl>, H: ParallelizationHasher> Exchange { +impl>, H: FnMut(&D) -> u64> Exchange { /// Allocates a new `Exchange` from a supplied set of pushers and a distribution function. pub fn new(pushers: Vec

, key: H) -> Exchange { let mut buffers = vec![]; @@ -68,7 +40,7 @@ impl>, H: Parallelizat } } -impl>, H: ParallelizationHasher> Push> for Exchange +impl>, H: FnMut(&D) -> u64> Push> for Exchange where C: PushPartitioned { @@ -100,7 +72,7 @@ where let pushers = &mut self.pushers; data.push_partitioned( &mut self.buffers, - move |datum| (hash_func.hash(time, datum) & mask) as usize, + move |datum| ((hash_func)(datum) & mask) as usize, |index, buffer| { Message::push_at(buffer, time.clone(), &mut pushers[index]); } @@ -112,7 +84,7 @@ where let pushers = &mut self.pushers; data.push_partitioned( &mut self.buffers, - move |datum| (hash_func.hash(time, datum) % num_pushers) as usize, + move |datum| ((hash_func)(datum) % num_pushers) as usize, |index, buffer| { Message::push_at(buffer, time.clone(), &mut pushers[index]); } diff --git a/timely/src/dataflow/channels/pushers/mod.rs b/timely/src/dataflow/channels/pushers/mod.rs index 04d13f5f9..2ab6d4f9f 100644 --- a/timely/src/dataflow/channels/pushers/mod.rs +++ b/timely/src/dataflow/channels/pushers/mod.rs @@ -1,5 +1,5 @@ pub use self::tee::{Tee, TeeCore, TeeHelper}; -pub use self::exchange::{Exchange, DataHasher}; +pub use self::exchange::Exchange; pub use self::counter::{Counter, CounterCore}; pub mod tee;