118 changes: 118 additions & 0 deletions graphx/src/main/scala/org/apache/spark/graphx/lib/SynthBenchmark.scala
@@ -0,0 +1,118 @@
package org.apache.spark.graphx.lib

import java.io.{FileOutputStream, PrintWriter}

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.SparkContext._
import org.apache.spark.graphx.PartitionStrategy
import org.apache.spark.graphx.PartitionStrategy.{CanonicalRandomVertexCut, EdgePartition1D, EdgePartition2D, RandomVertexCut}
import org.apache.spark.graphx.util.GraphGenerators

/**
 * A synthetic benchmark for GraphX: generates a log-normal random graph,
 * then reports the time to load it and the time to run PageRank or
 * connected components on it.
 */
object SynthBenchmark {

def pickPartitioner(v: String): PartitionStrategy = {
// TODO: Use reflection rather than listing all the partitioning strategies here.
v match {
case "RandomVertexCut" => RandomVertexCut
case "EdgePartition1D" => EdgePartition1D
case "EdgePartition2D" => EdgePartition2D
case "CanonicalRandomVertexCut" => CanonicalRandomVertexCut
case _ => throw new IllegalArgumentException("Invalid PartitionStrategy: " + v)
}
}


def main(args: Array[String]): Unit = {
    val options = args.map { arg =>
      arg.dropWhile(_ == '-').split('=') match {
        case Array(opt, v) => opt -> v
        case _ => throw new IllegalArgumentException("Invalid argument: " + arg)
      }
    }

var host: String = null
var app = "pagerank"
var niter = 10
var numVertices = 1000000
var numEPart: Option[Int] = None
var partitionStrategy: Option[PartitionStrategy] = None
var mu: Double = 4.0
var sigma: Double = 1.3
var degFile: String = ""


options.foreach {
case ("host", v) => host = v
case ("app", v) => app = v
case ("niter", v) => niter = v.toInt
case ("nverts", v) => numVertices = v.toInt
case ("numEPart", v) => numEPart = Some(v.toInt)
case ("partStrategy", v) => partitionStrategy = Some(pickPartitioner(v))
case ("mu", v) => mu = v.toDouble
case ("sigma", v) => sigma = v.toDouble
case ("degFile", v) => degFile = v
case (opt, _) => throw new IllegalArgumentException("Invalid option: " + opt)
}

    if (host == null) {
      System.err.println("No -host option specified!")
      System.exit(1)
    }

val conf = new SparkConf()
.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
.set("spark.kryo.registrator", "org.apache.spark.graphx.GraphKryoRegistrator")

val sc = new SparkContext(host, s"GraphX Synth Benchmark (nverts = $numVertices)", conf)

    // Create the graph
    var graph = GraphGenerators.logNormalGraph(sc, numVertices,
      numEPart.getOrElse(sc.defaultParallelism), mu, sigma)
    // Repartition the graph if a partition strategy was specified
    if (partitionStrategy.isDefined) {
      graph = graph.partitionBy(partitionStrategy.get)
    }
    graph.cache()

var startTime = System.currentTimeMillis()
val numEdges = graph.edges.count()
println(s"Num Vertices: $numVertices")
println(s"Num Edges: $numEdges}")
val loadTime = System.currentTimeMillis() - startTime

    // Collect the degree distribution (if requested)
    if (!degFile.isEmpty) {
      val fos = new FileOutputStream(degFile)
      val pos = new PrintWriter(fos)
      // Count how many vertices have each degree (vertices with no edges get 0)
      val hist = graph.vertices.leftJoin(graph.degrees)((id, _, optDeg) => optDeg.getOrElse(0))
        .map(p => p._2).countByValue()
      hist.foreach {
        case (deg, count) => pos.println(s"$deg\t$count")
      }
      pos.close()
    }

    // Run the selected benchmark application
    startTime = System.currentTimeMillis()
    if (app == "pagerank") {
      println("Running PageRank")
      val totalPR = graph.staticPageRank(niter).vertices.map(p => p._2).sum
      println(s"Total PageRank = $totalPR")
    } else if (app == "cc") {
      println("Running Connected Components")
      val maxCC = graph.connectedComponents().vertices.map(v => v._2).reduce((a, b) => math.max(a, b))
      println(s"Max CC = $maxCC")
    }
val runTime = System.currentTimeMillis() - startTime

    sc.stop()
println(s"Num Vertices: $numVertices")
println(s"Num Edges: $numEdges")
println(s"Load time: ${loadTime/1000.0} seconds")
println(s"Run time: ${runTime/1000.0} seconds")

  }
}
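The argument convention above is deliberately minimal: any run of leading dashes is stripped and the remainder is split on '=', so -host=..., --host=..., and even host=... all parse identically. Below is a self-contained sketch of the same parsing logic; the ArgParseDemo object name and the sample argument values are hypothetical, chosen only for illustration.

// Standalone sketch of the -opt=value convention used by SynthBenchmark.main;
// the object name and sample values are illustrative, not part of this patch.
object ArgParseDemo {
  def main(args: Array[String]): Unit = {
    val sample = Array("-host=local[4]", "--niter=5", "-nverts=100000")
    val options = sample.map { arg =>
      // Strip any number of leading dashes, then split the key from the value
      arg.dropWhile(_ == '-').split('=') match {
        case Array(opt, v) => opt -> v
        case _ => throw new IllegalArgumentException("Invalid argument: " + arg)
      }
    }
    options.foreach { case (k, v) => println(s"$k = $v") }
    // Prints:
    // host = local[4]
    // niter = 5
    // nverts = 100000
  }
}

One consequence of this scheme: a value containing its own '=' splits into three or more pieces, fails the Array(opt, v) pattern, and is rejected, so option values here cannot contain equals signs.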
graphx/src/main/scala/org/apache/spark/graphx/util/GraphGenerators.scala
@@ -40,21 +40,41 @@ object GraphGenerators {
   val RMATc = 0.15
   val RMATd = 0.25
 
-  // Right now it just generates a bunch of edges where
-  // the edge data is the weight (default 1)
+  /**
+   * Generate a graph whose vertex out degree is log normal.
+   *
+   * The default values for mu and sigma are taken from the Pregel paper:
+   *
+   * Grzegorz Malewicz, Matthew H. Austern, Aart J. C. Bik, James C. Dehnert,
+   * Ilan Horn, Naty Leiser, and Grzegorz Czajkowski. 2010.
+   * Pregel: a system for large-scale graph processing. SIGMOD '10.
+   *
+   * @param sc the Spark context
+   * @param numVertices the number of vertices in the graph
+   * @param numEParts the number of partitions for the vertex and edge RDDs
+   * @param mu the mean of the log of the out-degree distribution
+   * @param sigma the standard deviation of the log of the out-degree distribution
+   * @return a graph in which each vertex's attribute is its sampled out degree
+   */
-  def logNormalGraph(sc: SparkContext, numVertices: Int): Graph[Int, Int] = {
-    // based on Pregel settings
-    val mu = 4
-    val sigma = 1.3
-
-    val vertices: RDD[(VertexId, Int)] = sc.parallelize(0 until numVertices).map{
-      src => (src, sampleLogNormal(mu, sigma, numVertices))
+  def logNormalGraph(sc: SparkContext, numVertices: Int, numEParts: Int,
+      mu: Double = 4.0, sigma: Double = 1.3): Graph[Long, Int] = {
+    val vertices: RDD[(VertexId, Long)] = sc.parallelize(0 until numVertices, numEParts).map { src =>
+      // Seed the random number generator with the source vertex id so that
+      // the sampled degree is deterministic for each vertex
+      val rand = new Random(src)
+      val degree: Long = math.min(numVertices.toLong, math.exp(rand.nextGaussian() * sigma + mu).toLong)
+      (src.toLong, degree)
     }
-    val edges = vertices.flatMap { v =>
-      generateRandomEdges(v._1.toInt, v._2, numVertices)
+    val edges: RDD[Edge[Int]] = vertices.flatMap { case (src, degree) =>
+      new Iterator[Edge[Int]] {
+        // Seed with the source vertex id again so the edge targets are
+        // deterministic for each vertex
+        val rand = new Random(src)
+        var i = 0
+        override def hasNext: Boolean = i < degree
+        override def next(): Edge[Int] = {
+          val nextEdge = Edge[Int](src, rand.nextInt(numVertices), i)
+          i += 1
+          nextEdge
+        }
+      }
     }
     Graph(vertices, edges, 0)
   }
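For intuition about the graphs this generates: each out degree is sampled as exp(Z * sigma + mu) with Z standard normal, i.e. from a log-normal distribution whose mean is exp(mu + sigma^2 / 2), roughly 127 edges per vertex at the defaults mu = 4.0 and sigma = 1.3. The standalone sketch below checks this empirically; the LogNormalDegreeDemo name and the fixed seed are hypothetical (the patch seeds each generator with its vertex id), and the truncation to numVertices is omitted.

import scala.util.Random

// Standalone check of the log-normal degree sampling used above; the object
// name and the fixed seed are illustrative, not part of this patch.
object LogNormalDegreeDemo {
  def main(args: Array[String]): Unit = {
    val (mu, sigma) = (4.0, 1.3)
    val rand = new Random(42)
    val n = 100000
    // Same sampling expression as in logNormalGraph, minus the cap at numVertices
    val degrees = Array.fill(n)(math.exp(rand.nextGaussian() * sigma + mu).toLong)
    val mean = degrees.sum.toDouble / n
    // Analytic mean of a log-normal distribution: exp(mu + sigma^2 / 2)
    val expected = math.exp(mu + sigma * sigma / 2) // about 127 at these defaults
    println(f"empirical mean degree = $mean%.1f, analytic mean = $expected%.1f")
  }
}

The .toLong floor and the math.min(numVertices, ...) cap in the patch pull the realized mean slightly below the analytic value, but at the benchmark's default of one million vertices this still implies on the order of 10^8 edges, which is worth keeping in mind when sizing a run.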