From 1920bfea036ce72a896cb08eae4c52e662d178e4 Mon Sep 17 00:00:00 2001
From: reilly
Date: Fri, 6 Oct 2023 21:37:39 +0800
Subject: [PATCH] [test] add fuzz test for topk

Signed-off-by: reilly
---
 datafusion/core/tests/fuzz_cases/sort_fuzz.rs | 308 ++++++++++++++----
 1 file changed, 244 insertions(+), 64 deletions(-)

diff --git a/datafusion/core/tests/fuzz_cases/sort_fuzz.rs b/datafusion/core/tests/fuzz_cases/sort_fuzz.rs
index d74144b0abce..2615abfd3c0d 100644
--- a/datafusion/core/tests/fuzz_cases/sort_fuzz.rs
+++ b/datafusion/core/tests/fuzz_cases/sort_fuzz.rs
@@ -22,89 +22,100 @@ use arrow::{
     compute::SortOptions,
     record_batch::RecordBatch,
 };
-use datafusion::execution::runtime_env::{RuntimeConfig, RuntimeEnv};
-use datafusion::physical_plan::expressions::{col, PhysicalSortExpr};
+use arrow_array::{Float64Array, StringArray};
+use datafusion::physical_plan::expressions::PhysicalSortExpr;
 use datafusion::physical_plan::memory::MemoryExec;
 use datafusion::physical_plan::sorts::sort::SortExec;
 use datafusion::physical_plan::{collect, ExecutionPlan};
 use datafusion::prelude::{SessionConfig, SessionContext};
+use datafusion::{
+    datasource::MemTable,
+    execution::runtime_env::{RuntimeConfig, RuntimeEnv},
+};
+use datafusion_common::{
+    cast::{as_float64_array, as_string_array},
+    TableReference,
+};
 use datafusion_execution::memory_pool::GreedyMemoryPool;
-use rand::Rng;
+use datafusion_physical_expr::expressions::col;
+use rand::{rngs::StdRng, Rng, SeedableRng};
 use std::sync::Arc;
-use test_utils::{batches_to_vec, partitions_to_sorted_vec};
+use test_utils::{batches_to_vec, partitions_to_sorted_vec, stagger_batch};
 
+const KB: usize = 1 << 10;
 #[tokio::test]
 #[cfg_attr(tarpaulin, ignore)]
 async fn test_sort_1k_mem() {
-    SortTest::new()
-        .with_int32_batches(5)
-        .with_pool_size(10240)
-        .with_should_spill(false)
-        .run()
-        .await;
-
-    SortTest::new()
-        .with_int32_batches(20000)
-        .with_pool_size(10240)
-        .with_should_spill(true)
-        .run()
-        .await;
-
-    SortTest::new()
-        .with_int32_batches(1000000)
-        .with_pool_size(10240)
-        .with_should_spill(true)
-        .run()
-        .await;
+    for (batch_size, should_spill) in [(5, false), (20000, true), (1000000, true)] {
+        SortTest::new()
+            .with_int32_batches(batch_size)
+            .with_pool_size(10 * KB)
+            .with_should_spill(should_spill)
+            .run()
+            .await;
+    }
 }
 
 #[tokio::test]
 #[cfg_attr(tarpaulin, ignore)]
 async fn test_sort_100k_mem() {
-    SortTest::new()
-        .with_int32_batches(5)
-        .with_pool_size(102400)
-        .with_should_spill(false)
-        .run()
-        .await;
-
-    SortTest::new()
-        .with_int32_batches(20000)
-        .with_pool_size(102400)
-        .with_should_spill(false)
-        .run()
-        .await;
-
-    SortTest::new()
-        .with_int32_batches(1000000)
-        .with_pool_size(102400)
-        .with_should_spill(true)
-        .run()
-        .await;
+    for (batch_size, should_spill) in [(5, false), (20000, false), (1000000, true)] {
+        SortTest::new()
+            .with_int32_batches(batch_size)
+            .with_pool_size(100 * KB)
+            .with_should_spill(should_spill)
+            .run()
+            .await;
+    }
 }
 
 #[tokio::test]
 async fn test_sort_unlimited_mem() {
-    SortTest::new()
-        .with_int32_batches(5)
-        .with_pool_size(usize::MAX)
-        .with_should_spill(false)
-        .run()
-        .await;
-
-    SortTest::new()
-        .with_int32_batches(20000)
-        .with_pool_size(usize::MAX)
-        .with_should_spill(false)
-        .run()
-        .await;
-
-    SortTest::new()
-        .with_int32_batches(1000000)
-        .with_pool_size(usize::MAX)
-        .with_should_spill(false)
-        .run()
-        .await;
+    for (batch_size, should_spill) in [(5, false), (20000, false), (1000000, false)] {
+        SortTest::new()
+            .with_int32_batches(batch_size)
+            .with_pool_size(usize::MAX)
+            .with_should_spill(should_spill)
+            .run()
+            .await;
+    }
 }
 
+#[tokio::test]
+async fn test_sort_topk() {
+    for size in [10, 100, 1000, 10000, 1000000] {
+        let mut topk_scenario = TopKScenario::new()
+            .with_limit(10)
+            .with_table_name("t")
+            .with_col_name("x");
+
+        // test topk with i32
+        let collected_i32 = SortTest::new()
+            .with_input(topk_scenario.batches(size, ColType::I32))
+            .run_with_limit(&topk_scenario)
+            .await;
+        let actual = batches_to_vec(&collected_i32);
+        let expected_i32 = topk_scenario.expected_i32();
+        assert_eq!(actual, expected_i32);
+
+        // test topk with f64
+        let collected_f64 = SortTest::new()
+            .with_input(topk_scenario.batches(size, ColType::F64))
+            .run_with_limit(&topk_scenario)
+            .await;
+        let actual: Vec<Option<f64>> = batches_to_f64_vec(&collected_f64);
+        let expected_f64 = topk_scenario.expected_f64();
+        assert_eq!(actual, expected_f64);
+
+        // test topk with str
+        let collected_str = SortTest::new()
+            .with_input(topk_scenario.batches(size, ColType::Str))
+            .run_with_limit(&topk_scenario)
+            .await;
+        let actual: Vec<Option<&str>> = batches_to_str_vec(&collected_str);
+        let expected_str = topk_scenario.expected_str();
+        assert_eq!(actual, expected_str);
+    }
+}
+
 #[derive(Debug, Default)]
@@ -121,6 +132,11 @@ impl SortTest {
         Default::default()
     }
 
+    fn with_input(mut self, batches: Vec<Vec<RecordBatch>>) -> Self {
+        self.input = batches.clone();
+        self
+    }
+
     /// Create batches of int32 values of rows
     fn with_int32_batches(mut self, rows: usize) -> Self {
         self.input = vec![make_staggered_i32_batches(rows)];
@@ -138,6 +154,44 @@
         self
     }
 
+    async fn run_with_limit<'a>(
+        &self,
+        topk_scenario: &TopKScenario<'a>,
+    ) -> Vec<RecordBatch> {
+        let input = self.input.clone();
+        let schema = input
+            .iter()
+            .flat_map(|p| p.iter())
+            .next()
+            .expect("at least one batch")
+            .schema();
+
+        let table = MemTable::try_new(schema, input.clone()).unwrap();
+
+        let ctx = SessionContext::new();
+
+        ctx.register_table(
+            TableReference::Bare {
+                table: topk_scenario.table_name.into(),
+            },
+            Arc::new(table),
+        )
+        .unwrap();
+
+        let df = ctx
+            .table(topk_scenario.table_name)
+            .await
+            .unwrap()
+            .sort(vec![
+                datafusion_expr::col(topk_scenario.col_name).sort(true, true)
+            ])
+            .unwrap()
+            .limit(0, Some(topk_scenario.limit))
+            .unwrap();
+
+        df.collect().await.unwrap()
+    }
+
     /// Sort the input using SortExec and ensure the results are
     /// correct according to `Vec::sort` both with and without spilling
     async fn run(&self) {
@@ -208,6 +262,109 @@
     }
 }
 
+enum ColType {
+    I32,
+    F64,
+    Str,
+}
+
+struct TopKScenario<'a> {
+    limit: usize,
+    batches: Vec<Vec<RecordBatch>>,
+    table_name: &'a str,
+    col_name: &'a str,
+}
+
+impl<'a> TopKScenario<'a> {
+    fn new() -> Self {
+        TopKScenario {
+            limit: 0,
+            batches: vec![],
+            table_name: "",
+            col_name: "",
+        }
+    }
+
+    fn with_limit(mut self, limit: usize) -> Self {
+        self.limit = limit;
+        self
+    }
+
+    fn with_table_name(mut self, table_name: &'a str) -> Self {
+        self.table_name = table_name;
+        self
+    }
+
+    fn with_col_name(mut self, col_name: &'a str) -> Self {
+        self.col_name = col_name;
+        self
+    }
+
+    fn batches(&mut self, len: usize, t: ColType) -> Vec<Vec<RecordBatch>> {
+        let batches = match t {
+            ColType::I32 => make_staggered_i32_batches(len),
+            ColType::F64 => make_staggered_f64_batches(len),
+            ColType::Str => make_staggered_str_batches(len),
+        };
+        self.batches = vec![batches];
+        self.batches.clone()
+    }
+
+    fn expected_i32(&self) -> Vec<Option<i32>> {
+        let expected = partitions_to_sorted_vec(&self.batches);
+        expected[0..self.limit].into()
+    }
+
+    fn expected_f64(&self) -> Vec<Option<f64>> {
+        let mut expected: Vec<Option<f64>> = self
+            .batches
+            .iter()
+            .flat_map(|batches| batches_to_f64_vec(batches).into_iter())
+            .collect();
+        expected.sort_by(|a, b| a.partial_cmp(b).unwrap());
+        expected[0..self.limit].into()
+    }
+
+    fn expected_str(&self) -> Vec<Option<&str>> {
+        let mut expected: Vec<Option<&str>> = self
+            .batches
+            .iter()
+            .flat_map(|batches| batches_to_str_vec(batches).into_iter())
+            .collect();
+        expected.sort_unstable();
+        expected[0..self.limit].into()
+    }
+}
+
+impl Default for TopKScenario<'_> {
+    fn default() -> Self {
+        Self::new()
+    }
+}
+
+fn make_staggered_f64_batches(len: usize) -> Vec<RecordBatch> {
+    let mut rng = StdRng::seed_from_u64(100);
+    let remainder = RecordBatch::try_from_iter(vec![(
+        "x",
+        Arc::new(Float64Array::from_iter_values(
+            (0..len).map(|_| rng.gen_range(0.0..1000.7)),
+        )) as ArrayRef,
+    )])
+    .unwrap();
+    stagger_batch(remainder)
+}
+
+fn make_staggered_str_batches(len: usize) -> Vec<RecordBatch> {
+    let remainder = RecordBatch::try_from_iter(vec![(
+        "x",
+        Arc::new(StringArray::from_iter_values(
+            (0..len).map(|_| get_random_string(6)),
+        )) as ArrayRef,
+    )])
+    .unwrap();
+    stagger_batch(remainder)
+}
+
 /// Return randomly sized record batches in a field named 'x' of type `Int32`
 /// with randomized i32 content
 fn make_staggered_i32_batches(len: usize) -> Vec<RecordBatch> {
@@ -232,3 +389,26 @@
     }
     batches
 }
+
+/// Return a random alphanumeric ASCII string of length `len`
+fn get_random_string(len: usize) -> String {
+    rand::thread_rng()
+        .sample_iter(rand::distributions::Alphanumeric)
+        .take(len)
+        .map(char::from)
+        .collect()
+}
+
+fn batches_to_f64_vec(batches: &[RecordBatch]) -> Vec<Option<f64>> {
+    batches
+        .iter()
+        .flat_map(|batch| as_float64_array(batch.column(0)).unwrap().iter())
+        .collect()
+}
+
+fn batches_to_str_vec(batches: &[RecordBatch]) -> Vec<Option<&str>> {
+    batches
+        .iter()
+        .flat_map(|batch| as_string_array(batch.column(0)).unwrap().iter())
+        .collect()
+}
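
Reviewer note (not part of the patch): the new test checks the invariant that a `sort` followed by `limit(0, Some(k))`, which exercises DataFusion's TopK path, returns exactly the first k rows of the fully sorted input. Below is a minimal, self-contained sketch of that check using the same public APIs the patch already exercises (`MemTable`, `DataFrame::sort`/`limit`, `datafusion_common::cast`). The table name "t", column "x", the sample values, and the standalone `#[tokio::main]` harness are illustrative assumptions only, and the sketch assumes the same DataFusion version this patch targets.

use std::sync::Arc;

use arrow::array::{ArrayRef, Int32Array};
use arrow::record_batch::RecordBatch;
use datafusion::datasource::MemTable;
use datafusion::error::Result;
use datafusion::prelude::{col, SessionContext};
use datafusion_common::cast::as_int32_array;

#[tokio::main]
async fn main() -> Result<()> {
    // Illustrative input: one unsorted batch with a single Int32 column "x".
    let batch = RecordBatch::try_from_iter(vec![(
        "x",
        Arc::new(Int32Array::from(vec![5, 1, 4, 2, 3])) as ArrayRef,
    )])?;
    let schema = batch.schema();

    let ctx = SessionContext::new();
    let table = MemTable::try_new(schema, vec![vec![batch.clone()]])?;
    ctx.register_table("t", Arc::new(table))?;

    // TopK path: ORDER BY x ASC NULLS FIRST LIMIT k via the DataFrame API,
    // the same plan shape that `run_with_limit` builds in the patch.
    let k = 2;
    let results = ctx
        .table("t")
        .await?
        .sort(vec![col("x").sort(true, true)])?
        .limit(0, Some(k))?
        .collect()
        .await?;

    // Baseline: fully sort the input column and keep the first k values.
    let mut expected: Vec<Option<i32>> = as_int32_array(batch.column(0))?.iter().collect();
    expected.sort();
    expected.truncate(k);

    // Flatten the collected batches into a single column of values.
    let actual: Vec<Option<i32>> = results
        .iter()
        .flat_map(|b| as_int32_array(b.column(0)).unwrap().iter())
        .collect();
    assert_eq!(actual, expected);
    Ok(())
}

The fuzz test in the patch applies this same comparison to i32, f64, and string columns over randomly staggered batches, so the TopK path is checked against `partitions_to_sorted_vec` (or a plain sort of the flattened values) rather than a single hand-written batch.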