[SPARK-28169][SQL] Fix Partition table partition PushDown failed by "OR" expression #24973
Conversation
Thank you @AngersZhuuuu. Has this issue been fixed by SPARK-27699?
val extractedPruningPredicates = extractPushDownPredicate(predicates, partitionKeyIds)
  .filter(_ != null)
@AngersZhuuuu, just for clarification, this code path does support OR expressions, but you want to do a partial pushdown, right? Considering it needs a lot of code, as @wangyum pointed out, I think we should rather promote using (or converting to) Spark's Parquet or ORC. It looks like overkill to me.
@HyukjinKwon What I do is extract the conditions about partition keys. The old code:
val (pruningPredicates, otherPredicates) = predicates.partition { predicate => !predicate.references.isEmpty && predicate.references.subsetOf(partitionKeyIds) }
If an expression also references any non-partition key, it won't be pushed to HiveTableScanExec. So what I do to fix this situation is extract all conditions about partition keys and push them to HiveTableScanExec, which can handle complex combined expressions.
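The old check quoted above can be sketched with a toy model (Pred and splitPruning here are illustrative stand-ins, not Spark's actual classes): a predicate survives only if every attribute it references is a partition column, so an OR branch mixing dt with id fails the subset test.

```scala
// Toy model of the old pruning rule (illustrative, not Spark's real API):
// a predicate is pushed down only when ALL of its referenced attributes
// are partition columns.
case class Pred(references: Set[String])

def splitPruning(preds: Seq[Pred], partitionKeys: Set[String]): (Seq[Pred], Seq[Pred]) =
  preds.partition(p => p.references.nonEmpty && p.references.subsetOf(partitionKeys))
```

With partition key dt, a predicate referencing only dt is pushed down, while dt=... OR id IN (...) references {dt, id} and falls through to the generic filter.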
@HyukjinKwon
Spark's Parquet and ORC readers are great and can push down filter conditions, but they can't solve the problem that when we read a Hive table, the first operation is a scan. What this PR wants to do is reduce the time spent resolving file info and partition metadata, and the number of files we scan. When the file or partition count is large, that takes too long.
I think we should convert Hive table reading operations into Spark's ones via, for instance, the spark.sql.hive.convertMetastoreParquet conf. If the diff were small, I might be fine with it, but this does look like overkill to me. I haven't taken a close look, but it virtually looks like we need a fix like #24598.
I won't object if some other committers are fine with that.
@HyukjinKwon I know that it's better to convert Hive table reading operations into Spark's, but that can't fix every situation. In our production env, we just changed Hive data's default storage type to ORC. For a partitioned table, if different partitions' serdes are not the same, the conversion will fail, since during conversion it checks every partition's files against the table-level serde.
Shall we just add an optimizer rule to do CNF conversion?
@cloud-fan I think it's not about the optimizer, since this is specific to partition push down, and the original rule in HiveStrategies is too simple.
CNF conversion can solve this problem, can't it?
I understand you. You mean converting the condition to CNF at the Optimizer level; but that alone can't solve this PR's problem, and it may seriously affect other rules. Converting it to CNF in HiveStrategies can work well.
PS: At first I wanted to convert the predicate to CNF, but since I found that HiveTableScanExec can resolve complex And & Or combinations of partition-key expressions, I just need to keep the original And/Or relationship of the partition-key expressions.
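For context on the CNF alternative discussed here, distributing OR over AND turns the running example into a conjunction whose partition-only conjuncts could be pushed down. A minimal sketch over a toy condition type (Cond, Leaf, toCnf are hypothetical names, not Spark code):

```scala
// Illustrative CNF conversion: repeatedly distribute OR over AND.
// Not the PR's approach — the PR instead keeps the original And/Or shape.
sealed trait Cond
case class Leaf(s: String) extends Cond
case class CAnd(l: Cond, r: Cond) extends Cond
case class COr(l: Cond, r: Cond) extends Cond

def toCnf(c: Cond): Cond = c match {
  case CAnd(l, r)         => CAnd(toCnf(l), toCnf(r))
  case COr(CAnd(a, b), r) => CAnd(toCnf(COr(a, r)), toCnf(COr(b, r)))
  case COr(l, CAnd(a, b)) => CAnd(toCnf(COr(l, a)), toCnf(COr(l, b)))
  case COr(l, r) =>
    val (cl, cr) = (toCnf(l), toCnf(r))
    if (cl != l || cr != r) toCnf(COr(cl, cr)) else c
  case leaf => leaf
}
```

On DT = 20190601 OR (DT = 20190602 AND C = "TEST") this yields (DT = 20190601 OR DT = 20190602) AND (DT = 20190601 OR C = "TEST"), where the first conjunct references only the partition column.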
@srowen Could you review this PR and give some advice?
This part really isn't my area |
This place is definitely not the only place that extracts partition predicates (e.g. |
Fix the situation where A is a partition key: SELECT * FROM A WHERE A = 1 OR B = 2. In this case, we should ignore this condition.
@cloud-fan
 * FROM DEFAULT.PARTITION_TABLE
 * WHERE DT = 20190601 OR (DT = 20190602 AND C = "TEST")
 *
 * Where condition "DT = 20190601 OR (DT = 20190602 AND C = "TEST")"
Let's be clearer about the approach here. We try to weaken a predicate so that it only refers to partition columns, by removing some branches of AND. Let's also mention the corner case where there is no partition column attribute in the predicate (we should return Nil in this case?).
@cloud-fan
In this place it will return (DT = 20190601 OR DT = 20190602), but the whole condition is still returned to the filter.
What I want to do is purely to avoid reading unnecessary partitions. In this case we only read the partitions dt=20190601 and dt=20190602; if we don't push this down, we read all data.
In the condition "(DT = 20190602 AND C = "TEST")", DT = 20190602 is a precondition of C = "TEST".
If the whole condition is DT = 20190601 OR (DT = 20190602 OR C = "TEST"), we should return null, since DT = 20190602 is not a constraint on C = "TEST".
@cloud-fan
In my code, the incoming predicate Set[Expression] carries an implicit AND: each Expression is constrained by the other Expressions at the same level. Then:
- If it is a combination by AND, each side can act as a constraint on the other, so if one side is tenable, it can return a tenable condition.
- If it is a combination by OR and one side is out of control (e.g. has no condition on partition columns), the whole OR Expression should return None. Only when both children of the OR are reasonable can it return a tenable OR combination.
- If it is a multilayer nested Expression combined by BinaryOperator, it visits the lowest level; if it finds that one level's OR Expression is untenable, it discards the whole Expression and returns null.
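The three rules above can be sketched as a recursive function over a toy expression AST (Expr, Attr, Eq and extractPartitionPredicate are illustrative names I've chosen, not the PR's actual code): AND may keep either branch as a weaker constraint, while OR demands a partition constraint on both sides or returns nothing.

```scala
// Illustrative sketch of the extraction rules described above,
// over a tiny hand-rolled AST (not Spark's Expression hierarchy).
sealed trait Expr
case class Attr(name: String) extends Expr
case class Eq(attr: Attr, value: String) extends Expr
case class And(left: Expr, right: Expr) extends Expr
case class Or(left: Expr, right: Expr) extends Expr

def extractPartitionPredicate(e: Expr, partCols: Set[String]): Option[Expr] = e match {
  case Eq(Attr(n), _) =>
    if (partCols.contains(n)) Some(e) else None
  case And(l, r) =>
    // AND: either side alone is still a valid (weaker) partition constraint.
    (extractPartitionPredicate(l, partCols), extractPartitionPredicate(r, partCols)) match {
      case (Some(a), Some(b)) => Some(And(a, b))
      case (Some(a), None)    => Some(a)
      case (None, Some(b))    => Some(b)
      case (None, None)       => None
    }
  case Or(l, r) =>
    // OR: both sides must constrain partition columns, otherwise give up.
    for {
      a <- extractPartitionPredicate(l, partCols)
      b <- extractPartitionPredicate(r, partCols)
    } yield Or(a, b)
  case _ => None
}
```

On DT = 20190601 OR (DT = 20190602 AND C = "TEST") with partition column dt, this yields DT = 20190601 OR DT = 20190602; on DT = 1 OR B = 2 it yields nothing, matching the "ignore this condition" case above.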
 */
object ExtractPartitionPredicates extends Logging {
  private type ReturnType = Seq[Expression]
We don't need this type alias as Seq[Expression] is short.
Done, Thanks.
Checked previous activity for the CNF normalization in the issue below: it seems we can't make it an Optimizer rule; it should only apply at some special points such as partition predicates and filter push down.
Can one of the admins verify this patch?
We're closing this PR because it hasn't been updated in a while. If you'd like to revive this PR, please reopen it!
What changes were proposed in this pull request?
Spark can't push down an OR filter condition.
For example, I have a table default.test whose partition column is "dt". If we use the query:
select * from default.test where dt = 20190625 or (dt = 20190626 and id in (1, 2, 3))
Spark resolves the OR condition as one expression, and since this expression references "id", it can't be pushed down.
In my PR, for SQL like:
select * from default.test where dt = 20190626 or (dt = 20190626 and xxx = "")
all sub-expressions about partition keys in the OR condition are extracted into an expression that contains only partition expressions, retaining the original logical relationship of And & Or, like below:
dt = 20190626 or dt = 20190626
This condition is then passed to HiveTableScanExec, and such predicate expressions can be pushed down as expected.
In short, this PR extracts the deep structure of OR expressions and pushes the resulting condition down.
How was this patch tested?
Existing unit tests.