@@ -55,6 +55,7 @@ use super::ParquetFileMetrics;
5555/// Note: This method currently ignores ColumnOrder
5656/// <https://github.com/apache/arrow-datafusion/issues/8335>
5757pub ( crate ) fn prune_row_groups_by_statistics (
58+ arrow_schema : & Schema ,
5859 parquet_schema : & SchemaDescriptor ,
5960 groups : & [ RowGroupMetaData ] ,
6061 range : Option < FileRange > ,
@@ -80,7 +81,7 @@ pub(crate) fn prune_row_groups_by_statistics(
8081 let pruning_stats = RowGroupPruningStatistics {
8182 parquet_schema,
8283 row_group_metadata : metadata,
83- arrow_schema : predicate . schema ( ) . as_ref ( ) ,
84+ arrow_schema,
8485 } ;
8586 match predicate. prune ( & pruning_stats) {
8687 Ok ( values) => {
@@ -415,11 +416,11 @@ mod tests {
415416 fn row_group_pruning_predicate_simple_expr ( ) {
416417 use datafusion_expr:: { col, lit} ;
417418 // int > 15 => c1_max > 15
418- let schema = Schema :: new ( vec ! [ Field :: new( "c1" , DataType :: Int32 , false ) ] ) ;
419+ let schema =
420+ Arc :: new ( Schema :: new ( vec ! [ Field :: new( "c1" , DataType :: Int32 , false ) ] ) ) ;
419421 let expr = col ( "c1" ) . gt ( lit ( 15 ) ) ;
420422 let expr = logical2physical ( & expr, & schema) ;
421- let pruning_predicate =
422- PruningPredicate :: try_new ( expr, Arc :: new ( schema) ) . unwrap ( ) ;
423+ let pruning_predicate = PruningPredicate :: try_new ( expr, schema. clone ( ) ) . unwrap ( ) ;
423424
424425 let field = PrimitiveTypeField :: new ( "c1" , PhysicalType :: INT32 ) ;
425426 let schema_descr = get_test_schema_descr ( vec ! [ field] ) ;
@@ -435,6 +436,7 @@ mod tests {
435436 let metrics = parquet_file_metrics ( ) ;
436437 assert_eq ! (
437438 prune_row_groups_by_statistics(
439+ & schema,
438440 & schema_descr,
439441 & [ rgm1, rgm2] ,
440442 None ,
@@ -449,11 +451,11 @@ mod tests {
449451 fn row_group_pruning_predicate_missing_stats ( ) {
450452 use datafusion_expr:: { col, lit} ;
451453 // int > 15 => c1_max > 15
452- let schema = Schema :: new ( vec ! [ Field :: new( "c1" , DataType :: Int32 , false ) ] ) ;
454+ let schema =
455+ Arc :: new ( Schema :: new ( vec ! [ Field :: new( "c1" , DataType :: Int32 , false ) ] ) ) ;
453456 let expr = col ( "c1" ) . gt ( lit ( 15 ) ) ;
454457 let expr = logical2physical ( & expr, & schema) ;
455- let pruning_predicate =
456- PruningPredicate :: try_new ( expr, Arc :: new ( schema) ) . unwrap ( ) ;
458+ let pruning_predicate = PruningPredicate :: try_new ( expr, schema. clone ( ) ) . unwrap ( ) ;
457459
458460 let field = PrimitiveTypeField :: new ( "c1" , PhysicalType :: INT32 ) ;
459461 let schema_descr = get_test_schema_descr ( vec ! [ field] ) ;
@@ -470,6 +472,7 @@ mod tests {
470472 // is null / undefined so the first row group can't be filtered out
471473 assert_eq ! (
472474 prune_row_groups_by_statistics(
475+ & schema,
473476 & schema_descr,
474477 & [ rgm1, rgm2] ,
475478 None ,
@@ -518,6 +521,7 @@ mod tests {
518521 // when conditions are joined using AND
519522 assert_eq ! (
520523 prune_row_groups_by_statistics(
524+ & schema,
521525 & schema_descr,
522526 groups,
523527 None ,
@@ -531,12 +535,13 @@ mod tests {
531535 // this bypasses the entire predicate expression and no row groups are filtered out
532536 let expr = col ( "c1" ) . gt ( lit ( 15 ) ) . or ( col ( "c2" ) . rem ( lit ( 2 ) ) . eq ( lit ( 0 ) ) ) ;
533537 let expr = logical2physical ( & expr, & schema) ;
534- let pruning_predicate = PruningPredicate :: try_new ( expr, schema) . unwrap ( ) ;
538+ let pruning_predicate = PruningPredicate :: try_new ( expr, schema. clone ( ) ) . unwrap ( ) ;
535539
536540 // if conditions in predicate are joined with OR and an unsupported expression is used
537541 // this bypasses the entire predicate expression and no row groups are filtered out
538542 assert_eq ! (
539543 prune_row_groups_by_statistics(
544+ & schema,
540545 & schema_descr,
541546 groups,
542547 None ,
@@ -547,6 +552,64 @@ mod tests {
547552 ) ;
548553 }
549554
555+ #[ test]
556+ fn row_group_pruning_predicate_file_schema ( ) {
557+ use datafusion_expr:: { col, lit} ;
558+ // Test row group pruning when the file schema's column order differs from the table schema's.
559+ // Predicate under test: c1 > 0 (built against the table schema)
560+ let table_schema = Arc :: new ( Schema :: new ( vec ! [
561+ Field :: new( "c1" , DataType :: Int32 , false ) ,
562+ Field :: new( "c2" , DataType :: Int32 , false ) ,
563+ ] ) ) ;
564+ let expr = col ( "c1" ) . gt ( lit ( 0 ) ) ;
565+ let expr = logical2physical ( & expr, & table_schema) ;
566+ let pruning_predicate =
567+ PruningPredicate :: try_new ( expr, table_schema. clone ( ) ) . unwrap ( ) ;
568+
569+ // Model a file whose column order is c2 then c1, which is the opposite
570+ // of the table schema (c1 then c2)
571+ let file_schema = Arc :: new ( Schema :: new ( vec ! [
572+ Field :: new( "c2" , DataType :: Int32 , false ) ,
573+ Field :: new( "c1" , DataType :: Int32 , false ) ,
574+ ] ) ) ;
575+ let schema_descr = get_test_schema_descr ( vec ! [
576+ PrimitiveTypeField :: new( "c2" , PhysicalType :: INT32 ) ,
577+ PrimitiveTypeField :: new( "c1" , PhysicalType :: INT32 ) ,
578+ ] ) ;
579+ // rg1 has c2 less than zero, c1 greater than zero
580+ let rgm1 = get_row_group_meta_data (
581+ & schema_descr,
582+ vec ! [
583+ ParquetStatistics :: int32( Some ( -10 ) , Some ( -1 ) , None , 0 , false ) , // c2
584+ ParquetStatistics :: int32( Some ( 1 ) , Some ( 10 ) , None , 0 , false ) , // c1
585+ ] ,
586+ ) ;
587+ // rg2 has c2 greater than zero, c1 less than zero
588+ let rgm2 = get_row_group_meta_data (
589+ & schema_descr,
590+ vec ! [
591+ ParquetStatistics :: int32( Some ( 1 ) , Some ( 10 ) , None , 0 , false ) , // c2
592+ ParquetStatistics :: int32( Some ( -10 ) , Some ( -1 ) , None , 0 , false ) , // c1
593+ ] ,
594+ ) ;
595+
596+ let metrics = parquet_file_metrics ( ) ;
597+ let groups = & [ rgm1, rgm2] ;
598+ // the first row group (index 0) should be kept because its c1 is greater than zero;
599+ // the second should be filtered out because its c1 is less than zero
600+ assert_eq ! (
601+ prune_row_groups_by_statistics(
602+ & file_schema, // NB must be file schema, not table_schema
603+ & schema_descr,
604+ groups,
605+ None ,
606+ Some ( & pruning_predicate) ,
607+ & metrics
608+ ) ,
609+ vec![ 0 ]
610+ ) ;
611+ }
612+
550613 fn gen_row_group_meta_data_for_pruning_predicate ( ) -> Vec < RowGroupMetaData > {
551614 let schema_descr = get_test_schema_descr ( vec ! [
552615 PrimitiveTypeField :: new( "c1" , PhysicalType :: INT32 ) ,
@@ -580,13 +643,14 @@ mod tests {
580643 let schema_descr = arrow_to_parquet_schema ( & schema) . unwrap ( ) ;
581644 let expr = col ( "c1" ) . gt ( lit ( 15 ) ) . and ( col ( "c2" ) . is_null ( ) ) ;
582645 let expr = logical2physical ( & expr, & schema) ;
583- let pruning_predicate = PruningPredicate :: try_new ( expr, schema) . unwrap ( ) ;
646+ let pruning_predicate = PruningPredicate :: try_new ( expr, schema. clone ( ) ) . unwrap ( ) ;
584647 let groups = gen_row_group_meta_data_for_pruning_predicate ( ) ;
585648
586649 let metrics = parquet_file_metrics ( ) ;
587650 // First row group was filtered out because it contains no null value on "c2".
588651 assert_eq ! (
589652 prune_row_groups_by_statistics(
653+ & schema,
590654 & schema_descr,
591655 & groups,
592656 None ,
@@ -612,14 +676,15 @@ mod tests {
612676 . gt ( lit ( 15 ) )
613677 . and ( col ( "c2" ) . eq ( lit ( ScalarValue :: Boolean ( None ) ) ) ) ;
614678 let expr = logical2physical ( & expr, & schema) ;
615- let pruning_predicate = PruningPredicate :: try_new ( expr, schema) . unwrap ( ) ;
679+ let pruning_predicate = PruningPredicate :: try_new ( expr, schema. clone ( ) ) . unwrap ( ) ;
616680 let groups = gen_row_group_meta_data_for_pruning_predicate ( ) ;
617681
618682 let metrics = parquet_file_metrics ( ) ;
619683 // bool = NULL always evaluates to NULL (and thus will not
620684 // pass predicates). Ideally these should both be false
621685 assert_eq ! (
622686 prune_row_groups_by_statistics(
687+ & schema,
623688 & schema_descr,
624689 & groups,
625690 None ,
@@ -638,8 +703,11 @@ mod tests {
638703
639704 // INT32: c1 > 5, the c1 is decimal(9,2)
640705 // The type of the scalar value is decimal(9,2), so no cast is needed
641- let schema =
642- Schema :: new ( vec ! [ Field :: new( "c1" , DataType :: Decimal128 ( 9 , 2 ) , false ) ] ) ;
706+ let schema = Arc :: new ( Schema :: new ( vec ! [ Field :: new(
707+ "c1" ,
708+ DataType :: Decimal128 ( 9 , 2 ) ,
709+ false ,
710+ ) ] ) ) ;
643711 let field = PrimitiveTypeField :: new ( "c1" , PhysicalType :: INT32 )
644712 . with_logical_type ( LogicalType :: Decimal {
645713 scale : 2 ,
@@ -650,8 +718,7 @@ mod tests {
650718 let schema_descr = get_test_schema_descr ( vec ! [ field] ) ;
651719 let expr = col ( "c1" ) . gt ( lit ( ScalarValue :: Decimal128 ( Some ( 500 ) , 9 , 2 ) ) ) ;
652720 let expr = logical2physical ( & expr, & schema) ;
653- let pruning_predicate =
654- PruningPredicate :: try_new ( expr, Arc :: new ( schema) ) . unwrap ( ) ;
721+ let pruning_predicate = PruningPredicate :: try_new ( expr, schema. clone ( ) ) . unwrap ( ) ;
655722 let rgm1 = get_row_group_meta_data (
656723 & schema_descr,
657724 // [1.00, 6.00]
@@ -679,6 +746,7 @@ mod tests {
679746 let metrics = parquet_file_metrics ( ) ;
680747 assert_eq ! (
681748 prune_row_groups_by_statistics(
749+ & schema,
682750 & schema_descr,
683751 & [ rgm1, rgm2, rgm3] ,
684752 None ,
@@ -692,8 +760,11 @@ mod tests {
692760 // The c1 type is decimal(9,0) in the parquet file, and the type of scalar is decimal(5,2).
693761 // We should convert all type to the coercion type, which is decimal(11,2)
694762 // The decimal of arrow is decimal(5,2), the decimal of parquet is decimal(9,0)
695- let schema =
696- Schema :: new ( vec ! [ Field :: new( "c1" , DataType :: Decimal128 ( 9 , 0 ) , false ) ] ) ;
763+ let schema = Arc :: new ( Schema :: new ( vec ! [ Field :: new(
764+ "c1" ,
765+ DataType :: Decimal128 ( 9 , 0 ) ,
766+ false ,
767+ ) ] ) ) ;
697768
698769 let field = PrimitiveTypeField :: new ( "c1" , PhysicalType :: INT32 )
699770 . with_logical_type ( LogicalType :: Decimal {
@@ -708,8 +779,7 @@ mod tests {
708779 Decimal128 ( 11 , 2 ) ,
709780 ) ) ;
710781 let expr = logical2physical ( & expr, & schema) ;
711- let pruning_predicate =
712- PruningPredicate :: try_new ( expr, Arc :: new ( schema) ) . unwrap ( ) ;
782+ let pruning_predicate = PruningPredicate :: try_new ( expr, schema. clone ( ) ) . unwrap ( ) ;
713783 let rgm1 = get_row_group_meta_data (
714784 & schema_descr,
715785 // [100, 600]
@@ -743,6 +813,7 @@ mod tests {
743813 let metrics = parquet_file_metrics ( ) ;
744814 assert_eq ! (
745815 prune_row_groups_by_statistics(
816+ & schema,
746817 & schema_descr,
747818 & [ rgm1, rgm2, rgm3, rgm4] ,
748819 None ,
@@ -753,8 +824,11 @@ mod tests {
753824 ) ;
754825
755826 // INT64: c1 < 5, the c1 is decimal(18,2)
756- let schema =
757- Schema :: new ( vec ! [ Field :: new( "c1" , DataType :: Decimal128 ( 18 , 2 ) , false ) ] ) ;
827+ let schema = Arc :: new ( Schema :: new ( vec ! [ Field :: new(
828+ "c1" ,
829+ DataType :: Decimal128 ( 18 , 2 ) ,
830+ false ,
831+ ) ] ) ) ;
758832 let field = PrimitiveTypeField :: new ( "c1" , PhysicalType :: INT64 )
759833 . with_logical_type ( LogicalType :: Decimal {
760834 scale : 2 ,
@@ -765,8 +839,7 @@ mod tests {
765839 let schema_descr = get_test_schema_descr ( vec ! [ field] ) ;
766840 let expr = col ( "c1" ) . lt ( lit ( ScalarValue :: Decimal128 ( Some ( 500 ) , 18 , 2 ) ) ) ;
767841 let expr = logical2physical ( & expr, & schema) ;
768- let pruning_predicate =
769- PruningPredicate :: try_new ( expr, Arc :: new ( schema) ) . unwrap ( ) ;
842+ let pruning_predicate = PruningPredicate :: try_new ( expr, schema. clone ( ) ) . unwrap ( ) ;
770843 let rgm1 = get_row_group_meta_data (
771844 & schema_descr,
772845 // [6.00, 8.00]
@@ -791,6 +864,7 @@ mod tests {
791864 let metrics = parquet_file_metrics ( ) ;
792865 assert_eq ! (
793866 prune_row_groups_by_statistics(
867+ & schema,
794868 & schema_descr,
795869 & [ rgm1, rgm2, rgm3] ,
796870 None ,
@@ -802,8 +876,11 @@ mod tests {
802876
803877 // FIXED_LENGTH_BYTE_ARRAY: c1 = decimal128(100000, 28, 3), the c1 is decimal(18,2)
804878 // the type of parquet is decimal(18,2)
805- let schema =
806- Schema :: new ( vec ! [ Field :: new( "c1" , DataType :: Decimal128 ( 18 , 2 ) , false ) ] ) ;
879+ let schema = Arc :: new ( Schema :: new ( vec ! [ Field :: new(
880+ "c1" ,
881+ DataType :: Decimal128 ( 18 , 2 ) ,
882+ false ,
883+ ) ] ) ) ;
807884 let field = PrimitiveTypeField :: new ( "c1" , PhysicalType :: FIXED_LEN_BYTE_ARRAY )
808885 . with_logical_type ( LogicalType :: Decimal {
809886 scale : 2 ,
@@ -817,8 +894,7 @@ mod tests {
817894 let left = cast ( col ( "c1" ) , DataType :: Decimal128 ( 28 , 3 ) ) ;
818895 let expr = left. eq ( lit ( ScalarValue :: Decimal128 ( Some ( 100000 ) , 28 , 3 ) ) ) ;
819896 let expr = logical2physical ( & expr, & schema) ;
820- let pruning_predicate =
821- PruningPredicate :: try_new ( expr, Arc :: new ( schema) ) . unwrap ( ) ;
897+ let pruning_predicate = PruningPredicate :: try_new ( expr, schema. clone ( ) ) . unwrap ( ) ;
822898 // we must use big-endian byte order when encoding the i128 to bytes (Vec<u8>).
823899 let rgm1 = get_row_group_meta_data (
824900 & schema_descr,
@@ -862,6 +938,7 @@ mod tests {
862938 let metrics = parquet_file_metrics ( ) ;
863939 assert_eq ! (
864940 prune_row_groups_by_statistics(
941+ & schema,
865942 & schema_descr,
866943 & [ rgm1, rgm2, rgm3] ,
867944 None ,
@@ -873,8 +950,11 @@ mod tests {
873950
874951 // BYTE_ARRAY: c1 = decimal128(100000, 28, 3), the c1 is decimal(18,2)
875952 // the type of parquet is decimal(18,2)
876- let schema =
877- Schema :: new ( vec ! [ Field :: new( "c1" , DataType :: Decimal128 ( 18 , 2 ) , false ) ] ) ;
953+ let schema = Arc :: new ( Schema :: new ( vec ! [ Field :: new(
954+ "c1" ,
955+ DataType :: Decimal128 ( 18 , 2 ) ,
956+ false ,
957+ ) ] ) ) ;
878958 let field = PrimitiveTypeField :: new ( "c1" , PhysicalType :: BYTE_ARRAY )
879959 . with_logical_type ( LogicalType :: Decimal {
880960 scale : 2 ,
@@ -888,8 +968,7 @@ mod tests {
888968 let left = cast ( col ( "c1" ) , DataType :: Decimal128 ( 28 , 3 ) ) ;
889969 let expr = left. eq ( lit ( ScalarValue :: Decimal128 ( Some ( 100000 ) , 28 , 3 ) ) ) ;
890970 let expr = logical2physical ( & expr, & schema) ;
891- let pruning_predicate =
892- PruningPredicate :: try_new ( expr, Arc :: new ( schema) ) . unwrap ( ) ;
971+ let pruning_predicate = PruningPredicate :: try_new ( expr, schema. clone ( ) ) . unwrap ( ) ;
893972 // we must use big-endian byte order when encoding the i128 to bytes (Vec<u8>).
894973 let rgm1 = get_row_group_meta_data (
895974 & schema_descr,
@@ -922,6 +1001,7 @@ mod tests {
9221001 let metrics = parquet_file_metrics ( ) ;
9231002 assert_eq ! (
9241003 prune_row_groups_by_statistics(
1004+ & schema,
9251005 & schema_descr,
9261006 & [ rgm1, rgm2, rgm3] ,
9271007 None ,
0 commit comments