2 changes: 1 addition & 1 deletion python/pyspark/sql/dataframe.py
@@ -213,7 +213,7 @@ def explain(self, extended=False):

>>> df.explain()
== Physical Plan ==
Scan PhysicalRDD[age#0,name#1]
Scan ExistingRDD[age#0,name#1]

>>> df.explain(True)
== Parsed Logical Plan ==
@@ -97,22 +97,31 @@ private[sql] case class LogicalRDD(
private[sql] case class PhysicalRDD(
output: Seq[Attribute],
rdd: RDD[InternalRow],
extraInformation: String,
override val nodeName: String,
override val metadata: Map[String, String] = Map.empty,
override val outputsUnsafeRows: Boolean = false)
extends LeafNode {

protected override def doExecute(): RDD[InternalRow] = rdd

override def simpleString: String = "Scan " + extraInformation + output.mkString("[", ",", "]")
override def simpleString: String = {
val metadataEntries = for ((key, value) <- metadata.toSeq.sorted) yield s"$key: $value"
s"Scan $nodeName${output.mkString("[", ",", "]")}${metadataEntries.mkString(" ", ", ", "")}"
}
}

private[sql] object PhysicalRDD {
// Metadata keys
val INPUT_PATHS = "InputPaths"
val PUSHED_FILTERS = "PushedFilters"

def createFromDataSource(
output: Seq[Attribute],
rdd: RDD[InternalRow],
relation: BaseRelation,
extraInformation: String = ""): PhysicalRDD = {
PhysicalRDD(output, rdd, relation.toString + extraInformation,
relation.isInstanceOf[HadoopFsRelation])
metadata: Map[String, String] = Map.empty): PhysicalRDD = {
// All HadoopFsRelations output UnsafeRows
val outputUnsafeRows = relation.isInstanceOf[HadoopFsRelation]
PhysicalRDD(output, rdd, relation.toString, metadata, outputUnsafeRows)
}
}
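For reference, a small sketch (not part of the patch) of what the rewritten simpleString produces; the node name, attributes, and metadata values below are invented for illustration:

// Sketch only: mirrors the formatting logic above with invented values.
val metadata = Map(
  "PushedFilters" -> "[EqualTo(key,15)]",
  "InputPaths" -> "file:/tmp/t")
val output = Seq("key#0", "value#1")
val metadataEntries = for ((key, value) <- metadata.toSeq.sorted) yield s"$key: $value"
val simpleString =
  s"Scan ParquetRelation${output.mkString("[", ",", "]")}${metadataEntries.mkString(" ", ", ", "")}"
// simpleString == "Scan ParquetRelation[key#0,value#1] InputPaths: file:/tmp/t, PushedFilters: [EqualTo(key,15)]"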
@@ -67,6 +67,11 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializable {
super.makeCopy(newArgs)
}

/**
* Return all metadata that describes more details of this SparkPlan.
*/
private[sql] def metadata: Map[String, String] = Map.empty

/**
* Return all metrics containing metrics of this SparkPlan.
*/
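As a hypothetical usage sketch (not in the patch), a leaf operator defined in the same package could surface extra details through this hook, just as PhysicalRDD does above; the class, key, and value here are invented:

// Sketch assuming it lives under org.apache.spark.sql.execution, where LeafNode is visible.
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.Attribute

private[sql] case class ExampleScan(
    output: Seq[Attribute],
    rdd: RDD[InternalRow],
    path: String) extends LeafNode {

  // Invented key/value; simpleString and the SQL UI pick this up automatically.
  override val metadata: Map[String, String] = Map("InputPaths" -> path)

  protected override def doExecute(): RDD[InternalRow] = rdd
}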
@@ -30,6 +30,7 @@ class SparkPlanInfo(
val nodeName: String,
val simpleString: String,
val children: Seq[SparkPlanInfo],
val metadata: Map[String, String],
val metrics: Seq[SQLMetricInfo])

private[sql] object SparkPlanInfo {
@@ -41,6 +42,6 @@ private[sql] object SparkPlanInfo {
}
val children = plan.children.map(fromSparkPlan)

new SparkPlanInfo(plan.nodeName, plan.simpleString, children, metrics)
new SparkPlanInfo(plan.nodeName, plan.simpleString, children, plan.metadata, metrics)
}
}
@@ -363,7 +363,7 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
expressions, nPartitions.getOrElse(numPartitions)), planLater(child)) :: Nil
case e @ EvaluatePython(udf, child, _) =>
BatchPythonEvaluation(udf, e.output, planLater(child)) :: Nil
case LogicalRDD(output, rdd) => PhysicalRDD(output, rdd, "PhysicalRDD") :: Nil
case LogicalRDD(output, rdd) => PhysicalRDD(output, rdd, "ExistingRDD") :: Nil
case BroadcastHint(child) => apply(child)
case _ => Nil
}
@@ -17,6 +17,8 @@

package org.apache.spark.sql.execution.datasources

import scala.collection.mutable.ArrayBuffer

import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.rdd.{MapPartitionsRDD, RDD, UnionRDD}
import org.apache.spark.sql.catalyst.CatalystTypeConverters.convertToScala
@@ -25,6 +27,7 @@ import org.apache.spark.sql.catalyst.planning.PhysicalOperation
import org.apache.spark.sql.catalyst.plans.logical
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow, expressions}
import org.apache.spark.sql.execution.PhysicalRDD.{INPUT_PATHS, PUSHED_FILTERS}
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.sources._
import org.apache.spark.sql.types.{StringType, StructType}
@@ -315,7 +318,20 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {
// `Filter`s or cannot be handled by `relation`.
val filterCondition = unhandledPredicates.reduceLeftOption(expressions.And)

val pushedFiltersString = pushedFilters.mkString(" PushedFilter: [", ",", "] ")
val metadata: Map[String, String] = {
val pairs = ArrayBuffer.empty[(String, String)]

if (pushedFilters.nonEmpty) {
pairs += (PUSHED_FILTERS -> pushedFilters.mkString("[", ", ", "]"))
}

relation.relation match {
case r: HadoopFsRelation => pairs += INPUT_PATHS -> r.paths.mkString(", ")
case _ =>
}

pairs.toMap
}

if (projects.map(_.toAttribute) == projects &&
projectSet.size == projects.size &&
@@ -334,7 +350,7 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {
val scan = execution.PhysicalRDD.createFromDataSource(
projects.map(_.toAttribute),
scanBuilder(requestedColumns, candidatePredicates, pushedFilters),
relation.relation, pushedFiltersString)
relation.relation, metadata)
filterCondition.map(execution.Filter(_, scan)).getOrElse(scan)
} else {
// Don't request columns that are only referenced by pushed filters.
@@ -344,7 +360,7 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {
val scan = execution.PhysicalRDD.createFromDataSource(
requestedColumns,
scanBuilder(requestedColumns, candidatePredicates, pushedFilters),
relation.relation, pushedFiltersString)
relation.relation, metadata)
execution.Project(
projects, filterCondition.map(execution.Filter(_, scan)).getOrElse(scan))
}
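As a rough end-to-end sketch (path, column, and plan text are invented), the pushed filters and input paths built above now surface to the user through explain():

// Hypothetical usage; assumes a SQLContext named sqlContext, as in spark-shell.
val df = sqlContext.read.parquet("/tmp/t").filter("key = 15")
df.explain()
// The scan line of the physical plan would read roughly:
//   Scan ParquetRelation[key#0,value#1] InputPaths: file:/tmp/t, PushedFilters: [EqualTo(key,15)]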
@@ -146,6 +146,12 @@ private[sql] class ParquetRelation(
meta
}

override def toString: String = {
parameters.get(ParquetRelation.METASTORE_TABLE_NAME).map { tableName =>
s"${getClass.getSimpleName}: $tableName"
}.getOrElse(super.toString)
}

override def equals(other: Any): Boolean = other match {
case that: ParquetRelation =>
val schemaEquality = if (shouldMergeSchemas) {
@@ -521,6 +527,10 @@ private[sql] object ParquetRelation extends Logging {
// internally.
private[sql] val METASTORE_SCHEMA = "metastoreSchema"

// If a ParquetRelation is converted from a Hive metastore table, this option is set to the
// original Hive table name.
private[sql] val METASTORE_TABLE_NAME = "metastoreTableName"

/**
* If parquet's block size (row group size) setting is larger than the min split size,
* we use parquet's block size setting as the min split size. Otherwise, we will create
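A minimal sketch of the new toString behavior (parameter map and table name invented): a relation converted from a Hive metastore table now identifies itself by that table rather than by its paths:

// Illustration of the toString logic above with invented values.
val parameters = Map("metastoreTableName" -> "default.src")
val relationName = parameters.get("metastoreTableName")
  .map(tableName => s"ParquetRelation: $tableName")
  .getOrElse("ParquetRelation")  // the real code falls back to super.toString
// relationName == "ParquetRelation: default.src"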
@@ -66,7 +66,9 @@ private[sql] object SparkPlanGraph {
SQLMetrics.getMetricParam(metric.metricParam))
}
val node = SparkPlanGraphNode(
nodeIdGenerator.getAndIncrement(), planInfo.nodeName, planInfo.simpleString, metrics)
nodeIdGenerator.getAndIncrement(), planInfo.nodeName,
planInfo.simpleString, planInfo.metadata, metrics)

nodes += node
val childrenNodes = planInfo.children.map(
child => buildSparkPlanGraphNode(child, nodeIdGenerator, nodes, edges))
@@ -85,26 +87,33 @@
* @param metrics metrics that this SparkPlan node will track
*/
private[ui] case class SparkPlanGraphNode(
id: Long, name: String, desc: String, metrics: Seq[SQLPlanMetric]) {
id: Long,
name: String,
desc: String,
metadata: Map[String, String],
metrics: Seq[SQLPlanMetric]) {

def makeDotNode(metricsValue: Map[Long, String]): String = {
val values = {
for (metric <- metrics;
value <- metricsValue.get(metric.accumulatorId)) yield {
metric.name + ": " + value
}
val builder = new mutable.StringBuilder(name)

val values = for {
metric <- metrics
value <- metricsValue.get(metric.accumulatorId)
} yield {
metric.name + ": " + value
}
val label = if (values.isEmpty) {
name
} else {
// If there are metrics, display all metrics in a separate line. We should use an escaped
// "\n" here to follow the dot syntax.
//
// Note: whitespace between two "\n"s is to create an empty line between the name of
// SparkPlan and metrics. If removing it, it won't display the empty line in UI.
name + "\\n \\n" + values.mkString("\\n")
}
s""" $id [label="$label"];"""

if (values.nonEmpty) {
// If there are metrics, display each entry in a separate line. We should use an escaped
// "\n" here to follow the dot syntax.
//
// Note: whitespace between two "\n"s is to create an empty line between the name of
// SparkPlan and metrics. If removing it, it won't display the empty line in UI.
builder ++= "\\n \\n"
builder ++= values.mkString("\\n")
}

s""" $id [label="${builder.toString()}"];"""
}
}
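For illustration (node id, metric name, and value invented), the rewritten makeDotNode builds a dot statement like this:

import scala.collection.mutable

// Sketch of the label construction above with invented inputs.
val name = "Scan ExistingRDD"
val values = Seq("number of output rows: 42")
val builder = new mutable.StringBuilder(name)
if (values.nonEmpty) {
  // escaped "\n"s, as required by dot syntax
  builder ++= "\\n \\n"
  builder ++= values.mkString("\\n")
}
val dotNode = s""" 1 [label="${builder.toString()}"];"""
// dotNode ==  1 [label="Scan ExistingRDD\n \nnumber of output rows: 42"];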

@@ -422,7 +422,7 @@ abstract class HadoopFsRelation private[sql](
parameters: Map[String, String])
extends BaseRelation with FileRelation with Logging {

override def toString: String = getClass.getSimpleName + paths.mkString("[", ",", "]")
override def toString: String = getClass.getSimpleName
Author comment: Removed the paths string because it's now shown as part of the metadata in both simpleString and the visualized plan node.


def this() = this(None, Map.empty[String, String])

@@ -169,7 +169,7 @@ class PlannerSuite extends SharedSQLContext {

withTempTable("testPushed") {
val exp = sql("select * from testPushed where key = 15").queryExecution.executedPlan
assert(exp.toString.contains("PushedFilter: [EqualTo(key,15)]"))
assert(exp.toString.contains("PushedFilters: [EqualTo(key,15)]"))
}
}
}
@@ -411,7 +411,12 @@ private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: HiveContext)
// evil case insensitivity issue, which is reconciled within `ParquetRelation`.
val parquetOptions = Map(
ParquetRelation.METASTORE_SCHEMA -> metastoreSchema.json,
ParquetRelation.MERGE_SCHEMA -> mergeSchema.toString)
ParquetRelation.MERGE_SCHEMA -> mergeSchema.toString,
ParquetRelation.METASTORE_TABLE_NAME -> TableIdentifier(
metastoreRelation.tableName,
Some(metastoreRelation.databaseName)
).unquotedString
)
val tableIdentifier =
QualifiedTableName(metastoreRelation.databaseName, metastoreRelation.tableName)
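For context (database and table names invented), the value stored under METASTORE_TABLE_NAME is the fully qualified, unquoted name produced by TableIdentifier:

import org.apache.spark.sql.catalyst.TableIdentifier

// Sketch: unquotedString joins the database and table name with a dot.
TableIdentifier("src", Some("default")).unquotedString  // "default.src"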
