3 changes: 2 additions & 1 deletion python/pyspark/sql/dataframe.py
@@ -173,7 +173,8 @@ def explain(self, extended=False):

>>> df.explain()
== Physical Plan ==
Scan ExistingRDD[age#0,name#1]
WholeStageCodegen
: +- Scan ExistingRDD[age#0,name#1]

>>> df.explain(True)
== Parsed Logical Plan ==
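The doctest change above reflects the new default plan shape: with whole-stage codegen on, the scan is compiled into a WholeStageCodegen node instead of appearing as a bare Scan. A minimal sketch of how to recover the old single-line output, assuming the Spark 2.0-era conf key behind SQLConf.WHOLESTAGE_CODEGEN_ENABLED:

    // Sketch only: the conf key name is an assumption.
    sqlContext.setConf("spark.sql.codegen.wholeStage", "false")
    df.explain()
    // == Physical Plan ==
    // Scan ExistingRDD[age#0,name#1]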
sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
@@ -18,12 +18,14 @@
package org.apache.spark.sql.execution

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{AnalysisException, Row, SQLContext}
import org.apache.spark.sql.{AnalysisException, Row, SQLConf, SQLContext}
import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Statistics}
import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partitioning, UnknownPartitioning}
import org.apache.spark.sql.execution.datasources.parquet.ParquetRelation
import org.apache.spark.sql.execution.metric.SQLMetrics
import org.apache.spark.sql.sources.{BaseRelation, HadoopFsRelation}
import org.apache.spark.sql.types.DataType
@@ -102,7 +104,7 @@ private[sql] case class PhysicalRDD(
override val metadata: Map[String, String] = Map.empty,
isUnsafeRow: Boolean = false,
override val outputPartitioning: Partitioning = UnknownPartitioning(0))
extends LeafNode {
extends LeafNode with CodegenSupport {

private[sql] override lazy val metrics = Map(
"numOutputRows" -> SQLMetrics.createLongMetric(sparkContext, "number of output rows"))
@@ -128,6 +130,36 @@ private[sql] case class PhysicalRDD(
val metadataEntries = for ((key, value) <- metadata.toSeq.sorted) yield s"$key: $value"
s"Scan $nodeName${output.mkString("[", ",", "]")}${metadataEntries.mkString(" ", ", ", "")}"
}

override def upstreams(): Seq[RDD[InternalRow]] = {
rdd :: Nil
}

// Support codegen so that we can avoid the UnsafeRow conversion in all cases. Codegen
// never requires UnsafeRow as input.
override protected def doProduce(ctx: CodegenContext): String = {
val input = ctx.freshName("input")
// PhysicalRDD always just has one input
ctx.addMutableState("scala.collection.Iterator", input, s"$input = inputs[0];")

val exprs = output.zipWithIndex.map(x => new BoundReference(x._2, x._1.dataType, true))
val row = ctx.freshName("row")
val numOutputRows = metricTerm(ctx, "numOutputRows")
ctx.INPUT_ROW = row
ctx.currentVars = null
val columns = exprs.map(_.gen(ctx))
s"""
| while ($input.hasNext()) {
| InternalRow $row = (InternalRow) $input.next();
| $numOutputRows.add(1);
| ${columns.map(_.code).mkString("\n").trim}
| ${consume(ctx, columns).trim}
| if (shouldStop()) {
| return;
| }
| }
""".stripMargin
}
}
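For a concrete feel of what doProduce emits: given a two-column schema such as (age: Int, name: String), the template above expands to roughly the fragment below inside the generated processNext() method. This is an illustration only; the scan_* identifiers come from ctx.freshName, and the null-checking code that BoundReference generates for nullable columns is omitted, so every name here is an assumption.

    // Illustration of the generated Java, held in a Scala string for reference:
    val generatedSketch: String =
      """while (scan_input.hasNext()) {
        |  InternalRow scan_row = (InternalRow) scan_input.next();
        |  scan_numOutputRows.add(1);
        |  int scan_value = scan_row.getInt(0);                 // BoundReference 0
        |  UTF8String scan_value1 = scan_row.getUTF8String(1);  // BoundReference 1
        |  /* consume(): the parent operator's generated code is spliced in here */
        |  if (shouldStop()) { return; }
        |}""".stripMargin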

private[sql] object PhysicalRDD {
@@ -140,8 +172,13 @@ private[sql] object PhysicalRDD {
rdd: RDD[InternalRow],
relation: BaseRelation,
metadata: Map[String, String] = Map.empty): PhysicalRDD = {
// All HadoopFsRelations output UnsafeRows
val outputUnsafeRows = relation.isInstanceOf[HadoopFsRelation]
val outputUnsafeRows = if (relation.isInstanceOf[ParquetRelation]) {
// The vectorized parquet reader does not produce unsafe rows.
!SQLContext.getActive().get.conf.getConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED)
} else {
// All HadoopFsRelations output UnsafeRows
relation.isInstanceOf[HadoopFsRelation]
}

val bucketSpec = relation match {
case r: HadoopFsRelation => r.getBucketSpec
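The ParquetRelation special case above matters because the vectorized reader hands back rows that are not UnsafeRows, so the scan must not advertise unsafe output while that reader is enabled. A hedged sketch of the interaction, assuming the conf key behind SQLConf.PARQUET_VECTORIZED_READER_ENABLED:

    // Sketch only: the key name is an assumption. With the vectorized reader
    // off, Parquet scans fall back to the unsafe-row path taken by the other
    // HadoopFsRelations.
    sqlContext.setConf("spark.sql.parquet.enableVectorizedReader", "false")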
sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegen.scala
@@ -42,6 +42,7 @@ trait CodegenSupport extends SparkPlan {
case _: TungstenAggregate => "agg"
case _: BroadcastHashJoin => "bhj"
case _: SortMergeJoin => "smj"
case _: PhysicalRDD => "rdd"
case _ => nodeName.toLowerCase
}

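The new case only changes naming in the generated source: fresh variables for a PhysicalRDD are prefixed rdd_ rather than the lower-cased node name. A tiny sketch of the effect, assuming variablePrefix is fed into CodegenContext.freshName as elsewhere in this file:

    import org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext

    // Sketch: freshName guarantees uniqueness within one generated class, so
    // repeated calls yield rdd_input, rdd_input1, ... rather than the noisier
    // physicalrdd_input that the default nodeName.toLowerCase branch produces.
    val ctx = new CodegenContext
    val name = ctx.freshName("rdd_input")  // e.g. "rdd_input"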
sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
@@ -187,8 +187,10 @@ class JDBCSuite extends SparkFunSuite
val parentPlan = df.queryExecution.executedPlan
// Check if SparkPlan Filter is removed in a physical plan and
// the plan only has PhysicalRDD to scan JDBCRelation.
assert(parentPlan.isInstanceOf[PhysicalRDD])
assert(parentPlan.asInstanceOf[PhysicalRDD].nodeName.contains("JDBCRelation"))
assert(parentPlan.isInstanceOf[org.apache.spark.sql.execution.WholeStageCodegen])
val node = parentPlan.asInstanceOf[org.apache.spark.sql.execution.WholeStageCodegen]
assert(node.plan.isInstanceOf[org.apache.spark.sql.execution.PhysicalRDD])
assert(node.plan.asInstanceOf[PhysicalRDD].nodeName.contains("JDBCRelation"))
df
}
assert(checkPushdown(sql("SELECT * FROM foobar WHERE THEID < 1")).collect().size == 0)
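Every plan-shape assertion now has to peel off the WholeStageCodegen wrapper first, as the JDBCSuite change above does inline. A hypothetical helper (not part of this PR) that keeps such checks readable, relying on the same `plan` field the test uses:

    import org.apache.spark.sql.execution.{SparkPlan, WholeStageCodegen}

    // Hypothetical helper: unwrap the codegen node if present.
    def unwrapCodegen(plan: SparkPlan): SparkPlan = plan match {
      case w: WholeStageCodegen => w.plan
      case other => other
    }

    // e.g. assert(unwrapCodegen(parentPlan).isInstanceOf[PhysicalRDD])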
sql/core/src/test/scala/org/apache/spark/sql/sources/FilteredScanSuite.scala
@@ -304,30 +304,38 @@ class FilteredScanSuite extends DataSourceTest with SharedSQLContext with Predic
expectedCount: Int,
requiredColumnNames: Set[String],
expectedUnhandledFilters: Set[Filter]): Unit = {

test(s"PushDown Returns $expectedCount: $sqlString") {
val queryExecution = sql(sqlString).queryExecution
val rawPlan = queryExecution.executedPlan.collect {
case p: execution.PhysicalRDD => p
} match {
case Seq(p) => p
case _ => fail(s"More than one PhysicalRDD found\n$queryExecution")
}
val rawCount = rawPlan.execute().count()
assert(ColumnsRequired.set === requiredColumnNames)

val table = caseInsensitiveContext.table("oneToTenFiltered")
val relation = table.queryExecution.logical.collectFirst {
case LogicalRelation(r, _, _) => r
}.get

assert(
relation.unhandledFilters(FiltersPushed.list.toArray).toSet === expectedUnhandledFilters)

if (rawCount != expectedCount) {
fail(
s"Wrong # of results for pushed filter. Got $rawCount, Expected $expectedCount\n" +
s"Filters pushed: ${FiltersPushed.list.mkString(",")}\n" +
queryExecution)
// These tests check a particular plan, disable whole stage codegen.
caseInsensitiveContext.conf.setConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED, false)
try {
val queryExecution = sql(sqlString).queryExecution
val rawPlan = queryExecution.executedPlan.collect {
case p: execution.PhysicalRDD => p
} match {
case Seq(p) => p
case _ => fail(s"More than one PhysicalRDD found\n$queryExecution")
}
val rawCount = rawPlan.execute().count()
assert(ColumnsRequired.set === requiredColumnNames)

val table = caseInsensitiveContext.table("oneToTenFiltered")
val relation = table.queryExecution.logical.collectFirst {
case LogicalRelation(r, _, _) => r
}.get

assert(
relation.unhandledFilters(FiltersPushed.list.toArray).toSet === expectedUnhandledFilters)

if (rawCount != expectedCount) {
fail(
s"Wrong # of results for pushed filter. Got $rawCount, Expected $expectedCount\n" +
s"Filters pushed: ${FiltersPushed.list.mkString(",")}\n" +
queryExecution)
}
} finally {
caseInsensitiveContext.conf.setConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED,
SQLConf.WHOLESTAGE_CODEGEN_ENABLED.defaultValue.get)
}
}
}
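The setConf/try/finally pattern above (and the identical one in PrunedScanSuite below) restores the default by hand. The withSQLConf helper used later in BucketedReadSuite expresses the same thing more compactly; a sketch, assuming SQLTestUtils is mixed into these suites:

    // Equivalent form: withSQLConf saves and restores the previous value itself.
    withSQLConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "false") {
      // ... run the plan-shape assertions ...
    }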
sql/core/src/test/scala/org/apache/spark/sql/sources/PrunedScanSuite.scala
@@ -117,28 +117,35 @@ class PrunedScanSuite extends DataSourceTest with SharedSQLContext {

def testPruning(sqlString: String, expectedColumns: String*): Unit = {
test(s"Columns output ${expectedColumns.mkString(",")}: $sqlString") {
val queryExecution = sql(sqlString).queryExecution
val rawPlan = queryExecution.executedPlan.collect {
case p: execution.PhysicalRDD => p
} match {
case Seq(p) => p
case _ => fail(s"More than one PhysicalRDD found\n$queryExecution")
}
val rawColumns = rawPlan.output.map(_.name)
val rawOutput = rawPlan.execute().first()

if (rawColumns != expectedColumns) {
fail(
s"Wrong column names. Got $rawColumns, Expected $expectedColumns\n" +
s"Filters pushed: ${FiltersPushed.list.mkString(",")}\n" +
queryExecution)
}

if (rawOutput.numFields != expectedColumns.size) {
fail(s"Wrong output row. Got $rawOutput\n$queryExecution")
// These tests check a particular plan, disable whole stage codegen.
caseInsensitiveContext.conf.setConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED, false)
try {
val queryExecution = sql(sqlString).queryExecution
val rawPlan = queryExecution.executedPlan.collect {
case p: execution.PhysicalRDD => p
} match {
case Seq(p) => p
case _ => fail(s"More than one PhysicalRDD found\n$queryExecution")
}
val rawColumns = rawPlan.output.map(_.name)
val rawOutput = rawPlan.execute().first()

if (rawColumns != expectedColumns) {
fail(
s"Wrong column names. Got $rawColumns, Expected $expectedColumns\n" +
s"Filters pushed: ${FiltersPushed.list.mkString(",")}\n" +
queryExecution)
}

if (rawOutput.numFields != expectedColumns.size) {
fail(s"Wrong output row. Got $rawOutput\n$queryExecution")
}
} finally {
caseInsensitiveContext.conf.setConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED,
SQLConf.WHOLESTAGE_CODEGEN_ENABLED.defaultValue.get)
}
}
}

}

sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
@@ -74,32 +74,34 @@ class BucketedReadSuite extends QueryTest with SQLTestUtils with TestHiveSinglet
bucketValues: Seq[Integer],
filterCondition: Column,
originalDataFrame: DataFrame): Unit = {
// This test verifies parts of the plan. Disable whole stage codegen.
withSQLConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "false") {
val bucketedDataFrame = hiveContext.table("bucketed_table").select("i", "j", "k")
val BucketSpec(numBuckets, bucketColumnNames, _) = bucketSpec
// Limit: bucket pruning only works when the bucket column has one and only one column
assert(bucketColumnNames.length == 1)
val bucketColumnIndex = bucketedDataFrame.schema.fieldIndex(bucketColumnNames.head)
val bucketColumn = bucketedDataFrame.schema.toAttributes(bucketColumnIndex)
val matchedBuckets = new BitSet(numBuckets)
bucketValues.foreach { value =>
matchedBuckets.set(DataSourceStrategy.getBucketId(bucketColumn, numBuckets, value))
}

val bucketedDataFrame = hiveContext.table("bucketed_table").select("i", "j", "k")
val BucketSpec(numBuckets, bucketColumnNames, _) = bucketSpec
// Limit: bucket pruning only works when the bucket column has one and only one column
assert(bucketColumnNames.length == 1)
val bucketColumnIndex = bucketedDataFrame.schema.fieldIndex(bucketColumnNames.head)
val bucketColumn = bucketedDataFrame.schema.toAttributes(bucketColumnIndex)
val matchedBuckets = new BitSet(numBuckets)
bucketValues.foreach { value =>
matchedBuckets.set(DataSourceStrategy.getBucketId(bucketColumn, numBuckets, value))
}
// Filter could hide the bug in bucket pruning. Thus, skipping all the filters
val plan = bucketedDataFrame.filter(filterCondition).queryExecution.executedPlan
val rdd = plan.find(_.isInstanceOf[PhysicalRDD])
assert(rdd.isDefined, plan)

// Filter could hide the bug in bucket pruning. Thus, skipping all the filters
val rdd = bucketedDataFrame.filter(filterCondition).queryExecution.executedPlan
.find(_.isInstanceOf[PhysicalRDD])
assert(rdd.isDefined)
val checkedResult = rdd.get.execute().mapPartitionsWithIndex { case (index, iter) =>
if (matchedBuckets.get(index % numBuckets)) Iterator(true) else Iterator(iter.isEmpty)
}
// checking if all the pruned buckets are empty
assert(checkedResult.collect().forall(_ == true))

val checkedResult = rdd.get.execute().mapPartitionsWithIndex { case (index, iter) =>
if (matchedBuckets.get(index % numBuckets)) Iterator(true) else Iterator(iter.isEmpty)
checkAnswer(
bucketedDataFrame.filter(filterCondition).orderBy("i", "j", "k"),
originalDataFrame.filter(filterCondition).orderBy("i", "j", "k"))
}
// checking if all the pruned buckets are empty
assert(checkedResult.collect().forall(_ == true))

checkAnswer(
bucketedDataFrame.filter(filterCondition).orderBy("i", "j", "k"),
originalDataFrame.filter(filterCondition).orderBy("i", "j", "k"))
}

test("read partitioning bucketed tables with bucket pruning filters") {