Skip to content

Commit c7a6965

Browse files
authored
Make filter selectivity for statistics configurable (#8243)
* Turning filter selectivity into a configurable parameter * Renaming API to be more consistent with struct value * Adding a filter with custom selectivity
1 parent e322839 commit c7a6965

File tree

10 files changed

+124
-6
lines changed

10 files changed

+124
-6
lines changed

datafusion/common/src/config.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -524,6 +524,11 @@ config_namespace! {
524524
/// The maximum estimated size in bytes for one input side of a HashJoin
525525
/// will be collected into a single partition
526526
pub hash_join_single_partition_threshold: usize, default = 1024 * 1024
527+
528+
/// The default filter selectivity used by Filter Statistics
529+
/// when an exact selectivity cannot be determined. Valid values are
530+
/// between 0 (no selectivity) and 100 (all rows are selected).
531+
pub default_filter_selectivity: u8, default = 20
527532
}
528533
}
529534

@@ -877,6 +882,7 @@ config_field!(String);
877882
config_field!(bool);
878883
config_field!(usize);
879884
config_field!(f64);
885+
config_field!(u8);
880886
config_field!(u64);
881887

882888
/// An implementation trait used to recursively walk configuration

datafusion/core/src/physical_optimizer/projection_pushdown.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -348,6 +348,10 @@ fn try_swapping_with_filter(
348348
};
349349

350350
FilterExec::try_new(new_predicate, make_with_child(projection, filter.input())?)
351+
.and_then(|e| {
352+
let selectivity = filter.default_selectivity();
353+
e.with_default_selectivity(selectivity)
354+
})
351355
.map(|e| Some(Arc::new(e) as _))
352356
}
353357

datafusion/core/src/physical_planner.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -915,7 +915,9 @@ impl DefaultPhysicalPlanner {
915915
&input_schema,
916916
session_state,
917917
)?;
918-
Ok(Arc::new(FilterExec::try_new(runtime_expr, physical_input)?))
918+
let selectivity = session_state.config().options().optimizer.default_filter_selectivity;
919+
let filter = FilterExec::try_new(runtime_expr, physical_input)?;
920+
Ok(Arc::new(filter.with_default_selectivity(selectivity)?))
919921
}
920922
LogicalPlan::Union(Union { inputs, .. }) => {
921923
let physical_plans = self.create_initial_plan_multi(inputs.iter().map(|lp| lp.as_ref()), session_state).await?;

datafusion/physical-plan/src/filter.rs

Lines changed: 74 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,8 @@ pub struct FilterExec {
6161
input: Arc<dyn ExecutionPlan>,
6262
/// Execution metrics
6363
metrics: ExecutionPlanMetricsSet,
64+
/// Selectivity for statistics, as a percentage: 0 = no rows, 100 = all rows
65+
default_selectivity: u8,
6466
}
6567

6668
impl FilterExec {
@@ -74,13 +76,25 @@ impl FilterExec {
7476
predicate,
7577
input: input.clone(),
7678
metrics: ExecutionPlanMetricsSet::new(),
79+
default_selectivity: 20,
7780
}),
7881
other => {
7982
plan_err!("Filter predicate must return boolean values, not {other:?}")
8083
}
8184
}
8285
}
8386

87+
pub fn with_default_selectivity(
88+
mut self,
89+
default_selectivity: u8,
90+
) -> Result<Self, DataFusionError> {
91+
if default_selectivity > 100 {
92+
return plan_err!("Default flter selectivity needs to be less than 100");
93+
}
94+
self.default_selectivity = default_selectivity;
95+
Ok(self)
96+
}
97+
8498
/// The expression to filter on. This expression must evaluate to a boolean value.
8599
pub fn predicate(&self) -> &Arc<dyn PhysicalExpr> {
86100
&self.predicate
@@ -90,6 +104,11 @@ impl FilterExec {
90104
pub fn input(&self) -> &Arc<dyn ExecutionPlan> {
91105
&self.input
92106
}
107+
108+
/// The default selectivity, as a percentage from 0 (no rows) to 100 (all rows)
109+
pub fn default_selectivity(&self) -> u8 {
110+
self.default_selectivity
111+
}
93112
}
94113

95114
impl DisplayAs for FilterExec {
@@ -166,6 +185,10 @@ impl ExecutionPlan for FilterExec {
166185
mut children: Vec<Arc<dyn ExecutionPlan>>,
167186
) -> Result<Arc<dyn ExecutionPlan>> {
168187
FilterExec::try_new(self.predicate.clone(), children.swap_remove(0))
188+
.and_then(|e| {
189+
let selectivity = e.default_selectivity();
190+
e.with_default_selectivity(selectivity)
191+
})
169192
.map(|e| Arc::new(e) as _)
170193
}
171194

@@ -196,10 +219,7 @@ impl ExecutionPlan for FilterExec {
196219
let input_stats = self.input.statistics()?;
197220
let schema = self.schema();
198221
if !check_support(predicate, &schema) {
199-
// assume filter selects 20% of rows if we cannot do anything smarter
200-
// tracking issue for making this configurable:
201-
// https://github.com/apache/arrow-datafusion/issues/8133
202-
let selectivity = 0.2_f64;
222+
let selectivity = self.default_selectivity as f64 / 100.0;
203223
let mut stats = input_stats.into_inexact();
204224
stats.num_rows = stats.num_rows.with_estimated_selectivity(selectivity);
205225
stats.total_byte_size = stats
@@ -987,4 +1007,54 @@ mod tests {
9871007

9881008
Ok(())
9891009
}
1010+
1011+
#[tokio::test]
1012+
async fn test_validation_filter_selectivity() -> Result<()> {
1013+
let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
1014+
let input = Arc::new(StatisticsExec::new(
1015+
Statistics::new_unknown(&schema),
1016+
schema,
1017+
));
1018+
// WHERE a = 10
1019+
let predicate = Arc::new(BinaryExpr::new(
1020+
Arc::new(Column::new("a", 0)),
1021+
Operator::Eq,
1022+
Arc::new(Literal::new(ScalarValue::Int32(Some(10)))),
1023+
));
1024+
let filter = FilterExec::try_new(predicate, input)?;
1025+
assert!(filter.with_default_selectivity(120).is_err());
1026+
Ok(())
1027+
}
1028+
1029+
#[tokio::test]
1030+
async fn test_custom_filter_selectivity() -> Result<()> {
1031+
// Need a decimal to trigger inexact selectivity
1032+
let schema =
1033+
Schema::new(vec![Field::new("a", DataType::Decimal128(2, 3), false)]);
1034+
let input = Arc::new(StatisticsExec::new(
1035+
Statistics {
1036+
num_rows: Precision::Inexact(1000),
1037+
total_byte_size: Precision::Inexact(4000),
1038+
column_statistics: vec![ColumnStatistics {
1039+
..Default::default()
1040+
}],
1041+
},
1042+
schema,
1043+
));
1044+
// WHERE a = 10
1045+
let predicate = Arc::new(BinaryExpr::new(
1046+
Arc::new(Column::new("a", 0)),
1047+
Operator::Eq,
1048+
Arc::new(Literal::new(ScalarValue::Decimal128(Some(10), 10, 10))),
1049+
));
1050+
let filter = FilterExec::try_new(predicate, input)?;
1051+
let statistics = filter.statistics()?;
1052+
assert_eq!(statistics.num_rows, Precision::Inexact(200));
1053+
assert_eq!(statistics.total_byte_size, Precision::Inexact(800));
1054+
let filter = filter.with_default_selectivity(40)?;
1055+
let statistics = filter.statistics()?;
1056+
assert_eq!(statistics.num_rows, Precision::Inexact(400));
1057+
assert_eq!(statistics.total_byte_size, Precision::Inexact(1600));
1058+
Ok(())
1059+
}
9901060
}

datafusion/proto/proto/datafusion.proto

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1368,6 +1368,7 @@ message PhysicalNegativeNode {
13681368
message FilterExecNode {
13691369
PhysicalPlanNode input = 1;
13701370
PhysicalExprNode expr = 2;
1371+
uint32 default_filter_selectivity = 3;
13711372
}
13721373

13731374
message FileGroup {

datafusion/proto/src/generated/pbjson.rs

Lines changed: 20 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

datafusion/proto/src/generated/prost.rs

Lines changed: 2 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

datafusion/proto/src/physical_plan/mod.rs

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -158,7 +158,16 @@ impl AsExecutionPlan for PhysicalPlanNode {
158158
.to_owned(),
159159
)
160160
})?;
161-
Ok(Arc::new(FilterExec::try_new(predicate, input)?))
161+
let filter_selectivity = filter.default_filter_selectivity.try_into();
162+
let filter = FilterExec::try_new(predicate, input)?;
163+
match filter_selectivity {
164+
Ok(filter_selectivity) => Ok(Arc::new(
165+
filter.with_default_selectivity(filter_selectivity)?,
166+
)),
167+
Err(_) => Err(DataFusionError::Internal(
168+
"filter_selectivity in PhysicalPlanNode is invalid ".to_owned(),
169+
)),
170+
}
162171
}
163172
PhysicalPlanType::CsvScan(scan) => Ok(Arc::new(CsvExec::new(
164173
parse_protobuf_file_scan_config(
@@ -988,6 +997,7 @@ impl AsExecutionPlan for PhysicalPlanNode {
988997
protobuf::FilterExecNode {
989998
input: Some(Box::new(input)),
990999
expr: Some(exec.predicate().clone().try_into()?),
1000+
default_filter_selectivity: exec.default_selectivity() as u32,
9911001
},
9921002
))),
9931003
});

datafusion/sqllogictest/test_files/information_schema.slt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -188,6 +188,7 @@ datafusion.explain.logical_plan_only false
188188
datafusion.explain.physical_plan_only false
189189
datafusion.explain.show_statistics false
190190
datafusion.optimizer.allow_symmetric_joins_without_pruning true
191+
datafusion.optimizer.default_filter_selectivity 20
191192
datafusion.optimizer.enable_distinct_aggregation_soft_limit true
192193
datafusion.optimizer.enable_round_robin_repartition true
193194
datafusion.optimizer.enable_topk_aggregation true
@@ -261,6 +262,7 @@ datafusion.explain.logical_plan_only false When set to true, the explain stateme
261262
datafusion.explain.physical_plan_only false When set to true, the explain statement will only print physical plans
262263
datafusion.explain.show_statistics false When set to true, the explain statement will print operator statistics for physical plans
263264
datafusion.optimizer.allow_symmetric_joins_without_pruning true Should DataFusion allow symmetric hash joins for unbounded data sources even when its inputs do not have any ordering or filtering If the flag is not enabled, the SymmetricHashJoin operator will be unable to prune its internal buffers, resulting in certain join types - such as Full, Left, LeftAnti, LeftSemi, Right, RightAnti, and RightSemi - being produced only at the end of the execution. This is not typical in stream processing. Additionally, without proper design for long runner execution, all types of joins may encounter out-of-memory errors.
265+
datafusion.optimizer.default_filter_selectivity 20 The default filter selectivity used by Filter Statistics when an exact selectivity cannot be determined. Valid values are between 0 (no selectivity) and 100 (all rows are selected).
264266
datafusion.optimizer.enable_distinct_aggregation_soft_limit true When set to true, the optimizer will push a limit operation into grouped aggregations which have no aggregate expressions, as a soft limit, emitting groups once the limit is reached, before all rows in the group are read.
265267
datafusion.optimizer.enable_round_robin_repartition true When set to true, the physical plan optimizer will try to add round robin repartitioning to increase parallelism to leverage more CPU cores
266268
datafusion.optimizer.enable_topk_aggregation true When set to true, the optimizer will attempt to perform limit operations during aggregations, if possible

docs/source/user-guide/configs.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,7 @@ Environment variables are read during `SessionConfig` initialisation so they mus
9999
| datafusion.optimizer.top_down_join_key_reordering | true | When set to true, the physical plan optimizer will run a top down process to reorder the join keys |
100100
| datafusion.optimizer.prefer_hash_join | true | When set to true, the physical plan optimizer will prefer HashJoin over SortMergeJoin. HashJoin can work more efficiently than SortMergeJoin but consumes more memory |
101101
| datafusion.optimizer.hash_join_single_partition_threshold | 1048576 | The maximum estimated size in bytes for one input side of a HashJoin will be collected into a single partition |
102+
| datafusion.optimizer.default_filter_selectivity | 20 | The default filter selectivity used by Filter Statistics when an exact selectivity cannot be determined. Valid values are between 0 (no selectivity) and 100 (all rows are selected). |
102103
| datafusion.explain.logical_plan_only | false | When set to true, the explain statement will only print logical plans |
103104
| datafusion.explain.physical_plan_only | false | When set to true, the explain statement will only print physical plans |
104105
| datafusion.explain.show_statistics | false | When set to true, the explain statement will print operator statistics for physical plans |

0 commit comments

Comments
 (0)