34 commits
52ca902
alter_add_col: initial changes
xwu0226 Nov 21, 2016
f498fa6
add testcases
xwu0226 Dec 1, 2016
522443e
negative testcases
xwu0226 Dec 1, 2016
1af2654
remove unsupported testcase
xwu0226 Dec 5, 2016
ec57ee9
fix testcase
xwu0226 Dec 5, 2016
ec74849
update testcases
xwu0226 Dec 7, 2016
8fca889
update testcases
xwu0226 Dec 7, 2016
4a17529
update testcases
xwu0226 Jan 13, 2017
9699128
comments for command case class
xwu0226 Jan 20, 2017
9860e5c
update comments based on review
xwu0226 Jan 21, 2017
dfff364
SPARK-19261: update to support datasource table and add new testcases
xwu0226 Feb 3, 2017
9f23254
remove workaround for parquet issue since parquet-1.8.2 is now supported
xwu0226 Feb 4, 2017
180092f
SPARK-19261: using white list for datasource table types that support…
xwu0226 Feb 7, 2017
5a8aa80
fix code style
xwu0226 Feb 7, 2017
d3860e6
fix coding style
xwu0226 Feb 7, 2017
55577aa
update upon review
xwu0226 Feb 24, 2017
6fa913a
refactor code from alterTable function
xwu0226 Feb 25, 2017
7231efe
rebase and resolve conflict
xwu0226 Mar 6, 2017
e4e9ecf
resolve conflicts
xwu0226 Mar 9, 2017
75e7441
using ExternalCatalog.alterTableSchema
xwu0226 Mar 14, 2017
9847030
add InMemoryCatalog testcases
xwu0226 Mar 15, 2017
1a383bb
revert change in HiveExternalCatalog.scala
xwu0226 Mar 15, 2017
f994ce9
update upon review
xwu0226 Mar 16, 2017
5bf7360
add checking for duplicate column names
xwu0226 Mar 16, 2017
599c45e
add case sensitivity for duplicate name checking and new testcases
xwu0226 Mar 16, 2017
b3edfea
typo
xwu0226 Mar 16, 2017
7d8a515
resolve conflicts and modify testcases
xwu0226 Mar 17, 2017
e895278
update testcases
xwu0226 Mar 17, 2017
e171ac4
move checkduplicate and schema arrangement to SessionCatalog.alterTab…
xwu0226 Mar 17, 2017
4391edd
change SessionCatalog.alterTableAddColumn back to alterTableSchema
xwu0226 Mar 18, 2017
a3fef12
update upon review comments
xwu0226 Mar 18, 2017
1eb7cd3
some minor updates upon review comments
xwu0226 Mar 19, 2017
04ce8f4
update based on review
xwu0226 Mar 21, 2017
7d8437d
update on minor comments
xwu0226 Mar 21, 2017
SqlBase.g4
@@ -85,6 +85,8 @@ statement
LIKE source=tableIdentifier locationSpec? #createTableLike
| ANALYZE TABLE tableIdentifier partitionSpec? COMPUTE STATISTICS
(identifier | FOR COLUMNS identifierSeq)? #analyze
| ALTER TABLE tableIdentifier
ADD COLUMNS '(' columns=colTypeList ')' #addTableColumns
| ALTER (TABLE | VIEW) from=tableIdentifier
RENAME TO to=tableIdentifier #renameTable
| ALTER (TABLE | VIEW) tableIdentifier
@@ -198,7 +200,6 @@ unsupportedHiveNativeCommands
| kw1=ALTER kw2=TABLE tableIdentifier partitionSpec? kw3=COMPACT
| kw1=ALTER kw2=TABLE tableIdentifier partitionSpec? kw3=CONCATENATE
| kw1=ALTER kw2=TABLE tableIdentifier partitionSpec? kw3=SET kw4=FILEFORMAT
| kw1=ALTER kw2=TABLE tableIdentifier partitionSpec? kw3=ADD kw4=COLUMNS
| kw1=ALTER kw2=TABLE tableIdentifier partitionSpec? kw3=REPLACE kw4=COLUMNS
| kw1=START kw2=TRANSACTION
| kw1=COMMIT
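For context, a minimal sketch of the statements the new `#addTableColumns` grammar rule accepts, issued through a SparkSession (the session and table names are illustrative, not part of the patch):

```scala
// Assumes a running SparkSession named `spark` (illustrative setup).
spark.sql("CREATE TABLE events (id INT) USING parquet")
spark.sql(
  "ALTER TABLE events ADD COLUMNS (ts TIMESTAMP COMMENT 'event time', tag STRING)")
spark.table("events").printSchema()
// root
//  |-- id: integer (nullable = true)
//  |-- ts: timestamp (nullable = true)
//  |-- tag: string (nullable = true)
```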
SessionCatalog.scala
@@ -35,6 +35,7 @@ import org.apache.spark.sql.catalyst.expressions.{Expression, ExpressionInfo}
import org.apache.spark.sql.catalyst.parser.{CatalystSqlParser, ParserInterface}
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias, View}
import org.apache.spark.sql.catalyst.util.StringUtils
import org.apache.spark.sql.types.{StructField, StructType}

object SessionCatalog {
val DEFAULT_DATABASE = "default"
@@ -161,6 +162,20 @@ class SessionCatalog(
throw new TableAlreadyExistsException(db = db, table = name.table)
}
}

private def checkDuplication(fields: Seq[StructField]): Unit = {
val columnNames = if (conf.caseSensitiveAnalysis) {
fields.map(_.name)
} else {
fields.map(_.name.toLowerCase)
}
if (columnNames.distinct.length != columnNames.length) {
val duplicateColumns = columnNames.groupBy(identity).collect {
case (x, ys) if ys.length > 1 => x
}
throw new AnalysisException(s"Found duplicate column(s): ${duplicateColumns.mkString(", ")}")
}
}
// ----------------------------------------------------------------------------
// Databases
// ----------------------------------------------------------------------------
@@ -295,6 +310,47 @@ class SessionCatalog(
externalCatalog.alterTable(newTableDefinition)
}

/**
* Alter the schema of a table identified by the provided table identifier. The new schema
* should still contain the existing bucket columns and partition columns used by the table. This
* method will also update any Spark SQL-related parameters stored as Hive table properties (such
* as the schema itself).
*
* @param identifier TableIdentifier
* @param newSchema Updated schema to be used for the table (must contain existing partition and
* bucket columns, and partition columns need to be at the end)
*/
def alterTableSchema(
identifier: TableIdentifier,
newSchema: StructType): Unit = {
val db = formatDatabaseName(identifier.database.getOrElse(getCurrentDatabase))
val table = formatTableName(identifier.table)
val tableIdentifier = TableIdentifier(table, Some(db))
requireDbExists(db)
requireTableExists(tableIdentifier)
checkDuplication(newSchema)

val catalogTable = externalCatalog.getTable(db, table)
val oldSchema = catalogTable.schema

// not supporting dropping columns yet
val nonExistentColumnNames = oldSchema.map(_.name).filterNot(columnNameResolved(newSchema, _))
if (nonExistentColumnNames.nonEmpty) {
throw new AnalysisException(
s"""
|Some existing schema fields (${nonExistentColumnNames.mkString("[", ",", "]")}) are
|not present in the new schema. We don't support dropping columns yet.
""".stripMargin)
}

// assuming the newSchema has all partition columns at the end as required
externalCatalog.alterTableSchema(db, table, newSchema)
}

private def columnNameResolved(schema: StructType, colName: String): Boolean = {
schema.fields.map(_.name).exists(conf.resolver(_, colName))
}
Review comment (Member): Move these functions to Line 164.


/**
* Return whether a table/view with the specified name exists. If no database is specified, check
* with current database.
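For illustration, a standalone sketch of the detection logic in `checkDuplication` above, with the case-sensitivity flag passed in directly rather than read from `conf.caseSensitiveAnalysis` (the function name and setup are mine, not part of the patch):

```scala
import org.apache.spark.sql.types.{IntegerType, StringType, StructField}

// Mirrors the detection logic of checkDuplication, but returns the duplicate
// names instead of throwing an AnalysisException.
def findDuplicates(fields: Seq[StructField], caseSensitive: Boolean): Seq[String] = {
  val names = if (caseSensitive) fields.map(_.name) else fields.map(_.name.toLowerCase)
  names.groupBy(identity).collect { case (name, group) if group.length > 1 => name }.toSeq
}

val fields = Seq(StructField("c1", IntegerType), StructField("C1", StringType))
findDuplicates(fields, caseSensitive = true)   // Seq() -- "c1" and "C1" are distinct
findDuplicates(fields, caseSensitive = false)  // Seq("c1") -- the names collide
```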
SessionCatalogSuite.scala
@@ -26,6 +26,7 @@ import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
import org.apache.spark.sql.catalyst.plans.PlanTest
import org.apache.spark.sql.catalyst.plans.logical.{Range, SubqueryAlias, View}
import org.apache.spark.sql.types._

class InMemorySessionCatalogSuite extends SessionCatalogSuite {
protected val utils = new CatalogTestUtils {
@@ -450,6 +451,34 @@ abstract class SessionCatalogSuite extends PlanTest {
}
}

test("alter table add columns") {
Review comment (Member): Also add a negative test case for dropping columns, although we do not support it now.

withBasicCatalog { sessionCatalog =>
sessionCatalog.createTable(newTable("t1", "default"), ignoreIfExists = false)
val oldTab = sessionCatalog.externalCatalog.getTable("default", "t1")
sessionCatalog.alterTableSchema(
TableIdentifier("t1", Some("default")),
StructType(oldTab.dataSchema.add("c3", IntegerType) ++ oldTab.partitionSchema))

val newTab = sessionCatalog.externalCatalog.getTable("default", "t1")
// construct the expected table schema
val expectedTableSchema = StructType(oldTab.dataSchema.fields ++
Seq(StructField("c3", IntegerType)) ++ oldTab.partitionSchema)
assert(newTab.schema == expectedTableSchema)
}
}

test("alter table drop columns") {
withBasicCatalog { sessionCatalog =>
sessionCatalog.createTable(newTable("t1", "default"), ignoreIfExists = false)
val oldTab = sessionCatalog.externalCatalog.getTable("default", "t1")
val e = intercept[AnalysisException] {
sessionCatalog.alterTableSchema(
TableIdentifier("t1", Some("default")), StructType(oldTab.schema.drop(1)))
}.getMessage
assert(e.contains("We don't support dropping columns yet."))
}
}

test("get table") {
withBasicCatalog { catalog =>
assert(catalog.getTableMetadata(TableIdentifier("tbl1", Some("db2")))
SparkSqlParser.scala
@@ -741,6 +741,22 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
ctx.VIEW != null)
}

/**
* Create an [[AlterTableAddColumnsCommand]] command.
*
* For example:
* {{{
* ALTER TABLE table1
* ADD COLUMNS (col_name data_type [COMMENT col_comment], ...);
* }}}
*/
override def visitAddTableColumns(ctx: AddTableColumnsContext): LogicalPlan = withOrigin(ctx) {
AlterTableAddColumnsCommand(
visitTableIdentifier(ctx.tableIdentifier),
visitColTypeList(ctx.columns)
)
}

/**
* Create an [[AlterTableSetPropertiesCommand]] command.
*
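As a quick sanity check of the new visitor, a hypothetical snippet in the spirit of Spark's own parser tests (the parser handle `spark.sessionState.sqlParser` and the surrounding setup are assumptions):

```scala
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.execution.command.AlterTableAddColumnsCommand
import org.apache.spark.sql.types.{IntegerType, StructField}

// Parsing the new statement should yield the command case class introduced in
// this patch, with one new IntegerType column and no database qualifier.
val plan = spark.sessionState.sqlParser.parsePlan("ALTER TABLE t1 ADD COLUMNS (c2 INT)")
assert(plan == AlterTableAddColumnsCommand(
  TableIdentifier("t1"),
  Seq(StructField("c2", IntegerType))))
```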
tables.scala
@@ -37,7 +37,10 @@ import org.apache.spark.sql.catalyst.catalog.CatalogTableType._
import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
import org.apache.spark.sql.catalyst.util.quoteIdentifier
import org.apache.spark.sql.execution.datasources.PartitioningUtils
import org.apache.spark.sql.execution.datasources.{DataSource, FileFormat, PartitioningUtils}
import org.apache.spark.sql.execution.datasources.csv.CSVFileFormat
import org.apache.spark.sql.execution.datasources.json.JsonFileFormat
import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
import org.apache.spark.sql.types._
import org.apache.spark.util.Utils

@@ -174,6 +177,77 @@ case class AlterTableRenameCommand(

}

/**
 * A command that adds columns to a table.
 * The syntax of using this command in SQL is:
 * {{{
 *   ALTER TABLE table_identifier
 *   ADD COLUMNS (col_name data_type [COMMENT col_comment], ...);
 * }}}
 */

Review comment (Member): This is better. Please copy this to SparkSqlParser.scala.

case class AlterTableAddColumnsCommand(
table: TableIdentifier,
columns: Seq[StructField]) extends RunnableCommand {
override def run(sparkSession: SparkSession): Seq[Row] = {
val catalog = sparkSession.sessionState.catalog
val catalogTable = verifyAlterTableAddColumn(catalog, table)

try {
sparkSession.catalog.uncacheTable(table.quotedString)
} catch {
case NonFatal(e) =>
log.warn(s"Exception when attempting to uncache table ${table.quotedString}", e)
}
catalog.refreshTable(table)

// make sure any partition columns are at the end of the fields
val reorderedSchema = catalogTable.dataSchema ++ columns ++ catalogTable.partitionSchema
catalog.alterTableSchema(
table, catalogTable.schema.copy(fields = reorderedSchema.toArray))

Seq.empty[Row]
}

/**
 * ALTER TABLE ADD COLUMNS command does not support temporary views/tables,
 * views, or datasource tables with text or orc formats or an external provider.
 * For datasource tables, it currently supports only parquet, json and csv.
 */

Review comment (Member): Also need to explain what is supported too.

private def verifyAlterTableAddColumn(
catalog: SessionCatalog,
table: TableIdentifier): CatalogTable = {
val catalogTable = catalog.getTempViewOrPermanentTableMetadata(table)

if (catalogTable.tableType == CatalogTableType.VIEW) {
throw new AnalysisException(
s"""
|ALTER ADD COLUMNS does not support views.
|You must drop and re-create the views for adding the new columns. Views: $table
""".stripMargin)
}

if (DDLUtils.isDatasourceTable(catalogTable)) {
DataSource.lookupDataSource(catalogTable.provider.get).newInstance() match {
// For a datasource table, this command only supports the following file formats:
// TextFileFormat defaults to a single column "value".
// OrcFileFormat cannot handle a difference between the user-specified schema and
// the inferred schema yet. TODO: once this issue is resolved, we can add Orc back.
// Hive formats are already treated as Hive serde tables, so that logic does not
// reach here.
case _: JsonFileFormat | _: CSVFileFormat | _: ParquetFileFormat =>
case s =>
throw new AnalysisException(
s"""
|ALTER ADD COLUMNS does not support datasource table with type $s.
|You must drop and re-create the table for adding the new columns. Tables: $table
""".stripMargin)
}
}
catalogTable
}
}


/**
* A command that loads data into a Hive table.
*
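To make the whitelist concrete, a hedged end-to-end sketch of what `verifyAlterTableAddColumn` allows and rejects, assuming a SparkSession named `spark` (table names illustrative):

```scala
// Supported datasource format: the ALTER succeeds.
spark.sql("CREATE TABLE t_ok (c1 INT) USING parquet")
spark.sql("ALTER TABLE t_ok ADD COLUMNS (c2 INT)")

// Unsupported datasource format: rejected by the whitelist above.
spark.sql("CREATE TABLE t_txt (c1 INT) USING text")
// Throws AnalysisException:
//   "ALTER ADD COLUMNS does not support datasource table with type ..."
spark.sql("ALTER TABLE t_txt ADD COLUMNS (c2 INT)")
```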
DDLCommandSuite.scala
@@ -780,13 +780,7 @@ class DDLCommandSuite extends PlanTest {
assertUnsupported("ALTER TABLE table_name SKEWED BY (key) ON (1,5,6) STORED AS DIRECTORIES")
}

test("alter table: add/replace columns (not allowed)") {
assertUnsupported(
"""
|ALTER TABLE table_name PARTITION (dt='2008-08-08', country='us')
|ADD COLUMNS (new_col1 INT COMMENT 'test_comment', new_col2 LONG
|COMMENT 'test_comment2') CASCADE
""".stripMargin)
test("alter table: replace columns (not allowed)") {
assertUnsupported(
"""
|ALTER TABLE table_name REPLACE COLUMNS (new_col1 INT
DDLSuite.scala
@@ -2178,4 +2178,126 @@ abstract class DDLSuite extends QueryTest with SQLTestUtils {
}
}
}

val supportedNativeFileFormatsForAlterTableAddColumns = Seq("parquet", "json", "csv")

supportedNativeFileFormatsForAlterTableAddColumns.foreach { provider =>
test(s"alter datasource table add columns - $provider") {
withTable("t1") {
sql(s"CREATE TABLE t1 (c1 int) USING $provider")
sql("INSERT INTO t1 VALUES (1)")
sql("ALTER TABLE t1 ADD COLUMNS (c2 int)")
checkAnswer(
spark.table("t1"),
Seq(Row(1, null))
)
checkAnswer(
sql("SELECT * FROM t1 WHERE c2 is null"),
Seq(Row(1, null))
)

sql("INSERT INTO t1 VALUES (3, 2)")
checkAnswer(
sql("SELECT * FROM t1 WHERE c2 = 2"),
Seq(Row(3, 2))
)
}
}
}

supportedNativeFileFormatsForAlterTableAddColumns.foreach { provider =>
test(s"alter datasource table add columns - partitioned - $provider") {
withTable("t1") {
sql(s"CREATE TABLE t1 (c1 int, c2 int) USING $provider PARTITIONED BY (c2)")
sql("INSERT INTO t1 PARTITION(c2 = 2) VALUES (1)")
sql("ALTER TABLE t1 ADD COLUMNS (c3 int)")
checkAnswer(
spark.table("t1"),
Seq(Row(1, null, 2))
)
checkAnswer(
sql("SELECT * FROM t1 WHERE c3 is null"),
Seq(Row(1, null, 2))
)
sql("INSERT INTO t1 PARTITION(c2 =1) VALUES (2, 3)")
checkAnswer(
sql("SELECT * FROM t1 WHERE c3 = 3"),
Seq(Row(2, 3, 1))
)
checkAnswer(
sql("SELECT * FROM t1 WHERE c2 = 1"),
Seq(Row(2, 3, 1))
)
}
}
}

test("alter datasource table add columns - text format not supported") {
withTable("t1") {
sql("CREATE TABLE t1 (c1 int) USING text")
val e = intercept[AnalysisException] {
sql("ALTER TABLE t1 ADD COLUMNS (c2 int)")
}.getMessage
assert(e.contains("ALTER ADD COLUMNS does not support datasource table with type"))
}
}

test("alter table add columns -- not support temp view") {
withTempView("tmp_v") {
sql("CREATE TEMPORARY VIEW tmp_v AS SELECT 1 AS c1, 2 AS c2")
val e = intercept[AnalysisException] {
sql("ALTER TABLE tmp_v ADD COLUMNS (c3 INT)")
}
assert(e.message.contains("ALTER ADD COLUMNS does not support views"))
}
}

test("alter table add columns -- not support view") {
withView("v1") {
sql("CREATE VIEW v1 AS SELECT 1 AS c1, 2 AS c2")
val e = intercept[AnalysisException] {
sql("ALTER TABLE v1 ADD COLUMNS (c3 INT)")
}
assert(e.message.contains("ALTER ADD COLUMNS does not support views"))
}
}

test("alter table add columns with existing column name") {
withTable("t1") {
sql("CREATE TABLE t1 (c1 int) USING PARQUET")
val e = intercept[AnalysisException] {
sql("ALTER TABLE t1 ADD COLUMNS (c1 string)")
}.getMessage
assert(e.contains("Found duplicate column(s)"))
}
}

Seq(true, false).foreach { caseSensitive =>
test(s"alter table add columns with existing column name - caseSensitive $caseSensitive") {
withSQLConf(SQLConf.CASE_SENSITIVE.key -> s"$caseSensitive") {
withTable("t1") {
sql("CREATE TABLE t1 (c1 int) USING PARQUET")
if (!caseSensitive) {
val e = intercept[AnalysisException] {
sql("ALTER TABLE t1 ADD COLUMNS (C1 string)")
}.getMessage
assert(e.contains("Found duplicate column(s)"))
} else {
if (isUsingHiveMetastore) {
// hive catalog will still complain that c1 is a duplicate column name because
// hive identifiers are case-insensitive.
Review comment (Contributor): Actually we can fix this, as we store the schema in table properties.

Review comment (Contributor, Author): @cloud-fan I just tested the data source table case, e.g. create table t1 (c1 int, C1 int) using parquet with spark.sql.caseSensitive = true; Spark SQL does not complain. It just bounces back the exception from Hive, logged as a WARN message, and the table is created successfully; I am able to insert and select. But if I create a Hive serde table with create table t2 (c1 int, C1 int) stored as parquet, Hive complains and fails to create the table. So for the data source case, should we fix anything regarding the WARN message? Thanks!

Review comment (Contributor): Ah right, for Hive we can only make it case-preserving, not case-sensitive; I was wrong.

val e = intercept[AnalysisException] {
sql("ALTER TABLE t1 ADD COLUMNS (C1 string)")
}.getMessage
assert(e.contains("HiveException"))
} else {
sql("ALTER TABLE t1 ADD COLUMNS (C1 string)")
assert(spark.table("t1").schema
.equals(new StructType().add("c1", IntegerType).add("C1", StringType)))
}
}
}
}
}
}
}
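Finally, a small usage sketch of the schema reordering these partitioned-table tests exercise (session and table names illustrative):

```scala
spark.sql("CREATE TABLE logs (c1 INT, c2 INT) USING parquet PARTITIONED BY (c2)")
spark.sql("ALTER TABLE logs ADD COLUMNS (c3 INT)")
// AlterTableAddColumnsCommand appends new columns after the existing data
// columns and keeps partition columns at the end, so the schema order is:
spark.table("logs").schema.fieldNames  // => Array(c1, c3, c2)
```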