
Conversation

@wangyum
Member

@wangyum wangyum commented Feb 10, 2020

What changes were proposed in this pull request?

In our production environment, there are many queries similar to this pattern:

CREATE TABLE spark_30768_1(user_id DECIMAL(18,0), src_cre_dt DATE /*other columns*/);
CREATE TABLE spark_30768_2(user_id DECIMAL(18,0), start_dt DATE, end_dt DATE /*other columns*/);

SELECT spark_30768_1.src_cre_dt /*other columns*/                                                
FROM                                                                            
  spark_30768_1                                                                 
  INNER JOIN spark_30768_2 ON spark_30768_1.user_id = spark_30768_2.user_id     
  AND spark_30768_1.src_cre_dt >= spark_30768_2.start_dt                        
  AND spark_30768_1.src_cre_dt < spark_30768_2.end_dt                           
WHERE spark_30768_1.src_cre_dt BETWEEN date '2020-02-01' AND date '2020-02-07'      

In this case, we can infer more constraints to improve query performance. For example, spark_30768_2.start_dt <= '2020-02-07' can be inferred from spark_30768_2.start_dt <= spark_30768_1.src_cre_dt <= '2020-02-07', and spark_30768_2.end_dt > '2020-02-01' can be inferred from spark_30768_2.end_dt > spark_30768_1.src_cre_dt >= '2020-02-01'. This PR adds support for inferring these constraints from inequality attributes, as illustrated below.
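
For illustration, here is the same query with those inferred predicates written out by hand. This is an equivalent hand-rewrite showing what becomes pushable to the spark_30768_2 scan, not the optimizer's literal output:

// Illustrative only: the original query plus the two inferred predicates.
spark.sql("""
  SELECT spark_30768_1.src_cre_dt
  FROM spark_30768_1
  INNER JOIN spark_30768_2 ON spark_30768_1.user_id = spark_30768_2.user_id
    AND spark_30768_1.src_cre_dt >= spark_30768_2.start_dt
    AND spark_30768_1.src_cre_dt < spark_30768_2.end_dt
  WHERE spark_30768_1.src_cre_dt BETWEEN date '2020-02-01' AND date '2020-02-07'
    AND spark_30768_2.start_dt <= date '2020-02-07'  -- inferred
    AND spark_30768_2.end_dt > date '2020-02-01'     -- inferred
""").explain()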

Why are the changes needed?

Improve query performance. Teradata supports this optimization:
https://docs.teradata.com/reader/Ws7YT1jvRK2vEr1LpVURug/V~FCwD9BL7gY4ac3WwHInw?section=xcg1472241575102__application_of_transitive_closure_section

Does this PR introduce any user-facing change?

No.

How was this patch tested?

Unit test and benchmark test.

Benchmark code and benchmark result:

import org.apache.spark.benchmark.Benchmark
import org.apache.spark.sql.SaveMode.Overwrite

val numRows = 1024 * 1024
spark.range(numRows).selectExpr("id as c1", "id as c2").write.saveAsTable("t1")
spark.range(numRows).selectExpr("id as c1", "id as c2").write.saveAsTable("t2")

val title = "Constraints inferred from inequality constraint"
val benchmark = new Benchmark(title, numRows, minNumIters = 5)
benchmark.addCase(s"t1.c1 > ${numRows - 1000}") { _ =>
  spark.sql(s"select count(*) from t1 join t2 on (t1.c1 > t2.c1 and t2.c1 > ${numRows - 1000})").write.format("noop").mode(Overwrite).save()
}

benchmark.addCase("t1.c1 < 1000") { _ =>
  spark.sql("select count(*) from t1 join t2 on (t1.c1 < t2.c1 and t2.c1 < 1000)").write.format("noop").mode(Overwrite).save()
}
benchmark.run()

Before this PR:

Java HotSpot(TM) 64-Bit Server VM 1.8.0_191-b12 on Mac OS X 10.13.6
Intel(R) Core(TM) i7-7820HQ CPU @ 2.90GHz
Constraints inferred from inequality constraint:  Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------------------------------
t1.c1 > 1047576                                   30519          31171         567          0.0       29105.2       1.0X
t1.c1 < 1000                                      50525          50906         331          0.0       48184.7       0.6X

After this PR:

Java HotSpot(TM) 64-Bit Server VM 1.8.0_191-b12 on Mac OS X 10.13.6
Intel(R) Core(TM) i7-7820HQ CPU @ 2.90GHz
Constraints inferred from inequality constraint:  Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------------------------------
t1.c1 > 1047576                                     179            191          15          5.9         170.5       1.0X
t1.c1 < 1000                                        175            195          27          6.0         166.7       1.0X

We also tested this feature in our production environment; it significantly improved the query performance of at least 6 SQLs (out of 200 in total):

SQL ID   Before this PR (seconds)   After this PR (seconds)
323      108                        73
368      56                         26
372      72                         28
373      60                         26
374      113                        44
375      82                         55

For example, SQL 372: it prevents 18,413,424,580 - 162,205,133 = 18,251,219,447 rows from participating in the shuffle:
Before this PR:
[screenshot: stage metrics before this PR]

After this PR:
[screenshot: stage metrics after this PR]

@SparkQA

SparkQA commented Feb 10, 2020

Test build #118120 has finished for PR 27518 at commit b59a810.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

@wangyum
Member Author

wangyum commented Feb 10, 2020

retest this please

@SparkQA

SparkQA commented Feb 10, 2020

Test build #118131 has finished for PR 27518 at commit b59a810.

  • This patch fails PySpark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@wangyum
Member Author

wangyum commented Feb 10, 2020

retest this please

@SparkQA

SparkQA commented Feb 10, 2020

Test build #118163 has finished for PR 27518 at commit b59a810.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@peter-toth
Contributor

Is the title correct? Isn't what you do here constant propagation among constraints?
Obviously it helps with inequalities of attributes, but it also helps where you have complex equalities like select t1.* from SPARK_30768_1 t1 join SPARK_30768_2 t2 on f(t1.c1) = t2.c1 where t1.c1 = 3 (where f is a deterministic function).

BTW, I have a PR open to enhance ConstantPropagation: #24553, where, among other enhancements, I try to extend the propagation of attribute => constant mappings to deterministic expression => constant mappings. Maybe you could do expression => constant propagation here as well.
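
As a rough, self-contained sketch of that idea (a toy expression model with illustrative names, not the Catalyst API):

// Toy sketch of constant propagation among constraints: collect the
// attribute = constant bindings, then substitute them into every other
// constraint to derive new ones.
sealed trait Expr
case class Att(name: String) extends Expr
case class Lit(value: Int) extends Expr
case class Eq(l: Expr, r: Expr) extends Expr
case class Gt(l: Expr, r: Expr) extends Expr

def substitute(e: Expr, bindings: Map[String, Int]): Expr = e match {
  case Att(n) if bindings.contains(n) => Lit(bindings(n))
  case Eq(l, r) => Eq(substitute(l, bindings), substitute(r, bindings))
  case Gt(l, r) => Gt(substitute(l, bindings), substitute(r, bindings))
  case other    => other
}

def propagate(constraints: Set[Expr]): Set[Expr] = {
  val bindings = constraints.collect {
    case Eq(Att(n), Lit(v)) => n -> v
    case Eq(Lit(v), Att(n)) => n -> v
  }.toMap
  constraints.map(substitute(_, bindings))
}

// From t1.c1 > t2.c1 and t1.c1 = 3 this derives 3 > t2.c1:
// propagate(Set(Gt(Att("t1.c1"), Att("t2.c1")), Eq(Att("t1.c1"), Lit(3))))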

@wangyum
Member Author

wangyum commented Feb 12, 2020

@peter-toth I'd like to handle another case in this PR:

spark.sql("CREATE TABLE `dw_user_state_history` (`id` DECIMAL(18,0), `change_time` date, `to_state` DECIMAL(9,0)) USING parquet")

spark.sql("CREATE TABLE `dw_user_cntry_hist` (`user_id` DECIMAL(18,0), `start_dt` DATE, `end_dt` DATE) USING parquet")

spark.sql(
  """
    |SELECT
    |	 count(*)
    |FROM
    |	 dw_user_state_history ush
    |	 INNER JOIN dw_user_cntry_hist uch ON (uch.user_id = ush.id AND CAST(ush.change_time AS date) >= uch.start_dt AND CAST(ush.change_time AS date) < uch.end_dt)
    |WHERE
    |	 change_time between '2019-07-01 00:00:00' AND '2019-07-02 00:00:00'
    |""".stripMargin).explain()

The expected physical plan:

== Physical Plan ==
*(3) HashAggregate(keys=[], functions=[count(1)])
+- Exchange SinglePartition, true, [id=#139]
   +- *(2) HashAggregate(keys=[], functions=[partial_count(1)])
      +- *(2) Project
         +- *(2) BroadcastHashJoin [id#219], [user_id#222], Inner, BuildRight, ((change_time#220 >= start_dt#223) AND (change_time#220 < end_dt#224))
            :- *(2) Project [id#219, change_time#220]
            :  +- *(2) Filter (((isnotnull(change_time#220) AND (change_time#220 >= 18078)) AND (change_time#220 <= 18079)) AND isnotnull(id#219))
            :     +- *(2) ColumnarToRow
            :        +- FileScan parquet default.dw_user_state_history[id#219,change_time#220] Batched: true, DataFilters: [isnotnull(change_time#220), (change_time#220 >= 18078), (change_time#220 <= 18079), isnotnull(id..., Format: Parquet, Location: InMemoryFileIndex[file:/Users/yumwang/opensource/spark/sql/core/spark-warehouse/org.apache.spark...., PartitionFilters: [], PushedFilters: [IsNotNull(change_time), GreaterThanOrEqual(change_time,2019-07-01), LessThanOrEqual(change_time,..., ReadSchema: struct<id:decimal(18,0),change_time:date>
            +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, decimal(18,0), true])), [id=#133]
               +- *(1) Project [user_id#222, start_dt#223, end_dt#224]
                  +- *(1) Filter (((((isnotnull(end_dt#224) AND (start_dt#223 <= 18079)) AND isnotnull(user_id#222)) AND (start_dt#223 < end_dt#224)) AND isnotnull(start_dt#223)) AND (end_dt#224 > 18078))
                     +- *(1) ColumnarToRow
                        +- FileScan parquet default.dw_user_cntry_hist[user_id#222,start_dt#223,end_dt#224] Batched: true, DataFilters: [isnotnull(end_dt#224), (start_dt#223 <= 18079), isnotnull(user_id#222), (start_dt#223 < end_dt#2..., Format: Parquet, Location: InMemoryFileIndex[file:/Users/yumwang/opensource/spark/sql/core/spark-warehouse/org.apache.spark...., PartitionFilters: [], PushedFilters: [IsNotNull(end_dt), LessThanOrEqual(start_dt,2019-07-02), IsNotNull(user_id), IsNotNull(start_dt)..., ReadSchema: struct<user_id:decimal(18,0),start_dt:date,end_dt:date>

@wangyum wangyum changed the title [SPARK-30768][SQL] Constraints should be inferred from inequality attributes [WIP][SPARK-30768][SQL] Constraints should be inferred from inequality attributes Feb 12, 2020
@peter-toth
Contributor

peter-toth commented Feb 12, 2020

@wangyum, I'm a bit confused now. As far as I can see from your changes, in this PR you substitute the attribute = constant form of constraints into other constraints to infer new ones.
So this PR will help with your example query, as from select t1.* from SPARK_30768_1 t1 join SPARK_30768_2 t2 on (t1.c1 > t2.c1) where t1.c1 = 3 the new constraint 3 > t2.c1 is inferred.
It will also help with the cases I mentioned:
select t1.* from SPARK_30768_1 t1 join SPARK_30768_2 t2 on abs(t1.c1) = t2.c1 where t1.c1 = 3, as abs(3) = t2.c1 will be inferred.
It will also help here: #27252 (comment); once both this PR and #27252 are merged, it will solve the issue raised in the comment of @cloud-fan:

If we have cast(a, dt) = b and a = 1, seems we can also infer cast(1, dt) = b

But I don't see how it will help with the example query here: #27518 (comment)

@wangyum
Member Author

wangyum commented Feb 12, 2020

@peter-toth 999126c helps the example query.

@SparkQA

SparkQA commented Feb 12, 2020

Test build #118309 has finished for PR 27518 at commit 999126c.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@peter-toth
Contributor

peter-toth commented Feb 13, 2020

@peter-toth 999126c helps the example query.

Thanks @wangyum. I see now that you basically reverted your first commit.
If you don't mind, I will open a PR that does some kind of constant propagation among constraints, probably a bit similar to the first commit in this PR.

@wangyum
Member Author

wangyum commented Feb 13, 2020

@peter-toth Please go ahead. I reverted the first commit because this test will fail:

test("Constraints shouldn't be inferred from cast equality constraint(filter lower data type)") {
  val testRelation1 = LocalRelation('a.int)
  val testRelation2 = LocalRelation('b.long)
  val originalLeft = testRelation1.where('a === 1).subquery('left)
  val originalRight = testRelation2.subquery('right)

  val left = testRelation1.where(IsNotNull('a) && 'a === 1).subquery('left)
  val right = testRelation2.where(IsNotNull('b)).subquery('right)

  Seq(Some("left.a".attr.cast(LongType) === "right.b".attr),
    Some("right.b".attr === "left.a".attr.cast(LongType))).foreach { condition =>
    testConstraintsAfterJoin(originalLeft, originalRight, left, right, Inner, condition)
  }
}
== FAIL: Plans do not match ===
 'Join Inner, (b#0L = cast(a#0 as bigint))                                                  'Join Inner, (b#0L = cast(a#0 as bigint))
!:- Filter (((1 = a#0) AND isnotnull(a#0)) AND (cast(a#0 as bigint) = cast(1 as bigint)))   :- Filter ((1 = a#0) AND isnotnull(a#0))
 :  +- LocalRelation <empty>, [a#0]                                                         :  +- LocalRelation <empty>, [a#0]
!+- Filter (isnotnull(b#0L) AND (b#0L = cast(1 as bigint)))                                 +- Filter isnotnull(b#0L)
    +- LocalRelation <empty>, [b#0L]                                                           +- LocalRelation <empty>, [b#0L]

@SparkQA

SparkQA commented Feb 14, 2020

Test build #118434 has finished for PR 27518 at commit d4eb2d7.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Feb 16, 2020

Test build #118505 has finished for PR 27518 at commit df679e6.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Feb 17, 2020

Test build #118526 has finished for PR 27518 at commit 3c9968a.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • case class WorkerDecommission(
  • case class DecommissionExecutor(executorId: String) extends CoarseGrainedClusterMessage

@wangyum
Member Author

wangyum commented Feb 17, 2020

Metrics of Analyzer/Optimizer Rules for TPCDSQuerySuite
Before this PR:

02:32:26.475 WARN org.apache.spark.sql.TPCDSQuerySuite: 
=== Metrics of Analyzer/Optimizer Rules ===
Total number of runs: 224786
Total time: 45.803692546 seconds

Rule                                                                                               Effective Time / Total Time                     Effective Runs / Total Runs                    

org.apache.spark.sql.catalyst.optimizer.Optimizer$OptimizeSubqueries                               6898974687 / 8840587642                         47 / 772                                       
org.apache.spark.sql.catalyst.optimizer.ColumnPruning                                              671684614 / 2740648387                          327 / 2364                                     
org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveSubquery                                    1637851386 / 1780942834                         51 / 2159                                      
org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveAggregateFunctions                          624455301 / 1744459696                          49 / 2159                                      
org.apache.spark.sql.catalyst.analysis.DecimalPrecision                                            1136163267 / 1355664883                         361 / 2159                                     
org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveReferences                                  958843179 / 1132184862                          813 / 2159                                     
org.apache.spark.sql.catalyst.optimizer.PruneFilters                                               21041535 / 875480663                            5 / 1978                                       
org.apache.spark.sql.catalyst.optimizer.BooleanSimplification                                      7949392 / 757989983                             4 / 1592                                       
org.apache.spark.sql.catalyst.analysis.TypeCoercion$ImplicitTypeCasts                              301831588 / 657645402                           78 / 2159                                      
org.apache.spark.sql.catalyst.optimizer.PushDownPredicates                                         374351767 / 641828516                           758 / 1979                                     
org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveMissingReferences                           14541719 / 632536654                            10 / 2159                                      
org.apache.spark.sql.catalyst.optimizer.NullPropagation                                            39937489 / 587603524                            42 / 1592                                      
org.apache.spark.sql.catalyst.optimizer.ReorderJoin                                                244728546 / 572349474                           177 / 1592                                     
org.apache.spark.sql.catalyst.optimizer.ConstantFolding                                            158687531 / 564225840                           194 / 1592                                     
org.apache.spark.sql.catalyst.optimizer.ReorderAssociativeOperator                                 0 / 547165121                                   0 / 1592                                       
org.apache.spark.sql.catalyst.optimizer.SimplifyConditionals                                       0 / 532921745                                   0 / 1592                                       
org.apache.spark.sql.catalyst.optimizer.SimplifyBinaryComparison                                   0 / 527530671                                   0 / 1592                                       
org.apache.spark.sql.catalyst.optimizer.RemoveNoopOperators                                        48706059 / 526075410                            116 / 2364                                     
org.apache.spark.sql.catalyst.optimizer.SimplifyExtractValueOps                                    0 / 511807420                                   0 / 1592                                       
org.apache.spark.sql.catalyst.optimizer.SimplifyCasts                                              45722796 / 510124455                            83 / 1592                                      
org.apache.spark.sql.catalyst.optimizer.SimplifyCaseConversionExpressions                          0 / 493219669                                   0 / 1592                                       
org.apache.spark.sql.catalyst.optimizer.OptimizeIn                                                 12300150 / 491448204                            27 / 1592                                      
org.apache.spark.sql.catalyst.analysis.UpdateAttributeNullability                                  20208595 / 484413473                            12 / 674                                       
org.apache.spark.sql.catalyst.optimizer.RemoveDispensableExpressions                               0 / 480755936                                   0 / 1592                                       
org.apache.spark.sql.catalyst.optimizer.LikeSimplification                                         799298 / 463609379                              1 / 1592                                       
org.apache.spark.sql.catalyst.optimizer.CollapseProject                                            92682031 / 457556748                            215 / 1978                                     
org.apache.spark.sql.catalyst.optimizer.ReplaceNullWithFalseInPredicate                            0 / 456658852                                   0 / 1592                                       
org.apache.spark.sql.catalyst.analysis.TypeCoercion$FunctionArgumentConversion                     227217097 / 442233854                           56 / 2159                                      
org.apache.spark.sql.catalyst.optimizer.InferFiltersFromConstraints                                406969136 / 435459925                           278 / 386    

After this PR:

02:28:49.937 WARN org.apache.spark.sql.TPCDSQuerySuite: 
=== Metrics of Analyzer/Optimizer Rules ===
Total number of runs: 224786
Total time: 47.011460872 seconds

Rule                                                                                               Effective Time / Total Time                     Effective Runs / Total Runs                    

org.apache.spark.sql.catalyst.optimizer.Optimizer$OptimizeSubqueries                               7073196527 / 8950854926                         47 / 772                                       
org.apache.spark.sql.catalyst.optimizer.ColumnPruning                                              724531405 / 2931435267                          327 / 2364                                     
org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveSubquery                                    1789988207 / 1942196179                         51 / 2159                                      
org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveAggregateFunctions                          717122415 / 1838219499                          49 / 2159                                      
org.apache.spark.sql.catalyst.analysis.DecimalPrecision                                            1276704718 / 1524939842                         361 / 2159                                     
org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveReferences                                  1003010234 / 1192920997                         813 / 2159                                     
org.apache.spark.sql.catalyst.optimizer.PruneFilters                                               21471500 / 952660373                            5 / 1978                                       
org.apache.spark.sql.catalyst.optimizer.BooleanSimplification                                      9761473 / 773205469                             4 / 1592                                       
org.apache.spark.sql.catalyst.analysis.TypeCoercion$ImplicitTypeCasts                              329430374 / 734067405                           78 / 2159                                      
org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveMissingReferences                           12112700 / 709719203                            10 / 2159                                      
org.apache.spark.sql.catalyst.optimizer.PushDownPredicates                                         399755753 / 663934266                           758 / 1979                                     
org.apache.spark.sql.catalyst.optimizer.ReorderJoin                                                266383835 / 593070778                           177 / 1592                                     
org.apache.spark.sql.catalyst.optimizer.NullPropagation                                            35781183 / 578417823                            42 / 1592                                      
org.apache.spark.sql.catalyst.optimizer.RemoveNoopOperators                                        48387593 / 540807035                            116 / 2364                                     
org.apache.spark.sql.catalyst.optimizer.SimplifyBinaryComparison                                   0 / 535708634                                   0 / 1592                                       
org.apache.spark.sql.catalyst.optimizer.ConstantFolding                                            114115625 / 533552246                           194 / 1592                                     
org.apache.spark.sql.catalyst.optimizer.OptimizeIn                                                 12632788 / 529487507                            27 / 1592                                      
org.apache.spark.sql.catalyst.optimizer.SimplifyCaseConversionExpressions                          0 / 513745956                                   0 / 1592                                       
org.apache.spark.sql.catalyst.optimizer.SimplifyConditionals                                       0 / 508586160                                   0 / 1592                                       
org.apache.spark.sql.catalyst.optimizer.InferFiltersFromConstraints                                459543325 / 490828096                           278 / 386

@SparkQA

SparkQA commented Feb 17, 2020

Test build #118570 has finished for PR 27518 at commit f9a90aa.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@HyukjinKwon
Member

retest this please

@SparkQA

SparkQA commented Feb 25, 2020

Test build #118895 has finished for PR 27518 at commit f9a90aa.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Feb 26, 2020

Test build #118980 has finished for PR 27518 at commit bfa6039.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@wangyum wangyum changed the title [WIP][SPARK-30768][SQL] Constraints should be inferred from inequality attributes [SPARK-30768][SQL] Constraints inferred from inequality attributes Mar 4, 2020
@SparkQA

SparkQA commented Mar 7, 2020

Test build #119516 has finished for PR 27518 at commit 248e3cc.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Mar 20, 2020

Test build #120106 has finished for PR 27518 at commit 248e3cc.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Jun 8, 2020

Test build #123638 has finished for PR 27518 at commit af55a08.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Jun 12, 2020

Test build #123919 has finished for PR 27518 at commit 5c76b9d.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@HyukjinKwon
Member

cc @maryannxue and @cloud-fan FYI

Contributor

@tanelk tanelk left a comment

Both this PR and #29650 introduce a while loop to infer constraints from a chain of existing constraints. If either of them gets accepted, the other must be changed to unify these loops.

case _: GreaterThanOrEqual => true
case _: LessThan => true
case _: LessThanOrEqual => true
case _: EqualTo => true
Contributor

EqualTo should not be needed here, as inferEqualityConstraints should cover all cases, including it.

Member Author

inferEqualityConstraints cannot handle all cases, such as constraints involving casts.

Member Author

For example: cast(a as double) > cast(b as double) and cast(b as double) = 1

Comment on lines +116 to +121
val lessThans = binaryComparisons.map {
  case EqualTo(l, r) if l.foldable => EqualTo(r, l)
  case GreaterThan(l, r) => LessThan(r, l)
  case GreaterThanOrEqual(l, r) => LessThanOrEqual(r, l)
  case other => other
}
Contributor

Doesn't this duplicate the greaterThans block?
Here you have a < b < c, and in the other block you have c > b > a.

Member Author

No. For example, from a > b and 5 > a we cannot infer anything directly, but we can infer b < 5 after rewriting a > b and 5 > a as b < a and a < 5. See the sketch below.
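
A self-contained toy sketch of that rewrite (illustrative names, not the PR's code):

// Normalize every comparison to "<" form so transitive chains all point
// the same way, then chain adjacent pairs.
sealed trait Cmp
case class Lt(l: String, r: String) extends Cmp
case class Gt(l: String, r: String) extends Cmp

def toLessThan(c: Cmp): Lt = c match {
  case Gt(l, r) => Lt(r, l) // a > b becomes b < a
  case lt: Lt   => lt
}

// a > b and 5 > a normalize to b < a and a < 5, which chain into b < 5:
val normalized = Seq(Gt("a", "b"), Gt("5", "a")).map(toLessThan)
val inferred = for {
  Lt(x, y)  <- normalized
  Lt(y2, z) <- normalized if y == y2
} yield Lt(x, z) // Seq(Lt("b", "5"))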

Contributor

@tanelk tanelk Sep 7, 2020

Is it because of the foldable check? Without it, it should be inferable.

var inferredConstraints = Set.empty[Expression]
greaterThans.foreach {
  case op @ BinaryComparison(source: Attribute, destination: Expression)
      if destination.foldable =>
Contributor

I think the foldability check is not needed here. The new constraints do not have to involve only constants; they could also involve any attribute.

Member Author

It is to avoid generating too many constraints. For example, for a > b > c > 1 the expected inferred constraints are a > 1 and b > 1; a > c is useless. See the toy illustration below.
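
A toy illustration of that trade-off (not the PR's code): the transitive closure of a > b > c > 1 also contains a > c, but restricting the inferred destination to foldables keeps only the comparisons against the constant.

// Each pair (x, y) stands for x > y; "foldable" is mocked as a numeric literal.
def closure(pairs: Set[(String, String)]): Set[(String, String)] = {
  var acc = pairs
  var changed = true
  while (changed) {
    val next = for { (x, y) <- acc; (y2, z) <- acc if y == y2 } yield (x, z)
    changed = !next.subsetOf(acc)
    acc ++= next
  }
  acc
}

val chain = Set(("a", "b"), ("b", "c"), ("c", "1"))
val isFoldable = (s: String) => s.forall(_.isDigit)
val inferred = closure(chain) -- chain               // Set((a,c), (b,1), (a,1))
inferred.filter { case (_, dst) => isFoldable(dst) } // Set((b,1), (a,1))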

Contributor

If a and c are on the same side of a join, then it can be pushed down.

Member Author

How can we push down a > c if neither a nor c is foldable?

Contributor

I'm sorry, I used the wrong word. I meant pushed through the join into one of its sides.

Comment on lines +64 to +67
do {
  lastInequalityInferred = inferInequalityConstraints(constraints ++ inferred)
  inferred ++= lastInequalityInferred
} while (lastInequalityInferred.nonEmpty)
Contributor

Could you hit an infinite loop with non-deterministic filters? They are never semantically equal to any other expression (including themselves). I hit that problem in #29650, where I was also working on constraint inference, but from EqualNullSafe.
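
For reference, one defensive shape for such a loop, sketched under the assumption that non-deterministic constraints can simply be filtered out up front and that already-known results are subtracted every round (illustrative, not this PR's code):

import org.apache.spark.sql.catalyst.expressions.Expression

// inferOnce stands for whatever single-step inference is plugged in.
def inferToFixpoint(
    constraints: Set[Expression],
    inferOnce: Set[Expression] => Set[Expression]): Set[Expression] = {
  val deterministic = constraints.filter(_.deterministic)
  var inferred = Set.empty[Expression]
  var newly = Set.empty[Expression]
  do {
    // Subtracting what we already have guarantees progress or termination.
    newly = inferOnce(deterministic ++ inferred) -- inferred
    inferred ++= newly
  } while (newly.nonEmpty)
  inferred
}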

@wangyum
Member Author

wangyum commented Dec 14, 2020

Thank you all. Merged it into our internal Spark version.

@wangyum wangyum closed this Dec 14, 2020