Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions datafusion-examples/examples/expr_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -424,7 +424,7 @@ fn boundary_analysis_in_conjuctions_demo() -> Result<()> {
//
// But `AND` conjunctions are easier to reason with because their interval
// arithmetic follows naturally from set intersection operations, let us
// now look at an example that is a tad more complicated `OR` conjunctions.
// now look at an example that is a tad more complicated `OR` disjunctions.

// The expression we will look at is `age > 60 OR age <= 18`.
let age_greater_than_60_less_than_18 =
Expand All @@ -435,7 +435,7 @@ fn boundary_analysis_in_conjuctions_demo() -> Result<()> {
//
// Initial range: [14, 79] as described in our column statistics.
//
// From the left-hand side and right-hand side of our `OR` conjunctions
// From the left-hand side and right-hand side of our `OR` disjunctions
// we end up with two ranges, instead of just one.
//
// - age > 60: [61, 79]
Expand All @@ -446,7 +446,8 @@ fn boundary_analysis_in_conjuctions_demo() -> Result<()> {
let physical_expr = SessionContext::new()
.create_physical_expr(age_greater_than_60_less_than_18, &df_schema)?;

// Since we don't handle interval arithmetic for `OR` operator this will error out.
// However, analysis only supports a single interval, so we don't yet deal
// with the multiple possibilities of the `OR` disjunctions.
let analysis = analyze(
&physical_expr,
AnalysisContext::new(initial_boundaries),
Expand Down
3 changes: 2 additions & 1 deletion datafusion/expr-common/src/interval_arithmetic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -606,7 +606,7 @@ impl Interval {
upper: ScalarValue::Boolean(Some(upper)),
})
}
_ => internal_err!("Incompatible data types for logical conjunction"),
_ => internal_err!("Incompatible data types for logical disjunction"),
}
}

Expand Down Expand Up @@ -971,6 +971,7 @@ pub fn apply_operator(op: &Operator, lhs: &Interval, rhs: &Interval) -> Result<I
Operator::Lt => lhs.lt(rhs),
Operator::LtEq => lhs.lt_eq(rhs),
Operator::And => lhs.and(rhs),
Operator::Or => lhs.or(rhs),
Operator::Plus => lhs.add(rhs),
Operator::Minus => lhs.sub(rhs),
Operator::Multiply => lhs.mul(rhs),
Expand Down
4 changes: 2 additions & 2 deletions datafusion/physical-expr/src/analysis.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ impl ExprBoundaries {
) -> Result<Self> {
let field = schema.fields().get(col_index).ok_or_else(|| {
internal_datafusion_err!(
"Could not create `ExprBoundaries`: in `try_from_column` `col_index`
"Could not create `ExprBoundaries`: in `try_from_column` `col_index`
has gone out of bounds with a value of {col_index}, the schema has {} columns.",
schema.fields.len()
)
Expand Down Expand Up @@ -425,7 +425,7 @@ mod tests {
fn test_analyze_invalid_boundary_exprs() {
let schema = Arc::new(Schema::new(vec![make_field("a", DataType::Int32)]));
let expr = col("a").lt(lit(10)).or(col("a").gt(lit(20)));
let expected_error = "Interval arithmetic does not support the operator OR";
let expected_error = "OR operator cannot yet propagate true intervals";
let boundaries = ExprBoundaries::try_new_unbounded(&schema).unwrap();
let df_schema = DFSchema::try_from(Arc::clone(&schema)).unwrap();
let physical_expr =
Expand Down
96 changes: 95 additions & 1 deletion datafusion/physical-expr/src/expressions/binary.rs
Original file line number Diff line number Diff line change
Expand Up @@ -516,7 +516,7 @@ impl PhysicalExpr for BinaryExpr {
}
} else if self.op.eq(&Operator::Or) {
if interval.eq(&Interval::CERTAINLY_FALSE) {
// A certainly false logical conjunction can only derive from certainly
// A certainly false logical disjunction can only derive from certainly
// false operands. Otherwise, we prove infeasibility.
Ok((!left_interval.eq(&Interval::CERTAINLY_TRUE)
&& !right_interval.eq(&Interval::CERTAINLY_TRUE))
Expand Down Expand Up @@ -5305,4 +5305,98 @@ mod tests {
assert!(matches!(result, ColumnarValue::Scalar(_)));
}
}

#[test]
fn test_evaluate_bounds_int32() {
let schema = Schema::new(vec![
Field::new("a", DataType::Int32, false),
Field::new("b", DataType::Int32, false),
]);

let a = Arc::new(Column::new("a", 0)) as _;
let b = Arc::new(Column::new("b", 1)) as _;

// Test addition bounds
let add_expr =
binary_expr(Arc::clone(&a), Operator::Plus, Arc::clone(&b), &schema).unwrap();
let add_bounds = add_expr
.evaluate_bounds(&[
&Interval::make(Some(1), Some(10)).unwrap(),
&Interval::make(Some(5), Some(15)).unwrap(),
])
.unwrap();
assert_eq!(add_bounds, Interval::make(Some(6), Some(25)).unwrap());

// Test subtraction bounds
let sub_expr =
binary_expr(Arc::clone(&a), Operator::Minus, Arc::clone(&b), &schema)
.unwrap();
let sub_bounds = sub_expr
.evaluate_bounds(&[
&Interval::make(Some(1), Some(10)).unwrap(),
&Interval::make(Some(5), Some(15)).unwrap(),
])
.unwrap();
assert_eq!(sub_bounds, Interval::make(Some(-14), Some(5)).unwrap());

// Test multiplication bounds
let mul_expr =
binary_expr(Arc::clone(&a), Operator::Multiply, Arc::clone(&b), &schema)
.unwrap();
let mul_bounds = mul_expr
.evaluate_bounds(&[
&Interval::make(Some(1), Some(10)).unwrap(),
&Interval::make(Some(5), Some(15)).unwrap(),
])
.unwrap();
assert_eq!(mul_bounds, Interval::make(Some(5), Some(150)).unwrap());

// Test division bounds
let div_expr =
binary_expr(Arc::clone(&a), Operator::Divide, Arc::clone(&b), &schema)
.unwrap();
let div_bounds = div_expr
.evaluate_bounds(&[
&Interval::make(Some(10), Some(20)).unwrap(),
&Interval::make(Some(2), Some(5)).unwrap(),
])
.unwrap();
assert_eq!(div_bounds, Interval::make(Some(2), Some(10)).unwrap());
}

#[test]
fn test_evaluate_bounds_bool() {
let schema = Schema::new(vec![
Field::new("a", DataType::Boolean, false),
Field::new("b", DataType::Boolean, false),
]);

let a = Arc::new(Column::new("a", 0)) as _;
let b = Arc::new(Column::new("b", 1)) as _;

// Test OR bounds
let or_expr =
binary_expr(Arc::clone(&a), Operator::Or, Arc::clone(&b), &schema).unwrap();
let or_bounds = or_expr
.evaluate_bounds(&[
&Interval::make(Some(true), Some(true)).unwrap(),
&Interval::make(Some(false), Some(false)).unwrap(),
])
.unwrap();
assert_eq!(or_bounds, Interval::make(Some(true), Some(true)).unwrap());

// Test AND bounds
let and_expr =
binary_expr(Arc::clone(&a), Operator::And, Arc::clone(&b), &schema).unwrap();
let and_bounds = and_expr
.evaluate_bounds(&[
&Interval::make(Some(true), Some(true)).unwrap(),
&Interval::make(Some(false), Some(false)).unwrap(),
])
.unwrap();
assert_eq!(
and_bounds,
Interval::make(Some(false), Some(false)).unwrap()
);
}
}
15 changes: 13 additions & 2 deletions datafusion/physical-expr/src/intervals/cp_solver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -148,12 +148,12 @@ use std::sync::Arc;
use super::utils::{
convert_duration_type_to_interval, convert_interval_type_to_duration, get_inverse_op,
};
use crate::expressions::Literal;
use crate::expressions::{BinaryExpr, Literal};
use crate::utils::{build_dag, ExprTreeNode};
use crate::PhysicalExpr;

use arrow::datatypes::{DataType, Schema};
use datafusion_common::{internal_err, Result};
use datafusion_common::{internal_err, not_impl_err, Result};
use datafusion_expr::interval_arithmetic::{apply_operator, satisfy_greater, Interval};
use datafusion_expr::Operator;

Expand Down Expand Up @@ -645,6 +645,17 @@ impl ExprIntervalGraph {
.map(|child| self.graph[*child].interval())
.collect::<Vec<_>>();
let node_interval = self.graph[node].interval();
// Special case: true OR could in principle be propagated by 3 interval sets,
// (i.e. left true, or right true, or both true) however we do not support this yet.
if node_interval == &Interval::CERTAINLY_TRUE
&& self.graph[node]
.expr
.as_any()
.downcast_ref::<BinaryExpr>()
.is_some_and(|expr| expr.op() == &Operator::Or)
{
return not_impl_err!("OR operator cannot yet propagate true intervals");
}
Comment on lines +648 to +658
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I had to add this to keep the invalid-analysis tests failing. I could move this inside the implementation of PhysicalExpr for BinaryExpr, however that case currently returns Ok(Some(vec![])) and it wasn't clear to me if it was ok to change that case to be failing.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this makes sense to me

Copy link
Contributor

@berkaysynnada berkaysynnada May 2, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Rather than error, this case should go to PropagationResult::CannotPropagate, if any child is true, or PropagationResult::Infeasible if both of them are false

let propagated_intervals = self.graph[node]
.expr
.propagate_constraints(node_interval, &children_intervals)?;
Expand Down