@@ -23,8 +23,10 @@ import org.scalatest.FunSuite
 
 import org.apache.spark._
 import org.apache.spark.SparkContext._
+import org.apache.spark.io.CompressionCodec
 
 class ExternalAppendOnlyMapSuite extends FunSuite with LocalSparkContext {
+  private val allCompressionCodecs = CompressionCodec.ALL_COMPRESSION_CODECS
   private def createCombiner[T](i: T) = ArrayBuffer[T](i)
   private def mergeValue[T](buffer: ArrayBuffer[T], i: T): ArrayBuffer[T] = buffer += i
   private def mergeCombiners[T](buf1: ArrayBuffer[T], buf2: ArrayBuffer[T]): ArrayBuffer[T] =
@@ -33,12 +35,14 @@ class ExternalAppendOnlyMapSuite extends FunSuite with LocalSparkContext {
   private def createExternalMap[T] = new ExternalAppendOnlyMap[T, T, ArrayBuffer[T]](
     createCombiner[T], mergeValue[T], mergeCombiners[T])
 
-  private def createSparkConf(loadDefaults: Boolean): SparkConf = {
+  private def createSparkConf(loadDefaults: Boolean, codec: Option[String] = None): SparkConf = {
     val conf = new SparkConf(loadDefaults)
     // Make the Java serializer write a reset instruction (TC_RESET) after each object to test
     // for a bug we had with bytes written past the last object in a batch (SPARK-2792)
     conf.set("spark.serializer.objectStreamReset", "1")
     conf.set("spark.serializer", "org.apache.spark.serializer.JavaSerializer")
+    conf.set("spark.shuffle.spill.compress", codec.isDefined.toString)
+    codec.foreach { c => conf.set("spark.io.compression.codec", c) }
     // Ensure that we actually have multiple batches per spill file
     conf.set("spark.shuffle.spill.batchSize", "10")
     conf
@@ -204,8 +208,63 @@ class ExternalAppendOnlyMapSuite extends FunSuite with LocalSparkContext {
     }
   }
 
+  /**
+   * For tests that involve spilling, run them multiple times with different compression settings.
+   */
+
   test("spilling") {
-    val conf = createSparkConf(loadDefaults = true)  // Load defaults for Spark home
+    runSpillingTest(testSpilling)
+  }
+
+  test("spilling with hash collisions") {
+    runSpillingTest(testSpillingWithCollisions)
+  }
+
+  test("spilling with many hash collisions") {
+    runSpillingTest(testSpillingWithManyCollisions)
+  }
+
+  test("spilling with hash collisions using the Int.MaxValue key") {
+    runSpillingTest(testSpillingWithCollisionsMaxInt)
+  }
+
+  test("spilling with null keys and values") {
+    runSpillingTest(testSpillingWithNullKeysAndValues)
+  }
+
+  /* ------------------------------------ *
+   * Actual test logic for spilling tests *
+   * ------------------------------------ */
+
+  /**
+   * Run a spilling test multiple times, with and without compression and using all codecs.
+   */
+  private def runSpillingTest(test: Option[String] => Unit): Unit = {
+    var lastCompressionCodec: Option[String] = None
+    try {
+      test(None)
+      allCompressionCodecs.foreach { c =>
+        lastCompressionCodec = Some(c)
+        test(Some(c))
+      }
+    } catch {
+      // Include compression codec used in test failure message
+      // We need to catch Throwable here because assertion failures are not covered by Exceptions
+      case t: Throwable =>
+        val compressionMessage = lastCompressionCodec
+          .map { c => "with compression using codec " + c }
+          .getOrElse("without compression")
+        val newException = new Exception(s"Test failed $compressionMessage:\n\n${t.getMessage}")
+        newException.setStackTrace(t.getStackTrace)
+        throw newException
+    }
+  }
+
+  /**
+   * Test spilling through simple aggregations and cogroups.
+   */
+  private def testSpilling(codec: Option[String]): Unit = {
+    val conf = createSparkConf(loadDefaults = true, codec)  // Load defaults for Spark home
     conf.set("spark.shuffle.memoryFraction", "0.001")
     sc = new SparkContext("local-cluster[1,1,512]", "test", conf)
 
@@ -251,8 +310,11 @@ class ExternalAppendOnlyMapSuite extends FunSuite with LocalSparkContext {
     }
   }
 
-  test("spilling with hash collisions") {
-    val conf = createSparkConf(loadDefaults = true)
+  /**
+   * Test spilling with key hash collisions.
+   */
+  private def testSpillingWithCollisions(codec: Option[String]): Unit = {
+    val conf = createSparkConf(loadDefaults = true, codec)
     conf.set("spark.shuffle.memoryFraction", "0.001")
     sc = new SparkContext("local-cluster[1,1,512]", "test", conf)
     val map = createExternalMap[String]
@@ -299,8 +361,11 @@ class ExternalAppendOnlyMapSuite extends FunSuite with LocalSparkContext {
     assert(count === 100000 + collisionPairs.size * 2)
   }
 
-  test("spilling with many hash collisions") {
-    val conf = createSparkConf(loadDefaults = true)
+  /**
+   * Test spilling with many key hash collisions.
+   */
+  private def testSpillingWithManyCollisions(codec: Option[String]): Unit = {
+    val conf = createSparkConf(loadDefaults = true, codec)
     conf.set("spark.shuffle.memoryFraction", "0.0001")
     sc = new SparkContext("local-cluster[1,1,512]", "test", conf)
     val map = new ExternalAppendOnlyMap[FixedHashObject, Int, Int](_ => 1, _ + _, _ + _)
@@ -323,8 +388,11 @@ class ExternalAppendOnlyMapSuite extends FunSuite with LocalSparkContext {
     assert(count === 10000)
   }
 
-  test("spilling with hash collisions using the Int.MaxValue key") {
-    val conf = createSparkConf(loadDefaults = true)
+  /**
+   * Test spilling with key hash collisions involving Int.MaxValue.
+   */
+  private def testSpillingWithCollisionsMaxInt(codec: Option[String]): Unit = {
+    val conf = createSparkConf(loadDefaults = true, codec)
     conf.set("spark.shuffle.memoryFraction", "0.001")
     sc = new SparkContext("local-cluster[1,1,512]", "test", conf)
     val map = createExternalMap[Int]
@@ -339,8 +407,11 @@ class ExternalAppendOnlyMapSuite extends FunSuite with LocalSparkContext {
     }
   }
 
-  test("spilling with null keys and values") {
-    val conf = createSparkConf(loadDefaults = true)
+  /**
+   * Test spilling with null keys and values.
+   */
+  private def testSpillingWithNullKeysAndValues(codec: Option[String]): Unit = {
+    val conf = createSparkConf(loadDefaults = true, codec)
     conf.set("spark.shuffle.memoryFraction", "0.001")
     sc = new SparkContext("local-cluster[1,1,512]", "test", conf)
     val map = createExternalMap[Int]
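
The patch toggles two settings per run: spark.shuffle.spill.compress enables compression of spill files, and spark.io.compression.codec picks the codec when it is enabled. Below is a minimal, self-contained sketch (not part of the patch; the object name SpillCompressionExample is illustrative) showing how the codec list used by the suite resolves to codec instances via CompressionCodec.createCodec, which reads spark.io.compression.codec from the conf. Snappy additionally needs its native library available at runtime.

    import org.apache.spark.SparkConf
    import org.apache.spark.io.CompressionCodec

    object SpillCompressionExample {
      def main(args: Array[String]): Unit = {
        // Iterate over every codec shipped with Spark, the same list the suite uses.
        CompressionCodec.ALL_COMPRESSION_CODECS.foreach { codecClass =>
          val conf = new SparkConf(false)
            .set("spark.shuffle.spill.compress", "true")    // compress spill files
            .set("spark.io.compression.codec", codecClass)  // codec to use for spills
          // createCodec instantiates the class named by spark.io.compression.codec.
          val codec = CompressionCodec.createCodec(conf)
          println(s"$codecClass -> ${codec.getClass.getSimpleName}")
        }
      }
    }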