Skip to content

Commit 0fcd077

Browse files
authored
Fix compute_record_batch_statistics wrong with projection (#8489)
* Minor: Improve the document format of JoinHashMap * fix `compute_record_batch_statistics` wrong with `projection` * fix test * fix test
1 parent b71bec0 commit 0fcd077

File tree

3 files changed

+57
-44
lines changed

3 files changed

+57
-44
lines changed

datafusion/physical-plan/src/common.rs

Lines changed: 26 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ use crate::{ColumnStatistics, ExecutionPlan, Statistics};
3030
use arrow::datatypes::Schema;
3131
use arrow::ipc::writer::{FileWriter, IpcWriteOptions};
3232
use arrow::record_batch::RecordBatch;
33+
use arrow_array::Array;
3334
use datafusion_common::stats::Precision;
3435
use datafusion_common::{plan_err, DataFusionError, Result};
3536
use datafusion_execution::memory_pool::MemoryReservation;
@@ -139,17 +140,22 @@ pub fn compute_record_batch_statistics(
139140
) -> Statistics {
140141
let nb_rows = batches.iter().flatten().map(RecordBatch::num_rows).sum();
141142

142-
let total_byte_size = batches
143-
.iter()
144-
.flatten()
145-
.map(|b| b.get_array_memory_size())
146-
.sum();
147-
148143
let projection = match projection {
149144
Some(p) => p,
150145
None => (0..schema.fields().len()).collect(),
151146
};
152147

148+
let total_byte_size = batches
149+
.iter()
150+
.flatten()
151+
.map(|b| {
152+
projection
153+
.iter()
154+
.map(|index| b.column(*index).get_array_memory_size())
155+
.sum::<usize>()
156+
})
157+
.sum();
158+
153159
let mut column_statistics = vec![ColumnStatistics::new_unknown(); projection.len()];
154160

155161
for partition in batches.iter() {
@@ -388,6 +394,7 @@ mod tests {
388394
datatypes::{DataType, Field, Schema},
389395
record_batch::RecordBatch,
390396
};
397+
use arrow_array::UInt64Array;
391398
use datafusion_expr::Operator;
392399
use datafusion_physical_expr::expressions::{col, Column};
393400

@@ -685,20 +692,30 @@ mod tests {
685692
let schema = Arc::new(Schema::new(vec![
686693
Field::new("f32", DataType::Float32, false),
687694
Field::new("f64", DataType::Float64, false),
695+
Field::new("u64", DataType::UInt64, false),
688696
]));
689697
let batch = RecordBatch::try_new(
690698
Arc::clone(&schema),
691699
vec![
692700
Arc::new(Float32Array::from(vec![1., 2., 3.])),
693701
Arc::new(Float64Array::from(vec![9., 8., 7.])),
702+
Arc::new(UInt64Array::from(vec![4, 5, 6])),
694703
],
695704
)?;
705+
706+
// just select f32,f64
707+
let select_projection = Some(vec![0, 1]);
708+
let byte_size = batch
709+
.project(&select_projection.clone().unwrap())
710+
.unwrap()
711+
.get_array_memory_size();
712+
696713
let actual =
697-
compute_record_batch_statistics(&[vec![batch]], &schema, Some(vec![0, 1]));
714+
compute_record_batch_statistics(&[vec![batch]], &schema, select_projection);
698715

699-
let mut expected = Statistics {
716+
let expected = Statistics {
700717
num_rows: Precision::Exact(3),
701-
total_byte_size: Precision::Exact(464), // this might change a bit if the way we compute the size changes
718+
total_byte_size: Precision::Exact(byte_size),
702719
column_statistics: vec![
703720
ColumnStatistics {
704721
distinct_count: Precision::Absent,
@@ -715,9 +732,6 @@ mod tests {
715732
],
716733
};
717734

718-
// Prevent test flakiness due to undefined / changing implementation details
719-
expected.total_byte_size = actual.total_byte_size.clone();
720-
721735
assert_eq!(actual, expected);
722736
Ok(())
723737
}

datafusion/sqllogictest/test_files/groupby.slt

Lines changed: 11 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -2021,14 +2021,15 @@ SortPreservingMergeExec: [col0@0 ASC NULLS LAST]
20212021
----------RepartitionExec: partitioning=Hash([col0@0, col1@1, col2@2], 4), input_partitions=4
20222022
------------AggregateExec: mode=Partial, gby=[col0@0 as col0, col1@1 as col1, col2@2 as col2], aggr=[LAST_VALUE(r.col1)], ordering_mode=PartiallySorted([0])
20232023
--------------SortExec: expr=[col0@3 ASC NULLS LAST]
2024-
----------------CoalesceBatchesExec: target_batch_size=8192
2025-
------------------HashJoinExec: mode=Partitioned, join_type=Inner, on=[(col0@0, col0@0)]
2026-
--------------------CoalesceBatchesExec: target_batch_size=8192
2027-
----------------------RepartitionExec: partitioning=Hash([col0@0], 4), input_partitions=1
2028-
------------------------MemoryExec: partitions=1, partition_sizes=[3]
2029-
--------------------CoalesceBatchesExec: target_batch_size=8192
2030-
----------------------RepartitionExec: partitioning=Hash([col0@0], 4), input_partitions=1
2031-
------------------------MemoryExec: partitions=1, partition_sizes=[3]
2024+
----------------ProjectionExec: expr=[col0@2 as col0, col1@3 as col1, col2@4 as col2, col0@0 as col0, col1@1 as col1]
2025+
------------------CoalesceBatchesExec: target_batch_size=8192
2026+
--------------------HashJoinExec: mode=Partitioned, join_type=Inner, on=[(col0@0, col0@0)]
2027+
----------------------CoalesceBatchesExec: target_batch_size=8192
2028+
------------------------RepartitionExec: partitioning=Hash([col0@0], 4), input_partitions=1
2029+
--------------------------MemoryExec: partitions=1, partition_sizes=[3]
2030+
----------------------CoalesceBatchesExec: target_batch_size=8192
2031+
------------------------RepartitionExec: partitioning=Hash([col0@0], 4), input_partitions=1
2032+
--------------------------MemoryExec: partitions=1, partition_sizes=[3]
20322033

20332034
# Columns in the table are a,b,c,d. Source is CsvExec which is ordered by
20342035
# a,b,c column. Column a has cardinality 2, column b has cardinality 4.
@@ -2709,9 +2710,9 @@ SortExec: expr=[sn@2 ASC NULLS LAST]
27092710
--ProjectionExec: expr=[zip_code@1 as zip_code, country@2 as country, sn@0 as sn, ts@3 as ts, currency@4 as currency, LAST_VALUE(e.amount) ORDER BY [e.sn ASC NULLS LAST]@5 as last_rate]
27102711
----AggregateExec: mode=Single, gby=[sn@2 as sn, zip_code@0 as zip_code, country@1 as country, ts@3 as ts, currency@4 as currency], aggr=[LAST_VALUE(e.amount)]
27112712
------SortExec: expr=[sn@5 ASC NULLS LAST]
2712-
--------ProjectionExec: expr=[zip_code@0 as zip_code, country@1 as country, sn@2 as sn, ts@3 as ts, currency@4 as currency, sn@5 as sn, amount@8 as amount]
2713+
--------ProjectionExec: expr=[zip_code@4 as zip_code, country@5 as country, sn@6 as sn, ts@7 as ts, currency@8 as currency, sn@0 as sn, amount@3 as amount]
27132714
----------CoalesceBatchesExec: target_batch_size=8192
2714-
------------HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(currency@4, currency@2)], filter=ts@0 >= ts@1
2715+
------------HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(currency@2, currency@4)], filter=ts@0 >= ts@1
27152716
--------------MemoryExec: partitions=1, partition_sizes=[1]
27162717
--------------MemoryExec: partitions=1, partition_sizes=[1]
27172718

datafusion/sqllogictest/test_files/joins.slt

Lines changed: 20 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -1569,15 +1569,13 @@ Projection: join_t1.t1_id, join_t2.t2_id, join_t1.t1_name
15691569
----TableScan: join_t1 projection=[t1_id, t1_name]
15701570
----TableScan: join_t2 projection=[t2_id]
15711571
physical_plan
1572-
ProjectionExec: expr=[t1_id@0 as t1_id, t2_id@3 as t2_id, t1_name@1 as t1_name]
1572+
ProjectionExec: expr=[t1_id@1 as t1_id, t2_id@0 as t2_id, t1_name@2 as t1_name]
15731573
--CoalesceBatchesExec: target_batch_size=2
1574-
----HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(join_t1.t1_id + UInt32(11)@2, t2_id@0)]
1575-
------CoalescePartitionsExec
1576-
--------ProjectionExec: expr=[t1_id@0 as t1_id, t1_name@1 as t1_name, t1_id@0 + 11 as join_t1.t1_id + UInt32(11)]
1577-
----------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
1578-
------------MemoryExec: partitions=1, partition_sizes=[1]
1579-
------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
1580-
--------MemoryExec: partitions=1, partition_sizes=[1]
1574+
----HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(t2_id@0, join_t1.t1_id + UInt32(11)@2)]
1575+
------MemoryExec: partitions=1, partition_sizes=[1]
1576+
------ProjectionExec: expr=[t1_id@0 as t1_id, t1_name@1 as t1_name, t1_id@0 + 11 as join_t1.t1_id + UInt32(11)]
1577+
--------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
1578+
----------MemoryExec: partitions=1, partition_sizes=[1]
15811579

15821580
statement ok
15831581
set datafusion.optimizer.repartition_joins = true;
@@ -1595,18 +1593,18 @@ Projection: join_t1.t1_id, join_t2.t2_id, join_t1.t1_name
15951593
----TableScan: join_t1 projection=[t1_id, t1_name]
15961594
----TableScan: join_t2 projection=[t2_id]
15971595
physical_plan
1598-
ProjectionExec: expr=[t1_id@0 as t1_id, t2_id@3 as t2_id, t1_name@1 as t1_name]
1596+
ProjectionExec: expr=[t1_id@1 as t1_id, t2_id@0 as t2_id, t1_name@2 as t1_name]
15991597
--CoalesceBatchesExec: target_batch_size=2
1600-
----HashJoinExec: mode=Partitioned, join_type=Inner, on=[(join_t1.t1_id + UInt32(11)@2, t2_id@0)]
1598+
----HashJoinExec: mode=Partitioned, join_type=Inner, on=[(t2_id@0, join_t1.t1_id + UInt32(11)@2)]
1599+
------CoalesceBatchesExec: target_batch_size=2
1600+
--------RepartitionExec: partitioning=Hash([t2_id@0], 2), input_partitions=2
1601+
----------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
1602+
------------MemoryExec: partitions=1, partition_sizes=[1]
16011603
------CoalesceBatchesExec: target_batch_size=2
16021604
--------RepartitionExec: partitioning=Hash([join_t1.t1_id + UInt32(11)@2], 2), input_partitions=2
16031605
----------ProjectionExec: expr=[t1_id@0 as t1_id, t1_name@1 as t1_name, t1_id@0 + 11 as join_t1.t1_id + UInt32(11)]
16041606
------------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
16051607
--------------MemoryExec: partitions=1, partition_sizes=[1]
1606-
------CoalesceBatchesExec: target_batch_size=2
1607-
--------RepartitionExec: partitioning=Hash([t2_id@0], 2), input_partitions=2
1608-
----------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
1609-
------------MemoryExec: partitions=1, partition_sizes=[1]
16101608

16111609
# Right side expr key inner join
16121610

@@ -2821,13 +2819,13 @@ physical_plan
28212819
SortPreservingMergeExec: [t1_id@0 ASC NULLS LAST]
28222820
--SortExec: expr=[t1_id@0 ASC NULLS LAST]
28232821
----CoalesceBatchesExec: target_batch_size=2
2824-
------HashJoinExec: mode=Partitioned, join_type=LeftSemi, on=[(t1_id@0, t2_id@0)]
2822+
------HashJoinExec: mode=Partitioned, join_type=RightSemi, on=[(t2_id@0, t1_id@0)]
28252823
--------CoalesceBatchesExec: target_batch_size=2
2826-
----------RepartitionExec: partitioning=Hash([t1_id@0], 2), input_partitions=2
2824+
----------RepartitionExec: partitioning=Hash([t2_id@0], 2), input_partitions=2
28272825
------------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
28282826
--------------MemoryExec: partitions=1, partition_sizes=[1]
28292827
--------CoalesceBatchesExec: target_batch_size=2
2830-
----------RepartitionExec: partitioning=Hash([t2_id@0], 2), input_partitions=2
2828+
----------RepartitionExec: partitioning=Hash([t1_id@0], 2), input_partitions=2
28312829
------------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
28322830
--------------MemoryExec: partitions=1, partition_sizes=[1]
28332831

@@ -2862,13 +2860,13 @@ physical_plan
28622860
SortPreservingMergeExec: [t1_id@0 ASC NULLS LAST]
28632861
--SortExec: expr=[t1_id@0 ASC NULLS LAST]
28642862
----CoalesceBatchesExec: target_batch_size=2
2865-
------HashJoinExec: mode=Partitioned, join_type=LeftSemi, on=[(t1_id@0, t2_id@0)]
2863+
------HashJoinExec: mode=Partitioned, join_type=RightSemi, on=[(t2_id@0, t1_id@0)]
28662864
--------CoalesceBatchesExec: target_batch_size=2
2867-
----------RepartitionExec: partitioning=Hash([t1_id@0], 2), input_partitions=2
2865+
----------RepartitionExec: partitioning=Hash([t2_id@0], 2), input_partitions=2
28682866
------------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
28692867
--------------MemoryExec: partitions=1, partition_sizes=[1]
28702868
--------CoalesceBatchesExec: target_batch_size=2
2871-
----------RepartitionExec: partitioning=Hash([t2_id@0], 2), input_partitions=2
2869+
----------RepartitionExec: partitioning=Hash([t1_id@0], 2), input_partitions=2
28722870
------------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
28732871
--------------MemoryExec: partitions=1, partition_sizes=[1]
28742872

@@ -2924,7 +2922,7 @@ physical_plan
29242922
SortPreservingMergeExec: [t1_id@0 ASC NULLS LAST]
29252923
--SortExec: expr=[t1_id@0 ASC NULLS LAST]
29262924
----CoalesceBatchesExec: target_batch_size=2
2927-
------HashJoinExec: mode=CollectLeft, join_type=LeftSemi, on=[(t1_id@0, t2_id@0)]
2925+
------HashJoinExec: mode=CollectLeft, join_type=RightSemi, on=[(t2_id@0, t1_id@0)]
29282926
--------MemoryExec: partitions=1, partition_sizes=[1]
29292927
--------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
29302928
----------MemoryExec: partitions=1, partition_sizes=[1]
@@ -2960,7 +2958,7 @@ physical_plan
29602958
SortPreservingMergeExec: [t1_id@0 ASC NULLS LAST]
29612959
--SortExec: expr=[t1_id@0 ASC NULLS LAST]
29622960
----CoalesceBatchesExec: target_batch_size=2
2963-
------HashJoinExec: mode=CollectLeft, join_type=LeftSemi, on=[(t1_id@0, t2_id@0)]
2961+
------HashJoinExec: mode=CollectLeft, join_type=RightSemi, on=[(t2_id@0, t1_id@0)]
29642962
--------MemoryExec: partitions=1, partition_sizes=[1]
29652963
--------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
29662964
----------MemoryExec: partitions=1, partition_sizes=[1]

0 commit comments

Comments
 (0)