[SPARK-19261][SQL] Alter add columns for Hive serde and some datasource tables #16626
Changes from all commits
Changes to SessionCatalogSuite:

@@ -26,6 +26,7 @@ import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
 import org.apache.spark.sql.catalyst.plans.PlanTest
 import org.apache.spark.sql.catalyst.plans.logical.{Range, SubqueryAlias, View}
+import org.apache.spark.sql.types._

 class InMemorySessionCatalogSuite extends SessionCatalogSuite {
   protected val utils = new CatalogTestUtils {

@@ -450,6 +451,34 @@ abstract class SessionCatalogSuite extends PlanTest {
     }
   }

+  test("alter table add columns") {
+    withBasicCatalog { sessionCatalog =>
+      sessionCatalog.createTable(newTable("t1", "default"), ignoreIfExists = false)
+      val oldTab = sessionCatalog.externalCatalog.getTable("default", "t1")
+      sessionCatalog.alterTableSchema(
+        TableIdentifier("t1", Some("default")),
+        StructType(oldTab.dataSchema.add("c3", IntegerType) ++ oldTab.partitionSchema))
+
+      val newTab = sessionCatalog.externalCatalog.getTable("default", "t1")
+      // construct the expected table schema
+      val expectedTableSchema = StructType(oldTab.dataSchema.fields ++
+        Seq(StructField("c3", IntegerType)) ++ oldTab.partitionSchema)
+      assert(newTab.schema == expectedTableSchema)
+    }
+  }
+
+  test("alter table drop columns") {
+    withBasicCatalog { sessionCatalog =>
+      sessionCatalog.createTable(newTable("t1", "default"), ignoreIfExists = false)
+      val oldTab = sessionCatalog.externalCatalog.getTable("default", "t1")
+      val e = intercept[AnalysisException] {
+        sessionCatalog.alterTableSchema(
+          TableIdentifier("t1", Some("default")), StructType(oldTab.schema.drop(1)))
+      }.getMessage
+      assert(e.contains("We don't support dropping columns yet."))
+    }
+  }
+
   test("get table") {
     withBasicCatalog { catalog =>
       assert(catalog.getTableMetadata(TableIdentifier("tbl1", Some("db2")))
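For orientation, here is a minimal sketch of the catalog-level pattern these tests exercise: build the new schema with the added field while keeping partition columns last, then hand it to alterTableSchema. The helper name is hypothetical and not part of this patch; it only assumes the SessionCatalog and CatalogTable APIs shown above.

import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.catalog.SessionCatalog
import org.apache.spark.sql.types.{DataType, StructType}

// Hypothetical helper mirroring the "alter table add columns" test above.
def addColumn(
    catalog: SessionCatalog,
    ident: TableIdentifier,
    name: String,
    dataType: DataType): Unit = {
  val tab = catalog.getTableMetadata(ident)
  // Data columns (plus the new field) come first; partition columns stay
  // last, which is the ordering the catalog expects.
  val newSchema = StructType(tab.dataSchema.add(name, dataType) ++ tab.partitionSchema)
  catalog.alterTableSchema(ident, newSchema)
}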
Changes to the DDL commands (AlterTableAddColumnsCommand):

@@ -37,7 +37,10 @@ import org.apache.spark.sql.catalyst.catalog.CatalogTableType._
 import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
 import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
 import org.apache.spark.sql.catalyst.util.quoteIdentifier
-import org.apache.spark.sql.execution.datasources.PartitioningUtils
+import org.apache.spark.sql.execution.datasources.{DataSource, FileFormat, PartitioningUtils}
+import org.apache.spark.sql.execution.datasources.csv.CSVFileFormat
+import org.apache.spark.sql.execution.datasources.json.JsonFileFormat
+import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
 import org.apache.spark.sql.types._
 import org.apache.spark.util.Utils

@@ -174,6 +177,77 @@ case class AlterTableRenameCommand(
 }

+/**
+ * A command that adds columns to a table.
+ * The syntax of using this command in SQL is:
+ * {{{
+ *   ALTER TABLE table_identifier
+ *   ADD COLUMNS (col_name data_type [COMMENT col_comment], ...);
+ * }}}
+ */
+case class AlterTableAddColumnsCommand(
+    table: TableIdentifier,
+    columns: Seq[StructField]) extends RunnableCommand {
+  override def run(sparkSession: SparkSession): Seq[Row] = {
+    val catalog = sparkSession.sessionState.catalog
+    val catalogTable = verifyAlterTableAddColumn(catalog, table)
+
+    try {
+      sparkSession.catalog.uncacheTable(table.quotedString)
+    } catch {
+      case NonFatal(e) =>
+        log.warn(s"Exception when attempting to uncache table ${table.quotedString}", e)
+    }
+    catalog.refreshTable(table)
+
+    // Make sure any partition columns are at the end of the fields.
+    val reorderedSchema = catalogTable.dataSchema ++ columns ++ catalogTable.partitionSchema
+    catalog.alterTableSchema(
+      table, catalogTable.schema.copy(fields = reorderedSchema.toArray))
+
+    Seq.empty[Row]
+  }
+
+  /**
+   * The ALTER TABLE ADD COLUMNS command does not support temporary views/tables,
+   * permanent views, or datasource tables backed by the text or orc formats or an
+   * external provider. For datasource tables, it currently supports only parquet,
+   * json, and csv.
+   */
+  private def verifyAlterTableAddColumn(
+      catalog: SessionCatalog,
+      table: TableIdentifier): CatalogTable = {
+    val catalogTable = catalog.getTempViewOrPermanentTableMetadata(table)
+
+    if (catalogTable.tableType == CatalogTableType.VIEW) {
+      throw new AnalysisException(
+        s"""
+          |ALTER ADD COLUMNS does not support views.
+          |You must drop and re-create the views for adding the new columns. Views: $table
+        """.stripMargin)
+    }
+
+    if (DDLUtils.isDatasourceTable(catalogTable)) {
+      DataSource.lookupDataSource(catalogTable.provider.get).newInstance() match {
+        // For a datasource table, this command supports only the following file formats:
+        // - TextFileFormat defaults to a single column "value", so it is excluded.
+        // - OrcFileFormat cannot yet handle a mismatch between the user-specified
+        //   schema and the inferred schema. TODO: once this issue is resolved, we
+        //   can add orc back.
+        // Hive serde tables are already classified as such, so they never reach
+        // this branch.
+        case _: JsonFileFormat | _: CSVFileFormat | _: ParquetFileFormat =>
+        case s =>
+          throw new AnalysisException(
+            s"""
+              |ALTER ADD COLUMNS does not support datasource table with type $s.
+              |You must drop and re-create the table for adding the new columns. Tables: $table
+            """.stripMargin)
+      }
+    }
+    catalogTable
+  }
+}

 /**
  * A command that loads data into a Hive table.
  *
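To make the new command concrete, here is a hypothetical spark-shell session; the table and column names are illustrative, and the behavior matches the DDLSuite tests below (new columns are appended after the existing data columns, and pre-existing rows read the new column back as NULL):

spark.sql("CREATE TABLE t1 (c1 INT) USING parquet")
spark.sql("INSERT INTO t1 VALUES (1)")
// The COMMENT clause follows the syntax documented in the scaladoc above.
spark.sql("ALTER TABLE t1 ADD COLUMNS (c2 STRING COMMENT 'added later')")
spark.sql("SELECT * FROM t1").show()
// +---+----+
// | c1|  c2|
// +---+----+
// |  1|null|
// +---+----+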
Changes to DDLSuite:

@@ -2178,4 +2178,126 @@ abstract class DDLSuite extends QueryTest with SQLTestUtils {
       }
     }
   }
+
+  val supportedNativeFileFormatsForAlterTableAddColumns = Seq("parquet", "json", "csv")
+
+  supportedNativeFileFormatsForAlterTableAddColumns.foreach { provider =>
+    test(s"alter datasource table add columns - $provider") {
+      withTable("t1") {
+        sql(s"CREATE TABLE t1 (c1 int) USING $provider")
+        sql("INSERT INTO t1 VALUES (1)")
+        sql("ALTER TABLE t1 ADD COLUMNS (c2 int)")
+        checkAnswer(
+          spark.table("t1"),
+          Seq(Row(1, null))
+        )
+        checkAnswer(
+          sql("SELECT * FROM t1 WHERE c2 is null"),
+          Seq(Row(1, null))
+        )
+
+        sql("INSERT INTO t1 VALUES (3, 2)")
+        checkAnswer(
+          sql("SELECT * FROM t1 WHERE c2 = 2"),
+          Seq(Row(3, 2))
+        )
+      }
+    }
+  }
+
+  supportedNativeFileFormatsForAlterTableAddColumns.foreach { provider =>
+    test(s"alter datasource table add columns - partitioned - $provider") {
+      withTable("t1") {
+        sql(s"CREATE TABLE t1 (c1 int, c2 int) USING $provider PARTITIONED BY (c2)")
+        sql("INSERT INTO t1 PARTITION(c2 = 2) VALUES (1)")
+        sql("ALTER TABLE t1 ADD COLUMNS (c3 int)")
+        checkAnswer(
+          spark.table("t1"),
+          Seq(Row(1, null, 2))
+        )
+        checkAnswer(
+          sql("SELECT * FROM t1 WHERE c3 is null"),
+          Seq(Row(1, null, 2))
+        )
+        sql("INSERT INTO t1 PARTITION(c2 = 1) VALUES (2, 3)")
+        checkAnswer(
+          sql("SELECT * FROM t1 WHERE c3 = 3"),
+          Seq(Row(2, 3, 1))
+        )
+        checkAnswer(
+          sql("SELECT * FROM t1 WHERE c2 = 1"),
+          Seq(Row(2, 3, 1))
+        )
+      }
+    }
+  }
+
+  test("alter datasource table add columns - text format not supported") {
+    withTable("t1") {
+      sql("CREATE TABLE t1 (c1 int) USING text")
+      val e = intercept[AnalysisException] {
+        sql("ALTER TABLE t1 ADD COLUMNS (c2 int)")
+      }.getMessage
+      assert(e.contains("ALTER ADD COLUMNS does not support datasource table with type"))
+    }
+  }
+
+  test("alter table add columns -- not support temp view") {
+    withTempView("tmp_v") {
+      sql("CREATE TEMPORARY VIEW tmp_v AS SELECT 1 AS c1, 2 AS c2")
+      val e = intercept[AnalysisException] {
+        sql("ALTER TABLE tmp_v ADD COLUMNS (c3 INT)")
+      }
+      assert(e.message.contains("ALTER ADD COLUMNS does not support views"))
+    }
+  }
+
+  test("alter table add columns -- not support view") {
+    withView("v1") {
+      sql("CREATE VIEW v1 AS SELECT 1 AS c1, 2 AS c2")
+      val e = intercept[AnalysisException] {
+        sql("ALTER TABLE v1 ADD COLUMNS (c3 INT)")
+      }
+      assert(e.message.contains("ALTER ADD COLUMNS does not support views"))
+    }
+  }
+
+  test("alter table add columns with existing column name") {
+    withTable("t1") {
+      sql("CREATE TABLE t1 (c1 int) USING PARQUET")
+      val e = intercept[AnalysisException] {
+        sql("ALTER TABLE t1 ADD COLUMNS (c1 string)")
+      }.getMessage
+      assert(e.contains("Found duplicate column(s)"))
+    }
+  }
+
+  Seq(true, false).foreach { caseSensitive =>
+    test(s"alter table add columns with existing column name - caseSensitive $caseSensitive") {
+      withSQLConf(SQLConf.CASE_SENSITIVE.key -> s"$caseSensitive") {
+        withTable("t1") {
+          sql("CREATE TABLE t1 (c1 int) USING PARQUET")
+          if (!caseSensitive) {
+            val e = intercept[AnalysisException] {
+              sql("ALTER TABLE t1 ADD COLUMNS (C1 string)")
+            }.getMessage
+            assert(e.contains("Found duplicate column(s)"))
+          } else {
+            if (isUsingHiveMetastore) {
+              // The hive catalog will still complain that c1 is a duplicate
+              // column name, because hive identifiers are case-insensitive.

[Review discussion on the comment above:]
"actually we can fix this, as we store the schema in table properties."
"@cloud-fan I just tested the data source table, like"
"ah right, for hive, we can only make it case-preserving, not case-sensitive, I was wrong"

+              val e = intercept[AnalysisException] {
+                sql("ALTER TABLE t1 ADD COLUMNS (C1 string)")
+              }.getMessage
+              assert(e.contains("HiveException"))
+            } else {
+              sql("ALTER TABLE t1 ADD COLUMNS (C1 string)")
+              assert(spark.table("t1").schema
+                .equals(new StructType().add("c1", IntegerType).add("C1", StringType)))
+            }
+          }
+        }
+      }
+    }
+  }
 }
[Review comment:]
"Move these functions to Line 164"
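As a footnote to the case-sensitivity discussion above, a sketch of the behavior the last test pins down. The session is hypothetical; the config key is the one behind SQLConf.CASE_SENSITIVE used in the test:

spark.conf.set("spark.sql.caseSensitive", "true")
spark.sql("CREATE TABLE t1 (c1 INT) USING parquet")
// In-memory catalog: c1 and C1 are distinct columns, so this succeeds.
// Hive metastore: this fails with a HiveException, because hive identifiers
// are case-insensitive; storing the schema in table properties makes it
// case-preserving at best, not case-sensitive.
spark.sql("ALTER TABLE t1 ADD COLUMNS (C1 STRING)")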