@@ -752,7 +752,7 @@ where
752752
753753 fn size ( & self ) -> usize {
754754 self . vals . capacity ( ) * size_of :: < T :: Native > ( )
755- + self . null_builder . capacity ( ) / 8 // capacity is in bits, so convert to bytes
755+ + self . null_builder . capacity ( ) / 8 // capacity is in bits, so convert to bytes
756756 + self . is_sets . capacity ( ) / 8
757757 + self . size_of_orderings
758758 + self . min_of_each_group_buf . 0 . capacity ( ) * size_of :: < usize > ( )
@@ -827,9 +827,13 @@ impl FirstValueAccumulator {
827827 }
828828
829829 // Updates state with the values in the given row.
830- fn update_with_new_row ( & mut self , row : & [ ScalarValue ] ) {
831- self . first = row[ 0 ] . clone ( ) ;
832- self . orderings = row[ 1 ..] . to_vec ( ) ;
830+ fn update_with_new_row ( & mut self , mut row : Vec < ScalarValue > ) {
831+ row. iter_mut ( ) . for_each ( |s| {
832+ s. compact ( ) ;
833+ } ) ;
834+
835+ self . first = row. remove ( 0 ) ;
836+ self . orderings = row;
833837 self . is_set = true ;
834838 }
835839
@@ -888,7 +892,7 @@ impl Accumulator for FirstValueAccumulator {
888892 if !self . is_set {
889893 if let Some ( first_idx) = self . get_first_idx ( values) ? {
890894 let row = get_row_at_idx ( values, first_idx) ?;
891- self . update_with_new_row ( & row) ;
895+ self . update_with_new_row ( row) ;
892896 }
893897 } else if !self . requirement_satisfied {
894898 if let Some ( first_idx) = self . get_first_idx ( values) ? {
@@ -901,7 +905,7 @@ impl Accumulator for FirstValueAccumulator {
901905 ) ?
902906 . is_gt ( )
903907 {
904- self . update_with_new_row ( & row) ;
908+ self . update_with_new_row ( row) ;
905909 }
906910 }
907911 }
@@ -925,7 +929,7 @@ impl Accumulator for FirstValueAccumulator {
925929 let min = ( 0 ..filtered_states[ 0 ] . len ( ) ) . min_by ( |& a, & b| comparator. compare ( a, b) ) ;
926930
927931 if let Some ( first_idx) = min {
928- let first_row = get_row_at_idx ( & filtered_states, first_idx) ?;
932+ let mut first_row = get_row_at_idx ( & filtered_states, first_idx) ?;
929933 // When collecting orderings, we exclude the is_set flag from the state.
930934 let first_ordering = & first_row[ 1 ..is_set_idx] ;
931935 let sort_options = get_sort_options ( self . ordering_req . as_ref ( ) ) ;
@@ -936,7 +940,9 @@ impl Accumulator for FirstValueAccumulator {
936940 // Update with first value in the state. Note that we should exclude the
937941 // is_set flag from the state. Otherwise, we will end up with a state
938942 // containing two is_set flags.
939- self . update_with_new_row ( & first_row[ 0 ..is_set_idx] ) ;
943+ assert ! ( is_set_idx <= first_row. len( ) ) ;
944+ first_row. resize ( is_set_idx, ScalarValue :: Null ) ;
945+ self . update_with_new_row ( first_row) ;
940946 }
941947 }
942948 Ok ( ( ) )
@@ -1226,9 +1232,13 @@ impl LastValueAccumulator {
12261232 }
12271233
12281234 // Updates state with the values in the given row.
1229- fn update_with_new_row ( & mut self , row : & [ ScalarValue ] ) {
1230- self . last = row[ 0 ] . clone ( ) ;
1231- self . orderings = row[ 1 ..] . to_vec ( ) ;
1235+ fn update_with_new_row ( & mut self , mut row : Vec < ScalarValue > ) {
1236+ row. iter_mut ( ) . for_each ( |s| {
1237+ s. compact ( ) ;
1238+ } ) ;
1239+
1240+ self . last = row. remove ( 0 ) ;
1241+ self . orderings = row;
12321242 self . is_set = true ;
12331243 }
12341244
@@ -1289,7 +1299,7 @@ impl Accumulator for LastValueAccumulator {
12891299 if !self . is_set || self . requirement_satisfied {
12901300 if let Some ( last_idx) = self . get_last_idx ( values) ? {
12911301 let row = get_row_at_idx ( values, last_idx) ?;
1292- self . update_with_new_row ( & row) ;
1302+ self . update_with_new_row ( row) ;
12931303 }
12941304 } else if let Some ( last_idx) = self . get_last_idx ( values) ? {
12951305 let row = get_row_at_idx ( values, last_idx) ?;
@@ -1302,7 +1312,7 @@ impl Accumulator for LastValueAccumulator {
13021312 ) ?
13031313 . is_lt ( )
13041314 {
1305- self . update_with_new_row ( & row) ;
1315+ self . update_with_new_row ( row) ;
13061316 }
13071317 }
13081318
@@ -1326,7 +1336,7 @@ impl Accumulator for LastValueAccumulator {
13261336 let max = ( 0 ..filtered_states[ 0 ] . len ( ) ) . max_by ( |& a, & b| comparator. compare ( a, b) ) ;
13271337
13281338 if let Some ( last_idx) = max {
1329- let last_row = get_row_at_idx ( & filtered_states, last_idx) ?;
1339+ let mut last_row = get_row_at_idx ( & filtered_states, last_idx) ?;
13301340 // When collecting orderings, we exclude the is_set flag from the state.
13311341 let last_ordering = & last_row[ 1 ..is_set_idx] ;
13321342 let sort_options = get_sort_options ( self . ordering_req . as_ref ( ) ) ;
@@ -1339,7 +1349,9 @@ impl Accumulator for LastValueAccumulator {
13391349 // Update with last value in the state. Note that we should exclude the
13401350 // is_set flag from the state. Otherwise, we will end up with a state
13411351 // containing two is_set flags.
1342- self . update_with_new_row ( & last_row[ 0 ..is_set_idx] ) ;
1352+ assert ! ( is_set_idx <= last_row. len( ) ) ;
1353+ last_row. resize ( is_set_idx, ScalarValue :: Null ) ;
1354+ self . update_with_new_row ( last_row) ;
13431355 }
13441356 }
13451357 Ok ( ( ) )
@@ -1382,7 +1394,13 @@ fn convert_to_sort_cols(arrs: &[ArrayRef], sort_exprs: &LexOrdering) -> Vec<Sort
13821394
13831395#[ cfg( test) ]
13841396mod tests {
1385- use arrow:: { array:: Int64Array , compute:: SortOptions , datatypes:: Schema } ;
1397+ use std:: iter:: repeat_with;
1398+
1399+ use arrow:: {
1400+ array:: { Int64Array , ListArray } ,
1401+ compute:: SortOptions ,
1402+ datatypes:: Schema ,
1403+ } ;
13861404 use datafusion_physical_expr:: { expressions:: col, PhysicalSortExpr } ;
13871405
13881406 use super :: * ;
@@ -1772,4 +1790,60 @@ mod tests {
17721790
17731791 Ok ( ( ) )
17741792 }
1793+
1794+ #[ test]
1795+ fn test_first_list_acc_size ( ) -> Result < ( ) > {
1796+ fn size_after_batch ( values : & [ ArrayRef ] ) -> Result < usize > {
1797+ let mut first_accumulator = FirstValueAccumulator :: try_new (
1798+ & DataType :: List ( Arc :: new ( Field :: new_list_field ( DataType :: Int64 , false ) ) ) ,
1799+ & [ ] ,
1800+ LexOrdering :: default ( ) ,
1801+ false ,
1802+ ) ?;
1803+
1804+ first_accumulator. update_batch ( values) ?;
1805+
1806+ Ok ( first_accumulator. size ( ) )
1807+ }
1808+
1809+ let batch1 = ListArray :: from_iter_primitive :: < Int32Type , _ , _ > (
1810+ repeat_with ( || Some ( vec ! [ Some ( 1 ) ] ) ) . take ( 10000 ) ,
1811+ ) ;
1812+ let batch2 =
1813+ ListArray :: from_iter_primitive :: < Int32Type , _ , _ > ( [ Some ( vec ! [ Some ( 1 ) ] ) ] ) ;
1814+
1815+ let size1 = size_after_batch ( & [ Arc :: new ( batch1) ] ) ?;
1816+ let size2 = size_after_batch ( & [ Arc :: new ( batch2) ] ) ?;
1817+ assert_eq ! ( size1, size2) ;
1818+
1819+ Ok ( ( ) )
1820+ }
1821+
1822+ #[ test]
1823+ fn test_last_list_acc_size ( ) -> Result < ( ) > {
1824+ fn size_after_batch ( values : & [ ArrayRef ] ) -> Result < usize > {
1825+ let mut last_accumulator = FirstValueAccumulator :: try_new (
1826+ & DataType :: List ( Arc :: new ( Field :: new_list_field ( DataType :: Int64 , false ) ) ) ,
1827+ & [ ] ,
1828+ LexOrdering :: default ( ) ,
1829+ false ,
1830+ ) ?;
1831+
1832+ last_accumulator. update_batch ( values) ?;
1833+
1834+ Ok ( last_accumulator. size ( ) )
1835+ }
1836+
1837+ let batch1 = ListArray :: from_iter_primitive :: < Int32Type , _ , _ > (
1838+ repeat_with ( || Some ( vec ! [ Some ( 1 ) ] ) ) . take ( 10000 ) ,
1839+ ) ;
1840+ let batch2 =
1841+ ListArray :: from_iter_primitive :: < Int32Type , _ , _ > ( [ Some ( vec ! [ Some ( 1 ) ] ) ] ) ;
1842+
1843+ let size1 = size_after_batch ( & [ Arc :: new ( batch1) ] ) ?;
1844+ let size2 = size_after_batch ( & [ Arc :: new ( batch2) ] ) ?;
1845+ assert_eq ! ( size1, size2) ;
1846+
1847+ Ok ( ( ) )
1848+ }
17751849}
0 commit comments