Skip to content

Commit 485b80e

Browse files
authored
[Minor]: Remove input_schema field from window executor (#7810)
* Initial commit * Remove input schema from proto
1 parent 3ccbcfc commit 485b80e

File tree

12 files changed

+5
-75
lines changed

12 files changed

+5
-75
lines changed

datafusion/core/src/physical_optimizer/enforce_sorting.rs

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -608,20 +608,17 @@ fn analyze_window_sort_removal(
608608
add_sort_above(&mut window_child, sort_expr, None)?;
609609

610610
let uses_bounded_memory = window_expr.iter().all(|e| e.uses_bounded_memory());
611-
let input_schema = window_child.schema();
612611
let new_window = if uses_bounded_memory {
613612
Arc::new(BoundedWindowAggExec::try_new(
614613
window_expr.to_vec(),
615614
window_child,
616-
input_schema,
617615
partitionby_exprs.to_vec(),
618616
PartitionSearchMode::Sorted,
619617
)?) as _
620618
} else {
621619
Arc::new(WindowAggExec::try_new(
622620
window_expr.to_vec(),
623621
window_child,
624-
input_schema,
625622
partitionby_exprs.to_vec(),
626623
)?) as _
627624
};

datafusion/core/src/physical_optimizer/test_utils.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -238,7 +238,6 @@ pub fn bounded_window_exec(
238238
)
239239
.unwrap()],
240240
input.clone(),
241-
input.schema(),
242241
vec![],
243242
crate::physical_plan::windows::PartitionSearchMode::Sorted,
244243
)

datafusion/core/src/physical_planner.rs

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -751,15 +751,13 @@ impl DefaultPhysicalPlanner {
751751
Arc::new(BoundedWindowAggExec::try_new(
752752
window_expr,
753753
input_exec,
754-
physical_input_schema,
755754
physical_partition_keys,
756755
PartitionSearchMode::Sorted,
757756
)?)
758757
} else {
759758
Arc::new(WindowAggExec::try_new(
760759
window_expr,
761760
input_exec,
762-
physical_input_schema,
763761
physical_partition_keys,
764762
)?)
765763
})

datafusion/core/tests/fuzz_cases/window_fuzz.rs

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -461,7 +461,6 @@ async fn run_window_test(
461461
)
462462
.unwrap()],
463463
exec1,
464-
schema.clone(),
465464
vec![],
466465
)
467466
.unwrap(),
@@ -484,7 +483,6 @@ async fn run_window_test(
484483
)
485484
.unwrap()],
486485
exec2,
487-
schema.clone(),
488486
vec![],
489487
search_mode,
490488
)

datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs

Lines changed: 2 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -88,8 +88,6 @@ pub struct BoundedWindowAggExec {
8888
window_expr: Vec<Arc<dyn WindowExpr>>,
8989
/// Schema after the window is run
9090
schema: SchemaRef,
91-
/// Schema before the window
92-
input_schema: SchemaRef,
9391
/// Partition Keys
9492
pub partition_keys: Vec<Arc<dyn PhysicalExpr>>,
9593
/// Execution metrics
@@ -110,11 +108,10 @@ impl BoundedWindowAggExec {
110108
pub fn try_new(
111109
window_expr: Vec<Arc<dyn WindowExpr>>,
112110
input: Arc<dyn ExecutionPlan>,
113-
input_schema: SchemaRef,
114111
partition_keys: Vec<Arc<dyn PhysicalExpr>>,
115112
partition_search_mode: PartitionSearchMode,
116113
) -> Result<Self> {
117-
let schema = create_schema(&input_schema, &window_expr)?;
114+
let schema = create_schema(&input.schema(), &window_expr)?;
118115
let schema = Arc::new(schema);
119116
let partition_by_exprs = window_expr[0].partition_by();
120117
let ordered_partition_by_indices = match &partition_search_mode {
@@ -140,7 +137,6 @@ impl BoundedWindowAggExec {
140137
input,
141138
window_expr,
142139
schema,
143-
input_schema,
144140
partition_keys,
145141
metrics: ExecutionPlanMetricsSet::new(),
146142
partition_search_mode,
@@ -158,11 +154,6 @@ impl BoundedWindowAggExec {
158154
&self.input
159155
}
160156

161-
/// Get the input schema before any window functions are applied
162-
pub fn input_schema(&self) -> SchemaRef {
163-
self.input_schema.clone()
164-
}
165-
166157
/// Return the output sort order of partition keys: For example
167158
/// OVER(PARTITION BY a, ORDER BY b) -> would give sorting of the column a
168159
// We are sure that partition by columns are always at the beginning of sort_keys
@@ -303,7 +294,6 @@ impl ExecutionPlan for BoundedWindowAggExec {
303294
Ok(Arc::new(BoundedWindowAggExec::try_new(
304295
self.window_expr.clone(),
305296
children[0].clone(),
306-
self.input_schema.clone(),
307297
self.partition_keys.clone(),
308298
self.partition_search_mode.clone(),
309299
)?))
@@ -333,7 +323,7 @@ impl ExecutionPlan for BoundedWindowAggExec {
333323
fn statistics(&self) -> Statistics {
334324
let input_stat = self.input.statistics();
335325
let win_cols = self.window_expr.len();
336-
let input_cols = self.input_schema.fields().len();
326+
let input_cols = self.input.schema().fields().len();
337327
// TODO stats: some windowing function will maintain invariants such as min, max...
338328
let mut column_statistics = Vec::with_capacity(win_cols + input_cols);
339329
if let Some(input_col_stats) = input_stat.column_statistics {

datafusion/physical-plan/src/windows/mod.rs

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -421,7 +421,6 @@ pub fn get_best_fitting_window(
421421
Ok(Some(Arc::new(BoundedWindowAggExec::try_new(
422422
window_expr,
423423
input.clone(),
424-
input.schema(),
425424
physical_partition_keys.to_vec(),
426425
partition_search_mode,
427426
)?) as _))
@@ -435,7 +434,6 @@ pub fn get_best_fitting_window(
435434
Ok(Some(Arc::new(WindowAggExec::try_new(
436435
window_expr,
437436
input.clone(),
438-
input.schema(),
439437
physical_partition_keys.to_vec(),
440438
)?) as _))
441439
}
@@ -759,7 +757,6 @@ mod tests {
759757
schema.as_ref(),
760758
)?],
761759
blocking_exec,
762-
schema,
763760
vec![],
764761
)?);
765762

datafusion/physical-plan/src/windows/window_agg_exec.rs

Lines changed: 2 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -59,8 +59,6 @@ pub struct WindowAggExec {
5959
window_expr: Vec<Arc<dyn WindowExpr>>,
6060
/// Schema after the window is run
6161
schema: SchemaRef,
62-
/// Schema before the window
63-
input_schema: SchemaRef,
6462
/// Partition Keys
6563
pub partition_keys: Vec<Arc<dyn PhysicalExpr>>,
6664
/// Execution metrics
@@ -75,10 +73,9 @@ impl WindowAggExec {
7573
pub fn try_new(
7674
window_expr: Vec<Arc<dyn WindowExpr>>,
7775
input: Arc<dyn ExecutionPlan>,
78-
input_schema: SchemaRef,
7976
partition_keys: Vec<Arc<dyn PhysicalExpr>>,
8077
) -> Result<Self> {
81-
let schema = create_schema(&input_schema, &window_expr)?;
78+
let schema = create_schema(&input.schema(), &window_expr)?;
8279
let schema = Arc::new(schema);
8380

8481
let ordered_partition_by_indices =
@@ -87,7 +84,6 @@ impl WindowAggExec {
8784
input,
8885
window_expr,
8986
schema,
90-
input_schema,
9187
partition_keys,
9288
metrics: ExecutionPlanMetricsSet::new(),
9389
ordered_partition_by_indices,
@@ -104,11 +100,6 @@ impl WindowAggExec {
104100
&self.input
105101
}
106102

107-
/// Get the input schema before any window functions are applied
108-
pub fn input_schema(&self) -> SchemaRef {
109-
self.input_schema.clone()
110-
}
111-
112103
/// Return the output sort order of partition keys: For example
113104
/// OVER(PARTITION BY a, ORDER BY b) -> would give sorting of the column a
114105
// We are sure that partition by columns are always at the beginning of sort_keys
@@ -230,7 +221,6 @@ impl ExecutionPlan for WindowAggExec {
230221
Ok(Arc::new(WindowAggExec::try_new(
231222
self.window_expr.clone(),
232223
children[0].clone(),
233-
self.input_schema.clone(),
234224
self.partition_keys.clone(),
235225
)?))
236226
}
@@ -259,7 +249,7 @@ impl ExecutionPlan for WindowAggExec {
259249
fn statistics(&self) -> Statistics {
260250
let input_stat = self.input.statistics();
261251
let win_cols = self.window_expr.len();
262-
let input_cols = self.input_schema.fields().len();
252+
let input_cols = self.input.schema().fields().len();
263253
// TODO stats: some windowing function will maintain invariants such as min, max...
264254
let mut column_statistics = Vec::with_capacity(win_cols + input_cols);
265255
if let Some(input_col_stats) = input_stat.column_statistics {

datafusion/proto/proto/datafusion.proto

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1419,7 +1419,6 @@ message PartiallySortedPartitionSearchMode {
14191419
message WindowAggExecNode {
14201420
PhysicalPlanNode input = 1;
14211421
repeated PhysicalWindowExprNode window_expr = 2;
1422-
Schema input_schema = 4;
14231422
repeated PhysicalExprNode partition_keys = 5;
14241423
// Set optional to `None` for `BoundedWindowAggExec`.
14251424
oneof partition_search_mode {

datafusion/proto/src/generated/pbjson.rs

Lines changed: 0 additions & 18 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

datafusion/proto/src/generated/prost.rs

Lines changed: 0 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)