Skip to content

Commit 3593d1f

Browse files
authored
Add support for multiple partitions with SortExec (#362) (#378)
* Add support for multiple partitions with SortExec * make SortExec partitioning optional
1 parent d9b0447 commit 3593d1f

File tree

1 file changed

+39
-16
lines changed
  • datafusion/src/physical_plan

1 file changed

+39
-16
lines changed

datafusion/src/physical_plan/sort.rs

Lines changed: 39 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,8 @@ pub struct SortExec {
5555
output_rows: Arc<SQLMetric>,
5656
/// Time to sort batches
5757
sort_time_nanos: Arc<SQLMetric>,
58+
/// Preserve partitions of input plan
59+
preserve_partitioning: bool,
5860
}
5961

6062
impl SortExec {
@@ -63,12 +65,23 @@ impl SortExec {
6365
expr: Vec<PhysicalSortExpr>,
6466
input: Arc<dyn ExecutionPlan>,
6567
) -> Result<Self> {
66-
Ok(Self {
68+
Ok(Self::new_with_partitioning(expr, input, false))
69+
}
70+
71+
/// Create a new sort execution plan with the option to preserve
72+
/// the partitioning of the input plan
73+
pub fn new_with_partitioning(
74+
expr: Vec<PhysicalSortExpr>,
75+
input: Arc<dyn ExecutionPlan>,
76+
preserve_partitioning: bool,
77+
) -> Self {
78+
Self {
6779
expr,
6880
input,
81+
preserve_partitioning,
6982
output_rows: SQLMetric::counter(),
7083
sort_time_nanos: SQLMetric::time_nanos(),
71-
})
84+
}
7285
}
7386

7487
/// Input schema
@@ -99,11 +112,19 @@ impl ExecutionPlan for SortExec {
99112

100113
/// Get the output partitioning of this plan
101114
fn output_partitioning(&self) -> Partitioning {
102-
Partitioning::UnknownPartitioning(1)
115+
if self.preserve_partitioning {
116+
self.input.output_partitioning()
117+
} else {
118+
Partitioning::UnknownPartitioning(1)
119+
}
103120
}
104121

105122
fn required_child_distribution(&self) -> Distribution {
106-
Distribution::SinglePartition
123+
if self.preserve_partitioning {
124+
Distribution::UnspecifiedDistribution
125+
} else {
126+
Distribution::SinglePartition
127+
}
107128
}
108129

109130
fn with_new_children(
@@ -122,21 +143,23 @@ impl ExecutionPlan for SortExec {
122143
}
123144

124145
async fn execute(&self, partition: usize) -> Result<SendableRecordBatchStream> {
125-
if 0 != partition {
126-
return Err(DataFusionError::Internal(format!(
127-
"SortExec invalid partition {}",
128-
partition
129-
)));
130-
}
146+
if !self.preserve_partitioning {
147+
if 0 != partition {
148+
return Err(DataFusionError::Internal(format!(
149+
"SortExec invalid partition {}",
150+
partition
151+
)));
152+
}
131153

132-
// sort needs to operate on a single partition currently
133-
if 1 != self.input.output_partitioning().partition_count() {
134-
return Err(DataFusionError::Internal(
135-
"SortExec requires a single input partition".to_owned(),
136-
));
154+
// sort needs to operate on a single partition currently
155+
if 1 != self.input.output_partitioning().partition_count() {
156+
return Err(DataFusionError::Internal(
157+
"SortExec requires a single input partition".to_owned(),
158+
));
159+
}
137160
}
138161

139-
let input = self.input.execute(0).await?;
162+
let input = self.input.execute(partition).await?;
140163

141164
Ok(Box::pin(SortStream::new(
142165
input,

0 commit comments

Comments
 (0)