-
Couldn't load subscription status.
- Fork 1.7k
feat: Add method to add analyzer rules to SessionContext #10849
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
6b63dcc
9a1eae8
e134719
62c5ab3
925f2c6
c81b4cf
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -384,9 +384,9 @@ impl SessionState { | |
| /// Add `analyzer_rule` to the end of the list of | ||
| /// [`AnalyzerRule`]s used to rewrite queries. | ||
| pub fn add_analyzer_rule( | ||
| mut self, | ||
| &mut self, | ||
|
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more.

I think technically this is an API change, as the API now takes a `&mut self`. However, I think the change is good as now [inline code lost in extraction]. What do you think about adding an API to make things consistent? (We could do this as a separate PR.)

pub fn with_analyzer_rule(
mut self,
analyzer_rule: Arc<dyn AnalyzerRule + Send + Sync>,
) -> Self {
..
}

There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more.

Should we also update [inline code lost in extraction]? Also, do we need to add [inline code lost in extraction]? |
||
| analyzer_rule: Arc<dyn AnalyzerRule + Send + Sync>, | ||
| ) -> Self { | ||
| ) -> &Self { | ||
| self.analyzer.rules.push(analyzer_rule); | ||
| self | ||
| } | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -92,8 +92,12 @@ use datafusion::{ | |
| }; | ||
|
|
||
| use async_trait::async_trait; | ||
| use datafusion_common::tree_node::Transformed; | ||
| use datafusion_common::config::ConfigOptions; | ||
| use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode}; | ||
| use datafusion_common::ScalarValue; | ||
| use datafusion_expr::Projection; | ||
| use datafusion_optimizer::optimizer::ApplyOrder; | ||
| use datafusion_optimizer::AnalyzerRule; | ||
| use futures::{Stream, StreamExt}; | ||
|
|
||
| /// Execute the specified sql and return the resulting record batches | ||
|
|
@@ -132,11 +136,13 @@ async fn setup_table_without_schemas(mut ctx: SessionContext) -> Result<SessionC | |
| Ok(ctx) | ||
| } | ||
|
|
||
| const QUERY1: &str = "SELECT * FROM sales limit 3"; | ||
|
|
||
| const QUERY: &str = | ||
| "SELECT customer_id, revenue FROM sales ORDER BY revenue DESC limit 3"; | ||
|
|
||
| const QUERY1: &str = "SELECT * FROM sales limit 3"; | ||
|
|
||
| const QUERY2: &str = "SELECT 42, arrow_typeof(42)"; | ||
|
|
||
| // Run the query using the specified execution context and compare it | ||
| // to the known result | ||
| async fn run_and_compare_query(mut ctx: SessionContext, description: &str) -> Result<()> { | ||
|
|
@@ -164,6 +170,34 @@ async fn run_and_compare_query(mut ctx: SessionContext, description: &str) -> Re | |
| Ok(()) | ||
| } | ||
|
|
||
| // Run QUERY2 using the specified execution context and compare the | ||
| // pretty-printed output, line by line, to the known result. | ||
| // | ||
| // The expected table shows `UInt64` rather than `Int64` because the | ||
| // context is expected to carry `MyAnalyzerRule`, which rewrites Int64 | ||
| // literals into UInt64 literals. `description` only labels the failure | ||
| // message. | ||
| async fn run_and_compare_query_with_analyzer_rule( | ||
| mut ctx: SessionContext, | ||
| description: &str, | ||
| ) -> Result<()> { | ||
| // Column padding must match the header widths exactly, otherwise the | ||
| // line-by-line comparison below can never succeed. | ||
| let expected = vec![ | ||
| "+------------+--------------------------+", | ||
| "| UInt64(42) | arrow_typeof(UInt64(42)) |", | ||
| "+------------+--------------------------+", | ||
| "| 42         | UInt64                   |", | ||
| "+------------+--------------------------+", | ||
| ]; | ||
|
|
||
| let s = exec_sql(&mut ctx, QUERY2).await?; | ||
| let actual = s.lines().collect::<Vec<_>>(); | ||
|
|
||
| assert_eq!( | ||
| expected, | ||
| actual, | ||
| "output mismatch for {}. Expected:\n{}\nActual:\n{}", | ||
| description, | ||
| expected.join("\n"), | ||
| s | ||
| ); | ||
| Ok(()) | ||
| } | ||
|
|
||
| // Run the query using the specified execution context and compare it | ||
| // to the known result | ||
| async fn run_and_compare_query_with_auto_schemas( | ||
|
|
@@ -208,6 +242,13 @@ async fn normal_query() -> Result<()> { | |
| run_and_compare_query(ctx, "Default context").await | ||
| } | ||
|
|
||
| #[tokio::test] | ||
| // Run QUERY2 on a default SessionContext with MyAnalyzerRule added and compare the output to the known result | ||
| async fn normal_query_with_analyzer() -> Result<()> { | ||
| let ctx = SessionContext::new().add_analyzer_rule(Arc::new(MyAnalyzerRule {})); | ||
| run_and_compare_query_with_analyzer_rule(ctx, "MyAnalyzerRule").await | ||
| } | ||
|
|
||
| #[tokio::test] | ||
| // Run the query using topk optimization | ||
| async fn topk_query() -> Result<()> { | ||
|
|
@@ -248,9 +289,10 @@ async fn topk_plan() -> Result<()> { | |
| fn make_topk_context() -> SessionContext { | ||
| let config = SessionConfig::new().with_target_partitions(48); | ||
| let runtime = Arc::new(RuntimeEnv::default()); | ||
| let state = SessionState::new_with_config_rt(config, runtime) | ||
| let mut state = SessionState::new_with_config_rt(config, runtime) | ||
| .with_query_planner(Arc::new(TopKQueryPlanner {})) | ||
| .add_optimizer_rule(Arc::new(TopKOptimizerRule {})); | ||
| state.add_analyzer_rule(Arc::new(MyAnalyzerRule {})); | ||
| SessionContext::new_with_state(state) | ||
| } | ||
|
|
||
|
|
@@ -633,3 +675,52 @@ impl RecordBatchStream for TopKReader { | |
| self.input.schema() | ||
| } | ||
| } | ||
|
|
||
| struct MyAnalyzerRule {} | ||
|
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Would it be possible to add a test that exercises these APIs? For example, perhaps a test that does something like `select 42, arrow_typeof(42)`, which I think with this code will print out 42 and `UInt64`. |
||
|
|
||
| // Hooks `MyAnalyzerRule` into the analyzer: the trait methods only | ||
| // expose a fixed name and delegate the rewrite to `analyze_plan`. | ||
| impl AnalyzerRule for MyAnalyzerRule { | ||
| /// Returns the fixed identifier of this rule. | ||
| fn name(&self) -> &str { | ||
| "my_analyzer_rule" | ||
| } | ||
|
|
||
| /// Rewrites `plan` via `MyAnalyzerRule::analyze_plan`; the | ||
| /// configuration options are not consulted. | ||
| fn analyze(&self, plan: LogicalPlan, _config: &ConfigOptions) -> Result<LogicalPlan> { | ||
| MyAnalyzerRule::analyze_plan(plan) | ||
| } | ||
| } | ||
|
|
||
| impl MyAnalyzerRule { | ||
| /// Walks `plan` with `transform` and rebuilds every `Projection` | ||
| /// node from its rewritten expressions (see `analyze_expr`); all | ||
| /// other nodes are returned untouched. | ||
| fn analyze_plan(plan: LogicalPlan) -> Result<LogicalPlan> { | ||
| let rewritten = plan.transform(|node| { | ||
| if let LogicalPlan::Projection(projection) = node { | ||
| let exprs = Self::analyze_expr(projection.expr.clone())?; | ||
| let rebuilt = Projection::try_new(exprs, projection.input)?; | ||
| Ok(Transformed::yes(LogicalPlan::Projection(rebuilt))) | ||
| } else { | ||
| Ok(Transformed::no(node)) | ||
| } | ||
| }); | ||
| rewritten.data() | ||
| } | ||
|
|
||
| /// Rewrites each expression in `expr`, replacing every Int64 literal | ||
| /// with a UInt64 literal; stops at the first error. | ||
| fn analyze_expr(expr: Vec<Expr>) -> Result<Vec<Expr>> { | ||
| let mut rewritten = Vec::with_capacity(expr.len()); | ||
| for e in expr { | ||
| let converted = e | ||
| .transform(|node| match node { | ||
| // transform to UInt64 (plain `as` cast of the inner value) | ||
| Expr::Literal(ScalarValue::Int64(value)) => Ok(Transformed::yes( | ||
| Expr::Literal(ScalarValue::UInt64(value.map(|v| v as u64))), | ||
| )), | ||
| other => Ok(Transformed::no(other)), | ||
| }) | ||
| .data()?; | ||
| rewritten.push(converted); | ||
| } | ||
| Ok(rewritten) | ||
| } | ||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Would be nice to have an example or a doc test for this method
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I agree -- we are tracking adding an example for how to use custom analyzer rules in #10855, so perhaps we can add the example as part of that ticket (I think @goldmedal said he may have some time to work on that eventually)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
FWIW I have a WIP PR to improve these examples -- I hope to get it up for review sometime this weekend