diff --git a/datafusion/core/tests/fuzz_cases/sort_fuzz.rs b/datafusion/core/tests/fuzz_cases/sort_fuzz.rs
index d74144b0abce..2615abfd3c0d 100644
--- a/datafusion/core/tests/fuzz_cases/sort_fuzz.rs
+++ b/datafusion/core/tests/fuzz_cases/sort_fuzz.rs
@@ -22,89 +22,100 @@ use arrow::{
compute::SortOptions,
record_batch::RecordBatch,
};
-use datafusion::execution::runtime_env::{RuntimeConfig, RuntimeEnv};
-use datafusion::physical_plan::expressions::{col, PhysicalSortExpr};
+use arrow_array::{Float64Array, StringArray};
+use datafusion::physical_plan::expressions::PhysicalSortExpr;
use datafusion::physical_plan::memory::MemoryExec;
use datafusion::physical_plan::sorts::sort::SortExec;
use datafusion::physical_plan::{collect, ExecutionPlan};
use datafusion::prelude::{SessionConfig, SessionContext};
+use datafusion::{
+ datasource::MemTable,
+ execution::runtime_env::{RuntimeConfig, RuntimeEnv},
+};
+use datafusion_common::{
+ cast::{as_float64_array, as_string_array},
+ TableReference,
+};
use datafusion_execution::memory_pool::GreedyMemoryPool;
-use rand::Rng;
+use datafusion_physical_expr::expressions::col;
+use rand::{rngs::StdRng, Rng, SeedableRng};
use std::sync::Arc;
-use test_utils::{batches_to_vec, partitions_to_sorted_vec};
+use test_utils::{batches_to_vec, partitions_to_sorted_vec, stagger_batch};
+const KB: usize = 1 << 10; // 1 << 10 == 1024; used as the multiplier in the with_pool_size(..) arguments below
#[tokio::test]
#[cfg_attr(tarpaulin, ignore)]
async fn test_sort_1k_mem() {
- SortTest::new()
- .with_int32_batches(5)
- .with_pool_size(10240)
- .with_should_spill(false)
- .run()
- .await;
-
- SortTest::new()
- .with_int32_batches(20000)
- .with_pool_size(10240)
- .with_should_spill(true)
- .run()
- .await;
-
- SortTest::new()
- .with_int32_batches(1000000)
- .with_pool_size(10240)
- .with_should_spill(true)
- .run()
- .await;
+ for (batch_size, should_spill) in [(5, false), (20000, true), (1000000, true)] { // same three (with_int32_batches, with_should_spill) argument pairs as the removed calls, in the same order
+ SortTest::new()
+ .with_int32_batches(batch_size)
+ .with_pool_size(10 * KB) // KB is 1 << 10, so 10 * KB == 10240, the literal the removed code passed
+ .with_should_spill(should_spill)
+ .run()
+ .await;
+ }
}
#[tokio::test]
#[cfg_attr(tarpaulin, ignore)]
async fn test_sort_100k_mem() {
- SortTest::new()
- .with_int32_batches(5)
- .with_pool_size(102400)
- .with_should_spill(false)
- .run()
- .await;
-
- SortTest::new()
- .with_int32_batches(20000)
- .with_pool_size(102400)
- .with_should_spill(false)
- .run()
- .await;
-
- SortTest::new()
- .with_int32_batches(1000000)
- .with_pool_size(102400)
- .with_should_spill(true)
- .run()
- .await;
+ for (batch_size, should_spill) in [(5, false), (20000, false), (1000000, true)] { // same three (with_int32_batches, with_should_spill) argument pairs as the removed calls; only the last passes true
+ SortTest::new()
+ .with_int32_batches(batch_size)
+ .with_pool_size(100 * KB) // KB is 1 << 10, so 100 * KB == 102400, the literal the removed code passed
+ .with_should_spill(should_spill)
+ .run()
+ .await;
+ }
}
#[tokio::test]
async fn test_sort_unlimited_mem() {
- SortTest::new()
- .with_int32_batches(5)
- .with_pool_size(usize::MAX)
- .with_should_spill(false)
- .run()
- .await;
-
- SortTest::new()
- .with_int32_batches(20000)
- .with_pool_size(usize::MAX)
- .with_should_spill(false)
- .run()
- .await;
-
- SortTest::new()
- .with_int32_batches(1000000)
- .with_pool_size(usize::MAX)
- .with_should_spill(false)
- .run()
- .await;
+ for (batch_size, should_spill) in [(5, false), (20000, false), (1000000, false)] { // same three sizes as the removed calls; with_should_spill receives false for every case
+ SortTest::new()
+ .with_int32_batches(batch_size)
+ .with_pool_size(usize::MAX) // unchanged from the removed code: the pool size is usize::MAX
+ .with_should_spill(should_spill)
+ .run()
+ .await;
+ }
+}
+
+#[tokio::test]
+async fn test_sort_topk() {
+ for size in [10, 100, 1000, 10000, 1000000] {
+ let mut topk_scenario = TopKScenario::new()
+ .with_limit(10)
+ .with_table_name("t")
+ .with_col_name("x");
+
+ // test topk with i32
+ let collected_i32 = SortTest::new()
+ .with_input(topk_scenario.batches(size, ColType::I32))
+ .run_with_limit(&topk_scenario)
+ .await;
+ let actual = batches_to_vec(&collected_i32);
+ let excepted_i32 = topk_scenario.excepted_i32();
+ assert_eq!(actual, excepted_i32);
+
+ // test topk with f64
+ let collected_f64 = SortTest::new()
+ .with_input(topk_scenario.batches(size, ColType::F64))
+ .run_with_limit(&topk_scenario)
+ .await;
+ let actual: Vec