Skip to content
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 44 additions & 15 deletions datafusion/physical-expr/src/expressions/case.rs
Original file line number Diff line number Diff line change
Expand Up @@ -208,10 +208,19 @@ impl CaseExpr {
let mut current_value = new_null_array(&return_type, batch.num_rows());
// We only consider non-null values while comparing with whens
let mut remainder = not(&base_nulls)?;
let mut remainder_count = remainder.true_count();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we have found in past evaluations, that the code generated for true_count is astonishingly fast (it uses some special hardware instruction) so I am not surprised this works well

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good to know that that’s fairly cheap.

What I’m experimenting with is retaining the filtered record batch from one loop iteration to the next so that the amount of data to be churned through each iteration shrinks.

for i in 0..self.when_then_expr.len() {
let when_value = self.when_then_expr[i]
.0
.evaluate_selection(batch, &remainder)?;
if remainder_count == 0 {
break;
}

let when_value = if remainder_count == batch.num_rows() {
self.when_then_expr[i].0.evaluate(batch)?
} else {
self.when_then_expr[i]
.0
.evaluate_selection(batch, &remainder)?
};
let when_value = when_value.into_array(batch.num_rows())?;
// build boolean array representing which rows match the "when" value
let when_match = compare_with_eq(
Expand All @@ -230,13 +239,18 @@ impl CaseExpr {
let when_match = and(&when_match, &remainder)?;

// When no rows available for when clause, skip then clause
if when_match.true_count() == 0 {
let when_match_count = when_match.true_count();
if when_match_count == 0 {
continue;
}

let then_value = self.when_then_expr[i]
.1
.evaluate_selection(batch, &when_match)?;
let then_value = if when_match_count == batch.num_rows() {
self.when_then_expr[i].1.evaluate(batch)?
} else {
self.when_then_expr[i]
.1
.evaluate_selection(batch, &when_match)?
};

current_value = match then_value {
ColumnarValue::Scalar(ScalarValue::Null) => {
Expand All @@ -251,6 +265,7 @@ impl CaseExpr {
};

remainder = and_not(&remainder, &when_match)?;
remainder_count -= when_match_count;
}

if let Some(e) = self.else_expr() {
Expand Down Expand Up @@ -280,10 +295,15 @@ impl CaseExpr {
// start with nulls as default output
let mut current_value = new_null_array(&return_type, batch.num_rows());
let mut remainder = BooleanArray::from(vec![true; batch.num_rows()]);
let mut remainder_count = batch.num_rows();
for i in 0..self.when_then_expr.len() {
let when_value = self.when_then_expr[i]
.0
.evaluate_selection(batch, &remainder)?;
let when_value = if remainder_count == batch.num_rows() {
self.when_then_expr[i].0.evaluate(batch)?
} else {
self.when_then_expr[i]
.0
.evaluate_selection(batch, &remainder)?
};
let when_value = when_value.into_array(batch.num_rows())?;
let when_value = as_boolean_array(&when_value).map_err(|_| {
internal_datafusion_err!("WHEN expression did not return a BooleanArray")
Expand All @@ -297,13 +317,18 @@ impl CaseExpr {
let when_value = and(&when_value, &remainder)?;

// When no rows available for when clause, skip then clause
if when_value.true_count() == 0 {
let when_match_count = when_value.true_count();
if when_match_count == 0 {
continue;
}

let then_value = self.when_then_expr[i]
.1
.evaluate_selection(batch, &when_value)?;
let then_value = if when_match_count == batch.num_rows() {
self.when_then_expr[i].1.evaluate(batch)?
} else {
self.when_then_expr[i]
.1
.evaluate_selection(batch, &when_value)?
};

current_value = match then_value {
ColumnarValue::Scalar(ScalarValue::Null) => {
Expand All @@ -320,10 +345,14 @@ impl CaseExpr {
// Succeed tuples should be filtered out for short-circuit evaluation,
// null values for the current when expr should be kept
remainder = and_not(&remainder, &when_value)?;
remainder_count -= when_match_count;
if remainder_count == 0 {
break;
}
}

if let Some(e) = self.else_expr() {
if remainder.true_count() > 0 {
if remainder_count > 0 {
// keep `else_expr`'s data type and return type consistent
let expr = try_cast(Arc::clone(e), &batch.schema(), return_type.clone())?;
let else_ = expr
Expand Down