@peter-toth peter-toth commented Aug 29, 2022

What changes were proposed in this pull request?

After #32298 we were able to merge scalar subquery plans, but DSv2 sources couldn't benefit from that improvement.
The reason DSv2 sources were not supported by default is that SparkOptimizer.earlyScanPushDownRules builds different Scans in logical plans before MergeScalarSubqueries is executed. Those Scans can have different pushed-down filters and aggregates and different column pruning defined, which prevents merging the plans.
I would not alter the order of optimization rules, as MergeScalarSubqueries works better when logical plans are better optimized (i.e. a plan is closer to its final logical form, e.g. InjectRuntimeFilter has already been executed). Instead, I propose a new interface that a Scan can implement to indicate whether a merge with another Scan is possible, and to do the merge if it makes sense given the Scans' actual parameters.

This PR:

  • adds a new interface SupportsMerge that Scans can implement to indicate if 2 Scans can be merged and
  • adds implementation of SupportsMerge to ParquetScan as the first DSv2 source. The merge only happens if pushed-down data and partition filters and pushed-down aggregates match.
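
To make the merge condition above concrete, here is a minimal, self-contained sketch. It is not the code from this PR: `SupportsMergeSketch` and `ToyScan` are hypothetical stand-ins, filters are modelled as plain strings, and the `table: SupportsRead` parameter of the real `mergeWith` is dropped. Taking the union of the pruned columns for the merged scan is also an assumption made for illustration.

```scala
import java.util.Optional

// Hypothetical, simplified stand-in for the proposed interface (not Spark's API).
trait SupportsMergeSketch {
  // Returns the merged scan, or Optional.empty() when the scans cannot be merged.
  def mergeWith(other: SupportsMergeSketch): Optional[SupportsMergeSketch]
}

// Toy scan: filters and aggregates are plain strings in this sketch.
final case class ToyScan(
    columns: Set[String],          // pruned read schema
    dataFilters: Set[String],      // pushed-down data filters
    partitionFilters: Set[String], // pushed-down partition filters
    pushedAggregate: Option[String]) extends SupportsMergeSketch {

  override def mergeWith(other: SupportsMergeSketch): Optional[SupportsMergeSketch] =
    other match {
      // Merge only when pushed-down filters and aggregates match exactly.
      case o: ToyScan
          if dataFilters == o.dataFilters &&
            partitionFilters == o.partitionFilters &&
            pushedAggregate == o.pushedAggregate =>
        // Assumption: the merged scan reads the union of both column sets.
        Optional.of[SupportsMergeSketch](copy(columns = columns ++ o.columns))
      case _ =>
        Optional.empty[SupportsMergeSketch]()
    }
}
```

In this sketch, two scans that differ only in their pruned columns merge into one scan over both column sets, while any difference in filters or aggregates yields an empty result.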

Why are the changes needed?

Scalar subquery merge can bring considerable performance improvement (see the original #32298 for the benchmarks) so DSv2 sources should also benefit from that feature.

Does this PR introduce any user-facing change?

No.

How was this patch tested?

Added new UT.

@peter-toth
Contributor Author

cc @cloud-fan, @sigmod, @singhpk234

@github-actions

We're closing this PR because it hasn't been updated in a while. This isn't a judgement on the merit of the PR in any way. It's just a way of keeping the PR queue manageable.
If you'd like to revive this PR, please reopen it and ask a committer to remove the Stale tag!

@github-actions github-actions bot added the Stale label Dec 15, 2022
@github-actions github-actions bot closed this Dec 16, 2022
pushedDownAggEqual(o) &&
normalizedPartitionFilters == o.normalizedPartitionFilters &&
normalizedDataFilters == o.normalizedDataFilters) {
val builder = table.newScanBuilder(options).asInstanceOf[ParquetScanBuilder]

[question] Should we add an assertion that table.newScanBuilder returns an instance of ParquetScanBuilder?
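
One possible shape for such a check, sketched with hypothetical stand-in types (`ToyScanBuilder`, `ToyParquetScanBuilder` and `ToyOtherScanBuilder` are not Spark classes): a pattern match that fails with a descriptive message instead of the bare ClassCastException that `asInstanceOf` would raise.

```scala
// Hypothetical stand-ins for ScanBuilder implementations; not Spark classes.
trait ToyScanBuilder
final class ToyParquetScanBuilder extends ToyScanBuilder
final class ToyOtherScanBuilder extends ToyScanBuilder

object BuilderCheck {
  // Narrows the builder type, failing loudly when the table returns something else.
  def asParquetBuilder(builder: ToyScanBuilder): ToyParquetScanBuilder = builder match {
    case b: ToyParquetScanBuilder => b
    case other =>
      throw new IllegalStateException(
        s"Expected ToyParquetScanBuilder but got ${other.getClass.getName}")
  }
}
```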

Comment on lines +112 to +118
private def pushedDownAggEqual(p: ParquetScan) = {
  if (pushedAggregate.nonEmpty && p.pushedAggregate.nonEmpty) {
    AggregatePushDownUtils.equivalentAggregations(pushedAggregate.get, p.pushedAggregate.get)
  } else {
    pushedAggregate.isEmpty && p.pushedAggregate.isEmpty
  }
}

Should we move this to FileScan itself? OrcScan also has some duplicate code.
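
A rough sketch of what hoisting the helper into a shared base could look like. All names here (`ToyAggregation`, `ToyFileScan`, `ToyParquetScan`, `ToyOrcScan`) are hypothetical stand-ins rather than Spark classes, and the order-insensitive comparison is only an assumed approximation of what `AggregatePushDownUtils.equivalentAggregations` checks.

```scala
// Hypothetical stand-in for a pushed-down aggregation; not Spark's class.
final case class ToyAggregation(functions: Seq[String], groupBy: Seq[String])

// Shared base: the comparison lives here once instead of in every format's scan.
trait ToyFileScan {
  def pushedAggregate: Option[ToyAggregation]

  // Assumption: aggregations are equivalent when they match regardless of order.
  protected def equivalentAggregations(a: ToyAggregation, b: ToyAggregation): Boolean =
    a.functions.sorted == b.functions.sorted && a.groupBy.sorted == b.groupBy.sorted

  def pushedDownAggEqual(other: ToyFileScan): Boolean =
    (pushedAggregate, other.pushedAggregate) match {
      case (Some(a), Some(b)) => equivalentAggregations(a, b)
      case (None, None)       => true
      case _                  => false
    }
}

final case class ToyParquetScan(pushedAggregate: Option[ToyAggregation]) extends ToyFileScan
final case class ToyOrcScan(pushedAggregate: Option[ToyAggregation]) extends ToyFileScan
```

The tuple match also removes the `nonEmpty`/`get` pairs from the original helper. A real version would still need each format to check that the other scan is of the same type before calling it.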

}

override def mergeWith(other: SupportsMerge, table: SupportsRead): Optional[SupportsMerge] = {
if (other.isInstanceOf[ParquetScan]) {

This can be replaced with a pattern match (case match).

Comment on lines +156 to +157
normalizedPartitionFilters == o.normalizedPartitionFilters &&
normalizedDataFilters == o.normalizedDataFilters) {

[question] Should we just take the disjunction (OR) of the differing filters from the two scans and run a boolean simplification on top of it, to handle the cases where the scans have different partition and data filters?

Are we expecting some heuristic here, e.g. to decide when combining the filters would be useful?
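
To illustrate the idea, here is a small sketch over a toy predicate type. `Pred`, `Atom`, `AndAll`, `Or`, `AlwaysTrue` and `FilterMerge` are hypothetical names and not Catalyst expressions. Each scan's filters are treated as a set of conjuncts, and the simplification used is (common AND restA) OR (common AND restB) = common AND (restA OR restB).

```scala
// Toy predicate model; hypothetical names, not Catalyst expressions.
sealed trait Pred
final case class Atom(sql: String) extends Pred
final case class AndAll(children: Set[Pred]) extends Pred
final case class Or(left: Pred, right: Pred) extends Pred
case object AlwaysTrue extends Pred

object FilterMerge {
  // Collapses a set of conjuncts into a single predicate.
  private def conj(fs: Set[Pred]): Pred =
    if (fs.isEmpty) AlwaysTrue else if (fs.size == 1) fs.head else AndAll(fs)

  // Conjuncts of the disjunction of two conjunct sets, with common terms factored out.
  def mergedFilters(a: Set[Pred], b: Set[Pred]): Set[Pred] = {
    val common = a intersect b
    val restA = a diff b
    val restB = b diff a
    // If one side has no extra conjuncts, the OR collapses to the common part.
    if (restA.isEmpty || restB.isEmpty) common
    else common + Or(conj(restA), conj(restB))
  }
}
```

A scan merged this way returns a superset of the rows either original scan would, so each subquery would still have to re-apply its own filter on top of the merged scan. Whether that pays off is exactly the heuristic question raised above.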

Comment on lines +293 to +296
if (mappedNewKeyGroupedPartitioning.map(_.map(_.canonicalized)) ==
cachedKeyGroupedPartitioning.map(_.map(_.canonicalized))) {
val mappedNewOrdering = newOrdering.map(_.map(mapAttributes(_, outputMap)))
if (mappedNewOrdering.map(_.map(_.canonicalized)) ==

[minor] Can we simplify the if/else structure here? Something like:

if (isKeyGroupPartitioningSame && isOrderingSame) { 
 // merge scans and update cachedRelation 
} else  { 
  None
}

@peter-toth
Contributor Author

peter-toth commented Dec 29, 2022

Thanks for the comments @singhpk234!
Unfortunately, this PR got closed due to lack of reviews and can't be reopened. I'm happy to open a new one and take your suggestions into account, but first it would be great if a Spark committer could confirm that the proposed SupportsMerge scan interface makes sense and someone were willing to give some feedback on the change. Any feedback is much appreciated, really.

@cloud-fan or @gengliangwang, would you be interested in this PR?
