
Commit a7e97dd

remove unintended changes
1 parent 362c5ea commit a7e97dd

File tree

3 files changed: +24 -43 lines changed


sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala

Lines changed: 4 additions & 23 deletions
@@ -69,7 +69,8 @@ case class HashAggregateExec(
     "aggTime" -> SQLMetrics.createTimingMetric(sparkContext, "time in aggregation build"),
     "avgHashProbe" ->
       SQLMetrics.createAverageMetric(sparkContext, "avg hash probe bucket list iters"),
-    "numTasksFallBacked" -> SQLMetrics.createMetric(sparkContext, "number of sort fallback tasks"))
+    "numTasksFallBacked" -> SQLMetrics.createMetric(sparkContext,
+      "number of sort fallback tasks"))
     if (skipPartialAggregateEnabled) {
       metrics ++ Map("partialAggSkipped" -> SQLMetrics.createMetric(sparkContext,
         "number of skipped records for partial aggregates"))
@@ -78,23 +79,6 @@
     }
   }
 
-  override def output: Seq[Attribute] = resultExpressions.map(_.toAttribute)
-
-  override protected def outputExpressions: Seq[NamedExpression] = resultExpressions
-
-  override def producedAttributes: AttributeSet =
-    AttributeSet(aggregateAttributes) ++
-      AttributeSet(resultExpressions.diff(groupingExpressions).map(_.toAttribute)) ++
-      AttributeSet(aggregateBufferAttributes)
-
-  override def requiredChildDistribution: List[Distribution] = {
-    requiredChildDistributionExpressions match {
-      case Some(exprs) if exprs.isEmpty => AllTuples :: Nil
-      case Some(exprs) if exprs.nonEmpty => ClusteredDistribution(exprs) :: Nil
-      case None => UnspecifiedDistribution :: Nil
-    }
-  }
-
   // This is for testing. We force TungstenAggregationIterator to fall back to the unsafe row hash
   // map and/or the sort-based aggregation once it has processed a given number of input rows.
   private val testFallbackStartsAt: Option[(Int, Int)] = {
@@ -520,8 +504,6 @@ case class HashAggregateExec(
 
       // merge the final hashMap into sorter
       numTasksFallBacked += 1
-      sorter.merge(hashMap.destructAndCreateExternalSorter())
-      hashMap.free()
       if (!skipPartialAggTerm) {
         sorter.merge(hashMap.destructAndCreateExternalSorter())
         hashMap.free()
@@ -724,8 +706,6 @@ case class HashAggregateExec(
 
   private def doProduceWithKeys(ctx: CodegenContext): String = {
     val initAgg = ctx.addMutableState(CodeGenerator.JAVA_BOOLEAN, "initAgg")
-    if (conf.enableTwoLevelAggMap) {
-
     var childrenConsumed: String = null
     if (skipPartialAggregateEnabled) {
       skipPartialAggTerm = ctx.
@@ -833,7 +813,8 @@ case class HashAggregateExec(
     }
 
     val finishRegularHashMap = s"$iterTerm = $thisPlan.finishAggregate(" +
-      s"$hashMapTerm, $sorterTerm, $peakMemory, $spillSize, $avgHashProbe, $numTasksFallBacked, $mapCleared);"
+      s"$hashMapTerm, $sorterTerm, $peakMemory, $spillSize, $avgHashProbe, $numTasksFallBacked," +
+      s" $mapCleared);"
     val finishHashMap = if (isFastHashMapEnabled) {
       s"""
          |$iterTermForFastHashMap = $fastHashMapTerm.rowIterator();
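For context on the metric touched by the first and last hunks: HashAggregateExec registers a named SQLMetric ("numTasksFallBacked") on the driver, and the generated code increments it when a task falls back to sort-based aggregation. Below is a minimal, standalone sketch of that create-and-increment pattern, assuming a local SparkSession; the object name and the simulated increment are illustrative and not the operator's actual code path.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.metric.SQLMetrics

object NumTasksFallBackedSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("metric-sketch").getOrCreate()

    // Register a sum-style metric against the SparkContext, as the hunk above does
    // for "numTasksFallBacked".
    val numTasksFallBacked =
      SQLMetrics.createMetric(spark.sparkContext, "number of sort fallback tasks")

    // The operator bumps the metric once per task that spills to sort-based
    // aggregation; here we simulate a single fallback.
    numTasksFallBacked += 1

    println(s"number of sort fallback tasks = ${numTasksFallBacked.value}")
    spark.stop()
  }
}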

sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala

Lines changed: 17 additions & 17 deletions
@@ -21,7 +21,7 @@ import org.apache.spark.sql.{Dataset, QueryTest, Row, SaveMode}
 import org.apache.spark.sql.catalyst.expressions.aggregate.Partial
 import org.apache.spark.sql.catalyst.expressions.codegen.{ByteCodeStats, CodeAndComment, CodeGenerator}
 import org.apache.spark.sql.execution.adaptive.DisableAdaptiveExecutionSuite
-import org.apache.spark.sql.execution.aggregate.{AggUtils, HashAggregateExec}
+import org.apache.spark.sql.execution.aggregate.HashAggregateExec
 import org.apache.spark.sql.execution.columnar.InMemoryTableScanExec
 import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, BroadcastNestedLoopJoinExec, ShuffledHashJoinExec, SortMergeJoinExec}
 import org.apache.spark.sql.functions._
@@ -279,7 +279,7 @@ class WholeStageCodegenSuite extends QueryTest with SharedSparkSession
       val twoJoinsDF = df1.join(df2, $"k1" < $"k2").crossJoin(df3)
       hasJoinInCodegen = twoJoinsDF.queryExecution.executedPlan.collect {
         case WholeStageCodegenExec(BroadcastNestedLoopJoinExec(
-          _: BroadcastNestedLoopJoinExec, _, _, _, _)) => true
+          _: BroadcastNestedLoopJoinExec, _, _, _, _)) => true
       }.size === 1
       assert(hasJoinInCodegen == codegenEnabled)
       checkAnswer(twoJoinsDF,
@@ -318,7 +318,7 @@ class WholeStageCodegenSuite extends QueryTest with SharedSparkSession
         .join(df3, $"k1" <= $"k3", "left_outer")
       hasJoinInCodegen = twoJoinsDF.queryExecution.executedPlan.collect {
         case WholeStageCodegenExec(BroadcastNestedLoopJoinExec(
-          _: BroadcastNestedLoopJoinExec, _, _, _, _)) => true
+          _: BroadcastNestedLoopJoinExec, _, _, _, _)) => true
       }.size === 1
       assert(hasJoinInCodegen == codegenEnabled)
       checkAnswer(twoJoinsDF,
@@ -387,7 +387,7 @@ class WholeStageCodegenSuite extends QueryTest with SharedSparkSession
       val plan = ds.queryExecution.executedPlan
       assert(plan.find(p =>
         p.isInstanceOf[WholeStageCodegenExec] &&
-          p.asInstanceOf[WholeStageCodegenExec].child.isInstanceOf[SerializeFromObjectExec]).isDefined)
+          p.asInstanceOf[WholeStageCodegenExec].child.isInstanceOf[SerializeFromObjectExec]).isDefined)
       assert(ds.collect() === 0.until(10).map(_.toString).toArray)
     }
 
@@ -405,7 +405,7 @@ class WholeStageCodegenSuite extends QueryTest with SharedSparkSession
       val plan = ds.queryExecution.executedPlan
       assert(plan.find(p =>
         p.isInstanceOf[WholeStageCodegenExec] &&
-          p.asInstanceOf[WholeStageCodegenExec].child.isInstanceOf[FilterExec]).isDefined)
+          p.asInstanceOf[WholeStageCodegenExec].child.isInstanceOf[FilterExec]).isDefined)
       assert(ds.collect() === Array(0, 6))
     }
 
@@ -418,7 +418,7 @@ class WholeStageCodegenSuite extends QueryTest with SharedSparkSession
       val planInt = dsIntFilter.queryExecution.executedPlan
       assert(planInt.collect {
         case WholeStageCodegenExec(FilterExec(_,
-          ColumnarToRowExec(InputAdapter(_: InMemoryTableScanExec)))) => ()
+          ColumnarToRowExec(InputAdapter(_: InMemoryTableScanExec)))) => ()
       }.length == 1)
       assert(dsIntFilter.collect() === Array(1, 2))
 
@@ -556,7 +556,7 @@ class WholeStageCodegenSuite extends QueryTest with SharedSparkSession
         .write.mode(SaveMode.Overwrite).parquet(path)
 
       withSQLConf(SQLConf.WHOLESTAGE_MAX_NUM_FIELDS.key -> "255",
-          SQLConf.WHOLESTAGE_SPLIT_CONSUME_FUNC_BY_OPERATOR.key -> "true") {
+          SQLConf.WHOLESTAGE_SPLIT_CONSUME_FUNC_BY_OPERATOR.key -> "true") {
         val projection = Seq.tabulate(columnNum)(i => s"c$i + c$i as newC$i")
         val df = spark.read.parquet(path).selectExpr(projection: _*)
 
@@ -642,18 +642,18 @@ class WholeStageCodegenSuite extends QueryTest with SharedSparkSession
         .join(baseTable, "idx")
       assert(distinctWithId.queryExecution.executedPlan.collectFirst {
         case WholeStageCodegenExec(
-          ProjectExec(_, BroadcastHashJoinExec(_, _, _, _, _, _: HashAggregateExec, _, _))) => true
+          ProjectExec(_, BroadcastHashJoinExec(_, _, _, _, _, _: HashAggregateExec, _, _))) => true
       }.isDefined)
       checkAnswer(distinctWithId, Seq(Row(1, 0), Row(1, 0)))
 
       // BroadcastHashJoinExec with a HashAggregateExec child containing a Final mode aggregate
       // expression
       val groupByWithId =
-        baseTable.groupBy("idx").sum().withColumn("id", monotonically_increasing_id())
+        baseTable.groupBy("idx").sum().withColumn("id", monotonically_increasing_id())
         .join(baseTable, "idx")
       assert(groupByWithId.queryExecution.executedPlan.collectFirst {
         case WholeStageCodegenExec(
-          ProjectExec(_, BroadcastHashJoinExec(_, _, _, _, _, _: HashAggregateExec, _, _))) => true
+          ProjectExec(_, BroadcastHashJoinExec(_, _, _, _, _, _: HashAggregateExec, _, _))) => true
       }.isDefined)
       checkAnswer(groupByWithId, Seq(Row(1, 2, 0), Row(1, 2, 0)))
     }
@@ -682,17 +682,17 @@ class WholeStageCodegenSuite extends QueryTest with SharedSparkSession
     assert(
       executedPlan.find {
         case WholeStageCodegenExec(
-          HashAggregateExec(_, _, _, _, _, _, _: LocalTableScanExec)) => true
+          HashAggregateExec(_, _, _, _, _, _, _: LocalTableScanExec)) => true
         case _ => false
       }.isDefined,
       "LocalTableScanExec should be within a WholeStageCodegen domain.")
   }
 
   test("Give up splitting aggregate code if a parameter length goes over the limit") {
     withSQLConf(
-      SQLConf.CODEGEN_SPLIT_AGGREGATE_FUNC.key -> "true",
-      SQLConf.CODEGEN_METHOD_SPLIT_THRESHOLD.key -> "1",
-      "spark.sql.CodeGenerator.validParamLength" -> "0") {
+      SQLConf.CODEGEN_SPLIT_AGGREGATE_FUNC.key -> "true",
+      SQLConf.CODEGEN_METHOD_SPLIT_THRESHOLD.key -> "1",
+      "spark.sql.CodeGenerator.validParamLength" -> "0") {
       withTable("t") {
         val expectedErrMsg = "Failed to split aggregate code into small functions"
         Seq(
@@ -711,9 +711,9 @@ class WholeStageCodegenSuite extends QueryTest with SharedSparkSession
 
   test("Give up splitting subexpression code if a parameter length goes over the limit") {
     withSQLConf(
-      SQLConf.CODEGEN_SPLIT_AGGREGATE_FUNC.key -> "false",
-      SQLConf.CODEGEN_METHOD_SPLIT_THRESHOLD.key -> "1",
-      "spark.sql.CodeGenerator.validParamLength" -> "0") {
+      SQLConf.CODEGEN_SPLIT_AGGREGATE_FUNC.key -> "false",
+      SQLConf.CODEGEN_METHOD_SPLIT_THRESHOLD.key -> "1",
+      "spark.sql.CodeGenerator.validParamLength" -> "0") {
       withTable("t") {
         val expectedErrMsg = "Failed to split subexpression code into small functions"
         Seq(
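The test hunks above all follow the same shape: run a query, then pattern-match the executed plan to confirm the operator of interest sits directly under a WholeStageCodegenExec node. A minimal sketch of that check outside the test harness follows, assuming a local SparkSession with adaptive execution disabled; the query and object name are illustrative.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.WholeStageCodegenExec
import org.apache.spark.sql.execution.aggregate.HashAggregateExec

object CodegenPlanCheckSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("codegen-plan-check")
      .config("spark.sql.adaptive.enabled", "false")
      .getOrCreate()
    import spark.implicits._

    // A simple aggregate that whole-stage codegen normally covers.
    val df = Seq(1, 2, 2, 3).toDF("v").groupBy($"v").count()

    // Same idea as the suite: collect plan nodes where a HashAggregateExec is the
    // direct child of a WholeStageCodegenExec.
    val aggUnderCodegen = df.queryExecution.executedPlan.collect {
      case WholeStageCodegenExec(agg: HashAggregateExec) => agg
    }
    println(s"HashAggregateExec under WholeStageCodegenExec: ${aggUnderCodegen.nonEmpty}")
    spark.stop()
  }
}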

sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala

Lines changed: 3 additions & 3 deletions
@@ -657,9 +657,9 @@ class SQLMetricsSuite extends SharedSparkSession with SQLMetricsTestUtils
 
   test("SPARK-25602: SparkPlan.getByteArrayRdd should not consume the input when not necessary") {
     def checkFilterAndRangeMetrics(
-        df: DataFrame,
-        filterNumOutputs: Int,
-        rangeNumOutputs: Int): Unit = {
+        df: DataFrame,
+        filterNumOutputs: Int,
+        rangeNumOutputs: Int): Unit = {
       val plan = df.queryExecution.executedPlan
 
       val filters = collectNodeWithinWholeStage[FilterExec](plan)
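This hunk only reverts the indentation of the checkFilterAndRangeMetrics parameter list; the helper itself reads operator metrics from an executed plan. A minimal sketch of reading such a metric directly is below, assuming a local SparkSession with adaptive execution disabled; the query is illustrative, and "numOutputRows" is assumed to be FilterExec's standard metric key.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.FilterExec

object FilterMetricSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("filter-metric")
      .config("spark.sql.adaptive.enabled", "false")
      .getOrCreate()

    val df = spark.range(0, 100).filter("id % 2 = 0")
    df.collect() // execute the query so the metrics are populated

    // Walk the executed plan, find FilterExec nodes, and read their
    // "number of output rows" metric, much like the suite's helper does.
    val filterOutputRows = df.queryExecution.executedPlan.collect {
      case f: FilterExec => f.metrics("numOutputRows").value
    }
    println(s"Filter output rows: ${filterOutputRows.mkString(", ")}")
    spark.stop()
  }
}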
