-
Notifications
You must be signed in to change notification settings - Fork 1.8k
Add clickbench parquet based queries to sql_planner benchmark #13103
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
d7ae4fd
74425d3
ad5a86e
f4c6bb7
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -15,22 +15,31 @@ | |
| // specific language governing permissions and limitations | ||
| // under the License. | ||
|
|
||
| extern crate arrow; | ||
| #[macro_use] | ||
| extern crate criterion; | ||
| extern crate arrow; | ||
| extern crate datafusion; | ||
|
|
||
| mod data_utils; | ||
|
|
||
| use crate::criterion::Criterion; | ||
| use arrow::datatypes::{DataType, Field, Fields, Schema}; | ||
| use datafusion::datasource::MemTable; | ||
| use datafusion::execution::context::SessionContext; | ||
| use itertools::Itertools; | ||
| use std::fs::File; | ||
| use std::io::{BufRead, BufReader}; | ||
| use std::path::PathBuf; | ||
| use std::sync::Arc; | ||
| use test_utils::tpcds::tpcds_schemas; | ||
| use test_utils::tpch::tpch_schemas; | ||
| use test_utils::TableDef; | ||
| use tokio::runtime::Runtime; | ||
|
|
||
| const BENCHMARKS_PATH_1: &str = "../../benchmarks/"; | ||
| const BENCHMARKS_PATH_2: &str = "./benchmarks/"; | ||
| const CLICKBENCH_DATA_PATH: &str = "data/hits_partitioned/"; | ||
|
|
||
| /// Create a logical plan from the specified sql | ||
| fn logical_plan(ctx: &SessionContext, sql: &str) { | ||
| let rt = Runtime::new().unwrap(); | ||
|
|
@@ -91,7 +100,37 @@ fn register_defs(ctx: SessionContext, defs: Vec<TableDef>) -> SessionContext { | |
| ctx | ||
| } | ||
|
|
||
| fn register_clickbench_hits_table() -> SessionContext { | ||
| let ctx = SessionContext::new(); | ||
| let rt = Runtime::new().unwrap(); | ||
|
|
||
| // use an external table for clickbench benchmarks | ||
| let path = | ||
| if PathBuf::from(format!("{BENCHMARKS_PATH_1}{CLICKBENCH_DATA_PATH}")).exists() { | ||
| format!("{BENCHMARKS_PATH_1}{CLICKBENCH_DATA_PATH}") | ||
| } else { | ||
| format!("{BENCHMARKS_PATH_2}{CLICKBENCH_DATA_PATH}") | ||
| }; | ||
|
|
||
| let sql = format!("CREATE EXTERNAL TABLE hits STORED AS PARQUET LOCATION '{path}'"); | ||
|
|
||
| rt.block_on(ctx.sql(&sql)).unwrap(); | ||
|
|
||
| let count = | ||
| rt.block_on(async { ctx.table("hits").await.unwrap().count().await.unwrap() }); | ||
| assert!(count > 0); | ||
| ctx | ||
| } | ||
|
|
||
| fn criterion_benchmark(c: &mut Criterion) { | ||
| // verify that we can load the clickbench data prior to running the benchmark | ||
| if !PathBuf::from(format!("{BENCHMARKS_PATH_1}{CLICKBENCH_DATA_PATH}")).exists() | ||
| && !PathBuf::from(format!("{BENCHMARKS_PATH_2}{CLICKBENCH_DATA_PATH}")).exists() | ||
| { | ||
| panic!("benchmarks/data/hits_partitioned/ could not be loaded. Please run \ | ||
| 'benchmarks/bench.sh data clickbench_partitioned' prior to running this benchmark") | ||
| } | ||
|
|
||
| let ctx = create_context(); | ||
|
|
||
| // Test simplest | ||
|
|
@@ -235,9 +274,15 @@ fn criterion_benchmark(c: &mut Criterion) { | |
| "q16", "q17", "q18", "q19", "q20", "q21", "q22", | ||
| ]; | ||
|
|
||
| let benchmarks_path = if PathBuf::from(BENCHMARKS_PATH_1).exists() { | ||
| BENCHMARKS_PATH_1 | ||
| } else { | ||
| BENCHMARKS_PATH_2 | ||
| }; | ||
|
|
||
| for q in tpch_queries { | ||
| let sql = | ||
| std::fs::read_to_string(format!("../../benchmarks/queries/{q}.sql")).unwrap(); | ||
| std::fs::read_to_string(format!("{benchmarks_path}queries/{q}.sql")).unwrap(); | ||
| c.bench_function(&format!("physical_plan_tpch_{}", q), |b| { | ||
| b.iter(|| physical_plan(&tpch_ctx, &sql)) | ||
| }); | ||
|
|
@@ -246,7 +291,7 @@ fn criterion_benchmark(c: &mut Criterion) { | |
| let all_tpch_sql_queries = tpch_queries | ||
| .iter() | ||
| .map(|q| { | ||
| std::fs::read_to_string(format!("../../benchmarks/queries/{q}.sql")).unwrap() | ||
| std::fs::read_to_string(format!("{benchmarks_path}queries/{q}.sql")).unwrap() | ||
| }) | ||
| .collect::<Vec<_>>(); | ||
|
|
||
|
|
@@ -258,20 +303,25 @@ fn criterion_benchmark(c: &mut Criterion) { | |
| }) | ||
| }); | ||
|
|
||
| c.bench_function("logical_plan_tpch_all", |b| { | ||
| b.iter(|| { | ||
| for sql in &all_tpch_sql_queries { | ||
| logical_plan(&tpch_ctx, sql) | ||
| } | ||
| }) | ||
| }); | ||
| // c.bench_function("logical_plan_tpch_all", |b| { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. maybe we could just delete it entirely? |
||
| // b.iter(|| { | ||
| // for sql in &all_tpch_sql_queries { | ||
| // logical_plan(&tpch_ctx, sql) | ||
| // } | ||
| // }) | ||
| // }); | ||
|
|
||
| // --- TPC-DS --- | ||
|
|
||
| let tpcds_ctx = register_defs(SessionContext::new(), tpcds_schemas()); | ||
| let tests_path = if PathBuf::from("./tests/").exists() { | ||
| "./tests/" | ||
| } else { | ||
| "datafusion/core/tests/" | ||
| }; | ||
|
|
||
| let raw_tpcds_sql_queries = (1..100) | ||
| .map(|q| std::fs::read_to_string(format!("./tests/tpc-ds/{q}.sql")).unwrap()) | ||
| .map(|q| std::fs::read_to_string(format!("{tests_path}tpc-ds/{q}.sql")).unwrap()) | ||
| .collect::<Vec<_>>(); | ||
|
|
||
| // some queries have multiple statements | ||
|
|
@@ -288,10 +338,53 @@ fn criterion_benchmark(c: &mut Criterion) { | |
| }) | ||
| }); | ||
|
|
||
| c.bench_function("logical_plan_tpcds_all", |b| { | ||
| // c.bench_function("logical_plan_tpcds_all", |b| { | ||
| // b.iter(|| { | ||
| // for sql in &all_tpcds_sql_queries { | ||
| // logical_plan(&tpcds_ctx, sql) | ||
| // } | ||
| // }) | ||
| // }); | ||
|
|
||
| // -- clickbench -- | ||
|
|
||
| let queries_file = | ||
| File::open(format!("{benchmarks_path}queries/clickbench/queries.sql")).unwrap(); | ||
| let extended_file = | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I was confused at first what click bench Q48 was (as there are only 42 queries) -- but this now makes sense. It would probably be less confusing if this was called |
||
| File::open(format!("{benchmarks_path}queries/clickbench/extended.sql")).unwrap(); | ||
|
|
||
| let clickbench_queries: Vec<String> = BufReader::new(queries_file) | ||
| .lines() | ||
| .chain(BufReader::new(extended_file).lines()) | ||
| .map(|l| l.expect("Could not parse line")) | ||
| .collect_vec(); | ||
|
|
||
| let clickbench_ctx = register_clickbench_hits_table(); | ||
|
|
||
| // for (i, sql) in clickbench_queries.iter().enumerate() { | ||
| // c.bench_function(&format!("logical_plan_clickbench_q{}", i + 1), |b| { | ||
| // b.iter(|| logical_plan(&clickbench_ctx, sql)) | ||
| // }); | ||
| // } | ||
|
|
||
| for (i, sql) in clickbench_queries.iter().enumerate() { | ||
| c.bench_function(&format!("physical_plan_clickbench_q{}", i + 1), |b| { | ||
| b.iter(|| physical_plan(&clickbench_ctx, sql)) | ||
| }); | ||
| } | ||
|
|
||
| // c.bench_function("logical_plan_clickbench_all", |b| { | ||
| // b.iter(|| { | ||
| // for sql in &clickbench_queries { | ||
| // logical_plan(&clickbench_ctx, sql) | ||
| // } | ||
| // }) | ||
| // }); | ||
|
|
||
| c.bench_function("physical_plan_clickbench_all", |b| { | ||
| b.iter(|| { | ||
| for sql in &all_tpcds_sql_queries { | ||
| logical_plan(&tpcds_ctx, sql) | ||
| for sql in &clickbench_queries { | ||
| physical_plan(&clickbench_ctx, sql) | ||
| } | ||
| }) | ||
| }); | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
❤️