sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
@@ -225,6 +225,8 @@ case class AdaptiveSparkPlanExec(
.map(_.toLong).filter(SQLExecution.getQueryExecution(_) eq context.qe)
}

def finalPhysicalPlan: SparkPlan = withFinalPlanUpdate(identity)

private def getFinalPhysicalPlan(): SparkPlan = lock.synchronized {
if (isFinalPlan) return currentPhysicalPlan

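Note on the new accessor above: withFinalPlanUpdate already drives AQE re-optimization to completion and then applies the supplied function to the final plan, so passing identity simply exposes that plan, and the result is cached inside the node. A minimal caller-side sketch of the intended use, anticipating the FileFormatWriter change below (imports assume a Spark classpath; resolveFinal is a hypothetical name):

import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec

// Force AQE to finish re-optimizing so that plan properties such as
// outputOrdering are read from the final plan rather than the initial one.
def resolveFinal(plan: SparkPlan): SparkPlan = plan match {
  case aqe: AdaptiveSparkPlanExec => aqe.finalPhysicalPlan // materializes and caches
  case other => other
}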
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
@@ -39,6 +39,7 @@ import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, DateTimeUtils}
import org.apache.spark.sql.connector.write.WriterCommitMessage
import org.apache.spark.sql.errors.QueryExecutionErrors
import org.apache.spark.sql.execution.{ProjectExec, SortExec, SparkPlan, SQLExecution, UnsafeExternalRowSorter}
import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.util.{SerializableConfiguration, Utils}

@@ -62,6 +63,11 @@ object FileFormatWriter extends Logging {
*/
private[sql] var outputOrderingMatched: Boolean = false

/**
* A variable used in tests to check the final executed plan.
*/
private[sql] var executedPlan: Option[SparkPlan] = None

// scalastyle:off argcount
/**
* Basic work flow of this command is:
@@ -138,9 +144,21 @@
val requiredOrdering = partitionColumns.drop(numStaticPartitionCols) ++
writerBucketSpec.map(_.bucketIdExpression) ++ sortColumns
val writeFilesOpt = V1WritesUtils.getWriteFilesOpt(plan)

    // SPARK-40588: when planned write is disabled and AQE is enabled, the plan
    // contains an AdaptiveSparkPlanExec node, which does not know its final plan's
    // ordering, so we have to materialize the final plan first. It is fine to keep
    // using `plan` further down, because the final plan is cached inside that node.
    def materializeAdaptiveSparkPlan(plan: SparkPlan): SparkPlan = plan match {
      case a: AdaptiveSparkPlanExec => a.finalPhysicalPlan
      case p: SparkPlan => p.withNewChildren(p.children.map(materializeAdaptiveSparkPlan))
    }

// the sort order doesn't matter
// Use the output ordering from the original plan before adding the empty2null projection.
-    val actualOrdering = writeFilesOpt.map(_.child).getOrElse(plan).outputOrdering.map(_.child)
+    val actualOrdering = writeFilesOpt.map(_.child)
+      .getOrElse(materializeAdaptiveSparkPlan(plan))
+      .outputOrdering.map(_.child)

Contributor: Shall we put all the code changes inside "if writeFilesOpt is empty"? If writeFilesOpt is defined, that means the write has already been planned, which does not have this issue.

Contributor: .getOrElse already does what you said, doesn't it?

Contributor (Author): Yes, materializeAdaptiveSparkPlan is applied to plan only if writeFilesOpt is undefined.
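For the record on the exchange above: Option.getOrElse takes its default by name, so the materialization only runs when writeFilesOpt is empty. A standalone illustration in plain Scala (hypothetical values, not Spark code):

def expensiveDefault(): String = {
  println("materializing...")  // runs only when the Option is empty
  "materialized"
}

val planned: Option[String] = Some("planned")
val unplanned: Option[String] = None

println(planned.getOrElse(expensiveDefault()))   // prints "planned"; default never evaluated
println(unplanned.getOrElse(expensiveDefault())) // prints "materializing..." then "materialized"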
val orderingMatched = V1WritesUtils.isOrderingMatched(requiredOrdering, actualOrdering)

SQLExecution.checkSQLExecutionId(sparkSession)
@@ -198,19 +216,24 @@
}

writeAndCommit(job, description, committer) {
-      val (rdd, concurrentOutputWriterSpec) = if (orderingMatched) {
-        (empty2NullPlan.execute(), None)
+      val (planToExecute, concurrentOutputWriterSpec) = if (orderingMatched) {
+        (empty2NullPlan, None)
       } else {
         val sortPlan = createSortPlan(empty2NullPlan, requiredOrdering, outputSpec)
         val concurrentOutputWriterSpec = createConcurrentOutputWriterSpec(
           sparkSession, sortPlan, sortColumns)
         if (concurrentOutputWriterSpec.isDefined) {
-          (empty2NullPlan.execute(), concurrentOutputWriterSpec)
+          (empty2NullPlan, concurrentOutputWriterSpec)
         } else {
-          (sortPlan.execute(), concurrentOutputWriterSpec)
+          (sortPlan, concurrentOutputWriterSpec)
         }
       }

+      // In testing, this is the only way to get hold of the plan that is actually
+      // executed to write out the files.
+      if (Utils.isTesting) executedPlan = Some(planToExecute)
+
+      val rdd = planToExecute.execute()

// SPARK-23271 If we are attempting to write a zero partition rdd, create a dummy single
// partition rdd to make sure we at least set up one write task to write the metadata.
val rddWithNonEmptyPartitions = if (rdd.partitions.length == 0) {
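Aside on the executedPlan hook introduced above: it follows the same pattern as the existing outputOrderingMatched variable, a var written only when Utils.isTesting is set, so production runs pay nothing and tests can assert on what was actually executed. A minimal standalone sketch of the pattern (hypothetical names, not the actual Spark API):

object WriterHook {
  // Test-only stash; never read on production code paths.
  var lastExecutedPlan: Option[String] = None

  def write(plan: String, testing: Boolean): Unit = {
    if (testing) lastExecutedPlan = Some(plan) // stash for post-hoc assertions
    // ... perform the real write using plan ...
  }
}

// In a test:
WriterHook.write("Sort -> Project -> Scan", testing = true)
assert(WriterHook.lastExecutedPlan.contains("Sort -> Project -> Scan"))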
@@ -281,6 +304,9 @@
val committer = writeFilesSpec.committer
val description = writeFilesSpec.description

    // In testing, this is the only way to get hold of the plan that is actually
    // executed to write out the files.
    if (Utils.isTesting) executedPlan = Some(planForWrites)

writeAndCommit(job, description, committer) {
val rdd = planForWrites.executeWrite(writeFilesSpec)
val ret = new Array[WriteTaskResult](rdd.partitions.length)
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/V1WriteCommandSuite.scala
@@ -18,10 +18,13 @@
package org.apache.spark.sql.execution.datasources

import org.apache.spark.sql.{QueryTest, Row}
+import org.apache.spark.sql.catalyst.expressions.{Ascending, AttributeReference, NullsFirst, SortOrder}
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Sort}
-import org.apache.spark.sql.execution.QueryExecution
+import org.apache.spark.sql.execution.{QueryExecution, SortExec}
+import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.{SharedSparkSession, SQLTestUtils}
+import org.apache.spark.sql.types.{IntegerType, StringType}
import org.apache.spark.sql.util.QueryExecutionListener

trait V1WriteCommandSuiteBase extends SQLTestUtils {
@@ -52,8 +55,7 @@ trait V1WriteCommandSuiteBase extends SQLTestUtils {
}

/**
-   * Execute a write query and check ordering of the plan. Return the optimized logical write
-   * query plan.
+   * Execute a write query and check ordering of the plan.
*/
protected def executeAndCheckOrdering(
hasLogicalSort: Boolean,
@@ -160,12 +162,7 @@ class V1WriteCommandSuite extends QueryTest with SharedSparkSession with V1Write
|CREATE TABLE t(i INT, k STRING) USING PARQUET
|PARTITIONED BY (j INT)
|""".stripMargin)
-        // When planned write is disabled, even though the write plan is already sorted,
-        // the AQE node inserted on top of the write query will remove the original
-        // sort orders. So the ordering will not match. This issue does not exist when
-        // planned write is enabled, because AQE will be applied on top of the write
-        // command instead of on top of the child query plan.
-        executeAndCheckOrdering(hasLogicalSort = true, orderingMatched = enabled) {
+        executeAndCheckOrdering(hasLogicalSort = true, orderingMatched = true) {
sql("INSERT INTO t SELECT i, k, j FROM t0 ORDER BY j")
}
}
@@ -181,13 +178,111 @@
|PARTITIONED BY (k STRING)
|""".stripMargin)
executeAndCheckOrdering(
-          hasLogicalSort = true, orderingMatched = enabled, hasEmpty2Null = enabled) {
+          hasLogicalSort = true, orderingMatched = true, hasEmpty2Null = enabled) {

Contributor: Do we still need the orderingMatched parameter if it is always true?

Contributor (Author): Not sure what you mean; as in, is it needed in this test?

Contributor: I mean, shall we remove orderingMatched from the method executeAndCheckOrdering?

Contributor (Author): There is no default value for orderingMatched, and two other unit tests still use orderingMatched = enabled.
sql("INSERT INTO t SELECT * FROM t0 ORDER BY k")
}
}
}
}

test("SPARK-41914: v1 write with AQE and in-partition sorted - non-string partition column") {
withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true") {
withPlannedWrite { enabled =>
withTable("t") {
sql(
"""
|CREATE TABLE t(b INT, value STRING) USING PARQUET
|PARTITIONED BY (key INT)
|""".stripMargin)
executeAndCheckOrdering(hasLogicalSort = true, orderingMatched = true) {
sql(
"""
|INSERT INTO t
|SELECT b, value, key
|FROM testData JOIN testData2 ON key = a
|SORT BY key, value
|""".stripMargin)
}

        // inspect the plan that was actually executed (executeAndCheckOrdering does not check this)
assert(FileFormatWriter.executedPlan.isDefined)
val executedPlan = FileFormatWriter.executedPlan.get

val plan = if (enabled) {
assert(executedPlan.isInstanceOf[WriteFilesExec])
executedPlan.asInstanceOf[WriteFilesExec].child
} else {
executedPlan.transformDown {
case a: AdaptiveSparkPlanExec => a.executedPlan
}
}

        // assert on the outermost sort in the executed plan
assert(plan.collectFirst {
case s: SortExec => s
}.exists {
case SortExec(Seq(
SortOrder(AttributeReference("key", IntegerType, _, _), Ascending, NullsFirst, _),
SortOrder(AttributeReference("value", StringType, _, _), Ascending, NullsFirst, _)
), false, _, _) => true
case _ => false
}, plan)
}
}
}
}
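For readers unfamiliar with the SortExec pattern above: the literal false matched in SortExec(..., false, _, _) is the node's global flag, so the assertion requires an in-partition sort (from SORT BY) on key, value rather than a global ORDER BY. A condensed, hypothetical restatement of the same check, using the identifiers already in scope in the test:

// Equivalent assertion without the deep pattern match; assumes the sort keys
// are plain attribute references, as they are in this test.
val localSortOk = plan.collectFirst { case s: SortExec => s }.exists { s =>
  !s.global &&
    s.sortOrder.map(_.child).collect { case a: AttributeReference => a.name } == Seq("key", "value")
}
assert(localSortOk, plan)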

test("SPARK-41914: v1 write with AQE and in-partition sorted - string partition column") {
withPlannedWrite { enabled =>
withTable("t") {
sql(
"""
|CREATE TABLE t(key INT, b INT) USING PARQUET
|PARTITIONED BY (value STRING)
|""".stripMargin)
executeAndCheckOrdering(
hasLogicalSort = true, orderingMatched = true, hasEmpty2Null = enabled) {
sql(
"""
|INSERT INTO t
|SELECT key, b, value
|FROM testData JOIN testData2 ON key = a
|SORT BY value, key
|""".stripMargin)
}

      // inspect the plan that was actually executed (executeAndCheckOrdering does not check this)
assert(FileFormatWriter.executedPlan.isDefined)
val executedPlan = FileFormatWriter.executedPlan.get

val plan = if (enabled) {
assert(executedPlan.isInstanceOf[WriteFilesExec])
executedPlan.asInstanceOf[WriteFilesExec].child
} else {
executedPlan.transformDown {
case a: AdaptiveSparkPlanExec => a.executedPlan
}
}

      // assert on the outermost sort in the executed plan
assert(plan.collectFirst {
case s: SortExec => s
}.map(s => (enabled, s)).exists {
case (false, SortExec(Seq(
SortOrder(AttributeReference("value", StringType, _, _), Ascending, NullsFirst, _),
SortOrder(AttributeReference("key", IntegerType, _, _), Ascending, NullsFirst, _)
), false, _, _)) => true

        // SPARK-40885: this bug drops "key" from the requested in-partition sort,
        // which manifests here
        case (true, SortExec(Seq(
          SortOrder(AttributeReference("value", StringType, _, _), Ascending, NullsFirst, _)
        ), false, _, _)) => true
        case _ => false
      }, plan)

Contributor: Do you know why the sorting key is different when planned write is enabled?

Contributor (Author): That is correctness bug SPARK-40885, discussed in #38356.

Contributor: @ulysses-you, can you take a look at this bug?
}
}
}
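On the SPARK-40885 branch of the match above: with planned write enabled, the executed in-partition sort is observed to drop the non-partition key, i.e. the plan sorts by value where value, key was requested. A hedged restatement of the two accepted shapes (identifiers as in the test; this is a sketch, not the test's actual code):

// Accepted sort-key sequences, by planned-write setting:
//   disabled -> Seq("value", "key")  (the requested SORT BY order)
//   enabled  -> Seq("value")         (SPARK-40885 drops the in-partition key column)
val sortKeyNames = plan.collectFirst { case s: SortExec => s }.toSeq
  .flatMap(_.sortOrder.map(_.child).collect { case a: AttributeReference => a.name })
val expected = if (enabled) Seq("value") else Seq("value", "key")
assert(sortKeyNames == expected, plan)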

test("v1 write with null and empty string column values") {
withPlannedWrite { enabled =>
withTempPath { path =>