diff --git a/python/pyspark/sql/dataframe.py b/python/pyspark/sql/dataframe.py index bf43452e08c3b..7275e69353485 100644 --- a/python/pyspark/sql/dataframe.py +++ b/python/pyspark/sql/dataframe.py @@ -173,7 +173,8 @@ def explain(self, extended=False): >>> df.explain() == Physical Plan == - Scan ExistingRDD[age#0,name#1] + WholeStageCodegen + : +- Scan ExistingRDD[age#0,name#1] >>> df.explain(True) == Parsed Logical Plan == diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala index cad7e25a32788..8649d2d69b62e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala @@ -18,12 +18,14 @@ package org.apache.spark.sql.execution import org.apache.spark.rdd.RDD -import org.apache.spark.sql.{AnalysisException, Row, SQLContext} +import org.apache.spark.sql.{AnalysisException, Row, SQLConf, SQLContext} import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow} import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Statistics} import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partitioning, UnknownPartitioning} +import org.apache.spark.sql.execution.datasources.parquet.ParquetRelation import org.apache.spark.sql.execution.metric.SQLMetrics import org.apache.spark.sql.sources.{BaseRelation, HadoopFsRelation} import org.apache.spark.sql.types.DataType @@ -102,7 +104,7 @@ private[sql] case class PhysicalRDD( override val metadata: Map[String, String] = Map.empty, isUnsafeRow: Boolean = false, override val outputPartitioning: Partitioning = UnknownPartitioning(0)) - extends LeafNode { + extends LeafNode with CodegenSupport { private[sql] override lazy val metrics = Map( "numOutputRows" -> SQLMetrics.createLongMetric(sparkContext, "number of output rows")) @@ -128,6 +130,36 @@ private[sql] case class PhysicalRDD( val metadataEntries = for ((key, value) <- metadata.toSeq.sorted) yield s"$key: $value" s"Scan $nodeName${output.mkString("[", ",", "]")}${metadataEntries.mkString(" ", ", ", "")}" } + + override def upstreams(): Seq[RDD[InternalRow]] = { + rdd :: Nil + } + + // Support codegen so that we can avoid the UnsafeRow conversion in all cases. Codegen + // never requires UnsafeRow as input. + override protected def doProduce(ctx: CodegenContext): String = { + val input = ctx.freshName("input") + // PhysicalRDD always just has one input + ctx.addMutableState("scala.collection.Iterator", input, s"$input = inputs[0];") + + val exprs = output.zipWithIndex.map(x => new BoundReference(x._2, x._1.dataType, true)) + val row = ctx.freshName("row") + val numOutputRows = metricTerm(ctx, "numOutputRows") + ctx.INPUT_ROW = row + ctx.currentVars = null + val columns = exprs.map(_.gen(ctx)) + s""" + | while ($input.hasNext()) { + | InternalRow $row = (InternalRow) $input.next(); + | $numOutputRows.add(1); + | ${columns.map(_.code).mkString("\n").trim} + | ${consume(ctx, columns).trim} + | if (shouldStop()) { + | return; + | } + | } + """.stripMargin + } } private[sql] object PhysicalRDD { @@ -140,8 +172,13 @@ private[sql] object PhysicalRDD { rdd: RDD[InternalRow], relation: BaseRelation, metadata: Map[String, String] = Map.empty): PhysicalRDD = { - // All HadoopFsRelations output UnsafeRows - val outputUnsafeRows = relation.isInstanceOf[HadoopFsRelation] + val outputUnsafeRows = if (relation.isInstanceOf[ParquetRelation]) { + // The vectorized parquet reader does not produce unsafe rows. + !SQLContext.getActive().get.conf.getConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED) + } else { + // All HadoopFsRelations output UnsafeRows + relation.isInstanceOf[HadoopFsRelation] + } val bucketSpec = relation match { case r: HadoopFsRelation => r.getBucketSpec diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegen.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegen.scala index 08c52e5f4399f..afaddcf35775c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegen.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegen.scala @@ -42,6 +42,7 @@ trait CodegenSupport extends SparkPlan { case _: TungstenAggregate => "agg" case _: BroadcastHashJoin => "bhj" case _: SortMergeJoin => "smj" + case _: PhysicalRDD => "rdd" case _ => nodeName.toLowerCase } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala index 7a0f7abaa1baf..f8a9a95c873ad 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala @@ -187,8 +187,10 @@ class JDBCSuite extends SparkFunSuite val parentPlan = df.queryExecution.executedPlan // Check if SparkPlan Filter is removed in a physical plan and // the plan only has PhysicalRDD to scan JDBCRelation. - assert(parentPlan.isInstanceOf[PhysicalRDD]) - assert(parentPlan.asInstanceOf[PhysicalRDD].nodeName.contains("JDBCRelation")) + assert(parentPlan.isInstanceOf[org.apache.spark.sql.execution.WholeStageCodegen]) + val node = parentPlan.asInstanceOf[org.apache.spark.sql.execution.WholeStageCodegen] + assert(node.plan.isInstanceOf[org.apache.spark.sql.execution.PhysicalRDD]) + assert(node.plan.asInstanceOf[PhysicalRDD].nodeName.contains("JDBCRelation")) df } assert(checkPushdown(sql("SELECT * FROM foobar WHERE THEID < 1")).collect().size == 0) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/FilteredScanSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/FilteredScanSuite.scala index 7196b6dc13394..4b578a6c10ec4 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/sources/FilteredScanSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/FilteredScanSuite.scala @@ -304,30 +304,38 @@ class FilteredScanSuite extends DataSourceTest with SharedSQLContext with Predic expectedCount: Int, requiredColumnNames: Set[String], expectedUnhandledFilters: Set[Filter]): Unit = { + test(s"PushDown Returns $expectedCount: $sqlString") { - val queryExecution = sql(sqlString).queryExecution - val rawPlan = queryExecution.executedPlan.collect { - case p: execution.PhysicalRDD => p - } match { - case Seq(p) => p - case _ => fail(s"More than one PhysicalRDD found\n$queryExecution") - } - val rawCount = rawPlan.execute().count() - assert(ColumnsRequired.set === requiredColumnNames) - - val table = caseInsensitiveContext.table("oneToTenFiltered") - val relation = table.queryExecution.logical.collectFirst { - case LogicalRelation(r, _, _) => r - }.get - - assert( - relation.unhandledFilters(FiltersPushed.list.toArray).toSet === expectedUnhandledFilters) - - if (rawCount != expectedCount) { - fail( - s"Wrong # of results for pushed filter. Got $rawCount, Expected $expectedCount\n" + - s"Filters pushed: ${FiltersPushed.list.mkString(",")}\n" + - queryExecution) + // These tests check a particular plan, disable whole stage codegen. + caseInsensitiveContext.conf.setConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED, false) + try { + val queryExecution = sql(sqlString).queryExecution + val rawPlan = queryExecution.executedPlan.collect { + case p: execution.PhysicalRDD => p + } match { + case Seq(p) => p + case _ => fail(s"More than one PhysicalRDD found\n$queryExecution") + } + val rawCount = rawPlan.execute().count() + assert(ColumnsRequired.set === requiredColumnNames) + + val table = caseInsensitiveContext.table("oneToTenFiltered") + val relation = table.queryExecution.logical.collectFirst { + case LogicalRelation(r, _, _) => r + }.get + + assert( + relation.unhandledFilters(FiltersPushed.list.toArray).toSet === expectedUnhandledFilters) + + if (rawCount != expectedCount) { + fail( + s"Wrong # of results for pushed filter. Got $rawCount, Expected $expectedCount\n" + + s"Filters pushed: ${FiltersPushed.list.mkString(",")}\n" + + queryExecution) + } + } finally { + caseInsensitiveContext.conf.setConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED, + SQLConf.WHOLESTAGE_CODEGEN_ENABLED.defaultValue.get) } } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/PrunedScanSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/PrunedScanSuite.scala index a89c5f8007e78..02e8ab1c51746 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/sources/PrunedScanSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/PrunedScanSuite.scala @@ -117,28 +117,35 @@ class PrunedScanSuite extends DataSourceTest with SharedSQLContext { def testPruning(sqlString: String, expectedColumns: String*): Unit = { test(s"Columns output ${expectedColumns.mkString(",")}: $sqlString") { - val queryExecution = sql(sqlString).queryExecution - val rawPlan = queryExecution.executedPlan.collect { - case p: execution.PhysicalRDD => p - } match { - case Seq(p) => p - case _ => fail(s"More than one PhysicalRDD found\n$queryExecution") - } - val rawColumns = rawPlan.output.map(_.name) - val rawOutput = rawPlan.execute().first() - - if (rawColumns != expectedColumns) { - fail( - s"Wrong column names. Got $rawColumns, Expected $expectedColumns\n" + - s"Filters pushed: ${FiltersPushed.list.mkString(",")}\n" + - queryExecution) - } - if (rawOutput.numFields != expectedColumns.size) { - fail(s"Wrong output row. Got $rawOutput\n$queryExecution") + // These tests check a particular plan, disable whole stage codegen. + caseInsensitiveContext.conf.setConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED, false) + try { + val queryExecution = sql(sqlString).queryExecution + val rawPlan = queryExecution.executedPlan.collect { + case p: execution.PhysicalRDD => p + } match { + case Seq(p) => p + case _ => fail(s"More than one PhysicalRDD found\n$queryExecution") + } + val rawColumns = rawPlan.output.map(_.name) + val rawOutput = rawPlan.execute().first() + + if (rawColumns != expectedColumns) { + fail( + s"Wrong column names. Got $rawColumns, Expected $expectedColumns\n" + + s"Filters pushed: ${FiltersPushed.list.mkString(",")}\n" + + queryExecution) + } + + if (rawOutput.numFields != expectedColumns.size) { + fail(s"Wrong output row. Got $rawOutput\n$queryExecution") + } + } finally { + caseInsensitiveContext.conf.setConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED, + SQLConf.WHOLESTAGE_CODEGEN_ENABLED.defaultValue.get) } } } - } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala index 0ef42f45e3b78..62f6b998e6f2a 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala @@ -74,32 +74,34 @@ class BucketedReadSuite extends QueryTest with SQLTestUtils with TestHiveSinglet bucketValues: Seq[Integer], filterCondition: Column, originalDataFrame: DataFrame): Unit = { + // This test verifies parts of the plan. Disable whole stage codegen. + withSQLConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "false") { + val bucketedDataFrame = hiveContext.table("bucketed_table").select("i", "j", "k") + val BucketSpec(numBuckets, bucketColumnNames, _) = bucketSpec + // Limit: bucket pruning only works when the bucket column has one and only one column + assert(bucketColumnNames.length == 1) + val bucketColumnIndex = bucketedDataFrame.schema.fieldIndex(bucketColumnNames.head) + val bucketColumn = bucketedDataFrame.schema.toAttributes(bucketColumnIndex) + val matchedBuckets = new BitSet(numBuckets) + bucketValues.foreach { value => + matchedBuckets.set(DataSourceStrategy.getBucketId(bucketColumn, numBuckets, value)) + } - val bucketedDataFrame = hiveContext.table("bucketed_table").select("i", "j", "k") - val BucketSpec(numBuckets, bucketColumnNames, _) = bucketSpec - // Limit: bucket pruning only works when the bucket column has one and only one column - assert(bucketColumnNames.length == 1) - val bucketColumnIndex = bucketedDataFrame.schema.fieldIndex(bucketColumnNames.head) - val bucketColumn = bucketedDataFrame.schema.toAttributes(bucketColumnIndex) - val matchedBuckets = new BitSet(numBuckets) - bucketValues.foreach { value => - matchedBuckets.set(DataSourceStrategy.getBucketId(bucketColumn, numBuckets, value)) - } + // Filter could hide the bug in bucket pruning. Thus, skipping all the filters + val plan = bucketedDataFrame.filter(filterCondition).queryExecution.executedPlan + val rdd = plan.find(_.isInstanceOf[PhysicalRDD]) + assert(rdd.isDefined, plan) - // Filter could hide the bug in bucket pruning. Thus, skipping all the filters - val rdd = bucketedDataFrame.filter(filterCondition).queryExecution.executedPlan - .find(_.isInstanceOf[PhysicalRDD]) - assert(rdd.isDefined) + val checkedResult = rdd.get.execute().mapPartitionsWithIndex { case (index, iter) => + if (matchedBuckets.get(index % numBuckets)) Iterator(true) else Iterator(iter.isEmpty) + } + // checking if all the pruned buckets are empty + assert(checkedResult.collect().forall(_ == true)) - val checkedResult = rdd.get.execute().mapPartitionsWithIndex { case (index, iter) => - if (matchedBuckets.get(index % numBuckets)) Iterator(true) else Iterator(iter.isEmpty) + checkAnswer( + bucketedDataFrame.filter(filterCondition).orderBy("i", "j", "k"), + originalDataFrame.filter(filterCondition).orderBy("i", "j", "k")) } - // checking if all the pruned buckets are empty - assert(checkedResult.collect().forall(_ == true)) - - checkAnswer( - bucketedDataFrame.filter(filterCondition).orderBy("i", "j", "k"), - originalDataFrame.filter(filterCondition).orderBy("i", "j", "k")) } test("read partitioning bucketed tables with bucket pruning filters") {