From 605634deb779a0cf0eaece8420692d9bf44dab64 Mon Sep 17 00:00:00 2001
From: cuiguangfan <736068048@qq.com>
Date: Tue, 12 Jul 2016 21:16:43 +0800
Subject: [PATCH 1/2] Implement SELECT INTO

---
 .../spark/sql/catalyst/parser/SqlBase.g4      |  6 +-
 .../sql/catalyst/parser/AstBuilder.scala      | 29 +++++-
 .../spark/sql/execution/SparkSqlParser.scala  | 58 ++++++++++++
 .../sql/hive/execution/SQLQuerySuite.scala    | 93 +++++++++++++++++++
 4 files changed, 184 insertions(+), 2 deletions(-)

diff --git a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
index 7ccbb2dac90c8..f62aad4894d51 100644
--- a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
+++ b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
@@ -338,7 +338,7 @@ querySpecification
        (RECORDREADER recordReader=STRING)?
        fromClause?
        (WHERE where=booleanExpression)?)
-    | ((kind=SELECT setQuantifier? namedExpressionSeq fromClause?
+    | ((kind=SELECT setQuantifier? namedExpressionSeq (intoClause? fromClause)?
       | fromClause (kind=SELECT setQuantifier? namedExpressionSeq)?)
       lateralView*
       (WHERE where=booleanExpression)?
@@ -347,6 +347,10 @@ querySpecification
        windows?)
     ;
 
+intoClause
+    : INTO tableIdentifier
+    ;
+
 fromClause
     : FROM relation (',' relation)* lateralView*
     ;
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
index f2cc8d362478a..8ef5dfae8646f 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
@@ -159,7 +159,9 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with Logging {
       // Add organization statements.
       optionalMap(ctx.queryOrganization)(withQueryResultClauses).
       // Add insert.
-      optionalMap(ctx.insertInto())(withInsertInto)
+      optionalMap(ctx.insertInto())(withInsertInto).
+      // Add select into.
+      optionalMap(existIntoClause(ctx.queryTerm))(withSelectInto)
   }
 
   /**
@@ -394,6 +396,31 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with Logging {
     throw new ParseException("Script Transform is not supported", ctx)
   }
 
+  /**
+   * Find intoClause.
+   */
+  private def existIntoClause(
+      ctx: QueryTermContext): IntoClauseContext = {
+    ctx match {
+      case queryTermDefault: QueryTermDefaultContext =>
+        queryTermDefault.queryPrimary match {
+          case queryPrimaryDefault: QueryPrimaryDefaultContext =>
+            queryPrimaryDefault.querySpecification.intoClause
+          case _ => null
+        }
+      case _ => null
+    }
+  }
+
+  /**
+   * Convert a SELECT INTO query into a Hive CTAS statement.
+   */
+  protected def withSelectInto(
+      ctx: IntoClauseContext,
+      query: LogicalPlan): LogicalPlan = withOrigin(ctx) {
+    throw new ParseException("SELECT INTO is not supported", ctx)
+  }
+
   /**
    * Create a logical plan for a given 'FROM' clause. Note that we support multiple (comma
    * separated) relations here, these get converted into a single plan by condition-less inner join.
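For reference, a minimal usage sketch (not part of the patch) of what the new intoClause rule and the base-class hook above amount to, assuming a Hive-enabled SparkSession named `spark`:

    // SELECT INTO places the target table between the select list and the FROM clause.
    spark.sql("SELECT key, value INTO si_backup FROM src WHERE key < 100")

    // The base AstBuilder only detects the clause; its withSelectInto stub throws
    // "SELECT INTO is not supported". The SparkSqlAstBuilder override in the next
    // file rewrites the query into a CTAS-style plan instead.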
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
index c5f4d58da43ac..a2f04c21f431d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
@@ -1376,4 +1376,62 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
       reader, writer, schemaLess)
   }
+
+  /**
+   * Reuse CTAS: convert SELECT INTO to CTAS,
+   * returning [[CreateHiveTableAsSelectLogicalPlan]].
+   * The SELECT INTO statement selects data from one table
+   * and inserts it into a new table. It is commonly used to
+   * create a backup copy of a table or of selected records.
+   *
+   * Expected format:
+   * {{{
+   *   SELECT column_name(s)
+   *   INTO new_table
+   *   FROM old_table
+   *   ...
+   * }}}
+   */
+  override protected def withSelectInto(
+      ctx: IntoClauseContext,
+      query: LogicalPlan): LogicalPlan = withOrigin(ctx) {
+    // Storage format
+    val defaultStorage: CatalogStorageFormat = {
+      val defaultStorageType = conf.getConfString("hive.default.fileformat", "textfile")
+      val defaultHiveSerde = HiveSerDe.sourceToSerDe(defaultStorageType, conf)
+      CatalogStorageFormat(
+        locationUri = None,
+        inputFormat = defaultHiveSerde.flatMap(_.inputFormat)
+          .orElse(Some("org.apache.hadoop.mapred.TextInputFormat")),
+        outputFormat = defaultHiveSerde.flatMap(_.outputFormat)
+          .orElse(Some("org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat")),
+        // Note: Keep this unspecified because we use the presence of the serde to decide
+        // whether to convert a table created by CTAS to a datasource table.
+        serde = None,
+        compressed = false,
+        serdeProperties = Map())
+    }
+    // TODO support the sql text - have a proper location for this!
+    val tableDesc = CatalogTable(
+      identifier = visitTableIdentifier(ctx.tableIdentifier),
+      tableType = CatalogTableType.MANAGED,
+      storage = defaultStorage,
+      schema = Nil
+    )
+
+    // The target table must not already exist.
+    if (conf.convertCTAS) {
+      CreateTableUsingAsSelect(
+        tableIdent = tableDesc.identifier,
+        provider = conf.defaultDataSourceName,
+        partitionColumns = Array(),
+        bucketSpec = None,
+        mode = SaveMode.ErrorIfExists,
+        options = Map.empty[String, String],
+        query
+      )
+    } else {
+      CreateHiveTableAsSelectLogicalPlan(tableDesc, query, false)
+    }
+  }
 }
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
index 961d95c268b2c..1ab572a221737 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
@@ -1755,4 +1755,97 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
       }
     }
   }
+
+  test("select into(check relation)") {
+    val originalConf = sessionState.conf.convertCTAS
+
+    setConf(SQLConf.CONVERT_CTAS, true)
+
+    val defaultDataSource = sessionState.conf.defaultDataSourceName
+    try {
+      sql("DROP TABLE IF EXISTS si1")
+      sql("SELECT key, value INTO si1 FROM src ORDER BY key, value")
+      val message = intercept[AnalysisException] {
+        sql("SELECT key, value INTO si1 FROM src ORDER BY key, value")
+      }.getMessage
+      assert(message.contains("already exists"))
+      checkRelation("si1", true, defaultDataSource)
+      sql("DROP TABLE si1")
+
+      // Specifying a database name for a query that can be converted to the data source
+      // write path is not allowed right now.
+ sql("SELECT key, value INTO default.si1 FROM src ORDER BY key, value") + checkRelation("si1", true, defaultDataSource) + sql("DROP TABLE si1") + + } finally { + setConf(SQLConf.CONVERT_CTAS, originalConf) + sql("DROP TABLE IF EXISTS si1") + } + } + + test("select into(check answer)") { + sql("DROP TABLE IF EXISTS si1") + sql("DROP TABLE IF EXISTS si2") + sql("DROP TABLE IF EXISTS si3") + + sql("SELECT key, value INTO si1 FROM src") + checkAnswer( + sql("SELECT key, value FROM si1 ORDER BY key"), + sql("SELECT key, value FROM src ORDER BY key").collect().toSeq) + + sql("SELECT key k, value INTO si2 FROM src ORDER BY k,value").collect() + checkAnswer( + sql("SELECT k, value FROM si2 ORDER BY k, value"), + sql("SELECT key, value FROM src ORDER BY key, value").collect().toSeq) + + sql("SELECT 1 AS key,value INTO si3 FROM src LIMIT 1").collect() + intercept[AnalysisException] { + sql("SELECT key, value INTO si3 FROM src ORDER BY key, value").collect() + } + checkAnswer( + sql("SELECT key, value FROM si3 ORDER BY key, value"), + sql("SELECT key, value FROM si3 LIMIT 1").collect().toSeq) + + sql("DROP TABLE IF EXISTS si1") + sql("DROP TABLE IF EXISTS si2") + sql("DROP TABLE IF EXISTS si3") + } + + test("select into(specifying the column list)") { + sql("DROP TABLE IF EXISTS mytable1") + sql("DROP TABLE IF EXISTS si4") + Seq((1, "111111"), (2, "222222")).toDF("key", "value").createOrReplaceTempView("mytable1") + + sql("SELECT key as a,value as b INTO si4 FROM mytable1") + checkAnswer( + sql("SELECT a, b from si4"), + sql("select key, value from mytable1").collect()) + + sql("DROP TABLE IF EXISTS mytable1") + sql("DROP TABLE IF EXISTS si4") + } + + test("select into(double nested data)") { + sql("DROP TABLE IF EXISTS nested") + sql("DROP TABLE IF EXISTS si5") + + sparkContext.parallelize(Nested1(Nested2(Nested3(1))) :: Nil) + .toDF().createOrReplaceTempView("nested") + checkAnswer( + sql("SELECT f1.f2.f3 FROM nested"), + Row(1)) + + sql("SELECT * INTO si5 FROM nested") + checkAnswer( + sql("SELECT * FROM si5"), + sql("SELECT * FROM nested").collect().toSeq) + + intercept[AnalysisException] { + sql("SELECT * INTO si5 FROM notexists").collect() + } + + sql("DROP TABLE IF EXISTS nested") + sql("DROP TABLE IF EXISTS si5") + } } From 6075b82702e50f865deac95bf9bf5f0a5969b9fb Mon Sep 17 00:00:00 2001 From: cuiguangfan <736068048@qq.com> Date: Mon, 18 Jul 2016 16:34:52 +0800 Subject: [PATCH 2/2] 1.Support "SELECT 1 INTO newtable" 2.Add check in multiinsertquery syntax:not allow multi insert and select into appear at the same time 3.Add check in singleinsertquery:not allow insert into and select into appear at the same time --- .../spark/sql/catalyst/parser/SqlBase.g4 | 3 +- .../sql/catalyst/parser/AstBuilder.scala | 35 ++--- .../sql/hive/execution/SQLQuerySuite.scala | 122 ++++++++---------- 3 files changed, 71 insertions(+), 89 deletions(-) diff --git a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 index f62aad4894d51..5b60cd4f50b8a 100644 --- a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 +++ b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 @@ -338,7 +338,8 @@ querySpecification (RECORDREADER recordReader=STRING)? fromClause? (WHERE where=booleanExpression)?) - | ((kind=SELECT setQuantifier? namedExpressionSeq (intoClause? fromClause)? + | ((kind=SELECT setQuantifier? namedExpressionSeq fromClause? + |kind=SELECT setQuantifier? 
       | fromClause (kind=SELECT setQuantifier? namedExpressionSeq)?)
       lateralView*
       (WHERE where=booleanExpression)?
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
index 8ef5dfae8646f..9f92e0e49b3b5 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
@@ -132,6 +132,10 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with Logging {
 
     // Build the insert clauses.
     val inserts = ctx.multiInsertQueryBody.asScala.map { body =>
+      assert(body.querySpecification.intoClause == null,
+        "Multi-Insert queries cannot have an INTO clause in their individual SELECT statements",
+        body)
+
       assert(body.querySpecification.fromClause == null,
         "Multi-Insert queries cannot have a FROM clause in their individual SELECT statements",
         body)
@@ -155,13 +159,26 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with Logging {
    */
   override def visitSingleInsertQuery(
       ctx: SingleInsertQueryContext): LogicalPlan = withOrigin(ctx) {
+    // Find the intoClause, if any.
+    val intoClause = ctx.queryTerm match {
+      case queryTermDefault: QueryTermDefaultContext =>
+        queryTermDefault.queryPrimary match {
+          case queryPrimaryDefault: QueryPrimaryDefaultContext =>
+            queryPrimaryDefault.querySpecification.intoClause
+          case _ => null
+        }
+      case _ => null
+    }
+    if (ctx.insertInto != null && intoClause != null) {
+      operationNotAllowed("INSERT INTO ... SELECT INTO", ctx)
+    }
     plan(ctx.queryTerm).
       // Add organization statements.
       optionalMap(ctx.queryOrganization)(withQueryResultClauses).
       // Add insert.
       optionalMap(ctx.insertInto())(withInsertInto).
       // Add select into.
-      optionalMap(existIntoClause(ctx.queryTerm))(withSelectInto)
+      optionalMap(intoClause)(withSelectInto)
   }
 
   /**
@@ -396,22 +413,6 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with Logging {
     throw new ParseException("Script Transform is not supported", ctx)
   }
 
-  /**
-   * Find intoClause.
-   */
-  private def existIntoClause(
-      ctx: QueryTermContext): IntoClauseContext = {
-    ctx match {
-      case queryTermDefault: QueryTermDefaultContext =>
-        queryTermDefault.queryPrimary match {
-          case queryPrimaryDefault: QueryPrimaryDefaultContext =>
-            queryPrimaryDefault.querySpecification.intoClause
-          case _ => null
-        }
-      case _ => null
-    }
-  }
-
   /**
    * Convert a SELECT INTO query into a Hive CTAS statement.
   */
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
index 1ab572a221737..53f60a0d62548 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
@@ -1757,95 +1757,75 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
   }
 
   test("select into(check relation)") {
-    val originalConf = sessionState.conf.convertCTAS
-
-    setConf(SQLConf.CONVERT_CTAS, true)
-
-    val defaultDataSource = sessionState.conf.defaultDataSourceName
-    try {
-      sql("DROP TABLE IF EXISTS si1")
-      sql("SELECT key, value INTO si1 FROM src ORDER BY key, value")
-      val message = intercept[AnalysisException] {
+    withSQLConf(SQLConf.CONVERT_CTAS.key -> "true") {
+      withTable("si1", "si2") {
+        val defaultDataSource = sessionState.conf.defaultDataSourceName
         sql("SELECT key, value INTO si1 FROM src ORDER BY key, value")
-      }.getMessage
-      assert(message.contains("already exists"))
-      checkRelation("si1", true, defaultDataSource)
-      sql("DROP TABLE si1")
-
-      // Specifying a database name for a query that can be converted to the data source
-      // write path is not allowed right now.
-      sql("SELECT key, value INTO default.si1 FROM src ORDER BY key, value")
-      checkRelation("si1", true, defaultDataSource)
-      sql("DROP TABLE si1")
-
-    } finally {
-      setConf(SQLConf.CONVERT_CTAS, originalConf)
-      sql("DROP TABLE IF EXISTS si1")
+        val message = intercept[AnalysisException] {
+          sql("SELECT key, value INTO si1 FROM src ORDER BY key, value")
+        }.getMessage
+        assert(message.contains("already exists"))
+        checkRelation("si1", true, defaultDataSource)
+
+        // Specifying a database name for a query that can be converted to the data source
+        // write path is not allowed right now.
+ sql("SELECT key, value INTO default.si2 FROM src ORDER BY key, value") + checkRelation("si2", true, defaultDataSource) + } } } test("select into(check answer)") { - sql("DROP TABLE IF EXISTS si1") - sql("DROP TABLE IF EXISTS si2") - sql("DROP TABLE IF EXISTS si3") + withTable("si1", "si2", "si3", "si4") { + sql("SELECT key, value INTO si1 FROM src") + checkAnswer( + sql("SELECT key, value FROM si1 ORDER BY key"), + sql("SELECT key, value FROM src ORDER BY key").collect().toSeq) - sql("SELECT key, value INTO si1 FROM src") - checkAnswer( - sql("SELECT key, value FROM si1 ORDER BY key"), - sql("SELECT key, value FROM src ORDER BY key").collect().toSeq) + sql("SELECT key k, value INTO si2 FROM src ORDER BY k,value").collect() + checkAnswer( + sql("SELECT k, value FROM si2 ORDER BY k, value"), + sql("SELECT key, value FROM src ORDER BY key, value").collect().toSeq) - sql("SELECT key k, value INTO si2 FROM src ORDER BY k,value").collect() - checkAnswer( - sql("SELECT k, value FROM si2 ORDER BY k, value"), - sql("SELECT key, value FROM src ORDER BY key, value").collect().toSeq) + sql("SELECT 1 AS key,value INTO si3 FROM src LIMIT 1").collect() + checkAnswer( + sql("SELECT key, value FROM si3 ORDER BY key, value"), + sql("SELECT key, value FROM si3 LIMIT 1").collect().toSeq) - sql("SELECT 1 AS key,value INTO si3 FROM src LIMIT 1").collect() - intercept[AnalysisException] { - sql("SELECT key, value INTO si3 FROM src ORDER BY key, value").collect() + sql("SELECT 1 INTO si4").collect() + checkAnswer( + sql("SELECT 1 FROM si4"), + sql("SELECT 1").collect().toSeq) } - checkAnswer( - sql("SELECT key, value FROM si3 ORDER BY key, value"), - sql("SELECT key, value FROM si3 LIMIT 1").collect().toSeq) - - sql("DROP TABLE IF EXISTS si1") - sql("DROP TABLE IF EXISTS si2") - sql("DROP TABLE IF EXISTS si3") } test("select into(specifying the column list)") { - sql("DROP TABLE IF EXISTS mytable1") - sql("DROP TABLE IF EXISTS si4") - Seq((1, "111111"), (2, "222222")).toDF("key", "value").createOrReplaceTempView("mytable1") + withTable("mytable1", "si1") { + Seq((1, "111111"), (2, "222222")).toDF("key", "value").createOrReplaceTempView("mytable1") - sql("SELECT key as a,value as b INTO si4 FROM mytable1") - checkAnswer( - sql("SELECT a, b from si4"), - sql("select key, value from mytable1").collect()) - - sql("DROP TABLE IF EXISTS mytable1") - sql("DROP TABLE IF EXISTS si4") + sql("SELECT key as a,value as b INTO si1 FROM mytable1") + checkAnswer( + sql("SELECT a, b from si1"), + sql("select key, value from mytable1").collect()) + } } test("select into(double nested data)") { - sql("DROP TABLE IF EXISTS nested") - sql("DROP TABLE IF EXISTS si5") - - sparkContext.parallelize(Nested1(Nested2(Nested3(1))) :: Nil) - .toDF().createOrReplaceTempView("nested") - checkAnswer( - sql("SELECT f1.f2.f3 FROM nested"), - Row(1)) + withTable("nested", "si1") { + sparkContext.parallelize(Nested1(Nested2(Nested3(1))) :: Nil) + .toDF().createOrReplaceTempView("nested") + checkAnswer( + sql("SELECT f1.f2.f3 FROM nested"), + Row(1)) - sql("SELECT * INTO si5 FROM nested") - checkAnswer( - sql("SELECT * FROM si5"), - sql("SELECT * FROM nested").collect().toSeq) + sql("SELECT * INTO si1 FROM nested") + checkAnswer( + sql("SELECT * FROM si1"), + sql("SELECT * FROM nested").collect().toSeq) - intercept[AnalysisException] { - sql("SELECT * INTO si5 FROM notexists").collect() + intercept[AnalysisException] { + sql("SELECT * INTO si1 FROM notexists").collect() + } } - - sql("DROP TABLE IF EXISTS nested") - sql("DROP TABLE IF EXISTS si5") 
} }
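As a usage sketch (not part of the patch series), the end-to-end behavior the tests above exercise, assuming a Hive-enabled SparkSession named `spark` with the Hive test table `src` loaded:

    // Creates and fills the managed table si_backup...
    spark.sql("SELECT key, value INTO si_backup FROM src ORDER BY key")

    // ...equivalent to the CTAS statement the parser rewrites it into:
    spark.sql("CREATE TABLE si_backup2 AS SELECT key, value FROM src ORDER BY key")

    // Because the plan is built with SaveMode.ErrorIfExists (allowExisting = false),
    // repeating the SELECT INTO against the same table fails with an
    // AnalysisException reporting that the table already exists.
    spark.sql("SELECT key, value INTO si_backup FROM src")  // throws AnalysisException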