Commit 21e48b7
beliefer authored and cloud-fan committed
[SPARK-28330][SQL] Support ANSI SQL: result offset clause in query expression
### What changes were proposed in this pull request?
This is an ANSI SQL feature; its feature ID is `F861`.
```
<query expression> ::=
  [ <with clause> ] <query expression body>
  [ <order by clause> ] [ <result offset clause> ] [ <fetch first clause> ]

<result offset clause> ::=
  OFFSET <offset row count> { ROW | ROWS }
```
For example:
```
SELECT customer_name, customer_gender FROM customer_dimension
   WHERE occupation='Dancer' AND customer_city = 'San Francisco' ORDER BY customer_name;
    customer_name     | customer_gender
----------------------+-----------------
 Amy X. Lang          | Female
 Anna H. Li           | Female
 Brian O. Weaver      | Male
 Craig O. Pavlov      | Male
 Doug Z. Goldberg     | Male
 Harold S. Jones      | Male
 Jack E. Perkins      | Male
 Joseph W. Overstreet | Male
 Kevin . Campbell     | Male
 Raja Y. Wilson       | Male
 Samantha O. Brown    | Female
 Steve H. Gauthier    | Male
 William . Nielson    | Male
 William Z. Roy       | Male
(14 rows)

SELECT customer_name, customer_gender FROM customer_dimension
   WHERE occupation='Dancer' AND customer_city = 'San Francisco' ORDER BY customer_name OFFSET 8;
   customer_name   | customer_gender
-------------------+-----------------
 Kevin . Campbell  | Male
 Raja Y. Wilson    | Male
 Samantha O. Brown | Female
 Steve H. Gauthier | Male
 William . Nielson | Male
 William Z. Roy    | Male
(6 rows)
```
Several mainstream databases support this syntax.
**Druid**
https://druid.apache.org/docs/latest/querying/sql.html#offset

**Kylin**
http://kylin.apache.org/docs/tutorial/sql_reference.html#QUERYSYNTAX

**Exasol**
https://docs.exasol.com/sql/select.htm

**Greenplum**
http://docs.greenplum.org/6-8/ref_guide/sql_commands/SELECT.html

**MySQL**
https://dev.mysql.com/doc/refman/5.6/en/select.html

**MonetDB**
https://www.monetdb.org/Documentation/SQLreference/SQLSyntaxOverview#SELECT

**PostgreSQL**
https://www.postgresql.org/docs/11/queries-limit.html

**SQLite**
https://www.sqlite.org/lang_select.html

**Vertica**
https://www.vertica.com/docs/9.2.x/HTML/Content/Authoring/SQLReferenceManual/Statements/SELECT/OFFSETClause.htm?zoom_highlight=offset

The design is described below.

**1**. Treat `OFFSET` as a special case of `LIMIT`. For example:
`SELECT * FROM a limit 10;` is similar to `SELECT * FROM a limit 10 offset 0;`
`SELECT * FROM a offset 10;` is similar to `SELECT * FROM a limit -1 offset 10;`

**2**. The current implementation of `LIMIT` already performs well. For example:
`SELECT * FROM a limit 10;` is parsed to the logical plan below:
```
GlobalLimit (limit = 10)
|--LocalLimit (limit = 10)
```
and then to the physical plan below:
```
GlobalLimitExec (limit = 10) // Take the first 10 rows globally
|--LocalLimitExec (limit = 10) // Take the first 10 rows locally
```
These operators avoid a large amount of shuffle and perform well.

Sometimes, the logical plan is transformed to the following physical plan instead:
```
CollectLimitExec (limit = 10) // Take the first 10 rows globally
```
If the SQL contains ORDER BY, such as `SELECT * FROM a order by c limit 10;`, it is transformed to the physical plan below:
```
TakeOrderedAndProjectExec (limit = 10) // Take the first 10 rows after sort globally
```
Based on this, the PR produces the following plans.
For example:
`SELECT * FROM a limit 10 offset 10;` is parsed to the logical plan below:
```
GlobalLimit (limit = 10)
|--LocalLimit (limit = 10)
   |--Offset (offset = 10)
```
After optimization, the above logical plan is transformed to:
```
GlobalLimitAndOffset (limit = 10, offset = 10) // Limit clause accompanied by offset clause
|--LocalLimit (limit = 20)   // 10 + offset = 20
```
and then to the physical plan below:
```
GlobalLimitAndOffsetExec (limit = 10, offset = 10) // Skip the first 10 rows and take the next 10 rows globally
|--LocalLimitExec (limit = 20)   // Take the first 20 (limit + offset) rows locally
```
Sometimes, the logical plan is transformed to the following physical plan instead:
```
CollectLimitExec (limit = 10, offset = 10) // Skip the first 10 rows and take the next 10 rows globally
```
If the SQL contains ORDER BY, such as `SELECT * FROM a order by c limit 10 offset 10;`, it is transformed to the physical plan below:
```
TakeOrderedAndProjectExec (limit = 10, offset = 10) // Skip the first 10 rows and take the next 10 rows after sort globally
```
**3**. In addition to the above, there is a special case with only an offset and no limit. For example:
`SELECT * FROM a offset 10;` is parsed to the logical plan below:
```
Offset (offset = 10) // Only offset clause
```
If the offset is very large, this generates a lot of overhead, so this PR rejects an OFFSET clause without a LIMIT clause, even though such a query can be parsed, transformed and executed.

A balanced idea is to add a configuration item `spark.sql.forceUsingOffsetWithoutLimit` that forces the query to run when the user knows the offset is small enough. The default value of `spark.sql.forceUsingOffsetWithoutLimit` would be false. This PR only proposes the idea so that it can be implemented at a better time in the future.

Note: The original PR for this feature is #25416. Because that PR is too old, it has massive conflicts that are hard to resolve, so I opened this new PR to support the feature.
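The local/global split described above can be sketched without Spark. This is a minimal illustration, assuming a dataset modeled as a sequence of in-memory partitions; `LimitOffsetSketch`, `Partitions`, and the three function names are hypothetical and only mirror the roles of `LocalLimitExec` and `GlobalLimitAndOffsetExec`.

```scala
object LimitOffsetSketch {
  // Hypothetical stand-in for a partitioned dataset: each inner Seq is one partition.
  type Partitions[A] = Seq[Seq[A]]

  // Local step: every partition keeps at most n rows (n = limit + offset),
  // playing the role of LocalLimitExec (limit = 20) in the plan above.
  def localLimit[A](parts: Partitions[A], n: Int): Partitions[A] =
    parts.map(_.take(n))

  // Global step: concatenate the partitions, skip `offset` rows, then keep `limit` rows,
  // playing the role of GlobalLimitAndOffsetExec (limit = 10, offset = 10).
  def globalLimitAndOffset[A](parts: Partitions[A], limit: Int, offset: Int): Seq[A] =
    parts.flatten.slice(offset, offset + limit)

  // Both steps combined. The sketch assumes limit + offset fits in an Int.
  def limitWithOffset[A](parts: Partitions[A], limit: Int, offset: Int): Seq[A] =
    globalLimitAndOffset(localLimit(parts, limit + offset), limit, offset)
}
```

Each partition must keep `limit + offset` rows rather than `limit`, because in the worst case all skipped rows and all returned rows come from the same partition.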
### Why are the changes needed?
new feature

### Does this PR introduce any user-facing change?
'No'

### How was this patch tested?
Existing and new UTs

Closes #35975 from beliefer/SPARK-28330.

Authored-by: Jiaan Geng <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
1 parent 5aba2b3 commit 21e48b7

27 files changed, +710 -34 lines changed

docs/sql-ref-ansi-compliance.md

Lines changed: 1 addition & 0 deletions
@@ -494,6 +494,7 @@ Below is a list of all the keywords in Spark SQL.
 |NULL|reserved|non-reserved|reserved|
 |NULLS|non-reserved|non-reserved|non-reserved|
 |OF|non-reserved|non-reserved|reserved|
+|OFFSET|reserved|non-reserved|reserved|
 |ON|reserved|strict-non-reserved|reserved|
 |ONLY|reserved|non-reserved|reserved|
 |OPTION|non-reserved|non-reserved|non-reserved|

sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseLexer.g4

Lines changed: 1 addition & 0 deletions
@@ -242,6 +242,7 @@ NOT: 'NOT' | '!';
 NULL: 'NULL';
 NULLS: 'NULLS';
 OF: 'OF';
+OFFSET: 'OFFSET';
 ON: 'ON';
 ONLY: 'ONLY';
 OPTION: 'OPTION';

sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4

Lines changed: 2 additions & 0 deletions
@@ -439,6 +439,7 @@ queryOrganization
       (SORT BY sort+=sortItem (COMMA sort+=sortItem)*)?
       windowClause?
       (LIMIT (ALL | limit=expression))?
+      (OFFSET offset=expression)?
     ;

 multiInsertQueryBody
@@ -1450,6 +1451,7 @@ nonReserved
     | NULL
     | NULLS
     | OF
+    | OFFSET
     | ONLY
     | OPTION
     | OPTIONS

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala

Lines changed: 43 additions & 1 deletion
@@ -414,7 +414,24 @@ trait CheckAnalysis extends PredicateHelper with LookupCatalog {

           case GlobalLimit(limitExpr, _) => checkLimitLikeClause("limit", limitExpr)

-          case LocalLimit(limitExpr, _) => checkLimitLikeClause("limit", limitExpr)
+          case LocalLimit(limitExpr, child) =>
+            checkLimitLikeClause("limit", limitExpr)
+            child match {
+              case Offset(offsetExpr, _) =>
+                val limit = limitExpr.eval().asInstanceOf[Int]
+                val offset = offsetExpr.eval().asInstanceOf[Int]
+                if (Int.MaxValue - limit < offset) {
+                  failAnalysis(
+                    s"""
+                       |The sum of the LIMIT clause and the OFFSET clause must not be greater than
+                       |the maximum 32-bit integer value (2,147,483,647),
+                       |but found limit = $limit, offset = $offset.
+                       |""".stripMargin.replace("\n", " "))
+                }
+              case _ =>
+            }
+
+          case Offset(offsetExpr, _) => checkLimitLikeClause("offset", offsetExpr)

           case Tail(limitExpr, _) => checkLimitLikeClause("tail", limitExpr)

@@ -591,6 +608,7 @@ trait CheckAnalysis extends PredicateHelper with LookupCatalog {
       }
     }
     checkCollectedMetrics(plan)
+    checkOffsetOperator(plan)
     extendedCheckRules.foreach(_(plan))
     plan.foreachUp {
       case o if !o.resolved =>
@@ -833,6 +851,30 @@ trait CheckAnalysis extends PredicateHelper with LookupCatalog {
     check(plan)
   }

+  /**
+   * Validate whether the [[Offset]] is valid.
+   */
+  private def checkOffsetOperator(plan: LogicalPlan): Unit = {
+    plan.foreachUp {
+      case o if !o.isInstanceOf[GlobalLimit] && !o.isInstanceOf[LocalLimit]
+        && o.children.exists(_.isInstanceOf[Offset]) =>
+        failAnalysis(
+          s"""
+             |The OFFSET clause is only allowed in the LIMIT clause, but the OFFSET
+             |clause found in: ${o.nodeName}.""".stripMargin.replace("\n", " "))
+      case _ =>
+    }
+    plan match {
+      case Offset(offsetExpr, _) =>
+        checkLimitLikeClause("offset", offsetExpr)
+        failAnalysis(
+          s"""
+             |The OFFSET clause is only allowed in the LIMIT clause, but the OFFSET
+             |clause is found to be the outermost node.""".stripMargin.replace("\n", " "))
+      case _ =>
+    }
+  }
+
   /**
    * Validates to make sure the outer references appearing inside the subquery
    * are allowed.
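
The bound check in the `LocalLimit` case compares `Int.MaxValue - limit` with `offset` instead of computing `limit + offset` directly, presumably so that the check itself cannot overflow. The following standalone sketch illustrates that idea; `LimitOffsetBounds` and `checkSum` are hypothetical names, and the sketch assumes `limit` is non-negative (the diff appears to leave that to `checkLimitLikeClause`).

```scala
object LimitOffsetBounds {
  // Returns Left(message) when limit + offset would exceed Int.MaxValue,
  // otherwise Right(limit + offset). The subtraction on the left-hand side
  // cannot overflow as long as limit >= 0 (an assumption of this sketch).
  def checkSum(limit: Int, offset: Int): Either[String, Int] =
    if (Int.MaxValue - limit < offset) {
      Left("The sum of the LIMIT clause and the OFFSET clause must not be greater than " +
        "the maximum 32-bit integer value (2,147,483,647), " +
        s"but found limit = $limit, offset = $offset.")
    } else {
      Right(limit + offset)
    }
}
```

A naive `limit + offset > Int.MaxValue` test would never fire, because the sum wraps around to a negative `Int` before the comparison is made.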

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala

Lines changed: 2 additions & 0 deletions
@@ -386,6 +386,8 @@ object UnsupportedOperationChecker extends Logging {
           throwError("Limits are not supported on streaming DataFrames/Datasets in Update " +
             "output mode")

+        case Offset(_, _) => throwError("Offset is not supported on streaming DataFrames/Datasets")
+
         case Sort(_, _, _) if !containsCompleteData(subPlan) =>
           throwError("Sorting is not supported on streaming DataFrames/Datasets, unless it is on " +
             "aggregated DataFrame/Dataset in Complete output mode")

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala

Lines changed: 2 additions & 0 deletions
@@ -402,6 +402,8 @@ package object dsl {

       def limit(limitExpr: Expression): LogicalPlan = Limit(limitExpr, logicalPlan)

+      def offset(offsetExpr: Expression): LogicalPlan = Offset(offsetExpr, logicalPlan)
+
       def join(
         otherPlan: LogicalPlan,
         joinType: JoinType = Inner,

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala

Lines changed: 20 additions & 0 deletions
@@ -95,6 +95,7 @@ abstract class Optimizer(catalogManager: CatalogManager)
         CollapseWindow,
         CombineFilters,
         EliminateLimits,
+        RewriteOffsets,
         CombineUnions,
         // Constant folding and strength reduction
         OptimizeRepartition,
@@ -1868,6 +1869,25 @@ object EliminateLimits extends Rule[LogicalPlan] {
   }
 }

+/**
+ * Rewrite [[Offset]] as [[GlobalLimitAndOffset]] or [[LocalLimit]],
+ * merging the expressions into one single expression. See [[Limit]] for more information
+ * about the difference between [[LocalLimit]] and [[GlobalLimit]].
+ */
+object RewriteOffsets extends Rule[LogicalPlan] {
+  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
+    case GlobalLimit(le, Offset(oe, grandChild)) =>
+      GlobalLimitAndOffset(le, oe, grandChild)
+    case localLimit @ LocalLimit(le, Offset(oe, grandChild)) =>
+      val offset = oe.eval().asInstanceOf[Int]
+      if (offset == 0) {
+        localLimit.withNewChildren(Seq(grandChild))
+      } else {
+        Offset(oe, LocalLimit(Add(le, oe), grandChild))
+      }
+  }
+}
+
 /**
  * Check if there any cartesian products between joins of any type in the optimized plan tree.
  * Throw an error if a cartesian product is found without an explicit cross join specified.
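
To see how `RewriteOffsets` turns `GlobalLimit -> LocalLimit -> Offset` into `GlobalLimitAndOffset -> LocalLimit`, the rule can be replayed on a toy plan. This is a simplified model and not Spark code: the case classes below are hypothetical stand-ins that hold plain `Int` values instead of expressions, and the sketch assumes the rule is applied top-down and repeated until the plan stops changing.

```scala
object RewriteOffsetsSketch {
  // Toy logical plan nodes (hypothetical stand-ins for Spark's operators).
  sealed trait Plan
  case class Relation(name: String) extends Plan
  case class Offset(offset: Int, child: Plan) extends Plan
  case class LocalLimit(limit: Int, child: Plan) extends Plan
  case class GlobalLimit(limit: Int, child: Plan) extends Plan
  case class GlobalLimitAndOffset(limit: Int, offset: Int, child: Plan) extends Plan

  // One top-down pass: rewrite the current node first, then descend into its child.
  def rewriteOnce(plan: Plan): Plan = {
    val rewritten = plan match {
      case GlobalLimit(le, Offset(oe, grandChild)) => GlobalLimitAndOffset(le, oe, grandChild)
      case LocalLimit(le, Offset(0, grandChild)) => LocalLimit(le, grandChild)
      case LocalLimit(le, Offset(oe, grandChild)) => Offset(oe, LocalLimit(le + oe, grandChild))
      case other => other
    }
    rewritten match {
      case Offset(o, c) => Offset(o, rewriteOnce(c))
      case LocalLimit(l, c) => LocalLimit(l, rewriteOnce(c))
      case GlobalLimit(l, c) => GlobalLimit(l, rewriteOnce(c))
      case GlobalLimitAndOffset(l, o, c) => GlobalLimitAndOffset(l, o, rewriteOnce(c))
      case r: Relation => r
    }
  }

  // Repeat the pass until the plan no longer changes.
  @annotation.tailrec
  def rewrite(plan: Plan): Plan = {
    val next = rewriteOnce(plan)
    if (next == plan) plan else rewrite(next)
  }
}
```

In this model, the first pass pushes `Offset` above `LocalLimit` and widens the local limit to `limit + offset`; the second pass then merges `GlobalLimit` with the `Offset` now directly beneath it.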

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelation.scala

Lines changed: 1 addition & 0 deletions
@@ -136,6 +136,7 @@ abstract class PropagateEmptyRelationBase extends Rule[LogicalPlan] with CastSup
       case _: Sort => empty(p)
       case _: GlobalLimit if !p.isStreaming => empty(p)
       case _: LocalLimit if !p.isStreaming => empty(p)
+      case _: Offset => empty(p)
       case _: Repartition => empty(p)
       case _: RepartitionByExpression => empty(p)
       case _: RebalancePartitions => empty(p)

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala

Lines changed: 1 addition & 0 deletions
@@ -970,6 +970,7 @@ object FoldablePropagation extends Rule[LogicalPlan] {
     case _: Sample => true
     case _: GlobalLimit => true
     case _: LocalLimit => true
+    case _: Offset => true
     case _: Generate => true
     case _: Distinct => true
     case _: AppendColumns => true

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala

Lines changed: 8 additions & 2 deletions
@@ -575,10 +575,16 @@ class AstBuilder extends SqlBaseParserBaseVisitor[AnyRef] with SQLConfHelper wit
     // WINDOWS
     val withWindow = withOrder.optionalMap(windowClause)(withWindowClause)

+    // OFFSET
+    // - OFFSET 0 is the same as omitting the OFFSET clause
+    val withOffset = withWindow.optional(offset) {
+      Offset(typedVisit(offset), withWindow)
+    }
+
     // LIMIT
     // - LIMIT ALL is the same as omitting the LIMIT clause
-    withWindow.optional(limit) {
-      Limit(typedVisit(limit), withWindow)
+    withOffset.optional(limit) {
+      Limit(typedVisit(limit), withOffset)
     }
   }
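
The ordering in this hunk matters: `Offset` is wrapped first and `Limit` second, so the limit node ends up as the parent of the offset node. A minimal sketch of that stacking, using hypothetical toy types and `Option` in place of the parser context and its `optional` helper:

```scala
object QueryOrganizationSketch {
  // Toy plan nodes (hypothetical; Spark's Limit and Offset take expressions, not Ints).
  sealed trait Plan
  case class Relation(name: String) extends Plan
  case class Offset(offset: Int, child: Plan) extends Plan
  case class Limit(limit: Int, child: Plan) extends Plan

  // Wrap the base plan in Offset if an OFFSET clause is present,
  // then wrap the result in Limit if a LIMIT clause is present.
  def organize(base: Plan, limit: Option[Int], offset: Option[Int]): Plan = {
    val withOffset = offset.fold(base)(o => Offset(o, base))
    limit.fold(withOffset)(l => Limit(l, withOffset))
  }
}
```

With both clauses present the result is `Limit(10, Offset(10, ...))`, matching the parsed plan in the commit message. With only an offset, `Offset` becomes the outermost node, which is the case `checkOffsetOperator` rejects.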
