@@ -34,7 +34,7 @@ import org.apache.spark.sql.execution.datasources.{CreateTable, LogicalRelation}
3434import org .apache .spark .sql .execution .datasources .parquet .{ParquetFileFormat , ParquetOptions }
3535import org .apache .spark .sql .hive .execution ._
3636import org .apache .spark .sql .hive .orc .OrcFileFormat
37- import org .apache .spark .sql .internal .HiveSerDe
37+ import org .apache .spark .sql .internal .{ HiveSerDe , SQLConf }
3838
3939
4040/**
@@ -172,74 +172,50 @@ object HiveAnalysis extends Rule[LogicalPlan] {
172172}
173173
174174/**
175- * When scanning or writing to non-partitioned Metastore Parquet tables, convert them to Parquet
176- * data source relations for better performance.
175+ * Relation conversion from metastore relations to data source relations for better performance
176+ *
177+ * - When writing to non-partitioned Hive-serde Parquet/Orc tables
178+ * - When scanning Hive-serde Parquet/ORC tables
179+ *
180+ * This rule must be run before all other DDL post-hoc resolution rules, i.e.
181+ * `PreprocessTableCreation`, `PreprocessTableInsertion`, `DataSourceAnalysis` and `HiveAnalysis`.
177182 */
178- case class ParquetConversions (sparkSession : SparkSession ) extends Rule [LogicalPlan ] {
179- private def shouldConvertMetastoreParquet (relation : CatalogRelation ): Boolean = {
180- relation.tableMeta.storage.serde.getOrElse(" " ).toLowerCase.contains(" parquet" ) &&
181- sparkSession.sessionState.conf.getConf(HiveUtils .CONVERT_METASTORE_PARQUET )
182- }
183-
184- private def convertToParquetRelation (relation : CatalogRelation ): LogicalRelation = {
185- val fileFormatClass = classOf [ParquetFileFormat ]
186- val mergeSchema = sparkSession.sessionState.conf.getConf(
187- HiveUtils .CONVERT_METASTORE_PARQUET_WITH_SCHEMA_MERGING )
188- val options = Map (ParquetOptions .MERGE_SCHEMA -> mergeSchema.toString)
189-
190- sparkSession.sessionState.catalog.asInstanceOf [HiveSessionCatalog ].metastoreCatalog
191- .convertToLogicalRelation(relation, options, fileFormatClass, " parquet" )
183+ case class RelationConversions (
184+ conf : SQLConf ,
185+ sessionCatalog : HiveSessionCatalog ) extends Rule [LogicalPlan ] {
186+ private def isConvertible (relation : CatalogRelation ): Boolean = {
187+ (relation.tableMeta.storage.serde.getOrElse(" " ).toLowerCase.contains(" parquet" ) &&
188+ conf.getConf(HiveUtils .CONVERT_METASTORE_PARQUET )) ||
189+ (relation.tableMeta.storage.serde.getOrElse(" " ).toLowerCase.contains(" orc" ) &&
190+ conf.getConf(HiveUtils .CONVERT_METASTORE_ORC ))
192191 }
193192
194- override def apply (plan : LogicalPlan ): LogicalPlan = {
195- plan transformUp {
196- // Write path
197- case InsertIntoTable (r : CatalogRelation , partition, query, overwrite, ifNotExists)
198- // Inserting into partitioned table is not supported in Parquet data source (yet).
199- if query.resolved && DDLUtils .isHiveTable(r.tableMeta) &&
200- ! r.isPartitioned && shouldConvertMetastoreParquet(r) =>
201- InsertIntoTable (convertToParquetRelation(r), partition, query, overwrite, ifNotExists)
202-
203- // Read path
204- case relation : CatalogRelation if DDLUtils .isHiveTable(relation.tableMeta) &&
205- shouldConvertMetastoreParquet(relation) =>
206- convertToParquetRelation(relation)
193+ private def convert (relation : CatalogRelation ): LogicalRelation = {
194+ if (relation.tableMeta.storage.serde.getOrElse(" " ).toLowerCase.contains(" parquet" )) {
195+ val options = Map (ParquetOptions .MERGE_SCHEMA ->
196+ conf.getConf(HiveUtils .CONVERT_METASTORE_PARQUET_WITH_SCHEMA_MERGING ).toString)
197+ sessionCatalog.metastoreCatalog
198+ .convertToLogicalRelation(relation, options, classOf [ParquetFileFormat ], " parquet" )
199+ } else {
200+ val options = Map [String , String ]()
201+ sessionCatalog.metastoreCatalog
202+ .convertToLogicalRelation(relation, options, classOf [OrcFileFormat ], " orc" )
207203 }
208204 }
209- }
210-
211-
212- /**
213- * When scanning Metastore ORC tables, convert them to ORC data source relations
214- * for better performance.
215- */
216- case class OrcConversions (sparkSession : SparkSession ) extends Rule [LogicalPlan ] {
217- private def shouldConvertMetastoreOrc (relation : CatalogRelation ): Boolean = {
218- relation.tableMeta.storage.serde.getOrElse(" " ).toLowerCase.contains(" orc" ) &&
219- sparkSession.sessionState.conf.getConf(HiveUtils .CONVERT_METASTORE_ORC )
220- }
221-
222- private def convertToOrcRelation (relation : CatalogRelation ): LogicalRelation = {
223- val fileFormatClass = classOf [OrcFileFormat ]
224- val options = Map [String , String ]()
225-
226- sparkSession.sessionState.catalog.asInstanceOf [HiveSessionCatalog ].metastoreCatalog
227- .convertToLogicalRelation(relation, options, fileFormatClass, " orc" )
228- }
229205
230206 override def apply (plan : LogicalPlan ): LogicalPlan = {
231207 plan transformUp {
232208 // Write path
233209 case InsertIntoTable (r : CatalogRelation , partition, query, overwrite, ifNotExists)
234- // Inserting into partitioned table is not supported in Orc data source (yet).
210+ // Inserting into partitioned table is not supported in Parquet/ Orc data source (yet).
235211 if query.resolved && DDLUtils .isHiveTable(r.tableMeta) &&
236- ! r.isPartitioned && shouldConvertMetastoreOrc (r) =>
237- InsertIntoTable (convertToOrcRelation (r), partition, query, overwrite, ifNotExists)
212+ ! r.isPartitioned && isConvertible (r) =>
213+ InsertIntoTable (convert (r), partition, query, overwrite, ifNotExists)
238214
239215 // Read path
240- case relation : CatalogRelation if DDLUtils .isHiveTable(relation.tableMeta) &&
241- shouldConvertMetastoreOrc (relation) =>
242- convertToOrcRelation (relation)
216+ case relation : CatalogRelation
217+ if DDLUtils .isHiveTable(relation.tableMeta) && isConvertible (relation) =>
218+ convert (relation)
243219 }
244220 }
245221}
0 commit comments