From 147273d7460aa86583cfd41601b4414fda50dc95 Mon Sep 17 00:00:00 2001 From: mertak-synnada Date: Wed, 2 Oct 2024 14:30:12 +0300 Subject: [PATCH 1/3] simplify streaming_merge function parameters --- .../sort_preserving_repartition_fuzz.rs | 16 ++--- datafusion/core/tests/memory_limit/mod.rs | 3 +- .../physical-plan/src/aggregates/row_hash.rs | 16 ++--- .../physical-plan/src/repartition/mod.rs | 18 ++--- datafusion/physical-plan/src/sorts/mod.rs | 1 - datafusion/physical-plan/src/sorts/sort.rs | 32 ++++----- .../src/sorts/sort_preserving_merge.rs | 28 ++++---- .../src/sorts/streaming_merge.rs | 69 ++++++++++--------- 8 files changed, 94 insertions(+), 89 deletions(-) diff --git a/datafusion/core/tests/fuzz_cases/sort_preserving_repartition_fuzz.rs b/datafusion/core/tests/fuzz_cases/sort_preserving_repartition_fuzz.rs index 408cadc35f48..c51dba0657ea 100644 --- a/datafusion/core/tests/fuzz_cases/sort_preserving_repartition_fuzz.rs +++ b/datafusion/core/tests/fuzz_cases/sort_preserving_repartition_fuzz.rs @@ -29,7 +29,7 @@ mod sp_repartition_fuzz_tests { metrics::{BaselineMetrics, ExecutionPlanMetricsSet}, repartition::RepartitionExec, sorts::sort_preserving_merge::SortPreservingMergeExec, - sorts::streaming_merge::streaming_merge, + sorts::streaming_merge::{streaming_merge, StreamingMergeConfig}, stream::RecordBatchStreamAdapter, ExecutionPlan, Partitioning, }; @@ -246,15 +246,15 @@ mod sp_repartition_fuzz_tests { MemoryConsumer::new("test".to_string()).register(context.memory_pool()); // Internally SortPreservingMergeExec uses this function for merging. - let res = streaming_merge( + let res = streaming_merge(StreamingMergeConfig { streams, schema, - &exprs, - BaselineMetrics::new(&ExecutionPlanMetricsSet::new(), 0), - 1, - None, - mem_reservation, - )?; + expressions: &exprs, + metrics: BaselineMetrics::new(&ExecutionPlanMetricsSet::new(), 0), + batch_size: 1, + fetch: None, + reservation: mem_reservation, + })?; let res = collect(res).await?; // Contains the merged result. let res = concat_batches(&res[0].schema(), &res)?; diff --git a/datafusion/core/tests/memory_limit/mod.rs b/datafusion/core/tests/memory_limit/mod.rs index ec66df45c7ba..e1913b9ef4d4 100644 --- a/datafusion/core/tests/memory_limit/mod.rs +++ b/datafusion/core/tests/memory_limit/mod.rs @@ -301,7 +301,8 @@ async fn sort_spill_reservation() { test.clone() .with_expected_errors(vec![ - "Resources exhausted: Additional allocation failed with top memory consumers (across reservations) as: ExternalSorterMerge", + "Resources exhausted: Additional allocation failed with top memory consumers (across reservations) as:", + "ExternalSorterMerge" ]) .with_config(config) .run() diff --git a/datafusion/physical-plan/src/aggregates/row_hash.rs b/datafusion/physical-plan/src/aggregates/row_hash.rs index a043905765ec..33b3965056df 100644 --- a/datafusion/physical-plan/src/aggregates/row_hash.rs +++ b/datafusion/physical-plan/src/aggregates/row_hash.rs @@ -29,7 +29,7 @@ use crate::aggregates::{ }; use crate::metrics::{BaselineMetrics, MetricBuilder, RecordOutput}; use crate::sorts::sort::sort_batch; -use crate::sorts::streaming_merge; +use crate::sorts::streaming_merge::{streaming_merge, StreamingMergeConfig}; use crate::spill::{read_spill_as_stream, spill_record_batch_by_size}; use crate::stream::RecordBatchStreamAdapter; use crate::{aggregates, metrics, ExecutionPlan, PhysicalExpr}; @@ -1001,15 +1001,15 @@ impl GroupedHashAggregateStream { streams.push(stream); } self.spill_state.is_stream_merging = true; - self.input = streaming_merge( + self.input = streaming_merge(StreamingMergeConfig { streams, schema, - &self.spill_state.spill_expr, - self.baseline_metrics.clone(), - self.batch_size, - None, - self.reservation.new_empty(), - )?; + expressions: &self.spill_state.spill_expr, + metrics: self.baseline_metrics.clone(), + batch_size: self.batch_size, + fetch: None, + reservation: self.reservation.new_empty(), + })?; self.input_done = false; self.group_ordering = GroupOrdering::Full(GroupOrderingFull::new()); Ok(()) diff --git a/datafusion/physical-plan/src/repartition/mod.rs b/datafusion/physical-plan/src/repartition/mod.rs index 4fd364cca4d0..83d2f7886bbe 100644 --- a/datafusion/physical-plan/src/repartition/mod.rs +++ b/datafusion/physical-plan/src/repartition/mod.rs @@ -34,7 +34,7 @@ use crate::metrics::BaselineMetrics; use crate::repartition::distributor_channels::{ channels, partition_aware_channels, DistributionReceiver, DistributionSender, }; -use crate::sorts::streaming_merge; +use crate::sorts::streaming_merge::{streaming_merge, StreamingMergeConfig}; use crate::stream::RecordBatchStreamAdapter; use crate::{DisplayFormatType, ExecutionPlan, Partitioning, PlanProperties, Statistics}; @@ -637,15 +637,15 @@ impl ExecutionPlan for RepartitionExec { let merge_reservation = MemoryConsumer::new(format!("{}[Merge {partition}]", name)) .register(context.memory_pool()); - streaming_merge( - input_streams, - schema_captured, - &sort_exprs, - BaselineMetrics::new(&metrics, partition), - context.session_config().batch_size(), + streaming_merge(StreamingMergeConfig { + streams: input_streams, + schema: schema_captured, + expressions: &sort_exprs, + metrics: BaselineMetrics::new(&metrics, partition), + batch_size: context.session_config().batch_size(), fetch, - merge_reservation, - ) + reservation: merge_reservation, + }) } else { Ok(Box::pin(RepartitionStream { num_input_partitions, diff --git a/datafusion/physical-plan/src/sorts/mod.rs b/datafusion/physical-plan/src/sorts/mod.rs index 7c084761fdc3..ab5df37ed327 100644 --- a/datafusion/physical-plan/src/sorts/mod.rs +++ b/datafusion/physical-plan/src/sorts/mod.rs @@ -28,4 +28,3 @@ mod stream; pub mod streaming_merge; pub use index::RowIndex; -pub(crate) use streaming_merge::streaming_merge; diff --git a/datafusion/physical-plan/src/sorts/sort.rs b/datafusion/physical-plan/src/sorts/sort.rs index 91816713c6c3..f5d20316a2c8 100644 --- a/datafusion/physical-plan/src/sorts/sort.rs +++ b/datafusion/physical-plan/src/sorts/sort.rs @@ -30,7 +30,7 @@ use crate::limit::LimitStream; use crate::metrics::{ BaselineMetrics, Count, ExecutionPlanMetricsSet, MetricBuilder, MetricsSet, }; -use crate::sorts::streaming_merge::streaming_merge; +use crate::sorts::streaming_merge::{streaming_merge, StreamingMergeConfig}; use crate::spill::{read_spill_as_stream, spill_record_batches}; use crate::stream::RecordBatchStreamAdapter; use crate::topk::TopK; @@ -342,15 +342,15 @@ impl ExternalSorter { streams.push(stream); } - streaming_merge( + streaming_merge(StreamingMergeConfig { streams, - Arc::clone(&self.schema), - &self.expr, - self.metrics.baseline.clone(), - self.batch_size, - self.fetch, - self.reservation.new_empty(), - ) + schema: Arc::clone(&self.schema), + expressions: &self.expr, + metrics: self.metrics.baseline.clone(), + batch_size: self.batch_size, + fetch: self.fetch, + reservation: self.reservation.new_empty(), + }) } else { self.in_mem_sort_stream(self.metrics.baseline.clone()) } @@ -534,15 +534,15 @@ impl ExternalSorter { }) .collect::>()?; - streaming_merge( + streaming_merge(StreamingMergeConfig { streams, - Arc::clone(&self.schema), - &self.expr, + schema: Arc::clone(&self.schema), + expressions: &self.expr, metrics, - self.batch_size, - self.fetch, - self.merge_reservation.new_empty(), - ) + batch_size: self.batch_size, + fetch: self.fetch, + reservation: self.merge_reservation.new_empty(), + }) } /// Sorts a single `RecordBatch` into a single stream. diff --git a/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs b/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs index b00a11a5355f..34c19d6b9671 100644 --- a/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs +++ b/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs @@ -24,7 +24,7 @@ use crate::common::spawn_buffered; use crate::expressions::PhysicalSortExpr; use crate::limit::LimitStream; use crate::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet}; -use crate::sorts::streaming_merge; +use crate::sorts::streaming_merge::{streaming_merge, StreamingMergeConfig}; use crate::{ DisplayAs, DisplayFormatType, Distribution, ExecutionPlan, ExecutionPlanProperties, Partitioning, PlanProperties, SendableRecordBatchStream, Statistics, @@ -273,15 +273,15 @@ impl ExecutionPlan for SortPreservingMergeExec { debug!("Done setting up sender-receiver for SortPreservingMergeExec::execute"); - let result = streaming_merge( - receivers, + let result = streaming_merge(StreamingMergeConfig { + streams: receivers, schema, - &self.expr, - BaselineMetrics::new(&self.metrics, partition), - context.session_config().batch_size(), - self.fetch, + expressions: &self.expr, + metrics: BaselineMetrics::new(&self.metrics, partition), + batch_size: context.session_config().batch_size(), + fetch: self.fetch, reservation, - )?; + })?; debug!("Got stream result from SortPreservingMergeStream::new_from_receivers"); @@ -960,15 +960,15 @@ mod tests { MemoryConsumer::new("test").register(&task_ctx.runtime_env().memory_pool); let fetch = None; - let merge_stream = streaming_merge( + let merge_stream = streaming_merge(StreamingMergeConfig { streams, - batches.schema(), - sort.as_slice(), - BaselineMetrics::new(&metrics, 0), - task_ctx.session_config().batch_size(), + schema: batches.schema(), + expressions: sort.as_slice(), + metrics: BaselineMetrics::new(&metrics, 0), + batch_size: task_ctx.session_config().batch_size(), fetch, reservation, - ) + }) .unwrap(); let mut merged = common::collect(merge_stream).await.unwrap(); diff --git a/datafusion/physical-plan/src/sorts/streaming_merge.rs b/datafusion/physical-plan/src/sorts/streaming_merge.rs index 9e6618dd1af5..937de82be533 100644 --- a/datafusion/physical-plan/src/sorts/streaming_merge.rs +++ b/datafusion/physical-plan/src/sorts/streaming_merge.rs @@ -36,62 +36,67 @@ macro_rules! primitive_merge_helper { } macro_rules! merge_helper { - ($t:ty, $sort:ident, $streams:ident, $schema:ident, $tracking_metrics:ident, $batch_size:ident, $fetch:ident, $reservation:ident) => {{ - let streams = FieldCursorStream::<$t>::new($sort, $streams); + ($t:ty, $sort:ident, $config:ident) => {{ + let streams = FieldCursorStream::<$t>::new($sort, $config.streams); return Ok(Box::pin(SortPreservingMergeStream::new( Box::new(streams), - $schema, - $tracking_metrics, - $batch_size, - $fetch, - $reservation, + $config.schema, + $config.metrics, + $config.batch_size, + $config.fetch, + $config.reservation, ))); }}; } +/// Configuration parameters to initialize a `SortPreservingMergeStream` +pub struct StreamingMergeConfig<'a> { + pub streams: Vec, + pub schema: SchemaRef, + pub expressions: &'a [PhysicalSortExpr], + pub metrics: BaselineMetrics, + pub batch_size: usize, + pub fetch: Option, + pub reservation: MemoryReservation, +} + /// Perform a streaming merge of [`SendableRecordBatchStream`] based on provided sort expressions /// while preserving order. pub fn streaming_merge( - streams: Vec, - schema: SchemaRef, - expressions: &[PhysicalSortExpr], - metrics: BaselineMetrics, - batch_size: usize, - fetch: Option, - reservation: MemoryReservation, + config: StreamingMergeConfig, ) -> Result { // If there are no sort expressions, preserving the order // doesn't mean anything (and result in infinite loops) - if expressions.is_empty() { + if config.expressions.is_empty() { return internal_err!("Sort expressions cannot be empty for streaming merge"); } // Special case single column comparisons with optimized cursor implementations - if expressions.len() == 1 { - let sort = expressions[0].clone(); - let data_type = sort.expr.data_type(schema.as_ref())?; + if config.expressions.len() == 1 { + let sort = config.expressions[0].clone(); + let data_type = sort.expr.data_type(config.schema.as_ref())?; downcast_primitive! { - data_type => (primitive_merge_helper, sort, streams, schema, metrics, batch_size, fetch, reservation), - DataType::Utf8 => merge_helper!(StringArray, sort, streams, schema, metrics, batch_size, fetch, reservation) - DataType::LargeUtf8 => merge_helper!(LargeStringArray, sort, streams, schema, metrics, batch_size, fetch, reservation) - DataType::Binary => merge_helper!(BinaryArray, sort, streams, schema, metrics, batch_size, fetch, reservation) - DataType::LargeBinary => merge_helper!(LargeBinaryArray, sort, streams, schema, metrics, batch_size, fetch, reservation) + data_type => (primitive_merge_helper, sort, config), + DataType::Utf8 => merge_helper!(StringArray, sort, config) + DataType::LargeUtf8 => merge_helper!(LargeStringArray, sort, config) + DataType::Binary => merge_helper!(BinaryArray, sort, config) + DataType::LargeBinary => merge_helper!(LargeBinaryArray, sort, config) _ => {} } } let streams = RowCursorStream::try_new( - schema.as_ref(), - expressions, - streams, - reservation.new_empty(), + config.schema.as_ref(), + config.expressions, + config.streams, + config.reservation.new_empty(), )?; Ok(Box::pin(SortPreservingMergeStream::new( Box::new(streams), - schema, - metrics, - batch_size, - fetch, - reservation, + config.schema, + config.metrics, + config.batch_size, + config.fetch, + config.reservation, ))) } From a1fd4d4f1ef83f26a9d95622b4810f2a33becd1c Mon Sep 17 00:00:00 2001 From: mertak-synnada Date: Thu, 3 Oct 2024 11:24:56 +0300 Subject: [PATCH 2/3] revert test change --- datafusion/core/tests/memory_limit/mod.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/datafusion/core/tests/memory_limit/mod.rs b/datafusion/core/tests/memory_limit/mod.rs index e1913b9ef4d4..ec66df45c7ba 100644 --- a/datafusion/core/tests/memory_limit/mod.rs +++ b/datafusion/core/tests/memory_limit/mod.rs @@ -301,8 +301,7 @@ async fn sort_spill_reservation() { test.clone() .with_expected_errors(vec![ - "Resources exhausted: Additional allocation failed with top memory consumers (across reservations) as:", - "ExternalSorterMerge" + "Resources exhausted: Additional allocation failed with top memory consumers (across reservations) as: ExternalSorterMerge", ]) .with_config(config) .run() From 0ae69f5c8dc802e25ca3689aa4ba71dcf3183065 Mon Sep 17 00:00:00 2001 From: mertak-synnada Date: Fri, 4 Oct 2024 11:30:09 +0300 Subject: [PATCH 3/3] change StreamingMergeConfig into builder pattern --- .../sort_preserving_repartition_fuzz.rs | 19 +- .../physical-plan/src/aggregates/row_hash.rs | 19 +- .../physical-plan/src/repartition/mod.rs | 20 +-- datafusion/physical-plan/src/sorts/sort.rs | 38 ++-- .../src/sorts/sort_preserving_merge.rs | 39 ++-- .../src/sorts/streaming_merge.rs | 170 ++++++++++++------ 6 files changed, 184 insertions(+), 121 deletions(-) diff --git a/datafusion/core/tests/fuzz_cases/sort_preserving_repartition_fuzz.rs b/datafusion/core/tests/fuzz_cases/sort_preserving_repartition_fuzz.rs index c51dba0657ea..0cd702372f7c 100644 --- a/datafusion/core/tests/fuzz_cases/sort_preserving_repartition_fuzz.rs +++ b/datafusion/core/tests/fuzz_cases/sort_preserving_repartition_fuzz.rs @@ -29,7 +29,7 @@ mod sp_repartition_fuzz_tests { metrics::{BaselineMetrics, ExecutionPlanMetricsSet}, repartition::RepartitionExec, sorts::sort_preserving_merge::SortPreservingMergeExec, - sorts::streaming_merge::{streaming_merge, StreamingMergeConfig}, + sorts::streaming_merge::StreamingMergeBuilder, stream::RecordBatchStreamAdapter, ExecutionPlan, Partitioning, }; @@ -246,15 +246,14 @@ mod sp_repartition_fuzz_tests { MemoryConsumer::new("test".to_string()).register(context.memory_pool()); // Internally SortPreservingMergeExec uses this function for merging. - let res = streaming_merge(StreamingMergeConfig { - streams, - schema, - expressions: &exprs, - metrics: BaselineMetrics::new(&ExecutionPlanMetricsSet::new(), 0), - batch_size: 1, - fetch: None, - reservation: mem_reservation, - })?; + let res = StreamingMergeBuilder::new() + .with_streams(streams) + .with_schema(schema) + .with_expressions(&exprs) + .with_metrics(BaselineMetrics::new(&ExecutionPlanMetricsSet::new(), 0)) + .with_batch_size(1) + .with_reservation(mem_reservation) + .build()?; let res = collect(res).await?; // Contains the merged result. let res = concat_batches(&res[0].schema(), &res)?; diff --git a/datafusion/physical-plan/src/aggregates/row_hash.rs b/datafusion/physical-plan/src/aggregates/row_hash.rs index 33b3965056df..74bc3aa3ca73 100644 --- a/datafusion/physical-plan/src/aggregates/row_hash.rs +++ b/datafusion/physical-plan/src/aggregates/row_hash.rs @@ -29,7 +29,7 @@ use crate::aggregates::{ }; use crate::metrics::{BaselineMetrics, MetricBuilder, RecordOutput}; use crate::sorts::sort::sort_batch; -use crate::sorts::streaming_merge::{streaming_merge, StreamingMergeConfig}; +use crate::sorts::streaming_merge::StreamingMergeBuilder; use crate::spill::{read_spill_as_stream, spill_record_batch_by_size}; use crate::stream::RecordBatchStreamAdapter; use crate::{aggregates, metrics, ExecutionPlan, PhysicalExpr}; @@ -1001,15 +1001,14 @@ impl GroupedHashAggregateStream { streams.push(stream); } self.spill_state.is_stream_merging = true; - self.input = streaming_merge(StreamingMergeConfig { - streams, - schema, - expressions: &self.spill_state.spill_expr, - metrics: self.baseline_metrics.clone(), - batch_size: self.batch_size, - fetch: None, - reservation: self.reservation.new_empty(), - })?; + self.input = StreamingMergeBuilder::new() + .with_streams(streams) + .with_schema(schema) + .with_expressions(&self.spill_state.spill_expr) + .with_metrics(self.baseline_metrics.clone()) + .with_batch_size(self.batch_size) + .with_reservation(self.reservation.new_empty()) + .build()?; self.input_done = false; self.group_ordering = GroupOrdering::Full(GroupOrderingFull::new()); Ok(()) diff --git a/datafusion/physical-plan/src/repartition/mod.rs b/datafusion/physical-plan/src/repartition/mod.rs index 83d2f7886bbe..f0f198319ee3 100644 --- a/datafusion/physical-plan/src/repartition/mod.rs +++ b/datafusion/physical-plan/src/repartition/mod.rs @@ -34,7 +34,7 @@ use crate::metrics::BaselineMetrics; use crate::repartition::distributor_channels::{ channels, partition_aware_channels, DistributionReceiver, DistributionSender, }; -use crate::sorts::streaming_merge::{streaming_merge, StreamingMergeConfig}; +use crate::sorts::streaming_merge::StreamingMergeBuilder; use crate::stream::RecordBatchStreamAdapter; use crate::{DisplayFormatType, ExecutionPlan, Partitioning, PlanProperties, Statistics}; @@ -637,15 +637,15 @@ impl ExecutionPlan for RepartitionExec { let merge_reservation = MemoryConsumer::new(format!("{}[Merge {partition}]", name)) .register(context.memory_pool()); - streaming_merge(StreamingMergeConfig { - streams: input_streams, - schema: schema_captured, - expressions: &sort_exprs, - metrics: BaselineMetrics::new(&metrics, partition), - batch_size: context.session_config().batch_size(), - fetch, - reservation: merge_reservation, - }) + StreamingMergeBuilder::new() + .with_streams(input_streams) + .with_schema(schema_captured) + .with_expressions(&sort_exprs) + .with_metrics(BaselineMetrics::new(&metrics, partition)) + .with_batch_size(context.session_config().batch_size()) + .with_fetch(fetch) + .with_reservation(merge_reservation) + .build() } else { Ok(Box::pin(RepartitionStream { num_input_partitions, diff --git a/datafusion/physical-plan/src/sorts/sort.rs b/datafusion/physical-plan/src/sorts/sort.rs index f5d20316a2c8..50f6f4a93097 100644 --- a/datafusion/physical-plan/src/sorts/sort.rs +++ b/datafusion/physical-plan/src/sorts/sort.rs @@ -30,7 +30,7 @@ use crate::limit::LimitStream; use crate::metrics::{ BaselineMetrics, Count, ExecutionPlanMetricsSet, MetricBuilder, MetricsSet, }; -use crate::sorts::streaming_merge::{streaming_merge, StreamingMergeConfig}; +use crate::sorts::streaming_merge::StreamingMergeBuilder; use crate::spill::{read_spill_as_stream, spill_record_batches}; use crate::stream::RecordBatchStreamAdapter; use crate::topk::TopK; @@ -342,15 +342,15 @@ impl ExternalSorter { streams.push(stream); } - streaming_merge(StreamingMergeConfig { - streams, - schema: Arc::clone(&self.schema), - expressions: &self.expr, - metrics: self.metrics.baseline.clone(), - batch_size: self.batch_size, - fetch: self.fetch, - reservation: self.reservation.new_empty(), - }) + StreamingMergeBuilder::new() + .with_streams(streams) + .with_schema(Arc::clone(&self.schema)) + .with_expressions(&self.expr) + .with_metrics(self.metrics.baseline.clone()) + .with_batch_size(self.batch_size) + .with_fetch(self.fetch) + .with_reservation(self.reservation.new_empty()) + .build() } else { self.in_mem_sort_stream(self.metrics.baseline.clone()) } @@ -534,15 +534,15 @@ impl ExternalSorter { }) .collect::>()?; - streaming_merge(StreamingMergeConfig { - streams, - schema: Arc::clone(&self.schema), - expressions: &self.expr, - metrics, - batch_size: self.batch_size, - fetch: self.fetch, - reservation: self.merge_reservation.new_empty(), - }) + StreamingMergeBuilder::new() + .with_streams(streams) + .with_schema(Arc::clone(&self.schema)) + .with_expressions(&self.expr) + .with_metrics(metrics) + .with_batch_size(self.batch_size) + .with_fetch(self.fetch) + .with_reservation(self.merge_reservation.new_empty()) + .build() } /// Sorts a single `RecordBatch` into a single stream. diff --git a/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs b/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs index 34c19d6b9671..3d3f9dcb98ee 100644 --- a/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs +++ b/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs @@ -24,7 +24,7 @@ use crate::common::spawn_buffered; use crate::expressions::PhysicalSortExpr; use crate::limit::LimitStream; use crate::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet}; -use crate::sorts::streaming_merge::{streaming_merge, StreamingMergeConfig}; +use crate::sorts::streaming_merge::StreamingMergeBuilder; use crate::{ DisplayAs, DisplayFormatType, Distribution, ExecutionPlan, ExecutionPlanProperties, Partitioning, PlanProperties, SendableRecordBatchStream, Statistics, @@ -273,15 +273,15 @@ impl ExecutionPlan for SortPreservingMergeExec { debug!("Done setting up sender-receiver for SortPreservingMergeExec::execute"); - let result = streaming_merge(StreamingMergeConfig { - streams: receivers, - schema, - expressions: &self.expr, - metrics: BaselineMetrics::new(&self.metrics, partition), - batch_size: context.session_config().batch_size(), - fetch: self.fetch, - reservation, - })?; + let result = StreamingMergeBuilder::new() + .with_streams(receivers) + .with_schema(schema) + .with_expressions(&self.expr) + .with_metrics(BaselineMetrics::new(&self.metrics, partition)) + .with_batch_size(context.session_config().batch_size()) + .with_fetch(self.fetch) + .with_reservation(reservation) + .build()?; debug!("Got stream result from SortPreservingMergeStream::new_from_receivers"); @@ -960,16 +960,15 @@ mod tests { MemoryConsumer::new("test").register(&task_ctx.runtime_env().memory_pool); let fetch = None; - let merge_stream = streaming_merge(StreamingMergeConfig { - streams, - schema: batches.schema(), - expressions: sort.as_slice(), - metrics: BaselineMetrics::new(&metrics, 0), - batch_size: task_ctx.session_config().batch_size(), - fetch, - reservation, - }) - .unwrap(); + let merge_stream = StreamingMergeBuilder::new() + .with_streams(streams) + .with_schema(batches.schema()) + .with_expressions(sort.as_slice()) + .with_metrics(BaselineMetrics::new(&metrics, 0)) + .with_batch_size(task_ctx.session_config().batch_size()) + .with_fetch(fetch) + .with_reservation(reservation) + .build()?; let mut merged = common::collect(merge_stream).await.unwrap(); diff --git a/datafusion/physical-plan/src/sorts/streaming_merge.rs b/datafusion/physical-plan/src/sorts/streaming_merge.rs index 937de82be533..ad640d8e8470 100644 --- a/datafusion/physical-plan/src/sorts/streaming_merge.rs +++ b/datafusion/physical-plan/src/sorts/streaming_merge.rs @@ -36,67 +36,133 @@ macro_rules! primitive_merge_helper { } macro_rules! merge_helper { - ($t:ty, $sort:ident, $config:ident) => {{ - let streams = FieldCursorStream::<$t>::new($sort, $config.streams); + ($t:ty, $sort:ident, $streams:ident, $schema:ident, $tracking_metrics:ident, $batch_size:ident, $fetch:ident, $reservation:ident) => {{ + let streams = FieldCursorStream::<$t>::new($sort, $streams); return Ok(Box::pin(SortPreservingMergeStream::new( Box::new(streams), - $config.schema, - $config.metrics, - $config.batch_size, - $config.fetch, - $config.reservation, + $schema, + $tracking_metrics, + $batch_size, + $fetch, + $reservation, ))); }}; } -/// Configuration parameters to initialize a `SortPreservingMergeStream` -pub struct StreamingMergeConfig<'a> { - pub streams: Vec, - pub schema: SchemaRef, - pub expressions: &'a [PhysicalSortExpr], - pub metrics: BaselineMetrics, - pub batch_size: usize, - pub fetch: Option, - pub reservation: MemoryReservation, +#[derive(Default)] +pub struct StreamingMergeBuilder<'a> { + streams: Vec, + schema: Option, + expressions: &'a [PhysicalSortExpr], + metrics: Option, + batch_size: Option, + fetch: Option, + reservation: Option, } -/// Perform a streaming merge of [`SendableRecordBatchStream`] based on provided sort expressions -/// while preserving order. -pub fn streaming_merge( - config: StreamingMergeConfig, -) -> Result { - // If there are no sort expressions, preserving the order - // doesn't mean anything (and result in infinite loops) - if config.expressions.is_empty() { - return internal_err!("Sort expressions cannot be empty for streaming merge"); +impl<'a> StreamingMergeBuilder<'a> { + pub fn new() -> Self { + Self::default() } - // Special case single column comparisons with optimized cursor implementations - if config.expressions.len() == 1 { - let sort = config.expressions[0].clone(); - let data_type = sort.expr.data_type(config.schema.as_ref())?; - downcast_primitive! { - data_type => (primitive_merge_helper, sort, config), - DataType::Utf8 => merge_helper!(StringArray, sort, config) - DataType::LargeUtf8 => merge_helper!(LargeStringArray, sort, config) - DataType::Binary => merge_helper!(BinaryArray, sort, config) - DataType::LargeBinary => merge_helper!(LargeBinaryArray, sort, config) - _ => {} - } + + pub fn with_streams(mut self, streams: Vec) -> Self { + self.streams = streams; + self + } + + pub fn with_schema(mut self, schema: SchemaRef) -> Self { + self.schema = Some(schema); + self + } + + pub fn with_expressions(mut self, expressions: &'a [PhysicalSortExpr]) -> Self { + self.expressions = expressions; + self + } + + pub fn with_metrics(mut self, metrics: BaselineMetrics) -> Self { + self.metrics = Some(metrics); + self + } + + pub fn with_batch_size(mut self, batch_size: usize) -> Self { + self.batch_size = Some(batch_size); + self + } + + pub fn with_fetch(mut self, fetch: Option) -> Self { + self.fetch = fetch; + self } - let streams = RowCursorStream::try_new( - config.schema.as_ref(), - config.expressions, - config.streams, - config.reservation.new_empty(), - )?; - - Ok(Box::pin(SortPreservingMergeStream::new( - Box::new(streams), - config.schema, - config.metrics, - config.batch_size, - config.fetch, - config.reservation, - ))) + pub fn with_reservation(mut self, reservation: MemoryReservation) -> Self { + self.reservation = Some(reservation); + self + } + + pub fn build(self) -> Result { + let Self { + streams, + schema, + metrics, + batch_size, + reservation, + fetch, + expressions, + } = self; + + // Early return if streams or expressions are empty + let checks = [ + ( + streams.is_empty(), + "Streams cannot be empty for streaming merge", + ), + ( + expressions.is_empty(), + "Sort expressions cannot be empty for streaming merge", + ), + ]; + + if let Some((_, error_message)) = checks.iter().find(|(condition, _)| *condition) + { + return internal_err!("{}", error_message); + } + + // Unwrapping mandatory fields + let schema = schema.expect("Schema cannot be empty for streaming merge"); + let metrics = metrics.expect("Metrics cannot be empty for streaming merge"); + let batch_size = + batch_size.expect("Batch size cannot be empty for streaming merge"); + let reservation = + reservation.expect("Reservation cannot be empty for streaming merge"); + + // Special case single column comparisons with optimized cursor implementations + if expressions.len() == 1 { + let sort = expressions[0].clone(); + let data_type = sort.expr.data_type(schema.as_ref())?; + downcast_primitive! { + data_type => (primitive_merge_helper, sort, streams, schema, metrics, batch_size, fetch, reservation), + DataType::Utf8 => merge_helper!(StringArray, sort, streams, schema, metrics, batch_size, fetch, reservation) + DataType::LargeUtf8 => merge_helper!(LargeStringArray, sort, streams, schema, metrics, batch_size, fetch, reservation) + DataType::Binary => merge_helper!(BinaryArray, sort, streams, schema, metrics, batch_size, fetch, reservation) + DataType::LargeBinary => merge_helper!(LargeBinaryArray, sort, streams, schema, metrics, batch_size, fetch, reservation) + _ => {} + } + } + + let streams = RowCursorStream::try_new( + schema.as_ref(), + expressions, + streams, + reservation.new_empty(), + )?; + Ok(Box::pin(SortPreservingMergeStream::new( + Box::new(streams), + schema, + metrics, + batch_size, + fetch, + reservation, + ))) + } }