-
Notifications
You must be signed in to change notification settings - Fork 28.9k
[SPARK-39180][SQL] Simplify the planning of limit and offset #36541
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
3f59794
cb09f9c
eee8db7
cd7b4f9
0e2348c
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -81,55 +81,56 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] { | |
| */ | ||
| object SpecialLimits extends Strategy { | ||
| override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match { | ||
| case ReturnAnswer(rootPlan) => rootPlan match { | ||
| case Limit(IntegerLiteral(limit), Sort(order, true, child)) | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It seems we missing this case.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. handled by |
||
| if limit < conf.topKSortFallbackThreshold => | ||
| TakeOrderedAndProjectExec(limit, order, child.output, planLater(child)) :: Nil | ||
| case Limit(IntegerLiteral(limit), Project(projectList, Sort(order, true, child))) | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. missing too ? |
||
| if limit < conf.topKSortFallbackThreshold => | ||
| TakeOrderedAndProjectExec(limit, order, projectList, planLater(child)) :: Nil | ||
| // Call `planTakeOrdered` first which matches a larger plan. | ||
| case ReturnAnswer(rootPlan) => planTakeOrdered(rootPlan).getOrElse(rootPlan match { | ||
| // We should match the combination of limit and offset first, to get the optimal physical | ||
| // plan, instead of planning limit and offset separately. | ||
| case LimitAndOffset(limit, offset, child) => | ||
| CollectLimitExec(limit = limit, child = planLater(child), offset = offset) | ||
| case OffsetAndLimit(offset, limit, child) => | ||
| // 'Offset a' then 'Limit b' is the same as 'Limit a + b' then 'Offset a'. | ||
| CollectLimitExec(limit = offset + limit, child = planLater(child), offset = offset) | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. ditto |
||
| case Limit(IntegerLiteral(limit), child) => | ||
| CollectLimitExec(limit, planLater(child)) :: Nil | ||
| case LimitAndOffset(IntegerLiteral(limit), IntegerLiteral(offset), | ||
| Sort(order, true, child)) if limit + offset < conf.topKSortFallbackThreshold => | ||
| TakeOrderedAndProjectExec( | ||
| limit, order, child.output, planLater(child), offset) :: Nil | ||
| case LimitAndOffset(IntegerLiteral(limit), IntegerLiteral(offset), | ||
| Project(projectList, Sort(order, true, child))) | ||
| if limit + offset < conf.topKSortFallbackThreshold => | ||
| TakeOrderedAndProjectExec( | ||
| limit, order, projectList, planLater(child), offset) :: Nil | ||
| case LimitAndOffset(IntegerLiteral(limit), IntegerLiteral(offset), child) => | ||
| CollectLimitExec(limit, planLater(child), offset) :: Nil | ||
| CollectLimitExec(limit = limit, child = planLater(child)) | ||
| case logical.Offset(IntegerLiteral(offset), child) => | ||
| CollectLimitExec(child = planLater(child), offset = offset) :: Nil | ||
| CollectLimitExec(child = planLater(child), offset = offset) | ||
| case Tail(IntegerLiteral(limit), child) => | ||
| CollectTailExec(limit, planLater(child)) :: Nil | ||
| case other => planLater(other) :: Nil | ||
| } | ||
| CollectTailExec(limit, planLater(child)) | ||
| case other => planLater(other) | ||
| }) :: Nil | ||
|
|
||
| case other => planTakeOrdered(other).toSeq | ||
| } | ||
|
|
||
| private def planTakeOrdered(plan: LogicalPlan): Option[SparkPlan] = plan match { | ||
| // We should match the combination of limit and offset first, to get the optimal physical | ||
| // plan, instead of planning limit and offset separately. | ||
| case LimitAndOffset(limit, offset, Sort(order, true, child)) | ||
| if limit < conf.topKSortFallbackThreshold => | ||
| Some(TakeOrderedAndProjectExec( | ||
| limit, order, child.output, planLater(child), offset)) | ||
| case LimitAndOffset(limit, offset, Project(projectList, Sort(order, true, child))) | ||
| if limit < conf.topKSortFallbackThreshold => | ||
| Some(TakeOrderedAndProjectExec( | ||
| limit, order, projectList, planLater(child), offset)) | ||
| // 'Offset a' then 'Limit b' is the same as 'Limit a + b' then 'Offset a'. | ||
| case OffsetAndLimit(offset, limit, Sort(order, true, child)) | ||
| if offset + limit < conf.topKSortFallbackThreshold => | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If we use |
||
| Some(TakeOrderedAndProjectExec( | ||
| offset + limit, order, child.output, planLater(child), offset)) | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. ditto |
||
| case OffsetAndLimit(offset, limit, Project(projectList, Sort(order, true, child))) | ||
| if offset + limit < conf.topKSortFallbackThreshold => | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. ditto |
||
| Some(TakeOrderedAndProjectExec( | ||
| offset + limit, order, projectList, planLater(child), offset)) | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. ditto |
||
| case Limit(IntegerLiteral(limit), Sort(order, true, child)) | ||
| if limit < conf.topKSortFallbackThreshold => | ||
| TakeOrderedAndProjectExec(limit, order, child.output, planLater(child)) :: Nil | ||
| Some(TakeOrderedAndProjectExec( | ||
| limit, order, child.output, planLater(child))) | ||
| case Limit(IntegerLiteral(limit), Project(projectList, Sort(order, true, child))) | ||
| if limit < conf.topKSortFallbackThreshold => | ||
| TakeOrderedAndProjectExec(limit, order, projectList, planLater(child)) :: Nil | ||
| // This is a global LIMIT and OFFSET over a logical sorting operator, | ||
| // where the sum of specified limit and specified offset is less than a heuristic threshold. | ||
| // In this case we generate a physical top-K sorting operator, passing down | ||
| // the limit and offset values to be evaluated inline during the physical | ||
| // sorting operation for greater efficiency. | ||
| case LimitAndOffset(IntegerLiteral(limit), IntegerLiteral(offset), | ||
| Sort(order, true, child)) if limit + offset < conf.topKSortFallbackThreshold => | ||
| TakeOrderedAndProjectExec( | ||
| limit, order, child.output, planLater(child), offset) :: Nil | ||
| case LimitAndOffset(IntegerLiteral(limit), IntegerLiteral(offset), | ||
| Project(projectList, Sort(order, true, child))) | ||
| if limit + offset < conf.topKSortFallbackThreshold => | ||
| TakeOrderedAndProjectExec(limit, order, projectList, planLater(child), offset) :: Nil | ||
| case LimitAndOffset(IntegerLiteral(limit), IntegerLiteral(offset), child) => | ||
| GlobalLimitAndOffsetExec(limit, offset, planLater(child)) :: Nil | ||
| case _ => | ||
| Nil | ||
| Some(TakeOrderedAndProjectExec( | ||
| limit, order, projectList, planLater(child))) | ||
| case _ => None | ||
| } | ||
| } | ||
|
|
||
|
|
@@ -814,12 +815,19 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] { | |
| case logical.LocalRelation(output, data, _) => | ||
| LocalTableScanExec(output, data) :: Nil | ||
| case CommandResult(output, _, plan, data) => CommandResultExec(output, plan, data) :: Nil | ||
| // We should match the combination of limit and offset first, to get the optimal physical | ||
| // plan, instead of planning limit and offset separately. | ||
| case LimitAndOffset(limit, offset, child) => | ||
| GlobalLimitExec(limit, planLater(child), offset) :: Nil | ||
| case OffsetAndLimit(offset, limit, child) => | ||
| // 'Offset a' then 'Limit b' is the same as 'Limit a + b' then 'Offset a'. | ||
| GlobalLimitExec(limit = offset + limit, child = planLater(child), offset = offset) :: Nil | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. ditto |
||
| case logical.LocalLimit(IntegerLiteral(limit), child) => | ||
| execution.LocalLimitExec(limit, planLater(child)) :: Nil | ||
| case logical.GlobalLimit(IntegerLiteral(limit), child) => | ||
| execution.GlobalLimitExec(limit, planLater(child)) :: Nil | ||
| case logical.Offset(IntegerLiteral(offset), child) => | ||
| GlobalLimitAndOffsetExec(offset = offset, child = planLater(child)) :: Nil | ||
| GlobalLimitExec(child = planLater(child), offset = offset) :: Nil | ||
| case union: logical.Union => | ||
| execution.UnionExec(union.children.map(planLater)) :: Nil | ||
| case g @ logical.Generate(generator, _, outer, _, _, child) => | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In fact, we not use
globalLimitin physical plan.It seems we can return
localLimithere. Then we can avoid+in physical plan.Uh oh!
There was an error while loading. Please reload this page.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This pattern match is to match a logical offset + limit, and we care more about semantics here. Returning
localLimitis super confusing.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's better to go for better readability, instead of saving a bit typing.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
OK