@AngersZhuuuu
Contributor

@AngersZhuuuu AngersZhuuuu commented Jun 26, 2019

What changes were proposed in this pull request?

Spark can't push down a filter condition that contains OR.

For example, suppose the table default.test is partitioned by the column "dt".

If we run the query:

select * from default.test
where dt=20190625 or (dt = 20190626 and id in (1,2,3) )

In this case, Spark treats the whole OR condition as a single expression, and because that expression references "id", it can't be pushed down.

With this PR, for SQL like
select * from default.test
where dt = 20190626 or (dt = 20190626 and xxx="")

and its OR condition
dt = 20190626 or (dt = 20190626 and xxx="")

all sub-expressions on partition keys are extracted into an expression that contains only partition-key predicates and retains the original AND/OR structure, like this:
dt = 20190626 or dt = 20190626

This condition is then passed to HiveTableScanExec, so these predicates can be pushed down as expected.

In short, this PR extracts the partition-key predicates nested inside an OR expression and pushes that condition down.

How was this patch tested?

Existing unit tests.

@AngersZhuuuu AngersZhuuuu changed the title [SPARK-28169] Fix PushDown failed by "OR" expression [WIP][SPARK-28169] Fix PushDown failed by "OR" expression Jun 26, 2019
Member

@wangyum wangyum left a comment

Thank you @AngersZhuuuu. Has this issue already been fixed by SPARK-27699?

@AngersZhuuuu
Contributor Author

AngersZhuuuu commented Jun 27, 2019

@wangyum I looked at #24598, and it is not the same thing. What I want to fix is partition pushdown for Hive partitioned tables. That PR is about filter pushdown for ORC and Parquet.

By the way, we are in the Kyuubi WeChat group.

@AngersZhuuuu AngersZhuuuu changed the title [WIP][SPARK-28169] Fix PushDown failed by "OR" expression [WIP][SPARK-28169] Fix Partition table partition PushDown failed by "OR" expression Jun 27, 2019
@AngersZhuuuu AngersZhuuuu changed the title [WIP][SPARK-28169] Fix Partition table partition PushDown failed by "OR" expression [SPARK-28169] Fix Partition table partition PushDown failed by "OR" expression Jun 27, 2019
@AngersZhuuuu
Contributor Author

}

val extractedPruningPredicates = extractPushDownPredicate(predicates, partitionKeyIds)
  .filter(_ != null)
Member

@AngersZhuuuu, just for clarification: this code path already supports OR expressions, but you want to do a partial pushdown, right? Considering that it needs a lot of code, as @wangyum pointed out, I think we had better promote using (or converting to) Spark's Parquet or ORC instead. This looks like overkill to me.

Contributor Author

@HyukjinKwon What I do is extract the conditions on partition keys. The old code was:
val (pruningPredicates, otherPredicates) = predicates.partition { predicate =>
  !predicate.references.isEmpty && predicate.references.subsetOf(partitionKeyIds)
}

If an expression also references a non-partition column, it is not pushed to HiveTableScanExec. To fix this, I extract all conditions on partition keys and push them to HiveTableScanExec, which can handle complex combined expressions.
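To make the limitation of the old rule concrete, here is a minimal, self-contained sketch. It does not use Spark: `Expr`, `EqualTo`, `And`, `Or`, and `split` are hypothetical stand-ins for the Catalyst classes and for the quoted `predicates.partition` call, so treat it as an illustration of the rule's shape rather than the actual HiveStrategies code.

```scala
object OldPartitionSplitSketch {
  // Toy expression tree. These types are illustrative stand-ins, not Spark's Catalyst classes.
  sealed trait Expr { def references: Set[String] }
  final case class EqualTo(column: String, value: String) extends Expr {
    def references: Set[String] = Set(column)
  }
  final case class And(left: Expr, right: Expr) extends Expr {
    def references: Set[String] = left.references ++ right.references
  }
  final case class Or(left: Expr, right: Expr) extends Expr {
    def references: Set[String] = left.references ++ right.references
  }

  // Same shape as the quoted rule: a predicate counts as a pruning predicate
  // only if every column it references is a partition key.
  def split(predicates: Seq[Expr], partitionKeys: Set[String]): (Seq[Expr], Seq[Expr]) =
    predicates.partition { p =>
      p.references.nonEmpty && p.references.subsetOf(partitionKeys)
    }

  def main(args: Array[String]): Unit = {
    // dt = 20190625 OR (dt = 20190626 AND id = 1)
    val pred = Or(EqualTo("dt", "20190625"), And(EqualTo("dt", "20190626"), EqualTo("id", "1")))
    val (pruning, other) = split(Seq(pred), Set("dt"))
    println(s"pruning predicates: ${pruning.size}, other predicates: ${other.size}")
  }
}
```

Because the OR expression as a whole references `id`, the whole expression lands in the "other" bucket and nothing is left for partition pruning, which is the situation this PR targets.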

Contributor Author

@HyukjinKwon
Spark's Parquet and ORC support is great and can push down filter conditions, but it doesn't solve the problem that the first thing we do when reading a Hive table is a scan. What this PR wants to do is reduce the time spent resolving file info and partition metadata, and the number of files we scan. When the number of files or partitions is large, this takes too long.

Member

I think we convert Hive table reading operations into Spark's own, via, for instance, the spark.sql.hive.convertMetastoreParquet conf. If the diff were small I might be fine with it, but this does look like overkill to me. I haven't taken a close look, but it looks like we need a fix like #24598.

I won't object if some other committers are fine with that.

Contributor Author

@AngersZhuuuu AngersZhuuuu Jun 27, 2019

@HyukjinKwon I know that it's better to convert Hive table reading operations into Spark's own, but that can't cover every situation. In our production environment, we just changed the default storage type of Hive data to ORC. For a partitioned table, if different partitions use different serdes, the conversion fails, because during conversion all partitions' files are checked against the table-level serde.

@cloud-fan
Contributor

Shall we just add an optimizer rule to do CNF conversion?

@AngersZhuuuu
Contributor Author

> Shall we just add an optimizer rule to do CNF conversion?

@cloud-fan I don't think this belongs in the optimizer, since it is specific to partition pushdown, and the original rule in HiveStrategies is too simple.

@cloud-fan
Contributor

CNF conversion can solve this problem, can't it?

@AngersZhuuuu
Contributor Author

> CNF conversion can solve this problem, can't it?

I understand. You mean converting the condition to CNF at the optimizer level. That alone can't solve this PR's problem, and it may seriously affect other rules.

Converting it to CNF inside HiveStrategies would work well.

@AngersZhuuuu
Contributor Author

> CNF conversion can solve this problem, can't it?

PS: At first I wanted to convert the predicate to CNF, but I found that HiveTableScanExec can handle complex AND/OR combinations of partition-key expressions, so I only need to keep the original AND/OR structure of the partition-key expressions.

@AngersZhuuuu
Contributor Author

@srowen Could you review this PR and give some advice?

@srowen
Member

srowen commented Jun 27, 2019

This part really isn't my area

@cloud-fan
Contributor

This place is definitely not the only place that extracts partition predicates (e.g. FileSourceStrategy), so I'm -1 on adding a hack just here. I still prefer adding a CNF conversion rule that solves this problem for all the places, or some other general solution.
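For readers following the CNF suggestion, here is a rough sketch of how a CNF conversion would expose a partition-only conjunct. It is a toy model, not a Spark optimizer rule: `Expr`, `toCnf`, `conjuncts`, and `partitionConjuncts` are hypothetical names, and the conversion is the textbook distribution of OR over AND.

```scala
object CnfSketch {
  // Toy expression tree. These types are illustrative stand-ins, not Spark's Catalyst classes.
  sealed trait Expr { def references: Set[String] }
  final case class EqualTo(column: String, value: String) extends Expr {
    def references: Set[String] = Set(column)
  }
  final case class And(left: Expr, right: Expr) extends Expr {
    def references: Set[String] = left.references ++ right.references
  }
  final case class Or(left: Expr, right: Expr) extends Expr {
    def references: Set[String] = left.references ++ right.references
  }

  // Distribute OR over AND: a OR (b AND c) becomes (a OR b) AND (a OR c).
  def toCnf(e: Expr): Expr = e match {
    case And(l, r) => And(toCnf(l), toCnf(r))
    case Or(l, r) =>
      (toCnf(l), toCnf(r)) match {
        case (And(a, b), c) => And(toCnf(Or(a, c)), toCnf(Or(b, c)))
        case (a, And(b, c)) => And(toCnf(Or(a, b)), toCnf(Or(a, c)))
        case (a, b)         => Or(a, b)
      }
    case leaf => leaf
  }

  // Split a CNF expression into its top-level conjuncts.
  def conjuncts(e: Expr): Seq[Expr] = e match {
    case And(l, r) => conjuncts(l) ++ conjuncts(r)
    case other     => Seq(other)
  }

  // Conjuncts that reference only partition keys can be used for pruning.
  def partitionConjuncts(cnf: Expr, partitionKeys: Set[String]): Seq[Expr] =
    conjuncts(cnf).filter(c => c.references.nonEmpty && c.references.subsetOf(partitionKeys))
}
```

With `DT = 20190601 OR (DT = 20190602 AND C = "TEST")`, the conversion yields `(DT = 20190601 OR DT = 20190602) AND (DT = 20190601 OR C = "TEST")`, and the first conjunct references only the partition column. One caveat of this approach is that CNF conversion can grow an expression exponentially in the worst case.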

朱夷 and others added 2 commits June 28, 2019 14:14
Fix the case where A is the partition key:
SELECT * FROM A
WHERE A=1 OR B = 2

In this case, we should ignore this condition
@AngersZhuuuu
Contributor Author

@cloud-fan
I checked this in detail. Even if we add a rule for CNF conversion, we still need to change some code in FileSourceStrategy, HiveStrategies, and other places.
How about making it a general method and calling it wherever this is needed?

* FROM DEFAULT.PARTITION_TABLE
* WHERE DT = 20190601 OR (DT = 20190602 AND C = "TEST")
*
* Where condition "DT = 20190601 OR (DT = 20190602 AND C = "TEST")"
Contributor

Let's be clearer about the approach here. We try to weaken a predicate so that it only refers to partition columns, by removing some branches in AND. Let's also mention the corner case when there is no partition column attribute in the predicate (we should return Nil in this case?).

Contributor Author

@cloud-fan
In this case it returns (DT = 20190601 OR DT = 20190602).
But the whole condition is still kept in the Filter.
All I want is to avoid reading unnecessary partitions. In this case we only read the partitions dt=20190601 and dt=20190602. If we don't push this down, we read all the data.

In the condition (DT = 20190602 AND C = "TEST"), DT = 20190602 is a precondition of C = "TEST".

If the whole condition were DT = 20190601 OR (DT = 20190602 OR C = "TEST"), we should return null, since DT = 20190602 does not constrain C = "TEST".
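To illustrate the pruning effect described in this comment, here is a minimal sketch under stated assumptions: partitions and rows are modeled as plain `Map[String, String]` values, and `eval` and `prune` are hypothetical helpers, not part of Spark or of this PR.

```scala
object PartitionPruningEffectSketch {
  // Toy expression tree. These types are illustrative stand-ins, not Spark's Catalyst classes.
  sealed trait Expr
  final case class EqualTo(column: String, value: String) extends Expr
  final case class And(left: Expr, right: Expr) extends Expr
  final case class Or(left: Expr, right: Expr) extends Expr

  // Evaluate a predicate against one row (or one partition spec) given as column -> value.
  def eval(e: Expr, row: Map[String, String]): Boolean = e match {
    case EqualTo(c, v) => row.get(c).contains(v)
    case And(l, r)     => eval(l, row) && eval(r, row)
    case Or(l, r)      => eval(l, row) || eval(r, row)
  }

  // Keep only the partitions whose spec satisfies the weakened, partition-only predicate.
  def prune(partitions: Seq[Map[String, String]], pruning: Expr): Seq[Map[String, String]] =
    partitions.filter(p => eval(pruning, p))

  def main(args: Array[String]): Unit = {
    val partitions = Seq("20190531", "20190601", "20190602", "20190603").map(d => Map("dt" -> d))
    val pruning = Or(EqualTo("dt", "20190601"), EqualTo("dt", "20190602"))
    // Only the partitions that can contain matching rows are listed for the scan.
    prune(partitions, pruning).foreach(println)
  }
}
```

The weakened predicate only decides which partitions are read. The full condition still has to run as a filter afterwards, because a row in partition dt=20190602 whose `c` is not "TEST" passes the pruning step but must not appear in the result.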

Contributor Author

@cloud-fan
In my code, the incoming predicates (a Set[Expression]) are implicitly combined with AND.
Each expression is restricted by the other expressions at the same level.
And:

  • If it is an AND, each side constrains the other, so if one side is valid (it references only partition columns), a valid condition can be returned.
  • If it is an OR and one side is unconstrained (for example, it has no condition on partition columns), the whole OR expression should return None. Only when both children of the OR are valid can a valid OR be returned.
  • If it is a multi-level nested expression combined by BinaryOperator, it is visited down to the lowest level. If the OR expression at any level turns out to be invalid, the whole expression is discarded and null is returned.
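The rules in this comment can be sketched as a small recursive function. This is a minimal illustration, not the PR's `ExtractPartitionPredicates` code: the expression types and the `extract` function are hypothetical, and it returns `Option` instead of `null`. One deliberate difference is that an invalid OR nested under an AND only drops that branch here, which may be less conservative than the behavior described above.

```scala
object ExtractPartitionPredicateSketch {
  // Toy expression tree. These types are illustrative stand-ins, not Spark's Catalyst classes.
  sealed trait Expr { def references: Set[String] }
  final case class EqualTo(column: String, value: String) extends Expr {
    def references: Set[String] = Set(column)
  }
  final case class And(left: Expr, right: Expr) extends Expr {
    def references: Set[String] = left.references ++ right.references
  }
  final case class Or(left: Expr, right: Expr) extends Expr {
    def references: Set[String] = left.references ++ right.references
  }

  // Weaken a predicate so that it refers only to partition keys.
  // None means "no usable partition predicate can be derived".
  def extract(e: Expr, partitionKeys: Set[String]): Option[Expr] = e match {
    case And(l, r) =>
      // AND: either side alone is a sound weakening, so keep whatever survives.
      (extract(l, partitionKeys), extract(r, partitionKeys)) match {
        case (Some(a), Some(b)) => Some(And(a, b))
        case (Some(a), None)    => Some(a)
        case (None, Some(b))    => Some(b)
        case (None, None)       => None
      }
    case Or(l, r) =>
      // OR: both sides must survive, otherwise the unconstrained side could match any partition.
      for {
        a <- extract(l, partitionKeys)
        b <- extract(r, partitionKeys)
      } yield Or(a, b)
    case leaf =>
      if (leaf.references.nonEmpty && leaf.references.subsetOf(partitionKeys)) Some(leaf)
      else None
  }
}
```

For `DT = 20190601 OR (DT = 20190602 AND C = "TEST")` this yields `DT = 20190601 OR DT = 20190602`, while `DT = 20190601 OR (DT = 20190602 OR C = "TEST")` and `A = 1 OR B = 2` (with only `A` as a partition key) yield nothing, matching the cases discussed in the thread.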

*/
object ExtractPartitionPredicates extends Logging {

private type ReturnType = Seq[Expression]
Contributor

We don't need this type alias as Seq[Expression] is short.

Contributor Author

Done, Thanks.

@dongjoon-hyun dongjoon-hyun changed the title [SPARK-28169] Fix Partition table partition PushDown failed by "OR" expression [SPARK-28169][SQL] Fix Partition table partition PushDown failed by "OR" expression Jul 7, 2019
@AngersZhuuuu
Contributor Author

> This place is definitely not the only place that extracts partition predicates (e.g. FileSourceStrategy), so I'm -1 on adding a hack just here. I still prefer adding a CNF conversion rule that solves this problem for all the places, or some other general solution.

I checked the previous work on CNF normalization in the issue below:
https://issues.apache.org/jira/browse/SPARK-6624

It seems we can't make it an optimizer rule. It should only be applied at specific points, such as partition predicates and filter pushdown.

@AmplabJenkins

Can one of the admins verify this patch?

@github-actions

We're closing this PR because it hasn't been updated in a while.
This isn't a judgement on the merit of the PR in any way. It's just
a way of keeping the PR queue manageable.

If you'd like to revive this PR, please reopen it!
