diff --git a/datafusion/core/src/datasource/file_format/parquet.rs b/datafusion/core/src/datasource/file_format/parquet.rs
index b7d66e4f2789..c840780a7434 100644
--- a/datafusion/core/src/datasource/file_format/parquet.rs
+++ b/datafusion/core/src/datasource/file_format/parquet.rs
@@ -603,11 +603,23 @@ mod tests {
         let dic_array = DictionaryArray::<Int32Type>::try_new(keys, Arc::new(values))?;
         let c_dic: ArrayRef = Arc::new(dic_array);

-        let batch1 = RecordBatch::try_from_iter(vec![("c_dic", c_dic)])?;
+        // Data for column string_truncation: ["a".repeat(128), null, "b".repeat(128), null]
+        let string_truncation: ArrayRef = Arc::new(StringArray::from(vec![
+            Some("a".repeat(128)),
+            None,
+            Some("b".repeat(128)),
+            None,
+        ]));
+
+        let batch1 = RecordBatch::try_from_iter(vec![
+            ("c_dic", c_dic),
+            ("string_truncation", string_truncation),
+        ])?;

         // Use store_parquet to write each batch to its own file
         // . batch1 written into first file and includes:
         //   - column c_dic that has 4 rows with no null. Stats min and max of dictionary column is available.
+        //   - column string_truncation that has 4 rows with 2 nulls. Stats min and max of string column is available but not exact.
         let store = Arc::new(RequestCountingObjectStore::new(Arc::new(
             LocalFileSystem::new(),
         )));
@@ -647,6 +659,19 @@ mod tests {
             Precision::Exact(Utf8(Some("a".into())))
         );

+        // column string_truncation
+        let string_truncation_stats = &stats.column_statistics[1];
+
+        assert_eq!(string_truncation_stats.null_count, Precision::Exact(2));
+        assert_eq!(
+            string_truncation_stats.max_value,
+            Precision::Inexact(ScalarValue::Utf8View(Some("b".repeat(63) + "c")))
+        );
+        assert_eq!(
+            string_truncation_stats.min_value,
+            Precision::Inexact(ScalarValue::Utf8View(Some("a".repeat(64))))
+        );
+
         Ok(())
     }

diff --git a/datafusion/datasource-parquet/src/file_format.rs b/datafusion/datasource-parquet/src/file_format.rs
index ab4d84ee368e..56718534a558 100644
--- a/datafusion/datasource-parquet/src/file_format.rs
+++ b/datafusion/datasource-parquet/src/file_format.rs
@@ -19,13 +19,13 @@
 use std::any::Any;
 use std::cell::RefCell;
-use std::fmt;
 use std::fmt::Debug;
 use std::ops::Range;
 use std::rc::Rc;
 use std::sync::Arc;
+use std::{fmt, vec};

-use arrow::array::RecordBatch;
+use arrow::array::{ArrayRef, BooleanArray, RecordBatch};
 use arrow::datatypes::{Fields, Schema, SchemaRef, TimeUnit};
 use datafusion_datasource::file_compression_type::FileCompressionType;
 use datafusion_datasource::file_sink_config::{FileSink, FileSinkConfig};
@@ -36,7 +36,8 @@ use datafusion_datasource::write::{
 use datafusion_datasource::file_format::{FileFormat, FileFormatFactory};
 use datafusion_datasource::write::demux::DemuxedStreamReceiver;

-use arrow::compute::sum;
+use arrow::compute::kernels::cmp::eq;
+use arrow::compute::{and, sum};
 use arrow::datatypes::{DataType, Field, FieldRef};
 use datafusion_common::config::{ConfigField, ConfigFileType, TableParquetOptions};
 #[cfg(feature = "parquet_encryption")]
@@ -46,7 +47,7 @@ use datafusion_common::parsers::CompressionTypeVariant;
 use datafusion_common::stats::Precision;
 use datafusion_common::{
     internal_datafusion_err, internal_err, not_impl_err, ColumnStatistics,
-    DataFusionError, GetExt, HashSet, Result, DEFAULT_PARQUET_EXTENSION,
+    DataFusionError, GetExt, HashSet, Result, ScalarValue, DEFAULT_PARQUET_EXTENSION,
 };
 use datafusion_common::{HashMap, Statistics};
 use datafusion_common_runtime::{JoinSet, SpawnedTask};
@@ -1170,7 +1171,8 @@ pub async fn fetch_statistics(
 /// # When only some columns have statistics:
 ///
 /// For columns with statistics:
-/// - Min/max values are properly extracted and represented as Precision::Exact
+/// - Min/max values are properly extracted and represented as [Precision::Exact] or [Precision::Inexact]
+///   depending on the `is_max_value_exact` and `is_min_value_exact` flags.
 /// - Null counts are calculated by summing across row groups
 ///
 /// For columns without statistics,
@@ -1216,6 +1218,8 @@ pub fn statistics_from_parquet_meta_calc(
     let (mut max_accs, mut min_accs) = create_max_min_accs(&table_schema);
     let mut null_counts_array = vec![Precision::Exact(0); table_schema.fields().len()];
+    let mut is_max_value_exact = vec![Some(true); table_schema.fields().len()];
+    let mut is_min_value_exact = vec![Some(true); table_schema.fields().len()];

     table_schema
         .fields()
@@ -1228,10 +1232,15 @@ pub fn statistics_from_parquet_meta_calc(
                 file_metadata.schema_descr(),
             ) {
                 Ok(stats_converter) => {
+                    let mut accumulators = StatisticsAccumulators {
+                        min_accs: &mut min_accs,
+                        max_accs: &mut max_accs,
+                        null_counts_array: &mut null_counts_array,
+                        is_min_value_exact: &mut is_min_value_exact,
+                        is_max_value_exact: &mut is_max_value_exact,
+                    };
                     summarize_min_max_null_counts(
-                        &mut min_accs,
-                        &mut max_accs,
-                        &mut null_counts_array,
+                        &mut accumulators,
                         idx,
                         num_rows,
                         &stats_converter,
@@ -1251,6 +1260,8 @@ pub fn statistics_from_parquet_meta_calc(
             null_counts_array,
             &mut max_accs,
             &mut min_accs,
+            &mut is_max_value_exact,
+            &mut is_min_value_exact,
         )
     } else {
         Statistics::unknown_column(&table_schema)
@@ -1264,21 +1275,39 @@ fn get_col_stats(
     null_counts: Vec<Precision<usize>>,
     max_values: &mut [Option<MaxAccumulator>],
     min_values: &mut [Option<MinAccumulator>],
+    is_max_value_exact: &mut [Option<bool>],
+    is_min_value_exact: &mut [Option<bool>],
 ) -> Vec<ColumnStatistics> {
     (0..schema.fields().len())
         .map(|i| {
-            let max_value = match max_values.get_mut(i).unwrap() {
-                Some(max_value) => max_value.evaluate().ok(),
-                None => None,
+            let max_value = match (
+                max_values.get_mut(i).unwrap(),
+                is_max_value_exact.get(i).unwrap(),
+            ) {
+                (Some(max_value), Some(true)) => {
+                    max_value.evaluate().ok().map(Precision::Exact)
+                }
+                (Some(max_value), Some(false)) | (Some(max_value), None) => {
+                    max_value.evaluate().ok().map(Precision::Inexact)
+                }
+                (None, _) => None,
             };
-            let min_value = match min_values.get_mut(i).unwrap() {
-                Some(min_value) => min_value.evaluate().ok(),
-                None => None,
+            let min_value = match (
+                min_values.get_mut(i).unwrap(),
+                is_min_value_exact.get(i).unwrap(),
+            ) {
+                (Some(min_value), Some(true)) => {
+                    min_value.evaluate().ok().map(Precision::Exact)
+                }
+                (Some(min_value), Some(false)) | (Some(min_value), None) => {
+                    min_value.evaluate().ok().map(Precision::Inexact)
+                }
+                (None, _) => None,
             };
             ColumnStatistics {
                 null_count: null_counts[i],
-                max_value: max_value.map(Precision::Exact).unwrap_or(Precision::Absent),
-                min_value: min_value.map(Precision::Exact).unwrap_or(Precision::Absent),
+                max_value: max_value.unwrap_or(Precision::Absent),
+                min_value: min_value.unwrap_or(Precision::Absent),
                 sum_value: Precision::Absent,
                 distinct_count: Precision::Absent,
             }
@@ -1286,10 +1315,17 @@ fn get_col_stats(
         .collect()
 }

+/// Holds the accumulator state for collecting statistics from row groups
+struct StatisticsAccumulators<'a> {
+    min_accs: &'a mut [Option<MinAccumulator>],
+    max_accs: &'a mut [Option<MaxAccumulator>],
+    null_counts_array: &'a mut [Precision<usize>],
+    is_min_value_exact: &'a mut [Option<bool>],
+    is_max_value_exact: &'a mut [Option<bool>],
+}
+
 fn summarize_min_max_null_counts(
-    min_accs: &mut [Option<MinAccumulator>],
-    max_accs: &mut [Option<MaxAccumulator>],
-    null_counts_array: &mut [Precision<usize>],
+    accumulators: &mut StatisticsAccumulators,
     arrow_schema_index: usize,
     num_rows: usize,
     stats_converter: &StatisticsConverter,
@@ -1298,19 +1334,36 @@ fn summarize_min_max_null_counts(
     let max_values = stats_converter.row_group_maxes(row_groups_metadata)?;
     let min_values = stats_converter.row_group_mins(row_groups_metadata)?;
     let null_counts = stats_converter.row_group_null_counts(row_groups_metadata)?;
-
-    if let Some(max_acc) = &mut max_accs[arrow_schema_index] {
-        max_acc.update_batch(&[max_values])?;
+    let is_max_value_exact_stat =
+        stats_converter.row_group_is_max_value_exact(row_groups_metadata)?;
+    let is_min_value_exact_stat =
+        stats_converter.row_group_is_min_value_exact(row_groups_metadata)?;
+
+    if let Some(max_acc) = &mut accumulators.max_accs[arrow_schema_index] {
+        max_acc.update_batch(&[Arc::clone(&max_values)])?;
+        let mut cur_max_acc = max_acc.clone();
+        accumulators.is_max_value_exact[arrow_schema_index] = has_any_exact_match(
+            cur_max_acc.evaluate()?,
+            max_values,
+            is_max_value_exact_stat,
+        );
     }

-    if let Some(min_acc) = &mut min_accs[arrow_schema_index] {
-        min_acc.update_batch(&[min_values])?;
+    if let Some(min_acc) = &mut accumulators.min_accs[arrow_schema_index] {
+        min_acc.update_batch(&[Arc::clone(&min_values)])?;
+        let mut cur_min_acc = min_acc.clone();
+        accumulators.is_min_value_exact[arrow_schema_index] = has_any_exact_match(
+            cur_min_acc.evaluate()?,
+            min_values,
+            is_min_value_exact_stat,
+        );
     }

-    null_counts_array[arrow_schema_index] = Precision::Exact(match sum(&null_counts) {
-        Some(null_count) => null_count as usize,
-        None => num_rows,
-    });
+    accumulators.null_counts_array[arrow_schema_index] =
+        Precision::Exact(match sum(&null_counts) {
+            Some(null_count) => null_count as usize,
+            None => num_rows,
+        });

     Ok(())
 }
@@ -1967,12 +2020,38 @@ fn create_max_min_accs(
     (max_values, min_values)
 }

+/// Checks if any occurrence of `value` in `array` corresponds to a `true`
+/// entry in the `exactness` array.
+///
+/// This is used to determine if a calculated statistic (e.g., min or max)
+/// is exact, by checking if at least one of its source values was exact.
+///
+/// # Example
+/// - `value`: `0`
+/// - `array`: `[0, 1, 0, 3, 0, 5]`
+/// - `exactness`: `[true, false, false, false, false, false]`
+///
+/// The value `0` appears at indices `[0, 2, 4]`. The corresponding exactness
+/// values are `[true, false, false]`. Since at least one is `true`, the
+/// function returns `Some(true)`.
+fn has_any_exact_match(
+    value: ScalarValue,
+    array: ArrayRef,
+    exactness: BooleanArray,
+) -> Option<bool> {
+    let scalar_array = value.to_scalar().ok()?;
+    let eq_mask = eq(&scalar_array, &array).ok()?;
+    let combined_mask = and(&eq_mask, &exactness).ok()?;
+    Some(combined_mask.true_count() > 0)
+}
+
 #[cfg(test)]
 mod tests {
     use std::sync::Arc;

     use super::*;
+    use arrow::array::{BooleanArray, Int32Array};
     use arrow::datatypes::DataType;
     use parquet::schema::parser::parse_message_type;

@@ -2182,4 +2261,51 @@ mod tests {

         assert_eq!(result, expected_schema);
     }
+
+    #[test]
+    fn test_has_any_exact_match() {
+        // Case 1: Mixed exact and inexact matches
+        {
+            let computed_min = ScalarValue::Int32(Some(0));
+            let row_group_mins =
+                Arc::new(Int32Array::from(vec![0, 1, 0, 3, 0, 5])) as ArrayRef;
+            let exactness =
+                BooleanArray::from(vec![true, false, false, false, false, false]);
+
+            let result = has_any_exact_match(computed_min, row_group_mins, exactness);
+            assert_eq!(result, Some(true));
+        }
+        // Case 2: All inexact matches
+        {
+            let computed_min = ScalarValue::Int32(Some(0));
+            let row_group_mins =
+                Arc::new(Int32Array::from(vec![0, 1, 0, 3, 0, 5])) as ArrayRef;
+            let exactness =
+                BooleanArray::from(vec![false, false, false, false, false, false]);
+
+            let result = has_any_exact_match(computed_min, row_group_mins, exactness);
+            assert_eq!(result, Some(false));
+        }
+        // Case 3: All exact matches
+        {
+            let computed_max = ScalarValue::Int32(Some(5));
+            let row_group_maxes =
+                Arc::new(Int32Array::from(vec![1, 5, 3, 5, 2, 5])) as ArrayRef;
+            let exactness =
+                BooleanArray::from(vec![false, true, true, true, false, true]);
+
+            let result = has_any_exact_match(computed_max, row_group_maxes, exactness);
+            assert_eq!(result, Some(true));
+        }
+        // Case 4: All maxes are null values
+        {
+            let computed_max = ScalarValue::Int32(None);
+            let row_group_maxes =
+                Arc::new(Int32Array::from(vec![None, None, None, None])) as ArrayRef;
+            let exactness = BooleanArray::from(vec![None, Some(true), None, Some(false)]);
+
+            let result = has_any_exact_match(computed_max, row_group_maxes, exactness);
+            assert_eq!(result, Some(false));
+        }
+    }
 }
diff --git a/datafusion/functions-aggregate/src/min_max.rs b/datafusion/functions-aggregate/src/min_max.rs
index 042efedfc124..1edf10dfee30 100644
--- a/datafusion/functions-aggregate/src/min_max.rs
+++ b/datafusion/functions-aggregate/src/min_max.rs
@@ -736,7 +736,7 @@ macro_rules! min_max {
 }

 /// An accumulator to compute the maximum value
-#[derive(Debug)]
+#[derive(Debug, Clone)]
 pub struct MaxAccumulator {
     max: ScalarValue,
 }
@@ -1057,7 +1057,7 @@ impl AggregateUDFImpl for Min {
 }

 /// An accumulator to compute the minimum value
-#[derive(Debug)]
+#[derive(Debug, Clone)]
 pub struct MinAccumulator {
     min: ScalarValue,
 }
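
Note (not part of the patch): the file-level min/max produced by the accumulators is always one of the per-row-group stored statistics, and the patch treats it as exact when at least one row group whose stored value equals that aggregate reported its statistic as exact (i.e. not truncated by the writer). Below is a minimal standalone sketch of that check, assuming only the `arrow` and `datafusion-common` crates; `aggregate_is_exact` is a hypothetical name that simply mirrors `has_any_exact_match` above.

```rust
use std::sync::Arc;

use arrow::array::{ArrayRef, BooleanArray, Int32Array};
use arrow::compute::and;
use arrow::compute::kernels::cmp::eq;
use datafusion_common::ScalarValue;

/// Returns Some(true) if the aggregated statistic coincides with at least one
/// row-group statistic that was reported as exact (i.e. not truncated).
fn aggregate_is_exact(
    aggregate: ScalarValue,
    row_group_values: ArrayRef,
    row_group_exactness: BooleanArray,
) -> Option<bool> {
    // Broadcast the aggregate against every per-row-group statistic...
    let aggregate = aggregate.to_scalar().ok()?;
    let matches = eq(&aggregate, &row_group_values).ok()?;
    // ...and keep only the positions whose statistic was exact.
    let exact_matches = and(&matches, &row_group_exactness).ok()?;
    Some(exact_matches.true_count() > 0)
}

fn main() {
    // Row-group maxes [10, 42, 42]; only the second row group's max is exact.
    let maxes = Arc::new(Int32Array::from(vec![10, 42, 42])) as ArrayRef;
    let exactness = BooleanArray::from(vec![false, true, false]);
    // The file-level max (42) matches an exact row-group max, so the overall
    // statistic can still be reported with Exact precision.
    assert_eq!(
        aggregate_is_exact(ScalarValue::Int32(Some(42)), maxes, exactness),
        Some(true)
    );
}
```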