[SPARK-16217][SQL] Support SELECT INTO statement #14191
**AstBuilder.scala**

```diff
@@ -132,6 +132,10 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with Logging {
     // Build the insert clauses.
     val inserts = ctx.multiInsertQueryBody.asScala.map {
       body =>
+        assert(body.querySpecification.intoClause == null,
+          "Multi-Insert queries cannot have an INTO clause in their individual SELECT statements",
+          body)
+
         assert(body.querySpecification.fromClause == null,
           "Multi-Insert queries cannot have a FROM clause in their individual SELECT statements",
           body)
```
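The new assertion rejects a multi-insert query whose individual branches carry an INTO clause. A minimal sketch of the kind of query that should now fail at parse time (hypothetical: assumes a `spark` SparkSession and tables `src`, `tbl_a`, `tbl_b`, `tbl_c`):

```scala
// Each branch of a multi-insert is a bare SELECT; adding INTO to a branch
// is rejected by the new assert in AstBuilder.
spark.sql(
  """FROM src
    |INSERT INTO TABLE tbl_a SELECT key, value
    |INSERT INTO TABLE tbl_b SELECT key, value INTO tbl_c
  """.stripMargin)
// expected to fail with the "Multi-Insert queries cannot have an INTO clause" message
```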
```diff
@@ -155,11 +159,26 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with Logging {
    */
   override def visitSingleInsertQuery(
       ctx: SingleInsertQueryContext): LogicalPlan = withOrigin(ctx) {
+    // Find the INTO clause, if any.
+    val intoClause = ctx.queryTerm match {
+      case queryTermDefault: QueryTermDefaultContext =>
+        queryTermDefault.queryPrimary match {
+          case queryPrimaryDefault: QueryPrimaryDefaultContext =>
+            queryPrimaryDefault.querySpecification.intoClause
+          case _ => null
+        }
+      case _ => null
+    }
+    if (ctx.insertInto != null && intoClause != null) {
+      operationNotAllowed("INSERT INTO ... SELECT INTO", ctx)
+    }
     plan(ctx.queryTerm).
       // Add organization statements.
       optionalMap(ctx.queryOrganization)(withQueryResultClauses).
       // Add insert.
-      optionalMap(ctx.insertInto())(withInsertInto)
+      optionalMap(ctx.insertInto())(withInsertInto).
+      // Add SELECT INTO, if present.
+      optionalMap(intoClause)(withSelectInto)
   }
 
   /**
```

**Review comment:** This allows for the following syntax:

```sql
INSERT INTO tbl_a
SELECT *
INTO tbl_a
FROM tbl_b
```

Make sure that we cannot have both.

**Review comment:** We also need to check what this does with the multi-insert syntax, i.e.:

```sql
FROM tbl_a
INSERT INTO tbl_b
SELECT *
INSERT INTO tbl_c
SELECT *
INTO tbl_c
```
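Addressing the first comment: the patch now rejects the combined form explicitly. A hypothetical spark-shell check (assumes a `spark` session and existing tables `tbl_a` and `tbl_b`; the exact error text is an assumption based on `operationNotAllowed`):

```scala
// The guard in visitSingleInsertQuery raises a parse error when both
// INSERT INTO and SELECT ... INTO appear in the same statement.
spark.sql("INSERT INTO tbl_a SELECT * INTO tbl_a FROM tbl_b")
// expected: ParseException with "Operation not allowed: INSERT INTO ... SELECT INTO"
```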
```diff
@@ -394,6 +413,15 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with Logging {
     throw new ParseException("Script Transform is not supported", ctx)
   }
 
+  /**
+   * Change to a Hive CTAS statement; overridden in SparkSqlAstBuilder.
+   */
+  protected def withSelectInto(
+      ctx: IntoClauseContext,
+      query: LogicalPlan): LogicalPlan = withOrigin(ctx) {
+    throw new ParseException("SELECT INTO is not supported", ctx)
+  }
+
   /**
    * Create a logical plan for a given 'FROM' clause. Note that we support multiple (comma
    * separated) relations here, these get converted into a single plan by condition-less inner join.
```

**Review comment:** Why throw a `ParseException` here?
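The base builder deliberately fails here so that only dialects that can express CTAS accept SELECT INTO. A simplified, hypothetical illustration of that hook pattern (stand-in names, not the actual Spark classes):

```scala
// The base visitor rejects SELECT INTO; a dialect-specific subclass
// overrides the hook to translate it instead of failing.
abstract class BaseAstBuilder {
  def withSelectInto(table: String, query: String): String =
    throw new UnsupportedOperationException("SELECT INTO is not supported")
}

class CtasCapableAstBuilder extends BaseAstBuilder {
  override def withSelectInto(table: String, query: String): String =
    s"CREATE TABLE $table AS $query" // rewrite SELECT INTO as a CTAS
}

object HookDemo extends App {
  val builder = new CtasCapableAstBuilder
  // prints: CREATE TABLE backup AS SELECT key, value FROM src
  println(builder.withSelectInto("backup", "SELECT key, value FROM src"))
}
```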
**SparkSqlParser.scala**

```diff
@@ -1376,4 +1376,62 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
       reader, writer,
       schemaLess)
   }
 
+  /**
+   * Reuse CTAS: convert a SELECT INTO statement into a CTAS,
+   * returning [[CreateHiveTableAsSelectLogicalPlan]].
+   * The SELECT INTO statement selects data from one table
+   * and inserts it into a new table. It is commonly used to
+   * create a backup copy of a table or of selected records.
+   *
+   * Expected format:
+   * {{{
+   *   SELECT column_name(s)
+   *   INTO new_table
+   *   FROM old_table
+   *   ...
+   * }}}
+   */
+  override protected def withSelectInto(
+      ctx: IntoClauseContext,
+      query: LogicalPlan): LogicalPlan = withOrigin(ctx) {
+    // Storage format
+    val defaultStorage: CatalogStorageFormat = {
+      val defaultStorageType = conf.getConfString("hive.default.fileformat", "textfile")
+      val defaultHiveSerde = HiveSerDe.sourceToSerDe(defaultStorageType, conf)
+      CatalogStorageFormat(
+        locationUri = None,
+        inputFormat = defaultHiveSerde.flatMap(_.inputFormat)
+          .orElse(Some("org.apache.hadoop.mapred.TextInputFormat")),
+        outputFormat = defaultHiveSerde.flatMap(_.outputFormat)
+          .orElse(Some("org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat")),
+        // Note: Keep this unspecified because we use the presence of the serde to decide
+        // whether to convert a table created by CTAS to a datasource table.
+        serde = None,
+        compressed = false,
+        serdeProperties = Map())
+    }
+    // TODO support the sql text - have a proper location for this!
+    val tableDesc = CatalogTable(
+      identifier = visitTableIdentifier(ctx.tableIdentifier),
+      tableType = CatalogTableType.MANAGED,
+      storage = defaultStorage,
+      schema = Nil
+    )
+
+    // The target table shouldn't already exist.
+    if (conf.convertCTAS) {
+      CreateTableUsingAsSelect(
+        tableIdent = tableDesc.identifier,
+        provider = conf.defaultDataSourceName,
+        partitionColumns = Array(),
+        bucketSpec = None,
+        mode = SaveMode.ErrorIfExists,
+        options = Map.empty[String, String],
+        query
+      )
+    } else {
+      CreateHiveTableAsSelectLogicalPlan(tableDesc, query, false)
+    }
+  }
 }
```

**Review comment:** This code is duplicated. Why are we not using the existing CTAS code path?

**Reply (author):** @hvanhovell Reusing the CTAS code path means we need to convert an IntoClauseContext to a CreateTableContext (or construct a new CreateTableContext), which might be difficult to achieve. Maybe there is another way?
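The conversion means SELECT INTO behaves like a CTAS. A rough equivalence, for illustration only (assumes a `spark` session with a `src` table; the target table must not already exist):

```scala
// Under this patch, the SELECT INTO form below should plan essentially
// the same way as the explicit CTAS shown in the comment.
spark.sql("SELECT key, value INTO backup_tbl FROM src")
// ~ equivalent to:
// spark.sql("CREATE TABLE backup_tbl AS SELECT key, value FROM src")
```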
**SQLQuerySuite.scala**

```diff
@@ -1755,4 +1755,77 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
     }
   }
 
+  test("select into(check relation)") {
+    withSQLConf(SQLConf.CONVERT_CTAS.key -> "true") {
+      withTable("si1", "si2") {
+        val defaultDataSource = sessionState.conf.defaultDataSourceName
+        sql("SELECT key, value INTO si1 FROM src ORDER BY key, value")
+        val message = intercept[AnalysisException] {
+          sql("SELECT key, value INTO si1 FROM src ORDER BY key, value")
+        }.getMessage
+        assert(message.contains("already exists"))
+        checkRelation("si1", true, defaultDataSource)
+
+        // Specifying database name for query can be converted to data source write path
+        // is not allowed right now.
+        sql("SELECT key, value INTO default.si2 FROM src ORDER BY key, value")
+        checkRelation("si2", true, defaultDataSource)
+      }
+    }
+  }
+
+  test("select into(check answer)") {
+    withTable("si1", "si2", "si3", "si4") {
+      sql("SELECT key, value INTO si1 FROM src")
+      checkAnswer(
+        sql("SELECT key, value FROM si1 ORDER BY key"),
+        sql("SELECT key, value FROM src ORDER BY key").collect().toSeq)
+
+      sql("SELECT key k, value INTO si2 FROM src ORDER BY k, value").collect()
+      checkAnswer(
+        sql("SELECT k, value FROM si2 ORDER BY k, value"),
+        sql("SELECT key, value FROM src ORDER BY key, value").collect().toSeq)
+
+      sql("SELECT 1 AS key, value INTO si3 FROM src LIMIT 1").collect()
+      checkAnswer(
+        sql("SELECT key, value FROM si3 ORDER BY key, value"),
+        sql("SELECT key, value FROM si3 LIMIT 1").collect().toSeq)
+
+      sql("SELECT 1 INTO si4").collect()
+      checkAnswer(
+        sql("SELECT 1 FROM si4"),
+        sql("SELECT 1").collect().toSeq)
+    }
+  }
+
+  test("select into(specifying the column list)") {
+    withTable("mytable1", "si1") {
+      Seq((1, "111111"), (2, "222222")).toDF("key", "value").createOrReplaceTempView("mytable1")
+
+      sql("SELECT key AS a, value AS b INTO si1 FROM mytable1")
+      checkAnswer(
+        sql("SELECT a, b FROM si1"),
+        sql("SELECT key, value FROM mytable1").collect())
+    }
+  }
+
+  test("select into(double nested data)") {
+    withTable("nested", "si1") {
+      sparkContext.parallelize(Nested1(Nested2(Nested3(1))) :: Nil)
+        .toDF().createOrReplaceTempView("nested")
+      checkAnswer(
+        sql("SELECT f1.f2.f3 FROM nested"),
+        Row(1))
+
+      sql("SELECT * INTO si1 FROM nested")
+      checkAnswer(
+        sql("SELECT * FROM si1"),
+        sql("SELECT * FROM nested").collect().toSeq)
+
+      intercept[AnalysisException] {
+        sql("SELECT * INTO si1 FROM notexists").collect()
+      }
+    }
+  }
 }
```

**Review comment:** Checking the real error message is better.
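The double-nested test relies on helper case classes defined elsewhere in the suite. For reference, their shapes are along these lines (a reconstruction, not the exact definitions from the file):

```scala
// Nested row types used to build the `nested` temp view above.
case class Nested3(f3: Int)
case class Nested2(f2: Nested3)
case class Nested1(f1: Nested2)
```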
**Review comment:** It is easier to just put this in the `querySpecification` rule. Make sure you give the `tableIdentifier` a proper name.

**Review comment:** Could you also check what kind of a plan the following query produces? [example query failed to load in the original page] We might run into a weird syntax error here. If we do, then we need to move the `INTO` keyword from the `nonReserved` rule to the `identifier` rule.

**Reply (author):** @hvanhovell For example, with json data registered as table `tbl_b` [screenshot not recoverable], the logical plan of the SQL "SELECT PRODUCT_ID INTO tbl_a FROM tbl_b" is as shown [screenshot not recoverable]. The results match expectations.
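The reviewer's keyword concern can be made concrete. This sketch only illustrates the stated hypothesis (assumes a `spark` session and a `src` table); whether it actually fails depends on how the grammar resolves the ambiguity:

```scala
// Hypothetical failure mode: with INTO left in the nonReserved rule, the parser
// could bind INTO as a column alias (SELECT key AS `INTO` ...) and then fail on
// the unexpected token `tbl_a`, instead of building a SELECT INTO plan.
spark.sql("SELECT key INTO tbl_a FROM src")
```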