Conversation

@aokolnychyi
Contributor

@aokolnychyi aokolnychyi commented Jul 13, 2020

What changes were proposed in this pull request?

This PR proposes to remove redundant sorts before repartition nodes whenever the data is ordered after the repartitioning.

Why are the changes needed?

It looks like our EliminateSorts rule can be extended further to remove sorts before repartition nodes that don't affect the final output ordering. It seems safe to perform the following rewrites:

  • Sort -> Repartition -> Sort -> Scan as Sort -> Repartition -> Scan
  • Sort -> Repartition -> Project -> Sort -> Scan as Sort -> Repartition -> Project -> Scan
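For illustration, a minimal DataFrame sketch of the shape being targeted (assuming a spark-shell session where spark is in scope; the intermediate sort is the redundant one):

import org.apache.spark.sql.functions.col

val df = spark.range(100).toDF("id")
val result = df
  .orderBy(col("id").desc)  // redundant: the shuffle below destroys this ordering
  .repartition(10)          // full shuffle across 10 partitions
  .orderBy(col("id").asc)   // the final output ordering comes from this sort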

Does this PR introduce any user-facing change?

No.

How was this patch tested?

More test cases.

      j.copy(left = recursiveRemoveSort(originLeft), right = recursiveRemoveSort(originRight))
    case g @ Aggregate(_, aggs, originChild) if isOrderIrrelevantAggs(aggs) =>
      g.copy(child = recursiveRemoveSort(originChild))
    case r: RepartitionByExpression =>
Contributor Author

These two branches can be replaced with one:

case r: RepartitionOperation =>
  r.withNewChildren(r.children.map(recursiveRemoveSort))

This would mean any repartition nodes we add in the future are also taken into account. It seems safe, but I want to hear what everybody thinks.

Contributor

What about adding RepartitionOperation.preservesOrder? Then we could collapse these cases while also excluding Coalesce and making this explicit for future repartition operators.
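For instance, something like this (a hypothetical sketch of the suggested API, not what the PR ultimately does):

import org.apache.spark.sql.catalyst.plans.logical.UnaryNode

// Hypothetical: each repartition-like node declares whether it keeps the
// relative order of its input rows.
abstract class RepartitionOperation extends UnaryNode {
  def preservesOrder: Boolean
}

// The rule could then look through only order-destroying repartitions:
//   case r: RepartitionOperation if !r.preservesOrder =>
//     r.withNewChildren(r.children.map(recursiveRemoveSort))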

Contributor Author

I am +1 on having a repartition node that preserves ordering. In fact, we have such a node internally. Coalesce is not really order preserving, though: it has custom logic in DefaultPartitionCoalescer that gets applied if the parent RDD has locality info. We would also need to report outputOrdering correctly (which is not done for Coalesce now).

Contributor Author

We won't be able to squash all cases into one as we need to check if repartition expressions are deterministic. However, I'd consider extending repartition nodes with order preserving repartition in a follow-up if there is enough support for that.
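A sketch of the two branches this implies (names follow the surrounding rule):

// RepartitionByExpression is safe to look through only when its partition
// expressions are deterministic; plain Repartition has no expressions.
case r: RepartitionByExpression if r.partitionExpressions.forall(_.deterministic) =>
  r.copy(child = recursiveRemoveSort(r.child))
case r: Repartition =>
  r.copy(child = recursiveRemoveSort(r.child))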


val a_i = List[Int](1, -1, 2, -2, 2147483647, -2147483648)
val b_i = List[Option[Int]](Some(1), None, None, Some(-2), None, Some(-2147483648))
// we order values by $"a_i".desc manually as sortBy before coalesce is ignored
Contributor Author
@aokolnychyi aokolnychyi Jul 13, 2020

This is the case I mentioned in the PR description. Here, DefaultPartitionCoalescer does preserve the ordering and the test relied on that, even though there is no guarantee it will happen. We could apply the new optimization only if the repartition operation requires a shuffle. That way, we keep the existing behavior.
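A sketch of that guard (assuming the shuffle flag carried by Repartition):

// Coalesce is Repartition(shuffle = false); skip it so code relying on
// DefaultPartitionCoalescer keeping order is unaffected.
case r: Repartition if r.shuffle =>
  r.copy(child = recursiveRemoveSort(r.child))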

Contributor

I think we should apply the optimization only if the repartition requires a shuffle as you suggest. I know that there are users that depend on this behavior.

Contributor Author

I lean towards that as well.

Contributor Author

Updated.

@aokolnychyi
Contributor Author

cc @dongjoon-hyun @dbtsai @cloud-fan @viirya @gengliangwang for feedback

@dongjoon-hyun dongjoon-hyun changed the title [SQL][SPARK-32276] Remove redundant sorts before repartition nodes [SPARK-32276][SQL] Remove redundant sorts before repartition nodes Jul 13, 2020
@dongjoon-hyun
Member

Thank you for pinging me, @aokolnychyi .

@SparkQA

SparkQA commented Jul 13, 2020

Test build #125781 has finished for PR 29089 at commit d71b9e6.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

comparePlans(optimized, correctAnswer)
}

testRepartitionOptimization(
Contributor

I prefer not to generate tests this way because it makes them difficult to run individually (at least in my IntelliJ environment).

It also tends to reduce readability. Here, you're passing a function to testRepartitionOptimization that itself gets passed a function that modifies the logical plan. I think it would be easier to read if these were separate suites, with a suite-level repartition function:

class EliminateSortsInRepartitionSuite extends ... {
  def repartition(plan: LogicalPlan): LogicalPlan = plan.repartition(10)

  test("remove sortBy") {
    val plan = testRelation.select('a, 'b).sortBy('a.asc, 'b.desc)
    val planWithRepartition = repartition(plan)
    ...
  }
}

class EliminateSortsInRepartitionByExpressionSuite extends EliminateSortsInRepartitionSuite {
  override def repartition(plan: LogicalPlan): LogicalPlan = plan.distribute('a, 'b)(10)
}

Contributor Author

I think this pattern is common in the codebase, but I agree having separate suites makes more sense here. Updated.

@aokolnychyi
Contributor Author

@dongjoon-hyun @viirya @rdblue this PR is ready for another review round.

@rdblue
Contributor

rdblue commented Jul 14, 2020

LGTM. I just had one question in the tests.

def repartition(plan: LogicalPlan): LogicalPlan = plan.repartition(10)
def isOptimized: Boolean = true

test(s"sortBy") {
Member

nit. s" -> ".

Contributor Author

Good catch, these are leftovers from the old version of tests.

Contributor Author

Fixed.

comparePlans(optimizedPlan, analyzer.execute(correctPlan))
}

test(s"sortBy with projection") {
Member

ditto.

Contributor Author

Fixed.

@SparkQA

SparkQA commented Jul 14, 2020

Test build #125847 has finished for PR 29089 at commit 83791b7.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@viirya
Member

viirya commented Jul 14, 2020

Some tests in EliminateSortsSuite failed?

@aokolnychyi
Contributor Author

Those failures seem to come from the old commit, before this change removed the outdated tests. The coalesce tests that failed are no longer valid.

@SparkQA

SparkQA commented Jul 14, 2020

Test build #125849 has finished for PR 29089 at commit 0ff3092.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Jul 15, 2020

Test build #125848 has finished for PR 29089 at commit c58ad12.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Jul 15, 2020

Test build #125858 has finished for PR 29089 at commit ba6a1bb.

  • This patch fails PySpark pip packaging tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

Member
@dongjoon-hyun dongjoon-hyun left a comment

+1, LGTM. Thank you, @aokolnychyi , @rdblue , @viirya , @maropu .
Merged to master for Apache Spark 3.1.0, planned for December 2020.
The tests already passed; the last two remaining comments are only about code comments and test case names.

@dongjoon-hyun
Member

Also, cc @gatorsmile and @cloud-fan

@aokolnychyi
Contributor Author

Thanks, everyone!

@dongjoon-hyun
Member

Oops. Sorry, guys. It seems that I missed something during testing. For the following case, we should not remove Sort.

BEFORE THIS PR

scala> Seq((1,10),(1,20),(2,30),(2,40)).toDF("a", "b").repartition(2).createOrReplaceTempView("t")

scala> sql("select * from (select * from t order by b desc) distribute by a").show()
+---+---+
|  a|  b|
+---+---+
|  1| 20|
|  1| 10|
|  2| 40|
|  2| 30|
+---+---+

AFTER THIS PR

scala> Seq((1,10),(1,20),(2,30),(2,40)).toDF("a", "b").repartition(2).createOrReplaceTempView("t")

scala> sql("select * from (select * from t order by b desc) distribute by a").show()
+---+---+
|  a|  b|
+---+---+
|  1| 10|
|  1| 20|
|  2| 30|
|  2| 40|
+---+---+

@dongjoon-hyun
Member

dongjoon-hyun commented Jul 15, 2020

To generate small final Parquet/ORC files, we do the above tricks, don't we? This PR may cause a regression in the size of the output files.

@aokolnychyi
Contributor Author

The same question applies to local sort, too:

sql("select * from (select * from (select * from t order by b desc) distribute by a) sort by b asc")
Sort [b#6 ASC NULLS FIRST], false
+- RepartitionByExpression [a#5], 4
   +- Sort [b#6 DESC NULLS LAST], true
      +- Repartition 2, true
         +- LocalRelation [a#5, b#6]

@SparkQA

SparkQA commented Jul 15, 2020

Test build #125886 has finished for PR 29089 at commit ba6a1bb.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@cloud-fan
Contributor

In general, I think we can remove a sort if it doesn't affect the final output ordering. The case caught by @dongjoon-hyun is a good example: the final output ordering changes and affects the file size.

@rdblue
Contributor

rdblue commented Jul 15, 2020

> To generate small final Parquet/ORC files, we do the above tricks, don't we?

We don't rely on this. Our recommendation to users is to add a global sort to distribute the data, which adds the local sort in the final stage that won't be removed. I can understand people relying on this behavior, though.

For now, I think it makes sense to remove a sort before a repartition if the data will be sorted later, like what I think @aokolnychyi is suggesting. That's really what we will need for tables that require a sort order -- that will be the final sort and we should be able to remove other sorts.

We may also want to choose whether this is a guarantee and document it.

@aokolnychyi
Contributor Author

Yes, my proposal is to optimize cases where we sort the data after the repartition, like in the examples I gave above. In those cases, the sorts below seem to be redundant.
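Concretely, reusing the temp view from the example above, a shape like this remains safe to optimize because the outer sort determines the final ordering (sketch):

// The inner ORDER BY is redundant: DISTRIBUTE BY reshuffles the rows and
// the outer ORDER BY re-sorts them afterwards.
sql("""
  select * from (
    select * from (select * from t order by b desc) distribute by a
  ) order by b asc
""").show()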

@aokolnychyi
Contributor Author

@dongjoon-hyun @viirya @hvanhovell @maropu, what do you think?

@viirya
Member

viirya commented Jul 15, 2020

It looks reasonable to me to remove a sort before a repartition if we know the data will be sorted later, e.g. @aokolnychyi's examples above.

@aokolnychyi
Contributor Author

I've updated the PR to show what I meant. I'll check for additional edge cases in the morning but the change is ready for review.

@dongjoon-hyun
Member

Thank you for the quick update, @aokolnychyi. Also, thank you all for your opinions.

@SparkQA

SparkQA commented Jul 16, 2020

Test build #125931 has finished for PR 29089 at commit 21a84ad.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Jul 16, 2020

Test build #125986 has finished for PR 29089 at commit 0545b09.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@aokolnychyi
Contributor Author

I gave it a bit of thought and did not find a case where the updated logic would break.

* 4) if the Sort operator is within Join separated by 0...n Project/Filter/Repartition
* operators only, and the Join condition is deterministic
* 5) if the Sort operator is within GroupBy separated by 0...n Project/Filter/Repartition
* operators only, and the aggregate function is order irrelevant
Member

This documentation update seems to focus on case _: Repartition => true only. Could you revise more to cover case r: RepartitionByExpression => r.partitionExpressions.forall(_.deterministic), please?

Contributor Author

Done.

}

/**
* Removes Sort operation. This can happen:
Member

Shall we revise this line?

- Removes Sort operation. This can happen:
+ Removes Sort operation if it doesn't affect the final output ordering.
+ Note that changes in the final output ordering may affect the file size (SPARK-32318).
+ This optimizer handles the following cases:

Contributor Author

Done.

@SparkQA

SparkQA commented Jul 17, 2020

Test build #126060 has finished for PR 29089 at commit 2157a71.

  • This patch fails PySpark pip packaging tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@dongjoon-hyun
Member

Retest this please.

Member
@dongjoon-hyun dongjoon-hyun left a comment

+1, LGTM. Thank you so much, @aokolnychyi and all.
The contribution of this PR is not only improving the optimizer but also adding more test coverage.
Merged to master.

@dongjoon-hyun
Member

cc @cloud-fan and @gatorsmile once more.

@hvanhovell
Contributor

@dongjoon-hyun I am a bit late with my response but here goes :)

> However, the following is not reasonable. There is nothing wrong in the file formats. They are just consumers and showing a better performance in a sorted input sequence because they are columnar vectorized format. I guess you assume that this is only a behavior at ORC. But, I'm sure that you can find your customers are relying on this in Parquet, too.

That is making the argument for explicitly organizing the data before the write, right? You are currently just lucky that the system accidentally produces a nice layout for you; 99% of our users won't be as lucky. The only way you can be sure is to add these things yourself.

> This is not an implicit system behavior in Apache Spark. Apache Spark has been working in the procedural ways as you see in the above. If we start to ignore the valid working pattern in the production, it becomes a huge regression.
> In short, saving to a file is a totally different and valid story. To optimize the final output files, the above pattern has been used in production among Apache Spark users for a long time. If some optimizer rule ignores the existing usage, this obviously ends up as a large regression in terms of cost (for example, S3).

If you generalize the procedural argument, then we also should not do things like join reordering or swapping window operators. The whole point of a declarative system like Spark SQL is that you don't care about how the system executes a query, and that it has the freedom to move operations around to make execution more optimal.

Have you considered that your regression is someone else's speed-up? Sorting is not free, and if we can avoid it we should. There might be a large group of users that are adversely affected by spurious sorts in their queries (e.g. an ORDER BY in a view).

Finally, I do want to point out that there is no mechanism that catches this regression if it pops up again.

@SparkQA

SparkQA commented Jul 19, 2020

Test build #126134 has finished for PR 29089 at commit 2157a71.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@dongjoon-hyun
Member

dongjoon-hyun commented Jul 20, 2020

@hvanhovell . Thank you for your feedback. The following looks a little wrong to me because the above optimization was one of the recommendations we gave many Hortonworks customers to save HDFS usage. I know of many production usages like that. I had almost forgotten, but it suddenly came back to me during this PR. (Sadly, after I merged it.)

> You are currently just lucky that the system accidentally produces a nice layout for you; 99% of our users won't be as lucky. The only way you can be sure is to add these things yourself.

I understand your point of view fully. However, I'm wondering if you can persuade customers to waste their storage by generating 160x bigger files (the example from SPARK-32318). Do you think you can?

-rw-r--r--   1 dongjoon  wheel  939 Jul 14 22:12 part-00191-2cd3a50e-eded-49a4-b7cf-94e3f090b8c1-c000.snappy.orc
-rw-r--r--   1 dongjoon  wheel  150741 Jul 14 22:08 part-00191-ba5049f9-b835-49b7-9fdb-bdd11b9891cb-c000.snappy.orc

For the following, SPARK-32318 added test coverage on master/3.0/2.4. Are you suggesting that's not enough? If so, we can add more.

> Finally, I do want to point out that there is no mechanism that catches this regression if it pops up again.

@dongjoon-hyun
Member

For the following, I'd like to ask for your help if you are interested. I believe we all want to build a better Apache Spark together as a community.

> If you generalize the procedural argument, then we also should not do things like join reordering or swapping window operators.
