Skip to content

Commit d18df33

Browse files
committed
feature: Add a WindowUDFImpl::simplify() API
Signed-off-by: guojidan <[email protected]>
1 parent 9487ca0 commit d18df33

File tree

3 files changed

+114
-3
lines changed

3 files changed

+114
-3
lines changed

datafusion-examples/examples/advanced_udwf.rs

Lines changed: 29 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,10 @@ use datafusion::error::Result;
2626
use datafusion::prelude::*;
2727
use datafusion_common::ScalarValue;
2828
use datafusion_expr::{
29-
PartitionEvaluator, Signature, WindowFrame, WindowUDF, WindowUDFImpl,
29+
expr::WindowFunction,
30+
simplify::{ExprSimplifyResult, SimplifyInfo},
31+
AggregateFunction, PartitionEvaluator, Signature, WindowFrame, WindowUDF,
32+
WindowUDFImpl,
3033
};
3134

3235
/// This example shows how to use the full WindowUDFImpl API to implement a user
@@ -75,6 +78,30 @@ impl WindowUDFImpl for SmoothItUdf {
7578
Ok(DataType::Float64)
7679
}
7780

81+
/// Rewrite a call to this UDWF into the built-in `AVG` aggregate window
/// function.
///
/// The arguments, `PARTITION BY` / `ORDER BY` expressions, window frame and
/// null treatment are carried over unchanged into the replacement
/// expression; `_info` is not consulted.
82+
fn simplify(
83+
&self,
84+
args: Vec<Expr>,
85+
partition_by: &[Expr],
86+
order_by: &[Expr],
87+
window_frame: &WindowFrame,
88+
null_treatment: &Option<datafusion_sql::sqlparser::ast::NullTreatment>,
89+
_info: &dyn SimplifyInfo,
90+
) -> Result<ExprSimplifyResult> {
91+
Ok(ExprSimplifyResult::Simplified(Expr::WindowFunction(
92+
WindowFunction {
93+
fun: datafusion_expr::WindowFunctionDefinition::AggregateFunction(
94+
AggregateFunction::Avg,
95+
),
96+
args,
97+
partition_by: partition_by.to_vec(),
98+
order_by: order_by.to_vec(),
99+
window_frame: window_frame.clone(),
100+
null_treatment: *null_treatment,
101+
},
102+
)))
103+
}
104+
78105
/// Create a `PartitionEvaluator` to evaluate this function on a new
79106
/// partition.
80107
fn partition_evaluator(&self) -> Result<Box<dyn PartitionEvaluator>> {
@@ -144,7 +171,7 @@ async fn create_context() -> Result<SessionContext> {
144171

145172
// declare a table in memory. In spark API, this corresponds to createDataFrame(...).
146173
println!("pwd: {}", std::env::current_dir().unwrap().display());
147-
let csv_path = "../../datafusion/core/tests/data/cars.csv".to_string();
174+
let csv_path = "datafusion/core/tests/data/cars.csv".to_string();
148175
let read_options = CsvReadOptions::default().has_header(true);
149176

150177
ctx.register_csv("cars", &csv_path, read_options).await?;

datafusion/expr/src/udwf.rs

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,11 +18,13 @@
1818
//! [`WindowUDF`]: User Defined Window Functions
1919
2020
use crate::{
21+
simplify::{ExprSimplifyResult, SimplifyInfo},
2122
Expr, PartitionEvaluator, PartitionEvaluatorFactory, ReturnTypeFunction, Signature,
2223
WindowFrame,
2324
};
2425
use arrow::datatypes::DataType;
2526
use datafusion_common::Result;
27+
use sqlparser::ast::NullTreatment;
2628
use std::{
2729
any::Any,
2830
fmt::{self, Debug, Display, Formatter},
@@ -170,6 +172,28 @@ impl WindowUDF {
170172
self.inner.return_type(args)
171173
}
172174

175+
/// Apply this window function's simplification / rewrite rule by
/// delegating to the wrapped implementation, passing every argument
/// through unchanged.
176+
///
177+
/// See [`WindowUDFImpl::simplify`] for more details.
178+
pub fn simplify(
179+
&self,
180+
args: Vec<Expr>,
181+
partition_by: &[Expr],
182+
order_by: &[Expr],
183+
window_frame: &WindowFrame,
184+
null_treatment: &Option<NullTreatment>,
185+
info: &dyn SimplifyInfo,
186+
) -> Result<ExprSimplifyResult> {
187+
self.inner.simplify(
188+
args,
189+
partition_by,
190+
order_by,
191+
window_frame,
192+
null_treatment,
193+
info,
194+
)
195+
}
196+
173197
/// Return a `PartitionEvaluator` for evaluating this window function
174198
pub fn partition_evaluator_factory(&self) -> Result<Box<dyn PartitionEvaluator>> {
175199
self.inner.partition_evaluator()
@@ -266,6 +290,35 @@ pub trait WindowUDFImpl: Debug + Send + Sync {
266290
fn aliases(&self) -> &[String] {
267291
&[]
268292
}
293+
294+
/// Optionally apply per-UDWF simplification / rewrite rules.
295+
///
296+
/// This can be used to apply function specific simplification rules during
297+
/// optimization. The default implementation performs no rewrite and hands
/// `args` back via [`ExprSimplifyResult::Original`].
298+
///
299+
/// Note that DataFusion handles simplifying arguments and "constant
300+
/// folding" (replacing a function call with constant arguments such as
301+
/// `my_add(1,2) --> 3` ). Thus, there is no need to implement such
302+
/// optimizations manually for specific UDWFs.
303+
///
304+
/// See [`advanced_udwf.rs`] for an example.
///
305+
/// [`advanced_udwf.rs`]: https://github.com/apache/arrow-datafusion/blob/main/datafusion-examples/examples/advanced_udwf.rs
306+
///
307+
/// # Returns
308+
/// [`ExprSimplifyResult`] indicating the result of the simplification.
309+
/// NOTE: if the function cannot be simplified, the arguments *MUST* be
310+
/// returned unmodified.
311+
fn simplify(
312+
&self,
313+
args: Vec<Expr>,
314+
_partition_by: &[Expr],
315+
_order_by: &[Expr],
316+
_window_frame: &WindowFrame,
317+
_null_treatment: &Option<NullTreatment>,
318+
_info: &dyn SimplifyInfo,
319+
) -> Result<ExprSimplifyResult> {
320+
Ok(ExprSimplifyResult::Original(args))
321+
}
269322
}
270323

271324
/// WindowUDF that adds an alias to the underlying function. It is better to

datafusion/optimizer/src/simplify_expressions/expr_simplifier.rs

Lines changed: 32 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,13 +40,16 @@ use datafusion_common::{
4040
use datafusion_common::{
4141
internal_err, DFSchema, DFSchemaRef, DataFusionError, Result, ScalarValue,
4242
};
43-
use datafusion_expr::expr::{InList, InSubquery};
4443
use datafusion_expr::simplify::ExprSimplifyResult;
4544
use datafusion_expr::{
4645
and, lit, or, BinaryExpr, BuiltinScalarFunction, Case, ColumnarValue, Expr, Like,
4746
Operator, ScalarFunctionDefinition, Volatility,
4847
};
4948
use datafusion_expr::{expr::ScalarFunction, interval_arithmetic::NullableInterval};
49+
use datafusion_expr::{
50+
expr::{InList, InSubquery, WindowFunction},
51+
WindowFunctionDefinition,
52+
};
5053
use datafusion_physical_expr::{create_physical_expr, execution_props::ExecutionProps};
5154

5255
/// This structure handles API for expression simplification
@@ -1353,6 +1356,34 @@ impl<'a, S: SimplifyInfo> TreeNodeRewriter for Simplifier<'a, S> {
13531356
))),
13541357
},
13551358

1359+
Expr::WindowFunction(WindowFunction {
1360+
fun: WindowFunctionDefinition::WindowUDF(udwf),
1361+
args,
1362+
partition_by,
1363+
order_by,
1364+
window_frame,
1365+
null_treatment,
1366+
}) => match udwf.simplify(
1367+
args,
1368+
&partition_by,
1369+
&order_by,
1370+
&window_frame,
1371+
&null_treatment,
1372+
info,
1373+
)? {
1374+
ExprSimplifyResult::Original(args) => {
1375+
Transformed::no(Expr::WindowFunction(WindowFunction {
1376+
fun: WindowFunctionDefinition::WindowUDF(udwf),
1377+
args,
1378+
partition_by,
1379+
order_by,
1380+
window_frame,
1381+
null_treatment,
1382+
}))
1383+
}
1384+
ExprSimplifyResult::Simplified(expr) => Transformed::yes(expr),
1385+
},
1386+
13561387
//
13571388
// Rules for Between
13581389
//

0 commit comments

Comments
 (0)