77 timely:: dataflow:: ProbeHandle ,
88} ;
99
10-
11- // use differential_dataflow::trace::implementations::ord_neu::ColKeyBuilder;
12- use differential_dataflow:: trace:: implementations:: ord_neu:: ColValSpine ;
10+ use differential_dataflow:: trace:: implementations:: ord_neu:: ColKeySpine ;
1311
1412use differential_dataflow:: operators:: arrange:: arrangement:: arrange_core;
1513
@@ -46,8 +44,8 @@ fn main() {
4644 let data_pact = ExchangeCore :: < ColumnBuilder < ( ( String , ( ) ) , u64 , i64 ) > , _ > :: new_core ( |x : & ( ( & str , ( ) ) , & u64 , & i64 ) | ( x. 0 ) . 0 . as_bytes ( ) . iter ( ) . map ( |x| * x as u64 ) . sum :: < u64 > ( ) as u64 ) ;
4745 let keys_pact = ExchangeCore :: < ColumnBuilder < ( ( String , ( ) ) , u64 , i64 ) > , _ > :: new_core ( |x : & ( ( & str , ( ) ) , & u64 , & i64 ) | ( x. 0 ) . 0 . as_bytes ( ) . iter ( ) . map ( |x| * x as u64 ) . sum :: < u64 > ( ) as u64 ) ;
4846
49- let data = arrange_core :: < _ , _ , Col2KeyBatcher < _ , _ , _ > , ColKeyBuilder < _ , _ , _ > , ColValSpine < _ , _ , _ , _ > > ( & data, data_pact, "Data" ) ;
50- let keys = arrange_core :: < _ , _ , Col2KeyBatcher < _ , _ , _ > , ColKeyBuilder < _ , _ , _ > , ColValSpine < _ , _ , _ , _ > > ( & keys, keys_pact, "Keys" ) ;
47+ let data = arrange_core :: < _ , _ , Col2KeyBatcher < _ , _ , _ > , ColKeyBuilder < _ , _ , _ > , ColKeySpine < _ , _ , _ > > ( & data, data_pact, "Data" ) ;
48+ let keys = arrange_core :: < _ , _ , Col2KeyBatcher < _ , _ , _ > , ColKeyBuilder < _ , _ , _ > , ColKeySpine < _ , _ , _ > > ( & keys, keys_pact, "Keys" ) ;
5149
5250 keys. join_core ( & data, |_k, & ( ) , & ( ) | Option :: < ( ) > :: None )
5351 . probe_with ( & mut probe) ;
@@ -374,7 +372,7 @@ pub mod batcher {
374372
375373 /// A batcher for columnar storage.
376374 pub type Col2ValBatcher < K , V , T , R > = MergeBatcher < Column < ( ( K , V ) , T , R ) > , Chunker < Column < ( ( K , V ) , T , R ) > > , merger:: ColumnMerger < ( K , V ) , T , R > > ;
377- pub type Col2KeyBatcher < K , T , R > = Col2ValBatcher < K , ( ) , T , R > ;
375+ pub type Col2KeyBatcher < K , T , R > = Col2ValBatcher < K , ( ) , T , R > ;
378376
379377 // First draft: build a "chunker" and a "merger".
380378
@@ -522,7 +520,7 @@ pub mod batcher {
522520 }
523521 }
524522
525- impl < D , T , R > MergerChunk for Column < ( D , T , R ) >
523+ impl < D , T , R > MergerChunk for Column < ( D , T , R ) >
526524 where
527525 D : Columnar + ' static ,
528526 T : timely:: PartialOrder + Clone + Columnar + ' static ,
@@ -575,23 +573,23 @@ pub mod dd_builder {
575573 use columnar:: Columnar ;
576574
577575 use timely:: container:: PushInto ;
578-
576+
579577 use differential_dataflow:: IntoOwned ;
580578 use differential_dataflow:: trace:: Builder ;
581579 use differential_dataflow:: trace:: Description ;
582580 use differential_dataflow:: trace:: implementations:: Layout ;
583581 use differential_dataflow:: trace:: implementations:: Update ;
584582 use differential_dataflow:: trace:: implementations:: BatchContainer ;
585- use differential_dataflow:: trace:: implementations:: ord_neu:: { OrdValBatch , val_batch:: OrdValStorage } ;
586-
583+ use differential_dataflow:: trace:: implementations:: ord_neu:: { OrdValBatch , val_batch:: OrdValStorage , OrdKeyBatch } ;
584+ use differential_dataflow :: trace :: implementations :: ord_neu :: key_batch :: OrdKeyStorage ;
587585 use crate :: Column ;
588586
589587
590588 use differential_dataflow:: trace:: rc_blanket_impls:: RcBuilder ;
591589 use differential_dataflow:: trace:: implementations:: TStack ;
592-
590+
593591 pub type ColValBuilder < K , V , T , R > = RcBuilder < OrdValBuilder < TStack < ( ( K , V ) , T , R ) > > > ;
594- pub type ColKeyBuilder < K , T , R > = RcBuilder < OrdValBuilder < TStack < ( ( K , ( ) ) , T , R ) > > > ;
592+ pub type ColKeyBuilder < K , T , R > = RcBuilder < OrdKeyBuilder < TStack < ( ( K , ( ) ) , T , R ) > > > ;
595593
596594 /// A builder for creating layers from unsorted update tuples.
597595 pub struct OrdValBuilder < L : Layout > {
@@ -659,7 +657,7 @@ pub mod dd_builder {
659657
660658 fn with_capacity ( keys : usize , vals : usize , upds : usize ) -> Self {
661659 // We don't introduce zero offsets as they will be introduced by the first `push` call.
662- Self {
660+ Self {
663661 result : OrdValStorage {
664662 keys : L :: KeyContainer :: with_capacity ( keys) ,
665663 keys_offs : L :: OffsetContainer :: with_capacity ( keys + 1 ) ,
@@ -736,8 +734,139 @@ pub mod dd_builder {
736734 for mut chunk in chain. drain ( ..) {
737735 builder. push ( & mut chunk) ;
738736 }
739-
737+
740738 builder. done ( description)
741739 }
742740 }
743- }
741+
742+ /// A builder for creating layers from unsorted update tuples.
743+ pub struct OrdKeyBuilder < L : Layout > {
744+ /// The in-progress result.
745+ ///
746+ /// This is public to allow container implementors to set and inspect their container.
747+ pub result : OrdKeyStorage < L > ,
748+ singleton : Option < ( <L :: Target as Update >:: Time , <L :: Target as Update >:: Diff ) > ,
749+ /// Counts the number of singleton optimizations we performed.
750+ ///
751+ /// This number allows us to correctly gauge the total number of updates reflected in a batch,
752+ /// even though `updates.len()` may be much shorter than this amount.
753+ singletons : usize ,
754+ }
755+
756+ impl < L : Layout > OrdKeyBuilder < L > {
757+ /// Pushes a single update, which may set `self.singleton` rather than push.
758+ ///
759+ /// This operation is meant to be equivalent to `self.results.updates.push((time, diff))`.
760+ /// However, for "clever" reasons it does not do this. Instead, it looks for opportunities
761+ /// to encode a singleton update with an "absert" update: repeating the most recent offset.
762+ /// This otherwise invalid state encodes "look back one element".
763+ ///
764+ /// When `self.singleton` is `Some`, it means that we have seen one update and it matched the
765+ /// previously pushed update exactly. In that case, we do not push the update into `updates`.
766+ /// The update tuple is retained in `self.singleton` in case we see another update and need
767+ /// to recover the singleton to push it into `updates` to join the second update.
768+ fn push_update ( & mut self , time : <L :: Target as Update >:: Time , diff : <L :: Target as Update >:: Diff ) {
769+ // If a just-pushed update exactly equals `(time, diff)` we can avoid pushing it.
770+ if self . result . times . last ( ) . map ( |t| t == <<L :: TimeContainer as BatchContainer >:: ReadItem < ' _ > as IntoOwned >:: borrow_as ( & time) ) == Some ( true ) &&
771+ self . result . diffs . last ( ) . map ( |d| d == <<L :: DiffContainer as BatchContainer >:: ReadItem < ' _ > as IntoOwned >:: borrow_as ( & diff) ) == Some ( true )
772+ {
773+ assert ! ( self . singleton. is_none( ) ) ;
774+ self . singleton = Some ( ( time, diff) ) ;
775+ }
776+ else {
777+ // If we have pushed a single element, we need to copy it out to meet this one.
778+ if let Some ( ( time, diff) ) = self . singleton . take ( ) {
779+ self . result . times . push ( time) ;
780+ self . result . diffs . push ( diff) ;
781+ }
782+ self . result . times . push ( time) ;
783+ self . result . diffs . push ( diff) ;
784+ }
785+ }
786+ }
787+
788+ // The layout `L` determines the key, val, time, and diff types.
789+ impl < L > Builder for OrdKeyBuilder < L >
790+ where
791+ L : Layout ,
792+ <L :: KeyContainer as BatchContainer >:: Owned : Columnar ,
793+ <L :: ValContainer as BatchContainer >:: Owned : Columnar ,
794+ <L :: TimeContainer as BatchContainer >:: Owned : Columnar ,
795+ <L :: DiffContainer as BatchContainer >:: Owned : Columnar ,
796+ // These two constraints seem .. like we could potentially replace by `Columnar::Ref<'a>`.
797+ for < ' a > L :: KeyContainer : PushInto < & ' a <L :: KeyContainer as BatchContainer >:: Owned > ,
798+ for < ' a > L :: ValContainer : PushInto < & ' a <L :: ValContainer as BatchContainer >:: Owned > ,
799+ for < ' a > <L :: TimeContainer as BatchContainer >:: ReadItem < ' a > : IntoOwned < ' a , Owned = <L :: Target as Update >:: Time > ,
800+ for < ' a > <L :: DiffContainer as BatchContainer >:: ReadItem < ' a > : IntoOwned < ' a , Owned = <L :: Target as Update >:: Diff > ,
801+ {
802+ type Input = Column < ( ( <L :: KeyContainer as BatchContainer >:: Owned , <L :: ValContainer as BatchContainer >:: Owned ) , <L :: TimeContainer as BatchContainer >:: Owned , <L :: DiffContainer as BatchContainer >:: Owned ) > ;
803+ type Time = <L :: Target as Update >:: Time ;
804+ type Output = OrdKeyBatch < L > ;
805+
806+ fn with_capacity ( keys : usize , _vals : usize , upds : usize ) -> Self {
807+ // We don't introduce zero offsets as they will be introduced by the first `push` call.
808+ Self {
809+ result : OrdKeyStorage {
810+ keys : L :: KeyContainer :: with_capacity ( keys) ,
811+ keys_offs : L :: OffsetContainer :: with_capacity ( keys + 1 ) ,
812+ times : L :: TimeContainer :: with_capacity ( upds) ,
813+ diffs : L :: DiffContainer :: with_capacity ( upds) ,
814+ } ,
815+ singleton : None ,
816+ singletons : 0 ,
817+ }
818+ }
819+
820+ #[ inline]
821+ fn push ( & mut self , chunk : & mut Self :: Input ) {
822+ use timely:: Container ;
823+
824+ // NB: Maintaining owned key and val across iterations to track the "last", which we clone into,
825+ // is somewhat appealing from an ease point of view. Might still allocate, do work we don't need,
826+ // but avoids e.g. calls into `last()` and breaks horrid trait requirements.
827+ // Owned key and val would need to be members of `self`, as this method can be called multiple times,
828+ // and we need to correctly cache last for reasons of correctness, not just performance.
829+
830+ for ( ( key, _val) , time, diff) in chunk. drain ( ) {
831+ // It would be great to avoid.
832+ let key = <<L :: KeyContainer as BatchContainer >:: Owned as Columnar >:: into_owned ( key) ;
833+ // These feel fine (wrt the other versions)
834+ let time = <<L :: TimeContainer as BatchContainer >:: Owned as Columnar >:: into_owned ( time) ;
835+ let diff = <<L :: DiffContainer as BatchContainer >:: Owned as Columnar >:: into_owned ( diff) ;
836+
837+ // Perhaps this is a continuation of an already received key.
838+ if self . result . keys . last ( ) . map ( |k| <<L :: KeyContainer as BatchContainer >:: ReadItem < ' _ > as IntoOwned >:: borrow_as ( & key) . eq ( & k) ) . unwrap_or ( false ) {
839+ self . push_update ( time, diff) ;
840+ } else {
841+ // New key; complete representation of prior key.
842+ self . result . keys_offs . push ( self . result . times . len ( ) ) ;
843+ if self . singleton . take ( ) . is_some ( ) { self . singletons += 1 ; }
844+ self . push_update ( time, diff) ;
845+ self . result . keys . push ( & key) ;
846+ }
847+ }
848+ }
849+
850+ #[ inline( never) ]
851+ fn done ( mut self , description : Description < Self :: Time > ) -> OrdKeyBatch < L > {
852+ // Record the final offsets
853+ self . result . keys_offs . push ( self . result . times . len ( ) ) ;
854+ // Remove any pending singleton, and if it was set increment our count.
855+ if self . singleton . take ( ) . is_some ( ) { self . singletons += 1 ; }
856+ OrdKeyBatch {
857+ updates : self . result . times . len ( ) + self . singletons ,
858+ storage : self . result ,
859+ description,
860+ }
861+ }
862+
863+ fn seal ( chain : & mut Vec < Self :: Input > , description : Description < Self :: Time > ) -> Self :: Output {
864+ let mut builder = Self :: with_capacity ( 0 , 0 , 0 ) ;
865+ for mut chunk in chain. drain ( ..) {
866+ builder. push ( & mut chunk) ;
867+ }
868+
869+ builder. done ( description)
870+ }
871+ }
872+ }
0 commit comments