Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 8 additions & 8 deletions timely/src/dataflow/operators/reclock.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
//! Extension methods for `Stream` based on record-by-record transformation.

use crate::Data;
use crate::Container;
use crate::order::PartialOrder;
use crate::dataflow::{Stream, Scope};
use crate::dataflow::{Scope, StreamCore};
use crate::dataflow::channels::pact::Pipeline;
use crate::dataflow::operators::generic::operator::Operator;

/// Extension trait for reclocking a stream.
pub trait Reclock<S: Scope, D: Data> {
pub trait Reclock<S: Scope> {
/// Delays records until an input is observed on the `clock` input.
///
/// The source stream is buffered until a record is seen on the clock input,
Expand Down Expand Up @@ -45,19 +45,19 @@ pub trait Reclock<S: Scope, D: Data> {
/// assert_eq!(extracted[1], (5, vec![4,5]));
/// assert_eq!(extracted[2], (8, vec![6,7,8]));
/// ```
fn reclock(&self, clock: &Stream<S, ()>) -> Stream<S, D>;
fn reclock<TC: Container<Item=()>>(&self, clock: &StreamCore<S, TC>) -> Self;
}

impl<S: Scope, D: Data> Reclock<S, D> for Stream<S, D> {
fn reclock(&self, clock: &Stream<S, ()>) -> Stream<S, D> {
impl<S: Scope, C: Container> Reclock<S> for StreamCore<S, C> {
fn reclock<TC: Container<Item=()>>(&self, clock: &StreamCore<S, TC>) -> StreamCore<S, C> {

let mut stash = vec![];

self.binary_notify(clock, Pipeline, Pipeline, "Reclock", vec![], move |input1, input2, output, notificator| {

// stash each data input with its timestamp.
input1.for_each(|cap, data| {
stash.push((cap.time().clone(), data.replace(Vec::new())));
stash.push((cap.time().clone(), data.replace(Default::default())));
});

// request notification at time, to flush stash.
Expand All @@ -70,7 +70,7 @@ impl<S: Scope, D: Data> Reclock<S, D> for Stream<S, D> {
let mut session = output.session(&cap);
for &mut (ref t, ref mut data) in &mut stash {
if t.less_equal(cap.time()) {
session.give_vec(data);
session.give_container(data);
}
}
stash.retain(|x| !x.0.less_equal(cap.time()));
Expand Down