Skip to content

Commit e322839

Browse files
authored
Move PartitionSearchMode into datafusion_physical_plan, rename to InputOrderMode (#8364)
* Move PartitionSearchMode into datafusion_physical_plan * Improve comments * Rename to InputOrderMode * Update prost
1 parent 4ceb2de commit e322839

File tree

15 files changed

+188
-175
lines changed

15 files changed

+188
-175
lines changed

datafusion/core/src/physical_optimizer/enforce_sorting.rs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -53,14 +53,15 @@ use crate::physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec;
5353
use crate::physical_plan::windows::{
5454
get_best_fitting_window, BoundedWindowAggExec, WindowAggExec,
5555
};
56-
use crate::physical_plan::{with_new_children_if_necessary, Distribution, ExecutionPlan};
56+
use crate::physical_plan::{
57+
with_new_children_if_necessary, Distribution, ExecutionPlan, InputOrderMode,
58+
};
5759

5860
use datafusion_common::tree_node::{Transformed, TreeNode, VisitRecursion};
5961
use datafusion_common::{plan_err, DataFusionError};
6062
use datafusion_physical_expr::{PhysicalSortExpr, PhysicalSortRequirement};
6163

6264
use datafusion_physical_plan::repartition::RepartitionExec;
63-
use datafusion_physical_plan::windows::PartitionSearchMode;
6465
use itertools::izip;
6566

6667
/// This rule inspects [`SortExec`]'s in the given physical plan and removes the
@@ -611,7 +612,7 @@ fn analyze_window_sort_removal(
611612
window_expr.to_vec(),
612613
window_child,
613614
partitionby_exprs.to_vec(),
614-
PartitionSearchMode::Sorted,
615+
InputOrderMode::Sorted,
615616
)?) as _
616617
} else {
617618
Arc::new(WindowAggExec::try_new(

datafusion/core/src/physical_optimizer/test_utils.rs

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ use crate::physical_plan::sorts::sort::SortExec;
3535
use crate::physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec;
3636
use crate::physical_plan::union::UnionExec;
3737
use crate::physical_plan::windows::create_window_expr;
38-
use crate::physical_plan::{ExecutionPlan, Partitioning};
38+
use crate::physical_plan::{ExecutionPlan, InputOrderMode, Partitioning};
3939
use crate::prelude::{CsvReadOptions, SessionContext};
4040

4141
use arrow_schema::{Schema, SchemaRef, SortOptions};
@@ -44,7 +44,6 @@ use datafusion_execution::object_store::ObjectStoreUrl;
4444
use datafusion_expr::{AggregateFunction, WindowFrame, WindowFunction};
4545
use datafusion_physical_expr::expressions::col;
4646
use datafusion_physical_expr::{PhysicalExpr, PhysicalSortExpr};
47-
use datafusion_physical_plan::windows::PartitionSearchMode;
4847

4948
use async_trait::async_trait;
5049

@@ -240,7 +239,7 @@ pub fn bounded_window_exec(
240239
.unwrap()],
241240
input.clone(),
242241
vec![],
243-
PartitionSearchMode::Sorted,
242+
InputOrderMode::Sorted,
244243
)
245244
.unwrap(),
246245
)

datafusion/core/src/physical_planner.rs

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -63,12 +63,10 @@ use crate::physical_plan::sorts::sort::SortExec;
6363
use crate::physical_plan::union::UnionExec;
6464
use crate::physical_plan::unnest::UnnestExec;
6565
use crate::physical_plan::values::ValuesExec;
66-
use crate::physical_plan::windows::{
67-
BoundedWindowAggExec, PartitionSearchMode, WindowAggExec,
68-
};
66+
use crate::physical_plan::windows::{BoundedWindowAggExec, WindowAggExec};
6967
use crate::physical_plan::{
70-
aggregates, displayable, udaf, windows, AggregateExpr, ExecutionPlan, Partitioning,
71-
PhysicalExpr, WindowExpr,
68+
aggregates, displayable, udaf, windows, AggregateExpr, ExecutionPlan, InputOrderMode,
69+
Partitioning, PhysicalExpr, WindowExpr,
7270
};
7371

7472
use arrow::compute::SortOptions;
@@ -761,7 +759,7 @@ impl DefaultPhysicalPlanner {
761759
window_expr,
762760
input_exec,
763761
physical_partition_keys,
764-
PartitionSearchMode::Sorted,
762+
InputOrderMode::Sorted,
765763
)?)
766764
} else {
767765
Arc::new(WindowAggExec::try_new(

datafusion/core/tests/fuzz_cases/window_fuzz.rs

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -25,9 +25,9 @@ use arrow::util::pretty::pretty_format_batches;
2525
use datafusion::physical_plan::memory::MemoryExec;
2626
use datafusion::physical_plan::sorts::sort::SortExec;
2727
use datafusion::physical_plan::windows::{
28-
create_window_expr, BoundedWindowAggExec, PartitionSearchMode, WindowAggExec,
28+
create_window_expr, BoundedWindowAggExec, WindowAggExec,
2929
};
30-
use datafusion::physical_plan::{collect, ExecutionPlan};
30+
use datafusion::physical_plan::{collect, ExecutionPlan, InputOrderMode};
3131
use datafusion::prelude::{SessionConfig, SessionContext};
3232
use datafusion_common::{Result, ScalarValue};
3333
use datafusion_expr::type_coercion::aggregates::coerce_types;
@@ -43,9 +43,7 @@ use hashbrown::HashMap;
4343
use rand::rngs::StdRng;
4444
use rand::{Rng, SeedableRng};
4545

46-
use datafusion_physical_plan::windows::PartitionSearchMode::{
47-
Linear, PartiallySorted, Sorted,
48-
};
46+
use datafusion_physical_plan::InputOrderMode::{Linear, PartiallySorted, Sorted};
4947

5048
#[tokio::test(flavor = "multi_thread", worker_threads = 16)]
5149
async fn window_bounded_window_random_comparison() -> Result<()> {
@@ -385,9 +383,9 @@ async fn run_window_test(
385383
random_seed: u64,
386384
partition_by_columns: Vec<&str>,
387385
orderby_columns: Vec<&str>,
388-
search_mode: PartitionSearchMode,
386+
search_mode: InputOrderMode,
389387
) -> Result<()> {
390-
let is_linear = !matches!(search_mode, PartitionSearchMode::Sorted);
388+
let is_linear = !matches!(search_mode, InputOrderMode::Sorted);
391389
let mut rng = StdRng::seed_from_u64(random_seed);
392390
let schema = input1[0].schema();
393391
let session_config = SessionConfig::new().with_batch_size(50);

datafusion/physical-plan/src/aggregates/mod.rs

Lines changed: 15 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -27,11 +27,9 @@ use crate::aggregates::{
2727
};
2828

2929
use crate::metrics::{ExecutionPlanMetricsSet, MetricsSet};
30-
use crate::windows::{
31-
get_ordered_partition_by_indices, get_window_mode, PartitionSearchMode,
32-
};
30+
use crate::windows::{get_ordered_partition_by_indices, get_window_mode};
3331
use crate::{
34-
DisplayFormatType, Distribution, ExecutionPlan, Partitioning,
32+
DisplayFormatType, Distribution, ExecutionPlan, InputOrderMode, Partitioning,
3533
SendableRecordBatchStream, Statistics,
3634
};
3735

@@ -304,7 +302,9 @@ pub struct AggregateExec {
304302
/// Execution metrics
305303
metrics: ExecutionPlanMetricsSet,
306304
required_input_ordering: Option<LexRequirement>,
307-
partition_search_mode: PartitionSearchMode,
305+
/// Describes how the input is ordered relative to the group by columns
306+
input_order_mode: InputOrderMode,
307+
/// Describes how the output is ordered
308308
output_ordering: Option<LexOrdering>,
309309
}
310310

@@ -409,15 +409,15 @@ fn get_aggregate_search_mode(
409409
aggr_expr: &mut [Arc<dyn AggregateExpr>],
410410
order_by_expr: &mut [Option<LexOrdering>],
411411
ordering_req: &mut Vec<PhysicalSortExpr>,
412-
) -> PartitionSearchMode {
412+
) -> InputOrderMode {
413413
let groupby_exprs = group_by
414414
.expr
415415
.iter()
416416
.map(|(item, _)| item.clone())
417417
.collect::<Vec<_>>();
418-
let mut partition_search_mode = PartitionSearchMode::Linear;
418+
let mut input_order_mode = InputOrderMode::Linear;
419419
if !group_by.is_single() || groupby_exprs.is_empty() {
420-
return partition_search_mode;
420+
return input_order_mode;
421421
}
422422

423423
if let Some((should_reverse, mode)) =
@@ -439,9 +439,9 @@ fn get_aggregate_search_mode(
439439
);
440440
*ordering_req = reverse_order_bys(ordering_req);
441441
}
442-
partition_search_mode = mode;
442+
input_order_mode = mode;
443443
}
444-
partition_search_mode
444+
input_order_mode
445445
}
446446

447447
/// Check whether group by expression contains all of the expression inside `requirement`
@@ -515,7 +515,7 @@ impl AggregateExec {
515515
&input.equivalence_properties(),
516516
)?;
517517
let mut ordering_req = requirement.unwrap_or(vec![]);
518-
let partition_search_mode = get_aggregate_search_mode(
518+
let input_order_mode = get_aggregate_search_mode(
519519
&group_by,
520520
&input,
521521
&mut aggr_expr,
@@ -567,7 +567,7 @@ impl AggregateExec {
567567
metrics: ExecutionPlanMetricsSet::new(),
568568
required_input_ordering,
569569
limit: None,
570-
partition_search_mode,
570+
input_order_mode,
571571
output_ordering,
572572
})
573573
}
@@ -767,8 +767,8 @@ impl DisplayAs for AggregateExec {
767767
write!(f, ", lim=[{limit}]")?;
768768
}
769769

770-
if self.partition_search_mode != PartitionSearchMode::Linear {
771-
write!(f, ", ordering_mode={:?}", self.partition_search_mode)?;
770+
if self.input_order_mode != InputOrderMode::Linear {
771+
write!(f, ", ordering_mode={:?}", self.input_order_mode)?;
772772
}
773773
}
774774
}
@@ -819,7 +819,7 @@ impl ExecutionPlan for AggregateExec {
819819
/// infinite, returns an error to indicate this.
820820
fn unbounded_output(&self, children: &[bool]) -> Result<bool> {
821821
if children[0] {
822-
if self.partition_search_mode == PartitionSearchMode::Linear {
822+
if self.input_order_mode == InputOrderMode::Linear {
823823
// Cannot run without breaking pipeline.
824824
plan_err!(
825825
"Aggregate Error: `GROUP BY` clauses with columns without ordering and GROUPING SETS are not supported for unbounded inputs."

datafusion/physical-plan/src/aggregates/order/mod.rs

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ use datafusion_physical_expr::{EmitTo, PhysicalSortExpr};
2323
mod full;
2424
mod partial;
2525

26-
use crate::windows::PartitionSearchMode;
26+
use crate::InputOrderMode;
2727
pub(crate) use full::GroupOrderingFull;
2828
pub(crate) use partial::GroupOrderingPartial;
2929

@@ -42,18 +42,16 @@ impl GroupOrdering {
4242
/// Create a `GroupOrdering` for the specified ordering
4343
pub fn try_new(
4444
input_schema: &Schema,
45-
mode: &PartitionSearchMode,
45+
mode: &InputOrderMode,
4646
ordering: &[PhysicalSortExpr],
4747
) -> Result<Self> {
4848
match mode {
49-
PartitionSearchMode::Linear => Ok(GroupOrdering::None),
50-
PartitionSearchMode::PartiallySorted(order_indices) => {
49+
InputOrderMode::Linear => Ok(GroupOrdering::None),
50+
InputOrderMode::PartiallySorted(order_indices) => {
5151
GroupOrderingPartial::try_new(input_schema, order_indices, ordering)
5252
.map(GroupOrdering::Partial)
5353
}
54-
PartitionSearchMode::Sorted => {
55-
Ok(GroupOrdering::Full(GroupOrderingFull::new()))
56-
}
54+
InputOrderMode::Sorted => Ok(GroupOrdering::Full(GroupOrderingFull::new())),
5755
}
5856
}
5957

datafusion/physical-plan/src/aggregates/row_hash.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -346,7 +346,7 @@ impl GroupedHashAggregateStream {
346346
.find_longest_permutation(&agg_group_by.output_exprs());
347347
let group_ordering = GroupOrdering::try_new(
348348
&group_schema,
349-
&agg.partition_search_mode,
349+
&agg.input_order_mode,
350350
ordering.as_slice(),
351351
)?;
352352

datafusion/physical-plan/src/lib.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@ pub mod joins;
5858
pub mod limit;
5959
pub mod memory;
6060
pub mod metrics;
61+
mod ordering;
6162
pub mod projection;
6263
pub mod repartition;
6364
pub mod sorts;
@@ -72,6 +73,7 @@ pub mod windows;
7273

7374
pub use crate::display::{DefaultDisplay, DisplayAs, DisplayFormatType, VerboseDisplay};
7475
pub use crate::metrics::Metric;
76+
pub use crate::ordering::InputOrderMode;
7577
pub use crate::topk::TopK;
7678
pub use crate::visitor::{accept, visit_execution_plan, ExecutionPlanVisitor};
7779

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
/// Specifies how the input to an aggregation or window operator is ordered
19+
/// relative to its `GROUP BY` or `PARTITION BY` expressions.
20+
///
21+
/// For example, if the existing ordering is `[a ASC, b ASC, c ASC]`:
22+
///
23+
/// ## Window Functions
24+
/// - A `PARTITION BY b` clause can use `Linear` mode.
25+
/// - A `PARTITION BY a, c` or a `PARTITION BY c, a` can use
26+
/// `PartiallySorted([0])` or `PartiallySorted([1])` modes, respectively.
27+
/// (The vector stores the index of `a` in the respective PARTITION BY expression.)
28+
/// - A `PARTITION BY a, b` or a `PARTITION BY b, a` can use `Sorted` mode.
29+
///
30+
/// ## Aggregations
31+
/// - A `GROUP BY b` clause can use `Linear` mode.
32+
/// - A `GROUP BY a, c` or a `GROUP BY c, a` can use
33+
/// `PartiallySorted([0])` or `PartiallySorted([1])` modes, respectively.
34+
/// (The vector stores the index of `a` in the respective GROUP BY expression.)
35+
/// - A `GROUP BY a, b` or a `GROUP BY b, a` can use `Sorted` mode.
36+
///
37+
/// Note these are the same examples as above, but with `GROUP BY` instead of
38+
/// `PARTITION BY` to make the examples easier to read.
39+
#[derive(Debug, Clone, PartialEq)]
40+
pub enum InputOrderMode {
41+
/// There is no partial permutation of the expressions satisfying the
42+
/// existing ordering.
43+
Linear,
44+
/// There is a partial permutation of the expressions satisfying the
45+
/// existing ordering. Indices describing the longest partial permutation
46+
/// are stored in the vector.
47+
PartiallySorted(Vec<usize>),
48+
/// There is a (full) permutation of the expressions satisfying the
49+
/// existing ordering.
50+
Sorted,
51+
}

0 commit comments

Comments
 (0)