From 76ee618cd0ea5fae306aea49c473ca88bfc02388 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Fri, 1 Dec 2023 09:11:40 -0500 Subject: [PATCH 01/18] Refactor BloomFilter application into PruningPredicate Rewrite BloomFilterPruningPredicate in terms of BloomFilterPruningPredicate --- .../physical_plan/parquet/row_groups.rs | 241 +++------ .../core/src/physical_optimizer/pruning.rs | 418 ++++++++------- .../physical-expr/src/utils/guarantee.rs | 492 ++++++++++++++++++ .../src/{utils.rs => utils/mod.rs} | 33 +- 4 files changed, 817 insertions(+), 367 deletions(-) create mode 100644 datafusion/physical-expr/src/utils/guarantee.rs rename datafusion/physical-expr/src/{utils.rs => utils/mod.rs} (96%) diff --git a/datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs b/datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs index 0ab2046097c4..340caaba5c4a 100644 --- a/datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs +++ b/datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs @@ -16,9 +16,9 @@ // under the License. use arrow::{array::ArrayRef, datatypes::Schema}; +use arrow_array::BooleanArray; use arrow_schema::FieldRef; -use datafusion_common::tree_node::{TreeNode, VisitRecursion}; -use datafusion_common::{Column, DataFusionError, Result, ScalarValue}; +use datafusion_common::{Column, ScalarValue}; use parquet::file::metadata::ColumnChunkMetaData; use parquet::schema::types::SchemaDescriptor; use parquet::{ @@ -26,19 +26,13 @@ use parquet::{ bloom_filter::Sbbf, file::metadata::RowGroupMetaData, }; -use std::{ - collections::{HashMap, HashSet}, - sync::Arc, -}; +use std::collections::{HashMap, HashSet}; use crate::datasource::listing::FileRange; use crate::datasource::physical_plan::parquet::statistics::{ max_statistics, min_statistics, parquet_column, }; -use crate::logical_expr::Operator; -use crate::physical_expr::expressions as phys_expr; use crate::physical_optimizer::pruning::{PruningPredicate, PruningStatistics}; -use crate::physical_plan::PhysicalExpr; use super::ParquetFileMetrics; @@ -122,182 +116,118 @@ pub(crate) async fn prune_row_groups_by_bloom_filters< predicate: &PruningPredicate, metrics: &ParquetFileMetrics, ) -> Vec { - let bf_predicates = match BloomFilterPruningPredicate::try_new(predicate.orig_expr()) - { - Ok(predicates) => predicates, - Err(_) => { - return row_groups.to_vec(); - } - }; + println!( + "prune_row_groups_by_bloom_filters with pruning predicate: {:#?}", + predicate + ); let mut filtered = Vec::with_capacity(groups.len()); for idx in row_groups { let rg_metadata = &groups[*idx]; // get all columns bloom filter - let mut column_sbbf = - HashMap::with_capacity(bf_predicates.required_columns.len()); - for column_name in bf_predicates.required_columns.iter() { - let column_idx = match rg_metadata + let literal_columns = predicate.literal_columns(); + let mut column_sbbf = HashMap::with_capacity(literal_columns.len()); + + for column_name in literal_columns { + // This is very likely incorrect as it will not work for nested columns + // should use parquet_column instead + let Some((column_idx, _)) = rg_metadata .columns() .iter() .enumerate() - .find(|(_, column)| column.column_path().string().eq(column_name)) - { - Some((column_idx, _)) => column_idx, - None => continue, + .find(|(_, column)| column.column_path().string().eq(&column_name)) + else { + continue; }; + let bf = match builder .get_row_group_column_bloom_filter(*idx, column_idx) .await { - Ok(bf) => match bf { - Some(bf) => bf, - None => { - 
continue; - } - }, + Ok(Some(bf)) => bf, + Ok(None) => continue, // no bloom filter for this column Err(e) => { - log::error!("Error evaluating row group predicate values when using BloomFilterPruningPredicate {e}"); + log::error!("Ignoring error reading bloom filter: {e}"); metrics.predicate_evaluation_errors.add(1); continue; } }; - column_sbbf.insert(column_name.to_owned(), bf); + column_sbbf.insert(column_name.to_string(), bf); } - if bf_predicates.prune(&column_sbbf) { + + let stats = BloomFilterStatistics { column_sbbf }; + + // Can this group be pruned? + let prune_result = predicate.prune(&stats); + println!("prune result: {:?}", prune_result); + let prune_group = match prune_result { + Ok(values) => !values[0], + Err(e) => { + log::debug!("Error evaluating row group predicate on bloom filter: {e}"); + metrics.predicate_evaluation_errors.add(1); + false + } + }; + + println!("prune group: {}", prune_group); + + if prune_group { metrics.row_groups_pruned.add(1); - continue; + } else { + filtered.push(*idx); } - filtered.push(*idx); } filtered } -struct BloomFilterPruningPredicate { - /// Actual pruning predicate - predicate_expr: Option, - /// The statistics required to evaluate this predicate - required_columns: Vec, +struct BloomFilterStatistics { + column_sbbf: HashMap, } -impl BloomFilterPruningPredicate { - fn try_new(expr: &Arc) -> Result { - let binary_expr = expr.as_any().downcast_ref::(); - match binary_expr { - Some(binary_expr) => { - let columns = Self::get_predicate_columns(expr); - Ok(Self { - predicate_expr: Some(binary_expr.clone()), - required_columns: columns.into_iter().collect(), - }) - } - None => Err(DataFusionError::Execution( - "BloomFilterPruningPredicate only support binary expr".to_string(), - )), - } - } - - fn prune(&self, column_sbbf: &HashMap) -> bool { - Self::prune_expr_with_bloom_filter(self.predicate_expr.as_ref(), column_sbbf) +impl PruningStatistics for BloomFilterStatistics { + fn num_containers(&self) -> usize { + 1 } - /// Return true if the `expr` can be proved not `true` - /// based on the bloom filter. - /// - /// We only checked `BinaryExpr` but it also support `InList`, - /// Because of the `optimizer` will convert `InList` to `BinaryExpr`. 
- fn prune_expr_with_bloom_filter( - expr: Option<&phys_expr::BinaryExpr>, - column_sbbf: &HashMap, - ) -> bool { - let Some(expr) = expr else { - // unsupported predicate - return false; + /// Use bloom filters to determine if we are sure this column can not contain `value` + fn contains( + &self, + column: &Column, + values: &HashSet, + ) -> Option { + println!("Checking column {} for values {:?}", column.name, values); + let sbbf = self.column_sbbf.get(column.name.as_str())?; + println!(" have sbbf: {:?}", sbbf); + + // if true, means column probably contains value + // if false, means column definitely DOES NOT contain value + let known_not_present = values + .iter() + .map(|value| match value { + ScalarValue::Utf8(Some(v)) => sbbf.check(&v.as_str()), + ScalarValue::Boolean(Some(v)) => sbbf.check(v), + ScalarValue::Float64(Some(v)) => sbbf.check(v), + ScalarValue::Float32(Some(v)) => sbbf.check(v), + ScalarValue::Int64(Some(v)) => sbbf.check(v), + ScalarValue::Int32(Some(v)) => sbbf.check(v), + ScalarValue::Int16(Some(v)) => sbbf.check(v), + ScalarValue::Int8(Some(v)) => sbbf.check(v), + _ => true, + }) + // We know the row group doesn't contain any of the values if the checks are all + // false + .all(|v| !v); + println!("known_not_present result: {}", known_not_present); + + let contains = if known_not_present { + Some(false) + } else { + // The column might contain one of the values + None }; - match expr.op() { - Operator::And | Operator::Or => { - let left = Self::prune_expr_with_bloom_filter( - expr.left().as_any().downcast_ref::(), - column_sbbf, - ); - let right = Self::prune_expr_with_bloom_filter( - expr.right() - .as_any() - .downcast_ref::(), - column_sbbf, - ); - match expr.op() { - Operator::And => left || right, - Operator::Or => left && right, - _ => false, - } - } - Operator::Eq => { - if let Some((col, val)) = Self::check_expr_is_col_equal_const(expr) { - if let Some(sbbf) = column_sbbf.get(col.name()) { - match val { - ScalarValue::Utf8(Some(v)) => !sbbf.check(&v.as_str()), - ScalarValue::Boolean(Some(v)) => !sbbf.check(&v), - ScalarValue::Float64(Some(v)) => !sbbf.check(&v), - ScalarValue::Float32(Some(v)) => !sbbf.check(&v), - ScalarValue::Int64(Some(v)) => !sbbf.check(&v), - ScalarValue::Int32(Some(v)) => !sbbf.check(&v), - ScalarValue::Int16(Some(v)) => !sbbf.check(&v), - ScalarValue::Int8(Some(v)) => !sbbf.check(&v), - _ => false, - } - } else { - false - } - } else { - false - } - } - _ => false, - } - } - - fn get_predicate_columns(expr: &Arc) -> HashSet { - let mut columns = HashSet::new(); - expr.apply(&mut |expr| { - if let Some(binary_expr) = - expr.as_any().downcast_ref::() - { - if let Some((column, _)) = - Self::check_expr_is_col_equal_const(binary_expr) - { - columns.insert(column.name().to_string()); - } - } - Ok(VisitRecursion::Continue) - }) - // no way to fail as only Ok(VisitRecursion::Continue) is returned - .unwrap(); - - columns - } - - fn check_expr_is_col_equal_const( - exr: &phys_expr::BinaryExpr, - ) -> Option<(phys_expr::Column, ScalarValue)> { - if Operator::Eq.ne(exr.op()) { - return None; - } - let left_any = exr.left().as_any(); - let right_any = exr.right().as_any(); - if let (Some(col), Some(liter)) = ( - left_any.downcast_ref::(), - right_any.downcast_ref::(), - ) { - return Some((col.clone(), liter.value().clone())); - } - if let (Some(liter), Some(col)) = ( - left_any.downcast_ref::(), - right_any.downcast_ref::(), - ) { - return Some((col.clone(), liter.value().clone())); - } - None + let result = 
Some(BooleanArray::from(vec![contains])); + println!("result: {:?}", result); + result } } @@ -350,6 +280,7 @@ mod tests { use arrow::datatypes::Schema; use arrow::datatypes::{DataType, Field}; use datafusion_common::{config::ConfigOptions, TableReference, ToDFSchema}; + use datafusion_common::{DataFusionError, Result}; use datafusion_expr::{ builder::LogicalTableSource, cast, col, lit, AggregateUDF, Expr, ScalarUDF, TableSource, WindowUDF, diff --git a/datafusion/core/src/physical_optimizer/pruning.rs b/datafusion/core/src/physical_optimizer/pruning.rs index de508327fade..7cca1947bb31 100644 --- a/datafusion/core/src/physical_optimizer/pruning.rs +++ b/datafusion/core/src/physical_optimizer/pruning.rs @@ -35,12 +35,13 @@ use arrow::{ datatypes::{DataType, Field, Schema, SchemaRef}, record_batch::RecordBatch, }; -use datafusion_common::{downcast_value, plan_datafusion_err, ScalarValue}; +use arrow_array::cast::AsArray; use datafusion_common::{ internal_err, plan_err, tree_node::{Transformed, TreeNode}, }; -use datafusion_physical_expr::utils::collect_columns; +use datafusion_common::{plan_datafusion_err, ScalarValue}; +use datafusion_physical_expr::utils::{collect_columns, Guarantee, LiteralGuarantee}; use datafusion_physical_expr::{expressions as phys_expr, PhysicalExprRef}; use log::trace; @@ -68,11 +69,15 @@ use log::trace; pub trait PruningStatistics { /// return the minimum values for the named column, if known. /// Note: the returned array must contain `num_containers()` rows - fn min_values(&self, column: &Column) -> Option; + fn min_values(&self, _column: &Column) -> Option { + None + } /// return the maximum values for the named column, if known. /// Note: the returned array must contain `num_containers()` rows. - fn max_values(&self, column: &Column) -> Option; + fn max_values(&self, _column: &Column) -> Option { + None + } /// return the number of containers (e.g. row groups) being /// pruned with these statistics @@ -82,7 +87,32 @@ pub trait PruningStatistics { /// `Option`. /// /// Note: the returned array must contain `num_containers()` rows. - fn null_counts(&self, column: &Column) -> Option; + fn null_counts(&self, _column: &Column) -> Option { + None + } + + /// Returns an array where each element represents if the value of the + /// column CERTAINLY DOES NOT contain any of the specified `values`. + /// + /// This can be used to prune containers based on structures such as Bloom + /// Filters which can test set membership quickly. + /// + /// The returned array has one row for each container, with the following: + /// * `true` if the value of column CERTAINLY IS one of `values` + /// * `false` if the value of column CERTAINLY IS NOT one of `values` + /// * `null` if the value of column may or may not be in values + /// + /// If these statistics can not determine column membership for any + /// container, return `None` (the default). + /// + /// Note: the returned array must contain [`Self::num_containers`] rows + fn contains( + &self, + _column: &Column, + _values: &HashSet, + ) -> Option { + None + } } /// Evaluates filter expressions on statistics, rather than the actual data. 
If @@ -129,10 +159,12 @@ pub struct PruningPredicate { schema: SchemaRef, /// Actual pruning predicate (rewritten in terms of column min/max statistics) predicate_expr: Arc, - /// The statistics required to evaluate this predicate - required_columns: RequiredStatColumns, + /// The statistics required to evaluate `predicate_expr` + required_columns: RequiredColumns, /// Original physical predicate from which this predicate expr is derived (required for serialization) orig_expr: Arc, + /// Any col = literal expressions + literal_guarantees: Vec, } impl PruningPredicate { @@ -157,14 +189,18 @@ impl PruningPredicate { /// `(column_min / 2) <= 4 && 4 <= (column_max / 2))` pub fn try_new(expr: Arc, schema: SchemaRef) -> Result { // build predicate expression once - let mut required_columns = RequiredStatColumns::new(); + let mut required_columns = RequiredColumns::new(); let predicate_expr = build_predicate_expression(&expr, schema.as_ref(), &mut required_columns); + + let literal_guarantees = LiteralGuarantee::analyze(&expr); + Ok(Self { schema, predicate_expr, required_columns, orig_expr: expr, + literal_guarantees, }) } @@ -183,40 +219,36 @@ impl PruningPredicate { /// /// [`ExprSimplifier`]: crate::optimizer::simplify_expressions::ExprSimplifier pub fn prune(&self, statistics: &S) -> Result> { + let mut builder = BoolVecBuilder::new(statistics.num_containers()); + + // First, check any expr_op_literals + for literal_guarantee in &self.literal_guarantees { + let LiteralGuarantee { + column, + guarantee, + literals, + } = literal_guarantee; + // Can the statistics tell us anything about this column? + if let Some(results) = statistics.contains(column, literals) { + match guarantee { + Guarantee::In => builder.append_array(&results), + Guarantee::NotIn => { + let results = arrow::compute::not(&results)?; + builder.append_array(&results) + } + } + } + } + // build a RecordBatch that contains the min/max values in the - // appropriate statistics columns + // appropriate statistics columns for the min/max predicate let statistics_batch = build_statistics_record_batch(statistics, &self.required_columns)?; - // Evaluate the pruning predicate on that record batch. - // - // Use true when the result of evaluating a predicate - // expression on a row group is null (aka `None`). Null can - // arise when the statistics are unknown or some calculation - // in the predicate means we don't know for sure if the row - // group can be filtered out or not. To maintain correctness - // the row group must be kept and thus `true` is returned. - match self.predicate_expr.evaluate(&statistics_batch)? { - ColumnarValue::Array(array) => { - let predicate_array = downcast_value!(array, BooleanArray); + // Evaluate the pruning predicate on that record batch and append any results to the builder + builder.append_value(self.predicate_expr.evaluate(&statistics_batch)?); - Ok(predicate_array - .into_iter() - .map(|x| x.unwrap_or(true)) // None -> true per comments above - .collect::>()) - } - // result was a column - ColumnarValue::Scalar(ScalarValue::Boolean(v)) => { - let v = v.unwrap_or(true); // None -> true per comments above - Ok(vec![v; statistics.num_containers()]) - } - other => { - internal_err!( - "Unexpected result of pruning predicate evaluation. 
Expected Boolean array \ - or scalar but got {other:?}" - ) - } - } + Ok(builder.build()) } /// Return a reference to the input schema @@ -239,9 +271,79 @@ impl PruningPredicate { is_always_true(&self.predicate_expr) } - pub(crate) fn required_columns(&self) -> &RequiredStatColumns { + pub(crate) fn required_columns(&self) -> &RequiredColumns { &self.required_columns } + + /// returns the names of the columns that are known to be a constant (and + /// that may be used as part of a Contains query + pub fn literal_columns(&self) -> Vec { + let mut seen = HashSet::new(); + self.literal_guarantees + .iter() + .map(|e| &e.column.name) + // avoid duplicates + .filter(|name| seen.insert(*name)) + .map(|s| s.to_string()) + .collect() + } +} + +/// Builds a Vec that is true if container CERTAINLY DOES NOT pass the +/// predicate, and false if it MAY pass the predicate +/// +/// Use true when the result of evaluating a predicate +/// expression on a row group is null (aka `None`). Null can +/// arise when the statistics are unknown or some calculation +/// in the predicate means we don't know for sure if the row +/// group can be filtered out or not. To maintain correctness +/// the row group must be kept and thus `true` is returned. +#[derive(Debug)] +struct BoolVecBuilder { + // true if the container may pass the predicate, false if we know for sure + // it did not pass the predicate + inner: Vec, +} + +impl BoolVecBuilder { + fn new(num_containers: usize) -> Self { + Self { + inner: vec![true; num_containers], + } + } + + /// Combines the results in an array to the currently in progress array + fn append_array(&mut self, array: &BooleanArray) { + assert_eq!(array.len(), self.inner.len()); + // set any locations to false if we know for sure they did not pass the predicate + for (cur, new) in self.inner.iter_mut().zip(array.iter()) { + if let Some(false) = new { + *cur = false; + } + } + } + + /// Combines the results in the [`ColumnarValue`] to the currently in progress array + fn append_value(&mut self, value: ColumnarValue) { + match value { + ColumnarValue::Array(array) => { + self.append_array(array.as_boolean()); + } + ColumnarValue::Scalar(ScalarValue::Boolean(Some(false))) => { + // False means all containers can not pass the predicate + self.inner = vec![false; self.inner.len()]; + } + _ => { + // Null or true means we don't know if the container can pass the predicate + // so we must keep it + } + } + } + + /// Convert this builder into a Vec of bools + fn build(self) -> Vec { + self.inner + } } fn is_always_true(expr: &Arc) -> bool { @@ -257,21 +359,21 @@ fn is_always_true(expr: &Arc) -> bool { /// Handles creating references to the min/max statistics /// for columns as well as recording which statistics are needed #[derive(Debug, Default, Clone)] -pub(crate) struct RequiredStatColumns { +pub(crate) struct RequiredColumns { /// The statistics required to evaluate this predicate: /// * The unqualified column in the input schema /// * Statistics type (e.g. Min or Max or Null_Count) /// * The field the statistics value should be placed in for - /// pruning predicate evaluation + /// pruning predicate evaluation (e.g. `min_value` or `max_value`) columns: Vec<(phys_expr::Column, StatisticsType, Field)>, } -impl RequiredStatColumns { +impl RequiredColumns { fn new() -> Self { Self::default() } - /// Returns number of unique columns. 
+ /// Returns number of unique columns pub(crate) fn n_columns(&self) -> usize { self.iter() .map(|(c, _s, _f)| c) @@ -325,11 +427,10 @@ impl RequiredStatColumns { // only add statistics column if not previously added if need_to_insert { - let stat_field = Field::new( - stat_column.name(), - field.data_type().clone(), - field.is_nullable(), - ); + // may be null if statistics are not present + let nullable = true; + let stat_field = + Field::new(stat_column.name(), field.data_type().clone(), nullable); self.columns.push((column.clone(), stat_type, stat_field)); } rewrite_column_expr(column_expr.clone(), column, &stat_column) @@ -372,7 +473,7 @@ impl RequiredStatColumns { } } -impl From> for RequiredStatColumns { +impl From> for RequiredColumns { fn from(columns: Vec<(phys_expr::Column, StatisticsType, Field)>) -> Self { Self { columns } } @@ -405,7 +506,7 @@ impl From> for RequiredStatColum /// ``` fn build_statistics_record_batch( statistics: &S, - required_columns: &RequiredStatColumns, + required_columns: &RequiredColumns, ) -> Result { let mut fields = Vec::::new(); let mut arrays = Vec::::new(); @@ -461,7 +562,7 @@ struct PruningExpressionBuilder<'a> { op: Operator, scalar_expr: Arc, field: &'a Field, - required_columns: &'a mut RequiredStatColumns, + required_columns: &'a mut RequiredColumns, } impl<'a> PruningExpressionBuilder<'a> { @@ -470,7 +571,7 @@ impl<'a> PruningExpressionBuilder<'a> { right: &'a Arc, op: Operator, schema: &'a Schema, - required_columns: &'a mut RequiredStatColumns, + required_columns: &'a mut RequiredColumns, ) -> Result { // find column name; input could be a more complicated expression let left_columns = collect_columns(left); @@ -685,7 +786,7 @@ fn reverse_operator(op: Operator) -> Result { fn build_single_column_expr( column: &phys_expr::Column, schema: &Schema, - required_columns: &mut RequiredStatColumns, + required_columns: &mut RequiredColumns, is_not: bool, // if true, treat as !col ) -> Option> { let field = schema.field_with_name(column.name()).ok()?; @@ -726,7 +827,7 @@ fn build_single_column_expr( fn build_is_null_column_expr( expr: &Arc, schema: &Schema, - required_columns: &mut RequiredStatColumns, + required_columns: &mut RequiredColumns, ) -> Option> { if let Some(col) = expr.as_any().downcast_ref::() { let field = schema.field_with_name(col.name()).ok()?; @@ -756,7 +857,7 @@ fn build_is_null_column_expr( fn build_predicate_expression( expr: &Arc, schema: &Schema, - required_columns: &mut RequiredStatColumns, + required_columns: &mut RequiredColumns, ) -> Arc { // Returned for unsupported expressions. Such expressions are // converted to TRUE. 
@@ -1184,7 +1285,7 @@ mod tests { #[test] fn test_build_statistics_record_batch() { // Request a record batch with of s1_min, s2_max, s3_max, s3_min - let required_columns = RequiredStatColumns::from(vec![ + let required_columns = RequiredColumns::from(vec![ // min of original column s1, named s1_min ( phys_expr::Column::new("s1", 1), @@ -1256,7 +1357,7 @@ mod tests { // which is what Parquet does // Request a record batch with of s1_min as a timestamp - let required_columns = RequiredStatColumns::from(vec![( + let required_columns = RequiredColumns::from(vec![( phys_expr::Column::new("s3", 3), StatisticsType::Min, Field::new( @@ -1288,7 +1389,7 @@ mod tests { #[test] fn test_build_statistics_no_required_stats() { - let required_columns = RequiredStatColumns::new(); + let required_columns = RequiredColumns::new(); let statistics = OneContainerStats { min_values: Some(Arc::new(Int64Array::from(vec![Some(10)]))), @@ -1306,7 +1407,7 @@ mod tests { // Test requesting a Utf8 column when the stats return some other type // Request a record batch with of s1_min as a timestamp - let required_columns = RequiredStatColumns::from(vec![( + let required_columns = RequiredColumns::from(vec![( phys_expr::Column::new("s3", 3), StatisticsType::Min, Field::new("s1_min", DataType::Utf8, true), @@ -1335,7 +1436,7 @@ mod tests { #[test] fn test_build_statistics_inconsistent_length() { // return an inconsistent length to the actual statistics arrays - let required_columns = RequiredStatColumns::from(vec![( + let required_columns = RequiredColumns::from(vec![( phys_expr::Column::new("s1", 3), StatisticsType::Min, Field::new("s1_min", DataType::Int64, true), @@ -1366,20 +1467,14 @@ mod tests { // test column on the left let expr = col("c1").eq(lit(1)); - let predicate_expr = test_build_predicate_expression( - &expr, - &schema, - &mut RequiredStatColumns::new(), - ); + let predicate_expr = + test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new()); assert_eq!(predicate_expr.to_string(), expected_expr); // test column on the right let expr = lit(1).eq(col("c1")); - let predicate_expr = test_build_predicate_expression( - &expr, - &schema, - &mut RequiredStatColumns::new(), - ); + let predicate_expr = + test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new()); assert_eq!(predicate_expr.to_string(), expected_expr); Ok(()) @@ -1392,20 +1487,14 @@ mod tests { // test column on the left let expr = col("c1").not_eq(lit(1)); - let predicate_expr = test_build_predicate_expression( - &expr, - &schema, - &mut RequiredStatColumns::new(), - ); + let predicate_expr = + test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new()); assert_eq!(predicate_expr.to_string(), expected_expr); // test column on the right let expr = lit(1).not_eq(col("c1")); - let predicate_expr = test_build_predicate_expression( - &expr, - &schema, - &mut RequiredStatColumns::new(), - ); + let predicate_expr = + test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new()); assert_eq!(predicate_expr.to_string(), expected_expr); Ok(()) @@ -1418,20 +1507,14 @@ mod tests { // test column on the left let expr = col("c1").gt(lit(1)); - let predicate_expr = test_build_predicate_expression( - &expr, - &schema, - &mut RequiredStatColumns::new(), - ); + let predicate_expr = + test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new()); assert_eq!(predicate_expr.to_string(), expected_expr); // test column on the right let expr = lit(1).lt(col("c1")); - let predicate_expr = 
test_build_predicate_expression( - &expr, - &schema, - &mut RequiredStatColumns::new(), - ); + let predicate_expr = + test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new()); assert_eq!(predicate_expr.to_string(), expected_expr); Ok(()) @@ -1444,19 +1527,13 @@ mod tests { // test column on the left let expr = col("c1").gt_eq(lit(1)); - let predicate_expr = test_build_predicate_expression( - &expr, - &schema, - &mut RequiredStatColumns::new(), - ); + let predicate_expr = + test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new()); assert_eq!(predicate_expr.to_string(), expected_expr); // test column on the right let expr = lit(1).lt_eq(col("c1")); - let predicate_expr = test_build_predicate_expression( - &expr, - &schema, - &mut RequiredStatColumns::new(), - ); + let predicate_expr = + test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new()); assert_eq!(predicate_expr.to_string(), expected_expr); Ok(()) @@ -1469,20 +1546,14 @@ mod tests { // test column on the left let expr = col("c1").lt(lit(1)); - let predicate_expr = test_build_predicate_expression( - &expr, - &schema, - &mut RequiredStatColumns::new(), - ); + let predicate_expr = + test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new()); assert_eq!(predicate_expr.to_string(), expected_expr); // test column on the right let expr = lit(1).gt(col("c1")); - let predicate_expr = test_build_predicate_expression( - &expr, - &schema, - &mut RequiredStatColumns::new(), - ); + let predicate_expr = + test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new()); assert_eq!(predicate_expr.to_string(), expected_expr); Ok(()) @@ -1495,19 +1566,13 @@ mod tests { // test column on the left let expr = col("c1").lt_eq(lit(1)); - let predicate_expr = test_build_predicate_expression( - &expr, - &schema, - &mut RequiredStatColumns::new(), - ); + let predicate_expr = + test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new()); assert_eq!(predicate_expr.to_string(), expected_expr); // test column on the right let expr = lit(1).gt_eq(col("c1")); - let predicate_expr = test_build_predicate_expression( - &expr, - &schema, - &mut RequiredStatColumns::new(), - ); + let predicate_expr = + test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new()); assert_eq!(predicate_expr.to_string(), expected_expr); Ok(()) @@ -1523,11 +1588,8 @@ mod tests { // test AND operator joining supported c1 < 1 expression and unsupported c2 > c3 expression let expr = col("c1").lt(lit(1)).and(col("c2").lt(col("c3"))); let expected_expr = "c1_min@0 < 1"; - let predicate_expr = test_build_predicate_expression( - &expr, - &schema, - &mut RequiredStatColumns::new(), - ); + let predicate_expr = + test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new()); assert_eq!(predicate_expr.to_string(), expected_expr); Ok(()) @@ -1542,11 +1604,8 @@ mod tests { // test OR operator joining supported c1 < 1 expression and unsupported c2 % 2 = 0 expression let expr = col("c1").lt(lit(1)).or(col("c2").rem(lit(2)).eq(lit(0))); let expected_expr = "true"; - let predicate_expr = test_build_predicate_expression( - &expr, - &schema, - &mut RequiredStatColumns::new(), - ); + let predicate_expr = + test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new()); assert_eq!(predicate_expr.to_string(), expected_expr); Ok(()) @@ -1558,11 +1617,8 @@ mod tests { let expected_expr = "true"; let expr = col("c1").not(); - let predicate_expr = 
test_build_predicate_expression( - &expr, - &schema, - &mut RequiredStatColumns::new(), - ); + let predicate_expr = + test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new()); assert_eq!(predicate_expr.to_string(), expected_expr); Ok(()) @@ -1574,11 +1630,8 @@ mod tests { let expected_expr = "NOT c1_min@0 AND c1_max@1"; let expr = col("c1").not(); - let predicate_expr = test_build_predicate_expression( - &expr, - &schema, - &mut RequiredStatColumns::new(), - ); + let predicate_expr = + test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new()); assert_eq!(predicate_expr.to_string(), expected_expr); Ok(()) @@ -1590,11 +1643,8 @@ mod tests { let expected_expr = "c1_min@0 OR c1_max@1"; let expr = col("c1"); - let predicate_expr = test_build_predicate_expression( - &expr, - &schema, - &mut RequiredStatColumns::new(), - ); + let predicate_expr = + test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new()); assert_eq!(predicate_expr.to_string(), expected_expr); Ok(()) @@ -1608,11 +1658,8 @@ mod tests { // DF doesn't support arithmetic on boolean columns so // this predicate will error when evaluated let expr = col("c1").lt(lit(true)); - let predicate_expr = test_build_predicate_expression( - &expr, - &schema, - &mut RequiredStatColumns::new(), - ); + let predicate_expr = + test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new()); assert_eq!(predicate_expr.to_string(), expected_expr); Ok(()) @@ -1624,7 +1671,7 @@ mod tests { Field::new("c1", DataType::Int32, false), Field::new("c2", DataType::Int32, false), ]); - let mut required_columns = RequiredStatColumns::new(); + let mut required_columns = RequiredColumns::new(); // c1 < 1 and (c2 = 2 or c2 = 3) let expr = col("c1") .lt(lit(1)) @@ -1640,7 +1687,7 @@ mod tests { ( phys_expr::Column::new("c1", 0), StatisticsType::Min, - c1_min_field + c1_min_field.with_nullable(true) // could be nullable if stats are not present ) ); // c2 = 2 should add c2_min and c2_max @@ -1650,7 +1697,7 @@ mod tests { ( phys_expr::Column::new("c2", 1), StatisticsType::Min, - c2_min_field + c2_min_field.with_nullable(true) // could be nullable if stats are not present ) ); let c2_max_field = Field::new("c2_max", DataType::Int32, false); @@ -1659,7 +1706,7 @@ mod tests { ( phys_expr::Column::new("c2", 1), StatisticsType::Max, - c2_max_field + c2_max_field.with_nullable(true) // could be nullable if stats are not present ) ); // c2 = 3 shouldn't add any new statistics fields @@ -1681,11 +1728,8 @@ mod tests { false, )); let expected_expr = "c1_min@0 <= 1 AND 1 <= c1_max@1 OR c1_min@0 <= 2 AND 2 <= c1_max@1 OR c1_min@0 <= 3 AND 3 <= c1_max@1"; - let predicate_expr = test_build_predicate_expression( - &expr, - &schema, - &mut RequiredStatColumns::new(), - ); + let predicate_expr = + test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new()); assert_eq!(predicate_expr.to_string(), expected_expr); Ok(()) @@ -1700,11 +1744,8 @@ mod tests { // test c1 in() let expr = Expr::InList(InList::new(Box::new(col("c1")), vec![], false)); let expected_expr = "true"; - let predicate_expr = test_build_predicate_expression( - &expr, - &schema, - &mut RequiredStatColumns::new(), - ); + let predicate_expr = + test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new()); assert_eq!(predicate_expr.to_string(), expected_expr); Ok(()) @@ -1725,11 +1766,8 @@ mod tests { let expected_expr = "(c1_min@0 != 1 OR 1 != c1_max@1) \ AND (c1_min@0 != 2 OR 2 != c1_max@1) \ AND (c1_min@0 != 3 OR 3 != 
c1_max@1)"; - let predicate_expr = test_build_predicate_expression( - &expr, - &schema, - &mut RequiredStatColumns::new(), - ); + let predicate_expr = + test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new()); assert_eq!(predicate_expr.to_string(), expected_expr); Ok(()) @@ -1743,20 +1781,14 @@ mod tests { // test column on the left let expr = cast(col("c1"), DataType::Int64).eq(lit(ScalarValue::Int64(Some(1)))); - let predicate_expr = test_build_predicate_expression( - &expr, - &schema, - &mut RequiredStatColumns::new(), - ); + let predicate_expr = + test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new()); assert_eq!(predicate_expr.to_string(), expected_expr); // test column on the right let expr = lit(ScalarValue::Int64(Some(1))).eq(cast(col("c1"), DataType::Int64)); - let predicate_expr = test_build_predicate_expression( - &expr, - &schema, - &mut RequiredStatColumns::new(), - ); + let predicate_expr = + test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new()); assert_eq!(predicate_expr.to_string(), expected_expr); let expected_expr = "TRY_CAST(c1_max@0 AS Int64) > 1"; @@ -1764,21 +1796,15 @@ mod tests { // test column on the left let expr = try_cast(col("c1"), DataType::Int64).gt(lit(ScalarValue::Int64(Some(1)))); - let predicate_expr = test_build_predicate_expression( - &expr, - &schema, - &mut RequiredStatColumns::new(), - ); + let predicate_expr = + test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new()); assert_eq!(predicate_expr.to_string(), expected_expr); // test column on the right let expr = lit(ScalarValue::Int64(Some(1))).lt(try_cast(col("c1"), DataType::Int64)); - let predicate_expr = test_build_predicate_expression( - &expr, - &schema, - &mut RequiredStatColumns::new(), - ); + let predicate_expr = + test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new()); assert_eq!(predicate_expr.to_string(), expected_expr); Ok(()) @@ -1798,11 +1824,8 @@ mod tests { false, )); let expected_expr = "CAST(c1_min@0 AS Int64) <= 1 AND 1 <= CAST(c1_max@1 AS Int64) OR CAST(c1_min@0 AS Int64) <= 2 AND 2 <= CAST(c1_max@1 AS Int64) OR CAST(c1_min@0 AS Int64) <= 3 AND 3 <= CAST(c1_max@1 AS Int64)"; - let predicate_expr = test_build_predicate_expression( - &expr, - &schema, - &mut RequiredStatColumns::new(), - ); + let predicate_expr = + test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new()); assert_eq!(predicate_expr.to_string(), expected_expr); let expr = Expr::InList(InList::new( @@ -1818,11 +1841,8 @@ mod tests { "(CAST(c1_min@0 AS Int64) != 1 OR 1 != CAST(c1_max@1 AS Int64)) \ AND (CAST(c1_min@0 AS Int64) != 2 OR 2 != CAST(c1_max@1 AS Int64)) \ AND (CAST(c1_min@0 AS Int64) != 3 OR 3 != CAST(c1_max@1 AS Int64))"; - let predicate_expr = test_build_predicate_expression( - &expr, - &schema, - &mut RequiredStatColumns::new(), - ); + let predicate_expr = + test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new()); assert_eq!(predicate_expr.to_string(), expected_expr); Ok(()) @@ -2468,7 +2488,7 @@ mod tests { fn test_build_predicate_expression( expr: &Expr, schema: &Schema, - required_columns: &mut RequiredStatColumns, + required_columns: &mut RequiredColumns, ) -> Arc { let expr = logical2physical(expr, schema); build_predicate_expression(&expr, schema, required_columns) diff --git a/datafusion/physical-expr/src/utils/guarantee.rs b/datafusion/physical-expr/src/utils/guarantee.rs new file mode 100644 index 000000000000..f7b371b6a5f4 --- /dev/null +++ 
b/datafusion/physical-expr/src/utils/guarantee.rs @@ -0,0 +1,492 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! [`LiteralGuarantee`] to analyze predicates and determine if a column is a +//constant. + +use crate::utils::split_disjunction; +use crate::{split_conjunction, PhysicalExpr}; +use datafusion_common::{Column, ScalarValue}; +use datafusion_expr::Operator; +use std::collections::{HashMap, HashSet}; +use std::sync::Arc; + +/// Represents a predicate where it is known that a column is either: +/// +/// 1. One of particular set of values. For example, `(a = 1)`, `(a = 1 OR a = +/// 2) or `a IN (1, 2, 3)` +/// +/// 2. Not one of a particular set of values. For example, `(a != 1)`, `(a != 1 +/// AND a != 2)` or `a NOT IN (1, 2, 3)` +#[derive(Debug, Clone, PartialEq)] +pub struct LiteralGuarantee { + pub column: Column, + pub guarantee: Guarantee, + pub literals: HashSet, +} + +/// What can be guaranteed about the values? +#[derive(Debug, Clone, PartialEq)] +pub enum Guarantee { + /// `column` is one of a set of constant values + In, + /// `column` is NOT one of a set of constant values + NotIn, +} + +impl LiteralGuarantee { + /// Create a new instance of the guarantee if the provided operator is supported + fn try_new<'a>( + column_name: impl Into, + op: &Operator, + literals: impl IntoIterator, + ) -> Option { + let guarantee = match op { + Operator::Eq => Guarantee::In, + Operator::NotEq => Guarantee::NotIn, + _ => return None, + }; + + let literals: HashSet<_> = literals.into_iter().cloned().collect(); + + Some(Self { + column: Column::from_name(column_name), + guarantee, + literals, + }) + } + + /// return a list of `LiteralGuarantees` that can be deduced for this + /// expression. + /// + /// `expr` should be a boolean expression, for example a filter expression + /// + /// Notes: this API assumes the expression has already been simplified and + /// returns duplicate guarantees for expressions like `a = 1 AND a = 1`. + pub fn analyze(expr: &Arc) -> Vec { + split_conjunction(expr) + .into_iter() + .fold(GuaranteeBuilder::new(), |builder, expr| { + if let Some(cel) = ColOpLit::try_new(expr) { + return builder.aggregate_conjunct(cel); + } else { + // look for pattern like + // (col literal) OR (col literal) ... 
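+                    // e.g. `a = 1 OR a = 2 OR a = 3`: when every disjunct
+                    // compares the same column with the same operator, the
+                    // literals can be combined into a single guarantee below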
+ let disjunctions = split_disjunction(expr); + + let terms = disjunctions + .iter() + .filter_map(|expr| ColOpLit::try_new(expr)) + .collect::>(); + + if terms.is_empty() { + return builder; + } + + if terms.len() != disjunctions.len() { + // not all terms are of the form (col literal) + return builder; + } + + // if all terms are 'col literal' then we can say something about the column + let first_term = &terms[0]; + if terms.iter().all(|term| { + term.col.name() == first_term.col.name() + && term.op == first_term.op + }) { + builder.aggregate_multi_conjunct( + first_term.col, + first_term.op, + terms.iter().map(|term| term.lit.value()), + ) + } else { + // ignore it + builder + } + } + }) + .build() + } +} + +/// Combines conjuncts together into guarantees, preserving insert order +struct GuaranteeBuilder<'a> { + /// List of guarantees that have been created so far + /// if we have determined a subsequent conjunct invalidates a guarantee + /// e.g. `a = foo AND a = bar` then the relevant guarantee will be None + guarantees: Vec>, + + // Key is the column name, type and value is the index into `guarantees` + map: HashMap<(&'a crate::expressions::Column, &'a Operator), usize>, +} + +impl<'a> GuaranteeBuilder<'a> { + fn new() -> Self { + Self { + guarantees: vec![], + map: HashMap::new(), + } + } + + /// Aggregate a new single guarantee to this builder combining with existing guarantees + /// if possible + fn aggregate_conjunct(self, col_op_lit: ColOpLit<'a>) -> Self { + self.aggregate_multi_conjunct( + col_op_lit.col, + col_op_lit.op, + [col_op_lit.lit.value()], + ) + } + + /// Aggreates a new single new guarantee with multiple literals `a IN (1,2,3)` or `a NOT IN (1,2,3)`. So the new values are combined with OR + fn aggregate_multi_conjunct( + mut self, + col: &'a crate::expressions::Column, + op: &'a Operator, + new_values: impl IntoIterator, + ) -> Self { + let key = (col, op); + if let Some(index) = self.map.get(&key) { + // already have a guarantee for this column + let entry = &mut self.guarantees[*index]; + + let Some(existing) = entry else { + // guarantee has been previously invalidated, nothing to do + return self; + }; + + // can only combine conjuncts if we have `a != foo AND a != bar`. + // `a = foo AND a = bar` is not correct. Also, can't extend with more than one value. + match existing.guarantee { + Guarantee::NotIn => { + // can extend if only single literal, otherwise invalidate + let new_values: HashSet<_> = new_values.into_iter().collect(); + if new_values.len() == 1 { + existing.literals.extend(new_values.into_iter().cloned()) + } else { + // this is like (a != foo AND (a != bar OR a != baz)). + // We can't combine the (a!=bar OR a!=baz) part, but it + // also doesn't invalidate a != foo guarantee. + } + } + Guarantee::In => { + // for an IN guarantee, it is ok if the value is the same + // e.g. `a = foo AND a = foo` but not if the value is different + // e.g. `a = foo AND a = bar` + if new_values + .into_iter() + .all(|new_value| existing.literals.contains(new_value)) + { + // all values are already in the set + } else { + // at least one was not, so invalidate the guarantee + *entry = None; + } + } + } + } else { + // This is a new guarantee + let new_values: HashSet<_> = new_values.into_iter().collect(); + + // new_values are combined with OR, so we can only create a + // multi-column guarantee for `=` (or a single value). + // (e.g. 
ignore `a != foo OR a != bar`) + if op == &Operator::Eq || new_values.len() == 1 { + if let Some(guarantee) = + LiteralGuarantee::try_new(col.name(), op, new_values) + { + // add it to the list of guarantees + self.guarantees.push(Some(guarantee)); + self.map.insert(key, self.guarantees.len() - 1); + } + } + } + + self + } + + /// Return all guarantees that have been created so far + fn build(self) -> Vec { + // filter out any guarantees that have been invalidated + self.guarantees.into_iter().flatten().collect() + } +} + +/// Represents a single `col literal` expression +struct ColOpLit<'a> { + col: &'a crate::expressions::Column, + op: &'a Operator, + lit: &'a crate::expressions::Literal, +} + +impl<'a> ColOpLit<'a> { + /// Returns Some(ColEqLit) if the expression is either: + /// 1. `col literal` + /// 2. `literal col` + /// + /// Returns None otherwise + fn try_new(expr: &'a Arc) -> Option { + let binary_expr = expr + .as_any() + .downcast_ref::()?; + + let (left, op, right) = ( + binary_expr.left().as_any(), + binary_expr.op(), + binary_expr.right().as_any(), + ); + + if let (Some(col), Some(lit)) = ( + left.downcast_ref::(), + right.downcast_ref::(), + ) { + Some(Self { col, op, lit }) + } + // literal col + else if let (Some(lit), Some(col)) = ( + left.downcast_ref::(), + right.downcast_ref::(), + ) { + Some(Self { col, op, lit }) + } else { + None + } + } +} + +#[cfg(test)] +mod test { + use super::*; + use crate::create_physical_expr; + use crate::execution_props::ExecutionProps; + use arrow_schema::{DataType, Field, Schema, SchemaRef}; + use datafusion_common::ToDFSchema; + use datafusion_expr::expr_fn::*; + use datafusion_expr::{lit, Expr}; + use std::sync::OnceLock; + + #[test] + fn test_literal() { + // a single literal offers no guarantee + test_analyze(lit(true), vec![]) + } + + #[test] + fn test_single() { + // a = "foo" + test_analyze(col("a").eq(lit("foo")), vec![in_guarantee("a", ["foo"])]); + // "foo" = a + test_analyze(lit("foo").eq(col("a")), vec![in_guarantee("a", ["foo"])]); + // a != "foo" + test_analyze( + col("a").not_eq(lit("foo")), + vec![not_in_guarantee("a", ["foo"])], + ); + // a != "foo" + test_analyze( + lit("foo").not_eq(col("a")), + vec![not_in_guarantee("a", ["foo"])], + ); + } + + #[test] + fn test_conjunction() { + // a = "foo" AND b = 1 + test_analyze( + col("a").eq(lit("foo")).and(col("b").eq(lit(1))), + vec![ + // should find both column guarantees + in_guarantee("a", ["foo"]), + in_guarantee("b", [1]), + ], + ); + // a != "foo" AND b != 1 + test_analyze( + col("a").not_eq(lit("foo")).and(col("b").not_eq(lit(1))), + // should find both column guarantees + vec![not_in_guarantee("a", ["foo"]), not_in_guarantee("b", [1])], + ); + // a = "foo" AND a = "bar" + test_analyze( + col("a").eq(lit("foo")).and(col("a").eq(lit("bar"))), + // this predicate is impossible ( can't be both foo and bar), + vec![], + ); + // a = "foo" AND b != "bar" + test_analyze( + col("a").eq(lit("foo")).and(col("a").not_eq(lit("bar"))), + vec![in_guarantee("a", ["foo"]), not_in_guarantee("a", ["bar"])], + ); + // a != "foo" AND a != "bar" + test_analyze( + col("a").not_eq(lit("foo")).and(col("a").not_eq(lit("bar"))), + // know it isn't "foo" or "bar" + vec![not_in_guarantee("a", ["foo", "bar"])], + ); + // a != "foo" AND a != "bar" and a != "baz" + test_analyze( + col("a") + .not_eq(lit("foo")) + .and(col("a").not_eq(lit("bar"))) + .and(col("a").not_eq(lit("baz"))), + // know it isn't "foo" or "bar" or "baz" + vec![not_in_guarantee("a", ["foo", "bar", "baz"])], + ); + // a = 
"foo" AND a = "foo" + let expr = col("a").eq(lit("foo")); + test_analyze(expr.clone().and(expr), vec![in_guarantee("a", ["foo"])]); + // b > 5 AND b = 10 (should get an b = 10 guarantee) + test_analyze( + col("b").gt(lit(5)).and(col("b").eq(lit(10))), + vec![in_guarantee("b", [10])], + ); + + // a != "foo" and (a != "bar" OR a != "baz") + test_analyze( + col("a") + .not_eq(lit("foo")) + .and(col("a").not_eq(lit("bar")).or(col("a").not_eq(lit("baz")))), + // a is not foo (we can't represent other knowledge about a) + vec![not_in_guarantee("a", ["foo"])], + ); + } + + #[test] + fn test_disjunction() { + // a = "foo" OR b = 1 + test_analyze( + col("a").eq(lit("foo")).or(col("b").eq(lit(1))), + // no can't have a single column guarantee (if a = "foo" then b != 1) etc + vec![], + ); + // a != "foo" OR b != 1 + test_analyze( + col("a").not_eq(lit("foo")).or(col("b").not_eq(lit(1))), + // No single column guarantee + vec![], + ); + // a = "foo" OR a = "bar" + test_analyze( + col("a").eq(lit("foo")).or(col("a").eq(lit("bar"))), + vec![in_guarantee("a", ["foo", "bar"])], + ); + // a = "foo" OR a = "foo" + test_analyze( + col("a").eq(lit("foo")).or(col("a").eq(lit("foo"))), + vec![in_guarantee("a", ["foo"])], + ); + // a != "foo" OR a != "bar" + test_analyze( + col("a").not_eq(lit("foo")).or(col("a").not_eq(lit("bar"))), + // can't represent knowledge about a in this case + vec![], + ); + // a = "foo" OR a = "bar" OR a = "baz" + test_analyze( + col("a") + .eq(lit("foo")) + .or(col("a").eq(lit("bar"))) + .or(col("a").eq(lit("baz"))), + vec![in_guarantee("a", ["foo", "bar", "baz"])], + ); + // (a = "foo" OR a = "bar") AND (a = "baz)" + test_analyze( + (col("a").eq(lit("foo")).or(col("a").eq(lit("bar")))) + .and(col("a").eq(lit("baz"))), + // this could potentially be represented as 2 constraints with a more + // sophisticated analysis + vec![], + ); + // (a = "foo" OR a = "bar") AND (b = 1) + test_analyze( + (col("a").eq(lit("foo")).or(col("a").eq(lit("bar")))) + .and(col("b").eq(lit(1))), + vec![in_guarantee("a", ["foo", "bar"]), in_guarantee("b", [1])], + ); + // (a = "foo" OR a = "bar") OR (b = 1) + test_analyze( + col("a") + .eq(lit("foo")) + .or(col("a").eq(lit("bar"))) + .or(col("b").eq(lit(1))), + // can't represent knowledge about a or b in this case + vec![], + ); + } + + // TODO file ticket to add tests for : + // a IN (...) + // b NOT IN (...) 
+ + /// Tests that analyzing expr results in the expected guarantees + fn test_analyze(expr: Expr, expected: Vec) { + println!("Begin analyze of {expr}"); + let schema = schema(); + let physical_expr = logical2physical(&expr, &schema); + + let actual = LiteralGuarantee::analyze(&physical_expr); + assert_eq!( + expected, actual, + "expr: {expr}\ + \n\nexpected: {expected:#?}\ + \n\nactual: {actual:#?}\ + \n\nexpr: {expr:#?}\ + \n\nphysical_expr: {physical_expr:#?}" + ); + } + + /// Guarantee that column is a specified value + fn in_guarantee<'a, I, S>(column: &str, literals: I) -> LiteralGuarantee + where + I: IntoIterator, + S: Into + 'a, + { + let literals: Vec<_> = literals.into_iter().map(|s| s.into()).collect(); + LiteralGuarantee::try_new(column, &Operator::Eq, literals.iter()).unwrap() + } + + /// Guarantee that column is NOT a specified value + fn not_in_guarantee<'a, I, S>(column: &str, literals: I) -> LiteralGuarantee + where + I: IntoIterator, + S: Into + 'a, + { + let literals: Vec<_> = literals.into_iter().map(|s| s.into()).collect(); + LiteralGuarantee::try_new(column, &Operator::NotEq, literals.iter()).unwrap() + } + + /// Convert a logical expression to a physical expression (without any simplification, etc) + fn logical2physical(expr: &Expr, schema: &Schema) -> Arc { + let df_schema = schema.clone().to_dfschema().unwrap(); + let execution_props = ExecutionProps::new(); + create_physical_expr(expr, &df_schema, schema, &execution_props).unwrap() + } + + // Schema for testing + fn schema() -> SchemaRef { + SCHEMA + .get_or_init(|| { + Arc::new(Schema::new(vec![ + Field::new("a", DataType::Utf8, false), + Field::new("b", DataType::Int32, false), + ])) + }) + .clone() + } + + static SCHEMA: OnceLock = OnceLock::new(); +} diff --git a/datafusion/physical-expr/src/utils.rs b/datafusion/physical-expr/src/utils/mod.rs similarity index 96% rename from datafusion/physical-expr/src/utils.rs rename to datafusion/physical-expr/src/utils/mod.rs index 71a7ff5fb778..87ef36558b96 100644 --- a/datafusion/physical-expr/src/utils.rs +++ b/datafusion/physical-expr/src/utils/mod.rs @@ -15,6 +15,9 @@ // specific language governing permissions and limitations // under the License. +mod guarantee; +pub use guarantee::{Guarantee, LiteralGuarantee}; + use std::borrow::Borrow; use std::collections::{HashMap, HashSet}; use std::sync::Arc; @@ -41,25 +44,29 @@ use petgraph::stable_graph::StableGraph; pub fn split_conjunction( predicate: &Arc, ) -> Vec<&Arc> { - split_conjunction_impl(predicate, vec![]) + split_impl(Operator::And, predicate, vec![]) } -fn split_conjunction_impl<'a>( +/// Assume the predicate is in the form of DNF, split the predicate to a Vec of PhysicalExprs. 
+/// +/// For example, split "a1 = a2 OR b1 <= b2 OR c1 != c2" into ["a1 = a2", "b1 <= b2", "c1 != c2"] +pub fn split_disjunction( + predicate: &Arc, +) -> Vec<&Arc> { + split_impl(Operator::Or, predicate, vec![]) +} + +fn split_impl<'a>( + operator: Operator, predicate: &'a Arc, mut exprs: Vec<&'a Arc>, ) -> Vec<&'a Arc> { match predicate.as_any().downcast_ref::() { - Some(binary) => match binary.op() { - Operator::And => { - let exprs = split_conjunction_impl(binary.left(), exprs); - split_conjunction_impl(binary.right(), exprs) - } - _ => { - exprs.push(predicate); - exprs - } - }, - None => { + Some(binary) if binary.op() == &operator => { + let exprs = split_impl(operator, binary.left(), exprs); + split_impl(operator, binary.right(), exprs) + } + Some(_) | None => { exprs.push(predicate); exprs } From 4917cb7a057fdfdb646c9f75786bf41941adb5a3 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Mon, 4 Dec 2023 16:34:04 -0500 Subject: [PATCH 02/18] cleanup println --- .../datasource/physical_plan/parquet/row_groups.rs | 14 +------------- 1 file changed, 1 insertion(+), 13 deletions(-) diff --git a/datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs b/datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs index 340caaba5c4a..3251116db554 100644 --- a/datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs +++ b/datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs @@ -116,10 +116,6 @@ pub(crate) async fn prune_row_groups_by_bloom_filters< predicate: &PruningPredicate, metrics: &ParquetFileMetrics, ) -> Vec { - println!( - "prune_row_groups_by_bloom_filters with pruning predicate: {:#?}", - predicate - ); let mut filtered = Vec::with_capacity(groups.len()); for idx in row_groups { let rg_metadata = &groups[*idx]; @@ -158,7 +154,6 @@ pub(crate) async fn prune_row_groups_by_bloom_filters< // Can this group be pruned? 
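         // `prune` returns `false` for containers that provably cannot match,
         // so with a single row group `!values[0]` marks it for pruning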
let prune_result = predicate.prune(&stats); - println!("prune result: {:?}", prune_result); let prune_group = match prune_result { Ok(values) => !values[0], Err(e) => { @@ -168,8 +163,6 @@ pub(crate) async fn prune_row_groups_by_bloom_filters< } }; - println!("prune group: {}", prune_group); - if prune_group { metrics.row_groups_pruned.add(1); } else { @@ -194,9 +187,7 @@ impl PruningStatistics for BloomFilterStatistics { column: &Column, values: &HashSet, ) -> Option { - println!("Checking column {} for values {:?}", column.name, values); let sbbf = self.column_sbbf.get(column.name.as_str())?; - println!(" have sbbf: {:?}", sbbf); // if true, means column probably contains value // if false, means column definitely DOES NOT contain value @@ -216,7 +207,6 @@ impl PruningStatistics for BloomFilterStatistics { // We know the row group doesn't contain any of the values if the checks are all // false .all(|v| !v); - println!("known_not_present result: {}", known_not_present); let contains = if known_not_present { Some(false) @@ -225,9 +215,7 @@ impl PruningStatistics for BloomFilterStatistics { None }; - let result = Some(BooleanArray::from(vec![contains])); - println!("result: {:?}", result); - result + Some(BooleanArray::from(vec![contains])) } } From aee324dafd37f9d3c58076b03d462dcf1753213c Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Tue, 5 Dec 2023 06:27:42 -0500 Subject: [PATCH 03/18] Fix column lookup --- .../physical_plan/parquet/row_groups.rs | 16 ++++++---------- 1 file changed, 6 insertions(+), 10 deletions(-) diff --git a/datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs b/datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs index 3251116db554..198319aa45c8 100644 --- a/datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs +++ b/datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs @@ -118,20 +118,16 @@ pub(crate) async fn prune_row_groups_by_bloom_filters< ) -> Vec { let mut filtered = Vec::with_capacity(groups.len()); for idx in row_groups { - let rg_metadata = &groups[*idx]; // get all columns bloom filter let literal_columns = predicate.literal_columns(); let mut column_sbbf = HashMap::with_capacity(literal_columns.len()); for column_name in literal_columns { - // This is very likely incorrect as it will not work for nested columns - // should use parquet_column instead - let Some((column_idx, _)) = rg_metadata - .columns() - .iter() - .enumerate() - .find(|(_, column)| column.column_path().string().eq(&column_name)) - else { + let Some((column_idx, _field)) = parquet_column( + builder.parquet_schema(), + predicate.schema(), + &column_name, + ) else { continue; }; @@ -142,7 +138,7 @@ pub(crate) async fn prune_row_groups_by_bloom_filters< Ok(Some(bf)) => bf, Ok(None) => continue, // no bloom filter for this column Err(e) => { - log::error!("Ignoring error reading bloom filter: {e}"); + log::debug!("Ignoring error reading bloom filter: {e}"); metrics.predicate_evaluation_errors.add(1); continue; } From 2669f92eb3f04295e52297acb72489b7b600ed27 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Tue, 5 Dec 2023 06:28:50 -0500 Subject: [PATCH 04/18] comment --- .../core/src/datasource/physical_plan/parquet/row_groups.rs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs b/datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs index 198319aa45c8..467602e0bd19 100644 --- 
a/datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs +++ b/datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs @@ -169,6 +169,7 @@ pub(crate) async fn prune_row_groups_by_bloom_filters< } struct BloomFilterStatistics { + /// Maps column name to the parquet bloom filter column_sbbf: HashMap, } @@ -177,7 +178,8 @@ impl PruningStatistics for BloomFilterStatistics { 1 } - /// Use bloom filters to determine if we are sure this column can not contain `value` + /// Use bloom filters to determine if we are sure this column can not + /// possibly contain `values` fn contains( &self, column: &Column, From 5a4afb0020f9850ad4e0984662f710f8a1ddf603 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Tue, 5 Dec 2023 06:35:54 -0500 Subject: [PATCH 05/18] Add another row group test --- .../physical_plan/parquet/row_groups.rs | 55 +++++++++++++++++-- 1 file changed, 51 insertions(+), 4 deletions(-) diff --git a/datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs b/datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs index 467602e0bd19..19803faea898 100644 --- a/datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs +++ b/datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs @@ -911,6 +911,26 @@ mod tests { create_physical_expr(expr, &df_schema, schema, &execution_props).unwrap() } + // Note the values in the `String` column are: + // ❯ select * from './parquet-testing/data/data_index_bloom_encoding_stats.parquet'; + // +-----------+ + // | String | + // +-----------+ + // | Hello | + // | This is | + // | a | + // | test | + // | How | + // | are you | + // | doing | + // | today | + // | the quick | + // | brown fox | + // | jumps | + // | over | + // | the lazy | + // | dog | + // +-----------+ #[tokio::test] async fn test_row_group_bloom_filter_pruning_predicate_simple_expr() { // load parquet file @@ -919,7 +939,7 @@ mod tests { let path = format!("{testdata}/{file_name}"); let data = bytes::Bytes::from(std::fs::read(path).unwrap()); - // generate pruning predicate + // generate pruning predicate `(String = "Hello_Not_exists")` let schema = Schema::new(vec![Field::new("String", DataType::Utf8, false)]); let expr = col(r#""String""#).eq(lit("Hello_Not_Exists")); let expr = logical2physical(&expr, &schema); @@ -946,7 +966,7 @@ mod tests { let path = format!("{testdata}/{file_name}"); let data = bytes::Bytes::from(std::fs::read(path).unwrap()); - // generate pruning predicate + // generate pruning predicate `(String = "Hello_Not_exists" OR String = "Hello_Not_exists2")` let schema = Schema::new(vec![Field::new("String", DataType::Utf8, false)]); let expr = lit("1").eq(lit("1")).and( col(r#""String""#) @@ -1008,7 +1028,7 @@ mod tests { let path = format!("{testdata}/{file_name}"); let data = bytes::Bytes::from(std::fs::read(path).unwrap()); - // generate pruning predicate + // generate pruning predicate `(String = "Hello")` let schema = Schema::new(vec![Field::new("String", DataType::Utf8, false)]); let expr = col(r#""String""#).eq(lit("Hello")); let expr = logical2physical(&expr, &schema); @@ -1027,6 +1047,33 @@ mod tests { assert_eq!(pruned_row_groups, row_groups); } + #[tokio::test] + async fn test_row_group_bloom_filter_pruning_predicate_with_exists_values() { + // load parquet file + let testdata = datafusion_common::test_util::parquet_test_data(); + let file_name = "data_index_bloom_encoding_stats.parquet"; + let path = format!("{testdata}/{file_name}"); + let data = bytes::Bytes::from(std::fs::read(path).unwrap()); + + 
// generate pruning predicate `(String = "Hello") OR (String = "This is")` + let schema = Schema::new(vec![Field::new("String", DataType::Utf8, false)]); + let expr = col(r#""String""#).eq(lit("Hello")).or(col(r#""String""#).eq(lit("the quick"))); + let expr = logical2physical(&expr, &schema); + let pruning_predicate = + PruningPredicate::try_new(expr, Arc::new(schema)).unwrap(); + + let row_groups = vec![0]; + let pruned_row_groups = test_row_group_bloom_filter_pruning_predicate( + file_name, + data, + &pruning_predicate, + &row_groups, + ) + .await + .unwrap(); + assert_eq!(pruned_row_groups, row_groups); + } + #[tokio::test] async fn test_row_group_bloom_filter_pruning_predicate_without_bloom_filter() { // load parquet file @@ -1035,7 +1082,7 @@ mod tests { let path = format!("{testdata}/{file_name}"); let data = bytes::Bytes::from(std::fs::read(path).unwrap()); - // generate pruning predicate + // generate pruning predicate on a column without a bloom filter let schema = Schema::new(vec![Field::new("string_col", DataType::Utf8, false)]); let expr = col(r#""string_col""#).eq(lit("0")); let expr = logical2physical(&expr, &schema); From 1db84382f3e9fa2096c1c087249310578a22974b Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Tue, 5 Dec 2023 06:43:48 -0500 Subject: [PATCH 06/18] add new test, update comments --- .../physical_plan/parquet/row_groups.rs | 27 +++++++++++++++++++ .../core/src/physical_optimizer/pruning.rs | 8 +++--- 2 files changed, 31 insertions(+), 4 deletions(-) diff --git a/datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs b/datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs index 19803faea898..e03d8dd6a854 100644 --- a/datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs +++ b/datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs @@ -1074,6 +1074,33 @@ mod tests { assert_eq!(pruned_row_groups, row_groups); } + #[tokio::test] + async fn test_row_group_bloom_filter_pruning_predicate_with_or_not_eq() { + // load parquet file + let testdata = datafusion_common::test_util::parquet_test_data(); + let file_name = "data_index_bloom_encoding_stats.parquet"; + let path = format!("{testdata}/{file_name}"); + let data = bytes::Bytes::from(std::fs::read(path).unwrap()); + + // generate pruning predicate `(String = "foo") OR (String != "bar")` + let schema = Schema::new(vec![Field::new("String", DataType::Utf8, false)]); + let expr = col(r#""String""#).not_eq(lit("foo")).or(col(r#""String""#).not_eq(lit("bar"))); + let expr = logical2physical(&expr, &schema); + let pruning_predicate = + PruningPredicate::try_new(expr, Arc::new(schema)).unwrap(); + + let row_groups = vec![0]; + let pruned_row_groups = test_row_group_bloom_filter_pruning_predicate( + file_name, + data, + &pruning_predicate, + &row_groups, + ) + .await + .unwrap(); + assert_eq!(pruned_row_groups, row_groups); + } + #[tokio::test] async fn test_row_group_bloom_filter_pruning_predicate_without_bloom_filter() { // load parquet file diff --git a/datafusion/core/src/physical_optimizer/pruning.rs b/datafusion/core/src/physical_optimizer/pruning.rs index 7cca1947bb31..06c0352f9c83 100644 --- a/datafusion/core/src/physical_optimizer/pruning.rs +++ b/datafusion/core/src/physical_optimizer/pruning.rs @@ -91,11 +91,11 @@ pub trait PruningStatistics { None } - /// Returns an array where each element represents if the value of the - /// column CERTAINLY DOES NOT contain any of the specified `values`. 
+ /// Returns an array where each row represents if the value of the + /// column is known to contain or not contain any of the set of `values`. /// - /// This can be used to prune containers based on structures such as Bloom - /// Filters which can test set membership quickly. + /// This is used to prune containers using structures such as Bloom + /// Filters which can quickly test set membership. /// /// The returned array has one row for each container, with the following: /// * `true` if the value of column CERTAINLY IS one of `values` From 692b587cf3f718848bcb9524ce28b895dee6e9a0 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Tue, 5 Dec 2023 08:04:42 -0500 Subject: [PATCH 07/18] Start adding final set of tests --- .../core/src/physical_optimizer/pruning.rs | 207 ++++++++++++++++++ 1 file changed, 207 insertions(+) diff --git a/datafusion/core/src/physical_optimizer/pruning.rs b/datafusion/core/src/physical_optimizer/pruning.rs index 71a311601a56..7e6805f0c1f6 100644 --- a/datafusion/core/src/physical_optimizer/pruning.rs +++ b/datafusion/core/src/physical_optimizer/pruning.rs @@ -1098,6 +1098,7 @@ mod tests { max: ArrayRef, /// Optional values null_counts: Option, + // /// Optional known values (e.g. mimic a bloom filter) } impl ContainerStats { @@ -2504,6 +2505,212 @@ mod tests { // TODO: add other negative test for other case and op } + #[test] + fn prune_with_contains() { + // contains filters for s1 and s2 + let schema = Arc::new(Schema::new(vec![ + Field::new("s1", DataType::Utf8, true), + Field::new("s2", DataType::Utf8, true), + ])); + + // Model having information like bloom filters for s1 and s2 + let statistics = TestStatistics::new() + .with_known_values( + "s1", + ScalarValue::from(0i32), + [ + // container 0 known to contain value, 1 not contains, 2 unknown + Some(true), + Some(false), + None, + // container 3 known to contain value, 4 not contains, 5 unknown + Some(true), + Some(false), + None, + // container 6 known to contain value, 7 not contains, 8 unknown + Some(true), + Some(false), + None, + ], + ) + .with_known_values( + "s2", + ScalarValue::from(0i32), + [ + // container 0,1,2 contains + Some(true), + Some(true), + Some(true), + // container 3,4,5 not contains + Some(false), + Some(false), + Some(false), + // container 5,6,7 unknown + None, + None, + None, + ], + ); + + // s1 = 'foo' OR s2 = 'bar' + let expr = col("s1").eq(lit("foo")).or(col("s2").eq(lit("bar"))); + prune_with_expr( + expr, + &schema, + &statistics, + // can only rule out container where we know values are not present (both are Some(false)) + vec![true, true, true, true, false, true, true, true, true], + ); + + // s1 = 'foo' AND s2 != 'bar' + prune_with_expr( + col("s1").eq(lit("foo")).and(col("s2").not_eq(lit("bar"))), + &schema, + &statistics, + // can only rule out container where we know the first is not present (Some(false)) and the second is (Some(true)) + vec![true, false, true, true, true, true, true, true, true], + ); + + /* + // s1 != 'foo' AND s2 != 'bar' + let expr = col("s1").eq(lit("foo")).and(col("s2").not_eq(lit("bar"))); + let expr = logical2physical(&expr, &schema); + let p = PruningPredicate::try_new(expr, schema).unwrap(); + // Can only rule out container where we know values are present (both are Some(true)) + let expected_ret = vec![false, true, true, true, true, true, true, true, true]; + let result = p.prune(&statistics).unwrap(); + assert_eq!(result, expected_ret); + + // s1 like '%foo%bar%' + let expr = col("s1").like("foo%bar%"); + let expr = logical2physical(&expr, 
&schema); + let p = PruningPredicate::try_new(expr, schema).unwrap(); + // cant rule out anything (unknown predicate) + let expected_ret = vec![true, true, true, true, true, true, true, true, true]; + let result = p.prune(&statistics).unwrap(); + assert_eq!(result, expected_ret); + + // s1 like '%foo%bar%' AND s2 = 'bar' + let expr = col("s1").like("foo%bar%").and(col("s2").eq(lit("bar"))); + let expr = logical2physical(&expr, &schema); + let p = PruningPredicate::try_new(expr, schema).unwrap(); + // cant rule out all results when we know second column is false + let expected_ret = vec![true, true, true, false, false, false, true, true, true]; + let result = p.prune(&statistics).unwrap(); + assert_eq!(result, expected_ret); + */ + } + + #[test] + fn prune_with_range_and_contains() { + // Setup mimics range information for i, a bloom filter for s + let schema = Arc::new(Schema::new(vec![ + Field::new("i", DataType::Int32, true), + Field::new("s", DataType::Utf8, true), + ])); + + let statistics = TestStatistics::new().with( + "i", + ContainerStats::new_i32( + // Container 0, 3 ,6, contains range + // Container 1, 4, 7, does not contain range + // Container 2, 5, 9, unknown + vec![ + Some(-5), + Some(10), + None, + Some(-5), + Some(10), + None, + Some(-5), + Some(10), + None, + ], // min + vec![ + Some(5), + Some(20), + None, + Some(5), + Some(20), + None, + Some(5), + Some(20), + None, + ], // max + ), + ); + + // Add contains information about the containers + let statistics = statistics.with_known_values( + "b", + ScalarValue::from(0i32), + [ + // container 0, 1,2 contain value + Some(true), + Some(true), + Some(true), + // container 3,4,5 does not contain value + Some(false), + Some(false), + Some(false), + /// container 6,7,8 unknown + None, + None, + None, + ], + ); + + // i = 0 and s = 'foo' + prune_with_expr( + col("i").eq(lit(0)).and(col("s").eq(lit("foo"))), + &schema, + &statistics, + // Can only rule out container where we know values are not present (range is false, and contains is false) + vec![true, true, true, true, false, true, true, true, true], + ); + + /* + // i = 0 and s != 'foo' + let expr = col("i").eq(lit(0)).and(col("s").not_eq(lit("foo"))); + let expr = logical2physical(&expr, &schema); + let p = PruningPredicate::try_new(expr, schema).unwrap(); + // Can only rule out container where we know range is false, and contains is true) + let expected_ret = vec![true, false, true, true, true, true, true, true, true]; + let result = p.prune(&statistics).unwrap(); + assert_eq!(result, expected_ret); + + // i = 0 OR s = 'foo' + let expr = col("i").eq(lit(0)).or(col("s").not_eq(lit("foo"))); + let expr = logical2physical(&expr, &schema); + let p = PruningPredicate::try_new(expr, schema).unwrap(); + // cant rule out anything (as connected by OR) + let expected_ret = vec![true, true, true, true, true, true, true, true, true]; + let result = p.prune(&statistics).unwrap(); + assert_eq!(result, expected_ret); + + */ + } + + /// prunes the specified expr with the specified schema and statistics, and + /// ensures it returns expected. + /// + /// `expected` is a vector of bools, where true means the row group should + /// be kept, and false means it should be pruned. 
+ /// + // TODO refactor other tests to use this to reduce boiler plate + fn prune_with_expr( + expr: Expr, + schema: &SchemaRef, + statistics: &TestStatistics, + expected: Vec, + ) -> Vec { + let expr = logical2physical(&expr, &schema); + let p = PruningPredicate::try_new(expr, schema).unwrap(); + p.prune(&statistics).unwrap(); + let result = p.prune(&statistics).unwrap(); + assert_eq!(result, expected); + } + fn test_build_predicate_expression( expr: &Expr, schema: &Schema, From e01ae66897550800256792d43b359b6eaeed8208 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Tue, 5 Dec 2023 08:42:30 -0500 Subject: [PATCH 08/18] update tests --- .../core/src/physical_optimizer/pruning.rs | 145 +++++++++++------- 1 file changed, 90 insertions(+), 55 deletions(-) diff --git a/datafusion/core/src/physical_optimizer/pruning.rs b/datafusion/core/src/physical_optimizer/pruning.rs index 7e6805f0c1f6..3c12e53b8273 100644 --- a/datafusion/core/src/physical_optimizer/pruning.rs +++ b/datafusion/core/src/physical_optimizer/pruning.rs @@ -1098,7 +1098,9 @@ mod tests { max: ArrayRef, /// Optional values null_counts: Option, - // /// Optional known values (e.g. mimic a bloom filter) + /// Optional known values (e.g. mimic a bloom filter) + /// If present, must be the same size as min/max + known_values: Option, } impl ContainerStats { @@ -1122,6 +1124,7 @@ mod tests { .unwrap(), ), null_counts: None, + known_values: None, } } @@ -1133,6 +1136,7 @@ mod tests { min: Arc::new(min.into_iter().collect::()), max: Arc::new(max.into_iter().collect::()), null_counts: None, + known_values: None, } } @@ -1144,6 +1148,7 @@ mod tests { min: Arc::new(min.into_iter().collect::()), max: Arc::new(max.into_iter().collect::()), null_counts: None, + known_values: None, } } @@ -1155,6 +1160,7 @@ mod tests { min: Arc::new(min.into_iter().collect::()), max: Arc::new(max.into_iter().collect::()), null_counts: None, + known_values: None, } } @@ -1166,6 +1172,7 @@ mod tests { min: Arc::new(min.into_iter().collect::()), max: Arc::new(max.into_iter().collect::()), null_counts: None, + known_values: None, } } @@ -1192,7 +1199,6 @@ mod tests { mut self, counts: impl IntoIterator>, ) -> Self { - // take stats out and update them let null_counts: ArrayRef = Arc::new(counts.into_iter().collect::()); @@ -1200,6 +1206,19 @@ mod tests { self.null_counts = Some(null_counts); self } + + /// Add contains informaation. + pub fn with_known_values( + mut self, + values: impl IntoIterator>, + ) -> Self { + let known_values: ArrayRef = + Arc::new(values.into_iter().collect::()); + + assert_eq!(known_values.len(), self.len()); + self.known_values = Some(known_values); + self + } } #[derive(Debug, Default)] @@ -1244,6 +1263,26 @@ mod tests { self.stats.insert(col, container_stats); self } + + /// Add contains informaation for the specified columm. 
+ fn with_known_values( + mut self, + name: impl Into, + values: impl IntoIterator>, + ) -> Self { + let col = Column::from_name(name.into()); + + // take stats out and update them + let container_stats = self + .stats + .remove(&col) + .expect("Can not find stats for column") + .with_known_values(values); + + // put stats back in + self.stats.insert(col, container_stats); + self + } } impl PruningStatistics for TestStatistics { @@ -2517,7 +2556,6 @@ mod tests { let statistics = TestStatistics::new() .with_known_values( "s1", - ScalarValue::from(0i32), [ // container 0 known to contain value, 1 not contains, 2 unknown Some(true), @@ -2535,7 +2573,6 @@ mod tests { ) .with_known_values( "s2", - ScalarValue::from(0i32), [ // container 0,1,2 contains Some(true), @@ -2609,56 +2646,55 @@ mod tests { Field::new("s", DataType::Utf8, true), ])); - let statistics = TestStatistics::new().with( - "i", - ContainerStats::new_i32( - // Container 0, 3 ,6, contains range - // Container 1, 4, 7, does not contain range - // Container 2, 5, 9, unknown - vec![ - Some(-5), - Some(10), - None, - Some(-5), - Some(10), - None, - Some(-5), - Some(10), - None, - ], // min - vec![ - Some(5), - Some(20), + let statistics = TestStatistics::new() + .with( + "i", + ContainerStats::new_i32( + // Container 0, 3 ,6, contains range + // Container 1, 4, 7, does not contain range + // Container 2, 5, 9, unknown + vec![ + Some(-5), + Some(10), + None, + Some(-5), + Some(10), + None, + Some(-5), + Some(10), + None, + ], // min + vec![ + Some(5), + Some(20), + None, + Some(5), + Some(20), + None, + Some(5), + Some(20), + None, + ], // max + ), + ) + // Add contains information about the containers + .with_known_values( + "b", + [ + // container 0, 1,2 contain value + Some(true), + Some(true), + Some(true), + // container 3,4,5 does not contain value + Some(false), + Some(false), + Some(false), + // container 6,7,8 unknown None, - Some(5), - Some(20), None, - Some(5), - Some(20), None, - ], // max - ), - ); - - // Add contains information about the containers - let statistics = statistics.with_known_values( - "b", - ScalarValue::from(0i32), - [ - // container 0, 1,2 contain value - Some(true), - Some(true), - Some(true), - // container 3,4,5 does not contain value - Some(false), - Some(false), - Some(false), - /// container 6,7,8 unknown - None, - None, - None, - ], - ); + ], + ); // i = 0 and s = 'foo' prune_with_expr( @@ -2703,11 +2739,10 @@ mod tests { schema: &SchemaRef, statistics: &TestStatistics, expected: Vec, - ) -> Vec { + ) { let expr = logical2physical(&expr, &schema); - let p = PruningPredicate::try_new(expr, schema).unwrap(); - p.prune(&statistics).unwrap(); - let result = p.prune(&statistics).unwrap(); + let p = PruningPredicate::try_new(expr, schema.clone()).unwrap(); + let result = p.prune(statistics).unwrap(); assert_eq!(result, expected); } From 7d23874721ee19f32b144335eafe3e8cee08ce81 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Tue, 5 Dec 2023 09:04:16 -0500 Subject: [PATCH 09/18] update test infrastructure --- .../core/src/physical_optimizer/pruning.rs | 137 +++++++++++------- 1 file changed, 85 insertions(+), 52 deletions(-) diff --git a/datafusion/core/src/physical_optimizer/pruning.rs b/datafusion/core/src/physical_optimizer/pruning.rs index 3c12e53b8273..a3a71f492d5b 100644 --- a/datafusion/core/src/physical_optimizer/pruning.rs +++ b/datafusion/core/src/physical_optimizer/pruning.rs @@ -1085,7 +1085,7 @@ mod tests { use std::collections::HashMap; use std::ops::{Not, Rem}; - #[derive(Debug)] + 
#[derive(Debug, Default)] /// Mock statistic provider for tests /// /// Each row represents the statistics for a "container" (which @@ -1094,94 +1094,82 @@ mod tests { /// /// Note All `ArrayRefs` must be the same size. struct ContainerStats { - min: ArrayRef, - max: ArrayRef, + min: Option, + max: Option, /// Optional values null_counts: Option, /// Optional known values (e.g. mimic a bloom filter) /// If present, must be the same size as min/max - known_values: Option, + contains: Option, } impl ContainerStats { + fn new() -> Self { + Default::default() + } fn new_decimal128( min: impl IntoIterator>, max: impl IntoIterator>, precision: u8, scale: i8, ) -> Self { - Self { - min: Arc::new( + Self::new() + .with_min(Arc::new( min.into_iter() .collect::() .with_precision_and_scale(precision, scale) .unwrap(), - ), - max: Arc::new( + )) + .with_max(Arc::new( max.into_iter() .collect::() .with_precision_and_scale(precision, scale) .unwrap(), - ), - null_counts: None, - known_values: None, - } + )) } fn new_i64( min: impl IntoIterator>, max: impl IntoIterator>, ) -> Self { - Self { - min: Arc::new(min.into_iter().collect::()), - max: Arc::new(max.into_iter().collect::()), - null_counts: None, - known_values: None, - } + Self::new() + .with_min(Arc::new(min.into_iter().collect::())) + .with_max(Arc::new(max.into_iter().collect::())) } fn new_i32( min: impl IntoIterator>, max: impl IntoIterator>, ) -> Self { - Self { - min: Arc::new(min.into_iter().collect::()), - max: Arc::new(max.into_iter().collect::()), - null_counts: None, - known_values: None, - } + Self::new() + .with_min(Arc::new(min.into_iter().collect::())) + .with_max(Arc::new(max.into_iter().collect::())) } fn new_utf8<'a>( min: impl IntoIterator>, max: impl IntoIterator>, ) -> Self { - Self { - min: Arc::new(min.into_iter().collect::()), - max: Arc::new(max.into_iter().collect::()), - null_counts: None, - known_values: None, - } + Self::new() + .with_min(Arc::new(min.into_iter().collect::())) + .with_max(Arc::new(max.into_iter().collect::())) } fn new_bool( min: impl IntoIterator>, max: impl IntoIterator>, ) -> Self { - Self { - min: Arc::new(min.into_iter().collect::()), - max: Arc::new(max.into_iter().collect::()), - null_counts: None, - known_values: None, - } + Self::new() + .with_min(Arc::new(min.into_iter().collect::())) + .with_max(Arc::new(max.into_iter().collect::())) } fn min(&self) -> Option { - Some(self.min.clone()) + self.min.clone() } fn max(&self) -> Option { - Some(self.max.clone()) + self.max.clone() } fn null_counts(&self) -> Option { @@ -1189,8 +1177,53 @@ mod tests { } fn len(&self) -> usize { - assert_eq!(self.min.len(), self.max.len()); - self.min.len() + // pick the first non zero length + self.min + .as_ref() + .map(|m| m.len()) + .or_else(|| self.max.as_ref().map(|m| m.len())) + .or_else(|| self.null_counts.as_ref().map(|m| m.len())) + .or_else(|| self.contains.as_ref().map(|m| m.len())) + .unwrap_or(0) + } + + /// Ensure that the lengths of all arrays are consistent + fn assert_invariants(&self) { + let lens = vec![ + self.min.as_ref().map(|m| m.len()), + self.max.as_ref().map(|m| m.len()), + self.null_counts.as_ref().map(|m| m.len()), + self.contains.as_ref().map(|m| m.len()), + ]; + + let mut prev_len = None; + + for maybe_len in lens { + // Get a length, if we don't already have one + match (prev_len, maybe_len) { + (None, _) => { + prev_len = maybe_len; + } + (Some(_), None) => { + // no length to check + } + (Some(prev_len), Some(len)) => { + assert_eq!(prev_len, len); + } + } + } + } + + /// Add min 
values + fn with_min(mut self, min: ArrayRef) -> Self { + self.min = Some(min); + self + } + + /// Add max values + fn with_max(mut self, max: ArrayRef) -> Self { + self.max = Some(max); + self } /// Add null counts. There must be the same number of null counts as @@ -1202,21 +1235,21 @@ mod tests { let null_counts: ArrayRef = Arc::new(counts.into_iter().collect::()); - assert_eq!(null_counts.len(), self.len()); + self.assert_invariants(); self.null_counts = Some(null_counts); self } /// Add contains informaation. - pub fn with_known_values( + pub fn with_contains( mut self, values: impl IntoIterator>, ) -> Self { - let known_values: ArrayRef = + let contains: ArrayRef = Arc::new(values.into_iter().collect::()); - assert_eq!(known_values.len(), self.len()); - self.known_values = Some(known_values); + self.assert_invariants(); + self.contains = Some(contains); self } } @@ -1256,7 +1289,7 @@ mod tests { let container_stats = self .stats .remove(&col) - .expect("Can not find stats for column") + .unwrap_or_else(|| ContainerStats::new()) .with_null_counts(counts); // put stats back in @@ -1265,7 +1298,7 @@ mod tests { } /// Add contains informaation for the specified columm. - fn with_known_values( + fn with_contains( mut self, name: impl Into, values: impl IntoIterator>, @@ -1276,8 +1309,8 @@ mod tests { let container_stats = self .stats .remove(&col) - .expect("Can not find stats for column") - .with_known_values(values); + .unwrap_or_else(|| ContainerStats::new()) + .with_contains(values); // put stats back in self.stats.insert(col, container_stats); @@ -2554,7 +2587,7 @@ mod tests { // Model having information like bloom filters for s1 and s2 let statistics = TestStatistics::new() - .with_known_values( + .with_contains( "s1", [ // container 0 known to contain value, 1 not contains, 2 unknown @@ -2571,7 +2604,7 @@ mod tests { None, ], ) - .with_known_values( + .with_contains( "s2", [ // container 0,1,2 contains @@ -2678,7 +2711,7 @@ mod tests { ), ) // Add contains information about the containers - .with_known_values( + .with_contains( "b", [ // container 0, 1,2 contain value From 7e90d6923b51638889a86f5abae476124956184f Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Tue, 5 Dec 2023 09:09:35 -0500 Subject: [PATCH 10/18] hack --- .../core/src/physical_optimizer/pruning.rs | 26 ++++++++++++++++--- 1 file changed, 23 insertions(+), 3 deletions(-) diff --git a/datafusion/core/src/physical_optimizer/pruning.rs b/datafusion/core/src/physical_optimizer/pruning.rs index a3a71f492d5b..476513d45150 100644 --- a/datafusion/core/src/physical_optimizer/pruning.rs +++ b/datafusion/core/src/physical_optimizer/pruning.rs @@ -1100,7 +1100,7 @@ mod tests { null_counts: Option, /// Optional known values (e.g. mimic a bloom filter) /// If present, must be the same size as min/max - contains: Option, + contains: Option, } impl ContainerStats { @@ -1245,8 +1245,7 @@ mod tests { mut self, values: impl IntoIterator>, ) -> Self { - let contains: ArrayRef = - Arc::new(values.into_iter().collect::()); + let contains: BooleanArray = values.into_iter().collect(); self.assert_invariants(); self.contains = Some(contains); @@ -1347,6 +1346,18 @@ mod tests { .map(|container_stats| container_stats.null_counts()) .unwrap_or(None) } + + fn contains( + &self, + column: &Column, + _values: &HashSet, + ) -> Option { + // ignore values, just return the contains array for this column + // the testing the values is checked in more integration tests (e.g. 
bloom filter) + self.stats + .get(column) + .and_then(|container_stats| container_stats.contains.clone()) + } } /// Returns the specified min/max container values @@ -2622,6 +2633,15 @@ mod tests { ], ); + // s1 = 'foo' + prune_with_expr( + col("s1").eq(lit("foo")), + &schema, + &statistics, + // rule out containers where we know s1 is not present + vec![true, false, true, true, false, true, true, false, true], + ); + // s1 = 'foo' OR s2 = 'bar' let expr = col("s1").eq(lit("foo")).or(col("s2").eq(lit("bar"))); prune_with_expr( From 418e7937b90f7b2c1d78e477937c88b224aa03ac Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Tue, 5 Dec 2023 09:31:07 -0500 Subject: [PATCH 11/18] add single column tests --- .../core/src/physical_optimizer/pruning.rs | 155 +++++++++++++----- 1 file changed, 117 insertions(+), 38 deletions(-) diff --git a/datafusion/core/src/physical_optimizer/pruning.rs b/datafusion/core/src/physical_optimizer/pruning.rs index 476513d45150..f5ac8eb1d614 100644 --- a/datafusion/core/src/physical_optimizer/pruning.rs +++ b/datafusion/core/src/physical_optimizer/pruning.rs @@ -1099,8 +1099,9 @@ mod tests { /// Optional values null_counts: Option, /// Optional known values (e.g. mimic a bloom filter) - /// If present, must be the same size as min/max - contains: Option, + /// (value, contains) + /// If present, all BooleanArrays must be the same size as min/max + contains: Vec<(HashSet, BooleanArray)>, } impl ContainerStats { @@ -1176,38 +1177,40 @@ mod tests { self.null_counts.clone() } + /// return an iterator over all arrays in this statistics + fn arrays(&self) -> Vec { + let contains_arrays = self + .contains + .iter() + .map(|(_values, contains)| Arc::new(contains.clone()) as ArrayRef); + + [ + self.min.as_ref().map(|a| a.clone()), + self.max.as_ref().map(|a| a.clone()), + self.null_counts.as_ref().map(|a| a.clone()), + ] + .into_iter() + .flatten() + .chain(contains_arrays) + .collect() + } + fn len(&self) -> usize { // pick the first non zero length - self.min - .as_ref() - .map(|m| m.len()) - .or_else(|| self.max.as_ref().map(|m| m.len())) - .or_else(|| self.null_counts.as_ref().map(|m| m.len())) - .or_else(|| self.contains.as_ref().map(|m| m.len())) - .unwrap_or(0) + self.arrays().iter().map(|a| a.len()).next().unwrap_or(0) } /// Ensure that the lengths of all arrays are consistent fn assert_invariants(&self) { - let lens = vec![ - self.min.as_ref().map(|m| m.len()), - self.max.as_ref().map(|m| m.len()), - self.null_counts.as_ref().map(|m| m.len()), - self.contains.as_ref().map(|m| m.len()), - ]; - let mut prev_len = None; - for maybe_len in lens { + for len in self.arrays().iter().map(|a| a.len()) { // Get a length, if we don't already have one - match (prev_len, maybe_len) { - (None, _) => { - prev_len = maybe_len; + match prev_len { + None => { + prev_len = Some(len); } - (Some(_), None) => { - // no length to check - } - (Some(prev_len), Some(len)) => { + Some(prev_len) => { assert_eq!(prev_len, len); } } @@ -1243,14 +1246,25 @@ mod tests { /// Add contains informaation. 
pub fn with_contains( mut self, - values: impl IntoIterator>, + values: impl IntoIterator, + contains: impl IntoIterator>, ) -> Self { - let contains: BooleanArray = values.into_iter().collect(); + let contains: BooleanArray = contains.into_iter().collect(); + let values: HashSet<_> = values.into_iter().collect(); + self.contains.push((values, contains)); self.assert_invariants(); - self.contains = Some(contains); self } + + /// get any contains information for the specified values + fn contains(&self, find_values: &HashSet) -> Option { + // find the one with the matching values + self.contains + .iter() + .find(|(values, _contains)| values == find_values) + .map(|(_values, contains)| contains.clone()) + } } #[derive(Debug, Default)] @@ -1300,7 +1314,8 @@ mod tests { fn with_contains( mut self, name: impl Into, - values: impl IntoIterator>, + values: impl IntoIterator, + contains: impl IntoIterator>, ) -> Self { let col = Column::from_name(name.into()); @@ -1309,7 +1324,7 @@ mod tests { .stats .remove(&col) .unwrap_or_else(|| ContainerStats::new()) - .with_contains(values); + .with_contains(values, contains); // put stats back in self.stats.insert(col, container_stats); @@ -1350,13 +1365,11 @@ mod tests { fn contains( &self, column: &Column, - _values: &HashSet, + values: &HashSet, ) -> Option { - // ignore values, just return the contains array for this column - // the testing the values is checked in more integration tests (e.g. bloom filter) self.stats .get(column) - .and_then(|container_stats| container_stats.contains.clone()) + .and_then(|container_stats| container_stats.contains(values)) } } @@ -2588,6 +2601,69 @@ mod tests { // TODO: add other negative test for other case and op } + #[test] + fn prune_with_contains_one_column() { + // contains filters for s1 and s2 + let schema = Arc::new(Schema::new(vec![Field::new("s1", DataType::Utf8, true)])); + + // Model having information like bloom filters for s1 and s2 + let statistics = TestStatistics::new() + .with_contains( + "s1", + [ScalarValue::from("foo")], + [ + // container 0 known to contain "foo"", 1 not contains, 2 unknown + Some(true), + Some(false), + None, + // container 3 known to contain "foo", 4 not contains, 5 unknown + Some(true), + Some(false), + None, + // container 6 known to contain "foo", 7 not contains, 8 unknown + Some(true), + Some(false), + None, + ], + ) + .with_contains( + "s1", + [ScalarValue::from("bar")], + [ + // container 0,1,2 contains + Some(true), + Some(true), + Some(true), + // container 3,4,5 not contains + Some(false), + Some(false), + Some(false), + // container 5,6,7 unknown + None, + None, + None, + ], + ); + + // s1 = 'foo' + prune_with_expr( + col("s1").eq(lit("foo")), + &schema, + &statistics, + // rule out containers where we know foo is not present + vec![true, false, true, true, false, true, true, false, true], + ); + + // s1 = 'bar' + prune_with_expr( + col("s1").eq(lit("bar")), + &schema, + &statistics, + // rule out containers where we know bar is not present + vec![true, true, true, false, false, false, true, true, true], + ); + } + #[test] fn prune_with_contains() { // contains filters for s1 and s2 @@ -2600,16 +2676,17 @@ mod tests { let statistics = TestStatistics::new() .with_contains( "s1", + [ScalarValue::from("foo")], [ - // container 0 known to contain value, 1 not contains, 2 unknown + // container 0 known to contain "foo"", 1 not contains, 2 unknown Some(true), Some(false), None, - // container 3 known to contain value, 4 not contains, 5 unknown + // container 3 known to 
contain "foo", 4 not contains, 5 unknown Some(true), Some(false), None, - // container 6 known to contain value, 7 not contains, 8 unknown + // container 6 known to contain "foo", 7 not contains, 8 unknown Some(true), Some(false), None, @@ -2617,6 +2694,7 @@ mod tests { ) .with_contains( "s2", + [ScalarValue::from("bar")], [ // container 0,1,2 contains Some(true), @@ -2648,8 +2726,8 @@ mod tests { expr, &schema, &statistics, - // can only rule out container where we know values are not present (both are Some(false)) - vec![true, true, true, true, false, true, true, true, true], + // can't rule out any container (predicate is on both columns) + vec![true, true, true, true, true, true, true, true, true], ); // s1 = 'foo' AND s2 != 'bar' @@ -2733,6 +2811,7 @@ mod tests { // Add contains information about the containers .with_contains( "b", + [ScalarValue::from("bar")], [ // container 0, 1,2 contain value Some(true), From 0793f799f2acd237ecba3ce1752dcce6e9a4afa1 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Tue, 5 Dec 2023 09:41:18 -0500 Subject: [PATCH 12/18] more --- .../core/src/physical_optimizer/pruning.rs | 22 ++++++++++++++++++- 1 file changed, 21 insertions(+), 1 deletion(-) diff --git a/datafusion/core/src/physical_optimizer/pruning.rs b/datafusion/core/src/physical_optimizer/pruning.rs index f5ac8eb1d614..12b16d504873 100644 --- a/datafusion/core/src/physical_optimizer/pruning.rs +++ b/datafusion/core/src/physical_optimizer/pruning.rs @@ -2662,10 +2662,30 @@ mod tests { // rule out containers where we know bar is not present vec![true, true, true, false, false, false, true, true, true], ); + + // s1 = 'foo' AND s1 = 'bar' + prune_with_expr( + col("s1").eq(lit("foo")).and(col("s1").eq(lit("bar"))), + &schema, + &statistics, + // this can't possibly be true (the column can't take on both values) + // but we can certainly rule it out if the stats tell us that both values are not present + vec![true, true, true, true, true, true,true, true, true] + ); + + // s1 = 'foo' OR s1 = 'bar' + prune_with_expr( + col("s1").eq(lit("foo")).or(col("s1").eq(lit("bar"))), + &schema, + &statistics, + // can rule out only when we know both are not present + vec![true, true, true, true, false, true, true, true, true], + ); + } #[test] - fn prune_with_contains() { + fn prune_with_contains_two_columns() { // contains filters for s1 and s2 let schema = Arc::new(Schema::new(vec![ Field::new("s1", DataType::Utf8, true), From f99759ecb8a3f7f030fd6edd83acab89777a56e4 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Tue, 5 Dec 2023 17:21:25 -0500 Subject: [PATCH 13/18] complete single column tests --- .../core/src/physical_optimizer/pruning.rs | 89 +++++++++++++++++-- 1 file changed, 82 insertions(+), 7 deletions(-) diff --git a/datafusion/core/src/physical_optimizer/pruning.rs b/datafusion/core/src/physical_optimizer/pruning.rs index 12b16d504873..07c849a3fd0e 100644 --- a/datafusion/core/src/physical_optimizer/pruning.rs +++ b/datafusion/core/src/physical_optimizer/pruning.rs @@ -238,6 +238,8 @@ impl PruningPredicate { // First, check any expr_op_literals for literal_guarantee in &self.literal_guarantees { + println!("Considering guarantee {:#?}", literal_guarantee); + println!("builder at start {:#?}", builder); let LiteralGuarantee { column, guarantee, @@ -245,13 +247,17 @@ impl PruningPredicate { } = literal_guarantee; // Can the statistics tell us anything about this column? 
if let Some(results) = statistics.contains(column, literals) { - match guarantee { + println!(" got results {:#?}", results); + + let builder = match guarantee { Guarantee::In => builder.append_array(&results), Guarantee::NotIn => { let results = arrow::compute::not(&results)?; builder.append_array(&results) } - } + }; + println!("builder is now {:#?}", builder); + builder } } @@ -2630,11 +2636,11 @@ mod tests { "s1", [ScalarValue::from("bar")], [ - // container 0,1,2 contains + // container 0,1,2 contains "bar" Some(true), Some(true), Some(true), - // container 3,4,5 not contains + // container 3,4,5 does not contain "bar" Some(false), Some(false), Some(false), @@ -2643,6 +2649,26 @@ mod tests { None, None, ], + ) + .with_contains( + // the way the tests are setup, this contains is + // consulted if the "foo" and "bar" are being checked at the same time + "s1", + [ScalarValue::from("foo"), ScalarValue::from("bar")], + [ + // container 0,1,2 unknown + None, + None, + None, + // container 3,4,5 does not contains either "foo" and "bar" + Some(true), + Some(true), + Some(true), + // container 6,7,8 contains either "foo" and "bar" + Some(false), + Some(false), + Some(false), + ], ); // s1 = 'foo' @@ -2663,6 +2689,15 @@ mod tests { vec![true, true, true, false, false, false, true, true, true], ); + // s1 = 'baz' (unknown value) + prune_with_expr( + col("s1").eq(lit("baz")), + &schema, + &statistics, + // can't rule out anything + vec![true, true, true, true, true, true, true, true, true], + ); + // s1 = 'foo' AND s1 = 'bar' prune_with_expr( col("s1").eq(lit("foo")).and(col("s1").eq(lit("bar"))), @@ -2670,7 +2705,7 @@ mod tests { &statistics, // this can't possibly be true (the column can't take on both values) // but we can certainly rule it out if the stats tell us that both values are not present - vec![true, true, true, true, true, true,true, true, true] + vec![true, true, true, true, true, true, true, true, true], ); // s1 = 'foo' OR s1 = 'bar' @@ -2678,10 +2713,49 @@ mod tests { col("s1").eq(lit("foo")).or(col("s1").eq(lit("bar"))), &schema, &statistics, - // can rule out only when we know both are not present - vec![true, true, true, true, false, true, true, true, true], + // returns the check with foo and bar constants + vec![true, true, true, true, true, true, false, false, false], + ); + + // s1 != foo + prune_with_expr( + col("s1").not_eq(lit("foo")), + &schema, + &statistics, + // rule out when we know for sure s1 does the value + vec![false, true, true, false, true, true, false, true, true], + ); + + // s1 != bar + prune_with_expr( + col("s1").not_eq(lit("bar")), + &schema, + &statistics, + // rule out when we know for sure s1 does have the value + vec![false, false, false, true, true, true, true, true, true], ); + // s1 != foo AND s1 != bar + prune_with_expr( + col("s1") + .not_eq(lit("foo")) + .and(col("s1").not_eq(lit("bar"))), + &schema, + &statistics, + // can rule out any container where we know s1 does have either value + vec![true, true, true, false, false, false, true, true, true], + ); + + // s1 != foo OR s1 != bar + prune_with_expr( + col("s1") + .not_eq(lit("foo")) + .or(col("s1").not_eq(lit("bar"))), + &schema, + &statistics, + // cant' rule out anything + vec![true, true, true, true, true, true, true, true, true], + ); } #[test] @@ -2892,6 +2966,7 @@ mod tests { statistics: &TestStatistics, expected: Vec, ) { + println!("Pruning with expr: {}", expr); let expr = logical2physical(&expr, &schema); let p = PruningPredicate::try_new(expr, schema.clone()).unwrap(); let 
result = p.prune(statistics).unwrap(); From ce026f3864d2428413eff7e8b972d24b4749319a Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Tue, 5 Dec 2023 17:33:52 -0500 Subject: [PATCH 14/18] complete tests --- .../core/src/physical_optimizer/pruning.rs | 48 +++++++++---------- 1 file changed, 23 insertions(+), 25 deletions(-) diff --git a/datafusion/core/src/physical_optimizer/pruning.rs b/datafusion/core/src/physical_optimizer/pruning.rs index 07c849a3fd0e..c0d257d84f90 100644 --- a/datafusion/core/src/physical_optimizer/pruning.rs +++ b/datafusion/core/src/physical_optimizer/pruning.rs @@ -2829,38 +2829,36 @@ mod tests { col("s1").eq(lit("foo")).and(col("s2").not_eq(lit("bar"))), &schema, &statistics, - // can only rule out container where we know the first is not present (Some(false)) and the second is (Some(true)) - vec![true, false, true, true, true, true, true, true, true], + // can only rule out container where we know s1 is NOT present or s2 IS preset + vec![false, false, false, true, false, true, true, false, true], ); - /* // s1 != 'foo' AND s2 != 'bar' - let expr = col("s1").eq(lit("foo")).and(col("s2").not_eq(lit("bar"))); - let expr = logical2physical(&expr, &schema); - let p = PruningPredicate::try_new(expr, schema).unwrap(); - // Can only rule out container where we know values are present (both are Some(true)) - let expected_ret = vec![false, true, true, true, true, true, true, true, true]; - let result = p.prune(&statistics).unwrap(); - assert_eq!(result, expected_ret); + prune_with_expr( + col("s1").not_eq(lit("foo")).and(col("s2").not_eq(lit("bar"))), + &schema, + &statistics, + // Can rule out any container where we know either values is present for sure + vec![false, false, false, false, true, true, false, true, true], + ); // s1 like '%foo%bar%' - let expr = col("s1").like("foo%bar%"); - let expr = logical2physical(&expr, &schema); - let p = PruningPredicate::try_new(expr, schema).unwrap(); - // cant rule out anything (unknown predicate) - let expected_ret = vec![true, true, true, true, true, true, true, true, true]; - let result = p.prune(&statistics).unwrap(); - assert_eq!(result, expected_ret); + prune_with_expr( + col("s1").like(lit("foo%bar%")), + &schema, + &statistics, + // cant rule out anything (unknown predicate) + vec![true, true, true, true, true, true, true, true, true], + ); // s1 like '%foo%bar%' AND s2 = 'bar' - let expr = col("s1").like("foo%bar%").and(col("s2").eq(lit("bar"))); - let expr = logical2physical(&expr, &schema); - let p = PruningPredicate::try_new(expr, schema).unwrap(); - // cant rule out all results when we know second column is false - let expected_ret = vec![true, true, true, false, false, false, true, true, true]; - let result = p.prune(&statistics).unwrap(); - assert_eq!(result, expected_ret); - */ + prune_with_expr( + col("s1").like(lit("foo%bar%")).and(col("s2").eq(lit("bar"))), + &schema, + &statistics, + // can rule out all results when we know second column is false + vec![true, true, true, false, false, false, true, true, true], + ); } #[test] From 6b2359aeb63d9124a917882c44ad8049c1c355d5 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Tue, 5 Dec 2023 17:39:12 -0500 Subject: [PATCH 15/18] cleanup --- .../core/src/physical_optimizer/pruning.rs | 23 +++++++++++-------- 1 file changed, 14 insertions(+), 9 deletions(-) diff --git a/datafusion/core/src/physical_optimizer/pruning.rs b/datafusion/core/src/physical_optimizer/pruning.rs index c0d257d84f90..8090cb9f0633 100644 --- a/datafusion/core/src/physical_optimizer/pruning.rs +++ 
b/datafusion/core/src/physical_optimizer/pruning.rs @@ -2835,7 +2835,9 @@ mod tests { // s1 != 'foo' AND s2 != 'bar' prune_with_expr( - col("s1").not_eq(lit("foo")).and(col("s2").not_eq(lit("bar"))), + col("s1") + .not_eq(lit("foo")) + .and(col("s2").not_eq(lit("bar"))), &schema, &statistics, // Can rule out any container where we know either values is present for sure @@ -2853,7 +2855,9 @@ mod tests { // s1 like '%foo%bar%' AND s2 = 'bar' prune_with_expr( - col("s1").like(lit("foo%bar%")).and(col("s2").eq(lit("bar"))), + col("s1") + .like(lit("foo%bar%")) + .and(col("s2").eq(lit("bar"))), &schema, &statistics, // can rule out all results when we know second column is false @@ -2900,16 +2904,16 @@ mod tests { ], // max ), ) - // Add contains information about the containers + // Add contains information about the containers with the foo value .with_contains( - "b", - [ScalarValue::from("bar")], + "s", + [ScalarValue::from("foo")], [ - // container 0, 1,2 contain value + // container 0, 1,2 contains foo for sure Some(true), Some(true), Some(true), - // container 3,4,5 does not contain value + // container 3,4,5 does not contain "foo" for sure Some(false), Some(false), Some(false), @@ -2925,8 +2929,9 @@ mod tests { col("i").eq(lit(0)).and(col("s").eq(lit("foo"))), &schema, &statistics, - // Can only rule out container where we know values are not present (range is false, and contains is false) - vec![true, true, true, true, false, true, true, true, true], + // Can only rule out container where we know values are not present + // (range is false, and contains is false) + vec![true, false, true, false, false, false, true, false, true], ); /* From 15c65e5b650ae97df3f4429a815186e8dea29a00 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Tue, 5 Dec 2023 17:43:02 -0500 Subject: [PATCH 16/18] Finish tests --- .../core/src/physical_optimizer/pruning.rs | 41 ++++++++----------- 1 file changed, 16 insertions(+), 25 deletions(-) diff --git a/datafusion/core/src/physical_optimizer/pruning.rs b/datafusion/core/src/physical_optimizer/pruning.rs index 8090cb9f0633..a1d8273b27b1 100644 --- a/datafusion/core/src/physical_optimizer/pruning.rs +++ b/datafusion/core/src/physical_optimizer/pruning.rs @@ -238,8 +238,6 @@ impl PruningPredicate { // First, check any expr_op_literals for literal_guarantee in &self.literal_guarantees { - println!("Considering guarantee {:#?}", literal_guarantee); - println!("builder at start {:#?}", builder); let LiteralGuarantee { column, guarantee, @@ -247,17 +245,13 @@ impl PruningPredicate { } = literal_guarantee; // Can the statistics tell us anything about this column? 
if let Some(results) = statistics.contains(column, literals) { - println!(" got results {:#?}", results); - - let builder = match guarantee { + match guarantee { Guarantee::In => builder.append_array(&results), Guarantee::NotIn => { let results = arrow::compute::not(&results)?; builder.append_array(&results) } - }; - println!("builder is now {:#?}", builder); - builder + } } } @@ -2934,26 +2928,23 @@ mod tests { vec![true, false, true, false, false, false, true, false, true], ); - /* // i = 0 and s != 'foo' - let expr = col("i").eq(lit(0)).and(col("s").not_eq(lit("foo"))); - let expr = logical2physical(&expr, &schema); - let p = PruningPredicate::try_new(expr, schema).unwrap(); - // Can only rule out container where we know range is false, and contains is true) - let expected_ret = vec![true, false, true, true, true, true, true, true, true]; - let result = p.prune(&statistics).unwrap(); - assert_eq!(result, expected_ret); + prune_with_expr( + col("i").eq(lit(0)).and(col("s").not_eq(lit("foo"))), + &schema, + &statistics, + // Can only rule out container where we know range is false, or contains is true) + vec![false, false, false, true, false, true, true, false, true], + ); // i = 0 OR s = 'foo' - let expr = col("i").eq(lit(0)).or(col("s").not_eq(lit("foo"))); - let expr = logical2physical(&expr, &schema); - let p = PruningPredicate::try_new(expr, schema).unwrap(); - // cant rule out anything (as connected by OR) - let expected_ret = vec![true, true, true, true, true, true, true, true, true]; - let result = p.prune(&statistics).unwrap(); - assert_eq!(result, expected_ret); - - */ + prune_with_expr( + col("i").eq(lit(0)).or(col("s").not_eq(lit("foo"))), + &schema, + &statistics, + // cant rule out anything (as connected by OR) + vec![true, true, true, true, true, true, true, true, true], + ); } /// prunes the specified expr with the specified schema and statistics, and From 7be776cadfbc0be66996339ea6d5708d4d9270af Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Tue, 5 Dec 2023 17:44:11 -0500 Subject: [PATCH 17/18] clippy --- datafusion/core/src/physical_optimizer/pruning.rs | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/datafusion/core/src/physical_optimizer/pruning.rs b/datafusion/core/src/physical_optimizer/pruning.rs index a1d8273b27b1..6ff50b1dd592 100644 --- a/datafusion/core/src/physical_optimizer/pruning.rs +++ b/datafusion/core/src/physical_optimizer/pruning.rs @@ -1185,9 +1185,9 @@ mod tests { .map(|(_values, contains)| Arc::new(contains.clone()) as ArrayRef); [ - self.min.as_ref().map(|a| a.clone()), - self.max.as_ref().map(|a| a.clone()), - self.null_counts.as_ref().map(|a| a.clone()), + self.min.as_ref().cloned(), + self.max.as_ref().cloned(), + self.null_counts.as_ref().cloned(), ] .into_iter() .flatten() @@ -1302,7 +1302,7 @@ mod tests { let container_stats = self .stats .remove(&col) - .unwrap_or_else(|| ContainerStats::new()) + .unwrap_or_else(ContainerStats::new) .with_null_counts(counts); // put stats back in @@ -1323,7 +1323,7 @@ mod tests { let container_stats = self .stats .remove(&col) - .unwrap_or_else(|| ContainerStats::new()) + .unwrap_or_else(ContainerStats::new) .with_contains(values, contains); // put stats back in @@ -2961,7 +2961,7 @@ mod tests { expected: Vec, ) { println!("Pruning with expr: {}", expr); - let expr = logical2physical(&expr, &schema); + let expr = logical2physical(&expr, schema); let p = PruningPredicate::try_new(expr, schema.clone()).unwrap(); let result = p.prune(statistics).unwrap(); assert_eq!(result, 
expected); From a523b29a3b42b5e83e3f02b67afbca817475d6a9 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Tue, 5 Dec 2023 17:45:08 -0500 Subject: [PATCH 18/18] clippy --- datafusion/core/src/physical_optimizer/pruning.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/datafusion/core/src/physical_optimizer/pruning.rs b/datafusion/core/src/physical_optimizer/pruning.rs index 6ff50b1dd592..4ba718eda80f 100644 --- a/datafusion/core/src/physical_optimizer/pruning.rs +++ b/datafusion/core/src/physical_optimizer/pruning.rs @@ -1302,7 +1302,7 @@ mod tests { let container_stats = self .stats .remove(&col) - .unwrap_or_else(ContainerStats::new) + .unwrap_or_default() .with_null_counts(counts); // put stats back in @@ -1323,7 +1323,7 @@ mod tests { let container_stats = self .stats .remove(&col) - .unwrap_or_else(ContainerStats::new) + .unwrap_or_default() .with_contains(values, contains); // put stats back in
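
For readers following the new API: below is a minimal sketch (not part of any patch above) of a custom `PruningStatistics` provider wired to the `contains` method introduced in this series. The `SetStatistics` type and its fields are hypothetical; the trait method names and signatures are assumed to match the shape shown in the diffs (`contains(&self, column, values) -> Option<BooleanArray>`), so treat this as an illustration rather than the crate's definitive API.

use std::collections::{HashMap, HashSet};

use arrow::array::{ArrayRef, BooleanArray};
use datafusion::physical_optimizer::pruning::PruningStatistics;
use datafusion_common::{Column, ScalarValue};

/// Hypothetical provider: for each container, remembers the exact set of
/// values seen in a column (behaves like a perfect bloom filter).
struct SetStatistics {
    /// column name --> one value set per container
    sets: HashMap<String, Vec<HashSet<ScalarValue>>>,
    num_containers: usize,
}

impl PruningStatistics for SetStatistics {
    fn min_values(&self, _column: &Column) -> Option<ArrayRef> {
        None // this sketch offers no min/max statistics
    }

    fn max_values(&self, _column: &Column) -> Option<ArrayRef> {
        None
    }

    fn num_containers(&self) -> usize {
        self.num_containers
    }

    fn null_counts(&self, _column: &Column) -> Option<ArrayRef> {
        None
    }

    fn contains(
        &self,
        column: &Column,
        values: &HashSet<ScalarValue>,
    ) -> Option<BooleanArray> {
        // One row per container: Some(true) if any of `values` is present,
        // Some(false) if none are. Returning None would mean "unknown".
        let per_container = self.sets.get(&column.name)?;
        Some(
            per_container
                .iter()
                .map(|set| Some(!set.is_disjoint(values)))
                .collect(),
        )
    }
}

`PruningPredicate::prune` would then consult `contains` for each `LiteralGuarantee` extracted from the predicate, in the same way the `TestStatistics` and `BloomFilterStatistics` implementations in the patches do.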