Skip to content

Commit da1e7ae

Browse files
authored
better preserve statistics when applying limits (#17381)
1 parent fc5888b commit da1e7ae

File tree

8 files changed

+330
-47
lines changed

8 files changed

+330
-47
lines changed

datafusion/common/src/stats.rs

Lines changed: 292 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ use std::fmt::{self, Debug, Display};
2222
use crate::{Result, ScalarValue};
2323

2424
use crate::error::_plan_err;
25-
use arrow::datatypes::{DataType, Schema, SchemaRef};
25+
use arrow::datatypes::{DataType, Schema};
2626

2727
/// Represents a value with a degree of certainty. `Precision` is used to
2828
/// propagate information the precision of statistical values.
@@ -391,13 +391,15 @@ impl Statistics {
391391
/// parameter to compute global statistics in a multi-partition setting.
392392
pub fn with_fetch(
393393
mut self,
394-
schema: SchemaRef,
395394
fetch: Option<usize>,
396395
skip: usize,
397396
n_partitions: usize,
398397
) -> Result<Self> {
399398
let fetch_val = fetch.unwrap_or(usize::MAX);
400399

400+
// Get the ratio of rows after / rows before on a per-partition basis
401+
let num_rows_before = self.num_rows;
402+
401403
self.num_rows = match self {
402404
Statistics {
403405
num_rows: Precision::Exact(nr),
@@ -431,8 +433,7 @@ impl Statistics {
431433
// At this point we know that we were given a `fetch` value
432434
// as the `None` case would go into the branch above. Since
433435
// the input has more rows than `fetch + skip`, the number
434-
// of rows will be the `fetch`, but we won't be able to
435-
// predict the other statistics.
436+
// of rows will be the `fetch`, other statistics will have to be downgraded to inexact.
436437
check_num_rows(
437438
fetch_val.checked_mul(n_partitions),
438439
// We know that we have an estimate for the number of rows:
@@ -445,8 +446,32 @@ impl Statistics {
445446
..
446447
} => check_num_rows(fetch.and_then(|v| v.checked_mul(n_partitions)), false),
447448
};
448-
self.column_statistics = Statistics::unknown_column(&schema);
449-
self.total_byte_size = Precision::Absent;
449+
let ratio: f64 = match (num_rows_before, self.num_rows) {
450+
(
451+
Precision::Exact(nr_before) | Precision::Inexact(nr_before),
452+
Precision::Exact(nr_after) | Precision::Inexact(nr_after),
453+
) => {
454+
if nr_before == 0 {
455+
0.0
456+
} else {
457+
nr_after as f64 / nr_before as f64
458+
}
459+
}
460+
_ => 0.0,
461+
};
462+
self.column_statistics = self
463+
.column_statistics
464+
.into_iter()
465+
.map(ColumnStatistics::to_inexact)
466+
.collect();
467+
// Adjust the total_byte_size for the ratio of rows before and after, also marking it as inexact
468+
self.total_byte_size = match &self.total_byte_size {
469+
Precision::Exact(n) | Precision::Inexact(n) => {
470+
let adjusted = (*n as f64 * ratio) as usize;
471+
Precision::Inexact(adjusted)
472+
}
473+
Precision::Absent => Precision::Absent,
474+
};
450475
Ok(self)
451476
}
452477

@@ -1199,4 +1224,265 @@ mod tests {
11991224
// Distinct count should be Absent after merge
12001225
assert_eq!(col_stats.distinct_count, Precision::Absent);
12011226
}
1227+
1228+
#[test]
1229+
fn test_with_fetch_basic_preservation() {
1230+
// Test that column statistics and byte size are preserved (as inexact) when applying fetch
1231+
let original_stats = Statistics {
1232+
num_rows: Precision::Exact(1000),
1233+
total_byte_size: Precision::Exact(8000),
1234+
column_statistics: vec![
1235+
ColumnStatistics {
1236+
null_count: Precision::Exact(10),
1237+
max_value: Precision::Exact(ScalarValue::Int32(Some(100))),
1238+
min_value: Precision::Exact(ScalarValue::Int32(Some(0))),
1239+
sum_value: Precision::Exact(ScalarValue::Int32(Some(5050))),
1240+
distinct_count: Precision::Exact(50),
1241+
},
1242+
ColumnStatistics {
1243+
null_count: Precision::Exact(20),
1244+
max_value: Precision::Exact(ScalarValue::Int64(Some(200))),
1245+
min_value: Precision::Exact(ScalarValue::Int64(Some(10))),
1246+
sum_value: Precision::Exact(ScalarValue::Int64(Some(10100))),
1247+
distinct_count: Precision::Exact(75),
1248+
},
1249+
],
1250+
};
1251+
1252+
// Apply fetch of 100 rows (10% of original)
1253+
let result = original_stats.clone().with_fetch(Some(100), 0, 1).unwrap();
1254+
1255+
// Check num_rows
1256+
assert_eq!(result.num_rows, Precision::Exact(100));
1257+
1258+
// Check total_byte_size is scaled proportionally and marked as inexact
1259+
// 100/1000 = 0.1, so 8000 * 0.1 = 800
1260+
assert_eq!(result.total_byte_size, Precision::Inexact(800));
1261+
1262+
// Check column statistics are preserved but marked as inexact
1263+
assert_eq!(result.column_statistics.len(), 2);
1264+
1265+
// First column
1266+
assert_eq!(
1267+
result.column_statistics[0].null_count,
1268+
Precision::Inexact(10)
1269+
);
1270+
assert_eq!(
1271+
result.column_statistics[0].max_value,
1272+
Precision::Inexact(ScalarValue::Int32(Some(100)))
1273+
);
1274+
assert_eq!(
1275+
result.column_statistics[0].min_value,
1276+
Precision::Inexact(ScalarValue::Int32(Some(0)))
1277+
);
1278+
assert_eq!(
1279+
result.column_statistics[0].sum_value,
1280+
Precision::Inexact(ScalarValue::Int32(Some(5050)))
1281+
);
1282+
assert_eq!(
1283+
result.column_statistics[0].distinct_count,
1284+
Precision::Inexact(50)
1285+
);
1286+
1287+
// Second column
1288+
assert_eq!(
1289+
result.column_statistics[1].null_count,
1290+
Precision::Inexact(20)
1291+
);
1292+
assert_eq!(
1293+
result.column_statistics[1].max_value,
1294+
Precision::Inexact(ScalarValue::Int64(Some(200)))
1295+
);
1296+
assert_eq!(
1297+
result.column_statistics[1].min_value,
1298+
Precision::Inexact(ScalarValue::Int64(Some(10)))
1299+
);
1300+
assert_eq!(
1301+
result.column_statistics[1].sum_value,
1302+
Precision::Inexact(ScalarValue::Int64(Some(10100)))
1303+
);
1304+
assert_eq!(
1305+
result.column_statistics[1].distinct_count,
1306+
Precision::Inexact(75)
1307+
);
1308+
}
1309+
1310+
#[test]
1311+
fn test_with_fetch_inexact_input() {
1312+
// Test that inexact input statistics remain inexact
1313+
let original_stats = Statistics {
1314+
num_rows: Precision::Inexact(1000),
1315+
total_byte_size: Precision::Inexact(8000),
1316+
column_statistics: vec![ColumnStatistics {
1317+
null_count: Precision::Inexact(10),
1318+
max_value: Precision::Inexact(ScalarValue::Int32(Some(100))),
1319+
min_value: Precision::Inexact(ScalarValue::Int32(Some(0))),
1320+
sum_value: Precision::Inexact(ScalarValue::Int32(Some(5050))),
1321+
distinct_count: Precision::Inexact(50),
1322+
}],
1323+
};
1324+
1325+
let result = original_stats.clone().with_fetch(Some(500), 0, 1).unwrap();
1326+
1327+
// Check num_rows is inexact
1328+
assert_eq!(result.num_rows, Precision::Inexact(500));
1329+
1330+
// Check total_byte_size is scaled and inexact
1331+
// 500/1000 = 0.5, so 8000 * 0.5 = 4000
1332+
assert_eq!(result.total_byte_size, Precision::Inexact(4000));
1333+
1334+
// Column stats remain inexact
1335+
assert_eq!(
1336+
result.column_statistics[0].null_count,
1337+
Precision::Inexact(10)
1338+
);
1339+
}
1340+
1341+
#[test]
1342+
fn test_with_fetch_skip_all_rows() {
1343+
// Test when skip >= num_rows (all rows are skipped)
1344+
let original_stats = Statistics {
1345+
num_rows: Precision::Exact(100),
1346+
total_byte_size: Precision::Exact(800),
1347+
column_statistics: vec![col_stats_i64(10)],
1348+
};
1349+
1350+
let result = original_stats.clone().with_fetch(Some(50), 100, 1).unwrap();
1351+
1352+
assert_eq!(result.num_rows, Precision::Exact(0));
1353+
// When ratio is 0/100 = 0, byte size should be 0
1354+
assert_eq!(result.total_byte_size, Precision::Inexact(0));
1355+
}
1356+
1357+
#[test]
1358+
fn test_with_fetch_no_limit() {
1359+
// Test when fetch is None and skip is 0 (no limit applied)
1360+
let original_stats = Statistics {
1361+
num_rows: Precision::Exact(100),
1362+
total_byte_size: Precision::Exact(800),
1363+
column_statistics: vec![col_stats_i64(10)],
1364+
};
1365+
1366+
let result = original_stats.clone().with_fetch(None, 0, 1).unwrap();
1367+
1368+
// Stats should be unchanged when no fetch and no skip
1369+
assert_eq!(result.num_rows, Precision::Exact(100));
1370+
assert_eq!(result.total_byte_size, Precision::Exact(800));
1371+
}
1372+
1373+
#[test]
1374+
fn test_with_fetch_with_skip() {
1375+
// Test with both skip and fetch
1376+
let original_stats = Statistics {
1377+
num_rows: Precision::Exact(1000),
1378+
total_byte_size: Precision::Exact(8000),
1379+
column_statistics: vec![col_stats_i64(10)],
1380+
};
1381+
1382+
// Skip 200, fetch 300, so we get rows 200-500
1383+
let result = original_stats
1384+
.clone()
1385+
.with_fetch(Some(300), 200, 1)
1386+
.unwrap();
1387+
1388+
assert_eq!(result.num_rows, Precision::Exact(300));
1389+
// 300/1000 = 0.3, so 8000 * 0.3 = 2400
1390+
assert_eq!(result.total_byte_size, Precision::Inexact(2400));
1391+
}
1392+
1393+
#[test]
1394+
fn test_with_fetch_multi_partition() {
1395+
// Test with multiple partitions
1396+
let original_stats = Statistics {
1397+
num_rows: Precision::Exact(1000), // per partition
1398+
total_byte_size: Precision::Exact(8000),
1399+
column_statistics: vec![col_stats_i64(10)],
1400+
};
1401+
1402+
// Fetch 100 per partition, 4 partitions = 400 total
1403+
let result = original_stats.clone().with_fetch(Some(100), 0, 4).unwrap();
1404+
1405+
assert_eq!(result.num_rows, Precision::Exact(400));
1406+
// 400/1000 = 0.4, so 8000 * 0.4 = 3200
1407+
assert_eq!(result.total_byte_size, Precision::Inexact(3200));
1408+
}
1409+
1410+
#[test]
1411+
fn test_with_fetch_absent_stats() {
1412+
// Test with absent statistics
1413+
let original_stats = Statistics {
1414+
num_rows: Precision::Absent,
1415+
total_byte_size: Precision::Absent,
1416+
column_statistics: vec![ColumnStatistics {
1417+
null_count: Precision::Absent,
1418+
max_value: Precision::Absent,
1419+
min_value: Precision::Absent,
1420+
sum_value: Precision::Absent,
1421+
distinct_count: Precision::Absent,
1422+
}],
1423+
};
1424+
1425+
let result = original_stats.clone().with_fetch(Some(100), 0, 1).unwrap();
1426+
1427+
// With absent input stats, output should be inexact estimate
1428+
assert_eq!(result.num_rows, Precision::Inexact(100));
1429+
assert_eq!(result.total_byte_size, Precision::Absent);
1430+
// Column stats should remain absent
1431+
assert_eq!(result.column_statistics[0].null_count, Precision::Absent);
1432+
}
1433+
1434+
#[test]
1435+
fn test_with_fetch_fetch_exceeds_rows() {
1436+
// Test when fetch is larger than available rows after skip
1437+
let original_stats = Statistics {
1438+
num_rows: Precision::Exact(100),
1439+
total_byte_size: Precision::Exact(800),
1440+
column_statistics: vec![col_stats_i64(10)],
1441+
};
1442+
1443+
// Skip 50, fetch 100, but only 50 rows remain
1444+
let result = original_stats.clone().with_fetch(Some(100), 50, 1).unwrap();
1445+
1446+
assert_eq!(result.num_rows, Precision::Exact(50));
1447+
// 50/100 = 0.5, so 800 * 0.5 = 400
1448+
assert_eq!(result.total_byte_size, Precision::Inexact(400));
1449+
}
1450+
1451+
#[test]
1452+
fn test_with_fetch_preserves_all_column_stats() {
1453+
// Comprehensive test that all column statistic fields are preserved
1454+
let original_col_stats = ColumnStatistics {
1455+
null_count: Precision::Exact(42),
1456+
max_value: Precision::Exact(ScalarValue::Int32(Some(999))),
1457+
min_value: Precision::Exact(ScalarValue::Int32(Some(-100))),
1458+
sum_value: Precision::Exact(ScalarValue::Int32(Some(123456))),
1459+
distinct_count: Precision::Exact(789),
1460+
};
1461+
1462+
let original_stats = Statistics {
1463+
num_rows: Precision::Exact(1000),
1464+
total_byte_size: Precision::Exact(8000),
1465+
column_statistics: vec![original_col_stats.clone()],
1466+
};
1467+
1468+
let result = original_stats.with_fetch(Some(250), 0, 1).unwrap();
1469+
1470+
let result_col_stats = &result.column_statistics[0];
1471+
1472+
// All values should be preserved but marked as inexact
1473+
assert_eq!(result_col_stats.null_count, Precision::Inexact(42));
1474+
assert_eq!(
1475+
result_col_stats.max_value,
1476+
Precision::Inexact(ScalarValue::Int32(Some(999)))
1477+
);
1478+
assert_eq!(
1479+
result_col_stats.min_value,
1480+
Precision::Inexact(ScalarValue::Int32(Some(-100)))
1481+
);
1482+
assert_eq!(
1483+
result_col_stats.sum_value,
1484+
Precision::Inexact(ScalarValue::Int32(Some(123456)))
1485+
);
1486+
assert_eq!(result_col_stats.distinct_count, Precision::Inexact(789));
1487+
}
12021488
}

datafusion/core/tests/custom_sources_cases/statistics.rs

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -265,17 +265,19 @@ async fn sql_filter() -> Result<()> {
265265
#[tokio::test]
266266
async fn sql_limit() -> Result<()> {
267267
let (stats, schema) = fully_defined();
268-
let col_stats = Statistics::unknown_column(&schema);
269268
let ctx = init_ctx(stats.clone(), schema)?;
270269

271270
let df = ctx.sql("SELECT * FROM stats_table LIMIT 5").await.unwrap();
272271
let physical_plan = df.create_physical_plan().await.unwrap();
273-
// when the limit is smaller than the original number of lines
274-
// we loose all statistics except the for number of rows which becomes the limit
272+
// when the limit is smaller than the original number of lines we mark the statistics as inexact
275273
assert_eq!(
276274
Statistics {
277275
num_rows: Precision::Exact(5),
278-
column_statistics: col_stats,
276+
column_statistics: stats
277+
.column_statistics
278+
.iter()
279+
.map(|c| c.clone().to_inexact())
280+
.collect(),
279281
total_byte_size: Precision::Absent
280282
},
281283
physical_plan.partition_statistics(None)?

datafusion/core/tests/physical_optimizer/partition_statistics.rs

Lines changed: 14 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -490,11 +490,20 @@ mod test {
490490
.map(|idx| local_limit.partition_statistics(Some(idx)))
491491
.collect::<Result<Vec<_>>>()?;
492492
assert_eq!(statistics.len(), 2);
493-
let schema = scan.schema();
494-
let mut expected_statistic_partition = Statistics::new_unknown(&schema);
495-
expected_statistic_partition.num_rows = Precision::Exact(1);
496-
assert_eq!(statistics[0], expected_statistic_partition);
497-
assert_eq!(statistics[1], expected_statistic_partition);
493+
let mut expected_0 = statistics[0].clone();
494+
expected_0.column_statistics = expected_0
495+
.column_statistics
496+
.into_iter()
497+
.map(|c| c.to_inexact())
498+
.collect();
499+
let mut expected_1 = statistics[1].clone();
500+
expected_1.column_statistics = expected_1
501+
.column_statistics
502+
.into_iter()
503+
.map(|c| c.to_inexact())
504+
.collect();
505+
assert_eq!(statistics[0], expected_0);
506+
assert_eq!(statistics[1], expected_1);
498507
Ok(())
499508
}
500509

0 commit comments

Comments
 (0)