Skip to content

Commit ab1bcef

Browse files
beliefer authored and cloud-fan committed
[SPARK-39159][SQL] Add new Dataset API for Offset
### What changes were proposed in this pull request?

Currently, Spark added `Offset` operator. This PR tries to add the `offset` API into `Dataset`.

### Why are the changes needed?

The `offset` API is very useful and makes constructing test cases easier.

### Does this PR introduce _any_ user-facing change?

No. New feature.

### How was this patch tested?

New tests.

Closes #36519 from beliefer/SPARK-39159.

Authored-by: Jiaan Geng <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
1 parent e336567 commit ab1bcef

File tree

4 files changed

+34
-14
lines changed

4 files changed

+34
-14
lines changed

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -433,13 +433,6 @@ trait CheckAnalysis extends PredicateHelper with LookupCatalog {
433433

434434
case Offset(offsetExpr, _) => checkLimitLikeClause("offset", offsetExpr)
435435

436-
case o if !o.isInstanceOf[GlobalLimit] && !o.isInstanceOf[LocalLimit]
437-
&& o.children.exists(_.isInstanceOf[Offset]) =>
438-
failAnalysis(
439-
s"""
440-
|The OFFSET clause is allowed in the LIMIT clause or be the outermost node,
441-
|but the OFFSET clause found in: ${o.nodeName}.""".stripMargin.replace("\n", " "))
442-
443436
case Tail(limitExpr, _) => checkLimitLikeClause("tail", limitExpr)
444437

445438
case _: Union | _: SetOperation if operator.children.length > 1 =>

sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -556,13 +556,6 @@ class AnalysisErrorSuite extends AnalysisTest {
556556
"The offset expression must be equal to or greater than 0, but got -1" :: Nil
557557
)
558558

559-
errorTest(
560-
"OFFSET clause in other node",
561-
testRelation2.offset(Literal(10, IntegerType)).where('b > 1),
562-
"The OFFSET clause is allowed in the LIMIT clause or be the outermost node," +
563-
" but the OFFSET clause found in: Filter." :: Nil
564-
)
565-
566559
errorTest(
567560
"the sum of num_rows in limit clause and num_rows in offset clause less than Int.MaxValue",
568561
testRelation.offset(Literal(2000000000, IntegerType)).limit(Literal(1000000000, IntegerType)),

sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2102,6 +2102,16 @@ class Dataset[T] private[sql](
21022102
Limit(Literal(n), logicalPlan)
21032103
}
21042104

2105+
/**
2106+
* Returns a new Dataset by skipping the first `n` rows.
2107+
*
2108+
* @group typedrel
2109+
* @since 3.4.0
2110+
*/
2111+
def offset(n: Int): Dataset[T] = withTypedPlan {
2112+
Offset(Literal(n), logicalPlan)
2113+
}
2114+
21052115
/**
21062116
* Returns a new Dataset containing union of rows in this Dataset and another Dataset.
21072117
*

sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -605,6 +605,30 @@ class DataFrameSuite extends QueryTest
605605
)
606606
}
607607

608+
test("offset") {
609+
checkAnswer(
610+
testData.offset(90),
611+
testData.collect().drop(90).toSeq)
612+
613+
checkAnswer(
614+
arrayData.toDF().offset(99),
615+
arrayData.collect().drop(99).map(r => Row.fromSeq(r.productIterator.toSeq)))
616+
617+
checkAnswer(
618+
mapData.toDF().offset(99),
619+
mapData.collect().drop(99).map(r => Row.fromSeq(r.productIterator.toSeq)))
620+
}
621+
622+
test("limit with offset") {
623+
checkAnswer(
624+
testData.limit(10).offset(5),
625+
testData.take(10).drop(5).toSeq)
626+
627+
checkAnswer(
628+
testData.offset(5).limit(10),
629+
testData.take(15).drop(5).toSeq)
630+
}
631+
608632
test("udf") {
609633
val foo = udf((a: Int, b: String) => a.toString + b)
610634

0 commit comments

Comments
 (0)