From d7ae4fd388b8fe232f51ee1625166df43828f2e3 Mon Sep 17 00:00:00 2001 From: Bruce Ritchie Date: Fri, 25 Oct 2024 03:41:50 +0000 Subject: [PATCH 1/3] Add clickbench parquet based queries to sql_planner benchmark. --- datafusion/core/benches/sql_planner.rs | 73 +++++++++++++++++++++++++- 1 file changed, 72 insertions(+), 1 deletion(-) diff --git a/datafusion/core/benches/sql_planner.rs b/datafusion/core/benches/sql_planner.rs index 64d2760e9d97..fefc363bb480 100644 --- a/datafusion/core/benches/sql_planner.rs +++ b/datafusion/core/benches/sql_planner.rs @@ -15,22 +15,28 @@ // specific language governing permissions and limitations // under the License. +extern crate arrow; #[macro_use] extern crate criterion; -extern crate arrow; extern crate datafusion; mod data_utils; + use crate::criterion::Criterion; use arrow::datatypes::{DataType, Field, Fields, Schema}; use datafusion::datasource::MemTable; use datafusion::execution::context::SessionContext; +use itertools::Itertools; +use std::fs::File; +use std::io::{BufRead, BufReader}; use std::sync::Arc; use test_utils::tpcds::tpcds_schemas; use test_utils::tpch::tpch_schemas; use test_utils::TableDef; use tokio::runtime::Runtime; +const CLICKBENCH_DATA_PATH: &str = "../../benchmarks/data/hits_partitioned/"; + /// Create a logical plan from the specified sql fn logical_plan(ctx: &SessionContext, sql: &str) { let rt = Runtime::new().unwrap(); @@ -91,7 +97,29 @@ fn register_defs(ctx: SessionContext, defs: Vec) -> SessionContext { ctx } +fn register_clickbench_hits_table() -> SessionContext { + let ctx = SessionContext::new(); + let rt = Runtime::new().unwrap(); + + // use an external table for clickbench benchmarks + // let data_path = "./tests/data/clickbench_hits_10.parquet"; + + let sql = + format!("CREATE EXTERNAL TABLE hits STORED AS PARQUET LOCATION '{CLICKBENCH_DATA_PATH}'"); + + rt.block_on(ctx.sql(&sql)).unwrap(); + + let count = rt.block_on(async { ctx.table("hits").await.unwrap().count().await.unwrap() }); + assert!(count > 0); + ctx +} + fn criterion_benchmark(c: &mut Criterion) { + // verify that we can load the clickbench data prior to running the benchmark + File::open(CLICKBENCH_DATA_PATH).expect("benchmarks/data/hits_partitioned/ could not be \ + loaded. Please run 'benchmarks/bench.sh data clickbench_partitioned' prior to running \ + this benchmark"); + let ctx = create_context(); // Test simplest @@ -299,6 +327,49 @@ fn criterion_benchmark(c: &mut Criterion) { } }) }); + + // -- clickbench -- + + let queries_file = + File::open("../../benchmarks/queries/clickbench/queries.sql").unwrap(); + let extended_file = + File::open("../../benchmarks/queries/clickbench/extended.sql").unwrap(); + + let clickbench_queries: Vec = BufReader::new(queries_file) + .lines() + .chain(BufReader::new(extended_file).lines()) + .map(|l| l.expect("Could not parse line")) + .collect_vec(); + + let clickbench_ctx = register_clickbench_hits_table(); + + for (i, sql) in clickbench_queries.iter().enumerate() { + c.bench_function(&format!("logical_plan_clickbench_q{}", i + 1), |b| { + b.iter(|| logical_plan(&clickbench_ctx, sql)) + }); + } + + for (i, sql) in clickbench_queries.iter().enumerate() { + c.bench_function(&format!("physical_plan_clickbench_q{}", i + 1), |b| { + b.iter(|| physical_plan(&clickbench_ctx, sql)) + }); + } + + c.bench_function("logical_plan_clickbench_all", |b| { + b.iter(|| { + for sql in &clickbench_queries { + logical_plan(&clickbench_ctx, sql) + } + }) + }); + + c.bench_function("physical_plan_clickbench_all", |b| { + b.iter(|| { + for sql in &clickbench_queries { + physical_plan(&clickbench_ctx, sql) + } + }) + }); } criterion_group!(benches, criterion_benchmark); From 74425d30aa9d8fa501b6c0d7da96e3532dee4311 Mon Sep 17 00:00:00 2001 From: Bruce Ritchie Date: Fri, 25 Oct 2024 12:58:56 +0000 Subject: [PATCH 2/3] Cargo fmt. --- datafusion/core/benches/sql_planner.rs | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/datafusion/core/benches/sql_planner.rs b/datafusion/core/benches/sql_planner.rs index fefc363bb480..f4d9275c9eba 100644 --- a/datafusion/core/benches/sql_planner.rs +++ b/datafusion/core/benches/sql_planner.rs @@ -104,12 +104,14 @@ fn register_clickbench_hits_table() -> SessionContext { // use an external table for clickbench benchmarks // let data_path = "./tests/data/clickbench_hits_10.parquet"; - let sql = - format!("CREATE EXTERNAL TABLE hits STORED AS PARQUET LOCATION '{CLICKBENCH_DATA_PATH}'"); + let sql = format!( + "CREATE EXTERNAL TABLE hits STORED AS PARQUET LOCATION '{CLICKBENCH_DATA_PATH}'" + ); rt.block_on(ctx.sql(&sql)).unwrap(); - let count = rt.block_on(async { ctx.table("hits").await.unwrap().count().await.unwrap() }); + let count = + rt.block_on(async { ctx.table("hits").await.unwrap().count().await.unwrap() }); assert!(count > 0); ctx } From ad5a86ea14d2c45f1fb805f93b7998b80f358f8a Mon Sep 17 00:00:00 2001 From: Bruce Ritchie Date: Fri, 25 Oct 2024 17:08:55 +0000 Subject: [PATCH 3/3] Commented out most logical_plan tests & updated code to allow for running from either cargo or via target/release/deps/sql_planner-xyz --- datafusion/core/benches/sql_planner.rs | 99 ++++++++++++++++---------- 1 file changed, 60 insertions(+), 39 deletions(-) diff --git a/datafusion/core/benches/sql_planner.rs b/datafusion/core/benches/sql_planner.rs index f4d9275c9eba..06e49fc26126 100644 --- a/datafusion/core/benches/sql_planner.rs +++ b/datafusion/core/benches/sql_planner.rs @@ -29,13 +29,16 @@ use datafusion::execution::context::SessionContext; use itertools::Itertools; use std::fs::File; use std::io::{BufRead, BufReader}; +use std::path::PathBuf; use std::sync::Arc; use test_utils::tpcds::tpcds_schemas; use test_utils::tpch::tpch_schemas; use test_utils::TableDef; use tokio::runtime::Runtime; -const CLICKBENCH_DATA_PATH: &str = "../../benchmarks/data/hits_partitioned/"; +const BENCHMARKS_PATH_1: &str = "../../benchmarks/"; +const BENCHMARKS_PATH_2: &str = "./benchmarks/"; +const CLICKBENCH_DATA_PATH: &str = "data/hits_partitioned/"; /// Create a logical plan from the specified sql fn logical_plan(ctx: &SessionContext, sql: &str) { @@ -102,11 +105,14 @@ fn register_clickbench_hits_table() -> SessionContext { let rt = Runtime::new().unwrap(); // use an external table for clickbench benchmarks - // let data_path = "./tests/data/clickbench_hits_10.parquet"; + let path = + if PathBuf::from(format!("{BENCHMARKS_PATH_1}{CLICKBENCH_DATA_PATH}")).exists() { + format!("{BENCHMARKS_PATH_1}{CLICKBENCH_DATA_PATH}") + } else { + format!("{BENCHMARKS_PATH_2}{CLICKBENCH_DATA_PATH}") + }; - let sql = format!( - "CREATE EXTERNAL TABLE hits STORED AS PARQUET LOCATION '{CLICKBENCH_DATA_PATH}'" - ); + let sql = format!("CREATE EXTERNAL TABLE hits STORED AS PARQUET LOCATION '{path}'"); rt.block_on(ctx.sql(&sql)).unwrap(); @@ -118,9 +124,12 @@ fn register_clickbench_hits_table() -> SessionContext { fn criterion_benchmark(c: &mut Criterion) { // verify that we can load the clickbench data prior to running the benchmark - File::open(CLICKBENCH_DATA_PATH).expect("benchmarks/data/hits_partitioned/ could not be \ - loaded. Please run 'benchmarks/bench.sh data clickbench_partitioned' prior to running \ - this benchmark"); + if !PathBuf::from(format!("{BENCHMARKS_PATH_1}{CLICKBENCH_DATA_PATH}")).exists() + && !PathBuf::from(format!("{BENCHMARKS_PATH_2}{CLICKBENCH_DATA_PATH}")).exists() + { + panic!("benchmarks/data/hits_partitioned/ could not be loaded. Please run \ + 'benchmarks/bench.sh data clickbench_partitioned' prior to running this benchmark") + } let ctx = create_context(); @@ -265,9 +274,15 @@ fn criterion_benchmark(c: &mut Criterion) { "q16", "q17", "q18", "q19", "q20", "q21", "q22", ]; + let benchmarks_path = if PathBuf::from(BENCHMARKS_PATH_1).exists() { + BENCHMARKS_PATH_1 + } else { + BENCHMARKS_PATH_2 + }; + for q in tpch_queries { let sql = - std::fs::read_to_string(format!("../../benchmarks/queries/{q}.sql")).unwrap(); + std::fs::read_to_string(format!("{benchmarks_path}queries/{q}.sql")).unwrap(); c.bench_function(&format!("physical_plan_tpch_{}", q), |b| { b.iter(|| physical_plan(&tpch_ctx, &sql)) }); @@ -276,7 +291,7 @@ fn criterion_benchmark(c: &mut Criterion) { let all_tpch_sql_queries = tpch_queries .iter() .map(|q| { - std::fs::read_to_string(format!("../../benchmarks/queries/{q}.sql")).unwrap() + std::fs::read_to_string(format!("{benchmarks_path}queries/{q}.sql")).unwrap() }) .collect::>(); @@ -288,13 +303,13 @@ fn criterion_benchmark(c: &mut Criterion) { }) }); - c.bench_function("logical_plan_tpch_all", |b| { - b.iter(|| { - for sql in &all_tpch_sql_queries { - logical_plan(&tpch_ctx, sql) - } - }) - }); + // c.bench_function("logical_plan_tpch_all", |b| { + // b.iter(|| { + // for sql in &all_tpch_sql_queries { + // logical_plan(&tpch_ctx, sql) + // } + // }) + // }); // --- TPC-DS --- @@ -303,9 +318,15 @@ fn criterion_benchmark(c: &mut Criterion) { // 41: check_analyzed_plan: Correlated column is not allowed in predicate let ignored = [41]; + let tests_path = if PathBuf::from("./tests/").exists() { + "./tests/" + } else { + "datafusion/core/tests/" + }; + let raw_tpcds_sql_queries = (1..100) .filter(|q| !ignored.contains(q)) - .map(|q| std::fs::read_to_string(format!("./tests/tpc-ds/{q}.sql")).unwrap()) + .map(|q| std::fs::read_to_string(format!("{tests_path}tpc-ds/{q}.sql")).unwrap()) .collect::>(); // some queries have multiple statements @@ -322,20 +343,20 @@ fn criterion_benchmark(c: &mut Criterion) { }) }); - c.bench_function("logical_plan_tpcds_all", |b| { - b.iter(|| { - for sql in &all_tpcds_sql_queries { - logical_plan(&tpcds_ctx, sql) - } - }) - }); + // c.bench_function("logical_plan_tpcds_all", |b| { + // b.iter(|| { + // for sql in &all_tpcds_sql_queries { + // logical_plan(&tpcds_ctx, sql) + // } + // }) + // }); // -- clickbench -- let queries_file = - File::open("../../benchmarks/queries/clickbench/queries.sql").unwrap(); + File::open(format!("{benchmarks_path}queries/clickbench/queries.sql")).unwrap(); let extended_file = - File::open("../../benchmarks/queries/clickbench/extended.sql").unwrap(); + File::open(format!("{benchmarks_path}queries/clickbench/extended.sql")).unwrap(); let clickbench_queries: Vec = BufReader::new(queries_file) .lines() @@ -345,11 +366,11 @@ fn criterion_benchmark(c: &mut Criterion) { let clickbench_ctx = register_clickbench_hits_table(); - for (i, sql) in clickbench_queries.iter().enumerate() { - c.bench_function(&format!("logical_plan_clickbench_q{}", i + 1), |b| { - b.iter(|| logical_plan(&clickbench_ctx, sql)) - }); - } + // for (i, sql) in clickbench_queries.iter().enumerate() { + // c.bench_function(&format!("logical_plan_clickbench_q{}", i + 1), |b| { + // b.iter(|| logical_plan(&clickbench_ctx, sql)) + // }); + // } for (i, sql) in clickbench_queries.iter().enumerate() { c.bench_function(&format!("physical_plan_clickbench_q{}", i + 1), |b| { @@ -357,13 +378,13 @@ fn criterion_benchmark(c: &mut Criterion) { }); } - c.bench_function("logical_plan_clickbench_all", |b| { - b.iter(|| { - for sql in &clickbench_queries { - logical_plan(&clickbench_ctx, sql) - } - }) - }); + // c.bench_function("logical_plan_clickbench_all", |b| { + // b.iter(|| { + // for sql in &clickbench_queries { + // logical_plan(&clickbench_ctx, sql) + // } + // }) + // }); c.bench_function("physical_plan_clickbench_all", |b| { b.iter(|| {