Commit 149458c

cloud-fan authored and dongjoon-hyun committed
[SPARK-42049][SQL][FOLLOWUP] Always filter away invalid ordering/partitioning
### What changes were proposed in this pull request?

This is a follow-up of #37525. When the project list has aliases, we go to the `projectExpression` branch, which filters away invalid partitioning/ordering that references non-existing attributes in the current plan node. However, this filtering is missing when the project list has no alias, where we directly return the child partitioning/ordering. This PR fixes it.

### Why are the changes needed?

To make sure we always return valid output partitioning/ordering.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

New tests.

Closes #40137 from cloud-fan/alias.

Authored-by: Wenchen Fan <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
(cherry picked from commit 72922ad)
Signed-off-by: Dongjoon Hyun <[email protected]>
1 parent 8b7a073 commit 149458c
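A minimal DataFrame-level sketch of the scenario this commit fixes, adapted from the new test added below (assumes a running SparkSession with spark.implicits._ in scope):

// Sketch only: `df` outputs columns a and b, but select("a") drops b from the
// plan's output, so any inherited partitioning/ordering that references b is invalid.
val df = spark.range(2).select($"id" + 1 as "a", $"id" + 2 as "b")

// The final project list has no alias, so before this fix the child's
// HashPartitioning on (a, b) was returned unchanged even though b no longer exists.
val df1 = df.repartition($"a", $"b").select("a")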

File tree

4 files changed (+46, -13 lines)


sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/AliasAwareOutputExpression.scala

Lines changed: 15 additions & 3 deletions
@@ -95,7 +95,7 @@ trait AliasAwareQueryOutputOrdering[T <: QueryPlan[T]]
   }
 
   override final def outputOrdering: Seq[SortOrder] = {
-    if (hasAlias) {
+    val newOrdering: Iterator[Option[SortOrder]] = if (hasAlias) {
       // Take the first `SortOrder`s only until they can be projected.
       // E.g. we have child ordering `Seq(SortOrder(a), SortOrder(b))` then
       // if only `a AS x` can be projected then we can return `Seq(SortOrder(x))`
@@ -112,9 +112,21 @@ trait AliasAwareQueryOutputOrdering[T <: QueryPlan[T]]
         } else {
           None
         }
-      }.takeWhile(_.isDefined).flatten.toSeq
+      }
     } else {
-      orderingExpressions
+      // Make sure the returned ordering is valid (only references output attributes of the
+      // current plan node). As in the if branch above, we take the first ordering expressions
+      // that are all valid.
+      val outputSet = AttributeSet(outputExpressions.map(_.toAttribute))
+      orderingExpressions.iterator.map { order =>
+        val validChildren = order.children.filter(_.references.subsetOf(outputSet))
+        if (validChildren.nonEmpty) {
+          Some(order.copy(child = validChildren.head, sameOrderExpressions = validChildren.tail))
+        } else {
+          None
+        }
+      }
     }
    newOrdering.takeWhile(_.isDefined).flatten.toSeq
  }
}
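The shared tail `newOrdering.takeWhile(_.isDefined).flatten.toSeq` keeps only a valid prefix of the ordering. A plain-Scala sketch of that semantics, with strings standing in for SortOrders (illustrative names only):

// Data sorted by (a, b, c) is only guaranteed to be sorted by a prefix of that
// list, so once one SortOrder cannot be kept, everything after it is dropped too:
// the result here is Seq("a"), not Seq("a", "c").
val projected: Iterator[Option[String]] = Iterator(Some("a"), None, Some("c"))
val kept: Seq[String] = projected.takeWhile(_.isDefined).flatten.toSeq
assert(kept == Seq("a"))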

sql/core/src/main/scala/org/apache/spark/sql/execution/AliasAwareOutputExpression.scala

Lines changed: 13 additions & 7 deletions
@@ -18,7 +18,7 @@ package org.apache.spark.sql.execution
 
 import scala.collection.mutable
 
-import org.apache.spark.sql.catalyst.expressions.Expression
+import org.apache.spark.sql.catalyst.expressions.{AttributeSet, Expression}
 import org.apache.spark.sql.catalyst.plans.{AliasAwareOutputExpression, AliasAwareQueryOutputOrdering}
 import org.apache.spark.sql.catalyst.plans.physical.{Partitioning, PartitioningCollection, UnknownPartitioning}
 
@@ -29,7 +29,7 @@ import org.apache.spark.sql.catalyst.plans.physical.{Partitioning, PartitioningC
 trait PartitioningPreservingUnaryExecNode extends UnaryExecNode
   with AliasAwareOutputExpression {
   final override def outputPartitioning: Partitioning = {
-    if (hasAlias) {
+    val partitionings: Seq[Partitioning] = if (hasAlias) {
       flattenPartitioning(child.outputPartitioning).flatMap {
         case e: Expression =>
           // We need unique partitionings but if the input partitioning is
@@ -44,13 +44,19 @@ trait PartitioningPreservingUnaryExecNode extends UnaryExecNode
           .take(aliasCandidateLimit)
           .asInstanceOf[Stream[Partitioning]]
         case o => Seq(o)
-      } match {
-        case Seq() => UnknownPartitioning(child.outputPartitioning.numPartitions)
-        case Seq(p) => p
-        case ps => PartitioningCollection(ps)
       }
     } else {
-      child.outputPartitioning
+      // Filter valid partitionings (only reference output attributes of the current plan node)
+      val outputSet = AttributeSet(outputExpressions.map(_.toAttribute))
+      flattenPartitioning(child.outputPartitioning).filter {
+        case e: Expression => e.references.subsetOf(outputSet)
+        case _ => true
+      }
+    }
+    partitionings match {
+      case Seq() => UnknownPartitioning(child.outputPartitioning.numPartitions)
+      case Seq(p) => p
+      case ps => PartitioningCollection(ps)
     }
   }
 
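A hypothetical complement to the UnknownPartitioning case in the new test below, not part of this patch: when the inherited partitioning only references surviving attributes, the filter keeps it and the final match returns it unchanged (assumes the same `df` as in the test and the stripAQEPlan helper from AdaptiveSparkPlanHelper):

// Hypothetical check: a HashPartitioning on `a` alone should survive select("a"),
// since its references are a subset of the plan's output attributes.
val kept = df.repartition($"a").select("a")
val partitioning = stripAQEPlan(kept.queryExecution.executedPlan).outputPartitioning
// Expected: still a hash partitioning on `a`, not UnknownPartitioning.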

sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala

Lines changed: 4 additions & 3 deletions
@@ -1129,9 +1129,10 @@ class PlannerSuite extends SharedSparkSession with AdaptiveSparkPlanHelper {
       assert(sortNodes.size == 3)
       val outputOrdering = planned.outputOrdering
       assert(outputOrdering.size == 1)
-      // Sort order should have 3 childrens, not 4. This is because t1.id*2 and 2*t1.id are same
-      assert(outputOrdering.head.children.size == 3)
-      assert(outputOrdering.head.children.count(_.isInstanceOf[AttributeReference]) == 2)
+      // Sort order should have 2 children, not 4. This is because t1.id*2 and 2*t1.id are the
+      // same, and t2.id is not a valid ordering (the final plan doesn't output t2.id)
+      assert(outputOrdering.head.children.size == 2)
+      assert(outputOrdering.head.children.count(_.isInstanceOf[AttributeReference]) == 1)
       assert(outputOrdering.head.children.count(_.isInstanceOf[Multiply]) == 1)
     }
   }

sql/core/src/test/scala/org/apache/spark/sql/execution/ProjectedOrderingAndPartitioningSuite.scala

Lines changed: 14 additions & 0 deletions
@@ -177,4 +177,18 @@ class ProjectedOrderingAndPartitioningSuite
     val outputOrdering3 = df3.queryExecution.optimizedPlan.outputOrdering
     assert(outputOrdering3.size == 0)
   }
+
+  test("SPARK-42049: Improve AliasAwareOutputExpression - no alias but still prune expressions") {
+    val df = spark.range(2).select($"id" + 1 as "a", $"id" + 2 as "b")
+
+    val df1 = df.repartition($"a", $"b").selectExpr("a")
+    val outputPartitioning = stripAQEPlan(df1.queryExecution.executedPlan).outputPartitioning
+    assert(outputPartitioning.isInstanceOf[UnknownPartitioning])
+
+    val df2 = df.orderBy("a", "b").select("a")
+    val outputOrdering = df2.queryExecution.optimizedPlan.outputOrdering
+    assert(outputOrdering.size == 1)
+    assert(outputOrdering.head.child.asInstanceOf[Attribute].name == "a")
+    assert(outputOrdering.head.sameOrderExpressions.size == 0)
+  }
 }
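A further case implied by the prefix rule, hypothetical and not in the patch: if the pruned attribute leads the child ordering, no valid prefix remains and the ordering collapses to empty (same `df` as in the test above):

// Hypothetical: the child ordering is (b, a); b is pruned by select("a"), and
// since orderings are only valid prefix-wise, the whole ordering becomes empty.
val dfPruned = df.orderBy("b", "a").select("a")
assert(dfPruned.queryExecution.optimizedPlan.outputOrdering.isEmpty)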
