[SPARK-32276][SQL] Remove redundant sorts before repartition nodes #29089
Conversation
    j.copy(left = recursiveRemoveSort(originLeft), right = recursiveRemoveSort(originRight))
  case g @ Aggregate(_, aggs, originChild) if isOrderIrrelevantAggs(aggs) =>
    g.copy(child = recursiveRemoveSort(originChild))
  case r: RepartitionByExpression =>
These two branches can be replaced with one:
case r: RepartitionOperation =>
r.withNewChildren(r.children.map(recursiveRemoveSort))
It will mean that all repartition nodes we add in the future will also be taken into account. It seems safe, but I want to hear what everybody thinks.
What about adding RepartitionOperation.preservesOrder? Then we could collapse these cases while also excluding Coalesce and making this explicit for future repartition operators.
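(For illustration only: preservesOrder does not exist on RepartitionOperation today; a minimal sketch of how the rule could use such a flag, modeled on the suggestion above.)
```scala
// Hypothetical sketch -- `preservesOrder` is not a real Catalyst member.
// If it existed, the branches could collapse while still excluding
// order-preserving repartitions (and any future ones) explicitly:
case r: RepartitionOperation if !r.preservesOrder =>
  r.withNewChildren(r.children.map(recursiveRemoveSort))
```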
I am +1 on having a repartition node that preserves ordering. In fact, we have such a node internally. Coalesce is not really order preserving, though. It has custom logic in DefaultPartitionCoalescer that gets applied if the parent RDD has locality info. We would also need to report outputOrdering correctly (which is not done in case of Coalesce now)
We won't be able to squash all cases into one as we need to check if repartition expressions are deterministic. However, I'd consider extending repartition nodes with order preserving repartition in a follow-up if there is enough support for that.
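For reference, the check ends up expressed per node type rather than as one collapsed branch; a paraphrased sketch (the helper name here is illustrative):
```scala
// Paraphrased sketch: plain Repartition is always safe to look through, while
// RepartitionByExpression is only safe when its partitioning expressions are
// deterministic (nondeterministic expressions may depend on the input row order,
// so removing the sort could change the resulting partitioning).
def canLookThroughRepartition(plan: LogicalPlan): Boolean = plan match {
  case r: RepartitionByExpression => r.partitionExpressions.forall(_.deterministic)
  case _: Repartition => true
  case _ => false
}
```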
val a_i = List[Int](1, -1, 2, -2, 2147483647, -2147483648)
val b_i = List[Option[Int]](Some(1), None, None, Some(-2), None, Some(-2147483648))
// we order values by $"a_i".desc manually as sortBy before coalesce is ignored
This is the case I mention in the PR description. Here, DefaultPartitionCoalescer does preserve the ordering and the test relied on that even though there is no guarantee it will happen. We could apply the new optimization only if the repartition operation requires a shuffle. That way, we will keep the existing behavior.
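A sketch of what restricting the rule to shuffling repartitions could look like (the helper name is illustrative; coalesce is planned as Repartition with shuffle = false):
```scala
// Illustrative sketch: df.coalesce(n) becomes Repartition(n, shuffle = false, child),
// so a shuffle-only check would leave sorts before coalesce untouched and keep the
// (non-guaranteed) ordering this test relies on.
def repartitionRequiresShuffle(plan: LogicalPlan): Boolean = plan match {
  case Repartition(_, shuffle, _) => shuffle
  case _: RepartitionByExpression => true // distribute by / repartition(col) always shuffles
  case _ => false
}
```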
I think we should apply the optimization only if the repartition requires a shuffle as you suggest. I know that there are users that depend on this behavior.
I lean towards that as well.
Updated.
cc @dongjoon-hyun @dbtsai @cloud-fan @viirya @gengliangwang for feedback

Thank you for pinging me, @aokolnychyi.
Test build #125781 has finished for PR 29089 at commit
    comparePlans(optimized, correctAnswer)
  }

  testRepartitionOptimization(
I prefer not to structure the tests this way because it makes them difficult to run individually (at least in my IntelliJ environment).
It also tends to hurt readability. Here, you're passing a function to testRepartitionOptimization that gets passed a function that modifies the logical plan. I think it would be easier to read if these were separate suites, with a suite-level repartition function:
class EliminateSortsInRepartitionSuite extends ... {
  def repartition(plan: LogicalPlan): LogicalPlan = plan.repartition(10)

  test("remove sortBy") {
    val plan = testRelation.select('a, 'b).sortBy('a.asc, 'b.desc)
    val planWithRepartition = repartition(plan)
    ...
  }
}

class EliminateSortsInRepartitionByExpressionSuite extends EliminateSortsInRepartitionSuite {
  override def repartition(plan: LogicalPlan): LogicalPlan = plan.distribute('a, 'b)(10)
}
I think this pattern is common in the codebase, but I agree having separate suites makes more sense here. Updated.
@dongjoon-hyun @viirya @rdblue this PR is ready for another review round.
LGTM. I just had one question in the tests.
  def repartition(plan: LogicalPlan): LogicalPlan = plan.repartition(10)
  def isOptimized: Boolean = true

  test(s"sortBy") {
nit. s" -> ".
Good catch, these are leftovers from the old version of tests.
Fixed.
    comparePlans(optimizedPlan, analyzer.execute(correctPlan))
  }

  test(s"sortBy with projection")
ditto.
Fixed.
Test build #125847 has finished for PR 29089 at commit
Some tests in

Those failures seem to belong to the old commit before this change, where I removed old tests. The coalesce tests that failed were no longer valid.
Test build #125849 has finished for PR 29089 at commit

Test build #125848 has finished for PR 29089 at commit

Test build #125858 has finished for PR 29089 at commit
+1, LGTM. Thank you, @aokolnychyi, @rdblue, @viirya, @maropu.
Merged to master for Apache Spark 3.1.0 in December 2020.
The tests already passed. The last two comments are about commenting and changing test case names.
Also, cc @gatorsmile and @cloud-fan

Thanks, everyone!
Oops. Sorry, guys. It seems that I missed something during testing. For the following case, we should not remove the sort.

BEFORE THIS PR

scala> Seq((1,10),(1,20),(2,30),(2,40)).toDF("a", "b").repartition(2).createOrReplaceTempView("t")
scala> sql("select * from (select * from t order by b desc) distribute by a").show()
+---+---+
|  a|  b|
+---+---+
|  1| 20|
|  1| 10|
|  2| 40|
|  2| 30|
+---+---+

AFTER THIS PR

scala> Seq((1,10),(1,20),(2,30),(2,40)).toDF("a", "b").repartition(2).createOrReplaceTempView("t")
scala> sql("select * from (select * from t order by b desc) distribute by a").show()
+---+---+
|  a|  b|
+---+---+
|  1| 10|
|  1| 20|
|  2| 30|
|  2| 40|
+---+---+
To generate small final Parquet/ORC files, we use the above trick, don't we? This PR may cause a regression in the size of the output storage.
The same question for local sort too.
Test build #125886 has finished for PR 29089 at commit
In general, I think we can remove a sort if it doesn't affect the final output ordering. The case caught by @dongjoon-hyun is a good example: the final output ordering changes and affects the file size.

We don't rely on this. Our recommendation to users is to add a global sort to distribute the data, which adds the local sort in the final stage that won't be removed. I can understand people relying on this behavior, though. For now, I think it makes sense to remove a sort before a repartition if the data will be sorted later, like what I think @aokolnychyi is suggesting. That's really what we will need for tables that require a sort order -- that will be the final sort and we should be able to remove other sorts. We may also want to choose whether this is a guarantee and document it.
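(As an illustration of that recommendation, with made-up names: a global sort before the write introduces a range shuffle plus a local sort in the final stage, so the layout is guaranteed rather than accidental.)
```scala
// Illustration only (df and the column name are placeholders): orderBy adds a
// range-partitioning exchange and a sort within each output partition, which
// survives optimization and keeps the written files clustered.
df.orderBy("dt")
  .write
  .parquet("/tmp/output")
```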
Yes, my proposal is to optimize cases where we sort the data after the repartition, like in the examples I gave above. In those cases, the sorts below the repartition seem redundant.
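(For example, reusing the temp view from the repro above: the inner sort cannot influence the final output once an outer sort follows the repartition.)
```scala
// Illustration only: the inner "order by b desc" is redundant here because
// "distribute by a" reshuffles the rows and "sort by a" reorders them afterwards.
sql("select * from (select * from t order by b desc) distribute by a sort by a").show()
```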
@dongjoon-hyun @viirya @hvanhovell @maropu, what do you think?
It looks reasonable to me to remove a sort before a repartition if we know the data will be sorted later, e.g. @aokolnychyi's examples above.
I've updated the PR to show what I meant. I'll check for additional edge cases in the morning, but the change is ready for review.
Thank you for the quick update, @aokolnychyi. Also, thank you all for your opinions.
Test build #125931 has finished for PR 29089 at commit

Test build #125986 has finished for PR 29089 at commit
I gave it a bit of thought and did not find a case where the updated logic would break.
 * 4) if the Sort operator is within Join separated by 0...n Project/Filter/Repartition
 *    operators only, and the Join conditions is deterministic
 * 5) if the Sort operator is within GroupBy separated by 0...n Project/Filter/Repartition
 *    operators only, and the aggregate function is order irrelevant
This documentation update seems to focus on case _: Repartition => true only. Could you revise more to cover case r: RepartitionByExpression => r.partitionExpressions.forall(_.deterministic), please?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done.
  }

  /**
   * Removes Sort operation. This can happen:
Shall we revise this line?
- Removes Sort operation. This can happen:
+ Removes Sort operation if it doesn't affect the final output ordering.
+ Note that the final output ordering may change and affect the file size (SPARK-32318).
+ This optimizer handles the following cases:
Done.
Test build #126060 has finished for PR 29089 at commit
Retest this please.
+1, LGTM. Thank you so much, @aokolnychyi and all.
The contribution of this PR is not only improving the optimizer but also adding more test coverage.
Merged to master.
cc @cloud-fan and @gatorsmile once more.
@dongjoon-hyun I am a bit late with my response but here goes :)
That is making the argument for explicitly organizing the data before the write, right? You are currently just lucky that the system accidentally produces a nice layout for you; 99% of our users won't be as lucky. The only way you can be sure is if you add these things yourself.
If you generalize the procedural argument, then we also should not do things like join reordering or swapping window operators. The whole point of a declarative system like Spark SQL is that you don't care about how the system executes a query, and that it has the freedom to move operations around to make execution more optimal. Have you considered that your regression is someone else's speed-up? Sorting is not free, and if we can avoid it we should. There might be a large group of users that are adversely affected by spurious sorts in their queries (e.g. an order by in a view). Finally, I do want to point out that there is no mechanism that captures this regression if it pops up again.
Test build #126134 has finished for PR 29089 at commit
@hvanhovell. Thank you for your feedback. The following looks a little wrong to me because the above optimization was one of the recommendations for many Hortonworks customers to save their HDFS usage. I knew of many production usages like that. I had almost forgotten about it, but it came back to me suddenly during this PR. (Sadly, after I merged this.)
I understand your point of view fully. However, I'm wondering if you can persuade the customers to waste their storage by generating 160x bigger files (the example from SPARK-32318). Do you think you can?
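(A sketch of the kind of job being described, with made-up table and column names: the sort below the repartition clusters similar values so columnar encoding compresses the output well; if that sort is eliminated, each file receives unclustered rows and can grow dramatically, as in SPARK-32318.)
```scala
// Hypothetical example of the pattern discussed above, not taken from SPARK-32318:
// the sort before the repartition is what keeps the output files small, even though
// ordering across a round-robin repartition is not actually guaranteed.
spark.table("events")
  .sort("user_id")
  .repartition(10)
  .write
  .parquet("/tmp/events_compact")
```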
For the following, I'd like to ask for your help if you are interested. I believe we want to build a better Apache Spark in the community together.
What changes were proposed in this pull request?
This PR proposes to remove redundant sorts before repartition nodes whenever the data is ordered after the repartitioning.
Why are the changes needed?
It looks like our EliminateSorts rule can be extended further to remove sorts before repartition nodes that don't affect the final output ordering. It seems safe to perform the following rewrites:
- Sort -> Repartition -> Sort -> Scan as Sort -> Repartition -> Scan
- Sort -> Repartition -> Project -> Sort -> Scan as Sort -> Repartition -> Project -> Scan
Does this PR introduce any user-facing change?
No.
How was this patch tested?
More test cases.
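(A sketch of the kind of plan-level test added, assuming the Catalyst DSL helpers and the Optimize executor used in the quoted snippets above; the real suites may differ.)
```scala
// Sketch only: the sort below the repartition is removed because the outer sortBy
// determines the final output ordering (Sort -> Repartition -> Sort -> Scan
// becomes Sort -> Repartition -> Scan).
test("remove redundant sort below repartition") {
  val plan = testRelation.sortBy('a.asc).repartition(10).sortBy('b.asc)
  val optimized = Optimize.execute(plan.analyze)
  val expected = testRelation.repartition(10).sortBy('b.asc).analyze
  comparePlans(optimized, expected)
}
```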