Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 35 additions & 3 deletions datafusion/core/src/physical_optimizer/projection_pushdown.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
//! projections one by one if the operator below is amenable to this. If a
//! projection reaches a source, it can even disappear from the plan entirely.

use std::collections::HashMap;
use std::sync::Arc;

use super::output_requirements::OutputRequirementExec;
Expand All @@ -42,9 +43,9 @@ use crate::physical_plan::{Distribution, ExecutionPlan};

use arrow_schema::SchemaRef;
use datafusion_common::config::ConfigOptions;
use datafusion_common::tree_node::{Transformed, TreeNode};
use datafusion_common::tree_node::{Transformed, TreeNode, VisitRecursion};
use datafusion_common::JoinSide;
use datafusion_physical_expr::expressions::Column;
use datafusion_physical_expr::expressions::{Column, Literal};
use datafusion_physical_expr::{
Partitioning, PhysicalExpr, PhysicalSortExpr, PhysicalSortRequirement,
};
Expand Down Expand Up @@ -245,12 +246,36 @@ fn try_swapping_with_streaming_table(
}

/// Unifies `projection` with its input (which is also a [`ProjectionExec`]).
/// Two consecutive projections can always merge into a single projection.
fn try_unifying_projections(
projection: &ProjectionExec,
child: &ProjectionExec,
) -> Result<Option<Arc<dyn ExecutionPlan>>> {
let mut projected_exprs = vec![];
let mut column_ref_map: HashMap<Column, usize> = HashMap::new();

// Collect the column references usage in the outer projection.
projection.expr().iter().for_each(|(expr, _)| {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I may be mistaken and it may not matter, but I think this will prevent pushing down exprs that refer to the same column more than once

Projection exprs=[b as c, b as d]
  Projection exprs = [a as b]

Into

Projection exprs=[a as c, a as d]

Copy link
Contributor Author

@haohuaijin haohuaijin Dec 8, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don't prevent push-down of Column and Literal expressions. We only prevent push-down of other expressions, in order to avoid evaluating the same expression multiple times.

    if column_ref_map.iter().any(|(column, count)| {
        *count > 1 && !is_expr_trivial(&child.expr()[column.index()].0.clone())
    }) {
        return Ok(None);
    }

expr.apply(&mut |expr| {
Ok({
if let Some(column) = expr.as_any().downcast_ref::<Column>() {
*column_ref_map.entry(column.clone()).or_default() += 1;
}
VisitRecursion::Continue
})
})
.unwrap();
});

// Merging these projections is not beneficial if a non-trivial expression in
// the child projection is referenced more than once by the outer projection:
// keeping the projections separate acts as a caching mechanism, so the
// non-trivial computation is evaluated only once.
// See discussion in: https://github.com/apache/arrow-datafusion/issues/8296
if column_ref_map.iter().any(|(column, count)| {
*count > 1 && !is_expr_trivial(&child.expr()[column.index()].0.clone())
}) {
return Ok(None);
}

for (expr, alias) in projection.expr() {
// If there is no match in the input projection, we cannot unify these
// projections. This case will arise if the projection expression contains
Expand All @@ -265,6 +290,13 @@ fn try_unifying_projections(
.map(|e| Some(Arc::new(e) as _))
}

/// Reports whether `expr` is a trivial expression.
///
/// An expression counts as trivial when its concrete type is either
/// [`Column`] or [`Literal`]; any other expression type yields `false`.
fn is_expr_trivial(expr: &Arc<dyn PhysicalExpr>) -> bool {
    // Inspect the concrete type behind the trait object once.
    let concrete = expr.as_any();
    concrete.is::<Column>() || concrete.is::<Literal>()
}

/// Tries to swap `projection` with its input (`output_req`). If possible,
/// performs the swap and returns [`OutputRequirementExec`] as the top plan.
/// Otherwise, returns `None`.
Expand Down
39 changes: 39 additions & 0 deletions datafusion/sqllogictest/test_files/select.slt
Original file line number Diff line number Diff line change
Expand Up @@ -1065,3 +1065,42 @@ select z+1, y from (select x+1 as z, y from t) where y > 1;
----
3 2
3 3

# `x/2` appears twice in the select list. The expected physical plan keeps two
# ProjectionExec nodes: the inner one computes `x@0 / 2` and the outer one
# refers to that column twice instead of repeating the division.
query TT
EXPLAIN SELECT x/2, x/2+1 FROM t;
----
logical_plan
Projection: t.x / Int64(2)Int64(2)t.x AS t.x / Int64(2), t.x / Int64(2)Int64(2)t.x AS t.x / Int64(2) + Int64(1)
--Projection: t.x / Int64(2) AS t.x / Int64(2)Int64(2)t.x
----TableScan: t projection=[x]
physical_plan
ProjectionExec: expr=[t.x / Int64(2)Int64(2)t.x@0 as t.x / Int64(2), t.x / Int64(2)Int64(2)t.x@0 + 1 as t.x / Int64(2) + Int64(1)]
--ProjectionExec: expr=[x@0 / 2 as t.x / Int64(2)Int64(2)t.x]
----MemoryExec: partitions=1, partition_sizes=[1]

# Result check for the same query as the EXPLAIN above.
query II
SELECT x/2, x/2+1 FROM t;
----
0 1
0 1

# `abs(x)` appears twice in the select list. The expected physical plan keeps
# two ProjectionExec nodes: the inner one computes `abs(x@0)` and passes `y`
# through, while the outer one reuses that column and evaluates `abs(y@1)`.
query TT
EXPLAIN SELECT abs(x), abs(x) + abs(y) FROM t;
----
logical_plan
Projection: abs(t.x)t.x AS abs(t.x), abs(t.x)t.x AS abs(t.x) + abs(t.y)
--Projection: abs(t.x) AS abs(t.x)t.x, t.y
----TableScan: t projection=[x, y]
physical_plan
ProjectionExec: expr=[abs(t.x)t.x@0 as abs(t.x), abs(t.x)t.x@0 + abs(y@1) as abs(t.x) + abs(t.y)]
--ProjectionExec: expr=[abs(x@0) as abs(t.x)t.x, y@1 as y]
----MemoryExec: partitions=1, partition_sizes=[1]

# Result check for the same query as the EXPLAIN above.
query II
SELECT abs(x), abs(x) + abs(y) FROM t;
----
1 3
1 4

statement ok
DROP TABLE t;