diff --git a/timely/src/dataflow/operators/reclock.rs b/timely/src/dataflow/operators/reclock.rs index 65d4f89e2..b656e0aaf 100644 --- a/timely/src/dataflow/operators/reclock.rs +++ b/timely/src/dataflow/operators/reclock.rs @@ -1,13 +1,13 @@ //! Extension methods for `Stream` based on record-by-record transformation. -use crate::Data; +use crate::Container; use crate::order::PartialOrder; -use crate::dataflow::{Stream, Scope}; +use crate::dataflow::{Scope, StreamCore}; use crate::dataflow::channels::pact::Pipeline; use crate::dataflow::operators::generic::operator::Operator; /// Extension trait for reclocking a stream. -pub trait Reclock { +pub trait Reclock { /// Delays records until an input is observed on the `clock` input. /// /// The source stream is buffered until a record is seen on the clock input, @@ -45,11 +45,11 @@ pub trait Reclock { /// assert_eq!(extracted[1], (5, vec![4,5])); /// assert_eq!(extracted[2], (8, vec![6,7,8])); /// ``` - fn reclock(&self, clock: &Stream) -> Stream; + fn reclock>(&self, clock: &StreamCore) -> Self; } -impl Reclock for Stream { - fn reclock(&self, clock: &Stream) -> Stream { +impl Reclock for StreamCore { + fn reclock>(&self, clock: &StreamCore) -> StreamCore { let mut stash = vec![]; @@ -57,7 +57,7 @@ impl Reclock for Stream { // stash each data input with its timestamp. input1.for_each(|cap, data| { - stash.push((cap.time().clone(), data.replace(Vec::new()))); + stash.push((cap.time().clone(), data.replace(Default::default()))); }); // request notification at time, to flush stash. @@ -70,7 +70,7 @@ impl Reclock for Stream { let mut session = output.session(&cap); for &mut (ref t, ref mut data) in &mut stash { if t.less_equal(cap.time()) { - session.give_vec(data); + session.give_container(data); } } stash.retain(|x| !x.0.less_equal(cap.time()));