Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
93 changes: 87 additions & 6 deletions datafusion/physical-expr/src/equivalence/properties.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,11 @@
// specific language governing permissions and limitations
// under the License.

use crate::expressions::CastExpr;
use arrow_schema::SchemaRef;
use datafusion_common::{JoinSide, JoinType};
use indexmap::IndexSet;
use itertools::Itertools;
use std::collections::{HashMap, HashSet};
use std::hash::{Hash, Hasher};
use std::sync::Arc;
Expand All @@ -31,12 +36,8 @@ use crate::{
PhysicalSortRequirement,
};

use arrow_schema::{SchemaRef, SortOptions};
use arrow_schema::SortOptions;
use datafusion_common::tree_node::{Transformed, TreeNode};
use datafusion_common::{JoinSide, JoinType};

use indexmap::IndexSet;
use itertools::Itertools;

/// A `EquivalenceProperties` object stores useful information related to a schema.
/// Currently, it keeps track of:
Expand Down Expand Up @@ -426,6 +427,87 @@ impl EquivalenceProperties {
(!meet.is_empty()).then_some(meet)
}

/// Rewrites one lexicographic ordering so that it refers to projected
/// expressions wherever doing so is known to preserve the order.
///
/// This is a deliberately simplified substitution. A sort expression is
/// replaced only when all of the following hold:
///
/// 1. exactly one entry of `matching_exprs` refers to the sort expression,
/// 2. that entry is a `CastExpr` applied directly to the sort expression, and
/// 3. `CastExpr::is_bigger_cast` accepts the cast from the sort expression's
///    data type.
///
/// In that case the cast expression takes the place of the sort expression
/// and the original sort options are kept. Every other sort expression is
/// copied through unchanged, so the result has the same length and order as
/// `sort_expr`.
///
/// If the data type of a sort expression cannot be resolved against `schema`,
/// that sort expression is left unchanged rather than panicking.
///
/// TODO: single-argument monotonic scalar functions, and more generally any
/// expression whose ordering can be derived (for example `atan(x + 1000)`
/// when `x` is ordered ASC or DESC), could be substituted as well.
pub fn substitute_ordering_component(
    matching_exprs: Arc<Vec<&Arc<dyn PhysicalExpr>>>,
    sort_expr: &[PhysicalSortExpr],
    schema: SchemaRef,
) -> Vec<PhysicalSortExpr> {
    sort_expr
        .iter()
        .map(|component| {
            let referring_exprs: Vec<_> = matching_exprs
                .iter()
                .filter(|matched| expr_refers(matched, &component.expr))
                .cloned()
                .collect();

            // Substitution is only attempted when a single projected
            // expression refers to this component; with zero or several
            // candidates the component is kept as it is.
            if referring_exprs.len() != 1 {
                return component.clone();
            }
            let r_expr = referring_exprs[0];

            // The candidate must be a cast applied directly to this
            // component, and the cast must be one that keeps the ordering.
            let is_order_preserving_cast = r_expr
                .as_any()
                .downcast_ref::<CastExpr>()
                .map_or(false, |cast_expr| {
                    cast_expr.expr.eq(&component.expr)
                        && component
                            .expr
                            .data_type(schema.as_ref())
                            .map_or(false, |expr_type| {
                                cast_expr.is_bigger_cast(expr_type)
                            })
                });

            if is_order_preserving_cast {
                PhysicalSortExpr {
                    expr: Arc::clone(r_expr),
                    options: component.options,
                }
            } else {
                component.clone()
            }
        })
        .collect()
}
/// Rewrites the stored orderings so that they are expressed in terms of the
/// projection's source expressions before the projection is applied.
///
/// Suppose the input is ordered by `A DESC, B DESC`. If the projection emits
/// `A` and `B` unchanged, the input ordering can be reused directly. If `A`
/// is transformed instead, for example into `CAST(A AS Int64)`, continuing to
/// use the original ordering is invalid and leads to an incorrect dependency
/// map. Substituting the affected sort expressions first avoids that; see
/// issue 8838: <https://github.com/apache/arrow-datafusion/issues/8838>.
///
/// Only entries of `exprs` that also appear as a source in `mapping` are
/// considered for substitution. Each existing ordering is passed through
/// `substitute_ordering_component`, and the ordering equivalence class is
/// rebuilt from the results.
pub fn substitute_oeq_class(
    &mut self,
    exprs: &[(Arc<dyn PhysicalExpr>, String)],
    mapping: &ProjectionMapping,
    schema: SchemaRef,
) {
    // Keep the projection expressions that the mapping knows as a source.
    let mut projected_sources = Vec::new();
    for (candidate, _) in exprs {
        if mapping.iter().any(|(source, _)| source.eq(candidate)) {
            projected_sources.push(candidate);
        }
    }
    let projected_sources = Arc::new(projected_sources);

    // Move the current orderings out so they can be rewritten one by one.
    let previous = std::mem::take(&mut self.oeq_class.orderings);
    let mut rewritten = Vec::with_capacity(previous.len());
    for ordering in previous {
        rewritten.push(Self::substitute_ordering_component(
            Arc::clone(&projected_sources),
            &ordering,
            Arc::clone(&schema),
        ));
    }

    self.oeq_class = OrderingEquivalenceClass::new(rewritten);
}
/// Projects argument `expr` according to `projection_mapping`, taking
/// equivalences into account.
///
Expand Down Expand Up @@ -564,7 +646,6 @@ impl EquivalenceProperties {

// Get dependency map for existing orderings:
let dependency_map = self.construct_dependency_map(&mapping);

let orderings = mapping.iter().flat_map(|(source, target)| {
referred_dependencies(&dependency_map, source)
.into_iter()
Expand Down
30 changes: 25 additions & 5 deletions datafusion/physical-expr/src/expressions/cast.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,14 @@
// specific language governing permissions and limitations
// under the License.

use crate::physical_expr::down_cast_any_ref;
use crate::sort_properties::SortProperties;
use crate::PhysicalExpr;
use std::any::Any;
use std::fmt;
use std::hash::{Hash, Hasher};
use std::sync::Arc;

use crate::physical_expr::down_cast_any_ref;
use crate::sort_properties::SortProperties;
use crate::PhysicalExpr;
use DataType::*;

use arrow::compute::{can_cast_types, kernels, CastOptions};
use arrow::datatypes::{DataType, Schema};
Expand All @@ -41,7 +41,7 @@ const DEFAULT_CAST_OPTIONS: CastOptions<'static> = CastOptions {
#[derive(Debug, Clone)]
pub struct CastExpr {
/// The expression to cast
expr: Arc<dyn PhysicalExpr>,
pub expr: Arc<dyn PhysicalExpr>,
/// The data type to cast to
cast_type: DataType,
/// Cast options
Expand Down Expand Up @@ -76,6 +76,26 @@ impl CastExpr {
pub fn cast_options(&self) -> &CastOptions<'static> {
&self.cast_options
}
/// Reports whether casting from `src` to this expression's target type is
/// one of the widening conversions listed below.
///
/// Returns `true` when `src` equals the target type, or when the pair
/// (`src`, target) is one of:
///
/// * a signed integer to a wider signed integer,
/// * an unsigned integer to a wider unsigned integer,
/// * an 8-, 16- or 32-bit integer (signed or unsigned) to `Float32` or
///   `Float64`,
/// * `Int64` or `UInt64` to `Float64`,
/// * `Utf8` to `LargeUtf8`.
///
/// Every other combination returns `false`.
pub fn is_bigger_cast(&self, src: DataType) -> bool {
    let target = &self.cast_type;
    if src == *target {
        return true;
    }
    match (src, target) {
        // Signed integers widening to larger signed integers.
        (Int8, Int16 | Int32 | Int64) => true,
        (Int16, Int32 | Int64) => true,
        (Int32, Int64) => true,
        // Unsigned integers widening to larger unsigned integers.
        (UInt8, UInt16 | UInt32 | UInt64) => true,
        (UInt16, UInt32 | UInt64) => true,
        (UInt32, UInt64) => true,
        // Integers converted to floating point.
        (Int8 | Int16 | Int32 | UInt8 | UInt16 | UInt32, Float32 | Float64) => true,
        (Int64 | UInt64, Float64) => true,
        // Strings widening to the large-offset string type.
        (Utf8, LargeUtf8) => true,
        _ => false,
    }
}
}

impl fmt::Display for CastExpr {
Expand Down
16 changes: 11 additions & 5 deletions datafusion/physical-plan/src/projection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,6 @@ impl ProjectionExec {
input: Arc<dyn ExecutionPlan>,
) -> Result<Self> {
let input_schema = input.schema();

let fields: Result<Vec<Field>> = expr
.iter()
.map(|(e, name)| {
Expand All @@ -95,7 +94,10 @@ impl ProjectionExec {
// construct a map from the input expressions to the output expression of the Projection
let projection_mapping = ProjectionMapping::try_new(&expr, &input_schema)?;

let input_eqs = input.equivalence_properties();
let mut input_eqs = input.equivalence_properties();

input_eqs.substitute_oeq_class(&expr, &projection_mapping, input_schema.clone());

let project_eqs = input_eqs.project(&projection_mapping, schema.clone());
let output_ordering = project_eqs.oeq_class().output_ordering();

Expand Down Expand Up @@ -201,9 +203,13 @@ impl ExecutionPlan for ProjectionExec {
}

/// Returns the equivalence properties of this projection's output, derived
/// from the input's properties and `self.projection_mapping`.
fn equivalence_properties(&self) -> EquivalenceProperties {
// NOTE(review): this is a diff hunk. The next three lines appear to be the
// body removed by the change (they project the input's properties directly);
// the statements after them are the replacement body.
self.input
.equivalence_properties()
.project(&self.projection_mapping, self.schema())
// Replacement body: start from the input's properties, held mutably because
// `substitute_oeq_class` takes `&mut self`.
let mut equi_properties = self.input.equivalence_properties();
// Substitute the input orderings using the projection expressions and the
// input schema before projecting.
equi_properties.substitute_oeq_class(
&self.expr,
&self.projection_mapping,
self.input.schema().clone(),
);
// Project the substituted properties onto this node's output schema.
equi_properties.project(&self.projection_mapping, self.schema())
}

fn with_new_children(
Expand Down
86 changes: 86 additions & 0 deletions datafusion/sqllogictest/test_files/monotonic_projection_test.slt
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at

# http://www.apache.org/licenses/LICENSE-2.0

# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

# prepare the table
statement ok
CREATE EXTERNAL TABLE delta_encoding_required_column (
c_customer_sk INT NOT NULL,
c_current_cdemo_sk INT NOT NULL
)
STORED AS CSV
WITH ORDER (
c_customer_sk DESC,
c_current_cdemo_sk DESC
)
LOCATION '../../testing/data/csv/aggregate_test_100.csv';
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The schema of the file and the schema of this table are not consistent. Additionally, the file doesn't satisfy the WITH ORDER invariant given during creation.


# test for the CAST substitution scenario
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we also please add a test for times when casting may not preserve ordering?

For example, if the input is INT

0
1
2
10

If that is cast to UTF8 the data is now

"0"
"1"
"2"
"10"

Which is no longer sorted correctly

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I can add that (maybe on the weekend)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

added now

query TT
EXPLAIN
SELECT
CAST(c_customer_sk AS BIGINT) AS c_customer_sk_big,
c_current_cdemo_sk
Comment on lines +34 to +36
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the select expressions were c_customer_sk, CAST(c_customer_sk AS BIGINT) AS c_customer_sk_big, c_current_cdemo_sk, the planner wouldn't remove the ORDER BY c_customer_sk_big DESC, c_current_cdemo_sk DESC sort from the plan, even though it should be possible to do so. This is because, in the current implementation, substitution cannot generate more than one valid ordering from a single ordering.

FROM delta_encoding_required_column
ORDER BY c_customer_sk_big DESC, c_current_cdemo_sk DESC;
----
logical_plan
Sort: c_customer_sk_big DESC NULLS FIRST, delta_encoding_required_column.c_current_cdemo_sk DESC NULLS FIRST
--Projection: CAST(delta_encoding_required_column.c_customer_sk AS Int64) AS c_customer_sk_big, delta_encoding_required_column.c_current_cdemo_sk
----TableScan: delta_encoding_required_column projection=[c_customer_sk, c_current_cdemo_sk]
physical_plan
SortPreservingMergeExec: [c_customer_sk_big@0 DESC,c_current_cdemo_sk@1 DESC]
--ProjectionExec: expr=[CAST(c_customer_sk@0 AS Int64) as c_customer_sk_big, c_current_cdemo_sk@1 as c_current_cdemo_sk]
----RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c_customer_sk, c_current_cdemo_sk], output_ordering=[c_customer_sk@0 DESC, c_current_cdemo_sk@1 DESC], has_header=false

# test for common rename
query TT
EXPLAIN
SELECT
c_customer_sk AS c_customer_sk_big,
c_current_cdemo_sk
FROM delta_encoding_required_column
ORDER BY c_customer_sk_big DESC, c_current_cdemo_sk DESC;
----
logical_plan
Sort: c_customer_sk_big DESC NULLS FIRST, delta_encoding_required_column.c_current_cdemo_sk DESC NULLS FIRST
--Projection: delta_encoding_required_column.c_customer_sk AS c_customer_sk_big, delta_encoding_required_column.c_current_cdemo_sk
----TableScan: delta_encoding_required_column projection=[c_customer_sk, c_current_cdemo_sk]
physical_plan
ProjectionExec: expr=[c_customer_sk@0 as c_customer_sk_big, c_current_cdemo_sk@1 as c_current_cdemo_sk]
--CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c_customer_sk, c_current_cdemo_sk], output_ordering=[c_customer_sk@0 DESC, c_current_cdemo_sk@1 DESC], has_header=false


# test for cast Utf8
query TT
EXPLAIN
SELECT
CAST(c_customer_sk AS STRING) AS c_customer_sk_big,
c_current_cdemo_sk
FROM delta_encoding_required_column
ORDER BY c_customer_sk_big DESC, c_current_cdemo_sk DESC;
----
logical_plan
Sort: c_customer_sk_big DESC NULLS FIRST, delta_encoding_required_column.c_current_cdemo_sk DESC NULLS FIRST
--Projection: CAST(delta_encoding_required_column.c_customer_sk AS Utf8) AS c_customer_sk_big, delta_encoding_required_column.c_current_cdemo_sk
----TableScan: delta_encoding_required_column projection=[c_customer_sk, c_current_cdemo_sk]
physical_plan
SortPreservingMergeExec: [c_customer_sk_big@0 DESC,c_current_cdemo_sk@1 DESC]
--SortExec: expr=[c_customer_sk_big@0 DESC,c_current_cdemo_sk@1 DESC]
----ProjectionExec: expr=[CAST(c_customer_sk@0 AS Utf8) as c_customer_sk_big, c_current_cdemo_sk@1 as c_current_cdemo_sk]
------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
--------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c_customer_sk, c_current_cdemo_sk], output_ordering=[c_customer_sk@0 DESC, c_current_cdemo_sk@1 DESC], has_header=false