Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
121 changes: 107 additions & 14 deletions datafusion/core/benches/sql_planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,22 +15,31 @@
// specific language governing permissions and limitations
// under the License.

extern crate arrow;
#[macro_use]
extern crate criterion;
extern crate arrow;
extern crate datafusion;

mod data_utils;

use crate::criterion::Criterion;
use arrow::datatypes::{DataType, Field, Fields, Schema};
use datafusion::datasource::MemTable;
use datafusion::execution::context::SessionContext;
use itertools::Itertools;
use std::fs::File;
use std::io::{BufRead, BufReader};
use std::path::PathBuf;
use std::sync::Arc;
use test_utils::tpcds::tpcds_schemas;
use test_utils::tpch::tpch_schemas;
use test_utils::TableDef;
use tokio::runtime::Runtime;

const BENCHMARKS_PATH_1: &str = "../../benchmarks/";
const BENCHMARKS_PATH_2: &str = "./benchmarks/";
const CLICKBENCH_DATA_PATH: &str = "data/hits_partitioned/";

/// Create a logical plan from the specified sql
fn logical_plan(ctx: &SessionContext, sql: &str) {
let rt = Runtime::new().unwrap();
Expand Down Expand Up @@ -91,7 +100,37 @@ fn register_defs(ctx: SessionContext, defs: Vec<TableDef>) -> SessionContext {
ctx
}

fn register_clickbench_hits_table() -> SessionContext {
let ctx = SessionContext::new();
let rt = Runtime::new().unwrap();

// use an external table for clickbench benchmarks
let path =
if PathBuf::from(format!("{BENCHMARKS_PATH_1}{CLICKBENCH_DATA_PATH}")).exists() {
format!("{BENCHMARKS_PATH_1}{CLICKBENCH_DATA_PATH}")
} else {
format!("{BENCHMARKS_PATH_2}{CLICKBENCH_DATA_PATH}")
};

let sql = format!("CREATE EXTERNAL TABLE hits STORED AS PARQUET LOCATION '{path}'");

rt.block_on(ctx.sql(&sql)).unwrap();

let count =
rt.block_on(async { ctx.table("hits").await.unwrap().count().await.unwrap() });
assert!(count > 0);
ctx
}

fn criterion_benchmark(c: &mut Criterion) {
// verify that we can load the clickbench data prior to running the benchmark
if !PathBuf::from(format!("{BENCHMARKS_PATH_1}{CLICKBENCH_DATA_PATH}")).exists()
&& !PathBuf::from(format!("{BENCHMARKS_PATH_2}{CLICKBENCH_DATA_PATH}")).exists()
{
panic!("benchmarks/data/hits_partitioned/ could not be loaded. Please run \
'benchmarks/bench.sh data clickbench_partitioned' prior to running this benchmark")
}

let ctx = create_context();

// Test simplest
Expand Down Expand Up @@ -235,9 +274,15 @@ fn criterion_benchmark(c: &mut Criterion) {
"q16", "q17", "q18", "q19", "q20", "q21", "q22",
];

let benchmarks_path = if PathBuf::from(BENCHMARKS_PATH_1).exists() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

❤️

BENCHMARKS_PATH_1
} else {
BENCHMARKS_PATH_2
};

for q in tpch_queries {
let sql =
std::fs::read_to_string(format!("../../benchmarks/queries/{q}.sql")).unwrap();
std::fs::read_to_string(format!("{benchmarks_path}queries/{q}.sql")).unwrap();
c.bench_function(&format!("physical_plan_tpch_{}", q), |b| {
b.iter(|| physical_plan(&tpch_ctx, &sql))
});
Expand All @@ -246,7 +291,7 @@ fn criterion_benchmark(c: &mut Criterion) {
let all_tpch_sql_queries = tpch_queries
.iter()
.map(|q| {
std::fs::read_to_string(format!("../../benchmarks/queries/{q}.sql")).unwrap()
std::fs::read_to_string(format!("{benchmarks_path}queries/{q}.sql")).unwrap()
})
.collect::<Vec<_>>();

Expand All @@ -258,20 +303,25 @@ fn criterion_benchmark(c: &mut Criterion) {
})
});

c.bench_function("logical_plan_tpch_all", |b| {
b.iter(|| {
for sql in &all_tpch_sql_queries {
logical_plan(&tpch_ctx, sql)
}
})
});
// c.bench_function("logical_plan_tpch_all", |b| {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe we could just delete it entirely?

// b.iter(|| {
// for sql in &all_tpch_sql_queries {
// logical_plan(&tpch_ctx, sql)
// }
// })
// });

// --- TPC-DS ---

let tpcds_ctx = register_defs(SessionContext::new(), tpcds_schemas());
let tests_path = if PathBuf::from("./tests/").exists() {
"./tests/"
} else {
"datafusion/core/tests/"
};

let raw_tpcds_sql_queries = (1..100)
.map(|q| std::fs::read_to_string(format!("./tests/tpc-ds/{q}.sql")).unwrap())
.map(|q| std::fs::read_to_string(format!("{tests_path}tpc-ds/{q}.sql")).unwrap())
.collect::<Vec<_>>();

// some queries have multiple statements
Expand All @@ -288,10 +338,53 @@ fn criterion_benchmark(c: &mut Criterion) {
})
});

c.bench_function("logical_plan_tpcds_all", |b| {
// c.bench_function("logical_plan_tpcds_all", |b| {
// b.iter(|| {
// for sql in &all_tpcds_sql_queries {
// logical_plan(&tpcds_ctx, sql)
// }
// })
// });

// -- clickbench --

let queries_file =
File::open(format!("{benchmarks_path}queries/clickbench/queries.sql")).unwrap();
let extended_file =
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was confused at first what click bench Q48 was (as there are only 42 queries) -- but this now makes sense.

physical_plan_clickbench_q48
                        time:   [2.6437 ms 2.6674 ms 2.6943 ms]
Found 11 outliers among 100 measurements (11.00%)
  3 (3.00%) high mild
  8 (8.00%) high severe

It would probably be less confusing if this was called physical_plan_clickbench_extended_q5 or whatever to align with the naming of suites

File::open(format!("{benchmarks_path}queries/clickbench/extended.sql")).unwrap();

let clickbench_queries: Vec<String> = BufReader::new(queries_file)
.lines()
.chain(BufReader::new(extended_file).lines())
.map(|l| l.expect("Could not parse line"))
.collect_vec();

let clickbench_ctx = register_clickbench_hits_table();

// for (i, sql) in clickbench_queries.iter().enumerate() {
// c.bench_function(&format!("logical_plan_clickbench_q{}", i + 1), |b| {
// b.iter(|| logical_plan(&clickbench_ctx, sql))
// });
// }

for (i, sql) in clickbench_queries.iter().enumerate() {
c.bench_function(&format!("physical_plan_clickbench_q{}", i + 1), |b| {
b.iter(|| physical_plan(&clickbench_ctx, sql))
});
}

// c.bench_function("logical_plan_clickbench_all", |b| {
// b.iter(|| {
// for sql in &clickbench_queries {
// logical_plan(&clickbench_ctx, sql)
// }
// })
// });

c.bench_function("physical_plan_clickbench_all", |b| {
b.iter(|| {
for sql in &all_tpcds_sql_queries {
logical_plan(&tpcds_ctx, sql)
for sql in &clickbench_queries {
physical_plan(&clickbench_ctx, sql)
}
})
});
Expand Down