diff --git a/container/src/lib.rs b/container/src/lib.rs
index 1b12c4882..6f8c30914 100644
--- a/container/src/lib.rs
+++ b/container/src/lib.rs
@@ -85,11 +85,11 @@ pub trait PushInto<T> {
 ///
 /// The trait does not prescribe any specific ordering guarantees, and each implementation can
 /// decide to represent a push order for `extract` and `finish`, or not.
-pub trait ContainerBuilder: Default + 'static {
+pub trait ContainerBuilder: Default {
     /// The container type we're building.
     // The container is `Clone` because `Tee` requires it, otherwise we need to repeat it
     // all over Timely. `'static` because we don't want lifetimes everywhere.
-    type Container: Accountable + Default + Clone + 'static;
+    type Container;
     /// Extract assembled containers, potentially leaving unfinished data behind. Can
     /// be called repeatedly, for example while the caller can send data.
     ///
diff --git a/logging/src/lib.rs b/logging/src/lib.rs
index b6c07334c..a51ec3b4e 100644
--- a/logging/src/lib.rs
+++ b/logging/src/lib.rs
@@ -34,7 +34,7 @@ impl Registry {
     /// need to do with containers they receive and what properties to uphold.
     ///
     /// Passing a `&mut None` container to an action indicates a flush.
-    pub fn insert<CB: ContainerBuilder, F: FnMut(&Duration, &mut Option<CB::Container>)+'static>(
+    pub fn insert<CB: ContainerBuilder<Container: Default + 'static> + 'static, F: FnMut(&Duration, &mut Option<CB::Container>)+'static>(
         &mut self,
         name: &str,
         action: F) -> Option<Box<dyn Any>>
@@ -44,7 +44,7 @@ impl Registry {
     }
 
     /// Binds a log name to a logger.
-    pub fn insert_logger<CB: ContainerBuilder>(&mut self, name: &str, logger: Logger<CB>) -> Option<Box<dyn Any>> {
+    pub fn insert_logger<CB: ContainerBuilder<Container: Default + 'static> + 'static>(&mut self, name: &str, logger: Logger<CB>) -> Option<Box<dyn Any>> {
         self.map.insert(name.to_owned(), (Box::new(logger.clone()), Box::new(logger))).map(|x| x.0)
     }
 
@@ -59,7 +59,7 @@ impl Registry {
     }
 
     /// Retrieves a shared logger, if one has been inserted.
-    pub fn get<CB: ContainerBuilder>(&self, name: &str) -> Option<Logger<CB>> {
+    pub fn get<CB: ContainerBuilder<Container: Default + 'static> + 'static>(&self, name: &str) -> Option<Logger<CB>> {
         self.map
             .get(name)
             .and_then(|entry| entry.0.downcast_ref::<Logger<CB>>())
@@ -89,11 +89,11 @@ impl Flush for Registry {
 }
 
 /// A buffering logger.
-pub struct Logger<CB: ContainerBuilder> {
+pub struct Logger<CB: ContainerBuilder<Container: Default + 'static>> {
     inner: Rc<RefCell<LoggerInner<CB, dyn FnMut(&Duration, &mut Option<CB::Container>)>>>,
 }
 
-impl<CB: ContainerBuilder> Clone for Logger<CB> {
+impl<CB: ContainerBuilder<Container: Default + 'static>> Clone for Logger<CB> {
     fn clone(&self) -> Self {
         Self {
             inner: Rc::clone(&self.inner)
@@ -101,7 +101,7 @@ impl<CB: ContainerBuilder> Clone for Logger<CB> {
     }
 }
 
-impl<CB: ContainerBuilder + Debug> Debug for Logger<CB> {
+impl<CB: ContainerBuilder<Container: Default + 'static> + Debug> Debug for Logger<CB> {
     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
         f.debug_struct("Logger")
             .field("inner", &self.inner)
@@ -109,7 +109,7 @@ impl<CB: ContainerBuilder + Debug> Debug for Logger<CB> {
     }
 }
 
-struct LoggerInner<CB: ContainerBuilder, A: ?Sized + FnMut(&Duration, &mut Option<CB::Container>)> {
+struct LoggerInner<CB: ContainerBuilder<Container: Default + 'static>, A: ?Sized + FnMut(&Duration, &mut Option<CB::Container>)> {
     /// common instant used for all loggers.
     time: Instant,
     /// offset to allow re-calibration.
@@ -120,7 +120,7 @@ struct LoggerInner<CB: ContainerBuilder, A: ?Sized + FnMut(&Duration, &mut Option<CB::Container>)> {
     action: A,
 }
 
-impl<CB: ContainerBuilder> Logger<CB> {
+impl<CB: ContainerBuilder<Container: Default + 'static>> Logger<CB> {
     /// Allocates a new shareable logger bound to a write destination.
     pub fn new<F>(time: Instant, offset: Duration, action: F) -> Self
     where
@@ -185,12 +185,12 @@ impl<CB: ContainerBuilder> Logger<CB> {
 ///
 /// Construct a `TypedLogger` with [`Logger::into_typed`] or by calling `into` on a `Logger`.
 #[derive(Debug)]
-pub struct TypedLogger<CB: ContainerBuilder, T> {
+pub struct TypedLogger<CB: ContainerBuilder<Container: Default + 'static>, T> {
     inner: Logger<CB>,
     _marker: PhantomData<T>,
 }
 
-impl<CB: ContainerBuilder, T> TypedLogger<CB, T> {
+impl<CB: ContainerBuilder<Container: Default + 'static>, T> TypedLogger<CB, T> {
     /// Logs an event. Equivalent to [`Logger::log`], with the exception that it converts the
     /// event to `T` before logging.
     pub fn log<S: Into<T>>(&self, event: S)
@@ -211,7 +211,7 @@ impl<CB: ContainerBuilder, T> TypedLogger<CB, T> {
     }
 }
 
-impl<CB: ContainerBuilder, T> Clone for TypedLogger<CB, T> {
+impl<CB: ContainerBuilder<Container: Default + 'static>, T> Clone for TypedLogger<CB, T> {
     fn clone(&self) -> Self {
         Self {
             inner: self.inner.clone(),
@@ -220,7 +220,7 @@ impl<CB: ContainerBuilder, T> Clone for TypedLogger<CB, T> {
     }
 }
 
-impl<CB: ContainerBuilder, T> From<Logger<CB>> for TypedLogger<CB, T> {
+impl<CB: ContainerBuilder<Container: Default + 'static>, T> From<Logger<CB>> for TypedLogger<CB, T> {
     fn from(inner: Logger<CB>) -> Self {
         TypedLogger {
             inner,
@@ -229,14 +229,14 @@ impl<CB: ContainerBuilder, T> From<Logger<CB>> for TypedLogger<CB, T> {
     }
 }
 
-impl<CB: ContainerBuilder, T> std::ops::Deref for TypedLogger<CB, T> {
+impl<CB: ContainerBuilder<Container: Default + 'static>, T> std::ops::Deref for TypedLogger<CB, T> {
     type Target = Logger<CB>;
     fn deref(&self) -> &Self::Target {
         &self.inner
     }
 }
 
-impl<CB: ContainerBuilder, A: ?Sized + FnMut(&Duration, &mut Option<CB::Container>)> LoggerInner<CB, A> {
+impl<CB: ContainerBuilder<Container: Default + 'static>, A: ?Sized + FnMut(&Duration, &mut Option<CB::Container>)> LoggerInner<CB, A> {
     /// Push a container with a time at an action.
     #[inline]
     fn push(action: &mut A, time: &Duration, container: &mut CB::Container) {
@@ -272,7 +272,7 @@ impl<CB: ContainerBuilder, A: ?Sized + FnMut(&Duration, &mut Option<CB::Container>)> LoggerInner<CB, A> {
     }
 }
 
-impl<CB: ContainerBuilder, A: ?Sized + FnMut(&Duration, &mut Option<CB::Container>)> Drop for LoggerInner<CB, A> {
+impl<CB: ContainerBuilder<Container: Default + 'static>, A: ?Sized + FnMut(&Duration, &mut Option<CB::Container>)> Drop for LoggerInner<CB, A> {
     fn drop(&mut self) {
         self.flush();
     }
@@ -280,7 +280,7 @@ impl<CB: ContainerBuilder, A: ?Sized + FnMut(&Duration, &mut Option<CB::Container>)> Drop for LoggerInner<CB, A> {
 
 impl<CB, A: ?Sized + FnMut(&Duration, &mut Option<CB::Container>)> Debug for LoggerInner<CB, A>
 where
-    CB: ContainerBuilder + Debug,
+    CB: ContainerBuilder<Container: Default + 'static> + Debug,
 {
     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
         f.debug_struct("LoggerInner")
@@ -298,7 +298,7 @@ trait Flush {
     fn flush(&self);
 }
 
-impl<CB: ContainerBuilder> Flush for Logger<CB> {
+impl<CB: ContainerBuilder<Container: Default + 'static>> Flush for Logger<CB> {
     fn flush(&self) {
         self.inner.borrow_mut().flush()
     }
diff --git a/timely/src/dataflow/channels/pushers/buffer.rs b/timely/src/dataflow/channels/pushers/buffer.rs
index c99d76060..94412e433 100644
--- a/timely/src/dataflow/channels/pushers/buffer.rs
+++ b/timely/src/dataflow/channels/pushers/buffer.rs
@@ -2,11 +2,11 @@
 //! with the performance of batched sends.
 
 use crate::communication::Push;
-use crate::container::{ContainerBuilder, CapacityContainerBuilder, PushInto};
+use crate::container::{CapacityContainerBuilder, PushInto};
 use crate::dataflow::channels::Message;
 use crate::dataflow::operators::Capability;
 use crate::progress::Timestamp;
-use crate::{Container, Accountable};
+use crate::{Container, ContainerBuilder, Accountable};
 
 /// Buffers data sent at the same time, for efficient communication.
 ///
diff --git a/timely/src/dataflow/channels/pushers/exchange.rs b/timely/src/dataflow/channels/pushers/exchange.rs
index 81e168c32..100b4988c 100644
--- a/timely/src/dataflow/channels/pushers/exchange.rs
+++ b/timely/src/dataflow/channels/pushers/exchange.rs
@@ -1,7 +1,8 @@
 //! The exchange pattern distributes pushed data between many target pushees.
 
+use crate::ContainerBuilder;
 use crate::communication::Push;
-use crate::container::{ContainerBuilder, DrainContainer, PushInto};
+use crate::container::{DrainContainer, PushInto};
 use crate::dataflow::channels::Message;
 
 /// Distribute containers to several pushers.
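Aside, not part of the diff: under the relaxed trait in `container/src/lib.rs`, a builder only has to be `Default` and name a container type; the `'static`, `Default`, and `Clone` requirements move to the call sites that actually rely on them, as the `logging` hunks above now spell out explicitly. A minimal sketch of such a builder, assuming the `extract`/`finish` signatures from `timely_container` (both returning `Option<&mut Self::Container>`); `RecordBuilder` is a hypothetical name, not something introduced by this change:

```rust
use timely_container::ContainerBuilder;

/// Hypothetical builder that batches records into a `Vec`.
struct RecordBuilder<T> {
    pending: Vec<T>,
}

// Manual `Default` impl so `T` itself does not need to be `Default`.
impl<T> Default for RecordBuilder<T> {
    fn default() -> Self { Self { pending: Vec::new() } }
}

impl<T> ContainerBuilder for RecordBuilder<T> {
    // With the relaxed trait the associated type carries no bounds, so
    // `Vec<T>` is acceptable even when `T` is not `Clone` or `'static`.
    type Container = Vec<T>;

    fn extract(&mut self) -> Option<&mut Self::Container> {
        // Hand the buffer back whenever it is non-empty; callers are expected
        // to drain or take what they receive before asking again.
        if self.pending.is_empty() { None } else { Some(&mut self.pending) }
    }

    fn finish(&mut self) -> Option<&mut Self::Container> {
        self.extract()
    }
}
```

Such a builder satisfies the relaxed core trait, but not the stricter composite trait that `timely` itself introduces further down in this diff.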
diff --git a/timely/src/dataflow/operators/core/input.rs b/timely/src/dataflow/operators/core/input.rs
index 6a71f63b4..ea012d09f 100644
--- a/timely/src/dataflow/operators/core/input.rs
+++ b/timely/src/dataflow/operators/core/input.rs
@@ -3,14 +3,14 @@
 use std::rc::Rc;
 use std::cell::RefCell;
 
-use crate::container::{CapacityContainerBuilder, ContainerBuilder, PushInto};
+use crate::container::{CapacityContainerBuilder, PushInto};
 use crate::scheduling::{Schedule, Activator};
 
 use crate::progress::{Operate, operate::SharedProgress, Timestamp, ChangeBatch};
 use crate::progress::Source;
 use crate::progress::operate::Connectivity;
 
-use crate::{Accountable, Container};
+use crate::{Accountable, Container, ContainerBuilder};
 use crate::communication::Push;
 use crate::dataflow::{Scope, ScopeParent, StreamCore};
 use crate::dataflow::channels::pushers::{Tee, Counter};
diff --git a/timely/src/dataflow/operators/core/partition.rs b/timely/src/dataflow/operators/core/partition.rs
index 7602b4892..9c43636c1 100644
--- a/timely/src/dataflow/operators/core/partition.rs
+++ b/timely/src/dataflow/operators/core/partition.rs
@@ -1,11 +1,11 @@
 //! Partition a stream of records into multiple streams.
 
-use crate::container::{DrainContainer, ContainerBuilder, PushInto};
+use crate::container::{DrainContainer, PushInto};
 use crate::dataflow::channels::pact::Pipeline;
 use crate::dataflow::operators::generic::builder_rc::OperatorBuilder;
 use crate::dataflow::operators::InputCapability;
 use crate::dataflow::{Scope, StreamCore};
-use crate::Container;
+use crate::{Container, ContainerBuilder};
 
 /// Partition a stream of records into multiple streams.
 pub trait Partition<G: Scope, C: Container, CB: ContainerBuilder> {
diff --git a/timely/src/dataflow/operators/core/to_stream.rs b/timely/src/dataflow/operators/core/to_stream.rs
index 4d2dea525..82b03e1dc 100644
--- a/timely/src/dataflow/operators/core/to_stream.rs
+++ b/timely/src/dataflow/operators/core/to_stream.rs
@@ -1,7 +1,7 @@
 //! Conversion to the `StreamCore` type from iterators.
 
-use crate::container::{CapacityContainerBuilder, ContainerBuilder, SizableContainer, PushInto};
-use crate::Container;
+use crate::container::{CapacityContainerBuilder, SizableContainer, PushInto};
+use crate::{Container, ContainerBuilder};
 use crate::dataflow::operators::generic::operator::source;
 use crate::dataflow::{StreamCore, Scope};
 
diff --git a/timely/src/dataflow/operators/core/unordered_input.rs b/timely/src/dataflow/operators/core/unordered_input.rs
index 703a461d3..f7d7956db 100644
--- a/timely/src/dataflow/operators/core/unordered_input.rs
+++ b/timely/src/dataflow/operators/core/unordered_input.rs
@@ -3,8 +3,8 @@
 use std::rc::Rc;
 use std::cell::RefCell;
 
-use crate::Container;
-use crate::container::{ContainerBuilder, CapacityContainerBuilder};
+use crate::{Container, ContainerBuilder};
+use crate::container::{CapacityContainerBuilder};
 
 use crate::scheduling::{Schedule, ActivateOnDrop};
diff --git a/timely/src/dataflow/operators/generic/builder_rc.rs b/timely/src/dataflow/operators/generic/builder_rc.rs
index fee122014..5f4d42e99 100644
--- a/timely/src/dataflow/operators/generic/builder_rc.rs
+++ b/timely/src/dataflow/operators/generic/builder_rc.rs
@@ -8,8 +8,7 @@
 use crate::progress::{ChangeBatch, Timestamp};
 use crate::progress::operate::SharedProgress;
 use crate::progress::frontier::{Antichain, MutableAntichain};
-use crate::Container;
-use crate::container::ContainerBuilder;
+use crate::{Container, ContainerBuilder};
 use crate::dataflow::{Scope, StreamCore};
 use crate::dataflow::channels::pushers::Tee;
 use crate::dataflow::channels::pushers::Counter as PushCounter;
diff --git a/timely/src/dataflow/operators/generic/handles.rs b/timely/src/dataflow/operators/generic/handles.rs
index a10ff2f40..826178e46 100644
--- a/timely/src/dataflow/operators/generic/handles.rs
+++ b/timely/src/dataflow/operators/generic/handles.rs
@@ -15,8 +15,8 @@
 use crate::dataflow::channels::pushers::Counter as PushCounter;
 use crate::dataflow::channels::pushers::buffer::{Buffer, Session};
 use crate::dataflow::channels::Message;
 use crate::communication::{Push, Pull};
-use crate::{Container, Accountable};
-use crate::container::{ContainerBuilder, CapacityContainerBuilder};
+use crate::{Accountable, Container, ContainerBuilder};
+use crate::container::CapacityContainerBuilder;
 use crate::dataflow::operators::InputCapability;
 use crate::dataflow::operators::capability::CapabilityTrait;
diff --git a/timely/src/dataflow/operators/generic/operator.rs b/timely/src/dataflow/operators/generic/operator.rs
index 79f6076e4..08e558aff 100644
--- a/timely/src/dataflow/operators/generic/operator.rs
+++ b/timely/src/dataflow/operators/generic/operator.rs
@@ -12,8 +12,8 @@
 use crate::dataflow::{Scope, StreamCore};
 use super::builder_rc::OperatorBuilder;
 use crate::dataflow::operators::generic::OperatorInfo;
 use crate::dataflow::operators::generic::notificator::{Notificator, FrontierNotificator};
-use crate::Container;
-use crate::container::{ContainerBuilder, CapacityContainerBuilder};
+use crate::{Container, ContainerBuilder};
+use crate::container::CapacityContainerBuilder;
 /// Methods to construct generic streaming and blocking operators.
 pub trait Operator<G: Scope, C1: Container> {
diff --git a/timely/src/lib.rs b/timely/src/lib.rs
index 20f3f995e..a21c63343 100644
--- a/timely/src/lib.rs
+++ b/timely/src/lib.rs
@@ -110,6 +110,10 @@ impl<T: Clone+'static> Data for T { }
 pub trait Container: Accountable + Default + Clone + 'static { }
 impl<C: Accountable + Default + Clone + 'static> Container for C { }
 
+/// A composite trait for types usable as container builders in timely dataflow.
+pub trait ContainerBuilder: timely_container::ContainerBuilder<Container: Container> + Default + 'static {}
+impl<CB: timely_container::ContainerBuilder<Container: Container> + Default + 'static> ContainerBuilder for CB {}
+
 /// A composite trait for types usable on exchange channels in timely dataflow.
 ///
 /// The `ExchangeData` trait extends `Data` with any requirements imposed by the `timely_communication`
diff --git a/timely/src/worker.rs b/timely/src/worker.rs
index 999754ecd..29f40aeda 100644
--- a/timely/src/worker.rs
+++ b/timely/src/worker.rs
@@ -205,7 +205,7 @@ pub trait AsWorker : Scheduler {
     /// Acquires a logger by name, if the log register exists and the name is registered.
     ///
     /// For a more precise understanding of why a result is `None` one can use the direct functions.
-    fn logger_for<CB: ContainerBuilder>(&self, name: &str) -> Option<Logger<CB>> {
+    fn logger_for<CB: crate::ContainerBuilder>(&self, name: &str) -> Option<Logger<CB>> {
         self.log_register().and_then(|l| l.get(name))
     }
     /// Provides access to the timely logging stream.
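Aside, not part of the diff: a usage sketch of the new composite trait. Code that needs the old guarantees bounds on `timely::ContainerBuilder`, which, as reconstructed above, keeps the builder `Default + 'static` and keeps its container within timely's `Container` trait (`Accountable + Default + Clone + 'static`). The helper name `snapshot_finished` is hypothetical:

```rust
/// Clone every finished container out of a builder, e.g. to hand the same
/// batch to two consumers. This needs the composite bound: the relaxed
/// `timely_container::ContainerBuilder` alone no longer promises `Clone`.
fn snapshot_finished<CB: timely::ContainerBuilder>(builder: &mut CB) -> Vec<CB::Container> {
    let mut out = Vec::new();
    // `finish` is documented to be callable repeatedly; stop once it reports
    // that no containers remain.
    while let Some(container) = builder.finish() {
        out.push(container.clone());
    }
    out
}
```

Code that only assembles containers, like the logging registry above, can instead state the few bounds it actually relies on directly against the relaxed core trait.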