Skip to content

Commit 161c6d3

Browse files
authored
Account for memory usage in SortPreservingMerge (#5885) (#7130)
* Account for memory usage in SortPreservingMerge * Review Comments: Improve documentation and comments * Review Comments: Improve documentation and comments
1 parent d8692b1 commit 161c6d3

File tree

14 files changed

+590
-74
lines changed

14 files changed

+590
-74
lines changed

datafusion/common/src/config.rs

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -235,6 +235,22 @@ config_namespace! {
235235
///
236236
/// Defaults to the number of CPU cores on the system
237237
pub planning_concurrency: usize, default = num_cpus::get()
238+
239+
/// Specifies the reserved memory for each spillable sort operation to
240+
/// facilitate an in-memory merge.
241+
///
242+
/// When a sort operation spills to disk, the in-memory data must be
243+
/// sorted and merged before being written to a file. This setting reserves
244+
/// a specific amount of memory for that in-memory sort/merge process.
245+
///
246+
/// Note: This setting is irrelevant if the sort operation cannot spill
247+
/// (i.e., if there's no `DiskManager` configured).
248+
pub sort_spill_reservation_bytes: usize, default = 10 * 1024 * 1024
249+
250+
/// When sorting, below what size should data be concatenated
251+
/// and sorted in a single RecordBatch rather than sorted in
252+
/// batches and merged.
253+
pub sort_in_place_threshold_bytes: usize, default = 1024 * 1024
238254
}
239255
}
240256

datafusion/core/src/physical_plan/repartition/mod.rs

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -574,14 +574,21 @@ impl ExecutionPlan for RepartitionExec {
574574

575575
// Get existing ordering:
576576
let sort_exprs = self.input.output_ordering().unwrap_or(&[]);
577-
// Merge streams (while preserving ordering) coming from input partitions to this partition:
577+
578+
// Merge streams (while preserving ordering) coming from
579+
// input partitions to this partition:
580+
let fetch = None;
581+
let merge_reservation =
582+
MemoryConsumer::new(format!("{}[Merge {partition}]", self.name()))
583+
.register(context.memory_pool());
578584
streaming_merge(
579585
input_streams,
580586
self.schema(),
581587
sort_exprs,
582588
BaselineMetrics::new(&self.metrics, partition),
583589
context.session_config().batch_size(),
584-
None,
590+
fetch,
591+
merge_reservation,
585592
)
586593
} else {
587594
Ok(Box::pin(RepartitionStream {

datafusion/core/src/physical_plan/sorts/builder.rs

Lines changed: 18 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ use arrow::compute::interleave;
1919
use arrow::datatypes::SchemaRef;
2020
use arrow::record_batch::RecordBatch;
2121
use datafusion_common::Result;
22+
use datafusion_execution::memory_pool::MemoryReservation;
2223

2324
#[derive(Debug, Copy, Clone, Default)]
2425
struct BatchCursor {
@@ -37,6 +38,9 @@ pub struct BatchBuilder {
3738
/// Maintain a list of [`RecordBatch`] and their corresponding stream
3839
batches: Vec<(usize, RecordBatch)>,
3940

41+
/// Accounts for memory used by buffered batches
42+
reservation: MemoryReservation,
43+
4044
/// The current [`BatchCursor`] for each stream
4145
cursors: Vec<BatchCursor>,
4246

@@ -47,23 +51,31 @@ pub struct BatchBuilder {
4751

4852
impl BatchBuilder {
4953
/// Create a new [`BatchBuilder`] with the provided `stream_count` and `batch_size`
50-
pub fn new(schema: SchemaRef, stream_count: usize, batch_size: usize) -> Self {
54+
pub fn new(
55+
schema: SchemaRef,
56+
stream_count: usize,
57+
batch_size: usize,
58+
reservation: MemoryReservation,
59+
) -> Self {
5160
Self {
5261
schema,
5362
batches: Vec::with_capacity(stream_count * 2),
5463
cursors: vec![BatchCursor::default(); stream_count],
5564
indices: Vec::with_capacity(batch_size),
65+
reservation,
5666
}
5767
}
5868

5969
/// Append a new batch in `stream_idx`
60-
pub fn push_batch(&mut self, stream_idx: usize, batch: RecordBatch) {
70+
pub fn push_batch(&mut self, stream_idx: usize, batch: RecordBatch) -> Result<()> {
71+
self.reservation.try_grow(batch.get_array_memory_size())?;
6172
let batch_idx = self.batches.len();
6273
self.batches.push((stream_idx, batch));
6374
self.cursors[stream_idx] = BatchCursor {
6475
batch_idx,
6576
row_idx: 0,
66-
}
77+
};
78+
Ok(())
6779
}
6880

6981
/// Append the next row from `stream_idx`
@@ -119,14 +131,16 @@ impl BatchBuilder {
119131
// We can therefore drop all but the last batch for each stream
120132
let mut batch_idx = 0;
121133
let mut retained = 0;
122-
self.batches.retain(|(stream_idx, _)| {
134+
self.batches.retain(|(stream_idx, batch)| {
123135
let stream_cursor = &mut self.cursors[*stream_idx];
124136
let retain = stream_cursor.batch_idx == batch_idx;
125137
batch_idx += 1;
126138

127139
if retain {
128140
stream_cursor.batch_idx = retained;
129141
retained += 1;
142+
} else {
143+
self.reservation.shrink(batch.get_array_memory_size());
130144
}
131145
retain
132146
});

datafusion/core/src/physical_plan/sorts/cursor.rs

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ use arrow::datatypes::ArrowNativeTypeOp;
2121
use arrow::row::{Row, Rows};
2222
use arrow_array::types::ByteArrayType;
2323
use arrow_array::{Array, ArrowPrimitiveType, GenericByteArray, PrimitiveArray};
24+
use datafusion_execution::memory_pool::MemoryReservation;
2425
use std::cmp::Ordering;
2526

2627
/// A [`Cursor`] for [`Rows`]
@@ -29,6 +30,11 @@ pub struct RowCursor {
2930
num_rows: usize,
3031

3132
rows: Rows,
33+
34+
/// Tracks for the memory used by in the `Rows` of this
35+
/// cursor. Freed on drop
36+
#[allow(dead_code)]
37+
reservation: MemoryReservation,
3238
}
3339

3440
impl std::fmt::Debug for RowCursor {
@@ -41,12 +47,22 @@ impl std::fmt::Debug for RowCursor {
4147
}
4248

4349
impl RowCursor {
44-
/// Create a new SortKeyCursor
45-
pub fn new(rows: Rows) -> Self {
50+
/// Create a new SortKeyCursor from `rows` and a `reservation`
51+
/// that tracks its memory.
52+
///
53+
/// Panic's if the reservation is not for exactly `rows.size()`
54+
/// bytes
55+
pub fn new(rows: Rows, reservation: MemoryReservation) -> Self {
56+
assert_eq!(
57+
rows.size(),
58+
reservation.size(),
59+
"memory reservation mismatch"
60+
);
4661
Self {
4762
cur_row: 0,
4863
num_rows: rows.num_rows(),
4964
rows,
65+
reservation,
5066
}
5167
}
5268

datafusion/core/src/physical_plan/sorts/merge.rs

Lines changed: 20 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ use arrow::datatypes::{DataType, SchemaRef};
3131
use arrow::record_batch::RecordBatch;
3232
use arrow_array::*;
3333
use datafusion_common::Result;
34+
use datafusion_execution::memory_pool::MemoryReservation;
3435
use futures::Stream;
3536
use std::pin::Pin;
3637
use std::task::{ready, Context, Poll};
@@ -42,14 +43,15 @@ macro_rules! primitive_merge_helper {
4243
}
4344

4445
macro_rules! merge_helper {
45-
($t:ty, $sort:ident, $streams:ident, $schema:ident, $tracking_metrics:ident, $batch_size:ident, $fetch:ident) => {{
46+
($t:ty, $sort:ident, $streams:ident, $schema:ident, $tracking_metrics:ident, $batch_size:ident, $fetch:ident, $reservation:ident) => {{
4647
let streams = FieldCursorStream::<$t>::new($sort, $streams);
4748
return Ok(Box::pin(SortPreservingMergeStream::new(
4849
Box::new(streams),
4950
$schema,
5051
$tracking_metrics,
5152
$batch_size,
5253
$fetch,
54+
$reservation,
5355
)));
5456
}};
5557
}
@@ -63,28 +65,36 @@ pub fn streaming_merge(
6365
metrics: BaselineMetrics,
6466
batch_size: usize,
6567
fetch: Option<usize>,
68+
reservation: MemoryReservation,
6669
) -> Result<SendableRecordBatchStream> {
6770
// Special case single column comparisons with optimized cursor implementations
6871
if expressions.len() == 1 {
6972
let sort = expressions[0].clone();
7073
let data_type = sort.expr.data_type(schema.as_ref())?;
7174
downcast_primitive! {
72-
data_type => (primitive_merge_helper, sort, streams, schema, metrics, batch_size, fetch),
73-
DataType::Utf8 => merge_helper!(StringArray, sort, streams, schema, metrics, batch_size, fetch)
74-
DataType::LargeUtf8 => merge_helper!(LargeStringArray, sort, streams, schema, metrics, batch_size, fetch)
75-
DataType::Binary => merge_helper!(BinaryArray, sort, streams, schema, metrics, batch_size, fetch)
76-
DataType::LargeBinary => merge_helper!(LargeBinaryArray, sort, streams, schema, metrics, batch_size, fetch)
75+
data_type => (primitive_merge_helper, sort, streams, schema, metrics, batch_size, fetch, reservation),
76+
DataType::Utf8 => merge_helper!(StringArray, sort, streams, schema, metrics, batch_size, fetch, reservation)
77+
DataType::LargeUtf8 => merge_helper!(LargeStringArray, sort, streams, schema, metrics, batch_size, fetch, reservation)
78+
DataType::Binary => merge_helper!(BinaryArray, sort, streams, schema, metrics, batch_size, fetch, reservation)
79+
DataType::LargeBinary => merge_helper!(LargeBinaryArray, sort, streams, schema, metrics, batch_size, fetch, reservation)
7780
_ => {}
7881
}
7982
}
8083

81-
let streams = RowCursorStream::try_new(schema.as_ref(), expressions, streams)?;
84+
let streams = RowCursorStream::try_new(
85+
schema.as_ref(),
86+
expressions,
87+
streams,
88+
reservation.new_empty(),
89+
)?;
90+
8291
Ok(Box::pin(SortPreservingMergeStream::new(
8392
Box::new(streams),
8493
schema,
8594
metrics,
8695
batch_size,
8796
fetch,
97+
reservation,
8898
)))
8999
}
90100

@@ -162,11 +172,12 @@ impl<C: Cursor> SortPreservingMergeStream<C> {
162172
metrics: BaselineMetrics,
163173
batch_size: usize,
164174
fetch: Option<usize>,
175+
reservation: MemoryReservation,
165176
) -> Self {
166177
let stream_count = streams.partitions();
167178

168179
Self {
169-
in_progress: BatchBuilder::new(schema, stream_count, batch_size),
180+
in_progress: BatchBuilder::new(schema, stream_count, batch_size, reservation),
170181
streams,
171182
metrics,
172183
aborted: false,
@@ -197,8 +208,7 @@ impl<C: Cursor> SortPreservingMergeStream<C> {
197208
Some(Err(e)) => Poll::Ready(Err(e)),
198209
Some(Ok((cursor, batch))) => {
199210
self.cursors[idx] = Some(cursor);
200-
self.in_progress.push_batch(idx, batch);
201-
Poll::Ready(Ok(()))
211+
Poll::Ready(self.in_progress.push_batch(idx, batch))
202212
}
203213
}
204214
}

0 commit comments

Comments
 (0)