16 changes: 16 additions & 0 deletions datafusion/common/src/config.rs
@@ -235,6 +235,22 @@ config_namespace! {
///
/// Defaults to the number of CPU cores on the system
pub planning_concurrency: usize, default = num_cpus::get()

+    /// Specifies the reserved memory for each spillable sort operation to
+    /// facilitate an in-memory merge.
+    ///
+    /// When a sort operation spills to disk, the in-memory data must be
+    /// sorted and merged before being written to a file. This setting reserves
+    /// a specific amount of memory for that in-memory sort/merge process.
+    ///
+    /// Note: This setting is irrelevant if the sort operation cannot spill
+    /// (i.e., if there's no `DiskManager` configured).
+    pub sort_spill_reservation_bytes: usize, default = 10 * 1024 * 1024
+
+    /// When sorting, the size in bytes below which data is concatenated
+    /// and sorted in a single `RecordBatch` rather than sorted in
+    /// batches and merged.
+    pub sort_in_place_threshold_bytes: usize, default = 1024 * 1024
Contributor Author (on `sort_in_place_threshold_bytes`): This is not a behavior change. The constant was hard-coded in sort.rs -- I have just pulled it out into its own config setting so I can write tests.

}
}

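These two settings land in the `datafusion.execution` namespace. As a rough usage sketch (the `set_usize` builder method and the values shown are assumptions for illustration, not recommendations from this PR):

```rust
// Hypothetical tuning example: raise the spill-merge reserve and the
// in-place sort threshold. Keys follow the config_namespace above; the
// set_usize builder method is assumed from the SessionConfig API.
use datafusion::execution::context::{SessionConfig, SessionContext};

fn context_with_sort_tuning() -> SessionContext {
    let config = SessionConfig::new()
        // Memory reserved for the in-memory merge phase of a spilling sort
        .set_usize(
            "datafusion.execution.sort_spill_reservation_bytes",
            64 * 1024 * 1024, // 64 MiB, arbitrary example value
        )
        // Below this size, concatenate and sort as one RecordBatch
        .set_usize(
            "datafusion.execution.sort_in_place_threshold_bytes",
            2 * 1024 * 1024, // 2 MiB, arbitrary example value
        );
    SessionContext::with_config(config)
}
```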
11 changes: 9 additions & 2 deletions datafusion/core/src/physical_plan/repartition/mod.rs
@@ -574,14 +574,21 @@ impl ExecutionPlan for RepartitionExec {

// Get existing ordering:
let sort_exprs = self.input.output_ordering().unwrap_or(&[]);
-// Merge streams (while preserving ordering) coming from input partitions to this partition:
+
+// Merge streams (while preserving ordering) coming from
+// input partitions to this partition:
+let fetch = None;
+let merge_reservation =
+    MemoryConsumer::new(format!("{}[Merge {partition}]", self.name()))
+        .register(context.memory_pool());
streaming_merge(
input_streams,
self.schema(),
sort_exprs,
BaselineMetrics::new(&self.metrics, partition),
context.session_config().batch_size(),
-None,
+fetch,
+merge_reservation,
)
} else {
Ok(Box::pin(RepartitionStream {
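The reservation passed to `streaming_merge` above is created per output partition. A self-contained sketch of that plumbing (the pool type and limit are illustrative assumptions; any `MemoryPool` implementation works):

```rust
// Sketch: register a named consumer against a pool, as RepartitionExec
// does above. GreedyMemoryPool and the 64 MiB limit are example choices.
use datafusion_execution::memory_pool::{GreedyMemoryPool, MemoryConsumer, MemoryPool};
use std::sync::Arc;

fn example_merge_reservation() {
    let pool: Arc<dyn MemoryPool> = Arc::new(GreedyMemoryPool::new(64 * 1024 * 1024));
    let mut reservation = MemoryConsumer::new("RepartitionExec[Merge 0]").register(&pool);
    // Accounting is explicit: try_grow can fail, shrink returns bytes,
    // and the remainder is freed when `reservation` is dropped.
    reservation.try_grow(1024).expect("fits within the 64 MiB limit");
    reservation.shrink(1024);
}
```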
22 changes: 18 additions & 4 deletions datafusion/core/src/physical_plan/sorts/builder.rs
@@ -19,6 +19,7 @@ use arrow::compute::interleave;
use arrow::datatypes::SchemaRef;
use arrow::record_batch::RecordBatch;
use datafusion_common::Result;
+use datafusion_execution::memory_pool::MemoryReservation;

#[derive(Debug, Copy, Clone, Default)]
struct BatchCursor {
@@ -37,6 +38,9 @@ pub struct BatchBuilder {
/// Maintain a list of [`RecordBatch`] and their corresponding stream
batches: Vec<(usize, RecordBatch)>,

+/// Accounts for memory used by buffered batches
+reservation: MemoryReservation,

/// The current [`BatchCursor`] for each stream
cursors: Vec<BatchCursor>,

@@ -47,23 +51,31 @@ pub struct BatchBuilder {

impl BatchBuilder {
/// Create a new [`BatchBuilder`] with the provided `stream_count` and `batch_size`
-pub fn new(schema: SchemaRef, stream_count: usize, batch_size: usize) -> Self {
+pub fn new(
+    schema: SchemaRef,
+    stream_count: usize,
+    batch_size: usize,
+    reservation: MemoryReservation,
+) -> Self {
Self {
schema,
batches: Vec::with_capacity(stream_count * 2),
cursors: vec![BatchCursor::default(); stream_count],
indices: Vec::with_capacity(batch_size),
+reservation,
}
}

/// Append a new batch in `stream_idx`
-pub fn push_batch(&mut self, stream_idx: usize, batch: RecordBatch) {
+pub fn push_batch(&mut self, stream_idx: usize, batch: RecordBatch) -> Result<()> {
+    self.reservation.try_grow(batch.get_array_memory_size())?;
let batch_idx = self.batches.len();
self.batches.push((stream_idx, batch));
self.cursors[stream_idx] = BatchCursor {
batch_idx,
row_idx: 0,
-}
+};
+Ok(())
}

/// Append the next row from `stream_idx`
@@ -119,14 +131,16 @@ impl BatchBuilder {
// We can therefore drop all but the last batch for each stream
let mut batch_idx = 0;
let mut retained = 0;
-self.batches.retain(|(stream_idx, _)| {
+self.batches.retain(|(stream_idx, batch)| {
let stream_cursor = &mut self.cursors[*stream_idx];
let retain = stream_cursor.batch_idx == batch_idx;
batch_idx += 1;

if retain {
stream_cursor.batch_idx = retained;
retained += 1;
+} else {
+    self.reservation.shrink(batch.get_array_memory_size());
}
retain
});
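`BatchBuilder` now grows its reservation before buffering a batch and shrinks it when a batch is dropped in `retain`. The accounting contract can be sketched standalone (the `BufferedBatches` type below is hypothetical; only `MemoryReservation` and the `RecordBatch` size call come from the code above):

```rust
// Hypothetical helper mirroring BatchBuilder's accounting: reserve before
// buffering, shrink when releasing, so the pool always matches the buffer.
use arrow::record_batch::RecordBatch;
use datafusion_common::Result;
use datafusion_execution::memory_pool::MemoryReservation;

struct BufferedBatches {
    batches: Vec<RecordBatch>,
    reservation: MemoryReservation,
}

impl BufferedBatches {
    /// Fails (e.g. ResourcesExhausted) if the pool cannot cover the batch.
    fn push(&mut self, batch: RecordBatch) -> Result<()> {
        self.reservation.try_grow(batch.get_array_memory_size())?;
        self.batches.push(batch);
        Ok(())
    }

    /// Returns the batch's bytes to the pool as it leaves the buffer.
    fn pop(&mut self) -> Option<RecordBatch> {
        let batch = self.batches.pop()?;
        self.reservation.shrink(batch.get_array_memory_size());
        Some(batch)
    }
}
```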
20 changes: 18 additions & 2 deletions datafusion/core/src/physical_plan/sorts/cursor.rs
@@ -21,6 +21,7 @@ use arrow::datatypes::ArrowNativeTypeOp;
use arrow::row::{Row, Rows};
use arrow_array::types::ByteArrayType;
use arrow_array::{Array, ArrowPrimitiveType, GenericByteArray, PrimitiveArray};
+use datafusion_execution::memory_pool::MemoryReservation;
use std::cmp::Ordering;

/// A [`Cursor`] for [`Rows`]
@@ -29,6 +30,11 @@ pub struct RowCursor {
num_rows: usize,

rows: Rows,

+/// Tracks the memory used by the `Rows` of this cursor.
+/// Freed on drop.
+#[allow(dead_code)]
+reservation: MemoryReservation,
}

impl std::fmt::Debug for RowCursor {
@@ -41,12 +47,22 @@ impl std::fmt::Debug for RowCursor {
}

impl RowCursor {
-/// Create a new SortKeyCursor
-pub fn new(rows: Rows) -> Self {
+/// Create a new SortKeyCursor from `rows` and a `reservation`
+/// that tracks its memory.
+///
+/// Panics if the reservation is not for exactly `rows.size()` bytes.
+pub fn new(rows: Rows, reservation: MemoryReservation) -> Self {
+    assert_eq!(
+        rows.size(),
+        reservation.size(),
+        "memory reservation mismatch"
+    );
Self {
cur_row: 0,
num_rows: rows.num_rows(),
rows,
+reservation,
}
}

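Callers must now size a reservation to exactly `rows.size()` before constructing the cursor. A hypothetical helper showing how that invariant can be upheld (only `new_empty`, `try_grow`, and `Rows::size` come from this PR; the function itself is illustrative):

```rust
// Illustrative: pair freshly converted Rows with a reservation of exactly
// rows.size() bytes, so the assert_eq! in RowCursor::new holds.
use arrow::array::ArrayRef;
use arrow::row::{RowConverter, Rows};
use datafusion_common::Result;
use datafusion_execution::memory_pool::MemoryReservation;

fn rows_with_reservation(
    converter: &mut RowConverter,
    sort_columns: &[ArrayRef],
    parent: &MemoryReservation,
) -> Result<(Rows, MemoryReservation)> {
    let rows = converter.convert_columns(sort_columns)?;
    let mut reservation = parent.new_empty(); // same pool, zero bytes
    reservation.try_grow(rows.size())?; // exact size: the assert depends on it
    Ok((rows, reservation))
}
```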
30 changes: 20 additions & 10 deletions datafusion/core/src/physical_plan/sorts/merge.rs
@@ -31,6 +31,7 @@ use arrow::datatypes::{DataType, SchemaRef};
use arrow::record_batch::RecordBatch;
use arrow_array::*;
use datafusion_common::Result;
+use datafusion_execution::memory_pool::MemoryReservation;
use futures::Stream;
use std::pin::Pin;
use std::task::{ready, Context, Poll};
@@ -42,14 +43,15 @@ macro_rules! primitive_merge_helper {
}

macro_rules! merge_helper {
-($t:ty, $sort:ident, $streams:ident, $schema:ident, $tracking_metrics:ident, $batch_size:ident, $fetch:ident) => {{
+($t:ty, $sort:ident, $streams:ident, $schema:ident, $tracking_metrics:ident, $batch_size:ident, $fetch:ident, $reservation:ident) => {{
let streams = FieldCursorStream::<$t>::new($sort, $streams);
return Ok(Box::pin(SortPreservingMergeStream::new(
Box::new(streams),
$schema,
$tracking_metrics,
$batch_size,
$fetch,
+$reservation,
)));
}};
}
@@ -63,28 +65,36 @@ pub fn streaming_merge(
metrics: BaselineMetrics,
batch_size: usize,
fetch: Option<usize>,
+reservation: MemoryReservation,
) -> Result<SendableRecordBatchStream> {
// Special case single column comparisons with optimized cursor implementations
if expressions.len() == 1 {
let sort = expressions[0].clone();
let data_type = sort.expr.data_type(schema.as_ref())?;
downcast_primitive! {
-data_type => (primitive_merge_helper, sort, streams, schema, metrics, batch_size, fetch),
-DataType::Utf8 => merge_helper!(StringArray, sort, streams, schema, metrics, batch_size, fetch)
-DataType::LargeUtf8 => merge_helper!(LargeStringArray, sort, streams, schema, metrics, batch_size, fetch)
-DataType::Binary => merge_helper!(BinaryArray, sort, streams, schema, metrics, batch_size, fetch)
-DataType::LargeBinary => merge_helper!(LargeBinaryArray, sort, streams, schema, metrics, batch_size, fetch)
+data_type => (primitive_merge_helper, sort, streams, schema, metrics, batch_size, fetch, reservation),
+DataType::Utf8 => merge_helper!(StringArray, sort, streams, schema, metrics, batch_size, fetch, reservation)
+DataType::LargeUtf8 => merge_helper!(LargeStringArray, sort, streams, schema, metrics, batch_size, fetch, reservation)
+DataType::Binary => merge_helper!(BinaryArray, sort, streams, schema, metrics, batch_size, fetch, reservation)
+DataType::LargeBinary => merge_helper!(LargeBinaryArray, sort, streams, schema, metrics, batch_size, fetch, reservation)
_ => {}
}
}

-let streams = RowCursorStream::try_new(schema.as_ref(), expressions, streams)?;
+let streams = RowCursorStream::try_new(
+    schema.as_ref(),
+    expressions,
+    streams,
+    reservation.new_empty(),
+)?;

Ok(Box::pin(SortPreservingMergeStream::new(
Box::new(streams),
schema,
metrics,
batch_size,
fetch,
+reservation,
)))
}

@@ -162,11 +172,12 @@ impl<C: Cursor> SortPreservingMergeStream<C> {
metrics: BaselineMetrics,
batch_size: usize,
fetch: Option<usize>,
+reservation: MemoryReservation,
) -> Self {
let stream_count = streams.partitions();

Self {
-in_progress: BatchBuilder::new(schema, stream_count, batch_size),
+in_progress: BatchBuilder::new(schema, stream_count, batch_size, reservation),
streams,
metrics,
aborted: false,
@@ -197,8 +208,7 @@ impl<C: Cursor> SortPreservingMergeStream<C> {
Some(Err(e)) => Poll::Ready(Err(e)),
Some(Ok((cursor, batch))) => {
self.cursors[idx] = Some(cursor);
-self.in_progress.push_batch(idx, batch);
-Poll::Ready(Ok(()))
+Poll::Ready(self.in_progress.push_batch(idx, batch))
}
}
}