Skip to content

Commit c7b387b

Browse files
authored
Adjust Differential to recent Timely changes (#643)
* Adjust Differential to recent Timely changes
  Signed-off-by: Moritz Hoffmann <[email protected]>
* Cleanup, use MergerChunk to clear in reduce
  Signed-off-by: Moritz Hoffmann <[email protected]>
* Address feedback
  Signed-off-by: Moritz Hoffmann <[email protected]>
* Back to published Timely
  Signed-off-by: Moritz Hoffmann <[email protected]>
---------
Signed-off-by: Moritz Hoffmann <[email protected]>
1 parent a6eee82 commit c7b387b

File tree

13 files changed

+102
-106
lines changed

13 files changed

+102
-106
lines changed

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ resolver = "2"
1717

1818
[workspace.dependencies]
1919
differential-dataflow = { path = "differential-dataflow", default-features = false, version = "0.16.2" }
20-
timely = { version = "0.23", default-features = false }
20+
timely = { version = "0.24", default-features = false }
2121
columnar = { version = "0.10", default-features = false }
2222
#timely = { path = "../timely-dataflow/timely/", default-features = false }
2323

differential-dataflow/examples/columnar.rs

Lines changed: 36 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,9 @@
11
//! Wordcount based on `columnar`.
22
3-
use {
4-
timely::container::{Container, CapacityContainerBuilder},
5-
timely::dataflow::channels::pact::ExchangeCore,
6-
timely::dataflow::InputHandleCore,
7-
timely::dataflow::ProbeHandle,
8-
};
3+
use timely::container::{CapacityContainerBuilder, PushInto};
4+
use timely::dataflow::channels::pact::ExchangeCore;
5+
use timely::dataflow::InputHandleCore;
6+
use timely::dataflow::ProbeHandle;
97

108
use differential_dataflow::trace::implementations::ord_neu::ColKeySpine;
119

@@ -65,7 +63,7 @@ fn main() {
6563
while i < size {
6664
let val = (counter + i) % keys;
6765
write!(buffer, "{:?}", val).unwrap();
68-
container.push(((&buffer, ()), time, 1));
66+
container.push_into(((&buffer, ()), time, 1));
6967
buffer.clear();
7068
i += worker.peers();
7169
}
@@ -88,7 +86,7 @@ fn main() {
8886
while i < size {
8987
let val = (queries + i) % keys;
9088
write!(buffer, "{:?}", val).unwrap();
91-
container.push(((&buffer, ()), time, 1));
89+
container.push_into(((&buffer, ()), time, 1));
9290
buffer.clear();
9391
i += worker.peers();
9492
}
@@ -169,26 +167,29 @@ mod container {
169167
pub fn get(&self, index: usize) -> columnar::Ref<'_, C> {
170168
self.borrow().get(index)
171169
}
172-
}
173-
174-
use timely::Container;
175-
impl<C: Columnar> Container for Column<C> {
176-
fn len(&self) -> usize {
177-
match self {
178-
Column::Typed(t) => t.len(),
179-
Column::Bytes(b) => <BorrowedOf<C> as FromBytes>::from_bytes(&mut Indexed::decode(bytemuck::cast_slice(b))).len(),
180-
Column::Align(a) => <BorrowedOf<C> as FromBytes>::from_bytes(&mut Indexed::decode(a)).len(),
181-
}
182-
}
183170
// This sets the `Bytes` variant to be an empty `Typed` variant, appropriate for pushing into.
184-
fn clear(&mut self) {
171+
pub(crate) fn clear(&mut self) {
185172
match self {
186173
Column::Typed(t) => t.clear(),
187174
Column::Bytes(_) => *self = Column::Typed(Default::default()),
188175
Column::Align(_) => *self = Column::Typed(Default::default()),
189176
}
190177
}
178+
#[inline]
179+
pub fn len(&self) -> usize {
180+
match self {
181+
Column::Typed(t) => t.len(),
182+
Column::Bytes(b) => <BorrowedOf<C> as FromBytes>::from_bytes(&mut Indexed::decode(bytemuck::cast_slice(b))).len(),
183+
Column::Align(a) => <BorrowedOf<C> as FromBytes>::from_bytes(&mut Indexed::decode(a)).len(),
184+
}
185+
}
186+
}
187+
188+
impl<C: Columnar> timely::Accountable for Column<C> {
189+
#[inline] fn record_count(&self) -> i64 { self.len() as i64 }
190+
}
191191

192+
impl<C: Columnar> timely::container::IterContainer for Column<C> {
192193
type ItemRef<'a> = columnar::Ref<'a, C>;
193194
type Iter<'a> = IterOwn<BorrowedOf<'a, C>>;
194195
fn iter<'a>(&'a self) -> Self::Iter<'a> {
@@ -198,7 +199,9 @@ mod container {
198199
Column::Align(a) => <BorrowedOf<C> as FromBytes>::from_bytes(&mut Indexed::decode(a)).into_index_iter(),
199200
}
200201
}
202+
}
201203

204+
impl<C: Columnar> timely::container::DrainContainer for Column<C> {
202205
type Item<'a> = columnar::Ref<'a, C>;
203206
type DrainIter<'a> = IterOwn<BorrowedOf<'a, C>>;
204207
fn drain<'a>(&'a mut self) -> Self::DrainIter<'a> {
@@ -366,7 +369,7 @@ pub mod batcher {
366369
use std::collections::VecDeque;
367370
use columnar::Columnar;
368371
use timely::Container;
369-
use timely::container::{ContainerBuilder, PushInto};
372+
use timely::container::{ContainerBuilder, DrainContainer, PushInto, SizableContainer};
370373
use differential_dataflow::difference::Semigroup;
371374
use crate::Column;
372375

@@ -389,7 +392,7 @@ pub mod batcher {
389392
ready: VecDeque<C>,
390393
}
391394

392-
impl<C: Container + Clone + 'static> ContainerBuilder for Chunker<C> {
395+
impl<C: Container> ContainerBuilder for Chunker<C> {
393396
type Container = C;
394397

395398
fn extract(&mut self) -> Option<&mut Self::Container> {
@@ -414,9 +417,11 @@ pub mod batcher {
414417
for<'b> columnar::Ref<'b, T>: Ord,
415418
R: for<'b> Columnar + for<'b> Semigroup<columnar::Ref<'b, R>>,
416419
for<'b> columnar::Ref<'b, R>: Ord,
417-
C2: Container + for<'b, 'c> PushInto<(columnar::Ref<'b, D>, columnar::Ref<'b, T>, &'c R)>,
420+
C2: Container + SizableContainer + for<'b, 'c> PushInto<(columnar::Ref<'b, D>, columnar::Ref<'b, T>, &'c R)>,
418421
{
419422
fn push_into(&mut self, container: &'a mut Column<(D, T, R)>) {
423+
let mut target: C2 = Default::default();
424+
target.ensure_capacity(&mut Some(std::mem::take(&mut self.empty)));
420425

421426
// Scoped to let borrow through `permutation` drop.
422427
{
@@ -426,7 +431,6 @@ pub mod batcher {
426431
permutation.extend(container.drain());
427432
permutation.sort();
428433

429-
self.empty.clear();
430434
// Iterate over the data, accumulating diffs for like keys.
431435
let mut iter = permutation.drain(..);
432436
if let Some((data, time, diff)) = iter.next() {
@@ -442,7 +446,7 @@ pub mod batcher {
442446
else {
443447
if !prev_diff.is_zero() {
444448
let tuple = (prev_data, prev_time, &prev_diff);
445-
self.empty.push_into(tuple);
449+
target.push_into(tuple);
446450
}
447451
prev_data = data;
448452
prev_time = time;
@@ -452,13 +456,13 @@ pub mod batcher {
452456

453457
if !prev_diff.is_zero() {
454458
let tuple = (prev_data, prev_time, &prev_diff);
455-
self.empty.push_into(tuple);
459+
target.push_into(tuple);
456460
}
457461
}
458462
}
459463

460-
if !self.empty.is_empty() {
461-
self.ready.push_back(std::mem::take(&mut self.empty));
464+
if !target.is_empty() {
465+
self.ready.push_back(target);
462466
}
463467
}
464468
}
@@ -468,7 +472,7 @@ pub mod batcher {
468472

469473
use timely::progress::{Antichain, frontier::AntichainRef};
470474
use columnar::Columnar;
471-
475+
use timely::container::PushInto;
472476
use crate::container::Column;
473477
use differential_dataflow::difference::Semigroup;
474478

@@ -502,7 +506,6 @@ pub mod batcher {
502506
}
503507
}
504508
fn is_empty(&self) -> bool {
505-
use timely::Container;
506509
self.head == self.list.len()
507510
}
508511
fn cmp_heads(&self, other: &Self) -> std::cmp::Ordering {
@@ -551,8 +554,7 @@ pub mod batcher {
551554
let stash2: R = R::into_owned(diff2);
552555
stash.plus_equals(&stash2);
553556
if !stash.is_zero() {
554-
use timely::Container;
555-
self.push((data, time, &*stash));
557+
self.push_into((data, time, &*stash));
556558
}
557559
}
558560
fn account(&self) -> (usize, usize, usize, usize) {
@@ -568,6 +570,7 @@ pub mod batcher {
568570
// self.heap_size(cb);
569571
// (self.len(), size, capacity, allocations)
570572
}
573+
#[inline] fn clear(&mut self) { self.clear() }
571574
}
572575
}
573576

@@ -578,7 +581,7 @@ use dd_builder::ColKeyBuilder;
578581
pub mod dd_builder {
579582

580583
use columnar::Columnar;
581-
584+
use timely::container::DrainContainer;
582585
use differential_dataflow::trace::Builder;
583586
use differential_dataflow::trace::Description;
584587
use differential_dataflow::trace::implementations::Layout;
@@ -630,8 +633,6 @@ pub mod dd_builder {
630633

631634
#[inline]
632635
fn push(&mut self, chunk: &mut Self::Input) {
633-
use timely::Container;
634-
635636
// NB: Maintaining owned key and val across iterations to track the "last", which we clone into,
636637
// is somewhat appealing from an ease point of view. Might still allocate, do work we don't need,
637638
// but avoids e.g. calls into `last()` and breaks horrid trait requirements.
@@ -737,8 +738,6 @@ pub mod dd_builder {
737738

738739
#[inline]
739740
fn push(&mut self, chunk: &mut Self::Input) {
740-
use timely::Container;
741-
742741
// NB: Maintaining owned key and val across iterations to track the "last", which we clone into,
743742
// is somewhat appealing from an ease point of view. Might still allocate, do work we don't need,
744743
// but avoids e.g. calls into `last()` and breaks horrid trait requirements.

differential-dataflow/src/collection.rs

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -10,8 +10,7 @@
1010
1111
use std::hash::Hash;
1212

13-
use timely::Container;
14-
use timely::Data;
13+
use timely::{Container, Data};
1514
use timely::progress::Timestamp;
1615
use timely::order::Product;
1716
use timely::dataflow::scopes::{Child, child::Iterative};
@@ -67,7 +66,7 @@ impl<G: Scope, D, R, C> Collection<G, D, R, C> {
6766
Collection { inner: stream, phantom: std::marker::PhantomData }
6867
}
6968
}
70-
impl<G: Scope, D, R, C: Container + Clone + 'static> Collection<G, D, R, C> {
69+
impl<G: Scope, D, R, C: Container> Collection<G, D, R, C> {
7170
/// Creates a new collection accumulating the contents of the two collections.
7271
///
7372
/// Despite the name, differential dataflow collections are unordered. This method is so named because the
@@ -684,7 +683,7 @@ where
684683
G: Scope,
685684
D: Data,
686685
R: Semigroup + 'static,
687-
C: Container + Clone + 'static,
686+
C: Container,
688687
I: IntoIterator<Item=Collection<G, D, R, C>>,
689688
{
690689
scope

differential-dataflow/src/consolidation.rs

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -12,8 +12,7 @@
1212
1313
use std::cmp::Ordering;
1414
use std::collections::VecDeque;
15-
use timely::Container;
16-
use timely::container::{ContainerBuilder, PushInto};
15+
use timely::container::{ContainerBuilder, DrainContainer, PushInto};
1716
use crate::Data;
1817
use crate::difference::{IsZero, Semigroup};
1918

@@ -239,7 +238,7 @@ where
239238
/// items. Consolidation accumulates the diffs per key.
240239
///
241240
/// The trait requires `Container` to have access to its `Item` GAT.
242-
pub trait ConsolidateLayout: Container {
241+
pub trait ConsolidateLayout: DrainContainer {
243242
/// Key portion of data, essentially everything minus the diff
244243
type Key<'a>: Eq where Self: 'a;
245244

@@ -269,6 +268,12 @@ pub trait ConsolidateLayout: Container {
269268
/// Compare two items by key to sort containers.
270269
fn cmp(item1: &Self::Item<'_>, item2: &Self::Item<'_>) -> Ordering;
271270

271+
/// Returns the number of items in the container.
272+
fn len(&self) -> usize;
273+
274+
/// Clear the container. Afterwards, `len()` should return 0.
275+
fn clear(&mut self);
276+
272277
/// Consolidate the supplied container.
273278
fn consolidate_into(&mut self, target: &mut Self) {
274279
// Sort input data
@@ -329,6 +334,9 @@ where
329334
self.push((data, time, diff));
330335
}
331336

337+
#[inline] fn len(&self) -> usize { Vec::len(self) }
338+
#[inline] fn clear(&mut self) { Vec::clear(self) }
339+
332340
/// Consolidate the supplied container.
333341
fn consolidate_into(&mut self, target: &mut Self) {
334342
consolidate_updates(self);

differential-dataflow/src/containers.rs

Lines changed: 11 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -271,41 +271,27 @@ mod container {
271271
use std::ops::Deref;
272272

273273
use columnation::Columnation;
274-
use timely::Container;
275-
use timely::container::SizableContainer;
276274

277275
use crate::containers::TimelyStack;
278276

279-
impl<T: Columnation> Container for TimelyStack<T> {
277+
impl<T: Columnation> timely::container::Accountable for TimelyStack<T> {
278+
#[inline] fn record_count(&self) -> i64 { i64::try_from(self.local.len()).unwrap() }
279+
#[inline] fn is_empty(&self) -> bool { self.local.is_empty() }
280+
}
281+
impl<T: Columnation> timely::container::IterContainer for TimelyStack<T> {
280282
type ItemRef<'a> = &'a T where Self: 'a;
281-
type Item<'a> = &'a T where Self: 'a;
282-
283-
fn len(&self) -> usize {
284-
self.local.len()
285-
}
286-
287-
fn is_empty(&self) -> bool {
288-
self.local.is_empty()
289-
}
290-
291-
fn clear(&mut self) {
292-
TimelyStack::clear(self)
293-
}
294-
295283
type Iter<'a> = std::slice::Iter<'a, T> where Self: 'a;
296-
297-
fn iter(&self) -> Self::Iter<'_> {
298-
self.deref().iter()
299-
}
300-
284+
#[inline] fn iter(&self) -> Self::Iter<'_> { self.deref().iter() }
285+
}
286+
impl<T: Columnation> timely::container::DrainContainer for TimelyStack<T> {
287+
type Item<'a> = &'a T where Self: 'a;
301288
type DrainIter<'a> = std::slice::Iter<'a, T> where Self: 'a;
302-
303-
fn drain(&mut self) -> Self::DrainIter<'_> {
289+
#[inline] fn drain(&mut self) -> Self::DrainIter<'_> {
304290
(*self).iter()
305291
}
306292
}
307293

308-
impl<T: Columnation> SizableContainer for TimelyStack<T> {
294+
impl<T: Columnation> timely::container::SizableContainer for TimelyStack<T> {
309295
fn at_capacity(&self) -> bool {
310296
self.len() == self.capacity()
311297
}

differential-dataflow/src/operators/arrange/arrangement.rs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ use crate::difference::Semigroup;
3131
use crate::lattice::Lattice;
3232
use crate::trace::{self, Trace, TraceReader, BatchReader, Batcher, Builder, Cursor};
3333
use crate::trace::implementations::{KeyBatcher, KeyBuilder, KeySpine, ValBatcher, ValBuilder, ValSpine};
34+
use crate::trace::implementations::merge_batcher::container::MergerChunk;
3435

3536
use trace::wrappers::enter::{TraceEnter, BatchEnter,};
3637
use trace::wrappers::enter_at::TraceEnter as TraceEnterAt;
@@ -254,7 +255,7 @@ where
254255
Time=T1::Time,
255256
Diff: Abelian,
256257
>+'static,
257-
Bu: Builder<Time=G::Timestamp, Output = T2::Batch, Input: Container + PushInto<((T1::KeyOwn, T2::ValOwn), T2::Time, T2::Diff)>>,
258+
Bu: Builder<Time=G::Timestamp, Output = T2::Batch, Input: MergerChunk + PushInto<((T1::KeyOwn, T2::ValOwn), T2::Time, T2::Diff)>>,
258259
L: FnMut(T1::Key<'_>, &[(T1::Val<'_>, T1::Diff)], &mut Vec<(T2::ValOwn, T2::Diff)>)+'static,
259260
{
260261
self.reduce_core::<_,Bu,T2>(name, move |key, input, output, change| {
@@ -276,7 +277,7 @@ where
276277
ValOwn: Data,
277278
Time=T1::Time,
278279
>+'static,
279-
Bu: Builder<Time=G::Timestamp, Output = T2::Batch, Input: Container + PushInto<((T1::KeyOwn, T2::ValOwn), T2::Time, T2::Diff)>>,
280+
Bu: Builder<Time=G::Timestamp, Output = T2::Batch, Input: MergerChunk + PushInto<((T1::KeyOwn, T2::ValOwn), T2::Time, T2::Diff)>>,
280281
L: FnMut(T1::Key<'_>, &[(T1::Val<'_>, T1::Diff)], &mut Vec<(T2::ValOwn, T2::Diff)>, &mut Vec<(T2::ValOwn, T2::Diff)>)+'static,
281282
{
282283
use crate::operators::reduce::reduce_trace;
@@ -354,7 +355,7 @@ pub fn arrange_core<G, P, Ba, Bu, Tr>(stream: &StreamCore<G, Ba::Input>, pact: P
354355
where
355356
G: Scope<Timestamp: Lattice>,
356357
P: ParallelizationContract<G::Timestamp, Ba::Input>,
357-
Ba: Batcher<Time=G::Timestamp,Input: Container + Clone + 'static> + 'static,
358+
Ba: Batcher<Time=G::Timestamp,Input: Container> + 'static,
358359
Bu: Builder<Time=G::Timestamp, Input=Ba::Output, Output = Tr::Batch>,
359360
Tr: Trace<Time=G::Timestamp>+'static,
360361
{

0 commit comments

Comments
 (0)