Skip to content

Commit 3ff9b52

Browse files
committed
Fix count on all null VALUES clause (apache#13029)
* Test Count accumulator with all-nulls
* Fix count on null values

Before the change, a `ValuesExec` containing a `NullArray` would incorrectly report column statistics as being non-null, which would misinform the `AggregateStatistics` optimizer and cause it to fold `count(always_null)` into the row count instead of 0. This commit fixes the column statistics derivation for values with `NullArray` and therefore fixes execution of logical plans with count over such values.

Note that the bug was not reproducible using the DataFusion SQL frontend, because in DataFusion SQL the `VALUES (NULL)` doesn't have type `DataType::Null` (it has some apparently arbitrarily picked type instead).

As a follow-up, all usages of `Array::null_count` should be inspected. The function can easily be misused (it returns "physical nulls", which do not exist for the null type).
1 parent de130ee commit 3ff9b52

File tree

6 files changed

+166
-1
lines changed

6 files changed

+166
-1
lines changed

datafusion/core/tests/core_integration.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,9 @@ mod dataframe;
2424
/// Run all tests that are found in the `macro_hygiene` directory
2525
mod macro_hygiene;
2626

27+
/// Run all tests that are found in the `execution` directory
28+
mod execution;
29+
2730
/// Run all tests that are found in the `expr_api` directory
2831
mod expr_api;
2932

Lines changed: 95 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,95 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
use arrow_array::Int64Array;
19+
use arrow_schema::{DataType, Field};
20+
use datafusion::execution::session_state::SessionStateBuilder;
21+
use datafusion_common::{Column, DFSchema, Result, ScalarValue};
22+
use datafusion_execution::TaskContext;
23+
use datafusion_expr::expr::AggregateFunction;
24+
use datafusion_expr::logical_plan::{LogicalPlan, Values};
25+
use datafusion_expr::{Aggregate, AggregateUDF, Expr};
26+
use datafusion_functions_aggregate::count::Count;
27+
use datafusion_physical_plan::collect;
28+
use std::collections::HashMap;
29+
use std::fmt::Debug;
30+
use std::ops::Deref;
31+
use std::sync::Arc;
32+
33+
// Logical plans need to provide stable semantics, as downstream projects
34+
// create them and depend on them. Test executable semantics of logical plans.
35+
36+
#[tokio::test]
37+
async fn count_only_nulls() -> Result<()> {
38+
// Input: VALUES (NULL), (NULL), (NULL) AS _(col)
39+
let input_schema = Arc::new(DFSchema::from_unqualified_fields(
40+
vec![Field::new("col", DataType::Null, true)].into(),
41+
HashMap::new(),
42+
)?);
43+
let input = Arc::new(LogicalPlan::Values(Values {
44+
schema: input_schema,
45+
values: vec![
46+
vec![Expr::Literal(ScalarValue::Null)],
47+
vec![Expr::Literal(ScalarValue::Null)],
48+
vec![Expr::Literal(ScalarValue::Null)],
49+
],
50+
}));
51+
let input_col_ref = Expr::Column(Column {
52+
relation: None,
53+
name: "col".to_string(),
54+
});
55+
56+
// Aggregation: count(col) AS count
57+
let aggregate = LogicalPlan::Aggregate(Aggregate::try_new(
58+
input,
59+
vec![],
60+
vec![Expr::AggregateFunction(AggregateFunction {
61+
func: Arc::new(AggregateUDF::new_from_impl(Count::new())),
62+
args: vec![input_col_ref],
63+
distinct: false,
64+
filter: None,
65+
order_by: None,
66+
null_treatment: None,
67+
})],
68+
)?);
69+
70+
// Execute and verify results
71+
let session_state = SessionStateBuilder::new().build();
72+
let physical_plan = session_state.create_physical_plan(&aggregate).await?;
73+
let result =
74+
collect(physical_plan, Arc::new(TaskContext::from(&session_state))).await?;
75+
76+
let result = only(result.as_slice());
77+
let result_schema = result.schema();
78+
let field = only(result_schema.fields().deref());
79+
let column = only(result.columns());
80+
81+
assert_eq!(field.data_type(), &DataType::Int64); // TODO should be UInt64
82+
assert_eq!(column.deref(), &Int64Array::from(vec![0]));
83+
84+
Ok(())
85+
}
86+
87+
/// Returns a reference to the single element of `elements`.
///
/// # Panics
///
/// Panics when `elements` does not hold exactly one element. The panic
/// message contains the `Debug` rendering of the whole slice, and thanks to
/// `#[track_caller]` the reported location is the call site rather than this
/// helper, which makes failing test assertions easier to locate.
#[track_caller]
fn only<T>(elements: &[T]) -> &T
where
    T: Debug,
{
    match elements {
        [element] => element,
        _ => panic!("Expected exactly one element, got {:?}", elements),
    }
}
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
mod logical_plan;

datafusion/functions-aggregate/src/count.rs

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -639,3 +639,17 @@ impl Accumulator for DistinctCountAccumulator {
639639
}
640640
}
641641
}
642+
643+
#[cfg(test)]
mod tests {
    use super::*;
    use arrow::array::NullArray;

    /// Feeds a ten-element `NullArray` to a fresh `CountAccumulator` and
    /// expects the evaluated count to be `Int64(0)`.
    #[test]
    fn count_accumulator_nulls() -> Result<()> {
        let mut acc = CountAccumulator::new();
        acc.update_batch(&[Arc::new(NullArray::new(10))])?;

        let counted = acc.evaluate()?;
        assert_eq!(counted, ScalarValue::Int64(Some(0)));

        Ok(())
    }
}

datafusion/physical-plan/src/common.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -156,7 +156,11 @@ pub fn compute_record_batch_statistics(
156156
for partition in batches.iter() {
157157
for batch in partition {
158158
for (stat_index, col_index) in projection.iter().enumerate() {
159-
null_counts[stat_index] += batch.column(*col_index).null_count();
159+
null_counts[stat_index] += batch
160+
.column(*col_index)
161+
.logical_nulls()
162+
.map(|nulls| nulls.null_count())
163+
.unwrap_or_default();
160164
}
161165
}
162166
}

datafusion/physical-plan/src/values.rs

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -215,6 +215,7 @@ mod tests {
215215
use crate::test::{self, make_partition};
216216

217217
use arrow_schema::{DataType, Field};
218+
use datafusion_common::stats::{ColumnStatistics, Precision};
218219

219220
#[tokio::test]
220221
async fn values_empty_case() -> Result<()> {
@@ -265,4 +266,34 @@ mod tests {
265266
let _ = ValuesExec::try_new(schema, vec![vec![lit(ScalarValue::UInt32(None))]])
266267
.unwrap_err();
267268
}
269+
270+
/// Builds a `ValuesExec` over three rows of `ScalarValue::Null` literals in a
/// single `DataType::Null` column and checks that its statistics report an
/// exact null count equal to the number of rows.
#[test]
fn values_stats_with_nulls_only() -> Result<()> {
    let null_rows = vec![
        vec![lit(ScalarValue::Null)],
        vec![lit(ScalarValue::Null)],
        vec![lit(ScalarValue::Null)],
    ];
    let row_count = null_rows.len();
    let schema = Arc::new(Schema::new(vec![Field::new("col0", DataType::Null, true)]));
    let exec = ValuesExec::try_new(schema, null_rows)?;

    let all_null_column = ColumnStatistics {
        null_count: Precision::Exact(row_count), // there are only nulls
        distinct_count: Precision::Absent,
        max_value: Precision::Absent,
        min_value: Precision::Absent,
    };
    let expected = Statistics {
        num_rows: Precision::Exact(row_count),
        total_byte_size: Precision::Exact(8), // not important
        column_statistics: vec![all_null_column],
    };
    assert_eq!(exec.statistics()?, expected);

    Ok(())
}
268299
}

0 commit comments

Comments
 (0)