
Commit c6e339d

srowen authored and James Z.M. Gao committed
SPARK 1084.1 (resubmitted)
(Ported from https://github.com/apache/incubator-spark/pull/637 )

Author: Sean Owen <[email protected]>

Closes apache#31 from srowen/SPARK-1084.1 and squashes the following commits:

6c4a32c [Sean Owen] Suppress warnings about legitimate unchecked array creations, or change code to avoid it
f35b833 [Sean Owen] Fix two misc javadoc problems
254e8ef [Sean Owen] Fix one new style error introduced in scaladoc warning commit
5b2fce2 [Sean Owen] Fix scaladoc invocation warning, and enable javac warnings properly, with plugin config updates
007762b [Sean Owen] Remove dead scaladoc links
b8ff8cb [Sean Owen] Replace deprecated Ant <tasks> with <target>

Conflicts:
    bagel/src/main/scala/org/apache/spark/bagel/Bagel.scala
    core/src/main/scala/org/apache/spark/util/StatCounter.scala
    streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala
    streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala
1 parent d8fc8e0 commit c6e339d
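Most of the Java-side churn in the diff below stems from javac's "unchecked generic array creation" warning, which fires whenever parameterized values are passed through a varargs method such as Arrays.asList. The following is a minimal, self-contained sketch of the warning and of the two remedies this commit applies (suppress where the call is known to be safe, or rewrite so no generic array is created); the class and method names here are illustrative only and do not appear in the diff:

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Illustrative sketch only -- not code from this commit.
public class UncheckedVarargsExample {

  // Passing parameterized values through a varargs parameter (Arrays.asList here)
  // forces javac to materialize a List<Integer>[] array, which it reports as an
  // "unchecked generic array creation" warning at the call site.
  @SuppressWarnings("unchecked")  // remedy 1: suppress where the call is known to be safe
  static List<List<Integer>> suppressed() {
    return Arrays.asList(Arrays.asList(1, 2), Arrays.asList(3, 4));
  }

  // Remedy 2: restructure so no array of a parameterized type is created at all.
  static List<List<Integer>> avoided() {
    List<List<Integer>> out = new ArrayList<List<Integer>>();
    out.add(Arrays.asList(1, 2));
    out.add(Arrays.asList(3, 4));
    return out;
  }

  public static void main(String[] args) {
    System.out.println(suppressed());
    System.out.println(avoided());
  }
}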

File tree

15 files changed  (+177, -103 lines)


bagel/src/main/scala/org/apache/spark/bagel/Bagel.scala

Lines changed: 26 additions & 19 deletions
@@ -27,22 +27,24 @@ object Bagel extends Logging {
 
   /**
    * Runs a Bagel program.
-   * @param sc [[org.apache.spark.SparkContext]] to use for the program.
-   * @param vertices vertices of the graph represented as an RDD of (Key, Vertex) pairs. Often the Key will be
-   *                 the vertex id.
-   * @param messages initial set of messages represented as an RDD of (Key, Message) pairs. Often this will be an
-   *                 empty array, i.e. sc.parallelize(Array[K, Message]()).
-   * @param combiner [[org.apache.spark.bagel.Combiner]] combines multiple individual messages to a given vertex into one
-   *                 message before sending (which often involves network I/O).
-   * @param aggregator [[org.apache.spark.bagel.Aggregator]] performs a reduce across all vertices after each superstep,
-   *                   and provides the result to each vertex in the next superstep.
-   * @param partitioner [[org.apache.spark.Partitioner]] partitions values by key
+   * @param sc org.apache.spark.SparkContext to use for the program.
+   * @param vertices vertices of the graph represented as an RDD of (Key, Vertex) pairs. Often the
+   *                 Key will be the vertex id.
+   * @param messages initial set of messages represented as an RDD of (Key, Message) pairs. Often
+   *                 this will be an empty array, i.e. sc.parallelize(Array[K, Message]()).
+   * @param combiner [[org.apache.spark.bagel.Combiner]] combines multiple individual messages to a
+   *                 given vertex into one message before sending (which often involves network
+   *                 I/O).
+   * @param aggregator [[org.apache.spark.bagel.Aggregator]] performs a reduce across all vertices
+   *                   after each superstep and provides the result to each vertex in the next
+   *                   superstep.
+   * @param partitioner org.apache.spark.Partitioner partitions values by key
    * @param numPartitions number of partitions across which to split the graph.
    *                      Default is the default parallelism of the SparkContext
-   * @param storageLevel [[org.apache.spark.storage.StorageLevel]] to use for caching of intermediate RDDs in each superstep.
-   *                     Defaults to caching in memory.
-   * @param compute function that takes a Vertex, optional set of (possibly combined) messages to the Vertex,
-   *                optional Aggregator and the current superstep,
+   * @param storageLevel org.apache.spark.storage.StorageLevel to use for caching of
+   *                     intermediate RDDs in each superstep. Defaults to caching in memory.
+   * @param compute function that takes a Vertex, optional set of (possibly combined) messages to
+   *                the Vertex, optional Aggregator and the current superstep,
    *                and returns a set of (Vertex, outgoing Messages) pairs
    * @tparam K key
    * @tparam V vertex type
@@ -127,8 +129,8 @@ object Bagel extends Logging {
   }
 
   /**
-   * Runs a Bagel program with no [[org.apache.spark.bagel.Aggregator]], default [[org.apache.spark.HashPartitioner]]
-   * and default storage level
+   * Runs a Bagel program with no [[org.apache.spark.bagel.Aggregator]], default
+   * org.apache.spark.HashPartitioner and default storage level
    */
   def run[K: Manifest, V <: Vertex : Manifest, M <: Message[K] : Manifest, C: Manifest](
       sc: SparkContext,
@@ -140,7 +142,10 @@ object Bagel extends Logging {
       compute: (V, Option[C], Int) => (V, Array[M])
     ): RDD[(K, V)] = run(sc, vertices, messages, combiner, numPartitions, DEFAULT_STORAGE_LEVEL)(compute)
 
-  /** Runs a Bagel program with no [[org.apache.spark.bagel.Aggregator]] and the default [[org.apache.spark.HashPartitioner]]*/
+  /**
+   * Runs a Bagel program with no [[org.apache.spark.bagel.Aggregator]] and the
+   * default org.apache.spark.HashPartitioner
+   */
   def run[K: Manifest, V <: Vertex : Manifest, M <: Message[K] : Manifest, C: Manifest](
       sc: SparkContext,
       vertices: RDD[(K, V)],
@@ -158,7 +163,8 @@ object Bagel extends Logging {
   }
 
   /**
-   * Runs a Bagel program with no [[org.apache.spark.bagel.Aggregator]], default [[org.apache.spark.HashPartitioner]],
+   * Runs a Bagel program with no [[org.apache.spark.bagel.Aggregator]],
+   * default org.apache.spark.HashPartitioner,
    * [[org.apache.spark.bagel.DefaultCombiner]] and the default storage level
    */
   def run[K: Manifest, V <: Vertex : Manifest, M <: Message[K] : Manifest](
@@ -171,7 +177,8 @@ object Bagel extends Logging {
     ): RDD[(K, V)] = run(sc, vertices, messages, numPartitions, DEFAULT_STORAGE_LEVEL)(compute)
 
   /**
-   * Runs a Bagel program with no [[org.apache.spark.bagel.Aggregator]], the default [[org.apache.spark.HashPartitioner]]
+   * Runs a Bagel program with no [[org.apache.spark.bagel.Aggregator]],
+   * the default org.apache.spark.HashPartitioner
    * and [[org.apache.spark.bagel.DefaultCombiner]]
    */
   def run[K: Manifest, V <: Vertex : Manifest, M <: Message[K] : Manifest](

core/pom.xml

Lines changed: 2 additions & 2 deletions
@@ -231,7 +231,7 @@
             </goals>
             <configuration>
               <exportAntProperties>true</exportAntProperties>
-              <tasks>
+              <target>
                 <property name="spark.classpath" refid="maven.test.classpath" />
                 <property environment="env" />
                 <fail message="Please set the SCALA_HOME (or SCALA_LIBRARY_PATH if scala is on the path) environment variables and retry.">
@@ -244,7 +244,7 @@
                   </not>
                 </condition>
               </fail>
-              </tasks>
+              </target>
             </configuration>
           </execution>
         </executions>

core/src/main/scala/org/apache/spark/SparkContext.scala

Lines changed: 1 addition & 1 deletion
@@ -350,7 +350,7 @@ class SparkContext(
    * using the older MapReduce API (`org.apache.hadoop.mapred`).
    *
    * @param conf JobConf for setting up the dataset
-   * @param inputFormatClass Class of the [[InputFormat]]
+   * @param inputFormatClass Class of the InputFormat
    * @param keyClass Class of the keys
    * @param valueClass Class of the values
    * @param minSplits Minimum number of Hadoop Splits to generate.

core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala

Lines changed: 1 addition & 1 deletion
@@ -80,7 +80,7 @@ class JobLogger(val user: String, val logDirName: String)
   /**
    * Create a log file for one job
    * @param jobID ID of the job
-   * @exception FileNotFoundException Fail to create log file
+   * @throws FileNotFoundException Fail to create log file
    */
   protected def createLogWriter(jobID: Int) {
     try {

core/src/main/scala/org/apache/spark/util/IndestructibleActorSystem.scala

Lines changed: 2 additions & 2 deletions
@@ -23,9 +23,9 @@ import scala.util.control.{ControlThrowable, NonFatal}
 import com.typesafe.config.Config
 
 /**
- * An [[akka.actor.ActorSystem]] which refuses to shut down in the event of a fatal exception.
+ * An akka.actor.ActorSystem which refuses to shut down in the event of a fatal exception
  * This is necessary as Spark Executors are allowed to recover from fatal exceptions
- * (see [[org.apache.spark.executor.Executor]]).
+ * (see org.apache.spark.executor.Executor)
  */
 object IndestructibleActorSystem {
   def apply(name: String, config: Config): ActorSystem =

core/src/main/scala/org/apache/spark/util/StatCounter.scala

Lines changed: 3 additions & 2 deletions
@@ -19,8 +19,9 @@ package org.apache.spark.util
 
 /**
  * A class for tracking the statistics of a set of numbers (count, mean and variance) in a
- * numerically robust way. Includes support for merging two StatCounters. Based on
- * [[http://en.wikipedia.org/wiki/Algorithms_for_calculating_variance Welford and Chan's algorithms for running variance]].
+ * numerically robust way. Includes support for merging two StatCounters. Based on Welford
+ * and Chan's [[http://en.wikipedia.org/wiki/Algorithms_for_calculating_variance algorithms]]
+ * for running variance.
  *
  * @constructor Initialize the StatCounter with the given values.
  */

core/src/main/scala/org/apache/spark/util/Vector.scala

Lines changed: 1 addition & 1 deletion
@@ -128,7 +128,7 @@ object Vector {
 
   /**
    * Creates this [[org.apache.spark.util.Vector]] of given length containing random numbers
-   * between 0.0 and 1.0. Optional [[scala.util.Random]] number generator can be provided.
+   * between 0.0 and 1.0. Optional scala.util.Random number generator can be provided.
    */
   def random(length: Int, random: Random = new XORShiftRandom()) = Vector(length, _ => random.nextDouble())
 
core/src/test/scala/org/apache/spark/JavaAPISuite.java

Lines changed: 25 additions & 10 deletions
@@ -76,8 +76,9 @@ public int compare(Integer a, Integer b) {
       else if (a < b) return 1;
       else return 0;
     }
-  };
+  }
 
+  @SuppressWarnings("unchecked")
   @Test
   public void sparkContextUnion() {
     // Union of non-specialized JavaRDDs
@@ -149,6 +150,7 @@ public void call(String s) {
     Assert.assertEquals(2, foreachCalls);
   }
 
+  @SuppressWarnings("unchecked")
   @Test
   public void lookup() {
     JavaPairRDD<String, String> categories = sc.parallelizePairs(Arrays.asList(
@@ -180,6 +182,7 @@ public Boolean call(Integer x) {
     Assert.assertEquals(5, oddsAndEvens.lookup(false).get(0).size()); // Odds
   }
 
+  @SuppressWarnings("unchecked")
   @Test
   public void cogroup() {
     JavaPairRDD<String, String> categories = sc.parallelizePairs(Arrays.asList(
@@ -198,6 +201,7 @@ public void cogroup() {
     cogrouped.collect();
   }
 
+  @SuppressWarnings("unchecked")
   @Test
   public void leftOuterJoin() {
     JavaPairRDD<Integer, Integer> rdd1 = sc.parallelizePairs(Arrays.asList(
@@ -244,6 +248,7 @@ public Integer call(Integer a, Integer b) {
     Assert.assertEquals(33, sum);
   }
 
+  @SuppressWarnings("unchecked")
   @Test
   public void foldByKey() {
     List<Tuple2<Integer, Integer>> pairs = Arrays.asList(
@@ -266,6 +271,7 @@ public Integer call(Integer a, Integer b) {
     Assert.assertEquals(3, sums.lookup(3).get(0).intValue());
   }
 
+  @SuppressWarnings("unchecked")
   @Test
   public void reduceByKey() {
     List<Tuple2<Integer, Integer>> pairs = Arrays.asList(
@@ -321,8 +327,8 @@ public void approximateResults() {
   public void take() {
     JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 1, 2, 3, 5, 8, 13));
     Assert.assertEquals(1, rdd.first().intValue());
-    List<Integer> firstTwo = rdd.take(2);
-    List<Integer> sample = rdd.takeSample(false, 2, 42);
+    rdd.take(2);
+    rdd.takeSample(false, 2, 42);
   }
 
   @Test
@@ -360,8 +366,8 @@ public Boolean call(Double x) {
     Assert.assertEquals(2.49444, rdd.stdev(), 0.01);
     Assert.assertEquals(2.73252, rdd.sampleStdev(), 0.01);
 
-    Double first = rdd.first();
-    List<Double> take = rdd.take(5);
+    rdd.first();
+    rdd.take(5);
   }
 
   @Test
@@ -439,11 +445,11 @@ public Iterable<Double> call(String s) {
         return lengths;
       }
     });
-    Double x = doubles.first();
-    Assert.assertEquals(5.0, doubles.first().doubleValue(), 0.01);
+    Assert.assertEquals(5.0, doubles.first(), 0.01);
     Assert.assertEquals(11, pairs.count());
   }
 
+  @SuppressWarnings("unchecked")
   @Test
   public void mapsFromPairsToPairs() {
     List<Tuple2<Integer, String>> pairs = Arrays.asList(
@@ -510,6 +516,7 @@ public void repartition() {
     }
   }
 
+  @SuppressWarnings("unchecked")
   @Test
   public void persist() {
     JavaDoubleRDD doubleRDD = sc.parallelizeDoubles(Arrays.asList(1.0, 1.0, 2.0, 3.0, 5.0, 8.0));
@@ -574,6 +581,7 @@ public void textFilesCompressed() throws IOException {
     Assert.assertEquals(expected, readRDD.collect());
   }
 
+  @SuppressWarnings("unchecked")
   @Test
   public void sequenceFile() {
     File tempDir = Files.createTempDir();
@@ -603,6 +611,7 @@ public Tuple2<Integer, String> call(Tuple2<IntWritable, Text> pair) {
     Assert.assertEquals(pairs, readRDD.collect());
   }
 
+  @SuppressWarnings("unchecked")
   @Test
   public void writeWithNewAPIHadoopFile() {
     File tempDir = Files.createTempDir();
@@ -633,6 +642,7 @@ public String call(Tuple2<IntWritable, Text> x) {
     }).collect().toString());
   }
 
+  @SuppressWarnings("unchecked")
   @Test
   public void readWithNewAPIHadoopFile() throws IOException {
     File tempDir = Files.createTempDir();
@@ -675,6 +685,7 @@ public void objectFilesOfInts() {
     Assert.assertEquals(expected, readRDD.collect());
   }
 
+  @SuppressWarnings("unchecked")
   @Test
   public void objectFilesOfComplexTypes() {
     File tempDir = Files.createTempDir();
@@ -691,6 +702,7 @@ public void objectFilesOfComplexTypes() {
     Assert.assertEquals(pairs, readRDD.collect());
   }
 
+  @SuppressWarnings("unchecked")
   @Test
   public void hadoopFile() {
     File tempDir = Files.createTempDir();
@@ -720,6 +732,7 @@ public String call(Tuple2<IntWritable, Text> x) {
     }).collect().toString());
   }
 
+  @SuppressWarnings("unchecked")
   @Test
   public void hadoopFileCompressed() {
     File tempDir = Files.createTempDir();
@@ -825,7 +838,7 @@ public Float zero(Float initialValue) {
       }
     };
 
-    final Accumulator<Float> floatAccum = sc.accumulator((Float) 10.0f, floatAccumulatorParam);
+    final Accumulator<Float> floatAccum = sc.accumulator(10.0f, floatAccumulatorParam);
     rdd.foreach(new VoidFunction<Integer>() {
       public void call(Integer x) {
         floatAccum.add((float) x);
@@ -877,6 +890,7 @@ public void checkpointAndRestore() {
     Assert.assertEquals(Arrays.asList(1, 2, 3, 4, 5), recovered.collect());
   }
 
+  @SuppressWarnings("unchecked")
   @Test
   public void mapOnPairRDD() {
     JavaRDD<Integer> rdd1 = sc.parallelize(Arrays.asList(1,2,3,4));
@@ -901,6 +915,7 @@ public Tuple2<Integer, Integer> call(Tuple2<Integer, Integer> in) throws Excepti
 
   }
 
+  @SuppressWarnings("unchecked")
   @Test
   public void collectPartitions() {
     JavaRDD<Integer> rdd1 = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6, 7), 3);
@@ -969,14 +984,14 @@ public void countApproxDistinctByKey() {
   @Test
   public void collectAsMapWithIntArrayValues() {
     // Regression test for SPARK-1040
-    JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(new Integer[] { 1 }));
+    JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1));
     JavaPairRDD<Integer, int[]> pairRDD = rdd.map(new PairFunction<Integer, Integer, int[]>() {
       @Override
       public Tuple2<Integer, int[]> call(Integer x) throws Exception {
         return new Tuple2<Integer, int[]>(x, new int[] { x });
       }
     });
     pairRDD.collect(); // Works fine
-    Map<Integer, int[]> map = pairRDD.collectAsMap(); // Used to crash with ClassCastException
+    pairRDD.collectAsMap(); // Used to crash with ClassCastException
   }
 }
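For context on why nearly every suppression above sits on a test that builds pairs with Arrays.asList: passing parameterized Tuple2 values through varargs creates an array of a parameterized type, which javac flags at every such call site. A hedged sketch of that pattern follows (it assumes scala.Tuple2 from scala-library on the classpath; the class and method names are invented for illustration and are not from this file):

import java.util.Arrays;
import java.util.List;

import scala.Tuple2;

class PairFixtures {
  // The varargs call materializes a Tuple2<String, String>[], triggering an
  // unchecked generic array creation warning; annotating the enclosing method
  // keeps the fixture readable without changing its behavior.
  @SuppressWarnings("unchecked")
  static List<Tuple2<String, String>> categories() {
    return Arrays.asList(
      new Tuple2<String, String>("Apples", "Fruit"),
      new Tuple2<String, String>("Oranges", "Fruit"),
      new Tuple2<String, String>("Oranges", "Citrus"));
  }
}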

pom.xml

Lines changed: 3 additions & 2 deletions
@@ -607,12 +607,13 @@
         <plugin>
           <groupId>org.apache.maven.plugins</groupId>
           <artifactId>maven-compiler-plugin</artifactId>
-          <version>2.5.1</version>
+          <version>3.1</version>
           <configuration>
             <source>${java.version}</source>
             <target>${java.version}</target>
             <encoding>UTF-8</encoding>
             <maxmem>1024m</maxmem>
+            <fork>true</fork>
           </configuration>
         </plugin>
         <plugin>
@@ -627,7 +628,7 @@
         <plugin>
           <groupId>org.scalatest</groupId>
           <artifactId>scalatest-maven-plugin</artifactId>
-          <version>1.0-M2</version>
+          <version>1.0-RC2</version>
           <configuration>
             <reportsDirectory>${project.build.directory}/surefire-reports</reportsDirectory>
             <junitxml>.</junitxml>

repl/pom.xml

Lines changed: 2 additions & 2 deletions
@@ -112,7 +112,7 @@
             </goals>
             <configuration>
               <exportAntProperties>true</exportAntProperties>
-              <tasks>
+              <target>
                 <property name="spark.classpath" refid="maven.test.classpath" />
                 <property environment="env" />
                 <fail message="Please set the SCALA_HOME (or SCALA_LIBRARY_PATH if scala is on the path) environment variables and retry.">
@@ -125,7 +125,7 @@
                   </not>
                 </condition>
               </fail>
-              </tasks>
+              </target>
             </configuration>
           </execution>
         </executions>
