@@ -25,10 +25,13 @@ import org.apache.spark._
 import org.apache.spark.SparkContext._

 class ExternalAppendOnlyMapSuite extends FunSuite with LocalSparkContext {
+  private def createCombiner[T](i: T) = ArrayBuffer[T](i)
+  private def mergeValue[T](buffer: ArrayBuffer[T], i: T): ArrayBuffer[T] = buffer += i
+  private def mergeCombiners[T](buf1: ArrayBuffer[T], buf2: ArrayBuffer[T]): ArrayBuffer[T] =
+    buf1 ++= buf2

-  private def createCombiner(i: Int) = ArrayBuffer[Int](i)
-  private def mergeValue(buffer: ArrayBuffer[Int], i: Int) = buffer += i
-  private def mergeCombiners(buf1: ArrayBuffer[Int], buf2: ArrayBuffer[Int]) = buf1 ++= buf2
+  private def createExternalMap[T] = new ExternalAppendOnlyMap[T, T, ArrayBuffer[T]](
+    createCombiner[T], mergeValue[T], mergeCombiners[T])

   private def createSparkConf(loadDefaults: Boolean): SparkConf = {
     val conf = new SparkConf(loadDefaults)
@@ -42,37 +45,33 @@ class ExternalAppendOnlyMapSuite extends FunSuite with LocalSparkContext {
   }

   test("simple insert") {
-    val conf = createSparkConf(false)
+    val conf = createSparkConf(loadDefaults = false)
     sc = new SparkContext("local", "test", conf)
-
-    val map = new ExternalAppendOnlyMap[Int, Int, ArrayBuffer[Int]](createCombiner,
-      mergeValue, mergeCombiners)
+    val map = createExternalMap[Int]

     // Single insert
     map.insert(1, 10)
     var it = map.iterator
     assert(it.hasNext)
     val kv = it.next()
-    assert(kv._1 == 1 && kv._2 == ArrayBuffer[Int](10))
+    assert(kv._1 === 1 && kv._2 === ArrayBuffer[Int](10))
     assert(!it.hasNext)

     // Multiple insert
     map.insert(2, 20)
     map.insert(3, 30)
     it = map.iterator
     assert(it.hasNext)
-    assert(it.toSet == Set[(Int, ArrayBuffer[Int])](
+    assert(it.toSet === Set[(Int, ArrayBuffer[Int])](
       (1, ArrayBuffer[Int](10)),
       (2, ArrayBuffer[Int](20)),
       (3, ArrayBuffer[Int](30))))
   }

   test("insert with collision") {
-    val conf = createSparkConf(false)
+    val conf = createSparkConf(loadDefaults = false)
     sc = new SparkContext("local", "test", conf)
-
-    val map = new ExternalAppendOnlyMap[Int, Int, ArrayBuffer[Int]](createCombiner,
-      mergeValue, mergeCombiners)
+    val map = createExternalMap[Int]

     map.insertAll(Seq(
       (1, 10),
@@ -84,30 +83,27 @@ class ExternalAppendOnlyMapSuite extends FunSuite with LocalSparkContext {
     val it = map.iterator
     assert(it.hasNext)
     val result = it.toSet[(Int, ArrayBuffer[Int])].map(kv => (kv._1, kv._2.toSet))
-    assert(result == Set[(Int, Set[Int])](
+    assert(result === Set[(Int, Set[Int])](
       (1, Set[Int](10, 100, 1000)),
       (2, Set[Int](20, 200)),
       (3, Set[Int](30))))
   }

   test("ordering") {
-    val conf = createSparkConf(false)
+    val conf = createSparkConf(loadDefaults = false)
     sc = new SparkContext("local", "test", conf)

-    val map1 = new ExternalAppendOnlyMap[Int, Int, ArrayBuffer[Int]](createCombiner,
-      mergeValue, mergeCombiners)
+    val map1 = createExternalMap[Int]
     map1.insert(1, 10)
     map1.insert(2, 20)
     map1.insert(3, 30)

-    val map2 = new ExternalAppendOnlyMap[Int, Int, ArrayBuffer[Int]](createCombiner,
-      mergeValue, mergeCombiners)
+    val map2 = createExternalMap[Int]
     map2.insert(2, 20)
     map2.insert(3, 30)
     map2.insert(1, 10)

-    val map3 = new ExternalAppendOnlyMap[Int, Int, ArrayBuffer[Int]](createCombiner,
-      mergeValue, mergeCombiners)
+    val map3 = createExternalMap[Int]
     map3.insert(3, 30)
     map3.insert(1, 10)
     map3.insert(2, 20)
@@ -119,33 +115,32 @@ class ExternalAppendOnlyMapSuite extends FunSuite with LocalSparkContext {
     var kv1 = it1.next()
     var kv2 = it2.next()
     var kv3 = it3.next()
-    assert(kv1._1 == kv2._1 && kv2._1 == kv3._1)
-    assert(kv1._2 == kv2._2 && kv2._2 == kv3._2)
+    assert(kv1._1 === kv2._1 && kv2._1 === kv3._1)
+    assert(kv1._2 === kv2._2 && kv2._2 === kv3._2)

     kv1 = it1.next()
     kv2 = it2.next()
     kv3 = it3.next()
-    assert(kv1._1 == kv2._1 && kv2._1 == kv3._1)
-    assert(kv1._2 == kv2._2 && kv2._2 == kv3._2)
+    assert(kv1._1 === kv2._1 && kv2._1 === kv3._1)
+    assert(kv1._2 === kv2._2 && kv2._2 === kv3._2)

     kv1 = it1.next()
     kv2 = it2.next()
     kv3 = it3.next()
-    assert(kv1._1 == kv2._1 && kv2._1 == kv3._1)
-    assert(kv1._2 == kv2._2 && kv2._2 == kv3._2)
+    assert(kv1._1 === kv2._1 && kv2._1 === kv3._1)
+    assert(kv1._2 === kv2._2 && kv2._2 === kv3._2)
   }

   test("null keys and values") {
-    val conf = createSparkConf(false)
+    val conf = createSparkConf(loadDefaults = false)
     sc = new SparkContext("local", "test", conf)

-    val map = new ExternalAppendOnlyMap[Int, Int, ArrayBuffer[Int]](createCombiner,
-      mergeValue, mergeCombiners)
+    val map = createExternalMap[Int]
     map.insert(1, 5)
     map.insert(2, 6)
     map.insert(3, 7)
     assert(map.size === 3)
-    assert(map.iterator.toSet == Set[(Int, Seq[Int])](
+    assert(map.iterator.toSet === Set[(Int, Seq[Int])](
       (1, Seq[Int](5)),
       (2, Seq[Int](6)),
       (3, Seq[Int](7))
@@ -155,7 +150,7 @@ class ExternalAppendOnlyMapSuite extends FunSuite with LocalSparkContext {
     val nullInt = null.asInstanceOf[Int]
     map.insert(nullInt, 8)
     assert(map.size === 4)
-    assert(map.iterator.toSet == Set[(Int, Seq[Int])](
+    assert(map.iterator.toSet === Set[(Int, Seq[Int])](
       (1, Seq[Int](5)),
       (2, Seq[Int](6)),
       (3, Seq[Int](7)),
@@ -167,7 +162,7 @@ class ExternalAppendOnlyMapSuite extends FunSuite with LocalSparkContext {
     map.insert(nullInt, nullInt)
     assert(map.size === 5)
     val result = map.iterator.toSet[(Int, ArrayBuffer[Int])].map(kv => (kv._1, kv._2.toSet))
-    assert(result == Set[(Int, Set[Int])](
+    assert(result === Set[(Int, Set[Int])](
       (1, Set[Int](5)),
       (2, Set[Int](6)),
       (3, Set[Int](7)),
@@ -177,100 +172,90 @@ class ExternalAppendOnlyMapSuite extends FunSuite with LocalSparkContext {
   }

   test("simple aggregator") {
-    val conf = createSparkConf(false)
+    val conf = createSparkConf(loadDefaults = false)
     sc = new SparkContext("local", "test", conf)

     // reduceByKey
     val rdd = sc.parallelize(1 to 10).map(i => (i % 2, 1))
     val result1 = rdd.reduceByKey(_ + _).collect()
-    assert(result1.toSet == Set[(Int, Int)]((0, 5), (1, 5)))
+    assert(result1.toSet === Set[(Int, Int)]((0, 5), (1, 5)))

     // groupByKey
     val result2 = rdd.groupByKey().collect().map(x => (x._1, x._2.toList)).toSet
-    assert(result2.toSet == Set[(Int, Seq[Int])]
+    assert(result2.toSet === Set[(Int, Seq[Int])]
       ((0, List[Int](1, 1, 1, 1, 1)), (1, List[Int](1, 1, 1, 1, 1))))
   }

   test("simple cogroup") {
-    val conf = createSparkConf(false)
+    val conf = createSparkConf(loadDefaults = false)
     sc = new SparkContext("local", "test", conf)
     val rdd1 = sc.parallelize(1 to 4).map(i => (i, i))
     val rdd2 = sc.parallelize(1 to 4).map(i => (i % 2, i))
     val result = rdd1.cogroup(rdd2).collect()

     result.foreach { case (i, (seq1, seq2)) =>
       i match {
-        case 0 => assert(seq1.toSet == Set[Int]() && seq2.toSet == Set[Int](2, 4))
-        case 1 => assert(seq1.toSet == Set[Int](1) && seq2.toSet == Set[Int](1, 3))
-        case 2 => assert(seq1.toSet == Set[Int](2) && seq2.toSet == Set[Int]())
-        case 3 => assert(seq1.toSet == Set[Int](3) && seq2.toSet == Set[Int]())
-        case 4 => assert(seq1.toSet == Set[Int](4) && seq2.toSet == Set[Int]())
+        case 0 => assert(seq1.toSet === Set[Int]() && seq2.toSet === Set[Int](2, 4))
+        case 1 => assert(seq1.toSet === Set[Int](1) && seq2.toSet === Set[Int](1, 3))
+        case 2 => assert(seq1.toSet === Set[Int](2) && seq2.toSet === Set[Int]())
+        case 3 => assert(seq1.toSet === Set[Int](3) && seq2.toSet === Set[Int]())
+        case 4 => assert(seq1.toSet === Set[Int](4) && seq2.toSet === Set[Int]())
       }
     }
   }

   test("spilling") {
-    val conf = createSparkConf(true)  // Load defaults, otherwise SPARK_HOME is not found
+    val conf = createSparkConf(loadDefaults = true)  // Load defaults for Spark home
     conf.set("spark.shuffle.memoryFraction", "0.001")
     sc = new SparkContext("local-cluster[1,1,512]", "test", conf)

     // reduceByKey - should spill ~8 times
     val rddA = sc.parallelize(0 until 100000).map(i => (i / 2, i))
     val resultA = rddA.reduceByKey(math.max).collect()
-    assert(resultA.length == 50000)
-    resultA.foreach { case (k, v) =>
-      if (v != k * 2 + 1) {
-        fail(s"Value for ${k} was wrong: expected ${k * 2 + 1}, got ${v}")
-      }
+    assert(resultA.length === 50000)
+    resultA.foreach { case (k, v) =>
+      assert(v === k * 2 + 1, s"Value for $k was wrong: expected ${k * 2 + 1}, got $v")
     }

     // groupByKey - should spill ~17 times
     val rddB = sc.parallelize(0 until 100000).map(i => (i / 4, i))
     val resultB = rddB.groupByKey().collect()
-    assert(resultB.length == 25000)
-    resultB.foreach { case (i, seq) =>
+    assert(resultB.length === 25000)
+    resultB.foreach { case (i, seq) =>
       val expected = Set(i * 4, i * 4 + 1, i * 4 + 2, i * 4 + 3)
-      if (seq.toSet != expected) {
-        fail(s"Value for ${i} was wrong: expected ${expected}, got ${seq.toSet}")
-      }
+      assert(seq.toSet === expected,
+        s"Value for $i was wrong: expected $expected, got ${seq.toSet}")
     }

     // cogroup - should spill ~7 times
     val rddC1 = sc.parallelize(0 until 10000).map(i => (i, i))
     val rddC2 = sc.parallelize(0 until 10000).map(i => (i % 1000, i))
     val resultC = rddC1.cogroup(rddC2).collect()
-    assert(resultC.length == 10000)
-    resultC.foreach { case (i, (seq1, seq2)) =>
+    assert(resultC.length === 10000)
+    resultC.foreach { case (i, (seq1, seq2)) =>
       i match {
         case 0 =>
-          assert(seq1.toSet == Set[Int](0))
-          assert(seq2.toSet == Set[Int](0, 1000, 2000, 3000, 4000, 5000, 6000, 7000, 8000, 9000))
+          assert(seq1.toSet === Set[Int](0))
+          assert(seq2.toSet === Set[Int](0, 1000, 2000, 3000, 4000, 5000, 6000, 7000, 8000, 9000))
         case 1 =>
-          assert(seq1.toSet == Set[Int](1))
-          assert(seq2.toSet == Set[Int](1, 1001, 2001, 3001, 4001, 5001, 6001, 7001, 8001, 9001))
+          assert(seq1.toSet === Set[Int](1))
+          assert(seq2.toSet === Set[Int](1, 1001, 2001, 3001, 4001, 5001, 6001, 7001, 8001, 9001))
         case 5000 =>
-          assert(seq1.toSet == Set[Int](5000))
-          assert(seq2.toSet == Set[Int]())
+          assert(seq1.toSet === Set[Int](5000))
+          assert(seq2.toSet === Set[Int]())
         case 9999 =>
-          assert(seq1.toSet == Set[Int](9999))
-          assert(seq2.toSet == Set[Int]())
+          assert(seq1.toSet === Set[Int](9999))
+          assert(seq2.toSet === Set[Int]())
         case _ =>
       }
     }
   }

   test("spilling with hash collisions") {
-    val conf = createSparkConf(true)
+    val conf = createSparkConf(loadDefaults = true)
     conf.set("spark.shuffle.memoryFraction", "0.001")
     sc = new SparkContext("local-cluster[1,1,512]", "test", conf)
-
-    def createCombiner(i: String) = ArrayBuffer[String](i)
-    def mergeValue(buffer: ArrayBuffer[String], i: String) = buffer += i
-    def mergeCombiners(buffer1: ArrayBuffer[String], buffer2: ArrayBuffer[String]) =
-      buffer1 ++= buffer2
-
-    val map = new ExternalAppendOnlyMap[String, String, ArrayBuffer[String]](
-      createCombiner, mergeValue, mergeCombiners)
+    val map = createExternalMap[String]

     val collisionPairs = Seq(
       ("Aa", "BB"), // 2112
@@ -315,10 +300,9 @@ class ExternalAppendOnlyMapSuite extends FunSuite with LocalSparkContext {
   }

   test("spilling with many hash collisions") {
-    val conf = createSparkConf(true)
+    val conf = createSparkConf(loadDefaults = true)
     conf.set("spark.shuffle.memoryFraction", "0.0001")
     sc = new SparkContext("local-cluster[1,1,512]", "test", conf)
-
     val map = new ExternalAppendOnlyMap[FixedHashObject, Int, Int](_ => 1, _ + _, _ + _)

     // Insert 10 copies each of lots of objects whose hash codes are either 0 or 1. This causes
@@ -340,12 +324,10 @@ class ExternalAppendOnlyMapSuite extends FunSuite with LocalSparkContext {
   }

   test("spilling with hash collisions using the Int.MaxValue key") {
-    val conf = createSparkConf(true)
+    val conf = createSparkConf(loadDefaults = true)
     conf.set("spark.shuffle.memoryFraction", "0.001")
     sc = new SparkContext("local-cluster[1,1,512]", "test", conf)
-
-    val map = new ExternalAppendOnlyMap[Int, Int, ArrayBuffer[Int]](
-      createCombiner, mergeValue, mergeCombiners)
+    val map = createExternalMap[Int]

     (1 to 100000).foreach { i => map.insert(i, i) }
     map.insert(Int.MaxValue, Int.MaxValue)
@@ -358,12 +340,10 @@ class ExternalAppendOnlyMapSuite extends FunSuite with LocalSparkContext {
   }

   test("spilling with null keys and values") {
-    val conf = createSparkConf(true)
+    val conf = createSparkConf(loadDefaults = true)
     conf.set("spark.shuffle.memoryFraction", "0.001")
     sc = new SparkContext("local-cluster[1,1,512]", "test", conf)
-
-    val map = new ExternalAppendOnlyMap[Int, Int, ArrayBuffer[Int]](
-      createCombiner, mergeValue, mergeCombiners)
+    val map = createExternalMap[Int]

     map.insertAll((1 to 100000).iterator.map(i => (i, i)))
     map.insert(null.asInstanceOf[Int], 1)
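
For context on the refactor above: the patch swaps the Int-specific combiner helpers for generic ones, so one createExternalMap[T] factory serves both the Int and String test cases. Below is a minimal standalone sketch of how those generic helpers compose; the wrapper object and main method are illustrative only and are not part of the patch.

import scala.collection.mutable.ArrayBuffer

object CombinerHelpersSketch {
  // Generic versions of the helpers introduced in the patch above.
  def createCombiner[T](i: T): ArrayBuffer[T] = ArrayBuffer[T](i)
  def mergeValue[T](buffer: ArrayBuffer[T], i: T): ArrayBuffer[T] = buffer += i
  def mergeCombiners[T](buf1: ArrayBuffer[T], buf2: ArrayBuffer[T]): ArrayBuffer[T] = buf1 ++= buf2

  def main(args: Array[String]): Unit = {
    // Build two per-key buffers and merge them, the same way a
    // combiner-based aggregation folds values and then merges partials.
    val partial1 = mergeValue(createCombiner("Aa"), "BB")  // ArrayBuffer(Aa, BB)
    val partial2 = createCombiner("Cc")                    // ArrayBuffer(Cc)
    val merged = mergeCombiners(partial1, partial2)
    assert(merged == ArrayBuffer("Aa", "BB", "Cc"))
    println(merged)
  }
}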