@tanelk (Contributor) commented Dec 22, 2020

What changes were proposed in this pull request?

To improve performance, this PR changes UnaryNode.getAllValidConstraints to discard constraints that do not affect the end result.

Why are the changes needed?

There have been at least two earlier attempts at speeding up the constraint system: #30185 and #26257. Both seem to have stalled.

Optimizing Project and Aggregate nodes can have exponential memory and time complexity in the number of aliases they contain. The simplest example is a Project with incoming columns a1, a2, ..., an and a child constraint a1 + a2 + ... + an > 0. If it aliases its columns a1 as b1, a2 as b2, ..., an as bn, then UnaryNode.getAllValidConstraints returns 2^n constraints (plus some isNotNull constraints), because each column is replaced by its alias in half of them. All but one of these constraints are filtered out later on; only the one in which every column is replaced by its alias is kept. Filtering them out eagerly improves performance and avoids a possible OOM.
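The blow-up can be reproduced without Spark. The following is only a simplified model, not the code in UnaryNode.getAllValidConstraints: a constraint is reduced to the list of attribute names it references, and the names AliasBlowupSketch, expandAll and validFor are hypothetical, chosen for illustration.

```scala
// Simplified model (an assumption, not Spark's implementation): a constraint
// is represented only by the ordered attribute names it references.
object AliasBlowupSketch {
  type Constraint = Vector[String]

  // Mimics eager expansion: for every alias, keep each existing constraint and
  // also add a copy in which the aliased attribute is replaced by its alias.
  def expandAll(constraint: Constraint, aliases: Seq[(String, String)]): Set[Constraint] =
    aliases.foldLeft(Set(constraint)) { case (acc, (from, to)) =>
      acc ++ acc.map(_.map(attr => if (attr == from) to else attr))
    }

  // Mimics the later filtering: keep only constraints that reference nothing
  // but the output attributes of the node.
  def validFor(constraints: Set[Constraint], output: Set[String]): Set[Constraint] =
    constraints.filter(_.forall(output.contains))

  def main(args: Array[String]): Unit = {
    val n = 10
    val inputs = (1 to n).map(i => s"a$i").toVector       // a1 .. a10
    val aliases = inputs.map(a => a -> a.replace("a", "b")) // a1 -> b1, ...
    val all = expandAll(inputs, aliases)
    val kept = validFor(all, aliases.map(_._2).toSet)
    // prints "generated = 1024, kept = 1"
    println(s"generated = ${all.size}, kept = ${kept.size}")
  }
}
```

Each alias doubles the set, so ten aliases produce 1024 variants, of which only the fully aliased one survives the output filter.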

Does this PR introduce any user-facing change?

No

How was this patch tested?

New and existing UTs.
Manually verified the performance gain with an example from #30185:

  object Optimize extends RuleExecutor[LogicalPlan] {
    val batches =
      Batch("InferAndPushDownFilters", FixedPoint(100),
        PushPredicateThroughJoin,
        PushPredicateThroughNonJoin,
        InferFiltersFromConstraints,
        CombineFilters,
        SimplifyBinaryComparison,
        BooleanSimplification,
        PruneFilters) :: Nil
  }

  test("benchmark") {
    val tr = LocalRelation(
      'a.int, 'b.int, 'c.int, 'd.int, 'e.int,
      'f.int, 'g.int, 'h.int, 'i.int, 'j.int,
      'k.int, 'l.int, 'm.int, 'n.int)

    val plan = tr.select('a, 'b, 'c, 'd, 'e, 'f, 'g, 'h, 'i, 'j, 'k, 'l, 'm, 'n,
      CaseWhen(Seq(('a.attr + 'b.attr + 'c.attr + 'd.attr + 'e.attr + 'f.attr + 'g.attr
        + 'h.attr + 'i.attr + 'j.attr + 'k.attr + 'l.attr + 'm.attr + 'n.attr > Literal(1),
        Literal(1)),
        ('a.attr + 'b.attr + 'c.attr + 'd.attr + 'e.attr + 'f.attr + 'g.attr + 'h.attr +
          'i.attr + 'j.attr + 'k.attr + 'l.attr + 'm.attr + 'n.attr > Literal(2), Literal(2))),
        Option(Literal(0))).as("JoinKey1")
    ).select('a.attr.as("a1"), 'b.attr.as("b1"), 'c.attr.as("c1"),
      'd.attr.as("d1"), 'e.attr.as("e1"), 'f.attr.as("f1"),
      'g.attr.as("g1"), 'h.attr.as("h1"), 'i.attr.as("i1"),
      'j.attr.as("j1"), 'k.attr.as("k1"), 'l.attr.as("l1"),
      'm.attr.as("m1"), 'n.attr.as("n1"), 'JoinKey1.attr.as("cf1"),
      'JoinKey1.attr).select('a1, 'b1, 'c1, 'd1, 'e1, 'f1, 'g1, 'h1, 'i1, 'j1, 'k1,
      'l1, 'm1, 'n1, 'cf1, 'JoinKey1).join(tr, condition = Option('a.attr <=> 'JoinKey1.attr))

    val t1 = System.currentTimeMillis()
    Optimize.execute(plan.analyze)
    val t2 = System.currentTimeMillis()

    val timeTaken = t2 - t1
    // scalastyle:off println
    println(s"Time taken to optimize = $timeTaken ms")
    // scalastyle:on println
  }

The optimization time for this was reduced from 25s to 0.2s.

@github-actions github-actions bot added the SQL label Dec 22, 2020

SparkQA commented Dec 22, 2020

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/37840/

SparkQA commented Dec 22, 2020

Kubernetes integration test status success
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/37840/

SparkQA commented Dec 23, 2020

Test build #133242 has finished for PR 30894 at commit 08dc723.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

SparkQA commented Dec 23, 2020

Test build #133240 has finished for PR 30894 at commit 03c6e56.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

SparkQA commented Dec 23, 2020

Test build #133262 has finished for PR 30894 at commit 4bd8c06.

  • This patch fails to build.
  • This patch merges cleanly.
  • This patch adds no public classes.

SparkQA commented Dec 23, 2020

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/37860/

SparkQA commented Dec 23, 2020

Kubernetes integration test status success
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/37860/

SparkQA commented Dec 23, 2020

Test build #133318 has finished for PR 30894 at commit e554605.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • class ConstraintPropagationSuite extends SparkFunSuite with PlanTest with PrivateMethodTester

@tanelk (Contributor Author) commented Dec 24, 2020

cc @maropu , @HyukjinKwon

@HyukjinKwon (Member)

cc @gengliangwang too

}

/**
We keep the child constraints and equality between original and aliased attributes,
Member

Since allConstraints is initially assigned as child.constraints, why do we need to keep the child constraints?

Contributor Author

Because we might have filtered out some of them

Member

Could you show an example for why we should keep child.constraints?

Contributor Author

For example this test would fail:

  test("SPARK-33152: infer from child constraint") {
    val plan = LocalRelation('a.int, 'b.int)
      .where('a === 'b)
      .select('a, ('b + 1) as 'b2)
      .analyze

    verifyConstraints(plan.constraints, ExpressionSet(Seq(
      IsNotNull(resolveColumn(plan, "a")),
      resolveColumn(plan, "a") + 1 <=> resolveColumn(plan, "b2")
    )))
  }

To infer a + 1 <=> b2, it would need the child constraints.
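A Spark-free sketch of that inference step, under simplifying assumptions: the expression types and the infer helper below are hypothetical and only loosely modeled on what ConstraintHelper.inferAdditionalConstraints does with attribute equalities. It shows that a + 1 <=> b2 is only derivable when the child equality a = b is still available.

```scala
// Hypothetical mini expression model; not Catalyst's classes.
object InferFromChildSketch {
  sealed trait Expr
  case class Attr(name: String) extends Expr
  case class Plus(child: Expr, n: Int) extends Expr
  case class EqNullSafe(left: Expr, right: Expr)

  // Replace every occurrence of attribute `from` with `to` inside an expression.
  def substitute(e: Expr, from: Attr, to: Attr): Expr = e match {
    case `from`     => to
    case Plus(c, n) => Plus(substitute(c, from, to), n)
    case other      => other
  }

  // For each attribute equality l = r, add a copy of every constraint
  // in which r is rewritten to l.
  def infer(equalities: Seq[(Attr, Attr)], constraints: Set[EqNullSafe]): Set[EqNullSafe] =
    constraints ++ (for {
      (l, r) <- equalities
      c <- constraints
    } yield EqNullSafe(substitute(c.left, r, l), substitute(c.right, r, l)))

  def main(args: Array[String]): Unit = {
    val (a, b, b2) = (Attr("a"), Attr("b"), Attr("b2"))
    val aliasConstraint = EqNullSafe(Plus(b, 1), b2)        // from ('b + 1) as 'b2
    val withChild = infer(Seq((a, b)), Set(aliasConstraint)) // child constraint a = b kept
    val withoutChild = infer(Seq.empty, Set(aliasConstraint))
    println(withChild.contains(EqNullSafe(Plus(a, 1), b2)))    // true
    println(withoutChild.contains(EqNullSafe(Plus(a, 1), b2))) // false
  }
}
```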

so [[ConstraintHelper.inferAdditionalConstraints]] would have the full information available.
*/
projectList.foreach {
case alias @ Alias(expr, _) =>
Member

Maybe we just need to handle a @ Alias(l: Literal, _) here?

Contributor Author

You might be right that only the literal aliases are currently used, but all aliases were kept in the previous code (lines 180 & 187), and anyone who later wants to improve inferAdditionalConstraints might need them.
Removing non-literal aliases would be a marginal performance improvement, and I would rather keep the existing behavior.


// For each expression collect its aliases
val aliasMap = projectList.collect{
case alias @ Alias(expr, _) if !expr.foldable => (expr.canonicalized, alias)
Member

We need to filter the non-deterministic expressions here.

Contributor Author

Yes, you are correct. I'll add this as a safety feature.

SparkQA commented Jan 5, 2021

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/38215/

SparkQA commented Jan 5, 2021

Kubernetes integration test status success
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/38215/

SparkQA commented Jan 5, 2021

Test build #133626 has finished for PR 30894 at commit 8cf2da9.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

SparkQA commented Jan 5, 2021

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/38238/

SparkQA commented Jan 5, 2021

Kubernetes integration test status success
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/38238/

SparkQA commented Jan 5, 2021

Test build #133649 has finished for PR 30894 at commit 0c156f7.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@hammertank

Checked the two GitHub Actions errors:

  1. CliSuite.SPARK-29022: Commands using SerDe provided in --hive.aux.jars.path
    It is a timeout error and cannot be reproduced in my local environment.
    This test downloads a jar file from the Maven repository, so it may be a temporary network issue.
  2. SparkScriptTransformationSuite.SPARK-33934: Add SparkFile's root dir to env property PATH
    Caused by PR [SPARK-33934][SQL] Add SparkFile's root dir to env property PATH #30973 and has been fixed by PR [SPARK-33934][SQL][FOLLOW-UP] Use SubProcessor's exit code as assert condition to fix flaky test #31046

Looking forward to seeing this PR merged.

SparkQA commented Mar 31, 2021

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/41354/

SparkQA commented Mar 31, 2021

Kubernetes integration test status failure
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/41354/

SparkQA commented Mar 31, 2021

Test build #136771 has finished for PR 30894 at commit f1332eb.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@tanelk (Contributor Author) commented May 24, 2021

@cloud-fan, there have been some reviews already, but perhaps you could also take a look.

github-actions bot commented Sep 2, 2021

We're closing this PR because it hasn't been updated in a while. This isn't a judgement on the merit of the PR in any way. It's just a way of keeping the PR queue manageable.
If you'd like to revive this PR, please reopen it and ask a committer to remove the Stale tag!

@github-actions github-actions bot added the Stale label Sep 2, 2021
@github-actions github-actions bot closed this Sep 3, 2021

boneanxs commented Sep 8, 2022

Hi @tanelk @HyukjinKwon @maropu, are there any updates on this PR? We've also hit this issue in many Spark jobs.
