Skip to content
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 55 additions & 2 deletions datafusion/expr/src/logical_plan/plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2739,6 +2739,7 @@ pub struct Repartition {
/// Union multiple inputs
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct Union {
// temp make private to help find usages
/// Inputs to merge
pub inputs: Vec<Arc<LogicalPlan>>,
/// Union schema. Should be the same for all inputs.
Expand All @@ -2747,9 +2748,12 @@ pub struct Union {

impl Union {
/// Constructs new Union instance deriving schema from inputs.
///
/// Note this will flatten inputs, so that if any input is itself a Union,
/// its inputs will be added to this Union's inputs.
fn try_new(inputs: Vec<Arc<LogicalPlan>>) -> Result<Self> {
let schema = Self::derive_schema_from_inputs(&inputs, false, false)?;
Ok(Union { inputs, schema })
Self::new_or_append(inputs, schema)
}

/// Constructs new Union instance deriving schema from inputs.
Expand All @@ -2758,7 +2762,7 @@ impl Union {
// TODO (https://github.com/apache/datafusion/issues/14380): Avoid creating uncoerced union at all.
pub fn try_new_with_loose_types(inputs: Vec<Arc<LogicalPlan>>) -> Result<Self> {
let schema = Self::derive_schema_from_inputs(&inputs, true, false)?;
Ok(Union { inputs, schema })
Self::new_or_append(inputs, schema)
}

/// Constructs a new Union instance that combines rows from different tables by name,
Expand All @@ -2767,10 +2771,59 @@ impl Union {
pub fn try_new_by_name(inputs: Vec<Arc<LogicalPlan>>) -> Result<Self> {
let schema = Self::derive_schema_from_inputs(&inputs, true, true)?;
let inputs = Self::rewrite_inputs_from_schema(&schema, inputs)?;
Self::new_or_append(inputs, schema)
}

/// Return a new Union instance, or append to an existing one if the first input is a Union
/// with a matching schema.
///
/// Specifically it handles
/// * Union(Union(A, B), C) -> Union(A, B, C)
/// * Union(A, Union(B, C)) -> Union(A, B, C)
fn new_or_append(mut inputs: Vec<Arc<LogicalPlan>>, schema: DFSchemaRef) -> Result<Self> {
if !inputs.is_empty() && Self::is_union_with_matching_schema(&inputs[0], &schema) {
let union = inputs.remove(0);
Self::append_to_union_front(union, inputs)
} else if !inputs.is_empty() && Self::is_union_with_matching_schema(&inputs[inputs.len() - 1], &schema) {
let union = inputs.pop().unwrap();
Self::append_to_union_back(union, inputs)
} else {
Ok(Union {
inputs,
schema,
})
}
}

fn is_union_with_matching_schema(plan: &Arc<LogicalPlan>, schema: &DFSchema) -> bool {
matches!(plan.as_ref(), LogicalPlan::Union(_) if plan.schema().as_ref() == schema)
}

// Unwraps a logical plan as a union and adds the input to it
fn append_to_union_front(union: Arc<LogicalPlan>, mut new_inputs: Vec<Arc<LogicalPlan>>) -> Result<Self> {
let LogicalPlan::Union(Union { schema, mut inputs }) =
Arc::unwrap_or_clone(union)
else {
return internal_err!("Expected a Union plan");
};
inputs.append(&mut new_inputs);

Ok(Union { inputs, schema })
}

// Unwraps a logical plan as a union and adds the input to it
fn append_to_union_back(union: Arc<LogicalPlan>, mut new_inputs: Vec<Arc<LogicalPlan>>) -> Result<Self> {
let LogicalPlan::Union(Union { schema, mut inputs }) =
Arc::unwrap_or_clone(union)
else {
return internal_err!("Expected a Union plan");
};
new_inputs.append(&mut inputs);

Ok(Union { inputs: new_inputs, schema })
}


/// When constructing a `UNION BY NAME`, we need to wrap inputs
/// in an additional `Projection` to account for absence of columns
/// in input schemas or differing projection orders.
Expand Down
Loading