@@ -42,7 +42,7 @@ object BroadcastTest {
val arr1 = (0 until num).toArray

for (i <- 0 until 3) {
println("Iteration " + i)
println(s"Iteration $i")
println("===========")
val startTime = System.nanoTime
val barr1 = sc.broadcast(arr1)
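
The recurring change in this patch swaps + concatenation for Scala's s string interpolator. A minimal standalone sketch (the object name and values are illustrative, not part of the patch) showing that the two forms produce the same string:

object InterpolationSketch {
  def main(args: Array[String]): Unit = {
    val i = 2
    val concatenated = "Iteration " + i   // old style: concatenation with implicit toString
    val interpolated = s"Iteration $i"    // new style: the s interpolator splices the value in
    assert(concatenated == interpolated)  // both evaluate to "Iteration 2"
    println(interpolated)
  }
}
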
@@ -49,12 +49,10 @@ object DFSReadWriteTest {
}

private def printUsage(): Unit = {
val usage: String = "DFS Read-Write Test\n" +
"\n" +
"Usage: localFile dfsDir\n" +
"\n" +
"localFile - (string) local file to use in test\n" +
"dfsDir - (string) DFS directory for read/write tests\n"
val usage = """DFS Read-Write Test
|Usage: localFile dfsDir
|localFile - (string) local file to use in test
|dfsDir - (string) DFS directory for read/write tests""".stripMargin

println(usage)
}
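
The usage message above moves to a triple-quoted literal with stripMargin. A small self-contained sketch (object name is illustrative) of how the | margin character lets the literal stay indented with the surrounding code:

object StripMarginSketch {
  def main(args: Array[String]): Unit = {
    // stripMargin removes leading whitespace up to and including the '|' on each line,
    // so the indentation used for readability does not leak into the printed text.
    val usage = """DFS Read-Write Test
                  |Usage: localFile dfsDir
                  |localFile - (string) local file to use in test
                  |dfsDir - (string) DFS directory for read/write tests""".stripMargin
    println(usage)
  }
}
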
@@ -69,13 +67,13 @@ object DFSReadWriteTest {

localFilePath = new File(args(i))
if (!localFilePath.exists) {
System.err.println("Given path (" + args(i) + ") does not exist.\n")
System.err.println(s"Given path (${args(i)}) does not exist")
printUsage()
System.exit(1)
}

if (!localFilePath.isFile) {
System.err.println("Given path (" + args(i) + ") is not a file.\n")
System.err.println(s"Given path (${args(i)}) is not a file")
printUsage()
System.exit(1)
}
@@ -108,7 +106,7 @@ object DFSReadWriteTest {
.getOrCreate()

println("Writing local file to DFS")
val dfsFilename = dfsDirPath + "/dfs_read_write_test"
val dfsFilename = s"$dfsDirPath/dfs_read_write_test"
val fileRDD = spark.sparkContext.parallelize(fileContents)
fileRDD.saveAsTextFile(dfsFilename)

@@ -127,11 +125,11 @@ object DFSReadWriteTest {
spark.stop()

if (localWordCount == dfsWordCount) {
println(s"Success! Local Word Count ($localWordCount) " +
s"and DFS Word Count ($dfsWordCount) agree.")
println(s"Success! Local Word Count $localWordCount and " +
s"DFS Word Count $dfsWordCount agree.")
} else {
println(s"Failure! Local Word Count ($localWordCount) " +
s"and DFS Word Count ($dfsWordCount) disagree.")
println(s"Failure! Local Word Count $localWordCount " +
s"and DFS Word Count $dfsWordCount disagree.")
}

}
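
The result messages above are built from two adjacent s-interpolated literals joined with +; each literal is interpolated on its own, so wrapping a long message across lines leaves the output unchanged. A sketch with hypothetical counts:

object SplitInterpolationSketch {
  def main(args: Array[String]): Unit = {
    val localWordCount = 42   // hypothetical counts, for illustration only
    val dfsWordCount = 42
    val message = s"Success! Local Word Count $localWordCount and " +
      s"DFS Word Count $dfsWordCount agree."
    println(message)   // prints: Success! Local Word Count 42 and DFS Word Count 42 agree.
  }
}
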
@@ -39,7 +39,7 @@ object HdfsTest {
val start = System.currentTimeMillis()
for (x <- mapped) { x + 2 }
val end = System.currentTimeMillis()
println("Iteration " + iter + " took " + (end-start) + " ms")
println(s"Iteration $iter took ${end-start} ms")
}
spark.stop()
}
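
The timing line above interpolates a full expression, which needs the braced ${...} form; a bare $ only captures a simple identifier. A minimal sketch with made-up timestamps:

object ExpressionInterpolationSketch {
  def main(args: Array[String]): Unit = {
    val iter = 3
    val start = 1000L   // hypothetical millisecond timestamps
    val end = 1250L
    // $iter works because it is a plain identifier; end - start is an expression, so it needs ${...}
    println(s"Iteration $iter took ${end - start} ms")   // Iteration 3 took 250 ms
  }
}
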
@@ -129,8 +129,7 @@ object LocalALS {
println(s"Iteration $iter:")
ms = (0 until M).map(i => updateMovie(i, ms(i), us, R)).toArray
us = (0 until U).map(j => updateUser(j, us(j), ms, R)).toArray
println("RMSE = " + rmse(R, ms, us))
println()
println(s"RMSE = ${rmse(R, ms, us)}")
}
}

@@ -58,10 +58,10 @@ object LocalFileLR {

// Initialize w to a random value
val w = DenseVector.fill(D) {2 * rand.nextDouble - 1}
println("Initial w: " + w)
println(s"Initial w: $w")

for (i <- 1 to ITERATIONS) {
println("On iteration " + i)
println(s"On iteration $i")
val gradient = DenseVector.zeros[Double](D)
for (p <- points) {
val scale = (1 / (1 + math.exp(-p.y * (w.dot(p.x)))) - 1) * p.y
@@ -71,7 +71,7 @@ object LocalFileLR {
}

fileSrc.close()
println("Final w: " + w)
println(s"Final w: $w")
}
}
// scalastyle:on println
@@ -88,7 +88,7 @@ object LocalKMeans {
kPoints.put(i, iter.next())
}

println("Initial centers: " + kPoints)
println(s"Initial centers: $kPoints")

while(tempDist > convergeDist) {
val closest = data.map (p => (closestPoint(p, kPoints), (p, 1)))
@@ -114,7 +114,7 @@ object LocalKMeans {
}
}

println("Final centers: " + kPoints)
println(s"Final centers: $kPoints")
}
}
// scalastyle:on println
@@ -61,10 +61,10 @@ object LocalLR {
val data = generateData
// Initialize w to a random value
val w = DenseVector.fill(D) {2 * rand.nextDouble - 1}
println("Initial w: " + w)
println(s"Initial w: $w")

for (i <- 1 to ITERATIONS) {
println("On iteration " + i)
println(s"On iteration $i")
val gradient = DenseVector.zeros[Double](D)
for (p <- data) {
val scale = (1 / (1 + math.exp(-p.y * (w.dot(p.x)))) - 1) * p.y
@@ -73,7 +73,7 @@ object LocalLR {
w -= gradient
}

println("Final w: " + w)
println(s"Final w: $w")
}
}
// scalastyle:on println
@@ -28,7 +28,7 @@ object LocalPi {
val y = random * 2 - 1
if (x*x + y*y <= 1) count += 1
}
println("Pi is roughly " + 4 * count / 100000.0)
println(s"Pi is roughly ${4 * count / 100000.0}")
}
}
// scalastyle:on println
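
The Pi estimate above does its arithmetic directly inside the interpolation braces; dividing by the Double literal 100000.0 keeps the result in floating point. A tiny worked sketch with a hypothetical hit count:

object PiArithmeticSketch {
  def main(args: Array[String]): Unit = {
    val count = 78540   // hypothetical number of hits out of 100000 samples
    // 4 * 78540 = 314160; 314160 / 100000.0 = 3.1416 (Double division because of the .0 literal)
    println(s"Pi is roughly ${4 * count / 100000.0}")
  }
}
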
@@ -59,7 +59,7 @@ object SimpleSkewedGroupByTest {
// Enforce that everything has been calculated and in cache
pairs1.count

println("RESULT: " + pairs1.groupByKey(numReducers).count)
println(s"RESULT: ${pairs1.groupByKey(numReducers).count}")
// Print how many keys each reducer got (for debugging)
// println("RESULT: " + pairs1.groupByKey(numReducers)
// .map{case (k,v) => (k, v.size)}
@@ -135,10 +135,8 @@ object SparkALS {
.map(i => update(i, usb.value(i), msb.value, Rc.value.transpose()))
.collect()
usb = sc.broadcast(us) // Re-broadcast us because it was updated
println("RMSE = " + rmse(R, ms, us))
println()
println(s"RMSE = ${rmse(R, ms, us)}")
}

spark.stop()
}

@@ -79,17 +79,17 @@ object SparkHdfsLR {

// Initialize w to a random value
val w = DenseVector.fill(D) {2 * rand.nextDouble - 1}
println("Initial w: " + w)
println(s"Initial w: $w")

for (i <- 1 to ITERATIONS) {
println("On iteration " + i)
println(s"On iteration $i")
val gradient = points.map { p =>
p.x * (1 / (1 + exp(-p.y * (w.dot(p.x)))) - 1) * p.y
}.reduce(_ + _)
w -= gradient
}

println("Final w: " + w)
println(s"Final w: $w")
spark.stop()
}
}
@@ -95,7 +95,7 @@ object SparkKMeans {
for (newP <- newPoints) {
kPoints(newP._1) = newP._2
}
println("Finished iteration (delta = " + tempDist + ")")
println(s"Finished iteration (delta = $tempDist)")
}

println("Final centers:")
@@ -73,17 +73,17 @@ object SparkLR {

// Initialize w to a random value
val w = DenseVector.fill(D) {2 * rand.nextDouble - 1}
println("Initial w: " + w)
println(s"Initial w: $w")

for (i <- 1 to ITERATIONS) {
println("On iteration " + i)
println(s"On iteration $i")
val gradient = points.map { p =>
p.x * (1 / (1 + exp(-p.y * (w.dot(p.x)))) - 1) * p.y
}.reduce(_ + _)
w -= gradient
}

println("Final w: " + w)
println(s"Final w: $w")

spark.stop()
}
@@ -77,7 +77,7 @@ object SparkPageRank {
}

val output = ranks.collect()
output.foreach(tup => println(tup._1 + " has rank: " + tup._2 + "."))
output.foreach(tup => println(s"${tup._1} has rank: ${tup._2}."))

spark.stop()
}
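
For numeric output like the ranks above, Scala also provides the f interpolator, which combines interpolation with printf-style formatting; it is not used in this patch, but a hedged sketch with an assumed rank value looks like this:

object FormattedInterpolationSketch {
  def main(args: Array[String]): Unit = {
    val page = "example.org"   // hypothetical page id and rank
    val rank = 0.8512345
    println(f"$page has rank: $rank%.4f.")   // prints: example.org has rank: 0.8512.
  }
}
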
@@ -36,7 +36,7 @@ object SparkPi {
val y = random * 2 - 1
if (x*x + y*y <= 1) 1 else 0
}.reduce(_ + _)
println("Pi is roughly " + 4.0 * count / (n - 1))
println(s"Pi is roughly ${4.0 * count / (n - 1)}")
spark.stop()
}
}
@@ -68,7 +68,7 @@ object SparkTC {
nextCount = tc.count()
} while (nextCount != oldCount)

println("TC has " + tc.count() + " edges.")
println(s"TC has ${tc.count()} edges.")
spark.stop()
}
}
@@ -27,19 +27,20 @@ import org.apache.spark.graphx.lib._
import org.apache.spark.internal.Logging
import org.apache.spark.storage.StorageLevel


/**
* Driver program for running graph algorithms.
*/
object Analytics extends Logging {

def main(args: Array[String]): Unit = {
if (args.length < 2) {
System.err.println(
"Usage: Analytics <taskType> <file> --numEPart=<num_edge_partitions> [other options]")
System.err.println("Supported 'taskType' as follows:")
System.err.println(" pagerank Compute PageRank")
System.err.println(" cc Compute the connected components of vertices")
System.err.println(" triangles Count the number of triangles")
val usage = """Usage: Analytics <taskType> <file> --numEPart=<num_edge_partitions>
|[other options] Supported 'taskType' as follows:
|pagerank Compute PageRank
|cc Compute the connected components of vertices
|triangles Count the number of triangles""".stripMargin
System.err.println(usage)
System.exit(1)
}

@@ -48,7 +49,7 @@ object Analytics extends Logging {
val optionsList = args.drop(2).map { arg =>
arg.dropWhile(_ == '-').split('=') match {
case Array(opt, v) => (opt -> v)
case _ => throw new IllegalArgumentException("Invalid argument: " + arg)
case _ => throw new IllegalArgumentException(s"Invalid argument: $arg")
}
}
val options = mutable.Map(optionsList: _*)
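
The option parsing above strips leading dashes and splits each --key=value argument on '='; the interpolated error message reports anything that does not match. A standalone sketch with hypothetical arguments:

object OptionParsingSketch {
  def main(args: Array[String]): Unit = {
    val sample = Array("--numEPart=8", "--output=/tmp/ranks")   // hypothetical command-line flags
    val optionsList = sample.map { arg =>
      // drop the leading dashes, then split "key=value" into a (key, value) pair
      arg.dropWhile(_ == '-').split('=') match {
        case Array(opt, v) => opt -> v
        case _ => throw new IllegalArgumentException(s"Invalid argument: $arg")
      }
    }
    optionsList.foreach { case (opt, v) => println(s"$opt = $v") }   // numEPart = 8, output = /tmp/ranks
  }
}
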
@@ -74,68 +75,68 @@ object Analytics extends Logging {
val numIterOpt = options.remove("numIter").map(_.toInt)

options.foreach {
case (opt, _) => throw new IllegalArgumentException("Invalid option: " + opt)
case (opt, _) => throw new IllegalArgumentException(s"Invalid option: $opt")
}

println("======================================")
println("| PageRank |")
println("======================================")

val sc = new SparkContext(conf.setAppName("PageRank(" + fname + ")"))
val sc = new SparkContext(conf.setAppName(s"PageRank($fname)"))

val unpartitionedGraph = GraphLoader.edgeListFile(sc, fname,
numEdgePartitions = numEPart,
edgeStorageLevel = edgeStorageLevel,
vertexStorageLevel = vertexStorageLevel).cache()
val graph = partitionStrategy.foldLeft(unpartitionedGraph)(_.partitionBy(_))

println("GRAPHX: Number of vertices " + graph.vertices.count)
println("GRAPHX: Number of edges " + graph.edges.count)
println(s"GRAPHX: Number of vertices ${graph.vertices.count}")
println(s"GRAPHX: Number of edges ${graph.edges.count}")

val pr = (numIterOpt match {
case Some(numIter) => PageRank.run(graph, numIter)
case None => PageRank.runUntilConvergence(graph, tol)
}).vertices.cache()

println("GRAPHX: Total rank: " + pr.map(_._2).reduce(_ + _))
println(s"GRAPHX: Total rank: ${pr.map(_._2).reduce(_ + _)}")

if (!outFname.isEmpty) {
logWarning("Saving pageranks of pages to " + outFname)
logWarning(s"Saving pageranks of pages to $outFname")
pr.map { case (id, r) => id + "\t" + r }.saveAsTextFile(outFname)
}

sc.stop()

case "cc" =>
options.foreach {
case (opt, _) => throw new IllegalArgumentException("Invalid option: " + opt)
case (opt, _) => throw new IllegalArgumentException(s"Invalid option: $opt")
}

println("======================================")
println("| Connected Components |")
println("======================================")

val sc = new SparkContext(conf.setAppName("ConnectedComponents(" + fname + ")"))
val sc = new SparkContext(conf.setAppName(s"ConnectedComponents($fname)"))
val unpartitionedGraph = GraphLoader.edgeListFile(sc, fname,
numEdgePartitions = numEPart,
edgeStorageLevel = edgeStorageLevel,
vertexStorageLevel = vertexStorageLevel).cache()
val graph = partitionStrategy.foldLeft(unpartitionedGraph)(_.partitionBy(_))

val cc = ConnectedComponents.run(graph)
println("Components: " + cc.vertices.map { case (vid, data) => data }.distinct())
println(s"Components: ${cc.vertices.map { case (vid, data) => data }.distinct()}")
sc.stop()

case "triangles" =>
options.foreach {
case (opt, _) => throw new IllegalArgumentException("Invalid option: " + opt)
case (opt, _) => throw new IllegalArgumentException(s"Invalid option: $opt")
}

println("======================================")
println("| Triangle Count |")
println("======================================")

val sc = new SparkContext(conf.setAppName("TriangleCount(" + fname + ")"))
val sc = new SparkContext(conf.setAppName(s"TriangleCount($fname)"))
val graph = GraphLoader.edgeListFile(sc, fname,
canonicalOrientation = true,
numEdgePartitions = numEPart,