
Commit a1fe59d

[SPARK-6765] Fix test code style for core.
Author: Reynold Xin <[email protected]>

Closes #5484 from rxin/test-style-core and squashes the following commits:

e0b0100 [Reynold Xin] [SPARK-6765] Fix test code style for core.
1 parent 04bcd67 commit a1fe59d

59 files changed: +386, -304 lines changed

Note: large commits have some content hidden by default, so not all changed files are shown below.


core/src/test/scala/org/apache/spark/AccumulatorSuite.scala
Lines changed: 15 additions & 15 deletions

@@ -27,19 +27,20 @@ import org.scalatest.Matchers
 class AccumulatorSuite extends FunSuite with Matchers with LocalSparkContext {
 
 
-  implicit def setAccum[A] = new AccumulableParam[mutable.Set[A], A] {
-    def addInPlace(t1: mutable.Set[A], t2: mutable.Set[A]) : mutable.Set[A] = {
-      t1 ++= t2
-      t1
-    }
-    def addAccumulator(t1: mutable.Set[A], t2: A) : mutable.Set[A] = {
-      t1 += t2
-      t1
-    }
-    def zero(t: mutable.Set[A]) : mutable.Set[A] = {
-      new mutable.HashSet[A]()
+  implicit def setAccum[A]: AccumulableParam[mutable.Set[A], A] =
+    new AccumulableParam[mutable.Set[A], A] {
+      def addInPlace(t1: mutable.Set[A], t2: mutable.Set[A]) : mutable.Set[A] = {
+        t1 ++= t2
+        t1
+      }
+      def addAccumulator(t1: mutable.Set[A], t2: A) : mutable.Set[A] = {
+        t1 += t2
+        t1
+      }
+      def zero(t: mutable.Set[A]) : mutable.Set[A] = {
+        new mutable.HashSet[A]()
+      }
     }
-  }
 
   test ("basic accumulation"){
     sc = new SparkContext("local", "test")
@@ -49,11 +50,10 @@ class AccumulatorSuite extends FunSuite with Matchers with LocalSparkContext {
     d.foreach{x => acc += x}
     acc.value should be (210)
 
-
-    val longAcc = sc.accumulator(0l)
+    val longAcc = sc.accumulator(0L)
     val maxInt = Integer.MAX_VALUE.toLong
     d.foreach{x => longAcc += maxInt + x}
-    longAcc.value should be (210l + maxInt * 20)
+    longAcc.value should be (210L + maxInt * 20)
   }
 
   test ("value not assignable from tasks") {

core/src/test/scala/org/apache/spark/CacheManagerSuite.scala
Lines changed: 4 additions & 3 deletions

@@ -45,16 +45,17 @@ class CacheManagerSuite extends FunSuite with LocalSparkContext with BeforeAndAf
     rdd = new RDD[Int](sc, Nil) {
       override def getPartitions: Array[Partition] = Array(split)
       override val getDependencies = List[Dependency[_]]()
-      override def compute(split: Partition, context: TaskContext) = Array(1, 2, 3, 4).iterator
+      override def compute(split: Partition, context: TaskContext): Iterator[Int] =
+        Array(1, 2, 3, 4).iterator
     }
     rdd2 = new RDD[Int](sc, List(new OneToOneDependency(rdd))) {
       override def getPartitions: Array[Partition] = firstParent[Int].partitions
-      override def compute(split: Partition, context: TaskContext) =
+      override def compute(split: Partition, context: TaskContext): Iterator[Int] =
         firstParent[Int].iterator(split, context)
     }.cache()
     rdd3 = new RDD[Int](sc, List(new OneToOneDependency(rdd2))) {
       override def getPartitions: Array[Partition] = firstParent[Int].partitions
-      override def compute(split: Partition, context: TaskContext) =
+      override def compute(split: Partition, context: TaskContext): Iterator[Int] =
         firstParent[Int].iterator(split, context)
     }.cache()
   }

core/src/test/scala/org/apache/spark/CheckpointSuite.scala
Lines changed: 9 additions & 6 deletions

@@ -75,7 +75,8 @@ class CheckpointSuite extends FunSuite with LocalSparkContext with Logging {
     assert(sc.checkpointFile[Int](parCollection.getCheckpointFile.get).collect() === result)
     assert(parCollection.dependencies != Nil)
     assert(parCollection.partitions.length === numPartitions)
-    assert(parCollection.partitions.toList === parCollection.checkpointData.get.getPartitions.toList)
+    assert(parCollection.partitions.toList ===
+      parCollection.checkpointData.get.getPartitions.toList)
     assert(parCollection.collect() === result)
   }
 
@@ -102,13 +103,13 @@ class CheckpointSuite extends FunSuite with LocalSparkContext with Logging {
   }
 
   test("UnionRDD") {
-    def otherRDD = sc.makeRDD(1 to 10, 1)
+    def otherRDD: RDD[Int] = sc.makeRDD(1 to 10, 1)
     testRDD(_.union(otherRDD))
     testRDDPartitions(_.union(otherRDD))
   }
 
   test("CartesianRDD") {
-    def otherRDD = sc.makeRDD(1 to 10, 1)
+    def otherRDD: RDD[Int] = sc.makeRDD(1 to 10, 1)
     testRDD(new CartesianRDD(sc, _, otherRDD))
     testRDDPartitions(new CartesianRDD(sc, _, otherRDD))
 
@@ -223,7 +224,8 @@ class CheckpointSuite extends FunSuite with LocalSparkContext with Logging {
     val partitionAfterCheckpoint = serializeDeserialize(
       unionRDD.partitions.head.asInstanceOf[PartitionerAwareUnionRDDPartition])
     assert(
-      partitionBeforeCheckpoint.parents.head.getClass != partitionAfterCheckpoint.parents.head.getClass,
+      partitionBeforeCheckpoint.parents.head.getClass !=
+        partitionAfterCheckpoint.parents.head.getClass,
       "PartitionerAwareUnionRDDPartition.parents not updated after parent RDD is checkpointed"
     )
   }
@@ -358,7 +360,7 @@ class CheckpointSuite extends FunSuite with LocalSparkContext with Logging {
    * Generate an pair RDD (with partitioner) such that both the RDD and its partitions
    * have large size.
    */
-  def generateFatPairRDD() = {
+  def generateFatPairRDD(): RDD[(Int, Int)] = {
     new FatPairRDD(sc.makeRDD(1 to 100, 4), partitioner).mapValues(x => x)
   }
 
@@ -445,7 +447,8 @@ class FatPairRDD(parent: RDD[Int], _partitioner: Partitioner) extends RDD[(Int,
 object CheckpointSuite {
   // This is a custom cogroup function that does not use mapValues like
   // the PairRDDFunctions.cogroup()
-  def cogroup[K, V](first: RDD[(K, V)], second: RDD[(K, V)], part: Partitioner) = {
+  def cogroup[K, V](first: RDD[(K, V)], second: RDD[(K, V)], part: Partitioner)
+    : RDD[(K, Array[Iterable[V]])] = {
     new CoGroupedRDD[K](
       Seq(first.asInstanceOf[RDD[(K, _)]], second.asInstanceOf[RDD[(K, _)]]),
       part
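
Note: two conventions from this file recur below: long boolean or assert expressions are broken after an operator (===, !=) with the remainder indented, and when a full signature would exceed the line limit the result type moves to its own continuation line, as in cogroup above. A small self-contained sketch of the latter; the method and types are illustrative, not from the commit:

// When parameters plus result type would overflow the 100-character limit,
// the result type goes on an indented continuation line starting with the colon.
def labelPairs[K, V](keys: Seq[K], values: Seq[V], labels: Map[K, String])
  : Seq[(String, V)] = {
  keys.zip(values).map { case (k, v) => (labels.getOrElse(k, k.toString), v) }
}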

core/src/test/scala/org/apache/spark/ContextCleanerSuite.scala
Lines changed: 2 additions & 2 deletions

@@ -64,7 +64,7 @@ abstract class ContextCleanerSuiteBase(val shuffleManager: Class[_] = classOf[Ha
     }
   }
 
-  //------ Helper functions ------
+  // ------ Helper functions ------
 
   protected def newRDD() = sc.makeRDD(1 to 10)
   protected def newPairRDD() = newRDD().map(_ -> 1)
@@ -370,7 +370,7 @@ class CleanerTester(
   val cleanerListener = new CleanerListener {
     def rddCleaned(rddId: Int): Unit = {
       toBeCleanedRDDIds -= rddId
-      logInfo("RDD "+ rddId + " cleaned")
+      logInfo("RDD " + rddId + " cleaned")
     }
 
     def shuffleCleaned(shuffleId: Int): Unit = {

core/src/test/scala/org/apache/spark/FileSuite.scala
Lines changed: 17 additions & 9 deletions

@@ -222,7 +222,7 @@ class FileSuite extends FunSuite with LocalSparkContext {
     val nums = sc.makeRDD(1 to 3).map(x => (new IntWritable(x), new Text("a" * x)))
     nums.saveAsSequenceFile(outputDir)
     val output =
-        sc.newAPIHadoopFile[IntWritable, Text, SequenceFileInputFormat[IntWritable, Text]](outputDir)
+      sc.newAPIHadoopFile[IntWritable, Text, SequenceFileInputFormat[IntWritable, Text]](outputDir)
     assert(output.map(_.toString).collect().toList === List("(1,a)", "(2,aa)", "(3,aaa)"))
   }
 
@@ -451,16 +451,19 @@ class FileSuite extends FunSuite with LocalSparkContext {
 
   test ("prevent user from overwriting the empty directory (new Hadoop API)") {
     sc = new SparkContext("local", "test")
-    val randomRDD = sc.parallelize(Array(("key1", "a"), ("key2", "a"), ("key3", "b"), ("key4", "c")), 1)
+    val randomRDD = sc.parallelize(
+      Array(("key1", "a"), ("key2", "a"), ("key3", "b"), ("key4", "c")), 1)
     intercept[FileAlreadyExistsException] {
       randomRDD.saveAsNewAPIHadoopFile[NewTextOutputFormat[String, String]](tempDir.getPath)
     }
   }
 
   test ("prevent user from overwriting the non-empty directory (new Hadoop API)") {
     sc = new SparkContext("local", "test")
-    val randomRDD = sc.parallelize(Array(("key1", "a"), ("key2", "a"), ("key3", "b"), ("key4", "c")), 1)
-    randomRDD.saveAsNewAPIHadoopFile[NewTextOutputFormat[String, String]](tempDir.getPath + "/output")
+    val randomRDD = sc.parallelize(
+      Array(("key1", "a"), ("key2", "a"), ("key3", "b"), ("key4", "c")), 1)
+    randomRDD.saveAsNewAPIHadoopFile[NewTextOutputFormat[String, String]](
+      tempDir.getPath + "/output")
     assert(new File(tempDir.getPath + "/output/part-r-00000").exists() === true)
     intercept[FileAlreadyExistsException] {
       randomRDD.saveAsNewAPIHadoopFile[NewTextOutputFormat[String, String]](tempDir.getPath)
@@ -471,16 +474,20 @@ class FileSuite extends FunSuite with LocalSparkContext {
     val sf = new SparkConf()
     sf.setAppName("test").setMaster("local").set("spark.hadoop.validateOutputSpecs", "false")
     sc = new SparkContext(sf)
-    val randomRDD = sc.parallelize(Array(("key1", "a"), ("key2", "a"), ("key3", "b"), ("key4", "c")), 1)
-    randomRDD.saveAsNewAPIHadoopFile[NewTextOutputFormat[String, String]](tempDir.getPath + "/output")
+    val randomRDD = sc.parallelize(
+      Array(("key1", "a"), ("key2", "a"), ("key3", "b"), ("key4", "c")), 1)
+    randomRDD.saveAsNewAPIHadoopFile[NewTextOutputFormat[String, String]](
+      tempDir.getPath + "/output")
     assert(new File(tempDir.getPath + "/output/part-r-00000").exists() === true)
-    randomRDD.saveAsNewAPIHadoopFile[NewTextOutputFormat[String, String]](tempDir.getPath + "/output")
+    randomRDD.saveAsNewAPIHadoopFile[NewTextOutputFormat[String, String]](
+      tempDir.getPath + "/output")
     assert(new File(tempDir.getPath + "/output/part-r-00000").exists() === true)
   }
 
   test ("save Hadoop Dataset through old Hadoop API") {
     sc = new SparkContext("local", "test")
-    val randomRDD = sc.parallelize(Array(("key1", "a"), ("key2", "a"), ("key3", "b"), ("key4", "c")), 1)
+    val randomRDD = sc.parallelize(
+      Array(("key1", "a"), ("key2", "a"), ("key3", "b"), ("key4", "c")), 1)
     val job = new JobConf()
     job.setOutputKeyClass(classOf[String])
     job.setOutputValueClass(classOf[String])
@@ -492,7 +499,8 @@ class FileSuite extends FunSuite with LocalSparkContext {
 
   test ("save Hadoop Dataset through new Hadoop API") {
     sc = new SparkContext("local", "test")
-    val randomRDD = sc.parallelize(Array(("key1", "a"), ("key2", "a"), ("key3", "b"), ("key4", "c")), 1)
+    val randomRDD = sc.parallelize(
+      Array(("key1", "a"), ("key2", "a"), ("key3", "b"), ("key4", "c")), 1)
     val job = new Job(sc.hadoopConfiguration)
     job.setOutputKeyClass(classOf[String])
     job.setOutputValueClass(classOf[String])
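
Note: the FileSuite hunks are pure line wrapping: a call whose argument list would push past 100 characters is broken after the opening parenthesis and continued on a line indented two extra spaces. A sketch of the same wrapping, assuming (as in these tests) a SparkContext named sc is in scope:

// Kept under the line limit by wrapping after "parallelize(".
val randomRDD = sc.parallelize(
  Array(("key1", "a"), ("key2", "a"), ("key3", "b"), ("key4", "c")), 1)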

core/src/test/scala/org/apache/spark/ImplicitOrderingSuite.scala
Lines changed: 3 additions & 3 deletions

@@ -51,7 +51,7 @@ private object ImplicitOrderingSuite {
     override def compare(o: OrderedClass): Int = ???
   }
 
-  def basicMapExpectations(rdd: RDD[Int]) = {
+  def basicMapExpectations(rdd: RDD[Int]): List[(Boolean, String)] = {
     List((rdd.map(x => (x, x)).keyOrdering.isDefined,
       "rdd.map(x => (x, x)).keyOrdering.isDefined"),
       (rdd.map(x => (1, x)).keyOrdering.isDefined,
@@ -68,7 +68,7 @@ private object ImplicitOrderingSuite {
       "rdd.map(x => (new OrderedClass, x)).keyOrdering.isDefined"))
   }
 
-  def otherRDDMethodExpectations(rdd: RDD[Int]) = {
+  def otherRDDMethodExpectations(rdd: RDD[Int]): List[(Boolean, String)] = {
     List((rdd.groupBy(x => x).keyOrdering.isDefined,
       "rdd.groupBy(x => x).keyOrdering.isDefined"),
       (rdd.groupBy(x => new NonOrderedClass).keyOrdering.isEmpty,
@@ -82,4 +82,4 @@ private object ImplicitOrderingSuite {
     (rdd.groupBy((x: Int) => x, new HashPartitioner(5)).keyOrdering.isDefined,
       "rdd.groupBy((x: Int) => x, new HashPartitioner(5)).keyOrdering.isDefined"))
   }
-}
\ No newline at end of file
+}

core/src/test/scala/org/apache/spark/JobCancellationSuite.scala
Lines changed: 1 addition & 1 deletion

@@ -188,7 +188,7 @@ class JobCancellationSuite extends FunSuite with Matchers with BeforeAndAfter
     val rdd = sc.parallelize(1 to 10, 2).map { i =>
       JobCancellationSuite.twoJobsSharingStageSemaphore.acquire()
       (i, i)
-    }.reduceByKey(_+_)
+    }.reduceByKey(_ + _)
     val f1 = rdd.collectAsync()
     val f2 = rdd.countAsync()
 

core/src/test/scala/org/apache/spark/LocalSparkContext.scala
Lines changed: 2 additions & 2 deletions

@@ -37,7 +37,7 @@ trait LocalSparkContext extends BeforeAndAfterEach with BeforeAndAfterAll { self
     super.afterEach()
   }
 
-  def resetSparkContext() = {
+  def resetSparkContext(): Unit = {
     LocalSparkContext.stop(sc)
     sc = null
   }
@@ -54,7 +54,7 @@ object LocalSparkContext {
   }
 
   /** Runs `f` by passing in `sc` and ensures that `sc` is stopped. */
-  def withSpark[T](sc: SparkContext)(f: SparkContext => T) = {
+  def withSpark[T](sc: SparkContext)(f: SparkContext => T): T = {
     try {
       f(sc)
     } finally {
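
Note: withSpark is a loan-pattern helper, and the fix spells out its result type so callers can see it returns whatever f returns. A minimal, self-contained sketch of the same pattern over a generic resource; withResource and the AutoCloseable bound are illustrative, not part of the commit:

// Loan pattern: run f against a resource and always release it, returning f's result.
def withResource[R <: AutoCloseable, T](resource: R)(f: R => T): T = {
  try {
    f(resource)
  } finally {
    resource.close()
  }
}

// Usage sketch: the explicit ": T" makes the call-site result type obvious, e.g.
// val firstByte = withResource(new java.io.FileInputStream("data.bin"))(_.read())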

core/src/test/scala/org/apache/spark/PartitioningSuite.scala
Lines changed: 17 additions & 13 deletions

@@ -92,7 +92,7 @@ class PartitioningSuite extends FunSuite with SharedSparkContext with PrivateMet
   test("RangePartitioner for keys that are not Comparable (but with Ordering)") {
     // Row does not extend Comparable, but has an implicit Ordering defined.
     implicit object RowOrdering extends Ordering[Row] {
-      override def compare(x: Row, y: Row) = x.value - y.value
+      override def compare(x: Row, y: Row): Int = x.value - y.value
     }
 
     val rdd = sc.parallelize(1 to 4500).map(x => (Row(x), Row(x)))
@@ -212,20 +212,24 @@ class PartitioningSuite extends FunSuite with SharedSparkContext with PrivateMet
     val arrPairs: RDD[(Array[Int], Int)] =
       sc.parallelize(Array(1, 2, 3, 4), 2).map(x => (Array(x), x))
 
-    assert(intercept[SparkException]{ arrs.distinct() }.getMessage.contains("array"))
+    def verify(testFun: => Unit): Unit = {
+      intercept[SparkException](testFun).getMessage.contains("array")
+    }
+
+    verify(arrs.distinct())
     // We can't catch all usages of arrays, since they might occur inside other collections:
     // assert(fails { arrPairs.distinct() })
-    assert(intercept[SparkException]{ arrPairs.partitionBy(new HashPartitioner(2)) }.getMessage.contains("array"))
-    assert(intercept[SparkException]{ arrPairs.join(arrPairs) }.getMessage.contains("array"))
-    assert(intercept[SparkException]{ arrPairs.leftOuterJoin(arrPairs) }.getMessage.contains("array"))
-    assert(intercept[SparkException]{ arrPairs.rightOuterJoin(arrPairs) }.getMessage.contains("array"))
-    assert(intercept[SparkException]{ arrPairs.fullOuterJoin(arrPairs) }.getMessage.contains("array"))
-    assert(intercept[SparkException]{ arrPairs.groupByKey() }.getMessage.contains("array"))
-    assert(intercept[SparkException]{ arrPairs.countByKey() }.getMessage.contains("array"))
-    assert(intercept[SparkException]{ arrPairs.countByKeyApprox(1) }.getMessage.contains("array"))
-    assert(intercept[SparkException]{ arrPairs.cogroup(arrPairs) }.getMessage.contains("array"))
-    assert(intercept[SparkException]{ arrPairs.reduceByKeyLocally(_ + _) }.getMessage.contains("array"))
-    assert(intercept[SparkException]{ arrPairs.reduceByKey(_ + _) }.getMessage.contains("array"))
+    verify(arrPairs.partitionBy(new HashPartitioner(2)))
+    verify(arrPairs.join(arrPairs))
+    verify(arrPairs.leftOuterJoin(arrPairs))
+    verify(arrPairs.rightOuterJoin(arrPairs))
+    verify(arrPairs.fullOuterJoin(arrPairs))
+    verify(arrPairs.groupByKey())
+    verify(arrPairs.countByKey())
+    verify(arrPairs.countByKeyApprox(1))
+    verify(arrPairs.cogroup(arrPairs))
+    verify(arrPairs.reduceByKeyLocally(_ + _))
+    verify(arrPairs.reduceByKey(_ + _))
   }
 
   test("zero-length partitions should be correctly handled") {

core/src/test/scala/org/apache/spark/SSLOptionsSuite.scala
Lines changed: 10 additions & 5 deletions

@@ -36,7 +36,8 @@ class SSLOptionsSuite extends FunSuite with BeforeAndAfterAll {
     conf.set("spark.ssl.keyPassword", "password")
     conf.set("spark.ssl.trustStore", trustStorePath)
     conf.set("spark.ssl.trustStorePassword", "password")
-    conf.set("spark.ssl.enabledAlgorithms", "TLS_RSA_WITH_AES_128_CBC_SHA, TLS_RSA_WITH_AES_256_CBC_SHA")
+    conf.set("spark.ssl.enabledAlgorithms",
+      "TLS_RSA_WITH_AES_128_CBC_SHA, TLS_RSA_WITH_AES_256_CBC_SHA")
     conf.set("spark.ssl.protocol", "SSLv3")
 
     val opts = SSLOptions.parse(conf, "spark.ssl")
@@ -52,7 +53,8 @@ class SSLOptionsSuite extends FunSuite with BeforeAndAfterAll {
     assert(opts.keyStorePassword === Some("password"))
     assert(opts.keyPassword === Some("password"))
     assert(opts.protocol === Some("SSLv3"))
-    assert(opts.enabledAlgorithms === Set("TLS_RSA_WITH_AES_128_CBC_SHA", "TLS_RSA_WITH_AES_256_CBC_SHA"))
+    assert(opts.enabledAlgorithms ===
+      Set("TLS_RSA_WITH_AES_128_CBC_SHA", "TLS_RSA_WITH_AES_256_CBC_SHA"))
   }
 
   test("test resolving property with defaults specified ") {
@@ -66,7 +68,8 @@ class SSLOptionsSuite extends FunSuite with BeforeAndAfterAll {
     conf.set("spark.ssl.keyPassword", "password")
     conf.set("spark.ssl.trustStore", trustStorePath)
     conf.set("spark.ssl.trustStorePassword", "password")
-    conf.set("spark.ssl.enabledAlgorithms", "TLS_RSA_WITH_AES_128_CBC_SHA, TLS_RSA_WITH_AES_256_CBC_SHA")
+    conf.set("spark.ssl.enabledAlgorithms",
+      "TLS_RSA_WITH_AES_128_CBC_SHA, TLS_RSA_WITH_AES_256_CBC_SHA")
     conf.set("spark.ssl.protocol", "SSLv3")
 
     val defaultOpts = SSLOptions.parse(conf, "spark.ssl", defaults = None)
@@ -83,7 +86,8 @@ class SSLOptionsSuite extends FunSuite with BeforeAndAfterAll {
     assert(opts.keyStorePassword === Some("password"))
     assert(opts.keyPassword === Some("password"))
     assert(opts.protocol === Some("SSLv3"))
-    assert(opts.enabledAlgorithms === Set("TLS_RSA_WITH_AES_128_CBC_SHA", "TLS_RSA_WITH_AES_256_CBC_SHA"))
+    assert(opts.enabledAlgorithms ===
+      Set("TLS_RSA_WITH_AES_128_CBC_SHA", "TLS_RSA_WITH_AES_256_CBC_SHA"))
   }
 
   test("test whether defaults can be overridden ") {
@@ -99,7 +103,8 @@ class SSLOptionsSuite extends FunSuite with BeforeAndAfterAll {
     conf.set("spark.ssl.keyPassword", "password")
     conf.set("spark.ssl.trustStore", trustStorePath)
     conf.set("spark.ssl.trustStorePassword", "password")
-    conf.set("spark.ssl.enabledAlgorithms", "TLS_RSA_WITH_AES_128_CBC_SHA, TLS_RSA_WITH_AES_256_CBC_SHA")
+    conf.set("spark.ssl.enabledAlgorithms",
+      "TLS_RSA_WITH_AES_128_CBC_SHA, TLS_RSA_WITH_AES_256_CBC_SHA")
     conf.set("spark.ui.ssl.enabledAlgorithms", "ABC, DEF")
     conf.set("spark.ssl.protocol", "SSLv3")
 
105110
