@@ -24,7 +24,7 @@ use std::sync::Arc;
2424use crate :: datasource:: listing:: PartitionedFile ;
2525use crate :: datasource:: physical_plan:: file_stream:: FileStream ;
2626use crate :: datasource:: physical_plan:: {
27- parquet:: page_filter:: PagePruningPredicate , DisplayAs , FileGroupPartitioner ,
27+ parquet:: page_filter:: PagePruningAccessPlanFilter , DisplayAs , FileGroupPartitioner ,
2828 FileScanConfig ,
2929} ;
3030use crate :: {
@@ -39,13 +39,11 @@ use crate::{
3939 } ,
4040} ;
4141
42- use arrow:: datatypes:: { DataType , SchemaRef } ;
42+ use arrow:: datatypes:: SchemaRef ;
4343use datafusion_physical_expr:: { EquivalenceProperties , LexOrdering , PhysicalExpr } ;
4444
4545use itertools:: Itertools ;
4646use log:: debug;
47- use parquet:: basic:: { ConvertedType , LogicalType } ;
48- use parquet:: schema:: types:: ColumnDescriptor ;
4947
5048mod access_plan;
5149mod metrics;
@@ -225,7 +223,7 @@ pub struct ParquetExec {
225223 /// Optional predicate for pruning row groups (derived from `predicate`)
226224 pruning_predicate : Option < Arc < PruningPredicate > > ,
227225 /// Optional predicate for pruning pages (derived from `predicate`)
228- page_pruning_predicate : Option < Arc < PagePruningPredicate > > ,
226+ page_pruning_predicate : Option < Arc < PagePruningAccessPlanFilter > > ,
229227 /// Optional hint for the size of the parquet metadata
230228 metadata_size_hint : Option < usize > ,
231229 /// Optional user defined parquet file reader factory
@@ -381,19 +379,12 @@ impl ParquetExecBuilder {
381379 } )
382380 . filter ( |p| !p. always_true ( ) ) ;
383381
384- let page_pruning_predicate = predicate. as_ref ( ) . and_then ( |predicate_expr| {
385- match PagePruningPredicate :: try_new ( predicate_expr, file_schema. clone ( ) ) {
386- Ok ( pruning_predicate) => Some ( Arc :: new ( pruning_predicate) ) ,
387- Err ( e) => {
388- debug ! (
389- "Could not create page pruning predicate for '{:?}': {}" ,
390- pruning_predicate, e
391- ) ;
392- predicate_creation_errors. add ( 1 ) ;
393- None
394- }
395- }
396- } ) ;
382+ let page_pruning_predicate = predicate
383+ . as_ref ( )
384+ . map ( |predicate_expr| {
385+ PagePruningAccessPlanFilter :: new ( predicate_expr, file_schema. clone ( ) )
386+ } )
387+ . map ( Arc :: new) ;
397388
398389 let ( projected_schema, projected_statistics, projected_output_ordering) =
399390 base_config. project ( ) ;
@@ -739,7 +730,7 @@ impl ExecutionPlan for ParquetExec {
739730
740731fn should_enable_page_index (
741732 enable_page_index : bool ,
742- page_pruning_predicate : & Option < Arc < PagePruningPredicate > > ,
733+ page_pruning_predicate : & Option < Arc < PagePruningAccessPlanFilter > > ,
743734) -> bool {
744735 enable_page_index
745736 && page_pruning_predicate. is_some ( )
@@ -749,26 +740,6 @@ fn should_enable_page_index(
749740 . unwrap_or ( false )
750741}
751742
752- // Convert parquet column schema to arrow data type, and just consider the
753- // decimal data type.
754- pub ( crate ) fn parquet_to_arrow_decimal_type (
755- parquet_column : & ColumnDescriptor ,
756- ) -> Option < DataType > {
757- let type_ptr = parquet_column. self_type_ptr ( ) ;
758- match type_ptr. get_basic_info ( ) . logical_type ( ) {
759- Some ( LogicalType :: Decimal { scale, precision } ) => {
760- Some ( DataType :: Decimal128 ( precision as u8 , scale as i8 ) )
761- }
762- _ => match type_ptr. get_basic_info ( ) . converted_type ( ) {
763- ConvertedType :: DECIMAL => Some ( DataType :: Decimal128 (
764- type_ptr. get_precision ( ) as u8 ,
765- type_ptr. get_scale ( ) as i8 ,
766- ) ) ,
767- _ => None ,
768- } ,
769- }
770- }
771-
772743#[ cfg( test) ]
773744mod tests {
774745 // See also `parquet_exec` integration test
@@ -798,7 +769,7 @@ mod tests {
798769 } ;
799770 use arrow:: datatypes:: { Field , Schema , SchemaBuilder } ;
800771 use arrow:: record_batch:: RecordBatch ;
801- use arrow_schema:: Fields ;
772+ use arrow_schema:: { DataType , Fields } ;
802773 use datafusion_common:: { assert_contains, ScalarValue } ;
803774 use datafusion_expr:: { col, lit, when, Expr } ;
804775 use datafusion_physical_expr:: planner:: logical2physical;
0 commit comments