@@ -56,7 +56,7 @@ use datafusion_common::config::ConfigOptions;
5656use datafusion_common:: plan_err;
5757use datafusion_common:: tree_node:: { Transformed , TransformedResult , TreeNode } ;
5858use datafusion_common:: Result ;
59- use datafusion_physical_expr:: { Distribution , Partitioning } ;
59+ use datafusion_physical_expr:: Distribution ;
6060use datafusion_physical_expr_common:: sort_expr:: { LexOrdering , LexRequirement } ;
6161use datafusion_physical_plan:: coalesce_partitions:: CoalescePartitionsExec ;
6262use datafusion_physical_plan:: limit:: { GlobalLimitExec , LocalLimitExec } ;
@@ -126,29 +126,67 @@ fn update_sort_ctx_children(
126126/// [`CoalescePartitionsExec`] descendant(s) for every child of a plan. The data
127127/// attribute stores whether the plan is a `CoalescePartitionsExec` or is
128128/// connected to a `CoalescePartitionsExec` via its children.
129+ ///
130+ /// The tracker halts at each [`SortExec`] (where the SPM will act to replace the coalesce).
131+ ///
132+ /// This requires a bottom-up traversal was previously performed, updating the
133+ /// children previously.
129134pub type PlanWithCorrespondingCoalescePartitions = PlanContext < bool > ;
130135
136+ /// Determines if the coalesce may be safely removed.
137+ fn is_coalesce_to_remove (
138+ node : & Arc < dyn ExecutionPlan > ,
139+ parent : & Arc < dyn ExecutionPlan > ,
140+ ) -> bool {
141+ node. as_any ( )
142+ . downcast_ref :: < CoalescePartitionsExec > ( )
143+ . map ( |_coalesce| {
144+ // TODO(wiedld): find a more generalized approach that does not rely on
145+ // pattern matching the structure of the DAG
146+ // Note that the `Partitioning::satisfy()` (parent vs. coalesce.child) cannot be used for cases of:
147+ // * Repartition -> Coalesce -> Repartition
148+
149+ let parent_req_single_partition = matches ! (
150+ parent. required_input_distribution( ) [ 0 ] ,
151+ Distribution :: SinglePartition
152+ ) ;
153+
154+ // node above does not require single distribution
155+ !parent_req_single_partition
156+ // it doesn't immediately repartition
157+ || is_repartition ( parent)
158+ // any adjacent Coalesce->Sort can be replaced
159+ || is_sort ( parent)
160+ } )
161+ . unwrap_or ( false )
162+ }
163+
131164fn update_coalesce_ctx_children (
132165 coalesce_context : & mut PlanWithCorrespondingCoalescePartitions ,
133166) {
134- let children = & coalesce_context. children ;
135- coalesce_context. data = if children. is_empty ( ) {
136- // Plan has no children, it cannot be a `CoalescePartitionsExec`.
137- false
138- } else if is_coalesce_partitions ( & coalesce_context. plan ) {
139- // Initiate a connection:
140- true
141- } else {
142- children. iter ( ) . enumerate ( ) . any ( |( idx, node) | {
143- // Only consider operators that don't require a single partition,
144- // and connected to some `CoalescePartitionsExec`:
145- node. data
146- && !matches ! (
147- coalesce_context. plan. required_input_distribution( ) [ idx] ,
148- Distribution :: SinglePartition
149- )
150- } )
151- } ;
167+ // perform lookahead(1) during bottom up traversal
168+ // since we are checking distribution requirements after the coalesce occurs
169+ let parent = & coalesce_context. plan ;
170+
171+ for child_context in coalesce_context. children . iter_mut ( ) {
172+ // determine if child, or it's descendents, are a coalesce to be removed
173+ child_context. data = if child_context. children . is_empty ( ) {
174+ // Plan has no children, it cannot be a `CoalescePartitionsExec`.
175+ false
176+ } else if is_coalesce_to_remove ( & child_context. plan , parent) {
177+ // Initiate a connection:
178+ true
179+ } else if is_sort ( & child_context. plan ) {
180+ // halt coalesce removals at the sort
181+ false
182+ } else {
183+ // propogate
184+ child_context
185+ . children
186+ . iter ( )
187+ . any ( |grandchild| grandchild. data )
188+ } ;
189+ }
152190}
153191
154192/// The boolean flag `repartition_sorts` defined in the config indicates
@@ -246,32 +284,50 @@ fn replace_with_partial_sort(
246284/// This function turns plans of the form
247285/// ```text
248286/// "SortExec: expr=\[a@0 ASC\]",
249- /// " CoalescePartitionsExec",
250- /// " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
287+ /// " ...nodes..."
288+ /// " CoalescePartitionsExec",
289+ /// " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
251290/// ```
252291/// to
253292/// ```text
254293/// "SortPreservingMergeExec: \[a@0 ASC\]",
255294/// " SortExec: expr=\[a@0 ASC\]",
256- /// " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
295+ /// " ...nodes..."
296+ /// " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
257297/// ```
258298/// by following connections from [`CoalescePartitionsExec`]s to [`SortExec`]s.
259299/// By performing sorting in parallel, we can increase performance in some scenarios.
300+ ///
301+ /// This requires that there are no nodes between the [`SortExec`] and [`CoalescePartitionsExec`]
302+ /// which require single partitioning. Do not parallelize when the following scenario occurs:
303+ /// ```text
304+ /// "SortExec: expr=\[a@0 ASC\]",
305+ /// " ...nodes requiring single partitioning..."
306+ /// " CoalescePartitionsExec",
307+ /// " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
308+ /// ```
260309pub fn parallelize_sorts (
261310 mut requirements : PlanWithCorrespondingCoalescePartitions ,
262311) -> Result < Transformed < PlanWithCorrespondingCoalescePartitions > > {
312+ requirements = requirements. update_plan_from_children ( ) ?;
263313 update_coalesce_ctx_children ( & mut requirements) ;
314+ let coalesce_can_be_removed = requirements. children . iter ( ) . any ( |child| child. data ) ;
315+
316+ let should_parallelize_sort = ( is_sort ( & requirements. plan )
317+ || is_sort_preserving_merge ( & requirements. plan ) )
318+ && requirements. plan . output_partitioning ( ) . partition_count ( ) <= 1
319+ && coalesce_can_be_removed;
320+
321+ // Repartition -> Coalesce -> Repartition
322+ let unneeded_coalesce = is_repartition ( & requirements. plan ) && coalesce_can_be_removed;
264323
265324 if requirements. children . is_empty ( ) || !requirements. children [ 0 ] . data {
266325 // We only take an action when the plan is either a `SortExec`, a
267326 // `SortPreservingMergeExec` or a `CoalescePartitionsExec`, and they
268327 // all have a single child. Therefore, if the first child has no
269328 // connection, we can return immediately.
270329 Ok ( Transformed :: no ( requirements) )
271- } else if ( is_sort ( & requirements. plan )
272- || is_sort_preserving_merge ( & requirements. plan ) )
273- && requirements. plan . output_partitioning ( ) . partition_count ( ) <= 1
274- {
330+ } else if should_parallelize_sort {
275331 // Take the initial sort expressions and requirements
276332 let ( sort_exprs, fetch) = get_sort_exprs ( & requirements. plan ) ?;
277333 let sort_reqs = LexRequirement :: from ( sort_exprs. clone ( ) ) ;
@@ -286,8 +342,11 @@ pub fn parallelize_sorts(
286342 // We also need to remove the self node since `remove_corresponding_coalesce_in_sub_plan`
287343 // deals with the children and their children and so on.
288344 requirements = requirements. children . swap_remove ( 0 ) ;
345+ // sync the requirements.plan.children with the mutated requirements.children
346+ requirements = requirements. update_plan_from_children ( ) ?;
289347
290348 requirements = add_sort_above_with_check ( requirements, sort_reqs, fetch) ;
349+ requirements = requirements. update_plan_from_children ( ) ?;
291350
292351 let spm =
293352 SortPreservingMergeExec :: new ( sort_exprs, Arc :: clone ( & requirements. plan ) ) ;
@@ -298,20 +357,11 @@ pub fn parallelize_sorts(
298357 vec ! [ requirements] ,
299358 ) ,
300359 ) )
301- } else if is_coalesce_partitions ( & requirements. plan ) {
302- // There is an unnecessary `CoalescePartitionsExec` in the plan.
303- // This will handle the recursive `CoalescePartitionsExec` plans.
360+ } else if unneeded_coalesce {
304361 requirements = remove_bottleneck_in_subplan ( requirements) ?;
305- // For the removal of self node which is also a `CoalescePartitionsExec`.
306- requirements = requirements. children . swap_remove ( 0 ) ;
362+ requirements = requirements. update_plan_from_children ( ) ?;
307363
308- Ok ( Transformed :: yes (
309- PlanWithCorrespondingCoalescePartitions :: new (
310- Arc :: new ( CoalescePartitionsExec :: new ( Arc :: clone ( & requirements. plan ) ) ) ,
311- false ,
312- vec ! [ requirements] ,
313- ) ,
314- ) )
364+ Ok ( Transformed :: yes ( requirements) )
315365 } else {
316366 Ok ( Transformed :: yes ( requirements) )
317367 }
@@ -546,19 +596,7 @@ fn remove_bottleneck_in_subplan(
546596 } )
547597 . collect :: < Result < _ > > ( ) ?;
548598 }
549- let mut new_reqs = requirements. update_plan_from_children ( ) ?;
550- if let Some ( repartition) = new_reqs. plan . as_any ( ) . downcast_ref :: < RepartitionExec > ( ) {
551- let input_partitioning = repartition. input ( ) . output_partitioning ( ) ;
552- // We can remove this repartitioning operator if it is now a no-op:
553- let mut can_remove = input_partitioning. eq ( repartition. partitioning ( ) ) ;
554- // We can also remove it if we ended up with an ineffective RR:
555- if let Partitioning :: RoundRobinBatch ( n_out) = repartition. partitioning ( ) {
556- can_remove |= * n_out == input_partitioning. partition_count ( ) ;
557- }
558- if can_remove {
559- new_reqs = new_reqs. children . swap_remove ( 0 )
560- }
561- }
599+ let new_reqs = requirements. update_plan_from_children ( ) ?;
562600 Ok ( new_reqs)
563601}
564602
0 commit comments