Commit 418e5e4

cloud-fan authored and yhuai committed
[SPARK-10741] [SQL] Hive Query Having/OrderBy against Parquet table is not working
https://issues.apache.org/jira/browse/SPARK-10741

I chose the second approach: do not change the output exprIds when converting a MetastoreRelation to a LogicalRelation.

Author: Wenchen Fan <[email protected]>

Closes apache#8889 from cloud-fan/hot-bug.
1 parent 299b439 commit 418e5e4
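
The gist of the fix, as a self-contained toy sketch (plain Scala, not Spark classes; every name here is made up for illustration): when a leaf relation is swapped for a copy whose output attributes carry freshly minted ids, any parent operator that still references the old ids becomes unresolved, whereas reusing the expected ids keeps those references valid.

object ExprIdSketch {
  final case class Attr(name: String, id: Long)
  final case class Relation(output: Seq[Attr])
  final case class Filter(referenced: Attr, child: Relation) {
    def resolved: Boolean = child.output.exists(_.id == referenced.id)
  }

  def main(args: Array[String]): Unit = {
    // A metastore-style relation whose output attributes carry ids 1 and 2,
    // and a parent operator that references the attribute with id 1.
    val metastoreLike = Relation(Seq(Attr("key", 1L), Attr("value", 2L)))
    val filter = Filter(referenced = Attr("key", 1L), child = metastoreLike)

    // Swapping the leaf for a copy with freshly minted ids leaves the reference dangling.
    val freshIds = Relation(Seq(Attr("key", 100L), Attr("value", 101L)))
    println(filter.copy(child = freshIds).resolved)  // false

    // Swapping it for a copy that reuses the expected ids keeps the reference bound.
    val keptIds = Relation(metastoreLike.output)
    println(filter.copy(child = keptIds).resolved)   // true
  }
}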

File tree

15 files changed: +103 -86 lines


sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/MultiInstanceRelation.scala

Lines changed: 0 additions & 1 deletion
@@ -17,7 +17,6 @@
 
 package org.apache.spark.sql.catalyst.analysis
 
-import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 
 /**

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala

Lines changed: 8 additions & 0 deletions
@@ -236,6 +236,14 @@ case class AttributeReference(
     }
   }
 
+  def withExprId(newExprId: ExprId): AttributeReference = {
+    if (exprId == newExprId) {
+      this
+    } else {
+      AttributeReference(name, dataType, nullable, metadata)(newExprId, qualifiers)
+    }
+  }
+
   override def toString: String = s"$name#${exprId.id}$typeSuffix"
 }

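A rough usage sketch of the new helper, assuming the Catalyst API on this branch (the constructor call mirrors the one in the diff above; the literal id 12345 is arbitrary, and the snippet is meant for a spark-shell built from this branch):

import org.apache.spark.sql.catalyst.expressions.{AttributeReference, ExprId}
import org.apache.spark.sql.types.IntegerType

val a = AttributeReference("key", IntegerType, nullable = true)()
// Same id: the method short-circuits and returns the same instance.
assert(a.withExprId(a.exprId) eq a)
// Different id: a copy is returned that keeps the name, type, nullability and
// qualifiers but carries the new id.
val b = a.withExprId(ExprId(12345L))
assert(b.exprId == ExprId(12345L) && b.name == a.name && b.dataType == a.dataType)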
sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala

Lines changed: 1 addition & 1 deletion
@@ -1595,7 +1595,7 @@ class DataFrame private[sql](
    */
   def inputFiles: Array[String] = {
     val files: Seq[String] = logicalPlan.collect {
-      case LogicalRelation(fsBasedRelation: FileRelation) =>
+      case LogicalRelation(fsBasedRelation: FileRelation, _) =>
         fsBasedRelation.inputFiles
       case fr: FileRelation =>
         fr.inputFiles

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala

Lines changed: 9 additions & 9 deletions
@@ -38,29 +38,29 @@ import org.apache.spark.util.{SerializableConfiguration, Utils}
  */
 private[sql] object DataSourceStrategy extends Strategy with Logging {
   def apply(plan: LogicalPlan): Seq[execution.SparkPlan] = plan match {
-    case PhysicalOperation(projects, filters, l @ LogicalRelation(t: CatalystScan)) =>
+    case PhysicalOperation(projects, filters, l @ LogicalRelation(t: CatalystScan, _)) =>
       pruneFilterProjectRaw(
         l,
         projects,
         filters,
         (a, f) => toCatalystRDD(l, a, t.buildScan(a, f))) :: Nil
 
-    case PhysicalOperation(projects, filters, l @ LogicalRelation(t: PrunedFilteredScan)) =>
+    case PhysicalOperation(projects, filters, l @ LogicalRelation(t: PrunedFilteredScan, _)) =>
       pruneFilterProject(
         l,
         projects,
         filters,
         (a, f) => toCatalystRDD(l, a, t.buildScan(a.map(_.name).toArray, f))) :: Nil
 
-    case PhysicalOperation(projects, filters, l @ LogicalRelation(t: PrunedScan)) =>
+    case PhysicalOperation(projects, filters, l @ LogicalRelation(t: PrunedScan, _)) =>
       pruneFilterProject(
         l,
         projects,
         filters,
         (a, _) => toCatalystRDD(l, a, t.buildScan(a.map(_.name).toArray))) :: Nil
 
     // Scanning partitioned HadoopFsRelation
-    case PhysicalOperation(projects, filters, l @ LogicalRelation(t: HadoopFsRelation))
+    case PhysicalOperation(projects, filters, l @ LogicalRelation(t: HadoopFsRelation, _))
         if t.partitionSpec.partitionColumns.nonEmpty =>
       val selectedPartitions = prunePartitions(filters, t.partitionSpec).toArray
 
@@ -88,7 +88,7 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {
         selectedPartitions) :: Nil
 
     // Scanning non-partitioned HadoopFsRelation
-    case PhysicalOperation(projects, filters, l @ LogicalRelation(t: HadoopFsRelation)) =>
+    case PhysicalOperation(projects, filters, l @ LogicalRelation(t: HadoopFsRelation, _)) =>
       // See buildPartitionedTableScan for the reason that we need to create a shard
       // broadcast HadoopConf.
       val sharedHadoopConf = SparkHadoopUtil.get.conf
@@ -101,16 +101,16 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {
         (a, f) =>
           toCatalystRDD(l, a, t.buildScan(a.map(_.name).toArray, f, t.paths, confBroadcast))) :: Nil
 
-    case l @ LogicalRelation(baseRelation: TableScan) =>
+    case l @ LogicalRelation(baseRelation: TableScan, _) =>
       execution.PhysicalRDD.createFromDataSource(
         l.output, toCatalystRDD(l, baseRelation.buildScan()), baseRelation) :: Nil
 
-    case i @ logical.InsertIntoTable(
-      l @ LogicalRelation(t: InsertableRelation), part, query, overwrite, false) if part.isEmpty =>
+    case i @ logical.InsertIntoTable(l @ LogicalRelation(t: InsertableRelation, _),
+      part, query, overwrite, false) if part.isEmpty =>
       execution.ExecutedCommand(InsertIntoDataSource(l, query, overwrite)) :: Nil
 
     case i @ logical.InsertIntoTable(
-      l @ LogicalRelation(t: HadoopFsRelation), part, query, overwrite, false) =>
+      l @ LogicalRelation(t: HadoopFsRelation, _), part, query, overwrite, false) =>
       val mode = if (overwrite) SaveMode.Overwrite else SaveMode.Append
       execution.ExecutedCommand(InsertIntoHadoopFsRelation(t, query, mode)) :: Nil

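Most of the churn in this file and the ones below is mechanical. A toy sketch of why, using made-up case classes rather than Spark's: adding a second field to a case class changes the arity of its compiler-generated extractor, so every existing one-argument `LogicalRelation(...)` pattern has to gain a trailing wildcard.

// Toy stand-ins, not Spark classes.
case class OldRel(relation: String)
case class NewRel(relation: String, expected: Option[Seq[Int]] = None)

object ExtractorAritySketch {
  def main(args: Array[String]): Unit = {
    // Against the one-field class, a one-argument pattern works:
    OldRel("parquet") match { case OldRel(rel) => println(s"old-style match: $rel") }

    // Against the two-field class, `case NewRel(rel) => ...` no longer compiles, so every
    // call site adds a wildcard for the field it does not care about:
    NewRel("parquet") match {
      case NewRel(rel, _) => println(s"matched relation: $rel")
    }
  }
}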
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/LogicalRelation.scala

Lines changed: 25 additions & 8 deletions
@@ -17,31 +17,48 @@
 package org.apache.spark.sql.execution.datasources
 
 import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
-import org.apache.spark.sql.catalyst.expressions.{AttributeMap, AttributeReference}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, AttributeReference}
 import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, Statistics}
 import org.apache.spark.sql.sources.BaseRelation
 
 /**
  * Used to link a [[BaseRelation]] in to a logical query plan.
+ *
+ * Note that sometimes we need to use `LogicalRelation` to replace an existing leaf node without
+ * changing the output attributes' IDs. The `expectedOutputAttributes` parameter is used for
+ * this purpose. See https://issues.apache.org/jira/browse/SPARK-10741 for more details.
  */
-private[sql] case class LogicalRelation(relation: BaseRelation)
-  extends LeafNode
-  with MultiInstanceRelation {
+private[sql] case class LogicalRelation(
+    relation: BaseRelation,
+    expectedOutputAttributes: Option[Seq[Attribute]] = None)
+  extends LeafNode with MultiInstanceRelation {
 
-  override val output: Seq[AttributeReference] = relation.schema.toAttributes
+  override val output: Seq[AttributeReference] = {
+    val attrs = relation.schema.toAttributes
+    expectedOutputAttributes.map { expectedAttrs =>
+      assert(expectedAttrs.length == attrs.length)
+      attrs.zip(expectedAttrs).map {
+        // We should respect the attribute names provided by base relation and only use the
+        // exprId in `expectedOutputAttributes`.
+        // The reason is that, some relations(like parquet) will reconcile attribute names to
+        // workaround case insensitivity issue.
+        case (attr, expected) => attr.withExprId(expected.exprId)
+      }
+    }.getOrElse(attrs)
+  }
 
   // Logical Relations are distinct if they have different output for the sake of transformations.
   override def equals(other: Any): Boolean = other match {
-    case l @ LogicalRelation(otherRelation) => relation == otherRelation && output == l.output
-    case _ => false
+    case l @ LogicalRelation(otherRelation, _) => relation == otherRelation && output == l.output
+    case _ => false
   }
 
   override def hashCode: Int = {
     com.google.common.base.Objects.hashCode(relation, output)
   }
 
   override def sameResult(otherPlan: LogicalPlan): Boolean = otherPlan match {
-    case LogicalRelation(otherRelation) => relation == otherRelation
+    case LogicalRelation(otherRelation, _) => relation == otherRelation
     case _ => false
   }

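A minimal sketch of how the new parameter is intended to be used. The anonymous BaseRelation, the object and the `demo` method are illustrative only, and because LogicalRelation is `private[sql]` this only compiles from inside the `org.apache.spark.sql` package:

package org.apache.spark.sql

import org.apache.spark.sql.execution.datasources.LogicalRelation
import org.apache.spark.sql.sources.BaseRelation
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

object ExpectedOutputAttributesSketch {
  def demo(ctx: SQLContext): Unit = {
    // A trivial BaseRelation that only reports a schema.
    val rel = new BaseRelation {
      override def sqlContext: SQLContext = ctx
      override def schema: StructType =
        StructType(Seq(StructField("key", IntegerType), StructField("value", StringType)))
    }
    // Built the normal way, the output attributes get freshly minted exprIds.
    val original = LogicalRelation(rel)
    // Built with expectedOutputAttributes, the output reuses the given exprIds while keeping
    // the names and types reported by the base relation, so references elsewhere in the plan
    // that point at those ids stay resolved.
    val replacement = LogicalRelation(rel, expectedOutputAttributes = Some(original.output))
    assert(replacement.output.map(_.exprId) == original.output.map(_.exprId))
  }
}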
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala

Lines changed: 7 additions & 7 deletions
@@ -37,7 +37,7 @@ private[sql] object PreInsertCastAndRename extends Rule[LogicalPlan] {
 
     // We are inserting into an InsertableRelation or HadoopFsRelation.
     case i @ InsertIntoTable(
-      l @ LogicalRelation(_: InsertableRelation | _: HadoopFsRelation), _, child, _, _) => {
+      l @ LogicalRelation(_: InsertableRelation | _: HadoopFsRelation, _), _, child, _, _) => {
       // First, make sure the data to be inserted have the same number of fields with the
       // schema of the relation.
       if (l.output.size != child.output.size) {
@@ -84,14 +84,14 @@ private[sql] case class PreWriteCheck(catalog: Catalog) extends (LogicalPlan =>
   def apply(plan: LogicalPlan): Unit = {
     plan.foreach {
       case i @ logical.InsertIntoTable(
-        l @ LogicalRelation(t: InsertableRelation), partition, query, overwrite, ifNotExists) =>
+        l @ LogicalRelation(t: InsertableRelation, _), partition, query, overwrite, ifNotExists) =>
         // Right now, we do not support insert into a data source table with partition specs.
         if (partition.nonEmpty) {
           failAnalysis(s"Insert into a partition is not allowed because $l is not partitioned.")
         } else {
           // Get all input data source relations of the query.
           val srcRelations = query.collect {
-            case LogicalRelation(src: BaseRelation) => src
+            case LogicalRelation(src: BaseRelation, _) => src
           }
           if (srcRelations.contains(t)) {
             failAnalysis(
@@ -102,7 +102,7 @@ private[sql] case class PreWriteCheck(catalog: Catalog) extends (LogicalPlan =>
         }
 
       case logical.InsertIntoTable(
-        LogicalRelation(r: HadoopFsRelation), part, query, overwrite, _) =>
+        LogicalRelation(r: HadoopFsRelation, _), part, query, overwrite, _) =>
         // We need to make sure the partition columns specified by users do match partition
         // columns of the relation.
         val existingPartitionColumns = r.partitionColumns.fieldNames.toSet
@@ -120,7 +120,7 @@ private[sql] case class PreWriteCheck(catalog: Catalog) extends (LogicalPlan =>
 
         // Get all input data source relations of the query.
         val srcRelations = query.collect {
-          case LogicalRelation(src: BaseRelation) => src
+          case LogicalRelation(src: BaseRelation, _) => src
         }
         if (srcRelations.contains(r)) {
           failAnalysis(
@@ -148,10 +148,10 @@ private[sql] case class PreWriteCheck(catalog: Catalog) extends (LogicalPlan =>
         EliminateSubQueries(catalog.lookupRelation(tableIdent.toSeq)) match {
           // Only do the check if the table is a data source table
           // (the relation is a BaseRelation).
-          case l @ LogicalRelation(dest: BaseRelation) =>
+          case l @ LogicalRelation(dest: BaseRelation, _) =>
             // Get all input data source relations of the query.
             val srcRelations = query.collect {
-              case LogicalRelation(src: BaseRelation) => src
+              case LogicalRelation(src: BaseRelation, _) => src
            }
            if (srcRelations.contains(dest)) {
              failAnalysis(

sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala

Lines changed: 1 addition & 1 deletion
@@ -55,7 +55,7 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex
       .where(Column(predicate))
 
     val analyzedPredicate = query.queryExecution.optimizedPlan.collect {
-      case PhysicalOperation(_, filters, LogicalRelation(_: ParquetRelation)) => filters
+      case PhysicalOperation(_, filters, LogicalRelation(_: ParquetRelation, _)) => filters
     }.flatten
     assert(analyzedPredicate.nonEmpty)

sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala

Lines changed: 1 addition & 1 deletion
@@ -465,7 +465,7 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest with Sha
       (1 to 10).map(i => (i, i.toString)).toDF("a", "b").write.parquet(dir.getCanonicalPath)
       val queryExecution = sqlContext.read.parquet(dir.getCanonicalPath).queryExecution
       queryExecution.analyzed.collectFirst {
-        case LogicalRelation(relation: ParquetRelation) =>
+        case LogicalRelation(relation: ParquetRelation, _) =>
           assert(relation.partitionSpec === PartitionSpec.emptySpec)
       }.getOrElse {
         fail(s"Expecting a ParquetRelation2, but got:\n$queryExecution")

sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala

Lines changed: 16 additions & 48 deletions
@@ -448,7 +448,7 @@ private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: Hive
       partitionSpecInMetastore: Option[PartitionSpec]): Option[LogicalRelation] = {
     cachedDataSourceTables.getIfPresent(tableIdentifier) match {
       case null => None // Cache miss
-      case logical @ LogicalRelation(parquetRelation: ParquetRelation) =>
+      case logical @ LogicalRelation(parquetRelation: ParquetRelation, _) =>
         // If we have the same paths, same schema, and same partition spec,
         // we will use the cached Parquet Relation.
         val useCached =
@@ -514,7 +514,7 @@ private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: Hive
         parquetRelation
       }
 
-    result.newInstance()
+    result.copy(expectedOutputAttributes = Some(metastoreRelation.output))
   }
 
   override def getTables(databaseName: Option[String]): Seq[(String, Boolean)] = {
@@ -553,60 +553,28 @@ private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: Hive
         return plan
       }
 
-      // Collects all `MetastoreRelation`s which should be replaced
-      val toBeReplaced = plan.collect {
+      plan transformUp {
         // Write path
-        case InsertIntoTable(relation: MetastoreRelation, _, _, _, _)
-          // Inserting into partitioned table is not supported in Parquet data source (yet).
-          if !relation.hiveQlTable.isPartitioned &&
-            hive.convertMetastoreParquet &&
-            relation.tableDesc.getSerdeClassName.toLowerCase.contains("parquet") =>
-          val parquetRelation = convertToParquetRelation(relation)
-          val attributedRewrites = relation.output.zip(parquetRelation.output)
-          (relation, parquetRelation, attributedRewrites)
+        case InsertIntoTable(r: MetastoreRelation, partition, child, overwrite, ifNotExists)
+          // Inserting into partitioned table is not supported in Parquet data source (yet).
+          if !r.hiveQlTable.isPartitioned && hive.convertMetastoreParquet &&
+            r.tableDesc.getSerdeClassName.toLowerCase.contains("parquet") =>
+          val parquetRelation = convertToParquetRelation(r)
+          InsertIntoTable(parquetRelation, partition, child, overwrite, ifNotExists)
 
         // Write path
-        case InsertIntoHiveTable(relation: MetastoreRelation, _, _, _, _)
+        case InsertIntoHiveTable(r: MetastoreRelation, partition, child, overwrite, ifNotExists)
           // Inserting into partitioned table is not supported in Parquet data source (yet).
-          if !relation.hiveQlTable.isPartitioned &&
-            hive.convertMetastoreParquet &&
-            relation.tableDesc.getSerdeClassName.toLowerCase.contains("parquet") =>
-          val parquetRelation = convertToParquetRelation(relation)
-          val attributedRewrites = relation.output.zip(parquetRelation.output)
-          (relation, parquetRelation, attributedRewrites)
+          if !r.hiveQlTable.isPartitioned && hive.convertMetastoreParquet &&
+            r.tableDesc.getSerdeClassName.toLowerCase.contains("parquet") =>
+          val parquetRelation = convertToParquetRelation(r)
+          InsertIntoTable(parquetRelation, partition, child, overwrite, ifNotExists)
 
         // Read path
         case relation: MetastoreRelation if hive.convertMetastoreParquet &&
-          relation.tableDesc.getSerdeClassName.toLowerCase.contains("parquet") =>
+            relation.tableDesc.getSerdeClassName.toLowerCase.contains("parquet") =>
           val parquetRelation = convertToParquetRelation(relation)
-          val attributedRewrites = relation.output.zip(parquetRelation.output)
-          (relation, parquetRelation, attributedRewrites)
-      }
-
-      val relationMap = toBeReplaced.map(r => (r._1, r._2)).toMap
-      val attributedRewrites = AttributeMap(toBeReplaced.map(_._3).fold(Nil)(_ ++: _))
-
-      // Replaces all `MetastoreRelation`s with corresponding `ParquetRelation2`s, and fixes
-      // attribute IDs referenced in other nodes.
-      plan.transformUp {
-        case r: MetastoreRelation if relationMap.contains(r) =>
-          val parquetRelation = relationMap(r)
-          val alias = r.alias.getOrElse(r.tableName)
-          Subquery(alias, parquetRelation)
-
-        case InsertIntoTable(r: MetastoreRelation, partition, child, overwrite, ifNotExists)
-          if relationMap.contains(r) =>
-          val parquetRelation = relationMap(r)
-          InsertIntoTable(parquetRelation, partition, child, overwrite, ifNotExists)
-
-        case InsertIntoHiveTable(r: MetastoreRelation, partition, child, overwrite, ifNotExists)
-          if relationMap.contains(r) =>
-          val parquetRelation = relationMap(r)
-          InsertIntoTable(parquetRelation, partition, child, overwrite, ifNotExists)
-
-        case other => other.transformExpressions {
-          case a: Attribute if a.resolved => attributedRewrites.getOrElse(a, a)
-        }
+          Subquery(relation.alias.getOrElse(relation.tableName), parquetRelation)
       }
     }
   }

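For context, a hedged sketch of the class of query the commit title refers to; the object, the `repro` helper, the table name and the surrounding HiveContext are assumptions for illustration, not the actual reproduction from the JIRA ticket. Because the converted relation now reuses the MetastoreRelation's exprIds, the aggregate, HAVING and ORDER BY expressions above the relation keep resolving after the Parquet conversion.

import org.apache.spark.sql.hive.HiveContext

object HavingOrderBySketch {
  // `hive` is an existing HiveContext; the table name stands in for any metastore
  // table stored as Parquet.
  def repro(hive: HiveContext): Unit = {
    hive.sql(
      """SELECT key, count(*) AS cnt
        |FROM some_parquet_backed_hive_table
        |GROUP BY key
        |HAVING count(*) > 1
        |ORDER BY cnt
      """.stripMargin).show()
  }
}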
sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala

Lines changed: 1 addition & 1 deletion
@@ -235,7 +235,7 @@ case class CreateMetastoreDataSourceAsSelect(
           sqlContext, Some(query.schema.asNullable), partitionColumns, provider, optionsWithPath)
         val createdRelation = LogicalRelation(resolved.relation)
         EliminateSubQueries(sqlContext.catalog.lookupRelation(tableIdent.toSeq)) match {
-          case l @ LogicalRelation(_: InsertableRelation | _: HadoopFsRelation) =>
+          case l @ LogicalRelation(_: InsertableRelation | _: HadoopFsRelation, _) =>
             if (l.relation != createdRelation.relation) {
               val errorDescription =
                 s"Cannot append to table $tableName because the resolved relation does not " +
