[SPARK-40885] Sort may not take effect when it is the last 'Transform' operator
#38356
Conversation
|
Can one of the admins verify this patch?
|
@cloud-fan @HyukjinKwon @dongjoon-hyun @MaxGekk Can you please review this PR?
|
Should the outer Sort("p") better not be added in the first place, as the inner Sort("p", "sort_col") provides this order already? See #38358.
+1, we should figure out why this was happening. cc @allisonwang-db @ulysses-you
|
The reason is this code: spark/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/V1Writes.scala, lines 72 to 87 (at 90d3154).
This issue should only affect partition columns of string type. We should use the original plan to get the output ordering directly.
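For illustration, a minimal sketch of the semantics (the real expression is the writer-side Empty2Null; the helper name here is ours):

```scala
// Hypothetical stand-in for Spark's writer-side Empty2Null expression: for
// nullable string partition columns, empty strings are written as null.
def empty2null(s: String): String = if (s != null && s.isEmpty) null else s

assert(empty2null("") == null)    // empty string collapses into the null partition
assert(empty2null("abc") == "abc")
assert(empty2null(null) == null)
```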
|
The given example hits exactly this problem. The reason is how canonicalized works: spark/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala, lines 262 to 263 (at 90d3154), together with spark/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala, lines 275 to 280 (at 90d3154). See also spark/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala, lines 47 to 52 (at 90d3154).
|
Projection does not modify the output ordering of its child: spark/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala, lines 208 to 210 (at 90d3154). So the Project added on top of the user's Sort still reports the child's ordering.
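To illustrate, a sketch against catalyst internals (assuming the logical outputOrdering that V1Writes relies on; a plain Alias stands in for the pulled-out Alias(empty2null(p), "p")):

```scala
import org.apache.spark.sql.catalyst.expressions.{Alias, Ascending, AttributeReference, SortOrder}
import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, Project, Sort}
import org.apache.spark.sql.types.{IntegerType, StringType}

val p       = AttributeReference("p", StringType)()
val sortCol = AttributeReference("sort_col", IntegerType)()
val rel     = LocalRelation(p, sortCol)

// Sort by p, then project an alias of p on top of the sort.
val sorted    = Sort(Seq(SortOrder(p, Ascending)), global = false, rel)
val projected = Project(Seq(Alias(p, "p")(), sortCol), sorted)

// As an OrderPreservingUnaryNode, Project reports the child's ordering
// verbatim: still by the original attribute p, not by the new alias.
assert(projected.outputOrdering == sorted.outputOrdering)
```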
|
It's much less complicated: it happens just after pulling out empty2null.
|
But see spark/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/V1Writes.scala, line 85 (at ff66add). However, lines 81 to 84 of the same file (at ff66add) compare against the modified plan. I think that should simply use the ordering of the original plan instead. With that, the redundant outer sort would no longer be added.
|
@allisonwang-db can you elaborate on the mapping in spark/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/V1Writes.scala, lines 81 to 84 (at ff66add)?
|
In this case, this code will take effect:
```scala
      // Do not insert logical sort when concurrent output writers are enabled.
      Seq.empty
    } else {
      // We should first sort by dynamic partition columns, then bucket id, and finally sorting
```
|
Should the outer Sort("p") better not be added in the first place, as the inner Sort("p", "sort_col") provides this order already? See #38358.
There is a constraint on the order of the sorting fields. If the outer Sort("p") is not added in the first place, will this constraint be broken? Will it affect efficiency or anything else?
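For context, the required ordering only needs to be a semantic prefix of the actual ordering, so the inner sort already covers the outer one; a minimal sketch using catalyst's SortOrder.orderingSatisfies (attribute names are illustrative):

```scala
import org.apache.spark.sql.catalyst.expressions.{Ascending, AttributeReference, SortOrder}
import org.apache.spark.sql.types.{IntegerType, StringType}

val p       = AttributeReference("p", StringType)()
val sortCol = AttributeReference("sort_col", IntegerType)()

// Data ordered by (p, sort_col) already satisfies a required ordering of (p).
val actual   = Seq(SortOrder(p, Ascending), SortOrder(sortCol, Ascending))
val required = Seq(SortOrder(p, Ascending))

assert(SortOrder.orderingSatisfies(actual, required))
```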
|
I had another deeper look into this issue. The sort is inserted by the V1Writes rule here: spark/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/V1Writes.scala, lines 70 to 94 (at 7f3b598). The modified required ordering is then compared against the query's ordering here: spark/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala, lines 172 to 194 (at 7f3b598).
The inner Sort("p", "sort_col") already provides the required order. Either FileFormatWriter should compare against the output ordering of the original plan, or the logical sort should not be inserted in the first place:

```diff
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
@@ -140,10 +140,16 @@ object FileFormatWriter extends Logging {
       statsTrackers = statsTrackers
     )
-    // We should first sort by dynamic partition columns, then bucket id, and finally sorting
-    // columns.
-    val requiredOrdering = partitionColumns.drop(numStaticPartitionCols) ++
-      writerBucketSpec.map(_.bucketIdExpression) ++ sortColumns
+    // Only require order when bucket spec is given
+    val requiredOrdering = if (writerBucketSpec.isEmpty) {
+      Seq.empty
+    } else {
+      // We should first sort by dynamic partition columns, then bucket id, and finally sorting
+      // columns.
+      partitionColumns.drop(numStaticPartitionCols) ++ writerBucketSpec.map(_.bucketIdExpression) ++
+        sortColumns
+    }
+
     // the sort order doesn't matter
     // Use the output ordering from the original plan before adding the empty2null projection.
     val actualOrdering = plan.outputOrdering.map(_.child)
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/V1Writes.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/V1Writes.scala
@@ -172,8 +172,10 @@ object V1WritesUtils {
     // Static partition must to be ahead of dynamic partition
     val dynamicPartitionColumns = partitionColumns.drop(numStaticPartitionCols)
-    if (SQLConf.get.maxConcurrentOutputFileWriters > 0 && sortColumns.isEmpty) {
-      // Do not insert logical sort when concurrent output writers are enabled.
+    if (SQLConf.get.maxConcurrentOutputFileWriters > 0 && sortColumns.isEmpty
+      || writerBucketSpec.isEmpty) {
+      // Do not insert logical sort when concurrent output writers are enabled
+      // or no bucket spec exists.
       Seq.empty
     } else {
       // We should first sort by dynamic partition columns, then bucket id, and finally sorting
```

Then, the query is:
```scala
  .repartition(1)
  .sortWithinPartitions("p", "sort_col")
  .write
  .partitionBy("p")
```
|
I'm not very sure what the problem is. partitionBy("p") requires sorting the data by p per partition. It should already be satisfied and we don't need to add it.
|
partitionBy("p")requires sorting the data bypper partition.
Why does write.partitionBy("p") require sorting the data by p per partition? I understand why df.groupBy($"p") requires in-partition order by p (GroupedIterator). But write.partitionBy("p")?
|
> I'm not very sure what the problem is.
See #38356 (comment):
The V1Writes rule introduced in Spark 3.4 adds the empty2null to all nullable string partition columns. The modified V1WriteCommand then has a modified write.requiredOrdering but the old write.query.outputOrdering, which then do not match any more. In FileFormatWriter, the outer sort will be added because of that mismatch.
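A hedged sketch of that mismatch (attribute names are illustrative): the pulled-out Alias(empty2null(p), "p") produces an attribute with a fresh exprId, so it is not semantically equal to the original p and the required ordering appears unsatisfied.

```scala
import org.apache.spark.sql.catalyst.expressions.{Ascending, AttributeReference, SortOrder}
import org.apache.spark.sql.types.StringType

val p        = AttributeReference("p", StringType)()  // original attribute
val aliasedP = AttributeReference("p", StringType)()  // fresh exprId, as from the new Alias

val actual   = Seq(SortOrder(p, Ascending))        // write.query.outputOrdering
val required = Seq(SortOrder(aliasedP, Ascending)) // write.requiredOrdering after V1Writes

// Same name, different exprId: the check fails, so FileFormatWriter inserts
// the redundant outer Sort.
assert(!SortOrder.orderingSatisfies(actual, required))
```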
|
Seems Project shouldn't extend OrderPreservingUnaryNode, as it needs to rewrite the child output ordering based on the project list. cc @ulysses-you @wangyum
|
Yeah, we can rewrite the ordering to preserve the attribute behind the alias, like AliasAwareOutputExpression. But empty2null is special; it is not just an alias. Since we pull out the empty2null, we should tell Project that the ordering of empty2null(p) is the same as p. One idea may be to not pull out empty2null and leave it in its original place.
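A rough sketch of that alias-aware idea (modelled loosely on AliasAwareOutputExpression; the names here are ours, and, as noted, this is only safe when the aliased expression actually preserves the child's ordering):

```scala
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.types.StringType

val p     = AttributeReference("p", StringType)()
val alias = Alias(p, "p")()  // stand-in for Alias(empty2null(p), "p")

// Map each aliased child expression to the alias's output attribute, then
// rewrite the child's ordering through that map.
val aliasMap: Map[Expression, Attribute] =
  Map(alias.child.canonicalized -> alias.toAttribute)

val childOrdering = Seq(SortOrder(p, Ascending))
val rewritten = childOrdering.flatMap { so =>
  aliasMap.get(so.child.canonicalized).map(attr => so.copy(child = attr))
}
// `rewritten` now orders by the projected attribute, so it would match the
// required ordering after the projection.
```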
|
Be aware that empty2null(p) does not preserve the ordering of p in general, e.g. it does not preserve NULLS LAST, or an ordering by p and x.
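A concrete illustration, with None standing in for null and assuming ascending NULLS LAST order:

```scala
// Sorted ascending NULLS LAST: "" < "a" < null.
val sorted = Seq(Some(""), Some("a"), None)
// empty2null maps "" to null.
val mapped = sorted.map(_.filter(_.nonEmpty))
// Nulls are no longer contiguous, so the result is sorted under neither
// NULLS FIRST nor NULLS LAST.
assert(mapped == Seq(None, Some("a"), None))
```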
|
we only add empty2null for string type partition columns, maybe we shouldn't optimize this case anyway.
|
Adding empty2null throws null values and empty values into the same partition, and the user has no way to make Spark treat them as distinct values. But changing this smells like a breaking change, unless some config allows bringing back the current behaviour.
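For example (output path and SparkSession are assumed), both rows below land in the same partition directory, since Spark writes null partition values under the default partition name:

```scala
import spark.implicits._  // assumes an existing SparkSession named `spark`

Seq((null: String, 1), ("", 2)).toDF("p", "id")
  .write
  .partitionBy("p")
  .parquet("/tmp/empty2null_demo")  // hypothetical output path
// Resulting layout: /tmp/empty2null_demo/p=__HIVE_DEFAULT_PARTITION__/...
```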
|
Regarding the redundant outer Sort, see SPARK-41914. This affects not only string type partition columns; avoiding that outer Sort is tracked there.
|
This is fixed by SPARK-41959 / #39475
|
@zzzzming95 please test that #39475 fixes the issue for you.
|
@EnricoMi this PR can fix my issue, thanks!
|
This can be closed as the issue has been fixed in #37525 (review)
|
Hello @EnricoMi @zzzzming95 Is this issue fixed? If so, how do I make sure I include this fix in Spark 3.4? I'm using Databricks 13.2 (which provides Spark 3.4.0) and still facing this issue where the last Sort does not take effect.
|
@abhishekukmeesho you can report the bug to Databricks directly. The fix was indeed merged to 3.4, but not sure if you hit the same issue or not. |
What changes were proposed in this pull request?
The following code reproduces the issue:
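(A sketch reconstructed from the query quoted in the review discussion; the source data and output path are illustrative.)

```scala
import spark.implicits._  // assumes an existing SparkSession named `spark`

val df = Seq(("b", 2), ("a", 1), ("", 3)).toDF("p", "sort_col")

df.repartition(1)
  .sortWithinPartitions("p", "sort_col")
  .write
  .partitionBy("p")
  .parquet("/tmp/spark40885_demo")  // hypothetical output path
// Expected: each p=... directory is written in (p, sort_col) order; before
// the fix, the user's Sort could be pruned by EliminateSorts.
```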
When 'Sort' is combined with 'partitionBy', 'partitionBy' generates a new Sort("partition_col") on top of the user's sort in the logical plan. However, 'org.apache.spark.sql.catalyst.optimizer.EliminateSorts' prunes sort operators it considers irrelevant to the output, which causes 'Sort("p", "sort_col")' to be pruned, although it should in fact take effect.
This causes the Sort to be dropped in various scenarios, including the equivalent SQL (e.g. a SORT BY right before writing into a partitioned table).
The scenario covered by this issue is when 'Sort' is the last 'Transform' operator of the job. When other 'Transform' operators follow the 'Sort', the Sort need not take effect.
Why are the changes needed?
Fix a bug where the final Sort before a partitioned write could be pruned and not take effect.
Does this PR introduce any user-facing change?
No
How was this patch tested?
Add a new unit test.