Skip to content

Commit 843ac7f

Browse files
committed
Form key batches in columnar
Signed-off-by: Moritz Hoffmann <[email protected]>
1 parent cecae53 commit 843ac7f

File tree

2 files changed

+146
-16
lines changed

2 files changed

+146
-16
lines changed

differential-dataflow/examples/columnar.rs

Lines changed: 144 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -7,9 +7,7 @@ use {
77
timely::dataflow::ProbeHandle,
88
};
99

10-
11-
// use differential_dataflow::trace::implementations::ord_neu::ColKeyBuilder;
12-
use differential_dataflow::trace::implementations::ord_neu::ColValSpine;
10+
use differential_dataflow::trace::implementations::ord_neu::ColKeySpine;
1311

1412
use differential_dataflow::operators::arrange::arrangement::arrange_core;
1513

@@ -46,8 +44,8 @@ fn main() {
4644
let data_pact = ExchangeCore::<ColumnBuilder<((String,()),u64,i64)>,_>::new_core(|x: &((&str,()),&u64,&i64)| (x.0).0.as_bytes().iter().map(|x| *x as u64).sum::<u64>() as u64);
4745
let keys_pact = ExchangeCore::<ColumnBuilder<((String,()),u64,i64)>,_>::new_core(|x: &((&str,()),&u64,&i64)| (x.0).0.as_bytes().iter().map(|x| *x as u64).sum::<u64>() as u64);
4846

49-
let data = arrange_core::<_,_,Col2KeyBatcher<_,_,_>, ColKeyBuilder<_,_,_>, ColValSpine<_,_,_,_>>(&data, data_pact, "Data");
50-
let keys = arrange_core::<_,_,Col2KeyBatcher<_,_,_>, ColKeyBuilder<_,_,_>, ColValSpine<_,_,_,_>>(&keys, keys_pact, "Keys");
47+
let data = arrange_core::<_,_,Col2KeyBatcher<_,_,_>, ColKeyBuilder<_,_,_>, ColKeySpine<_,_,_>>(&data, data_pact, "Data");
48+
let keys = arrange_core::<_,_,Col2KeyBatcher<_,_,_>, ColKeyBuilder<_,_,_>, ColKeySpine<_,_,_>>(&keys, keys_pact, "Keys");
5149

5250
keys.join_core(&data, |_k, &(), &()| Option::<()>::None)
5351
.probe_with(&mut probe);
@@ -374,7 +372,7 @@ pub mod batcher {
374372

375373
/// A batcher for columnar storage.
376374
pub type Col2ValBatcher<K, V, T, R> = MergeBatcher<Column<((K,V),T,R)>, Chunker<Column<((K,V),T,R)>>, merger::ColumnMerger<(K,V),T,R>>;
377-
pub type Col2KeyBatcher<K, T, R> = Col2ValBatcher<K, (), T, R>;
375+
pub type Col2KeyBatcher<K, T, R> = Col2ValBatcher<K, (), T, R>;
378376

379377
// First draft: build a "chunker" and a "merger".
380378

@@ -522,7 +520,7 @@ pub mod batcher {
522520
}
523521
}
524522

525-
impl<D, T, R> MergerChunk for Column<(D, T, R)>
523+
impl<D, T, R> MergerChunk for Column<(D, T, R)>
526524
where
527525
D: Columnar + 'static,
528526
T: timely::PartialOrder + Clone + Columnar + 'static,
@@ -575,23 +573,23 @@ pub mod dd_builder {
575573
use columnar::Columnar;
576574

577575
use timely::container::PushInto;
578-
576+
579577
use differential_dataflow::IntoOwned;
580578
use differential_dataflow::trace::Builder;
581579
use differential_dataflow::trace::Description;
582580
use differential_dataflow::trace::implementations::Layout;
583581
use differential_dataflow::trace::implementations::Update;
584582
use differential_dataflow::trace::implementations::BatchContainer;
585-
use differential_dataflow::trace::implementations::ord_neu::{OrdValBatch, val_batch::OrdValStorage};
586-
583+
use differential_dataflow::trace::implementations::ord_neu::{OrdValBatch, val_batch::OrdValStorage, OrdKeyBatch};
584+
use differential_dataflow::trace::implementations::ord_neu::key_batch::OrdKeyStorage;
587585
use crate::Column;
588586

589587

590588
use differential_dataflow::trace::rc_blanket_impls::RcBuilder;
591589
use differential_dataflow::trace::implementations::TStack;
592-
590+
593591
pub type ColValBuilder<K, V, T, R> = RcBuilder<OrdValBuilder<TStack<((K,V),T,R)>>>;
594-
pub type ColKeyBuilder<K, T, R> = RcBuilder<OrdValBuilder<TStack<((K,()),T,R)>>>;
592+
pub type ColKeyBuilder<K, T, R> = RcBuilder<OrdKeyBuilder<TStack<((K,()),T,R)>>>;
595593

596594
/// A builder for creating layers from unsorted update tuples.
597595
pub struct OrdValBuilder<L: Layout> {
@@ -659,7 +657,7 @@ pub mod dd_builder {
659657

660658
fn with_capacity(keys: usize, vals: usize, upds: usize) -> Self {
661659
// We don't introduce zero offsets as they will be introduced by the first `push` call.
662-
Self {
660+
Self {
663661
result: OrdValStorage {
664662
keys: L::KeyContainer::with_capacity(keys),
665663
keys_offs: L::OffsetContainer::with_capacity(keys + 1),
@@ -736,8 +734,139 @@ pub mod dd_builder {
736734
for mut chunk in chain.drain(..) {
737735
builder.push(&mut chunk);
738736
}
739-
737+
740738
builder.done(description)
741739
}
742740
}
743-
}
741+
742+
/// A builder for creating layers from unsorted update tuples.
743+
pub struct OrdKeyBuilder<L: Layout> {
744+
/// The in-progress result.
745+
///
746+
/// This is public to allow container implementors to set and inspect their container.
747+
pub result: OrdKeyStorage<L>,
748+
singleton: Option<(<L::Target as Update>::Time, <L::Target as Update>::Diff)>,
749+
/// Counts the number of singleton optimizations we performed.
750+
///
751+
/// This number allows us to correctly gauge the total number of updates reflected in a batch,
752+
/// even though `result.times.len()` may be much shorter than this amount.
753+
singletons: usize,
754+
}
755+
756+
impl<L: Layout> OrdKeyBuilder<L> {
757+
/// Pushes a single update, which may set `self.singleton` rather than push.
758+
///
759+
/// This operation is meant to be equivalent to pushing `time` onto `self.result.times` and `diff` onto `self.result.diffs`.
760+
/// However, for "clever" reasons it does not do this. Instead, it looks for opportunities
761+
/// to encode a singleton update with an "absert" update: repeating the most recent offset.
762+
/// This otherwise invalid state encodes "look back one element".
763+
///
764+
/// When `self.singleton` is `Some`, it means that we have seen one update and it matched the
765+
/// previously pushed update exactly. In that case, we do not push the update into `updates`.
766+
/// The update tuple is retained in `self.singleton` in case we see another update and need
767+
/// to recover the singleton to push it into `updates` to join the second update.
768+
fn push_update(&mut self, time: <L::Target as Update>::Time, diff: <L::Target as Update>::Diff) {
769+
// If a just-pushed update exactly equals `(time, diff)` we can avoid pushing it.
770+
if self.result.times.last().map(|t| t == <<L::TimeContainer as BatchContainer>::ReadItem<'_> as IntoOwned>::borrow_as(&time)) == Some(true) &&
771+
self.result.diffs.last().map(|d| d == <<L::DiffContainer as BatchContainer>::ReadItem<'_> as IntoOwned>::borrow_as(&diff)) == Some(true)
772+
{
773+
assert!(self.singleton.is_none());
774+
self.singleton = Some((time, diff));
775+
}
776+
else {
777+
// If we have pushed a single element, we need to copy it out to meet this one.
778+
if let Some((time, diff)) = self.singleton.take() {
779+
self.result.times.push(time);
780+
self.result.diffs.push(diff);
781+
}
782+
self.result.times.push(time);
783+
self.result.diffs.push(diff);
784+
}
785+
}
786+
}
787+
788+
// The layout `L` determines the key, val, time, and diff types.
789+
impl<L> Builder for OrdKeyBuilder<L>
790+
where
791+
L: Layout,
792+
<L::KeyContainer as BatchContainer>::Owned: Columnar,
793+
<L::ValContainer as BatchContainer>::Owned: Columnar,
794+
<L::TimeContainer as BatchContainer>::Owned: Columnar,
795+
<L::DiffContainer as BatchContainer>::Owned: Columnar,
796+
// It seems we could potentially replace these two constraints with `Columnar::Ref<'a>`.
797+
for<'a> L::KeyContainer: PushInto<&'a <L::KeyContainer as BatchContainer>::Owned>,
798+
for<'a> L::ValContainer: PushInto<&'a <L::ValContainer as BatchContainer>::Owned>,
799+
for<'a> <L::TimeContainer as BatchContainer>::ReadItem<'a> : IntoOwned<'a, Owned = <L::Target as Update>::Time>,
800+
for<'a> <L::DiffContainer as BatchContainer>::ReadItem<'a> : IntoOwned<'a, Owned = <L::Target as Update>::Diff>,
801+
{
802+
type Input = Column<((<L::KeyContainer as BatchContainer>::Owned,<L::ValContainer as BatchContainer>::Owned),<L::TimeContainer as BatchContainer>::Owned,<L::DiffContainer as BatchContainer>::Owned)>;
803+
type Time = <L::Target as Update>::Time;
804+
type Output = OrdKeyBatch<L>;
805+
806+
fn with_capacity(keys: usize, _vals: usize, upds: usize) -> Self {
807+
// We don't introduce zero offsets as they will be introduced by the first `push` call.
808+
Self {
809+
result: OrdKeyStorage {
810+
keys: L::KeyContainer::with_capacity(keys),
811+
keys_offs: L::OffsetContainer::with_capacity(keys + 1),
812+
times: L::TimeContainer::with_capacity(upds),
813+
diffs: L::DiffContainer::with_capacity(upds),
814+
},
815+
singleton: None,
816+
singletons: 0,
817+
}
818+
}
819+
820+
#[inline]
821+
fn push(&mut self, chunk: &mut Self::Input) {
822+
use timely::Container;
823+
824+
// NB: Maintaining owned key and val across iterations to track the "last", which we clone into,
825+
// is somewhat appealing from an ease point of view. Might still allocate, do work we don't need,
826+
// but avoids e.g. calls into `last()` and breaks horrid trait requirements.
827+
// Owned key and val would need to be members of `self`, as this method can be called multiple times,
828+
// and we need to correctly cache last for reasons of correctness, not just performance.
829+
830+
for ((key,_val),time,diff) in chunk.drain() {
831+
// It would be great to avoid this conversion to an owned key.
832+
let key = <<L::KeyContainer as BatchContainer>::Owned as Columnar>::into_owned(key);
833+
// These feel fine (wrt the other versions)
834+
let time = <<L::TimeContainer as BatchContainer>::Owned as Columnar>::into_owned(time);
835+
let diff = <<L::DiffContainer as BatchContainer>::Owned as Columnar>::into_owned(diff);
836+
837+
// Perhaps this is a continuation of an already received key.
838+
if self.result.keys.last().map(|k| <<L::KeyContainer as BatchContainer>::ReadItem<'_> as IntoOwned>::borrow_as(&key).eq(&k)).unwrap_or(false) {
839+
self.push_update(time, diff);
840+
} else {
841+
// New key; complete representation of prior key.
842+
self.result.keys_offs.push(self.result.times.len());
843+
if self.singleton.take().is_some() { self.singletons += 1; }
844+
self.push_update(time, diff);
845+
self.result.keys.push(&key);
846+
}
847+
}
848+
}
849+
850+
#[inline(never)]
851+
fn done(mut self, description: Description<Self::Time>) -> OrdKeyBatch<L> {
852+
// Record the final offsets
853+
self.result.keys_offs.push(self.result.times.len());
854+
// Remove any pending singleton, and if it was set increment our count.
855+
if self.singleton.take().is_some() { self.singletons += 1; }
856+
OrdKeyBatch {
857+
updates: self.result.times.len() + self.singletons,
858+
storage: self.result,
859+
description,
860+
}
861+
}
862+
863+
fn seal(chain: &mut Vec<Self::Input>, description: Description<Self::Time>) -> Self::Output {
864+
let mut builder = Self::with_capacity(0, 0, 0);
865+
for mut chunk in chain.drain(..) {
866+
builder.push(&mut chunk);
867+
}
868+
869+
builder.done(description)
870+
}
871+
}
872+
}

differential-dataflow/src/trace/implementations/ord_neu.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -677,7 +677,8 @@ pub mod val_batch {
677677
}
678678
}
679679

680-
mod key_batch {
680+
/// Types related to forming batches of keys.
681+
pub mod key_batch {
681682

682683
use std::marker::PhantomData;
683684
use serde::{Deserialize, Serialize};

0 commit comments

Comments
 (0)