From 96638f8778ea8cc07a8c1ff2d87c00c24e5adecb Mon Sep 17 00:00:00 2001
From: silezhou
Date: Sat, 19 Apr 2025 13:15:25 +0000
Subject: [PATCH 01/10] feat: ORDER BY ALL

---
 datafusion/expr/src/expr.rs                  |  20 +++-
 datafusion/sql/src/expr/function.rs          |   6 +-
 datafusion/sql/src/expr/order_by.rs          | 116 +++++++++++--------
 datafusion/sql/src/query.rs                  |  44 +++++--
 datafusion/sql/src/select.rs                 |   6 +-
 datafusion/sql/src/statement.rs              |  12 +-
 datafusion/sqllogictest/test_files/order.slt |  39 +++++++
 7 files changed, 173 insertions(+), 70 deletions(-)

diff --git a/datafusion/expr/src/expr.rs b/datafusion/expr/src/expr.rs
index 9f6855b69824..1168e46e37a9 100644
--- a/datafusion/expr/src/expr.rs
+++ b/datafusion/expr/src/expr.rs
@@ -39,7 +39,7 @@ use datafusion_common::{
 use datafusion_functions_window_common::field::WindowUDFFieldArgs;
 use sqlparser::ast::{
     display_comma_separated, ExceptSelectItem, ExcludeSelectItem, IlikeSelectItem,
-    NullTreatment, RenameSelectItem, ReplaceSelectElement,
+    NullTreatment, OrderByExpr, OrderByOptions, RenameSelectItem, ReplaceSelectElement,
 };
 
 /// Represents logical expressions such as `A + 1`, or `CAST(c1 AS int)`.
@@ -701,6 +701,24 @@ impl TryCast {
     }
 }
 
+/// OrderBy Expressions
+pub enum OrderByExprs {
+    OrderByExprVec(Vec<OrderByExpr>),
+    All {
+        exprs: Vec<Expr>,
+        options: OrderByOptions,
+    },
+}
+
+impl OrderByExprs {
+    pub fn is_empty(&self) -> bool {
+        match self {
+            OrderByExprs::OrderByExprVec(exprs) => exprs.is_empty(),
+            OrderByExprs::All { exprs, .. } => exprs.is_empty(),
+        }
+    }
+}
+
 /// SORT expression
 #[derive(Clone, PartialEq, Eq, PartialOrd, Hash, Debug)]
 pub struct Sort {
diff --git a/datafusion/sql/src/expr/function.rs b/datafusion/sql/src/expr/function.rs
index 436f4388d8a3..dce60644576b 100644
--- a/datafusion/sql/src/expr/function.rs
+++ b/datafusion/sql/src/expr/function.rs
@@ -22,7 +22,7 @@ use datafusion_common::{
     internal_datafusion_err, internal_err, not_impl_err, plan_datafusion_err, plan_err,
     DFSchema, Dependency, Diagnostic, Result, Span,
 };
-use datafusion_expr::expr::{ScalarFunction, Unnest, WildcardOptions};
+use datafusion_expr::expr::{OrderByExprs, ScalarFunction, Unnest, WildcardOptions};
 use datafusion_expr::planner::{PlannerResult, RawAggregateExpr, RawWindowExpr};
 use datafusion_expr::{
     expr, Expr, ExprFunctionExt, ExprSchemable, WindowFrame, WindowFunctionDefinition,
@@ -276,7 +276,7 @@ impl<S: ContextProvider> SqlToRel<'_, S> {
                 .map(|e| self.sql_expr_to_logical_expr(e, schema, planner_context))
                 .collect::<Result<Vec<_>>>()?;
             let mut order_by = self.order_by_to_sort_expr(
-                window.order_by,
+                OrderByExprs::OrderByExprVec(window.order_by),
                 schema,
                 planner_context,
                 // Numeric literals in window function ORDER BY are treated as constants
@@ -357,7 +357,7 @@ impl<S: ContextProvider> SqlToRel<'_, S> {
         // User defined aggregate functions (UDAF) have precedence in case it has the same name as a scalar built-in function
         if let Some(fm) = self.context_provider.get_aggregate_meta(&name) {
             let order_by = self.order_by_to_sort_expr(
-                order_by,
+                OrderByExprs::OrderByExprVec(order_by),
                 schema,
                 planner_context,
                 true,
diff --git a/datafusion/sql/src/expr/order_by.rs b/datafusion/sql/src/expr/order_by.rs
index cce3f3004809..ef055b510736 100644
--- a/datafusion/sql/src/expr/order_by.rs
+++ b/datafusion/sql/src/expr/order_by.rs
@@ -19,7 +19,7 @@ use crate::planner::{ContextProvider, PlannerContext, SqlToRel};
 use datafusion_common::{
     not_impl_err, plan_datafusion_err, plan_err, Column, DFSchema, Result,
 };
-use datafusion_expr::expr::Sort;
+use datafusion_expr::expr::{OrderByExprs, Sort};
 use datafusion_expr::{Expr,
SortExpr}; use sqlparser::ast::{ Expr as SQLExpr, OrderByExpr, OrderByOptions, Value, ValueWithSpan, @@ -41,16 +41,12 @@ impl SqlToRel<'_, S> { /// If false, interpret numeric literals as constant values. pub(crate) fn order_by_to_sort_expr( &self, - exprs: Vec, + order_by_exprs: OrderByExprs, input_schema: &DFSchema, planner_context: &mut PlannerContext, literal_to_column: bool, additional_schema: Option<&DFSchema>, ) -> Result> { - if exprs.is_empty() { - return Ok(vec![]); - } - let mut combined_schema; let order_by_schema = match additional_schema { Some(schema) => { @@ -61,56 +57,78 @@ impl SqlToRel<'_, S> { None => input_schema, }; - let mut expr_vec = vec![]; - for e in exprs { - let OrderByExpr { - expr, - options: OrderByOptions { asc, nulls_first }, - with_fill, - } = e; + if order_by_exprs.is_empty() { + return Ok(vec![]); + } - if let Some(with_fill) = with_fill { - return not_impl_err!("ORDER BY WITH FILL is not supported: {with_fill}"); - } + let mut sort_expr_vec = vec![]; - let expr = match expr { - SQLExpr::Value(ValueWithSpan { - value: Value::Number(v, _), - span: _, - }) if literal_to_column => { - let field_index = v - .parse::() - .map_err(|err| plan_datafusion_err!("{}", err))?; + let make_sort_expr = + |expr: Expr, asc: Option, nulls_first: Option| { + let asc = asc.unwrap_or(true); + // When asc is true, by default nulls last to be consistent with postgres + // postgres rule: https://www.postgresql.org/docs/current/queries-order.html + let nulls_first = nulls_first.unwrap_or(!asc); + Sort::new(expr, asc, nulls_first) + }; - if field_index == 0 { - return plan_err!( - "Order by index starts at 1 for column indexes" - ); - } else if input_schema.fields().len() < field_index { - return plan_err!( - "Order by column out of bounds, specified: {}, max: {}", - field_index, - input_schema.fields().len() + match order_by_exprs { + OrderByExprs::OrderByExprVec(expressions) => { + for e in expressions { + let OrderByExpr { + expr, + options: OrderByOptions { asc, nulls_first }, + with_fill, + } = e; + + if let Some(with_fill) = with_fill { + return not_impl_err!( + "ORDER BY WITH FILL is not supported: {with_fill}" ); } - Expr::Column(Column::from( - input_schema.qualified_field(field_index - 1), - )) + let expr = match expr { + SQLExpr::Value(ValueWithSpan { + value: Value::Number(v, _), + span: _, + }) if literal_to_column => { + let field_index = v + .parse::() + .map_err(|err| plan_datafusion_err!("{}", err))?; + + if field_index == 0 { + return plan_err!( + "Order by index starts at 1 for column indexes" + ); + } else if input_schema.fields().len() < field_index { + return plan_err!( + "Order by column out of bounds, specified: {}, max: {}", + field_index, + input_schema.fields().len() + ); + } + + Expr::Column(Column::from( + input_schema.qualified_field(field_index - 1), + )) + } + e => self.sql_expr_to_logical_expr( + e, + order_by_schema, + planner_context, + )?, + }; + sort_expr_vec.push(make_sort_expr(expr, asc, nulls_first)); } - e => { - self.sql_expr_to_logical_expr(e, order_by_schema, planner_context)? 
+            }
+            OrderByExprs::All { exprs, options } => {
+                let OrderByOptions { asc, nulls_first } = options;
+                for expr in exprs {
+                    sort_expr_vec.push(make_sort_expr(expr, asc, nulls_first));
                 }
-            };
-            let asc = asc.unwrap_or(true);
-            expr_vec.push(Sort::new(
-                expr,
-                asc,
-                // When asc is true, by default nulls last to be consistent with postgres
-                // postgres rule: https://www.postgresql.org/docs/current/queries-order.html
-                nulls_first.unwrap_or(!asc),
-            ))
-        }
-        Ok(expr_vec)
+            }
+        };
+
+        Ok(sort_expr_vec)
     }
 }
diff --git a/datafusion/sql/src/query.rs b/datafusion/sql/src/query.rs
index ea641320c01b..6d84370624ce 100644
--- a/datafusion/sql/src/query.rs
+++ b/datafusion/sql/src/query.rs
@@ -21,14 +21,14 @@ use crate::planner::{ContextProvider, PlannerContext, SqlToRel};
 use crate::stack::StackGuard;
 
 use datafusion_common::{not_impl_err, Constraints, DFSchema, Result};
-use datafusion_expr::expr::Sort;
-use datafusion_expr::select_expr::SelectExpr;
+use datafusion_expr::expr::{OrderByExprs, Sort};
+
 use datafusion_expr::{
-    CreateMemoryTable, DdlStatement, Distinct, LogicalPlan, LogicalPlanBuilder,
+    CreateMemoryTable, DdlStatement, Distinct, Expr, LogicalPlan, LogicalPlanBuilder,
 };
 use sqlparser::ast::{
-    Expr as SQLExpr, Offset as SQLOffset, OrderBy, OrderByExpr, OrderByKind, Query,
-    SelectInto, SetExpr,
+    Expr as SQLExpr, Offset as SQLOffset, OrderBy, OrderByKind, Query, SelectInto,
+    SetExpr,
 };
 
 impl<S: ContextProvider> SqlToRel<'_, S> {
@@ -151,24 +151,46 @@ impl<S: ContextProvider> SqlToRel<'_, S> {
 }
 
 /// Returns the order by expressions from the query.
-fn to_order_by_exprs(order_by: Option<OrderBy>) -> Result<Vec<OrderByExpr>> {
+fn to_order_by_exprs(order_by: Option<OrderBy>) -> Result<OrderByExprs> {
     to_order_by_exprs_with_select(order_by, None)
 }
 
 /// Returns the order by expressions from the query with the select expressions.
 pub(crate) fn to_order_by_exprs_with_select(
     order_by: Option<OrderBy>,
-    _select_exprs: Option<&Vec<SelectExpr>>, // TODO: ORDER BY ALL
-) -> Result<Vec<OrderByExpr>> {
+    select_exprs: Option<&Vec<Expr>>,
+) -> Result<OrderByExprs> {
     let Some(OrderBy { kind, interpolate }) = order_by else {
         // If no order by, return an empty array.
- return Ok(vec![]); + return Ok(OrderByExprs::OrderByExprVec(vec![])); }; if let Some(_interpolate) = interpolate { return not_impl_err!("ORDER BY INTERPOLATE is not supported"); } match kind { - OrderByKind::All(_) => not_impl_err!("ORDER BY ALL is not supported"), - OrderByKind::Expressions(order_by_exprs) => Ok(order_by_exprs), + OrderByKind::All(order_by_options) => { + let Some(exprs) = select_exprs else { + return Ok(OrderByExprs::All { + exprs: vec![], + options: order_by_options, + }); + }; + + let order_by_epxrs = exprs + .iter() + .filter_map(|select_expr| match select_expr { + Expr::Column(_) => Some(select_expr.clone()), + _ => None, + }) + .collect::>(); + + Ok(OrderByExprs::All { + exprs: order_by_epxrs, + options: order_by_options, + }) + } + OrderByKind::Expressions(order_by_exprs) => { + Ok(OrderByExprs::OrderByExprVec(order_by_exprs)) + } } } diff --git a/datafusion/sql/src/select.rs b/datafusion/sql/src/select.rs index 33994b60b735..c8837c947723 100644 --- a/datafusion/sql/src/select.rs +++ b/datafusion/sql/src/select.rs @@ -94,13 +94,13 @@ impl SqlToRel<'_, S> { planner_context, )?; - let order_by = - to_order_by_exprs_with_select(query_order_by, Some(&select_exprs))?; - // Having and group by clause may reference aliases defined in select projection let projected_plan = self.project(base_plan.clone(), select_exprs)?; let select_exprs = projected_plan.expressions(); + let order_by = + to_order_by_exprs_with_select(query_order_by, Some(&select_exprs))?; + // Place the fields of the base plan at the front so that when there are references // with the same name, the fields of the base plan will be searched first. // See https://github.com/apache/datafusion/issues/9162 diff --git a/datafusion/sql/src/statement.rs b/datafusion/sql/src/statement.rs index 1f1c235fee6f..0aa6ede89493 100644 --- a/datafusion/sql/src/statement.rs +++ b/datafusion/sql/src/statement.rs @@ -39,6 +39,7 @@ use datafusion_common::{ ToDFSchema, }; use datafusion_expr::dml::{CopyTo, InsertOp}; +use datafusion_expr::expr::OrderByExprs; use datafusion_expr::expr_rewriter::normalize_col_with_schemas_and_ambiguity_check; use datafusion_expr::logical_plan::builder::project; use datafusion_expr::logical_plan::DdlStatement; @@ -1240,7 +1241,7 @@ impl SqlToRel<'_, S> { .to_dfschema_ref()?; let using: Option = using.as_ref().map(ident_to_string); let columns = self.order_by_to_sort_expr( - columns, + OrderByExprs::OrderByExprVec(columns), &table_schema, planner_context, false, @@ -1423,8 +1424,13 @@ impl SqlToRel<'_, S> { let mut all_results = vec![]; for expr in order_exprs { // Convert each OrderByExpr to a SortExpr: - let expr_vec = - self.order_by_to_sort_expr(expr, schema, planner_context, true, None)?; + let expr_vec = self.order_by_to_sort_expr( + OrderByExprs::OrderByExprVec(expr), + schema, + planner_context, + true, + None, + )?; // Verify that columns of all SortExprs exist in the schema: for sort in expr_vec.iter() { for column in sort.expr.column_refs().iter() { diff --git a/datafusion/sqllogictest/test_files/order.slt b/datafusion/sqllogictest/test_files/order.slt index 4e8be56f3377..7d9bea3d2b6f 100644 --- a/datafusion/sqllogictest/test_files/order.slt +++ b/datafusion/sqllogictest/test_files/order.slt @@ -1380,3 +1380,42 @@ physical_plan statement ok drop table table_with_ordered_not_null; + +# ORDER BY ALL +statement ok +set datafusion.sql_parser.dialect = 'DuckDB'; + +statement ok +CREATE OR REPLACE TABLE addresses AS + SELECT '123 Quack Blvd' AS address, 'DuckTown' AS city, '11111' AS 
zip
+    UNION ALL
+    SELECT '111 Duck Duck Goose Ln', 'DuckTown', '11111'
+    UNION ALL
+    SELECT '111 Duck Duck Goose Ln', 'Duck Town', '11111'
+    UNION ALL
+    SELECT '111 Duck Duck Goose Ln', 'Duck Town', '11111-0001';
+
+
+query TTT
+SELECT * FROM addresses ORDER BY ALL;
+----
+111 Duck Duck Goose Ln Duck Town 11111
+111 Duck Duck Goose Ln Duck Town 11111-0001
+111 Duck Duck Goose Ln DuckTown 11111
+123 Quack Blvd DuckTown 11111
+
+query TTT
+SELECT * FROM addresses ORDER BY ALL DESC;
+----
+123 Quack Blvd DuckTown 11111
+111 Duck Duck Goose Ln DuckTown 11111
+111 Duck Duck Goose Ln Duck Town 11111-0001
+111 Duck Duck Goose Ln Duck Town 11111
+
+query TT
+SELECT address, zip FROM addresses ORDER BY ALL;
+----
+111 Duck Duck Goose Ln 11111
+111 Duck Duck Goose Ln 11111
+111 Duck Duck Goose Ln 11111-0001
+123 Quack Blvd 11111
\ No newline at end of file

From b2d48a2a5539d5e830489a0fd93a0596554d7dcd Mon Sep 17 00:00:00 2001
From: SileZhou
Date: Sat, 3 May 2025 16:10:01 +0000
Subject: [PATCH 02/10] refactor: order by all

---
 datafusion/expr/src/expr.rs         | 20 +------
 datafusion/sql/src/expr/function.rs |  6 +-
 datafusion/sql/src/expr/order_by.rs | 90 ++++++++++++-----------------
 datafusion/sql/src/query.rs         | 41 +++++++------
 datafusion/sql/src/statement.rs     | 12 +---
 5 files changed, 65 insertions(+), 104 deletions(-)

diff --git a/datafusion/expr/src/expr.rs b/datafusion/expr/src/expr.rs
index 9f11a1b3e728..b8e4204a9c9e 100644
--- a/datafusion/expr/src/expr.rs
+++ b/datafusion/expr/src/expr.rs
@@ -39,7 +39,7 @@ use datafusion_common::{
 use datafusion_functions_window_common::field::WindowUDFFieldArgs;
 use sqlparser::ast::{
     display_comma_separated, ExceptSelectItem, ExcludeSelectItem, IlikeSelectItem,
-    NullTreatment, OrderByExpr, OrderByOptions, RenameSelectItem, ReplaceSelectElement,
+    NullTreatment, RenameSelectItem, ReplaceSelectElement,
 };
 
 /// Represents logical expressions such as `A + 1`, or `CAST(c1 AS int)`.
@@ -701,24 +701,6 @@ impl TryCast {
     }
 }
 
-/// OrderBy Expressions
-pub enum OrderByExprs {
-    OrderByExprVec(Vec<OrderByExpr>),
-    All {
-        exprs: Vec<Expr>,
-        options: OrderByOptions,
-    },
-}
-
-impl OrderByExprs {
-    pub fn is_empty(&self) -> bool {
-        match self {
-            OrderByExprs::OrderByExprVec(exprs) => exprs.is_empty(),
-            OrderByExprs::All { exprs, ..
} => exprs.is_empty(), - } - } -} - /// SORT expression #[derive(Clone, PartialEq, Eq, PartialOrd, Hash, Debug)] pub struct Sort { diff --git a/datafusion/sql/src/expr/function.rs b/datafusion/sql/src/expr/function.rs index 4effa47905e9..535300427ad8 100644 --- a/datafusion/sql/src/expr/function.rs +++ b/datafusion/sql/src/expr/function.rs @@ -22,7 +22,7 @@ use datafusion_common::{ internal_datafusion_err, internal_err, not_impl_err, plan_datafusion_err, plan_err, DFSchema, Dependency, Diagnostic, Result, Span, }; -use datafusion_expr::expr::{OrderByExprs, ScalarFunction, Unnest, WildcardOptions}; +use datafusion_expr::expr::{ScalarFunction, Unnest, WildcardOptions}; use datafusion_expr::planner::{PlannerResult, RawAggregateExpr, RawWindowExpr}; use datafusion_expr::{ expr, Expr, ExprFunctionExt, ExprSchemable, WindowFrame, WindowFunctionDefinition, @@ -291,7 +291,7 @@ impl SqlToRel<'_, S> { .map(|e| self.sql_expr_to_logical_expr(e, schema, planner_context)) .collect::>>()?; let mut order_by = self.order_by_to_sort_expr( - OrderByExprs::OrderByExprVec(window.order_by), + window.order_by, schema, planner_context, // Numeric literals in window function ORDER BY are treated as constants @@ -405,7 +405,7 @@ impl SqlToRel<'_, S> { (!within_group.is_empty()).then_some(within_group) } else { let order_by = self.order_by_to_sort_expr( - OrderByExprs::OrderByExprVec(order_by), + order_by, schema, planner_context, true, diff --git a/datafusion/sql/src/expr/order_by.rs b/datafusion/sql/src/expr/order_by.rs index ef055b510736..c77be9cac585 100644 --- a/datafusion/sql/src/expr/order_by.rs +++ b/datafusion/sql/src/expr/order_by.rs @@ -19,7 +19,7 @@ use crate::planner::{ContextProvider, PlannerContext, SqlToRel}; use datafusion_common::{ not_impl_err, plan_datafusion_err, plan_err, Column, DFSchema, Result, }; -use datafusion_expr::expr::{OrderByExprs, Sort}; +use datafusion_expr::expr::Sort; use datafusion_expr::{Expr, SortExpr}; use sqlparser::ast::{ Expr as SQLExpr, OrderByExpr, OrderByOptions, Value, ValueWithSpan, @@ -41,7 +41,7 @@ impl SqlToRel<'_, S> { /// If false, interpret numeric literals as constant values. 
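    /// For example, with `literal_to_column` set to true, `ORDER BY 2` is
    /// resolved to the second field of the input schema (ordinals are
    /// 1-based, as the bounds checks below enforce); with it set to false,
    /// the literal `2` is planned as an ordinary constant expression.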
pub(crate) fn order_by_to_sort_expr( &self, - order_by_exprs: OrderByExprs, + order_by_exprs: Vec, input_schema: &DFSchema, planner_context: &mut PlannerContext, literal_to_column: bool, @@ -72,62 +72,48 @@ impl SqlToRel<'_, S> { Sort::new(expr, asc, nulls_first) }; - match order_by_exprs { - OrderByExprs::OrderByExprVec(expressions) => { - for e in expressions { - let OrderByExpr { - expr, - options: OrderByOptions { asc, nulls_first }, - with_fill, - } = e; + for order_by_expr in order_by_exprs { + let OrderByExpr { + expr, + options: OrderByOptions { asc, nulls_first }, + with_fill, + } = order_by_expr; - if let Some(with_fill) = with_fill { - return not_impl_err!( - "ORDER BY WITH FILL is not supported: {with_fill}" - ); - } + if let Some(with_fill) = with_fill { + return not_impl_err!("ORDER BY WITH FILL is not supported: {with_fill}"); + } - let expr = match expr { - SQLExpr::Value(ValueWithSpan { - value: Value::Number(v, _), - span: _, - }) if literal_to_column => { - let field_index = v - .parse::() - .map_err(|err| plan_datafusion_err!("{}", err))?; + let expr = match expr { + SQLExpr::Value(ValueWithSpan { + value: Value::Number(v, _), + span: _, + }) if literal_to_column => { + let field_index = v + .parse::() + .map_err(|err| plan_datafusion_err!("{}", err))?; - if field_index == 0 { - return plan_err!( - "Order by index starts at 1 for column indexes" - ); - } else if input_schema.fields().len() < field_index { - return plan_err!( - "Order by column out of bounds, specified: {}, max: {}", - field_index, - input_schema.fields().len() - ); - } + if field_index == 0 { + return plan_err!( + "Order by index starts at 1 for column indexes" + ); + } else if input_schema.fields().len() < field_index { + return plan_err!( + "Order by column out of bounds, specified: {}, max: {}", + field_index, + input_schema.fields().len() + ); + } - Expr::Column(Column::from( - input_schema.qualified_field(field_index - 1), - )) - } - e => self.sql_expr_to_logical_expr( - e, - order_by_schema, - planner_context, - )?, - }; - sort_expr_vec.push(make_sort_expr(expr, asc, nulls_first)); + Expr::Column(Column::from( + input_schema.qualified_field(field_index - 1), + )) } - } - OrderByExprs::All { exprs, options } => { - let OrderByOptions { asc, nulls_first } = options; - for expr in exprs { - sort_expr_vec.push(make_sort_expr(expr, asc, nulls_first)); + e => { + self.sql_expr_to_logical_expr(e, order_by_schema, planner_context)? 
} - } - }; + }; + sort_expr_vec.push(make_sort_expr(expr, asc, nulls_first)); + } Ok(sort_expr_vec) } diff --git a/datafusion/sql/src/query.rs b/datafusion/sql/src/query.rs index 6d84370624ce..85140686fa43 100644 --- a/datafusion/sql/src/query.rs +++ b/datafusion/sql/src/query.rs @@ -21,15 +21,16 @@ use crate::planner::{ContextProvider, PlannerContext, SqlToRel}; use crate::stack::StackGuard; use datafusion_common::{not_impl_err, Constraints, DFSchema, Result}; -use datafusion_expr::expr::{OrderByExprs, Sort}; +use datafusion_expr::expr::Sort; use datafusion_expr::{ CreateMemoryTable, DdlStatement, Distinct, Expr, LogicalPlan, LogicalPlanBuilder, }; use sqlparser::ast::{ - Expr as SQLExpr, Offset as SQLOffset, OrderBy, OrderByKind, Query, SelectInto, - SetExpr, + Expr as SQLExpr, Ident, Offset as SQLOffset, OrderBy, OrderByExpr, OrderByKind, + Query, SelectInto, SetExpr, }; +use sqlparser::tokenizer::Span; impl SqlToRel<'_, S> { /// Generate a logical plan from an SQL query/subquery @@ -151,7 +152,7 @@ impl SqlToRel<'_, S> { } /// Returns the order by expressions from the query. -fn to_order_by_exprs(order_by: Option) -> Result { +fn to_order_by_exprs(order_by: Option) -> Result> { to_order_by_exprs_with_select(order_by, None) } @@ -159,10 +160,10 @@ fn to_order_by_exprs(order_by: Option) -> Result { pub(crate) fn to_order_by_exprs_with_select( order_by: Option, select_exprs: Option<&Vec>, -) -> Result { +) -> Result> { let Some(OrderBy { kind, interpolate }) = order_by else { // If no order by, return an empty array. - return Ok(OrderByExprs::OrderByExprVec(vec![])); + return Ok(vec![]); }; if let Some(_interpolate) = interpolate { return not_impl_err!("ORDER BY INTERPOLATE is not supported"); @@ -170,27 +171,25 @@ pub(crate) fn to_order_by_exprs_with_select( match kind { OrderByKind::All(order_by_options) => { let Some(exprs) = select_exprs else { - return Ok(OrderByExprs::All { - exprs: vec![], - options: order_by_options, - }); + return Ok(vec![]); }; - - let order_by_epxrs = exprs + let order_by_exprs = exprs .iter() .filter_map(|select_expr| match select_expr { - Expr::Column(_) => Some(select_expr.clone()), + Expr::Column(column) => Some(OrderByExpr { + expr: SQLExpr::Identifier(Ident { + value: column.name.clone(), + quote_style: None, + span: Span::empty(), + }), + options: order_by_options.clone(), + with_fill: None, + }), _ => None, }) .collect::>(); - - Ok(OrderByExprs::All { - exprs: order_by_epxrs, - options: order_by_options, - }) - } - OrderByKind::Expressions(order_by_exprs) => { - Ok(OrderByExprs::OrderByExprVec(order_by_exprs)) + Ok(order_by_exprs) } + OrderByKind::Expressions(order_by_exprs) => Ok(order_by_exprs), } } diff --git a/datafusion/sql/src/statement.rs b/datafusion/sql/src/statement.rs index 0aa6ede89493..1f1c235fee6f 100644 --- a/datafusion/sql/src/statement.rs +++ b/datafusion/sql/src/statement.rs @@ -39,7 +39,6 @@ use datafusion_common::{ ToDFSchema, }; use datafusion_expr::dml::{CopyTo, InsertOp}; -use datafusion_expr::expr::OrderByExprs; use datafusion_expr::expr_rewriter::normalize_col_with_schemas_and_ambiguity_check; use datafusion_expr::logical_plan::builder::project; use datafusion_expr::logical_plan::DdlStatement; @@ -1241,7 +1240,7 @@ impl SqlToRel<'_, S> { .to_dfschema_ref()?; let using: Option = using.as_ref().map(ident_to_string); let columns = self.order_by_to_sort_expr( - OrderByExprs::OrderByExprVec(columns), + columns, &table_schema, planner_context, false, @@ -1424,13 +1423,8 @@ impl SqlToRel<'_, S> { let mut all_results = vec![]; for expr 
in order_exprs { // Convert each OrderByExpr to a SortExpr: - let expr_vec = self.order_by_to_sort_expr( - OrderByExprs::OrderByExprVec(expr), - schema, - planner_context, - true, - None, - )?; + let expr_vec = + self.order_by_to_sort_expr(expr, schema, planner_context, true, None)?; // Verify that columns of all SortExprs exist in the schema: for sort in expr_vec.iter() { for column in sort.expr.column_refs().iter() { From 86526fd506e28712e0244be938276d2959f3a394 Mon Sep 17 00:00:00 2001 From: SileZhou Date: Sun, 4 May 2025 15:38:35 +0000 Subject: [PATCH 03/10] refactor: order_by_to_sort_expr --- datafusion/sql/src/expr/order_by.rs | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/datafusion/sql/src/expr/order_by.rs b/datafusion/sql/src/expr/order_by.rs index c77be9cac585..d357c3753e13 100644 --- a/datafusion/sql/src/expr/order_by.rs +++ b/datafusion/sql/src/expr/order_by.rs @@ -47,6 +47,10 @@ impl SqlToRel<'_, S> { literal_to_column: bool, additional_schema: Option<&DFSchema>, ) -> Result> { + if order_by_exprs.is_empty() { + return Ok(vec![]); + } + let mut combined_schema; let order_by_schema = match additional_schema { Some(schema) => { @@ -57,11 +61,7 @@ impl SqlToRel<'_, S> { None => input_schema, }; - if order_by_exprs.is_empty() { - return Ok(vec![]); - } - - let mut sort_expr_vec = vec![]; + let mut sort_expr_vec = Vec::with_capacity(order_by_exprs.len()); let make_sort_expr = |expr: Expr, asc: Option, nulls_first: Option| { From e9a5038312f62df6337f71f498393e51e6d640ff Mon Sep 17 00:00:00 2001 From: SileZhou Date: Mon, 5 May 2025 14:00:05 +0000 Subject: [PATCH 04/10] refactor: TODO comment --- datafusion/sql/src/query.rs | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/datafusion/sql/src/query.rs b/datafusion/sql/src/query.rs index 85140686fa43..f42a3ad138c4 100644 --- a/datafusion/sql/src/query.rs +++ b/datafusion/sql/src/query.rs @@ -175,8 +175,8 @@ pub(crate) fn to_order_by_exprs_with_select( }; let order_by_exprs = exprs .iter() - .filter_map(|select_expr| match select_expr { - Expr::Column(column) => Some(OrderByExpr { + .map(|select_expr| match select_expr { + Expr::Column(column) => Ok(OrderByExpr { expr: SQLExpr::Identifier(Ident { value: column.name.clone(), quote_style: None, @@ -185,9 +185,12 @@ pub(crate) fn to_order_by_exprs_with_select( options: order_by_options.clone(), with_fill: None, }), - _ => None, + // TODO: Support other types of expressions + _ => not_impl_err!( + "ORDER BY ALL is not supported for non-column expressions" + ), }) - .collect::>(); + .collect::>>()?; Ok(order_by_exprs) } OrderByKind::Expressions(order_by_exprs) => Ok(order_by_exprs), From 6fc11229cdeb08fa413854174a9ef9d37de1686c Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Mon, 5 May 2025 02:17:39 -0500 Subject: [PATCH 05/10] fix query results for predicates referencing partition columns and data columns (#15935) * fix query results for predicates referencing partition columns and data columns * fmt * add e2e test * newline --- .../datasource-parquet/src/file_format.rs | 8 +-- .../datasource-parquet/src/row_filter.rs | 53 +++------------ .../test_files/parquet_filter_pushdown.slt | 67 +++++++++++++++++++ 3 files changed, 79 insertions(+), 49 deletions(-) diff --git a/datafusion/datasource-parquet/src/file_format.rs b/datafusion/datasource-parquet/src/file_format.rs index a0ca46422786..e1d393caa8f3 100644 --- a/datafusion/datasource-parquet/src/file_format.rs +++ 
b/datafusion/datasource-parquet/src/file_format.rs @@ -464,16 +464,16 @@ impl FileFormat for ParquetFormat { fn supports_filters_pushdown( &self, file_schema: &Schema, - table_schema: &Schema, + _table_schema: &Schema, filters: &[&Expr], ) -> Result { if !self.options().global.pushdown_filters { return Ok(FilePushdownSupport::NoSupport); } - let all_supported = filters.iter().all(|filter| { - can_expr_be_pushed_down_with_schemas(filter, file_schema, table_schema) - }); + let all_supported = filters + .iter() + .all(|filter| can_expr_be_pushed_down_with_schemas(filter, file_schema)); Ok(if all_supported { FilePushdownSupport::Supported diff --git a/datafusion/datasource-parquet/src/row_filter.rs b/datafusion/datasource-parquet/src/row_filter.rs index 2d2993c29a6f..d7bbe30c8943 100644 --- a/datafusion/datasource-parquet/src/row_filter.rs +++ b/datafusion/datasource-parquet/src/row_filter.rs @@ -299,6 +299,7 @@ struct PushdownChecker<'schema> { non_primitive_columns: bool, /// Does the expression reference any columns that are in the table /// schema but not in the file schema? + /// This includes partition columns and projected columns. projected_columns: bool, // Indices into the table schema of the columns required to evaluate the expression required_columns: BTreeSet, @@ -387,13 +388,12 @@ fn would_column_prevent_pushdown(column_name: &str, table_schema: &Schema) -> bo /// Otherwise, true. pub fn can_expr_be_pushed_down_with_schemas( expr: &datafusion_expr::Expr, - _file_schema: &Schema, - table_schema: &Schema, + file_schema: &Schema, ) -> bool { let mut can_be_pushed = true; expr.apply(|expr| match expr { datafusion_expr::Expr::Column(column) => { - can_be_pushed &= !would_column_prevent_pushdown(column.name(), table_schema); + can_be_pushed &= !would_column_prevent_pushdown(column.name(), file_schema); Ok(if can_be_pushed { TreeNodeRecursion::Jump } else { @@ -649,8 +649,6 @@ mod test { #[test] fn nested_data_structures_prevent_pushdown() { - let table_schema = get_basic_table_schema(); - let file_schema = Schema::new(vec![Field::new( "list_col", DataType::Struct(Fields::empty()), @@ -659,49 +657,31 @@ mod test { let expr = col("list_col").is_not_null(); - assert!(!can_expr_be_pushed_down_with_schemas( - &expr, - &file_schema, - &table_schema - )); + assert!(!can_expr_be_pushed_down_with_schemas(&expr, &file_schema,)); } #[test] - fn projected_columns_prevent_pushdown() { - let table_schema = get_basic_table_schema(); - + fn projected_or_partition_columns_prevent_pushdown() { let file_schema = Schema::new(vec![Field::new("existing_col", DataType::Int64, true)]); let expr = col("nonexistent_column").is_null(); - assert!(!can_expr_be_pushed_down_with_schemas( - &expr, - &file_schema, - &table_schema - )); + assert!(!can_expr_be_pushed_down_with_schemas(&expr, &file_schema,)); } #[test] fn basic_expr_doesnt_prevent_pushdown() { - let table_schema = get_basic_table_schema(); - let file_schema = Schema::new(vec![Field::new("string_col", DataType::Utf8, true)]); let expr = col("string_col").is_null(); - assert!(can_expr_be_pushed_down_with_schemas( - &expr, - &file_schema, - &table_schema - )); + assert!(can_expr_be_pushed_down_with_schemas(&expr, &file_schema,)); } #[test] fn complex_expr_doesnt_prevent_pushdown() { - let table_schema = get_basic_table_schema(); - let file_schema = Schema::new(vec![ Field::new("string_col", DataType::Utf8, true), Field::new("bigint_col", DataType::Int64, true), @@ -711,23 +691,6 @@ mod test { .is_not_null() 
.or(col("bigint_col").gt(Expr::Literal(ScalarValue::Int64(Some(5))))); - assert!(can_expr_be_pushed_down_with_schemas( - &expr, - &file_schema, - &table_schema - )); - } - - fn get_basic_table_schema() -> Schema { - let testdata = datafusion_common::test_util::parquet_test_data(); - let file = std::fs::File::open(format!("{testdata}/alltypes_plain.parquet")) - .expect("opening file"); - - let reader = SerializedFileReader::new(file).expect("creating reader"); - - let metadata = reader.metadata(); - - parquet_to_arrow_schema(metadata.file_metadata().schema_descr(), None) - .expect("parsing schema") + assert!(can_expr_be_pushed_down_with_schemas(&expr, &file_schema,)); } } diff --git a/datafusion/sqllogictest/test_files/parquet_filter_pushdown.slt b/datafusion/sqllogictest/test_files/parquet_filter_pushdown.slt index 758113b70835..a32db2ff0524 100644 --- a/datafusion/sqllogictest/test_files/parquet_filter_pushdown.slt +++ b/datafusion/sqllogictest/test_files/parquet_filter_pushdown.slt @@ -156,3 +156,70 @@ DROP TABLE t; statement ok DROP TABLE t_pushdown; + +## Test filter pushdown with a predicate that references both a partition column and a file column +statement ok +set datafusion.execution.parquet.pushdown_filters = true; + +## Create table +statement ok +CREATE EXTERNAL TABLE t_pushdown(part text, val text) +STORED AS PARQUET +PARTITIONED BY (part) +LOCATION 'test_files/scratch/parquet_filter_pushdown/parquet_part_test/'; + +statement ok +COPY ( + SELECT arrow_cast('a', 'Utf8') AS val +) TO 'test_files/scratch/parquet_filter_pushdown/parquet_part_test/part=a/file.parquet' +STORED AS PARQUET; + +statement ok +COPY ( + SELECT arrow_cast('b', 'Utf8') AS val +) TO 'test_files/scratch/parquet_filter_pushdown/parquet_part_test/part=b/file.parquet' +STORED AS PARQUET; + +statement ok +COPY ( + SELECT arrow_cast('xyz', 'Utf8') AS val +) TO 'test_files/scratch/parquet_filter_pushdown/parquet_part_test/part=c/file.parquet' +STORED AS PARQUET; + +query TT +select * from t_pushdown where part == val +---- +a a +b b + +query TT +select * from t_pushdown where part != val +---- +xyz c + +# If we reference both a file and partition column the predicate cannot be pushed down +query TT +EXPLAIN select * from t_pushdown where part != val +---- +logical_plan +01)Filter: t_pushdown.val != t_pushdown.part +02)--TableScan: t_pushdown projection=[val, part], partial_filters=[t_pushdown.val != t_pushdown.part] +physical_plan +01)CoalesceBatchesExec: target_batch_size=8192 +02)--FilterExec: val@0 != part@1 +03)----RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=3 +04)------DataSourceExec: file_groups={3 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_part_test/part=a/file.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_part_test/part=b/file.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_part_test/part=c/file.parquet]]}, projection=[val, part], file_type=parquet, predicate=val@0 != part@1 + +# If we reference only a partition column it gets evaluted during the listing phase +query TT +EXPLAIN select * from t_pushdown where part != 'a'; +---- +logical_plan TableScan: t_pushdown projection=[val, part], full_filters=[t_pushdown.part != Utf8("a")] +physical_plan DataSourceExec: file_groups={2 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_part_test/part=b/file.parquet], 
[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_part_test/part=c/file.parquet]]}, projection=[val, part], file_type=parquet + +# And if we reference only a file column it gets pushed down +query TT +EXPLAIN select * from t_pushdown where val != 'c'; +---- +logical_plan TableScan: t_pushdown projection=[val, part], full_filters=[t_pushdown.val != Utf8("c")] +physical_plan DataSourceExec: file_groups={3 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_part_test/part=a/file.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_part_test/part=b/file.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_part_test/part=c/file.parquet]]}, projection=[val, part], file_type=parquet, predicate=val@0 != c, pruning_predicate=val_null_count@2 != row_count@3 AND (val_min@0 != c OR c != val_max@1), required_guarantees=[val not in (c)] From a9f758a89e3e6223e0e1534b1d8e169cb8c444ae Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Mon, 5 May 2025 17:57:10 +0800 Subject: [PATCH 06/10] chore(deps): bump substrait from 0.55.0 to 0.55.1 (#15941) Bumps [substrait](https://github.com/substrait-io/substrait-rs) from 0.55.0 to 0.55.1. - [Release notes](https://github.com/substrait-io/substrait-rs/releases) - [Changelog](https://github.com/substrait-io/substrait-rs/blob/main/CHANGELOG.md) - [Commits](https://github.com/substrait-io/substrait-rs/compare/v0.55.0...v0.55.1) --- updated-dependencies: - dependency-name: substrait dependency-version: 0.55.1 dependency-type: direct:production update-type: version-update:semver-patch ... 
Signed-off-by: dependabot[bot] Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> --- Cargo.lock | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index c746b2c14e39..fcac5b1f75f2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4741,9 +4741,9 @@ dependencies = [ [[package]] name = "prettyplease" -version = "0.2.31" +version = "0.2.32" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5316f57387668042f561aae71480de936257848f9c43ce528e311d89a07cadeb" +checksum = "664ec5419c51e34154eec046ebcba56312d5a2fc3b09a06da188e1ad21afadf6" dependencies = [ "proc-macro2", "syn 2.0.100", @@ -6056,9 +6056,9 @@ dependencies = [ [[package]] name = "substrait" -version = "0.55.0" +version = "0.55.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b3a359aeb711c1e1944c0c4178bbb2d679d39237ac5bfe28f7e0506e522e5ce6" +checksum = "048fe52a3664881ccdfdc9bdb0f4e8805f3444ee64abf299d365c54f6a2ffabb" dependencies = [ "heck 0.5.0", "pbjson", From 02cd396155c5536fcc73e70cafe0f78d7bc4d76f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=A9r=C3=A9mie=20Drouet?= Date: Mon, 5 May 2025 12:53:47 +0200 Subject: [PATCH 07/10] feat: create helpers to set the max_temp_directory_size (#15919) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * feat: create helpers to set the max_temp_directory_size Signed-off-by: Jérémie Drouet * refactor: use helper in cli Signed-off-by: Jérémie Drouet * refactor: update error message Signed-off-by: Jérémie Drouet * refactor: use setter in tests Signed-off-by: Jérémie Drouet --------- Signed-off-by: Jérémie Drouet Co-authored-by: Andrew Lamb --- datafusion-cli/src/main.rs | 11 +++++----- datafusion/core/tests/memory_limit/mod.rs | 8 +++---- datafusion/execution/src/disk_manager.rs | 26 ++++++++++++++++++++--- 3 files changed, 32 insertions(+), 13 deletions(-) diff --git a/datafusion-cli/src/main.rs b/datafusion-cli/src/main.rs index fd1b11126230..2971a6191b82 100644 --- a/datafusion-cli/src/main.rs +++ b/datafusion-cli/src/main.rs @@ -177,13 +177,14 @@ async fn main_inner() -> Result<()> { // set disk limit if let Some(disk_limit) = args.disk_limit { - let disk_manager = DiskManager::try_new(DiskManagerConfig::NewOs)?; + let mut disk_manager = DiskManager::try_new(DiskManagerConfig::NewOs)?; - let disk_manager = Arc::try_unwrap(disk_manager) - .expect("DiskManager should be a single instance") - .with_max_temp_directory_size(disk_limit.try_into().unwrap())?; + DiskManager::set_arc_max_temp_directory_size( + &mut disk_manager, + disk_limit.try_into().unwrap(), + )?; - let disk_config = DiskManagerConfig::new_existing(Arc::new(disk_manager)); + let disk_config = DiskManagerConfig::new_existing(disk_manager); rt_builder = rt_builder.with_disk_manager(disk_config); } diff --git a/datafusion/core/tests/memory_limit/mod.rs b/datafusion/core/tests/memory_limit/mod.rs index 01342d1604fc..c8beedb8c2c9 100644 --- a/datafusion/core/tests/memory_limit/mod.rs +++ b/datafusion/core/tests/memory_limit/mod.rs @@ -534,11 +534,9 @@ async fn setup_context( disk_limit: u64, memory_pool_limit: usize, ) -> Result { - let disk_manager = DiskManager::try_new(DiskManagerConfig::NewOs)?; + let mut disk_manager = DiskManager::try_new(DiskManagerConfig::NewOs)?; - let disk_manager = Arc::try_unwrap(disk_manager) - .expect("DiskManager should be a single instance") - .with_max_temp_directory_size(disk_limit)?; + 
DiskManager::set_arc_max_temp_directory_size(&mut disk_manager, disk_limit)?; let runtime = RuntimeEnvBuilder::new() .with_memory_pool(Arc::new(FairSpillPool::new(memory_pool_limit))) @@ -547,7 +545,7 @@ async fn setup_context( let runtime = Arc::new(RuntimeEnv { memory_pool: runtime.memory_pool.clone(), - disk_manager: Arc::new(disk_manager), + disk_manager, cache_manager: runtime.cache_manager.clone(), object_store_registry: runtime.object_store_registry.clone(), }); diff --git a/datafusion/execution/src/disk_manager.rs b/datafusion/execution/src/disk_manager.rs index 2b21a6dbf175..555b44ad5ceb 100644 --- a/datafusion/execution/src/disk_manager.rs +++ b/datafusion/execution/src/disk_manager.rs @@ -120,10 +120,10 @@ impl DiskManager { } } - pub fn with_max_temp_directory_size( - mut self, + pub fn set_max_temp_directory_size( + &mut self, max_temp_directory_size: u64, - ) -> Result { + ) -> Result<()> { // If the disk manager is disabled and `max_temp_directory_size` is not 0, // this operation is not meaningful, fail early. if self.local_dirs.lock().is_none() && max_temp_directory_size != 0 { @@ -133,6 +133,26 @@ impl DiskManager { } self.max_temp_directory_size = max_temp_directory_size; + Ok(()) + } + + pub fn set_arc_max_temp_directory_size( + this: &mut Arc, + max_temp_directory_size: u64, + ) -> Result<()> { + if let Some(inner) = Arc::get_mut(this) { + inner.set_max_temp_directory_size(max_temp_directory_size)?; + Ok(()) + } else { + config_err!("DiskManager should be a single instance") + } + } + + pub fn with_max_temp_directory_size( + mut self, + max_temp_directory_size: u64, + ) -> Result { + self.set_max_temp_directory_size(max_temp_directory_size)?; Ok(self) } From 3f9839da7cab29606e8d99af874dbe35db122048 Mon Sep 17 00:00:00 2001 From: "xudong.w" Date: Mon, 5 May 2025 19:06:05 +0800 Subject: [PATCH 08/10] Fix main CI (#15942) --- datafusion/sqllogictest/test_files/parquet_filter_pushdown.slt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/sqllogictest/test_files/parquet_filter_pushdown.slt b/datafusion/sqllogictest/test_files/parquet_filter_pushdown.slt index a32db2ff0524..99fbb713646d 100644 --- a/datafusion/sqllogictest/test_files/parquet_filter_pushdown.slt +++ b/datafusion/sqllogictest/test_files/parquet_filter_pushdown.slt @@ -186,7 +186,7 @@ COPY ( ) TO 'test_files/scratch/parquet_filter_pushdown/parquet_part_test/part=c/file.parquet' STORED AS PARQUET; -query TT +query TT rowsort select * from t_pushdown where part == val ---- a a From cd1bcaf9240fb96b5e7eb724549fae1c6baff479 Mon Sep 17 00:00:00 2001 From: Gabriel <45515538+gabotechs@users.noreply.github.com> Date: Mon, 5 May 2025 15:23:06 +0200 Subject: [PATCH 09/10] Improve sqllogictest error reporting (#15905) --- datafusion/sqllogictest/bin/sqllogictests.rs | 56 ++++++++++++++------ 1 file changed, 41 insertions(+), 15 deletions(-) diff --git a/datafusion/sqllogictest/bin/sqllogictests.rs b/datafusion/sqllogictest/bin/sqllogictests.rs index 21dfe2ee08f4..f8a4ec666572 100644 --- a/datafusion/sqllogictest/bin/sqllogictests.rs +++ b/datafusion/sqllogictest/bin/sqllogictests.rs @@ -31,8 +31,8 @@ use itertools::Itertools; use log::Level::Info; use log::{info, log_enabled}; use sqllogictest::{ - parse_file, strict_column_validator, AsyncDB, Condition, Normalizer, Record, - Validator, + parse_file, strict_column_validator, AsyncDB, Condition, MakeConnection, Normalizer, + Record, Validator, }; #[cfg(feature = "postgres")] @@ -50,6 +50,7 @@ const TEST_DIRECTORY: &str = "test_files/"; const 
DATAFUSION_TESTING_TEST_DIRECTORY: &str = "../../datafusion-testing/data/"; const PG_COMPAT_FILE_PREFIX: &str = "pg_compat_"; const SQLITE_PREFIX: &str = "sqlite"; +const ERRS_PER_FILE_LIMIT: usize = 10; pub fn main() -> Result<()> { tokio::runtime::Builder::new_multi_thread() @@ -234,15 +235,45 @@ async fn run_test_file( runner.with_column_validator(strict_column_validator); runner.with_normalizer(value_normalizer); runner.with_validator(validator); + let result = run_file_in_runner(path, runner).await; + pb.finish_and_clear(); + result +} - let res = runner - .run_file_async(path) - .await - .map_err(|e| DataFusionError::External(Box::new(e))); +async fn run_file_in_runner>( + path: PathBuf, + mut runner: sqllogictest::Runner, +) -> Result<()> { + let path = path.canonicalize()?; + let records = + parse_file(&path).map_err(|e| DataFusionError::External(Box::new(e)))?; + let mut errs = vec![]; + for record in records.into_iter() { + if let Record::Halt { .. } = record { + break; + } + if let Err(err) = runner.run_async(record).await { + errs.push(format!("{err}")); + } + } - pb.finish_and_clear(); + if !errs.is_empty() { + let mut msg = format!("{} errors in file {}\n\n", errs.len(), path.display()); + for (i, err) in errs.iter().enumerate() { + if i >= ERRS_PER_FILE_LIMIT { + msg.push_str(&format!( + "... other {} errors in {} not shown ...\n\n", + errs.len() - ERRS_PER_FILE_LIMIT, + path.display() + )); + break; + } + msg.push_str(&format!("{}. {err}\n\n", i + 1)); + } + return Err(DataFusionError::External(msg.into())); + } - res + Ok(()) } fn get_record_count(path: &PathBuf, label: String) -> u64 { @@ -308,14 +339,9 @@ async fn run_test_file_with_postgres( runner.with_column_validator(strict_column_validator); runner.with_normalizer(value_normalizer); runner.with_validator(validator); - runner - .run_file_async(path) - .await - .map_err(|e| DataFusionError::External(Box::new(e)))?; - + let result = run_file_in_runner(path, runner).await; pb.finish_and_clear(); - - Ok(()) + result } #[cfg(not(feature = "postgres"))] From a32898388d8cbc7ce46d283c1bb4dbe73f41079b Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Mon, 5 May 2025 15:40:15 +0200 Subject: [PATCH 10/10] refactor filter pushdown apis (#15801) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * refactor filter pushdown apis * remove commented out code * fix tests * fail to fix bug * fix * add/fix docs * lint * add some docstrings, some minimal cleaup * review suggestions * add more comments * fix doc links * fmt * add comments * make test deterministic * add bench * fix bench * register bench * fix bench * cargo fmt --------- Co-authored-by: berkaysynnada Co-authored-by: Berkay Şahin <124376117+berkaysynnada@users.noreply.github.com> --- datafusion/core/Cargo.toml | 4 + datafusion/core/benches/push_down_filter.rs | 124 +++++++ .../physical_optimizer/push_down_filter.rs | 67 ++-- datafusion/datasource/src/file.rs | 16 +- datafusion/datasource/src/file_scan_config.rs | 57 ++-- datafusion/datasource/src/source.rs | 65 ++-- ...push_down_filter.rs => filter_pushdown.rs} | 298 ++++++++-------- datafusion/physical-optimizer/src/lib.rs | 2 +- .../physical-optimizer/src/optimizer.rs | 4 +- .../physical-plan/src/coalesce_batches.rs | 23 +- .../physical-plan/src/execution_plan.rs | 85 +++-- datafusion/physical-plan/src/filter.rs | 170 +++++++--- .../physical-plan/src/filter_pushdown.rs | 317 ++++++++++++++---- 
.../physical-plan/src/repartition/mod.rs | 22 +- .../test_files/parquet_filter_pushdown.slt | 6 +- 15 files changed, 845 insertions(+), 415 deletions(-) create mode 100644 datafusion/core/benches/push_down_filter.rs rename datafusion/physical-optimizer/src/{push_down_filter.rs => filter_pushdown.rs} (67%) diff --git a/datafusion/core/Cargo.toml b/datafusion/core/Cargo.toml index 3ace3e14ec25..4b6d8f274932 100644 --- a/datafusion/core/Cargo.toml +++ b/datafusion/core/Cargo.toml @@ -179,6 +179,10 @@ name = "csv_load" harness = false name = "distinct_query_sql" +[[bench]] +harness = false +name = "push_down_filter" + [[bench]] harness = false name = "sort_limit_query_sql" diff --git a/datafusion/core/benches/push_down_filter.rs b/datafusion/core/benches/push_down_filter.rs new file mode 100644 index 000000000000..92de1711a9e8 --- /dev/null +++ b/datafusion/core/benches/push_down_filter.rs @@ -0,0 +1,124 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use arrow::array::RecordBatch; +use arrow::datatypes::{DataType, Field, Schema}; +use bytes::{BufMut, BytesMut}; +use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion}; +use datafusion::config::ConfigOptions; +use datafusion::prelude::{ParquetReadOptions, SessionContext}; +use datafusion_execution::object_store::ObjectStoreUrl; +use datafusion_physical_optimizer::filter_pushdown::FilterPushdown; +use datafusion_physical_optimizer::PhysicalOptimizerRule; +use datafusion_physical_plan::ExecutionPlan; +use object_store::memory::InMemory; +use object_store::path::Path; +use object_store::ObjectStore; +use parquet::arrow::ArrowWriter; +use std::sync::Arc; + +async fn create_plan() -> Arc { + let ctx = SessionContext::new(); + let schema = Arc::new(Schema::new(vec![ + Field::new("id", DataType::Int32, true), + Field::new("name", DataType::Utf8, true), + Field::new("age", DataType::UInt16, true), + Field::new("salary", DataType::Float64, true), + ])); + let batch = RecordBatch::new_empty(schema); + + let store = Arc::new(InMemory::new()) as Arc; + let mut out = BytesMut::new().writer(); + { + let mut writer = ArrowWriter::try_new(&mut out, batch.schema(), None).unwrap(); + writer.write(&batch).unwrap(); + writer.finish().unwrap(); + } + let data = out.into_inner().freeze(); + store + .put(&Path::from("test.parquet"), data.into()) + .await + .unwrap(); + ctx.register_object_store( + ObjectStoreUrl::parse("memory://").unwrap().as_ref(), + store, + ); + + ctx.register_parquet("t", "memory:///", ParquetReadOptions::default()) + .await + .unwrap(); + + let df = ctx + .sql( + r" + WITH brackets AS ( + SELECT age % 10 AS age_bracket + FROM t + GROUP BY age % 10 + HAVING COUNT(*) > 10 + ) + SELECT id, name, age, salary + FROM t + JOIN brackets ON t.age % 10 = brackets.age_bracket + WHERE 
age > 20 AND t.salary > 1000 + ORDER BY t.salary DESC + LIMIT 100 + ", + ) + .await + .unwrap(); + + df.create_physical_plan().await.unwrap() +} + +#[derive(Clone)] +struct BenchmarkPlan { + plan: Arc, + config: ConfigOptions, +} + +impl std::fmt::Display for BenchmarkPlan { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "BenchmarkPlan") + } +} + +fn bench_push_down_filter(c: &mut Criterion) { + // Create a relatively complex plan + let plan = tokio::runtime::Runtime::new() + .unwrap() + .block_on(create_plan()); + let mut config = ConfigOptions::default(); + config.execution.parquet.pushdown_filters = true; + let plan = BenchmarkPlan { plan, config }; + + c.bench_with_input( + BenchmarkId::new("push_down_filter", plan.clone()), + &plan, + |b, plan| { + b.iter(|| { + let optimizer = FilterPushdown::new(); + optimizer + .optimize(Arc::clone(&plan.plan), &plan.config) + .unwrap(); + }); + }, + ); +} + +criterion_group!(benches, bench_push_down_filter); +criterion_main!(benches); diff --git a/datafusion/core/tests/physical_optimizer/push_down_filter.rs b/datafusion/core/tests/physical_optimizer/push_down_filter.rs index b19144f1bcff..326a7b837e7a 100644 --- a/datafusion/core/tests/physical_optimizer/push_down_filter.rs +++ b/datafusion/core/tests/physical_optimizer/push_down_filter.rs @@ -44,11 +44,10 @@ use datafusion_physical_expr::{ aggregate::AggregateExprBuilder, conjunction, Partitioning, }; use datafusion_physical_expr_common::physical_expr::fmt_sql; -use datafusion_physical_optimizer::push_down_filter::PushdownFilter; +use datafusion_physical_optimizer::filter_pushdown::FilterPushdown; use datafusion_physical_optimizer::PhysicalOptimizerRule; use datafusion_physical_plan::filter_pushdown::{ - filter_pushdown_not_supported, FilterDescription, FilterPushdownResult, - FilterPushdownSupport, + FilterPushdownPropagation, PredicateSupports, }; use datafusion_physical_plan::{ aggregates::{AggregateExec, AggregateMode, PhysicalGroupBy}, @@ -154,29 +153,24 @@ impl FileSource for TestSource { fn try_pushdown_filters( &self, - mut fd: FilterDescription, + mut filters: Vec>, config: &ConfigOptions, - ) -> Result>> { + ) -> Result>> { if self.support && config.execution.parquet.pushdown_filters { if let Some(internal) = self.predicate.as_ref() { - fd.filters.push(Arc::clone(internal)); + filters.push(Arc::clone(internal)); } - let all_filters = fd.take_description(); - - Ok(FilterPushdownResult { - support: FilterPushdownSupport::Supported { - child_descriptions: vec![], - op: Arc::new(TestSource { - support: true, - predicate: Some(conjunction(all_filters)), - statistics: self.statistics.clone(), // should be updated in reality - }), - revisit: false, - }, - remaining_description: FilterDescription::empty(), + let new_node = Arc::new(TestSource { + support: true, + predicate: Some(conjunction(filters.clone())), + statistics: self.statistics.clone(), // should be updated in reality + }); + Ok(FilterPushdownPropagation { + filters: PredicateSupports::all_supported(filters), + updated_node: Some(new_node), }) } else { - Ok(filter_pushdown_not_supported(fd)) + Ok(FilterPushdownPropagation::unsupported(filters)) } } } @@ -201,7 +195,7 @@ fn test_pushdown_into_scan() { // expect the predicate to be pushed down into the DataSource insta::assert_snapshot!( - OptimizationTest::new(plan, PushdownFilter{}, true), + OptimizationTest::new(plan, FilterPushdown{}, true), @r" OptimizationTest: input: @@ -225,7 +219,7 @@ fn test_pushdown_into_scan_with_config_options() { 
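        // The first snapshot runs with pushdown disabled in the session
        // config (the `false` argument), so the FilterExec is expected to
        // stay above the scan; the second run below enables the option and
        // expects the predicate to move into the DataSourceExec.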
insta::assert_snapshot!( OptimizationTest::new( Arc::clone(&plan), - PushdownFilter {}, + FilterPushdown {}, false ), @r" @@ -244,7 +238,7 @@ fn test_pushdown_into_scan_with_config_options() { insta::assert_snapshot!( OptimizationTest::new( plan, - PushdownFilter {}, + FilterPushdown {}, true ), @r" @@ -269,7 +263,7 @@ fn test_filter_collapse() { let plan = Arc::new(FilterExec::try_new(predicate2, filter1).unwrap()); insta::assert_snapshot!( - OptimizationTest::new(plan, PushdownFilter{}, true), + OptimizationTest::new(plan, FilterPushdown{}, true), @r" OptimizationTest: input: @@ -278,7 +272,7 @@ fn test_filter_collapse() { - DataSourceExec: file_groups={0 groups: []}, projection=[a, b, c], file_type=test, pushdown_supported=true output: Ok: - - DataSourceExec: file_groups={0 groups: []}, projection=[a, b, c], file_type=test, pushdown_supported=true, predicate=b@1 = bar AND a@0 = foo + - DataSourceExec: file_groups={0 groups: []}, projection=[a, b, c], file_type=test, pushdown_supported=true, predicate=a@0 = foo AND b@1 = bar " ); } @@ -288,25 +282,28 @@ fn test_filter_with_projection() { let scan = test_scan(true); let projection = vec![1, 0]; let predicate = col_lit_predicate("a", "foo", schema()); - let plan = Arc::new( - FilterExec::try_new(predicate, Arc::clone(&scan)) + let filter = Arc::new( + FilterExec::try_new(Arc::clone(&predicate), Arc::clone(&scan)) .unwrap() .with_projection(Some(projection)) .unwrap(), ); + let predicate = col_lit_predicate("b", "bar", &filter.schema()); + let plan = Arc::new(FilterExec::try_new(predicate, filter).unwrap()); // expect the predicate to be pushed down into the DataSource but the FilterExec to be converted to ProjectionExec insta::assert_snapshot!( - OptimizationTest::new(plan, PushdownFilter{}, true), + OptimizationTest::new(plan, FilterPushdown{}, true), @r" OptimizationTest: input: - - FilterExec: a@0 = foo, projection=[b@1, a@0] - - DataSourceExec: file_groups={0 groups: []}, projection=[a, b, c], file_type=test, pushdown_supported=true + - FilterExec: b@0 = bar + - FilterExec: a@0 = foo, projection=[b@1, a@0] + - DataSourceExec: file_groups={0 groups: []}, projection=[a, b, c], file_type=test, pushdown_supported=true output: Ok: - ProjectionExec: expr=[b@1 as b, a@0 as a] - - DataSourceExec: file_groups={0 groups: []}, projection=[a, b, c], file_type=test, pushdown_supported=true, predicate=a@0 = foo + - DataSourceExec: file_groups={0 groups: []}, projection=[a, b, c], file_type=test, pushdown_supported=true, predicate=a@0 = foo AND b@1 = bar ", ); @@ -320,7 +317,7 @@ fn test_filter_with_projection() { .unwrap(), ); insta::assert_snapshot!( - OptimizationTest::new(plan, PushdownFilter{},true), + OptimizationTest::new(plan, FilterPushdown{},true), @r" OptimizationTest: input: @@ -349,7 +346,7 @@ fn test_push_down_through_transparent_nodes() { // expect the predicate to be pushed down into the DataSource insta::assert_snapshot!( - OptimizationTest::new(plan, PushdownFilter{},true), + OptimizationTest::new(plan, FilterPushdown{},true), @r" OptimizationTest: input: @@ -362,7 +359,7 @@ fn test_push_down_through_transparent_nodes() { Ok: - RepartitionExec: partitioning=RoundRobinBatch(1), input_partitions=0 - CoalesceBatchesExec: target_batch_size=1 - - DataSourceExec: file_groups={0 groups: []}, projection=[a, b, c], file_type=test, pushdown_supported=true, predicate=b@1 = bar AND a@0 = foo + - DataSourceExec: file_groups={0 groups: []}, projection=[a, b, c], file_type=test, pushdown_supported=true, predicate=a@0 = foo AND b@1 = bar " ); } @@ 
-413,7 +410,7 @@ fn test_no_pushdown_through_aggregates() { // expect the predicate to be pushed down into the DataSource insta::assert_snapshot!( - OptimizationTest::new(plan, PushdownFilter{}, true), + OptimizationTest::new(plan, FilterPushdown{}, true), @r" OptimizationTest: input: diff --git a/datafusion/datasource/src/file.rs b/datafusion/datasource/src/file.rs index 6bbe9e50d3b0..c9b5c416f0c0 100644 --- a/datafusion/datasource/src/file.rs +++ b/datafusion/datasource/src/file.rs @@ -28,10 +28,8 @@ use crate::file_stream::FileOpener; use arrow::datatypes::SchemaRef; use datafusion_common::config::ConfigOptions; use datafusion_common::{Result, Statistics}; -use datafusion_physical_expr::LexOrdering; -use datafusion_physical_plan::filter_pushdown::{ - filter_pushdown_not_supported, FilterDescription, FilterPushdownResult, -}; +use datafusion_physical_expr::{LexOrdering, PhysicalExpr}; +use datafusion_physical_plan::filter_pushdown::FilterPushdownPropagation; use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet; use datafusion_physical_plan::DisplayFormatType; @@ -108,14 +106,14 @@ pub trait FileSource: Send + Sync { } /// Try to push down filters into this FileSource. - /// See [`ExecutionPlan::try_pushdown_filters`] for more details. + /// See [`ExecutionPlan::handle_child_pushdown_result`] for more details. /// - /// [`ExecutionPlan::try_pushdown_filters`]: datafusion_physical_plan::ExecutionPlan::try_pushdown_filters + /// [`ExecutionPlan::handle_child_pushdown_result`]: datafusion_physical_plan::ExecutionPlan::handle_child_pushdown_result fn try_pushdown_filters( &self, - fd: FilterDescription, + filters: Vec>, _config: &ConfigOptions, - ) -> Result>> { - Ok(filter_pushdown_not_supported(fd)) + ) -> Result>> { + Ok(FilterPushdownPropagation::unsupported(filters)) } } diff --git a/datafusion/datasource/src/file_scan_config.rs b/datafusion/datasource/src/file_scan_config.rs index fb756cc11fbb..ae94af5a7b26 100644 --- a/datafusion/datasource/src/file_scan_config.rs +++ b/datafusion/datasource/src/file_scan_config.rs @@ -48,14 +48,12 @@ use datafusion_common::{DataFusionError, ScalarValue}; use datafusion_execution::{ object_store::ObjectStoreUrl, SendableRecordBatchStream, TaskContext, }; +use datafusion_physical_expr::PhysicalExpr; use datafusion_physical_expr::{ expressions::Column, EquivalenceProperties, LexOrdering, Partitioning, PhysicalSortExpr, }; -use datafusion_physical_plan::filter_pushdown::{ - filter_pushdown_not_supported, FilterDescription, FilterPushdownResult, - FilterPushdownSupport, -}; +use datafusion_physical_plan::filter_pushdown::FilterPushdownPropagation; use datafusion_physical_plan::{ display::{display_orderings, ProjectSchemaDisplay}, metrics::ExecutionPlanMetricsSet, @@ -596,40 +594,29 @@ impl DataSource for FileScanConfig { fn try_pushdown_filters( &self, - fd: FilterDescription, + filters: Vec>, config: &ConfigOptions, - ) -> Result>> { - let FilterPushdownResult { - support, - remaining_description, - } = self.file_source.try_pushdown_filters(fd, config)?; - - match support { - FilterPushdownSupport::Supported { - child_descriptions, - op, - revisit, - } => { - let new_data_source = Arc::new( - FileScanConfigBuilder::from(self.clone()) - .with_source(op) - .build(), - ); - - debug_assert!(child_descriptions.is_empty()); - debug_assert!(!revisit); - - Ok(FilterPushdownResult { - support: FilterPushdownSupport::Supported { - child_descriptions, - op: new_data_source, - revisit, - }, - remaining_description, + ) -> Result>> { + let result = 
self.file_source.try_pushdown_filters(filters, config)?; + match result.updated_node { + Some(new_file_source) => { + let file_scan_config = FileScanConfigBuilder::new( + self.object_store_url.clone(), + Arc::clone(&self.file_schema), + new_file_source, + ) + .build(); + Ok(FilterPushdownPropagation { + filters: result.filters, + updated_node: Some(Arc::new(file_scan_config) as _), }) } - FilterPushdownSupport::NotSupported => { - Ok(filter_pushdown_not_supported(remaining_description)) + None => { + // If the file source does not support filter pushdown, return the original config + Ok(FilterPushdownPropagation { + filters: result.filters, + updated_node: None, + }) } } } diff --git a/datafusion/datasource/src/source.rs b/datafusion/datasource/src/source.rs index 022f77f2e421..cf42347e3aba 100644 --- a/datafusion/datasource/src/source.rs +++ b/datafusion/datasource/src/source.rs @@ -33,11 +33,10 @@ use crate::file_scan_config::FileScanConfig; use datafusion_common::config::ConfigOptions; use datafusion_common::{Constraints, Result, Statistics}; use datafusion_execution::{SendableRecordBatchStream, TaskContext}; -use datafusion_physical_expr::{EquivalenceProperties, Partitioning}; +use datafusion_physical_expr::{EquivalenceProperties, Partitioning, PhysicalExpr}; use datafusion_physical_expr_common::sort_expr::LexOrdering; use datafusion_physical_plan::filter_pushdown::{ - filter_pushdown_not_supported, FilterDescription, FilterPushdownResult, - FilterPushdownSupport, + ChildPushdownResult, FilterPushdownPropagation, }; /// A source of data, typically a list of files or memory @@ -95,13 +94,15 @@ pub trait DataSource: Send + Sync + Debug { _projection: &ProjectionExec, ) -> Result>>; /// Try to push down filters into this DataSource. - /// See [`ExecutionPlan::try_pushdown_filters`] for more details. + /// See [`ExecutionPlan::handle_child_pushdown_result`] for more details. 
+ /// + /// [`ExecutionPlan::handle_child_pushdown_result`]: datafusion_physical_plan::ExecutionPlan::handle_child_pushdown_result fn try_pushdown_filters( &self, - fd: FilterDescription, + filters: Vec>, _config: &ConfigOptions, - ) -> Result>> { - Ok(filter_pushdown_not_supported(fd)) + ) -> Result>> { + Ok(FilterPushdownPropagation::unsupported(filters)) } } @@ -237,39 +238,31 @@ impl ExecutionPlan for DataSourceExec { self.data_source.try_swapping_with_projection(projection) } - fn try_pushdown_filters( + fn handle_child_pushdown_result( &self, - fd: FilterDescription, + child_pushdown_result: ChildPushdownResult, config: &ConfigOptions, - ) -> Result>> { - let FilterPushdownResult { - support, - remaining_description, - } = self.data_source.try_pushdown_filters(fd, config)?; - - match support { - FilterPushdownSupport::Supported { - child_descriptions, - op, - revisit, - } => { - let new_exec = Arc::new(DataSourceExec::new(op)); - - debug_assert!(child_descriptions.is_empty()); - debug_assert!(!revisit); - - Ok(FilterPushdownResult { - support: FilterPushdownSupport::Supported { - child_descriptions, - op: new_exec, - revisit, - }, - remaining_description, + ) -> Result>> { + // Push any remaining filters into our data source + let res = self.data_source.try_pushdown_filters( + child_pushdown_result.parent_filters.collect_all(), + config, + )?; + match res.updated_node { + Some(data_source) => { + let mut new_node = self.clone(); + new_node.data_source = data_source; + new_node.cache = + Self::compute_properties(Arc::clone(&new_node.data_source)); + Ok(FilterPushdownPropagation { + filters: res.filters, + updated_node: Some(Arc::new(new_node)), }) } - FilterPushdownSupport::NotSupported => { - Ok(filter_pushdown_not_supported(remaining_description)) - } + None => Ok(FilterPushdownPropagation { + filters: res.filters, + updated_node: None, + }), } } } diff --git a/datafusion/physical-optimizer/src/push_down_filter.rs b/datafusion/physical-optimizer/src/filter_pushdown.rs similarity index 67% rename from datafusion/physical-optimizer/src/push_down_filter.rs rename to datafusion/physical-optimizer/src/filter_pushdown.rs index 80201454d06d..6c445458b51b 100644 --- a/datafusion/physical-optimizer/src/push_down_filter.rs +++ b/datafusion/physical-optimizer/src/filter_pushdown.rs @@ -19,25 +19,25 @@ use std::sync::Arc; use crate::PhysicalOptimizerRule; -use datafusion_common::tree_node::{Transformed, TreeNode, TreeNodeRecursion}; use datafusion_common::{config::ConfigOptions, Result}; -use datafusion_physical_expr::conjunction; -use datafusion_physical_plan::filter::FilterExec; +use datafusion_physical_expr::PhysicalExpr; use datafusion_physical_plan::filter_pushdown::{ - FilterDescription, FilterPushdownResult, FilterPushdownSupport, + ChildPushdownResult, FilterPushdownPropagation, PredicateSupport, PredicateSupports, }; -use datafusion_physical_plan::tree_node::PlanContext; -use datafusion_physical_plan::ExecutionPlan; +use datafusion_physical_plan::{with_new_children_if_necessary, ExecutionPlan}; + +use itertools::izip; /// Attempts to recursively push given filters from the top of the tree into leafs. /// /// # Default Implementation /// -/// The default implementation in [`ExecutionPlan::try_pushdown_filters`] is a no-op -/// that assumes that: +/// The default implementation in [`ExecutionPlan::gather_filters_for_pushdown`] +/// and [`ExecutionPlan::handle_child_pushdown_result`] assumes that: /// -/// * Parent filters can't be passed onto children. 
-/// * This node has no filters to contribute. +/// * Parent filters can't be passed onto children (determined by [`ExecutionPlan::gather_filters_for_pushdown`]) +/// * This node has no filters to contribute (determined by [`ExecutionPlan::gather_filters_for_pushdown`]). +/// * Any filters that could not be pushed down to the children are marked as unsupported (determined by [`ExecutionPlan::handle_child_pushdown_result`]). /// /// # Example: Push filter into a `DataSourceExec` /// @@ -240,7 +240,7 @@ use datafusion_physical_plan::ExecutionPlan; /// The point here is that: /// 1. We cannot push down `sum > 10` through the [`AggregateExec`] node into the `DataSourceExec` node. /// Any filters above the [`AggregateExec`] node are not pushed down. -/// This is determined by calling [`ExecutionPlan::try_pushdown_filters`] on the [`AggregateExec`] node. +/// This is determined by calling [`ExecutionPlan::gather_filters_for_pushdown`] on the [`AggregateExec`] node. /// 2. We need to keep recursing into the tree so that we can discover the other [`FilterExec`] node and push /// down the `id=1` filter. /// @@ -362,47 +362,29 @@ use datafusion_physical_plan::ExecutionPlan; /// [`ProjectionExec`]: datafusion_physical_plan::projection::ProjectionExec /// [`AggregateExec`]: datafusion_physical_plan::aggregates::AggregateExec #[derive(Debug)] -pub struct PushdownFilter {} +pub struct FilterPushdown {} -impl Default for PushdownFilter { +impl FilterPushdown { + pub fn new() -> Self { + Self {} + } +} + +impl Default for FilterPushdown { fn default() -> Self { Self::new() } } -pub type FilterDescriptionContext = PlanContext; - -impl PhysicalOptimizerRule for PushdownFilter { +impl PhysicalOptimizerRule for FilterPushdown { fn optimize( &self, plan: Arc, config: &ConfigOptions, ) -> Result> { - let context = FilterDescriptionContext::new_default(plan); - - context - .transform_up(|node| { - if node.plan.as_any().downcast_ref::().is_some() { - let initial_plan = Arc::clone(&node.plan); - let mut accept_updated = false; - let updated_node = node.transform_down(|filter_node| { - Self::try_pushdown(filter_node, config, &mut accept_updated) - }); - - if accept_updated { - updated_node - } else { - Ok(Transformed::no(FilterDescriptionContext::new_default( - initial_plan, - ))) - } - } - // Other filter introducing operators extends here - else { - Ok(Transformed::no(node)) - } - }) - .map(|updated| updated.data.plan) + Ok(push_down_filters(Arc::clone(&plan), vec![], config)? + .updated_node + .unwrap_or(plan)) } fn name(&self) -> &str { @@ -414,122 +396,146 @@ impl PhysicalOptimizerRule for PushdownFilter { } } -impl PushdownFilter { - pub fn new() -> Self { - Self {} - } +/// Support state of each predicate for the children of the node. +/// These predicates are coming from the parent node. 
+#[derive(Debug, Clone, Copy, PartialEq, Eq)]
+enum ParentPredicateStates {
+ NoChildren,
+ Unsupported,
+ Supported,
+}

- fn try_pushdown(
- mut node: FilterDescriptionContext,
- config: &ConfigOptions,
- accept_updated: &mut bool,
- ) -> Result<Transformed<FilterDescriptionContext>> {
- let initial_description = FilterDescription {
- filters: node.data.take_description(),
- };
+fn push_down_filters(
+ node: Arc<dyn ExecutionPlan>,
+ parent_predicates: Vec<Arc<dyn PhysicalExpr>>,
+ config: &ConfigOptions,
+) -> Result<FilterPushdownPropagation<Arc<dyn ExecutionPlan>>> {
+ // If the node has any children, these will be rewritten as supported or unsupported
+ let mut parent_predicates_pushdown_states =
+ vec![ParentPredicateStates::NoChildren; parent_predicates.len()];
+ let mut self_filters_pushdown_supports = vec![];
+ let mut new_children = vec![];

- let FilterPushdownResult {
- support,
- remaining_description,
- } = node
- .plan
- .try_pushdown_filters(initial_description, config)?;
+ let children = node.children();
+ let filter_description =
+ node.gather_filters_for_pushdown(parent_predicates.clone(), config)?;

- match support {
- FilterPushdownSupport::Supported {
- mut child_descriptions,
- op,
- revisit,
- } => {
- if revisit {
- // This check handles cases where the current operator is entirely removed
- // from the plan and replaced with its child. In such cases, to not skip
- // over the new node, we need to explicitly re-apply this pushdown logic
- // to the new node.
- //
- // TODO: If TreeNodeRecursion supports a Revisit mechanism in the future,
- // this manual recursion could be removed.
+ for (child, parent_filters, self_filters) in izip!(
+ children,
+ filter_description.parent_filters(),
+ filter_description.self_filters()
+ ) {
+ // Here, `parent_filters` are the predicates provided by this node's parent,
+ // which we attempt to push down to the child the loop is currently visiting.
+ // `self_filters` are the predicates provided by the current node itself,
+ // which we attempt to push down to that child in the same way.

- // If the operator is removed, it should not leave any filters as remaining
- debug_assert!(remaining_description.filters.is_empty());
- // Operators having 2 children cannot be removed
- debug_assert_eq!(child_descriptions.len(), 1);
- debug_assert_eq!(node.children.len(), 1);
+ let num_self_filters = self_filters.len();
+ let mut parent_supported_predicate_indices = vec![];
+ let mut all_predicates = self_filters;

- node.plan = op;
- node.data = child_descriptions.swap_remove(0);
- node.children = node.children.swap_remove(0).children;
- Self::try_pushdown(node, config, accept_updated)
- } else {
- if remaining_description.filters.is_empty() {
- // Filter can be pushed down safely
- node.plan = op;
- if node.children.is_empty() {
- *accept_updated = true;
- } else {
- for (child, descr) in
- node.children.iter_mut().zip(child_descriptions)
- {
- child.data = descr;
- }
- }
- } else {
- // Filter cannot be pushed down
- node = insert_filter_exec(
- node,
- child_descriptions,
- remaining_description,
- )?;
+ // Iterate over each predicate coming from the parent
+ for (idx, filter) in parent_filters.into_iter().enumerate() {
+ // Check if we can push this filter down to our child.
+ // These supports are defined in `gather_filters_for_pushdown()`
+ match filter {
+ PredicateSupport::Supported(predicate) => {
+ // Queue this filter up for pushdown to this child
+ all_predicates.push(predicate);
+ parent_supported_predicate_indices.push(idx);
+ // Mark this filter as supported by our children if no child has marked it as unsupported
+ if parent_predicates_pushdown_states[idx]
+ != ParentPredicateStates::Unsupported
+ {
+ parent_predicates_pushdown_states[idx] =
+ ParentPredicateStates::Supported;
}
- }
- FilterPushdownSupport::NotSupported => {
- if remaining_description.filters.is_empty() {
- Ok(Transformed {
- data: node,
- transformed: false,
- tnr: TreeNodeRecursion::Stop,
- })
- } else {
- node = insert_filter_exec(
- node,
- vec![FilterDescription::empty(); 1],
- remaining_description,
- )?;
- Ok(Transformed {
- data: node,
- transformed: true,
- tnr: TreeNodeRecursion::Stop,
- })
+ PredicateSupport::Unsupported(_) => {
+ // Mark as unsupported by our children
+ parent_predicates_pushdown_states[idx] =
+ ParentPredicateStates::Unsupported;
+ }
 }
 }
 }
- }
-}

-fn insert_filter_exec(
- node: FilterDescriptionContext,
- mut child_descriptions: Vec<FilterDescription>,
- remaining_description: FilterDescription,
-) -> Result<FilterDescriptionContext> {
- let mut new_child_node = node;
+ // Any filters that could not be pushed down to a child are reported back as unsupported to our parents
+ let result = push_down_filters(Arc::clone(child), all_predicates, config)?;

- // Filter has one child
- if !child_descriptions.is_empty() {
- debug_assert_eq!(child_descriptions.len(), 1);
- new_child_node.data = child_descriptions.swap_remove(0);
- }
- let new_plan = Arc::new(FilterExec::try_new(
- conjunction(remaining_description.filters),
- Arc::clone(&new_child_node.plan),
- )?);
- let new_children = vec![new_child_node];
- let new_data = FilterDescription::empty();
+ if let Some(new_child) = result.updated_node {
+ // Pushdown rewrote this child, so adopt the new child
+ new_children.push(new_child);
+ } else {
+ // Pushdown left this child unchanged, so keep the original child
+ new_children.push(Arc::clone(child));
+ }
+
+ // Our child doesn't know the difference between filters that were passed down
+ // from our parents and filters that the current node injected. We need to disentangle
+ // them here, since the distinction matters to us.
+ let mut all_filters = result.filters.into_inner();
+ let parent_predicates = all_filters.split_off(num_self_filters);
+ let self_predicates = all_filters;
+ self_filters_pushdown_supports.push(PredicateSupports::new(self_predicates));

- Ok(FilterDescriptionContext::new(
- new_plan,
- new_data,
- new_children,
- ))
+ for (idx, result) in parent_supported_predicate_indices
+ .iter()
+ .zip(parent_predicates)
+ {
+ let current_node_state = match result {
+ PredicateSupport::Supported(_) => ParentPredicateStates::Supported,
+ PredicateSupport::Unsupported(_) => ParentPredicateStates::Unsupported,
+ };
+ match (current_node_state, parent_predicates_pushdown_states[*idx]) {
+ (r, ParentPredicateStates::NoChildren) => {
+ // No previous result, so use the state reported by this child
+ parent_predicates_pushdown_states[*idx] = r;
+ }
+ (ParentPredicateStates::Supported, ParentPredicateStates::Supported) => {
+ // The current child and all previous children support this filter,
+ // so it remains supported
+ parent_predicates_pushdown_states[*idx] =
+ ParentPredicateStates::Supported;
+ }
+ _ => {
+ // Either the current child or a previous child marked this filter as unsupported
+ parent_predicates_pushdown_states[*idx] =
+ ParentPredicateStates::Unsupported;
+ }
+ }
+ }
+ }
+ // Re-create this node with new children
+ let updated_node = with_new_children_if_necessary(Arc::clone(&node), new_children)?;
+ // Remap the result onto the parent filters as they were given to us.
+ // Any filters that were not pushed down to any children are marked as unsupported.
+ let parent_pushdown_result = PredicateSupports::new(
+ parent_predicates_pushdown_states
+ .into_iter()
+ .zip(parent_predicates)
+ .map(|(state, filter)| match state {
+ ParentPredicateStates::NoChildren => {
+ PredicateSupport::Unsupported(filter)
+ }
+ ParentPredicateStates::Unsupported => {
+ PredicateSupport::Unsupported(filter)
+ }
+ ParentPredicateStates::Supported => PredicateSupport::Supported(filter),
+ })
+ .collect(),
+ );
+ // Check what the current node wants to do given the result of pushdown to its children
+ let mut res = updated_node.handle_child_pushdown_result(
+ ChildPushdownResult {
+ parent_filters: parent_pushdown_result,
+ self_filters: self_filters_pushdown_supports,
+ },
+ config,
+ )?;
+ // Compare pointers for updated_node and node; if they differ, we must replace
+ // ourselves because of changes in our children.
+ if res.updated_node.is_none() && !Arc::ptr_eq(&updated_node, &node) { + res.updated_node = Some(updated_node) + } + Ok(res) } diff --git a/datafusion/physical-optimizer/src/lib.rs b/datafusion/physical-optimizer/src/lib.rs index 57dac21b6eee..5a43d7118d63 100644 --- a/datafusion/physical-optimizer/src/lib.rs +++ b/datafusion/physical-optimizer/src/lib.rs @@ -29,6 +29,7 @@ pub mod coalesce_batches; pub mod combine_partial_final_agg; pub mod enforce_distribution; pub mod enforce_sorting; +pub mod filter_pushdown; pub mod join_selection; pub mod limit_pushdown; pub mod limited_distinct_aggregation; @@ -36,7 +37,6 @@ pub mod optimizer; pub mod output_requirements; pub mod projection_pushdown; pub mod pruning; -pub mod push_down_filter; pub mod sanity_checker; pub mod topk_aggregation; pub mod update_aggr_exprs; diff --git a/datafusion/physical-optimizer/src/optimizer.rs b/datafusion/physical-optimizer/src/optimizer.rs index d4ff7d6b9e15..78d3e2ad8873 100644 --- a/datafusion/physical-optimizer/src/optimizer.rs +++ b/datafusion/physical-optimizer/src/optimizer.rs @@ -25,12 +25,12 @@ use crate::coalesce_batches::CoalesceBatches; use crate::combine_partial_final_agg::CombinePartialFinalAggregate; use crate::enforce_distribution::EnforceDistribution; use crate::enforce_sorting::EnforceSorting; +use crate::filter_pushdown::FilterPushdown; use crate::join_selection::JoinSelection; use crate::limit_pushdown::LimitPushdown; use crate::limited_distinct_aggregation::LimitedDistinctAggregation; use crate::output_requirements::OutputRequirements; use crate::projection_pushdown::ProjectionPushdown; -use crate::push_down_filter::PushdownFilter; use crate::sanity_checker::SanityCheckPlan; use crate::topk_aggregation::TopKAggregation; use crate::update_aggr_exprs::OptimizeAggregateOrder; @@ -125,7 +125,7 @@ impl PhysicalOptimizer { // The FilterPushdown rule tries to push down filters as far as it can. // For example, it will push down filtering from a `FilterExec` to // a `DataSourceExec`, or from a `TopK`'s current state to a `DataSourceExec`. - Arc::new(PushdownFilter::new()), + Arc::new(FilterPushdown::new()), // The LimitPushdown rule tries to push limits down as far as possible, // replacing operators with fetching variants, or adding limits // past operators that support limit pushdown. 
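For reference, the renamed rule stays registered in the default pass list above and can also be driven on its own. A minimal sketch under the crate paths used in this patch; the `apply_filter_pushdown` helper is illustrative, not part of the patch, and a default `ConfigOptions` stands in for the session config:

```rust
use std::sync::Arc;

use datafusion_common::config::ConfigOptions;
use datafusion_common::Result;
use datafusion_physical_optimizer::filter_pushdown::FilterPushdown;
use datafusion_physical_optimizer::PhysicalOptimizerRule;
use datafusion_physical_plan::ExecutionPlan;

/// Apply the renamed rule to an already-built physical plan.
/// `optimize` either returns a rewritten plan or hands back the original.
fn apply_filter_pushdown(
    plan: Arc<dyn ExecutionPlan>,
) -> Result<Arc<dyn ExecutionPlan>> {
    FilterPushdown::new().optimize(plan, &ConfigOptions::default())
}
```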
diff --git a/datafusion/physical-plan/src/coalesce_batches.rs b/datafusion/physical-plan/src/coalesce_batches.rs index 34b3f1b0241b..a0dd7371b4a0 100644 --- a/datafusion/physical-plan/src/coalesce_batches.rs +++ b/datafusion/physical-plan/src/coalesce_batches.rs @@ -32,11 +32,12 @@ use arrow::datatypes::SchemaRef; use arrow::record_batch::RecordBatch; use datafusion_common::Result; use datafusion_execution::TaskContext; +use datafusion_physical_expr::PhysicalExpr; use crate::coalesce::{BatchCoalescer, CoalescerState}; use crate::execution_plan::CardinalityEffect; use crate::filter_pushdown::{ - filter_pushdown_transparent, FilterDescription, FilterPushdownResult, + ChildPushdownResult, FilterDescription, FilterPushdownPropagation, }; use datafusion_common::config::ConfigOptions; use futures::ready; @@ -226,14 +227,22 @@ impl ExecutionPlan for CoalesceBatchesExec { CardinalityEffect::Equal } - fn try_pushdown_filters( + fn gather_filters_for_pushdown( &self, - fd: FilterDescription, + parent_filters: Vec>, _config: &ConfigOptions, - ) -> Result>> { - Ok(filter_pushdown_transparent::>( - Arc::new(self.clone()), - fd, + ) -> Result { + Ok(FilterDescription::new_with_child_count(1) + .all_parent_filters_supported(parent_filters)) + } + + fn handle_child_pushdown_result( + &self, + child_pushdown_result: ChildPushdownResult, + _config: &ConfigOptions, + ) -> Result>> { + Ok(FilterPushdownPropagation::transparent( + child_pushdown_result, )) } } diff --git a/datafusion/physical-plan/src/execution_plan.rs b/datafusion/physical-plan/src/execution_plan.rs index 9551c2b1743e..b81b3c8beeac 100644 --- a/datafusion/physical-plan/src/execution_plan.rs +++ b/datafusion/physical-plan/src/execution_plan.rs @@ -17,7 +17,7 @@ pub use crate::display::{DefaultDisplay, DisplayAs, DisplayFormatType, VerboseDisplay}; use crate::filter_pushdown::{ - filter_pushdown_not_supported, FilterDescription, FilterPushdownResult, + ChildPushdownResult, FilterDescription, FilterPushdownPropagation, }; pub use crate::metrics::Metric; pub use crate::ordering::InputOrderMode; @@ -491,39 +491,60 @@ pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync { Ok(None) } - /// Attempts to recursively push given filters from the top of the tree into leafs. - /// - /// This is used for various optimizations, such as: - /// - /// * Pushing down filters into scans in general to minimize the amount of data that needs to be materialzied. - /// * Pushing down dynamic filters from operators like TopK and Joins into scans. - /// - /// Generally the further down (closer to leaf nodes) that filters can be pushed, the better. - /// - /// Consider the case of a query such as `SELECT * FROM t WHERE a = 1 AND b = 2`. - /// With no filter pushdown the scan needs to read and materialize all the data from `t` and then filter based on `a` and `b`. - /// With filter pushdown into the scan it can first read only `a`, then `b` and keep track of - /// which rows match the filter. - /// Then only for rows that match the filter does it have to materialize the rest of the columns. - /// - /// # Default Implementation - /// - /// The default implementation assumes: - /// * Parent filters can't be passed onto children. - /// * This node has no filters to contribute. - /// - /// # Implementation Notes - /// - /// Most of the actual logic is implemented as a Physical Optimizer rule. - /// See [`PushdownFilter`] for more details. 
- ///
- /// [`PushdownFilter`]: https://docs.rs/datafusion/latest/datafusion/physical_optimizer/filter_pushdown/struct.PushdownFilter.html
- fn try_pushdown_filters(
+ /// Collect filters that this node can push down to its children.
+ /// Filters that are being pushed down from parents are passed in,
+ /// and the node may generate additional filters to push down.
+ /// For example, given the plan FilterExec -> HashJoinExec -> DataSourceExec,
+ /// what will happen is that we recurse down the plan calling `ExecutionPlan::gather_filters_for_pushdown`:
+ /// 1. `FilterExec::gather_filters_for_pushdown` is called with no parent
+ /// filters so it only returns that `FilterExec` wants to push down its own predicate.
+ /// 2. `HashJoinExec::gather_filters_for_pushdown` is called with the filter from
+ /// `FilterExec`, which it only allows to push down to one side of the join (unless it's on the join key)
+ /// but it also adds its own filters (e.g. pushing down a bloom filter of the hash table to the scan side of the join).
+ /// 3. `DataSourceExec::gather_filters_for_pushdown` is called with both filters from `HashJoinExec`
+ /// and `FilterExec`, however `DataSourceExec::gather_filters_for_pushdown` doesn't actually do anything
+ /// since it has no children and no additional filters to push down.
+ /// It's only once [`ExecutionPlan::handle_child_pushdown_result`] is called on `DataSourceExec` as we recurse
+ /// up the plan that `DataSourceExec` can actually bind the filters.
+ ///
+ /// The default implementation bars all parent filters from being pushed down and adds no new filters.
+ /// This is the safest option, making filter pushdown opt-in on a per-node basis.
+ fn gather_filters_for_pushdown(
+ &self,
+ parent_filters: Vec<Arc<dyn PhysicalExpr>>,
+ _config: &ConfigOptions,
+ ) -> Result<FilterDescription> {
+ Ok(
+ FilterDescription::new_with_child_count(self.children().len())
+ .all_parent_filters_unsupported(parent_filters),
+ )
+ }
+
+ /// Handle the result of a child pushdown.
+ /// This is called as we recurse back up the plan tree after recursing down and calling [`ExecutionPlan::gather_filters_for_pushdown`].
+ /// Once we know the result of pushing down filters into our children, we ask the current node what it wants to do with that result.
+ /// For a `DataSourceExec` that may mean absorbing the filters to apply them during the scan phase
+ /// (also known as late materialization).
+ /// A `FilterExec` may absorb any filters its children could not absorb, or if there are no filters left it
+ /// may remove itself from the plan altogether.
+ /// It combines both [`ChildPushdownResult::parent_filters`] and [`ChildPushdownResult::self_filters`] into a single
+ /// predicate and replaces its own predicate.
+ /// Then it passes [`PredicateSupport::Supported`] for each parent predicate to the parent.
+ /// A `HashJoinExec` may ignore the pushdown result since it needs to apply the filters as part of the join anyhow.
+ /// It passes [`ChildPushdownResult::parent_filters`] back up to its parents wrapped in [`FilterPushdownPropagation::transparent`]
+ /// and [`ChildPushdownResult::self_filters`] is discarded.
+ ///
+ /// The default implementation is a no-op that passes the result of pushdown from the children to its parent.
+ /// + /// [`PredicateSupport::Supported`]: crate::filter_pushdown::PredicateSupport::Supported + fn handle_child_pushdown_result( &self, - fd: FilterDescription, + child_pushdown_result: ChildPushdownResult, _config: &ConfigOptions, - ) -> Result>> { - Ok(filter_pushdown_not_supported(fd)) + ) -> Result>> { + Ok(FilterPushdownPropagation::transparent( + child_pushdown_result, + )) } } diff --git a/datafusion/physical-plan/src/filter.rs b/datafusion/physical-plan/src/filter.rs index 6df3e236a0dd..9f5d9dc2984e 100644 --- a/datafusion/physical-plan/src/filter.rs +++ b/datafusion/physical-plan/src/filter.rs @@ -16,6 +16,7 @@ // under the License. use std::any::Any; +use std::collections::HashMap; use std::pin::Pin; use std::sync::Arc; use std::task::{ready, Context, Poll}; @@ -27,7 +28,7 @@ use super::{ use crate::common::can_project; use crate::execution_plan::CardinalityEffect; use crate::filter_pushdown::{ - FilterDescription, FilterPushdownResult, FilterPushdownSupport, + ChildPushdownResult, FilterDescription, FilterPushdownPropagation, PredicateSupport, }; use crate::projection::{ make_with_child, try_embed_projection, update_expr, EmbeddedProjection, @@ -44,24 +45,29 @@ use arrow::record_batch::RecordBatch; use datafusion_common::cast::as_boolean_array; use datafusion_common::config::ConfigOptions; use datafusion_common::stats::Precision; +use datafusion_common::tree_node::{ + Transformed, TransformedResult, TreeNode, TreeNodeRecursion, +}; use datafusion_common::{ internal_err, plan_err, project_schema, DataFusionError, Result, ScalarValue, }; use datafusion_execution::TaskContext; use datafusion_expr::Operator; use datafusion_physical_expr::equivalence::ProjectionMapping; -use datafusion_physical_expr::expressions::{BinaryExpr, Column}; +use datafusion_physical_expr::expressions::{lit, BinaryExpr, Column}; use datafusion_physical_expr::intervals::utils::check_support; use datafusion_physical_expr::utils::collect_columns; use datafusion_physical_expr::{ - analyze, split_conjunction, AcrossPartitions, AnalysisContext, ConstExpr, - ExprBoundaries, PhysicalExpr, + analyze, conjunction, split_conjunction, AcrossPartitions, AnalysisContext, + ConstExpr, ExprBoundaries, PhysicalExpr, }; use datafusion_physical_expr_common::physical_expr::fmt_sql; use futures::stream::{Stream, StreamExt}; use log::trace; +const FILTER_EXEC_DEFAULT_SELECTIVITY: u8 = 20; + /// FilterExec evaluates a boolean predicate against all input batches to determine which rows to /// include in its output batches. #[derive(Debug, Clone)] @@ -88,7 +94,7 @@ impl FilterExec { ) -> Result { match predicate.data_type(input.schema().as_ref())? 
{ DataType::Boolean => { - let default_selectivity = 20; + let default_selectivity = FILTER_EXEC_DEFAULT_SELECTIVITY; let cache = Self::compute_properties( &input, &predicate, @@ -448,54 +454,126 @@ impl ExecutionPlan for FilterExec { try_embed_projection(projection, self) } - fn try_pushdown_filters( + fn gather_filters_for_pushdown( &self, - mut fd: FilterDescription, + parent_filters: Vec>, _config: &ConfigOptions, - ) -> Result>> { - // Extend the filter descriptions - fd.filters.push(Arc::clone(&self.predicate)); - - // Extract the information - let child_descriptions = vec![fd]; - let remaining_description = FilterDescription { filters: vec![] }; - let filter_input = Arc::clone(self.input()); + ) -> Result { + let self_filter = Arc::clone(&self.predicate); - if let Some(projection_indices) = self.projection.as_ref() { - // Push the filters down, but leave a ProjectionExec behind, instead of the FilterExec - let filter_child_schema = filter_input.schema(); - let proj_exprs = projection_indices + let parent_filters = if let Some(projection_indices) = self.projection.as_ref() { + // We need to invert the projection on any referenced columns in the filter + // Create a mapping from the output columns to the input columns (the inverse of the projection) + let inverse_projection = projection_indices .iter() - .map(|p| { - let field = filter_child_schema.field(*p).clone(); - ( - Arc::new(Column::new(field.name(), *p)) as Arc, - field.name().to_string(), - ) + .enumerate() + .map(|(i, &p)| (p, i)) + .collect::>(); + parent_filters + .into_iter() + .map(|f| { + f.transform_up(|expr| { + let mut res = + if let Some(col) = expr.as_any().downcast_ref::() { + let index = col.index(); + let index_in_input_schema = + inverse_projection.get(&index).ok_or_else(|| { + DataFusionError::Internal(format!( + "Column {} not found in projection", + index + )) + })?; + Transformed::yes(Arc::new(Column::new( + col.name(), + *index_in_input_schema, + )) as _) + } else { + Transformed::no(expr) + }; + // Columns can only exist in the leaves, no need to try all nodes + res.tnr = TreeNodeRecursion::Jump; + Ok(res) + }) + .data() }) - .collect::>(); - let projection_exec = - Arc::new(ProjectionExec::try_new(proj_exprs, filter_input)?) as _; - - Ok(FilterPushdownResult { - support: FilterPushdownSupport::Supported { - child_descriptions, - op: projection_exec, - revisit: false, - }, - remaining_description, - }) + .collect::>>()? 
} else { - // Pull out the FilterExec, and inform the rule as it should be re-run - Ok(FilterPushdownResult { - support: FilterPushdownSupport::Supported { - child_descriptions, - op: filter_input, - revisit: true, - }, - remaining_description, - }) + parent_filters + }; + + Ok(FilterDescription::new_with_child_count(1) + .all_parent_filters_supported(parent_filters) + .with_self_filter(self_filter)) + } + + fn handle_child_pushdown_result( + &self, + mut child_pushdown_result: ChildPushdownResult, + _config: &ConfigOptions, + ) -> Result>> { + assert_eq!( + child_pushdown_result.self_filters.len(), + 1, + "FilterExec should only have one child" + ); + assert_eq!( + child_pushdown_result.self_filters[0].len(), + 1, + "FilterExec produces only one filter" + ); + + // We absorb any parent filters that were not handled by our children + let mut unhandled_filters = + child_pushdown_result.parent_filters.collect_unsupported(); + + let self_filters = child_pushdown_result + .self_filters + .swap_remove(0) + .into_inner() + .swap_remove(0); + if let PredicateSupport::Unsupported(expr) = self_filters { + unhandled_filters.push(expr); } + + // If we have unhandled filters, we need to create a new FilterExec + let filter_input = Arc::clone(self.input()); + let new_predicate = conjunction(unhandled_filters); + let new_exec = if new_predicate.eq(&lit(true)) { + // FilterExec is no longer needed, but we may need to leave a projection in place + match self.projection() { + Some(projection_indices) => { + let filter_child_schema = filter_input.schema(); + let proj_exprs = projection_indices + .iter() + .map(|p| { + let field = filter_child_schema.field(*p).clone(); + ( + Arc::new(Column::new(field.name(), *p)) + as Arc, + field.name().to_string(), + ) + }) + .collect::>(); + Arc::new(ProjectionExec::try_new(proj_exprs, filter_input)?) + as Arc + } + None => { + // No projection needed, just return the input + filter_input + } + } + } else { + // Create a new FilterExec with the new predicate + Arc::new( + FilterExec::try_new(new_predicate, filter_input)? + .with_default_selectivity(self.default_selectivity())? + .with_projection(self.projection().cloned())?, + ) + }; + Ok(FilterPushdownPropagation { + filters: child_pushdown_result.parent_filters.make_supported(), + updated_node: Some(new_exec), + }) } } diff --git a/datafusion/physical-plan/src/filter_pushdown.rs b/datafusion/physical-plan/src/filter_pushdown.rs index 38f5aef5923e..0003fc9d7277 100644 --- a/datafusion/physical-plan/src/filter_pushdown.rs +++ b/datafusion/physical-plan/src/filter_pushdown.rs @@ -16,80 +16,285 @@ // under the License. use std::sync::Arc; +use std::vec::IntoIter; -use crate::ExecutionPlan; use datafusion_physical_expr_common::physical_expr::PhysicalExpr; -#[derive(Clone, Debug)] -pub struct FilterDescription { - /// Expressions coming from the parent nodes - pub filters: Vec>, +/// The result of a plan for pushing down a filter into a child node. +/// This contains references to filters so that nodes can mutate a filter +/// before pushing it down to a child node (e.g. to adjust a projection) +/// or can directly take ownership of `Unsupported` filters that their children +/// could not handle. +#[derive(Debug, Clone)] +pub enum PredicateSupport { + Supported(Arc), + Unsupported(Arc), } -impl Default for FilterDescription { - fn default() -> Self { - Self::empty() +/// A thin wrapper around [`PredicateSupport`]s that allows for easy collection of +/// supported and unsupported filters. 
Inner vector stores each predicate for one node.
+#[derive(Debug, Clone)]
+pub struct PredicateSupports(Vec<PredicateSupport>);
+
+impl PredicateSupports {
+ /// Create a new [`PredicateSupports`] with the given filters and their pushdown status.
+ pub fn new(pushdowns: Vec<PredicateSupport>) -> Self {
+ Self(pushdowns)
}
-}

-impl FilterDescription {
- /// Takes the filters out of the struct, leaving an empty vector in its place.
- pub fn take_description(&mut self) -> Vec<Arc<dyn PhysicalExpr>> {
- std::mem::take(&mut self.filters)
+ /// Create a new [`PredicateSupports`] with all filters as supported.
+ pub fn all_supported(filters: Vec<Arc<dyn PhysicalExpr>>) -> Self {
+ let pushdowns = filters
+ .into_iter()
+ .map(PredicateSupport::Supported)
+ .collect();
+ Self::new(pushdowns)
+ }
+
+ /// Create a new [`PredicateSupports`] with all filters as unsupported.
+ pub fn all_unsupported(filters: Vec<Arc<dyn PhysicalExpr>>) -> Self {
+ let pushdowns = filters
+ .into_iter()
+ .map(PredicateSupport::Unsupported)
+ .collect();
+ Self::new(pushdowns)
+ }
+
+ /// Transform all filters to supported, returning a new [`PredicateSupports`].
+ /// This consumes the original [`PredicateSupports`].
+ pub fn make_supported(self) -> Self {
+ let pushdowns = self
+ .0
+ .into_iter()
+ .map(|f| match f {
+ PredicateSupport::Supported(expr) => PredicateSupport::Supported(expr),
+ PredicateSupport::Unsupported(expr) => PredicateSupport::Supported(expr),
+ })
+ .collect();
+ Self::new(pushdowns)
+ }
+
+ /// Collect unsupported filters into a Vec, without removing them from the original
+ /// [`PredicateSupports`].
+ pub fn collect_unsupported(&self) -> Vec<Arc<dyn PhysicalExpr>> {
+ self.0
+ .iter()
+ .filter_map(|f| match f {
+ PredicateSupport::Unsupported(expr) => Some(Arc::clone(expr)),
+ PredicateSupport::Supported(_) => None,
+ })
+ .collect()
+ }
+
+ /// Collect all filters into a Vec, consuming the [`PredicateSupports`].
+ pub fn collect_all(self) -> Vec<Arc<dyn PhysicalExpr>> {
+ self.0
+ .into_iter()
+ .map(|f| match f {
+ PredicateSupport::Supported(expr)
+ | PredicateSupport::Unsupported(expr) => expr,
+ })
+ .collect()
+ }
+
+ pub fn into_inner(self) -> Vec<PredicateSupport> {
+ self.0
}

- pub fn empty() -> FilterDescription {
- Self { filters: vec![] }
+ /// Return an iterator over the inner `Vec<PredicateSupport>`.
+ pub fn iter(&self) -> impl Iterator<Item = &PredicateSupport> {
+ self.0.iter()
+ }
+
+ /// Return the number of filters in the inner `Vec`.
+ pub fn len(&self) -> usize {
+ self.0.len()
+ }
+
+ /// Check if the inner `Vec` is empty.
+ pub fn is_empty(&self) -> bool {
+ self.0.is_empty()
+ }
+}
+
+impl IntoIterator for PredicateSupports {
+ type Item = PredicateSupport;
+ type IntoIter = IntoIter<PredicateSupport>;
+
+ fn into_iter(self) -> Self::IntoIter {
+ self.0.into_iter()
+ }
}

-#[derive(Debug)]
-pub enum FilterPushdownSupport<T> {
- Supported {
- // Filter predicates which can be pushed down through the operator.
- // NOTE that these are not placed into any operator.
- child_descriptions: Vec<FilterDescription>,
- // Possibly updated new operator
- op: T,
- // Whether the node is removed from the plan and the rule should be re-run manually
- // on the new node.
- // TODO: If TreeNodeRecursion supports Revisit mechanism, this flag can be removed
- revisit: bool,
- },
- NotSupported,
+/// The result of pushing down filters into a child node.
+/// This is the result provided to nodes in [`ExecutionPlan::handle_child_pushdown_result`].
+/// Nodes process this result and convert it into a [`FilterPushdownPropagation`]
+/// that is returned to their parent.
+///
+/// [`ExecutionPlan::handle_child_pushdown_result`]: crate::ExecutionPlan::handle_child_pushdown_result
+#[derive(Debug, Clone)]
+pub struct ChildPushdownResult {
+ /// The combined result of pushing down each parent filter into each child.
+ /// For example, given the filters `[a, b]` and children `[1, 2, 3]` the matrix of responses:
+ ///
+ // | filter | child 1 | child 2 | child 3 | result |
+ // |--------|-------------|-----------|-----------|-------------|
+ // | a | Supported | Supported | Supported | Supported |
+ // | b | Unsupported | Supported | Supported | Unsupported |
+ ///
+ /// That is: if any child marks a filter as unsupported or if the filter was not pushed
+ /// down into any child then the result is unsupported.
+ /// If at least one child receives the filter and all children that received it mark it as
+ /// supported then the result is supported.
+ pub parent_filters: PredicateSupports,
+ /// The result of pushing down each filter this node provided into each of its children.
+ /// This is not combined with the parent filters so that nodes can treat each child independently.
+ pub self_filters: Vec<PredicateSupports>,
}

-#[derive(Debug)]
-pub struct FilterPushdownResult<T> {
- pub support: FilterPushdownSupport<T>,
- // Filters which cannot be pushed down through the operator.
- // NOTE that caller of try_pushdown_filters() should handle these remanining predicates,
- // possibly introducing a FilterExec on top of this operator.
- pub remaining_description: FilterDescription,
+/// The result of pushing down filters into a node that it returns to its parent.
+/// This is what nodes return from [`ExecutionPlan::handle_child_pushdown_result`] to communicate
+/// to the optimizer:
+///
+/// 1. What to do with any parent filters that were not completely handled by the children.
+/// 2. Whether the node needs to be replaced in the execution plan with a new node.
+///
+/// [`ExecutionPlan::handle_child_pushdown_result`]: crate::ExecutionPlan::handle_child_pushdown_result
+#[derive(Debug, Clone)]
+pub struct FilterPushdownPropagation<T> {
+ pub filters: PredicateSupports,
+ pub updated_node: Option<T>,
}

-pub fn filter_pushdown_not_supported<T>(
- remaining_description: FilterDescription,
-) -> FilterPushdownResult<T> {
- FilterPushdownResult {
- support: FilterPushdownSupport::NotSupported,
- remaining_description,
+impl<T> FilterPushdownPropagation<T> {
+ /// Create a new [`FilterPushdownPropagation`] that simply echoes back up to the parent
+ /// the result of pushing down the filters into the children.
+ pub fn transparent(child_pushdown_result: ChildPushdownResult) -> Self {
+ Self {
+ filters: child_pushdown_result.parent_filters,
+ updated_node: None,
+ }
}
+
+ /// Create a new [`FilterPushdownPropagation`] that tells the parent node
+ /// that none of the parent filters were pushed down.
+ pub fn unsupported(parent_filters: Vec<Arc<dyn PhysicalExpr>>) -> Self {
+ let unsupported = PredicateSupports::all_unsupported(parent_filters);
+ Self {
+ filters: unsupported,
+ updated_node: None,
+ }
+ }
+}
+
+#[derive(Debug, Clone)]
+struct ChildFilterDescription {
+ /// Description of which parent filters can be pushed down into this node.
+ /// Since we need to transmit filter pushdown results back to this node's parent
+ /// we need to track each parent filter for each child, even those that are unsupported / won't be pushed down.
+ /// We do this using a [`PredicateSupports`] which simplifies manipulating supported/unsupported filters.
+ parent_filters: PredicateSupports,
+ /// Description of which filters this node is pushing down to its children.
+ /// Since this is not transmitted back to the parents we can have variable sized inner arrays
+ /// instead of having to track supported/unsupported.
+ self_filters: Vec<Arc<dyn PhysicalExpr>>,
}

-pub fn filter_pushdown_transparent(
- plan: Arc,
- fd: FilterDescription,
-) -> FilterPushdownResult> {
- let child_descriptions = vec![fd];
- let remaining_description = FilterDescription::empty();
-
- FilterPushdownResult {
- support: FilterPushdownSupport::Supported {
- child_descriptions,
- op: plan,
- revisit: false,
- },
- remaining_description,
+impl ChildFilterDescription {
+ fn new() -> Self {
+ Self {
+ parent_filters: PredicateSupports::new(vec![]),
+ self_filters: vec![],
+ }
+ }
+}
+
+#[derive(Debug, Clone)]
+pub struct FilterDescription {
+ /// A filter description for each child.
+ /// This includes which parent filters and which self filters (from the node in question)
+ /// will get pushed down to each child.
+ child_filter_descriptions: Vec<ChildFilterDescription>,
+}
+
+impl FilterDescription {
+ pub fn new_with_child_count(num_children: usize) -> Self {
+ Self {
+ child_filter_descriptions: vec![ChildFilterDescription::new(); num_children],
+ }
+ }
+
+ pub fn parent_filters(&self) -> Vec<PredicateSupports> {
+ self.child_filter_descriptions
+ .iter()
+ .map(|d| &d.parent_filters)
+ .cloned()
+ .collect()
+ }
+
+ pub fn self_filters(&self) -> Vec<Vec<Arc<dyn PhysicalExpr>>> {
+ self.child_filter_descriptions
+ .iter()
+ .map(|d| &d.self_filters)
+ .cloned()
+ .collect()
+ }
+
+ /// Mark all parent filters as supported for all children.
+ /// This is the case if the node allows filters to be pushed down through it
+ /// without any modification.
+ /// This broadcasts the parent filters to all children.
+ /// If handling of parent filters is different for each child then you should set the
+ /// field directly.
+ /// For example, nodes like [`RepartitionExec`] that let filters pass through them transparently
+ /// use this to mark all parent filters as supported.
+ ///
+ /// [`RepartitionExec`]: crate::repartition::RepartitionExec
+ pub fn all_parent_filters_supported(
+ mut self,
+ parent_filters: Vec<Arc<dyn PhysicalExpr>>,
+ ) -> Self {
+ let supported = PredicateSupports::all_supported(parent_filters);
+ for child in &mut self.child_filter_descriptions {
+ child.parent_filters = supported.clone();
+ }
+ self
+ }
+
+ /// Mark all parent filters as unsupported for all children.
+ /// This is the case if the node does not allow filters to be pushed down through it.
+ /// This broadcasts the parent filters to all children.
+ /// If handling of parent filters is different for each child then you should set the
+ /// field directly.
+ /// For example, the default implementation of filter pushdown in [`ExecutionPlan`]
+ /// assumes that filters cannot be pushed down to children.
+ ///
+ /// [`ExecutionPlan`]: crate::ExecutionPlan
+ pub fn all_parent_filters_unsupported(
+ mut self,
+ parent_filters: Vec<Arc<dyn PhysicalExpr>>,
+ ) -> Self {
+ let unsupported = PredicateSupports::all_unsupported(parent_filters);
+ for child in &mut self.child_filter_descriptions {
+ child.parent_filters = unsupported.clone();
+ }
+ self
+ }
+
+ /// Add a filter generated / owned by the current node to be pushed down to all children.
+ /// This assumes that there is a single filter that gets pushed down to all children
+ /// equally.
+ /// If there are multiple filters or pushdown to children is not homogeneous then
+ /// you should set the field directly.
+ /// For example: + /// - `TopK` uses this to push down a single filter to all children, it can use this method. + /// - `HashJoinExec` pushes down a filter only to the probe side, it cannot use this method. + pub fn with_self_filter(mut self, predicate: Arc) -> Self { + for child in &mut self.child_filter_descriptions { + child.self_filters = vec![Arc::clone(&predicate)]; + } + self } } diff --git a/datafusion/physical-plan/src/repartition/mod.rs b/datafusion/physical-plan/src/repartition/mod.rs index f7c4f7477f12..c86a37697a05 100644 --- a/datafusion/physical-plan/src/repartition/mod.rs +++ b/datafusion/physical-plan/src/repartition/mod.rs @@ -54,7 +54,7 @@ use datafusion_physical_expr::{EquivalenceProperties, PhysicalExpr}; use datafusion_physical_expr_common::sort_expr::LexOrdering; use crate::filter_pushdown::{ - filter_pushdown_transparent, FilterDescription, FilterPushdownResult, + ChildPushdownResult, FilterDescription, FilterPushdownPropagation, }; use futures::stream::Stream; use futures::{FutureExt, StreamExt, TryStreamExt}; @@ -743,14 +743,22 @@ impl ExecutionPlan for RepartitionExec { )?))) } - fn try_pushdown_filters( + fn gather_filters_for_pushdown( &self, - fd: FilterDescription, + parent_filters: Vec>, _config: &ConfigOptions, - ) -> Result>> { - Ok(filter_pushdown_transparent::>( - Arc::new(self.clone()), - fd, + ) -> Result { + Ok(FilterDescription::new_with_child_count(1) + .all_parent_filters_supported(parent_filters)) + } + + fn handle_child_pushdown_result( + &self, + child_pushdown_result: ChildPushdownResult, + _config: &ConfigOptions, + ) -> Result>> { + Ok(FilterPushdownPropagation::transparent( + child_pushdown_result, )) } } diff --git a/datafusion/sqllogictest/test_files/parquet_filter_pushdown.slt b/datafusion/sqllogictest/test_files/parquet_filter_pushdown.slt index 99fbb713646d..252704f260b8 100644 --- a/datafusion/sqllogictest/test_files/parquet_filter_pushdown.slt +++ b/datafusion/sqllogictest/test_files/parquet_filter_pushdown.slt @@ -186,14 +186,14 @@ COPY ( ) TO 'test_files/scratch/parquet_filter_pushdown/parquet_part_test/part=c/file.parquet' STORED AS PARQUET; -query TT rowsort -select * from t_pushdown where part == val +query TT +select * from t_pushdown where part == val order by part, val; ---- a a b b query TT -select * from t_pushdown where part != val +select * from t_pushdown where part != val order by part, val; ---- xyz c
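To close, a small sketch of how the `PredicateSupports` wrapper introduced above behaves, assuming the `datafusion_physical_plan::filter_pushdown` module path from this patch; the `predicate_supports_demo` helper and the `lit` placeholders are illustrative only, standing in for real column predicates:

```rust
use datafusion_physical_expr::expressions::lit;
use datafusion_physical_plan::filter_pushdown::{PredicateSupport, PredicateSupports};

fn predicate_supports_demo() {
    // Two placeholder predicates; a real plan would reference columns instead.
    let filters = vec![lit(true), lit(false)];

    // A node that cannot absorb any filters reports every one as unsupported...
    let unsupported = PredicateSupports::all_unsupported(filters);
    assert_eq!(unsupported.collect_unsupported().len(), 2);

    // ...while `make_supported` rewrites every entry to `Supported`, which is
    // how `FilterExec` reports absorbed leftover filters back to its parent.
    let supported = unsupported.make_supported();
    assert!(supported
        .iter()
        .all(|p| matches!(p, PredicateSupport::Supported(_))));
}
```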