From b99d26407598423fb5cf9e0a6863e9409bd508cb Mon Sep 17 00:00:00 2001 From: SherlockYang72 Date: Thu, 8 Oct 2015 11:02:36 +0800 Subject: [PATCH 1/7] = --- .../org/apache/spark/graphx/GraphOps.scala | 8 +++ .../lib/GlobalClusteringCoefficient.scala | 50 +++++++++++++++++++ .../spark/graphx/lib/SampleTriangle.scala | 30 +++++++++++ 3 files changed, 88 insertions(+) create mode 100644 graphx/src/main/scala/org/apache/spark/graphx/lib/GlobalClusteringCoefficient.scala create mode 100644 graphx/src/main/scala/org/apache/spark/graphx/lib/SampleTriangle.scala diff --git a/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala b/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala index 377d9d6bd..9e942c2de 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala @@ -372,4 +372,12 @@ class GraphOps[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]) extends Seriali def stronglyConnectedComponents(numIter: Int): Graph[VertexId, ED] = { StronglyConnectedComponents.run(graph, numIter) } + + def globalClusteringCoefficient(): Float = { + GlobalClusteringCoefficient.run(graph) + } + + def sampleTriangle(p: Double): Double = { + SampleTriangle.run(graph, p) + } } // end of GraphOps diff --git a/graphx/src/main/scala/org/apache/spark/graphx/lib/GlobalClusteringCoefficient.scala b/graphx/src/main/scala/org/apache/spark/graphx/lib/GlobalClusteringCoefficient.scala new file mode 100644 index 000000000..3b67d2b25 --- /dev/null +++ b/graphx/src/main/scala/org/apache/spark/graphx/lib/GlobalClusteringCoefficient.scala @@ -0,0 +1,50 @@ +package org.apache.spark.graphx.lib + +import org.apache.spark.graphx._ + +import scala.reflect.ClassTag + +/** + * Created by SherlockYang. + * calculate global clustering coefficient for a given graph + * GCC = (3 * number_of_triangles) / number_of_connected_vertex_triplets + * Note: directions of edges will be ignored + */ +object GlobalClusteringCoefficient { + + def run[VD: ClassTag, ED: ClassTag](graph: Graph[VD,ED]): Float = { + // Remove redundant edges + val g = graph.groupEdges((a, b) => a).cache() + + // count number of connected vertex triplets + var numberOfTriplets: Int = 0 + g.collectNeighborIds(EdgeDirection.Either).collect().foreach { case (vid, nbrs) => + val set = new VertexSet(4) + var i = 0 + while (i < nbrs.size) { + // prevent self cycle + if(nbrs(i) != vid) { + set.add(nbrs(i)) + } + i += 1 + } + numberOfTriplets += set.size + } + + // count triangles + var numberOfTriangles: Int = 0 + val triangleCounter = graph.triangleCount() + val verts = triangleCounter.vertices + verts.collect().foreach { case (vid, count) => + numberOfTriangles += count + } + numberOfTriangles /= 3 + + println("[GCC] #triangles: " + numberOfTriangles) + println("[GCC] #triplets: " + numberOfTriplets) + if (numberOfTriplets == 0) + 0 + else + 3 * numberOfTriangles.toFloat / numberOfTriplets + } +} diff --git a/graphx/src/main/scala/org/apache/spark/graphx/lib/SampleTriangle.scala b/graphx/src/main/scala/org/apache/spark/graphx/lib/SampleTriangle.scala new file mode 100644 index 000000000..bcb24f362 --- /dev/null +++ b/graphx/src/main/scala/org/apache/spark/graphx/lib/SampleTriangle.scala @@ -0,0 +1,30 @@ +package org.apache.spark.graphx.lib + +import org.apache.spark.graphx.Graph + +import scala.reflect.ClassTag + +/** + * Created by SherlockYang. + */ +object SampleTriangle { + def run[VD: ClassTag, ED: ClassTag](graph: Graph[VD,ED], p: Double): Double = { + // Remove redundant edges + val g = graph.groupEdges((a, b) => a).cache() + + // sample graph + scala.util.Random.setSeed(745623) + val subgraph = g.subgraph(epred = (edge) => scala.util.Random.nextFloat() <= p) + + // count triangles in subgraph + val triangleCount = subgraph.triangleCount() + val verts = triangleCount.vertices + + // approximation + var res: Double = 0 + verts.collect().foreach { case (vid, count) => + res += count / (p * p * p) + } + res / 3 + } +} From 662b2deb57bbfa44708a78d0d3ce972a9c29c692 Mon Sep 17 00:00:00 2001 From: SherlockYang72 Date: Mon, 12 Oct 2015 14:30:40 +0800 Subject: [PATCH 2/7] =add local clustering coefficient computation --- .../org/apache/spark/graphx/GraphOps.scala | 4 + .../lib/LocalClusteringCoefficient.scala | 113 ++++++++++++++++++ 2 files changed, 117 insertions(+) create mode 100644 graphx/src/main/scala/org/apache/spark/graphx/lib/LocalClusteringCoefficient.scala diff --git a/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala b/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala index 9e942c2de..cc93dffb5 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala @@ -380,4 +380,8 @@ class GraphOps[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]) extends Seriali def sampleTriangle(p: Double): Double = { SampleTriangle.run(graph, p) } + + def localClusteringCoefficient(): Graph[Double, ED] = { + LocalClusteringCoefficient.run(graph) + } } // end of GraphOps diff --git a/graphx/src/main/scala/org/apache/spark/graphx/lib/LocalClusteringCoefficient.scala b/graphx/src/main/scala/org/apache/spark/graphx/lib/LocalClusteringCoefficient.scala new file mode 100644 index 000000000..b4ef1b6b0 --- /dev/null +++ b/graphx/src/main/scala/org/apache/spark/graphx/lib/LocalClusteringCoefficient.scala @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.graphx.lib + +import org.apache.spark.graphx._ + +import scala.reflect.ClassTag + +import scala.collection.mutable.ListBuffer + +/** + * Local clustering coefficient algorithm + * + * In a directed graph G=(V, E), we define the neighbourhood N_i of a vertex v_i as + * N_i={v_j: e_ij \in E or e_ji \in E} + * + * The local clustering coefficient C_i of a vertex v_i is then defined as + * C_i = |{e_jk: v_j, v_k \in N_i, e_jk \in E}| / (K_i * (K_i - 1)) + * where K_i=|N_i| is the number of neighbors of v_i + * + * Note that the input graph must have been partitioned using [[org.apache.spark.graphx.Graph#partitionBy]]. + */ +object LocalClusteringCoefficient { + /** + * Compute the local clustering coefficient for each vertex and + * return a graph with vertex value representing the local clustering coefficient of that vertex + * + * @param graph the graph for which to compute the connected components + * + * @return a graph with vertex attributes containing the local clustering coefficient of that vertex + * + */ + def run[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]): Graph[Double, ED] = { + // Remove redundant edges + val g = graph.groupEdges((a, b) => a).cache() + + // Construct set representations of the neighborhoods + // () + val nbrSets: VertexRDD[VertexSet] = + g.collectNeighborIds(EdgeDirection.Either).mapValues { (vid, nbrs) => + val set = new VertexSet(4) + var i = 0 + while (i < nbrs.size) { + // prevent self cycle + if(nbrs(i) != vid) { + set.add(nbrs(i)) + } + i += 1 + } + set + } + + // join the sets with the graph + val setGraph: Graph[VertexSet, ED] = g.outerJoinVertices(nbrSets) { + (vid, _, optSet) => optSet.getOrElse(null) + } + + // Edge function computes intersection of smaller vertex with larger vertex + def edgeFunc(et: EdgeTriplet[VertexSet, ED]): Iterator[(VertexId, Double)] = { + assert(et.srcAttr != null) + assert(et.dstAttr != null) + val (smallSet, largeSet) = if (et.srcAttr.size < et.dstAttr.size) { + (et.srcAttr, et.dstAttr) + } else { + (et.dstAttr, et.srcAttr) + } + val iter = smallSet.iterator + val buf = new ListBuffer[(VertexId, Double)] + while (iter.hasNext) { + val vid = iter.next() + if (vid != et.srcId && vid != et.dstId && largeSet.contains(vid)) { + buf += ((vid, 1.0)) + } + } + buf.toIterator + } + + // compute the intersection along edges + val counters: VertexRDD[Double] = setGraph.mapReduceTriplets(edgeFunc, _ + _) + + // count number of neighbors for each vertex + var nbNumMap = Map[VertexId, Int]() + nbrSets.collect().foreach { case (vid, nbVal) => + nbNumMap += (vid -> nbVal.size) + } + + // Merge counters with the graph + g.outerJoinVertices(counters) { + (vid, _, optCounter: Option[Double]) => + val dblCount: Double = optCounter.getOrElse(0) + val nbNum = nbNumMap(vid) + if (nbNum > 1) + dblCount / (nbNum * (nbNum - 1)) + else + 0 + } + } +} From 6fa04a155e838cda3014bce1ef9668b26d6f39d0 Mon Sep 17 00:00:00 2001 From: SherlockYang72 Date: Mon, 12 Oct 2015 14:35:45 +0800 Subject: [PATCH 3/7] = --- .../org/apache/spark/graphx/GraphOps.scala | 13 ++--- .../lib/GlobalClusteringCoefficient.scala | 50 ------------------- .../spark/graphx/lib/SampleTriangle.scala | 30 ----------- 3 files changed, 5 insertions(+), 88 deletions(-) delete mode 100644 graphx/src/main/scala/org/apache/spark/graphx/lib/GlobalClusteringCoefficient.scala delete mode 100644 graphx/src/main/scala/org/apache/spark/graphx/lib/SampleTriangle.scala diff --git a/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala b/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala index cc93dffb5..1ad50d201 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala @@ -373,14 +373,11 @@ class GraphOps[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]) extends Seriali StronglyConnectedComponents.run(graph, numIter) } - def globalClusteringCoefficient(): Float = { - GlobalClusteringCoefficient.run(graph) - } - - def sampleTriangle(p: Double): Double = { - SampleTriangle.run(graph, p) - } - + /** + * Compute the local clustering coefficient for each vertex + * + * @see [[org.apache.spark.graphx.lib.LocalClusteringCoefficient#run]] + */ def localClusteringCoefficient(): Graph[Double, ED] = { LocalClusteringCoefficient.run(graph) } diff --git a/graphx/src/main/scala/org/apache/spark/graphx/lib/GlobalClusteringCoefficient.scala b/graphx/src/main/scala/org/apache/spark/graphx/lib/GlobalClusteringCoefficient.scala deleted file mode 100644 index 3b67d2b25..000000000 --- a/graphx/src/main/scala/org/apache/spark/graphx/lib/GlobalClusteringCoefficient.scala +++ /dev/null @@ -1,50 +0,0 @@ -package org.apache.spark.graphx.lib - -import org.apache.spark.graphx._ - -import scala.reflect.ClassTag - -/** - * Created by SherlockYang. - * calculate global clustering coefficient for a given graph - * GCC = (3 * number_of_triangles) / number_of_connected_vertex_triplets - * Note: directions of edges will be ignored - */ -object GlobalClusteringCoefficient { - - def run[VD: ClassTag, ED: ClassTag](graph: Graph[VD,ED]): Float = { - // Remove redundant edges - val g = graph.groupEdges((a, b) => a).cache() - - // count number of connected vertex triplets - var numberOfTriplets: Int = 0 - g.collectNeighborIds(EdgeDirection.Either).collect().foreach { case (vid, nbrs) => - val set = new VertexSet(4) - var i = 0 - while (i < nbrs.size) { - // prevent self cycle - if(nbrs(i) != vid) { - set.add(nbrs(i)) - } - i += 1 - } - numberOfTriplets += set.size - } - - // count triangles - var numberOfTriangles: Int = 0 - val triangleCounter = graph.triangleCount() - val verts = triangleCounter.vertices - verts.collect().foreach { case (vid, count) => - numberOfTriangles += count - } - numberOfTriangles /= 3 - - println("[GCC] #triangles: " + numberOfTriangles) - println("[GCC] #triplets: " + numberOfTriplets) - if (numberOfTriplets == 0) - 0 - else - 3 * numberOfTriangles.toFloat / numberOfTriplets - } -} diff --git a/graphx/src/main/scala/org/apache/spark/graphx/lib/SampleTriangle.scala b/graphx/src/main/scala/org/apache/spark/graphx/lib/SampleTriangle.scala deleted file mode 100644 index bcb24f362..000000000 --- a/graphx/src/main/scala/org/apache/spark/graphx/lib/SampleTriangle.scala +++ /dev/null @@ -1,30 +0,0 @@ -package org.apache.spark.graphx.lib - -import org.apache.spark.graphx.Graph - -import scala.reflect.ClassTag - -/** - * Created by SherlockYang. - */ -object SampleTriangle { - def run[VD: ClassTag, ED: ClassTag](graph: Graph[VD,ED], p: Double): Double = { - // Remove redundant edges - val g = graph.groupEdges((a, b) => a).cache() - - // sample graph - scala.util.Random.setSeed(745623) - val subgraph = g.subgraph(epred = (edge) => scala.util.Random.nextFloat() <= p) - - // count triangles in subgraph - val triangleCount = subgraph.triangleCount() - val verts = triangleCount.vertices - - // approximation - var res: Double = 0 - verts.collect().foreach { case (vid, count) => - res += count / (p * p * p) - } - res / 3 - } -} From 93141c089bf581929f7079cccaa5f034377513fb Mon Sep 17 00:00:00 2001 From: SherlockYang72 Date: Mon, 12 Oct 2015 15:18:38 +0800 Subject: [PATCH 4/7] =add network sampling algorithms --- .../org/apache/spark/graphx/GraphOps.scala | 5 ++++ .../spark/graphx/lib/SampleTriangle.scala | 30 +++++++++++++++++++ 2 files changed, 35 insertions(+) create mode 100644 graphx/src/main/scala/org/apache/spark/graphx/lib/SampleTriangle.scala diff --git a/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala b/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala index 1ad50d201..8065563cd 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala @@ -373,6 +373,11 @@ class GraphOps[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]) extends Seriali StronglyConnectedComponents.run(graph, numIter) } + + def sampleTriangle(p: Double): Double = { + SampleTriangle.run(graph, p) + } + /** * Compute the local clustering coefficient for each vertex * diff --git a/graphx/src/main/scala/org/apache/spark/graphx/lib/SampleTriangle.scala b/graphx/src/main/scala/org/apache/spark/graphx/lib/SampleTriangle.scala new file mode 100644 index 000000000..5c4641e74 --- /dev/null +++ b/graphx/src/main/scala/org/apache/spark/graphx/lib/SampleTriangle.scala @@ -0,0 +1,30 @@ +package org.apache.spark.graphx.lib + +import org.apache.spark.graphx.Graph + +import scala.reflect.ClassTag + +/** + * Created by SherlockYang. + */ +object SampleTriangle { + def run[VD: ClassTag, ED: ClassTag](graph: Graph[VD,ED], p: Double): Double = { + // Remove redundant edges + val g = graph.groupEdges((a, b) => a).cache() + + // sample graph + scala.util.Random.setSeed(745623) + val subgraph = g.subgraph(epred = (edge) => scala.util.Random.nextFloat() <= p) + + // count triangles in subgraph + val triangleCount = subgraph.triangleCount() + val verts = triangleCount.vertices + + // approximation + var res: Double = 0 + verts.collect().foreach { case (vid, count) => + res += count / (p * p * p) + } + res / 3 + } +} \ No newline at end of file From 7a6e4d068f88a04d7cc1c4533d49f5aefef8b94b Mon Sep 17 00:00:00 2001 From: SherlockYang72 Date: Mon, 12 Oct 2015 15:25:28 +0800 Subject: [PATCH 5/7] =fix mistake commits --- .../org/apache/spark/graphx/GraphOps.scala | 5 ---- .../spark/graphx/lib/SampleTriangle.scala | 30 ------------------- 2 files changed, 35 deletions(-) delete mode 100644 graphx/src/main/scala/org/apache/spark/graphx/lib/SampleTriangle.scala diff --git a/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala b/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala index 8065563cd..1ad50d201 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala @@ -373,11 +373,6 @@ class GraphOps[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]) extends Seriali StronglyConnectedComponents.run(graph, numIter) } - - def sampleTriangle(p: Double): Double = { - SampleTriangle.run(graph, p) - } - /** * Compute the local clustering coefficient for each vertex * diff --git a/graphx/src/main/scala/org/apache/spark/graphx/lib/SampleTriangle.scala b/graphx/src/main/scala/org/apache/spark/graphx/lib/SampleTriangle.scala deleted file mode 100644 index 5c4641e74..000000000 --- a/graphx/src/main/scala/org/apache/spark/graphx/lib/SampleTriangle.scala +++ /dev/null @@ -1,30 +0,0 @@ -package org.apache.spark.graphx.lib - -import org.apache.spark.graphx.Graph - -import scala.reflect.ClassTag - -/** - * Created by SherlockYang. - */ -object SampleTriangle { - def run[VD: ClassTag, ED: ClassTag](graph: Graph[VD,ED], p: Double): Double = { - // Remove redundant edges - val g = graph.groupEdges((a, b) => a).cache() - - // sample graph - scala.util.Random.setSeed(745623) - val subgraph = g.subgraph(epred = (edge) => scala.util.Random.nextFloat() <= p) - - // count triangles in subgraph - val triangleCount = subgraph.triangleCount() - val verts = triangleCount.vertices - - // approximation - var res: Double = 0 - verts.collect().foreach { case (vid, count) => - res += count / (p * p * p) - } - res / 3 - } -} \ No newline at end of file From c09238ed94f86a2006fce18e550c22d81f7a1d5f Mon Sep 17 00:00:00 2001 From: SherlockYang72 Date: Wed, 14 Oct 2015 10:23:50 +0800 Subject: [PATCH 6/7] =fix codestyle bug --- .../graphx/lib/LocalClusteringCoefficient.scala | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/graphx/src/main/scala/org/apache/spark/graphx/lib/LocalClusteringCoefficient.scala b/graphx/src/main/scala/org/apache/spark/graphx/lib/LocalClusteringCoefficient.scala index b4ef1b6b0..5421d4021 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/lib/LocalClusteringCoefficient.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/lib/LocalClusteringCoefficient.scala @@ -33,7 +33,8 @@ import scala.collection.mutable.ListBuffer * C_i = |{e_jk: v_j, v_k \in N_i, e_jk \in E}| / (K_i * (K_i - 1)) * where K_i=|N_i| is the number of neighbors of v_i * - * Note that the input graph must have been partitioned using [[org.apache.spark.graphx.Graph#partitionBy]]. + * Note that the input graph must have been partitioned using + * [[org.apache.spark.graphx.Graph#partitionBy]]. */ object LocalClusteringCoefficient { /** @@ -42,7 +43,8 @@ object LocalClusteringCoefficient { * * @param graph the graph for which to compute the connected components * - * @return a graph with vertex attributes containing the local clustering coefficient of that vertex + * @return a graph with vertex attributes containing + * the local clustering coefficient of that vertex * */ def run[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]): Graph[Double, ED] = { @@ -104,10 +106,12 @@ object LocalClusteringCoefficient { (vid, _, optCounter: Option[Double]) => val dblCount: Double = optCounter.getOrElse(0) val nbNum = nbNumMap(vid) - if (nbNum > 1) + if (nbNum > 1) { dblCount / (nbNum * (nbNum - 1)) - else + } + else { 0 + } } } } From 37535243607add1a59622a9580d63811fc4cc892 Mon Sep 17 00:00:00 2001 From: SherlockYang72 Date: Wed, 14 Oct 2015 15:47:41 +0800 Subject: [PATCH 7/7] =add unit test --- .../lib/LocalClusteringCoefficientSuite.scala | 76 +++++++++++++++++++ 1 file changed, 76 insertions(+) create mode 100644 graphx/src/test/scala/org/apache/spark/graphx/lib/LocalClusteringCoefficientSuite.scala diff --git a/graphx/src/test/scala/org/apache/spark/graphx/lib/LocalClusteringCoefficientSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/lib/LocalClusteringCoefficientSuite.scala new file mode 100644 index 000000000..fe0263e79 --- /dev/null +++ b/graphx/src/test/scala/org/apache/spark/graphx/lib/LocalClusteringCoefficientSuite.scala @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.graphx.lib + +import org.apache.spark.graphx._ +import org.scalatest.FunSuite + +class LocalClusteringCoefficientSuite extends FunSuite with LocalSparkContext { + + def approEqual(a: Double, b: Double): Boolean = { + (a - b).abs < 1e-20 + } + + test("test in a complete graph") { + withSpark { sc => + val edges = Array( 0L->1L, 1L->2L, 2L->0L ) + val revEdges = edges.map { case (a, b) => (b, a) } + val rawEdges = sc.parallelize(edges ++ revEdges, 2) + val graph = Graph.fromEdgeTuples(rawEdges, true).cache() + val lccCount = graph.localClusteringCoefficient() + val verts = lccCount.vertices + verts.collect.foreach { case (_, count) => assert(approEqual(count, 1.0)) } + } + } + + test("test in a triangle with one bi-directed edges") { + withSpark { sc => + val rawEdges = sc.parallelize(Array( 0L->1L, 1L->2L, 2L->1L, 2L->0L ), 2) + val graph = Graph.fromEdgeTuples(rawEdges, true).cache() + val lccCount = graph.localClusteringCoefficient() + val verts = lccCount.vertices + verts.collect.foreach { case (vid, count) => + if (vid == 0) { + assert(approEqual(count, 1.0)) + } else { + assert(approEqual(count, 0.5)) + } + } + } + } + + test("test in a graph with only self-edges") { + withSpark { sc => + val rawEdges = sc.parallelize(Array(0L -> 0L, 1L -> 1L, 2L -> 2L, 3L -> 3L), 2) + val graph = Graph.fromEdgeTuples(rawEdges, true).cache() + val lccCount = graph.localClusteringCoefficient() + val verts = lccCount.vertices + verts.collect.foreach { case (vid, count) => assert(approEqual(count, 0.0)) } + } + } + + test("test in a graph with duplicated edges") { + withSpark { sc => + val edges = Array( 0L->1L, 1L->2L, 2L->0L ) + val rawEdges = sc.parallelize(edges ++ edges, 2) + val graph = Graph.fromEdgeTuples(rawEdges, true).cache() + val lccCount = graph.localClusteringCoefficient() + val verts = lccCount.vertices + verts.collect.foreach { case (vid, count) => assert(approEqual(count, 1.0)) } + } + } +}