[SPARK-27105][SQL] Optimize away exponential complexity in ORC predicate conversion #24068
Conversation
|
cc @dongjoon-hyun @cloud-fan - it'd be great if you get a chance to review this! I know it's a bit of a large change, but I tried to make it as isolated as possible and explain it well - let me know if I can be of additional help. (Also, a large part of the diff delta comes from the regenerated benchmark results AFAICT.) |
|
ok to test |
can the builder be copied? Then we can simulate the rollback.
I don’t see support for copying a builder in the hive source.
|
Just remembered that I should probably apply these changes to the |
|
Test build #103425 has finished for PR 24068 at commit
|
|
@cloud-fan @dongjoon-hyun ping - let me know if someone else would be better suited to review this, or if there are things I can do to make the review easier! |
|
I checked the implementation of orc filter builder, it creates |
|
The way I understand this is:
|
Seems like using reflection to instantiate ExpressionTree is easier than this approach.
Force-pushed from 4eae044 to eb3c158
|
@cloud-fan I pushed a new version that has the same complexity and behaviour but doesn't need the [...]. While I agree that building an [...]
So, even if the changes introduced because of reasons 1. and 2. above are both correct, they could still end up producing ORC predicates that are less efficient than the current ones because they haven't been transformed / optimized in the proper way by the builder, and thus introduce performance regressions. I'm okay taking a stab at the [...]
After getting rid of the [...]. Let me know what you think about the newest version and my thoughts above, and thanks for taking a look!
|
I agree with you that building [...]. The problem we have is that the ORC filter builder has no way back, so we have to do an extra check before calling the builder methods. The idea I have is to create a tree-like ORC filter representation of our own (similar to [...])
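Later comments refer to this as the tree-trimming idea. A rough sketch of what such a trimming pass over the data source `Filter` tree could look like (illustrative only - the function name, the simplified partial-pushdown rule, and the `EqualTo`-only leaf case are assumptions, not the PR's final code):

```scala
import org.apache.spark.sql.sources._
import org.apache.spark.sql.types.DataType

object TrimSketch {
  // Returns the largest convertible subtree of `filter`, or None if nothing can be pushed down.
  // `canPartialPushDown` captures, in simplified form, that dropping one side of an AND is only
  // safe when the AND is not underneath a NOT (or an OR, in this conservative sketch).
  def trimNonConvertible(
      dataTypeMap: Map[String, DataType],
      filter: Filter,
      canPartialPushDown: Boolean = true): Option[Filter] = filter match {
    case And(left, right) =>
      val l = trimNonConvertible(dataTypeMap, left, canPartialPushDown)
      val r = trimNonConvertible(dataTypeMap, right, canPartialPushDown)
      (l, r) match {
        case (Some(lf), Some(rf))    => Some(And(lf, rf))
        case _ if canPartialPushDown => l.orElse(r) // keep whichever conjunct survived
        case _                       => None
      }
    case Or(left, right) =>
      // OR is only kept when both children are fully convertible.
      for {
        lf <- trimNonConvertible(dataTypeMap, left, canPartialPushDown = false)
        rf <- trimNonConvertible(dataTypeMap, right, canPartialPushDown = false)
      } yield Or(lf, rf)
    case Not(child) =>
      trimNonConvertible(dataTypeMap, child, canPartialPushDown = false).map(Not)
    case EqualTo(attribute, _) if dataTypeMap.contains(attribute) =>
      Some(filter) // stand-in for the real isSearchableType check on the attribute's type
    case _ =>
      None // everything else is treated as non-convertible in this sketch
  }
}
```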
|
@cloud-fan interesting! A couple of questions to make sure we're on the same page:
|
|
|
Test build #103826 has finished for PR 24068 at commit
|
|
To clarify with regard to 2. - this is exactly the behavior that this PR introduces. There’s effectively a single pass through the tree to check for convertibility, and then a second pass to build the ORC filter. While the build phase calls [...]. I’ll think about the details of implementing the trimming and take a stab at an implementation if it seems like it can be made cleaner.
Force-pushed from eb3c158 to 25b8ce1
|
@cloud-fan after some noticeable delay, I managed to find the time to implement the tree trimming idea yesterday. I think the result looks pretty nice - thanks for the suggestion! I ran the relevant parts of the benchmark and the performance appears to be comparable - maybe ~10% slower than the previous version. I imagine this is due to external factors, but since it's such a small difference compared to going from exponential to linear, I don't think it's worth looking into it. I haven't updated the benchmark results in the commit with the most recent code since doing so took a couple hours last time I tried, and the differences are not huge. Let me know if you'd like me to update this. Looking forward to hearing your thoughts! |
|
Test build #105249 has finished for PR 24068 at commit
|
Range(0, count).map(r => scala.util.Random.nextInt(numRows * distribution / 100))
val whereExpr = s"value in(${filter.mkString(",")})"
val title = s"InSet -> InFilters (values count: $count, distribution: $distribution)"
val title = s"InSet -> InFilters (values count: $count, distribution: $distribution)"
unnecessary change?
  }
}

runBenchmark(s"Pushdown benchmark with unbalanced Column") {
what's the difference between Column and Expression in this benchmark?
Good question. The initial idea was that the Expression test only runs the Expression -> SearchArgument conversion without any actual Spark jobs. This is useful because once we get away from the exponential complexity and get down to linear vs quadratic, there are a lot of other Spark components that become slower than the predicate conversion. Thus, having an isolated benchmark can be helpful in getting a more detailed read on the performance of this individual component. I'll add a comment explaining that in the code.
Regarding the Column vs Expression part, for some reason when I was researching it, I found it easier to go from an Expression to a SearchArgument. I now see that it's trivial to get the underlying Expression from a Column so I'll just do that for consistency (and it's also a bit more readable as well).
 *
 * @param filter The underlying filter representation.
 */
private case class TrimmedFilter(filter: Filter) extends AnyVal
hmm I feel this one is not very useful. We just wrap the trimmed filter with it and then throw it away when building the ORC filter. I think we can add an assert when building the ORC filter, to make sure the passed filter is already trimmed.
How would you implement such an assert though? I can't think of an easy way to do so without running the convertibility checks again and going down the same rabbit hole we're trying to get out of.
The other benefit is that this makes it kind of obvious at compile time that you can only invoke the createBuilder function with an already trimmed down filter. Of course, you can still get around it by manually wrapping the filter, but at least you got the compiler screaming at you instead of silently accepting it and then failing at runtime.
Another option I considered is calling the function createBuilderForTrimmedFilter, but this felt easier to ignore and still get wrong.
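As a sketch of the pattern being discussed (everything here besides the `TrimmedFilter` definition itself is hypothetical and simplified):

```scala
import org.apache.spark.sql.sources.{EqualTo, Filter}

object TrimmedFilterSketch {
  // Zero-overhead wrapper: the intent is that the trimming pass is the only producer,
  // so an arbitrary (possibly non-convertible) Filter can't reach the builder by accident.
  case class TrimmedFilter(filter: Filter) extends AnyVal

  // Hypothetical producer: only wraps filters it knows how to convert.
  def trimNonConvertibleSubtrees(filter: Filter): Option[TrimmedFilter] = filter match {
    case f: EqualTo => Some(TrimmedFilter(f))
    case _          => None // the real code handles the full Filter algebra
  }

  // Hypothetical consumer: the signature documents, and the compiler enforces,
  // that callers must go through the trimming step first.
  def createBuilder(trimmed: TrimmedFilter): String =
    s"ORC filter for ${trimmed.filter}"
}
```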
trimNonConvertibleSubtreesImpl(dataTypeMap, child, canPartialPushDownConjuncts = false)
  .map(Not)

// NOTE: For all case branches dealing with leaf predicates below, the additional `startAnd()`
need to update comment?
Oh, right - I actually added it back, but apparently put it in the wrong place. Moving.
updateBuilder(child)
builder.end()

case EqualTo(attribute, value) if isSearchableType(dataTypeMap(attribute)) =>
we can remove the `if isSearchableType(dataTypeMap(attribute))` now?
Yeah, removed. I figured I'll keep them for safety in case something goes out of date between the two functions, but a lot of other things will likely break anyway if things get out of sync.
Btw if you have thoughts on how to make sure the functions stay in sync beyond the comments I've put now, I'm very open to hearing them.
saveAsTable(df, dir)
Seq(1, 250, 500).foreach { numFilter =>
  val whereExpr = (1 to numFilter).map(i => s"c$i = 0").mkString(" and ")
  val whereExpr = (1 to numFilter).map(i => s"c$i = 0").mkString(" or ")
why change the existing benchmark?
|
|
case EqualTo(attribute, value) if isSearchableType(dataTypeMap(attribute)) =>
case EqualTo(attribute, value) =>
  val quotedName = quoteAttributeNameIfNeeded(attribute)
One thing we can do is add an `assert(isSearchableType(dataTypeMap(attribute)))` at the beginning:

case _ =>
  assert(isSearchableType(dataTypeMap(attribute)))
  subexpression match {
    case EqualTo(attribute, value) => ...
    case EqualNullSafe(attribute, value) => ...
    ...
    case _ => throw new IllegalStateException(...)
  }
This also guarantees that the input filter is already trimmed, so that we can get rid of the TrimmedFilter.
Hm, that's a good idea in principle, but I don't think the outer case-match with the assert can be implemented properly. That is, when I do:
case _ =>
  assert(isSearchableType(dataTypeMap(attribute)))
  subexpression match {
    case EqualTo(attribute, value) =>
      ...
, the compiler doesn't know what `attribute` is because the match is simply against `_`. AFAICT there's no common class that I can match against here to say "match any Filter that has an attribute field", since all the relevant filters are implemented as separate case classes that extend Filter directly. The idea of using a structural type to signify "has the field attribute" also crossed my mind, but it seems like you can't match on a structural type.
So AFAICT the options for implementing this are:
- Add the assert to every single case. This seems a bit too verbose.
- Bring back the `if isSearchableType(...)` on every case, and then throw an exception in the final `case _` instead of returning `true`.
What are your thoughts? Another question is whether we actually want this to fail if things go wrong. I'm a bit concerned about failing since it could prevent people from using this if something in the implementation gets inconsistent.
The other thing is that all of this will only happen at runtime. The TrimmedFilter is a way to alert the developer at compile time that they shouldn't be calling this method with anything that's not trimmed, whereas these options could fail unexpectedly at runtime.
Let me know what you think about these and we can figure out what to do here.
> Add the assert to every single case. This seems a bit too verbose.

I think it's as verbose as adding an `if isSearchableType(...)` for every case and I'm ok with it.

> The TrimmedFilter is a way to alert the developer at compile time that they shouldn't be calling this method with anything that's not trimmed

Personally I think it's just a way to force the developers to look at the doc and see what they should do before calling this method. I don't see big value in it, as I don't think there will be more callers of this method. It's acceptable to fail some tests and alert the developers to change their code.
|
Test build #105276 has finished for PR 24068 at commit
|
|
Test build #105404 has finished for PR 24068 at commit
|
|
@cloud-fan I took a stab at a slightly different approach to structuring the code in https://github.com/IvanVergiliev/spark/pull/2/files . The idea is to implement filtering and building in the same match expression, with an enum that tells us whether to perform a filter or a build operation. This has the following benefits:
The only annoying part is the need for the [...]. Overall I'm pretty happy with that version of the code. Let me know if that looks reasonable to you, and if so - I can merge that branch into this one and we can then wrap up the review.
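A minimal sketch of the shape of that idea (the `ActionType` values, the simplified cases, and the `asInstanceOf` workaround are illustrative; they are not necessarily what the linked branch does):

```scala
import org.apache.spark.sql.sources._

object CombinedPassSketch {
  // One value per pass over the filter tree; the type parameter is that pass's result type.
  sealed trait ActionType[ReturnType]
  case object TrimUnconvertibleFilters extends ActionType[Option[Filter]]
  case object BuildSearchArgument extends ActionType[Unit]

  // A single match on the filter shape serves both passes, so the knowledge of which cases
  // are supported (and how children are visited) lives in exactly one place.
  def performAction[R](action: ActionType[R], filter: Filter): R = {
    val result: Any = filter match {
      case Not(child) =>
        action match {
          case TrimUnconvertibleFilters =>
            performAction(TrimUnconvertibleFilters, child).map(Not)
          case BuildSearchArgument =>
            // The real code would wrap this in builder.startNot() / builder.end().
            performAction(BuildSearchArgument, child)
        }
      case EqualTo(_, _) =>
        action match {
          case TrimUnconvertibleFilters => Some(filter)
          case BuildSearchArgument      => () // real code emits a leaf into the builder
        }
      case _ =>
        action match {
          case TrimUnconvertibleFilters => None
          case BuildSearchArgument =>
            throw new IllegalStateException(s"Can't build unsupported filter $filter")
        }
    }
    // Scala can't relate the per-branch result types back to R here, hence the cast.
    result.asInstanceOf[R]
  }
}
```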
|
that sounds like a good idea! Can you merge it and fix the conflict? thanks! |
|
Test build #105816 has finished for PR 24068 at commit
|
|
@cloud-fan I applied the changes to the [...]. @gatorsmile I see what you're saying. I got rid of the benchmark that only does the filter conversion step since that indeed seemed a bit out of place with the rest. The remaining new benchmark is [...]
|
@gatorsmile also added reproduction steps (modelled after #22313 ) to the commit description. 30 filters go from 7 minutes to 300ms 🙂 |
|
Test build #106240 has finished for PR 24068 at commit
|
|
@IvanVergiliev Thank you for your contribution! The PR description looks much better! @IvanVergiliev and @gengliangwang I have not read the implementation details yet. Could you show us the perf difference between the two approaches?
|
@gatorsmile I took a quick check; the perf is quite close. The key difference between the two approaches is how the code is implemented: abstraction and readability.
|
@gatorsmile yep, the performance is pretty much the same because the two implementations are exactly the same at a high level. The two main high-level steps are:

1. Trim the non-convertible parts of the filter tree.
2. Build the ORC filter from the trimmed tree.
The main differences are in code structure. At a high level, the two main differences are:
@cloud-fan agreed that this does sound like an improvement in #24068 (comment) , so I merged the change into the main PR. I still think this has code design benefits over the separate functions, but I'm open to reverting to the previous version if there's consensus that it's better.
Instead, if we
This leads to all kinds of questions:
tl;dr I've gone through similar passes as the one proposed by @gengliangwang, but I've had specific design reasons to move away from them. I'm happy to go back to a version where the filtering and building are written in separate methods if people like this better. I'm more sceptical about the reuse of [...]. This got pretty long, but it also summarizes ~50 commits of changes and the discussion surrounding them. Let me know if anything is unclear or doesn't make sense.
// complexity in the height of the filter tree.
// The difference between these two benchmarks is that this one benchmarks pushdown with a
// large string filter (`a AND b AND c ...`), whereas the next one benchmarks pushdown with
// a large Column-based filter (`col(a) || (col(b) || (col(c)...))`).
If I read this comment correctly, it seems that we should just remove the next benchmark, as string filter and Column-based filter have no difference regarding performance. Is there any other critical difference that I missed?
@cloud-fan the two go through different code paths. The string-based one was added in #22313 , but it doesn't expose the slowness when passing a Column filter directly. That is, the string-based one was fast before this PR. The one this PR fixes is specifically when passing in a Column directly to something like df.filter(Column).
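For illustration, the two ways of expressing the same predicate (a local SparkSession and a toy DataFrame are assumed here):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

val spark = SparkSession.builder().master("local[*]").appName("filter-paths").getOrCreate()
val df = spark.range(100).selectExpr("id as a", "id as b")

// String-based filter, parsed from SQL text (the form benchmarked in #22313):
val viaString = df.filter("a = 1 or b = 2")

// Column-based filter, built programmatically (the form the comment above is about):
val viaColumn = df.filter(col("a") === 1 || col("b") === 2)
```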
I still can't get it. Both the string filter and the column-based filter will become an Expression in the Filter operator. The differences I see are:
- the new benchmark builds a larger filter
- the new benchmark uses `Or` instead of `And`.
Sorry for the late review, all.
@IvanVergiliev, could you make a separate benchmark for both the ORC v1 `OrcFilters.createFilter` and the ORC v2 `OrcFilters.createFilter`? We don't need a disk access benchmark for this PR.
Parquet Vectorized                 11098  11146   39  1.4  705.6  1.0X
Parquet Vectorized (Pushdown)      11187  11254   45  1.4  711.2  1.0X
Native ORC Vectorized               9847   9895   43  1.6  626.0  1.1X
Native ORC Vectorized (Pushdown)   10227  12071  623  1.5  650.2  1.1X
Stddev(ms) 623 seems high here although it doesn't matter to the ratio.
Intel(R) Core(TM) i7-8850H CPU @ 2.60GHz
Select 1 distinct string row ('100' <= value <= '100'):  Best Time(ms)  Avg Time(ms)  Stdev(ms)  Rate(M/s)  Per Row(ns)  Relative
------------------------------------------------------------------------------------------------------------------------
Parquet Vectorized                  6708  7325  686  2.3  426.5  1.0X
Was the laptop stable here? Were any other jobs running?
|
@IvanVergiliev I'd suggest we revert all the benchmark changes and write a simple microbenchmark to test [...]. Currently we do not run benchmarks automatically for Spark, so perf regressions rely on user reports.
|
+1 for @cloud-fan's suggestion. I also saw that @gatorsmile gave the same advice before.
|
Thanks for the feedback! Just clarifying before I make any changes.
|
|
@IvanVergiliev I'd suggest something more low-level, e.g. |
|
@cloud-fan I reverted the benchmark changes, and updated the benchmark in the PR description so it only runs the filter conversion. It's even nicer to look at now, with the old version very visibly increasing 2x on each iteration, and the new one taking roughly 0 ms for each size. I don't remember if we came to a conclusion on whether I should bring this PR back into a state where it has separate [...]. There's one minor caveat with the benchmark code. The [...]:

// Paste this into spark-shell using the `-raw` flag so it gets interpreted as
// a Scala file and so that we can trick spark-shell into thinking our class is
// actually in the `sql` package and can thus access `OrcFilters`.
:paste -raw
package org.apache.spark.sql

import org.apache.spark.sql.execution.datasources.orc._
import org.apache.spark.sql.types._
import org.apache.spark.sql.sources._

object OrcFiltersTest {
  def foo(): Unit = {
    val schema = StructType.fromDDL("col INT")
    (20 to 30).foreach { width =>
      val whereFilter = (1 to width).map(i => EqualTo("col", i)).reduceLeft(Or)
      val start = System.currentTimeMillis()
      OrcFilters.createFilter(schema, Seq(whereFilter))
      println(s"With $width filters, conversion takes ${System.currentTimeMillis() - start} ms")
    }
  }
}
|
Test build #106618 has finished for PR 24068 at commit
|
|
@IvanVergiliev thanks for your great work! merging to master |
|
@gengliangwang feel free to open a PR to simplify the code as you proposed. |
What changes were proposed in this pull request?
`OrcFilters.createBuilder` has exponential complexity in the height of the filter tree due to the way the check-and-build pattern is implemented. We've hit this in production by passing a `Column` filter to Spark directly, with a job taking multiple hours for a simple set of ~30 filters. This PR changes the checking logic so that the conversion has linear complexity in the size of the tree instead of exponential in its height.

Right now, due to the way ORC `SearchArgument` works, the code is forced to do two separate phases when converting a given Spark filter to an ORC filter:
1. Check that the filter is convertible.
2. Build the actual ORC filter.

However, there's one detail which is the culprit in the exponential complexity: phases 1. and 2. are both done using the exact same method. The resulting exponential complexity is easiest to see in the `NOT` case - consider the following code:
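A representative snippet (a hedged reconstruction - the leaf predicate and exact definitions are illustrative; what matters is the chain of `Not` filters of height 5):

```scala
import org.apache.spark.sql.sources.{EqualTo, Filter, Not}

val f1: Filter = EqualTo("id", 5) // illustrative leaf predicate
val f2 = Not(f1)
val f3 = Not(f2)
val f4 = Not(f3)
val f5 = Not(f4)
```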
Now, when we run `createBuilder` on `f5`, we get the following behaviour:
1. `createBuilder(f4)` to check if the child `f4` is convertible
2. `createBuilder(f4)` to actually convert it

This seems fine when looking at a single level, but what actually ends up happening is:
- `createBuilder(f3)` will then recursively be called 4 times - 2 times in step 1., and two times in step 2.
- `createBuilder(f2)` will be called 8 times - 4 times in each top-level step, 2 times in each sub-step.
- `createBuilder(f1)` will be called 16 times.

As a result, having a tree of height > 30 leads to billions of calls to `createBuilder`, heap allocations, and so on and can take multiple hours.

The way this PR solves this problem is by separating the `check` and `convert` functionalities into separate functions. This way, the call to `createBuilder` on `f5` above would look like this:
1. `isConvertible(f4)` to check if the child `f4` is convertible - amortized constant complexity
2. `createBuilder(f4)` to actually convert it - linear complexity in the size of the subtree.

This way, we get an overall complexity that's linear in the size of the filter tree, allowing us to convert trees with 10s of thousands of nodes in milliseconds.
The reason this split (`check` and `build`) is possible is that the checking never actually depends on the actual building of the filter. The `check` part of `createBuilder` depends mainly on:
- `isSearchableType` for leaf nodes, and
- `check`-ing the child filters for composite nodes like NOT, AND and OR.

Situations like the `SearchArgumentBuilder` throwing an exception while building the resulting ORC filter are not handled right now - they just get thrown out of the class, and this change preserves this behaviour.

This PR extracts this part of the code to a separate class which allows the conversion to make very efficient checks to confirm that a given child is convertible before actually converting it.
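A rough sketch of the shape of this split (not the PR's exact code - `isConvertible`, the handful of cases shown, and the string-building stand-in for the ORC builder are all illustrative):

```scala
import org.apache.spark.sql.sources._
import org.apache.spark.sql.types.DataType

object CheckThenBuildSketch {
  // Phase 1: a cheap, builder-free convertibility check.
  def isConvertible(dataTypeMap: Map[String, DataType], filter: Filter): Boolean = filter match {
    case And(l, r)             => isConvertible(dataTypeMap, l) && isConvertible(dataTypeMap, r)
    case Or(l, r)              => isConvertible(dataTypeMap, l) && isConvertible(dataTypeMap, r)
    case Not(c)                => isConvertible(dataTypeMap, c)
    case EqualTo(attribute, _) => dataTypeMap.contains(attribute) // stand-in for isSearchableType
    case _                     => false
  }

  // Phase 2: conversion assumes the check already passed, so each child is converted exactly
  // once instead of re-running the full conversion as its own check. (A string is built here
  // just to keep the sketch self-contained; the real code drives the ORC SearchArgument builder.)
  def build(filter: Filter): String = filter match {
    case And(l, r)        => s"and(${build(l)}, ${build(r)})"
    case Or(l, r)         => s"or(${build(l)}, ${build(r)})"
    case Not(c)           => s"not(${build(c)})"
    case EqualTo(attr, v) => s"equals($attr, $v)"
    case _                => throw new IllegalStateException(s"unchecked filter: $filter")
  }
}
```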
Results:
Before:
Now:
Steps to reproduce
Before this PR
After this PR
How was this patch tested?
There are no changes in behaviour, and the existing tests pass. Added new benchmarks that expose the problematic behaviour and they finish quickly with the changes applied.