Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,8 @@ serde_derive = "1.0"
abomonation = "0.7"
abomonation_derive = "0.5"
#timely = { version = "0.12", default-features = false }
timely = { git = "https://github.com/TimelyDataflow/timely-dataflow", default-features = false }
#timely = { git = "https://github.com/TimelyDataflow/timely-dataflow", default-features = false }
timely = { git = "https://github.com/antiguru/timely-dataflow", branch = "remove_core", default-features = false }
#timely = { path = "../timely-dataflow/timely/", default-features = false }
fnv="1.0.2"

Expand Down
3 changes: 2 additions & 1 deletion dogsdogsdogs/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@ license = "MIT"
[dependencies]
abomonation = "0.7"
abomonation_derive = "0.5"
timely = { git = "https://github.com/TimelyDataflow/timely-dataflow", default-features = false }
#timely = { git = "https://github.com/TimelyDataflow/timely-dataflow", default-features = false }
timely = { git = "https://github.com/antiguru/timely-dataflow", branch = "remove_core", default-features = false }
differential-dataflow = { path = "../", default-features = false }
serde = "1"
serde_derive = "1"
Expand Down
4 changes: 2 additions & 2 deletions dogsdogsdogs/examples/delta_query2.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@ fn main() {

use timely::dataflow::operators::unordered_input::UnorderedHandle;

let ((input1, capability1), data1): ((UnorderedHandle<Product<usize, usize>, ((usize, usize), Product<usize, usize>, isize)>, _), _) = inner.new_unordered_input();
let ((input2, capability2), data2): ((UnorderedHandle<Product<usize, usize>, ((usize, usize), Product<usize, usize>, isize)>, _), _) = inner.new_unordered_input();
let ((input1, capability1), data1): ((UnorderedHandle<Product<usize, usize>, Vec<((usize, usize), Product<usize, usize>, isize)>>, _), _) = inner.new_unordered_input();
let ((input2, capability2), data2): ((UnorderedHandle<Product<usize, usize>, Vec<((usize, usize), Product<usize, usize>, isize)>>, _), _) = inner.new_unordered_input();

let edges1 = data1.as_collection();
let edges2 = data2.as_collection();
Expand Down
4 changes: 2 additions & 2 deletions examples/capture-test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ pub mod kafka {
use differential_dataflow::lattice::Lattice;

/// Creates a Kafka source from supplied configuration information.
pub fn create_source<G, D, T, R>(scope: G, addr: &str, topic: &str, group: &str) -> (Box<dyn std::any::Any + Send + Sync>, Stream<G, (D, T, R)>)
pub fn create_source<G, D, T, R>(scope: G, addr: &str, topic: &str, group: &str) -> (Box<dyn std::any::Any + Send + Sync>, Stream<G, Vec<(D, T, R)>>)
where
G: Scope<Timestamp = T>,
D: ExchangeData + Hash + for<'a> serde::Deserialize<'a>,
Expand All @@ -166,7 +166,7 @@ pub mod kafka {
})
}

pub fn create_sink<G, D, T, R>(stream: &Stream<G, (D, T, R)>, addr: &str, topic: &str) -> Box<dyn std::any::Any>
pub fn create_sink<G, D, T, R>(stream: &Stream<G, Vec<(D, T, R)>>, addr: &str, topic: &str) -> Box<dyn std::any::Any>
where
G: Scope<Timestamp = T>,
D: ExchangeData + Hash + Serialize + for<'a> Deserialize<'a>,
Expand Down
4 changes: 2 additions & 2 deletions src/capture.rs
Original file line number Diff line number Diff line change
Expand Up @@ -292,7 +292,7 @@ pub mod source {
pub fn build<G, B, I, D, T, R>(
scope: G,
source_builder: B,
) -> (Box<dyn std::any::Any + Send + Sync>, Stream<G, (D, T, R)>)
) -> (Box<dyn std::any::Any + Send + Sync>, Stream<G, Vec<(D, T, R)>>)
where
G: Scope<Timestamp = T>,
B: FnOnce(SyncActivator) -> I,
Expand Down Expand Up @@ -606,7 +606,7 @@ pub mod sink {
/// performed before calling the method, the recorded output may not be correctly
/// reconstructed by readers.
pub fn build<G, BS, D, T, R>(
stream: &Stream<G, (D, T, R)>,
stream: &Stream<G, Vec<(D, T, R)>>,
sink_hash: u64,
updates_sink: Weak<RefCell<BS>>,
progress_sink: Weak<RefCell<BS>>,
Expand Down
6 changes: 3 additions & 3 deletions src/collection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ pub struct Collection<G: Scope, D, R: Semigroup = isize> {
///
/// This field is exposed to support direct timely dataflow manipulation when required, but it is
/// not intended to be the idiomatic way to work with the collection.
pub inner: Stream<G, (D, G::Timestamp, R)>
pub inner: Stream<G, Vec<(D, G::Timestamp, R)>>
}

impl<G: Scope, D: Data, R: Semigroup> Collection<G, D, R> where G::Timestamp: Data {
Expand All @@ -52,7 +52,7 @@ impl<G: Scope, D: Data, R: Semigroup> Collection<G, D, R> where G::Timestamp: Da
/// idiomatic approach to convert timely streams to collections. Also, the `input::Input` trait
/// provides a `new_collection` method which will create a new collection for you without exposing
/// the underlying timely stream at all.
pub fn new(stream: Stream<G, (D, G::Timestamp, R)>) -> Collection<G, D, R> {
pub fn new(stream: Stream<G, Vec<(D, G::Timestamp, R)>>) -> Collection<G, D, R> {
Collection { inner: stream }
}
/// Creates a new collection by applying the supplied function to each input element.
Expand Down Expand Up @@ -678,7 +678,7 @@ pub trait AsCollection<G: Scope, D: Data, R: Semigroup> {
fn as_collection(&self) -> Collection<G, D, R>;
}

impl<G: Scope, D: Data, R: Semigroup> AsCollection<G, D, R> for Stream<G, (D, G::Timestamp, R)> {
impl<G: Scope, D: Data, R: Semigroup> AsCollection<G, D, R> for Stream<G, Vec<(D, G::Timestamp, R)>> {
fn as_collection(&self) -> Collection<G, D, R> {
Collection::new(self.clone())
}
Expand Down
4 changes: 2 additions & 2 deletions src/input.rs
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ impl<G: TimelyInput> Input for G where <G as ScopeParent>::Timestamp: Lattice {
pub struct InputSession<T: Timestamp+Clone, D: Data, R: Semigroup> {
time: T,
buffer: Vec<(D, T, R)>,
handle: Handle<T,(D,T,R)>,
handle: Handle<T,Vec<(D,T,R)>>,
}

impl<T: Timestamp+Clone, D: Data> InputSession<T, D, isize> {
Expand Down Expand Up @@ -236,7 +236,7 @@ impl<T: Timestamp+Clone, D: Data, R: Semigroup> InputSession<T, D, R> {
}

/// Creates a new session from a reference to an input handle.
pub fn from(handle: Handle<T,(D,T,R)>) -> Self {
pub fn from(handle: Handle<T,Vec<(D,T,R)>>) -> Self {
InputSession {
time: handle.time().clone(),
buffer: Vec::new(),
Expand Down
12 changes: 6 additions & 6 deletions src/operators/arrange/arrangement.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ where
/// This stream contains the same batches of updates the trace itself accepts, so there should
/// be no additional overhead to receiving these records. The batches can be navigated just as
/// the batches in the trace, by key and by value.
pub stream: Stream<G, Tr::Batch>,
pub stream: Stream<G, Vec<Tr::Batch>>,
/// A shared trace, updated by the `Arrange` operator and readable by others.
pub trace: Tr,
// TODO : We might have an `Option<Collection<G, (K, V)>>` here, which `as_collection` sets and
Expand Down Expand Up @@ -225,7 +225,7 @@ where
///
/// This method exists for streams of batches without the corresponding arrangement.
/// If you have the arrangement, its `flat_map_ref` method is equivalent to this.
pub fn flat_map_batches<I, L>(stream: &Stream<G, Tr::Batch>, mut logic: L) -> Collection<G, I::Item, Tr::R>
pub fn flat_map_batches<I, L>(stream: &Stream<G, Vec<Tr::Batch>>, mut logic: L) -> Collection<G, I::Item, Tr::R>
where
Tr::R: Semigroup,
I: IntoIterator,
Expand Down Expand Up @@ -259,7 +259,7 @@ where
///
/// This method consumes a stream of (key, time) queries and reports the corresponding stream of
/// (key, value, time, diff) accumulations in the `self` trace.
pub fn lookup(&self, queries: &Stream<G, (Tr::Key, G::Timestamp)>) -> Stream<G, (Tr::Key, Tr::Val, G::Timestamp, Tr::R)>
pub fn lookup(&self, queries: &Stream<G, Vec<(Tr::Key, G::Timestamp)>>) -> Stream<G, Vec<(Tr::Key, Tr::Val, G::Timestamp, Tr::R)>>
where
G::Timestamp: Data+Lattice+Ord+TotalOrder,
Tr::Key: ExchangeData+Hashable,
Expand Down Expand Up @@ -485,7 +485,7 @@ where
/// is the correct way to determine that times in the shared trace are committed.
fn arrange_core<P, Tr>(&self, pact: P, name: &str) -> Arranged<G, TraceAgent<Tr>>
where
P: ParallelizationContract<G::Timestamp, ((K,V),G::Timestamp,R)>,
P: ParallelizationContract<G::Timestamp, Vec<((K,V),G::Timestamp,R)>>,
Tr: Trace+TraceReader<Key=K,Val=V,Time=G::Timestamp,R=R>+'static,
Tr::Batch: Batch,
;
Expand Down Expand Up @@ -522,7 +522,7 @@ where

fn arrange_core<P, Tr>(&self, pact: P, name: &str) -> Arranged<G, TraceAgent<Tr>>
where
P: ParallelizationContract<G::Timestamp, ((K,V),G::Timestamp,R)>,
P: ParallelizationContract<G::Timestamp, Vec<((K,V),G::Timestamp,R)>>,
Tr: Trace+TraceReader<Key=K,Val=V,Time=G::Timestamp,R=R>+'static,
Tr::Batch: Batch,
{
Expand Down Expand Up @@ -689,7 +689,7 @@ where
{
fn arrange_core<P, Tr>(&self, pact: P, name: &str) -> Arranged<G, TraceAgent<Tr>>
where
P: ParallelizationContract<G::Timestamp, ((K,()),G::Timestamp,R)>,
P: ParallelizationContract<G::Timestamp, Vec<((K,()),G::Timestamp,R)>>,
Tr: Trace+TraceReader<Key=K, Val=(), Time=G::Timestamp, R=R>+'static,
Tr::Batch: Batch,
{
Expand Down
2 changes: 1 addition & 1 deletion src/operators/arrange/upsert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ use super::TraceAgent;
/// understand what a "sequence" of upserts would mean for partially ordered
/// timestamps.
pub fn arrange_from_upsert<G, Tr>(
stream: &Stream<G, (Tr::Key, Option<Tr::Val>, G::Timestamp)>,
stream: &Stream<G, Vec<(Tr::Key, Option<Tr::Val>, G::Timestamp)>>,
name: &str,
) -> Arranged<G, TraceAgent<Tr>>
where
Expand Down
4 changes: 2 additions & 2 deletions src/operators/iterate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ impl<G: Scope, D: Ord+Data+Debug, R: Semigroup> Iterate<G, D, R> for G {
pub struct Variable<G: Scope, D: Data, R: Abelian>
where G::Timestamp: Lattice {
collection: Collection<G, D, R>,
feedback: Handle<G, (D, G::Timestamp, R)>,
feedback: Handle<G, Vec<(D, G::Timestamp, R)>>,
source: Option<Collection<G, D, R>>,
step: <G::Timestamp as Timestamp>::Summary,
}
Expand Down Expand Up @@ -235,7 +235,7 @@ impl<G: Scope, D: Data, R: Abelian> Deref for Variable<G, D, R> where G::Timesta
pub struct SemigroupVariable<G: Scope, D: Data, R: Semigroup>
where G::Timestamp: Lattice {
collection: Collection<G, D, R>,
feedback: Handle<G, (D, G::Timestamp, R)>,
feedback: Handle<G, Vec<(D, G::Timestamp, R)>>,
step: <G::Timestamp as Timestamp>::Summary,
}

Expand Down
2 changes: 1 addition & 1 deletion src/operators/join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -689,7 +689,7 @@ where

/// Process keys until at least `fuel` output tuples produced, or the work is exhausted.
#[inline(never)]
fn work<L, I>(&mut self, output: &mut OutputHandle<T, (D, T, R), Tee<T, (D, T, R)>>, mut logic: L, fuel: &mut usize)
fn work<L, I>(&mut self, output: &mut OutputHandle<T, Vec<(D, T, R)>, Tee<T, Vec<(D, T, R)>>>, mut logic: L, fuel: &mut usize)
where I: IntoIterator<Item=(D, T, R)>, L: FnMut(&K, &C1::Val, &C2::Val, &T, &C1::R, &C2::R)->I {

let meet = self.capability.time();
Expand Down
2 changes: 1 addition & 1 deletion tests/import.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use differential_dataflow::operators::reduce::Reduce;
use differential_dataflow::trace::TraceReader;
use itertools::Itertools;

type Result = std::sync::mpsc::Receiver<timely::dataflow::operators::capture::Event<usize, ((u64, i64), usize, i64)>>;
type Result = std::sync::mpsc::Receiver<timely::dataflow::operators::capture::Event<usize, Vec<((u64, i64), usize, i64)>>>;

fn run_test<T>(test: T, expected: Vec<(usize, Vec<((u64, i64), i64)>)>) -> ()
where T: FnOnce(Vec<Vec<((u64, u64), i64)>>)-> Result + ::std::panic::UnwindSafe
Expand Down