@@ -22,7 +22,7 @@ use std::sync::Arc;
2222
2323use crate :: fuzz_cases:: aggregate_fuzz:: assert_spill_count_metric;
2424use crate :: fuzz_cases:: once_exec:: OnceExec ;
25- use arrow:: array:: UInt64Array ;
25+ use arrow:: array:: { UInt32Array , UInt64Array } ;
2626use arrow:: { array:: StringArray , compute:: SortOptions , record_batch:: RecordBatch } ;
2727use arrow_schema:: { DataType , Field , Schema } ;
2828use datafusion:: common:: Result ;
@@ -325,6 +325,138 @@ fn grow_memory_as_much_as_possible(
325325 Ok ( was_able_to_grow)
326326}
327327
328+ #[ tokio:: test]
329+ async fn test_sort_with_limited_memory_larger_cursor ( ) -> Result < ( ) > {
330+ let record_batch_size = 8192 ;
331+ let pool_size = 2 * MB as usize ;
332+ let task_ctx = {
333+ let memory_pool = Arc :: new ( FairSpillPool :: new ( pool_size) ) ;
334+ TaskContext :: default ( )
335+ . with_session_config (
336+ SessionConfig :: new ( )
337+ . with_batch_size ( record_batch_size)
338+ . with_sort_spill_reservation_bytes ( 1 ) ,
339+ )
340+ . with_runtime ( Arc :: new (
341+ RuntimeEnvBuilder :: new ( )
342+ . with_memory_pool ( memory_pool)
343+ . build ( ) ?,
344+ ) )
345+ } ;
346+
347+ // Test that the merge degree of multi level merge sort cannot be fixed size when there is not enough memory
348+ run_sort_test_q5_like_no_payload ( RunTestWithLimitedMemoryArgs {
349+ pool_size,
350+ task_ctx : Arc :: new ( task_ctx) ,
351+ number_of_record_batches : 100 ,
352+ get_size_of_record_batch_to_generate : Box :: pin ( move |_| pool_size / 6 ) ,
353+ memory_behavior : Default :: default ( ) ,
354+ } )
355+ . await ?;
356+
357+ Ok ( ( ) )
358+ }
359+ /// Q5: 3 sort keys + no payload
360+ async fn run_sort_test_q5_like_no_payload (
361+ mut args : RunTestWithLimitedMemoryArgs ,
362+ ) -> Result < usize > {
363+ let _ = std:: mem:: replace (
364+ & mut args. get_size_of_record_batch_to_generate ,
365+ Box :: pin ( move |_| unreachable ! ( "should not be called after take" ) ) ,
366+ ) ;
367+
368+ // l_linenumber: Int32, l_suppkey: Int64, l_orderkey: Int64
369+ let scan_schema = Arc :: new ( Schema :: new ( vec ! [
370+ Field :: new( "l_linenumber" , DataType :: UInt32 , false ) ,
371+ Field :: new( "l_suppkey" , DataType :: UInt64 , false ) ,
372+ Field :: new( "l_orderkey" , DataType :: UInt64 , false ) ,
373+ ] ) ) ;
374+
375+ let record_batch_size = args. task_ctx . session_config ( ) . batch_size ( ) as i64 ;
376+
377+ let lnum_step: i64 = 5 ;
378+ let supp_step: i64 = 9_973 ;
379+ let order_step: i64 = 104_729 ;
380+
381+ const L_LINE_NUMBER_CARD : i64 = 7 ;
382+ const L_SUPPKEY_CARD : i64 = 10_000 ;
383+ const L_ORDERKEY_CARD : i64 = 1_500_000 ;
384+ let schema = Arc :: clone ( & scan_schema) ;
385+ let plan: Arc < dyn ExecutionPlan > =
386+ Arc :: new ( OnceExec :: new ( Box :: pin ( RecordBatchStreamAdapter :: new (
387+ Arc :: clone ( & schema) ,
388+ futures:: stream:: iter ( ( 0 ..args. number_of_record_batches as i64 ) . map (
389+ move |batch_idx| {
390+ let start = batch_idx * record_batch_size;
391+
392+ // l_linenumber ∈ [1,7], l_suppkey ∈ [1,10_000], l_orderkey ∈ [1,1_500_000]
393+ let linenumbers =
394+ UInt32Array :: from_iter_values ( ( 0 ..record_batch_size) . map ( |i| {
395+ let n = start + i;
396+ // 1..=7
397+ ( ( n * lnum_step) . rem_euclid ( L_LINE_NUMBER_CARD ) + 1 ) as u32
398+ } ) ) ;
399+
400+ let suppkeys =
401+ UInt64Array :: from_iter_values ( ( 0 ..record_batch_size) . map ( |i| {
402+ let n = start + i;
403+ // 1..=10_000
404+ ( ( n * supp_step) . rem_euclid ( L_SUPPKEY_CARD ) + 1 ) as u64
405+ } ) ) ;
406+
407+ let orderkeys =
408+ UInt64Array :: from_iter_values ( ( 0 ..record_batch_size) . map ( |i| {
409+ let n = start + i;
410+ // 1..=1_500_000
411+ ( ( n * order_step) . rem_euclid ( L_ORDERKEY_CARD ) + 1 ) as u64
412+ } ) ) ;
413+
414+ RecordBatch :: try_new (
415+ Arc :: clone ( & schema) ,
416+ vec ! [
417+ Arc :: new( linenumbers) as _,
418+ Arc :: new( suppkeys) as _,
419+ Arc :: new( orderkeys) as _,
420+ ] ,
421+ )
422+ . map_err ( |e| e. into ( ) )
423+ } ,
424+ ) ) ,
425+ ) ) ) ) ;
426+
427+ // ORDER BY l_linenumber, l_suppkey, l_orderkey ASC
428+ let sort_exec = Arc :: new ( SortExec :: new (
429+ LexOrdering :: new ( vec ! [
430+ PhysicalSortExpr {
431+ expr: col( "l_linenumber" , & scan_schema) ?,
432+ options: SortOptions {
433+ descending: false ,
434+ nulls_first: true ,
435+ } ,
436+ } ,
437+ PhysicalSortExpr {
438+ expr: col( "l_suppkey" , & scan_schema) ?,
439+ options: SortOptions {
440+ descending: false ,
441+ nulls_first: true ,
442+ } ,
443+ } ,
444+ PhysicalSortExpr {
445+ expr: col( "l_orderkey" , & scan_schema) ?,
446+ options: SortOptions {
447+ descending: false ,
448+ nulls_first: true ,
449+ } ,
450+ } ,
451+ ] )
452+ . unwrap ( ) ,
453+ plan,
454+ ) ) ;
455+
456+ let result = sort_exec. execute ( 0 , Arc :: clone ( & args. task_ctx ) ) ?;
457+ run_test ( args, sort_exec, result) . await
458+ }
459+
328460#[ tokio:: test]
329461async fn test_aggregate_with_high_cardinality_with_limited_memory ( ) -> Result < ( ) > {
330462 let record_batch_size = 8192 ;
0 commit comments