13 changes: 10 additions & 3 deletions datafusion/core/src/datasource/listing/table.rs
@@ -1615,10 +1615,12 @@ mod tests {
 
     #[tokio::test]
     async fn test_insert_into_append_new_json_files() -> Result<()> {
+        let mut config_map: HashMap<String, String> = HashMap::new();
+        config_map.insert("datafusion.execution.batch_size".into(), "1".into());
         helper_test_append_new_files_to_table(
             FileType::JSON,
             FileCompressionType::UNCOMPRESSED,
-            None,
+            Some(config_map),
         )
         .await?;
         Ok(())
@@ -1637,21 +1639,25 @@ mod tests {
 
     #[tokio::test]
     async fn test_insert_into_append_new_csv_files() -> Result<()> {
+        let mut config_map: HashMap<String, String> = HashMap::new();
+        config_map.insert("datafusion.execution.batch_size".into(), "1".into());
         helper_test_append_new_files_to_table(
             FileType::CSV,
             FileCompressionType::UNCOMPRESSED,
-            None,
+            Some(config_map),
         )
         .await?;
         Ok(())
     }
 
     #[tokio::test]
     async fn test_insert_into_append_new_parquet_files_defaults() -> Result<()> {
+        let mut config_map: HashMap<String, String> = HashMap::new();
+        config_map.insert("datafusion.execution.batch_size".into(), "1".into());
         helper_test_append_new_files_to_table(
             FileType::PARQUET,
             FileCompressionType::UNCOMPRESSED,
-            None,
+            Some(config_map),
         )
         .await?;
         Ok(())
@@ -1838,6 +1844,7 @@ mod tests {
             "datafusion.execution.parquet.write_batch_size".into(),
             "5".into(),
         );
+        config_map.insert("datafusion.execution.batch_size".into(), "1".into());
         helper_test_append_new_files_to_table(
             FileType::PARQUET,
             FileCompressionType::UNCOMPRESSED,
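Note: these tests previously passed None as the final argument and now thread a map of session-config overrides through helper_test_append_new_files_to_table. Pinning datafusion.execution.batch_size to "1" presumably keeps the tiny test inputs above the new num_rows > batch_size repartition threshold introduced in enforce_distribution.rs below, so the plans these tests rely on keep their repartitioning. The helper's body is not part of this diff, so the following is only a sketch of how such an Option<HashMap<String, String>> argument might be applied; everything except the config keys is an assumption:

use std::collections::HashMap;
use datafusion::prelude::{SessionConfig, SessionContext};

// Hypothetical sketch: build the test SessionContext, applying any
// "key" -> "value" overrides the test supplied (e.g.
// "datafusion.execution.batch_size" -> "1").
fn test_context(config_map: Option<HashMap<String, String>>) -> SessionContext {
    let mut config = SessionConfig::new();
    if let Some(overrides) = config_map {
        for (key, value) in overrides {
            // set_str applies a string-typed config entry by key.
            config = config.set_str(&key, &value);
        }
    }
    SessionContext::with_config(config)
}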
34 changes: 22 additions & 12 deletions datafusion/core/src/physical_optimizer/enforce_distribution.rs
@@ -1021,6 +1021,7 @@ fn add_hash_on_top(
     // until Repartition(Hash).
     dist_onward: &mut Option<ExecTree>,
     input_idx: usize,
+    repartition_beneficial_stats: bool,
 ) -> Result<Arc<dyn ExecutionPlan>> {
     if n_target == input.output_partitioning().partition_count() && n_target == 1 {
         // In this case adding a hash repartition is unnecessary as the hash
@@ -1044,9 +1045,13 @@
     // - Usage of order preserving variants is not desirable (per the flag
     //   `config.optimizer.bounded_order_preserving_variants`).
     let should_preserve_ordering = input.output_ordering().is_some();
-    // Since hashing benefits from partitioning, add a round-robin repartition
-    // before it:
-    let mut new_plan = add_roundrobin_on_top(input, n_target, dist_onward, 0)?;
+    let mut new_plan = if repartition_beneficial_stats {
+        // Since hashing benefits from partitioning, add a round-robin repartition
+        // before it:
+        add_roundrobin_on_top(input, n_target, dist_onward, 0)?
+    } else {
+        input
+    };
     new_plan = Arc::new(
         RepartitionExec::try_new(new_plan, Partitioning::Hash(hash_exprs, n_target))?
             .with_preserve_order(should_preserve_ordering),
@@ -1223,6 +1228,7 @@ fn ensure_distribution(
     let enable_round_robin = config.optimizer.enable_round_robin_repartition;
     let repartition_file_scans = config.optimizer.repartition_file_scans;
     let repartition_file_min_size = config.optimizer.repartition_file_min_size;
+    let batch_size = config.execution.batch_size;
     let is_unbounded = unbounded_output(&dist_context.plan);
     // Use order preserving variants either of the conditions true
     // - it is desired according to config
@@ -1233,13 +1239,7 @@
     if dist_context.plan.children().is_empty() {
         return Ok(Transformed::No(dist_context));
     }
-    // Don't need to apply when the returned row count is not greater than 1:
-    let stats = dist_context.plan.statistics();
-    let mut repartition_beneficial_stat = true;
-    if stats.is_exact {
-        repartition_beneficial_stat =
-            stats.num_rows.map(|num_rows| num_rows > 1).unwrap_or(true);
-    }
+
     // Remove unnecessary repartition from the physical plan if any
     let DistributionContext {
         mut plan,
@@ -1263,7 +1263,6 @@ fn ensure_distribution(
             plan = updated_window;
         }
     };
-
     let n_children = plan.children().len();
     // This loop iterates over all the children to:
     // - Increase parallelism for every child if it is beneficial.
@@ -1289,9 +1288,19 @@
                 maintains,
                 child_idx,
             )| {
+                // Don't need to apply when the returned row count is not greater than 1:
+                let stats = child.statistics();
+                let repartition_beneficial_stats = if stats.is_exact {
+                    stats
+                        .num_rows
+                        .map(|num_rows| num_rows > batch_size)
+                        .unwrap_or(true)
+                } else {
+                    true
+                };
                 if enable_round_robin
                     // Operator benefits from partitioning (e.g. filter):
-                    && (would_benefit && repartition_beneficial_stat)
+                    && (would_benefit && repartition_beneficial_stats)
                     // Unless partitioning doesn't increase the partition count, it is not beneficial:
                     && child.output_partitioning().partition_count() < target_partitions
                 {
@@ -1340,6 +1349,7 @@ fn ensure_distribution(
                             target_partitions,
                             dist_onward,
                             child_idx,
+                            repartition_beneficial_stats,
                         )?;
                     }
                     Distribution::UnspecifiedDistribution => {}
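Taken together, these changes move the "is repartitioning worth it?" check from the plan root down to each child, and raise the row-count threshold from the constant 1 to the configured batch_size: a child whose exact statistics show it fits within a single batch gains nothing from being split across partitions. A condensed sketch of the decision, assuming the pre-rework Statistics shape (num_rows: Option<usize>, is_exact: bool) in use at this point in the codebase:

use datafusion_common::Statistics;

// Condensed from the diff above: with exact statistics, repartitioning is
// only considered beneficial when the child produces more rows than fit in
// one batch; with inexact or unknown statistics we optimistically assume
// it helps.
fn repartition_beneficial(stats: &Statistics, batch_size: usize) -> bool {
    if stats.is_exact {
        stats
            .num_rows
            .map(|num_rows| num_rows > batch_size)
            .unwrap_or(true)
    } else {
        true
    }
}

This is also what reorders the expected plans in the test files below: a single-batch MemoryExec or ValuesExec now reports exact small statistics, so no round-robin repartition is inserted beneath the partial aggregate, while the aggregate's own output statistics are inexact, so the RoundRobinBatch repartition lands above it instead.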
8 changes: 4 additions & 4 deletions datafusion/core/tests/sql/order.rs
@@ -209,16 +209,16 @@ ORDER BY 1, 2;
 " AggregateExec: mode=FinalPartitioned, gby=[Int64(0)@0 as Int64(0), t@1 as t], aggr=[]",
 " CoalesceBatchesExec: target_batch_size=8192",
 " RepartitionExec: partitioning=Hash([Int64(0)@0, t@1], 2), input_partitions=2",
-" AggregateExec: mode=Partial, gby=[0 as Int64(0), t@0 as t], aggr=[]",
-" RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1",
+" RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1",
+" AggregateExec: mode=Partial, gby=[0 as Int64(0), t@0 as t], aggr=[]",
 " ProjectionExec: expr=[column1@0 as t]",
 " ValuesExec",
 " ProjectionExec: expr=[Int64(1)@0 as m, t@1 as t]",
 " AggregateExec: mode=FinalPartitioned, gby=[Int64(1)@0 as Int64(1), t@1 as t], aggr=[]",
 " CoalesceBatchesExec: target_batch_size=8192",
 " RepartitionExec: partitioning=Hash([Int64(1)@0, t@1], 2), input_partitions=2",
-" AggregateExec: mode=Partial, gby=[1 as Int64(1), t@0 as t], aggr=[]",
-" RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1",
+" RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1",
+" AggregateExec: mode=Partial, gby=[1 as Int64(1), t@0 as t], aggr=[]",
 " ProjectionExec: expr=[column1@0 as t]",
 " ValuesExec",
 ];
21 changes: 15 additions & 6 deletions datafusion/physical-plan/src/aggregates/mod.rs
@@ -1021,12 +1021,21 @@ impl ExecutionPlan for AggregateExec {
                 ..Default::default()
             }
         }
-        _ => Statistics {
-            // the output row count is surely not larger than its input row count
-            num_rows: self.input.statistics().num_rows,
-            is_exact: false,
-            ..Default::default()
-        },
+        _ => {
+            let input_stats = self.input.statistics();
+            // Input statistics is exact and number of rows not greater than 1:
+            let is_exact = input_stats.is_exact
+                && (input_stats
+                    .num_rows
+                    .map(|num_rows| num_rows == 1)
+                    .unwrap_or(false));
+            Statistics {
+                // the output row count is surely not larger than its input row count
+                num_rows: self.input.statistics().num_rows,
+                is_exact,
+                ..Default::default()
+            }
+        }
     }
 }

Review thread on the comment "// the output row count is surely not larger than its input row count":

Contributor: I don't know if it matters, but the output rows could be larger than the input rows for COUNT(*) queries -- specifically if there are no input rows, COUNT(*) still produces an output row 🤔

❯ create table t(x int) as values (1);
0 rows in set. Query took 0.001 seconds.

❯ select count(*) from t where x > 1000;
+----------+
| COUNT(*) |
+----------+
| 0        |
+----------+

mustafasrepo (Contributor, author), Oct 17, 2023: Yes you are right. I missed that. I think the safest way is to check num_rows == 1. Changed accordingly.
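The num_rows == 1 guard is the outcome of the review thread above: "output rows are never more than input rows" fails for COUNT(*) over an empty input (0 rows in, 1 row out), whereas an exactly-one-row input yields exactly one output row for any aggregate (one group, or one scalar result). A small self-contained sketch of that truth table, again assuming the pre-rework Statistics shape; the helper name is invented for illustration:

use datafusion_common::Statistics;

// Mirrors the guard added above: propagate exactness only when the input
// is known to be exactly one row.
fn aggregate_output_is_exact(input: &Statistics) -> bool {
    input.is_exact
        && input
            .num_rows
            .map(|num_rows| num_rows == 1)
            .unwrap_or(false)
}

fn main() {
    // Empty input: COUNT(*) still emits one row, so the input row count is
    // not a valid exact bound on the output.
    let empty = Statistics {
        num_rows: Some(0),
        is_exact: true,
        ..Default::default()
    };
    assert!(!aggregate_output_is_exact(&empty));

    // Exactly one input row: the aggregate emits exactly one row.
    let one = Statistics {
        num_rows: Some(1),
        is_exact: true,
        ..Default::default()
    };
    assert!(aggregate_output_is_exact(&one));
}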
20 changes: 10 additions & 10 deletions datafusion/sqllogictest/test_files/aggregate.slt
@@ -2342,8 +2342,8 @@ GlobalLimitExec: skip=0, fetch=4
 ------AggregateExec: mode=FinalPartitioned, gby=[trace_id@0 as trace_id], aggr=[MAX(traces.timestamp)]
 --------CoalesceBatchesExec: target_batch_size=8192
 ----------RepartitionExec: partitioning=Hash([trace_id@0], 4), input_partitions=4
-------------AggregateExec: mode=Partial, gby=[trace_id@0 as trace_id], aggr=[MAX(traces.timestamp)]
---------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
+------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
+--------------AggregateExec: mode=Partial, gby=[trace_id@0 as trace_id], aggr=[MAX(traces.timestamp)]
 ----------------MemoryExec: partitions=1, partition_sizes=[1]


@@ -2397,8 +2397,8 @@ GlobalLimitExec: skip=0, fetch=4
 ------AggregateExec: mode=FinalPartitioned, gby=[trace_id@0 as trace_id], aggr=[MAX(traces.timestamp)], lim=[4]
 --------CoalesceBatchesExec: target_batch_size=8192
 ----------RepartitionExec: partitioning=Hash([trace_id@0], 4), input_partitions=4
-------------AggregateExec: mode=Partial, gby=[trace_id@0 as trace_id], aggr=[MAX(traces.timestamp)], lim=[4]
---------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
+------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
+--------------AggregateExec: mode=Partial, gby=[trace_id@0 as trace_id], aggr=[MAX(traces.timestamp)], lim=[4]
 ----------------MemoryExec: partitions=1, partition_sizes=[1]

query TT
@@ -2416,8 +2416,8 @@ GlobalLimitExec: skip=0, fetch=4
 ------AggregateExec: mode=FinalPartitioned, gby=[trace_id@0 as trace_id], aggr=[MIN(traces.timestamp)]
 --------CoalesceBatchesExec: target_batch_size=8192
 ----------RepartitionExec: partitioning=Hash([trace_id@0], 4), input_partitions=4
-------------AggregateExec: mode=Partial, gby=[trace_id@0 as trace_id], aggr=[MIN(traces.timestamp)]
---------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
+------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
+--------------AggregateExec: mode=Partial, gby=[trace_id@0 as trace_id], aggr=[MIN(traces.timestamp)]
 ----------------MemoryExec: partitions=1, partition_sizes=[1]

query TT
@@ -2435,8 +2435,8 @@ GlobalLimitExec: skip=0, fetch=4
 ------AggregateExec: mode=FinalPartitioned, gby=[trace_id@0 as trace_id], aggr=[MAX(traces.timestamp)]
 --------CoalesceBatchesExec: target_batch_size=8192
 ----------RepartitionExec: partitioning=Hash([trace_id@0], 4), input_partitions=4
-------------AggregateExec: mode=Partial, gby=[trace_id@0 as trace_id], aggr=[MAX(traces.timestamp)]
---------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
+------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
+--------------AggregateExec: mode=Partial, gby=[trace_id@0 as trace_id], aggr=[MAX(traces.timestamp)]
 ----------------MemoryExec: partitions=1, partition_sizes=[1]

query TT
@@ -2454,8 +2454,8 @@ GlobalLimitExec: skip=0, fetch=4
 ------AggregateExec: mode=FinalPartitioned, gby=[trace_id@0 as trace_id], aggr=[MAX(traces.timestamp)]
 --------CoalesceBatchesExec: target_batch_size=8192
 ----------RepartitionExec: partitioning=Hash([trace_id@0], 4), input_partitions=4
-------------AggregateExec: mode=Partial, gby=[trace_id@0 as trace_id], aggr=[MAX(traces.timestamp)]
---------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
+------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
+--------------AggregateExec: mode=Partial, gby=[trace_id@0 as trace_id], aggr=[MAX(traces.timestamp)]
 ----------------MemoryExec: partitions=1, partition_sizes=[1]

query TI
2 changes: 1 addition & 1 deletion datafusion/sqllogictest/test_files/copy.slt
@@ -33,7 +33,7 @@ CopyTo: format=parquet output_url=test_files/scratch/copy/table single_file_outp
 --TableScan: source_table projection=[col1, col2]
 physical_plan
 InsertExec: sink=ParquetSink(writer_mode=PutMultipart, file_groups=[])
---MemoryExec: partitions=4, partition_sizes=[1, 0, 0, 0]
+--MemoryExec: partitions=1, partition_sizes=[1]

# Error case
query error DataFusion error: Invalid or Unsupported Configuration: Format not explicitly set and unable to get file extension!