@@ -17,7 +17,6 @@
 
 package org.apache.spark.sql.catalyst.analysis
 
-import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 
 /**
@@ -236,6 +236,14 @@ case class AttributeReference(
     }
   }
 
+  def withExprId(newExprId: ExprId): AttributeReference = {
+    if (exprId == newExprId) {
+      this
+    } else {
+      AttributeReference(name, dataType, nullable, metadata)(newExprId, qualifiers)
+    }
+  }
+
   override def toString: String = s"$name#${exprId.id}$typeSuffix"
 }
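[Editor note] For readers less familiar with catalyst's `ExprId` plumbing, here is a minimal sketch of what the new helper provides. The demo object, attribute names, and types are hypothetical; it assumes a 1.5-era `spark-catalyst` on the classpath.

```scala
import org.apache.spark.sql.catalyst.expressions.AttributeReference
import org.apache.spark.sql.types.IntegerType

object WithExprIdDemo extends App {
  // An attribute freshly derived from a relation's schema: it gets a new, arbitrary ExprId.
  val fromRelation = AttributeReference("c2", IntegerType)()
  // An attribute the rest of the plan already references, whose ID we must keep.
  val expected = AttributeReference("c2", IntegerType)()

  // withExprId keeps name, type, nullability, and metadata, adopting only the ID;
  // it returns `this` unchanged when the IDs already match.
  val rebound = fromRelation.withExprId(expected.exprId)
  assert(rebound.exprId == expected.exprId)
  assert(rebound.name == fromRelation.name)
}
```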
@@ -1595,7 +1595,7 @@ class DataFrame private[sql](
    */
   def inputFiles: Array[String] = {
     val files: Seq[String] = logicalPlan.collect {
-      case LogicalRelation(fsBasedRelation: FileRelation) =>
+      case LogicalRelation(fsBasedRelation: FileRelation, _) =>
         fsBasedRelation.inputFiles
       case fr: FileRelation =>
         fr.inputFiles
@@ -38,29 +38,29 @@ import org.apache.spark.util.{SerializableConfiguration, Utils}
  */
 private[sql] object DataSourceStrategy extends Strategy with Logging {
   def apply(plan: LogicalPlan): Seq[execution.SparkPlan] = plan match {
-    case PhysicalOperation(projects, filters, l @ LogicalRelation(t: CatalystScan)) =>
+    case PhysicalOperation(projects, filters, l @ LogicalRelation(t: CatalystScan, _)) =>
       pruneFilterProjectRaw(
         l,
         projects,
         filters,
         (a, f) => toCatalystRDD(l, a, t.buildScan(a, f))) :: Nil
 
-    case PhysicalOperation(projects, filters, l @ LogicalRelation(t: PrunedFilteredScan)) =>
+    case PhysicalOperation(projects, filters, l @ LogicalRelation(t: PrunedFilteredScan, _)) =>
       pruneFilterProject(
         l,
         projects,
         filters,
         (a, f) => toCatalystRDD(l, a, t.buildScan(a.map(_.name).toArray, f))) :: Nil
 
-    case PhysicalOperation(projects, filters, l @ LogicalRelation(t: PrunedScan)) =>
+    case PhysicalOperation(projects, filters, l @ LogicalRelation(t: PrunedScan, _)) =>
       pruneFilterProject(
         l,
         projects,
         filters,
         (a, _) => toCatalystRDD(l, a, t.buildScan(a.map(_.name).toArray))) :: Nil
 
     // Scanning partitioned HadoopFsRelation
-    case PhysicalOperation(projects, filters, l @ LogicalRelation(t: HadoopFsRelation))
+    case PhysicalOperation(projects, filters, l @ LogicalRelation(t: HadoopFsRelation, _))
         if t.partitionSpec.partitionColumns.nonEmpty =>
       val selectedPartitions = prunePartitions(filters, t.partitionSpec).toArray
 
@@ -88,7 +88,7 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {
         selectedPartitions) :: Nil
 
     // Scanning non-partitioned HadoopFsRelation
-    case PhysicalOperation(projects, filters, l @ LogicalRelation(t: HadoopFsRelation)) =>
+    case PhysicalOperation(projects, filters, l @ LogicalRelation(t: HadoopFsRelation, _)) =>
       // See buildPartitionedTableScan for the reason that we need to create a shard
       // broadcast HadoopConf.
       val sharedHadoopConf = SparkHadoopUtil.get.conf
@@ -101,16 +101,16 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {
       (a, f) =>
         toCatalystRDD(l, a, t.buildScan(a.map(_.name).toArray, f, t.paths, confBroadcast))) :: Nil
 
-    case l @ LogicalRelation(baseRelation: TableScan) =>
+    case l @ LogicalRelation(baseRelation: TableScan, _) =>
       execution.PhysicalRDD.createFromDataSource(
         l.output, toCatalystRDD(l, baseRelation.buildScan()), baseRelation) :: Nil
 
-    case i @ logical.InsertIntoTable(
-      l @ LogicalRelation(t: InsertableRelation), part, query, overwrite, false) if part.isEmpty =>
+    case i @ logical.InsertIntoTable(l @ LogicalRelation(t: InsertableRelation, _),
+        part, query, overwrite, false) if part.isEmpty =>
       execution.ExecutedCommand(InsertIntoDataSource(l, query, overwrite)) :: Nil
 
     case i @ logical.InsertIntoTable(
-      l @ LogicalRelation(t: HadoopFsRelation), part, query, overwrite, false) =>
+      l @ LogicalRelation(t: HadoopFsRelation, _), part, query, overwrite, false) =>
       val mode = if (overwrite) SaveMode.Overwrite else SaveMode.Append
       execution.ExecutedCommand(InsertIntoHadoopFsRelation(t, query, mode)) :: Nil
@@ -17,31 +17,48 @@
 package org.apache.spark.sql.execution.datasources
 
 import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
-import org.apache.spark.sql.catalyst.expressions.{AttributeMap, AttributeReference}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, AttributeReference}
 import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, Statistics}
 import org.apache.spark.sql.sources.BaseRelation
 
 /**
  * Used to link a [[BaseRelation]] into a logical query plan.
+ *
+ * Note that sometimes we need to use `LogicalRelation` to replace an existing leaf node without
+ * changing the output attributes' IDs. The `expectedOutputAttributes` parameter is used for
+ * this purpose. See https://issues.apache.org/jira/browse/SPARK-10741 for more details.
  */
-private[sql] case class LogicalRelation(relation: BaseRelation)
-  extends LeafNode
-  with MultiInstanceRelation {
+private[sql] case class LogicalRelation(
+    relation: BaseRelation,
+    expectedOutputAttributes: Option[Seq[Attribute]] = None)
+  extends LeafNode with MultiInstanceRelation {
 
-  override val output: Seq[AttributeReference] = relation.schema.toAttributes
+  override val output: Seq[AttributeReference] = {
+    val attrs = relation.schema.toAttributes
+    expectedOutputAttributes.map { expectedAttrs =>
+      assert(expectedAttrs.length == attrs.length)
+      attrs.zip(expectedAttrs).map {
+        // We should respect the attribute names provided by the base relation and only use
+        // the exprIds in `expectedOutputAttributes`. The reason is that some relations (like
+        // Parquet) reconcile attribute names to work around case-insensitivity issues.
+        case (attr, expected) => attr.withExprId(expected.exprId)
+      }
+    }.getOrElse(attrs)
+  }
 
   // Logical Relations are distinct if they have different output for the sake of transformations.
   override def equals(other: Any): Boolean = other match {
-    case l @ LogicalRelation(otherRelation) => relation == otherRelation && output == l.output
+    case l @ LogicalRelation(otherRelation, _) => relation == otherRelation && output == l.output
     case _ => false
   }
 
   override def hashCode: Int = {
     com.google.common.base.Objects.hashCode(relation, output)
   }
 
   override def sameResult(otherPlan: LogicalPlan): Boolean = otherPlan match {
-    case LogicalRelation(otherRelation) => relation == otherRelation
+    case LogicalRelation(otherRelation, _) => relation == otherRelation
     case _ => false
   }
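[Editor note] To make the motivation concrete, a small hypothetical sketch of the failure mode this parameter prevents: replacing a leaf with one that mints fresh `ExprId`s leaves parent operators referencing attributes no child produces. Names and the demo object are illustrative and assume 1.5-era catalyst internals.

```scala
import org.apache.spark.sql.catalyst.expressions.AttributeReference
import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, Project}
import org.apache.spark.sql.types.StringType

object StableIdDemo extends App {
  val c1 = AttributeReference("c1", StringType)()
  val plan = Project(Seq(c1), LocalRelation(c1))

  // Swap the leaf for one with a *fresh* ExprId, as the old conversion path did.
  val freshLeaf = LocalRelation(AttributeReference("c1", StringType)())
  val broken = plan.transformUp { case _: LocalRelation => freshLeaf }
  // The Project still references the old c1#<id>, which no child produces:
  println(broken.missingInput) // non-empty => checkAnalysis would reject this plan

  // Reusing the original ExprId (what expectedOutputAttributes enables) keeps it valid.
  val stableLeaf = LocalRelation(AttributeReference("c1", StringType)(exprId = c1.exprId))
  val fixed = plan.transformUp { case _: LocalRelation => stableLeaf }
  println(fixed.missingInput) // empty
}
```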
@@ -37,7 +37,7 @@ private[sql] object PreInsertCastAndRename extends Rule[LogicalPlan] {

     // We are inserting into an InsertableRelation or HadoopFsRelation.
     case i @ InsertIntoTable(
-      l @ LogicalRelation(_: InsertableRelation | _: HadoopFsRelation), _, child, _, _) => {
+      l @ LogicalRelation(_: InsertableRelation | _: HadoopFsRelation, _), _, child, _, _) => {
       // First, make sure the data to be inserted have the same number of fields with the
       // schema of the relation.
       if (l.output.size != child.output.size) {
@@ -84,14 +84,14 @@ private[sql] case class PreWriteCheck(catalog: Catalog) extends (LogicalPlan =>
   def apply(plan: LogicalPlan): Unit = {
     plan.foreach {
       case i @ logical.InsertIntoTable(
-        l @ LogicalRelation(t: InsertableRelation), partition, query, overwrite, ifNotExists) =>
+        l @ LogicalRelation(t: InsertableRelation, _), partition, query, overwrite, ifNotExists) =>
         // Right now, we do not support insert into a data source table with partition specs.
         if (partition.nonEmpty) {
           failAnalysis(s"Insert into a partition is not allowed because $l is not partitioned.")
         } else {
           // Get all input data source relations of the query.
           val srcRelations = query.collect {
-            case LogicalRelation(src: BaseRelation) => src
+            case LogicalRelation(src: BaseRelation, _) => src
           }
           if (srcRelations.contains(t)) {
             failAnalysis(
@@ -102,7 +102,7 @@ private[sql] case class PreWriteCheck(catalog: Catalog) extends (LogicalPlan =>
         }
 
       case logical.InsertIntoTable(
-        LogicalRelation(r: HadoopFsRelation), part, query, overwrite, _) =>
+        LogicalRelation(r: HadoopFsRelation, _), part, query, overwrite, _) =>
         // We need to make sure the partition columns specified by users do match partition
         // columns of the relation.
         val existingPartitionColumns = r.partitionColumns.fieldNames.toSet
@@ -120,7 +120,7 @@ private[sql] case class PreWriteCheck(catalog: Catalog) extends (LogicalPlan =>

         // Get all input data source relations of the query.
         val srcRelations = query.collect {
-          case LogicalRelation(src: BaseRelation) => src
+          case LogicalRelation(src: BaseRelation, _) => src
         }
         if (srcRelations.contains(r)) {
           failAnalysis(
@@ -148,10 +148,10 @@ private[sql] case class PreWriteCheck(catalog: Catalog) extends (LogicalPlan =>
       EliminateSubQueries(catalog.lookupRelation(tableIdent.toSeq)) match {
         // Only do the check if the table is a data source table
         // (the relation is a BaseRelation).
-        case l @ LogicalRelation(dest: BaseRelation) =>
+        case l @ LogicalRelation(dest: BaseRelation, _) =>
           // Get all input data source relations of the query.
           val srcRelations = query.collect {
-            case LogicalRelation(src: BaseRelation) => src
+            case LogicalRelation(src: BaseRelation, _) => src
           }
           if (srcRelations.contains(dest)) {
             failAnalysis(
@@ -55,7 +55,7 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex
         .where(Column(predicate))
 
     val analyzedPredicate = query.queryExecution.optimizedPlan.collect {
-      case PhysicalOperation(_, filters, LogicalRelation(_: ParquetRelation)) => filters
+      case PhysicalOperation(_, filters, LogicalRelation(_: ParquetRelation, _)) => filters
     }.flatten
     assert(analyzedPredicate.nonEmpty)
@@ -465,7 +465,7 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest with Sha
     (1 to 10).map(i => (i, i.toString)).toDF("a", "b").write.parquet(dir.getCanonicalPath)
     val queryExecution = sqlContext.read.parquet(dir.getCanonicalPath).queryExecution
     queryExecution.analyzed.collectFirst {
-      case LogicalRelation(relation: ParquetRelation) =>
+      case LogicalRelation(relation: ParquetRelation, _) =>
         assert(relation.partitionSpec === PartitionSpec.emptySpec)
     }.getOrElse {
       fail(s"Expecting a ParquetRelation2, but got:\n$queryExecution")
@@ -448,7 +448,7 @@ private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: Hive
       partitionSpecInMetastore: Option[PartitionSpec]): Option[LogicalRelation] = {
     cachedDataSourceTables.getIfPresent(tableIdentifier) match {
       case null => None // Cache miss
-      case logical @ LogicalRelation(parquetRelation: ParquetRelation) =>
+      case logical @ LogicalRelation(parquetRelation: ParquetRelation, _) =>
         // If we have the same paths, same schema, and same partition spec,
         // we will use the cached Parquet Relation.
         val useCached =
@@ -514,7 +514,7 @@ private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: Hive
         parquetRelation
       }
 
-    result.newInstance()
+    result.copy(expectedOutputAttributes = Some(metastoreRelation.output))
   }
 
   override def getTables(databaseName: Option[String]): Seq[(String, Boolean)] = {
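[Editor note] This one-line change is the heart of the fix: `newInstance()` minted fresh `ExprId`s for the converted relation's output, while `copy(expectedOutputAttributes = ...)` makes it reuse the metastore relation's IDs. A hedged sketch of the resulting contract follows; `DummyRelation` and `ExpectedOutputDemo` are illustrative only, and the sketch declares itself inside the `datasources` package because `LogicalRelation` is `private[sql]`.

```scala
package org.apache.spark.sql.execution.datasources // LogicalRelation is private[sql]

import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.sources.BaseRelation
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

// Minimal stand-in for a converted ParquetRelation; only `schema` matters here.
class DummyRelation extends BaseRelation {
  override def sqlContext: SQLContext = null // not exercised in this sketch
  override def schema: StructType =
    StructType(Seq(StructField("c1", StringType), StructField("c2", IntegerType)))
}

object ExpectedOutputDemo extends App {
  val rel = new DummyRelation
  // Pretend these are the MetastoreRelation's output attributes (fixed ExprIds).
  val expected = rel.schema.toAttributes

  val converted = LogicalRelation(rel, Some(expected))
  // Names come from the relation's schema; ExprIds come from `expected`.
  assert(converted.output.map(_.exprId) == expected.map(_.exprId))
}
```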
@@ -553,60 +553,28 @@ private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: Hive
       return plan
     }
 
-    // Collects all `MetastoreRelation`s which should be replaced
-    val toBeReplaced = plan.collect {
+    plan transformUp {
       // Write path
-      case InsertIntoTable(relation: MetastoreRelation, _, _, _, _)
-          // Inserting into partitioned table is not supported in Parquet data source (yet).
-          if !relation.hiveQlTable.isPartitioned &&
-            hive.convertMetastoreParquet &&
-            relation.tableDesc.getSerdeClassName.toLowerCase.contains("parquet") =>
-        val parquetRelation = convertToParquetRelation(relation)
-        val attributedRewrites = relation.output.zip(parquetRelation.output)
-        (relation, parquetRelation, attributedRewrites)
+      case InsertIntoTable(r: MetastoreRelation, partition, child, overwrite, ifNotExists)
+          // Inserting into partitioned table is not supported in Parquet data source (yet).
+          if !r.hiveQlTable.isPartitioned && hive.convertMetastoreParquet &&
+            r.tableDesc.getSerdeClassName.toLowerCase.contains("parquet") =>
+        val parquetRelation = convertToParquetRelation(r)
+        InsertIntoTable(parquetRelation, partition, child, overwrite, ifNotExists)
 
       // Write path
-      case InsertIntoHiveTable(relation: MetastoreRelation, _, _, _, _)
+      case InsertIntoHiveTable(r: MetastoreRelation, partition, child, overwrite, ifNotExists)
           // Inserting into partitioned table is not supported in Parquet data source (yet).
-          if !relation.hiveQlTable.isPartitioned &&
-            hive.convertMetastoreParquet &&
-            relation.tableDesc.getSerdeClassName.toLowerCase.contains("parquet") =>
-        val parquetRelation = convertToParquetRelation(relation)
-        val attributedRewrites = relation.output.zip(parquetRelation.output)
-        (relation, parquetRelation, attributedRewrites)
+          if !r.hiveQlTable.isPartitioned && hive.convertMetastoreParquet &&
+            r.tableDesc.getSerdeClassName.toLowerCase.contains("parquet") =>
+        val parquetRelation = convertToParquetRelation(r)
+        InsertIntoTable(parquetRelation, partition, child, overwrite, ifNotExists)
[Contributor comment on the line above] Or we may preserve i and replace this line with i.copy(table = parquetRelation).
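[Editor note] A sketch of the reviewer's alternative, written as it would slot into the `transformUp` above (hypothetical; it relies on `InsertIntoTable` being a case class whose first field is named `table`, so `copy` carries the remaining fields over unchanged):

```scala
// Bind the whole node and swap only the table child, keeping
// partition/child/overwrite/ifNotExists exactly as matched.
case i @ InsertIntoTable(r: MetastoreRelation, _, _, _, _)
    if !r.hiveQlTable.isPartitioned && hive.convertMetastoreParquet &&
      r.tableDesc.getSerdeClassName.toLowerCase.contains("parquet") =>
  i.copy(table = convertToParquetRelation(r))
```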
 
       // Read path
       case relation: MetastoreRelation if hive.convertMetastoreParquet &&
-          relation.tableDesc.getSerdeClassName.toLowerCase.contains("parquet") =>
+        relation.tableDesc.getSerdeClassName.toLowerCase.contains("parquet") =>
         val parquetRelation = convertToParquetRelation(relation)
-        val attributedRewrites = relation.output.zip(parquetRelation.output)
-        (relation, parquetRelation, attributedRewrites)
-    }
-
-    val relationMap = toBeReplaced.map(r => (r._1, r._2)).toMap
-    val attributedRewrites = AttributeMap(toBeReplaced.map(_._3).fold(Nil)(_ ++: _))
-
-    // Replaces all `MetastoreRelation`s with corresponding `ParquetRelation2`s, and fixes
-    // attribute IDs referenced in other nodes.
-    plan.transformUp {
-      case r: MetastoreRelation if relationMap.contains(r) =>
-        val parquetRelation = relationMap(r)
-        val alias = r.alias.getOrElse(r.tableName)
-        Subquery(alias, parquetRelation)
-
-      case InsertIntoTable(r: MetastoreRelation, partition, child, overwrite, ifNotExists)
-          if relationMap.contains(r) =>
-        val parquetRelation = relationMap(r)
-        InsertIntoTable(parquetRelation, partition, child, overwrite, ifNotExists)
-
-      case InsertIntoHiveTable(r: MetastoreRelation, partition, child, overwrite, ifNotExists)
-          if relationMap.contains(r) =>
-        val parquetRelation = relationMap(r)
-        InsertIntoTable(parquetRelation, partition, child, overwrite, ifNotExists)
-
-      case other => other.transformExpressions {
-        case a: Attribute if a.resolved => attributedRewrites.getOrElse(a, a)
-      }
+        Subquery(relation.alias.getOrElse(relation.tableName), parquetRelation)
     }
   }
 }
@@ -235,7 +235,7 @@ case class CreateMetastoreDataSourceAsSelect(
         sqlContext, Some(query.schema.asNullable), partitionColumns, provider, optionsWithPath)
       val createdRelation = LogicalRelation(resolved.relation)
       EliminateSubQueries(sqlContext.catalog.lookupRelation(tableIdent.toSeq)) match {
-        case l @ LogicalRelation(_: InsertableRelation | _: HadoopFsRelation) =>
+        case l @ LogicalRelation(_: InsertableRelation | _: HadoopFsRelation, _) =>
           if (l.relation != createdRelation.relation) {
             val errorDescription =
               s"Cannot append to table $tableName because the resolved relation does not " +
@@ -81,9 +81,9 @@ class InsertIntoHiveTableSuite extends QueryTest with TestHiveSingleton with Bef
test("Double create fails when allowExisting = false") {
sql("CREATE TABLE doubleCreateAndInsertTest (key int, value string)")

val message = intercept[QueryExecutionException] {
intercept[QueryExecutionException] {
sql("CREATE TABLE doubleCreateAndInsertTest (key int, value string)")
}.getMessage
}
}

test("Double create does not fail when allowExisting = true") {
Expand Down
@@ -570,7 +570,7 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
       Row(3) :: Row(4) :: Nil)
 
     table("test_parquet_ctas").queryExecution.optimizedPlan match {
-      case LogicalRelation(p: ParquetRelation) => // OK
+      case LogicalRelation(p: ParquetRelation, _) => // OK
       case _ =>
         fail(s"test_parquet_ctas should have been converted to ${classOf[ParquetRelation]}")
     }
@@ -263,7 +263,7 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
   def checkRelation(tableName: String, isDataSourceParquet: Boolean): Unit = {
     val relation = EliminateSubQueries(catalog.lookupRelation(Seq(tableName)))
     relation match {
-      case LogicalRelation(r: ParquetRelation) =>
+      case LogicalRelation(r: ParquetRelation, _) =>
         if (!isDataSourceParquet) {
           fail(
             s"${classOf[MetastoreRelation].getCanonicalName} is expected, but found " +
@@ -1223,4 +1223,29 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {

     checkAnswer(df, (0 until 5).map(i => Row(i + "#", i + "#")))
   }
+
+  test("SPARK-10741: Sort on Aggregate using parquet") {
+    withTable("test10741") {
+      withTempTable("src") {
+        Seq("a" -> 5, "a" -> 9, "b" -> 6).toDF().registerTempTable("src")
+        sql("CREATE TABLE test10741(c1 STRING, c2 INT) STORED AS PARQUET AS SELECT * FROM src")
+      }
+
+      checkAnswer(sql(
+        """
+          |SELECT c1, AVG(c2) AS c_avg
+          |FROM test10741
+          |GROUP BY c1
+          |HAVING (AVG(c2) > 5) ORDER BY c1
+        """.stripMargin), Row("a", 7.0) :: Row("b", 6.0) :: Nil)
+
+      checkAnswer(sql(
+        """
+          |SELECT c1, AVG(c2) AS c_avg
+          |FROM test10741
+          |GROUP BY c1
+          |ORDER BY AVG(c2)
+        """.stripMargin), Row("b", 6.0) :: Row("a", 7.0) :: Nil)
+    }
+  }
 }