@@ -91,7 +91,7 @@ TakeOrderedAndProject (87)
(1) Scan parquet default.store_sales
Output [4]: [ss_sold_date_sk#1, ss_customer_sk#2, ss_ext_discount_amt#3, ss_ext_list_price#4]
Batched: true
-Location: InMemoryFileIndex [file:/Users/yi.wu/IdeaProjects/spark/sql/core/spark-warehouse/org.apache.spark.sql.TPCDSV1_4_PlanStabilityWithStatsSuite/store_sales]
+Location [not included in comparison]/{warehouse_dir}/store_sales]
PushedFilters: [IsNotNull(ss_customer_sk), IsNotNull(ss_sold_date_sk)]
ReadSchema: struct<ss_sold_date_sk:int,ss_customer_sk:int,ss_ext_discount_amt:decimal(7,2),ss_ext_list_price:decimal(7,2)>

@@ -105,7 +105,7 @@ Condition : (isnotnull(ss_customer_sk#2) AND isnotnull(ss_sold_date_sk#1))
(4) Scan parquet default.date_dim
Output [2]: [d_date_sk#5, d_year#6]
Batched: true
-Location: InMemoryFileIndex [file:/Users/yi.wu/IdeaProjects/spark/sql/core/spark-warehouse/org.apache.spark.sql.TPCDSV1_4_PlanStabilityWithStatsSuite/date_dim]
+Location [not included in comparison]/{warehouse_dir}/date_dim]
PushedFilters: [IsNotNull(d_year), EqualTo(d_year,2001), IsNotNull(d_date_sk)]
ReadSchema: struct<d_date_sk:int,d_year:int>

@@ -140,7 +140,7 @@ Arguments: [ss_customer_sk#2 ASC NULLS FIRST], false, 0
(12) Scan parquet default.customer
Output [8]: [c_customer_sk#9, c_customer_id#10, c_first_name#11, c_last_name#12, c_preferred_cust_flag#13, c_birth_country#14, c_login#15, c_email_address#16]
Batched: true
-Location: InMemoryFileIndex [file:/Users/yi.wu/IdeaProjects/spark/sql/core/spark-warehouse/org.apache.spark.sql.TPCDSV1_4_PlanStabilityWithStatsSuite/customer]
+Location [not included in comparison]/{warehouse_dir}/customer]
PushedFilters: [IsNotNull(c_customer_sk), IsNotNull(c_customer_id)]
ReadSchema: struct<c_customer_sk:int,c_customer_id:string,c_first_name:string,c_last_name:string,c_preferred_cust_flag:string,c_birth_country:string,c_login:string,c_email_address:string>

@@ -201,7 +201,7 @@ Arguments: [customer_id#22 ASC NULLS FIRST], false, 0
(25) Scan parquet default.store_sales
Output [4]: [ss_sold_date_sk#1, ss_customer_sk#2, ss_ext_discount_amt#3, ss_ext_list_price#4]
Batched: true
-Location: InMemoryFileIndex [file:/Users/yi.wu/IdeaProjects/spark/sql/core/spark-warehouse/org.apache.spark.sql.TPCDSV1_4_PlanStabilityWithStatsSuite/store_sales]
+Location [not included in comparison]/{warehouse_dir}/store_sales]
PushedFilters: [IsNotNull(ss_customer_sk), IsNotNull(ss_sold_date_sk)]
ReadSchema: struct<ss_sold_date_sk:int,ss_customer_sk:int,ss_ext_discount_amt:decimal(7,2),ss_ext_list_price:decimal(7,2)>

@@ -215,7 +215,7 @@ Condition : (isnotnull(ss_customer_sk#2) AND isnotnull(ss_sold_date_sk#1))
(28) Scan parquet default.date_dim
Output [2]: [d_date_sk#5, d_year#6]
Batched: true
-Location: InMemoryFileIndex [file:/Users/yi.wu/IdeaProjects/spark/sql/core/spark-warehouse/org.apache.spark.sql.TPCDSV1_4_PlanStabilityWithStatsSuite/date_dim]
+Location [not included in comparison]/{warehouse_dir}/date_dim]
PushedFilters: [IsNotNull(d_year), EqualTo(d_year,2002), IsNotNull(d_date_sk)]
ReadSchema: struct<d_date_sk:int,d_year:int>

@@ -301,7 +301,7 @@ Input [5]: [customer_id#22, year_total#23, customer_id#31, customer_preferred_cu
(47) Scan parquet default.web_sales
Output [4]: [ws_sold_date_sk#35, ws_bill_customer_sk#36, ws_ext_discount_amt#37, ws_ext_list_price#38]
Batched: true
-Location: InMemoryFileIndex [file:/Users/yi.wu/IdeaProjects/spark/sql/core/spark-warehouse/org.apache.spark.sql.TPCDSV1_4_PlanStabilityWithStatsSuite/web_sales]
+Location [not included in comparison]/{warehouse_dir}/web_sales]
PushedFilters: [IsNotNull(ws_bill_customer_sk), IsNotNull(ws_sold_date_sk)]
ReadSchema: struct<ws_sold_date_sk:int,ws_bill_customer_sk:int,ws_ext_discount_amt:decimal(7,2),ws_ext_list_price:decimal(7,2)>

@@ -394,7 +394,7 @@ Input [6]: [customer_id#22, year_total#23, customer_preferred_cust_flag#32, year
(68) Scan parquet default.web_sales
Output [4]: [ws_sold_date_sk#35, ws_bill_customer_sk#36, ws_ext_discount_amt#37, ws_ext_list_price#38]
Batched: true
-Location: InMemoryFileIndex [file:/Users/yi.wu/IdeaProjects/spark/sql/core/spark-warehouse/org.apache.spark.sql.TPCDSV1_4_PlanStabilityWithStatsSuite/web_sales]
+Location [not included in comparison]/{warehouse_dir}/web_sales]
PushedFilters: [IsNotNull(ws_bill_customer_sk), IsNotNull(ws_sold_date_sk)]
ReadSchema: struct<ws_sold_date_sk:int,ws_bill_customer_sk:int,ws_ext_discount_amt:decimal(7,2),ws_ext_list_price:decimal(7,2)>

@@ -77,7 +77,7 @@ TakeOrderedAndProject (73)
(1) Scan parquet default.customer
Output [8]: [c_customer_sk#1, c_customer_id#2, c_first_name#3, c_last_name#4, c_preferred_cust_flag#5, c_birth_country#6, c_login#7, c_email_address#8]
Batched: true
-Location: InMemoryFileIndex [file:/Users/yi.wu/IdeaProjects/spark/sql/core/spark-warehouse/org.apache.spark.sql.TPCDSV1_4_PlanStabilitySuite/customer]
+Location [not included in comparison]/{warehouse_dir}/customer]
PushedFilters: [IsNotNull(c_customer_sk), IsNotNull(c_customer_id)]
ReadSchema: struct<c_customer_sk:int,c_customer_id:string,c_first_name:string,c_last_name:string,c_preferred_cust_flag:string,c_birth_country:string,c_login:string,c_email_address:string>

@@ -91,7 +91,7 @@ Condition : (isnotnull(c_customer_sk#1) AND isnotnull(c_customer_id#2))
(4) Scan parquet default.store_sales
Output [4]: [ss_sold_date_sk#9, ss_customer_sk#10, ss_ext_discount_amt#11, ss_ext_list_price#12]
Batched: true
-Location: InMemoryFileIndex [file:/Users/yi.wu/IdeaProjects/spark/sql/core/spark-warehouse/org.apache.spark.sql.TPCDSV1_4_PlanStabilitySuite/store_sales]
+Location [not included in comparison]/{warehouse_dir}/store_sales]
PushedFilters: [IsNotNull(ss_customer_sk), IsNotNull(ss_sold_date_sk)]
ReadSchema: struct<ss_sold_date_sk:int,ss_customer_sk:int,ss_ext_discount_amt:decimal(7,2),ss_ext_list_price:decimal(7,2)>

@@ -118,7 +118,7 @@ Input [12]: [c_customer_sk#1, c_customer_id#2, c_first_name#3, c_last_name#4, c_
(10) Scan parquet default.date_dim
Output [2]: [d_date_sk#14, d_year#15]
Batched: true
-Location: InMemoryFileIndex [file:/Users/yi.wu/IdeaProjects/spark/sql/core/spark-warehouse/org.apache.spark.sql.TPCDSV1_4_PlanStabilitySuite/date_dim]
+Location [not included in comparison]/{warehouse_dir}/date_dim]
PushedFilters: [IsNotNull(d_year), EqualTo(d_year,2001), IsNotNull(d_date_sk)]
ReadSchema: struct<d_date_sk:int,d_year:int>

@@ -167,7 +167,7 @@ Condition : (isnotnull(year_total#22) AND (year_total#22 > 0.00))
(20) Scan parquet default.customer
Output [8]: [c_customer_sk#1, c_customer_id#2, c_first_name#3, c_last_name#4, c_preferred_cust_flag#5, c_birth_country#6, c_login#7, c_email_address#8]
Batched: true
-Location: InMemoryFileIndex [file:/Users/yi.wu/IdeaProjects/spark/sql/core/spark-warehouse/org.apache.spark.sql.TPCDSV1_4_PlanStabilitySuite/customer]
+Location [not included in comparison]/{warehouse_dir}/customer]
PushedFilters: [IsNotNull(c_customer_sk), IsNotNull(c_customer_id)]
ReadSchema: struct<c_customer_sk:int,c_customer_id:string,c_first_name:string,c_last_name:string,c_preferred_cust_flag:string,c_birth_country:string,c_login:string,c_email_address:string>

@@ -193,7 +193,7 @@ Input [12]: [c_customer_sk#1, c_customer_id#2, c_first_name#3, c_last_name#4, c_
(26) Scan parquet default.date_dim
Output [2]: [d_date_sk#14, d_year#15]
Batched: true
-Location: InMemoryFileIndex [file:/Users/yi.wu/IdeaProjects/spark/sql/core/spark-warehouse/org.apache.spark.sql.TPCDSV1_4_PlanStabilitySuite/date_dim]
+Location [not included in comparison]/{warehouse_dir}/date_dim]
PushedFilters: [IsNotNull(d_year), EqualTo(d_year,2002), IsNotNull(d_date_sk)]
ReadSchema: struct<d_date_sk:int,d_year:int>

@@ -251,7 +251,7 @@ Input [5]: [customer_id#21, year_total#22, customer_id#28, customer_preferred_cu
(38) Scan parquet default.customer
Output [8]: [c_customer_sk#1, c_customer_id#2, c_first_name#3, c_last_name#4, c_preferred_cust_flag#5, c_birth_country#6, c_login#7, c_email_address#8]
Batched: true
-Location: InMemoryFileIndex [file:/Users/yi.wu/IdeaProjects/spark/sql/core/spark-warehouse/org.apache.spark.sql.TPCDSV1_4_PlanStabilitySuite/customer]
+Location [not included in comparison]/{warehouse_dir}/customer]
PushedFilters: [IsNotNull(c_customer_sk), IsNotNull(c_customer_id)]
ReadSchema: struct<c_customer_sk:int,c_customer_id:string,c_first_name:string,c_last_name:string,c_preferred_cust_flag:string,c_birth_country:string,c_login:string,c_email_address:string>

@@ -265,7 +265,7 @@ Condition : (isnotnull(c_customer_sk#1) AND isnotnull(c_customer_id#2))
(41) Scan parquet default.web_sales
Output [4]: [ws_sold_date_sk#32, ws_bill_customer_sk#33, ws_ext_discount_amt#34, ws_ext_list_price#35]
Batched: true
-Location: InMemoryFileIndex [file:/Users/yi.wu/IdeaProjects/spark/sql/core/spark-warehouse/org.apache.spark.sql.TPCDSV1_4_PlanStabilitySuite/web_sales]
+Location [not included in comparison]/{warehouse_dir}/web_sales]
PushedFilters: [IsNotNull(ws_bill_customer_sk), IsNotNull(ws_sold_date_sk)]
ReadSchema: struct<ws_sold_date_sk:int,ws_bill_customer_sk:int,ws_ext_discount_amt:decimal(7,2),ws_ext_list_price:decimal(7,2)>

@@ -343,7 +343,7 @@ Input [6]: [customer_id#21, year_total#22, customer_preferred_cust_flag#29, year
(58) Scan parquet default.customer
Output [8]: [c_customer_sk#1, c_customer_id#2, c_first_name#3, c_last_name#4, c_preferred_cust_flag#5, c_birth_country#6, c_login#7, c_email_address#8]
Batched: true
-Location: InMemoryFileIndex [file:/Users/yi.wu/IdeaProjects/spark/sql/core/spark-warehouse/org.apache.spark.sql.TPCDSV1_4_PlanStabilitySuite/customer]
+Location [not included in comparison]/{warehouse_dir}/customer]
PushedFilters: [IsNotNull(c_customer_sk), IsNotNull(c_customer_id)]
ReadSchema: struct<c_customer_sk:int,c_customer_id:string,c_first_name:string,c_last_name:string,c_preferred_cust_flag:string,c_birth_country:string,c_login:string,c_email_address:string>

@@ -94,6 +94,8 @@ trait PlanStabilitySuite extends TPCDSBase with DisableAdaptiveExecutionSuite {
private val referenceRegex = "#\\d+".r
private val normalizeRegex = "#\\d+L?".r

+private val clsName = this.getClass.getCanonicalName

def goldenFilePath: String

private def getDirForTest(name: String): File = {
@@ -102,8 +104,8 @@ trait PlanStabilitySuite extends TPCDSBase with DisableAdaptiveExecutionSuite {

private def isApproved(dir: File, actualSimplifiedPlan: String): Boolean = {
val file = new File(dir, "simplified.txt")
-val approved = FileUtils.readFileToString(file, StandardCharsets.UTF_8)
-approved == actualSimplifiedPlan
+val expected = FileUtils.readFileToString(file, StandardCharsets.UTF_8)
+expected == actualSimplifiedPlan
Author comment: Address the comment from #29270 (comment)

}

/**
@@ -115,7 +117,7 @@ trait PlanStabilitySuite extends TPCDSBase with DisableAdaptiveExecutionSuite {
* @param explain the full explain output; this is saved to help debug later as the simplified
* plan is not too useful for debugging
*/
-private def generateApprovedPlanFile(plan: SparkPlan, name: String, explain: String): Unit = {
+private def generateGoldenFile(plan: SparkPlan, name: String, explain: String): Unit = {
Author comment: Address the comment from #29270 (comment)

val dir = getDirForTest(name)
val simplified = getSimplifiedPlan(plan)
val foundMatch = dir.exists() && isApproved(dir, simplified)
@@ -207,7 +209,7 @@ trait PlanStabilitySuite extends TPCDSBase with DisableAdaptiveExecutionSuite {
* WholeStageCodegen
* Project [c_customer_id]
*/
-def getSimplifiedPlan(node: SparkPlan, depth: Int): String = {
+def simplifyNode(node: SparkPlan, depth: Int): String = {
Author comment: Address the comment from #29270 (comment)

val padding = " " * depth
var thisNode = node.nodeName
if (node.references.nonEmpty) {
@@ -220,19 +222,24 @@ trait PlanStabilitySuite extends TPCDSBase with DisableAdaptiveExecutionSuite {
if (id > 0) {
thisNode += s" #$id"
}
-val childrenSimplified = node.children.map(getSimplifiedPlan(_, depth + 1))
-val subqueriesSimplified = node.subqueries.map(getSimplifiedPlan(_, depth + 1))
+val childrenSimplified = node.children.map(simplifyNode(_, depth + 1))
+val subqueriesSimplified = node.subqueries.map(simplifyNode(_, depth + 1))
s"$padding$thisNode\n${subqueriesSimplified.mkString("")}${childrenSimplified.mkString("")}"
}

-getSimplifiedPlan(plan, 0)
+simplifyNode(plan, 0)
}

-private def normalizeIds(query: String): String = {
+private def normalizeIds(plan: String): String = {
val map = new mutable.HashMap[String, String]()
-normalizeRegex.findAllMatchIn(query).map(_.toString)
+normalizeRegex.findAllMatchIn(plan).map(_.toString)
.foreach(map.getOrElseUpdate(_, (map.size + 1).toString))
-normalizeRegex.replaceAllIn(query, regexMatch => s"#${map(regexMatch.toString)}")
+normalizeRegex.replaceAllIn(plan, regexMatch => s"#${map(regexMatch.toString)}")
}

+private def normalizeLocation(plan: String): String = {
+plan.replaceAll(s"Location.*$clsName/",
+"Location [not included in comparison]/{warehouse_dir}/")
+}

/**
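For reference, here is a minimal standalone sketch of what the new normalizeLocation pass does to a scan's Location line. The regex and replacement string are the ones added in the hunk above; the demo object, the hard-coded clsName, and the sample input line (taken from the golden-file diff earlier in this PR) are illustrative only.

```scala
object NormalizeLocationDemo extends App {
  // Assumed to stand in for `clsName` from the suite (this.getClass.getCanonicalName).
  val clsName = "org.apache.spark.sql.TPCDSV1_4_PlanStabilitySuite"

  // A raw Location line as it appears in an un-normalized golden file.
  val raw = "Location: InMemoryFileIndex [file:/Users/yi.wu/IdeaProjects/spark/sql/core/" +
    s"spark-warehouse/$clsName/customer]"

  // Everything from "Location" up to and including "<clsName>/" is replaced, so only the
  // table directory name survives behind a stable placeholder.
  val normalized = raw.replaceAll(
    s"Location.*$clsName/",
    "Location [not included in comparison]/{warehouse_dir}/")

  println(normalized) // Location [not included in comparison]/{warehouse_dir}/customer]
}
```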
@@ -244,10 +251,10 @@ trait PlanStabilitySuite extends TPCDSBase with DisableAdaptiveExecutionSuite {
classLoader = Thread.currentThread().getContextClassLoader)
val qe = sql(queryString).queryExecution
val plan = qe.executedPlan
-val explain = normalizeIds(qe.explainString(FormattedMode))
+val explain = normalizeLocation(normalizeIds(qe.explainString(FormattedMode)))

if (regenerateGoldenFiles) {
-generateApprovedPlanFile(plan, query + suffix, explain)
+generateGoldenFile(plan, query + suffix, explain)
} else {
checkWithApproved(plan, query + suffix, explain)
}
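A similar self-contained sketch of the normalizeIds pass, which the changed call site above composes with the location pass as normalizeLocation(normalizeIds(qe.explainString(FormattedMode))). The method body is the one shown in the diff; the demo object and the sample plan fragment are made up for illustration.

```scala
import scala.collection.mutable

object NormalizeIdsDemo extends App {
  // Same regex as in the suite: expression IDs such as #905 or #905L.
  private val normalizeRegex = "#\\d+L?".r

  // Body as in PlanStabilitySuite.normalizeIds: IDs are remapped to 1, 2, 3, ... in
  // first-seen order, so plans from different sessions normalize to the same text.
  private def normalizeIds(plan: String): String = {
    val map = new mutable.HashMap[String, String]()
    normalizeRegex.findAllMatchIn(plan).map(_.toString)
      .foreach(map.getOrElseUpdate(_, (map.size + 1).toString))
    normalizeRegex.replaceAllIn(plan, regexMatch => s"#${map(regexMatch.toString)}")
  }

  // Prints: Output [2]: [d_date_sk#1, d_year#2]
  println(normalizeIds("Output [2]: [d_date_sk#905, d_year#906]"))
}
```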