
Commit 7d819d1

Consolidate sort and external_sort (#1596)
* Change SPMS to use heap sort, use SPMS instead of in-mem-sort as well
* Incorporate metrics, external_sort pass all sort tests
* Remove the original sort, substitute with external sort
* Fix different batch_size setting in SPMS test
* Change to use combine and sort for in memory N-way merge
* Resolve comments on async and doc
* Update sort to avoid deadlock during spilling
* Fix spill hanging
1 parent 3c5a679 commit 7d819d1
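
The sort-preserving merge mentioned in the first bullet is, at its core, a k-way merge of already-sorted runs (in-memory batches or spilled files). As a rough illustration only — plain integers instead of RecordBatches, and not the actual DataFusion code — a binary-heap version of that merge looks like this:

```rust
// Illustrative sketch of a heap-based k-way merge over plain integers.
// SortPreservingMergeStream applies the same idea to sorted RecordBatch runs.
use std::cmp::Reverse;
use std::collections::BinaryHeap;

fn merge_sorted_runs(runs: Vec<Vec<i32>>) -> Vec<i32> {
    // Heap entries are (value, run index, offset within the run); Reverse
    // turns Rust's max-heap into the min-heap the merge needs.
    let mut heap = BinaryHeap::new();
    for (run_idx, run) in runs.iter().enumerate() {
        if let Some(&v) = run.first() {
            heap.push(Reverse((v, run_idx, 0usize)));
        }
    }

    let mut out = Vec::new();
    while let Some(Reverse((v, run_idx, offset))) = heap.pop() {
        out.push(v);
        // Refill the heap from the run that just produced the smallest value.
        if let Some(&next) = runs[run_idx].get(offset + 1) {
            heap.push(Reverse((next, run_idx, offset + 1)));
        }
    }
    out
}
```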

File tree

10 files changed: +641 -1168 lines


datafusion/src/execution/context.rs

Lines changed: 1 addition & 0 deletions
@@ -1211,6 +1211,7 @@ impl FunctionRegistry for ExecutionContextState {
 #[cfg(test)]
 mod tests {
     use super::*;
+    use crate::execution::context::QueryPlanner;
     use crate::from_slice::FromSlice;
     use crate::logical_plan::plan::Projection;
     use crate::logical_plan::TableScan;

datafusion/src/physical_plan/common.rs

Lines changed: 11 additions & 3 deletions
@@ -20,6 +20,7 @@
 use super::{RecordBatchStream, SendableRecordBatchStream};
 use crate::error::{DataFusionError, Result};
 use crate::execution::runtime_env::RuntimeEnv;
+use crate::physical_plan::metrics::BaselineMetrics;
 use crate::physical_plan::{ColumnStatistics, ExecutionPlan, Statistics};
 use arrow::compute::concat;
 use arrow::datatypes::{Schema, SchemaRef};
@@ -41,15 +42,21 @@ pub struct SizedRecordBatchStream {
     schema: SchemaRef,
     batches: Vec<Arc<RecordBatch>>,
     index: usize,
+    baseline_metrics: BaselineMetrics,
 }
 
 impl SizedRecordBatchStream {
     /// Create a new RecordBatchIterator
-    pub fn new(schema: SchemaRef, batches: Vec<Arc<RecordBatch>>) -> Self {
+    pub fn new(
+        schema: SchemaRef,
+        batches: Vec<Arc<RecordBatch>>,
+        baseline_metrics: BaselineMetrics,
+    ) -> Self {
         SizedRecordBatchStream {
             schema,
             index: 0,
             batches,
+            baseline_metrics,
         }
     }
 }
@@ -61,12 +68,13 @@ impl Stream for SizedRecordBatchStream {
         mut self: std::pin::Pin<&mut Self>,
         _: &mut Context<'_>,
     ) -> Poll<Option<Self::Item>> {
-        Poll::Ready(if self.index < self.batches.len() {
+        let poll = Poll::Ready(if self.index < self.batches.len() {
             self.index += 1;
             Some(Ok(self.batches[self.index - 1].as_ref().clone()))
         } else {
             None
-        })
+        });
+        self.baseline_metrics.record_poll(poll)
     }
 }
 

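The net effect of the common.rs change is that SizedRecordBatchStream now carries a BaselineMetrics handle and routes every poll through record_poll, so output rows and completion are tracked automatically. A minimal sketch of how a caller constructs one under the new signature — the schema and batch below are invented example data, only the constructor shape comes from this commit:

```rust
// Minimal sketch, assuming a single Int32 column; the batch contents are
// made-up test data, only the new three-argument constructor is from this commit.
use std::sync::Arc;

use arrow::array::Int32Array;
use arrow::datatypes::{DataType, Field, Schema};
use arrow::record_batch::RecordBatch;
use datafusion::error::Result;
use datafusion::physical_plan::common::SizedRecordBatchStream;
use datafusion::physical_plan::metrics::{BaselineMetrics, ExecutionPlanMetricsSet};

fn make_stream() -> Result<SizedRecordBatchStream> {
    let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));
    let batch = RecordBatch::try_new(
        schema.clone(),
        vec![Arc::new(Int32Array::from(vec![1, 2, 3]))],
    )?;

    // The operator-level metrics set outlives the stream; BaselineMetrics is
    // the per-partition view the stream updates on every poll.
    let metrics = ExecutionPlanMetricsSet::new();
    let baseline_metrics = BaselineMetrics::new(&metrics, 0);

    Ok(SizedRecordBatchStream::new(
        schema,
        vec![Arc::new(batch)],
        baseline_metrics,
    ))
}
```

record_poll hands the poll result straight back to the caller, so the stream behaves as before apart from the bookkeeping.
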
datafusion/src/physical_plan/explain.rs

Lines changed: 5 additions & 0 deletions
@@ -32,6 +32,7 @@ use arrow::{array::StringBuilder, datatypes::SchemaRef, record_batch::RecordBatc
 
 use super::SendableRecordBatchStream;
 use crate::execution::runtime_env::RuntimeEnv;
+use crate::physical_plan::metrics::{BaselineMetrics, ExecutionPlanMetricsSet};
 use async_trait::async_trait;
 
 /// Explain execution plan operator. This operator contains the string
@@ -146,9 +147,13 @@ impl ExecutionPlan for ExplainExec {
             ],
         )?;
 
+        let metrics = ExecutionPlanMetricsSet::new();
+        let baseline_metrics = BaselineMetrics::new(&metrics, partition);
+
         Ok(Box::pin(SizedRecordBatchStream::new(
             self.schema.clone(),
             vec![Arc::new(record_batch)],
+            baseline_metrics,
         )))
     }
 

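ExplainExec does not keep a metrics set of its own, so the change above creates a local ExecutionPlanMetricsSet purely to hand a BaselineMetrics to the stream. For operators that do expose their metrics through ExecutionPlan::metrics, the recorded values can be read back after execution; a small sketch, not tied to this commit:

```rust
// Sketch only: inspecting recorded metrics on any ExecutionPlan that exposes
// them via metrics(); ExplainExec's locally created set is dropped with the
// stream and is not reachable this way.
use datafusion::physical_plan::ExecutionPlan;

fn report_output_rows(plan: &dyn ExecutionPlan) {
    if let Some(metrics) = plan.metrics() {
        println!("output rows: {:?}", metrics.output_rows());
    }
}
```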