
Commit f79371a

Budde authored and cloud-fan committed
[SPARK-19611][SQL] Introduce configurable table schema inference
## Summary of changes

Add a new configuration option that allows Spark SQL to infer a case-sensitive schema from a Hive Metastore table's data files when a case-sensitive schema can't be read from the table properties.

- Add `spark.sql.hive.caseSensitiveInferenceMode` param to SQLConf
- Add `schemaPreservesCase` field to `CatalogTable` (set to false when the schema can't successfully be read from the Hive table props)
- Perform schema inference in `HiveMetastoreCatalog` if `schemaPreservesCase` is false, depending on `spark.sql.hive.caseSensitiveInferenceMode`
- Add `alterTableSchema()` method to the `ExternalCatalog` interface
- Add `HiveSchemaInferenceSuite` tests
- Refactor and move `ParquetFileFormat.mergeMetastoreParquetSchema()` as `HiveMetastoreCatalog.mergeWithMetastoreSchema()`
- Move schema merging tests from `ParquetSchemaSuite` to `HiveSchemaInferenceSuite`

[JIRA for this change](https://issues.apache.org/jira/browse/SPARK-19611)

## How was this patch tested?

The tests in `HiveSchemaInferenceSuite` verify that schema inference works as expected. `ExternalCatalogSuite` has also been extended to cover the new `alterTableSchema()` API.

Author: Budde <[email protected]>

Closes #16944 from budde/SPARK-19611.
1 parent cabe1df commit f79371a
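For orientation, a minimal usage sketch (not part of the patch; assumes a Hive-enabled `SparkSession` named `spark`):

```scala
// Pick the inference mode before querying Hive Metastore tables backed by
// case-sensitive file formats. Valid values per this patch:
// INFER_AND_SAVE (default), INFER_ONLY, NEVER_INFER.
spark.conf.set("spark.sql.hive.caseSensitiveInferenceMode", "INFER_ONLY")
```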

File tree

11 files changed: +489 additions, −159 deletions


sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala

Lines changed: 14 additions & 1 deletion

```diff
@@ -19,7 +19,7 @@ package org.apache.spark.sql.catalyst.catalog
 import org.apache.spark.sql.catalyst.analysis.{FunctionAlreadyExistsException, NoSuchDatabaseException, NoSuchFunctionException, NoSuchTableException}
 import org.apache.spark.sql.catalyst.expressions.Expression
-
+import org.apache.spark.sql.types.StructType
 
 /**
  * Interface for the system catalog (of functions, partitions, tables, and databases).
@@ -104,6 +104,19 @@ abstract class ExternalCatalog {
    */
   def alterTable(tableDefinition: CatalogTable): Unit
 
+  /**
+   * Alter the schema of a table identified by the provided database and table name. The new schema
+   * should still contain the existing bucket columns and partition columns used by the table. This
+   * method will also update any Spark SQL-related parameters stored as Hive table properties (such
+   * as the schema itself).
+   *
+   * @param db Database that table to alter schema for exists in
+   * @param table Name of table to alter schema for
+   * @param schema Updated schema to be used for the table (must contain existing partition and
+   *               bucket columns)
+   */
+  def alterTableSchema(db: String, table: String, schema: StructType): Unit
+
   def getTable(db: String, table: String): CatalogTable
 
   def getTableOption(db: String, table: String): Option[CatalogTable]
```

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala

Lines changed: 10 additions & 0 deletions

```diff
@@ -31,6 +31,7 @@ import org.apache.spark.sql.catalyst.analysis._
 import org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils.escapePathName
 import org.apache.spark.sql.catalyst.expressions.Expression
 import org.apache.spark.sql.catalyst.util.StringUtils
+import org.apache.spark.sql.types.StructType
 
 /**
  * An in-memory (ephemeral) implementation of the system catalog.
@@ -297,6 +298,15 @@ class InMemoryCatalog(
     catalog(db).tables(tableDefinition.identifier.table).table = tableDefinition
   }
 
+  override def alterTableSchema(
+      db: String,
+      table: String,
+      schema: StructType): Unit = synchronized {
+    requireTableExists(db, table)
+    val origTable = catalog(db).tables(table).table
+    catalog(db).tables(table).table = origTable.copy(schema = schema)
+  }
+
   override def getTable(db: String, table: String): CatalogTable = synchronized {
     requireTableExists(db, table)
     catalog(db).tables(table).table
```

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala

Lines changed: 7 additions & 1 deletion

```diff
@@ -163,6 +163,11 @@ case class BucketSpec(
  * @param tracksPartitionsInCatalog whether this table's partition metadata is stored in the
  *                                  catalog. If false, it is inferred automatically based on file
  *                                  structure.
+ * @param schemaPreservesCase Whether or not the schema resolved for this table is case-sensitive.
+ *                            When using a Hive Metastore, this flag is set to false if a case-
+ *                            sensitive schema was unable to be read from the table properties.
+ *                            Used to trigger case-sensitive schema inference at query time, when
+ *                            configured.
  */
 case class CatalogTable(
     identifier: TableIdentifier,
@@ -180,7 +185,8 @@ case class CatalogTable(
     viewText: Option[String] = None,
     comment: Option[String] = None,
     unsupportedFeatures: Seq[String] = Seq.empty,
-    tracksPartitionsInCatalog: Boolean = false) {
+    tracksPartitionsInCatalog: Boolean = false,
+    schemaPreservesCase: Boolean = true) {
 
   import CatalogTable._
```
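A hedged sketch of how the new flag travels with table metadata (the table definition below is made up; `CatalogStorageFormat.empty` stands in for real storage info):

```scala
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable, CatalogTableType}
import org.apache.spark.sql.types.{IntegerType, StructType}

// Defaults to true; HiveExternalCatalog flips it to false when the
// case-sensitive schema can't be recovered from the table properties.
val table = CatalogTable(
  identifier = TableIdentifier("t", Some("db")),
  tableType = CatalogTableType.EXTERNAL,
  storage = CatalogStorageFormat.empty,
  schema = new StructType().add("id", IntegerType))
assert(table.schemaPreservesCase)
val fromMetastoreOnly = table.copy(schemaPreservesCase = false)
```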

sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala

Lines changed: 14 additions & 1 deletion

```diff
@@ -28,7 +28,7 @@ import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
 import org.apache.spark.sql.catalyst.analysis.{FunctionAlreadyExistsException, NoSuchDatabaseException, NoSuchFunctionException}
 import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException
-import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.types._
 import org.apache.spark.util.Utils
 
 
@@ -240,6 +240,19 @@ abstract class ExternalCatalogSuite extends SparkFunSuite with BeforeAndAfterEac
     }
   }
 
+  test("alter table schema") {
+    val catalog = newBasicCatalog()
+    val tbl1 = catalog.getTable("db2", "tbl1")
+    val newSchema = StructType(Seq(
+      StructField("new_field_1", IntegerType),
+      StructField("new_field_2", StringType),
+      StructField("a", IntegerType),
+      StructField("b", StringType)))
+    catalog.alterTableSchema("db2", "tbl1", newSchema)
+    val newTbl1 = catalog.getTable("db2", "tbl1")
+    assert(newTbl1.schema == newSchema)
+  }
+
   test("get table") {
     assert(newBasicCatalog().getTable("db2", "tbl1").identifier.table == "tbl1")
   }
```
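The new test doubles as a usage example. A trimmed sketch (assuming `catalog` is any `ExternalCatalog` that already contains `db2.tbl1` with partition columns `a` and `b`, as `newBasicCatalog()` provides):

```scala
import org.apache.spark.sql.types._

// Data columns may be renamed or re-cased, but the existing partition and
// bucket columns ("a" and "b" here) must be retained in the replacement schema.
val newSchema = StructType(Seq(
  StructField("new_field_1", IntegerType),
  StructField("a", IntegerType),
  StructField("b", StringType)))
catalog.alterTableSchema("db2", "tbl1", newSchema)
assert(catalog.getTable("db2", "tbl1").schema == newSchema)
```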

sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/trees/TreeNodeSuite.scala

Lines changed: 2 additions & 1 deletion

```diff
@@ -491,7 +491,8 @@ class TreeNodeSuite extends SparkFunSuite {
       "lastAccessTime" -> -1,
       "tracksPartitionsInCatalog" -> false,
       "properties" -> JNull,
-      "unsupportedFeatures" -> List.empty[String]))
+      "unsupportedFeatures" -> List.empty[String],
+      "schemaPreservesCase" -> JBool(true)))
 
     // For unknown case class, returns JNull.
     val bigValue = new Array[Int](10000)
```

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala

Lines changed: 0 additions & 65 deletions

```diff
@@ -475,71 +475,6 @@ object ParquetFileFormat extends Logging {
     }
   }
 
-  /**
-   * Reconciles Hive Metastore case insensitivity issue and data type conflicts between Metastore
-   * schema and Parquet schema.
-   *
-   * Hive doesn't retain case information, while Parquet is case sensitive. On the other hand, the
-   * schema read from Parquet files may be incomplete (e.g. older versions of Parquet doesn't
-   * distinguish binary and string). This method generates a correct schema by merging Metastore
-   * schema data types and Parquet schema field names.
-   */
-  def mergeMetastoreParquetSchema(
-      metastoreSchema: StructType,
-      parquetSchema: StructType): StructType = {
-    def schemaConflictMessage: String =
-      s"""Converting Hive Metastore Parquet, but detected conflicting schemas. Metastore schema:
-         |${metastoreSchema.prettyJson}
-         |
-         |Parquet schema:
-         |${parquetSchema.prettyJson}
-       """.stripMargin
-
-    val mergedParquetSchema = mergeMissingNullableFields(metastoreSchema, parquetSchema)
-
-    assert(metastoreSchema.size <= mergedParquetSchema.size, schemaConflictMessage)
-
-    val ordinalMap = metastoreSchema.zipWithIndex.map {
-      case (field, index) => field.name.toLowerCase -> index
-    }.toMap
-
-    val reorderedParquetSchema = mergedParquetSchema.sortBy(f =>
-      ordinalMap.getOrElse(f.name.toLowerCase, metastoreSchema.size + 1))
-
-    StructType(metastoreSchema.zip(reorderedParquetSchema).map {
-      // Uses Parquet field names but retains Metastore data types.
-      case (mSchema, pSchema) if mSchema.name.toLowerCase == pSchema.name.toLowerCase =>
-        mSchema.copy(name = pSchema.name)
-      case _ =>
-        throw new SparkException(schemaConflictMessage)
-    })
-  }
-
-  /**
-   * Returns the original schema from the Parquet file with any missing nullable fields from the
-   * Hive Metastore schema merged in.
-   *
-   * When constructing a DataFrame from a collection of structured data, the resulting object has
-   * a schema corresponding to the union of the fields present in each element of the collection.
-   * Spark SQL simply assigns a null value to any field that isn't present for a particular row.
-   * In some cases, it is possible that a given table partition stored as a Parquet file doesn't
-   * contain a particular nullable field in its schema despite that field being present in the
-   * table schema obtained from the Hive Metastore. This method returns a schema representing the
-   * Parquet file schema along with any additional nullable fields from the Metastore schema
-   * merged in.
-   */
-  private[parquet] def mergeMissingNullableFields(
-      metastoreSchema: StructType,
-      parquetSchema: StructType): StructType = {
-    val fieldMap = metastoreSchema.map(f => f.name.toLowerCase -> f).toMap
-    val missingFields = metastoreSchema
-      .map(_.name.toLowerCase)
-      .diff(parquetSchema.map(_.name.toLowerCase))
-      .map(fieldMap(_))
-      .filter(_.nullable)
-    StructType(parquetSchema ++ missingFields)
-  }
-
   /**
    * Reads Parquet footers in multi-threaded manner.
    * If the config "spark.sql.files.ignoreCorruptFiles" is set to true, we will ignore the corrupted
```
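For readers tracking the refactor (the logic now lives in `HiveMetastoreCatalog.mergeWithMetastoreSchema`), a standalone sketch of the merge rule the removed helper implements, mirroring the relocated tests rather than calling the moved method:

```scala
import org.apache.spark.sql.types._

// Metastore side: lower-cased names (Hive is case-insensitive) but
// authoritative data types.
val metastoreSchema = StructType(Seq(
  StructField("lowercase", StringType),
  StructField("uppercase", DoubleType, nullable = false)))

// Parquet side: case-preserving names but possibly imprecise types
// (older Parquet writers don't distinguish binary from string).
val parquetSchema = StructType(Seq(
  StructField("lowerCase", BinaryType),
  StructField("UPPERCase", IntegerType)))

// Merged result: Parquet's field names, Metastore's data types.
val expectedMerge = StructType(Seq(
  StructField("lowerCase", StringType),
  StructField("UPPERCase", DoubleType, nullable = false)))
```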

sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala

Lines changed: 22 additions & 0 deletions

```diff
@@ -296,6 +296,25 @@ object SQLConf {
     .longConf
     .createWithDefault(250 * 1024 * 1024)
 
+  object HiveCaseSensitiveInferenceMode extends Enumeration {
+    val INFER_AND_SAVE, INFER_ONLY, NEVER_INFER = Value
+  }
+
+  val HIVE_CASE_SENSITIVE_INFERENCE = buildConf("spark.sql.hive.caseSensitiveInferenceMode")
+    .doc("Sets the action to take when a case-sensitive schema cannot be read from a Hive " +
+      "table's properties. Although Spark SQL itself is not case-sensitive, Hive compatible file " +
+      "formats such as Parquet are. Spark SQL must use a case-preserving schema when querying " +
+      "any table backed by files containing case-sensitive field names or queries may not return " +
+      "accurate results. Valid options include INFER_AND_SAVE (the default mode-- infer the " +
+      "case-sensitive schema from the underlying data files and write it back to the table " +
+      "properties), INFER_ONLY (infer the schema but don't attempt to write it to the table " +
+      "properties) and NEVER_INFER (fallback to using the case-insensitive metastore schema " +
+      "instead of inferring).")
+    .stringConf
+    .transform(_.toUpperCase())
+    .checkValues(HiveCaseSensitiveInferenceMode.values.map(_.toString))
+    .createWithDefault(HiveCaseSensitiveInferenceMode.INFER_AND_SAVE.toString)
+
   val OPTIMIZER_METADATA_ONLY = buildConf("spark.sql.optimizer.metadataOnly")
     .doc("When true, enable the metadata-only query optimization that use the table's metadata " +
       "to produce the partition columns instead of table scans. It applies when all the columns " +
@@ -792,6 +811,9 @@ private[sql] class SQLConf extends Serializable with CatalystConf with Logging {
 
   def filesourcePartitionFileCacheSize: Long = getConf(HIVE_FILESOURCE_PARTITION_FILE_CACHE_SIZE)
 
+  def caseSensitiveInferenceMode: HiveCaseSensitiveInferenceMode.Value =
+    HiveCaseSensitiveInferenceMode.withName(getConf(HIVE_CASE_SENSITIVE_INFERENCE))
+
   def gatherFastStats: Boolean = getConf(GATHER_FASTSTAT)
 
   def optimizerMetadataOnly: Boolean = getConf(OPTIMIZER_METADATA_ONLY)
```
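A small sketch of how the entry behaves at runtime (a plain local session is assumed):

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").getOrCreate()
// .transform(_.toUpperCase()) normalizes the value before .checkValues runs,
// so lower-case input is accepted; an unknown value fails validation with an
// IllegalArgumentException. Internally, SQLConf.caseSensitiveInferenceMode
// resolves the stored string to the enumeration via withName().
spark.conf.set("spark.sql.hive.caseSensitiveInferenceMode", "never_infer")
```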

sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala

Lines changed: 0 additions & 82 deletions

```diff
@@ -368,88 +368,6 @@ class ParquetSchemaSuite extends ParquetSchemaTest {
     }
   }
 
-  test("merge with metastore schema") {
-    // Field type conflict resolution
-    assertResult(
-      StructType(Seq(
-        StructField("lowerCase", StringType),
-        StructField("UPPERCase", DoubleType, nullable = false)))) {
-
-      ParquetFileFormat.mergeMetastoreParquetSchema(
-        StructType(Seq(
-          StructField("lowercase", StringType),
-          StructField("uppercase", DoubleType, nullable = false))),
-
-        StructType(Seq(
-          StructField("lowerCase", BinaryType),
-          StructField("UPPERCase", IntegerType, nullable = true))))
-    }
-
-    // MetaStore schema is subset of parquet schema
-    assertResult(
-      StructType(Seq(
-        StructField("UPPERCase", DoubleType, nullable = false)))) {
-
-      ParquetFileFormat.mergeMetastoreParquetSchema(
-        StructType(Seq(
-          StructField("uppercase", DoubleType, nullable = false))),
-
-        StructType(Seq(
-          StructField("lowerCase", BinaryType),
-          StructField("UPPERCase", IntegerType, nullable = true))))
-    }
-
-    // Metastore schema contains additional non-nullable fields.
-    assert(intercept[Throwable] {
-      ParquetFileFormat.mergeMetastoreParquetSchema(
-        StructType(Seq(
-          StructField("uppercase", DoubleType, nullable = false),
-          StructField("lowerCase", BinaryType, nullable = false))),
-
-        StructType(Seq(
-          StructField("UPPERCase", IntegerType, nullable = true))))
-    }.getMessage.contains("detected conflicting schemas"))
-
-    // Conflicting non-nullable field names
-    intercept[Throwable] {
-      ParquetFileFormat.mergeMetastoreParquetSchema(
-        StructType(Seq(StructField("lower", StringType, nullable = false))),
-        StructType(Seq(StructField("lowerCase", BinaryType))))
-    }
-  }
-
-  test("merge missing nullable fields from Metastore schema") {
-    // Standard case: Metastore schema contains additional nullable fields not present
-    // in the Parquet file schema.
-    assertResult(
-      StructType(Seq(
-        StructField("firstField", StringType, nullable = true),
-        StructField("secondField", StringType, nullable = true),
-        StructField("thirdfield", StringType, nullable = true)))) {
-      ParquetFileFormat.mergeMetastoreParquetSchema(
-        StructType(Seq(
-          StructField("firstfield", StringType, nullable = true),
-          StructField("secondfield", StringType, nullable = true),
-          StructField("thirdfield", StringType, nullable = true))),
-        StructType(Seq(
-          StructField("firstField", StringType, nullable = true),
-          StructField("secondField", StringType, nullable = true))))
-    }
-
-    // Merge should fail if the Metastore contains any additional fields that are not
-    // nullable.
-    assert(intercept[Throwable] {
-      ParquetFileFormat.mergeMetastoreParquetSchema(
-        StructType(Seq(
-          StructField("firstfield", StringType, nullable = true),
-          StructField("secondfield", StringType, nullable = true),
-          StructField("thirdfield", StringType, nullable = false))),
-        StructType(Seq(
-          StructField("firstField", StringType, nullable = true),
-          StructField("secondField", StringType, nullable = true))))
-    }.getMessage.contains("detected conflicting schemas"))
-  }
-
   test("schema merging failure error message") {
     import testImplicits._
```

sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala

Lines changed: 21 additions & 2 deletions

```diff
@@ -597,6 +597,25 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
     }
   }
 
+  override def alterTableSchema(db: String, table: String, schema: StructType): Unit = withClient {
+    requireTableExists(db, table)
+    val rawTable = getRawTable(db, table)
+    val withNewSchema = rawTable.copy(schema = schema)
+    // Add table metadata such as table schema, partition columns, etc. to table properties.
+    val updatedTable = withNewSchema.copy(
+      properties = withNewSchema.properties ++ tableMetaToTableProps(withNewSchema))
+    try {
+      client.alterTable(updatedTable)
+    } catch {
+      case NonFatal(e) =>
+        val warningMessage =
+          s"Could not alter schema of table ${rawTable.identifier.quotedString} in a Hive " +
+            "compatible way. Updating Hive metastore in Spark SQL specific format."
+        logWarning(warningMessage, e)
+        client.alterTable(updatedTable.copy(schema = updatedTable.partitionSchema))
+    }
+  }
+
   override def getTable(db: String, table: String): CatalogTable = withClient {
     restoreTableMetadata(getRawTable(db, table))
   }
@@ -690,10 +709,10 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
           "different from the schema when this table was created by Spark SQL" +
           s"(${schemaFromTableProps.simpleString}). We have to fall back to the table schema " +
           "from Hive metastore which is not case preserving.")
-        hiveTable
+        hiveTable.copy(schemaPreservesCase = false)
       }
     } else {
-      hiveTable
+      hiveTable.copy(schemaPreservesCase = false)
    }
  }
```
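Putting the pieces together, an end-to-end sketch (assumes a Spark build with Hive support and a local metastore; the table name and path are made up):

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .master("local[*]")
  .enableHiveSupport()
  .getOrCreate()

// Parquet data with a case-sensitive field name.
spark.range(5).selectExpr("id AS EventId").write.parquet("/tmp/events")

// Hive stores the column name lower-cased ("eventid").
spark.sql(
  "CREATE EXTERNAL TABLE events (eventid BIGINT) " +
  "STORED AS PARQUET LOCATION '/tmp/events'")

// Under the default INFER_AND_SAVE mode, the first query infers the
// case-sensitive schema ("EventId") from the files and persists it back to
// the table properties via alterTableSchema(); NEVER_INFER would skip this
// and use the case-insensitive metastore schema instead.
spark.table("events").show()
```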
