Skip to content

Commit 5b8444a

Browse files
maropudongjoon-hyun
authored andcommitted
[SPARK-32564][SQL][TEST] Inject data statistics to simulate plan generation on actual TPCDS data
### What changes were proposed in this pull request? `TPCDSQuerySuite` currently computes plans with empty TPCDS tables, then checks if plans can be generated correctly. But, the generated plans can be different from actual ones because the input tables are empty (e.g., the plans always use broadcast-hash joins, but actual ones use sort-merge joins for larger tables). To mitigate the issue, this PR defines data statistics constants extracted from generated TPCDS data in `TPCDSTableStats`, then injects the statistics via `spark.sessionState.catalog.alterTableStats` when defining TPCDS tables in `TPCDSQuerySuite`. Please see a link below about how to extract the table statistics: - https://gist.github.com/maropu/f553d32c323ee803d39e2f7fa0b5a8c3 For example, the generated plans of TPCDS `q2` are different with/without this fix: ``` ==== w/ this fix: q2 ==== == Physical Plan == * Sort (43) +- Exchange (42) +- * Project (41) +- * SortMergeJoin Inner (40) :- * Sort (28) : +- Exchange (27) : +- * Project (26) : +- * BroadcastHashJoin Inner BuildRight (25) : :- * HashAggregate (19) : : +- Exchange (18) : : +- * HashAggregate (17) : : +- * Project (16) : : +- * BroadcastHashJoin Inner BuildRight (15) : : :- Union (9) : : : :- * Project (4) : : : : +- * Filter (3) : : : : +- * ColumnarToRow (2) : : : : +- Scan parquet default.web_sales (1) : : : +- * Project (8) : : : +- * Filter (7) : : : +- * ColumnarToRow (6) : : : +- Scan parquet default.catalog_sales (5) : : +- BroadcastExchange (14) : : +- * Project (13) : : +- * Filter (12) : : +- * ColumnarToRow (11) : : +- Scan parquet default.date_dim (10) : +- BroadcastExchange (24) : +- * Project (23) : +- * Filter (22) : +- * ColumnarToRow (21) : +- Scan parquet default.date_dim (20) +- * Sort (39) +- Exchange (38) +- * Project (37) +- * BroadcastHashJoin Inner BuildRight (36) :- * HashAggregate (30) : +- ReusedExchange (29) +- BroadcastExchange (35) +- * Project (34) +- * Filter (33) +- * ColumnarToRow (32) +- Scan parquet default.date_dim (31) ==== w/o this fix: q2 ==== == Physical Plan == * Sort (40) +- Exchange (39) +- * Project (38) +- * BroadcastHashJoin Inner BuildRight (37) :- * Project (26) : +- * BroadcastHashJoin Inner BuildRight (25) : :- * HashAggregate (19) : : +- Exchange (18) : : +- * HashAggregate (17) : : +- * Project (16) : : +- * BroadcastHashJoin Inner BuildRight (15) : : :- Union (9) : : : :- * Project (4) : : : : +- * Filter (3) : : : : +- * ColumnarToRow (2) : : : : +- Scan parquet default.web_sales (1) : : : +- * Project (8) : : : +- * Filter (7) : : : +- * ColumnarToRow (6) : : : +- Scan parquet default.catalog_sales (5) : : +- BroadcastExchange (14) : : +- * Project (13) : : +- * Filter (12) : : +- * ColumnarToRow (11) : : +- Scan parquet default.date_dim (10) : +- BroadcastExchange (24) : +- * Project (23) : +- * Filter (22) : +- * ColumnarToRow (21) : +- Scan parquet default.date_dim (20) +- BroadcastExchange (36) +- * Project (35) +- * BroadcastHashJoin Inner BuildRight (34) :- * HashAggregate (28) : +- ReusedExchange (27) +- BroadcastExchange (33) +- * Project (32) +- * Filter (31) +- * ColumnarToRow (30) +- Scan parquet default.date_dim (29) ``` This comes from the cloud-fan comment: #29270 (comment) ### Why are the changes needed? For better test coverage. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Existing tests. Closes #29384 from maropu/AddTPCDSTableStats. Authored-by: Takeshi Yamamuro <[email protected]> Signed-off-by: Dongjoon Hyun <[email protected]>
1 parent 7b6e1d5 commit 5b8444a

File tree

3 files changed

+517
-2
lines changed

3 files changed

+517
-2
lines changed

sql/core/src/test/scala/org/apache/spark/sql/TPCDSQuerySuite.scala

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -46,11 +46,17 @@ class TPCDSQuerySuite extends BenchmarkQueryTest with TPCDSSchema {
4646
"q81", "q82", "q83", "q84", "q85", "q86", "q87", "q88", "q89", "q90",
4747
"q91", "q92", "q93", "q94", "q95", "q96", "q97", "q98", "q99")
4848

49+
val sqlConfgs = Seq(
50+
SQLConf.CBO_ENABLED.key -> "true",
51+
SQLConf.PLAN_STATS_ENABLED.key -> "true",
52+
SQLConf.JOIN_REORDER_ENABLED.key -> "true"
53+
)
54+
4955
tpcdsQueries.foreach { name =>
5056
val queryString = resourceToString(s"tpcds/$name.sql",
5157
classLoader = Thread.currentThread().getContextClassLoader)
5258
test(name) {
53-
withSQLConf(SQLConf.CROSS_JOINS_ENABLED.key -> "true") {
59+
withSQLConf(sqlConfgs: _*) {
5460
// check the plans can be properly generated
5561
val plan = sql(queryString).queryExecution.executedPlan
5662
checkGeneratedCode(plan)
@@ -69,7 +75,7 @@ class TPCDSQuerySuite extends BenchmarkQueryTest with TPCDSSchema {
6975
val queryString = resourceToString(s"tpcds-v2.7.0/$name.sql",
7076
classLoader = Thread.currentThread().getContextClassLoader)
7177
test(s"$name-v2.7") {
72-
withSQLConf(SQLConf.CROSS_JOINS_ENABLED.key -> "true") {
78+
withSQLConf(sqlConfgs: _*) {
7379
// check the plans can be properly generated
7480
val plan = sql(queryString).queryExecution.executedPlan
7581
checkGeneratedCode(plan)

sql/core/src/test/scala/org/apache/spark/sql/TPCDSSchema.scala

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717

1818
package org.apache.spark.sql
1919

20+
import org.apache.spark.sql.catalyst.TableIdentifier
21+
2022
trait TPCDSSchema {
2123

2224
private val tableColumns = Map(
@@ -255,5 +257,9 @@ trait TPCDSSchema {
255257
|USING $format
256258
|${options.mkString("\n")}
257259
""".stripMargin)
260+
261+
// To simulate plan generation on actual TPCDS data, injects data stats here
262+
spark.sessionState.catalog.alterTableStats(
263+
TableIdentifier(tableName), Some(TPCDSTableStats.sf100TableStats(tableName)))
258264
}
259265
}

0 commit comments

Comments
 (0)