-
Notifications
You must be signed in to change notification settings - Fork 1.7k
Add standalone example for OptimizerRule
#11087
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
b07ea5c
0a32950
e15b4ad
f397046
73c87a6
98b1be0
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,213 @@ | ||
| // Licensed to the Apache Software Foundation (ASF) under one | ||
| // or more contributor license agreements. See the NOTICE file | ||
| // distributed with this work for additional information | ||
| // regarding copyright ownership. The ASF licenses this file | ||
| // to you under the Apache License, Version 2.0 (the | ||
| // "License"); you may not use this file except in compliance | ||
| // with the License. You may obtain a copy of the License at | ||
| // | ||
| // http://www.apache.org/licenses/LICENSE-2.0 | ||
| // | ||
| // Unless required by applicable law or agreed to in writing, | ||
| // software distributed under the License is distributed on an | ||
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
| // KIND, either express or implied. See the License for the | ||
| // specific language governing permissions and limitations | ||
| // under the License. | ||
|
|
||
| use arrow::array::{ArrayRef, Int32Array, RecordBatch, StringArray}; | ||
| use arrow_schema::DataType; | ||
| use datafusion::prelude::SessionContext; | ||
| use datafusion_common::tree_node::{Transformed, TreeNode}; | ||
| use datafusion_common::{Result, ScalarValue}; | ||
| use datafusion_expr::{ | ||
| BinaryExpr, ColumnarValue, Expr, LogicalPlan, Operator, ScalarUDF, ScalarUDFImpl, | ||
| Signature, Volatility, | ||
| }; | ||
| use datafusion_optimizer::optimizer::ApplyOrder; | ||
| use datafusion_optimizer::{OptimizerConfig, OptimizerRule}; | ||
| use std::any::Any; | ||
| use std::sync::Arc; | ||
|
|
||
| /// This example demonstrates how to add your own [`OptimizerRule`] | ||
| /// to DataFusion. | ||
| /// | ||
| /// [`OptimizerRule`]s transform [`LogicalPlan`]s into an equivalent (but | ||
| /// hopefully faster) form. | ||
| /// | ||
| /// See [analyzer_rule.rs] for an example of AnalyzerRules, which are for | ||
| /// changing plan semantics. | ||
| #[tokio::main] | ||
| pub async fn main() -> Result<()> { | ||
| // DataFusion includes many built in OptimizerRules for tasks such as outer | ||
| // to inner join conversion and constant folding. | ||
| // | ||
| // Note you can change the order of optimizer rules using the lower level | ||
| // `SessionState` API | ||
| let ctx = SessionContext::new(); | ||
| ctx.add_optimizer_rule(Arc::new(MyOptimizerRule {})); | ||
|
|
||
| // Now, let's plan and run queries with the new rule | ||
| ctx.register_batch("person", person_batch())?; | ||
| let sql = "SELECT * FROM person WHERE age = 22"; | ||
| let plan = ctx.sql(sql).await?.into_optimized_plan()?; | ||
|
|
||
| // We can see the effect of our rewrite on the output plan that the filter | ||
| // has been rewritten to `my_eq` | ||
| // | ||
| // Filter: my_eq(person.age, Int32(22)) | ||
| // TableScan: person projection=[name, age] | ||
| println!("Logical Plan:\n\n{}\n", plan.display_indent()); | ||
|
|
||
| // The query below doesn't respect a filter `where age = 22` because | ||
| // the plan has been rewritten using UDF which returns always true | ||
| // | ||
| // And the output verifies the predicates have been changed (as the my_eq | ||
alamb marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| // function always returns true) | ||
| // | ||
| // +--------+-----+ | ||
| // | name | age | | ||
| // +--------+-----+ | ||
| // | Andy | 11 | | ||
| // | Andrew | 22 | | ||
| // | Oleks | 33 | | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 👍 |
||
| // +--------+-----+ | ||
| ctx.sql(sql).await?.show().await?; | ||
|
|
||
| // however we can see the rule doesn't trigger for queries with predicates | ||
| // other than `=` | ||
| // | ||
| // +-------+-----+ | ||
| // | name | age | | ||
| // +-------+-----+ | ||
| // | Andy | 11 | | ||
| // | Oleks | 33 | | ||
| // +-------+-----+ | ||
| ctx.sql("SELECT * FROM person WHERE age <> 22") | ||
| .await? | ||
| .show() | ||
| .await?; | ||
|
|
||
| Ok(()) | ||
| } | ||
|
|
||
/// An example OptimizerRule that replaces all `col = <const>` predicates with a
/// call to a user defined function (`my_eq`).
///
/// Note the rule holds no state; all the work happens in
/// [`OptimizerRule::rewrite`] / [`MyOptimizerRule::rewrite_expr`].
struct MyOptimizerRule {}
|
|
||
| impl OptimizerRule for MyOptimizerRule { | ||
| fn name(&self) -> &str { | ||
| "my_optimizer_rule" | ||
| } | ||
|
|
||
| // New OptimizerRules should use the "rewrite" api as it is more efficient | ||
| fn supports_rewrite(&self) -> bool { | ||
| true | ||
| } | ||
|
|
||
| /// Ask the optimizer to handle the plan recursion. `rewrite` will be called | ||
| /// on each plan node. | ||
| fn apply_order(&self) -> Option<ApplyOrder> { | ||
| Some(ApplyOrder::BottomUp) | ||
| } | ||
|
|
||
| fn rewrite( | ||
| &self, | ||
| plan: LogicalPlan, | ||
| _config: &dyn OptimizerConfig, | ||
| ) -> Result<Transformed<LogicalPlan>> { | ||
| plan.map_expressions(|expr| { | ||
| // This closure is called for all expressions in the current plan | ||
| // | ||
| // For example, given a plan like `SELECT a + b, 5 + 10` | ||
| // | ||
| // The closure would be called twice: | ||
| // 1. once for `a + b` | ||
| // 2. once for `5 + 10` | ||
| self.rewrite_expr(expr) | ||
| }) | ||
| } | ||
| } | ||
|
|
||
| impl MyOptimizerRule { | ||
| /// Rewrites an Expr replacing all `<col> = <const>` expressions with | ||
| /// a call to my_eq udf | ||
| fn rewrite_expr(&self, expr: Expr) -> Result<Transformed<Expr>> { | ||
| // do a bottom up rewrite of the expression tree | ||
| expr.transform_up(|expr| { | ||
| // Closure called for each sub tree | ||
| match expr { | ||
| Expr::BinaryExpr(binary_expr) if is_binary_eq(&binary_expr) => { | ||
| // destruture the expression | ||
| let BinaryExpr { left, op: _, right } = binary_expr; | ||
| // rewrite to `my_eq(left, right)` | ||
| let udf = ScalarUDF::new_from_impl(MyEq::new()); | ||
| let call = udf.call(vec![*left, *right]); | ||
| Ok(Transformed::yes(call)) | ||
| } | ||
| _ => Ok(Transformed::no(expr)), | ||
| } | ||
| }) | ||
| // Note that the TreeNode API handles propagating the transformed flag | ||
| // and errors up the call chain | ||
| } | ||
| } | ||
|
|
||
| /// return true of the expression is an equality expression for a literal or | ||
| /// column reference | ||
| fn is_binary_eq(binary_expr: &BinaryExpr) -> bool { | ||
| binary_expr.op == Operator::Eq | ||
| && is_lit_or_col(binary_expr.left.as_ref()) | ||
| && is_lit_or_col(binary_expr.right.as_ref()) | ||
| } | ||
|
|
||
| /// Return true if the expression is a literal or column reference | ||
| fn is_lit_or_col(expr: &Expr) -> bool { | ||
| matches!(expr, Expr::Column(_) | Expr::Literal(_)) | ||
| } | ||
|
|
||
/// A simple user defined filter function.
///
/// NOTE(review): as implemented below, `invoke` ignores its arguments and
/// always returns `true` — it exists only to make the optimizer rewrite
/// visible, not to compute equality.
#[derive(Debug, Clone)]
struct MyEq {
    // Declares the argument count/types this UDF accepts (any two arguments)
    signature: Signature,
}
|
|
||
| impl MyEq { | ||
| fn new() -> Self { | ||
| Self { | ||
| signature: Signature::any(2, Volatility::Stable), | ||
| } | ||
| } | ||
| } | ||
|
|
||
| impl ScalarUDFImpl for MyEq { | ||
| fn as_any(&self) -> &dyn Any { | ||
| self | ||
| } | ||
|
|
||
| fn name(&self) -> &str { | ||
| "my_eq" | ||
| } | ||
|
|
||
| fn signature(&self) -> &Signature { | ||
| &self.signature | ||
| } | ||
|
|
||
| fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> { | ||
| Ok(DataType::Boolean) | ||
| } | ||
|
|
||
| fn invoke(&self, _args: &[ColumnarValue]) -> Result<ColumnarValue> { | ||
| // this example simply returns "true" which is not what a real | ||
| // implementation would do. | ||
| Ok(ColumnarValue::Scalar(ScalarValue::from(true))) | ||
| } | ||
| } | ||
|
|
||
| /// Return a RecordBatch with made up data | ||
| fn person_batch() -> RecordBatch { | ||
| let name: ArrayRef = | ||
| Arc::new(StringArray::from_iter_values(["Andy", "Andrew", "Oleks"])); | ||
| let age: ArrayRef = Arc::new(Int32Array::from(vec![11, 22, 33])); | ||
| RecordBatch::try_from_iter(vec![("name", name), ("age", age)]).unwrap() | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -75,7 +75,7 @@ use url::Url; | |
| pub use datafusion_execution::config::SessionConfig; | ||
| pub use datafusion_execution::TaskContext; | ||
| pub use datafusion_expr::execution_props::ExecutionProps; | ||
| use datafusion_optimizer::AnalyzerRule; | ||
| use datafusion_optimizer::{AnalyzerRule, OptimizerRule}; | ||
|
|
||
| mod avro; | ||
| mod csv; | ||
|
|
@@ -332,13 +332,21 @@ impl SessionContext { | |
| self | ||
| } | ||
|
|
||
| /// Adds an analyzer rule to the `SessionState` in the current `SessionContext`. | ||
| pub fn add_analyzer_rule( | ||
| self, | ||
| analyzer_rule: Arc<dyn AnalyzerRule + Send + Sync>, | ||
| ) -> Self { | ||
    /// Adds an optimizer rule to the end of the existing rules.
    ///
    /// See [`SessionState`] for more control of when the rule is applied.
    pub fn add_optimizer_rule(
        &self,
        optimizer_rule: Arc<dyn OptimizerRule + Send + Sync>,
    ) {
        // Delegates to the SessionState behind this context's RwLock
        self.state.write().append_optimizer_rule(optimizer_rule);
    }
|
|
||
| /// Adds an analyzer rule to the end of the existing rules. | ||
| /// | ||
| /// See [`SessionState`] for more control of when the rule is applied. | ||
| pub fn add_analyzer_rule(&self, analyzer_rule: Arc<dyn AnalyzerRule + Send + Sync>) { | ||
|
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. |
||
| self.state.write().add_analyzer_rule(analyzer_rule); | ||
| self | ||
| } | ||
|
|
||
| /// Registers an [`ObjectStore`] to be used with a specific URL prefix. | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -402,6 +402,16 @@ impl SessionState { | |
| self | ||
| } | ||
|
|
||
    // NOTE(review): the existing public `add_optimizer_rule` on SessionState
    // takes an owned `self` (builder style); it should probably be renamed to
    // `with_optimizer_rule` to follow builder conventions, and an
    // `add_optimizer_rule` taking `&mut self` added instead of this method.
    //
    // Appends `optimizer_rule` to the end of this state's optimizer rules.
    pub(crate) fn append_optimizer_rule(
        &mut self,
        optimizer_rule: Arc<dyn OptimizerRule + Send + Sync>,
    ) {
        self.optimizer.rules.push(optimizer_rule);
    }
|
|
||
| /// Add `physical_optimizer_rule` to the end of the list of | ||
| /// [`PhysicalOptimizerRule`]s used to rewrite queries. | ||
| pub fn add_physical_optimizer_rule( | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
is `analyzer_rule.rs` a file in the repo?
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It will be once I can get someone to approve #11089