SPARK-37518: inject an early scan push-down rule (pull request, Closed)
Optimizer.scala
@@ -213,7 +213,7 @@ abstract class Optimizer(catalogManager: CatalogManager)
    // This batch pushes filters and projections into scan nodes. Before this batch, the logical
    // plan may contain nodes that do not report stats. Anything that uses stats must run after
    // this batch.
-   Batch("Early Filter and Projection Push-Down", Once, earlyScanPushDownRules: _*) :+
+   Batch("Early Scan Push-Down", Once, earlyScanPushDownRules: _*) :+
    Batch("Update CTE Relation Stats", Once, UpdateCTERelationStats) :+
    // Since join costs in AQP can change between multiple runs, there is no reason that we have an
    // idempotence enforcement on this batch. We thus make it FixedPoint(1) instead of Once.
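For context (not part of this diff): earlyScanPushDownRules is overridden by SparkOptimizer, and around the time of this change the batch held roughly the rules sketched below. That is why the broader 'Early Scan Push-Down' name fits better than 'Early Filter and Projection Push-Down': the batch also covers schema pruning, v2 writes, and file-source partition pruning.

// Quoted approximately from SparkOptimizer, for context only; the exact
// rule list varies by Spark version.
override def earlyScanPushDownRules: Seq[Rule[LogicalPlan]] =
  Seq(SchemaPruning, V2ScanRelationPushDown, V2Writes, PruneFileSourcePartitions)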
SparkSessionExtensions.scala
@@ -42,6 +42,7 @@ import org.apache.spark.sql.execution.{ColumnarRule, SparkPlan}
* <li>Check Analysis Rules.</li>
* <li>Optimizer Rules.</li>
* <li>Pre CBO Rules.</li>
+ * <li>Early Scan Push-Down.</li>
Contributor:

After the many discussions in #30808, I'm really worried about the naming of this new extension point.

In general, this new extension point allows people to inject custom data source operator push-down rules, which run after the built-in ones. But then the existing 'Pre CBO Rules' becomes a confusing name, since the push-down rules are also pre-CBO.

We may need more time to think about the naming, or about whether we really need custom data source push-down rules.

@beliefer (Contributor, Author), Dec 8, 2021:

I feel that the name 'Pre CBO Rules' is too broad, and it doesn't match the meaning described in its comment. In my opinion, we should not limit the flexibility of this feature because of that name.

* <li>Planning Strategies.</li>
* <li>Customized Parser.</li>
* <li>(External) Catalog listeners.</li>
@@ -226,6 +227,24 @@ class SparkSessionExtensions {
    preCBORules += builder
  }

+  private[this] val earlyScanPushDownRules = mutable.Buffer.empty[RuleBuilder]
+
+  private[sql] def buildEarlyScanPushDownRules(session: SparkSession): Seq[Rule[LogicalPlan]] = {
+    earlyScanPushDownRules.map(_.apply(session)).toSeq
+  }

+  /**
+   * Inject an optimizer `Rule` builder that rewrites logical plans into the [[SparkSession]].
+   * The injected rules will be executed once, after the operator optimization batch and
+   * after the built-in scan push-down rules. 'Pre CBO Rules' run before
+   * `V2ScanRelationPushDown`, while the injected 'Early Scan Push-Down' rules run after it,
+   * so users can apply custom push-down rules for cases that `V2ScanRelationPushDown`
+   * fails to push down.
+   */
+  def injectEarlyScanPushDownRule(builder: RuleBuilder): Unit = {
Contributor:

Let's update the classdoc, which currently only mentions:

 * This current provides the following extension points:
 *
 * <ul>
 * <li>Analyzer Rules.</li>
 * <li>Check Analysis Rules.</li>
 * <li>Optimizer Rules.</li>
 * <li>Pre CBO Rules.</li>
 * <li>Planning Strategies.</li>
 * <li>Customized Parser.</li>
 * <li>(External) Catalog listeners.</li>
 * <li>Columnar Rules.</li>
 * <li>Adaptive Query Stage Preparation Rules.</li>
 * </ul>

Contributor:

We also need to clarify how this is different from 'Pre CBO Rules'.

+    earlyScanPushDownRules += builder
+  }

  private[this] val plannerStrategyBuilders = mutable.Buffer.empty[StrategyBuilder]

  private[sql] def buildPlannerStrategies(session: SparkSession): Seq[Strategy] = {
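Stepping out of the diff for a moment, here is a minimal sketch of how the new extension point could be used end-to-end. Only injectEarlyScanPushDownRule itself comes from this change; MyScanPushDownRule, EarlyScanPushDownExample, and the no-op rule body are hypothetical names used for illustration.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.Rule

// Hypothetical rule. It runs once in the 'Early Scan Push-Down' batch, after
// built-in rules such as V2ScanRelationPushDown; the no-op body keeps the
// sketch runnable.
case class MyScanPushDownRule(session: SparkSession) extends Rule[LogicalPlan] {
  override def apply(plan: LogicalPlan): LogicalPlan = plan
}

object EarlyScanPushDownExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[1]")
      .withExtensions(_.injectEarlyScanPushDownRule(MyScanPushDownRule))
      .getOrCreate()
    spark.sql("SELECT 1").show() // the injected rule sees this query's optimized plan
    spark.stop()
  }
}

Passing the case-class companion MyScanPushDownRule works because RuleBuilder is just SparkSession => Rule[LogicalPlan]; the test in this PR uses the same pattern with MyRule.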
BaseSessionStateBuilder.scala
@@ -270,7 +270,9 @@ abstract class BaseSessionStateBuilder(
   *
   * Note that this may NOT depend on the `optimizer` function.
   */
-  protected def customEarlyScanPushDownRules: Seq[Rule[LogicalPlan]] = Nil
+  protected def customEarlyScanPushDownRules: Seq[Rule[LogicalPlan]] = {
+    extensions.buildEarlyScanPushDownRules(session)
+  }

  /**
   * Custom rules for rewriting plans after operator optimization and before CBO.
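Since BaseSessionStateBuilder now pulls these rules from the session's extensions, the extension point can also be wired in without custom session-building code, via Spark's standard spark.sql.extensions configuration, which names a class implementing SparkSessionExtensions => Unit. A sketch reusing the hypothetical MyScanPushDownRule from above (the class and package names are made up):

import org.apache.spark.sql.SparkSessionExtensions

// Hypothetical extensions class; enable it with, e.g.:
//   spark-submit --conf spark.sql.extensions=com.example.MyExtensions ...
class MyExtensions extends (SparkSessionExtensions => Unit) {
  override def apply(extensions: SparkSessionExtensions): Unit = {
    extensions.injectEarlyScanPushDownRule(MyScanPushDownRule)
  }
}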
SparkSessionExtensionSuite.scala
@@ -95,6 +95,12 @@ class SparkSessionExtensionSuite extends SparkFunSuite {
    }
  }

test("SPARK-37518: inject a early scan push down rule") {
Member:

nit: I think we only require a JIRA ID for bug fixes and regressions?

@beliefer (Contributor, Author):

It's just for reference.

+    withSession(Seq(_.injectEarlyScanPushDownRule(MyRule))) { session =>
+      assert(session.sessionState.optimizer.earlyScanPushDownRules.contains(MyRule(session)))
+    }
+  }

test("inject spark planner strategy") {
withSession(Seq(_.injectPlannerStrategy(MySparkStrategy))) { session =>
assert(session.sessionState.planner.strategies.contains(MySparkStrategy(session)))