From e2d0555f24a76a4a7752a8180badf26472f69db8 Mon Sep 17 00:00:00 2001 From: Chase Wilson Date: Mon, 19 Apr 2021 13:00:07 -0500 Subject: [PATCH 1/2] Worked on capability ergonomics --- timely/src/dataflow/operators/capability.rs | 252 ++++++++++++++---- .../dataflow/operators/generic/builder_rc.rs | 3 +- .../src/dataflow/operators/generic/handles.rs | 9 +- .../dataflow/operators/generic/notificator.rs | 4 +- .../src/dataflow/operators/unordered_input.rs | 7 +- timely/src/order.rs | 4 +- timely/src/scheduling/activate.rs | 1 + timely/src/scheduling/mod.rs | 5 +- 8 files changed, 211 insertions(+), 74 deletions(-) diff --git a/timely/src/dataflow/operators/capability.rs b/timely/src/dataflow/operators/capability.rs index 18cc5800c..6bab0a662 100644 --- a/timely/src/dataflow/operators/capability.rs +++ b/timely/src/dataflow/operators/capability.rs @@ -21,7 +21,7 @@ //! An operator should not hand its capabilities to some other operator. In the future, we should //! probably bind capabilities more strongly to a specific operator and output. -use std::ops::Deref; +use std::{borrow, error::Error, fmt::Display, ops::Deref}; use std::rc::Rc; use std::cell::RefCell; use std::fmt::{self, Debug}; @@ -70,8 +70,18 @@ impl CapabilityTrait for Capability { } impl Capability { + /// Creates a new capability at `time` while incrementing (and keeping a reference to) the provided + /// [`ChangeBatch`]. + pub(crate) fn new(time: T, internal: Rc>>) -> Self { + internal.borrow_mut().update(time.clone(), 1); + + Self { + time, + internal, + } + } + /// The timestamp associated with this capability. - #[inline] pub fn time(&self) -> &T { &self.time } @@ -80,33 +90,71 @@ impl Capability { /// the source capability (`self`). /// /// This method panics if `self.time` is not less or equal to `new_time`. 
- #[inline] pub fn delayed(&self, new_time: &T) -> Capability { - if !self.time.less_equal(new_time) { - panic!("Attempted to delay {:?} to {:?}, which is not `less_equal` the capability's time.", self, new_time); + /// Makes the panic branch cold & outlined to decrease code bloat & give + /// the inner function the best chance possible of being inlined with + /// minimal code bloat + #[cold] + #[inline(never)] + fn delayed_panic(capability: &dyn Debug, invalid_time: &dyn Debug) -> ! { + // Formatting & panic machinery is relatively expensive in terms of code bloat, so + // we outline it + panic!( + "Attempted to delay {:?} to {:?}, which is not `less_equal` the capability's time.", + capability, + invalid_time, + ) + } + + self.try_delayed(new_time) + .unwrap_or_else(|| delayed_panic(self, new_time)) + } + + /// Attempts to make a new capability for a timestamp `new_time` that is + /// greater or equal to the timestamp of the source capability (`self`). + /// + /// Returns [`None`] if `self.time` is not less or equal to `new_time`. + pub fn try_delayed(&self, new_time: &T) -> Option> { + if self.time.less_equal(new_time) { + Some(Self::new(new_time.clone(), self.internal.clone())) + } else { + None } - mint(new_time.clone(), self.internal.clone()) } /// Downgrades the capability to one corresponding to `new_time`. /// /// This method panics if `self.time` is not less or equal to `new_time`. - #[inline] pub fn downgrade(&mut self, new_time: &T) { - let new_cap = self.delayed(new_time); - *self = new_cap; + /// Makes the panic branch cold & outlined to decrease code bloat & give + /// the inner function the best chance possible of being inlined with + /// minimal code bloat + #[cold] + #[inline(never)] + fn downgrade_panic(capability: &dyn Debug, invalid_time: &dyn Debug) -> ! 
{ + // Formatting & panic machinery is relatively expensive in terms of code bloat, so + // we outline it + panic!( + "Attempted to downgrade {:?} to {:?}, which is not `less_equal` the capability's time.", + capability, + invalid_time, + ) + } + + self.try_downgrade(new_time) + .unwrap_or_else(|_| downgrade_panic(self, new_time)) } -} -/// Creates a new capability at `t` while incrementing (and keeping a reference to) the provided -/// `ChangeBatch`. -/// Declared separately so that it can be kept private when `Capability` is re-exported. -#[inline] -pub fn mint(time: T, internal: Rc>>) -> Capability { - internal.borrow_mut().update(time.clone(), 1); - Capability { - time, - internal, + /// Attempts to downgrade the capability to one corresponding to `new_time`. + /// + /// Returns a [`DowngradeError`] if `self.time` is not less or equal to `new_time`. + pub fn try_downgrade(&mut self, new_time: &T) -> Result<(), DowngradeError> { + if let Some(new_capability) = self.try_delayed(new_time) { + *self = new_capability; + Ok(()) + } else { + Err(DowngradeError(())) + } } } @@ -114,22 +162,20 @@ pub fn mint(time: T, internal: Rc>>) -> Cap // updated accordingly to inform the rest of the system that the operator has released its permit // to send data and request notification at the associated timestamp. impl Drop for Capability { - #[inline] fn drop(&mut self) { self.internal.borrow_mut().update(self.time.clone(), -1); } } impl Clone for Capability { - #[inline] fn clone(&self) -> Capability { - mint(self.time.clone(), self.internal.clone()) + Self::new(self.time.clone(), self.internal.clone()) } } impl Deref for Capability { type Target = T; - #[inline] + fn deref(&self) -> &T { &self.time } @@ -137,12 +183,14 @@ impl Deref for Capability { impl Debug for Capability { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - write!(f, "Capability {{ time: {:?}, internal: ... 
}}", self.time) + f.debug_struct("Capability") + .field("time", &self.time) + .field("internal", &"...") + .finish() } } impl PartialEq for Capability { - #[inline] fn eq(&self, other: &Self) -> bool { self.time() == other.time() && Rc::ptr_eq(&self.internal, &other.internal) } @@ -150,26 +198,39 @@ impl PartialEq for Capability { impl Eq for Capability { } impl PartialOrder for Capability { - #[inline] fn less_equal(&self, other: &Self) -> bool { self.time().less_equal(other.time()) && Rc::ptr_eq(&self.internal, &other.internal) } } impl ::std::hash::Hash for Capability { - #[inline] fn hash(&self, state: &mut H) { self.time.hash(state); } } +/// An error produced when trying to downgrade a capability to a time +/// that's not greater than or equal to the capability's current time +#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)] +pub struct DowngradeError(()); + +impl Display for DowngradeError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.write_str("could not downgrade the given capability") + } +} + +impl Error for DowngradeError {} + +type CapabilityUpdates = Rc>>>>>; + /// An unowned capability, which can be used but not retained. /// /// The capability reference supplies a `retain(self)` method which consumes the reference /// and turns it into an owned capability pub struct CapabilityRef<'cap, T: Timestamp+'cap> { time: &'cap T, - internal: Rc>>>>>, + internal: CapabilityUpdates, } impl<'cap, T: Timestamp+'cap> CapabilityTrait for CapabilityRef<'cap, T> { @@ -180,9 +241,17 @@ impl<'cap, T: Timestamp+'cap> CapabilityTrait for CapabilityRef<'cap, T> { } } -impl<'cap, T: Timestamp+'cap> CapabilityRef<'cap, T> { +impl<'cap, T: Timestamp + 'cap> CapabilityRef<'cap, T> { + /// Creates a new capability reference at `time`, keeping a reference to (but not updating) + /// the provided [`ChangeBatch`]es. 
+ pub(crate) fn new(time: &'cap T, internal: CapabilityUpdates) -> Self { + CapabilityRef { + time, + internal, + } + } + /// The timestamp associated with this capability. - #[inline] pub fn time(&self) -> &T { self.time } @@ -191,7 +260,6 @@ impl<'cap, T: Timestamp+'cap> CapabilityRef<'cap, T> { /// the source capability (`self`). /// /// This method panics if `self.time` is not less or equal to `new_time`. - #[inline] pub fn delayed(&self, new_time: &T) -> Capability { self.delayed_for_output(new_time, 0) } @@ -203,7 +271,7 @@ impl<'cap, T: Timestamp+'cap> CapabilityRef<'cap, T> { panic!("Attempted to delay {:?} to {:?}, which is not `less_equal` the capability's time.", self, new_time); } if output_port < self.internal.borrow().len() { - mint(new_time.clone(), self.internal.borrow()[output_port].clone()) + Capability::new(new_time.clone(), self.internal.borrow()[output_port].clone()) } else { panic!("Attempted to acquire a capability for a non-existent output port."); @@ -215,7 +283,6 @@ impl<'cap, T: Timestamp+'cap> CapabilityRef<'cap, T> { /// This method produces an owned capability which must be dropped to release the /// capability. Users should take care that these capabilities are only stored for /// as long as they are required, as failing to drop them may result in livelock. - #[inline] pub fn retain(self) -> Capability { // mint(self.time.clone(), self.internal) self.retain_for_output(0) @@ -224,7 +291,7 @@ impl<'cap, T: Timestamp+'cap> CapabilityRef<'cap, T> { /// Transforms to an owned capability for a specific output port. 
pub fn retain_for_output(self, output_port: usize) -> Capability { if output_port < self.internal.borrow().len() { - mint(self.time.clone(), self.internal.borrow()[output_port].clone()) + Capability::new(self.time.clone(), self.internal.borrow()[output_port].clone()) } else { panic!("Attempted to acquire a capability for a non-existent output port."); @@ -234,7 +301,7 @@ impl<'cap, T: Timestamp+'cap> CapabilityRef<'cap, T> { impl<'cap, T: Timestamp> Deref for CapabilityRef<'cap, T> { type Target = T; - #[inline] + fn deref(&self) -> &T { self.time } @@ -242,24 +309,15 @@ impl<'cap, T: Timestamp> Deref for CapabilityRef<'cap, T> { impl<'cap, T: Timestamp> Debug for CapabilityRef<'cap, T> { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - write!(f, "CapabilityRef {{ time: {:?}, internal: ... }}", self.time) - } -} - - -/// Creates a new capability at `t` while incrementing (and keeping a reference to) the provided -/// `ChangeBatch`. -/// Declared separately so that it can be kept private when `Capability` is re-exported. -#[inline] -pub fn mint_ref<'cap, T: Timestamp>(time: &'cap T, internal: Rc>>>>>) -> CapabilityRef<'cap, T> { - CapabilityRef { - time, - internal, + f.debug_struct("CapabilityRef") + .field("time", &self.time) + .field("internal", &"...") + .finish() } } /// Capability that activates on drop. -#[derive(Clone)] +#[derive(Clone, Debug)] pub struct ActivateCapability { pub(crate) capability: Capability, pub(crate) address: Rc>, @@ -282,10 +340,12 @@ impl ActivateCapability { activations, } } + /// The timestamp associated with this capability. pub fn time(&self) -> &T { self.capability.time() } + /// Creates a new delayed capability. pub fn delayed(&self, time: &T) -> Self { ActivateCapability { @@ -294,16 +354,17 @@ impl ActivateCapability { activations: self.activations.clone(), } } + /// Downgrades this capability. 
pub fn downgrade(&mut self, time: &T) { self.capability.downgrade(time); - self.activations.borrow_mut().activate(&self.address[..]); + self.activations.borrow_mut().activate(&self.address); } } impl Drop for ActivateCapability { fn drop(&mut self) { - self.activations.borrow_mut().activate(&self.address[..]); + self.activations.borrow_mut().activate(&self.address); } } @@ -317,7 +378,12 @@ impl CapabilitySet { /// Allocates an empty capability set. pub fn new() -> Self { - CapabilitySet { elements: Vec::new() } + Self { elements: Vec::new() } + } + + /// Allocates an empty capability set with space for `capacity` elements + pub fn with_capacity(capacity: usize) -> Self { + Self { elements: Vec::with_capacity(capacity) } } /// Allocates a capability set containing a single capability. @@ -350,7 +416,7 @@ impl CapabilitySet { /// }); /// ``` pub fn from_elem(cap: Capability) -> Self { - CapabilitySet { elements: vec![cap] } + Self { elements: vec![cap] } } /// Inserts `capability` into the set, discarding redundant capabilities. @@ -365,7 +431,33 @@ impl CapabilitySet { /// /// This method panics if there does not exist a capability in `self.elements` less or equal to `time`. pub fn delayed(&self, time: &T) -> Capability { - self.elements.iter().find(|c| c.time().less_equal(time)).unwrap().delayed(time) + /// Makes the panic branch cold & outlined to decrease code bloat & give + /// the inner function the best chance possible of being inlined with + /// minimal code bloat + #[cold] + #[inline(never)] + fn delayed_panic(invalid_time: &dyn Debug) -> ! { + // Formatting & panic machinery is relatively expensive in terms of code bloat, so + // we outline it + panic!( + "failed to create a delayed capability, the current set does not \ + have an element less than or equal to {:?}", + invalid_time, + ) + } + + self.try_delayed(time) + .unwrap_or_else(|| delayed_panic(time)) + } + + /// Attempts to create a new capability to send data at `time`. 
+ /// + /// Returns [`None`] if there does not exist a capability in `self.elements` less or equal to `time`. + pub fn try_delayed(&self, time: &T) -> Option> { + self.elements + .iter() + .find(|capability| capability.time().less_equal(time)) + .and_then(|capability| capability.try_delayed(time)) } /// Downgrades the set of capabilities to correspond with the times in `frontier`. @@ -373,15 +465,61 @@ impl CapabilitySet { /// This method panics if any element of `frontier` is not greater or equal to some element of `self.elements`. pub fn downgrade(&mut self, frontier: F) where - B: std::borrow::Borrow, - F: IntoIterator, + B: borrow::Borrow, + F: IntoIterator, + { + /// Makes the panic branch cold & outlined to decrease code bloat & give + /// the inner function the best chance possible of being inlined with + /// minimal code bloat + #[cold] + #[inline(never)] + fn downgrade_panic() -> ! { + // Formatting & panic machinery is relatively expensive in terms of code bloat, so + // we outline it + panic!( + "Attempted to downgrade a CapabilitySet with a frontier containing elements \ + which where were not not `less_equal` than all elements within the set" + ) + } + + self.try_downgrade(frontier) + .unwrap_or_else(|_| downgrade_panic()) + } + + /// Attempts to downgrade the set of capabilities to correspond with the times in `frontier`. + /// + /// Returns a [`DowngradeError`] if any element of `frontier` is not greater or equal to some element of `self.elements`. + /// + /// **Warning**: If an error is returned the capability set may be in an inconsistent state and can easily + /// cause logic errors within the program if not properly handled. 
+ /// + pub fn try_downgrade(&mut self, frontier: F) -> Result<(), DowngradeError> + where + B: borrow::Borrow, + F: IntoIterator, { let count = self.elements.len(); for time in frontier.into_iter() { - let capability = self.delayed(time.borrow()); + let capability = self.try_delayed(time.borrow()).ok_or(DowngradeError(()))?; self.elements.push(capability); } self.elements.drain(..count); + + Ok(()) + } +} + +impl From>> for CapabilitySet +where + T: Timestamp, +{ + fn from(capabilities: Vec>) -> Self { + let mut this = Self::with_capacity(capabilities.len()); + for capability in capabilities { + this.insert(capability); + } + + this } } diff --git a/timely/src/dataflow/operators/generic/builder_rc.rs b/timely/src/dataflow/operators/generic/builder_rc.rs index e2e263dd5..bca5f908c 100644 --- a/timely/src/dataflow/operators/generic/builder_rc.rs +++ b/timely/src/dataflow/operators/generic/builder_rc.rs @@ -17,7 +17,6 @@ use crate::dataflow::channels::pushers::buffer::Buffer as PushBuffer; use crate::dataflow::channels::pact::ParallelizationContract; use crate::dataflow::channels::pullers::Counter as PullCounter; use crate::dataflow::operators::capability::Capability; -use crate::dataflow::operators::capability::mint as mint_capability; use crate::dataflow::operators::generic::handles::{InputHandle, new_input_handle, OutputWrapper}; use crate::dataflow::operators::generic::operator_info::OperatorInfo; @@ -138,7 +137,7 @@ impl OperatorBuilder { // create capabilities, discard references to their creation. let mut capabilities = Vec::with_capacity(self.internal.borrow().len()); for batch in self.internal.borrow().iter() { - capabilities.push(mint_capability(G::Timestamp::minimum(), batch.clone())); + capabilities.push(Capability::new(G::Timestamp::minimum(), batch.clone())); // Discard evidence of creation, as we are assumed to start with one. 
batch.borrow_mut().clear(); } diff --git a/timely/src/dataflow/operators/generic/handles.rs b/timely/src/dataflow/operators/generic/handles.rs index f84396762..d8a0303d4 100644 --- a/timely/src/dataflow/operators/generic/handles.rs +++ b/timely/src/dataflow/operators/generic/handles.rs @@ -18,7 +18,6 @@ use crate::communication::{Push, Pull, message::RefOrMut}; use crate::logging::TimelyLogger as Logger; use crate::dataflow::operators::CapabilityRef; -use crate::dataflow::operators::capability::mint_ref as mint_capability_ref; use crate::dataflow::operators::capability::CapabilityTrait; /// Handle to an operator's input stream. @@ -47,10 +46,10 @@ impl<'a, T: Timestamp, D: Data, P: Pull>> InputHandle { self.pull_counter.next().map(|bundle| { match bundle.as_ref_or_mut() { RefOrMut::Ref(bundle) => { - (mint_capability_ref(&bundle.time, internal.clone()), RefOrMut::Ref(&bundle.data)) + (CapabilityRef::new(&bundle.time, internal.clone()), RefOrMut::Ref(&bundle.data)) }, RefOrMut::Mut(bundle) => { - (mint_capability_ref(&bundle.time, internal.clone()), RefOrMut::Mut(&mut bundle.data)) + (CapabilityRef::new(&bundle.time, internal.clone()), RefOrMut::Mut(&mut bundle.data)) }, } }) @@ -81,10 +80,10 @@ impl<'a, T: Timestamp, D: Data, P: Pull>> InputHandle { while let Some((cap, data)) = self.pull_counter.next().map(|bundle| { match bundle.as_ref_or_mut() { RefOrMut::Ref(bundle) => { - (mint_capability_ref(&bundle.time, internal.clone()), RefOrMut::Ref(&bundle.data)) + (CapabilityRef::new(&bundle.time, internal.clone()), RefOrMut::Ref(&bundle.data)) }, RefOrMut::Mut(bundle) => { - (mint_capability_ref(&bundle.time, internal.clone()), RefOrMut::Mut(&mut bundle.data)) + (CapabilityRef::new(&bundle.time, internal.clone()), RefOrMut::Mut(&mut bundle.data)) }, } }) { diff --git a/timely/src/dataflow/operators/generic/notificator.rs b/timely/src/dataflow/operators/generic/notificator.rs index ded5ab7f2..ae3ad741c 100644 --- a/timely/src/dataflow/operators/generic/notificator.rs 
+++ b/timely/src/dataflow/operators/generic/notificator.rs @@ -109,11 +109,11 @@ fn notificator_delivers_notifications_in_topo_order() { use crate::progress::ChangeBatch; use crate::progress::frontier::MutableAntichain; use crate::order::Product; - use crate::dataflow::operators::capability::mint as mint_capability; + use crate::dataflow::operators::capability::Capability; let mut frontier = MutableAntichain::new_bottom(Product::new(0, 0)); - let root_capability = mint_capability(Product::new(0,0), Rc::new(RefCell::new(ChangeBatch::new()))); + let root_capability = Capability::new(Product::new(0,0), Rc::new(RefCell::new(ChangeBatch::new()))); let logging = None;//::logging::new_inactive_logger(); diff --git a/timely/src/dataflow/operators/unordered_input.rs b/timely/src/dataflow/operators/unordered_input.rs index d39fab9a2..89b3d1faf 100644 --- a/timely/src/dataflow/operators/unordered_input.rs +++ b/timely/src/dataflow/operators/unordered_input.rs @@ -14,8 +14,7 @@ use crate::Data; use crate::dataflow::channels::pushers::{Tee, Counter as PushCounter}; use crate::dataflow::channels::pushers::buffer::{Buffer as PushBuffer, AutoflushSession}; -use crate::dataflow::operators::ActivateCapability; -use crate::dataflow::operators::capability::mint as mint_capability; +use crate::dataflow::operators::{ActivateCapability, Capability}; use crate::dataflow::{Stream, Scope}; @@ -84,7 +83,7 @@ impl UnorderedInput for G { let (output, registrar) = Tee::::new(); let internal = Rc::new(RefCell::new(ChangeBatch::new())); // let produced = Rc::new(RefCell::new(ChangeBatch::new())); - let cap = mint_capability(G::Timestamp::minimum(), internal.clone()); + let cap = Capability::new(G::Timestamp::minimum(), internal.clone()); let counter = PushCounter::new(output); let produced = counter.produced().clone(); let peers = self.peers(); @@ -93,7 +92,7 @@ impl UnorderedInput for G { let mut address = self.addr(); address.push(index); - let cap = ActivateCapability::new(cap, &address[..], 
self.activations().clone()); + let cap = ActivateCapability::new(cap, &address, self.activations()); let helper = UnorderedHandle::new(counter); diff --git a/timely/src/order.rs b/timely/src/order.rs index 0564c0a62..1d6d2f3cf 100644 --- a/timely/src/order.rs +++ b/timely/src/order.rs @@ -29,8 +29,8 @@ macro_rules! implement_partial { ($($index_type:ty,)*) => ( $( impl PartialOrder for $index_type { - #[inline] fn less_than(&self, other: &Self) -> bool { self < other } - #[inline] fn less_equal(&self, other: &Self) -> bool { self <= other } + fn less_than(&self, other: &Self) -> bool { self < other } + fn less_equal(&self, other: &Self) -> bool { self <= other } } )* ) diff --git a/timely/src/scheduling/activate.rs b/timely/src/scheduling/activate.rs index 844c03091..0b49f1fda 100644 --- a/timely/src/scheduling/activate.rs +++ b/timely/src/scheduling/activate.rs @@ -37,6 +37,7 @@ impl Scheduler for Box { } /// Allocation-free activation tracker. +#[derive(Debug)] pub struct Activations { clean: usize, /// `(offset, length)` diff --git a/timely/src/scheduling/mod.rs b/timely/src/scheduling/mod.rs index f9f438e3f..dc2ea35a0 100644 --- a/timely/src/scheduling/mod.rs +++ b/timely/src/scheduling/mod.rs @@ -24,11 +24,12 @@ pub trait Schedule { pub trait Scheduler { /// Provides a shared handle to the activation scheduler. fn activations(&self) -> Rc>; + /// Constructs an `Activator` tied to the specified operator address. fn activator_for(&self, path: &[usize]) -> Activator { - let activations = self.activations().clone(); - Activator::new(path, activations) + Activator::new(path, self.activations()) } + /// Constructs a `SyncActivator` tied to the specified operator address. 
fn sync_activator_for(&self, path: &[usize]) -> SyncActivator { let sync_activations = self.activations().borrow().sync(); From 9814f92aac8988b3054ccf58b7d3c7402f8b8e52 Mon Sep 17 00:00:00 2001 From: Chase Wilson Date: Tue, 20 Apr 2021 12:38:58 -0500 Subject: [PATCH 2/2] Re-added #[inline] for PartialOrder --- timely/src/order.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/timely/src/order.rs b/timely/src/order.rs index 1d6d2f3cf..0564c0a62 100644 --- a/timely/src/order.rs +++ b/timely/src/order.rs @@ -29,8 +29,8 @@ macro_rules! implement_partial { ($($index_type:ty,)*) => ( $( impl PartialOrder for $index_type { - fn less_than(&self, other: &Self) -> bool { self < other } - fn less_equal(&self, other: &Self) -> bool { self <= other } + #[inline] fn less_than(&self, other: &Self) -> bool { self < other } + #[inline] fn less_equal(&self, other: &Self) -> bool { self <= other } } )* )