
Conversation

@zzzzming95
Contributor

What changes were proposed in this pull request?

Consider the following code:

      Seq((20, 30, "partition"), (15, 20, "partition"),
        (30, 70, "partition"), (18, 40, "partition"))
        .toDF("id", "sort_col", "p")
        .repartition(1)
        .sortWithinPartitions("p", "sort_col")
        .write
        .partitionBy("p")
        .parquet(f.getAbsolutePath)

When 'sortWithinPartitions' is combined with 'partitionBy', 'partitionBy' generates a new Sort("partition_col") on top, so the logical plan is as follows:

Sort("p")
  Project("id", "sort_col", "p")
    Sort("p", "sort_col")
      Input

However, 'org.apache.spark.sql.catalyst.optimizer.EliminateSorts' prunes sort operators it deems irrelevant to the output, which causes Sort("p", "sort_col") to be pruned even though it should take effect.
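After the rule fires, the plan effectively becomes:

Sort("p")
  Project("id", "sort_col", "p")
    Input

so the ordering by "sort_col" is lost.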

This causes the sort to be silently dropped in various scenarios, as in the following SQL:

set spark.hadoop.hive.exec.dynamic.partition=true;
set spark.hadoop.hive.exec.dynamic.partition.mode=nonstrict;

-- this SQL sorts by the partition field (`dt`) and a data field (`name`), but the sort by `name` does not work
insert overwrite table sort_table partition(dt) select id,name,dt from test_table order by name,dt;

The scenario covered by this issue is that 'Sort' is the last transform operator of the job. When other transform operators follow the 'Sort', the sort is not expected to be preserved.
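For reference, here is a self-contained sketch of the reproduction above (the local session and temp-directory handling are illustrative additions, not part of the original report):

import org.apache.spark.sql.SparkSession

object Spark40885Repro {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[1]").appName("SPARK-40885").getOrCreate()
    import spark.implicits._

    val out = java.nio.file.Files.createTempDirectory("spark40885").resolve("data").toString
    Seq((20, 30, "partition"), (15, 20, "partition"),
        (30, 70, "partition"), (18, 40, "partition"))
      .toDF("id", "sort_col", "p")
      .repartition(1)
      .sortWithinPartitions("p", "sort_col")
      .write
      .partitionBy("p")
      .parquet(out)

    // Inspect the written files: with the bug, rows within each file follow the
    // input order rather than sort_col order, because the inner
    // Sort("p", "sort_col") was pruned.
    spark.read.parquet(out).show()
    spark.stop()
  }
}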

Why are the changes needed?

Fix the bug described above: a 'Sort' may not take effect when it is the last transform operator.

Does this PR introduce any user-facing change?

No

How was this patch tested?

Added a new unit test.

@github-actions github-actions bot added the SQL label Oct 23, 2022
@zzzzming95 zzzzming95 changed the title SPARK-40885 [SPARK-40885] When Sort is the last 'Transform' operator, it may not take effect Oct 23, 2022
@zzzzming95 zzzzming95 changed the title [SPARK-40885] When Sort is the last 'Transform' operator, it may not take effect [SPARK-40885] Sort may not take effect when it is the last 'Transform' operator Oct 23, 2022
@AmplabJenkins

Can one of the admins verify this patch?

@zzzzming95
Contributor Author

@cloud-fan @HyukjinKwon @dongjoon-hyun @MaxGekk

Can you please review this PR?

@EnricoMi
Contributor

Should the outer Sort("p") not be added in the first place, since the inner Sort("p", "sort_col") already provides this order? See #38358.

@cloud-fan
Contributor

Should the outer Sort("p") not be added in the first place, since the inner Sort("p", "sort_col") already provides this order?

+1, we should figure out why this was happening. cc @allisonwang-db @ulysses-you

@ulysses-you
Contributor

The reason is that convertEmptyToNull adds a new Project on top of the original plan, so the output ordering of the plan no longer matches the required ordering of the dynamic-partition write.

val empty2NullPlan = if (hasEmpty2Null) {
  query
} else {
  val projectList = convertEmptyToNull(query.output, write.partitionColumns)
  if (projectList.isEmpty) query else Project(projectList, query)
}
assert(empty2NullPlan.output.length == query.output.length)
val attrMap = AttributeMap(query.output.zip(empty2NullPlan.output))
// Rewrite the attribute references in the required ordering to use the new output.
val requiredOrdering = write.requiredOrdering.map(_.transform {
  case a: Attribute => attrMap.getOrElse(a, a)
}.asInstanceOf[SortOrder])
val outputOrdering = query.outputOrdering
// Check if the ordering is already matched to ensure the idempotency of the rule.
val orderingMatched = isOrderingMatched(requiredOrdering, outputOrdering)

This issue should only affect partition columns of string type. We should use the original plan to get the output ordering directly.

@EnricoMi
Contributor

EnricoMi commented Oct 31, 2022

The given example has requiredOrdering with ["p"] and outputOrdering with ["p", "sort_col"]. Therefore, isOrderingMatched(requiredOrdering, outputOrdering) should be true, but returns false.

The reason is that canonicalized "p" in requiredOrdering is "none#12" whereas canonicalized "p" in outputOrdering is "none#19", hence semanticEquals returns false:

final def semanticEquals(other: Expression): Boolean =
  deterministic && other.deterministic && canonicalized == other.canonicalized

override def equals(other: Any): Boolean = other match {
  case ar: AttributeReference =>
    name == ar.name && dataType == ar.dataType && nullable == ar.nullable &&
      metadata == ar.metadata && exprId == ar.exprId && qualifier == ar.qualifier
  case _ => false
}

The id in ExprId differs (12 vs. 19) while jvmId is identical in both expressions:

case class ExprId(id: Long, jvmId: UUID) {
  override def equals(other: Any): Boolean = other match {
    case ExprId(id, jvmId) => this.id == id && this.jvmId == jvmId
    case _ => false
  }
}

@EnricoMi
Contributor

We should use the original plan to get the output ordering directly.

Projection does not modify outputOrdering; in fact, Project extends OrderPreservingUnaryNode:

abstract class OrderPreservingUnaryNode extends UnaryNode {
  override final def outputOrdering: Seq[SortOrder] = child.outputOrdering
}

So Project(projection, plan).outputOrdering is already identical to plan.outputOrdering, as suggested.
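A toy model of that behaviour (hypothetical names, not Spark's classes):

// Each node reports the ordering of the rows it produces.
trait Node { def outputOrdering: Seq[String] }

case class Sorted(cols: Seq[String]) extends Node {
  def outputOrdering: Seq[String] = cols
}

// An order-preserving unary node forwards its child's ordering unchanged,
// which is what Project does via OrderPreservingUnaryNode.
case class Proj(child: Node) extends Node {
  def outputOrdering: Seq[String] = child.outputOrdering
}

object OrderDemo extends App {
  println(Proj(Sorted(Seq("p", "sort_col"))).outputOrdering)  // List(p, sort_col)
}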

@ulysses-you
Contributor

It's much less complicated: after we pull out empty2NullPlan, p becomes empty2null(p), so the output ordering (p) does not match the required ordering (empty2null(p)).

@EnricoMi
Contributor

EnricoMi commented Oct 31, 2022

But val outputOrdering = query.outputOrdering, so empty2NullPlan is not involved in outputOrdering. However, requiredOrdering incorporates empty2NullPlan.output:

// Rewrite the attribute references in the required ordering to use the new output.
val requiredOrdering = write.requiredOrdering.map(_.transform {
  case a: Attribute => attrMap.getOrElse(a, a)
}.asInstanceOf[SortOrder])

I think that should simply read:

val requiredOrdering = write.requiredOrdering

With that, isOrderingMatched(requiredOrdering, outputOrdering) becomes true for the example given in the PR description.

@EnricoMi
Contributor

EnricoMi commented Nov 1, 2022

@allisonwang-db can you elaborate on mapping write.requiredOrdering to the projected columns via attrMap that you introduced in f98f9f8? Was that existing code moved into V1Writes, or was that logic added?

// Rewrite the attribute references in the required ordering to use the new output.
val requiredOrdering = write.requiredOrdering.map(_.transform {
  case a: Attribute => attrMap.getOrElse(a, a)
}.asInstanceOf[SortOrder])

@zzzzming95
Contributor Author

zzzzming95 commented Nov 3, 2022

@allisonwang-db can you elaborate on mapping write.requiredOrdering to the projected columns via attrMap that you introduced in f98f9f8? Was that existing code moved into V1Writes, or was that logic added?

// Rewrite the attribute references in the required ordering to use the new output.
val requiredOrdering = write.requiredOrdering.map(_.transform {
  case a: Attribute => attrMap.getOrElse(a, a)
}.asInstanceOf[SortOrder])

@EnricoMi

In this case, this code takes effect:

val df1 = Seq((20, "partition"), (15, "partition"), (30, "partition"), (18, "partition"))
  .toDF("id", "P")
  .repartition(1)
  .sortWithinPartitions("p", "id")  // lowercase p
df1.write
  .partitionBy("p")  // lowercase p
  .csv("file:///Users/shezhiming/tmp/csv_data/sort_path7/")

  // Do not insert logical sort when concurrent output writers are enabled.
  Seq.empty
} else {
  // We should first sort by dynamic partition columns, then bucket id, and finally sorting
Contributor Author


Should the outer Sort("p") not be added in the first place, since the inner Sort("p", "sort_col") already provides this order? See #38358.

@EnricoMi

There is a constraint on the order of the sorting fields. If the outer Sort("p") is not added in the first place, will this constraint be broken? Will it affect efficiency or anything else?

@EnricoMi
Contributor

EnricoMi commented Nov 3, 2022

I took another, deeper look into this issue. The V1Writes rule introduced in Spark 3.4 adds empty2null to all nullable string partition columns:

private def prepareQuery(write: V1WriteCommand, query: LogicalPlan): LogicalPlan = {
  val hasEmpty2Null = query.exists(p => hasEmptyToNull(p.expressions))
  val empty2NullPlan = if (hasEmpty2Null) {
    query
  } else {
    val projectList = convertEmptyToNull(query.output, write.partitionColumns)
    if (projectList.isEmpty) query else Project(projectList, query)
  }
  assert(empty2NullPlan.output.length == query.output.length)
  val attrMap = AttributeMap(query.output.zip(empty2NullPlan.output))
  // Rewrite the attribute references in the required ordering to use the new output.
  val requiredOrdering = write.requiredOrdering.map(_.transform {
    case a: Attribute => attrMap.getOrElse(a, a)
  }.asInstanceOf[SortOrder])
  val outputOrdering = query.outputOrdering
  // Check if the ordering is already matched to ensure the idempotency of the rule.
  val orderingMatched = isOrderingMatched(requiredOrdering, outputOrdering)
  if (orderingMatched) {
    empty2NullPlan
  } else {
    Sort(requiredOrdering, global = false, empty2NullPlan)
  }
}

The modified V1WriteCommand then has a modified write.requiredOrdering but the old write.query.outputOrdering, which no longer match. In FileFormatWriter, the outer sort is added because of that mismatch:

try {
  val (rdd, concurrentOutputWriterSpec) = if (orderingMatched) {
    (empty2NullPlan.execute(), None)
  } else {
    // SPARK-21165: the `requiredOrdering` is based on the attributes from analyzed plan, and
    // the physical plan may have different attribute ids due to optimizer removing some
    // aliases. Here we bind the expression ahead to avoid potential attribute ids mismatch.
    val orderingExpr = bindReferences(
      requiredOrdering.map(SortOrder(_, Ascending)), finalOutputSpec.outputColumns)
    val sortPlan = SortExec(
      orderingExpr,
      global = false,
      child = empty2NullPlan)
    val maxWriters = sparkSession.sessionState.conf.maxConcurrentOutputFileWriters
    val concurrentWritersEnabled = maxWriters > 0 && sortColumns.isEmpty
    if (concurrentWritersEnabled) {
      (empty2NullPlan.execute(),
        Some(ConcurrentOutputWriterSpec(maxWriters, () => sortPlan.createSorter())))
    } else {
      (sortPlan.execute(), None)
    }
  }

The inner Sort will be optimized away because it is logically not needed. The result is:

*(1) Sort [p#39 ASC NULLS FIRST], false, 0
+- *(1) Project [id#30, sort_col#31, empty2null(p#32) AS p#39]
   +- ShuffleQueryStage 0
      +- Exchange RoundRobinPartitioning(10), REPARTITION_BY_NUM, [plan_id=56]
         +- LocalTableScan [id#30, sort_col#31, p#32]

Either FileFormatWriter gets the ability to see that whatever empty2null references is part of the ordering, so that it does not add the outer Sort.

Or, FileFormatWriter and the V1Writes rule do not require any ordering when no bucketing spec is given. Why would you want to sort the partition by the partition columns when there is no bucket spec and no bucket sort?

--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
@@ -140,10 +140,16 @@ object FileFormatWriter extends Logging {
       statsTrackers = statsTrackers
     )
 
-    // We should first sort by dynamic partition columns, then bucket id, and finally sorting
-    // columns.
-    val requiredOrdering = partitionColumns.drop(numStaticPartitionCols) ++
-        writerBucketSpec.map(_.bucketIdExpression) ++ sortColumns
+    // Only require order when bucket spec is given
+    val requiredOrdering = if (writerBucketSpec.isEmpty) {
+      Seq.empty
+    } else {
+      // We should first sort by dynamic partition columns, then bucket id, and finally sorting
+      // columns.
+      partitionColumns.drop(numStaticPartitionCols) ++ writerBucketSpec.map(_.bucketIdExpression) ++
+        sortColumns
+    }
+
     // the sort order doesn't matter
     // Use the output ordering from the original plan before adding the empty2null projection.
     val actualOrdering = plan.outputOrdering.map(_.child)
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/V1Writes.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/V1Writes.scala
@@ -172,8 +172,10 @@ object V1WritesUtils {
     // Static partition must to be ahead of dynamic partition
     val dynamicPartitionColumns = partitionColumns.drop(numStaticPartitionCols)
 
-    if (SQLConf.get.maxConcurrentOutputFileWriters > 0 && sortColumns.isEmpty) {
-      // Do not insert logical sort when concurrent output writers are enabled.
+    if (SQLConf.get.maxConcurrentOutputFileWriters > 0 && sortColumns.isEmpty
+      || writerBucketSpec.isEmpty) {
+      // Do not insert logical sort when concurrent output writers are enabled
+      // or no bucket spec exists.
       Seq.empty
     } else {
       // We should first sort by dynamic partition columns, then bucket id, and finally sorting

Then, the query is:

*(1) Project [id#37, sort_col#38, empty2null(p#39) AS p#46]
+- *(1) Sort [p#39 ASC NULLS FIRST, sort_col#38 ASC NULLS FIRST], false, 0
   +- ShuffleQueryStage 0
      +- Exchange RoundRobinPartitioning(10), REPARTITION_BY_NUM, [plan_id=56]
         +- LocalTableScan [id#37, sort_col#38, p#39]

.repartition(1)
.sortWithinPartitions("p", "sort_col")
.write
.partitionBy("p")
Contributor


I'm not very sure what the problem is. partitionBy("p") requires sorting the data by p per partition. That requirement should already be satisfied, so we don't need to add another sort.

Contributor

EnricoMi commented Nov 14, 2022


partitionBy("p") requires sorting the data by p per partition.

Why does write.partitionBy("p") require sorting the data by p per partition? I understand why df.groupBy($"p") requires in-partition order by p (GroupedIterator). But write.partitionBy("p")?

Contributor


I'm not very sure what the problem is.

See #38356 (comment):

The V1Writes rule introduced in Spark 3.4 adds empty2null to all nullable string partition columns. The modified V1WriteCommand then has a modified write.requiredOrdering but the old write.query.outputOrdering, which no longer match. In FileFormatWriter, the outer sort is added because of that mismatch.

Contributor


Seems Project shouldn't extend OrderPreservingUnaryNode, as it needs to rewrite the child output ordering based on the project list. cc @ulysses-you @wangyum

Contributor


Yeah, we can rewrite the ordering to preserve the attribute before aliasing, like AliasAwareOutputExpression. But empty2null is special; it is not just an alias. Since we pull out the empty2null, we should tell Project that the ordering of empty2null(p) is the same as that of p. One idea may be to not pull out empty2null and instead leave it in its original place.

Contributor


Be aware that empty2null(p) does not preserve any ordering of p, e.g. it does not preserve NULL LAST or ordering by p and x.
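A concrete worked example (the empty2null semantics are as discussed in this thread):

object OrderingLostDemo extends App {
  def empty2null(s: String): String = if (s != null && s.isEmpty) null else s

  // p sorted ascending with NULLS LAST:
  val sorted = Seq("", "a", null)
  // After empty2null, a null appears first, so NULLS LAST no longer holds:
  println(sorted.map(empty2null))  // List(null, a, null)
}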

Contributor


We only add empty2null for string-type partition columns; maybe we shouldn't optimize this case anyway.

Contributor


Adding empty2null throws null values and empty values into the same partition, and the user has no way to make Spark treat them as distinct values. But changing this smells like a breaking change, unless some config allows bringing back the current behaviour.
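A small sketch of the collapsing behaviour described (same assumed empty2null semantics):

object PartitionCollapseDemo extends App {
  def empty2null(s: String): String = if (s != null && s.isEmpty) null else s

  // "" and null map to the same partition key, so they land in one partition:
  val keys = Seq("a", "", null).map(v => v -> empty2null(v))
  println(keys)  // List((a,a), (,null), (null,null))
}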

@EnricoMi
Contributor

EnricoMi commented Jan 5, 2023

The redundant outer Sort breaks in-partition order when spilling occurs, similar to #38358 (SPARK-40588).

See SPARK-41914. This affects not only string type partition columns.

Avoiding that Sort will fix SPARK-41914 and SPARK-40885.

@EnricoMi
Contributor

This is fixed by SPARK-41959 / #39475

@EnricoMi
Contributor

@zzzzming95 please test that #39475 fixes the issue for you.

@zzzzming95
Contributor Author

@zzzzming95 please test that #39475 fixes the issue for you.

@EnricoMi this PR fixes my issue, thanks!

@EnricoMi
Contributor

EnricoMi commented Feb 1, 2023

This can be closed as the issue has been fixed in #37525 (review)

@HyukjinKwon HyukjinKwon closed this Feb 3, 2023
@abhishekukmeesho

Hello @EnricoMi @zzzzming95, is this issue fixed? If so, how do I make sure I include the fix in Spark 3.4? I'm using Databricks 13.2 (which provides Spark 3.4.0) and still facing this issue: the last sortWithinPartitions is overridden by the partitionBy sort columns in the physical plan, so the sortWithinPartitions sort columns are completely ignored. Any help is much appreciated here. TIA.

@cloud-fan
Contributor

@abhishekukmeesho you can report the bug to Databricks directly. The fix was indeed merged into 3.4, but I'm not sure whether you hit the same issue.

