diff --git a/datafusion/common/src/config.rs b/datafusion/common/src/config.rs index f681ae57a3f1..fe7fb955033f 100644 --- a/datafusion/common/src/config.rs +++ b/datafusion/common/src/config.rs @@ -235,6 +235,22 @@ config_namespace! { /// /// Defaults to the number of CPU cores on the system pub planning_concurrency: usize, default = num_cpus::get() + + /// Specifies the reserved memory for each spillable sort operation to + /// facilitate an in-memory merge. + /// + /// When a sort operation spills to disk, the in-memory data must be + /// sorted and merged before being written to a file. This setting reserves + /// a specific amount of memory for that in-memory sort/merge process. + /// + /// Note: This setting is irrelevant if the sort operation cannot spill + /// (i.e., if there's no `DiskManager` configured). + pub sort_spill_reservation_bytes: usize, default = 10 * 1024 * 1024 + + /// When sorting, below what size should data be concatenated + /// and sorted in a single RecordBatch rather than sorted in + /// batches and merged. 
+ pub sort_in_place_threshold_bytes: usize, default = 1024 * 1024 } } diff --git a/datafusion/core/src/physical_plan/repartition/mod.rs b/datafusion/core/src/physical_plan/repartition/mod.rs index 99b72a1b4064..3f83e186ea49 100644 --- a/datafusion/core/src/physical_plan/repartition/mod.rs +++ b/datafusion/core/src/physical_plan/repartition/mod.rs @@ -574,14 +574,21 @@ impl ExecutionPlan for RepartitionExec { // Get existing ordering: let sort_exprs = self.input.output_ordering().unwrap_or(&[]); - // Merge streams (while preserving ordering) coming from input partitions to this partition: + + // Merge streams (while preserving ordering) coming from + // input partitions to this partition: + let fetch = None; + let merge_reservation = + MemoryConsumer::new(format!("{}[Merge {partition}]", self.name())) + .register(context.memory_pool()); streaming_merge( input_streams, self.schema(), sort_exprs, BaselineMetrics::new(&self.metrics, partition), context.session_config().batch_size(), - None, + fetch, + merge_reservation, ) } else { Ok(Box::pin(RepartitionStream { diff --git a/datafusion/core/src/physical_plan/sorts/builder.rs b/datafusion/core/src/physical_plan/sorts/builder.rs index 1c5ec356eed9..3527d5738223 100644 --- a/datafusion/core/src/physical_plan/sorts/builder.rs +++ b/datafusion/core/src/physical_plan/sorts/builder.rs @@ -19,6 +19,7 @@ use arrow::compute::interleave; use arrow::datatypes::SchemaRef; use arrow::record_batch::RecordBatch; use datafusion_common::Result; +use datafusion_execution::memory_pool::MemoryReservation; #[derive(Debug, Copy, Clone, Default)] struct BatchCursor { @@ -37,6 +38,9 @@ pub struct BatchBuilder { /// Maintain a list of [`RecordBatch`] and their corresponding stream batches: Vec<(usize, RecordBatch)>, + /// Accounts for memory used by buffered batches + reservation: MemoryReservation, + /// The current [`BatchCursor`] for each stream cursors: Vec, @@ -47,23 +51,31 @@ pub struct BatchBuilder { impl BatchBuilder { /// Create a new 
[`BatchBuilder`] with the provided `stream_count` and `batch_size` - pub fn new(schema: SchemaRef, stream_count: usize, batch_size: usize) -> Self { + pub fn new( + schema: SchemaRef, + stream_count: usize, + batch_size: usize, + reservation: MemoryReservation, + ) -> Self { Self { schema, batches: Vec::with_capacity(stream_count * 2), cursors: vec![BatchCursor::default(); stream_count], indices: Vec::with_capacity(batch_size), + reservation, } } /// Append a new batch in `stream_idx` - pub fn push_batch(&mut self, stream_idx: usize, batch: RecordBatch) { + pub fn push_batch(&mut self, stream_idx: usize, batch: RecordBatch) -> Result<()> { + self.reservation.try_grow(batch.get_array_memory_size())?; let batch_idx = self.batches.len(); self.batches.push((stream_idx, batch)); self.cursors[stream_idx] = BatchCursor { batch_idx, row_idx: 0, - } + }; + Ok(()) } /// Append the next row from `stream_idx` @@ -119,7 +131,7 @@ impl BatchBuilder { // We can therefore drop all but the last batch for each stream let mut batch_idx = 0; let mut retained = 0; - self.batches.retain(|(stream_idx, _)| { + self.batches.retain(|(stream_idx, batch)| { let stream_cursor = &mut self.cursors[*stream_idx]; let retain = stream_cursor.batch_idx == batch_idx; batch_idx += 1; @@ -127,6 +139,8 @@ impl BatchBuilder { if retain { stream_cursor.batch_idx = retained; retained += 1; + } else { + self.reservation.shrink(batch.get_array_memory_size()); } retain }); diff --git a/datafusion/core/src/physical_plan/sorts/cursor.rs b/datafusion/core/src/physical_plan/sorts/cursor.rs index a9e512213057..c0c791288644 100644 --- a/datafusion/core/src/physical_plan/sorts/cursor.rs +++ b/datafusion/core/src/physical_plan/sorts/cursor.rs @@ -21,6 +21,7 @@ use arrow::datatypes::ArrowNativeTypeOp; use arrow::row::{Row, Rows}; use arrow_array::types::ByteArrayType; use arrow_array::{Array, ArrowPrimitiveType, GenericByteArray, PrimitiveArray}; +use datafusion_execution::memory_pool::MemoryReservation; use 
std::cmp::Ordering; /// A [`Cursor`] for [`Rows`] @@ -29,6 +30,11 @@ pub struct RowCursor { num_rows: usize, rows: Rows, + + /// Tracks for the memory used by in the `Rows` of this + /// cursor. Freed on drop + #[allow(dead_code)] + reservation: MemoryReservation, } impl std::fmt::Debug for RowCursor { @@ -41,12 +47,22 @@ impl std::fmt::Debug for RowCursor { } impl RowCursor { - /// Create a new SortKeyCursor - pub fn new(rows: Rows) -> Self { + /// Create a new SortKeyCursor from `rows` and a `reservation` + /// that tracks its memory. + /// + /// Panic's if the reservation is not for exactly `rows.size()` + /// bytes + pub fn new(rows: Rows, reservation: MemoryReservation) -> Self { + assert_eq!( + rows.size(), + reservation.size(), + "memory reservation mismatch" + ); Self { cur_row: 0, num_rows: rows.num_rows(), rows, + reservation, } } diff --git a/datafusion/core/src/physical_plan/sorts/merge.rs b/datafusion/core/src/physical_plan/sorts/merge.rs index 736df7dbe81a..f8a1457dd62a 100644 --- a/datafusion/core/src/physical_plan/sorts/merge.rs +++ b/datafusion/core/src/physical_plan/sorts/merge.rs @@ -31,6 +31,7 @@ use arrow::datatypes::{DataType, SchemaRef}; use arrow::record_batch::RecordBatch; use arrow_array::*; use datafusion_common::Result; +use datafusion_execution::memory_pool::MemoryReservation; use futures::Stream; use std::pin::Pin; use std::task::{ready, Context, Poll}; @@ -42,7 +43,7 @@ macro_rules! primitive_merge_helper { } macro_rules! merge_helper { - ($t:ty, $sort:ident, $streams:ident, $schema:ident, $tracking_metrics:ident, $batch_size:ident, $fetch:ident) => {{ + ($t:ty, $sort:ident, $streams:ident, $schema:ident, $tracking_metrics:ident, $batch_size:ident, $fetch:ident, $reservation:ident) => {{ let streams = FieldCursorStream::<$t>::new($sort, $streams); return Ok(Box::pin(SortPreservingMergeStream::new( Box::new(streams), @@ -50,6 +51,7 @@ macro_rules! 
merge_helper { $tracking_metrics, $batch_size, $fetch, + $reservation, ))); }}; } @@ -63,28 +65,36 @@ pub fn streaming_merge( metrics: BaselineMetrics, batch_size: usize, fetch: Option, + reservation: MemoryReservation, ) -> Result { // Special case single column comparisons with optimized cursor implementations if expressions.len() == 1 { let sort = expressions[0].clone(); let data_type = sort.expr.data_type(schema.as_ref())?; downcast_primitive! { - data_type => (primitive_merge_helper, sort, streams, schema, metrics, batch_size, fetch), - DataType::Utf8 => merge_helper!(StringArray, sort, streams, schema, metrics, batch_size, fetch) - DataType::LargeUtf8 => merge_helper!(LargeStringArray, sort, streams, schema, metrics, batch_size, fetch) - DataType::Binary => merge_helper!(BinaryArray, sort, streams, schema, metrics, batch_size, fetch) - DataType::LargeBinary => merge_helper!(LargeBinaryArray, sort, streams, schema, metrics, batch_size, fetch) + data_type => (primitive_merge_helper, sort, streams, schema, metrics, batch_size, fetch, reservation), + DataType::Utf8 => merge_helper!(StringArray, sort, streams, schema, metrics, batch_size, fetch, reservation) + DataType::LargeUtf8 => merge_helper!(LargeStringArray, sort, streams, schema, metrics, batch_size, fetch, reservation) + DataType::Binary => merge_helper!(BinaryArray, sort, streams, schema, metrics, batch_size, fetch, reservation) + DataType::LargeBinary => merge_helper!(LargeBinaryArray, sort, streams, schema, metrics, batch_size, fetch, reservation) _ => {} } } - let streams = RowCursorStream::try_new(schema.as_ref(), expressions, streams)?; + let streams = RowCursorStream::try_new( + schema.as_ref(), + expressions, + streams, + reservation.new_empty(), + )?; + Ok(Box::pin(SortPreservingMergeStream::new( Box::new(streams), schema, metrics, batch_size, fetch, + reservation, ))) } @@ -162,11 +172,12 @@ impl SortPreservingMergeStream { metrics: BaselineMetrics, batch_size: usize, fetch: Option, + 
reservation: MemoryReservation, ) -> Self { let stream_count = streams.partitions(); Self { - in_progress: BatchBuilder::new(schema, stream_count, batch_size), + in_progress: BatchBuilder::new(schema, stream_count, batch_size, reservation), streams, metrics, aborted: false, @@ -197,8 +208,7 @@ impl SortPreservingMergeStream { Some(Err(e)) => Poll::Ready(Err(e)), Some(Ok((cursor, batch))) => { self.cursors[idx] = Some(cursor); - self.in_progress.push_batch(idx, batch); - Poll::Ready(Ok(())) + Poll::Ready(self.in_progress.push_batch(idx, batch)) } } } diff --git a/datafusion/core/src/physical_plan/sorts/sort.rs b/datafusion/core/src/physical_plan/sorts/sort.rs index c7ae09bb2e34..411a425b51db 100644 --- a/datafusion/core/src/physical_plan/sorts/sort.rs +++ b/datafusion/core/src/physical_plan/sorts/sort.rs @@ -210,23 +210,37 @@ struct ExternalSorter { /// If Some, the maximum number of output rows that will be /// produced. fetch: Option, - /// Memory usage tracking + /// Reservation for in_mem_batches reservation: MemoryReservation, - /// The partition id that this Sort is handling (for identification) - partition_id: usize, - /// A handle to the runtime to get Disk spill files + /// Reservation for the merging of in-memory batches. If the sort + /// might spill, `sort_spill_reservation_bytes` will be + /// pre-reserved to ensure there is some space for this sort/merge. + merge_reservation: MemoryReservation, + /// A handle to the runtime to get spill files runtime: Arc, /// The target number of rows for output batches batch_size: usize, + /// How much memory to reserve for performing in-memory sort/merges + /// prior to spilling. + sort_spill_reservation_bytes: usize, + /// If the size of buffered in-memory batches is below this size, + /// the data will be concatenated and sorted in place rather than + /// sort/merged. 
+ sort_in_place_threshold_bytes: usize, } impl ExternalSorter { + // TODO: make a builder or some other nicer API to avoid the + // clippy warning + #[allow(clippy::too_many_arguments)] pub fn new( partition_id: usize, schema: SchemaRef, expr: Vec, batch_size: usize, fetch: Option, + sort_spill_reservation_bytes: usize, + sort_in_place_threshold_bytes: usize, metrics: &ExecutionPlanMetricsSet, runtime: Arc, ) -> Self { @@ -235,6 +249,10 @@ impl ExternalSorter { .with_can_spill(true) .register(&runtime.memory_pool); + let merge_reservation = + MemoryConsumer::new(format!("ExternalSorterMerge[{partition_id}]")) + .register(&runtime.memory_pool); + Self { schema, in_mem_batches: vec![], @@ -244,9 +262,11 @@ impl ExternalSorter { metrics, fetch, reservation, - partition_id, + merge_reservation, runtime, batch_size, + sort_spill_reservation_bytes, + sort_in_place_threshold_bytes, } } @@ -257,6 +277,7 @@ impl ExternalSorter { if input.num_rows() == 0 { return Ok(()); } + self.reserve_memory_for_merge()?; let size = batch_byte_size(&input); if self.reservation.try_grow(size).is_err() { @@ -318,12 +339,10 @@ impl ExternalSorter { self.metrics.baseline.clone(), self.batch_size, self.fetch, + self.reservation.new_empty(), ) } else if !self.in_mem_batches.is_empty() { - let result = self.in_mem_sort_stream(self.metrics.baseline.clone()); - // Report to the memory manager we are no longer using memory - self.reservation.free(); - result + self.in_mem_sort_stream(self.metrics.baseline.clone()) } else { Ok(Box::pin(EmptyRecordBatchStream::new(self.schema.clone()))) } @@ -374,6 +393,11 @@ impl ExternalSorter { return Ok(()); } + // Release the memory reserved for merge back to the pool so + // there is some left when `in_mem_sort_stream` requests an + // allocation. + self.merge_reservation.free(); + self.in_mem_batches = self .in_mem_sort_stream(self.metrics.baseline.intermediate())? 
.try_collect() @@ -385,7 +409,10 @@ impl ExternalSorter { .map(|x| x.get_array_memory_size()) .sum(); - self.reservation.resize(size); + // Reserve headroom for next sort/merge + self.reserve_memory_for_merge()?; + + self.reservation.try_resize(size)?; self.in_mem_batches_sorted = true; Ok(()) } @@ -455,26 +482,27 @@ impl ExternalSorter { assert_ne!(self.in_mem_batches.len(), 0); if self.in_mem_batches.len() == 1 { let batch = self.in_mem_batches.remove(0); - let stream = self.sort_batch_stream(batch, metrics)?; - self.in_mem_batches.clear(); - return Ok(stream); + let reservation = self.reservation.take(); + return self.sort_batch_stream(batch, metrics, reservation); } - // If less than 1MB of in-memory data, concatenate and sort in place - // - // This is a very rough heuristic and likely could be refined further - if self.reservation.size() < 1048576 { + // If less than sort_in_place_threshold_bytes, concatenate and sort in place + if self.reservation.size() < self.sort_in_place_threshold_bytes { // Concatenate memory batches together and sort let batch = concat_batches(&self.schema, &self.in_mem_batches)?; self.in_mem_batches.clear(); - return self.sort_batch_stream(batch, metrics); + self.reservation.try_resize(batch.get_array_memory_size())?; + let reservation = self.reservation.take(); + return self.sort_batch_stream(batch, metrics, reservation); } let streams = std::mem::take(&mut self.in_mem_batches) .into_iter() .map(|batch| { let metrics = self.metrics.baseline.intermediate(); - Ok(spawn_buffered(self.sort_batch_stream(batch, metrics)?, 1)) + let reservation = self.reservation.split(batch.get_array_memory_size()); + let input = self.sort_batch_stream(batch, metrics, reservation)?; + Ok(spawn_buffered(input, 1)) }) .collect::>()?; @@ -485,35 +513,49 @@ impl ExternalSorter { metrics, self.batch_size, self.fetch, + self.merge_reservation.new_empty(), ) } - /// Sorts a single `RecordBatch` into a single stream + /// Sorts a single `RecordBatch` into a single 
stream. + /// + /// `reservation` accounts for the memory used by this batch and + /// is released when the sort is complete fn sort_batch_stream( &self, batch: RecordBatch, metrics: BaselineMetrics, + reservation: MemoryReservation, ) -> Result { + assert_eq!(batch.get_array_memory_size(), reservation.size()); let schema = batch.schema(); - let mut reservation = - MemoryConsumer::new(format!("sort_batch_stream{}", self.partition_id)) - .register(&self.runtime.memory_pool); - - // TODO: This should probably be try_grow (#5885) - reservation.resize(batch.get_array_memory_size()); - let fetch = self.fetch; let expressions = self.expr.clone(); let stream = futures::stream::once(futures::future::lazy(move |_| { let sorted = sort_batch(&batch, &expressions, fetch)?; metrics.record_output(sorted.num_rows()); drop(batch); - reservation.free(); + drop(reservation); Ok(sorted) })); Ok(Box::pin(RecordBatchStreamAdapter::new(schema, stream))) } + + /// If this sort may spill, pre-allocates + /// `sort_spill_reservation_bytes` of memory to guarantee memory + /// is left for the in-memory sort/merge. 
+ fn reserve_memory_for_merge(&mut self) -> Result<()> { + // Reserve headroom for next merge sort + if self.runtime.disk_manager.tmp_files_enabled() { + let size = self.sort_spill_reservation_bytes; + if self.merge_reservation.size() != size { + self.merge_reservation.try_resize(size)?; + } + } + + Ok(()) + } } impl Debug for ExternalSorter { @@ -801,6 +843,8 @@ impl ExecutionPlan for SortExec { let mut input = self.input.execute(partition, context.clone())?; + let execution_options = &context.session_config().options().execution; + trace!("End SortExec's input.execute for partition: {}", partition); let mut sorter = ExternalSorter::new( @@ -809,6 +853,8 @@ impl ExecutionPlan for SortExec { self.expr.clone(), context.session_config().batch_size(), self.fetch, + execution_options.sort_spill_reservation_bytes, + execution_options.sort_in_place_threshold_bytes, &self.metrics_set, context.runtime_env(), ); @@ -914,9 +960,15 @@ mod tests { #[tokio::test] async fn test_sort_spill() -> Result<()> { // trigger spill there will be 4 batches with 5.5KB for each - let config = RuntimeConfig::new().with_memory_limit(12288, 1.0); - let runtime = Arc::new(RuntimeEnv::new(config)?); - let session_ctx = SessionContext::with_config_rt(SessionConfig::new(), runtime); + let session_config = SessionConfig::new(); + let sort_spill_reservation_bytes = session_config + .options() + .execution + .sort_spill_reservation_bytes; + let rt_config = RuntimeConfig::new() + .with_memory_limit(sort_spill_reservation_bytes + 12288, 1.0); + let runtime = Arc::new(RuntimeEnv::new(rt_config)?); + let session_ctx = SessionContext::with_config_rt(session_config, runtime); let partitions = 4; let csv = test::scan_partitioned_csv(partitions)?; @@ -996,11 +1048,18 @@ mod tests { ]; for (fetch, expect_spillage) in test_options { - let config = RuntimeConfig::new() - .with_memory_limit(avg_batch_size * (partitions - 1), 1.0); - let runtime = Arc::new(RuntimeEnv::new(config)?); - let session_ctx = - 
SessionContext::with_config_rt(SessionConfig::new(), runtime); + let session_config = SessionConfig::new(); + let sort_spill_reservation_bytes = session_config + .options() + .execution + .sort_spill_reservation_bytes; + + let rt_config = RuntimeConfig::new().with_memory_limit( + sort_spill_reservation_bytes + avg_batch_size * (partitions - 1), + 1.0, + ); + let runtime = Arc::new(RuntimeEnv::new(rt_config)?); + let session_ctx = SessionContext::with_config_rt(session_config, runtime); let csv = test::scan_partitioned_csv(partitions)?; let schema = csv.schema(); diff --git a/datafusion/core/src/physical_plan/sorts/sort_preserving_merge.rs b/datafusion/core/src/physical_plan/sorts/sort_preserving_merge.rs index e8d571631bab..6b978b5ee753 100644 --- a/datafusion/core/src/physical_plan/sorts/sort_preserving_merge.rs +++ b/datafusion/core/src/physical_plan/sorts/sort_preserving_merge.rs @@ -30,6 +30,7 @@ use crate::physical_plan::{ DisplayAs, DisplayFormatType, Distribution, ExecutionPlan, Partitioning, SendableRecordBatchStream, Statistics, }; +use datafusion_execution::memory_pool::MemoryConsumer; use arrow::datatypes::SchemaRef; use datafusion_common::{DataFusionError, Result}; @@ -213,6 +214,10 @@ impl ExecutionPlan for SortPreservingMergeExec { ); let schema = self.schema(); + let reservation = + MemoryConsumer::new(format!("SortPreservingMergeExec[{partition}]")) + .register(&context.runtime_env().memory_pool); + match input_partitions { 0 => Err(DataFusionError::Internal( "SortPreservingMergeExec requires at least one input partition" @@ -241,6 +246,7 @@ impl ExecutionPlan for SortPreservingMergeExec { BaselineMetrics::new(&self.metrics, partition), context.session_config().batch_size(), self.fetch, + reservation, )?; debug!("Got stream result from SortPreservingMergeStream::new_from_receivers"); @@ -843,14 +849,18 @@ mod tests { } let metrics = ExecutionPlanMetricsSet::new(); + let reservation = + 
MemoryConsumer::new("test").register(&task_ctx.runtime_env().memory_pool); + let fetch = None; let merge_stream = streaming_merge( streams, batches.schema(), sort.as_slice(), BaselineMetrics::new(&metrics, 0), task_ctx.session_config().batch_size(), - None, + fetch, + reservation, ) .unwrap(); diff --git a/datafusion/core/src/physical_plan/sorts/stream.rs b/datafusion/core/src/physical_plan/sorts/stream.rs index 97a3b85fa535..9ef13b7eb25e 100644 --- a/datafusion/core/src/physical_plan/sorts/stream.rs +++ b/datafusion/core/src/physical_plan/sorts/stream.rs @@ -23,6 +23,7 @@ use arrow::datatypes::Schema; use arrow::record_batch::RecordBatch; use arrow::row::{RowConverter, SortField}; use datafusion_common::Result; +use datafusion_execution::memory_pool::MemoryReservation; use futures::stream::{Fuse, StreamExt}; use std::marker::PhantomData; use std::sync::Arc; @@ -84,6 +85,8 @@ pub struct RowCursorStream { column_expressions: Vec>, /// Input streams streams: FusedStreams, + /// Tracks the memory used by `converter` + reservation: MemoryReservation, } impl RowCursorStream { @@ -91,6 +94,7 @@ impl RowCursorStream { schema: &Schema, expressions: &[PhysicalSortExpr], streams: Vec, + reservation: MemoryReservation, ) -> Result { let sort_fields = expressions .iter() @@ -104,6 +108,7 @@ impl RowCursorStream { let converter = RowConverter::new(sort_fields)?; Ok(Self { converter, + reservation, column_expressions: expressions.iter().map(|x| x.expr.clone()).collect(), streams: FusedStreams(streams), }) @@ -117,7 +122,12 @@ impl RowCursorStream { .collect::>>()?; let rows = self.converter.convert_columns(&cols)?; - Ok(RowCursor::new(rows)) + self.reservation.try_resize(self.converter.size())?; + + // track the memory in the newly created Rows. 
+ let mut rows_reservation = self.reservation.new_empty(); + rows_reservation.try_grow(rows.size())?; + Ok(RowCursor::new(rows, rows_reservation)) } } diff --git a/datafusion/core/tests/fuzz_cases/order_spill_fuzz.rs b/datafusion/core/tests/fuzz_cases/order_spill_fuzz.rs index 1f72e0fcb45b..d927b2807d7b 100644 --- a/datafusion/core/tests/fuzz_cases/order_spill_fuzz.rs +++ b/datafusion/core/tests/fuzz_cases/order_spill_fuzz.rs @@ -22,13 +22,13 @@ use arrow::{ compute::SortOptions, record_batch::RecordBatch, }; -use datafusion::execution::memory_pool::GreedyMemoryPool; use datafusion::execution::runtime_env::{RuntimeConfig, RuntimeEnv}; use datafusion::physical_plan::expressions::{col, PhysicalSortExpr}; use datafusion::physical_plan::memory::MemoryExec; use datafusion::physical_plan::sorts::sort::SortExec; use datafusion::physical_plan::{collect, ExecutionPlan}; use datafusion::prelude::{SessionConfig, SessionContext}; +use datafusion_execution::memory_pool::GreedyMemoryPool; use rand::Rng; use std::sync::Arc; use test_utils::{batches_to_vec, partitions_to_sorted_vec}; @@ -76,10 +76,20 @@ async fn run_sort(pool_size: usize, size_spill: Vec<(usize, bool)>) { let exec = MemoryExec::try_new(&input, schema, None).unwrap(); let sort = Arc::new(SortExec::new(sort, Arc::new(exec))); + let session_config = SessionConfig::new(); + // Make sure there is enough space for the initial spill + // reservation + let pool_size = pool_size.saturating_add( + session_config + .options() + .execution + .sort_spill_reservation_bytes, + ); + let runtime_config = RuntimeConfig::new() .with_memory_pool(Arc::new(GreedyMemoryPool::new(pool_size))); let runtime = Arc::new(RuntimeEnv::new(runtime_config).unwrap()); - let session_ctx = SessionContext::with_config_rt(SessionConfig::new(), runtime); + let session_ctx = SessionContext::with_config_rt(session_config, runtime); let task_ctx = session_ctx.task_ctx(); let collected = collect(sort.clone(), task_ctx).await.unwrap(); @@ -88,9 +98,17 @@ 
async fn run_sort(pool_size: usize, size_spill: Vec<(usize, bool)>) { let actual = batches_to_vec(&collected); if spill { - assert_ne!(sort.metrics().unwrap().spill_count().unwrap(), 0); + assert_ne!( + sort.metrics().unwrap().spill_count().unwrap(), + 0, + "{pool_size} {size}" + ); } else { - assert_eq!(sort.metrics().unwrap().spill_count().unwrap(), 0); + assert_eq!( + sort.metrics().unwrap().spill_count().unwrap(), + 0, + "{pool_size} {size}" + ); } assert_eq!( diff --git a/datafusion/core/tests/memory_limit.rs b/datafusion/core/tests/memory_limit.rs index a7cff6cbd758..80bbbed8f07f 100644 --- a/datafusion/core/tests/memory_limit.rs +++ b/datafusion/core/tests/memory_limit.rs @@ -17,12 +17,21 @@ //! This module contains tests for limiting memory at runtime in DataFusion -use arrow::datatypes::SchemaRef; +use arrow::datatypes::{Int32Type, SchemaRef}; use arrow::record_batch::RecordBatch; +use arrow_array::{ArrayRef, DictionaryArray}; +use arrow_schema::SortOptions; +use async_trait::async_trait; +use datafusion::assert_batches_eq; use datafusion::physical_optimizer::PhysicalOptimizerRule; +use datafusion::physical_plan::common::batch_byte_size; +use datafusion::physical_plan::memory::MemoryExec; use datafusion::physical_plan::streaming::PartitionStream; +use datafusion_expr::{Expr, TableType}; +use datafusion_physical_expr::PhysicalSortExpr; use futures::StreamExt; -use std::sync::Arc; +use std::any::Any; +use std::sync::{Arc, OnceLock}; use datafusion::datasource::streaming::StreamingTable; use datafusion::datasource::{MemTable, TableProvider}; @@ -31,8 +40,8 @@ use datafusion::execution::disk_manager::DiskManagerConfig; use datafusion::execution::runtime_env::{RuntimeConfig, RuntimeEnv}; use datafusion::physical_optimizer::join_selection::JoinSelection; use datafusion::physical_plan::stream::RecordBatchStreamAdapter; -use datafusion::physical_plan::SendableRecordBatchStream; -use datafusion_common::assert_contains; +use 
datafusion::physical_plan::{ExecutionPlan, SendableRecordBatchStream}; +use datafusion_common::{assert_contains, Result}; use datafusion::prelude::{SessionConfig, SessionContext}; use datafusion_execution::TaskContext; @@ -196,6 +205,110 @@ async fn symmetric_hash_join() { .await } +#[tokio::test] +async fn sort_preserving_merge() { + let partition_size = batches_byte_size(&dict_batches()); + + TestCase::new( + // This query uses the exact same ordering as the input table + // so only a merge is needed + "select * from t ORDER BY a ASC NULLS LAST, b ASC NULLS LAST LIMIT 10", + vec![ + "Resources exhausted: Failed to allocate additional", + "SortPreservingMergeExec", + ], + // provide insufficient memory to merge + partition_size / 2, + ) + // two partitions of data, so a merge is required + .with_scenario(Scenario::DictionaryStrings(2)) + .with_expected_plan( + // It is important that this plan only has + // SortPreservingMergeExec (not a Sort which would compete + // with the SortPreservingMergeExec for memory) + &[ + "+---------------+-------------------------------------------------------------------------------------------------------------+", + "| plan_type | plan |", + "+---------------+-------------------------------------------------------------------------------------------------------------+", + "| logical_plan | Limit: skip=0, fetch=10 |", + "| | Sort: t.a ASC NULLS LAST, t.b ASC NULLS LAST, fetch=10 |", + "| | TableScan: t projection=[a, b] |", + "| physical_plan | GlobalLimitExec: skip=0, fetch=10 |", + "| | SortPreservingMergeExec: [a@0 ASC NULLS LAST,b@1 ASC NULLS LAST], fetch=10 |", + "| | MemoryExec: partitions=2, partition_sizes=[5, 5], output_ordering=a@0 ASC NULLS LAST,b@1 ASC NULLS LAST |", + "| | |", + "+---------------+-------------------------------------------------------------------------------------------------------------+", + ] + ) + .run() + .await +} + +#[tokio::test] +async fn sort_spill_reservation() { + let partition_size = 
batches_byte_size(&dict_batches()); + + let base_config = SessionConfig::new() + // do not allow the sort to use the 'concat in place' path + .with_sort_in_place_threshold_bytes(10); + + // This test case shows how sort_spill_reservation works by + // purposely sorting data that requires non trivial memory to + // sort/merge. + let test = TestCase::new( + // This query uses a different order than the input table to + // force a sort. It also needs to have multiple columns to + // force RowFormat / interner that makes merge require + // substantial memory + "select * from t ORDER BY a , b DESC", + vec![], // expected errors set below + // enough memory to sort if we don't try to merge it all at once + (partition_size * 5) / 2, + ) + // use a single partiton so only a sort is needed + .with_scenario(Scenario::DictionaryStrings(1)) + .with_disk_manager_config(DiskManagerConfig::NewOs) + .with_expected_plan( + // It is important that this plan only has a SortExec, not + // also merge, so we can ensure the sort could finish + // given enough merging memory + &[ + "+---------------+--------------------------------------------------------------------------------------------------------+", + "| plan_type | plan |", + "+---------------+--------------------------------------------------------------------------------------------------------+", + "| logical_plan | Sort: t.a ASC NULLS LAST, t.b DESC NULLS FIRST |", + "| | TableScan: t projection=[a, b] |", + "| physical_plan | SortExec: expr=[a@0 ASC NULLS LAST,b@1 DESC] |", + "| | MemoryExec: partitions=1, partition_sizes=[5], output_ordering=a@0 ASC NULLS LAST,b@1 ASC NULLS LAST |", + "| | |", + "+---------------+--------------------------------------------------------------------------------------------------------+", + ] + ); + + let config = base_config + .clone() + // provide insufficient reserved space for merging, + // the sort will fail while trying to merge + .with_sort_spill_reservation_bytes(1024); + + test.clone() 
+ .with_expected_errors(vec![ + "Resources exhausted: Failed to allocate additional", + "ExternalSorterMerge", // merging in sort fails + ]) + .with_config(config) + .run() + .await; + + let config = base_config + // reserve sufficient space up front for the merge this time, + // which will force the spills to happen with less buffered + // input and thus with enough memory left to merge. + .with_sort_spill_reservation_bytes(2 * partition_size); + + test.with_config(config).with_expected_success().run().await; +} + /// Run the query with the specified memory limit, /// and verifies the expected errors are returned #[derive(Clone, Debug)] @@ -205,9 +318,17 @@ struct TestCase { memory_limit: usize, config: SessionConfig, scenario: Scenario, + /// How should the disk manager (that allows spilling) be + /// configured? Defaults to `Disabled` + disk_manager_config: DiskManagerConfig, + /// Expected explain plan, if non-empty + expected_plan: Vec, + /// Is the plan expected to pass? Defaults to false + expected_success: bool, } impl TestCase { + // TODO remove expected errors and memory limits and query from constructor fn new<'a>( query: impl Into, expected_errors: impl IntoIterator, @@ -222,21 +343,56 @@ impl TestCase { memory_limit, config: SessionConfig::new(), scenario: Scenario::AccessLog, + disk_manager_config: DiskManagerConfig::Disabled, + expected_plan: vec![], + expected_success: false, } } + /// Set a list of expected strings that must appear in any errors + fn with_expected_errors<'a>( + mut self, + expected_errors: impl IntoIterator, + ) -> Self { + self.expected_errors = + expected_errors.into_iter().map(|s| s.to_string()).collect(); + self + } + /// Specify the configuration to use pub fn with_config(mut self, config: SessionConfig) -> Self { self.config = config; self } + /// Mark that the test expects the query to run successfully + pub fn with_expected_success(mut self) -> Self { + self.expected_success = true; + self + } + /// Specify the scenario to run pub fn 
with_scenario(mut self, scenario: Scenario) -> Self { self.scenario = scenario; self } + /// Specify how the disk manager should be configured. If enabled, + /// operators that support it can spill + pub fn with_disk_manager_config( + mut self, + disk_manager_config: DiskManagerConfig, + ) -> Self { + self.disk_manager_config = disk_manager_config; + self + } + + /// Specify an expected plan to review + pub fn with_expected_plan(mut self, expected_plan: &[&str]) -> Self { + self.expected_plan = expected_plan.iter().map(|s| s.to_string()).collect(); + self + } + /// Run the test, panic'ing on error async fn run(self) { let Self { @@ -245,33 +401,62 @@ impl TestCase { memory_limit, config, scenario, + disk_manager_config, + expected_plan, + expected_success, } = self; let table = scenario.table(); let rt_config = RuntimeConfig::new() // do not allow spilling - .with_disk_manager(DiskManagerConfig::Disabled) + .with_disk_manager(disk_manager_config) .with_memory_limit(memory_limit, MEMORY_FRACTION); let runtime = RuntimeEnv::new(rt_config).unwrap(); // Configure execution - let state = SessionState::with_config_rt(config, Arc::new(runtime)) - .with_physical_optimizer_rules(scenario.rules()); + let state = SessionState::with_config_rt(config, Arc::new(runtime)); + let state = match scenario.rules() { + Some(rules) => state.with_physical_optimizer_rules(rules), + None => state, + }; let ctx = SessionContext::with_state(state); ctx.register_table("t", table).expect("registering table"); let df = ctx.sql(&query).await.expect("Planning query"); + if !expected_plan.is_empty() { + let expected_plan: Vec<_> = + expected_plan.iter().map(|s| s.as_str()).collect(); + let actual_plan = df + .clone() + .explain(false, false) + .unwrap() + .collect() + .await + .unwrap(); + assert_batches_eq!(expected_plan, &actual_plan); + } + match df.collect().await { Ok(_batches) => { - panic!("Unexpected success when running, expected memory limit failure") + if !expected_success { + panic!( + 
"Unexpected success when running, expected memory limit failure" + ) + } } Err(e) => { - for error_substring in expected_errors { - assert_contains!(e.to_string(), error_substring); + if expected_success { + panic!( + "Unexpected failure when running, expected success but got: {e}" + ) + } else { + for error_substring in expected_errors { + assert_contains!(e.to_string(), error_substring); + } } } } @@ -290,6 +475,9 @@ enum Scenario { /// 1000 rows of access log data with batches of 50 rows in a /// [`StreamingTable`] AccessLogStreaming, + + /// N partitions of sorted, dictionary encoded strings + DictionaryStrings(usize), } impl Scenario { @@ -317,24 +505,53 @@ impl Scenario { .with_infinite_table(true); Arc::new(table) } + Self::DictionaryStrings(num_partitions) => { + use datafusion::physical_expr::expressions::col; + let batches: Vec> = std::iter::repeat(dict_batches()) + .take(*num_partitions) + .collect(); + + let schema = batches[0][0].schema(); + let options = SortOptions { + descending: false, + nulls_first: false, + }; + let sort_information = vec![ + PhysicalSortExpr { + expr: col("a", &schema).unwrap(), + options, + }, + PhysicalSortExpr { + expr: col("b", &schema).unwrap(), + options, + }, + ]; + + let table = SortedTableProvider::new(batches, sort_information); + Arc::new(table) + } } } - /// return the optimizer rules to use - fn rules(&self) -> Vec> { + /// return the specific physical optimizer rules to use, or `None` to keep the defaults + fn rules(&self) -> Option>> { match self { Self::AccessLog => { // Disabling physical optimizer rules to avoid sorts / // repartitions (since RepartitionExec / SortExec also // has a memory budget which we'll likely hit first) - vec![] + Some(vec![]) } Self::AccessLogStreaming => { // Disable all physical optimizer rules except the // JoinSelection rule to avoid sorts or repartition, // as they also have memory budgets that may be hit // first - vec![Arc::new(JoinSelection::new())] + 
Some(vec![Arc::new(JoinSelection::new())]) + } + 
Self::DictionaryStrings(_) => { + // Use default rules + None } } } @@ -347,6 +564,56 @@ fn access_log_batches() -> Vec { .collect() } +static DICT_BATCHES: OnceLock> = OnceLock::new(); + +/// Returns 5 sorted string dictionary batches each with 50 rows with +/// this schema. +/// +/// a: Dictionary, +/// b: Dictionary, +fn dict_batches() -> Vec { + DICT_BATCHES.get_or_init(make_dict_batches).clone() +} + +fn make_dict_batches() -> Vec { + let batch_size = 50; + + let mut i = 0; + let gen = std::iter::from_fn(move || { + // create zero-padded values like (first batch shown) + // 0000000000 + // 0000000001 + // ... + // 0000000049 + + let values: Vec<_> = (i..i + batch_size).map(|x| format!("{x:010}")).collect(); + //println!("values: \n{values:?}"); + let array: DictionaryArray = + values.iter().map(|s| s.as_str()).collect(); + let array = Arc::new(array) as ArrayRef; + let batch = + RecordBatch::try_from_iter(vec![("a", array.clone()), ("b", array)]).unwrap(); + + i += batch_size; + Some(batch) + }); + + let num_batches = 5; + + let batches: Vec<_> = gen.take(num_batches).collect(); + + batches.iter().enumerate().for_each(|(i, batch)| { + println!("Dict batch[{i}] size is: {}", batch_byte_size(batch)); + }); + + batches +} + +// How many bytes does the memory from dict_batches consume?
+fn batches_byte_size(batches: &[RecordBatch]) -> usize { + batches.iter().map(batch_byte_size).sum() +} + struct DummyStreamPartition { schema: SchemaRef, batches: Vec, @@ -366,3 +633,53 @@ impl PartitionStream for DummyStreamPartition { )) } } + +/// Wrapper over a TableProvider that can provide ordering information +struct SortedTableProvider { + schema: SchemaRef, + batches: Vec>, + sort_information: Vec, +} + +impl SortedTableProvider { + fn new( + batches: Vec>, + sort_information: Vec, + ) -> Self { + let schema = batches[0][0].schema(); + Self { + schema, + batches, + sort_information, + } + } +} + +#[async_trait] +impl TableProvider for SortedTableProvider { + fn as_any(&self) -> &dyn Any { + self + } + + fn schema(&self) -> SchemaRef { + self.schema.clone() + } + + fn table_type(&self) -> TableType { + TableType::Base + } + + async fn scan( + &self, + _state: &SessionState, + projection: Option<&Vec>, + _filters: &[Expr], + _limit: Option, + ) -> Result> { + let mem_exec = + MemoryExec::try_new(&self.batches, self.schema(), projection.cloned())? 
+ .with_sort_information(self.sort_information.clone()); + + Ok(Arc::new(mem_exec)) + } +} diff --git a/datafusion/core/tests/sqllogictests/test_files/information_schema.slt b/datafusion/core/tests/sqllogictests/test_files/information_schema.slt index fcb818d5fd48..162e20820191 100644 --- a/datafusion/core/tests/sqllogictests/test_files/information_schema.slt +++ b/datafusion/core/tests/sqllogictests/test_files/information_schema.slt @@ -153,6 +153,8 @@ datafusion.execution.parquet.pushdown_filters false datafusion.execution.parquet.reorder_filters false datafusion.execution.parquet.skip_metadata true datafusion.execution.planning_concurrency 13 +datafusion.execution.sort_in_place_threshold_bytes 1048576 +datafusion.execution.sort_spill_reservation_bytes 10485760 datafusion.execution.target_partitions 7 datafusion.execution.time_zone +00:00 datafusion.explain.logical_plan_only false diff --git a/datafusion/execution/src/config.rs b/datafusion/execution/src/config.rs index c8478499365e..44fcc2ab49b4 100644 --- a/datafusion/execution/src/config.rs +++ b/datafusion/execution/src/config.rs @@ -287,6 +287,32 @@ impl SessionConfig { self.options.optimizer.enable_round_robin_repartition } + /// Set the size of [`sort_spill_reservation_bytes`] to control + /// memory pre-reservation + /// + /// [`sort_spill_reservation_bytes`]: datafusion_common::config::ExecutionOptions::sort_spill_reservation_bytes + pub fn with_sort_spill_reservation_bytes( + mut self, + sort_spill_reservation_bytes: usize, + ) -> Self { + self.options.execution.sort_spill_reservation_bytes = + sort_spill_reservation_bytes; + self + } + + /// Set the size of [`sort_in_place_threshold_bytes`] to control + /// below what size data is concatenated and sorted in a single + /// RecordBatch rather than sorted in batches and merged.
+ /// + /// [`sort_in_place_threshold_bytes`]: datafusion_common::config::ExecutionOptions::sort_in_place_threshold_bytes + pub fn with_sort_in_place_threshold_bytes( + mut self, + sort_in_place_threshold_bytes: usize, + ) -> Self { + self.options.execution.sort_in_place_threshold_bytes = + sort_in_place_threshold_bytes; + self + } + /// Convert configuration options to name-value pairs with values /// converted to strings. /// diff --git a/datafusion/execution/src/disk_manager.rs b/datafusion/execution/src/disk_manager.rs index 107c58fbe327..e8d2ed9cc0f5 100644 --- a/datafusion/execution/src/disk_manager.rs +++ b/datafusion/execution/src/disk_manager.rs @@ -102,6 +102,13 @@ impl DiskManager { } } + /// Return true if this disk manager supports creating temporary + /// files. If this returns false, any call to `create_tmp_file` + /// will error. + pub fn tmp_files_enabled(&self) -> bool { + self.local_dirs.lock().is_some() + } + /// Return a temporary file from a randomized choice in the configured locations /// /// If the file can not be created for some reason, returns an @@ -198,6 +205,7 @@ mod tests { ); let dm = DiskManager::try_new(config)?; + assert!(dm.tmp_files_enabled()); let actual = dm.create_tmp_file("Testing")?; // the file should be in one of the specified local directories @@ -210,6 +218,7 @@ mod tests { fn test_disabled_disk_manager() { let config = DiskManagerConfig::Disabled; let manager = DiskManager::try_new(config).unwrap(); + assert!(!manager.tmp_files_enabled()); assert_eq!( manager.create_tmp_file("Testing").unwrap_err().to_string(), "Resources exhausted: Memory Exhausted while Testing (DiskManager is disabled)", diff --git a/docs/source/user-guide/configs.md b/docs/source/user-guide/configs.md index bff7cb4da012..63c9c064bc52 100644 --- a/docs/source/user-guide/configs.md +++ b/docs/source/user-guide/configs.md @@ -57,6 +57,8 @@ Environment variables are read during `SessionConfig` initialisation so they mus | 
datafusion.execution.parquet.reorder_filters | false | If true, filter expressions evaluated during the parquet decoding operation will be reordered heuristically to minimize the cost of evaluation. If false, the filters are applied in the same order as written in the query | | datafusion.execution.aggregate.scalar_update_factor | 10 | Specifies the threshold for using `ScalarValue`s to update accumulators during high-cardinality aggregations for each input batch. The aggregation is considered high-cardinality if the number of affected groups is greater than or equal to `batch_size / scalar_update_factor`. In such cases, `ScalarValue`s are utilized for updating accumulators, rather than the default batch-slice approach. This can lead to performance improvements. By adjusting the `scalar_update_factor`, you can balance the trade-off between more efficient accumulator updates and the number of groups affected. | | datafusion.execution.planning_concurrency | 0 | Fan-out during initial physical planning. This is mostly use to plan `UNION` children in parallel. Defaults to the number of CPU cores on the system | +| datafusion.execution.sort_spill_reservation_bytes | 10485760 | Specifies the reserved memory for each spillable sort operation to facilitate an in-memory merge. When a sort operation spills to disk, the in-memory data must be sorted and merged before being written to a file. This setting reserves a specific amount of memory for that in-memory sort/merge process. Note: This setting is irrelevant if the sort operation cannot spill (i.e., if there's no `DiskManager` configured). | +| datafusion.execution.sort_in_place_threshold_bytes | 1048576 | When sorting, below what size should data be concatenated and sorted in a single RecordBatch rather than sorted in batches and merged. 
| | datafusion.optimizer.enable_round_robin_repartition | true | When set to true, the physical plan optimizer will try to add round robin repartitioning to increase parallelism to leverage more CPU cores | | datafusion.optimizer.filter_null_join_keys | false | When set to true, the optimizer will insert filters before a join between a nullable and non-nullable column to filter out nulls on the nullable side. This filter can add additional overhead when the file format does not fully support predicate push down. | | datafusion.optimizer.repartition_aggregations | true | Should DataFusion repartition data using the aggregate keys to execute aggregates in parallel using the provided `target_partitions` level |