@@ -193,3 +193,149 @@ pub fn merge_ordered_arrays(
193193
194194 Ok ( ( merged_values, merged_orderings) )
195195}
196+
197+ #[ cfg( test) ]
198+ mod tests {
199+ use super :: * ;
200+
201+ use std:: collections:: VecDeque ;
202+ use std:: sync:: Arc ;
203+
204+ use arrow:: array:: { ArrayRef , Int64Array } ;
205+
206+ use datafusion_common:: utils:: get_row_at_idx;
207+ use datafusion_common:: { Result , ScalarValue } ;
208+
209+ #[ test]
210+ fn test_merge_asc ( ) -> Result < ( ) > {
211+ let lhs_arrays: Vec < ArrayRef > = vec ! [
212+ Arc :: new( Int64Array :: from( vec![ 0 , 0 , 1 , 1 , 2 ] ) ) ,
213+ Arc :: new( Int64Array :: from( vec![ 0 , 1 , 2 , 3 , 4 ] ) ) ,
214+ ] ;
215+ let n_row = lhs_arrays[ 0 ] . len ( ) ;
216+ let lhs_orderings = ( 0 ..n_row)
217+ . map ( |idx| get_row_at_idx ( & lhs_arrays, idx) )
218+ . collect :: < Result < VecDeque < _ > > > ( ) ?;
219+
220+ let rhs_arrays: Vec < ArrayRef > = vec ! [
221+ Arc :: new( Int64Array :: from( vec![ 0 , 0 , 1 , 1 , 2 ] ) ) ,
222+ Arc :: new( Int64Array :: from( vec![ 0 , 1 , 2 , 3 , 4 ] ) ) ,
223+ ] ;
224+ let n_row = rhs_arrays[ 0 ] . len ( ) ;
225+ let rhs_orderings = ( 0 ..n_row)
226+ . map ( |idx| get_row_at_idx ( & rhs_arrays, idx) )
227+ . collect :: < Result < VecDeque < _ > > > ( ) ?;
228+ let sort_options = vec ! [
229+ SortOptions {
230+ descending: false ,
231+ nulls_first: false ,
232+ } ,
233+ SortOptions {
234+ descending: false ,
235+ nulls_first: false ,
236+ } ,
237+ ] ;
238+
239+ let lhs_vals_arr = Arc :: new ( Int64Array :: from ( vec ! [ 0 , 1 , 2 , 3 , 4 ] ) ) as ArrayRef ;
240+ let lhs_vals = ( 0 ..lhs_vals_arr. len ( ) )
241+ . map ( |idx| ScalarValue :: try_from_array ( & lhs_vals_arr, idx) )
242+ . collect :: < Result < VecDeque < _ > > > ( ) ?;
243+
244+ let rhs_vals_arr = Arc :: new ( Int64Array :: from ( vec ! [ 0 , 1 , 2 , 3 , 4 ] ) ) as ArrayRef ;
245+ let rhs_vals = ( 0 ..rhs_vals_arr. len ( ) )
246+ . map ( |idx| ScalarValue :: try_from_array ( & rhs_vals_arr, idx) )
247+ . collect :: < Result < VecDeque < _ > > > ( ) ?;
248+ let expected =
249+ Arc :: new ( Int64Array :: from ( vec ! [ 0 , 0 , 1 , 1 , 2 , 2 , 3 , 3 , 4 , 4 ] ) ) as ArrayRef ;
250+ let expected_ts = vec ! [
251+ Arc :: new( Int64Array :: from( vec![ 0 , 0 , 0 , 0 , 1 , 1 , 1 , 1 , 2 , 2 ] ) ) as ArrayRef ,
252+ Arc :: new( Int64Array :: from( vec![ 0 , 0 , 1 , 1 , 2 , 2 , 3 , 3 , 4 , 4 ] ) ) as ArrayRef ,
253+ ] ;
254+
255+ let ( merged_vals, merged_ts) = merge_ordered_arrays (
256+ & mut [ lhs_vals, rhs_vals] ,
257+ & mut [ lhs_orderings, rhs_orderings] ,
258+ & sort_options,
259+ ) ?;
260+ let merged_vals = ScalarValue :: iter_to_array ( merged_vals. into_iter ( ) ) ?;
261+ let merged_ts = ( 0 ..merged_ts[ 0 ] . len ( ) )
262+ . map ( |col_idx| {
263+ ScalarValue :: iter_to_array (
264+ ( 0 ..merged_ts. len ( ) )
265+ . map ( |row_idx| merged_ts[ row_idx] [ col_idx] . clone ( ) ) ,
266+ )
267+ } )
268+ . collect :: < Result < Vec < _ > > > ( ) ?;
269+
270+ assert_eq ! ( & merged_vals, & expected) ;
271+ assert_eq ! ( & merged_ts, & expected_ts) ;
272+
273+ Ok ( ( ) )
274+ }
275+
276+ #[ test]
277+ fn test_merge_desc ( ) -> Result < ( ) > {
278+ let lhs_arrays: Vec < ArrayRef > = vec ! [
279+ Arc :: new( Int64Array :: from( vec![ 2 , 1 , 1 , 0 , 0 ] ) ) ,
280+ Arc :: new( Int64Array :: from( vec![ 4 , 3 , 2 , 1 , 0 ] ) ) ,
281+ ] ;
282+ let n_row = lhs_arrays[ 0 ] . len ( ) ;
283+ let lhs_orderings = ( 0 ..n_row)
284+ . map ( |idx| get_row_at_idx ( & lhs_arrays, idx) )
285+ . collect :: < Result < VecDeque < _ > > > ( ) ?;
286+
287+ let rhs_arrays: Vec < ArrayRef > = vec ! [
288+ Arc :: new( Int64Array :: from( vec![ 2 , 1 , 1 , 0 , 0 ] ) ) ,
289+ Arc :: new( Int64Array :: from( vec![ 4 , 3 , 2 , 1 , 0 ] ) ) ,
290+ ] ;
291+ let n_row = rhs_arrays[ 0 ] . len ( ) ;
292+ let rhs_orderings = ( 0 ..n_row)
293+ . map ( |idx| get_row_at_idx ( & rhs_arrays, idx) )
294+ . collect :: < Result < VecDeque < _ > > > ( ) ?;
295+ let sort_options = vec ! [
296+ SortOptions {
297+ descending: true ,
298+ nulls_first: false ,
299+ } ,
300+ SortOptions {
301+ descending: true ,
302+ nulls_first: false ,
303+ } ,
304+ ] ;
305+
306+ // Values (which will be merged) doesn't have to be ordered.
307+ let lhs_vals_arr = Arc :: new ( Int64Array :: from ( vec ! [ 0 , 1 , 2 , 1 , 2 ] ) ) as ArrayRef ;
308+ let lhs_vals = ( 0 ..lhs_vals_arr. len ( ) )
309+ . map ( |idx| ScalarValue :: try_from_array ( & lhs_vals_arr, idx) )
310+ . collect :: < Result < VecDeque < _ > > > ( ) ?;
311+
312+ let rhs_vals_arr = Arc :: new ( Int64Array :: from ( vec ! [ 0 , 1 , 2 , 1 , 2 ] ) ) as ArrayRef ;
313+ let rhs_vals = ( 0 ..rhs_vals_arr. len ( ) )
314+ . map ( |idx| ScalarValue :: try_from_array ( & rhs_vals_arr, idx) )
315+ . collect :: < Result < VecDeque < _ > > > ( ) ?;
316+ let expected =
317+ Arc :: new ( Int64Array :: from ( vec ! [ 0 , 0 , 1 , 1 , 2 , 2 , 1 , 1 , 2 , 2 ] ) ) as ArrayRef ;
318+ let expected_ts = vec ! [
319+ Arc :: new( Int64Array :: from( vec![ 2 , 2 , 1 , 1 , 1 , 1 , 0 , 0 , 0 , 0 ] ) ) as ArrayRef ,
320+ Arc :: new( Int64Array :: from( vec![ 4 , 4 , 3 , 3 , 2 , 2 , 1 , 1 , 0 , 0 ] ) ) as ArrayRef ,
321+ ] ;
322+ let ( merged_vals, merged_ts) = merge_ordered_arrays (
323+ & mut [ lhs_vals, rhs_vals] ,
324+ & mut [ lhs_orderings, rhs_orderings] ,
325+ & sort_options,
326+ ) ?;
327+ let merged_vals = ScalarValue :: iter_to_array ( merged_vals. into_iter ( ) ) ?;
328+ let merged_ts = ( 0 ..merged_ts[ 0 ] . len ( ) )
329+ . map ( |col_idx| {
330+ ScalarValue :: iter_to_array (
331+ ( 0 ..merged_ts. len ( ) )
332+ . map ( |row_idx| merged_ts[ row_idx] [ col_idx] . clone ( ) ) ,
333+ )
334+ } )
335+ . collect :: < Result < Vec < _ > > > ( ) ?;
336+
337+ assert_eq ! ( & merged_vals, & expected) ;
338+ assert_eq ! ( & merged_ts, & expected_ts) ;
339+ Ok ( ( ) )
340+ }
341+ }
0 commit comments