Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion mdbook/src/chapter_2/chapter_2_4.md
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ fn main() {

// Acquire a re-activator for this operator.
use timely::scheduling::Scheduler;
let activator = scope.activator_for(&info.address[..]);
let activator = scope.activator_for(info.address);

let mut cap = Some(capability);
move |output| {
Expand Down
6 changes: 3 additions & 3 deletions timely/src/dataflow/channels/pact.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ pub trait ParallelizationContract<T, C> {
/// Type implementing `Pull` produced by this pact.
type Puller: Pull<Bundle<T, C>>+'static;
/// Allocates a matched pair of push and pull endpoints implementing the pact.
fn connect<A: AsWorker>(self, allocator: &mut A, identifier: usize, address: &[usize], logging: Option<Logger>) -> (Self::Pusher, Self::Puller);
fn connect<A: AsWorker>(self, allocator: &mut A, identifier: usize, address: Vec<usize>, logging: Option<Logger>) -> (Self::Pusher, Self::Puller);
}

/// A direct connection
Expand All @@ -36,7 +36,7 @@ pub struct Pipeline;
impl<T: 'static, C: Container> ParallelizationContract<T, C> for Pipeline {
type Pusher = LogPusher<T, C, ThreadPusher<Bundle<T, C>>>;
type Puller = LogPuller<T, C, ThreadPuller<Bundle<T, C>>>;
fn connect<A: AsWorker>(self, allocator: &mut A, identifier: usize, address: &[usize], logging: Option<Logger>) -> (Self::Pusher, Self::Puller) {
fn connect<A: AsWorker>(self, allocator: &mut A, identifier: usize, address: Vec<usize>, logging: Option<Logger>) -> (Self::Pusher, Self::Puller) {
let (pusher, puller) = allocator.pipeline::<Message<T, C>>(identifier, address);
(LogPusher::new(pusher, allocator.index(), allocator.index(), identifier, logging.clone()),
LogPuller::new(puller, allocator.index(), identifier, logging))
Expand Down Expand Up @@ -72,7 +72,7 @@ where
type Pusher = ExchangePusher<T, C, LogPusher<T, C, Box<dyn Push<Bundle<T, C>>>>, H>;
type Puller = LogPuller<T, C, Box<dyn Pull<Bundle<T, C>>>>;

fn connect<A: AsWorker>(self, allocator: &mut A, identifier: usize, address: &[usize], logging: Option<Logger>) -> (Self::Pusher, Self::Puller) {
fn connect<A: AsWorker>(self, allocator: &mut A, identifier: usize, address: Vec<usize>, logging: Option<Logger>) -> (Self::Pusher, Self::Puller) {
let (senders, receiver) = allocator.allocate::<Message<T, C>>(identifier, address);
let senders = senders.into_iter().enumerate().map(|(i,x)| LogPusher::new(x, allocator.index(), i, identifier, logging.clone())).collect::<Vec<_>>();
(ExchangePusher::new(senders, self.hash_func), LogPuller::new(receiver, allocator.index(), identifier, logging.clone()))
Expand Down
4 changes: 2 additions & 2 deletions timely/src/dataflow/operators/capability.rs
Original file line number Diff line number Diff line change
Expand Up @@ -347,10 +347,10 @@ impl<T: Timestamp> CapabilityTrait<T> for ActivateCapability<T> {

impl<T: Timestamp> ActivateCapability<T> {
/// Creates a new activating capability.
pub fn new(capability: Capability<T>, address: &[usize], activations: Rc<RefCell<Activations>>) -> Self {
pub fn new(capability: Capability<T>, address: Vec<usize>, activations: Rc<RefCell<Activations>>) -> Self {
Self {
capability,
address: Rc::new(address.to_vec()),
address: Rc::new(address),
activations,
}
}
Expand Down
2 changes: 1 addition & 1 deletion timely/src/dataflow/operators/core/capture/replay.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ where
let mut builder = OperatorBuilder::new("Replay".to_owned(), scope.clone());

let address = builder.operator_info().address;
let activator = scope.activator_for(&address[..]);
let activator = scope.activator_for(address);

let (targets, stream) = builder.new_output();

Expand Down
2 changes: 1 addition & 1 deletion timely/src/dataflow/operators/core/enterleave.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ impl<G: Scope, T: Timestamp+Refines<G::Timestamp>, C: Data+Container> Enter<G, T
let ingress = IngressNub {
targets: Counter::new(targets),
phantom: PhantomData,
activator: scope.activator_for(&scope.addr()),
activator: scope.activator_for(scope.addr()),
active: false,
};
let produced = ingress.targets.produced().clone();
Expand Down
5 changes: 2 additions & 3 deletions timely/src/dataflow/operators/core/input.rs
Original file line number Diff line number Diff line change
Expand Up @@ -153,10 +153,9 @@ impl<G: Scope> Input for G where <G as ScopeParent>::Timestamp: TotalOrder {
let produced = counter.produced().clone();

let index = self.allocate_operator_index();
let mut address = self.addr();
address.push(index);
let address = self.addr_for_child(index);

handle.activate.push(self.activator_for(&address[..]));
handle.activate.push(self.activator_for(address.clone()));

let progress = Rc::new(RefCell::new(ChangeBatch::new()));

Expand Down
2 changes: 1 addition & 1 deletion timely/src/dataflow/operators/core/to_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ impl<CB: ContainerBuilder, I: IntoIterator+'static> ToStreamBuilder<CB> for I wh
source::<_, CB, _, _>(scope, "ToStreamBuilder", |capability, info| {

// Acquire an activator, so that the operator can rescheduled itself.
let activator = scope.activator_for(&info.address[..]);
let activator = scope.activator_for(info.address);

let mut iterator = self.into_iter().fuse();
let mut capability = Some(capability);
Expand Down
5 changes: 2 additions & 3 deletions timely/src/dataflow/operators/core/unordered_input.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,10 +91,9 @@ impl<G: Scope> UnorderedInput<G> for G {
let peers = self.peers();

let index = self.allocate_operator_index();
let mut address = self.addr();
address.push(index);
let address = self.addr_for_child(index);

let cap = ActivateCapability::new(cap, &address, self.activations());
let cap = ActivateCapability::new(cap, address.clone(), self.activations());

let helper = UnorderedHandle::new(counter);

Expand Down
2 changes: 1 addition & 1 deletion timely/src/dataflow/operators/flow_controlled.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ pub fn iterator_source<
let mut target = G::Timestamp::minimum();
source(scope, name, |cap, info| {
let mut cap = Some(cap);
let activator = scope.activator_for(&info.address[..]);
let activator = scope.activator_for(info.address);
move |output| {
cap = cap.take().and_then(|mut cap| {
loop {
Expand Down
7 changes: 3 additions & 4 deletions timely/src/dataflow/operators/generic/builder_raw.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,8 +70,7 @@ impl<G: Scope> OperatorBuilder<G> {

let global = scope.new_identifier();
let index = scope.allocate_operator_index();
let mut address = scope.addr();
address.push(index);
let address = scope.addr_for_child(index);
let peers = scope.peers();

OperatorBuilder {
Expand Down Expand Up @@ -119,7 +118,7 @@ impl<G: Scope> OperatorBuilder<G> {

let channel_id = self.scope.new_identifier();
let logging = self.scope.logging();
let (sender, receiver) = pact.connect(&mut self.scope, channel_id, &self.address[..], logging);
let (sender, receiver) = pact.connect(&mut self.scope, channel_id, self.address.clone(), logging);
let target = Target::new(self.index, self.shape.inputs);
stream.connect_to(target, sender, channel_id);

Expand Down Expand Up @@ -175,7 +174,7 @@ impl<G: Scope> OperatorBuilder<G> {

/// Information describing the operator.
pub fn operator_info(&self) -> OperatorInfo {
OperatorInfo::new(self.index, self.global, &self.address[..])
OperatorInfo::new(self.index, self.global, self.address.clone())
}
}

Expand Down
4 changes: 2 additions & 2 deletions timely/src/dataflow/operators/generic/builder_rc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ impl<G: Scope> OperatorBuilder<G> {
where
P: ParallelizationContract<G::Timestamp, C> {

let connection = vec![Antichain::from_elem(Default::default()); self.builder.shape().outputs()];
let connection = (0..self.builder.shape().outputs()).map(|_| Antichain::from_elem(Default::default())).collect();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note for future selves, this is a common enough pattern (antichain with default summary) that it probably wants to be represented more compactly, perhaps as an enum variant. It seems that such enums around vectors are surprisingly cheap now (more than just the one additional variant allowed by e.g. Option<Vec<_>>).

self.new_input_connection(stream, pact, connection)
}

Expand Down Expand Up @@ -94,7 +94,7 @@ impl<G: Scope> OperatorBuilder<G> {

/// Adds a new output to a generic operator builder, returning the `Push` implementor to use.
pub fn new_output<CB: ContainerBuilder>(&mut self) -> (OutputWrapper<G::Timestamp, CB, Tee<G::Timestamp, CB::Container>>, StreamCore<G, CB::Container>) {
let connection = vec![Antichain::from_elem(Default::default()); self.builder.shape().inputs()];
let connection = (0..self.builder.shape().inputs()).map(|_| Antichain::from_elem(Default::default())).collect();
self.new_output_connection(connection)
}

Expand Down
2 changes: 1 addition & 1 deletion timely/src/dataflow/operators/generic/operator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -535,7 +535,7 @@ impl<G: Scope, C1: Container> Operator<G, C1> for StreamCore<G, C1> {
///
/// source(scope, "Source", |capability, info| {
///
/// let activator = scope.activator_for(&info.address[..]);
/// let activator = scope.activator_for(info.address);
///
/// let mut cap = Some(capability);
/// move |output| {
Expand Down
4 changes: 2 additions & 2 deletions timely/src/dataflow/operators/generic/operator_info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,11 @@ pub struct OperatorInfo {

impl OperatorInfo {
/// Construct a new `OperatorInfo`.
pub fn new(local_id: usize, global_id: usize, address: &[usize]) -> OperatorInfo {
pub fn new(local_id: usize, global_id: usize, address: Vec<usize>) -> OperatorInfo {
OperatorInfo {
local_id,
global_id,
address: address.to_vec(),
address,
}
}
}
13 changes: 11 additions & 2 deletions timely/src/dataflow/scopes/child.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,10 +58,10 @@ where
fn config(&self) -> &Config { self.parent.config() }
fn index(&self) -> usize { self.parent.index() }
fn peers(&self) -> usize { self.parent.peers() }
fn allocate<D: Data>(&mut self, identifier: usize, address: &[usize]) -> (Vec<Box<dyn Push<Message<D>>>>, Box<dyn Pull<Message<D>>>) {
fn allocate<D: Data>(&mut self, identifier: usize, address: Vec<usize>) -> (Vec<Box<dyn Push<Message<D>>>>, Box<dyn Pull<Message<D>>>) {
self.parent.allocate(identifier, address)
}
fn pipeline<D: 'static>(&mut self, identifier: usize, address: &[usize]) -> (ThreadPusher<Message<D>>, ThreadPuller<Message<D>>) {
fn pipeline<D: 'static>(&mut self, identifier: usize, address: Vec<usize>) -> (ThreadPusher<Message<D>>, ThreadPuller<Message<D>>) {
self.parent.pipeline(identifier, address)
}
fn new_identifier(&mut self) -> usize {
Expand Down Expand Up @@ -97,6 +97,15 @@ where
{
fn name(&self) -> String { self.subgraph.borrow().name.clone() }
fn addr(&self) -> Vec<usize> { self.subgraph.borrow().path.clone() }

fn addr_for_child(&self, index: usize) -> Vec<usize> {
let path = &self.subgraph.borrow().path[..];
let mut addr = Vec::with_capacity(path.len() + 1);
addr.extend_from_slice(path);
addr.push(index);
addr
}

fn add_edge(&self, source: Source, target: Target) {
self.subgraph.borrow_mut().connect(source, target);
}
Expand Down
4 changes: 4 additions & 0 deletions timely/src/dataflow/scopes/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,10 @@ pub trait Scope: ScopeParent {
/// A sequence of scope identifiers describing the path from the worker root to this scope.
fn addr(&self) -> Vec<usize>;

/// A sequence of scope identifiers describing the path from the worker root to the child
/// indicated by `index`.
fn addr_for_child(&self, index: usize) -> Vec<usize>;

/// Connects a source of data with a target of the data. This only links the two for
/// the purposes of tracking progress, rather than effect any data movement itself.
fn add_edge(&self, source: Source, target: Target);
Expand Down
5 changes: 2 additions & 3 deletions timely/src/progress/broadcast.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,16 +31,15 @@ pub struct Progcaster<T:Timestamp> {

impl<T:Timestamp+Send> Progcaster<T> {
/// Creates a new `Progcaster` using a channel from the supplied worker.
pub fn new<A: crate::worker::AsWorker>(worker: &mut A, path: &Vec<usize>, mut logging: Option<Logger>, progress_logging: Option<ProgressLogger>) -> Progcaster<T> {
pub fn new<A: crate::worker::AsWorker>(worker: &mut A, addr: Vec<usize>, mut logging: Option<Logger>, progress_logging: Option<ProgressLogger>) -> Progcaster<T> {

let channel_identifier = worker.new_identifier();
let (pushers, puller) = worker.allocate(channel_identifier, &path[..]);
let (pushers, puller) = worker.allocate(channel_identifier, addr.clone());
logging.as_mut().map(|l| l.log(crate::logging::CommChannelsEvent {
identifier: channel_identifier,
kind: crate::logging::CommChannelKind::Progress,
}));
let worker_index = worker.index();
let addr = path.clone();
Progcaster {
to_push: None,
pushers,
Expand Down
15 changes: 9 additions & 6 deletions timely/src/progress/subgraph.rs
Original file line number Diff line number Diff line change
Expand Up @@ -131,14 +131,16 @@ where

/// Adds a new child to the subgraph.
pub fn add_child(&mut self, child: Box<dyn Operate<TInner>>, index: usize, identifier: usize) {
{
let mut child_path = self.path.clone();
if let Some(l) = &mut self.logging {
let mut child_path = Vec::with_capacity(self.path.len() + 1);
child_path.extend_from_slice(&self.path[..]);
child_path.push(index);
self.logging.as_mut().map(|l| l.log(crate::logging::OperatesEvent {

l.log(crate::logging::OperatesEvent {
id: identifier,
addr: child_path,
name: child.name().to_owned(),
}));
});
}
self.children.push(PerOperatorState::new(child, index, self.path.clone(), identifier, self.logging.clone()))
}
Expand All @@ -163,7 +165,8 @@ where
let mut builder = reachability::Builder::new();

// Child 0 has `inputs` outputs and `outputs` inputs, not yet connected.
builder.add_node(0, outputs, inputs, vec![vec![Antichain::new(); inputs]; outputs]);
let summary = (0..outputs).map(|_| (0..inputs).map(|_| Antichain::new()).collect()).collect();
builder.add_node(0, outputs, inputs, summary);
for (index, child) in self.children.iter().enumerate().skip(1) {
builder.add_node(index, child.inputs, child.outputs, child.internal_summary.clone());
}
Expand All @@ -181,7 +184,7 @@ where
.map(|logger| reachability::logging::TrackerLogger::new(path, logger));
let (tracker, scope_summary) = builder.build(reachability_logging);

let progcaster = Progcaster::new(worker, &self.path, self.logging.clone(), self.progress_logging.clone());
let progcaster = Progcaster::new(worker, self.path.clone(), self.logging.clone(), self.progress_logging.clone());

let mut incomplete = vec![true; self.children.len()];
incomplete[0] = false;
Expand Down
8 changes: 4 additions & 4 deletions timely/src/scheduling/activate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -224,9 +224,9 @@ pub struct Activator {

impl Activator {
/// Creates a new activation handle
pub fn new(path: &[usize], queue: Rc<RefCell<Activations>>) -> Self {
pub fn new(path: Vec<usize>, queue: Rc<RefCell<Activations>>) -> Self {
Self {
path: path.to_vec(),
path,
queue,
}
}
Expand Down Expand Up @@ -259,9 +259,9 @@ pub struct SyncActivator {

impl SyncActivator {
/// Creates a new thread-safe activation handle.
pub fn new(path: &[usize], queue: SyncActivations) -> Self {
pub fn new(path: Vec<usize>, queue: SyncActivations) -> Self {
Self {
path: path.to_vec(),
path,
queue,
}
}
Expand Down
4 changes: 2 additions & 2 deletions timely/src/scheduling/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,12 @@ pub trait Scheduler {
fn activations(&self) -> Rc<RefCell<Activations>>;

/// Constructs an `Activator` tied to the specified operator address.
fn activator_for(&self, path: &[usize]) -> Activator {
fn activator_for(&self, path: Vec<usize>) -> Activator {
Activator::new(path, self.activations())
}

/// Constructs a `SyncActivator` tied to the specified operator address.
fn sync_activator_for(&self, path: &[usize]) -> SyncActivator {
fn sync_activator_for(&self, path: Vec<usize>) -> SyncActivator {
let sync_activations = self.activations().borrow().sync();
SyncActivator::new(path, sync_activations)
}
Expand Down
2 changes: 1 addition & 1 deletion timely/src/synchronization/sequence.rs
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ impl<T: ExchangeData> Sequencer<T> {
activator_source
.borrow_mut()
.replace(CatchupActivator {
activator: scope.activator_for(&info.address[..]),
activator: scope.activator_for(info.address),
catchup_until: None,
});

Expand Down
12 changes: 6 additions & 6 deletions timely/src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -191,12 +191,12 @@ pub trait AsWorker : Scheduler {
/// scheduled in response to the receipt of records on the channel.
/// Most commonly, this would be the address of the *target* of the
/// channel.
fn allocate<T: Data>(&mut self, identifier: usize, address: &[usize]) -> (Vec<Box<dyn Push<Message<T>>>>, Box<dyn Pull<Message<T>>>);
fn allocate<T: Data>(&mut self, identifier: usize, address: Vec<usize>) -> (Vec<Box<dyn Push<Message<T>>>>, Box<dyn Pull<Message<T>>>);
/// Constructs a pipeline channel from the worker to itself.
///
/// By default this method uses the native channel allocation mechanism, but the expectation is
/// that this behavior will be overriden to be more efficient.
fn pipeline<T: 'static>(&mut self, identifier: usize, address: &[usize]) -> (ThreadPusher<Message<T>>, ThreadPuller<Message<T>>);
fn pipeline<T: 'static>(&mut self, identifier: usize, address: Vec<usize>) -> (ThreadPusher<Message<T>>, ThreadPuller<Message<T>>);

/// Allocates a new worker-unique identifier.
fn new_identifier(&mut self) -> usize;
Expand Down Expand Up @@ -231,17 +231,17 @@ impl<A: Allocate> AsWorker for Worker<A> {
fn config(&self) -> &Config { &self.config }
fn index(&self) -> usize { self.allocator.borrow().index() }
fn peers(&self) -> usize { self.allocator.borrow().peers() }
fn allocate<D: Data>(&mut self, identifier: usize, address: &[usize]) -> (Vec<Box<dyn Push<Message<D>>>>, Box<dyn Pull<Message<D>>>) {
fn allocate<D: Data>(&mut self, identifier: usize, address: Vec<usize>) -> (Vec<Box<dyn Push<Message<D>>>>, Box<dyn Pull<Message<D>>>) {
if address.is_empty() { panic!("Unacceptable address: Length zero"); }
let mut paths = self.paths.borrow_mut();
paths.insert(identifier, address.to_vec());
paths.insert(identifier, address);
self.temp_channel_ids.borrow_mut().push(identifier);
self.allocator.borrow_mut().allocate(identifier)
}
fn pipeline<T: 'static>(&mut self, identifier: usize, address: &[usize]) -> (ThreadPusher<Message<T>>, ThreadPuller<Message<T>>) {
fn pipeline<T: 'static>(&mut self, identifier: usize, address: Vec<usize>) -> (ThreadPusher<Message<T>>, ThreadPuller<Message<T>>) {
if address.is_empty() { panic!("Unacceptable address: Length zero"); }
let mut paths = self.paths.borrow_mut();
paths.insert(identifier, address.to_vec());
paths.insert(identifier, address);
self.temp_channel_ids.borrow_mut().push(identifier);
self.allocator.borrow_mut().pipeline(identifier)
}
Expand Down
Loading