Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1511,6 +1511,7 @@ operatorPipeRightSide
| unpivotClause pivotClause?
| sample
| joinRelation
| operator=(UNION | EXCEPT | SETMINUS | INTERSECT) setQuantifier? right=queryTerm
;

// When `SQL_standard_keyword_behavior=true`, there are 2 kinds of keywords in Spark SQL.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1407,10 +1407,13 @@ class AstBuilder extends DataTypeAstBuilder
* - INTERSECT [DISTINCT | ALL]
*/
override def visitSetOperation(ctx: SetOperationContext): LogicalPlan = withOrigin(ctx) {
val left = plan(ctx.left)
val right = plan(ctx.right)
val all = Option(ctx.setQuantifier()).exists(_.ALL != null)
ctx.operator.getType match {
visitSetOperationImpl(plan(ctx.left), plan(ctx.right), all, ctx.operator.getType)
}

private def visitSetOperationImpl(
left: LogicalPlan, right: LogicalPlan, all: Boolean, operatorType: Int): LogicalPlan = {
operatorType match {
case SqlBaseParser.UNION if all =>
Union(left, right)
case SqlBaseParser.UNION =>
Expand Down Expand Up @@ -5918,7 +5921,10 @@ class AstBuilder extends DataTypeAstBuilder
withSample(c, left)
}.getOrElse(Option(ctx.joinRelation()).map { c =>
withJoinRelation(c, left)
}.get)))))
}.getOrElse(Option(ctx.operator).map { c =>
val all = Option(ctx.setQuantifier()).exists(_.ALL != null)
visitSetOperationImpl(left, plan(ctx.right), all, c.getType)
}.get))))))
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1820,6 +1820,208 @@ org.apache.spark.sql.catalyst.parser.ParseException
}


-- !query
table t
|> union all table t
-- !query analysis
Union false, false
:- SubqueryAlias spark_catalog.default.t
: +- Relation spark_catalog.default.t[x#x,y#x] csv
+- SubqueryAlias spark_catalog.default.t
+- Relation spark_catalog.default.t[x#x,y#x] csv


-- !query
table t
|> union table t
-- !query analysis
Distinct
+- Union false, false
:- SubqueryAlias spark_catalog.default.t
: +- Relation spark_catalog.default.t[x#x,y#x] csv
+- SubqueryAlias spark_catalog.default.t
+- Relation spark_catalog.default.t[x#x,y#x] csv


-- !query
(select * from t)
|> union all table t
-- !query analysis
Union false, false
:- Project [x#x, y#x]
: +- SubqueryAlias spark_catalog.default.t
: +- Relation spark_catalog.default.t[x#x,y#x] csv
+- SubqueryAlias spark_catalog.default.t
+- Relation spark_catalog.default.t[x#x,y#x] csv


-- !query
(select * from t)
|> union table t
-- !query analysis
Distinct
+- Union false, false
:- Project [x#x, y#x]
: +- SubqueryAlias spark_catalog.default.t
: +- Relation spark_catalog.default.t[x#x,y#x] csv
+- SubqueryAlias spark_catalog.default.t
+- Relation spark_catalog.default.t[x#x,y#x] csv


-- !query
values (0, 'abc') tab(x, y)
|> union all table t
-- !query analysis
Union false, false
:- SubqueryAlias tab
: +- LocalRelation [x#x, y#x]
+- SubqueryAlias spark_catalog.default.t
+- Relation spark_catalog.default.t[x#x,y#x] csv


-- !query
values (0, 1) tab(x, y)
|> union table t
-- !query analysis
Distinct
+- Union false, false
:- Project [x#x, cast(y#x as string) AS y#x]
: +- SubqueryAlias tab
: +- LocalRelation [x#x, y#x]
+- SubqueryAlias spark_catalog.default.t
+- Relation spark_catalog.default.t[x#x,y#x] csv


-- !query
(select * from t)
|> union all (select * from t)
-- !query analysis
Union false, false
:- Project [x#x, y#x]
: +- SubqueryAlias spark_catalog.default.t
: +- Relation spark_catalog.default.t[x#x,y#x] csv
+- Project [x#x, y#x]
+- SubqueryAlias spark_catalog.default.t
+- Relation spark_catalog.default.t[x#x,y#x] csv


-- !query
table t
|> except all table t
-- !query analysis
Except All true
:- SubqueryAlias spark_catalog.default.t
: +- Relation spark_catalog.default.t[x#x,y#x] csv
+- SubqueryAlias spark_catalog.default.t
+- Relation spark_catalog.default.t[x#x,y#x] csv


-- !query
table t
|> except table t
-- !query analysis
Except false
:- SubqueryAlias spark_catalog.default.t
: +- Relation spark_catalog.default.t[x#x,y#x] csv
+- SubqueryAlias spark_catalog.default.t
+- Relation spark_catalog.default.t[x#x,y#x] csv


-- !query
table t
|> intersect all table t
-- !query analysis
Intersect All true
:- SubqueryAlias spark_catalog.default.t
: +- Relation spark_catalog.default.t[x#x,y#x] csv
+- SubqueryAlias spark_catalog.default.t
+- Relation spark_catalog.default.t[x#x,y#x] csv


-- !query
table t
|> intersect table t
-- !query analysis
Intersect false
:- SubqueryAlias spark_catalog.default.t
: +- Relation spark_catalog.default.t[x#x,y#x] csv
+- SubqueryAlias spark_catalog.default.t
+- Relation spark_catalog.default.t[x#x,y#x] csv


-- !query
table t
|> minus all table t
-- !query analysis
Except All true
:- SubqueryAlias spark_catalog.default.t
: +- Relation spark_catalog.default.t[x#x,y#x] csv
+- SubqueryAlias spark_catalog.default.t
+- Relation spark_catalog.default.t[x#x,y#x] csv


-- !query
table t
|> minus table t
-- !query analysis
Except false
:- SubqueryAlias spark_catalog.default.t
: +- Relation spark_catalog.default.t[x#x,y#x] csv
+- SubqueryAlias spark_catalog.default.t
+- Relation spark_catalog.default.t[x#x,y#x] csv


-- !query
table t
|> select x
|> union all table t
-- !query analysis
org.apache.spark.sql.catalyst.ExtendedAnalysisException
{
"errorClass" : "NUM_COLUMNS_MISMATCH",
"sqlState" : "42826",
"messageParameters" : {
"firstNumColumns" : "1",
"invalidNumColumns" : "2",
"invalidOrdinalNum" : "second",
"operator" : "UNION"
},
"queryContext" : [ {
"objectType" : "",
"objectName" : "",
"startIndex" : 1,
"stopIndex" : 40,
"fragment" : "table t\n|> select x\n|> union all table t"
} ]
}


-- !query
table t
|> union all table st
-- !query analysis
org.apache.spark.sql.catalyst.ExtendedAnalysisException
{
"errorClass" : "INCOMPATIBLE_COLUMN_TYPE",
"sqlState" : "42825",
"messageParameters" : {
"columnOrdinalNumber" : "second",
"dataType1" : "\"STRUCT<i1: INT, i2: INT>\"",
"dataType2" : "\"STRING\"",
"hint" : "",
"operator" : "UNION",
"tableOrdinalNumber" : "second"
},
"queryContext" : [ {
"objectType" : "",
"objectName" : "",
"startIndex" : 1,
"stopIndex" : 29,
"fragment" : "table t\n|> union all table st"
} ]
}


-- !query
drop table t
-- !query analysis
Expand Down
67 changes: 67 additions & 0 deletions sql/core/src/test/resources/sql-tests/inputs/pipe-operators.sql
Original file line number Diff line number Diff line change
Expand Up @@ -504,6 +504,73 @@ table join_test_t1
table join_test_t1 jt
|> cross join (select * from jt);

-- Set operations: positive tests.
-----------------------------------

-- Union all.
table t
|> union all table t;

-- Union distinct.
table t
|> union table t;

-- Union all with a table subquery.
(select * from t)
|> union all table t;

-- Union distinct with a table subquery.
(select * from t)
|> union table t;

-- Union all with a VALUES list.
values (0, 'abc') tab(x, y)
|> union all table t;

-- Union distinct with a VALUES list.
values (0, 1) tab(x, y)
|> union table t;

-- Union all with a table subquery on both the source and target sides.
(select * from t)
|> union all (select * from t);

-- Except all.
table t
|> except all table t;

-- Except distinct.
table t
|> except table t;

-- Intersect all.
table t
|> intersect all table t;

-- Intersect distinct.
table t
|> intersect table t;

-- Minus all.
table t
|> minus all table t;

-- Minus distinct.
table t
|> minus table t;

-- Set operations: negative tests.
-----------------------------------

-- The UNION operator requires the same number of columns in the input relations.
table t
|> select x
|> union all table t;

-- The UNION operator requires the column types to be compatible.
table t
|> union all table st;

-- Cleanup.
-----------
drop table t;
Expand Down
Loading