7575use std:: collections:: { BinaryHeap , HashMap , VecDeque } ;
7676use std:: cmp:: Reverse ;
7777
78+ use columnar:: { Columnar , Len , Index } ;
79+ use columnar:: ColumnVec ;
80+
7881use crate :: progress:: Timestamp ;
7982use crate :: progress:: { Source , Target } ;
8083use crate :: progress:: ChangeBatch ;
@@ -84,6 +87,62 @@ use crate::progress::frontier::{Antichain, MutableAntichain};
8487use crate :: progress:: timestamp:: PathSummary ;
8588
8689
90+ use vec_antichain:: VecAntichain ;
91+
92+ /// A stand-in for `Vec<Antichain<T>>`.
93+ mod vec_antichain {
94+
95+ use columnar:: { Columnar , Len , Index , IndexMut } ;
96+ use columnar:: { ColumnVec , Slice } ;
97+
98+ use crate :: progress:: Antichain ;
99+
// Newtype around a `ColumnVec<T>`; the wrapped column is a private field, so
// outside this module it is only reachable through the trait impls below.
100+ #[ derive( Clone , Debug ) ]
101+ pub struct VecAntichain < T > ( ColumnVec < T > ) ;
102+
// The default value wraps the default value of the inner `ColumnVec`.
103+ impl < TC : Default > Default for VecAntichain < TC > {
104+ fn default ( ) -> Self {
105+ Self ( Default :: default ( ) )
106+ }
107+ }
108+
// The length is whatever the wrapped `ColumnVec` reports.
109+ impl < TC > Len for VecAntichain < TC > {
110+ #[ inline( always) ] fn len ( & self ) -> usize { self . 0 . len ( ) }
111+ }
112+
// Read access forwards to the wrapped column; an indexed entry is handed back
// as a `Slice<&TC>` rather than as an `Antichain`.
113+ impl < TC > Index for VecAntichain < TC > {
114+ type Index < ' a > = Slice < & ' a TC > where TC : ' a ;
115+
116+ #[ inline( always) ]
117+ fn index ( & self , index : usize ) -> Self :: Index < ' _ > {
118+ self . 0 . index ( index)
119+ }
120+ }
// Mutable access likewise forwards to the wrapped column and yields a
// `Slice<&mut TC>`.
121+ impl < TC > IndexMut for VecAntichain < TC > {
122+ type IndexMut < ' a > = Slice < & ' a mut TC > where TC : ' a ;
123+
124+ #[ inline( always) ]
125+ fn index_mut ( & mut self , index : usize ) -> Self :: IndexMut < ' _ > {
126+ self . 0 . index_mut ( index)
127+ }
128+ }
129+
// Allows an `Antichain<T>` to be copied into the container: the antichain's
// `elements()` are passed to the wrapped column's `copy`.
130+ impl < T , TC : Columnar < T > > Columnar < Antichain < T > > for VecAntichain < TC > {
131+ #[ inline( always) ]
132+ fn copy ( & mut self , item : & Antichain < T > ) {
133+ self . 0 . copy ( item. elements ( ) ) ;
134+ }
// NOTE(review): `clear` is a stub; calling it panics via `unimplemented!()`.
// Presumably it should delegate to the wrapped column -- confirm against `columnar`.
135+ fn clear ( & mut self ) {
136+ unimplemented ! ( )
137+ }
// NOTE(review): `heap_size` is also a stub and panics if called.
138+ fn heap_size ( & self ) -> ( usize , usize ) {
139+ unimplemented ! ( )
140+ }
141+ }
142+ }
143+
144+
145+
87146/// A topology builder, which can summarize reachability along paths.
88147///
89148/// A `Builder` takes descriptions of the nodes and edges in a graph, and compiles
@@ -132,43 +191,42 @@ pub struct Builder<T: Timestamp> {
132191 /// Indexed by operator index, then input port, then output port. This is the
133192 /// same format returned by `get_internal_summary`, as if we simply appended
134193 /// all of the summaries for the hosted nodes.
135- pub nodes : Vec < Vec < Vec < Antichain < T :: Summary > > > > ,
194+ nodes : ColumnVec < ColumnVec < VecAntichain < Vec < T :: Summary > > > > ,
136195 /// Direct connections from sources to targets.
137196 ///
138197 /// Edges do not affect timestamps, so we only need to know the connectivity.
139198 /// Indexed by operator index then output port.
140- pub edges : Vec < Vec < Vec < Target > > > ,
199+ edges : Vec < Vec < Vec < Target > > > ,
141200 /// Numbers of inputs and outputs for each node.
142- pub shape : Vec < ( usize , usize ) > ,
201+ shape : Vec < ( usize , usize ) > ,
143202}
144203
145204impl < T : Timestamp > Builder < T > {
146205
147206 /// Create a new empty topology builder.
148207 pub fn new ( ) -> Self {
149208 Builder {
// This hunk replaces the `nodes` initializer `Vec::new()` with `Default::default()`;
// `edges` and `shape` are still initialized with `Vec::new()`.
150- nodes : Vec :: new ( ) ,
209+ nodes : Default :: default ( ) ,
151210 edges : Vec :: new ( ) ,
152211 shape : Vec :: new ( ) ,
153212 }
154213 }
155214
156215 /// Add links internal to operators.
157216 ///
158- /// This method overwrites any existing summary, instead of anything more sophisticated .
217+ /// Nodes must be added consecutively: `index` must equal the number of nodes already added, otherwise this method panics.
159218 pub fn add_node ( & mut self , index : usize , inputs : usize , outputs : usize , summary : Vec < Vec < Antichain < T :: Summary > > > ) {
160219
161220 // Assert that all summaries exist.
162221 debug_assert_eq ! ( inputs, summary. len( ) ) ;
163222 for x in summary. iter ( ) { debug_assert_eq ! ( outputs, x. len( ) ) ; }
164223
165- while self . nodes . len ( ) <= index {
166- self . nodes . push ( Vec :: new ( ) ) ;
167- self . edges . push ( Vec :: new ( ) ) ;
168- self . shape . push ( ( 0 , 0 ) ) ;
169- }
224+ assert_eq ! ( self . nodes. len( ) , index) ;
225+
226+ self . nodes . push ( summary ) ;
227+ self . edges . push ( Vec :: new ( ) ) ;
228+ self . shape . push ( ( 0 , 0 ) ) ;
170229
171- self . nodes [ index] = summary;
172230 if self . edges [ index] . len ( ) != outputs {
173231 self . edges [ index] = vec ! [ Vec :: new( ) ; outputs] ;
174232 }
@@ -287,7 +345,7 @@ impl<T: Timestamp> Builder<T> {
287345 in_degree. entry ( target) . or_insert ( 0 ) ;
288346 for ( output, summaries) in outputs. iter ( ) . enumerate ( ) {
289347 let source = Location :: new_source ( index, output) ;
290- for summary in summaries. elements ( ) . iter ( ) {
348+ for summary in summaries. iter ( ) {
291349 if summary == & Default :: default ( ) {
292350 * in_degree. entry ( source) . or_insert ( 0 ) += 1 ;
293351 }
@@ -322,9 +380,9 @@ impl<T: Timestamp> Builder<T> {
322380 }
323381 } ,
324382 Port :: Target ( port) => {
325- for ( output, summaries) in self . nodes [ node] [ port] . iter ( ) . enumerate ( ) {
383+ for ( output, summaries) in self . nodes . index ( node) . index ( port) . iter ( ) . enumerate ( ) {
326384 let source = Location :: new_source ( node, output) ;
327- for summary in summaries. elements ( ) . iter ( ) {
385+ for summary in summaries. iter ( ) {
328386 if summary == & Default :: default ( ) {
329387 * in_degree. get_mut ( & source) . unwrap ( ) -= 1 ;
330388 if in_degree[ & source] == 0 {
@@ -361,12 +419,12 @@ pub struct Tracker<T:Timestamp> {
361419 /// Indexed by operator index, then input port, then output port. This is the
362420 /// same format returned by `get_internal_summary`, as if we simply appended
363421 /// all of the summaries for the hosted nodes.
364- nodes : Vec < Vec < Vec < Antichain < T :: Summary > > > > ,
422+ nodes : ColumnVec < ColumnVec < VecAntichain < Vec < T :: Summary > > > > ,
365423 /// Direct connections from sources to targets.
366424 ///
367425 /// Edges do not affect timestamps, so we only need to know the connectivity.
368426 /// Indexed by operator index then output port.
369- edges : Vec < Vec < Vec < Target > > > ,
427+ edges : ColumnVec < ColumnVec < Vec < Target > > > ,
370428
371429 // TODO: All of the sizes of these allocations are static (except internal to `ChangeBatch`).
372430 // It seems we should be able to flatten most of these so that there are a few allocations
@@ -544,10 +602,15 @@ impl<T:Timestamp> Tracker<T> {
544602 let scope_outputs = builder. shape [ 0 ] . 0 ;
545603 let output_changes = vec ! [ ChangeBatch :: new( ) ; scope_outputs] ;
546604
605+ let mut edges: ColumnVec < ColumnVec < Vec < Target > > > = Default :: default ( ) ;
606+ for edge in builder. edges {
607+ edges. push ( edge) ;
608+ }
609+
547610 let tracker =
548611 Tracker {
549612 nodes : builder. nodes ,
550- edges : builder . edges ,
613+ edges,
551614 per_operator,
552615 target_changes : ChangeBatch :: new ( ) ,
553616 source_changes : ChangeBatch :: new ( ) ,
@@ -663,10 +726,10 @@ impl<T:Timestamp> Tracker<T> {
663726 . update_iter ( Some ( ( time, diff) ) ) ;
664727
665728 for ( time, diff) in changes {
666- let nodes = & self . nodes [ location. node ] [ port_index] ;
729+ let nodes = & self . nodes . index ( location. node ) . index ( port_index) ;
667730 for ( output_port, summaries) in nodes. iter ( ) . enumerate ( ) {
668731 let source = Location { node : location. node , port : Port :: Source ( output_port) } ;
669- for summary in summaries. elements ( ) . iter ( ) {
732+ for summary in summaries. iter ( ) {
670733 if let Some ( new_time) = summary. results_in ( & time) {
671734 self . worklist . push ( Reverse ( ( new_time, source, diff) ) ) ;
672735 }
@@ -686,7 +749,7 @@ impl<T:Timestamp> Tracker<T> {
686749 . update_iter ( Some ( ( time, diff) ) ) ;
687750
688751 for ( time, diff) in changes {
689- for new_target in self . edges [ location. node ] [ port_index] . iter ( ) {
752+ for new_target in self . edges . index ( location. node ) . index ( port_index) . iter ( ) {
690753 self . worklist . push ( Reverse ( (
691754 time. clone ( ) ,
692755 Location :: from ( * new_target) ,
@@ -738,7 +801,7 @@ impl<T:Timestamp> Tracker<T> {
738801/// Graph locations may be missing from the output, in which case they have no
739802/// paths to scope outputs.
740803fn summarize_outputs < T : Timestamp > (
741- nodes : & Vec < Vec < Vec < Antichain < T :: Summary > > > > ,
804+ nodes : & ColumnVec < ColumnVec < VecAntichain < Vec < T :: Summary > > > > ,
742805 edges : & Vec < Vec < Vec < Target > > > ,
743806 ) -> HashMap < Location , Vec < Antichain < T :: Summary > > >
744807{
@@ -780,7 +843,7 @@ fn summarize_outputs<T: Timestamp>(
780843 Port :: Source ( output_port) => {
781844
782845 // Consider each input port of the associated operator.
783- for ( input_port, summaries) in nodes[ location. node ] . iter ( ) . enumerate ( ) {
846+ for ( input_port, summaries) in nodes. index ( location. node ) . iter ( ) . enumerate ( ) {
784847
785848 // Determine the current path summaries from the input port.
786849 let location = Location { node : location. node , port : Port :: Target ( input_port) } ;
@@ -792,7 +855,7 @@ fn summarize_outputs<T: Timestamp>(
792855 while antichains. len ( ) <= output { antichains. push ( Antichain :: new ( ) ) ; }
793856
794857 // Combine each operator-internal summary to the output with `summary`.
795- for operator_summary in summaries[ output_port ] . elements ( ) . iter ( ) {
858+ for operator_summary in summaries. index ( output_port ) . iter ( ) {
796859 if let Some ( combined) = operator_summary. followed_by ( & summary) {
797860 if antichains[ output] . insert ( combined. clone ( ) ) {
798861 worklist. push_back ( ( location, output, combined) ) ;
0 commit comments