@@ -39,7 +39,9 @@ mod test {
3939 use datafusion_physical_plan:: projection:: ProjectionExec ;
4040 use datafusion_physical_plan:: sorts:: sort:: SortExec ;
4141 use datafusion_physical_plan:: union:: UnionExec ;
42- use datafusion_physical_plan:: { execute_stream_partitioned, ExecutionPlan } ;
42+ use datafusion_physical_plan:: {
43+ execute_stream_partitioned, ExecutionPlan , ExecutionPlanProperties ,
44+ } ;
4345 use futures:: TryStreamExt ;
4446 use std:: sync:: Arc ;
4547
@@ -186,7 +188,9 @@ mod test {
186188 #[ tokio:: test]
187189 async fn test_statistics_by_partition_of_data_source ( ) -> Result < ( ) > {
188190 let scan = create_scan_exec_with_statistics ( None , Some ( 2 ) ) . await ;
189- let statistics = scan. statistics_by_partition ( ) ?;
191+ let statistics = ( 0 ..scan. output_partitioning ( ) . partition_count ( ) )
192+ . map ( |idx| scan. partition_statistics ( Some ( idx) ) )
193+ . collect :: < Result < Vec < _ > > > ( ) ?;
190194 let expected_statistic_partition_1 =
191195 create_partition_statistics ( 2 , 110 , 3 , 4 , true ) ;
192196 let expected_statistic_partition_2 =
@@ -212,8 +216,11 @@ mod test {
212216 // Add projection execution plan
213217 let exprs: Vec < ( Arc < dyn PhysicalExpr > , String ) > =
214218 vec ! [ ( Arc :: new( Column :: new( "id" , 0 ) ) , "id" . to_string( ) ) ] ;
215- let projection = ProjectionExec :: try_new ( exprs, scan) ?;
216- let statistics = projection. statistics_by_partition ( ) ?;
219+ let projection: Arc < dyn ExecutionPlan > =
220+ Arc :: new ( ProjectionExec :: try_new ( exprs, scan) ?) ;
221+ let statistics = ( 0 ..projection. output_partitioning ( ) . partition_count ( ) )
222+ . map ( |idx| projection. partition_statistics ( Some ( idx) ) )
223+ . collect :: < Result < Vec < _ > > > ( ) ?;
217224 let expected_statistic_partition_1 =
218225 create_partition_statistics ( 2 , 8 , 3 , 4 , false ) ;
219226 let expected_statistic_partition_2 =
@@ -225,7 +232,7 @@ mod test {
225232
226233 // Check the statistics_by_partition with real results
227234 let expected_stats = vec ! [ ( 3 , 4 , 2 ) , ( 1 , 2 , 2 ) ] ;
228- validate_statistics_with_data ( Arc :: new ( projection) , expected_stats, 0 ) . await ?;
235+ validate_statistics_with_data ( projection, expected_stats, 0 ) . await ?;
229236 Ok ( ( ) )
230237 }
231238
@@ -243,8 +250,10 @@ mod test {
243250 } ] ) ,
244251 scan_1,
245252 ) ;
246- let sort_exec = Arc :: new ( sort. clone ( ) ) ;
247- let statistics = sort_exec. statistics_by_partition ( ) ?;
253+ let sort_exec: Arc < dyn ExecutionPlan > = Arc :: new ( sort. clone ( ) ) ;
254+ let statistics = ( 0 ..sort_exec. output_partitioning ( ) . partition_count ( ) )
255+ . map ( |idx| sort_exec. partition_statistics ( Some ( idx) ) )
256+ . collect :: < Result < Vec < _ > > > ( ) ?;
248257 let expected_statistic_partition =
249258 create_partition_statistics ( 4 , 220 , 1 , 4 , true ) ;
250259 assert_eq ! ( statistics. len( ) , 1 ) ;
@@ -256,7 +265,7 @@ mod test {
256265 // Sort with preserve_partitioning
257266 let scan_2 = create_scan_exec_with_statistics ( None , Some ( 2 ) ) . await ;
258267 // Add sort execution plan
259- let sort_exec = Arc :: new (
268+ let sort_exec: Arc < dyn ExecutionPlan > = Arc :: new (
260269 SortExec :: new (
261270 LexOrdering :: new ( vec ! [ PhysicalSortExpr {
262271 expr: Arc :: new( Column :: new( "id" , 0 ) ) ,
@@ -273,7 +282,9 @@ mod test {
273282 create_partition_statistics ( 2 , 110 , 3 , 4 , true ) ;
274283 let expected_statistic_partition_2 =
275284 create_partition_statistics ( 2 , 110 , 1 , 2 , true ) ;
276- let statistics = sort_exec. statistics_by_partition ( ) ?;
285+ let statistics = ( 0 ..sort_exec. output_partitioning ( ) . partition_count ( ) )
286+ . map ( |idx| sort_exec. partition_statistics ( Some ( idx) ) )
287+ . collect :: < Result < Vec < _ > > > ( ) ?;
277288 assert_eq ! ( statistics. len( ) , 2 ) ;
278289 assert_eq ! ( statistics[ 0 ] , expected_statistic_partition_1) ;
279290 assert_eq ! ( statistics[ 1 ] , expected_statistic_partition_2) ;
@@ -296,7 +307,7 @@ mod test {
296307 ) ?;
297308 let filter: Arc < dyn ExecutionPlan > =
298309 Arc :: new ( FilterExec :: try_new ( predicate, scan) ?) ;
299- let full_statistics = filter. statistics ( ) ?;
310+ let full_statistics = filter. partition_statistics ( None ) ?;
300311 let expected_full_statistic = Statistics {
301312 num_rows : Precision :: Inexact ( 0 ) ,
302313 total_byte_size : Precision :: Inexact ( 0 ) ,
@@ -319,7 +330,9 @@ mod test {
319330 } ;
320331 assert_eq ! ( full_statistics, expected_full_statistic) ;
321332
322- let statistics = filter. statistics_by_partition ( ) ?;
333+ let statistics = ( 0 ..filter. output_partitioning ( ) . partition_count ( ) )
334+ . map ( |idx| filter. partition_statistics ( Some ( idx) ) )
335+ . collect :: < Result < Vec < _ > > > ( ) ?;
323336 assert_eq ! ( statistics. len( ) , 2 ) ;
324337 assert_eq ! ( statistics[ 0 ] , expected_full_statistic) ;
325338 assert_eq ! ( statistics[ 1 ] , expected_full_statistic) ;
@@ -329,8 +342,11 @@ mod test {
329342 #[ tokio:: test]
330343 async fn test_statistic_by_partition_of_union ( ) -> Result < ( ) > {
331344 let scan = create_scan_exec_with_statistics ( None , Some ( 2 ) ) . await ;
332- let union_exec = Arc :: new ( UnionExec :: new ( vec ! [ scan. clone( ) , scan] ) ) ;
333- let statistics = union_exec. statistics_by_partition ( ) ?;
345+ let union_exec: Arc < dyn ExecutionPlan > =
346+ Arc :: new ( UnionExec :: new ( vec ! [ scan. clone( ) , scan] ) ) ;
347+ let statistics = ( 0 ..union_exec. output_partitioning ( ) . partition_count ( ) )
348+ . map ( |idx| union_exec. partition_statistics ( Some ( idx) ) )
349+ . collect :: < Result < Vec < _ > > > ( ) ?;
334350 // Check that we have 4 partitions (2 from each scan)
335351 assert_eq ! ( statistics. len( ) , 4 ) ;
336352 let expected_statistic_partition_1 =
@@ -360,8 +376,11 @@ mod test {
360376 WITH ORDER (id ASC);";
361377 let right_scan =
362378 create_scan_exec_with_statistics ( Some ( right_create_table_sql) , Some ( 2 ) ) . await ;
363- let cross_join = CrossJoinExec :: new ( left_scan, right_scan) ;
364- let statistics = cross_join. statistics_by_partition ( ) ?;
379+ let cross_join: Arc < dyn ExecutionPlan > =
380+ Arc :: new ( CrossJoinExec :: new ( left_scan, right_scan) ) ;
381+ let statistics = ( 0 ..cross_join. output_partitioning ( ) . partition_count ( ) )
382+ . map ( |idx| cross_join. partition_statistics ( Some ( idx) ) )
383+ . collect :: < Result < Vec < _ > > > ( ) ?;
365384 // Check that we have 2 partitions
366385 assert_eq ! ( statistics. len( ) , 2 ) ;
367386 let mut expected_statistic_partition_1 =
@@ -391,52 +410,60 @@ mod test {
391410
392411 // Check the statistics_by_partition with real results
393412 let expected_stats = vec ! [ ( 1 , 4 , 8 ) , ( 1 , 4 , 8 ) ] ;
394- validate_statistics_with_data ( Arc :: new ( cross_join) , expected_stats, 0 ) . await ?;
413+ validate_statistics_with_data ( cross_join, expected_stats, 0 ) . await ?;
395414 Ok ( ( ) )
396415 }
397416
398417 #[ tokio:: test]
399418 async fn test_statistic_by_partition_of_coalesce_batches ( ) -> Result < ( ) > {
400419 let scan = create_scan_exec_with_statistics ( None , Some ( 2 ) ) . await ;
401- let coalesce_batches = CoalesceBatchesExec :: new ( scan, 2 ) ;
420+ dbg ! ( scan. partition_statistics( Some ( 0 ) ) ?) ;
421+ let coalesce_batches: Arc < dyn ExecutionPlan > =
422+ Arc :: new ( CoalesceBatchesExec :: new ( scan, 2 ) ) ;
402423 let expected_statistic_partition_1 =
403424 create_partition_statistics ( 2 , 110 , 3 , 4 , true ) ;
404425 let expected_statistic_partition_2 =
405426 create_partition_statistics ( 2 , 110 , 1 , 2 , true ) ;
406- let statistics = coalesce_batches. statistics_by_partition ( ) ?;
427+ let statistics = ( 0 ..coalesce_batches. output_partitioning ( ) . partition_count ( ) )
428+ . map ( |idx| coalesce_batches. partition_statistics ( Some ( idx) ) )
429+ . collect :: < Result < Vec < _ > > > ( ) ?;
407430 assert_eq ! ( statistics. len( ) , 2 ) ;
408431 assert_eq ! ( statistics[ 0 ] , expected_statistic_partition_1) ;
409432 assert_eq ! ( statistics[ 1 ] , expected_statistic_partition_2) ;
410433
411434 // Check the statistics_by_partition with real results
412435 let expected_stats = vec ! [ ( 3 , 4 , 2 ) , ( 1 , 2 , 2 ) ] ;
413- validate_statistics_with_data ( Arc :: new ( coalesce_batches) , expected_stats, 0 )
414- . await ?;
436+ validate_statistics_with_data ( coalesce_batches, expected_stats, 0 ) . await ?;
415437 Ok ( ( ) )
416438 }
417439
418440 #[ tokio:: test]
419441 async fn test_statistic_by_partition_of_coalesce_partitions ( ) -> Result < ( ) > {
420442 let scan = create_scan_exec_with_statistics ( None , Some ( 2 ) ) . await ;
421- let coalesce_partitions = CoalescePartitionsExec :: new ( scan) ;
443+ let coalesce_partitions: Arc < dyn ExecutionPlan > =
444+ Arc :: new ( CoalescePartitionsExec :: new ( scan) ) ;
422445 let expected_statistic_partition =
423446 create_partition_statistics ( 4 , 220 , 1 , 4 , true ) ;
424- let statistics = coalesce_partitions. statistics_by_partition ( ) ?;
447+ let statistics = ( 0 ..coalesce_partitions. output_partitioning ( ) . partition_count ( ) )
448+ . map ( |idx| coalesce_partitions. partition_statistics ( Some ( idx) ) )
449+ . collect :: < Result < Vec < _ > > > ( ) ?;
425450 assert_eq ! ( statistics. len( ) , 1 ) ;
426451 assert_eq ! ( statistics[ 0 ] , expected_statistic_partition) ;
427452
428453 // Check the statistics_by_partition with real results
429454 let expected_stats = vec ! [ ( 1 , 4 , 4 ) ] ;
430- validate_statistics_with_data ( Arc :: new ( coalesce_partitions) , expected_stats, 0 )
431- . await ?;
455+ validate_statistics_with_data ( coalesce_partitions, expected_stats, 0 ) . await ?;
432456 Ok ( ( ) )
433457 }
434458
435459 #[ tokio:: test]
436460 async fn test_statistic_by_partition_of_local_limit ( ) -> Result < ( ) > {
437461 let scan = create_scan_exec_with_statistics ( None , Some ( 2 ) ) . await ;
438- let local_limit = LocalLimitExec :: new ( scan. clone ( ) , 1 ) ;
439- let statistics = local_limit. statistics_by_partition ( ) ?;
462+ let local_limit: Arc < dyn ExecutionPlan > =
463+ Arc :: new ( LocalLimitExec :: new ( scan. clone ( ) , 1 ) ) ;
464+ let statistics = ( 0 ..local_limit. output_partitioning ( ) . partition_count ( ) )
465+ . map ( |idx| local_limit. partition_statistics ( Some ( idx) ) )
466+ . collect :: < Result < Vec < _ > > > ( ) ?;
440467 assert_eq ! ( statistics. len( ) , 2 ) ;
441468 let schema = scan. schema ( ) ;
442469 let mut expected_statistic_partition = Statistics :: new_unknown ( & schema) ;
@@ -449,8 +476,11 @@ mod test {
449476 #[ tokio:: test]
450477 async fn test_statistic_by_partition_of_global_limit_partitions ( ) -> Result < ( ) > {
451478 let scan = create_scan_exec_with_statistics ( None , Some ( 2 ) ) . await ;
452- let global_limit = GlobalLimitExec :: new ( scan. clone ( ) , 0 , Some ( 2 ) ) ;
453- let statistics = global_limit. statistics_by_partition ( ) ?;
479+ let global_limit: Arc < dyn ExecutionPlan > =
480+ Arc :: new ( GlobalLimitExec :: new ( scan. clone ( ) , 0 , Some ( 2 ) ) ) ;
481+ let statistics = ( 0 ..global_limit. output_partitioning ( ) . partition_count ( ) )
482+ . map ( |idx| global_limit. partition_statistics ( Some ( idx) ) )
483+ . collect :: < Result < Vec < _ > > > ( ) ?;
454484 assert_eq ! ( statistics. len( ) , 1 ) ;
455485 let mut expected_statistic_partition = Statistics :: new_unknown ( & scan. schema ( ) ) ;
456486 expected_statistic_partition. num_rows = Precision :: Exact ( 2 ) ;
0 commit comments