1515// specific language governing permissions and limitations
1616// under the License.
1717
18- use arrow:: array:: BooleanBufferBuilder ;
1918use arrow:: array:: BufferBuilder ;
2019use arrow:: array:: GenericBinaryArray ;
2120use arrow:: array:: GenericStringArray ;
2221use arrow:: array:: OffsetSizeTrait ;
2322use arrow:: array:: PrimitiveArray ;
2423use arrow:: array:: { Array , ArrayRef , ArrowPrimitiveType , AsArray } ;
25- use arrow:: buffer:: NullBuffer ;
2624use arrow:: buffer:: OffsetBuffer ;
2725use arrow:: buffer:: ScalarBuffer ;
28- use arrow:: datatypes:: ArrowNativeType ;
2926use arrow:: datatypes:: ByteArrayType ;
3027use arrow:: datatypes:: DataType ;
3128use arrow:: datatypes:: GenericBinaryType ;
32- use arrow:: datatypes:: GenericStringType ;
3329use datafusion_common:: utils:: proxy:: VecAllocExt ;
3430
3531use crate :: aggregates:: group_values:: null_builder:: MaybeNullBufferBuilder ;
32+ use arrow_array:: types:: GenericStringType ;
3633use datafusion_physical_expr_common:: binary_map:: { OutputType , INITIAL_BUFFER_CAPACITY } ;
3734use std:: sync:: Arc ;
3835use std:: vec;
@@ -188,6 +185,11 @@ impl<T: ArrowPrimitiveType> GroupColumn for PrimitiveGroupValueBuilder<T> {
188185 }
189186}
190187
188+ /// Stores a collection of binary or utf8 group values in a single buffer
189+ /// in a way that allows:
190+ ///
191+ /// 1. Efficient comparison of incoming rows to existing rows
192+ /// 2. Efficient construction of the final output array
191193pub struct ByteGroupValueBuilder < O >
192194where
193195 O : OffsetSizeTrait ,
@@ -199,8 +201,8 @@ where
199201 /// stored in the range `offsets[i]..offsets[i+1]` in `buffer`. Null values
200202 /// are stored as a zero length string.
201203 offsets : Vec < O > ,
202- /// Null indexes in offsets, if `i` is in nulls, `offsets[i]` should be equals to `offsets[i+1]`
203- nulls : Vec < usize > ,
204+ /// Tracks, for each value appended so far, whether that value is null
205+ nulls : MaybeNullBufferBuilder ,
204206}
205207
206208impl < O > ByteGroupValueBuilder < O >
@@ -212,7 +214,7 @@ where
212214 output_type,
213215 buffer : BufferBuilder :: new ( INITIAL_BUFFER_CAPACITY ) ,
214216 offsets : vec ! [ O :: default ( ) ] ,
215- nulls : vec ! [ ] ,
217+ nulls : MaybeNullBufferBuilder :: new ( ) ,
216218 }
217219 }
218220
@@ -222,40 +224,33 @@ where
222224 {
223225 let arr = array. as_bytes :: < B > ( ) ;
224226 if arr. is_null ( row) {
225- self . nulls . push ( self . len ( ) ) ;
227+ self . nulls . append ( true ) ;
226228 // nulls need a zero length in the offset buffer
227229 let offset = self . buffer . len ( ) ;
228-
229230 self . offsets . push ( O :: usize_as ( offset) ) ;
230- return ;
231+ } else {
232+ self . nulls . append ( false ) ;
233+ let value: & [ u8 ] = arr. value ( row) . as_ref ( ) ;
234+ self . buffer . append_slice ( value) ;
235+ self . offsets . push ( O :: usize_as ( self . buffer . len ( ) ) ) ;
231236 }
232-
233- let value: & [ u8 ] = arr. value ( row) . as_ref ( ) ;
234- self . buffer . append_slice ( value) ;
235- self . offsets . push ( O :: usize_as ( self . buffer . len ( ) ) ) ;
236237 }
237238
238239 fn equal_to_inner < B > ( & self , lhs_row : usize , array : & ArrayRef , rhs_row : usize ) -> bool
239240 where
240241 B : ByteArrayType ,
241242 {
242- // Handle nulls
243- let is_lhs_null = self . nulls . iter ( ) . any ( |null_idx| * null_idx == lhs_row) ;
244243 let arr = array. as_bytes :: < B > ( ) ;
245- if is_lhs_null {
246- return arr. is_null ( rhs_row) ;
247- } else if arr. is_null ( rhs_row) {
248- return false ;
249- }
244+ self . nulls . is_null ( lhs_row) == arr. is_null ( rhs_row)
245+ && self . value ( lhs_row) == ( arr. value ( rhs_row) . as_ref ( ) as & [ u8 ] )
246+ }
250247
251- let arr = array. as_bytes :: < B > ( ) ;
252- let rhs_elem: & [ u8 ] = arr. value ( rhs_row) . as_ref ( ) ;
253- let rhs_elem_len = arr. value_length ( rhs_row) . as_usize ( ) ;
254- debug_assert_eq ! ( rhs_elem_len, rhs_elem. len( ) ) ;
255- let l = self . offsets [ lhs_row] . as_usize ( ) ;
256- let r = self . offsets [ lhs_row + 1 ] . as_usize ( ) ;
257- let existing_elem = unsafe { self . buffer . as_slice ( ) . get_unchecked ( l..r) } ;
258- rhs_elem == existing_elem
248+ /// return the current value of the specified row irrespective of null
249+ pub fn value ( & self , row : usize ) -> & [ u8 ] {
250+ let l = self . offsets [ row] . as_usize ( ) ;
251+ let r = self . offsets [ row + 1 ] . as_usize ( ) ;
252+ // Safety: the offsets are constructed correctly and never decrease
253+ unsafe { self . buffer . as_slice ( ) . get_unchecked ( l..r) }
259254 }
260255}
261256
@@ -323,18 +318,7 @@ where
323318 nulls,
324319 } = * self ;
325320
326- let null_buffer = if nulls. is_empty ( ) {
327- None
328- } else {
329- // Only make a `NullBuffer` if there was a null value
330- let num_values = offsets. len ( ) - 1 ;
331- let mut bool_builder = BooleanBufferBuilder :: new ( num_values) ;
332- bool_builder. append_n ( num_values, true ) ;
333- nulls. into_iter ( ) . for_each ( |null_index| {
334- bool_builder. set_bit ( null_index, false ) ;
335- } ) ;
336- Some ( NullBuffer :: from ( bool_builder. finish ( ) ) )
337- } ;
321+ let null_buffer = nulls. build ( ) ;
338322
339323 // SAFETY: the offsets were constructed correctly in `insert_if_new` --
340324 // monotonically increasing, overflows were checked.
@@ -351,9 +335,9 @@ where
351335 // SAFETY:
352336 // 1. the offsets were constructed safely
353337 //
354- // 2. we asserted the input arrays were all the correct type and
355- // thus since all the values that went in were valid (e.g. utf8)
356- // so are all the values that come out
338+ // 2. the input arrays were all the correct type and thus since
339+ // all the values that went in were valid (e.g. utf8) so are all
340+ // the values that come out
357341 Arc :: new ( unsafe {
358342 GenericStringArray :: new_unchecked ( offsets, values, null_buffer)
359343 } )
@@ -364,27 +348,7 @@ where
364348
365349 fn take_n ( & mut self , n : usize ) -> ArrayRef {
366350 debug_assert ! ( self . len( ) >= n) ;
367-
368- let null_buffer = if self . nulls . is_empty ( ) {
369- None
370- } else {
371- // Only make a `NullBuffer` if there was a null value
372- let mut bool_builder = BooleanBufferBuilder :: new ( n) ;
373- bool_builder. append_n ( n, true ) ;
374-
375- let mut new_nulls = vec ! [ ] ;
376- self . nulls . iter ( ) . for_each ( |null_index| {
377- if * null_index < n {
378- bool_builder. set_bit ( * null_index, false ) ;
379- } else {
380- new_nulls. push ( null_index - n) ;
381- }
382- } ) ;
383-
384- self . nulls = new_nulls;
385- Some ( NullBuffer :: from ( bool_builder. finish ( ) ) )
386- } ;
387-
351+ let null_buffer = self . nulls . take_n ( n) ;
388352 let first_remaining_offset = O :: as_usize ( self . offsets [ n] ) ;
389353
390354 // Given offsets like [0, 2, 4, 5] and n = 1, we expect to get
0 commit comments