Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 3 additions & 6 deletions timely/src/dataflow/channels/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,7 @@ pub mod pullers;
pub mod pact;

/// The input to and output from timely dataflow communication channels.
pub type BundleCore<T, D> = crate::communication::Message<Message<T, D>>;

/// The input to and output from timely dataflow communication channels specialized to vectors.
pub type Bundle<T, D> = BundleCore<T, Vec<D>>;
pub type Bundle<T, D> = crate::communication::Message<Message<T, D>>;

/// A serializable representation of timestamped data.
#[derive(Clone, Abomonation, Serialize, Deserialize)]
Expand Down Expand Up @@ -46,11 +43,11 @@ impl<T, D: Container> Message<T, D> {
/// Forms a message, and pushes contents at `pusher`. Replaces `buffer` with what the pusher
/// leaves in place, or the container's default element.
#[inline]
pub fn push_at<P: Push<BundleCore<T, D>>>(buffer: &mut D, time: T, pusher: &mut P) {
pub fn push_at<P: Push<Bundle<T, D>>>(buffer: &mut D, time: T, pusher: &mut P) {

let data = ::std::mem::take(buffer);
let message = Message::new(time, data, 0, 0);
let mut bundle = Some(BundleCore::from_typed(message));
let mut bundle = Some(Bundle::from_typed(message));

pusher.push(&mut bundle);

Expand Down
45 changes: 20 additions & 25 deletions timely/src/dataflow/channels/pact.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,37 +14,32 @@ use crate::communication::allocator::thread::{ThreadPusher, ThreadPuller};
use crate::communication::{Push, Pull, Data};
use crate::container::PushPartitioned;
use crate::dataflow::channels::pushers::Exchange as ExchangePusher;
use crate::dataflow::channels::{BundleCore, Message};
use crate::dataflow::channels::{Bundle, Message};
use crate::logging::{TimelyLogger as Logger, MessagesEvent};
use crate::progress::Timestamp;
use crate::worker::AsWorker;

/// A `ParallelizationContractCore` allocates paired `Push` and `Pull` implementors.
pub trait ParallelizationContractCore<T, D> {
/// A `ParallelizationContract` allocates paired `Push` and `Pull` implementors.
pub trait ParallelizationContract<T, D> {
/// Type implementing `Push` produced by this pact.
type Pusher: Push<BundleCore<T, D>>+'static;
type Pusher: Push<Bundle<T, D>>+'static;
/// Type implementing `Pull` produced by this pact.
type Puller: Pull<BundleCore<T, D>>+'static;
type Puller: Pull<Bundle<T, D>>+'static;
/// Allocates a matched pair of push and pull endpoints implementing the pact.
fn connect<A: AsWorker>(self, allocator: &mut A, identifier: usize, address: &[usize], logging: Option<Logger>) -> (Self::Pusher, Self::Puller);
}

/// A `ParallelizationContractCore` specialized for `Vec` containers
/// TODO: Use trait aliases once stable.
pub trait ParallelizationContract<T, D: Clone>: ParallelizationContractCore<T, Vec<D>> { }
impl<T, D: Clone, P: ParallelizationContractCore<T, Vec<D>>> ParallelizationContract<T, D> for P { }

/// A direct connection
#[derive(Debug)]
pub struct Pipeline;

impl<T: 'static, D: Container> ParallelizationContractCore<T, D> for Pipeline {
type Pusher = LogPusher<T, D, ThreadPusher<BundleCore<T, D>>>;
type Puller = LogPuller<T, D, ThreadPuller<BundleCore<T, D>>>;
impl<T: 'static, D: Container> ParallelizationContract<T, D> for Pipeline {
type Pusher = LogPusher<T, D, ThreadPusher<Bundle<T, D>>>;
type Puller = LogPuller<T, D, ThreadPuller<Bundle<T, D>>>;
fn connect<A: AsWorker>(self, allocator: &mut A, identifier: usize, address: &[usize], logging: Option<Logger>) -> (Self::Pusher, Self::Puller) {
let (pusher, puller) = allocator.pipeline::<Message<T, D>>(identifier, address);
// // ignore `&mut A` and use thread allocator
// let (pusher, puller) = Thread::new::<Bundle<T, D>>();
// let (pusher, puller) = Thread::new::<Bundle<T, Vec<D>>>();
(LogPusher::new(pusher, allocator.index(), allocator.index(), identifier, logging.clone()),
LogPuller::new(puller, allocator.index(), identifier, logging))
}
Expand All @@ -71,13 +66,13 @@ where
}

// Exchange uses a `Box<Pushable>` because it cannot know what type of pushable will return from the allocator.
impl<T: Timestamp, C, H: 'static> ParallelizationContractCore<T, C> for ExchangeCore<C, H>
impl<T: Timestamp, C, H: 'static> ParallelizationContract<T, C> for ExchangeCore<C, H>
where
C: Data + PushPartitioned,
for<'a> H: FnMut(&C::Item<'a>) -> u64
{
type Pusher = ExchangePusher<T, C, LogPusher<T, C, Box<dyn Push<BundleCore<T, C>>>>, H>;
type Puller = LogPuller<T, C, Box<dyn Pull<BundleCore<T, C>>>>;
type Pusher = ExchangePusher<T, C, LogPusher<T, C, Box<dyn Push<Bundle<T, C>>>>, H>;
type Puller = LogPuller<T, C, Box<dyn Pull<Bundle<T, C>>>>;

fn connect<A: AsWorker>(self, allocator: &mut A, identifier: usize, address: &[usize], logging: Option<Logger>) -> (Self::Pusher, Self::Puller) {
let (senders, receiver) = allocator.allocate::<Message<T, C>>(identifier, address);
Expand All @@ -94,7 +89,7 @@ impl<C, F> Debug for ExchangeCore<C, F> {

/// Wraps a `Message<T,D>` pusher to provide a `Push<(T, Content<D>)>`.
#[derive(Debug)]
pub struct LogPusher<T, D, P: Push<BundleCore<T, D>>> {
pub struct LogPusher<T, D, P: Push<Bundle<T, D>>> {
pusher: P,
channel: usize,
counter: usize,
Expand All @@ -104,7 +99,7 @@ pub struct LogPusher<T, D, P: Push<BundleCore<T, D>>> {
logging: Option<Logger>,
}

impl<T, D, P: Push<BundleCore<T, D>>> LogPusher<T, D, P> {
impl<T, D, P: Push<Bundle<T, D>>> LogPusher<T, D, P> {
/// Allocates a new pusher.
pub fn new(pusher: P, source: usize, target: usize, channel: usize, logging: Option<Logger>) -> Self {
LogPusher {
Expand All @@ -119,9 +114,9 @@ impl<T, D, P: Push<BundleCore<T, D>>> LogPusher<T, D, P> {
}
}

impl<T, D: Container, P: Push<BundleCore<T, D>>> Push<BundleCore<T, D>> for LogPusher<T, D, P> {
impl<T, D: Container, P: Push<Bundle<T, D>>> Push<Bundle<T, D>> for LogPusher<T, D, P> {
#[inline]
fn push(&mut self, pair: &mut Option<BundleCore<T, D>>) {
fn push(&mut self, pair: &mut Option<Bundle<T, D>>) {
if let Some(bundle) = pair {
self.counter += 1;

Expand Down Expand Up @@ -150,15 +145,15 @@ impl<T, D: Container, P: Push<BundleCore<T, D>>> Push<BundleCore<T, D>> for LogP

/// Wraps a `Message<T,D>` puller to provide a `Pull<(T, Content<D>)>`.
#[derive(Debug)]
pub struct LogPuller<T, D, P: Pull<BundleCore<T, D>>> {
pub struct LogPuller<T, D, P: Pull<Bundle<T, D>>> {
puller: P,
channel: usize,
index: usize,
phantom: PhantomData<(T, D)>,
logging: Option<Logger>,
}

impl<T, D, P: Pull<BundleCore<T, D>>> LogPuller<T, D, P> {
impl<T, D, P: Pull<Bundle<T, D>>> LogPuller<T, D, P> {
/// Allocates a new `Puller`.
pub fn new(puller: P, index: usize, channel: usize, logging: Option<Logger>) -> Self {
LogPuller {
Expand All @@ -171,9 +166,9 @@ impl<T, D, P: Pull<BundleCore<T, D>>> LogPuller<T, D, P> {
}
}

impl<T, D: Container, P: Pull<BundleCore<T, D>>> Pull<BundleCore<T, D>> for LogPuller<T, D, P> {
impl<T, D: Container, P: Pull<Bundle<T, D>>> Pull<Bundle<T, D>> for LogPuller<T, D, P> {
#[inline]
fn pull(&mut self) -> &mut Option<BundleCore<T,D>> {
fn pull(&mut self) -> &mut Option<Bundle<T,D>> {
let result = self.puller.pull();
if let Some(bundle) = result {
let channel = self.channel;
Expand Down
12 changes: 6 additions & 6 deletions timely/src/dataflow/channels/pullers/counter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,13 @@
use std::rc::Rc;
use std::cell::RefCell;

use crate::dataflow::channels::BundleCore;
use crate::dataflow::channels::Bundle;
use crate::progress::ChangeBatch;
use crate::communication::Pull;
use crate::Container;

/// A wrapper which accounts records pulled past in a shared count map.
pub struct Counter<T: Ord+Clone+'static, D, P: Pull<BundleCore<T, D>>> {
pub struct Counter<T: Ord+Clone+'static, D, P: Pull<Bundle<T, D>>> {
pullable: P,
consumed: Rc<RefCell<ChangeBatch<T>>>,
phantom: ::std::marker::PhantomData<D>,
Expand All @@ -36,15 +36,15 @@ impl<T:Ord+Clone+'static> Drop for ConsumedGuard<T> {
}
}

impl<T:Ord+Clone+'static, D: Container, P: Pull<BundleCore<T, D>>> Counter<T, D, P> {
impl<T:Ord+Clone+'static, D: Container, P: Pull<Bundle<T, D>>> Counter<T, D, P> {
/// Retrieves the next timestamp and batch of data.
#[inline]
pub fn next(&mut self) -> Option<&mut BundleCore<T, D>> {
pub fn next(&mut self) -> Option<&mut Bundle<T, D>> {
self.next_guarded().map(|(_guard, bundle)| bundle)
}

#[inline]
pub(crate) fn next_guarded(&mut self) -> Option<(ConsumedGuard<T>, &mut BundleCore<T, D>)> {
pub(crate) fn next_guarded(&mut self) -> Option<(ConsumedGuard<T>, &mut Bundle<T, D>)> {
if let Some(message) = self.pullable.pull() {
let guard = ConsumedGuard {
consumed: Rc::clone(&self.consumed),
Expand All @@ -57,7 +57,7 @@ impl<T:Ord+Clone+'static, D: Container, P: Pull<BundleCore<T, D>>> Counter<T, D,
}
}

impl<T:Ord+Clone+'static, D, P: Pull<BundleCore<T, D>>> Counter<T, D, P> {
impl<T:Ord+Clone+'static, D, P: Pull<Bundle<T, D>>> Counter<T, D, P> {
/// Allocates a new `Counter` from a boxed puller.
pub fn new(pullable: P) -> Self {
Counter {
Expand Down
31 changes: 14 additions & 17 deletions timely/src/dataflow/channels/pushers/buffer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

use crate::communication::Push;
use crate::container::{PushContainer, PushInto};
use crate::dataflow::channels::{BundleCore, Message};
use crate::dataflow::channels::{Bundle, Message};
use crate::dataflow::operators::Capability;
use crate::progress::Timestamp;
use crate::{Container, Data};
Expand All @@ -13,18 +13,15 @@ use crate::{Container, Data};
/// The `Buffer` type should be used by calling `session` with a time, which checks whether
/// data must be flushed and creates a `Session` object which allows sending at the given time.
#[derive(Debug)]
pub struct BufferCore<T, D: Container, P: Push<BundleCore<T, D>>> {
pub struct Buffer<T, D: Container, P: Push<Bundle<T, D>>> {
/// the currently open time, if it is open
time: Option<T>,
/// a buffer for records, to send at self.time
buffer: D,
pusher: P,
}

/// A buffer specialized to vector-based containers.
pub type Buffer<T, D, P> = BufferCore<T, Vec<D>, P>;

impl<T, C: Container, P: Push<BundleCore<T, C>>> BufferCore<T, C, P> where T: Eq+Clone {
impl<T, C: Container, P: Push<Bundle<T, C>>> Buffer<T, C, P> where T: Eq+Clone {

/// Creates a new `Buffer`.
pub fn new(pusher: P) -> Self {
Expand Down Expand Up @@ -82,7 +79,7 @@ impl<T, C: Container, P: Push<BundleCore<T, C>>> BufferCore<T, C, P> where T: Eq
}
}

impl<T, C: PushContainer, P: Push<BundleCore<T, C>>> BufferCore<T, C, P> where T: Eq+Clone {
impl<T, C: PushContainer, P: Push<Bundle<T, C>>> Buffer<T, C, P> where T: Eq+Clone {
// internal method for use by `Session`.
#[inline]
fn give<D: PushInto<C>>(&mut self, data: D) {
Expand All @@ -97,7 +94,7 @@ impl<T, C: PushContainer, P: Push<BundleCore<T, C>>> BufferCore<T, C, P> where T
}
}

impl<T, D: Data, P: Push<BundleCore<T, Vec<D>>>> Buffer<T, D, P> where T: Eq+Clone {
impl<T, D: Data, P: Push<Bundle<T, Vec<D>>>> Buffer<T, Vec<D>, P> where T: Eq+Clone {
// Gives an entire message at a specific time.
fn give_vec(&mut self, vector: &mut Vec<D>) {
// flush to ensure fifo-ness
Expand All @@ -114,18 +111,18 @@ impl<T, D: Data, P: Push<BundleCore<T, Vec<D>>>> Buffer<T, D, P> where T: Eq+Clo
/// The `Session` struct provides the user-facing interface to an operator output, namely
/// the `Buffer` type. A `Session` wraps a session of output at a specified time, and
/// avoids what would otherwise be a constant cost of checking timestamp equality.
pub struct Session<'a, T, C: Container, P: Push<BundleCore<T, C>>+'a> where T: Eq+Clone+'a, C: 'a {
buffer: &'a mut BufferCore<T, C, P>,
pub struct Session<'a, T, C: Container, P: Push<Bundle<T, C>>+'a> where T: Eq+Clone+'a, C: 'a {
buffer: &'a mut Buffer<T, C, P>,
}

impl<'a, T, C: Container, P: Push<BundleCore<T, C>>+'a> Session<'a, T, C, P> where T: Eq+Clone+'a, C: 'a {
impl<'a, T, C: Container, P: Push<Bundle<T, C>>+'a> Session<'a, T, C, P> where T: Eq+Clone+'a, C: 'a {
/// Provide a container at the time specified by the [Session].
pub fn give_container(&mut self, container: &mut C) {
self.buffer.give_container(container)
}
}

impl<'a, T, C, P: Push<BundleCore<T, C>>+'a> Session<'a, T, C, P>
impl<'a, T, C, P: Push<Bundle<T, C>>+'a> Session<'a, T, C, P>
where
T: Eq+Clone+'a,
C: 'a + PushContainer,
Expand All @@ -144,7 +141,7 @@ where
}
}

impl<'a, T, D: Data, P: Push<BundleCore<T, Vec<D>>>+'a> Session<'a, T, Vec<D>, P> where T: Eq+Clone+'a, D: 'a {
impl<'a, T, D: Data, P: Push<Bundle<T, Vec<D>>>+'a> Session<'a, T, Vec<D>, P> where T: Eq+Clone+'a, D: 'a {
/// Provides a fully formed `Content<D>` message for senders which can use this type.
///
/// The `Content` type is the backing memory for communication in timely, and it can
Expand All @@ -159,18 +156,18 @@ impl<'a, T, D: Data, P: Push<BundleCore<T, Vec<D>>>+'a> Session<'a, T, Vec<D>, P
}

/// A session which will flush itself when dropped.
pub struct AutoflushSessionCore<'a, T: Timestamp, C: Container, P: Push<BundleCore<T, C>>+'a> where
pub struct AutoflushSessionCore<'a, T: Timestamp, C: Container, P: Push<Bundle<T, C>>+'a> where
T: Eq+Clone+'a, C: 'a {
/// A reference to the underlying buffer.
buffer: &'a mut BufferCore<T, C, P>,
buffer: &'a mut Buffer<T, C, P>,
/// The capability being used to send the data.
_capability: Capability<T>,
}

/// Auto-flush session specialized to vector-based containers.
pub type AutoflushSession<'a, T, D, P> = AutoflushSessionCore<'a, T, Vec<D>, P>;

impl<'a, T: Timestamp, D: Data, P: Push<BundleCore<T, Vec<D>>>+'a> AutoflushSessionCore<'a, T, Vec<D>, P> where T: Eq+Clone+'a, D: 'a {
impl<'a, T: Timestamp, D: Data, P: Push<Bundle<T, Vec<D>>>+'a> AutoflushSessionCore<'a, T, Vec<D>, P> where T: Eq+Clone+'a, D: 'a {
/// Transmits a single record.
#[inline]
pub fn give(&mut self, data: D) {
Expand All @@ -192,7 +189,7 @@ impl<'a, T: Timestamp, D: Data, P: Push<BundleCore<T, Vec<D>>>+'a> AutoflushSess
}
}

impl<'a, T: Timestamp, C: Container, P: Push<BundleCore<T, C>>+'a> Drop for AutoflushSessionCore<'a, T, C, P> where T: Eq+Clone+'a, C: 'a {
impl<'a, T: Timestamp, C: Container, P: Push<Bundle<T, C>>+'a> Drop for AutoflushSessionCore<'a, T, C, P> where T: Eq+Clone+'a, C: 'a {
fn drop(&mut self) {
self.buffer.cease();
}
Expand Down
17 changes: 7 additions & 10 deletions timely/src/dataflow/channels/pushers/counter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,24 +5,21 @@ use std::rc::Rc;
use std::cell::RefCell;

use crate::progress::{ChangeBatch, Timestamp};
use crate::dataflow::channels::BundleCore;
use crate::dataflow::channels::Bundle;
use crate::communication::Push;
use crate::Container;

/// A wrapper which updates shared `produced` based on the number of records pushed.
#[derive(Debug)]
pub struct CounterCore<T, D, P: Push<BundleCore<T, D>>> {
pub struct Counter<T, D, P: Push<Bundle<T, D>>> {
pushee: P,
produced: Rc<RefCell<ChangeBatch<T>>>,
phantom: PhantomData<D>,
}

/// A counter specialized to vector.
pub type Counter<T, D, P> = CounterCore<T, Vec<D>, P>;

impl<T: Timestamp, D: Container, P> Push<BundleCore<T, D>> for CounterCore<T, D, P> where P: Push<BundleCore<T, D>> {
impl<T: Timestamp, D: Container, P> Push<Bundle<T, D>> for Counter<T, D, P> where P: Push<Bundle<T, D>> {
#[inline]
fn push(&mut self, message: &mut Option<BundleCore<T, D>>) {
fn push(&mut self, message: &mut Option<Bundle<T, D>>) {
if let Some(message) = message {
self.produced.borrow_mut().update(message.time.clone(), message.data.len() as i64);
}
Expand All @@ -34,10 +31,10 @@ impl<T: Timestamp, D: Container, P> Push<BundleCore<T, D>> for CounterCore<T, D,
}
}

impl<T, D, P: Push<BundleCore<T, D>>> CounterCore<T, D, P> where T : Ord+Clone+'static {
impl<T, D, P: Push<Bundle<T, D>>> Counter<T, D, P> where T : Ord+Clone+'static {
/// Allocates a new `Counter` from a pushee and shared counts.
pub fn new(pushee: P) -> CounterCore<T, D, P> {
CounterCore {
pub fn new(pushee: P) -> Counter<T, D, P> {
Counter {
pushee,
produced: Rc::new(RefCell::new(ChangeBatch::new())),
phantom: PhantomData,
Expand Down
10 changes: 5 additions & 5 deletions timely/src/dataflow/channels/pushers/exchange.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,12 @@

use crate::communication::Push;
use crate::container::PushPartitioned;
use crate::dataflow::channels::{BundleCore, Message};
use crate::dataflow::channels::{Bundle, Message};
use crate::{Container, Data};

// TODO : Software write combining
/// Distributes records among target pushees according to a distribution function.
pub struct Exchange<T, C: PushPartitioned, P: Push<BundleCore<T, C>>, H>
pub struct Exchange<T, C: PushPartitioned, P: Push<Bundle<T, C>>, H>
where
for<'a> H: FnMut(&C::Item<'a>) -> u64
{
Expand All @@ -17,7 +17,7 @@ where
hash_func: H,
}

impl<T: Clone, C: PushPartitioned, P: Push<BundleCore<T, C>>, H> Exchange<T, C, P, H>
impl<T: Clone, C: PushPartitioned, P: Push<Bundle<T, C>>, H> Exchange<T, C, P, H>
where
for<'a> H: FnMut(&C::Item<'a>) -> u64
{
Expand All @@ -44,13 +44,13 @@ where
}
}

impl<T: Eq+Data, C: Container, P: Push<BundleCore<T, C>>, H, > Push<BundleCore<T, C>> for Exchange<T, C, P, H>
impl<T: Eq+Data, C: Container, P: Push<Bundle<T, C>>, H, > Push<Bundle<T, C>> for Exchange<T, C, P, H>
where
C: PushPartitioned,
for<'a> H: FnMut(&C::Item<'a>) -> u64
{
#[inline(never)]
fn push(&mut self, message: &mut Option<BundleCore<T, C>>) {
fn push(&mut self, message: &mut Option<Bundle<T, C>>) {
// if only one pusher, no exchange
if self.pushers.len() == 1 {
self.pushers[0].push(message);
Expand Down
4 changes: 2 additions & 2 deletions timely/src/dataflow/channels/pushers/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
pub use self::tee::{Tee, TeeCore, TeeHelper};
pub use self::tee::{Tee, TeeHelper};
pub use self::exchange::Exchange;
pub use self::counter::{Counter, CounterCore};
pub use self::counter::Counter;

pub mod tee;
pub mod exchange;
Expand Down
Loading