From 119260f5480b75b8ede51336ab64a1e141b9e9b1 Mon Sep 17 00:00:00 2001
From: Wenchen Fan
Date: Tue, 17 Nov 2020 03:31:21 +0800
Subject: [PATCH 1/3] address review comments

---
 .../sql/catalyst/parser/AstBuilder.scala      | 124 +++++----
 .../catalyst/plans/logical/statements.scala   |  24 +-
 .../sql/connector/catalog/CatalogV2Util.scala |   9 +-
 .../sql/catalyst/parser/DDLParserSuite.scala  |  19 ++
 .../analysis/ResolveSessionCatalog.scala      | 256 +++++++-----------
 .../spark/sql/execution/SparkSqlParser.scala  | 119 +++-----
 .../datasources/v2/V2SessionCatalog.scala     |   8 +-
 7 files changed, 251 insertions(+), 308 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
index ac3228d49cea9..508846742a382 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
@@ -2446,7 +2446,18 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging
   /**
    * Type to keep track of table clauses:
-   * (partTransforms, partCols, bucketSpec, properties, options, location, comment, serde).
+   * - partition transforms
+   * - partition columns
+   * - bucketSpec
+   * - properties
+   * - options
+   * - location
+   * - comment
+   * - serde
+   *
+   * Note: Partition transforms are based on the existing table schema. They can be simple
+   * column names, or functions like `year(date_col)`. Partition columns are column definitions
+   * with data types, like `i INT`, which are appended to the existing table schema.
    */
   type TableClauses = (
       Seq[Transform], Seq[StructField], Option[BucketSpec], Map[String, String],
@@ -2802,8 +2813,7 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging
    *   [NULL DEFINED AS char]
    * }}}
    */
-  def visitRowFormat(
-      ctx: RowFormatContext): SerdeInfo = withOrigin(ctx) {
+  def visitRowFormat(ctx: RowFormatContext): SerdeInfo = withOrigin(ctx) {
     ctx match {
       case serde: RowFormatSerdeContext => visitRowFormatSerde(serde)
       case delimited: RowFormatDelimitedContext => visitRowFormatDelimited(delimited)
@@ -2923,16 +2933,19 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging
     val location = visitLocationSpecList(ctx.locationSpec())
     val (cleanedOptions, newLocation) = cleanTableOptions(ctx, options, location)
     val comment = visitCommentSpecList(ctx.commentSpec())
-
-    validateRowFormatFileFormat(
-      ctx.rowFormat.asScala.toSeq, ctx.createFileFormat.asScala.toSeq, ctx)
-    val fileFormatSerdeInfo = ctx.createFileFormat.asScala.map(visitCreateFileFormat)
-    val rowFormatSerdeInfo = ctx.rowFormat.asScala.map(visitRowFormat)
-    val serdeInfo =
-      (fileFormatSerdeInfo ++ rowFormatSerdeInfo).reduceLeftOption((x, y) => x.merge(y))
-
+    val serdeInfo = getSerdeInfo(ctx.rowFormat.asScala.toSeq, ctx.createFileFormat.asScala.toSeq, ctx)
     (partTransforms, partCols, bucketSpec, cleanedProperties, cleanedOptions, newLocation, comment,
-      serdeInfo)
+      serdeInfo)
+  }
+
+  protected def getSerdeInfo(
+      rowFormatCtx: Seq[RowFormatContext],
+      createFileFormatCtx: Seq[CreateFileFormatContext],
+      ctx: ParserRuleContext): Option[SerdeInfo] = {
+    validateRowFormatFileFormat(rowFormatCtx, createFileFormatCtx, ctx)
+    val rowFormatSerdeInfo = rowFormatCtx.map(visitRowFormat)
+    val fileFormatSerdeInfo = createFileFormatCtx.map(visitCreateFileFormat)
+    (fileFormatSerdeInfo ++ rowFormatSerdeInfo).reduceLeftOption(_ merge _)
   }
 
   private def 
partitionExpressions( @@ -2943,8 +2956,8 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging if (partCols.nonEmpty) { val references = partTransforms.map(_.describe()).mkString(", ") val columns = partCols - .map(field => s"${field.name} ${field.dataType.simpleString}") - .mkString(", ") + .map(field => s"${field.name} ${field.dataType.simpleString}") + .mkString(", ") operationNotAllowed( s"""PARTITION BY: Cannot mix partition expressions and partition columns: |Expressions: $references @@ -2966,12 +2979,12 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging * Expected format: * {{{ * CREATE [TEMPORARY] TABLE [IF NOT EXISTS] [db_name.]table_name - * USING table_provider + * [USING table_provider] * create_table_clauses * [[AS] select_statement]; * * create_table_clauses (order insensitive): - * partition_clauses + * [PARTITIONED BY (partition_fields)] * [OPTIONS table_property_list] * [ROW FORMAT row_format] * [STORED AS file_format] @@ -2982,15 +2995,16 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging * [LOCATION path] * [COMMENT table_comment] * [TBLPROPERTIES (property_name=property_value, ...)] - * partition_clauses: - * [PARTITIONED BY (col_name, transform(col_name), transform(constant, col_name), ...)] | - * [PARTITIONED BY (col2[:] data_type [COMMENT col_comment], ...)] + * + * partition_fields: + * col_name, transform(col_name), transform(constant, col_name), ... | + * col_name data_type [NOT NULL] [COMMENT col_comment], ... * }}} */ override def visitCreateTable(ctx: CreateTableContext): LogicalPlan = withOrigin(ctx) { val (table, temp, ifNotExists, external) = visitCreateTableHeader(ctx.createTableHeader) - val columns = Option(ctx.colTypeList()).map(visitColTypeList) + val columns = Option(ctx.colTypeList()).map(visitColTypeList).getOrElse(Nil) val provider = Option(ctx.tableProvider).map(_.multipartIdentifier.getText) val (partTransforms, partCols, bucketSpec, properties, options, location, comment, serdeInfo) = visitCreateTableClauses(ctx.createTableClauses()) @@ -2999,37 +3013,34 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging operationNotAllowed(s"CREATE TABLE ... USING ... ${serdeInfo.get.describe}", ctx) } - val schema = columns - .map(dataCols => StructType(dataCols ++ partCols)) - .getOrElse(StructType(partCols)) + if (temp) { + operationNotAllowed("CREATE TEMPORARY TABLE is not supported yet. " + + "Please use CREATE TEMPORARY VIEW as an alternative.", ctx) + } + val partitioning = partitionExpressions(partTransforms, partCols, ctx) Option(ctx.query).map(plan) match { - case Some(_) if temp => - operationNotAllowed( - "CREATE TEMPORARY TABLE ... AS ..., use CREATE TEMPORARY VIEW instead", - ctx) - - case Some(_) if columns.isDefined => + case Some(_) if columns.nonEmpty => operationNotAllowed( "Schema may not be specified in a Create Table As Select (CTAS) statement", ctx) case Some(_) if partCols.nonEmpty => // non-reference partition columns are not allowed because schema can't be specified - operationNotAllowed( - "Partition column types may not be specified in Create Table As Select (CTAS)", - ctx) + val errorMessage = "Create Partitioned Table As Select cannot specify data type for " + + "the partition columns of the target table." 
+ operationNotAllowed(errorMessage, ctx) case Some(query) => CreateTableAsSelectStatement( table, query, partitioning, bucketSpec, properties, provider, options, location, comment, writeOptions = Map.empty, serdeInfo, external = external, ifNotExists = ifNotExists) - case None if temp => - operationNotAllowed("CREATE TEMPORARY TABLE", ctx) - case _ => + // Note: table schema includes both the table columns list and the partition columns + // with data type. + val schema = StructType(columns ++ partCols) CreateTableStatement(table, schema, partitioning, bucketSpec, properties, provider, options, location, comment, serdeInfo, external = external, ifNotExists = ifNotExists) } @@ -3041,13 +3052,13 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging * Expected format: * {{{ * [CREATE OR] REPLACE TABLE [db_name.]table_name - * USING table_provider + * [USING table_provider] * replace_table_clauses * [[AS] select_statement]; * * replace_table_clauses (order insensitive): * [OPTIONS table_property_list] - * [PARTITIONED BY (col_name, transform(col_name), transform(constant, col_name), ...)] + * [PARTITIONED BY (partition_fields)] * [CLUSTERED BY (col_name, col_name, ...) * [SORTED BY (col_name [ASC|DESC], ...)] * INTO num_buckets BUCKETS @@ -3055,47 +3066,38 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging * [LOCATION path] * [COMMENT table_comment] * [TBLPROPERTIES (property_name=property_value, ...)] + * + * partition_fields: + * col_name, transform(col_name), transform(constant, col_name), ... | + * col_name data_type [NOT NULL] [COMMENT col_comment], ... * }}} */ override def visitReplaceTable(ctx: ReplaceTableContext): LogicalPlan = withOrigin(ctx) { val (table, temp, ifNotExists, external) = visitReplaceTableHeader(ctx.replaceTableHeader) - if (temp) { - operationNotAllowed( - "CREATE OR REPLACE TEMPORARY TABLE ..., use CREATE TEMPORARY VIEW instead", - ctx) - } - - if (external) { - operationNotAllowed("REPLACE EXTERNAL TABLE ...", ctx) - } - - if (ifNotExists) { - operationNotAllowed("REPLACE ... IF NOT EXISTS, use CREATE IF NOT EXISTS instead", ctx) - } - + assert(!temp && !ifNotExists && !external) val (partTransforms, partCols, bucketSpec, properties, options, location, comment, serdeInfo) = visitCreateTableClauses(ctx.createTableClauses()) - val columns = Option(ctx.colTypeList()).map(visitColTypeList) + val columns = Option(ctx.colTypeList()).map(visitColTypeList).getOrElse(Nil) val provider = Option(ctx.tableProvider).map(_.multipartIdentifier.getText) if (provider.isDefined && serdeInfo.isDefined) { operationNotAllowed(s"CREATE TABLE ... USING ... ${serdeInfo.get.describe}", ctx) } - val schema = columns.map(dataCols => StructType(dataCols ++ partCols)) val partitioning = partitionExpressions(partTransforms, partCols, ctx) val orCreate = ctx.replaceTableHeader().CREATE() != null Option(ctx.query).map(plan) match { - case Some(_) if schema.isDefined => + case Some(_) if columns.nonEmpty => operationNotAllowed( "Schema may not be specified in a Replace Table As Select (RTAS) statement", ctx) case Some(_) if partCols.nonEmpty => - operationNotAllowed( - "Partition column types may not be specified in Replace Table As Select (RTAS)", - ctx) + // non-reference partition columns are not allowed because schema can't be specified + val errorMessage = "Replace Partitioned Table As Select cannot specify data type for " + + "the partition columns of the target table." 
+ operationNotAllowed(errorMessage, ctx) case Some(query) => ReplaceTableAsSelectStatement(table, query, partitioning, bucketSpec, properties, @@ -3103,9 +3105,11 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging orCreate = orCreate) case _ => - ReplaceTableStatement(table, schema.getOrElse(new StructType), partitioning, - bucketSpec, properties, provider, options, location, comment, serdeInfo, - orCreate = orCreate) + // Note: table schema includes both the table columns list and the partition columns + // with data type. + val schema = StructType(columns ++ partCols) + ReplaceTableStatement(table, schema, partitioning, bucketSpec, properties, provider, + options, location, comment, serdeInfo, orCreate = orCreate) } } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statements.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statements.scala index e6186946e7d16..2d7111dd75606 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statements.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statements.scala @@ -17,6 +17,7 @@ package org.apache.spark.sql.catalyst.plans.logical +import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.catalyst.analysis.ViewType import org.apache.spark.sql.catalyst.catalog.{BucketSpec, FunctionResource} import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec @@ -63,11 +64,11 @@ case class SerdeInfo( serdeProperties: Map[String, String] = Map.empty) { // this uses assertions because validation is done in validateRowFormatFileFormat etc. assert(storedAs.isEmpty || formatClasses.isEmpty, - s"Conflicting STORED AS $storedAs and INPUTFORMAT/OUTPUTFORMAT $formatClasses values") + "Cannot specify both STORED AS and INPUTFORMAT/OUTPUTFORMAT") def describe: String = { val serdeString = if (serde.isDefined || serdeProperties.nonEmpty) { - "ROW FORMAT" + serde.map(sd => s" SERDE $sd").getOrElse(" DELIMITED") + "ROW FORMAT " + serde.map(sd => s"SERDE $sd").getOrElse("DELIMITED") } else { "" } @@ -76,7 +77,7 @@ case class SerdeInfo( case SerdeInfo(Some(format), _, _, _) => s"STORED AS $format $serdeString" case SerdeInfo(_, Some((inFormat, outFormat)), _, _) => - s"INPUTFORMAT $inFormat OUTPUTFORMAT $outFormat $serdeString" + s"STORED AS INPUTFORMAT $inFormat OUTPUTFORMAT $outFormat $serdeString" case _ => serdeString } @@ -85,7 +86,7 @@ case class SerdeInfo( def merge(other: SerdeInfo): SerdeInfo = { def getOnly[T](desc: String, left: Option[T], right: Option[T]): Option[T] = { (left, right) match { - case (Some(l), Some(r)) if l != r => + case (Some(l), Some(r)) => assert(l == r, s"Conflicting $desc values: $l != $r") left case (Some(_), _) => @@ -97,6 +98,7 @@ case class SerdeInfo( } } + SerdeInfo.checkSerdePropMerging(serdeProperties, other.serdeProperties) SerdeInfo( getOnly("STORED AS", storedAs, other.storedAs), getOnly("INPUTFORMAT/OUTPUTFORMAT", formatClasses, other.formatClasses), @@ -106,8 +108,18 @@ case class SerdeInfo( } object SerdeInfo { - val empty: SerdeInfo = { - SerdeInfo(None, None, None, Map.empty) + val empty: SerdeInfo = SerdeInfo(None, None, None, Map.empty) + + def checkSerdePropMerging( + props1: Map[String, String], props2: Map[String, String]): Unit = { + if (props1.keySet.intersect(props2.keySet).nonEmpty) { + throw new UnsupportedOperationException( + s""" + |Cannot safely merge SERDEPROPERTIES: + |${props1.map { case (k, v) => s"$k=$v" 
}.mkString("{", ",", "}")} + |${props2.map { case (k, v) => s"$k=$v" }.mkString("{", ",", "}")} + |""".stripMargin) + } } } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala index 45bb5c6d46029..f935ff78d0725 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala @@ -325,12 +325,19 @@ private[sql] object CatalogV2Util { options ++ // to make the transition to the "option." prefix easier, add both options.map { case (key, value) => TableCatalog.OPTION_PREFIX + key -> value } ++ convertToProperties(serdeInfo) ++ - (if (external) Map(TableCatalog.PROP_EXTERNAL -> "true") else Map.empty) ++ + (if (external) Some(TableCatalog.PROP_EXTERNAL -> "true") else None) ++ provider.map(TableCatalog.PROP_PROVIDER -> _) ++ comment.map(TableCatalog.PROP_COMMENT -> _) ++ location.map(TableCatalog.PROP_LOCATION -> _) } + /** + * Converts Hive Serde info to table properties. The mapped property keys are: + * - INPUTFORMAT/OUTPUTFORMAT: hive.input/output-format + * - STORED AS: hive.stored-as + * - ROW FORMAT SERDE: hive.serde + * - SERDEPROPERTIES: add "option." prefix + */ private def convertToProperties(serdeInfo: Option[SerdeInfo]): Map[String, String] = { serdeInfo match { case Some(s) => diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala index fc1ec1fb834f8..1f74c6699336f 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala @@ -290,6 +290,25 @@ class DDLParserSuite extends AnalysisTest { } } + test("create/replace table - empty columns list") { + val createSql = "CREATE TABLE my_tab PARTITIONED BY (part string)" + val replaceSql = "REPLACE TABLE my_tab PARTITIONED BY (part string)" + val expectedTableSpec = TableSpec( + Seq("my_tab"), + Some(new StructType().add("part", StringType)), + Seq(IdentityTransform(FieldReference("part"))), + None, + Map.empty[String, String], + None, + Map.empty[String, String], + None, + None, + None) + Seq(createSql, replaceSql).foreach { sql => + testCreateOrReplaceDdl(sql, expectedTableSpec, expectedIfNotExists = false) + } + } + test("create/replace table - using with partition column definitions") { val createSql = "CREATE TABLE my_tab (id bigint) USING parquet PARTITIONED BY (part string)" val replaceSql = "REPLACE TABLE my_tab (id bigint) USING parquet PARTITIONED BY (part string)" diff --git a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala index e21a2c0809e9b..fe63af47d5a5b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala @@ -276,21 +276,27 @@ class ResolveSessionCatalog( case c @ CreateTableStatement( SessionCatalogAndTable(catalog, tbl), _, _, _, _, _, _, _, _, _, _, _) => assertNoNullTypeInSchema(c.tableSchema) - buildV1Table(tbl.asTableIdentifier, c) match { - case Some(tableDesc) => - val mode = if (c.ifNotExists) SaveMode.Ignore else 
SaveMode.ErrorIfExists
-          CreateTable(tableDesc, mode, None)
-
-        case None =>
+      val (storageFormat, provider) = getStorageFormatAndProvider(
+        c.provider, c.options, c.location, c.serde, ctas = false)
+      if (!isV2Provider(provider)) {
+        if (!DDLUtils.isHiveTable(Some(provider))) {
           assertNoCharTypeInSchema(c.tableSchema)
-          CreateV2Table(
-            catalog.asTableCatalog,
-            tbl.asIdentifier,
-            c.tableSchema,
-            // convert the bucket spec and add it as a transform
-            c.partitioning ++ c.bucketSpec.map(_.asTransform),
-            convertTableProperties(c),
-            ignoreIfExists = c.ifNotExists)
+        }
+        val tableDesc = buildCatalogTable(tbl.asTableIdentifier, c.tableSchema,
+          c.partitioning, c.bucketSpec, c.properties, provider, c.location,
+          c.comment, storageFormat, c.external)
+        val mode = if (c.ifNotExists) SaveMode.Ignore else SaveMode.ErrorIfExists
+        CreateTable(tableDesc, mode, None)
+      } else {
+        assertNoCharTypeInSchema(c.tableSchema)
+        CreateV2Table(
+          catalog.asTableCatalog,
+          tbl.asIdentifier,
+          c.tableSchema,
+          // convert the bucket spec and add it as a transform
+          c.partitioning ++ c.bucketSpec.map(_.asTransform),
+          convertTableProperties(c),
+          ignoreIfExists = c.ifNotExists)
       }
 
     case c @ CreateTableAsSelectStatement(
@@ -298,22 +304,25 @@
       if (c.asSelect.resolved) {
         assertNoNullTypeInSchema(c.asSelect.schema)
       }
-      buildV1Table(tbl.asTableIdentifier, c) match {
-        case Some(tableDesc) =>
-          val mode = if (c.ifNotExists) SaveMode.Ignore else SaveMode.ErrorIfExists
-          CreateTable(tableDesc, mode, Some(c.asSelect))
-
-        case None =>
-          assertNoCharTypeInSchema(c.schema)
-          CreateTableAsSelect(
-            catalog.asTableCatalog,
-            tbl.asIdentifier,
-            // convert the bucket spec and add it as a transform
-            c.partitioning ++ c.bucketSpec.map(_.asTransform),
-            c.asSelect,
-            convertTableProperties(c),
-            writeOptions = c.writeOptions,
-            ignoreIfExists = c.ifNotExists)
+      val (storageFormat, provider) = getStorageFormatAndProvider(
+        c.provider, c.options, c.location, c.serde, ctas = true)
+      if (!isV2Provider(provider)) {
+        val tableDesc = buildCatalogTable(tbl.asTableIdentifier, new StructType,
+          c.partitioning, c.bucketSpec, c.properties, provider, c.location,
+          c.comment, storageFormat, c.external)
+        val mode = if (c.ifNotExists) SaveMode.Ignore else SaveMode.ErrorIfExists
+        CreateTable(tableDesc, mode, Some(c.asSelect))
+      } else {
+        assertNoCharTypeInSchema(c.schema)
+        CreateTableAsSelect(
+          catalog.asTableCatalog,
+          tbl.asIdentifier,
+          // convert the bucket spec and add it as a transform
+          c.partitioning ++ c.bucketSpec.map(_.asTransform),
+          c.asSelect,
+          convertTableProperties(c),
+          writeOptions = c.writeOptions,
+          ignoreIfExists = c.ifNotExists)
       }
 
     case RefreshTable(r @ ResolvedTable(_, _, _: V1Table)) if isSessionCatalog(r.catalog) =>
@@ -635,150 +644,78 @@
       case _ =>
         throw new AnalysisException(s"$sql is only supported with temp views or v1 tables.")
   }
 
-  private def buildV1Table(
-      ident: TableIdentifier,
-      c: CreateTableAsSelectStatement): Option[CatalogTable] = {
-    val schema = new StructType
-    (c.provider, c.serde) match {
-      case (Some(provider), Some(serde)) =>
-        throw new AnalysisException(
-          s"Cannot create table with both USING $provider and ${serde.describe}")
-
-      case (None, Some(serde)) =>
-        Some(buildHiveCatalogTable(
-          ident, schema, c.partitioning, c.bucketSpec, c.properties, serde, c.options, c.location,
-          c.comment, c.external))
-
-      case (None, None) if !conf.convertCTAS =>
-        Some(buildHiveCatalogTable(
-          ident, schema, c.partitioning, c.bucketSpec, 
c.properties, SerdeInfo.empty, c.options, - c.location, c.comment, c.external)) - - case (Some(provider), None) if !isV2Provider(provider) => - Some(buildCatalogTable( - ident, schema, c.partitioning, c.bucketSpec, c.properties, provider, c.options, - c.location, c.comment, c.external)) - - case (None, None) if !isV2Provider(conf.defaultDataSourceName) => - Some(buildCatalogTable( - ident, schema, c.partitioning, c.bucketSpec, c.properties, conf.defaultDataSourceName, - c.options, c.location, c.comment, c.external)) - - case _ => - None - } - } - - private def buildV1Table( - ident: TableIdentifier, - c: CreateTableStatement): Option[CatalogTable] = { - val schema = c.tableSchema - (c.provider, c.serde) match { - case (Some(provider), Some(serde)) => - throw new AnalysisException( - s"Cannot create table with both USING $provider and ${serde.describe}") - - case (None, Some(serde)) => - Some(buildHiveCatalogTable( - ident, schema, c.partitioning, c.bucketSpec, c.properties, serde, c.options, c.location, - c.comment, c.external)) - - case (None, None) => - Some(buildHiveCatalogTable( - ident, schema, c.partitioning, c.bucketSpec, c.properties, SerdeInfo.empty, c.options, - c.location, c.comment, c.external)) - - case (Some(provider), None) if !isV2Provider(provider) => - Some(buildCatalogTable( - ident, schema, c.partitioning, c.bucketSpec, c.properties, provider, c.options, - c.location, c.comment, c.external)) - - case _ => - None - } - } - - private def buildCatalogTable( - table: TableIdentifier, - schema: StructType, - partitioning: Seq[Transform], - bucketSpec: Option[BucketSpec], - properties: Map[String, String], - provider: String, + private def getStorageFormatAndProvider( + provider: Option[String], options: Map[String, String], location: Option[String], - comment: Option[String], - external: Boolean): CatalogTable = { - // Hive types are allowed if the provider is "hive" - if (!DDLUtils.isHiveTable(Some(provider))) { - assertNoCharTypeInSchema(schema) - } - - if (external) { - throw new AnalysisException(s"Operation not allowed: CREATE EXTERNAL TABLE ... USING") - } - - val storage = CatalogStorageFormat.empty.copy( + maybeSerdeInfo: Option[SerdeInfo], + ctas: Boolean): (CatalogStorageFormat, String) = { + val nonHiveStorageFormat = CatalogStorageFormat.empty.copy( + locationUri = location.map(CatalogUtils.stringToURI), + properties = options) + val defaultHiveStorage = HiveSerDe.getDefaultStorage(conf).copy( locationUri = location.map(CatalogUtils.stringToURI), properties = options) - val tableType = if (location.isDefined) { - CatalogTableType.EXTERNAL + if (provider.isDefined) { + // The parser guarantees that USING and STORED AS/ROW FORMAT won't co-exist. + assert(maybeSerdeInfo.isEmpty) + nonHiveStorageFormat -> provider.get + } else if (maybeSerdeInfo.isDefined) { + val serdeInfo = maybeSerdeInfo.get + SerdeInfo.checkSerdePropMerging(serdeInfo.serdeProperties, defaultHiveStorage.properties) + val storageFormat = if (serdeInfo.storedAs.isDefined) { + // If `STORED AS fileFormat` is used, infer inputFormat, outputFormat and serde from it. + HiveSerDe.sourceToSerDe(serdeInfo.storedAs.get) match { + case Some(hiveSerde) => + defaultHiveStorage.copy( + inputFormat = hiveSerde.inputFormat.orElse(defaultHiveStorage.inputFormat), + outputFormat = hiveSerde.outputFormat.orElse(defaultHiveStorage.outputFormat), + // User specified serde takes precedence over the one inferred from file format. 
+ serde = serdeInfo.serde.orElse(hiveSerde.serde).orElse(defaultHiveStorage.serde), + properties = serdeInfo.serdeProperties ++ defaultHiveStorage.properties) + case _ => throw new AnalysisException( + s"STORED AS with file format '${serdeInfo.storedAs.get}' is invalid.") + } + } else { + defaultHiveStorage.copy( + inputFormat = serdeInfo.formatClasses.map(_._1).orElse(defaultHiveStorage.inputFormat), + outputFormat = serdeInfo.formatClasses.map(_._2).orElse(defaultHiveStorage.outputFormat), + serde = serdeInfo.serde.orElse(defaultHiveStorage.serde), + properties = serdeInfo.serdeProperties ++ defaultHiveStorage.properties) + } + storageFormat -> DDLUtils.HIVE_PROVIDER } else { - CatalogTableType.MANAGED + // If neither USING nor STORED AS/ROW FORMAT is specified, we create native data source + // tables if it's a CTAS and `conf.convertCTAS` is true. + // TODO: create native data source table by default for non-CTAS. + if (ctas && conf.convertCTAS) { + nonHiveStorageFormat -> conf.defaultDataSourceName + } else { + defaultHiveStorage -> DDLUtils.HIVE_PROVIDER + } } - - CatalogTable( - identifier = table, - tableType = tableType, - storage = storage, - schema = schema, - provider = Some(provider), - partitionColumnNames = partitioning.asPartitionColumns, - bucketSpec = bucketSpec, - properties = properties, - comment = comment) } - private def buildHiveCatalogTable( + private def buildCatalogTable( table: TableIdentifier, schema: StructType, partitioning: Seq[Transform], bucketSpec: Option[BucketSpec], properties: Map[String, String], - serdeInfo: SerdeInfo, - options: Map[String, String], + provider: String, location: Option[String], comment: Option[String], + storageFormat: CatalogStorageFormat, external: Boolean): CatalogTable = { - val defaultStorage = HiveSerDe.getDefaultStorage(conf) - val baseStorage = defaultStorage.copy( - locationUri = location.map(CatalogUtils.stringToURI), - serde = serdeInfo.serde.orElse(defaultStorage.serde), - properties = options ++ serdeInfo.serdeProperties) - - val storage = (serdeInfo.storedAs, serdeInfo.formatClasses) match { - case (Some(format), None) => - HiveSerDe.sourceToSerDe(format) match { - case Some(hiveSerDe) => - baseStorage.copy( - inputFormat = hiveSerDe.inputFormat, - outputFormat = hiveSerDe.outputFormat, - serde = serdeInfo.serde.orElse(hiveSerDe.serde)) - case _ => - baseStorage + if (external) { + if (DDLUtils.isHiveTable(Some(provider))) { + if (location.isEmpty) { + throw new AnalysisException(s"CREATE EXTERNAL TABLE must be accompanied by LOCATION") } - case (None, Some((inFormat, outFormat))) => - baseStorage.copy( - inputFormat = Some(inFormat), - outputFormat = Some(outFormat)) - - case _ => - baseStorage - } - - if (external && location.isEmpty) { - throw new AnalysisException(s"CREATE EXTERNAL TABLE must be accompanied by LOCATION") + } else { + throw new AnalysisException(s"Operation not allowed: CREATE EXTERNAL TABLE ... 
USING")
+      }
     }
 
     val tableType = if (location.isDefined) {
@@ -790,9 +727,9 @@
     CatalogTable(
       identifier = table,
       tableType = tableType,
-      storage = storage,
+      storage = storageFormat,
       schema = schema,
-      provider = Some(DDLUtils.HIVE_PROVIDER),
+      provider = Some(provider),
       partitionColumnNames = partitioning.asPartitionColumns,
       bucketSpec = bucketSpec,
       properties = properties,
@@ -847,6 +784,9 @@
   }
 
   private def isV2Provider(provider: String): Boolean = {
+    // Return early since `lookupDataSourceV2` may fail to resolve provider "hive" to
+    // `HiveFileFormat` when running tests in sql/core.
+    if (DDLUtils.isHiveTable(Some(provider))) return false
     DataSource.lookupDataSourceV2(provider, conf) match {
       // TODO(SPARK-28396): Currently file source v2 can't work with tables.
       case Some(_: FileDataSourceV2) => false
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
index 13943290efdc8..204a079081934 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
@@ -363,6 +363,37 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder(conf) {
     }
   }
 
+  private def toStorageFormat(
+      location: Option[String],
+      maybeSerdeInfo: Option[SerdeInfo],
+      ctx: ParserRuleContext): CatalogStorageFormat = {
+    if (maybeSerdeInfo.isEmpty) {
+      CatalogStorageFormat.empty.copy(locationUri = location.map(CatalogUtils.stringToURI))
+    } else {
+      val serdeInfo = maybeSerdeInfo.get
+      if (serdeInfo.storedAs.isEmpty) {
+        CatalogStorageFormat.empty.copy(
+          locationUri = location.map(CatalogUtils.stringToURI),
+          inputFormat = serdeInfo.formatClasses.map(_._1),
+          outputFormat = serdeInfo.formatClasses.map(_._2),
+          serde = serdeInfo.serde,
+          properties = serdeInfo.serdeProperties)
+      } else {
+        HiveSerDe.sourceToSerDe(serdeInfo.storedAs.get) match {
+          case Some(hiveSerde) =>
+            CatalogStorageFormat.empty.copy(
+              locationUri = location.map(CatalogUtils.stringToURI),
+              inputFormat = hiveSerde.inputFormat,
+              outputFormat = hiveSerde.outputFormat,
+              serde = serdeInfo.serde.orElse(hiveSerde.serde),
+              properties = serdeInfo.serdeProperties)
+          case _ =>
+            operationNotAllowed(s"STORED AS with file format '${serdeInfo.storedAs.get}'", ctx)
+        }
+      }
+    }
+  }
+
   /**
    * Create a [[CreateTableLikeCommand]] command.
    *
@@ -392,53 +423,12 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder(conf) {
     val location = visitLocationSpecList(ctx.locationSpec())
     // rowStorage used to determine CatalogStorageFormat.serde and
     // CatalogStorageFormat.properties in STORED AS clause.
-    val rowFormatSerdeInfo = ctx.rowFormat.asScala.map(visitRowFormat)
-    val fileFormatSerdeInfo = ctx.createFileFormat.asScala.map(visitCreateFileFormat)
-    val serdeInfo =
-      (fileFormatSerdeInfo ++ rowFormatSerdeInfo).reduceLeftOption((x, y) => x.merge(y))
+    val serdeInfo = getSerdeInfo(ctx.rowFormat.asScala.toSeq, ctx.createFileFormat.asScala.toSeq, ctx)
 
     if (provider.isDefined && serdeInfo.isDefined) {
-      val sd = serdeInfo.get
-      if (sd.storedAs.isDefined) {
-        throw new ParseException("'STORED AS hiveFormats' and 'USING provider' " +
-          "should not be specified both", ctx)
-      } else if (sd.formatClasses.isDefined) {
-        throw new ParseException("'INPUTFORMAT hiveFormat' and 'USING provider' " +
-          "should not be specified both", ctx)
-      } else if (sd.serde.isDefined) {
-        throw new ParseException("'ROW FORMAT' must be used with 'STORED AS' not 'USING'", ctx)
-      }
-    }
-
-    val (inputFormat, outputFormat, serde, serdeProperties) = serdeInfo match {
-      case Some(SerdeInfo(Some(storedAs), None, serde, props)) =>
-        HiveSerDe.sourceToSerDe(storedAs) match {
-          case Some(hiveSerde) =>
-            (hiveSerde.inputFormat, hiveSerde.outputFormat, serde.orElse(hiveSerde.serde), props)
-          case _ =>
-            (None, None, None, Map.empty[String, String])
-        }
-
-      case Some(SerdeInfo(None, Some((inputFormat, outputFormat)), serde, props)) =>
-        (Some(inputFormat), Some(outputFormat), serde, props)
-
-      case Some(SerdeInfo(None, None, serde, props)) =>
-        if (serde.isDefined) {
-          throw new ParseException("'ROW FORMAT' must be used with 'STORED AS'", ctx)
-        }
-        (None, None, serde, props)
-
-      case _ =>
-        (None, None, None, Map.empty[String, String])
+      operationNotAllowed(s"CREATE TABLE LIKE ... USING ... ${serdeInfo.get.describe}", ctx)
     }
 
-    val storage = CatalogStorageFormat(
-      locationUri = location.map(CatalogUtils.stringToURI),
-      inputFormat = inputFormat,
-      outputFormat = outputFormat,
-      serde = serde,
-      compressed = false,
-      properties = serdeProperties)
-
+    val storage = toStorageFormat(location, serdeInfo, ctx)
     val properties = Option(ctx.tableProps).map(visitPropertyKeyValues).getOrElse(Map.empty)
     CreateTableLikeCommand(
       targetTable, sourceTable, storage, provider, properties, ctx.EXISTS != null)
@@ -594,12 +584,7 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder(conf) {
    */
   override def visitInsertOverwriteHiveDir(
       ctx: InsertOverwriteHiveDirContext): InsertDirParams = withOrigin(ctx) {
-    validateRowFormatFileFormat(ctx.rowFormat, ctx.createFileFormat, ctx)
-    val rowFormatSerdeInfo = Option(ctx.rowFormat).map(visitRowFormat)
-    val fileFormatSerdeInfo = Option(ctx.createFileFormat).map(visitCreateFileFormat)
-    val serdeInfo =
-      (fileFormatSerdeInfo ++ rowFormatSerdeInfo).reduceLeftOption((x, y) => x.merge(y))
-
+    val serdeInfo = getSerdeInfo(Option(ctx.rowFormat).toSeq, Option(ctx.createFileFormat).toSeq, ctx)
     val path = string(ctx.path)
     // The path field is required
     if (path.isEmpty) {
@@ -607,34 +592,12 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder(conf) {
     }
 
     val default = HiveSerDe.getDefaultStorage(conf)
+    val storage = toStorageFormat(Some(path), serdeInfo, ctx)
+    val finalStorage = storage.copy(
+      inputFormat = storage.inputFormat.orElse(default.inputFormat),
+      outputFormat = storage.outputFormat.orElse(default.outputFormat),
+      serde = storage.serde.orElse(default.serde))
 
-    val (inputFormat, outputFormat, serde, serdeProperties) = serdeInfo match {
-      case Some(SerdeInfo(Some(storedAs), None, None, properties)) =>
-        HiveSerDe.sourceToSerDe(storedAs) match {
-          case Some(hiveSerde) =>
-            (hiveSerde.inputFormat, hiveSerde.outputFormat, 
hiveSerde.serde, properties) - case _ => - (default.inputFormat, default.outputFormat, default.serde, Map.empty[String, String]) - } - - case Some(SerdeInfo(None, Some((inputFormat, outputFormat)), serde, properties)) => - (Some(inputFormat), Some(outputFormat), serde.orElse(default.serde), properties) - - case Some(SerdeInfo(None, None, serde, properties)) => - (default.inputFormat, default.outputFormat, serde.orElse(default.serde), properties) - - case _ => - (default.inputFormat, default.outputFormat, default.serde, Map.empty[String, String]) - } - - val storage = CatalogStorageFormat( - locationUri = Some(CatalogUtils.stringToURI(path)), - inputFormat = inputFormat, - outputFormat = outputFormat, - serde = serde, - compressed = false, - properties = serdeProperties) - - (ctx.LOCAL != null, storage, Some(DDLUtils.HIVE_PROVIDER)) + (ctx.LOCAL != null, finalStorage, Some(DDLUtils.HIVE_PROVIDER)) } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala index d1c104427e344..ab6690ff23a3c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala @@ -113,11 +113,9 @@ class V2SessionCatalog(catalog: SessionCatalog, conf: SQLConf) } private def toOptions(properties: Map[String, String]): Map[String, String] = { - properties - .filterKeys(_.startsWith(TableCatalog.OPTION_PREFIX)) - .map { - case (key, value) => key.replaceFirst(TableCatalog.OPTION_PREFIX, "") -> value - }.toMap + properties.filterKeys(_.startsWith(TableCatalog.OPTION_PREFIX)).map { + case (key, value) => key.drop(TableCatalog.OPTION_PREFIX.length) -> value + } } override def alterTable( From 20d22be3dae03254c2573c2a66ff9c3b888bfa3a Mon Sep 17 00:00:00 2001 From: Wenchen Fan Date: Fri, 20 Nov 2020 12:28:07 +0800 Subject: [PATCH 2/3] update --- .../sql/catalyst/parser/AstBuilder.scala | 30 +++++++++++++------ .../catalyst/plans/logical/statements.scala | 18 +++++++---- .../sql/connector/catalog/CatalogV2Util.scala | 11 +++---- .../sql/catalyst/parser/DDLParserSuite.scala | 6 ++-- .../analysis/ResolveSessionCatalog.scala | 19 +++++++----- .../spark/sql/execution/SparkSqlParser.scala | 4 +-- 6 files changed, 55 insertions(+), 33 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala index 508846742a382..801a5770d0e38 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala @@ -2784,7 +2784,7 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging (ctx.fileFormat, ctx.storageHandler) match { // Expected format: INPUTFORMAT input_format OUTPUTFORMAT output_format case (c: TableFileFormatContext, null) => - SerdeInfo(formatClasses = Some((string(c.inFmt), string(c.outFmt)))) + SerdeInfo(formatClasses = Some(FormatClasses(string(c.inFmt), string(c.outFmt)))) // Expected format: SEQUENCEFILE | TEXTFILE | RCFILE | ORC | PARQUET | AVRO case (c: GenericFileFormatContext, null) => SerdeInfo(storedAs = Some(c.identifier.getText)) @@ -2945,7 +2945,7 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging 
validateRowFormatFileFormat(rowFormatCtx, createFileFormatCtx, ctx) val rowFormatSerdeInfo = rowFormatCtx.map(visitRowFormat) val fileFormatSerdeInfo = createFileFormatCtx.map(visitCreateFileFormat) - (fileFormatSerdeInfo ++ rowFormatSerdeInfo).reduceLeftOption(_ merge _) + (fileFormatSerdeInfo ++ rowFormatSerdeInfo).reduceLeftOption((l, r) => l.merge(r)) } private def partitionExpressions( @@ -3028,9 +3028,9 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging case Some(_) if partCols.nonEmpty => // non-reference partition columns are not allowed because schema can't be specified - val errorMessage = "Create Partitioned Table As Select cannot specify data type for " + - "the partition columns of the target table." - operationNotAllowed(errorMessage, ctx) + operationNotAllowed( + "Partition column types may not be specified in Create Table As Select (CTAS)", + ctx) case Some(query) => CreateTableAsSelectStatement( @@ -3074,7 +3074,19 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging */ override def visitReplaceTable(ctx: ReplaceTableContext): LogicalPlan = withOrigin(ctx) { val (table, temp, ifNotExists, external) = visitReplaceTableHeader(ctx.replaceTableHeader) - assert(!temp && !ifNotExists && !external) + if (temp) { + operationNotAllowed("CREATE OR REPLACE TEMPORARY TABLE is not supported yet. " + + "Please use CREATE OR REPLACE TEMPORARY VIEW as an alternative.", ctx) + } + + if (external) { + operationNotAllowed("REPLACE EXTERNAL TABLE ...", ctx) + } + + if (ifNotExists) { + operationNotAllowed("REPLACE ... IF NOT EXISTS, use CREATE IF NOT EXISTS instead", ctx) + } + val (partTransforms, partCols, bucketSpec, properties, options, location, comment, serdeInfo) = visitCreateTableClauses(ctx.createTableClauses()) val columns = Option(ctx.colTypeList()).map(visitColTypeList).getOrElse(Nil) @@ -3095,9 +3107,9 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging case Some(_) if partCols.nonEmpty => // non-reference partition columns are not allowed because schema can't be specified - val errorMessage = "Replace Partitioned Table As Select cannot specify data type for " + - "the partition columns of the target table." - operationNotAllowed(errorMessage, ctx) + operationNotAllowed( + "Partition column types may not be specified in Replace Table As Select (RTAS)", + ctx) case Some(query) => ReplaceTableAsSelectStatement(table, query, partitioning, bucketSpec, properties, diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statements.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statements.scala index 2d7111dd75606..37e46ad4f7e58 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statements.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statements.scala @@ -59,7 +59,7 @@ abstract class ParsedStatement extends LogicalPlan { */ case class SerdeInfo( storedAs: Option[String] = None, - formatClasses: Option[(String, String)] = None, + formatClasses: Option[FormatClasses] = None, serde: Option[String] = None, serdeProperties: Map[String, String] = Map.empty) { // this uses assertions because validation is done in validateRowFormatFileFormat etc. 
@@ -74,10 +74,10 @@ case class SerdeInfo(
     }
 
     this match {
-      case SerdeInfo(Some(format), _, _, _) =>
-        s"STORED AS $format $serdeString"
-      case SerdeInfo(_, Some((inFormat, outFormat)), _, _) =>
-        s"STORED AS INPUTFORMAT $inFormat OUTPUTFORMAT $outFormat $serdeString"
+      case SerdeInfo(Some(storedAs), _, _, _) =>
+        s"STORED AS $storedAs $serdeString"
+      case SerdeInfo(_, Some(formatClasses), _, _) =>
+        s"STORED AS $formatClasses $serdeString"
       case _ =>
         serdeString
     }
@@ -107,17 +107,23 @@ case class SerdeInfo(
   }
 }
 
+case class FormatClasses(input: String, output: String) {
+  override def toString: String = s"INPUTFORMAT $input OUTPUTFORMAT $output"
+}
+
 object SerdeInfo {
   val empty: SerdeInfo = SerdeInfo(None, None, None, Map.empty)
 
   def checkSerdePropMerging(
       props1: Map[String, String], props2: Map[String, String]): Unit = {
-    if (props1.keySet.intersect(props2.keySet).nonEmpty) {
+    val conflictKeys = props1.keySet.intersect(props2.keySet)
+    if (conflictKeys.nonEmpty) {
       throw new UnsupportedOperationException(
         s"""
           |Cannot safely merge SERDEPROPERTIES:
           |${props1.map { case (k, v) => s"$k=$v" }.mkString("{", ",", "}")}
          |${props2.map { case (k, v) => s"$k=$v" }.mkString("{", ",", "}")}
+          |Conflicting keys: ${conflictKeys.mkString(", ")}
           |""".stripMargin)
     }
   }
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala
index f935ff78d0725..b6dc4f61c8588 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala
@@ -341,17 +341,14 @@ private[sql] object CatalogV2Util {
   private def convertToProperties(serdeInfo: Option[SerdeInfo]): Map[String, String] = {
     serdeInfo match {
       case Some(s) =>
-        ((s.formatClasses match {
-          case Some((inputFormat, outputFormat)) =>
-            Map("hive.input-format" -> inputFormat, "hive.output-format" -> outputFormat)
-          case _ =>
-            Map.empty
-        }) ++
+        s.formatClasses.map { f =>
+          Map("hive.input-format" -> f.input, "hive.output-format" -> f.output)
+        }.getOrElse(Map.empty) ++
         s.storedAs.map("hive.stored-as" -> _) ++
         s.serde.map("hive.serde" -> _) ++
         s.serdeProperties.map {
           case (key, value) => TableCatalog.OPTION_PREFIX + key -> value
-        }).toMap
+        }
       case None =>
         Map.empty
     }
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala
index 1f74c6699336f..86d6f2d13c937 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala
@@ -474,7 +474,7 @@ class DDLParserSuite extends AnalysisTest {
       Map.empty[String, String],
       None,
       None,
-      Some(SerdeInfo(formatClasses = Some(("inFormat", "outFormat")))))
+      Some(SerdeInfo(formatClasses = Some(FormatClasses("inFormat", "outFormat")))))
     Seq(createSql, replaceSql).foreach { sql =>
       testCreateOrReplaceDdl(sql, expectedTableSpec, expectedIfNotExists = false)
     }
@@ -498,7 +498,9 @@ class DDLParserSuite extends AnalysisTest {
       Map.empty[String, String],
       None,
       None,
-      Some(SerdeInfo(formatClasses = Some(("inFormat", "outFormat")), serde = Some("customSerde"))))
+      Some(SerdeInfo(
+        formatClasses = Some(FormatClasses("inFormat", "outFormat")),
+        serde = Some("customSerde"))))
     Seq(createSql, replaceSql).foreach { sql =>
       testCreateOrReplaceDdl(sql, expectedTableSpec, expectedIfNotExists = false)
     }
   }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala
index fe63af47d5a5b..b83f6abe4c04d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala
@@ -659,8 +659,11 @@ class ResolveSessionCatalog(
     if (provider.isDefined) {
       // The parser guarantees that USING and STORED AS/ROW FORMAT won't co-exist.
-      assert(maybeSerdeInfo.isEmpty)
-      nonHiveStorageFormat -> provider.get
+      if (maybeSerdeInfo.isDefined) {
+        throw new AnalysisException(
+          s"Cannot create table with both USING ${provider.get} and ${maybeSerdeInfo.get.describe}")
+      }
+      (nonHiveStorageFormat, provider.get)
     } else if (maybeSerdeInfo.isDefined) {
       val serdeInfo = maybeSerdeInfo.get
       SerdeInfo.checkSerdePropMerging(serdeInfo.serdeProperties, defaultHiveStorage.properties)
@@ -679,20 +682,22 @@ class ResolveSessionCatalog(
         }
       } else {
         defaultHiveStorage.copy(
-          inputFormat = serdeInfo.formatClasses.map(_._1).orElse(defaultHiveStorage.inputFormat),
-          outputFormat = serdeInfo.formatClasses.map(_._2).orElse(defaultHiveStorage.outputFormat),
+          inputFormat =
+            serdeInfo.formatClasses.map(_.input).orElse(defaultHiveStorage.inputFormat),
+          outputFormat =
+            serdeInfo.formatClasses.map(_.output).orElse(defaultHiveStorage.outputFormat),
           serde = serdeInfo.serde.orElse(defaultHiveStorage.serde),
           properties = serdeInfo.serdeProperties ++ defaultHiveStorage.properties)
       }
-      storageFormat -> DDLUtils.HIVE_PROVIDER
+      (storageFormat, DDLUtils.HIVE_PROVIDER)
     } else {
       // If neither USING nor STORED AS/ROW FORMAT is specified, we create native data source
       // tables if it's a CTAS and `conf.convertCTAS` is true.
       // TODO: create native data source table by default for non-CTAS.
       if (ctas && conf.convertCTAS) {
-        nonHiveStorageFormat -> conf.defaultDataSourceName
+        (nonHiveStorageFormat, conf.defaultDataSourceName)
       } else {
-        defaultHiveStorage -> DDLUtils.HIVE_PROVIDER
+        (defaultHiveStorage, DDLUtils.HIVE_PROVIDER)
       }
     }
   }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
index 204a079081934..4f06ab86bf167 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
@@ -374,8 +374,8 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder(conf) {
       if (serdeInfo.storedAs.isEmpty) {
         CatalogStorageFormat.empty.copy(
           locationUri = location.map(CatalogUtils.stringToURI),
-          inputFormat = serdeInfo.formatClasses.map(_._1),
-          outputFormat = serdeInfo.formatClasses.map(_._2),
+          inputFormat = serdeInfo.formatClasses.map(_.input),
+          outputFormat = serdeInfo.formatClasses.map(_.output),
           serde = serdeInfo.serde,
           properties = serdeInfo.serdeProperties)
       } else {

From 4d1b37a8ba496e29e7ed7b433b44e9285aa08469 Mon Sep 17 00:00:00 2001
From: Wenchen Fan
Date: Mon, 23 Nov 2020 22:38:13 +0800
Subject: [PATCH 3/3] improve error message

---
 .../spark/sql/catalyst/parser/AstBuilder.scala | 12 +++++++-----
 1 file changed, 7 insertions(+), 5 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
index 801a5770d0e38..e4037aee925b0 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
@@ -3014,8 +3014,9 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging
     }
 
     if (temp) {
-      operationNotAllowed("CREATE TEMPORARY TABLE is not supported yet. " +
-        "Please use CREATE TEMPORARY VIEW as an alternative.", ctx)
+      val asSelect = if (ctx.query == null) "" else " AS ..."
+      operationNotAllowed(
+        s"CREATE TEMPORARY TABLE ...$asSelect, use CREATE TEMPORARY VIEW instead", ctx)
     }
 
     val partitioning = partitionExpressions(partTransforms, partCols, ctx)
@@ -3074,9 +3075,11 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging
    */
   override def visitReplaceTable(ctx: ReplaceTableContext): LogicalPlan = withOrigin(ctx) {
     val (table, temp, ifNotExists, external) = visitReplaceTableHeader(ctx.replaceTableHeader)
+    val orCreate = ctx.replaceTableHeader().CREATE() != null
+
     if (temp) {
-      operationNotAllowed("CREATE OR REPLACE TEMPORARY TABLE is not supported yet. " +
-        "Please use CREATE OR REPLACE TEMPORARY VIEW as an alternative.", ctx)
+      val action = if (orCreate) "CREATE OR REPLACE" else "REPLACE"
+      operationNotAllowed(s"$action TEMPORARY TABLE ..., use $action TEMPORARY VIEW instead.", ctx)
     }
 
     if (external) {
@@ -3097,7 +3100,6 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging
     }
 
     val partitioning = partitionExpressions(partTransforms, partCols, ctx)
-    val orCreate = ctx.replaceTableHeader().CREATE() != null
 
     Option(ctx.query).map(plan) match {
       case Some(_) if columns.nonEmpty =>
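
Two behaviors introduced by this series are easiest to evaluate in isolation. First, `SerdeInfo.merge` combines at most one ROW FORMAT clause with at most one STORED AS/INPUTFORMAT/OUTPUTFORMAT clause field by field (a field may be set by either side, or by both if they agree), and `SerdeInfo.checkSerdePropMerging` rejects overlapping SERDEPROPERTIES keys instead of letting `++` silently pick a winner. The following is a minimal, self-contained Scala sketch of those semantics; the classes and names here (`SerdeInfoMergeDemo`, `MySerde`) are simplified stand-ins for illustration, not the Spark classes themselves:

// Stand-in for the FormatClasses wrapper introduced in PATCH 2/3.
case class FormatClasses(input: String, output: String) {
  override def toString: String = s"INPUTFORMAT $input OUTPUTFORMAT $output"
}

// Stand-in for the SerdeInfo statement node; only the merge logic is sketched.
case class SerdeInfo(
    storedAs: Option[String] = None,
    formatClasses: Option[FormatClasses] = None,
    serde: Option[String] = None,
    serdeProperties: Map[String, String] = Map.empty) {

  def merge(other: SerdeInfo): SerdeInfo = {
    // Each field may be set by at most one side; if both sides set it, they must agree.
    def getOnly[T](desc: String, left: Option[T], right: Option[T]): Option[T] =
      (left, right) match {
        case (Some(l), Some(r)) =>
          assert(l == r, s"Conflicting $desc values: $l != $r")
          left
        case (Some(_), _) => left
        case (_, Some(_)) => right
        case _ => None
      }

    // Overlapping keys are rejected up front; `++` below would otherwise pick a winner silently.
    val conflictKeys = serdeProperties.keySet.intersect(other.serdeProperties.keySet)
    if (conflictKeys.nonEmpty) {
      throw new UnsupportedOperationException(
        s"Cannot safely merge SERDEPROPERTIES, conflicting keys: ${conflictKeys.mkString(", ")}")
    }
    SerdeInfo(
      getOnly("STORED AS", storedAs, other.storedAs),
      getOnly("INPUTFORMAT/OUTPUTFORMAT", formatClasses, other.formatClasses),
      getOnly("SERDE", serde, other.serde),
      serdeProperties ++ other.serdeProperties)
  }
}

object SerdeInfoMergeDemo extends App {
  // ROW FORMAT SERDE 'MySerde' WITH SERDEPROPERTIES ('sep'=',') ... STORED AS parquet
  val rowFormat = SerdeInfo(serde = Some("MySerde"), serdeProperties = Map("sep" -> ","))
  val fileFormat = SerdeInfo(storedAs = Some("parquet"))
  println(fileFormat.merge(rowFormat))
  // SerdeInfo(Some(parquet),None,Some(MySerde),Map(sep -> ,))
}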
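
Second, the provider fallback in `getStorageFormatAndProvider` reduces to a small decision table: an explicit USING wins, any STORED AS/ROW FORMAT clause forces the Hive provider, and a bare CREATE TABLE defaults to Hive unless it is a CTAS with `conf.convertCTAS` enabled. A sketch under the same caveat — `hiveProvider` and `defaultSource` are placeholders standing in for `DDLUtils.HIVE_PROVIDER` and `conf.defaultDataSourceName`:

object ProviderDecisionDemo extends App {
  // Placeholders for DDLUtils.HIVE_PROVIDER and conf.defaultDataSourceName.
  val hiveProvider = "hive"
  val defaultSource = "parquet"

  // Mirrors the three branches of getStorageFormatAndProvider:
  //   1. USING provider         -> that provider, with native storage
  //   2. STORED AS / ROW FORMAT -> the "hive" provider, storage derived from the serde info
  //   3. neither clause         -> "hive" by default; the native default source only for CTAS
  //                                when conf.convertCTAS is enabled
  def decideProvider(
      using: Option[String],
      hasSerdeClauses: Boolean,
      ctas: Boolean,
      convertCTAS: Boolean): String = {
    if (using.isDefined) {
      // USING plus STORED AS/ROW FORMAT is rejected earlier (an AnalysisException after PATCH 2/3).
      require(!hasSerdeClauses)
      using.get
    } else if (hasSerdeClauses) {
      hiveProvider
    } else if (ctas && convertCTAS) {
      defaultSource
    } else {
      hiveProvider
    }
  }

  println(decideProvider(Some("orc"), hasSerdeClauses = false, ctas = false, convertCTAS = false)) // orc
  println(decideProvider(None, hasSerdeClauses = true, ctas = true, convertCTAS = true)) // hive
  println(decideProvider(None, hasSerdeClauses = false, ctas = true, convertCTAS = true)) // parquet
  println(decideProvider(None, hasSerdeClauses = false, ctas = false, convertCTAS = false)) // hive
}

Keeping the Hive provider as the final fallback preserves the pre-existing behavior of CREATE TABLE statements without a USING or STORED AS clause, while the TODO above tracks flipping that default to a native data source for non-CTAS statements.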