Skip to content

Commit e5c1777

Browse files
committed
fix: support nullable columns in pre-sorted data sources
1 parent 9bf6bc4 commit e5c1777

File tree

4 files changed

+47
-29
lines changed

4 files changed

+47
-29
lines changed

datafusion/datasource/src/file_scan_config.rs

Lines changed: 43 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1895,13 +1895,28 @@ mod tests {
18951895
struct File {
18961896
name: &'static str,
18971897
date: &'static str,
1898-
statistics: Vec<Option<(f64, f64)>>,
1898+
statistics: Vec<Option<(Option<f64>, Option<f64>)>>,
18991899
}
19001900
impl File {
19011901
fn new(
19021902
name: &'static str,
19031903
date: &'static str,
19041904
statistics: Vec<Option<(f64, f64)>>,
1905+
) -> Self {
1906+
Self::new_nullable(
1907+
name,
1908+
date,
1909+
statistics
1910+
.into_iter()
1911+
.map(|opt| opt.map(|(min, max)| (Some(min), Some(max))))
1912+
.collect(),
1913+
)
1914+
}
1915+
1916+
fn new_nullable(
1917+
name: &'static str,
1918+
date: &'static str,
1919+
statistics: Vec<Option<(Option<f64>, Option<f64>)>>,
19051920
) -> Self {
19061921
Self {
19071922
name,
@@ -1968,21 +1983,35 @@ mod tests {
19681983
sort: vec![col("value").sort(false, true)],
19691984
expected_result: Ok(vec![vec!["1", "0"], vec!["2"]]),
19701985
},
1971-
// reject nullable sort columns
19721986
TestCase {
1973-
name: "no nullable sort columns",
1987+
name: "nullable sort columns, nulls last",
19741988
file_schema: Schema::new(vec![Field::new(
19751989
"value".to_string(),
19761990
DataType::Float64,
1977-
true, // should fail because nullable
1991+
true,
19781992
)]),
19791993
files: vec![
1980-
File::new("0", "2023-01-01", vec![Some((0.00, 0.49))]),
1981-
File::new("1", "2023-01-01", vec![Some((0.50, 1.00))]),
1982-
File::new("2", "2023-01-02", vec![Some((0.00, 1.00))]),
1994+
File::new_nullable("0", "2023-01-01", vec![Some((Some(0.00), Some(0.49)))]),
1995+
File::new_nullable("1", "2023-01-01", vec![Some((Some(0.50), None))]),
1996+
File::new_nullable("2", "2023-01-02", vec![Some((Some(0.00), None))]),
19831997
],
19841998
sort: vec![col("value").sort(true, false)],
1985-
expected_result: Err("construct min/max statistics for split_groups_by_statistics\ncaused by\nbuild min rows\ncaused by\ncreate sorting columns\ncaused by\nError during planning: cannot sort by nullable column")
1999+
expected_result: Ok(vec![vec!["0", "1"], vec!["2"]])
2000+
},
2001+
TestCase {
2002+
name: "nullable sort columns, nulls first",
2003+
file_schema: Schema::new(vec![Field::new(
2004+
"value".to_string(),
2005+
DataType::Float64,
2006+
true,
2007+
)]),
2008+
files: vec![
2009+
File::new_nullable("0", "2023-01-01", vec![Some((None, Some(0.49)))]),
2010+
File::new_nullable("1", "2023-01-01", vec![Some((Some(0.50), Some(1.00)))]),
2011+
File::new_nullable("2", "2023-01-02", vec![Some((None, Some(1.00)))]),
2012+
],
2013+
sort: vec![col("value").sort(true, true)],
2014+
expected_result: Ok(vec![vec!["0", "1"], vec!["2"]])
19862015
},
19872016
TestCase {
19882017
name: "all three non-overlapping",
@@ -2142,12 +2171,12 @@ mod tests {
21422171
.map(|stats| {
21432172
stats
21442173
.map(|(min, max)| ColumnStatistics {
2145-
min_value: Precision::Exact(ScalarValue::from(
2146-
min,
2147-
)),
2148-
max_value: Precision::Exact(ScalarValue::from(
2149-
max,
2150-
)),
2174+
min_value: Precision::Exact(
2175+
ScalarValue::Float64(min),
2176+
),
2177+
max_value: Precision::Exact(
2178+
ScalarValue::Float64(max),
2179+
),
21512180
..Default::default()
21522181
})
21532182
.unwrap_or_default()

datafusion/datasource/src/statistics.rs

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -230,14 +230,7 @@ impl MinMaxStatistics {
230230
.zip(sort_columns.iter().copied())
231231
.map(|(sort_expr, column)| {
232232
let schema = values.schema();
233-
234233
let idx = schema.index_of(column.name())?;
235-
let field = schema.field(idx);
236-
237-
// check that sort columns are non-nullable
238-
if field.is_nullable() {
239-
return plan_err!("cannot sort by nullable column");
240-
}
241234

242235
Ok(SortColumn {
243236
values: Arc::clone(values.column(idx)),

datafusion/sqllogictest/test_files/parquet.slt

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -130,8 +130,7 @@ STORED AS PARQUET;
130130
----
131131
3
132132

133-
# Check output plan again, expect no "output_ordering" clause in the physical_plan -> ParquetExec,
134-
# due to there being more files than partitions:
133+
# Check output plan again
135134
query TT
136135
EXPLAIN SELECT int_col, string_col
137136
FROM test_table
@@ -142,8 +141,7 @@ logical_plan
142141
02)--TableScan: test_table projection=[int_col, string_col]
143142
physical_plan
144143
01)SortPreservingMergeExec: [string_col@1 ASC NULLS LAST, int_col@0 ASC NULLS LAST]
145-
02)--SortExec: expr=[string_col@1 ASC NULLS LAST, int_col@0 ASC NULLS LAST], preserve_partitioning=[true]
146-
03)----DataSourceExec: file_groups={2 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet/test_table/0.parquet, WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet/test_table/1.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet/test_table/2.parquet]]}, projection=[int_col, string_col], file_type=parquet
144+
02)--DataSourceExec: file_groups={2 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet/test_table/0.parquet, WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet/test_table/1.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet/test_table/2.parquet]]}, projection=[int_col, string_col], output_ordering=[string_col@1 ASC NULLS LAST, int_col@0 ASC NULLS LAST], file_type=parquet
147145

148146

149147
# Perform queries using MIN and MAX

datafusion/sqllogictest/test_files/parquet_sorted_statistics.slt

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -120,8 +120,7 @@ logical_plan
120120
02)--TableScan: test_table projection=[int_col, bigint_col, nulls_first_col, nulls_last_col]
121121
physical_plan
122122
01)SortPreservingMergeExec: [int_col@0 ASC NULLS LAST, bigint_col@1 ASC NULLS LAST, nulls_first_col@2 ASC, nulls_last_col@3 ASC NULLS LAST]
123-
02)--SortExec: expr=[int_col@0 ASC NULLS LAST, bigint_col@1 ASC NULLS LAST, nulls_first_col@2 ASC, nulls_last_col@3 ASC NULLS LAST], preserve_partitioning=[true]
124-
03)----DataSourceExec: file_groups={2 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_sorted_statistics/test_table/partition_col=A/0.parquet, WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_sorted_statistics/test_table/partition_col=B/1.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_sorted_statistics/test_table/partition_col=C/2.parquet]]}, projection=[int_col, bigint_col, nulls_first_col, nulls_last_col], file_type=parquet
123+
02)--DataSourceExec: file_groups={2 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_sorted_statistics/test_table/partition_col=A/0.parquet, WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_sorted_statistics/test_table/partition_col=C/2.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_sorted_statistics/test_table/partition_col=B/1.parquet]]}, projection=[int_col, bigint_col, nulls_first_col, nulls_last_col], output_ordering=[int_col@0 ASC NULLS LAST, bigint_col@1 ASC NULLS LAST, nulls_first_col@2 ASC, nulls_last_col@3 ASC NULLS LAST], file_type=parquet
125124

126125
# Another planning test, but project on a column with unsupported statistics
127126
# We should be able to ignore this and look at only the relevant statistics
@@ -138,8 +137,7 @@ logical_plan
138137
physical_plan
139138
01)ProjectionExec: expr=[string_col@0 as string_col]
140139
02)--SortPreservingMergeExec: [int_col@1 ASC NULLS LAST, bigint_col@2 ASC NULLS LAST, nulls_first_col@3 ASC, nulls_last_col@4 ASC NULLS LAST]
141-
03)----SortExec: expr=[int_col@1 ASC NULLS LAST, bigint_col@2 ASC NULLS LAST, nulls_first_col@3 ASC, nulls_last_col@4 ASC NULLS LAST], preserve_partitioning=[true]
142-
04)------DataSourceExec: file_groups={2 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_sorted_statistics/test_table/partition_col=A/0.parquet, WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_sorted_statistics/test_table/partition_col=B/1.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_sorted_statistics/test_table/partition_col=C/2.parquet]]}, projection=[string_col, int_col, bigint_col, nulls_first_col, nulls_last_col], file_type=parquet
140+
03)----DataSourceExec: file_groups={2 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_sorted_statistics/test_table/partition_col=A/0.parquet, WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_sorted_statistics/test_table/partition_col=C/2.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_sorted_statistics/test_table/partition_col=B/1.parquet]]}, projection=[string_col, int_col, bigint_col, nulls_first_col, nulls_last_col], output_ordering=[int_col@1 ASC NULLS LAST, bigint_col@2 ASC NULLS LAST, nulls_first_col@3 ASC, nulls_last_col@4 ASC NULLS LAST], file_type=parquet
143141

144142
# Clean up & recreate but sort on descending column
145143
statement ok

0 commit comments

Comments (0)