@@ -21,10 +21,8 @@ use arrow_array::cast::AsArray;
2121use arrow_array:: Array ;
2222use arrow_array:: { RecordBatch , RecordBatchReader } ;
2323use arrow_schema:: { ArrowError , DataType as ArrowType , Schema , SchemaRef } ;
24- use arrow_select:: filter:: prep_null_mask_filter;
2524pub use filter:: { ArrowPredicate , ArrowPredicateFn , RowFilter } ;
2625pub use selection:: { RowSelection , RowSelector } ;
27- use std:: collections:: VecDeque ;
2826use std:: sync:: Arc ;
2927
3028pub use crate :: arrow:: array_reader:: RowGroups ;
@@ -39,7 +37,10 @@ use crate::file::metadata::{ParquetMetaData, ParquetMetaDataReader};
3937use crate :: file:: reader:: { ChunkReader , SerializedPageReader } ;
4038use crate :: schema:: types:: SchemaDescriptor ;
4139
40+ pub ( crate ) use read_plan:: { ReadPlan , ReadPlanBuilder } ;
41+
4242mod filter;
43+ mod read_plan;
4344mod selection;
4445pub mod statistics;
4546
@@ -679,38 +680,32 @@ impl<T: ChunkReader + 'static> ParquetRecordBatchReaderBuilder<T> {
679680 } ;
680681
681682 let mut filter = self . filter ;
682- let mut selection = self . selection ;
683+ let mut plan_builder = ReadPlanBuilder :: new ( batch_size ) . with_selection ( self . selection ) ;
683684
685+ // Update selection based on any filters
684686 if let Some ( filter) = filter. as_mut ( ) {
685687 for predicate in filter. predicates . iter_mut ( ) {
686- if !selects_any ( selection. as_ref ( ) ) {
688+ // break early if we have ruled out all rows
689+ if !plan_builder. selects_any ( ) {
687690 break ;
688691 }
689692
690693 let array_reader =
691694 build_array_reader ( self . fields . as_deref ( ) , predicate. projection ( ) , & reader) ?;
692695
693- selection = Some ( evaluate_predicate (
694- batch_size,
695- array_reader,
696- selection,
697- predicate. as_mut ( ) ,
698- ) ?) ;
696+ plan_builder = plan_builder. with_predicate ( array_reader, predicate. as_mut ( ) ) ?;
699697 }
700698 }
701699
702700 let array_reader = build_array_reader ( self . fields . as_deref ( ) , & self . projection , & reader) ?;
701+ let read_plan = plan_builder
702+ . limited ( reader. num_rows ( ) )
703+ . with_offset ( self . offset )
704+ . with_limit ( self . limit )
705+ . build_limited ( )
706+ . build ( ) ;
703707
704- // If selection is empty, truncate
705- if !selects_any ( selection. as_ref ( ) ) {
706- selection = Some ( RowSelection :: from ( vec ! [ ] ) ) ;
707- }
708-
709- Ok ( ParquetRecordBatchReader :: new (
710- batch_size,
711- array_reader,
712- apply_range ( selection, reader. num_rows ( ) , self . offset , self . limit ) ,
713- ) )
708+ Ok ( ParquetRecordBatchReader :: new ( array_reader, read_plan) )
714709 }
715710}
716711
@@ -789,11 +784,9 @@ impl<T: ChunkReader + 'static> PageIterator for ReaderPageIterator<T> {}
789784/// An `Iterator<Item = ArrowResult<RecordBatch>>` that yields [`RecordBatch`]
790785/// read from a parquet data source
791786pub struct ParquetRecordBatchReader {
792- batch_size : usize ,
    /// Source of decoded rows; `next_inner` calls `skip_records` / `read_records` on it
793787 array_reader : Box < dyn ArrayReader > ,
    /// Schema of the produced batches, built from the struct fields being read
794788 schema : SchemaRef ,
795- /// Row ranges to be selected from the data source
796- selection : Option < VecDeque < RowSelector > > ,
    /// Plan that supplies the batch size and the optional row selection consumed by `next_inner`
789+ read_plan : ReadPlan ,
797790}
798791
799792impl Iterator for ParquetRecordBatchReader {
@@ -814,9 +807,10 @@ impl ParquetRecordBatchReader {
814807 /// simplify error handling with `?`
815808 fn next_inner ( & mut self ) -> Result < Option < RecordBatch > > {
816809 let mut read_records = 0 ;
817- match self . selection . as_mut ( ) {
810+ let batch_size = self . batch_size ( ) ;
811+ match self . read_plan . selection_mut ( ) {
818812 Some ( selection) => {
819- while read_records < self . batch_size && !selection. is_empty ( ) {
813+ while read_records < batch_size && !selection. is_empty ( ) {
820814 let front = selection. pop_front ( ) . unwrap ( ) ;
821815 if front. skip {
822816 let skipped = self . array_reader . skip_records ( front. row_count ) ?;
@@ -838,7 +832,7 @@ impl ParquetRecordBatchReader {
838832 }
839833
840834 // try to read record
841- let need_read = self . batch_size - read_records;
835+ let need_read = batch_size - read_records;
842836 let to_read = match front. row_count . checked_sub ( need_read) {
843837 Some ( remaining) if remaining != 0 => {
838832 // if the page row count is less than batch_size, we must set the batch size to the page row count.
@@ -855,7 +849,7 @@ impl ParquetRecordBatchReader {
855849 }
856850 }
857851 None => {
858- self . array_reader . read_records ( self . batch_size ) ?;
852+ self . array_reader . read_records ( batch_size) ?;
859853 }
860854 } ;
861855
@@ -905,116 +899,37 @@ impl ParquetRecordBatchReader {
905899 let array_reader =
906900 build_array_reader ( levels. levels . as_ref ( ) , & ProjectionMask :: all ( ) , row_groups) ?;
907901
902+ let read_plan = ReadPlanBuilder :: new ( batch_size)
903+ . with_selection ( selection)
904+ . build ( ) ;
905+
908906 Ok ( Self {
909- batch_size,
910907 array_reader,
911908 schema : Arc :: new ( Schema :: new ( levels. fields . clone ( ) ) ) ,
912- selection : selection . map ( |s| s . trim ( ) . into ( ) ) ,
909+ read_plan ,
913910 } )
914911 }
915912
916913 /// Create a new [`ParquetRecordBatchReader`] that reads rows from `array_reader` as directed
917914 /// by `read_plan`, which supplies the batch size and the optional row selection. If the plan
918915 /// has no selection all rows will be returned
    ///
    /// # Panics
    ///
    /// Panics (via `unreachable!`) if the data type reported by `array_reader` is not a struct
919- pub ( crate ) fn new (
920- batch_size : usize ,
921- array_reader : Box < dyn ArrayReader > ,
922- selection : Option < RowSelection > ,
923- ) -> Self {
916+ pub ( crate ) fn new ( array_reader : Box < dyn ArrayReader > , read_plan : ReadPlan ) -> Self {
    // The output schema is taken from the fields of the reader's top-level struct type;
    // any other data type is treated as a programming error
924917 let schema = match array_reader. get_data_type ( ) {
925918 ArrowType :: Struct ( ref fields) => Schema :: new ( fields. clone ( ) ) ,
926919 _ => unreachable ! ( "Struct array reader's data type is not struct!" ) ,
927920 } ;
928921
929922 Self {
930- batch_size,
931923 array_reader,
932924 schema : Arc :: new ( schema) ,
933- selection : selection . map ( |s| s . trim ( ) . into ( ) ) ,
925+ read_plan ,
934926 }
935927 }
936- }
937928
938- /// Returns `true` if `selection` is `None` or selects some rows
939- pub ( crate ) fn selects_any ( selection : Option < & RowSelection > ) -> bool {
940- selection. map ( |x| x. selects_any ( ) ) . unwrap_or ( true )
941- }
942-
943- /// Applies an optional offset and limit to an optional [`RowSelection`]
944- pub ( crate ) fn apply_range (
945- mut selection : Option < RowSelection > ,
946- row_count : usize ,
947- offset : Option < usize > ,
948- limit : Option < usize > ,
949- ) -> Option < RowSelection > {
950- // If an offset is defined, apply it to the `selection`
951- if let Some ( offset) = offset {
952- selection = Some ( match row_count. checked_sub ( offset) {
953- None => RowSelection :: from ( vec ! [ ] ) ,
954- Some ( remaining) => selection
955- . map ( |selection| selection. offset ( offset) )
956- . unwrap_or_else ( || {
957- RowSelection :: from ( vec ! [
958- RowSelector :: skip( offset) ,
959- RowSelector :: select( remaining) ,
960- ] )
961- } ) ,
962- } ) ;
963- }
964-
965- // If a limit is defined, apply it to the final `selection`
966- if let Some ( limit) = limit {
967- selection = Some (
968- selection
969- . map ( |selection| selection. limit ( limit) )
970- . unwrap_or_else ( || {
971- RowSelection :: from ( vec ! [ RowSelector :: select( limit. min( row_count) ) ] )
972- } ) ,
973- ) ;
974- }
975- selection
976- }
977-
978- /// Evaluates an [`ArrowPredicate`], returning a [`RowSelection`] indicating
979- /// which rows to return.
980- ///
981- /// `input_selection`: Optional pre-existing selection. If `Some`, then the
982- /// final [`RowSelection`] will be the conjunction of it and the rows selected
983- /// by `predicate`.
984- ///
985- /// Note: A pre-existing selection may come from evaluating a previous predicate
986- /// or if the [`ParquetRecordBatchReader`] specified an explicit
987- /// [`RowSelection`] in addition to one or more predicates.
988- pub ( crate ) fn evaluate_predicate (
989- batch_size : usize ,
990- array_reader : Box < dyn ArrayReader > ,
991- input_selection : Option < RowSelection > ,
992- predicate : & mut dyn ArrowPredicate ,
993- ) -> Result < RowSelection > {
994- let reader = ParquetRecordBatchReader :: new ( batch_size, array_reader, input_selection. clone ( ) ) ;
995- let mut filters = vec ! [ ] ;
996- for maybe_batch in reader {
997- let maybe_batch = maybe_batch?;
998- let input_rows = maybe_batch. num_rows ( ) ;
999- let filter = predicate. evaluate ( maybe_batch) ?;
1000- // Since user supplied predicate, check error here to catch bugs quickly
1001- if filter. len ( ) != input_rows {
1002- return Err ( arrow_err ! (
1003- "ArrowPredicate predicate returned {} rows, expected {input_rows}" ,
1004- filter. len( )
1005- ) ) ;
1006- }
1007- match filter. null_count ( ) {
1008- 0 => filters. push ( filter) ,
1009- _ => filters. push ( prep_null_mask_filter ( & filter) ) ,
1010- } ;
    /// Returns the batch size held by this reader's [`ReadPlan`].
    ///
    /// `next_inner` uses this value as the upper bound on rows read per batch.
929+ #[ inline( always) ]
930+ pub ( crate ) fn batch_size ( & self ) -> usize {
931+ self . read_plan . batch_size ( )
1011932 }
1012-
1013- let raw = RowSelection :: from_filters ( & filters) ;
1014- Ok ( match input_selection {
1015- Some ( selection) => selection. and_then ( & raw ) ,
1016- None => raw,
1017- } )
1018933}
1019934
1020935#[ cfg( test) ]
@@ -3993,7 +3908,7 @@ mod tests {
39933908 . build ( )
39943909 . unwrap ( ) ;
39953910 assert_ne ! ( 1024 , num_rows) ;
3996- assert_eq ! ( reader. batch_size, num_rows as usize ) ;
3911+ assert_eq ! ( reader. read_plan . batch_size( ) , num_rows as usize ) ;
39973912 }
39983913
39993914 #[ test]
0 commit comments