From c851a3408a1cd554fd19d8e79b3b6f9f5e9ee092 Mon Sep 17 00:00:00 2001 From: Moritz Hoffmann Date: Fri, 15 Jul 2022 17:44:22 +0200 Subject: [PATCH] Container-invariant Exchange Convert the current vector-based implementation for the Exchange operator into a container-invariant one. The PushPartitioned trait enables the implementation to be generic over all containers that support it. Signed-off-by: Moritz Hoffmann --- timely/src/dataflow/operators/exchange.rs | 32 ++++++++++++++--------- 1 file changed, 19 insertions(+), 13 deletions(-) diff --git a/timely/src/dataflow/operators/exchange.rs b/timely/src/dataflow/operators/exchange.rs index 878e4cd97..1f603df25 100644 --- a/timely/src/dataflow/operators/exchange.rs +++ b/timely/src/dataflow/operators/exchange.rs @@ -1,12 +1,13 @@ //! Exchange records between workers. use crate::ExchangeData; -use crate::dataflow::channels::pact::Exchange as ExchangePact; -use crate::dataflow::{Stream, Scope}; +use crate::container::PushPartitioned; +use crate::dataflow::channels::pact::ExchangeCore; use crate::dataflow::operators::generic::operator::Operator; +use crate::dataflow::{Scope, StreamCore}; /// Exchange records between workers. -pub trait Exchange { +pub trait Exchange { /// Exchange records between workers. /// /// The closure supplied should map a reference to a record to a `u64`, @@ -22,18 +23,23 @@ pub trait Exchange { /// .inspect(|x| println!("seen: {:?}", x)); /// }); /// ``` - fn exchange(&self, route: impl FnMut(&D)->u64+'static) -> Self; + fn exchange(&self, route: impl FnMut(&D) -> u64 + 'static) -> Self; } -// impl, D: ExchangeData> Exchange for Stream { -impl Exchange for Stream { - fn exchange(&self, route: impl FnMut(&D)->u64+'static) -> Stream { - let mut vector = Default::default(); - self.unary(ExchangePact::new(route), "Exchange", move |_,_| move |input, output| { - input.for_each(|time, data| { - data.swap(&mut vector); - output.session(&time).give_container(&mut vector); - }); +impl Exchange for StreamCore +where + C: PushPartitioned + ExchangeData, + C::Item: ExchangeData, +{ + fn exchange(&self, route: impl FnMut(&C::Item) -> u64 + 'static) -> StreamCore { + let mut container = Default::default(); + self.unary(ExchangeCore::new(route), "Exchange", |_, _| { + move |input, output| { + input.for_each(|time, data| { + data.swap(&mut container); + output.session(&time).give_container(&mut container); + }); + } }) } }