@@ -339,6 +339,7 @@ querySpecification
fromClause?
(WHERE where=booleanExpression)?)
| ((kind=SELECT setQuantifier? namedExpressionSeq fromClause?
| kind=SELECT setQuantifier? namedExpressionSeq intoClause? fromClause?
| fromClause (kind=SELECT setQuantifier? namedExpressionSeq)?)
lateralView*
(WHERE where=booleanExpression)?
@@ -347,6 +348,10 @@ querySpecification
windows?)
;

intoClause
: INTO tableIdentifier
Contributor:

It is easier to just put this in the querySpecification rule. Make sure you give the tableIdentifier a proper name.
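
A minimal sketch of what that inlining might look like (the alias name intoTable is illustrative, not part of the patch):

    | kind=SELECT setQuantifier? namedExpressionSeq (INTO intoTable=tableIdentifier)? fromClause?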

Contributor:


Could you also check what kind of plan the following query produces:

SELECT a INTO tbl_a FROM tbl_b

We might run into a weird syntax error here. If we do, we need to move the INTO keyword from the nonReserved rule to the identifier rule.

Author @wuxianxingkong (Jul 17, 2016):

@hvanhovell For example, the JSON data registered as table tbl_b is:

(screenshot: json data)

The logical plan of the SQL "SELECT PRODUCT_ID INTO tbl_a FROM tbl_b" is:

(screenshot: json plan)

The results match the expectations.
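
For anyone reproducing this check, a minimal sketch (assuming a SparkSession named spark and a registered view tbl_b; sessionState is package-private, so this would run from a suite under org.apache.spark.sql, and parsePlan only parses, so nothing is executed):

    // Parse the statement and print the resulting logical plan tree.
    val plan = spark.sessionState.sqlParser.parsePlan(
      "SELECT PRODUCT_ID INTO tbl_a FROM tbl_b")
    println(plan.treeString)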

;

fromClause
: FROM relation (',' relation)* lateralView*
;
@@ -132,6 +132,10 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with Logging {
// Build the insert clauses.
val inserts = ctx.multiInsertQueryBody.asScala.map {
  body =>
    assert(body.querySpecification.intoClause == null,
      "Multi-Insert queries cannot have an INTO clause in their individual SELECT statements",
      body)

    assert(body.querySpecification.fromClause == null,
      "Multi-Insert queries cannot have a FROM clause in their individual SELECT statements",
      body)
@@ -155,11 +159,26 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with Logging {
*/
override def visitSingleInsertQuery(
    ctx: SingleInsertQueryContext): LogicalPlan = withOrigin(ctx) {
  // Find the intoClause, if one is present.
Contributor:


This allows for the following syntax:

INSERT INTO tbl_a
SELECT *
INTO tbl_a
FROM tbl_b

Make sure that we cannot have both.

Contributor:


We also need to check what this does with multi-insert syntax, i.e.:

FROM tbl_a
INSERT INTO tbl_b
SELECT *
INSERT INTO tbl_c
SELECT *
INTO tbl_c

  val intoClause = ctx.queryTerm match {
    case queryTermDefault: QueryTermDefaultContext =>
      queryTermDefault.queryPrimary match {
        case queryPrimaryDefault: QueryPrimaryDefaultContext =>
          queryPrimaryDefault.querySpecification.intoClause
        case _ => null
      }
    case _ => null
  }
  if (ctx.insertInto != null && intoClause != null) {
    operationNotAllowed("INSERT INTO ... SELECT INTO", ctx)
  }
  plan(ctx.queryTerm).
    // Add organization statements.
    optionalMap(ctx.queryOrganization)(withQueryResultClauses).
    // Add insert.
    optionalMap(ctx.insertInto())(withInsertInto)
    optionalMap(ctx.insertInto())(withInsertInto).
    // Add SELECT INTO, if present.
    optionalMap(intoClause)(withSelectInto)
}
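
Given the guard above, a sketch of a parser-level test for the rejected combination (PlanParserSuite-style; intercept comes from ScalaTest, and CatalystSqlParser exercises the same AstBuilder code path):

    // The combined syntax should fail to parse with an "operation not allowed" error.
    val e = intercept[ParseException] {
      CatalystSqlParser.parsePlan("INSERT INTO tbl_a SELECT * INTO tbl_b FROM tbl_c")
    }
    assert(e.getMessage.contains("INSERT INTO ... SELECT INTO"))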

@@ -394,6 +413,15 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with Logging {
throw new ParseException("Script Transform is not supported", ctx)
}

/**
 * Convert a SELECT INTO clause into a Hive CTAS statement. The base
 * implementation does not support it; [[SparkSqlAstBuilder]] overrides this.
 */
protected def withSelectInto(
    ctx: IntoClauseContext,
    query: LogicalPlan): LogicalPlan = withOrigin(ctx) {
  throw new ParseException("SELECT INTO is not supported", ctx)
Member @gatorsmile (Jun 13, 2017):

Why throw a ParseException?

}

/**
* Create a logical plan for a given 'FROM' clause. Note that we support multiple (comma
* separated) relations here; these get converted into a single plan by a condition-less inner join.
@@ -1376,4 +1376,62 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
reader, writer,
schemaLess)
}

/**
 * Reuse the CTAS code path: convert SELECT INTO into a CTAS,
 * returning [[CreateHiveTableAsSelectLogicalPlan]].
 * The SELECT INTO statement selects data from one table
 * and inserts it into a new table. It is commonly used to
 * create a backup copy of a table or of selected records.
 *
 * Expected format:
 * {{{
 *   SELECT column_name(s)
 *   INTO new_table
 *   FROM old_table
 *   ...
 * }}}
 */
override protected def withSelectInto(
Contributor @hvanhovell (Jul 14, 2016):

The code below is duplicated. Why are we not using the existing CTAS code path?

Author @wuxianxingkong (Jul 16, 2016):

@hvanhovell Reusing the CTAS code path means we need to convert an IntoClauseContext to a CreateTableContext (or construct a new CreateTableContext), which might be difficult to achieve. Maybe there is another way?

    ctx: IntoClauseContext,
    query: LogicalPlan): LogicalPlan = withOrigin(ctx) {
  // Storage format
  val defaultStorage: CatalogStorageFormat = {
    val defaultStorageType = conf.getConfString("hive.default.fileformat", "textfile")
    val defaultHiveSerde = HiveSerDe.sourceToSerDe(defaultStorageType, conf)
    CatalogStorageFormat(
      locationUri = None,
      inputFormat = defaultHiveSerde.flatMap(_.inputFormat)
        .orElse(Some("org.apache.hadoop.mapred.TextInputFormat")),
      outputFormat = defaultHiveSerde.flatMap(_.outputFormat)
        .orElse(Some("org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat")),
      // Note: Keep this unspecified because we use the presence of the serde to decide
      // whether to convert a table created by CTAS to a datasource table.
      serde = None,
      compressed = false,
      serdeProperties = Map())
  }
  // TODO support the sql text - have a proper location for this!
  val tableDesc = CatalogTable(
    identifier = visitTableIdentifier(ctx.tableIdentifier),
    tableType = CatalogTableType.MANAGED,
    storage = defaultStorage,
    schema = Nil)

  // Table shouldn't exist
  if (conf.convertCTAS) {
    CreateTableUsingAsSelect(
      tableIdent = tableDesc.identifier,
      provider = conf.defaultDataSourceName,
      partitionColumns = Array(),
      bucketSpec = None,
      mode = SaveMode.ErrorIfExists,
      options = Map.empty[String, String],
      query)
  } else {
    CreateHiveTableAsSelectLogicalPlan(tableDesc, query, false)
  }
}
}
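
A usage sketch for the override above (hedged: spark is an assumed Hive-enabled SparkSession in a test; the two result plans come from the patch itself):

    // Parsing a SELECT INTO now yields a CTAS-style plan:
    // CreateTableUsingAsSelect when spark.sql.hive.convertCTAS is enabled,
    // CreateHiveTableAsSelectLogicalPlan otherwise.
    val ctasPlan = spark.sessionState.sqlParser.parsePlan(
      "SELECT key, value INTO backup_src FROM src")
    println(ctasPlan.getClass.getSimpleName)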
@@ -1755,4 +1755,77 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
}
}
}

test("select into(check relation)") {
withSQLConf(SQLConf.CONVERT_CTAS.key -> "true") {
withTable("si1", "si2") {
val defaultDataSource = sessionState.conf.defaultDataSourceName
sql("SELECT key, value INTO si1 FROM src ORDER BY key, value")
val message = intercept[AnalysisException] {
sql("SELECT key, value INTO si1 FROM src ORDER BY key, value")
}.getMessage
assert(message.contains("already exists"))
checkRelation("si1", true, defaultDataSource)

// Specifying database name for query can be converted to data source write path
// is not allowed right now.
sql("SELECT key, value INTO default.si2 FROM src ORDER BY key, value")
checkRelation("si2", true, defaultDataSource)
}
}
}

test("select into(check answer)") {
withTable("si1", "si2", "si3", "si4") {
sql("SELECT key, value INTO si1 FROM src")
checkAnswer(
sql("SELECT key, value FROM si1 ORDER BY key"),
sql("SELECT key, value FROM src ORDER BY key").collect().toSeq)

sql("SELECT key k, value INTO si2 FROM src ORDER BY k,value").collect()
checkAnswer(
sql("SELECT k, value FROM si2 ORDER BY k, value"),
sql("SELECT key, value FROM src ORDER BY key, value").collect().toSeq)

sql("SELECT 1 AS key,value INTO si3 FROM src LIMIT 1").collect()
checkAnswer(
sql("SELECT key, value FROM si3 ORDER BY key, value"),
sql("SELECT key, value FROM si3 LIMIT 1").collect().toSeq)

sql("SELECT 1 INTO si4").collect()
checkAnswer(
sql("SELECT 1 FROM si4"),
sql("SELECT 1").collect().toSeq)
}
Member:

Checking the real error message is better.

val m = intercept[AnalysisException] {
  sql("SELECT key, value INTO si3 FROM src ORDER BY key, value").collect()
}.getMessage
assert(m.contains("your exception message"))

}

test("select into(specifying the column list)") {
withTable("mytable1", "si1") {
Seq((1, "111111"), (2, "222222")).toDF("key", "value").createOrReplaceTempView("mytable1")

sql("SELECT key as a,value as b INTO si1 FROM mytable1")
checkAnswer(
sql("SELECT a, b from si1"),
sql("select key, value from mytable1").collect())
}
}

test("select into(double nested data)") {
withTable("nested", "si1") {
sparkContext.parallelize(Nested1(Nested2(Nested3(1))) :: Nil)
.toDF().createOrReplaceTempView("nested")
checkAnswer(
sql("SELECT f1.f2.f3 FROM nested"),
Row(1))

sql("SELECT * INTO si1 FROM nested")
checkAnswer(
sql("SELECT * FROM si1"),
sql("SELECT * FROM nested").collect().toSeq)

intercept[AnalysisException] {
sql("SELECT * INTO si1 FROM notexists").collect()
}
}
}
}