Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions container/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,11 +85,11 @@ pub trait PushInto<T> {
///
/// The trait does not prescribe any specific ordering guarantees, and each implementation can
/// decide to represent a push order for `extract` and `finish`, or not.
pub trait ContainerBuilder: Default + 'static {
pub trait ContainerBuilder: Default {
/// The container type we're building.
// The container is `Clone` because `Tee` requires it, otherwise we need to repeat it
// all over Timely. `'static` because we don't want lifetimes everywhere.
type Container: Accountable + Default + Clone + 'static;
type Container;
/// Extract assembled containers, potentially leaving unfinished data behind. Can
/// be called repeatedly, for example while the caller can send data.
///
Expand Down
34 changes: 17 additions & 17 deletions logging/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ impl Registry {
/// need to do with containers they receive and what properties to uphold.
///
/// Passing a `&mut None` container to an action indicates a flush.
pub fn insert<CB: ContainerBuilder, F: FnMut(&Duration, &mut Option<CB::Container>)+'static>(
pub fn insert<CB: ContainerBuilder<Container: Default> + 'static, F: FnMut(&Duration, &mut Option<CB::Container>)+'static>(
&mut self,
name: &str,
action: F) -> Option<Box<dyn Any>>
Expand All @@ -44,7 +44,7 @@ impl Registry {
}

/// Binds a log name to a logger.
pub fn insert_logger<CB: ContainerBuilder>(&mut self, name: &str, logger: Logger<CB>) -> Option<Box<dyn Any>> {
pub fn insert_logger<CB: ContainerBuilder<Container: Default> + 'static>(&mut self, name: &str, logger: Logger<CB>) -> Option<Box<dyn Any>> {
self.map.insert(name.to_owned(), (Box::new(logger.clone()), Box::new(logger))).map(|x| x.0)
}

Expand All @@ -59,7 +59,7 @@ impl Registry {
}

/// Retrieves a shared logger, if one has been inserted.
pub fn get<CB: ContainerBuilder>(&self, name: &str) -> Option<Logger<CB>> {
pub fn get<CB: ContainerBuilder<Container: Default> + 'static>(&self, name: &str) -> Option<Logger<CB>> {
self.map
.get(name)
.and_then(|entry| entry.0.downcast_ref::<Logger<CB>>())
Expand Down Expand Up @@ -89,27 +89,27 @@ impl Flush for Registry {
}

/// A buffering logger.
pub struct Logger<CB: ContainerBuilder> {
pub struct Logger<CB: ContainerBuilder<Container: Default>> {
inner: Rc<RefCell<LoggerInner<CB, dyn FnMut(&Duration, &mut Option<CB::Container>)>>>,
}

impl<CB: ContainerBuilder> Clone for Logger<CB> {
impl<CB: ContainerBuilder<Container: Default>> Clone for Logger<CB> {
fn clone(&self) -> Self {
Self {
inner: Rc::clone(&self.inner)
}
}
}

impl<CB: ContainerBuilder + Debug> Debug for Logger<CB> {
impl<CB: ContainerBuilder<Container: Default> + Debug> Debug for Logger<CB> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("Logger")
.field("inner", &self.inner)
.finish()
}
}

struct LoggerInner<CB: ContainerBuilder, A: ?Sized + FnMut(&Duration, &mut Option<CB::Container>)> {
struct LoggerInner<CB: ContainerBuilder<Container: Default>, A: ?Sized + FnMut(&Duration, &mut Option<CB::Container>)> {
/// common instant used for all loggers.
time: Instant,
/// offset to allow re-calibration.
Expand All @@ -120,7 +120,7 @@ struct LoggerInner<CB: ContainerBuilder, A: ?Sized + FnMut(&Duration, &mut Optio
action: A,
}

impl<CB: ContainerBuilder> Logger<CB> {
impl<CB: ContainerBuilder<Container: Default>> Logger<CB> {
/// Allocates a new shareable logger bound to a write destination.
pub fn new<F>(time: Instant, offset: Duration, action: F) -> Self
where
Expand Down Expand Up @@ -185,12 +185,12 @@ impl<CB: ContainerBuilder> Logger<CB> {
///
/// Construct a `TypedLogger` with [`Logger::into_typed`] or by calling `into` on a `Logger`.
#[derive(Debug)]
pub struct TypedLogger<CB: ContainerBuilder, T> {
pub struct TypedLogger<CB: ContainerBuilder<Container: Default>, T> {
inner: Logger<CB>,
_marker: PhantomData<T>,
}

impl<CB: ContainerBuilder, T> TypedLogger<CB, T> {
impl<CB: ContainerBuilder<Container: Default>, T> TypedLogger<CB, T> {
/// Logs an event. Equivalent to [`Logger::log`], with the exception that it converts the
/// event to `T` before logging.
pub fn log<S: Into<T>>(&self, event: S)
Expand All @@ -211,7 +211,7 @@ impl<CB: ContainerBuilder, T> TypedLogger<CB, T> {
}
}

impl<CB: ContainerBuilder, T> Clone for TypedLogger<CB, T> {
impl<CB: ContainerBuilder<Container: Default>, T> Clone for TypedLogger<CB, T> {
fn clone(&self) -> Self {
Self {
inner: self.inner.clone(),
Expand All @@ -220,7 +220,7 @@ impl<CB: ContainerBuilder, T> Clone for TypedLogger<CB, T> {
}
}

impl<CB: ContainerBuilder, T> From<Logger<CB>> for TypedLogger<CB, T> {
impl<CB: ContainerBuilder<Container: Default>, T> From<Logger<CB>> for TypedLogger<CB, T> {
fn from(inner: Logger<CB>) -> Self {
TypedLogger {
inner,
Expand All @@ -229,14 +229,14 @@ impl<CB: ContainerBuilder, T> From<Logger<CB>> for TypedLogger<CB, T> {
}
}

impl<CB: ContainerBuilder, T> std::ops::Deref for TypedLogger<CB, T> {
impl<CB: ContainerBuilder<Container: Default>, T> std::ops::Deref for TypedLogger<CB, T> {
type Target = Logger<CB>;
fn deref(&self) -> &Self::Target {
&self.inner
}
}

impl<CB: ContainerBuilder, A: ?Sized + FnMut(&Duration, &mut Option<CB::Container>)> LoggerInner<CB, A> {
impl<CB: ContainerBuilder<Container: Default>, A: ?Sized + FnMut(&Duration, &mut Option<CB::Container>)> LoggerInner<CB, A> {
/// Push a container with a time at an action.
#[inline]
fn push(action: &mut A, time: &Duration, container: &mut CB::Container) {
Expand Down Expand Up @@ -272,15 +272,15 @@ impl<CB: ContainerBuilder, A: ?Sized + FnMut(&Duration, &mut Option<CB::Containe
}

/// Flush on the *last* drop of a logger.
impl<CB: ContainerBuilder, A: ?Sized + FnMut(&Duration, &mut Option<CB::Container>)> Drop for LoggerInner<CB, A> {
impl<CB: ContainerBuilder<Container: Default>, A: ?Sized + FnMut(&Duration, &mut Option<CB::Container>)> Drop for LoggerInner<CB, A> {
fn drop(&mut self) {
self.flush();
}
}

impl<CB, A: ?Sized + FnMut(&Duration, &mut Option<CB::Container>)> Debug for LoggerInner<CB, A>
where
CB: ContainerBuilder + Debug,
CB: ContainerBuilder<Container: Default> + Debug,
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("LoggerInner")
Expand All @@ -298,7 +298,7 @@ trait Flush {
fn flush(&self);
}

impl<CB: ContainerBuilder> Flush for Logger<CB> {
impl<CB: ContainerBuilder<Container: Default>> Flush for Logger<CB> {
fn flush(&self) {
self.inner.borrow_mut().flush()
}
Expand Down
4 changes: 2 additions & 2 deletions timely/src/dataflow/channels/pushers/buffer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,11 @@
//! with the performance of batched sends.

use crate::communication::Push;
use crate::container::{ContainerBuilder, CapacityContainerBuilder, PushInto};
use crate::container::{CapacityContainerBuilder, PushInto};
use crate::dataflow::channels::Message;
use crate::dataflow::operators::Capability;
use crate::progress::Timestamp;
use crate::{Container, Accountable};
use crate::{Container, ContainerBuilder, Accountable};

/// Buffers data sent at the same time, for efficient communication.
///
Expand Down
3 changes: 2 additions & 1 deletion timely/src/dataflow/channels/pushers/exchange.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
//! The exchange pattern distributes pushed data between many target pushees.

use crate::ContainerBuilder;
use crate::communication::Push;
use crate::container::{ContainerBuilder, DrainContainer, PushInto};
use crate::container::{DrainContainer, PushInto};
use crate::dataflow::channels::Message;

/// Distribute containers to several pushers.
Expand Down
4 changes: 2 additions & 2 deletions timely/src/dataflow/operators/core/input.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,14 @@
use std::rc::Rc;
use std::cell::RefCell;

use crate::container::{CapacityContainerBuilder, ContainerBuilder, PushInto};
use crate::container::{CapacityContainerBuilder, PushInto};

use crate::scheduling::{Schedule, Activator};

use crate::progress::{Operate, operate::SharedProgress, Timestamp, ChangeBatch};
use crate::progress::Source;
use crate::progress::operate::Connectivity;
use crate::{Accountable, Container};
use crate::{Accountable, Container, ContainerBuilder};
use crate::communication::Push;
use crate::dataflow::{Scope, ScopeParent, StreamCore};
use crate::dataflow::channels::pushers::{Tee, Counter};
Expand Down
4 changes: 2 additions & 2 deletions timely/src/dataflow/operators/core/partition.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
//! Partition a stream of records into multiple streams.

use crate::container::{DrainContainer, ContainerBuilder, PushInto};
use crate::container::{DrainContainer, PushInto};
use crate::dataflow::channels::pact::Pipeline;
use crate::dataflow::operators::generic::builder_rc::OperatorBuilder;
use crate::dataflow::operators::InputCapability;
use crate::dataflow::{Scope, StreamCore};
use crate::Container;
use crate::{Container, ContainerBuilder};

/// Partition a stream of records into multiple streams.
pub trait Partition<G: Scope, C: DrainContainer> {
Expand Down Expand Up @@ -67,7 +67,7 @@
todo.sort_unstable_by(|a, b| a.0.cmp(&b.0));

for (cap, mut data) in todo.drain(..) {
if sessions_cap.as_ref().map_or(true, |s_cap| s_cap.time() != cap.time()) {

Check warning on line 70 in timely/src/dataflow/operators/core/partition.rs

View workflow job for this annotation

GitHub Actions / Cargo clippy

this `map_or` can be simplified
sessions = handles.iter_mut().map(|h| (None, Some(h))).collect();
sessions_cap = Some(cap);
}
Expand Down
4 changes: 2 additions & 2 deletions timely/src/dataflow/operators/core/to_stream.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
//! Conversion to the `StreamCore` type from iterators.

use crate::container::{CapacityContainerBuilder, ContainerBuilder, SizableContainer, PushInto};
use crate::Container;
use crate::container::{CapacityContainerBuilder, SizableContainer, PushInto};
use crate::{Container, ContainerBuilder};
use crate::dataflow::operators::generic::operator::source;
use crate::dataflow::{StreamCore, Scope};

Expand Down
4 changes: 2 additions & 2 deletions timely/src/dataflow/operators/core/unordered_input.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@
use std::rc::Rc;
use std::cell::RefCell;

use crate::Container;
use crate::container::{ContainerBuilder, CapacityContainerBuilder};
use crate::{Container, ContainerBuilder};
use crate::container::{CapacityContainerBuilder};

use crate::scheduling::{Schedule, ActivateOnDrop};

Expand Down
3 changes: 1 addition & 2 deletions timely/src/dataflow/operators/generic/builder_rc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,7 @@ use crate::progress::{ChangeBatch, Timestamp};
use crate::progress::operate::SharedProgress;
use crate::progress::frontier::{Antichain, MutableAntichain};

use crate::Container;
use crate::container::ContainerBuilder;
use crate::{Container, ContainerBuilder};
use crate::dataflow::{Scope, StreamCore};
use crate::dataflow::channels::pushers::Tee;
use crate::dataflow::channels::pushers::Counter as PushCounter;
Expand Down
4 changes: 2 additions & 2 deletions timely/src/dataflow/operators/generic/handles.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@ use crate::dataflow::channels::pushers::Counter as PushCounter;
use crate::dataflow::channels::pushers::buffer::{Buffer, Session};
use crate::dataflow::channels::Message;
use crate::communication::{Push, Pull};
use crate::{Container, Accountable};
use crate::container::{ContainerBuilder, CapacityContainerBuilder};
use crate::{Accountable, Container, ContainerBuilder};
use crate::container::CapacityContainerBuilder;

use crate::dataflow::operators::InputCapability;
use crate::dataflow::operators::capability::CapabilityTrait;
Expand Down
4 changes: 2 additions & 2 deletions timely/src/dataflow/operators/generic/operator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@ use crate::dataflow::{Scope, StreamCore};
use super::builder_rc::OperatorBuilder;
use crate::dataflow::operators::generic::OperatorInfo;
use crate::dataflow::operators::generic::notificator::{Notificator, FrontierNotificator};
use crate::Container;
use crate::container::{ContainerBuilder, CapacityContainerBuilder};
use crate::{Container, ContainerBuilder};
use crate::container::CapacityContainerBuilder;

/// Methods to construct generic streaming and blocking operators.
pub trait Operator<G: Scope, C1> {
Expand Down
4 changes: 4 additions & 0 deletions timely/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,10 @@ impl<T: Clone+'static> Data for T { }
pub trait Container: Accountable + Default + Clone + 'static { }
impl<C: Accountable + Default + Clone + 'static> Container for C { }

/// A composite trait for types usable as container builders in timely dataflow.
pub trait ContainerBuilder: timely_container::ContainerBuilder<Container: Container> + Default + 'static {}
impl<CB: timely_container::ContainerBuilder<Container: Container> + Default + 'static> ContainerBuilder for CB {}

/// A composite trait for types usable on exchange channels in timely dataflow.
///
/// The `ExchangeData` trait extends `Data` with any requirements imposed by the `timely_communication`
Expand Down
2 changes: 1 addition & 1 deletion timely/src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,7 @@ pub trait AsWorker : Scheduler {
/// Acquires a logger by name, if the log register exists and the name is registered.
///
/// For a more precise understanding of why a result is `None` one can use the direct functions.
fn logger_for<CB: timely_container::ContainerBuilder>(&self, name: &str) -> Option<timely_logging::Logger<CB>> {
fn logger_for<CB: crate::ContainerBuilder>(&self, name: &str) -> Option<timely_logging::Logger<CB>> {
self.log_register().and_then(|l| l.get(name))
}
/// Provides access to the timely logging stream.
Expand Down
Loading