Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
112 changes: 107 additions & 5 deletions timely/src/progress/reachability.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
//! builder.add_edge(Source::new(2, 0), Target::new(0, 0));
//!
//! // Construct a reachability tracker.
//! let (mut tracker, _) = builder.build();
//! let (mut tracker, _) = builder.build(None);
//!
//! // Introduce a pointstamp at the output of the first node.
//! tracker.update_source(Source::new(0, 0), 17, 1);
Expand Down Expand Up @@ -123,7 +123,7 @@ use crate::progress::timestamp::PathSummary;
/// builder.add_edge(Source::new(2, 0), Target::new(0, 0));
///
/// // Summarize reachability information.
/// let (tracker, _) = builder.build();
/// let (tracker, _) = builder.build(None);
/// ```
#[derive(Clone, Debug)]
pub struct Builder<T: Timestamp> {
Expand Down Expand Up @@ -193,14 +193,16 @@ impl<T: Timestamp> Builder<T> {
/// This method has the opportunity to perform some error checking that the path summaries
/// are valid, including references to undefined nodes and ports, as well as self-loops with
/// default summaries (a serious liveness issue).
pub fn build(self) -> (Tracker<T>, Vec<Vec<Antichain<T::Summary>>>) {
///
/// The optional logger information is baked into the resulting tracker.
pub fn build(self, logger: Option<logging::TrackerLogger>) -> (Tracker<T>, Vec<Vec<Antichain<T::Summary>>>) {

if !self.is_acyclic() {
println!("Cycle detected without timestamp increment");
println!("{:?}", self);
}

Tracker::allocate_from(self)
Tracker::allocate_from(self, logger)
}

/// Tests whether the graph contains a cycle of default path summaries.
Expand Down Expand Up @@ -394,6 +396,9 @@ pub struct Tracker<T:Timestamp> {
/// always be exactly equal to the sum across all operators of the frontier sizes
/// of the target and source `pointstamps` member.
total_counts: i64,

/// Optionally, a unique logging identifier and logging for tracking events.
logger: Option<logging::TrackerLogger>,
}

/// Target and source information for each operator.
Expand Down Expand Up @@ -483,7 +488,9 @@ impl<T:Timestamp> Tracker<T> {
///
/// The result is a pair of tracker, and the summaries from each input port to each
/// output port.
pub fn allocate_from(builder: Builder<T>) -> (Self, Vec<Vec<Antichain<T::Summary>>>) {
///
/// If the optional logger is provided, it will be used to log various tracker events.
pub fn allocate_from(builder: Builder<T>, logger: Option<logging::TrackerLogger>) -> (Self, Vec<Vec<Antichain<T::Summary>>>) {

// Allocate buffer space for each input and input port.
let mut per_operator =
Expand Down Expand Up @@ -535,6 +542,7 @@ impl<T:Timestamp> Tracker<T> {
pushed_changes: ChangeBatch::new(),
output_changes,
total_counts: 0,
logger,
};

(tracker, builder_summary)
Expand All @@ -546,6 +554,30 @@ impl<T:Timestamp> Tracker<T> {
/// until we cease deriving new implications.
pub fn propagate_all(&mut self) {

// Step 0: If logging is enabled, construct and log inbound changes.
if let Some(logger) = &mut self.logger {

let target_changes =
self.target_changes
.iter()
.map(|((target, time), diff)| (target.node, target.port, time.clone(), *diff))
.collect::<Vec<_>>();

if !target_changes.is_empty() {
logger.log_target_updates(Box::new(target_changes));
}

let source_changes =
self.source_changes
.iter()
.map(|((source, time), diff)| (source.node, source.port, time.clone(), *diff))
.collect::<Vec<_>>();

if !source_changes.is_empty() {
logger.log_source_updates(Box::new(source_changes));
}
}

// Step 1: Drain `self.input_changes` and determine actual frontier changes.
//
// Not all changes in `self.input_changes` may alter the frontier at a location.
Expand Down Expand Up @@ -775,3 +807,73 @@ fn summarize_outputs<T: Timestamp>(

results
}

/// Event types and a logger wrapper used to record reachability tracker activity.
pub mod logging {

    use crate::logging::{Logger, ProgressEventTimestampVec};

    /// Source-side updates, tagged with the tracker that observed them.
    pub struct SourceUpdate {
        /// An identifier for the tracker.
        pub tracker_id: Vec<usize>,
        /// Updates themselves, as `(node, port, time, diff)`.
        pub updates: Box<dyn ProgressEventTimestampVec>,
    }

    impl From<SourceUpdate> for TrackerEvent {
        fn from(update: SourceUpdate) -> TrackerEvent {
            TrackerEvent::SourceUpdate(update)
        }
    }

    /// Target-side updates, tagged with the tracker that observed them.
    pub struct TargetUpdate {
        /// An identifier for the tracker.
        pub tracker_id: Vec<usize>,
        /// Updates themselves, as `(node, port, time, diff)`.
        pub updates: Box<dyn ProgressEventTimestampVec>,
    }

    impl From<TargetUpdate> for TrackerEvent {
        fn from(update: TargetUpdate) -> TrackerEvent {
            TrackerEvent::TargetUpdate(update)
        }
    }

    /// The kinds of event a tracker may record.
    pub enum TrackerEvent {
        /// Updates made at a source of data.
        SourceUpdate(SourceUpdate),
        /// Updates made at a target of data.
        TargetUpdate(TargetUpdate),
    }

    /// A logger bundled with the path that identifies its tracker.
    ///
    /// The path is copied into every logged event as its `tracker_id`.
    pub struct TrackerLogger {
        path: Vec<usize>,
        logger: Logger<TrackerEvent>,
    }

    impl TrackerLogger {
        /// Assemble a tracker logger from an identifying path and an event logger.
        pub fn new(path: Vec<usize>, logger: Logger<TrackerEvent>) -> Self {
            TrackerLogger { path, logger }
        }

        /// Record a batch of source updates, stamped with this tracker's identifier.
        pub fn log_source_updates(&mut self, updates: Box<dyn ProgressEventTimestampVec>) {
            let tracker_id = self.path.clone();
            let event = SourceUpdate { tracker_id, updates };
            self.logger.log(event)
        }

        /// Record a batch of target updates, stamped with this tracker's identifier.
        pub fn log_target_updates(&mut self, updates: Box<dyn ProgressEventTimestampVec>) {
            let tracker_id = self.path.clone();
            let event = TargetUpdate { tracker_id, updates };
            self.logger.log(event)
        }
    }
}
8 changes: 7 additions & 1 deletion timely/src/progress/subgraph.rs
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,13 @@ where
builder.add_edge(source, target);
}

let (tracker, scope_summary) = builder.build();
// Attach reachability logging when a "timely/reachability" logger is registered; otherwise pass `None`.
let path = self.path.clone();
let reachability_logging =
worker.log_register()
.get::<reachability::logging::TrackerEvent>("timely/reachability")
.map(|logger| reachability::logging::TrackerLogger::new(path, logger));
let (tracker, scope_summary) = builder.build(reachability_logging);

let progcaster = Progcaster::new(worker, &self.path, self.logging.clone(), self.progress_logging.clone());

Expand Down