From 28bf2bcdb21adb8f59f2316b8ac969a56058a8da Mon Sep 17 00:00:00 2001
From: Nong Li
Date: Wed, 10 Feb 2016 15:49:59 -0800
Subject: [PATCH 01/10] [SPARK-13250][SQL] Update PhysicalRDD to convert to UnsafeRow if using the vectorized scanner.

Some parts of the engine rely on UnsafeRow, which the vectorized parquet scanner does not want to produce. This adds a conversion in PhysicalRDD.

In the case where codegen is used (and the scan is the start of the pipeline), there is no requirement to use UnsafeRow. This patch updates PhysicalRDD to support codegen, which eliminates the need for the UnsafeRow conversion in all cases.

For TPCDS-Q19 at the 10 GB scale factor, these changes reduce the query time from 9.5 seconds to 6.5 seconds.
---
 .../spark/sql/execution/ExistingRDD.scala | 35 +++++++++++++++++--
 1 file changed, 32 insertions(+), 3 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
index cad7e25a32788..8bb82fc5ffa25 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
@@ -18,7 +18,8 @@
 package org.apache.spark.sql.execution
 
 import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.{AnalysisException, Row, SQLContext}
+import org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext
+import org.apache.spark.sql.{SQLConf, AnalysisException, Row, SQLContext}
 import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
 import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
 import org.apache.spark.sql.catalyst.expressions._
@@ -102,13 +103,15 @@ private[sql] case class PhysicalRDD(
     override val metadata: Map[String, String] = Map.empty,
     isUnsafeRow: Boolean = false,
     override val outputPartitioning: Partitioning = UnknownPartitioning(0))
-  extends LeafNode {
+  extends LeafNode with CodegenSupport {
 
   private[sql] override lazy val metrics = Map(
     "numOutputRows" -> SQLMetrics.createLongMetric(sparkContext, "number of output rows"))
 
   protected override def doExecute(): RDD[InternalRow] = {
-    val unsafeRow = if (isUnsafeRow) {
+    val conf = SQLContext.getActive().get
+    // The vectorized reader does not produce UnsafeRows. In this case we will convert.
+    val unsafeRow = if (isUnsafeRow && !conf.getConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED)) {
       rdd
     } else {
       rdd.mapPartitionsInternal { iter =>
@@ -128,6 +131,32 @@ private[sql] case class PhysicalRDD(
     val metadataEntries = for ((key, value) <- metadata.toSeq.sorted) yield s"$key: $value"
     s"Scan $nodeName${output.mkString("[", ",", "]")}${metadataEntries.mkString(" ", ", ", "")}"
   }
+
+  // Support codegen so that we can avoid the UnsafeRow conversion in all cases. Codegen
+  // never requires UnsafeRow as input.
+ override def supportCodegen: Boolean = true + + override def upstream(): RDD[InternalRow] = { + rdd + } + + override protected def doProduce(ctx: CodegenContext): String = { + val exprs = output.zipWithIndex.map(x => new BoundReference(x._2, x._1.dataType, true)) + val row = ctx.freshName("row") + ctx.INPUT_ROW = row + ctx.currentVars = null + val columns = exprs.map(_.gen(ctx)) + s""" + | while (input.hasNext()) { + | InternalRow $row = (InternalRow) input.next(); + | ${columns.map(_.code).mkString("\n").trim} + | ${consume(ctx, columns).trim} + | if (shouldStop()) { + | return; + | } + | } + """.stripMargin + } } private[sql] object PhysicalRDD { From a83038097eb42a72cef38b7684d1010f4f1244d8 Mon Sep 17 00:00:00 2001 From: Nong Li Date: Wed, 10 Feb 2016 16:42:03 -0800 Subject: [PATCH 02/10] CR --- .../apache/spark/sql/execution/ExistingRDD.scala | 16 ++++++++++------ 1 file changed, 10 insertions(+), 6 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala index 8bb82fc5ffa25..1ef342a090627 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala @@ -19,6 +19,7 @@ package org.apache.spark.sql.execution import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext +import org.apache.spark.sql.execution.datasources.parquet.ParquetRelation import org.apache.spark.sql.{SQLConf, AnalysisException, Row, SQLContext} import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow} import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation @@ -132,14 +133,12 @@ private[sql] case class PhysicalRDD( s"Scan $nodeName${output.mkString("[", ",", "]")}${metadataEntries.mkString(" ", ", ", "")}" } - // Support codegen so that we can avoid the UnsafeRow conversion in all cases. Codegen - // never requires UnsafeRow as input. - override def supportCodegen: Boolean = true - override def upstream(): RDD[InternalRow] = { rdd } + // Support codegen so that we can avoid the UnsafeRow conversion in all cases. Codegen + // never requires UnsafeRow as input. override protected def doProduce(ctx: CodegenContext): String = { val exprs = output.zipWithIndex.map(x => new BoundReference(x._2, x._1.dataType, true)) val row = ctx.freshName("row") @@ -169,8 +168,13 @@ private[sql] object PhysicalRDD { rdd: RDD[InternalRow], relation: BaseRelation, metadata: Map[String, String] = Map.empty): PhysicalRDD = { - // All HadoopFsRelations output UnsafeRows - val outputUnsafeRows = relation.isInstanceOf[HadoopFsRelation] + val outputUnsafeRows = if (relation.isInstanceOf[ParquetRelation]) { + // The vectorized parquet reader does not produce unsafe rows. + !SQLContext.getActive().get.conf.getConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED) + } else { + // All HadoopFsRelations output UnsafeRows + relation.isInstanceOf[HadoopFsRelation] + } val bucketSpec = relation match { case r: HadoopFsRelation => r.getBucketSpec From 52033aef2b26f13c81ddece95586f11318777cc1 Mon Sep 17 00:00:00 2001 From: Nong Li Date: Wed, 10 Feb 2016 17:04:58 -0800 Subject: [PATCH 03/10] Fix import ordering. 
--- .../scala/org/apache/spark/sql/execution/ExistingRDD.scala | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala index 1ef342a090627..9e579030105fa 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala @@ -18,15 +18,15 @@ package org.apache.spark.sql.execution import org.apache.spark.rdd.RDD -import org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext -import org.apache.spark.sql.execution.datasources.parquet.ParquetRelation -import org.apache.spark.sql.{SQLConf, AnalysisException, Row, SQLContext} +import org.apache.spark.sql.{AnalysisException, Row, SQLConf, SQLContext} import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow} import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Statistics} import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partitioning, UnknownPartitioning} import org.apache.spark.sql.execution.metric.SQLMetrics +import org.apache.spark.sql.execution.datasources.parquet.ParquetRelation import org.apache.spark.sql.sources.{BaseRelation, HadoopFsRelation} import org.apache.spark.sql.types.DataType From 6263b17859aceb3e4e47037b5c8e650df1652c2b Mon Sep 17 00:00:00 2001 From: Nong Li Date: Wed, 10 Feb 2016 21:01:20 -0800 Subject: [PATCH 04/10] Fix tests. --- .../sql/execution/WholeStageCodegen.scala | 1 + .../spark/sql/sources/BucketedReadSuite.scala | 46 ++++++++++--------- 2 files changed, 25 insertions(+), 22 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegen.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegen.scala index 08c52e5f4399f..afaddcf35775c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegen.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegen.scala @@ -42,6 +42,7 @@ trait CodegenSupport extends SparkPlan { case _: TungstenAggregate => "agg" case _: BroadcastHashJoin => "bhj" case _: SortMergeJoin => "smj" + case _: PhysicalRDD => "rdd" case _ => nodeName.toLowerCase } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala index 0ef42f45e3b78..62f6b998e6f2a 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala @@ -74,32 +74,34 @@ class BucketedReadSuite extends QueryTest with SQLTestUtils with TestHiveSinglet bucketValues: Seq[Integer], filterCondition: Column, originalDataFrame: DataFrame): Unit = { + // This test verifies parts of the plan. Disable whole stage codegen. 
+ withSQLConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "false") { + val bucketedDataFrame = hiveContext.table("bucketed_table").select("i", "j", "k") + val BucketSpec(numBuckets, bucketColumnNames, _) = bucketSpec + // Limit: bucket pruning only works when the bucket column has one and only one column + assert(bucketColumnNames.length == 1) + val bucketColumnIndex = bucketedDataFrame.schema.fieldIndex(bucketColumnNames.head) + val bucketColumn = bucketedDataFrame.schema.toAttributes(bucketColumnIndex) + val matchedBuckets = new BitSet(numBuckets) + bucketValues.foreach { value => + matchedBuckets.set(DataSourceStrategy.getBucketId(bucketColumn, numBuckets, value)) + } - val bucketedDataFrame = hiveContext.table("bucketed_table").select("i", "j", "k") - val BucketSpec(numBuckets, bucketColumnNames, _) = bucketSpec - // Limit: bucket pruning only works when the bucket column has one and only one column - assert(bucketColumnNames.length == 1) - val bucketColumnIndex = bucketedDataFrame.schema.fieldIndex(bucketColumnNames.head) - val bucketColumn = bucketedDataFrame.schema.toAttributes(bucketColumnIndex) - val matchedBuckets = new BitSet(numBuckets) - bucketValues.foreach { value => - matchedBuckets.set(DataSourceStrategy.getBucketId(bucketColumn, numBuckets, value)) - } + // Filter could hide the bug in bucket pruning. Thus, skipping all the filters + val plan = bucketedDataFrame.filter(filterCondition).queryExecution.executedPlan + val rdd = plan.find(_.isInstanceOf[PhysicalRDD]) + assert(rdd.isDefined, plan) - // Filter could hide the bug in bucket pruning. Thus, skipping all the filters - val rdd = bucketedDataFrame.filter(filterCondition).queryExecution.executedPlan - .find(_.isInstanceOf[PhysicalRDD]) - assert(rdd.isDefined) + val checkedResult = rdd.get.execute().mapPartitionsWithIndex { case (index, iter) => + if (matchedBuckets.get(index % numBuckets)) Iterator(true) else Iterator(iter.isEmpty) + } + // checking if all the pruned buckets are empty + assert(checkedResult.collect().forall(_ == true)) - val checkedResult = rdd.get.execute().mapPartitionsWithIndex { case (index, iter) => - if (matchedBuckets.get(index % numBuckets)) Iterator(true) else Iterator(iter.isEmpty) + checkAnswer( + bucketedDataFrame.filter(filterCondition).orderBy("i", "j", "k"), + originalDataFrame.filter(filterCondition).orderBy("i", "j", "k")) } - // checking if all the pruned buckets are empty - assert(checkedResult.collect().forall(_ == true)) - - checkAnswer( - bucketedDataFrame.filter(filterCondition).orderBy("i", "j", "k"), - originalDataFrame.filter(filterCondition).orderBy("i", "j", "k")) } test("read partitioning bucketed tables with bucket pruning filters") { From 04c81f42ff9f19894aede7586a97117d065bf1df Mon Sep 17 00:00:00 2001 From: Nong Li Date: Fri, 12 Feb 2016 15:54:53 -0500 Subject: [PATCH 05/10] Fix a few tests that check query plan to disable whole stage codegen. 
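
These suites assert on the exact shape of the executed plan, and with whole-stage codegen enabled the scan is wrapped in a WholeStageCodegen node, so the flag is pinned to false for the duration of the check. A minimal sketch of the pattern (illustrative only; it assumes a suite that mixes in SQLTestUtils for withSQLConf and imports org.apache.spark.sql.execution.PhysicalRDD and SQLConf, and the query and table name are made up). The suites changed below do the equivalent with setConf in a try/finally because they go through a separate caseInsensitiveContext:

    withSQLConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "false") {
      // With codegen disabled, the scan shows up directly as a PhysicalRDD leaf
      // rather than being hidden inside a WholeStageCodegen node.
      val plan = sqlContext.sql("SELECT a FROM t WHERE a < 10").queryExecution.executedPlan
      assert(plan.collect { case p: PhysicalRDD => p }.nonEmpty)
    }
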
--- .../spark/sql/sources/FilteredScanSuite.scala | 54 +++++++++++-------- .../spark/sql/sources/PrunedScanSuite.scala | 45 +++++++++------- 2 files changed, 57 insertions(+), 42 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/FilteredScanSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/FilteredScanSuite.scala index 7196b6dc13394..4b578a6c10ec4 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/sources/FilteredScanSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/FilteredScanSuite.scala @@ -304,30 +304,38 @@ class FilteredScanSuite extends DataSourceTest with SharedSQLContext with Predic expectedCount: Int, requiredColumnNames: Set[String], expectedUnhandledFilters: Set[Filter]): Unit = { + test(s"PushDown Returns $expectedCount: $sqlString") { - val queryExecution = sql(sqlString).queryExecution - val rawPlan = queryExecution.executedPlan.collect { - case p: execution.PhysicalRDD => p - } match { - case Seq(p) => p - case _ => fail(s"More than one PhysicalRDD found\n$queryExecution") - } - val rawCount = rawPlan.execute().count() - assert(ColumnsRequired.set === requiredColumnNames) - - val table = caseInsensitiveContext.table("oneToTenFiltered") - val relation = table.queryExecution.logical.collectFirst { - case LogicalRelation(r, _, _) => r - }.get - - assert( - relation.unhandledFilters(FiltersPushed.list.toArray).toSet === expectedUnhandledFilters) - - if (rawCount != expectedCount) { - fail( - s"Wrong # of results for pushed filter. Got $rawCount, Expected $expectedCount\n" + - s"Filters pushed: ${FiltersPushed.list.mkString(",")}\n" + - queryExecution) + // These tests check a particular plan, disable whole stage codegen. + caseInsensitiveContext.conf.setConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED, false) + try { + val queryExecution = sql(sqlString).queryExecution + val rawPlan = queryExecution.executedPlan.collect { + case p: execution.PhysicalRDD => p + } match { + case Seq(p) => p + case _ => fail(s"More than one PhysicalRDD found\n$queryExecution") + } + val rawCount = rawPlan.execute().count() + assert(ColumnsRequired.set === requiredColumnNames) + + val table = caseInsensitiveContext.table("oneToTenFiltered") + val relation = table.queryExecution.logical.collectFirst { + case LogicalRelation(r, _, _) => r + }.get + + assert( + relation.unhandledFilters(FiltersPushed.list.toArray).toSet === expectedUnhandledFilters) + + if (rawCount != expectedCount) { + fail( + s"Wrong # of results for pushed filter. 
Got $rawCount, Expected $expectedCount\n" + + s"Filters pushed: ${FiltersPushed.list.mkString(",")}\n" + + queryExecution) + } + } finally { + caseInsensitiveContext.conf.setConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED, + SQLConf.WHOLESTAGE_CODEGEN_ENABLED.defaultValue.get) } } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/PrunedScanSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/PrunedScanSuite.scala index a89c5f8007e78..02e8ab1c51746 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/sources/PrunedScanSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/PrunedScanSuite.scala @@ -117,28 +117,35 @@ class PrunedScanSuite extends DataSourceTest with SharedSQLContext { def testPruning(sqlString: String, expectedColumns: String*): Unit = { test(s"Columns output ${expectedColumns.mkString(",")}: $sqlString") { - val queryExecution = sql(sqlString).queryExecution - val rawPlan = queryExecution.executedPlan.collect { - case p: execution.PhysicalRDD => p - } match { - case Seq(p) => p - case _ => fail(s"More than one PhysicalRDD found\n$queryExecution") - } - val rawColumns = rawPlan.output.map(_.name) - val rawOutput = rawPlan.execute().first() - - if (rawColumns != expectedColumns) { - fail( - s"Wrong column names. Got $rawColumns, Expected $expectedColumns\n" + - s"Filters pushed: ${FiltersPushed.list.mkString(",")}\n" + - queryExecution) - } - if (rawOutput.numFields != expectedColumns.size) { - fail(s"Wrong output row. Got $rawOutput\n$queryExecution") + // These tests check a particular plan, disable whole stage codegen. + caseInsensitiveContext.conf.setConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED, false) + try { + val queryExecution = sql(sqlString).queryExecution + val rawPlan = queryExecution.executedPlan.collect { + case p: execution.PhysicalRDD => p + } match { + case Seq(p) => p + case _ => fail(s"More than one PhysicalRDD found\n$queryExecution") + } + val rawColumns = rawPlan.output.map(_.name) + val rawOutput = rawPlan.execute().first() + + if (rawColumns != expectedColumns) { + fail( + s"Wrong column names. Got $rawColumns, Expected $expectedColumns\n" + + s"Filters pushed: ${FiltersPushed.list.mkString(",")}\n" + + queryExecution) + } + + if (rawOutput.numFields != expectedColumns.size) { + fail(s"Wrong output row. Got $rawOutput\n$queryExecution") + } + } finally { + caseInsensitiveContext.conf.setConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED, + SQLConf.WHOLESTAGE_CODEGEN_ENABLED.defaultValue.get) } } } - } From 3d918da737ac1ed9e3d416cd3befa2a7a13838d7 Mon Sep 17 00:00:00 2001 From: Nong Li Date: Mon, 22 Feb 2016 18:05:26 -0800 Subject: [PATCH 06/10] Fix python plan test. 
--- python/pyspark/sql/dataframe.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/python/pyspark/sql/dataframe.py b/python/pyspark/sql/dataframe.py index bf43452e08c3b..7275e69353485 100644 --- a/python/pyspark/sql/dataframe.py +++ b/python/pyspark/sql/dataframe.py @@ -173,7 +173,8 @@ def explain(self, extended=False): >>> df.explain() == Physical Plan == - Scan ExistingRDD[age#0,name#1] + WholeStageCodegen + : +- Scan ExistingRDD[age#0,name#1] >>> df.explain(True) == Parsed Logical Plan == From 54d7b6c097a227f91b72f709477fb159093371c6 Mon Sep 17 00:00:00 2001 From: Nong Li Date: Tue, 23 Feb 2016 11:57:40 -0800 Subject: [PATCH 07/10] Merge fixes --- .../org/apache/spark/sql/execution/ExistingRDD.scala | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala index 9e579030105fa..ab0841e657840 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala @@ -25,8 +25,8 @@ import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Statistics} import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partitioning, UnknownPartitioning} -import org.apache.spark.sql.execution.metric.SQLMetrics import org.apache.spark.sql.execution.datasources.parquet.ParquetRelation +import org.apache.spark.sql.execution.metric.SQLMetrics import org.apache.spark.sql.sources.{BaseRelation, HadoopFsRelation} import org.apache.spark.sql.types.DataType @@ -110,9 +110,7 @@ private[sql] case class PhysicalRDD( "numOutputRows" -> SQLMetrics.createLongMetric(sparkContext, "number of output rows")) protected override def doExecute(): RDD[InternalRow] = { - val conf = SQLContext.getActive().get - // The vectorized reader does not produce UnsafeRows. In this case we will convert. - val unsafeRow = if (isUnsafeRow && !conf.getConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED)) { + val unsafeRow = if (isUnsafeRow) { rdd } else { rdd.mapPartitionsInternal { iter => @@ -142,12 +140,14 @@ private[sql] case class PhysicalRDD( override protected def doProduce(ctx: CodegenContext): String = { val exprs = output.zipWithIndex.map(x => new BoundReference(x._2, x._1.dataType, true)) val row = ctx.freshName("row") + val numOutputRows = metricTerm(ctx, "numOutputRows") ctx.INPUT_ROW = row ctx.currentVars = null val columns = exprs.map(_.gen(ctx)) s""" | while (input.hasNext()) { | InternalRow $row = (InternalRow) input.next(); + | $numOutputRows.add(1); | ${columns.map(_.code).mkString("\n").trim} | ${consume(ctx, columns).trim} | if (shouldStop()) { From 708716cf58b4666889a3671f747d56abdcdadcec Mon Sep 17 00:00:00 2001 From: Nong Li Date: Tue, 23 Feb 2016 16:18:12 -0800 Subject: [PATCH 08/10] Update test. 
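
The pushdown check in JDBCSuite previously expected the scan to be the root of the executed plan; with whole-stage codegen on by default the root is now a WholeStageCodegen node whose child is the PhysicalRDD, so the assertion unwraps one level. Roughly, the shape being checked looks like the sketch below (illustrative only; df stands for the DataFrame under test):

    df.queryExecution.executedPlan match {
      // The generated stage wraps the scan; the JDBC scan itself is still a PhysicalRDD.
      case ws: org.apache.spark.sql.execution.WholeStageCodegen =>
        assert(ws.plan.isInstanceOf[org.apache.spark.sql.execution.PhysicalRDD])
        assert(ws.plan.nodeName.contains("JDBCRelation"))
      case other =>
        fail(s"Expected WholeStageCodegen at the root of the plan, got $other")
    }
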
--- .../test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala index 7a0f7abaa1baf..f8a9a95c873ad 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala @@ -187,8 +187,10 @@ class JDBCSuite extends SparkFunSuite val parentPlan = df.queryExecution.executedPlan // Check if SparkPlan Filter is removed in a physical plan and // the plan only has PhysicalRDD to scan JDBCRelation. - assert(parentPlan.isInstanceOf[PhysicalRDD]) - assert(parentPlan.asInstanceOf[PhysicalRDD].nodeName.contains("JDBCRelation")) + assert(parentPlan.isInstanceOf[org.apache.spark.sql.execution.WholeStageCodegen]) + val node = parentPlan.asInstanceOf[org.apache.spark.sql.execution.WholeStageCodegen] + assert(node.plan.isInstanceOf[org.apache.spark.sql.execution.PhysicalRDD]) + assert(node.plan.asInstanceOf[PhysicalRDD].nodeName.contains("JDBCRelation")) df } assert(checkPushdown(sql("SELECT * FROM foobar WHERE THEID < 1")).collect().size == 0) From 955e9e4673848d4783e46eb594d3f3f3d9fbc8cc Mon Sep 17 00:00:00 2001 From: Nong Li Date: Wed, 24 Feb 2016 10:28:22 -0800 Subject: [PATCH 09/10] Merge fix --- .../scala/org/apache/spark/sql/execution/ExistingRDD.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala index ab0841e657840..d22ba9f684142 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala @@ -131,8 +131,8 @@ private[sql] case class PhysicalRDD( s"Scan $nodeName${output.mkString("[", ",", "]")}${metadataEntries.mkString(" ", ", ", "")}" } - override def upstream(): RDD[InternalRow] = { - rdd + override def upstreams(): Seq[RDD[InternalRow]] = { + rdd :: Nil } // Support codegen so that we can avoid the UnsafeRow conversion in all cases. Codegen From 731c30e1d6653f050d480f7fd8be60e54cd6ffff Mon Sep 17 00:00:00 2001 From: Nong Li Date: Wed, 24 Feb 2016 11:07:53 -0800 Subject: [PATCH 10/10] Merge conflicts. --- .../org/apache/spark/sql/execution/ExistingRDD.scala | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala index d22ba9f684142..8649d2d69b62e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala @@ -138,6 +138,10 @@ private[sql] case class PhysicalRDD( // Support codegen so that we can avoid the UnsafeRow conversion in all cases. Codegen // never requires UnsafeRow as input. 
override protected def doProduce(ctx: CodegenContext): String = { + val input = ctx.freshName("input") + // PhysicalRDD always just has one input + ctx.addMutableState("scala.collection.Iterator", input, s"$input = inputs[0];") + val exprs = output.zipWithIndex.map(x => new BoundReference(x._2, x._1.dataType, true)) val row = ctx.freshName("row") val numOutputRows = metricTerm(ctx, "numOutputRows") @@ -145,8 +149,8 @@ private[sql] case class PhysicalRDD( ctx.currentVars = null val columns = exprs.map(_.gen(ctx)) s""" - | while (input.hasNext()) { - | InternalRow $row = (InternalRow) input.next(); + | while ($input.hasNext()) { + | InternalRow $row = (InternalRow) $input.next(); | $numOutputRows.add(1); | ${columns.map(_.code).mkString("\n").trim} | ${consume(ctx, columns).trim}