From d6521f6072643045835892a08cf0c73e99d9fc5d Mon Sep 17 00:00:00 2001
From: Moritz Hoffmann
Date: Thu, 16 Mar 2023 22:40:45 -0400
Subject: [PATCH 1/2] Apply changes from Timely TimelyDataflow/timely-dataflow#515

Signed-off-by: Moritz Hoffmann
---
 dogsdogsdogs/examples/delta_query2.rs |  4 ++--
 examples/capture-test.rs              |  4 ++--
 src/capture.rs                        |  4 ++--
 src/collection.rs                     |  6 +++---
 src/input.rs                          |  4 ++--
 src/operators/arrange/arrangement.rs  | 12 ++++++------
 src/operators/arrange/upsert.rs       |  2 +-
 src/operators/iterate.rs              |  4 ++--
 src/operators/join.rs                 |  2 +-
 tests/import.rs                       |  2 +-
 10 files changed, 22 insertions(+), 22 deletions(-)

diff --git a/dogsdogsdogs/examples/delta_query2.rs b/dogsdogsdogs/examples/delta_query2.rs
index 7f747122b..637f4e64f 100644
--- a/dogsdogsdogs/examples/delta_query2.rs
+++ b/dogsdogsdogs/examples/delta_query2.rs
@@ -24,8 +24,8 @@ fn main() {
 
         use timely::dataflow::operators::unordered_input::UnorderedHandle;
 
-        let ((input1, capability1), data1): ((UnorderedHandle<Product<usize, usize>, ((usize, usize), Product<usize, usize>, isize)>, _), _) = inner.new_unordered_input();
-        let ((input2, capability2), data2): ((UnorderedHandle<Product<usize, usize>, ((usize, usize), Product<usize, usize>, isize)>, _), _) = inner.new_unordered_input();
+        let ((input1, capability1), data1): ((UnorderedHandle<Product<usize, usize>, Vec<((usize, usize), Product<usize, usize>, isize)>>, _), _) = inner.new_unordered_input();
+        let ((input2, capability2), data2): ((UnorderedHandle<Product<usize, usize>, Vec<((usize, usize), Product<usize, usize>, isize)>>, _), _) = inner.new_unordered_input();
 
         let edges1 = data1.as_collection();
         let edges2 = data2.as_collection();
diff --git a/examples/capture-test.rs b/examples/capture-test.rs
index 19b00a354..a28e7adbd 100644
--- a/examples/capture-test.rs
+++ b/examples/capture-test.rs
@@ -153,7 +153,7 @@ pub mod kafka {
     use differential_dataflow::lattice::Lattice;
 
     /// Creates a Kafka source from supplied configuration information.
-    pub fn create_source<G, D, T, R>(scope: G, addr: &str, topic: &str, group: &str) -> (Box<dyn std::any::Any>, Stream<G, (D, T, R)>)
+    pub fn create_source<G, D, T, R>(scope: G, addr: &str, topic: &str, group: &str) -> (Box<dyn std::any::Any>, Stream<G, Vec<(D, T, R)>>)
     where
         G: Scope<Timestamp = T>,
         D: ExchangeData + Hash + for<'a> serde::Deserialize<'a>,
@@ -166,7 +166,7 @@ pub mod kafka {
         })
     }
 
-    pub fn create_sink<G, D, T, R>(stream: &Stream<G, (D, T, R)>, addr: &str, topic: &str) -> Box<dyn std::any::Any>
+    pub fn create_sink<G, D, T, R>(stream: &Stream<G, Vec<(D, T, R)>>, addr: &str, topic: &str) -> Box<dyn std::any::Any>
     where
         G: Scope<Timestamp = T>,
         D: ExchangeData + Hash + Serialize + for<'a> Deserialize<'a>,
diff --git a/src/capture.rs b/src/capture.rs
index 9ad7725ca..e339b2dbb 100644
--- a/src/capture.rs
+++ b/src/capture.rs
@@ -292,7 +292,7 @@ pub mod source {
     pub fn build<G, B, I, D, T, R>(
         scope: G,
         source_builder: B,
-    ) -> (Box<dyn std::any::Any>, Stream<G, (D, T, R)>)
+    ) -> (Box<dyn std::any::Any>, Stream<G, Vec<(D, T, R)>>)
     where
         G: Scope<Timestamp = T>,
         B: FnOnce(SyncActivator) -> I,
@@ -606,7 +606,7 @@ pub mod sink {
     /// performed before calling the method, the recorded output may not be correctly
     /// reconstructed by readers.
     pub fn build<G, BS, D, T, R>(
-        stream: &Stream<G, (D, T, R)>,
+        stream: &Stream<G, Vec<(D, T, R)>>,
         sink_hash: u64,
         updates_sink: Weak<RefCell<BS>>,
         progress_sink: Weak<RefCell<BS>>,
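Every hunk in this patch has the same shape: after TimelyDataflow/timely-dataflow#515, a `Stream` names its container explicitly, so a stream of `D` elements is written `Stream<G, Vec<D>>`. A minimal sketch of what this looks like from the caller's side; the helper `times_only` is hypothetical, and it assumes the post-#515 `Map` operator keeps its element-at-a-time signature over `Vec` streams:

    use timely::Data;
    use timely::dataflow::{Scope, Stream};
    use timely::dataflow::operators::Map;

    // Hypothetical helper, for illustration only: keep just the timestamps of
    // a stream of (data, time, diff) update triples. Before
    // timely-dataflow#515 the argument was written `Stream<G, (D, T, R)>`;
    // afterwards the `Vec` container is spelled out in the type, while
    // per-element operators behave as before.
    fn times_only<G: Scope, D: Data, T: Data, R: Data>(
        updates: &Stream<G, Vec<(D, T, R)>>,
    ) -> Stream<G, Vec<T>> {
        updates.map(|(_data, time, _diff)| time)
    }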
diff --git a/src/collection.rs b/src/collection.rs
index eb82c5d0d..ce7915c92 100644
--- a/src/collection.rs
+++ b/src/collection.rs
@@ -42,7 +42,7 @@ pub struct Collection<G: Scope, D, R: Semigroup = isize> {
     ///
     /// This field is exposed to support direct timely dataflow manipulation when required, but it is
     /// not intended to be the idiomatic way to work with the collection.
-    pub inner: Stream<G, (D, G::Timestamp, R)>
+    pub inner: Stream<G, Vec<(D, G::Timestamp, R)>>
 }
 
 impl<G: Scope, D: Data, R: Semigroup> Collection<G, D, R> where G::Timestamp: Data {
@@ -52,7 +52,7 @@ impl<G: Scope, D: Data, R: Semigroup> Collection<G, D, R> where G::Timestamp: Da
     /// idiomatic approach to convert timely streams to collections. Also, the `input::Input` trait
     /// provides a `new_collection` method which will create a new collection for you without exposing
     /// the underlying timely stream at all.
-    pub fn new(stream: Stream<G, (D, G::Timestamp, R)>) -> Collection<G, D, R> {
+    pub fn new(stream: Stream<G, Vec<(D, G::Timestamp, R)>>) -> Collection<G, D, R> {
         Collection { inner: stream }
     }
     /// Creates a new collection by applying the supplied function to each input element.
@@ -678,7 +678,7 @@ pub trait AsCollection<G: Scope, D: Data, R: Semigroup> {
     fn as_collection(&self) -> Collection<G, D, R>;
 }
 
-impl<G: Scope, D: Data, R: Semigroup> AsCollection<G, D, R> for Stream<G, (D, G::Timestamp, R)> {
+impl<G: Scope, D: Data, R: Semigroup> AsCollection<G, D, R> for Stream<G, Vec<(D, G::Timestamp, R)>> {
     fn as_collection(&self) -> Collection<G, D, R> {
         Collection::new(self.clone())
     }
diff --git a/src/input.rs b/src/input.rs
index d3cdff34f..b5407b379 100644
--- a/src/input.rs
+++ b/src/input.rs
@@ -189,7 +189,7 @@ impl<G: TimelyInput> Input for G where <G as ScopeParent>::Timestamp: Lattice {
 pub struct InputSession<T: Timestamp+Clone, D: Data, R: Semigroup> {
     time: T,
     buffer: Vec<(D, T, R)>,
-    handle: Handle<T, (D, T, R)>,
+    handle: Handle<T, Vec<(D, T, R)>>,
 }
 
 impl<T: Timestamp+Clone, D: Data, R: Semigroup> InputSession<T, D, R> {
@@ -236,7 +236,7 @@ impl<T: Timestamp+Clone, D: Data, R: Semigroup> InputSession<T, D, R> {
     }
 
     /// Creates a new session from a reference to an input handle.
-    pub fn from(handle: Handle<T, (D, T, R)>) -> Self {
+    pub fn from(handle: Handle<T, Vec<(D, T, R)>>) -> Self {
         InputSession {
             time: handle.time().clone(),
             buffer: Vec::new(),
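As the `Collection::new` documentation above says, `as_collection` and `new_collection` remain the idiomatic entry points, so most user code should not notice the change; the container only becomes visible when reaching into `collection.inner`. A sketch under that assumption (none of this is part of the patch):

    use differential_dataflow::AsCollection;
    use differential_dataflow::input::Input;

    fn main() {
        timely::execute_directly(|worker| {
            worker.dataflow::<usize, _, _>(|scope| {
                // `new_collection` hands back an `InputSession` and a `Collection`.
                let (mut input, collection) = scope.new_collection::<u64, isize>();
                // `collection.inner` is now a `Stream<_, Vec<(u64, usize, isize)>>`,
                // and `as_collection` round-trips it without further ceremony.
                let _same_again = collection.inner.as_collection();
                input.insert(17);
                input.advance_to(1);
                input.flush();
            });
        });
    }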
diff --git a/src/operators/arrange/arrangement.rs b/src/operators/arrange/arrangement.rs
index 2d4318e43..134496097 100644
--- a/src/operators/arrange/arrangement.rs
+++ b/src/operators/arrange/arrangement.rs
@@ -54,7 +54,7 @@ where
     /// This stream contains the same batches of updates the trace itself accepts, so there should
     /// be no additional overhead to receiving these records. The batches can be navigated just as
     /// the batches in the trace, by key and by value.
-    pub stream: Stream<G, Tr::Batch>,
+    pub stream: Stream<G, Vec<Tr::Batch>>,
     /// A shared trace, updated by the `Arrange` operator and readable by others.
     pub trace: Tr,
     // TODO : We might have an `Option<Collection<G, (K, V)>>` here, which `as_collection` sets and
@@ -225,7 +225,7 @@
     ///
     /// This method exists for streams of batches without the corresponding arrangement.
     /// If you have the arrangement, its `flat_map_ref` method is equivalent to this.
-    pub fn flat_map_batches<I, L>(stream: &Stream<G, Tr::Batch>, mut logic: L) -> Collection<G, I::Item, Tr::R>
+    pub fn flat_map_batches<I, L>(stream: &Stream<G, Vec<Tr::Batch>>, mut logic: L) -> Collection<G, I::Item, Tr::R>
     where
         Tr::R: Semigroup,
         I: IntoIterator,
@@ -259,7 +259,7 @@
     ///
     /// This method consumes a stream of (key, time) queries and reports the corresponding stream of
     /// (key, value, time, diff) accumulations in the `self` trace.
-    pub fn lookup(&self, queries: &Stream<G, (Tr::Key, G::Timestamp)>) -> Stream<G, (Tr::Key, Tr::Val, G::Timestamp, Tr::R)>
+    pub fn lookup(&self, queries: &Stream<G, Vec<(Tr::Key, G::Timestamp)>>) -> Stream<G, Vec<(Tr::Key, Tr::Val, G::Timestamp, Tr::R)>>
     where
         G::Timestamp: Data+Lattice+Ord+TotalOrder,
         Tr::Key: ExchangeData+Hashable,
@@ -485,7 +485,7 @@
    /// is the correct way to determine that times in the shared trace are committed.
    fn arrange_core<P, Tr>(&self, pact: P, name: &str) -> Arranged<G, TraceAgent<Tr>>
    where
-        P: ParallelizationContract<G::Timestamp, ((K, V), G::Timestamp, R)>,
+        P: ParallelizationContract<G::Timestamp, Vec<((K, V), G::Timestamp, R)>>,
        Tr: Trace+TraceReader<Key=K, Val=V, Time=G::Timestamp, R=R>+'static,
        Tr::Batch: Batch,
    ;
@@ -522,7 +522,7 @@
 
    fn arrange_core<P, Tr>(&self, pact: P, name: &str) -> Arranged<G, TraceAgent<Tr>>
    where
-        P: ParallelizationContract<G::Timestamp, ((K, V), G::Timestamp, R)>,
+        P: ParallelizationContract<G::Timestamp, Vec<((K, V), G::Timestamp, R)>>,
        Tr: Trace+TraceReader<Key=K, Val=V, Time=G::Timestamp, R=R>+'static,
        Tr::Batch: Batch,
    {
@@ -689,7 +689,7 @@ where
 {
    fn arrange_core<P, Tr>(&self, pact: P, name: &str) -> Arranged<G, TraceAgent<Tr>>
    where
-        P: ParallelizationContract<G::Timestamp, ((K, ()), G::Timestamp, R)>,
+        P: ParallelizationContract<G::Timestamp, Vec<((K, ()), G::Timestamp, R)>>,
        Tr: Trace+TraceReader<Key=K, Val=(), Time=G::Timestamp, R=R>+'static,
        Tr::Batch: Batch,
    {
diff --git a/src/operators/arrange/upsert.rs b/src/operators/arrange/upsert.rs
index 468398ae6..f4812d3c5 100644
--- a/src/operators/arrange/upsert.rs
+++ b/src/operators/arrange/upsert.rs
@@ -136,7 +136,7 @@ use super::TraceAgent;
 /// understand what a "sequence" of upserts would mean for partially ordered
 /// timestamps.
 pub fn arrange_from_upsert<G, Tr>(
-    stream: &Stream<G, (Tr::Key, Option<Tr::Val>, G::Timestamp)>,
+    stream: &Stream<G, Vec<(Tr::Key, Option<Tr::Val>, G::Timestamp)>>,
     name: &str,
 ) -> Arranged<G, TraceAgent<Tr>>
 where
diff --git a/src/operators/iterate.rs b/src/operators/iterate.rs
index ac3636703..23601bd09 100644
--- a/src/operators/iterate.rs
+++ b/src/operators/iterate.rs
@@ -164,7 +164,7 @@ impl<G: Scope, D: Ord+Data+Debug, R: Abelian> Iterate<G, D, R> for G {
 pub struct Variable<G: Scope, D: Data, R: Abelian>
 where G::Timestamp: Lattice {
     collection: Collection<G, D, R>,
-    feedback: Handle<G, (D, G::Timestamp, R)>,
+    feedback: Handle<G, Vec<(D, G::Timestamp, R)>>,
     source: Option<Collection<G, D, R>>,
     step: <G::Timestamp as Timestamp>::Summary,
 }
@@ -235,7 +235,7 @@ impl<G: Scope, D: Data, R: Abelian> Deref for Variable<G, D, R> where G::Timesta
 
 pub struct SemigroupVariable<G: Scope, D: Data, R: Semigroup>
 where G::Timestamp: Lattice {
     collection: Collection<G, D, R>,
-    feedback: Handle<G, (D, G::Timestamp, R)>,
+    feedback: Handle<G, Vec<(D, G::Timestamp, R)>>,
     step: <G::Timestamp as Timestamp>::Summary,
 }
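For `arrange_from_upsert` only the input stream's container changes. Below is an illustrative caller against the new signature; it is not from the patch, it picks `OrdValSpine` as the trace, and it abbreviates the trait bounds to the ones the timestamp needs, so treat it as a sketch rather than the exact required bounds:

    use differential_dataflow::lattice::Lattice;
    use differential_dataflow::operators::arrange::{Arranged, TraceAgent};
    use differential_dataflow::operators::arrange::upsert::arrange_from_upsert;
    use differential_dataflow::trace::implementations::ord::OrdValSpine;
    use timely::ExchangeData;
    use timely::dataflow::{Scope, Stream};
    use timely::order::TotalOrder;

    // Illustrative helper: arrange a stream of upserts into a keyed trace.
    // The stream now carries `Vec` batches of (key, optional value, time)
    // records, exactly as in the signature above; a `None` value retracts
    // whatever value the key currently holds.
    fn arrange_upserts<G>(
        upserts: &Stream<G, Vec<(String, Option<String>, G::Timestamp)>>,
    ) -> Arranged<G, TraceAgent<OrdValSpine<String, String, G::Timestamp, isize>>>
    where
        G: Scope,
        G::Timestamp: Lattice + Ord + TotalOrder + ExchangeData,
    {
        arrange_from_upsert(upserts, "ArrangeUpsertExample")
    }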
diff --git a/src/operators/join.rs b/src/operators/join.rs
index 2db2560e1..645e5983d 100644
--- a/src/operators/join.rs
+++ b/src/operators/join.rs
@@ -689,7 +689,7 @@ where
     /// Process keys until at least `fuel` output tuples produced, or the work is exhausted.
     #[inline(never)]
-    fn work<L, I>(&mut self, output: &mut OutputHandle<T, (D, T, R), Tee<T, (D, T, R)>>, mut logic: L, fuel: &mut usize)
+    fn work<L, I>(&mut self, output: &mut OutputHandle<T, Vec<(D, T, R)>, Tee<T, Vec<(D, T, R)>>>, mut logic: L, fuel: &mut usize)
     where I: IntoIterator<Item=(D, T, R)>, L: FnMut(&K, &C1::Val, &C2::Val, &T, &C1::R, &C2::R)->I {
 
         let meet = self.capability.time();
diff --git a/tests/import.rs b/tests/import.rs
index fb2fd6136..5427db781 100644
--- a/tests/import.rs
+++ b/tests/import.rs
@@ -13,7 +13,7 @@ use differential_dataflow::operators::reduce::Reduce;
 use differential_dataflow::trace::TraceReader;
 use itertools::Itertools;
 
-type Result = std::sync::mpsc::Receiver<Event<usize, ((u64, i64), usize, i64)>>;
+type Result = std::sync::mpsc::Receiver<Event<usize, Vec<((u64, i64), usize, i64)>>>;
 
 fn run_test<T>(test: T, expected: Vec<(usize, Vec<((u64, i64), i64)>)>) -> ()
 where T: FnOnce(Vec<Vec<(u64, i64)>>) -> Result + ::std::panic::UnwindSafe

From abbaadc33c3118388f1e2d2885232ea53330be2a Mon Sep 17 00:00:00 2001
From: Moritz Hoffmann
Date: Thu, 16 Mar 2023 22:41:04 -0400
Subject: [PATCH 2/2] (remove) Point at antiguru's fork

Signed-off-by: Moritz Hoffmann
---
 Cargo.toml              | 3 ++-
 dogsdogsdogs/Cargo.toml | 3 ++-
 2 files changed, 4 insertions(+), 2 deletions(-)

diff --git a/Cargo.toml b/Cargo.toml
index 220eacef6..5bd7efb59 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -33,7 +33,8 @@ serde_derive = "1.0"
 abomonation = "0.7"
 abomonation_derive = "0.5"
 #timely = { version = "0.12", default-features = false }
-timely = { git = "https://github.com/TimelyDataflow/timely-dataflow", default-features = false }
+#timely = { git = "https://github.com/TimelyDataflow/timely-dataflow", default-features = false }
+timely = { git = "https://github.com/antiguru/timely-dataflow", branch = "remove_core", default-features = false }
 #timely = { path = "../timely-dataflow/timely/", default-features = false }
 fnv="1.0.2"
diff --git a/dogsdogsdogs/Cargo.toml b/dogsdogsdogs/Cargo.toml
index cae04f734..bb7cbb836 100644
--- a/dogsdogsdogs/Cargo.toml
+++ b/dogsdogsdogs/Cargo.toml
@@ -7,7 +7,8 @@ license = "MIT"
 [dependencies]
 abomonation = "0.7"
 abomonation_derive = "0.5"
-timely = { git = "https://github.com/TimelyDataflow/timely-dataflow", default-features = false }
+#timely = { git = "https://github.com/TimelyDataflow/timely-dataflow", default-features = false }
+timely = { git = "https://github.com/antiguru/timely-dataflow", branch = "remove_core", default-features = false }
 differential-dataflow = { path = "../", default-features = false }
 serde = "1"
 serde_derive = "1"
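The "(remove)" marker in the subject says it directly: this second patch is temporary scaffolding, to be reverted to the upstream git source once timely-dataflow#515 merges. A possible alternative, sketched below rather than taken from this series, is Cargo's `[patch]` section in the workspace root manifest, which redirects the upstream source in one place instead of editing each Cargo.toml:

    # In the workspace root Cargo.toml: leave the [dependencies] sections
    # pointing at upstream, and redirect that git source to the fork while
    # timely-dataflow#515 is in flight.
    [patch."https://github.com/TimelyDataflow/timely-dataflow"]
    timely = { git = "https://github.com/antiguru/timely-dataflow", branch = "remove_core" }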