diff --git a/parquet/src/arrow/arrow_reader/mod.rs b/parquet/src/arrow/arrow_reader/mod.rs index 2f670a64e108..201ccf9141a3 100644 --- a/parquet/src/arrow/arrow_reader/mod.rs +++ b/parquet/src/arrow/arrow_reader/mod.rs @@ -18,15 +18,16 @@ //! Contains reader which reads parquet data into arrow [`RecordBatch`] use arrow_array::cast::AsArray; -use arrow_array::Array; +use arrow_array::{Array, ArrayRef, BooleanArray}; use arrow_array::{RecordBatch, RecordBatchReader}; use arrow_schema::{ArrowError, DataType as ArrowType, Schema, SchemaRef}; -use arrow_select::filter::prep_null_mask_filter; +use arrow_select::filter::{filter, filter_record_batch, prep_null_mask_filter}; pub use filter::{ArrowPredicate, ArrowPredicateFn, RowFilter}; pub use selection::{RowSelection, RowSelector}; use std::collections::VecDeque; use std::sync::Arc; - +use arrow_buffer::{BooleanBuffer, BooleanBufferBuilder}; +use arrow_select::concat::concat; pub use crate::arrow::array_reader::RowGroups; use crate::arrow::array_reader::{build_array_reader, ArrayReader}; use crate::arrow::schema::{parquet_to_arrow_schema_and_fields, ParquetField}; @@ -680,7 +681,37 @@ impl ParquetRecordBatchReaderBuilder { let mut filter = self.filter; let mut selection = self.selection; + let mut projection = self.projection; + + + let predicate_projection = filter + .as_mut() + .map(|filter| { + filter + .predicates + .iter_mut() + .map(|p| p.projection().clone()) + .reduce(|mut acc, p| { + acc.union(&p); + acc + }) + }) + .flatten(); + + let projection_to_cache = predicate_projection.as_ref().map(|p| { + let mut p = p.clone(); + p.intersect(&projection); + p + }); + let project_exclude_filter = projection_to_cache.as_ref().map(|p| { + let mut rest = projection.clone(); + rest.subtract(p); + rest + }).or_else(|| Some(projection.clone())); + + + let mut filter_readers = vec![]; if let Some(filter) = filter.as_mut() { for predicate in filter.predicates.iter_mut() { if !selects_any(selection.as_ref()) { @@ -690,26 +721,28 @@ impl ParquetRecordBatchReaderBuilder { let array_reader = build_array_reader(self.fields.as_deref(), predicate.projection(), &reader)?; - selection = Some(evaluate_predicate( - batch_size, - array_reader, - selection, - predicate.as_mut(), - )?); + filter_readers.push(array_reader); } } - let array_reader = build_array_reader(self.fields.as_deref(), &self.projection, &reader)?; - // If selection is empty, truncate if !selects_any(selection.as_ref()) { selection = Some(RowSelection::from(vec![])); } - Ok(ParquetRecordBatchReader::new( + + let filter_reader = build_array_reader(self.fields.as_deref(), predicate_projection.as_ref().unwrap(), &reader)?; + + + selection = apply_range(selection, reader.num_rows(), self.offset, self.limit); + + Ok(ParquetRecordBatchReader::new( batch_size, - array_reader, - apply_range(selection, reader.num_rows(), self.offset, self.limit), + build_array_reader(self.fields.as_deref(), project_exclude_filter.as_ref().unwrap(), &reader)?, + filter_readers, + filter, + projection_to_cache, + selection, )) } } @@ -791,66 +824,143 @@ impl PageIterator for ReaderPageIterator {} pub struct ParquetRecordBatchReader { batch_size: usize, array_reader: Box, + filter_readers: Vec>, + row_filter: Option, schema: SchemaRef, + cached_mask: Option, selection: Option>, } + +/// Take the next selection from the selection queue, and return the selection +/// whose selected row count is to_select or less (if input selection is exhausted). +fn take_next_selection( + selection: &mut VecDeque, + to_select: usize, +) -> Option { + let mut current_selected = 0; + let mut rt = Vec::new(); + while let Some(front) = selection.pop_front() { + if front.skip { + rt.push(front); + continue; + } + + if current_selected + front.row_count <= to_select { + rt.push(front); + current_selected += front.row_count; + } else { + let select = to_select - current_selected; + let remaining = front.row_count - select; + rt.push(RowSelector::select(select)); + selection.push_front(RowSelector::select(remaining)); + + return Some(rt.into()); + } + } + if !rt.is_empty() { + return Some(rt.into()); + } + None +} + + + impl Iterator for ParquetRecordBatchReader { type Item = Result; fn next(&mut self) -> Option { - let mut read_records = 0; - match self.selection.as_mut() { - Some(selection) => { - while read_records < self.batch_size && !selection.is_empty() { - let front = selection.pop_front().unwrap(); - if front.skip { - let skipped = match self.array_reader.skip_records(front.row_count) { - Ok(skipped) => skipped, - Err(e) => return Some(Err(e.into())), + + let mut current_selected = 0; + + let mut current_selections: Vec = vec![]; + while current_selected < self.batch_size { + let selection: &mut VecDeque = match self.selection.as_mut() { + Some(s) => s, + None => { + self.selection = Some( + std::iter::once(RowSelector::select(self.batch_size)) + .collect::>(), + ); + self.selection.as_mut().unwrap() + } + }; + + let Some(mut raw_sel) = take_next_selection(selection, self.batch_size) else { + break; + }; + + let selection: Result = match &mut self.row_filter { + None => Ok(raw_sel), + Some(filter) => { + debug_assert_eq!( + self.filter_readers.len(), + filter.predicates.len(), + "predicate readers and predicates should have the same length" + ); + + let mut final_select = raw_sel.clone(); + for (predicate, reader) in filter + .predicates + .iter_mut() + .zip(self.filter_readers.iter_mut()) + { + let array = read_selection(reader.as_mut(), &raw_sel); + let batch = RecordBatch::from(array.unwrap().as_struct_opt().ok_or_else(|| { + general_err!("Struct array reader should return struct array") + }).unwrap()); + let input_rows = batch.num_rows(); + let predicate_filter = predicate.evaluate(batch).unwrap(); + if predicate_filter.len() != input_rows { + return Some(Err(ArrowError::ParquetError(format!( + "ArrowPredicate predicate returned {} rows, expected {input_rows}", + predicate_filter.len() + )))); + } + let predicate_filter = match predicate_filter.null_count() { + 0 => predicate_filter, + _ => prep_null_mask_filter(&predicate_filter), }; + let raw = RowSelection::from_filters(&[predicate_filter]); + final_select = final_select.and_then(&raw); - if skipped != front.row_count { - return Some(Err(general_err!( - "failed to skip rows, expected {}, got {}", - front.row_count, - skipped - ) - .into())); + if !final_select.selects_any() { + break } - continue; } + Ok(final_select) + } + }; - //Currently, when RowSelectors with row_count = 0 are included then its interpreted as end of reader. - //Fix is to skip such entries. See https://github.com/apache/arrow-rs/issues/2669 - if front.row_count == 0 { - continue; - } + current_selected += selection.as_ref().unwrap().row_count(); + current_selections.push(selection.unwrap()); + } - // try to read record - let need_read = self.batch_size - read_records; - let to_read = match front.row_count.checked_sub(need_read) { - Some(remaining) if remaining != 0 => { - // if page row count less than batch_size we must set batch size to page row count. - // add check avoid dead loop - selection.push_front(RowSelector::select(remaining)); - need_read - } - _ => front.row_count, + for selection in &mut current_selections { + for selector in selection.iter() { + if selector.skip { + let skipped = match self.array_reader.skip_records(selector.row_count) { + Ok(skipped) => skipped, + Err(e) => return Some(Err(e.into())), }; - match self.array_reader.read_records(to_read) { - Ok(0) => break, - Ok(rec) => read_records += rec, - Err(error) => return Some(Err(error.into())), + + if skipped != selector.row_count { + return Some(Err(general_err!( + "failed to skip rows, expected {}, got {}", + selector.row_count, + skipped + ) + .into())); } + continue; } + + match self.array_reader.read_records(selector.row_count) { + Ok(read) => read, + Err(e) => return Some(Err(e.into())), + }; } - None => { - if let Err(error) = self.array_reader.read_records(self.batch_size) { - return Some(Err(error.into())); - } - } - }; + } match self.array_reader.consume_batch() { Err(error) => Some(Err(error.into())), @@ -863,7 +973,10 @@ impl Iterator for ParquetRecordBatchReader { match struct_array { Err(err) => Some(Err(err)), - Ok(e) => (e.len() > 0).then(|| Ok(RecordBatch::from(e))), + Ok(e) => { + // println!("e.len() = {}", e.len()); + (e.len() > 0).then(|| Ok(RecordBatch::from(e))) + }, } } } @@ -906,6 +1019,9 @@ impl ParquetRecordBatchReader { Ok(Self { batch_size, array_reader, + filter_readers: vec![], + row_filter: None, + cached_mask: None, schema: Arc::new(Schema::new(levels.fields.clone())), selection: selection.map(|s| s.trim().into()), }) @@ -916,7 +1032,14 @@ impl ParquetRecordBatchReader { /// all rows will be returned pub(crate) fn new( batch_size: usize, + // finial project columns exclude the filter columns array_reader: Box, + // filter columns reader + filter_readers: Vec>, + // row filters + row_filter: Option, + // Cached project mask + cached_mask: Option, selection: Option, ) -> Self { let schema = match array_reader.get_data_type() { @@ -927,12 +1050,14 @@ impl ParquetRecordBatchReader { Self { batch_size, array_reader, + filter_readers, + row_filter, schema: Arc::new(schema), + cached_mask, selection: selection.map(|s| s.trim().into()), } } } - /// Returns `true` if `selection` is `None` or selects some rows pub(crate) fn selects_any(selection: Option<&RowSelection>) -> bool { selection.map(|x| x.selects_any()).unwrap_or(true) @@ -973,46 +1098,20 @@ pub(crate) fn apply_range( selection } -/// Evaluates an [`ArrowPredicate`], returning a [`RowSelection`] indicating -/// which rows to return. -/// -/// `input_selection`: Optional pre-existing selection. If `Some`, then the -/// final [`RowSelection`] will be the conjunction of it and the rows selected -/// by `predicate`. -/// -/// Note: A pre-existing selection may come from evaluating a previous predicate -/// or if the [`ParquetRecordBatchReader`] specified an explicit -/// [`RowSelection`] in addition to one or more predicates. -pub(crate) fn evaluate_predicate( - batch_size: usize, - array_reader: Box, - input_selection: Option, - predicate: &mut dyn ArrowPredicate, -) -> Result { - let reader = ParquetRecordBatchReader::new(batch_size, array_reader, input_selection.clone()); - let mut filters = vec![]; - for maybe_batch in reader { - let maybe_batch = maybe_batch?; - let input_rows = maybe_batch.num_rows(); - let filter = predicate.evaluate(maybe_batch)?; - // Since user supplied predicate, check error here to catch bugs quickly - if filter.len() != input_rows { - return Err(arrow_err!( - "ArrowPredicate predicate returned {} rows, expected {input_rows}", - filter.len() - )); +fn read_selection( + reader: &mut dyn ArrayReader, + selection: &RowSelection, +) -> Result { + for selector in selection.iter() { + if selector.skip { + let skipped = reader.skip_records(selector.row_count)?; + debug_assert_eq!(skipped, selector.row_count, "failed to skip rows"); + } else { + let read_records = reader.read_records(selector.row_count)?; + debug_assert_eq!(read_records, selector.row_count, "failed to read rows"); } - match filter.null_count() { - 0 => filters.push(filter), - _ => filters.push(prep_null_mask_filter(&filter)), - }; } - - let raw = RowSelection::from_filters(&filters); - Ok(match input_selection { - Some(selection) => selection.and_then(&raw), - None => raw, - }) + reader.consume_batch() } #[cfg(test)] diff --git a/parquet/src/arrow/async_reader/mod.rs b/parquet/src/arrow/async_reader/mod.rs index 45df68821ca8..e2fbfebf8799 100644 --- a/parquet/src/arrow/async_reader/mod.rs +++ b/parquet/src/arrow/async_reader/mod.rs @@ -39,10 +39,7 @@ use arrow_array::RecordBatch; use arrow_schema::{DataType, Fields, Schema, SchemaRef}; use crate::arrow::array_reader::{build_array_reader, RowGroups}; -use crate::arrow::arrow_reader::{ - apply_range, evaluate_predicate, selects_any, ArrowReaderBuilder, ArrowReaderMetadata, - ArrowReaderOptions, ParquetRecordBatchReader, RowFilter, RowSelection, -}; +use crate::arrow::arrow_reader::{apply_range, selects_any, ArrowReaderBuilder, ArrowReaderMetadata, ArrowReaderOptions, ParquetRecordBatchReader, RowFilter, RowSelection, RowSelector}; use crate::arrow::ProjectionMask; use crate::bloom_filter::{ @@ -586,27 +583,36 @@ where metadata: self.metadata.as_ref(), }; - if let Some(filter) = self.filter.as_mut() { - for predicate in filter.predicates.iter_mut() { - if !selects_any(selection.as_ref()) { - return Ok((self, None)); - } - let predicate_projection = predicate.projection(); - row_group - .fetch(&mut self.input, predicate_projection, selection.as_ref()) - .await?; + let predicate_projection = self.filter + .as_mut() + .map(|filter| { + filter + .predicates + .iter_mut() + .map(|p| p.projection().clone()) + .reduce(|mut acc, p| { + acc.union(&p); + acc + }) + }) + .flatten(); + + let projection_to_cache = predicate_projection.as_ref().map(|p| { + let mut p = p.clone(); + p.intersect(&projection); + p + }); - let array_reader = - build_array_reader(self.fields.as_deref(), predicate_projection, &row_group)?; + let project_exclude_filter = predicate_projection.as_ref().map(|p| { + let mut rest = projection.clone(); + rest.subtract(p); + rest + }).or_else(|| Some(projection.clone())); - selection = Some(evaluate_predicate( - batch_size, - array_reader, - selection, - predicate.as_mut(), - )?); - } + + if selection.is_none() { + selection = Some(RowSelection::from(vec![RowSelector::select(row_group.row_count)])); } // Compute the number of rows in the selection before applying limit and offset @@ -642,13 +648,44 @@ where *limit -= rows_after; } + let mut filter_readers = vec![]; + + if let Some(filter) = self.filter.as_mut() { + for predicate in filter.predicates.iter_mut() { + if !selects_any(selection.as_ref()) { + return Ok((self, None)); + } + + let predicate_projection = predicate.projection(); + + row_group + .fetch(&mut self.input, predicate_projection, selection.as_ref()) + .await?; + + let array_reader = build_array_reader( + self.fields.as_deref(), + predicate_projection, + &row_group, + )?; + + filter_readers.push(array_reader); + } + } + + // let filter_reader = build_array_reader(self.fields.as_deref(), predicate_projection.as_ref().unwrap(), &row_group)?; + + + // Fetch the data pages for the row group which is the final project excluding the filter row_group - .fetch(&mut self.input, &projection, selection.as_ref()) + .fetch(&mut self.input, &project_exclude_filter.as_ref().unwrap(), selection.as_ref()) .await?; let reader = ParquetRecordBatchReader::new( batch_size, - build_array_reader(self.fields.as_deref(), &projection, &row_group)?, + build_array_reader(self.fields.as_deref(), project_exclude_filter.as_ref().unwrap(), &row_group)?, + filter_readers, + self.filter.take(), + projection_to_cache, selection, ); diff --git a/parquet/src/arrow/mod.rs b/parquet/src/arrow/mod.rs index 76f8ef1bf068..cee362a46863 100644 --- a/parquet/src/arrow/mod.rs +++ b/parquet/src/arrow/mod.rs @@ -407,6 +407,26 @@ impl ProjectionMask { } } } + + /// Subtract two projection masks + /// + /// Example: + /// ```text + /// mask1 = [true, false, true] + /// mask2 = [false, true, true] + /// subtract(mask1, mask2) = [true, false, false] + /// ``` + pub fn subtract(&mut self, other: &Self) { + match (self.mask.as_ref(), other.mask.as_ref()) { + (None, _) => {} + (_, None) => {} + (Some(a), Some(b)) => { + debug_assert_eq!(a.len(), b.len()); + let mask = a.iter().zip(b.iter()).map(|(&a, &b)| a && !b).collect(); + self.mask = Some(mask); + } + } + } } /// Lookups up the parquet column by name