Skip to content

Commit 6a59b42

Browse files
committed
address comments.
1 parent fc96d84 commit 6a59b42

File tree

2 files changed

+6
-10
lines changed

2 files changed

+6
-10
lines changed

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala

Lines changed: 4 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -320,12 +320,15 @@ object ColumnPruning extends Rule[LogicalPlan] {
320320
output1.zip(output2).forall(pair => pair._1.semanticEquals(pair._2))
321321

322322
def apply(plan: LogicalPlan): LogicalPlan = plan transform {
323-
// Prunes the unused columns from project list of Project/Aggregate/Expand
323+
// Prunes the unused columns from project list of Project/Aggregate/Window/Expand
324324
case p @ Project(_, p2: Project) if (p2.outputSet -- p.references).nonEmpty =>
325325
p.copy(child = p2.copy(projectList = p2.projectList.filter(p.references.contains)))
326326
case p @ Project(_, a: Aggregate) if (a.outputSet -- p.references).nonEmpty =>
327327
p.copy(
328328
child = a.copy(aggregateExpressions = a.aggregateExpressions.filter(p.references.contains)))
329+
case p @ Project(_, w: Window) if (w.windowOutputSet -- p.references).nonEmpty =>
330+
p.copy(child = w.copy(
331+
windowExpressions = w.windowExpressions.filter(p.references.contains)))
329332
case a @ Project(_, e @ Expand(_, _, grandChild)) if (e.outputSet -- a.references).nonEmpty =>
330333
val newOutput = e.output.filter(a.references.contains(_))
331334
val newProjects = e.projections.map { proj =>
@@ -384,15 +387,6 @@ object ColumnPruning extends Rule[LogicalPlan] {
384387
// Can't prune the columns on LeafNode
385388
case p @ Project(_, l: LeafNode) => p
386389

387-
// Prune windowExpressions and child of Window
388-
case p @ Project(_, w: Window) if (w.outputSet -- p.references).nonEmpty =>
389-
val newWindowExprs = w.windowExpressions.filter(p.references.contains)
390-
val newGrandChild =
391-
prunedChild(w.child, p.references ++ AttributeSet(newWindowExprs.flatMap(_.references)))
392-
p.copy(child = w.copy(
393-
windowExpressions = newWindowExprs,
394-
child = newGrandChild))
395-
396390
// for all other logical plans that inherits the output from it's children
397391
case p @ Project(_, child) =>
398392
val required = child.references ++ p.references

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -441,6 +441,8 @@ case class Window(
441441

442442
override def output: Seq[Attribute] =
443443
child.output ++ windowExpressions.map(_.toAttribute)
444+
445+
def windowOutputSet: AttributeSet = AttributeSet(windowExpressions.map(_.toAttribute))
444446
}
445447

446448
private[sql] object Expand {

0 commit comments

Comments
 (0)