diff --git a/datafusion/expr/src/logical_plan/plan.rs b/datafusion/expr/src/logical_plan/plan.rs index 3cc032277405..29f2f5a1490c 100644 --- a/datafusion/expr/src/logical_plan/plan.rs +++ b/datafusion/expr/src/logical_plan/plan.rs @@ -2739,6 +2739,7 @@ pub struct Repartition { /// Union multiple inputs #[derive(Debug, Clone, PartialEq, Eq, Hash)] pub struct Union { + // temp make private to help find usages /// Inputs to merge pub inputs: Vec>, /// Union schema. Should be the same for all inputs. @@ -2747,9 +2748,12 @@ pub struct Union { impl Union { /// Constructs new Union instance deriving schema from inputs. + /// + /// Note this will flatten inputs, so that if any input is itself a Union, + /// its inputs will be added to this Union's inputs. fn try_new(inputs: Vec>) -> Result { let schema = Self::derive_schema_from_inputs(&inputs, false, false)?; - Ok(Union { inputs, schema }) + Self::new_or_append(inputs, schema) } /// Constructs new Union instance deriving schema from inputs. @@ -2758,7 +2762,7 @@ impl Union { // TODO (https://github.com/apache/datafusion/issues/14380): Avoid creating uncoerced union at all. pub fn try_new_with_loose_types(inputs: Vec>) -> Result { let schema = Self::derive_schema_from_inputs(&inputs, true, false)?; - Ok(Union { inputs, schema }) + Self::new_or_append(inputs, schema) } /// Constructs a new Union instance that combines rows from different tables by name, @@ -2767,10 +2771,59 @@ impl Union { pub fn try_new_by_name(inputs: Vec>) -> Result { let schema = Self::derive_schema_from_inputs(&inputs, true, true)?; let inputs = Self::rewrite_inputs_from_schema(&schema, inputs)?; + Self::new_or_append(inputs, schema) + } + + /// Return a new Union instance, or append to an existing one if the first input is a Union + /// with a matching schema. + /// + /// Specifically it handles + /// * Union(Union(A, B), C) -> Union(A, B, C) + /// * Union(A, Union(B, C)) -> Union(A, B, C) + fn new_or_append(mut inputs: Vec>, schema: DFSchemaRef) -> Result { + if !inputs.is_empty() && Self::is_union_with_matching_schema(&inputs[0], &schema) { + let union = inputs.remove(0); + Self::append_to_union_front(union, inputs) + } else if !inputs.is_empty() && Self::is_union_with_matching_schema(&inputs[inputs.len() - 1], &schema) { + let union = inputs.pop().unwrap(); + Self::append_to_union_back(union, inputs) + } else { + Ok(Union { + inputs, + schema, + }) + } + } + + fn is_union_with_matching_schema(plan: &Arc, schema: &DFSchema) -> bool { + matches!(plan.as_ref(), LogicalPlan::Union(_) if plan.schema().as_ref() == schema) + } + + // Unwraps a logical plan as a union and adds the input to it + fn append_to_union_front(union: Arc, mut new_inputs: Vec>) -> Result { + let LogicalPlan::Union(Union { schema, mut inputs }) = + Arc::unwrap_or_clone(union) + else { + return internal_err!("Expected a Union plan"); + }; + inputs.append(&mut new_inputs); Ok(Union { inputs, schema }) } + // Unwraps a logical plan as a union and adds the input to it + fn append_to_union_back(union: Arc, mut new_inputs: Vec>) -> Result { + let LogicalPlan::Union(Union { schema, mut inputs }) = + Arc::unwrap_or_clone(union) + else { + return internal_err!("Expected a Union plan"); + }; + new_inputs.append(&mut inputs); + + Ok(Union { inputs: new_inputs, schema }) + } + + /// When constructing a `UNION BY NAME`, we need to wrap inputs /// in an additional `Projection` to account for absence of columns /// in input schemas or differing projection orders.