[SPARK-40259][SQL] Support Parquet DSv2 in subquery plan merge #37711
Conversation
cc @cloud-fan, @sigmod, @singhpk234

We're closing this PR because it hasn't been updated in a while. This isn't a judgement on the merit of the PR in any way. It's just a way of keeping the PR queue manageable.
    pushedDownAggEqual(o) &&
      normalizedPartitionFilters == o.normalizedPartitionFilters &&
      normalizedDataFilters == o.normalizedDataFilters) {
      val builder = table.newScanBuilder(options).asInstanceOf[ParquetScanBuilder]
[question] Should we add an assertion that table.newScanBuilder returns an instance of ParquetScanBuilder, rather than relying on the unchecked asInstanceOf cast?
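To illustrate the reviewer's suggestion, here is a minimal sketch of replacing the bare cast with a checked one. The types below are simplified stand-ins, not Spark's actual ScanBuilder hierarchy:

```scala
// Simplified stand-in types for illustration only; Spark's real
// ScanBuilder hierarchy is richer than this.
trait ScanBuilder
class ParquetScanBuilder extends ScanBuilder
class OrcScanBuilder extends ScanBuilder

// Instead of a bare asInstanceOf, pattern match and fail with a clear message.
def asParquetBuilder(b: ScanBuilder): ParquetScanBuilder = b match {
  case p: ParquetScanBuilder => p
  case other => throw new IllegalStateException(
    s"Expected ParquetScanBuilder but got ${other.getClass.getSimpleName}")
}
```

A failed cast then produces a descriptive error instead of a generic ClassCastException.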
    private def pushedDownAggEqual(p: ParquetScan) = {
      if (pushedAggregate.nonEmpty && p.pushedAggregate.nonEmpty) {
        AggregatePushDownUtils.equivalentAggregations(pushedAggregate.get, p.pushedAggregate.get)
      } else {
        pushedAggregate.isEmpty && p.pushedAggregate.isEmpty
      }
    }
Should we move this to FileScan itself? OrcScan has some duplicated code as well.
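The suggested refactoring could look roughly like the sketch below, which hoists the helper into a shared trait so ParquetScan and OrcScan don't each duplicate it. The types are simplified stand-ins (a plain case class instead of Spark's Aggregation, equality instead of AggregatePushDownUtils.equivalentAggregations):

```scala
// Stand-in for the pushed-down aggregation description.
case class Aggregation(exprs: Seq[String])

// Hypothetical shared trait playing the role of FileScan.
trait FileScanLike {
  def pushedAggregate: Option[Aggregation]

  // Hoisted helper: equal when both sides pushed equivalent aggregates,
  // or neither side pushed any.
  protected[this] def pushedDownAggEqual(other: FileScanLike): Boolean =
    (pushedAggregate, other.pushedAggregate) match {
      case (Some(a), Some(b)) => a == b // stand-in for equivalentAggregations
      case (None, None)       => true
      case _                  => false
    }

  def aggEqual(other: FileScanLike): Boolean = pushedDownAggEqual(other)
}

case class ParquetLikeScan(pushedAggregate: Option[Aggregation]) extends FileScanLike
case class OrcLikeScan(pushedAggregate: Option[Aggregation]) extends FileScanLike
```

Both concrete scans then inherit the comparison instead of re-implementing it.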
    }

    override def mergeWith(other: SupportsMerge, table: SupportsRead): Optional[SupportsMerge] = {
      if (other.isInstanceOf[ParquetScan]) {
We can replace this isInstanceOf check with a case match.
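A small sketch of that suggestion, again with simplified stand-in types rather than Spark's real Scan classes:

```scala
import java.util.Optional

// Hypothetical simplified types standing in for Spark's Scan hierarchy.
trait SupportsMergeLike
case class ParquetLikeScan(filters: Seq[String]) extends SupportsMergeLike
case class OtherScan() extends SupportsMergeLike

// Pattern match instead of isInstanceOf + asInstanceOf: the type test
// and the bind happen in one place, and the guard keeps the merge
// condition next to it.
def mergeWith(self: ParquetLikeScan, other: SupportsMergeLike): Optional[SupportsMergeLike] =
  other match {
    case o: ParquetLikeScan if o.filters == self.filters => Optional.of(self)
    case _ => Optional.empty()
  }
```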
      normalizedPartitionFilters == o.normalizedPartitionFilters &&
      normalizedDataFilters == o.normalizedDataFilters) {
[question] Should we instead disjunct the differing filters from the two scans and run boolean simplification on the result, so that scans with different partition and data filters can also be handled? Or are we expecting some heuristic here for deciding when combining the filters would be useful?
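To make the reviewer's idea concrete, here is a toy sketch of disjuncting two filters and simplifying the result. It uses a hypothetical miniature expression ADT; Spark would do this with Catalyst expressions and the BooleanSimplification rule instead:

```scala
// Toy boolean-expression ADT, purely illustrative.
sealed trait Expr
case object True extends Expr
case class Pred(name: String) extends Expr
case class Or(l: Expr, r: Expr) extends Expr

// A couple of classic simplification identities:
//   x OR true == true, and x OR x == x.
def simplify(e: Expr): Expr = e match {
  case Or(True, _) | Or(_, True) => True
  case Or(l, r) if l == r        => simplify(l)
  case Or(l, r)                  => Or(simplify(l), simplify(r))
  case other                     => other
}

// Merging two scans' filters: keep the disjunction, simplified.
def mergedFilter(a: Expr, b: Expr): Expr = simplify(Or(a, b))
```

If the two scans happen to carry the same filter, the merged filter collapses back to it; otherwise the merged scan would read the union of the rows, and each subquery would re-apply its own filter on top.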
    if (mappedNewKeyGroupedPartitioning.map(_.map(_.canonicalized)) ==
        cachedKeyGroupedPartitioning.map(_.map(_.canonicalized))) {
      val mappedNewOrdering = newOrdering.map(_.map(mapAttributes(_, outputMap)))
      if (mappedNewOrdering.map(_.map(_.canonicalized)) ==
[minor] Can we simplify the if/else structure here? Something like:

    if (isKeyGroupPartitioningSame && isOrderingSame) {
      // merge scans and update cachedRelation
    } else {
      None
    }
Thanks for the comments @singhpk234! @cloud-fan or @gengliangwang, would you perhaps be interested in this PR?
What changes were proposed in this pull request?
After #32298 we were able to merge scalar subquery plans, but DSv2 sources couldn't benefit from that improvement.
The reason DSv2 sources were not supported by default is that SparkOptimizer.earlyScanPushDownRules builds different Scans in logical plans before MergeScalarSubqueries is executed. Those Scans can have different pushed-down filters and aggregates and different column pruning defined, which prevents merging the plans.

I would not alter the order of optimization rules, as MergeScalarSubqueries works better when logical plans are better optimized (a plan is closer to its final logical form, e.g. InjectRuntimeFilter has already been executed). Instead, I propose a new interface that a Scan can implement to indicate whether a merge with another Scan is possible, and to perform the merge when it makes sense depending on the Scan's actual parameters.

This PR:
- adds a new interface SupportsMerge that Scans can implement to indicate whether 2 Scans can be merged, and
- adds SupportsMerge to ParquetScan as the first DSv2 source. The merge only happens if the pushed-down data and partition filters and the pushed-down aggregates match.

Why are the changes needed?
Scalar subquery merge can bring considerable performance improvement (see the original #32298 for the benchmarks) so DSv2 sources should also benefit from that feature.
Does this PR introduce any user-facing change?
No.
How was this patch tested?
Added new UT.
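Based on the mergeWith signature visible in the diff above, the proposed SupportsMerge mix-in presumably has roughly the following shape. The trait below is a sketch inferred from that signature, with a stand-in SupportsRead, not the PR's actual source:

```scala
import java.util.Optional

// Stand-in for Spark's SupportsRead table interface.
trait SupportsRead

// Sketch of the proposed mix-in: a Scan implementing it can report
// whether it is mergeable with another Scan, returning the merged
// scan when compatible and empty otherwise.
trait SupportsMerge {
  def mergeWith(other: SupportsMerge, table: SupportsRead): Optional[SupportsMerge]
}
```

MergeScalarSubqueries can then ask any two DSv2 scans whether they merge, without knowing the concrete source: a scan that declines simply returns Optional.empty().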