@@ -0,0 +1,45 @@
================================================================================================
WITHOUT SPILL
================================================================================================

OpenJDK 64-Bit Server VM 1.8.0_191-b12 on Linux 3.10.0-862.3.2.el7.x86_64
Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz
Array with 100000 rows:                  Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------
ArrayBuffer                                   6378 / 6550         16.1          62.3       1.0X
ExternalAppendOnlyUnsafeRowArray              6196 / 6242         16.5          60.5       1.0X

OpenJDK 64-Bit Server VM 1.8.0_191-b12 on Linux 3.10.0-862.3.2.el7.x86_64
Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz
Array with 1000 rows:                    Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------
ArrayBuffer                                 11988 / 12027         21.9          45.7       1.0X
ExternalAppendOnlyUnsafeRowArray            37480 / 37574          7.0         143.0       0.3X

OpenJDK 64-Bit Server VM 1.8.0_191-b12 on Linux 3.10.0-862.3.2.el7.x86_64
Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz
Array with 30000 rows:                   Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------
ArrayBuffer                                 23536 / 23538         20.9          47.9       1.0X
ExternalAppendOnlyUnsafeRowArray            31275 / 31277         15.7          63.6       0.8X


================================================================================================
WITH SPILL
================================================================================================

OpenJDK 64-Bit Server VM 1.8.0_191-b12 on Linux 3.10.0-862.3.2.el7.x86_64
Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz
Spilling with 1000 rows:                 Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------
UnsafeExternalSorter                        29241 / 29279          9.0         111.5       1.0X
ExternalAppendOnlyUnsafeRowArray            14309 / 14313         18.3          54.6       2.0X

OpenJDK 64-Bit Server VM 1.8.0_191-b12 on Linux 3.10.0-862.3.2.el7.x86_64
Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz
Spilling with 10000 rows:                Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------
UnsafeExternalSorter                            11 /   11         14.8          67.4       1.0X
ExternalAppendOnlyUnsafeRowArray                 9 /    9         17.6          56.8       1.2X

Member

I ran the original master branch and got the following. Since the trend is the same, this refactoring PR looks safe.

$ bin/spark-submit --class org.apache.spark.sql.execution.ExternalAppendOnlyUnsafeRowArrayBenchmark --jars core/target/scala-2.12/spark-core_2.12-3.0.0-SNAPSHOT-tests.jar sql/core/target/scala-2.12/spark-sql_2.12-3.0.0-SNAPSHOT-tests.jar
...
Array with 1000 rows:                    Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------
ArrayBuffer                                   9556 / 9633         27.4          36.5       1.0X
ExternalAppendOnlyUnsafeRowArray            18514 / 18700         14.2          70.6       0.5X

Array with 30000 rows:                   Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------
ArrayBuffer                                 22180 / 22195         22.2          45.1       1.0X
ExternalAppendOnlyUnsafeRowArray            24254 / 24331         20.3          49.3       0.9X

Array with 100000 rows:                  Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------
ArrayBuffer                                   4998 / 5052         20.5          48.8       1.0X
ExternalAppendOnlyUnsafeRowArray              4778 / 4821         21.4          46.7       1.0X

Spilling with 1000 rows:                 Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------
UnsafeExternalSorter                        17536 / 17596         14.9          66.9       1.0X
ExternalAppendOnlyUnsafeRowArray            10380 / 10451         25.3          39.6       1.7X

Spilling with 10000 rows:                Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------
UnsafeExternalSorter                             6 /    7         25.3          39.5       1.0X
ExternalAppendOnlyUnsafeRowArray                 6 /    7         26.3          38.0       1.0X
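
For reading these tables: the Rate and Per Row columns appear to follow from the best time and the total number of rows processed (numRows * iterations, using the values passed to the benchmark calls in this diff). A minimal Python sketch under that assumption; `derived_columns` is an illustrative name, not part of Spark:

```python
def derived_columns(best_ms, num_rows, iterations):
    """Recompute Rate(M/s) and Per Row(ns) from the best time in milliseconds."""
    total_rows = num_rows * iterations
    # Rows per second, expressed in millions.
    rate_m_per_s = total_rows / (best_ms / 1000.0) / 1e6
    # Nanoseconds spent per row.
    per_row_ns = best_ms * 1e6 / total_rows
    return round(rate_m_per_s, 1), round(per_row_ns, 1)


# "Array with 1000 rows" on master: 1000 rows, 1 << 18 iterations.
print(derived_columns(9556, 1000, 1 << 18))   # ArrayBuffer row
print(derived_columns(18514, 1000, 1 << 18))  # ExternalAppendOnlyUnsafeRowArray row
```

Under the same assumption, the Relative column looks like the ratio of the first row's best time to the current row's best time (9556 / 18514 is roughly 0.5).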


@@ -20,24 +20,57 @@ package org.apache.spark.sql.execution
import scala.collection.mutable.ArrayBuffer

import org.apache.spark.{SparkConf, SparkContext, SparkEnv, TaskContext}
import org.apache.spark.benchmark.Benchmark
import org.apache.spark.benchmark.{Benchmark, BenchmarkBase}
import org.apache.spark.internal.config
import org.apache.spark.memory.MemoryTestingUtils
import org.apache.spark.sql.catalyst.expressions.UnsafeRow
import org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter

object ExternalAppendOnlyUnsafeRowArrayBenchmark {
/**
* Benchmark ExternalAppendOnlyUnsafeRowArray.
* To run this benchmark:
* {{{
* 1. without sbt:
* bin/spark-submit --class <this class> --jars <spark core test jar> <spark sql test jar>
* 2. build/sbt ";project sql;set javaOptions
* in Test += \"-Dspark.memory.debugFill=false\";test:runMain <this class>"
* 3. generate result: SPARK_GENERATE_BENCHMARK_FILES=1 build/sbt ";project sql;set javaOptions
* in Test += \"-Dspark.memory.debugFill=false\";test:runMain <this class>"
Member

Got it. I was confused with -=.

Contributor Author

Sorry, I did it in a somewhat confusing way. It is now updated to += ...=false in a new commit.

Member

Also, in the PR description, runMain is repeated: test:runMain test:runMain

Contributor Author

Fixed.

* Results will be written to
* "benchmarks/ExternalAppendOnlyUnsafeRowArrayBenchmark-results.txt".
* }}}
*/
object ExternalAppendOnlyUnsafeRowArrayBenchmark extends BenchmarkBase {

def testAgainstRawArrayBuffer(numSpillThreshold: Int, numRows: Int, iterations: Int): Unit = {
private val conf = new SparkConf(false)
// Make the Java serializer write a reset instruction (TC_RESET) after each object to test
// for a bug we had with bytes written past the last object in a batch (SPARK-2792)
.set("spark.serializer.objectStreamReset", "1")
.set("spark.serializer", "org.apache.spark.serializer.JavaSerializer")

private def withFakeTaskContext(f: => Unit): Unit = {
val sc = new SparkContext("local", "test", conf)
val taskContext = MemoryTestingUtils.fakeTaskContext(SparkEnv.get)
TaskContext.setTaskContext(taskContext)
f
sc.stop()
}

private def testRows(numRows: Int): Seq[UnsafeRow] = {
val random = new java.util.Random()
val rows = (1 to numRows).map(_ => {
(1 to numRows).map(_ => {
val row = new UnsafeRow(1)
row.pointTo(new Array[Byte](64), 16)
row.setLong(0, random.nextLong())
row
})
}

val benchmark = new Benchmark(s"Array with $numRows rows", iterations * numRows)
def testAgainstRawArrayBuffer(numSpillThreshold: Int, numRows: Int, iterations: Int): Unit = {
val rows = testRows(numRows)

val benchmark = new Benchmark(s"Array with $numRows rows", iterations * numRows,
output = output)

// Internally, `ExternalAppendOnlyUnsafeRowArray` will create an
// in-memory buffer of size `numSpillThreshold`. This will mimic that
@@ -82,33 +115,19 @@ object ExternalAppendOnlyUnsafeRowArrayBenchmark {
}
}

val conf = new SparkConf(false)
// Make the Java serializer write a reset instruction (TC_RESET) after each object to test
// for a bug we had with bytes written past the last object in a batch (SPARK-2792)
conf.set("spark.serializer.objectStreamReset", "1")
conf.set("spark.serializer", "org.apache.spark.serializer.JavaSerializer")

val sc = new SparkContext("local", "test", conf)
val taskContext = MemoryTestingUtils.fakeTaskContext(SparkEnv.get)
TaskContext.setTaskContext(taskContext)
benchmark.run()
sc.stop()
withFakeTaskContext {
benchmark.run()
}
}

def testAgainstRawUnsafeExternalSorter(
numSpillThreshold: Int,
numRows: Int,
iterations: Int): Unit = {
val rows = testRows(numRows)

val random = new java.util.Random()
val rows = (1 to numRows).map(_ => {
val row = new UnsafeRow(1)
row.pointTo(new Array[Byte](64), 16)
row.setLong(0, random.nextLong())
row
})

val benchmark = new Benchmark(s"Spilling with $numRows rows", iterations * numRows)
val benchmark = new Benchmark(s"Spilling with $numRows rows", iterations * numRows,
output = output)

benchmark.addCase("UnsafeExternalSorter") { _: Int =>
var sum = 0L
@@ -158,80 +177,23 @@ object ExternalAppendOnlyUnsafeRowArrayBenchmark {
}
}

val conf = new SparkConf(false)
// Make the Java serializer write a reset instruction (TC_RESET) after each object to test
// for a bug we had with bytes written past the last object in a batch (SPARK-2792)
conf.set("spark.serializer.objectStreamReset", "1")
conf.set("spark.serializer", "org.apache.spark.serializer.JavaSerializer")

val sc = new SparkContext("local", "test", conf)
val taskContext = MemoryTestingUtils.fakeTaskContext(SparkEnv.get)
TaskContext.setTaskContext(taskContext)
benchmark.run()
sc.stop()
withFakeTaskContext {
benchmark.run()
}
}

def main(args: Array[String]): Unit = {

// ========================================================================================= //
// WITHOUT SPILL
// ========================================================================================= //

val spillThreshold = 100 * 1000

/*
Intel(R) Core(TM) i7-6920HQ CPU @ 2.90GHz

Array with 1000 rows:                    Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------
ArrayBuffer                                   7821 / 7941         33.5          29.8       1.0X
ExternalAppendOnlyUnsafeRowArray              8798 / 8819         29.8          33.6       0.9X
*/
testAgainstRawArrayBuffer(spillThreshold, 1000, 1 << 18)

/*
Intel(R) Core(TM) i7-6920HQ CPU @ 2.90GHz

Array with 30000 rows:                   Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------
ArrayBuffer                                 19200 / 19206         25.6          39.1       1.0X
ExternalAppendOnlyUnsafeRowArray            19558 / 19562         25.1          39.8       1.0X
*/
testAgainstRawArrayBuffer(spillThreshold, 30 * 1000, 1 << 14)

/*
Intel(R) Core(TM) i7-6920HQ CPU @ 2.90GHz

Array with 100000 rows:                  Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------
ArrayBuffer                                   5949 / 6028         17.2          58.1       1.0X
ExternalAppendOnlyUnsafeRowArray              6078 / 6138         16.8          59.4       1.0X
*/
testAgainstRawArrayBuffer(spillThreshold, 100 * 1000, 1 << 10)

// ========================================================================================= //
// WITH SPILL
// ========================================================================================= //

/*
Intel(R) Core(TM) i7-6920HQ CPU @ 2.90GHz

Spilling with 1000 rows:                 Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------
UnsafeExternalSorter                          9239 / 9470         28.4          35.2       1.0X
ExternalAppendOnlyUnsafeRowArray              8857 / 8909         29.6          33.8       1.0X
*/
testAgainstRawUnsafeExternalSorter(100 * 1000, 1000, 1 << 18)

/*
Intel(R) Core(TM) i7-6920HQ CPU @ 2.90GHz

Spilling with 10000 rows:                Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------
UnsafeExternalSorter                             4 /    5         39.3          25.5       1.0X
ExternalAppendOnlyUnsafeRowArray                 5 /    6         29.8          33.5       0.8X
*/
testAgainstRawUnsafeExternalSorter(
config.SHUFFLE_SPILL_NUM_ELEMENTS_FORCE_SPILL_THRESHOLD.defaultValue.get, 10 * 1000, 1 << 4)
override def runBenchmarkSuite(mainArgs: Array[String]): Unit = {
runBenchmark("WITHOUT SPILL") {
val spillThreshold = 100 * 1000
testAgainstRawArrayBuffer(spillThreshold, 100 * 1000, 1 << 10)
testAgainstRawArrayBuffer(spillThreshold, 1000, 1 << 18)
testAgainstRawArrayBuffer(spillThreshold, 30 * 1000, 1 << 14)
Member

@dongjoon-hyun dongjoon-hyun Jan 9, 2019

Let's keep the original sequence: 1000 -> 30 * 1000 -> 100 * 1000. Increasing order is more intuitive.
Ah, I got it. The cases are reordered according to the calculation below. Please forget about the above comment.

>>> 1000 * (1<<18)
262144000
>>> 30 * 1000 * (1<<14)
491520000
>>> 100 * 1000 * (1<<10)
102400000
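
The same arithmetic as a small script: sorting the three cases by numRows * iterations appears to reproduce the order used in the new runBenchmarkSuite. This is only a sketch; the tuple layout and the `total_rows` helper are illustrative, with the values copied from the testAgainstRawArrayBuffer calls in the diff.

```python
# (num_rows, iterations) pairs in the original source order.
cases = [
    (1000, 1 << 18),
    (30 * 1000, 1 << 14),
    (100 * 1000, 1 << 10),
]


def total_rows(num_rows, iterations):
    """Total number of rows one benchmark case processes."""
    return num_rows * iterations


# Sort ascending by total work and print each case with its total.
ordered = sorted(cases, key=lambda case: total_rows(*case))
for num_rows, iterations in ordered:
    print(num_rows, iterations, total_rows(num_rows, iterations))
```

Sorted this way, the 100 * 1000 case comes first, then 1000, then 30 * 1000.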

}

runBenchmark("WITH SPILL") {
testAgainstRawUnsafeExternalSorter(100 * 1000, 1000, 1 << 18)
testAgainstRawUnsafeExternalSorter(
config.SHUFFLE_SPILL_NUM_ELEMENTS_FORCE_SPILL_THRESHOLD.defaultValue.get, 10 * 1000, 1 << 4)
}
}
}