@@ -24,6 +24,7 @@ use arrow::array::*;
2424use arrow:: buffer:: OffsetBuffer ;
2525use arrow:: compute;
2626use arrow:: datatypes:: { DataType , Field , UInt64Type } ;
27+ use arrow:: row:: { RowConverter , SortField } ;
2728use arrow_buffer:: NullBuffer ;
2829
2930use datafusion_common:: cast:: {
@@ -577,58 +578,6 @@ pub fn array_pop_back(args: &[ArrayRef]) -> Result<ArrayRef> {
577578 )
578579}
579580
580- macro_rules! append {
581- ( $ARRAY: expr, $ELEMENT: expr, $ARRAY_TYPE: ident) => { {
582- let mut offsets: Vec <i32 > = vec![ 0 ] ;
583- let mut values =
584- downcast_arg!( new_empty_array( $ELEMENT. data_type( ) ) , $ARRAY_TYPE) . clone( ) ;
585-
586- let element = downcast_arg!( $ELEMENT, $ARRAY_TYPE) ;
587- for ( arr, el) in $ARRAY. iter( ) . zip( element. iter( ) ) {
588- let last_offset: i32 = offsets. last( ) . copied( ) . ok_or_else( || {
589- DataFusionError :: Internal ( format!( "offsets should not be empty" ) )
590- } ) ?;
591- match arr {
592- Some ( arr) => {
593- let child_array = downcast_arg!( arr, $ARRAY_TYPE) ;
594- values = downcast_arg!(
595- compute:: concat( & [
596- & values,
597- child_array,
598- & $ARRAY_TYPE:: from( vec![ el] )
599- ] ) ?
600- . clone( ) ,
601- $ARRAY_TYPE
602- )
603- . clone( ) ;
604- offsets. push( last_offset + child_array. len( ) as i32 + 1i32 ) ;
605- }
606- None => {
607- values = downcast_arg!(
608- compute:: concat( & [
609- & values,
610- & $ARRAY_TYPE:: from( vec![ el. clone( ) ] )
611- ] ) ?
612- . clone( ) ,
613- $ARRAY_TYPE
614- )
615- . clone( ) ;
616- offsets. push( last_offset + 1i32 ) ;
617- }
618- }
619- }
620-
621- let field = Arc :: new( Field :: new( "item" , $ELEMENT. data_type( ) . clone( ) , true ) ) ;
622-
623- Arc :: new( ListArray :: try_new(
624- field,
625- OffsetBuffer :: new( offsets. into( ) ) ,
626- Arc :: new( values) ,
627- None ,
628- ) ?)
629- } } ;
630- }
631-
632581/// Array_append SQL function
633582pub fn array_append ( args : & [ ArrayRef ] ) -> Result < ArrayRef > {
634583 let arr = as_list_array ( & args[ 0 ] ) ?;
@@ -638,71 +587,50 @@ pub fn array_append(args: &[ArrayRef]) -> Result<ArrayRef> {
638587 let res = match arr. value_type ( ) {
639588 DataType :: List ( _) => concat_internal ( args) ?,
640589 DataType :: Null => return make_array ( & [ element. to_owned ( ) ] ) ,
641- data_type => {
642- macro_rules! array_function {
643- ( $ARRAY_TYPE: ident) => {
644- append!( arr, element, $ARRAY_TYPE)
590+ dt => {
591+ let converter = RowConverter :: new ( vec ! [ SortField :: new( dt. clone( ) ) ] ) ?;
592+ let r_rows = converter. convert_columns ( & [ element. clone ( ) ] ) ?;
593+ let mut offsets = vec ! [ 0 ] ;
594+ let mut new_arrays = vec ! [ ] ;
595+ for ( i, arr) in arr. iter ( ) . enumerate ( ) {
596+ let r_row = r_rows. row ( i) ;
597+ let rows = if let Some ( arr) = arr {
598+ let mut l_rows = converter. convert_columns ( & [ arr] ) ?;
599+ l_rows. push ( r_row) ;
600+ l_rows
601+ } else {
602+ let mut rows = converter. empty_rows ( 1 , 1 ) ;
603+ rows. push ( r_row) ;
604+ rows
645605 } ;
606+ let last_offset: i32 = match offsets. last ( ) . copied ( ) {
607+ Some ( offset) => offset,
608+ None => return internal_err ! ( "offsets should not be empty" ) ,
609+ } ;
610+ offsets. push ( last_offset + rows. num_rows ( ) as i32 ) ;
611+ let arrays = converter. convert_rows ( rows. iter ( ) ) ?;
612+ let array = match arrays. get ( 0 ) {
613+ Some ( array) => array. clone ( ) ,
614+ None => {
615+ return internal_err ! (
616+ "array_append: failed to get value from rows"
617+ )
618+ }
619+ } ;
620+ new_arrays. push ( array) ;
646621 }
647- call_array_function ! ( data_type, false )
622+ let field = Arc :: new ( Field :: new ( "item" , dt, true ) ) ;
623+ let offsets = OffsetBuffer :: new ( offsets. into ( ) ) ;
624+ let new_arrays_ref =
625+ new_arrays. iter ( ) . map ( |v| v. as_ref ( ) ) . collect :: < Vec < _ > > ( ) ;
626+ let values = compute:: concat ( & new_arrays_ref) ?;
627+ Arc :: new ( ListArray :: try_new ( field, offsets, values, None ) ?)
648628 }
649629 } ;
650630
651631 Ok ( res)
652632}
653633
654- macro_rules! prepend {
655- ( $ARRAY: expr, $ELEMENT: expr, $ARRAY_TYPE: ident) => { {
656- let mut offsets: Vec <i32 > = vec![ 0 ] ;
657- let mut values =
658- downcast_arg!( new_empty_array( $ELEMENT. data_type( ) ) , $ARRAY_TYPE) . clone( ) ;
659-
660- let element = downcast_arg!( $ELEMENT, $ARRAY_TYPE) ;
661- for ( arr, el) in $ARRAY. iter( ) . zip( element. iter( ) ) {
662- let last_offset: i32 = offsets. last( ) . copied( ) . ok_or_else( || {
663- DataFusionError :: Internal ( format!( "offsets should not be empty" ) )
664- } ) ?;
665- match arr {
666- Some ( arr) => {
667- let child_array = downcast_arg!( arr, $ARRAY_TYPE) ;
668- values = downcast_arg!(
669- compute:: concat( & [
670- & values,
671- & $ARRAY_TYPE:: from( vec![ el] ) ,
672- child_array
673- ] ) ?
674- . clone( ) ,
675- $ARRAY_TYPE
676- )
677- . clone( ) ;
678- offsets. push( last_offset + child_array. len( ) as i32 + 1i32 ) ;
679- }
680- None => {
681- values = downcast_arg!(
682- compute:: concat( & [
683- & values,
684- & $ARRAY_TYPE:: from( vec![ el. clone( ) ] )
685- ] ) ?
686- . clone( ) ,
687- $ARRAY_TYPE
688- )
689- . clone( ) ;
690- offsets. push( last_offset + 1i32 ) ;
691- }
692- }
693- }
694-
695- let field = Arc :: new( Field :: new( "item" , $ELEMENT. data_type( ) . clone( ) , true ) ) ;
696-
697- Arc :: new( ListArray :: try_new(
698- field,
699- OffsetBuffer :: new( offsets. into( ) ) ,
700- Arc :: new( values) ,
701- None ,
702- ) ?)
703- } } ;
704- }
705-
706634/// Array_prepend SQL function
707635pub fn array_prepend ( args : & [ ArrayRef ] ) -> Result < ArrayRef > {
708636 let element = & args[ 0 ] ;
@@ -712,13 +640,42 @@ pub fn array_prepend(args: &[ArrayRef]) -> Result<ArrayRef> {
712640 let res = match arr. value_type ( ) {
713641 DataType :: List ( _) => concat_internal ( args) ?,
714642 DataType :: Null => return make_array ( & [ element. to_owned ( ) ] ) ,
715- data_type => {
716- macro_rules! array_function {
717- ( $ARRAY_TYPE: ident) => {
718- prepend!( arr, element, $ARRAY_TYPE)
643+ dt => {
644+ let converter = RowConverter :: new ( vec ! [ SortField :: new( dt. clone( ) ) ] ) ?;
645+ let r_rows = converter. convert_columns ( & [ element. clone ( ) ] ) ?;
646+ let mut offsets = vec ! [ 0 ] ;
647+ let mut new_arrays = vec ! [ ] ;
648+ for ( i, arr) in arr. iter ( ) . enumerate ( ) {
649+ let mut rows = converter. empty_rows ( 1 , 1 ) ;
650+ rows. push ( r_rows. row ( i) ) ;
651+ if let Some ( arr) = arr {
652+ let l_rows = converter. convert_columns ( & [ arr] ) ?;
653+ for row in l_rows. iter ( ) {
654+ rows. push ( row) ;
655+ }
656+ }
657+ let last_offset: i32 = match offsets. last ( ) . copied ( ) {
658+ Some ( offset) => offset,
659+ None => return internal_err ! ( "offsets should not be empty" ) ,
719660 } ;
661+ offsets. push ( last_offset + rows. num_rows ( ) as i32 ) ;
662+ let arrays = converter. convert_rows ( rows. iter ( ) ) ?;
663+ let array = match arrays. get ( 0 ) {
664+ Some ( array) => array. clone ( ) ,
665+ None => {
666+ return internal_err ! (
667+ "array_append: failed to get value from rows"
668+ )
669+ }
670+ } ;
671+ new_arrays. push ( array) ;
720672 }
721- call_array_function ! ( data_type, false )
673+ let field = Arc :: new ( Field :: new ( "item" , dt, true ) ) ;
674+ let offsets = OffsetBuffer :: new ( offsets. into ( ) ) ;
675+ let new_arrays_ref =
676+ new_arrays. iter ( ) . map ( |v| v. as_ref ( ) ) . collect :: < Vec < _ > > ( ) ;
677+ let values = compute:: concat ( & new_arrays_ref) ?;
678+ Arc :: new ( ListArray :: try_new ( field, offsets, values, None ) ?)
722679 }
723680 } ;
724681
0 commit comments