Commit 6064ab9

limit = 0 test case
1 parent 6d596fc commit 6064ab9

File tree: 1 file changed (+20, -18 lines)


externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/operation/SparkArrowbasedOperationSuite.scala

Lines changed: 20 additions & 18 deletions
@@ -152,20 +152,31 @@ class SparkArrowbasedOperationSuite extends WithSparkSQLEngine with SparkDataTyp
 
   test("SparkDatasetHelper.executeArrowBatchCollect should return expect row count") {
     val returnSize = Seq(
+      0, // spark optimizer guaranty the `limit != 0`, it's just for the sanity check
       7, // less than one partition
       10, // equal to one partition
       13, // between one and two partitions, run two jobs
       20, // equal to two partitions
       29, // between two and three partitions
       1000, // all partitions
       1001) // more than total row count
-      // -1) // all
 
+    def runAndCheck(sparkPlan: SparkPlan, expectSize: Int): Unit = {
+      val arrowBinary = SparkDatasetHelper.executeArrowBatchCollect(sparkPlan)
+      val rows = KyuubiArrowConverters.fromBatchIterator(
+        arrowBinary.iterator,
+        sparkPlan.schema,
+        "",
+        KyuubiSparkContextHelper.dummyTaskContext())
+      assert(rows.size == expectSize)
+    }
+
+    val excludedRules = Seq(
+      "org.apache.spark.sql.catalyst.optimizer.EliminateLimits",
+      "org.apache.spark.sql.execution.adaptive.AQEPropagateEmptyRelation").mkString(",")
     withSQLConf(
-      SQLConf.OPTIMIZER_EXCLUDED_RULES.key ->
-        "org.apache.spark.sql.catalyst.optimizer.EliminateLimits",
-      SQLConf.ADAPTIVE_OPTIMIZER_EXCLUDED_RULES.key ->
-        "org.apache.spark.sql.catalyst.optimizer.EliminateLimits") {
+      SQLConf.OPTIMIZER_EXCLUDED_RULES.key -> excludedRules,
+      SQLConf.ADAPTIVE_OPTIMIZER_EXCLUDED_RULES.key -> excludedRules) {
       // aqe
       // outermost AdaptiveSparkPlanExec
       spark.range(1000)
@@ -185,19 +196,10 @@ class SparkArrowbasedOperationSuite extends WithSparkSQLEngine with SparkDataTyp
             SparkDatasetHelper.finalPhysicalPlan(headPlan.asInstanceOf[AdaptiveSparkPlanExec])
           assert(finalPhysicalPlan.isInstanceOf[CollectLimitExec])
         }
-
-        val arrowBinary = SparkDatasetHelper.executeArrowBatchCollect(df.queryExecution
-          .executedPlan)
-
-        val rows = KyuubiArrowConverters.fromBatchIterator(
-          arrowBinary.iterator,
-          df.schema,
-          "",
-          KyuubiSparkContextHelper.dummyTaskContext())
        if (size > 1000) {
-          assert(rows.size == 1000)
+          runAndCheck(df.queryExecution.executedPlan, 1000)
        } else {
-          assert(rows.size == size)
+          runAndCheck(df.queryExecution.executedPlan, size)
        }
      }
 
@@ -221,9 +223,9 @@ class SparkArrowbasedOperationSuite extends WithSparkSQLEngine with SparkDataTyp
          "",
          KyuubiSparkContextHelper.dummyTaskContext())
        if (size > 1000) {
-          assert(rows.size == 1000)
+          runAndCheck(df.queryExecution.executedPlan, 1000)
        } else {
-          assert(rows.size == size)
+          runAndCheck(df.queryExecution.executedPlan, size)
        }
      }
    }
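
For context, the excluded rules matter because Spark's optimizer can rewrite or drop LIMIT nodes (for instance a limit of 0, or a limit larger than the total row count) before the Arrow collect path is exercised. The sketch below mirrors the same configuration pattern against a plain local SparkSession, outside the Kyuubi test harness; it is a minimal sketch, not part of this commit, and it assumes the string config keys behind SQLConf.OPTIMIZER_EXCLUDED_RULES.key and SQLConf.ADAPTIVE_OPTIMIZER_EXCLUDED_RULES.key; the object and app names are illustrative only.

import org.apache.spark.sql.SparkSession

// Minimal sketch (assumptions noted below, not part of this commit): reproduce the
// rule-exclusion setup and the new `limit = 0` sanity case against a local session.
object LimitZeroSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("limit-zero-sketch")
      .getOrCreate()

    // Same rule list the test builds with Seq(...).mkString(",").
    val excludedRules = Seq(
      "org.apache.spark.sql.catalyst.optimizer.EliminateLimits",
      "org.apache.spark.sql.execution.adaptive.AQEPropagateEmptyRelation").mkString(",")
    // Assumed to be the string keys behind SQLConf.OPTIMIZER_EXCLUDED_RULES
    // and SQLConf.ADAPTIVE_OPTIMIZER_EXCLUDED_RULES.
    spark.conf.set("spark.sql.optimizer.excludedRules", excludedRules)
    spark.conf.set("spark.sql.adaptive.optimizer.excludedRules", excludedRules)

    // limit = 0 is the sanity-check case this commit adds: zero rows must come back.
    val df = spark.range(1000).toDF("id").limit(0)
    println(df.queryExecution.executedPlan) // inspect how the limit shows up in the plan
    assert(df.collect().isEmpty)

    spark.stop()
  }
}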
