Commit b2349e6

gatorsmile authored and cloud-fan committed
[SPARK-20160][SQL] Move ParquetConversions and OrcConversions Out Of HiveSessionCatalog
### What changes were proposed in this pull request?

`ParquetConversions` and `OrcConversions` should be treated as regular `Analyzer` rules; it is not reasonable for them to be part of `HiveSessionCatalog`. This PR moves them out of the catalog and combines the two rules into a single new rule, `RelationConversions`. With these rules out of `HiveSessionCatalog`, the next step is to clean up, rename, and move `HiveMetastoreCatalog`, since it is no longer tied to the hive package.

### How was this patch tested?

The existing test cases.

Author: Xiao Li <[email protected]>

Closes #17484 from gatorsmile/cleanup.
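For context, the rewritten rule preserves the existing behavior: conversion is still gated by the same Hive configuration flags. A minimal sketch of how a user would observe its effect, assuming a Hive-enabled `SparkSession` named `spark` and a hypothetical Hive-serde table `hive_parquet_tbl`:

```scala
// Toggle the flags that RelationConversions consults
// (HiveUtils.CONVERT_METASTORE_PARQUET and HiveUtils.CONVERT_METASTORE_ORC).
spark.conf.set("spark.sql.hive.convertMetastoreParquet", "true")
spark.conf.set("spark.sql.hive.convertMetastoreOrc", "true")

// With conversion enabled, the analyzed plan for a Hive-serde Parquet table
// should show a LogicalRelation over ParquetFileFormat rather than a
// Hive CatalogRelation.
spark.table("hive_parquet_tbl").explain(extended = true)
```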
1 parent c4c03ee commit b2349e6

File tree: 7 files changed, +70 −122 lines

sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala

Lines changed: 8 additions & 88 deletions

@@ -28,11 +28,7 @@ import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.{QualifiedTableName, TableIdentifier}
 import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.plans.logical._
-import org.apache.spark.sql.catalyst.rules._
-import org.apache.spark.sql.execution.command.DDLUtils
 import org.apache.spark.sql.execution.datasources._
-import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat, ParquetOptions}
-import org.apache.spark.sql.hive.orc.OrcFileFormat
 import org.apache.spark.sql.internal.SQLConf.HiveCaseSensitiveInferenceMode._
 import org.apache.spark.sql.types._

@@ -48,14 +44,6 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
   private def tableRelationCache = sparkSession.sessionState.catalog.tableRelationCache
   import HiveMetastoreCatalog._

-  private def getCurrentDatabase: String = sessionState.catalog.getCurrentDatabase
-
-  def getQualifiedTableName(tableIdent: TableIdentifier): QualifiedTableName = {
-    QualifiedTableName(
-      tableIdent.database.getOrElse(getCurrentDatabase).toLowerCase,
-      tableIdent.table.toLowerCase)
-  }
-
   /** These locks guard against multiple attempts to instantiate a table, which wastes memory. */
   private val tableCreationLocks = Striped.lazyWeakLock(100)

@@ -68,11 +56,12 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
     }
   }

-  def hiveDefaultTableFilePath(tableIdent: TableIdentifier): String = {
-    // Code based on: hiveWarehouse.getTablePath(currentDatabase, tableName)
-    val QualifiedTableName(dbName, tblName) = getQualifiedTableName(tableIdent)
-    val dbLocation = sparkSession.sharedState.externalCatalog.getDatabase(dbName).locationUri
-    new Path(new Path(dbLocation), tblName).toString
+  // For testing only
+  private[hive] def getCachedDataSourceTable(table: TableIdentifier): LogicalPlan = {
+    val key = QualifiedTableName(
+      table.database.getOrElse(sessionState.catalog.getCurrentDatabase).toLowerCase,
+      table.table.toLowerCase)
+    tableRelationCache.getIfPresent(key)
   }

   private def getCached(
@@ -122,7 +111,7 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
     }
   }

-  private def convertToLogicalRelation(
+  def convertToLogicalRelation(
       relation: CatalogRelation,
       options: Map[String, String],
       fileFormatClass: Class[_ <: FileFormat],
@@ -273,78 +262,9 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
       case NonFatal(ex) =>
         logWarning(s"Unable to save case-sensitive schema for table ${identifier.unquotedString}", ex)
     }
-
-  /**
-   * When scanning or writing to non-partitioned Metastore Parquet tables, convert them to Parquet
-   * data source relations for better performance.
-   */
-  object ParquetConversions extends Rule[LogicalPlan] {
-    private def shouldConvertMetastoreParquet(relation: CatalogRelation): Boolean = {
-      relation.tableMeta.storage.serde.getOrElse("").toLowerCase.contains("parquet") &&
-        sessionState.conf.getConf(HiveUtils.CONVERT_METASTORE_PARQUET)
-    }
-
-    private def convertToParquetRelation(relation: CatalogRelation): LogicalRelation = {
-      val fileFormatClass = classOf[ParquetFileFormat]
-      val mergeSchema = sessionState.conf.getConf(
-        HiveUtils.CONVERT_METASTORE_PARQUET_WITH_SCHEMA_MERGING)
-      val options = Map(ParquetOptions.MERGE_SCHEMA -> mergeSchema.toString)
-
-      convertToLogicalRelation(relation, options, fileFormatClass, "parquet")
-    }
-
-    override def apply(plan: LogicalPlan): LogicalPlan = {
-      plan transformUp {
-        // Write path
-        case InsertIntoTable(r: CatalogRelation, partition, query, overwrite, ifNotExists)
-          // Inserting into partitioned table is not supported in Parquet data source (yet).
-          if query.resolved && DDLUtils.isHiveTable(r.tableMeta) &&
-            !r.isPartitioned && shouldConvertMetastoreParquet(r) =>
-          InsertIntoTable(convertToParquetRelation(r), partition, query, overwrite, ifNotExists)
-
-        // Read path
-        case relation: CatalogRelation if DDLUtils.isHiveTable(relation.tableMeta) &&
-          shouldConvertMetastoreParquet(relation) =>
-          convertToParquetRelation(relation)
-      }
-    }
-  }
-
-  /**
-   * When scanning Metastore ORC tables, convert them to ORC data source relations
-   * for better performance.
-   */
-  object OrcConversions extends Rule[LogicalPlan] {
-    private def shouldConvertMetastoreOrc(relation: CatalogRelation): Boolean = {
-      relation.tableMeta.storage.serde.getOrElse("").toLowerCase.contains("orc") &&
-        sessionState.conf.getConf(HiveUtils.CONVERT_METASTORE_ORC)
-    }
-
-    private def convertToOrcRelation(relation: CatalogRelation): LogicalRelation = {
-      val fileFormatClass = classOf[OrcFileFormat]
-      val options = Map[String, String]()
-
-      convertToLogicalRelation(relation, options, fileFormatClass, "orc")
-    }
-
-    override def apply(plan: LogicalPlan): LogicalPlan = {
-      plan transformUp {
-        // Write path
-        case InsertIntoTable(r: CatalogRelation, partition, query, overwrite, ifNotExists)
-          // Inserting into partitioned table is not supported in Orc data source (yet).
-          if query.resolved && DDLUtils.isHiveTable(r.tableMeta) &&
-            !r.isPartitioned && shouldConvertMetastoreOrc(r) =>
-          InsertIntoTable(convertToOrcRelation(r), partition, query, overwrite, ifNotExists)
-
-        // Read path
-        case relation: CatalogRelation if DDLUtils.isHiveTable(relation.tableMeta) &&
-          shouldConvertMetastoreOrc(relation) =>
-          convertToOrcRelation(relation)
-      }
-    }
-  }
 }

+
 private[hive] object HiveMetastoreCatalog {
   def mergeWithMetastoreSchema(
       metastoreSchema: StructType,
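With `getCachedDataSourceTable` relocated here, tests reach it through the catalog's now-public `metastoreCatalog` field, as the suites below demonstrate. A minimal sketch of that access pattern, assuming a `TestHive`-style session named `spark` and a hypothetical convertible table `t`:

```scala
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.hive.HiveSessionCatalog

val catalog = spark.sessionState.catalog.asInstanceOf[HiveSessionCatalog]
spark.table("t") // a table lookup populates the relation cache
// getIfPresent semantics: returns the cached converted plan, or null if absent.
val cached = catalog.metastoreCatalog.getCachedDataSourceTable(TableIdentifier("t"))
```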

sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala

Lines changed: 2 additions & 23 deletions

@@ -26,14 +26,12 @@ import org.apache.hadoop.hive.ql.exec.{FunctionRegistry => HiveFunctionRegistry}
 import org.apache.hadoop.hive.ql.udf.generic.{AbstractGenericUDAFResolver, GenericUDF, GenericUDTF}

 import org.apache.spark.sql.AnalysisException
-import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
+import org.apache.spark.sql.catalyst.FunctionIdentifier
 import org.apache.spark.sql.catalyst.analysis.FunctionRegistry
 import org.apache.spark.sql.catalyst.analysis.FunctionRegistry.FunctionBuilder
 import org.apache.spark.sql.catalyst.catalog.{FunctionResourceLoader, GlobalTempViewManager, SessionCatalog}
 import org.apache.spark.sql.catalyst.expressions.{Cast, Expression, ExpressionInfo}
 import org.apache.spark.sql.catalyst.parser.ParserInterface
-import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
-import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.hive.HiveShim.HiveFunctionWrapper
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types.{DecimalType, DoubleType}
@@ -43,7 +41,7 @@ import org.apache.spark.util.Utils
 private[sql] class HiveSessionCatalog(
     externalCatalog: HiveExternalCatalog,
     globalTempViewManager: GlobalTempViewManager,
-    private val metastoreCatalog: HiveMetastoreCatalog,
+    val metastoreCatalog: HiveMetastoreCatalog,
     functionRegistry: FunctionRegistry,
     conf: SQLConf,
     hadoopConf: Configuration,
@@ -58,25 +56,6 @@ private[sql] class HiveSessionCatalog(
     parser,
     functionResourceLoader) {

-  // ----------------------------------------------------------------
-  // | Methods and fields for interacting with HiveMetastoreCatalog |
-  // ----------------------------------------------------------------
-
-  // These 2 rules must be run before all other DDL post-hoc resolution rules, i.e.
-  // `PreprocessTableCreation`, `PreprocessTableInsertion`, `DataSourceAnalysis` and `HiveAnalysis`.
-  val ParquetConversions: Rule[LogicalPlan] = metastoreCatalog.ParquetConversions
-  val OrcConversions: Rule[LogicalPlan] = metastoreCatalog.OrcConversions
-
-  def hiveDefaultTableFilePath(name: TableIdentifier): String = {
-    metastoreCatalog.hiveDefaultTableFilePath(name)
-  }
-
-  // For testing only
-  private[hive] def getCachedDataSourceTable(table: TableIdentifier): LogicalPlan = {
-    val key = metastoreCatalog.getQualifiedTableName(table)
-    tableRelationCache.getIfPresent(key)
-  }
-
   override def makeFunctionBuilder(funcName: String, className: String): FunctionBuilder = {
     makeFunctionBuilder(funcName, Utils.classForName(className))
   }

sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionStateBuilder.scala

Lines changed: 1 addition & 2 deletions

@@ -75,8 +75,7 @@ class HiveSessionStateBuilder(session: SparkSession, parentState: Option[Session

   override val postHocResolutionRules: Seq[Rule[LogicalPlan]] =
     new DetermineTableStats(session) +:
-      catalog.ParquetConversions +:
-      catalog.OrcConversions +:
+      RelationConversions(conf, catalog) +:
       PreprocessTableCreation(session) +:
       PreprocessTableInsertion(conf) +:
       DataSourceAnalysis(conf) +:
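`RelationConversions(conf, catalog)` is constructed per session and placed explicitly ahead of the other post-hoc rules, preserving the ordering constraint stated in its scaladoc. For comparison, a hedged sketch of registering a custom post-hoc resolution rule from user code, assuming a Spark build that exposes `SparkSessionExtensions` (the rule name is hypothetical, and injected rules run after the built-in ones shown above):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.Rule

// A do-nothing post-hoc resolution rule, just to show the registration shape.
case class MyPostHocRule(session: SparkSession) extends Rule[LogicalPlan] {
  override def apply(plan: LogicalPlan): LogicalPlan = plan // no rewrite
}

val spark = SparkSession.builder()
  .enableHiveSupport()
  .withExtensions(_.injectPostHocResolutionRule(MyPostHocRule))
  .getOrCreate()
```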

sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala

Lines changed: 53 additions & 3 deletions

@@ -18,7 +18,6 @@
 package org.apache.spark.sql.hive

 import java.io.IOException
-import java.net.URI

 import org.apache.hadoop.fs.{FileSystem, Path}
 import org.apache.hadoop.hive.common.StatsSetupConst
@@ -31,9 +30,11 @@ import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, LogicalPlan
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.execution._
 import org.apache.spark.sql.execution.command.{CreateTableCommand, DDLUtils}
-import org.apache.spark.sql.execution.datasources.CreateTable
+import org.apache.spark.sql.execution.datasources.{CreateTable, LogicalRelation}
+import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat, ParquetOptions}
 import org.apache.spark.sql.hive.execution._
-import org.apache.spark.sql.internal.HiveSerDe
+import org.apache.spark.sql.hive.orc.OrcFileFormat
+import org.apache.spark.sql.internal.{HiveSerDe, SQLConf}


 /**
@@ -170,6 +171,55 @@ object HiveAnalysis extends Rule[LogicalPlan] {
   }
 }

+/**
+ * Relation conversion from metastore relations to data source relations for better performance
+ *
+ * - When writing to non-partitioned Hive-serde Parquet/Orc tables
+ * - When scanning Hive-serde Parquet/ORC tables
+ *
+ * This rule must be run before all other DDL post-hoc resolution rules, i.e.
+ * `PreprocessTableCreation`, `PreprocessTableInsertion`, `DataSourceAnalysis` and `HiveAnalysis`.
+ */
+case class RelationConversions(
+    conf: SQLConf,
+    sessionCatalog: HiveSessionCatalog) extends Rule[LogicalPlan] {
+  private def isConvertible(relation: CatalogRelation): Boolean = {
+    (relation.tableMeta.storage.serde.getOrElse("").toLowerCase.contains("parquet") &&
+      conf.getConf(HiveUtils.CONVERT_METASTORE_PARQUET)) ||
+      (relation.tableMeta.storage.serde.getOrElse("").toLowerCase.contains("orc") &&
+        conf.getConf(HiveUtils.CONVERT_METASTORE_ORC))
+  }
+
+  private def convert(relation: CatalogRelation): LogicalRelation = {
+    if (relation.tableMeta.storage.serde.getOrElse("").toLowerCase.contains("parquet")) {
+      val options = Map(ParquetOptions.MERGE_SCHEMA ->
+        conf.getConf(HiveUtils.CONVERT_METASTORE_PARQUET_WITH_SCHEMA_MERGING).toString)
+      sessionCatalog.metastoreCatalog
+        .convertToLogicalRelation(relation, options, classOf[ParquetFileFormat], "parquet")
+    } else {
+      val options = Map[String, String]()
+      sessionCatalog.metastoreCatalog
+        .convertToLogicalRelation(relation, options, classOf[OrcFileFormat], "orc")
+    }
+  }
+
+  override def apply(plan: LogicalPlan): LogicalPlan = {
+    plan transformUp {
+      // Write path
+      case InsertIntoTable(r: CatalogRelation, partition, query, overwrite, ifNotExists)
+        // Inserting into partitioned table is not supported in Parquet/Orc data source (yet).
+        if query.resolved && DDLUtils.isHiveTable(r.tableMeta) &&
+          !r.isPartitioned && isConvertible(r) =>
+        InsertIntoTable(convert(r), partition, query, overwrite, ifNotExists)
+
+      // Read path
+      case relation: CatalogRelation
+        if DDLUtils.isHiveTable(relation.tableMeta) && isConvertible(relation) =>
+        convert(relation)
+    }
+  }
+}
+
 private[hive] trait HiveStrategies {
   // Possibly being too clever with types here... or not clever enough.
   self: SparkPlanner =>
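The combined rule keeps the two-path structure the separate Parquet and ORC rules had: a write-path case that rewrites the target of an `InsertIntoTable`, and a read-path case that rewrites a scanned relation. A stripped-down sketch of that `transformUp` shape, assuming the Spark 2.2-era Catalyst API used in this diff; the object name and the stubbed predicate and conversion are illustrative only:

```scala
import org.apache.spark.sql.catalyst.catalog.CatalogRelation
import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, LogicalPlan}
import org.apache.spark.sql.catalyst.rules.Rule

object ConversionSkeleton extends Rule[LogicalPlan] {
  private def convertible(r: CatalogRelation): Boolean = false // stub predicate
  private def convert(r: CatalogRelation): LogicalPlan = r     // stub conversion

  override def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
    // Write path: swap the relation being inserted into.
    case i @ InsertIntoTable(r: CatalogRelation, _, _, _, _)
        if i.query.resolved && !r.isPartitioned && convertible(r) =>
      i.copy(table = convert(r))
    // Read path: swap a scanned relation in place.
    case r: CatalogRelation if convertible(r) => convert(r)
  }
}
```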

sql/hive/src/test/java/org/apache/spark/sql/hive/JavaMetastoreDataSourcesSuite.java

Lines changed: 1 addition & 2 deletions

@@ -72,8 +72,7 @@ public void setUp() throws IOException {
       path.delete();
     }
     HiveSessionCatalog catalog = (HiveSessionCatalog) sqlContext.sessionState().catalog();
-    hiveManagedPath = new Path(
-      catalog.hiveDefaultTableFilePath(new TableIdentifier("javaSavedTable")));
+    hiveManagedPath = new Path(catalog.defaultTablePath(new TableIdentifier("javaSavedTable")));
     fs = hiveManagedPath.getFileSystem(sc.hadoopConfiguration());
     fs.delete(hiveManagedPath, true);

7978

sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala

Lines changed: 2 additions & 2 deletions

@@ -413,7 +413,7 @@ class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleto
       }
       // Table lookup will make the table cached.
       spark.table(tableIndent)
-      statsBeforeUpdate = catalog.getCachedDataSourceTable(tableIndent)
+      statsBeforeUpdate = catalog.metastoreCatalog.getCachedDataSourceTable(tableIndent)
         .asInstanceOf[LogicalRelation].catalogTable.get.stats.get

       sql(s"INSERT INTO $tableName SELECT 2")
@@ -423,7 +423,7 @@ class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleto
         sql(s"ANALYZE TABLE $tableName COMPUTE STATISTICS")
       }
       spark.table(tableIndent)
-      statsAfterUpdate = catalog.getCachedDataSourceTable(tableIndent)
+      statsAfterUpdate = catalog.metastoreCatalog.getCachedDataSourceTable(tableIndent)
         .asInstanceOf[LogicalRelation].catalogTable.get.stats.get
     }
     (statsBeforeUpdate, statsAfterUpdate)

sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala

Lines changed: 3 additions & 2 deletions

@@ -449,8 +449,9 @@ class ParquetMetastoreSuite extends ParquetPartitioningTest {
     }
   }

-  private def getCachedDataSourceTable(id: TableIdentifier): LogicalPlan = {
-    sessionState.catalog.asInstanceOf[HiveSessionCatalog].getCachedDataSourceTable(id)
+  private def getCachedDataSourceTable(table: TableIdentifier): LogicalPlan = {
+    sessionState.catalog.asInstanceOf[HiveSessionCatalog].metastoreCatalog
+      .getCachedDataSourceTable(table)
   }

   test("Caching converted data source Parquet Relations") {
