From 94ba4c377b110fe3f3955cbb8654270d4c2b842b Mon Sep 17 00:00:00 2001 From: Max Gekk Date: Tue, 16 Jun 2020 15:53:18 +0300 Subject: [PATCH 1/6] Fix ExtractBenchmark --- .../spark/sql/execution/benchmark/ExtractBenchmark.scala | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/ExtractBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/ExtractBenchmark.scala index 287854dc3646c..8372698fb47ba 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/ExtractBenchmark.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/ExtractBenchmark.scala @@ -59,10 +59,10 @@ object ExtractBenchmark extends SqlBasedBenchmark { } private def castExpr(from: String): String = from match { - case "timestamp" => "cast(id as timestamp)" - case "date" => "cast(cast(id as timestamp) as date)" - case "interval" => "(cast(cast(id as timestamp) as date) - date'0001-01-01') + " + - "(cast(id as timestamp) - timestamp'1000-01-01 01:02:03.123456')" + case "timestamp" => "timestamp_seconds(id)" + case "date" => "cast(timestamp_seconds(id) as date)" + case "interval" => "(cast(timestamp_seconds(id) as date) - date'0001-01-01') + " + + "(timestamp_seconds(id) - timestamp'1000-01-01 01:02:03.123456')" case other => throw new IllegalArgumentException( s"Unsupported column type $other. Valid column types are 'timestamp' and 'date'") } From d183856363a0ea9fec0a8396cc1fbf3094a7feeb Mon Sep 17 00:00:00 2001 From: Max Gekk Date: Tue, 16 Jun 2020 15:55:28 +0300 Subject: [PATCH 2/6] Fix DateTimeBenchmark --- .../benchmark/DateTimeBenchmark.scala | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/DateTimeBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/DateTimeBenchmark.scala index c7b8737b7a753..b06ca71b04ecc 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/DateTimeBenchmark.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/DateTimeBenchmark.scala @@ -54,7 +54,7 @@ object DateTimeBenchmark extends SqlBasedBenchmark { private def run(cardinality: Int, func: String): Unit = { codegenBenchmark(s"$func of timestamp", cardinality) { - doBenchmark(cardinality, s"$func(cast(id as timestamp))") + doBenchmark(cardinality, s"$func(timestamp_seconds(id))") } } @@ -64,7 +64,7 @@ object DateTimeBenchmark extends SqlBasedBenchmark { val N = 10000000 runBenchmark("datetime +/- interval") { val benchmark = new Benchmark("datetime +/- interval", N, output = output) - val ts = "cast(id as timestamp)" + val ts = "timestamp_seconds(id)" val dt = s"cast($ts as date)" benchmark.addCase("date + interval(m)") { _ => doBenchmark(N, s"$dt + interval 1 month") @@ -105,7 +105,7 @@ object DateTimeBenchmark extends SqlBasedBenchmark { benchmark.run() } runBenchmark("Extract components") { - run(N, "cast to timestamp", "cast(id as timestamp)") + run(N, "cast to timestamp", "timestamp_seconds(id)") run(N, "year") run(N, "quarter") run(N, "month") @@ -124,7 +124,7 @@ object DateTimeBenchmark extends SqlBasedBenchmark { run(N, "current_timestamp", "current_timestamp") } runBenchmark("Date arithmetic") { - val dateExpr = "cast(cast(id as timestamp) as date)" + val dateExpr = "cast(timestamp_seconds(id) as date)" run(N, "cast to date", dateExpr) run(N, "last_day", s"last_day($dateExpr)") run(N, "next_day", s"next_day($dateExpr, 'TU')") @@ -133,31 +133,31 @@ object DateTimeBenchmark extends SqlBasedBenchmark { run(N, "add_months", s"add_months($dateExpr, 10)") } runBenchmark("Formatting dates") { - val dateExpr = "cast(cast(id as timestamp) as date)" + val dateExpr = "cast(timestamp_seconds(id) as date)" run(N, "format date", s"date_format($dateExpr, 'MMM yyyy')") } runBenchmark("Formatting timestamps") { run(N, "from_unixtime", "from_unixtime(id, 'yyyy-MM-dd HH:mm:ss.SSSSSS')") } runBenchmark("Convert timestamps") { - val timestampExpr = "cast(id as timestamp)" + val timestampExpr = "timestamp_seconds(id)" run(N, "from_utc_timestamp", s"from_utc_timestamp($timestampExpr, 'CET')") run(N, "to_utc_timestamp", s"to_utc_timestamp($timestampExpr, 'CET')") } runBenchmark("Intervals") { - val (start, end) = ("cast(id as timestamp)", "cast((id+8640000) as timestamp)") + val (start, end) = ("timestamp_seconds(id)", "timestamp_seconds(id+8640000)") run(N, "cast interval", start, end) run(N, "datediff", s"datediff($start, $end)") run(N, "months_between", s"months_between($start, $end)") run(1000000, "window", s"window($start, 100, 10, 1)") } runBenchmark("Truncation") { - val timestampExpr = "cast(id as timestamp)" + val timestampExpr = "timestamp_seconds(id)" Seq("YEAR", "YYYY", "YY", "MON", "MONTH", "MM", "DAY", "DD", "HOUR", "MINUTE", "SECOND", "WEEK", "QUARTER").foreach { level => run(N, s"date_trunc $level", s"date_trunc('$level', $timestampExpr)") } - val dateExpr = "cast(cast(id as timestamp) as date)" + val dateExpr = "cast(timestamp_seconds(id) as date)" Seq("year", "yyyy", "yy", "mon", "month", "mm").foreach { level => run(N, s"trunc $level", s"trunc('$level', $dateExpr)") } From ce0465fbb3d2f9ba2bf7133d430db0b3a6b46ffc Mon Sep 17 00:00:00 2001 From: Max Gekk Date: Tue, 16 Jun 2020 15:57:05 +0300 Subject: [PATCH 3/6] Fix FilterPushdownBenchmark --- .../sql/execution/benchmark/FilterPushdownBenchmark.scala | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/FilterPushdownBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/FilterPushdownBenchmark.scala index b3f65d40ad95b..ff451a357cbc9 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/FilterPushdownBenchmark.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/FilterPushdownBenchmark.scala @@ -336,7 +336,7 @@ object FilterPushdownBenchmark extends SqlBasedBenchmark { withTempTable("orcTable", "parquetTable") { saveAsTable(df, dir) - Seq(s"value = CAST($mid AS timestamp)").foreach { whereExpr => + Seq(s"value = timestamp_seconds($mid)").foreach { whereExpr => val title = s"Select 1 timestamp stored as $fileType row ($whereExpr)" .replace("value AND value", "value") filterPushDownBenchmark(numRows, title, whereExpr) @@ -348,8 +348,8 @@ object FilterPushdownBenchmark extends SqlBasedBenchmark { filterPushDownBenchmark( numRows, s"Select $percent% timestamp stored as $fileType rows " + - s"(value < CAST(${numRows * percent / 100} AS timestamp))", - s"value < CAST(${numRows * percent / 100} as timestamp)", + s"(value < timestamp_seconds(${numRows * percent / 100}))", + s"value < timestamp_seconds(${numRows * percent / 100})", selectExpr ) } From 4a97fb29ffa5015fba8efb82f87960c14dec8c8e Mon Sep 17 00:00:00 2001 From: Max Gekk Date: Tue, 16 Jun 2020 15:58:43 +0300 Subject: [PATCH 4/6] Fix InExpressionBenchmark --- .../spark/sql/execution/benchmark/InExpressionBenchmark.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/InExpressionBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/InExpressionBenchmark.scala index caf3387875813..d35b9443f49e0 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/InExpressionBenchmark.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/InExpressionBenchmark.scala @@ -128,7 +128,7 @@ object InExpressionBenchmark extends SqlBasedBenchmark { private def runTimestampBenchmark(numItems: Int, numRows: Long, minNumIters: Int): Unit = { val name = s"$numItems timestamps" - val values = (1 to numItems).map(m => s"CAST('1970-01-01 01:00:00.$m' AS timestamp)") + val values = (1 to numItems).map(m => s"timestamp_seconds('1970-01-01 01:00:00.$m')") val df = spark.range(0, numRows).select($"id".cast(TimestampType)) runBenchmark(name, df, values, numRows, minNumIters) } From b3f3c39c5403218c1f7b02469b3b5ce400404a3a Mon Sep 17 00:00:00 2001 From: Max Gekk Date: Tue, 16 Jun 2020 16:15:55 +0300 Subject: [PATCH 5/6] Fix InExpressionBenchmark --- .../execution/benchmark/InExpressionBenchmark.scala | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/InExpressionBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/InExpressionBenchmark.scala index d35b9443f49e0..704227e4b4db4 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/InExpressionBenchmark.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/InExpressionBenchmark.scala @@ -19,7 +19,7 @@ package org.apache.spark.sql.execution.benchmark import org.apache.spark.benchmark.Benchmark import org.apache.spark.sql.DataFrame -import org.apache.spark.sql.functions.{array, struct} +import org.apache.spark.sql.functions.{array, struct, timestamp_seconds} import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types._ @@ -128,15 +128,15 @@ object InExpressionBenchmark extends SqlBasedBenchmark { private def runTimestampBenchmark(numItems: Int, numRows: Long, minNumIters: Int): Unit = { val name = s"$numItems timestamps" - val values = (1 to numItems).map(m => s"timestamp_seconds('1970-01-01 01:00:00.$m')") - val df = spark.range(0, numRows).select($"id".cast(TimestampType)) + val values = (1 to numItems).map(m => s"timestamp'1970-01-01 01:00:00.$m'") + val df = spark.range(0, numRows).select(timestamp_seconds($"id").as("id")) runBenchmark(name, df, values, numRows, minNumIters) } private def runDateBenchmark(numItems: Int, numRows: Long, minNumIters: Int): Unit = { val name = s"$numItems dates" - val values = (1 to numItems).map(n => 1970 + n).map(y => s"CAST('$y-01-01' AS date)") - val df = spark.range(0, numRows).select($"id".cast(TimestampType).cast(DateType)) + val values = (1 to numItems).map(n => 1970 + n).map(y => s"date'$y-01-01'") + val df = spark.range(0, numRows).select(timestamp_seconds($"id").cast(DateType).as("id")) runBenchmark(name, df, values, numRows, minNumIters) } From 97b5f97e8ad10e07781f2797ea2f581129a93cf0 Mon Sep 17 00:00:00 2001 From: Max Gekk Date: Tue, 16 Jun 2020 16:42:09 +0300 Subject: [PATCH 6/6] Fix FilterPushdownBenchmark --- .../sql/execution/benchmark/FilterPushdownBenchmark.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/FilterPushdownBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/FilterPushdownBenchmark.scala index ff451a357cbc9..9ade8b14f59b0 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/FilterPushdownBenchmark.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/FilterPushdownBenchmark.scala @@ -24,7 +24,7 @@ import scala.util.Random import org.apache.spark.SparkConf import org.apache.spark.benchmark.Benchmark import org.apache.spark.sql.{DataFrame, SparkSession} -import org.apache.spark.sql.functions.monotonically_increasing_id +import org.apache.spark.sql.functions.{monotonically_increasing_id, timestamp_seconds} import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.internal.SQLConf.ParquetOutputTimestampType import org.apache.spark.sql.types.{ByteType, Decimal, DecimalType, TimestampType} @@ -332,7 +332,7 @@ object FilterPushdownBenchmark extends SqlBasedBenchmark { withSQLConf(SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key -> fileType) { val columns = (1 to width).map(i => s"CAST(id AS string) c$i") val df = spark.range(numRows).selectExpr(columns: _*) - .withColumn("value", monotonically_increasing_id().cast(TimestampType)) + .withColumn("value", timestamp_seconds(monotonically_increasing_id())) withTempTable("orcTable", "parquetTable") { saveAsTable(df, dir)