[SPARK-41914][SQL] FileFormatWriter materializes AQE plan before accessing outputOrdering #39431
Changes from all commits: 64c6797, eb22ff8, a68d605, 8414715, 0ae3ce5
@@ -18,10 +18,13 @@
 package org.apache.spark.sql.execution.datasources

 import org.apache.spark.sql.{QueryTest, Row}
+import org.apache.spark.sql.catalyst.expressions.{Ascending, AttributeReference, NullsFirst, SortOrder}
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Sort}
-import org.apache.spark.sql.execution.QueryExecution
+import org.apache.spark.sql.execution.{QueryExecution, SortExec}
+import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.{SharedSparkSession, SQLTestUtils}
+import org.apache.spark.sql.types.{IntegerType, StringType}
 import org.apache.spark.sql.util.QueryExecutionListener

 trait V1WriteCommandSuiteBase extends SQLTestUtils {
@@ -52,8 +55,7 @@ trait V1WriteCommandSuiteBase extends SQLTestUtils {
   }

   /**
-   * Execute a write query and check ordering of the plan. Return the optimized logical write
-   * query plan.
+   * Execute a write query and check ordering of the plan.
    */
   protected def executeAndCheckOrdering(
       hasLogicalSort: Boolean,
@@ -160,12 +162,7 @@ class V1WriteCommandSuite extends QueryTest with SharedSparkSession with V1Write
             |CREATE TABLE t(i INT, k STRING) USING PARQUET
             |PARTITIONED BY (j INT)
             |""".stripMargin)
-        // When planned write is disabled, even though the write plan is already sorted,
-        // the AQE node inserted on top of the write query will remove the original
-        // sort orders. So the ordering will not match. This issue does not exist when
-        // planned write is enabled, because AQE will be applied on top of the write
-        // command instead of on top of the child query plan.
-        executeAndCheckOrdering(hasLogicalSort = true, orderingMatched = enabled) {
+        executeAndCheckOrdering(hasLogicalSort = true, orderingMatched = true) {
          sql("INSERT INTO t SELECT i, k, j FROM t0 ORDER BY j")
        }
      }
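The comment deleted above described the old expectation: with planned write disabled, the AQE node inserted on top of the write query hid the child's sort order, so `orderingMatched` could only be `true` when planned write was enabled. With this PR, FileFormatWriter materializes the AQE plan before reading `outputOrdering`, so the test now expects the ordering to match in both modes. The snippet below is only a simplified sketch of the kind of check involved, built on the existing `SortOrder.orderingSatisfies` helper; it is not the actual FileFormatWriter code.

```scala
import org.apache.spark.sql.catalyst.expressions.SortOrder
import org.apache.spark.sql.execution.SparkPlan

// Illustrative sketch only, not the actual FileFormatWriter code.
object OrderingCheckSketch {
  // The writer may skip adding its own sort only if the physical plan that will
  // actually run already produces the required (partition/bucket/sort) ordering.
  def orderingMatched(finalPlan: SparkPlan, requiredOrdering: Seq[SortOrder]): Boolean =
    SortOrder.orderingSatisfies(finalPlan.outputOrdering, requiredOrdering)

  // Before this PR, `finalPlan` could still be an unmaterialized AdaptiveSparkPlanExec,
  // whose outputOrdering does not yet reflect the final plan, so the check could fail
  // even for an already-sorted write query and a redundant sort was inserted.
}
```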
@@ -181,13 +178,111 @@ class V1WriteCommandSuite extends QueryTest with SharedSparkSession with V1Write
             |PARTITIONED BY (k STRING)
             |""".stripMargin)
         executeAndCheckOrdering(
-          hasLogicalSort = true, orderingMatched = enabled, hasEmpty2Null = enabled) {
+          hasLogicalSort = true, orderingMatched = true, hasEmpty2Null = enabled) {
Review thread on this change:
- "do we still need the […]"
- "Not sure what you mean, as in needed in this test?"
- "I mean, shall we remove […]"
- "There is no default value for […]"
           sql("INSERT INTO t SELECT * FROM t0 ORDER BY k")
         }
       }
     }
   }
+
+  test("SPARK-41914: v1 write with AQE and in-partition sorted - non-string partition column") {
+    withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true") {
+      withPlannedWrite { enabled =>
+        withTable("t") {
+          sql(
+            """
+              |CREATE TABLE t(b INT, value STRING) USING PARQUET
+              |PARTITIONED BY (key INT)
+              |""".stripMargin)
+          executeAndCheckOrdering(hasLogicalSort = true, orderingMatched = true) {
+            sql(
+              """
+                |INSERT INTO t
+                |SELECT b, value, key
+                |FROM testData JOIN testData2 ON key = a
+                |SORT BY key, value
+                |""".stripMargin)
+          }
+
+          // inspect the actually executed plan (that is different to executeAndCheckOrdering)
+          assert(FileFormatWriter.executedPlan.isDefined)
+          val executedPlan = FileFormatWriter.executedPlan.get
+
+          val plan = if (enabled) {
+            assert(executedPlan.isInstanceOf[WriteFilesExec])
+            executedPlan.asInstanceOf[WriteFilesExec].child
+          } else {
+            executedPlan.transformDown {
+              case a: AdaptiveSparkPlanExec => a.executedPlan
+            }
+          }
+
+          // assert the outer most sort in the executed plan
+          assert(plan.collectFirst {
+            case s: SortExec => s
+          }.exists {
+            case SortExec(Seq(
+              SortOrder(AttributeReference("key", IntegerType, _, _), Ascending, NullsFirst, _),
+              SortOrder(AttributeReference("value", StringType, _, _), Ascending, NullsFirst, _)
+            ), false, _, _) => true
+            case _ => false
+          }, plan)
+        }
+      }
+    }
+  }
+
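Both new tests assert against `FileFormatWriter.executedPlan`, a test-only capture of the physical plan the writer actually ran, which is what lets them pattern-match the concrete `SortExec` rather than only the logical plan. As a rough illustration of that kind of capture hook, here is a sketch with made-up names (`WriterDebugHook` is not a real Spark object, and this is not Spark's implementation):

```scala
import org.apache.spark.sql.execution.SparkPlan

// Made-up sketch of a "remember the last executed write plan" test hook.
object WriterDebugHook {
  // Populated only while testing, so production writes are unaffected.
  @volatile private var lastPlan: Option[SparkPlan] = None

  def executedPlan: Option[SparkPlan] = lastPlan

  def write(plan: SparkPlan, isTesting: Boolean): Unit = {
    if (isTesting) {
      lastPlan = Some(plan) // captured here, asserted on later by the test suite
    }
    // ... hand `plan` off to the real write path ...
  }
}
```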
| test("SPARK-41914: v1 write with AQE and in-partition sorted - string partition column") { | ||
| withPlannedWrite { enabled => | ||
| withTable("t") { | ||
| sql( | ||
| """ | ||
| |CREATE TABLE t(key INT, b INT) USING PARQUET | ||
| |PARTITIONED BY (value STRING) | ||
| |""".stripMargin) | ||
| executeAndCheckOrdering( | ||
| hasLogicalSort = true, orderingMatched = true, hasEmpty2Null = enabled) { | ||
| sql( | ||
| """ | ||
| |INSERT INTO t | ||
| |SELECT key, b, value | ||
| |FROM testData JOIN testData2 ON key = a | ||
| |SORT BY value, key | ||
| |""".stripMargin) | ||
| } | ||
|
|
||
| // inspect the actually executed plan (that is different to executeAndCheckOrdering) | ||
| assert(FileFormatWriter.executedPlan.isDefined) | ||
| val executedPlan = FileFormatWriter.executedPlan.get | ||
|
|
||
| val plan = if (enabled) { | ||
| assert(executedPlan.isInstanceOf[WriteFilesExec]) | ||
| executedPlan.asInstanceOf[WriteFilesExec].child | ||
| } else { | ||
| executedPlan.transformDown { | ||
| case a: AdaptiveSparkPlanExec => a.executedPlan | ||
| } | ||
| } | ||
|
|
||
| // assert the outer most sort in the executed plan | ||
| assert(plan.collectFirst { | ||
| case s: SortExec => s | ||
| }.map(s => (enabled, s)).exists { | ||
| case (false, SortExec(Seq( | ||
| SortOrder(AttributeReference("value", StringType, _, _), Ascending, NullsFirst, _), | ||
| SortOrder(AttributeReference("key", IntegerType, _, _), Ascending, NullsFirst, _) | ||
| ), false, _, _)) => true | ||
|
|
||
| // SPARK-40885: this bug removes the in-partition sort, which manifests here | ||
| case (true, SortExec(Seq( | ||
| SortOrder(AttributeReference("value", StringType, _, _), Ascending, NullsFirst, _) | ||
|
Review thread on this line:
- "do you know why the sorting key is different when planned write is enabled?"
- "that is correctness bug SPARK-40885 discussed in #38356"
- "@ulysses-you can you take a look at this bug?"
+          ), false, _, _)) => true
+          case _ => false
+        }, plan)
+      }
+    }
+  }
+
   test("v1 write with null and empty string column values") {
     withPlannedWrite { enabled =>
       withTempPath { path =>
Review thread on the FileFormatWriter change:
- "Shall we put all code changes inside if `writeFilesOpt` is empty? If `writeFilesOpt` is defined, that means the write has been planned, which does not have this issue."
- "`.getOrElse` already does what you said, doesn't it?"
- "Yes, `materializeAdaptiveSparkPlan` is applied on `plan` only if `writeFilesOpt` is undefined."
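To summarize the thread: materialization is only needed when the write has not been planned, i.e. when `writeFilesOpt` is empty; a defined `writeFilesOpt` means planned write already placed AQE above the write command, so the stale-ordering problem cannot occur. The snippet below is only a sketch of that `getOrElse` control flow using the names from the discussion (`writeFilesOpt`, `plan`, `materializeAdaptiveSparkPlan`); the types are simplified and `finalPhysicalPlan` is an assumption about how AQE can be forced to finish planning, so treat it as an illustration rather than the PR's actual code.

```scala
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec

// Sketch only; not the actual FileFormatWriter change.
object WritePlanSketch {
  // Force AQE to settle on its final plan so its outputOrdering is meaningful.
  // (finalPhysicalPlan is assumed here as the way to trigger final planning.)
  def materializeAdaptiveSparkPlan(plan: SparkPlan): SparkPlan = plan match {
    case aqe: AdaptiveSparkPlanExec => aqe.finalPhysicalPlan
    case other => other
  }

  // The shape discussed above: a defined writeFilesOpt (planned write) is used as-is;
  // only the unplanned path needs to materialize the adaptive plan first.
  def planToInspect(writeFilesOpt: Option[SparkPlan], plan: SparkPlan): SparkPlan =
    writeFilesOpt.getOrElse(materializeAdaptiveSparkPlan(plan))
}
```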