Skip to content

Conversation

@wangyum
Copy link
Member

@wangyum wangyum commented Jun 21, 2018

What changes were proposed in this pull request?

The original pr is: #18424

Add a new optimizer rule to convert an IN predicate to an equivalent Parquet filter and add spark.sql.parquet.pushdown.inFilterThreshold to control limit thresholds. Different data types have different limit thresholds, this is a copy of data for reference:

Type limit threshold
string 370
int 210
long 285
double 270
float 220
decimal Won't provide better performance before SPARK-24549

How was this patch tested?

unit tests and manual tests

case sources.Not(pred) =>
createFilter(schema, pred).map(FilterApi.not)

case sources.In(name, values) if canMakeFilterOn(name) && values.length < 20 =>
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The threshold is 20. Too many values may be OOM, for example:

spark.range(10000000).coalesce(1).write.option("parquet.block.size", 1048576).parquet("/tmp/spark/parquet/SPARK-17091")
val df = spark.read.parquet("/tmp/spark/parquet/SPARK-17091/")
df.where(s"id in(${Range(1, 10000).mkString(",")})").count
Exception in thread "SIGINT handler" 18/06/21 13:00:54 WARN TaskSetManager: Lost task 7.0 in stage 1.0 (TID 8, localhost, executor driver): java.lang.OutOfMemoryError: Java heap space
        at java.util.Arrays.copyOfRange(Arrays.java:3664)
        at java.lang.String.<init>(String.java:207)
        at java.lang.StringBuilder.toString(StringBuilder.java:407)
        at org.apache.parquet.filter2.predicate.Operators$BinaryLogicalFilterPredicate.<init>(Operators.java:263)
        at org.apache.parquet.filter2.predicate.Operators$Or.<init>(Operators.java:316)
        at org.apache.parquet.filter2.predicate.FilterApi.or(FilterApi.java:261)
        at org.apache.spark.sql.execution.datasources.parquet.ParquetFilters$$anonfun$createFilter$15.apply(ParquetFilters.scala:276)
        at org.apache.spark.sql.execution.datasources.parquet.ParquetFilters$$anonfun$createFilter$15.apply(ParquetFilters.scala:276)
...

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what about making this threshold configurable?

Copy link
Member

@gatorsmile gatorsmile Jun 21, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

make it configurable. Use spark.sql.parquet.pushdown.inFilterThreshold. By default, it should be around 10. Please also check the perf.

cc @jiangxb1987

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems that the push-down performance is better when threshold is less than 300:
spark-17091-perf

The code:

    withSQLConf(SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED.key -> "true") {
      import testImplicits._
      withTempPath { path =>
        val total = 10000000
        (0 to total).toDF().coalesce(1)
          .write.option("parquet.block.size", 512)
          .parquet(path.getAbsolutePath)
        val df = spark.read.parquet(path.getAbsolutePath)
        // scalastyle:off println
        var lastSize = -1
        var i = 16000
        while (i < total) {
          val filter = Range(0, total).filter(_ % i == 0)
          i += 100
          if (lastSize != filter.size) {
            if (lastSize == -1) println(s"start size: ${filter.size}")
            lastSize = filter.size
            sql("set spark.sql.parquet.pushdown.inFilterThreshold=1000000")
            val begin1 = System.currentTimeMillis()
            df.where(s"id in(${filter.mkString(",")})").count()
            val end1 = System.currentTimeMillis()
            val time1 = end1 - begin1

            sql("set spark.sql.parquet.pushdown.inFilterThreshold=10")
            val begin2 = System.currentTimeMillis()
            df.where(s"id in(${filter.mkString(",")})").count()
            val end2 = System.currentTimeMillis()
            val time2 = end2 - begin2
            if (time1 <= time2) println(s"Max threshold: $lastSize")
          }
        }
      }
    }

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for doing this benchmark, this shall be useful, while I still have some questions:

  1. For small table (with columns <= 1000000), is the performance of InFilters still better than InSet?
  2. Can you also forge different filters? Currently your filters are distributed evenly, which don't always happen on real workload. We shall at least benchmark with different filter ratio (#rows filtered / #total rows)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It mainly depends on how many row groups can skip. for small table (assuming only one row group). There is no obvious difference.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have prepared a test case that you can verify it:

  test("Benchmark") {
    def benchmark(func: () => Unit): Long = {
      val start = System.currentTimeMillis()
      func()
      val end = System.currentTimeMillis()
      end - start
    }
    // scalastyle:off
    withSQLConf(SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED.key -> "true") {
      withTempPath { path =>
        Seq(1000000, 10000000).foreach { count =>
          Seq(1048576, 10485760, 104857600).foreach { blockSize =>
            spark.range(count).toDF().selectExpr("id", "cast(id as string) as d1",
              "cast(id as double) as d2", "cast(id as float) as d3", "cast(id as int) as d4",
              "cast(id as decimal(38)) as d5")
              .coalesce(1).write.mode("overwrite")
              .option("parquet.block.size", blockSize).parquet(path.getAbsolutePath)
            val df = spark.read.parquet(path.getAbsolutePath)
            println(s"path: ${path.getAbsolutePath}")
            Seq(1000, 100, 10, 1).foreach { ratio =>
              println(s"##########[ count: $count, blockSize: $blockSize, ratio: $ratio ]#########")
              var i = 1
              while (i < 300) {
                val filter = Range(0, i).map(r => scala.util.Random.nextInt(count / ratio))
                i += 4

                sql("set spark.sql.parquet.pushdown.inFilterThreshold=1")
                val vanillaTime = benchmark(() => df.where(s"id in(${filter.mkString(",")})").count())
                sql("set spark.sql.parquet.pushdown.inFilterThreshold=1000")
                val pushDownTime = benchmark(() => df.where(s"id in(${filter.mkString(",")})").count())

                if (pushDownTime > vanillaTime) {
                  println(s"vanilla is better, threshold: ${filter.size}, $pushDownTime, $vanillaTime")
                } else {
                  println(s"push down is better, threshold: ${filter.size}, $pushDownTime, $vanillaTime")
                }
              }
            }
          }
        }
      }
    }
  }

@SparkQA
Copy link

SparkQA commented Jun 21, 2018

Test build #92156 has finished for PR 21603 at commit 4f96881.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

@wangyum
Copy link
Member Author

wangyum commented Jun 21, 2018

Jenkins, retest this please.

case sources.In(name, values) if canMakeFilterOn(name) && values.length < 20 =>
values.flatMap { v =>
makeEq.lift(nameToType(name)).map(_(name, v))
}.reduceLeftOption(FilterApi.or)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what about null handling? Do we get the same result as before? Anyway, can we add a test for it?

@SparkQA
Copy link

SparkQA commented Jun 21, 2018

Test build #92161 has finished for PR 21603 at commit 4f96881.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@HyukjinKwon
Copy link
Member

cc @gatorsmile, @viirya, @jiangxb1987 and @a10y too

@SparkQA
Copy link

SparkQA commented Jun 22, 2018

Test build #92198 has finished for PR 21603 at commit b9b3160.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

@viirya
Copy link
Member

viirya commented Jun 22, 2018

retest this please.


import testImplicits._
withTempPath { path =>
(0 to 1024).toDF("a").coalesce(1)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we add a null here?

"This configuration only has an effect when 'spark.sql.parquet.filterPushdown' is enabled.")
.internal()
.intConf
.checkValue(threshold => threshold > 0 && threshold <= 300,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why are we enforcing that the threshold <= 300 despite this might be a reasonable value, I think that the user should be able to set the value he/she wants. If the value the user sets is a bad one, its his/her problem to set a proper value. We can probably mention that from the experiments 300 seems to be a limit threshold in the description eventually, in order to give an hint to the user.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I also think to set a maximum value 300 is a bit weird.

* Some utility function to convert Spark data source filters to Parquet filters.
*/
private[parquet] class ParquetFilters(pushDownDate: Boolean) {
private[parquet] class ParquetFilters(pushDownDate: Boolean, inFilterThreshold: Int) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: not a big deal, but since now SQLConf are available also on executor side, can we get its value inside the class rather than outside? If we add more configurations this constructor might explode...

val PARQUET_FILTER_PUSHDOWN_INFILTERTHRESHOLD =
buildConf("spark.sql.parquet.pushdown.inFilterThreshold")
.doc("The maximum number of values to filter push-down optimization for IN predicate. " +
"Large threshold will not provide much better performance. " +
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Large threshold won't necessarily provide much better performance.

@SparkQA
Copy link

SparkQA commented Jun 22, 2018

Test build #92203 has finished for PR 21603 at commit b9b3160.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA
Copy link

SparkQA commented Jun 22, 2018

Test build #92210 has finished for PR 21603 at commit d57f44c.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

buildConf("spark.sql.parquet.pushdown.inFilterThreshold")
.doc("The maximum number of values to filter push-down optimization for IN predicate. " +
"Large threshold won't necessarily provide much better performance. " +
"The experiment argued that 300 is the limit threshold. " +
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This also depends on the data types, right?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You are right.

Type limit threshold
string 370
int 210
long 285
double 270
float 220
decimal Will not provide better performance

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

An interesting finding. Thanks for the update. Maybe you do not need to mention this limit threshold in the doc?

How about post your finding in the PR description?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See the comment #21603 (comment)

@SparkQA
Copy link

SparkQA commented Jun 25, 2018

Test build #92280 has finished for PR 21603 at commit 8218596.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

@wangyum
Copy link
Member Author

wangyum commented Jun 26, 2018

Jenkins, retest this please.

@SparkQA
Copy link

SparkQA commented Jun 26, 2018

Test build #92340 has finished for PR 21603 at commit 8218596.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@wangyum
Copy link
Member Author

wangyum commented Jun 30, 2018

Benchmark result:

##########################[ Pushdown benchmark for InSet -> InFilters ]##########################
Java HotSpot(TM) 64-Bit Server VM 1.8.0_151-b12 on Mac OS X 10.12.6
Intel(R) Core(TM) i7-7820HQ CPU @ 2.90GHz

InSet -> InFilters (threshold: 10, values count: 5, distribution: 10): Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------
Parquet Vectorized                            7649 / 7678          2.1         486.3       1.0X
Parquet Vectorized (Pushdown)                  316 /  325         49.8          20.1      24.2X
Native ORC Vectorized                         6787 / 7353          2.3         431.5       1.1X
Native ORC Vectorized (Pushdown)              1010 / 1020         15.6          64.2       7.6X

InSet -> InFilters (threshold: 10, values count: 5, distribution: 50): Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------
Parquet Vectorized                            7537 / 7944          2.1         479.2       1.0X
Parquet Vectorized (Pushdown)                  297 /  306         52.9          18.9      25.3X
Native ORC Vectorized                         6768 / 6779          2.3         430.3       1.1X
Native ORC Vectorized (Pushdown)               998 / 1017         15.8          63.4       7.6X

InSet -> InFilters (threshold: 10, values count: 5, distribution: 90): Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------
Parquet Vectorized                            7500 / 7592          2.1         476.8       1.0X
Parquet Vectorized (Pushdown)                  299 /  306         52.5          19.0      25.1X
Native ORC Vectorized                         6758 / 6797          2.3         429.7       1.1X
Native ORC Vectorized (Pushdown)               982 /  993         16.0          62.4       7.6X

InSet -> InFilters (threshold: 10, values count: 10, distribution: 10): Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------
Parquet Vectorized                            7566 / 8153          2.1         481.1       1.0X
Parquet Vectorized (Pushdown)                  319 /  328         49.3          20.3      23.7X
Native ORC Vectorized                         6761 / 6812          2.3         429.8       1.1X
Native ORC Vectorized (Pushdown)               995 / 1013         15.8          63.3       7.6X

InSet -> InFilters (threshold: 10, values count: 10, distribution: 50): Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------
Parquet Vectorized                            7512 / 7581          2.1         477.6       1.0X
Parquet Vectorized (Pushdown)                  315 /  322         50.0          20.0      23.9X
Native ORC Vectorized                         6712 / 6774          2.3         426.8       1.1X
Native ORC Vectorized (Pushdown)              1001 / 1032         15.7          63.6       7.5X

InSet -> InFilters (threshold: 10, values count: 10, distribution: 90): Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------
Parquet Vectorized                            7603 / 7689          2.1         483.4       1.0X
Parquet Vectorized (Pushdown)                  308 /  317         51.0          19.6      24.7X
Native ORC Vectorized                         7011 / 7605          2.2         445.7       1.1X
Native ORC Vectorized (Pushdown)              1038 / 1067         15.2          66.0       7.3X

InSet -> InFilters (threshold: 10, values count: 50, distribution: 10): Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------
Parquet Vectorized                            7750 / 7796          2.0         492.7       1.0X
Parquet Vectorized (Pushdown)                 7855 / 7961          2.0         499.4       1.0X
Native ORC Vectorized                         7120 / 7820          2.2         452.7       1.1X
Native ORC Vectorized (Pushdown)              1085 / 1122         14.5          69.0       7.1X

InSet -> InFilters (threshold: 10, values count: 50, distribution: 50): Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------
Parquet Vectorized                            7920 / 8012          2.0         503.5       1.0X
Parquet Vectorized (Pushdown)                 7855 / 8159          2.0         499.4       1.0X
Native ORC Vectorized                         7087 / 7105          2.2         450.6       1.1X
Native ORC Vectorized (Pushdown)              1098 / 1118         14.3          69.8       7.2X

InSet -> InFilters (threshold: 10, values count: 50, distribution: 90): Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------
Parquet Vectorized                            7809 / 7918          2.0         496.5       1.0X
Parquet Vectorized (Pushdown)                 7800 / 7857          2.0         495.9       1.0X
Native ORC Vectorized                         7089 / 7145          2.2         450.7       1.1X
Native ORC Vectorized (Pushdown)              1102 / 1123         14.3          70.1       7.1X

InSet -> InFilters (threshold: 10, values count: 100, distribution: 10): Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------
Parquet Vectorized                            7793 / 7823          2.0         495.5       1.0X
Parquet Vectorized (Pushdown)                 7765 / 7863          2.0         493.7       1.0X
Native ORC Vectorized                         7066 / 7175          2.2         449.2       1.1X
Native ORC Vectorized (Pushdown)              1194 / 1210         13.2          75.9       6.5X

InSet -> InFilters (threshold: 10, values count: 100, distribution: 50): Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------
Parquet Vectorized                            7782 / 7816          2.0         494.8       1.0X
Parquet Vectorized (Pushdown)                 7737 / 7782          2.0         491.9       1.0X
Native ORC Vectorized                         7056 / 7100          2.2         448.6       1.1X
Native ORC Vectorized (Pushdown)              1193 / 1264         13.2          75.9       6.5X

InSet -> InFilters (threshold: 10, values count: 100, distribution: 90): Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------
Parquet Vectorized                            7726 / 8463          2.0         491.2       1.0X
Parquet Vectorized (Pushdown)                 8759 / 9317          1.8         556.9       0.9X
Native ORC Vectorized                         7067 / 7379          2.2         449.3       1.1X
Native ORC Vectorized (Pushdown)              1352 / 1520         11.6          86.0       5.7X

InSet -> InFilters (threshold: 100, values count: 5, distribution: 10): Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------
Parquet Vectorized                           8694 / 10591          1.8         552.7       1.0X
Parquet Vectorized (Pushdown)                  288 /  313         54.5          18.3      30.1X
Native ORC Vectorized                         6898 / 7754          2.3         438.6       1.3X
Native ORC Vectorized (Pushdown)              1037 / 1279         15.2          65.9       8.4X

InSet -> InFilters (threshold: 100, values count: 5, distribution: 50): Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------
Parquet Vectorized                            7584 / 8641          2.1         482.2       1.0X
Parquet Vectorized (Pushdown)                  293 /  299         53.7          18.6      25.9X
Native ORC Vectorized                         6849 / 6918          2.3         435.5       1.1X
Native ORC Vectorized (Pushdown)               996 / 1020         15.8          63.3       7.6X

InSet -> InFilters (threshold: 100, values count: 5, distribution: 90): Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------
Parquet Vectorized                            7617 / 7947          2.1         484.3       1.0X
Parquet Vectorized (Pushdown)                  311 /  341         50.5          19.8      24.5X
Native ORC Vectorized                         7468 / 8006          2.1         474.8       1.0X
Native ORC Vectorized (Pushdown)              1095 / 1173         14.4          69.6       7.0X

InSet -> InFilters (threshold: 100, values count: 10, distribution: 10): Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------
Parquet Vectorized                            8364 / 9682          1.9         531.8       1.0X
Parquet Vectorized (Pushdown)                  325 /  498         48.4          20.7      25.7X
Native ORC Vectorized                         6931 / 7797          2.3         440.7       1.2X
Native ORC Vectorized (Pushdown)              1010 / 1032         15.6          64.2       8.3X

InSet -> InFilters (threshold: 100, values count: 10, distribution: 50): Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------
Parquet Vectorized                            7647 / 8096          2.1         486.2       1.0X
Parquet Vectorized (Pushdown)                  315 /  409         49.9          20.1      24.2X
Native ORC Vectorized                         6839 / 7307          2.3         434.8       1.1X
Native ORC Vectorized (Pushdown)              1033 / 1077         15.2          65.7       7.4X

InSet -> InFilters (threshold: 100, values count: 10, distribution: 90): Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------
Parquet Vectorized                            7653 / 8725          2.1         486.6       1.0X
Parquet Vectorized (Pushdown)                  319 /  367         49.3          20.3      24.0X
Native ORC Vectorized                         7121 / 8047          2.2         452.7       1.1X
Native ORC Vectorized (Pushdown)              1066 / 1133         14.8          67.8       7.2X

InSet -> InFilters (threshold: 100, values count: 50, distribution: 10): Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------
Parquet Vectorized                            7804 / 8926          2.0         496.2       1.0X
Parquet Vectorized (Pushdown)                  476 /  568         33.0          30.3      16.4X
Native ORC Vectorized                         7891 / 8248          2.0         501.7       1.0X
Native ORC Vectorized (Pushdown)              1158 / 1195         13.6          73.6       6.7X

InSet -> InFilters (threshold: 100, values count: 50, distribution: 50): Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------
Parquet Vectorized                            8576 / 9488          1.8         545.2       1.0X
Parquet Vectorized (Pushdown)                  522 /  593         30.1          33.2      16.4X
Native ORC Vectorized                         7199 / 7692          2.2         457.7       1.2X
Native ORC Vectorized (Pushdown)              1180 / 1280         13.3          75.0       7.3X

InSet -> InFilters (threshold: 100, values count: 50, distribution: 90): Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------
Parquet Vectorized                           9142 / 10012          1.7         581.2       1.0X
Parquet Vectorized (Pushdown)                  536 /  620         29.3          34.1      17.0X
Native ORC Vectorized                         7720 / 9655          2.0         490.9       1.2X
Native ORC Vectorized (Pushdown)              1110 / 1212         14.2          70.6       8.2X

InSet -> InFilters (threshold: 100, values count: 100, distribution: 10): Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------
Parquet Vectorized                            8478 / 9150          1.9         539.0       1.0X
Parquet Vectorized (Pushdown)                  700 /  900         22.5          44.5      12.1X
Native ORC Vectorized                         7427 / 8069          2.1         472.2       1.1X
Native ORC Vectorized (Pushdown)              1185 / 1633         13.3          75.3       7.2X

InSet -> InFilters (threshold: 100, values count: 100, distribution: 50): Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------
Parquet Vectorized                            7919 / 9670          2.0         503.5       1.0X
Parquet Vectorized (Pushdown)                  731 /  750         21.5          46.5      10.8X
Native ORC Vectorized                         7205 / 7306          2.2         458.1       1.1X
Native ORC Vectorized (Pushdown)              1191 / 1224         13.2          75.7       6.6X

InSet -> InFilters (threshold: 100, values count: 100, distribution: 90): Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------
Parquet Vectorized                            7845 / 8146          2.0         498.8       1.0X
Parquet Vectorized (Pushdown)                  761 /  838         20.7          48.4      10.3X
Native ORC Vectorized                         7081 / 7741          2.2         450.2       1.1X
Native ORC Vectorized (Pushdown)              1289 / 1459         12.2          82.0       6.1X

# Conflicts:
#	sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
#	sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
#	sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
#	sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
@SparkQA
Copy link

SparkQA commented Jul 4, 2018

Test build #92605 has finished for PR 21603 at commit fdb79b3.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@HyukjinKwon
Copy link
Member

@wangyum, mind resolving the conflicts please? I think it's quite close.

wangyum added 2 commits July 6, 2018 13:11
# Conflicts:
#	sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
Native ORC Vectorized 6840 / 8073 2.3 434.9 1.1X
Native ORC Vectorized (Pushdown) 1041 / 1075 15.1 66.2 7.4X
Parquet Vectorized 7967 / 8175 2.0 506.5 1.0X
Parquet Vectorized (Pushdown) 8155 / 8434 1.9 518.5 1.0X
Copy link
Member

@viirya viirya Jul 6, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this benchmark numbers are more meaningful if we show IN predicate is pushdown or not (or threshold).

@SparkQA
Copy link

SparkQA commented Jul 6, 2018

Test build #92666 has finished for PR 21603 at commit 5e748f9.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

@wangyum
Copy link
Member Author

wangyum commented Jul 6, 2018

retest this please.

@SparkQA
Copy link

SparkQA commented Jul 6, 2018

Test build #92676 has finished for PR 21603 at commit 5e748f9.

  • This patch fails SparkR unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@wangyum
Copy link
Member Author

wangyum commented Jul 6, 2018

retest this please.

@SparkQA
Copy link

SparkQA commented Jul 6, 2018

Test build #92684 has finished for PR 21603 at commit 5e748f9.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@wangyum
Copy link
Member Author

wangyum commented Jul 7, 2018

retest this please.

@SparkQA
Copy link

SparkQA commented Jul 7, 2018

Test build #92702 has finished for PR 21603 at commit 5e748f9.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

"This configuration only has an effect when 'spark.sql.parquet.filterPushdown' is enabled.")
.internal()
.intConf
.checkValue(threshold => threshold > 0, "The threshold must be greater than 0.")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shell we allow, for example, -1 here to disable this?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

case sources.In(name, values) if canMakeFilterOn(name) && shouldConvertInPredicate(name) 
  && values.distinct.length <= pushDownInFilterThreshold =>

How about 0. values.distinct.length will not be less than 0.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yup.

parquetFilters.createFilter(parquetSchema, sources.In("a", Array(10, 10)))
}

assertResult(Some(or(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you can remove this test because it looks basically a duplicate of the below.

.coalesce(1).write.option("parquet.block.size", 512)
.parquet(path.getAbsolutePath)
val df = spark.read.parquet(path.getAbsolutePath)
Seq(true, false).foreach { pushEnabled =>
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@wangyum, does this really test the Parquet itself push down fine? I think you should stripSparkFilter and check if they are actually filtered out when spark.sql.parquet.filterPushdown is enabled.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated to:

val actual = stripSparkFilter(df.where(filter)).collect().length
if (pushEnabled && count <= conf.parquetFilterPushDownInFilterThreshold) {
  assert(actual > 1 && actual < data.length)
} else {
  assert(actual === data.length)
}

@HyukjinKwon
Copy link
Member

LGTM otherwise.

def canMakeFilterOn(name: String): Boolean = nameToType.contains(name) && !name.contains(".")

// All DataTypes that support `makeEq` can provide better performance.
def shouldConvertInPredicate(name: String): Boolean = nameToType(name) match {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@HyukjinKwon How about remove this?
Timestamp type and Decimal type will be support soon.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let us keep it.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It depends on which PR will be merged first. The corresponding PRs should update this.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also need to update the benchmark suite.

// converted (`ParquetFilters.createFilter` returns an `Option`). That's why a `flatMap`
// is used here.
.flatMap(new ParquetFilters(pushDownDate, pushDownStringStartWith)
.flatMap(new ParquetFilters(pushDownDate, pushDownStringStartWith,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

let us create val parquetFilters = new ParquetFilters(pushDownDate, pushDownStringStartWith, pushDownInFilterThreshold )

@SparkQA
Copy link

SparkQA commented Jul 14, 2018

Test build #92995 has finished for PR 21603 at commit c386e02.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

assert(df.where(filter).count() === count)
val actual = stripSparkFilter(df.where(filter)).collect().length
if (pushEnabled && count <= conf.parquetFilterPushDownInFilterThreshold) {
assert(actual > 1 && actual < data.length)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ah okay this tests block level filtering. lgtm

Copy link
Member

@HyukjinKwon HyukjinKwon left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

@HyukjinKwon
Copy link
Member

Merged to master

@asfgit asfgit closed this in e1de341 Jul 14, 2018
@tanishgupta1
Copy link

I see this benchmarking is done for parquet filters. Did we have similar kind of benchmarking for ORC format too ?
Also why there is no push down threshold flag on ORC filters ?

@HyukjinKwon
Copy link
Member

HyukjinKwon commented May 29, 2019

Ask questions to mailing list. ORC is completely orthogonal with this Parquet PR.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

8 participants