Skip to content

Conversation

@ahshahid
Copy link

@ahshahid ahshahid commented Oct 29, 2020

What changes were proposed in this pull request?

This PR proposes new logic to store the constraint and track aliases in projection which eliminates the need of pessimistically generating all the permutations of a given constraint. It is also more effective in correctly identifying the filters which can be pruned, apart from minimizing the memory used as compared to the current code. This also has changes to push compound filters if the join condition is on multiple attributes and the constraint comprises of more than 1 attributes of the join conditions.

Right now the code retains the old logic of constraint management along with the new logic. It is controlled by a sql conf property "spark.sql.optimizer.optimizedConstraintPropagation.enabled" which is by default true. Once the PR is approved it would make sense to remove the old code & merge the code of ConstraintSet into ExpressionSet and removing certain if else blocks in the Optimizer & the function Optimizer.getAllConstraints and LogicalPlan.getAllValidConstraints.
It is the code of getAllValidConstraints which generates all the permutations of the base constraint.

The new logic is as follows:
In the class ConstraintSet which extends ExpressionSet, we track the aliases , along with the base constraint.
Any constraint which is added to the ConstraintSet is stored in the most canonicalized form (i.e consisting of only those attributes which are part of the output set and NOT the Alias's attribute).

for eg consider a hypothetical plan
Projection1 ( a, a as a1, a as a2, b , b as b1, b as b2, c, a +b as z)
|
Filter ( a + b > 10)
|
base relation (a, b, c, d)

At the node Filter the constraint set will just have constraint a + b > 10
At the Node Projection1 , the constraint set will have
constraint a + b > 10
and maintain following buffers
buff1 -> a , a1.attribute, a2. attribute
buff2 -> b, b1.attribute, b2.attribute
buff3 -> a + b, z.attribute

constraint a + b > 10 is already canonicalized in terms of output attributes.

Now suppose there are two filters on top of projection1
Filter( z > 10) and Filter ( a1 + b2 > 10)

To prune the above two filters, we canonicalize z as a + b ( from the data maintained in the constraintset) & check if the underlying set contains a +b > 10 & so can be pruned.
For Filter a1 + b2 > 10, we identify the buffer to which a1 & b2 belong to and replace it with 0th elements of the buffer, which will yield a +b > 10, and so filter can be pruned.

Now suppose there is another Project2 ( a1, a2, b1, b2, z, c)
i.e say attributes a & b are no longer part of outputset.
The idea is that "as much as possible try to make a constraint survive).
So in Project2 , the atttributes a & b are being eliminated.
we have a constraint a + b > 10 which is dependent on it.
so in the constraintset of the ProjectP2, we update it such that
a + b > 10 becomes ----> a1.attr + b1.attr > 10
buff1 a , a1.attribute, a2. attribute ---> a1.attribute, a2. attribute
buff2 b , b1.attribute, b2. attribute ---> b1.attribute, b2. attribute
buff3 a +b , z.attribute -->. a1.attr + b1.attr , z.attr

This way by tracking aliases & just storing the canonicalized base constraints we can eliminate the need of pessimistically generating all combination of constraints.

For inferring new Filter from constraints ,
we use following logic
New Filter = Filter.constraints -- ( Filter.child.constraints ++ Filter.constraints.convertToCanonicalizedIfRequired(Filter.conditions) )
So the idea is that new filter conditions without redundancy can be obtained by difference of current node's constraints & the child node's constraints & the condition itself properly canonicalized in terms of base attributes which will be part of the output set of filter node.

For inferring new filters for Join push down, we identify all the equality conditions & then the attributes are segregated on the lines of LHS & RHS of joins. So to identify filters for push down on RHS side, we get all equality atttributes of LHS side & ask the constraintset to return all the constraints which are subset of the passed LHS attributes. The LHS attributes are appropriately canonicalized & the constrainst identified.
Once the constraints are know, we can replace the attributes with the corresponding RHS attributes. This helps in identifying the compound filters for push down & not just single attribute filters.

Below is a description of the changes proposed.

ExpressionSet: Apart from adding some new functions, fixed the two bugs in the ExpressionSet where in filter & filterNot, e.canonicalized was being used. e is already canonicalized. Also in very complex expressions canonicalization of a canonicalized object ( especially if contains a join expression ) does not behave correctly in current spark code.
The ExpressionSet has added methods just to retain the existing constraints code.

ConstraintSet: This is the class which does the tracking of the aliases , stores the constraints in the canonicalized form, updates the constraints using available aliases if any of the attribute comprising constraint is getting eliminated. The contains method of this class is used for filter pruning. It also identifies those constraints which can generated new filters for push down in join nodes.

Rest all the changes are just to integrate the new logic as well as retain the old constraints logic.

Pls notice that related to tpcds plan stability , I had to add new golden files for q75. The change as such is trivial.
previously pushed filter was generated as
PushedFilters: [IsNotNull(cr_order_number), IsNotNull(cr_item_sk)]
and with the change it is
PushedFilters: [IsNotNull(cr_item_sk), IsNotNull(cr_order_number)]

This PR also eliminates the need of EqualNullSafe constraints for the alias.
It also is able to handle the literal boolean constraints.

Why are the changes needed?

  1. This issue if not fixed can cause OutOfMemory issue or unacceptable query compilation times.
    Added a test "plan equivalence with case statements and performance comparison with benefit of more than 10x conservatively"
    in
    org.apache.spark.sql.catalyst.plans.OptimizedConstraintPropagationSuite

with this PR the compilation time is 247 ms vs 13958 ms without the change
2) It is more effective in filter pruning as is evident in some of the tests in org.apache.spark.sql.catalyst.plans.OptimizedConstraintPropagationSuite where current code is not able to identify the redundant filter in some cases.

  1. It is able to generate a better optimized plan for join queries as it can push compound predicates.

Does this PR introduce any user-facing change?

No

How was this patch tested?

I have been running the target /build/mvn clean install.
As seen below the sql, catalyst modules are passing cleanly.
The hive module reports failure due to 1 suite aborted, but no test failures.
I have been told by my colleagues that there is an issue in Hive module ( unrelated to this change).
I am looking further as to what is causing the abort.
In case if you have any information, please add.
02:25:27.539 WARN org.apache.hadoop.hive.conf.HiveConf: HiveConf of name hive.stats.jdbc.timeout does not exist
02:25:27.539 WARN org.apache.hadoop.hive.conf.HiveConf: HiveConf of name hive.stats.retries.wait does not exist
Run completed in 2 hours, 27 minutes, 59 seconds.
Total number of tests run: 3123
Suites: completed 108, aborted 1
Tests: succeeded 3123, failed 0, canceled 0, ignored 18, pending 0
*** 1 SUITE ABORTED ***

[INFO] ------------------------------------------------------------------------
[INFO] Reactor Summary for Spark Project Parent POM 3.1.0-SNAPSHOT:
[INFO]
[INFO] Spark Project Parent POM ........................... SUCCESS [ 17.115 s]
[INFO] Spark Project Tags ................................. SUCCESS [ 21.426 s]
[INFO] Spark Project Sketch ............................... SUCCESS [ 41.413 s]
[INFO] Spark Project Local DB ............................. SUCCESS [ 20.063 s]
[INFO] Spark Project Networking ........................... SUCCESS [01:12 min]
[INFO] Spark Project Shuffle Streaming Service ............ SUCCESS [ 24.297 s]
[INFO] Spark Project Unsafe ............................... SUCCESS [ 32.305 s]
[INFO] Spark Project Launcher ............................. SUCCESS [ 17.877 s]
[INFO] Spark Project Core ................................. SUCCESS [59:57 min]
[INFO] Spark Project ML Local Library ..................... SUCCESS [01:12 min]
[INFO] Spark Project GraphX ............................... SUCCESS [05:22 min]
[INFO] Spark Project Streaming ............................ SUCCESS [08:48 min]
[INFO] Spark Project Catalyst ............................. SUCCESS [16:55 min]
[INFO] Spark Project SQL .................................. SUCCESS [ 02:25 h]
[INFO] Spark Project ML Library ........................... SUCCESS [42:14 min]
[INFO] Spark Project Tools ................................ SUCCESS [ 19.337 s]
[INFO] Spark Project Hive ................................. FAILURE [ 02:33 h]
[INFO] Spark Project REPL ................................. SKIPPED
[INFO] Spark Project Assembly ............................. SKIPPED
[INFO] Kafka 0.10+ Token Provider for Streaming ........... SKIPPED
[INFO] Spark Integration for Kafka 0.10 ................... SKIPPED
[INFO] Kafka 0.10+ Source for Structured Streaming ........ SKIPPED
[INFO] Spark Project Examples ............................. SKIPPED
[INFO] Spark Integration for Kafka 0.10 Assembly .......... SKIPPED
[INFO] Spark Avro ......................................... SKIPPED
[INFO] ------------------------------------------------------------------------
[INFO] BUILD FAILURE
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 07:18 h
[INFO] Finished at: 2020-10-29T02:27:10-07:00
[INFO] ------------------------------------------------------------------------
[ERROR] Failed to execute goal org.scalatest:scalatest-maven-plugin:2.0.0:test (test) on project spark-hive_2.12: There are test failures -> [Help 1]
[ERROR]
[ERROR] To see the full stack trace of the errors, re-run Maven with the -e switch.
[ERROR] Re-run Maven using the -X switch to enable full debug logging.
[ERROR]
[ERROR] For more information about the errors and possible solutions, please read the following articles:
[ERROR] [Help 1] http://cwiki.apache.org/confluence/display/MAVEN/MojoFailureException
[ERROR]
[ERROR] After correcting the problems, you can resume the build with the command

@ahshahid ahshahid changed the title Spark 33152 Spark-33152: This PR proposes a new logic to maintain & track constraints which solves the OOM or performance issues in query compilation Oct 29, 2020
@AmplabJenkins
Copy link

Can one of the admins verify this patch?

@ahshahid
Copy link
Author

I think I will have to rebase correctly as it is showing 526 files modified...

@ahshahid
Copy link
Author

The PR is now ready for review.

@dongjoon-hyun dongjoon-hyun changed the title Spark-33152: This PR proposes a new logic to maintain & track constraints which solves the OOM or performance issues in query compilation [SPARK-33152][SQL] This PR proposes a new logic to maintain & track constraints which solves the OOM or performance issues in query compilation Oct 29, 2020
@ahshahid
Copy link
Author

The hive suit which got aborted is
org.apache.spark.sql.hive.HiveExternalCatalogVersionsSuite

Through IDE , below exception is seen.
Looking into it
org.scalatest.exceptions.TestFailedException: spark-submit returned with exit code 1.
Command line: './bin/spark-submit' '--name' 'prepare testing tables' '--master' 'local[2]' '--conf' 'spark.ui.enabled=false' '--conf' 'spark.master.rest.enabled=false' '--conf' 'spark.sql.hive.metastore.version=1.2.1' '--conf' 'spark.sql.hive.metastore.jars=maven' '--conf' 'spark.sql.warehouse.dir=/private/var/folders/vl/gk1hr_957qs1jmbwhmvkq2d80000gp/T/warehouse-33cd9b12-1a96-40cd-afad-4b57880723a6' '--conf' 'spark.sql.test.version.index=0' '--driver-java-options' '-Dderby.system.home=/private/var/folders/vl/gk1hr_957qs1jmbwhmvkq2d80000gp/T/warehouse-33cd9b12-1a96-40cd-afad-4b57880723a6' '/private/var/folders/vl/gk1hr_957qs1jmbwhmvkq2d80000gp/T/test8221341981962562358.py'

2020-10-29 18:20:28.01 - stderr> 20/10/29 18:20:28 WARN Utils: Your hostname, ALTERVICTIMFEWER.workdayinternal.com resolves to a loopback address: 127.0.0.1; using 192.168.0.10 instead (on interface en0)
2020-10-29 18:20:28.011 - stderr> 20/10/29 18:20:28 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
2020-10-29 18:20:29.793 - stderr> 20/10/29 18:20:29 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
2020-10-29 18:20:31.692 - stdout> Traceback (most recent call last):
2020-10-29 18:20:31.693 - stdout> File "/private/var/folders/vl/gk1hr_957qs1jmbwhmvkq2d80000gp/T/test8221341981962562358.py", line 2, in
2020-10-29 18:20:31.693 - stdout> from pyspark.sql import SparkSession
2020-10-29 18:20:31.693 - stdout> File "", line 991, in _find_and_load
2020-10-29 18:20:31.694 - stdout> File "", line 975, in _find_and_load_unlocked
2020-10-29 18:20:31.694 - stdout> File "", line 655, in _load_unlocked
2020-10-29 18:20:31.695 - stdout> File "", line 618, in _load_backward_compatible
2020-10-29 18:20:31.695 - stdout> File "", line 259, in load_module
2020-10-29 18:20:31.696 - stdout> File "/private/tmp/test-spark/spark-2.4.7/python/lib/pyspark.zip/pyspark/init.py", line 51, in
2020-10-29 18:20:31.697 - stdout> File "", line 991, in _find_and_load
2020-10-29 18:20:31.697 - stdout> File "", line 975, in _find_and_load_unlocked
2020-10-29 18:20:31.698 - stdout> File "", line 655, in _load_unlocked
2020-10-29 18:20:31.699 - stdout> File "", line 618, in _load_backward_compatible
2020-10-29 18:20:31.699 - stdout> File "", line 259, in load_module
2020-10-29 18:20:31.7 - stdout> File "/private/tmp/test-spark/spark-2.4.7/python/lib/pyspark.zip/pyspark/context.py", line 31, in
2020-10-29 18:20:31.7 - stdout> File "", line 991, in _find_and_load
2020-10-29 18:20:31.701 - stdout> File "", line 975, in _find_and_load_unlocked
2020-10-29 18:20:31.701 - stdout> File "", line 655, in _load_unlocked
2020-10-29 18:20:31.702 - stdout> File "", line 618, in _load_backward_compatible
2020-10-29 18:20:31.702 - stdout> File "", line 259, in load_module
2020-10-29 18:20:31.703 - stdout> File "/private/tmp/test-spark/spark-2.4.7/python/lib/pyspark.zip/pyspark/accumulators.py", line 97, in
2020-10-29 18:20:31.704 - stdout> File "", line 991, in _find_and_load
2020-10-29 18:20:31.704 - stdout> File "", line 975, in _find_and_load_unlocked
2020-10-29 18:20:31.705 - stdout> File "", line 655, in _load_unlocked
2020-10-29 18:20:31.705 - stdout> File "", line 618, in _load_backward_compatible
2020-10-29 18:20:31.706 - stdout> File "", line 259, in load_module
2020-10-29 18:20:31.707 - stdout> File "/private/tmp/test-spark/spark-2.4.7/python/lib/pyspark.zip/pyspark/serializers.py", line 72, in
2020-10-29 18:20:31.708 - stdout> File "", line 991, in _find_and_load
2020-10-29 18:20:31.709 - stdout> File "", line 975, in _find_and_load_unlocked
2020-10-29 18:20:31.709 - stdout> File "", line 655, in _load_unlocked
2020-10-29 18:20:31.71 - stdout> File "", line 618, in _load_backward_compatible
2020-10-29 18:20:31.71 - stdout> File "", line 259, in load_module
2020-10-29 18:20:31.711 - stdout> File "/private/tmp/test-spark/spark-2.4.7/python/lib/pyspark.zip/pyspark/cloudpickle.py", line 145, in
2020-10-29 18:20:31.712 - stdout> File "/private/tmp/test-spark/spark-2.4.7/python/lib/pyspark.zip/pyspark/cloudpickle.py", line 126, in _make_cell_set_template_code
2020-10-29 18:20:31.712 - stdout> TypeError: an integer is required (got type bytes)
2020-10-29 18:20:31.737 - stderr> log4j:WARN No appenders could be found for logger (org.apache.spark.util.ShutdownHookManager).
2020-10-29 18:20:31.738 - stderr> log4j:WARN Please initialize the log4j system properly.
2020-10-29 18:20:31.

@ahshahid
Copy link
Author

I took the latest master & merged it with this PR and ran the /dev/run-tests target.
I now have a clean run:

=== Metrics of Whole-stage Codegen ===
Total code generation time: 103.481957 seconds
Total compile time: 194.250971571 seconds

19:44:29.389 WARN org.apache.spark.sql.hive.thriftserver.ThriftServerQueryTestSuite:

===== POSSIBLE THREAD LEAK IN SUITE o.a.s.sql.hive.thriftserver.ThriftServerQueryTestSuite, thread names: rpc-boss-3-1, derby.rawStoreDaemon, com.google.common.base.internal.Finalizer, Timer-3, BoneCP-keep-alive-scheduler, shuffle-boss-6-1, BoneCP-pool-watch-thread =====

[info] ScalaTest
[info] Run completed in 11 hours, 44 minutes, 19 seconds.
[info] Total number of tests run: 466
[info] Suites: completed 19, aborted 0
[info] Tests: succeeded 466, failed 0, canceled 0, ignored 17, pending 0
[info] All tests passed.
[info] Passed: Total 466, Failed 0, Errors 0, Passed 466, Ignored 17

^[[24~^[[24~

:/GC overhead

@srowen
Copy link
Member

srowen commented Nov 3, 2020

This still doesn't look right. I'd squash your branch changes and rebase on master.

@ahshahid
Copy link
Author

ahshahid commented Nov 3, 2020

@srowen Yes, while running the merge with changes locally I accidentally pushed the changes. I will fix it.
I have made some more changes which alleviates the need to generate EqualNullSafe constraints for projection aliases. They were in any case not needed with new logic. I was generating them only to ensure existing spark tests asserting on constraints pass. I have made some minimal changes in spark & fixed a test in existing InferFiltersFromConstraintsSuite where it was asserting on a not so optimal filter.

@ahshahid
Copy link
Author

ahshahid commented Nov 3, 2020

@srowen Fixed the merge issues. I will rebase it into a single commit.

@ahshahid
Copy link
Author

ahshahid commented Nov 4, 2020

i will fix the scala 2.13 issue

@ahshahid
Copy link
Author

ahshahid commented Nov 7, 2020

@srowen Hi. This PR is now stabilized. I have also done away with the need of creating EqualNullSafe constraints for the aliases as with the new logic they are not needed. I will add a couple of tests more which show that these changes produce a more optimized plan using filter push down of constraints like literal "false" in outer join conditions as compared to the current code.

@ahshahid ahshahid changed the title [SPARK-33152][SQL] This PR proposes a new logic to maintain & track constraints which solves the OOM or performance issues in query compilation [SPARK-33152][SQL] This PR proposes a new logic to maintain & track constraints which solves the OOM or performance issues in query compilation due to Constraint Propagation Rule Nov 24, 2020
Asif Shahid added 4 commits December 14, 2020 09:51
No longer adding an EqualNullSafe constraint for the aliases. Previously I was adding 1 EqualNullSafe(a, a1) for a projection set of a, a1,a2.... This was being done so that existing spark tests which expected that constraint pass without change. As such the new logic does not require this EqualNullSafe constraint for alias. So removed that & also modified the existing spark test which were expecting these constraints. This change in no way impacts the optimized query plan. This also completely eliminates the need of expand function for testing purposes also.  The subtle change is that if an alias is of the form  2, a, a1... i.e  a literal is aliased, then I convert it into EqualTo which allows IsNotNull constraint to be generated. One of the existing spark test had to be modified because as per the old logic it expected IsNotNull(a) as well as EqualsNullSafe(2, a).  The thing is that if Isnotnull(a) is generated, then EqualsNullSafe(2, a) should not be there. It should ideally be only EqualTo(2,a), which is now what the output will be. SPARK-33152
…e not getting conflated in a Set due to difference in their qualifier
@github-actions
Copy link

We're closing this PR because it hasn't been updated in a while. This isn't a judgement on the merit of the PR in any way. It's just a way of keeping the PR queue manageable.
If you'd like to revive this PR, please reopen it and ask a committer to remove the Stale tag!

@github-actions github-actions bot added the Stale label Mar 25, 2021
@github-actions github-actions bot closed this Mar 26, 2021
@ahshahid
Copy link
Author

attempting to reopen the pr

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants