Commit 8b8e70e

Merge pull request apache#73 from falaki/ApproximateDistinctCount

Approximate distinct count

Added countApproxDistinct() to RDD and countApproxDistinctByKey() to PairRDDFunctions to approximately count the number of distinct elements in an RDD and the number of distinct values per key, respectively. Both functions use HyperLogLog from stream-lib for counting, and both take a parameter that controls the trade-off between accuracy and memory consumption. Scala docs and test suites are included for both methods.
2 parents 63b411d + bee445c commit 8b8e70e
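As a quick illustration of the new API, here is a minimal standalone sketch (not part of the diff; the master string, app name, and data are illustrative, and the printed estimates will vary since the counts are approximate):

```scala
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._  // implicit conversion to PairRDDFunctions

val sc = new SparkContext("local", "approxDistinctDemo")

// 100,000 elements, but only 100 distinct values.
val rdd = sc.parallelize(1 to 100000).map(_ % 100)
println(rdd.countApproxDistinct(0.01))  // ~100, within roughly 1% relative error

// For key i there are exactly i distinct values, so each estimate should be close to i.
val pairs = sc.parallelize(for (i <- 1 to 50; j <- 1 to i) yield (i, j))
pairs.countApproxDistinctByKey(0.01).collect().foreach(println)
```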

File tree

12 files changed: +1595 −233 lines

core/pom.xml

Lines changed: 1351 additions & 231 deletions
Large diffs are not rendered by default.

core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala

Lines changed: 36 additions & 0 deletions
```diff
@@ -611,6 +611,42 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])(implicit val kClassTag: ClassTag[K
    * Return an RDD with the values of each tuple.
    */
   def values(): JavaRDD[V] = JavaRDD.fromRDD[V](rdd.map(_._2))
+
+  /**
+   * Return approximate number of distinct values for each key in this RDD.
+   * The accuracy of approximation can be controlled through the relative standard deviation
+   * (relativeSD) parameter, which also controls the amount of memory used. Lower values result in
+   * more accurate counts but increase the memory footprint and vice versa. Uses the provided
+   * Partitioner to partition the output RDD.
+   */
+  def countApproxDistinctByKey(relativeSD: Double, partitioner: Partitioner): JavaRDD[(K, Long)] = {
+    rdd.countApproxDistinctByKey(relativeSD, partitioner)
+  }
+
+  /**
+   * Return approximate number of distinct values for each key in this RDD.
+   * The accuracy of approximation can be controlled through the relative standard deviation
+   * (relativeSD) parameter, which also controls the amount of memory used. Lower values result in
+   * more accurate counts but increase the memory footprint and vice versa. The default value of
+   * relativeSD is 0.05. Hash-partitions the output RDD using the existing partitioner/parallelism
+   * level.
+   */
+  def countApproxDistinctByKey(relativeSD: Double = 0.05): JavaRDD[(K, Long)] = {
+    rdd.countApproxDistinctByKey(relativeSD)
+  }
+
+  /**
+   * Return approximate number of distinct values for each key in this RDD.
+   * The accuracy of approximation can be controlled through the relative standard deviation
+   * (relativeSD) parameter, which also controls the amount of memory used. Lower values result in
+   * more accurate counts but increase the memory footprint and vice versa. Hash-partitions the
+   * output RDD into numPartitions partitions.
+   */
+  def countApproxDistinctByKey(relativeSD: Double, numPartitions: Int): JavaRDD[(K, Long)] = {
+    rdd.countApproxDistinctByKey(relativeSD, numPartitions)
+  }
 }
 
 object JavaPairRDD {
```
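The three overloads mirror the underlying Scala API one-for-one, differing only in how the output RDD gets partitioned. For instance (Scala, illustrative values, assuming an in-scope SparkContext `sc` and the implicits from `SparkContext._`):

```scala
import org.apache.spark.HashPartitioner

val pairs = sc.parallelize(Seq((1, "a"), (1, "b"), (2, "a")))

pairs.countApproxDistinctByKey(0.05)                          // default partitioner
pairs.countApproxDistinctByKey(0.05, 8)                       // 8 hash partitions
pairs.countApproxDistinctByKey(0.05, new HashPartitioner(8))  // explicit partitioner
```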

core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala

Lines changed: 11 additions & 0 deletions
```diff
@@ -444,4 +444,15 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
     val comp = com.google.common.collect.Ordering.natural().asInstanceOf[Comparator[T]]
     takeOrdered(num, comp)
   }
+
+  /**
+   * Return approximate number of distinct elements in the RDD.
+   *
+   * The accuracy of approximation can be controlled through the relative standard deviation
+   * (relativeSD) parameter, which also controls the amount of memory used. Lower values result in
+   * more accurate counts but increase the memory footprint and vice versa. The default value of
+   * relativeSD is 0.05.
+   */
+  def countApproxDistinct(relativeSD: Double = 0.05): Long = rdd.countApproxDistinct(relativeSD)
+
 }
```

core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala

Lines changed: 42 additions & 0 deletions
```diff
@@ -40,12 +40,15 @@ import org.apache.hadoop.mapreduce.SparkHadoopMapReduceUtil
 import org.apache.hadoop.mapreduce.{Job => NewAPIHadoopJob}
 import org.apache.hadoop.mapreduce.{RecordWriter => NewRecordWriter}
 
+import com.clearspring.analytics.stream.cardinality.HyperLogLog
+
 import org.apache.spark._
 import org.apache.spark.SparkContext._
 import org.apache.spark.partial.{BoundedDouble, PartialResult}
 import org.apache.spark.Aggregator
 import org.apache.spark.Partitioner
 import org.apache.spark.Partitioner.defaultPartitioner
+import org.apache.spark.util.SerializableHyperLogLog
 
 /**
  * Extra functions available on RDDs of (key, value) pairs through an implicit conversion.
@@ -207,6 +210,45 @@ class PairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)])
     self.map(_._1).countByValueApprox(timeout, confidence)
   }
 
+  /**
+   * Return approximate number of distinct values for each key in this RDD.
+   * The accuracy of approximation can be controlled through the relative standard deviation
+   * (relativeSD) parameter, which also controls the amount of memory used. Lower values result in
+   * more accurate counts but increase the memory footprint and vice versa. Uses the provided
+   * Partitioner to partition the output RDD.
+   */
+  def countApproxDistinctByKey(relativeSD: Double, partitioner: Partitioner): RDD[(K, Long)] = {
+    val createHLL = (v: V) => new SerializableHyperLogLog(new HyperLogLog(relativeSD)).add(v)
+    val mergeValueHLL = (hll: SerializableHyperLogLog, v: V) => hll.add(v)
+    val mergeHLL = (h1: SerializableHyperLogLog, h2: SerializableHyperLogLog) => h1.merge(h2)
+
+    combineByKey(createHLL, mergeValueHLL, mergeHLL, partitioner).mapValues(_.value.cardinality())
+  }
+
+  /**
+   * Return approximate number of distinct values for each key in this RDD.
+   * The accuracy of approximation can be controlled through the relative standard deviation
+   * (relativeSD) parameter, which also controls the amount of memory used. Lower values result in
+   * more accurate counts but increase the memory footprint and vice versa. Hash-partitions the
+   * output RDD into numPartitions partitions.
+   */
+  def countApproxDistinctByKey(relativeSD: Double, numPartitions: Int): RDD[(K, Long)] = {
+    countApproxDistinctByKey(relativeSD, new HashPartitioner(numPartitions))
+  }
+
+  /**
+   * Return approximate number of distinct values for each key in this RDD.
+   * The accuracy of approximation can be controlled through the relative standard deviation
+   * (relativeSD) parameter, which also controls the amount of memory used. Lower values result in
+   * more accurate counts but increase the memory footprint and vice versa. The default value of
+   * relativeSD is 0.05. Hash-partitions the output RDD using the existing partitioner/parallelism
+   * level.
+   */
+  def countApproxDistinctByKey(relativeSD: Double = 0.05): RDD[(K, Long)] = {
+    countApproxDistinctByKey(relativeSD, defaultPartitioner(self))
+  }
+
   /**
    * Merge the values for each key using an associative reduce function. This will also perform
    * the merging locally on each mapper before sending results to a reducer, similarly to a
```
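The per-key implementation is a straight combineByKey: each partition folds its values for a key into one HyperLogLog sketch, and sketches for the same key are then merged across partitions. This works because merging two HLL sketches yields the sketch of the union of their inputs. A standalone sketch of that property against stream-lib directly (assuming stream-lib on the classpath; the values and the 0.01 rsd are illustrative):

```scala
import com.clearspring.analytics.stream.cardinality.HyperLogLog

// Values for one key, split across two "partitions".
val part1 = Seq("a", "b", "c", "a")
val part2 = Seq("b", "c", "d")

// createCombiner / mergeValue: each partition builds its own sketch.
val hll1 = new HyperLogLog(0.01)
part1.foreach(v => hll1.offer(v))
val hll2 = new HyperLogLog(0.01)
part2.foreach(v => hll2.offer(v))

// mergeCombiners: one merged sketch estimates the union of both partitions.
val merged = hll1.merge(hll2)
println(merged.cardinality())  // ~4 distinct values: a, b, c, d
```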

core/src/main/scala/org/apache/spark/rdd/RDD.scala

Lines changed: 15 additions & 1 deletion
```diff
@@ -33,6 +33,7 @@ import org.apache.hadoop.io.Text
 import org.apache.hadoop.mapred.TextOutputFormat
 
 import it.unimi.dsi.fastutil.objects.{Object2LongOpenHashMap => OLMap}
+import com.clearspring.analytics.stream.cardinality.HyperLogLog
 
 import org.apache.spark.Partitioner._
 import org.apache.spark.api.java.JavaRDD
@@ -41,7 +42,7 @@ import org.apache.spark.partial.CountEvaluator
 import org.apache.spark.partial.GroupedCountEvaluator
 import org.apache.spark.partial.PartialResult
 import org.apache.spark.storage.StorageLevel
-import org.apache.spark.util.{Utils, BoundedPriorityQueue}
+import org.apache.spark.util.{Utils, BoundedPriorityQueue, SerializableHyperLogLog}
 
 import org.apache.spark.SparkContext._
 import org.apache.spark._
@@ -788,6 +789,19 @@ abstract class RDD[T: ClassTag](
     sc.runApproximateJob(this, countPartition, evaluator, timeout)
   }
 
+  /**
+   * Return approximate number of distinct elements in the RDD.
+   *
+   * The accuracy of approximation can be controlled through the relative standard deviation
+   * (relativeSD) parameter, which also controls the amount of memory used. Lower values result in
+   * more accurate counts but increase the memory footprint and vice versa. The default value of
+   * relativeSD is 0.05.
+   */
+  def countApproxDistinct(relativeSD: Double = 0.05): Long = {
+    val zeroCounter = new SerializableHyperLogLog(new HyperLogLog(relativeSD))
+    aggregate(zeroCounter)(_.add(_), _.merge(_)).value.cardinality()
+  }
+
   /**
    * Take the first num elements of the RDD. It works by first scanning one partition, and use the
    * results from that partition to estimate the number of additional partitions needed to satisfy
```
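For intuition on the relativeSD knob: HyperLogLog's relative standard error with m registers is about 1.04/sqrt(m) (the constant is from Flajolet et al.'s HyperLogLog paper), so halving relativeSD roughly quadruples the sketch size. A back-of-the-envelope sketch (stream-lib's actual sizing may round differently, e.g. to a power of two):

```scala
// Registers needed so that 1.04 / sqrt(m) <= relativeSD.
def registers(relativeSD: Double): Int =
  math.pow(1.04 / relativeSD, 2).ceil.toInt

// Lower relativeSD means quadratically more memory.
Seq(0.2, 0.05, 0.01, 0.001).foreach { rsd =>
  println(s"relativeSD=$rsd -> ~${registers(rsd)} registers")
}
// relativeSD=0.2   -> ~28 registers
// relativeSD=0.05  -> ~433 registers
// relativeSD=0.01  -> ~10816 registers
// relativeSD=0.001 -> ~1081600 registers
```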
core/src/main/scala/org/apache/spark/util/SerializableHyperLogLog.scala (new file)

Lines changed: 50 additions & 0 deletions

```diff
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util
+
+import java.io.{Externalizable, ObjectOutput, ObjectInput}
+import com.clearspring.analytics.stream.cardinality.{ICardinality, HyperLogLog}
+
+/**
+ * A wrapper around [[com.clearspring.analytics.stream.cardinality.HyperLogLog]] that is
+ * serializable.
+ */
+private[spark]
+class SerializableHyperLogLog(var value: ICardinality) extends Externalizable {
+
+  def this() = this(null)  // For deserialization
+
+  def merge(other: SerializableHyperLogLog) = new SerializableHyperLogLog(value.merge(other.value))
+
+  def add[T](elem: T) = {
+    this.value.offer(elem)
+    this
+  }
+
+  def readExternal(in: ObjectInput) {
+    val byteLength = in.readInt()
+    val bytes = new Array[Byte](byteLength)
+    in.readFully(bytes)
+    value = HyperLogLog.Builder.build(bytes)
+  }
+
+  def writeExternal(out: ObjectOutput) {
+    val bytes = value.getBytes()
+    out.writeInt(bytes.length)
+    out.write(bytes)
+  }
+}
```

core/src/test/scala/org/apache/spark/JavaAPISuite.java

Lines changed: 32 additions & 0 deletions
```diff
@@ -930,4 +930,36 @@ public Tuple2<Integer, Integer> call(Integer i) throws Exception {
       parts[1]);
   }
 
+  @Test
+  public void countApproxDistinct() {
+    List<Integer> arrayData = new ArrayList<Integer>();
+    int size = 100;
+    for (int i = 0; i < 100000; i++) {
+      arrayData.add(i % size);
+    }
+    JavaRDD<Integer> simpleRdd = sc.parallelize(arrayData, 10);
+    Assert.assertTrue(Math.abs((simpleRdd.countApproxDistinct(0.2) - size) / (size * 1.0)) < 0.2);
+    Assert.assertTrue(Math.abs((simpleRdd.countApproxDistinct(0.05) - size) / (size * 1.0)) <= 0.05);
+    Assert.assertTrue(Math.abs((simpleRdd.countApproxDistinct(0.01) - size) / (size * 1.0)) <= 0.01);
+  }
+
+  @Test
+  public void countApproxDistinctByKey() {
+    double relativeSD = 0.001;
+
+    // For each key i in [10, 100) there are i distinct values, so the expected
+    // distinct count for key i equals the key itself.
+    List<Tuple2<Integer, Integer>> arrayData = new ArrayList<Tuple2<Integer, Integer>>();
+    for (int i = 10; i < 100; i++)
+      for (int j = 0; j < i; j++)
+        arrayData.add(new Tuple2<Integer, Integer>(i, j));
+
+    JavaPairRDD<Integer, Integer> pairRdd = sc.parallelizePairs(arrayData);
+    List<Tuple2<Integer, Object>> res = pairRdd.countApproxDistinctByKey(relativeSD).collect();
+    for (Tuple2<Integer, Object> resItem : res) {
+      double count = (double) resItem._1();
+      Long resCount = (Long) resItem._2();
+      Double error = Math.abs((resCount - count) / count);
+      Assert.assertTrue(error < relativeSD);
+    }
+  }
 }
```

core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala

Lines changed: 34 additions & 0 deletions
```diff
@@ -19,6 +19,7 @@ package org.apache.spark.rdd
 
 import scala.collection.mutable.ArrayBuffer
 import scala.collection.mutable.HashSet
+import scala.util.Random
 
 import org.scalatest.FunSuite
 
@@ -109,6 +110,39 @@ class PairRDDFunctionsSuite extends FunSuite with SharedSparkContext {
     assert(deps.size === 2) // ShuffledRDD, ParallelCollection.
   }
 
+  test("countApproxDistinctByKey") {
+    def error(est: Long, size: Long) = math.abs(est - size) / size.toDouble
+
+    /* Since HyperLogLog unique counting is approximate, and the relative standard deviation is
+     * only a statistical bound, the tests can fail for large values of relativeSD. We will be
+     * using relatively tight error bounds to check correctness of functionality rather than
+     * checking whether the approximation conforms with the requested bound.
+     */
+    val relativeSD = 0.001
+
+    // For each value i, there are i tuples with first element equal to i.
+    // Therefore, the expected count for key i would be i.
+    val stacked = (1 to 100).flatMap(i => (1 to i).map(j => (i, j)))
+    val rdd1 = sc.parallelize(stacked)
+    val counted1 = rdd1.countApproxDistinctByKey(relativeSD).collect()
+    counted1.foreach {
+      case (k, count) => assert(error(count, k) < relativeSD)
+    }
+
+    val rnd = new Random()
+
+    // The expected count for key num would be num
+    val randStacked = (1 to 100).flatMap { i =>
+      val num = rnd.nextInt % 500
+      (1 to num).map(j => (num, j))
+    }
+    val rdd2 = sc.parallelize(randStacked)
+    val counted2 = rdd2.countApproxDistinctByKey(relativeSD, 4).collect()
+    counted2.foreach {
+      case (k, count) => assert(error(count, k) < relativeSD)
+    }
+  }
+
   test("join") {
     val rdd1 = sc.parallelize(Array((1, 1), (1, 2), (2, 1), (3, 1)))
     val rdd2 = sc.parallelize(Array((1, 'x'), (2, 'y'), (2, 'z'), (4, 'w')))
```

core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala

Lines changed: 13 additions & 0 deletions
```diff
@@ -63,6 +63,19 @@ class RDDSuite extends FunSuite with SharedSparkContext {
     }
   }
 
+  test("countApproxDistinct") {
+
+    def error(est: Long, size: Long) = math.abs(est - size) / size.toDouble
+
+    val size = 100
+    val uniformDistro = for (i <- 1 to 100000) yield i % size
+    val simpleRdd = sc.makeRDD(uniformDistro)
+    assert(error(simpleRdd.countApproxDistinct(0.2), size) < 0.2)
+    assert(error(simpleRdd.countApproxDistinct(0.05), size) < 0.05)
+    assert(error(simpleRdd.countApproxDistinct(0.01), size) < 0.01)
+    assert(error(simpleRdd.countApproxDistinct(0.001), size) < 0.001)
+  }
+
   test("SparkContext.union") {
     val nums = sc.makeRDD(Array(1, 2, 3, 4), 2)
     assert(sc.union(nums).collect().toList === List(1, 2, 3, 4))
```

core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala

Lines changed: 4 additions & 0 deletions
```diff
@@ -172,6 +172,10 @@ class KryoSerializerSuite extends FunSuite with SharedSparkContext {
     assert (sc.parallelize( Array((1, 11), (2, 22), (3, 33)) ).collect().head === (1, 11))
   }
 
+  test("kryo with SerializableHyperLogLog") {
+    assert(sc.parallelize( Array(1, 2, 3, 2, 3, 3, 2, 3, 1) ).countApproxDistinct(0.01) === 3)
+  }
+
   test("kryo with reduce") {
     val control = 1 :: 2 :: Nil
     val result = sc.parallelize(control, 2).map(new ClassWithoutNoArgConstructor(_))
```
