
Conversation

@hvanhovell
Contributor

This PR implements a HyperLogLog-based approximate count distinct function using the new UDAF interface.

The implementation is inspired by the ClearSpring HyperLogLog implementation and should produce the same results.

There is still some documentation and testing left to do.

cc @yhuai

@yhuai
Contributor

yhuai commented Aug 21, 2015

ok to test

@rxin
Contributor

rxin commented Aug 21, 2015

This made my day. The approach is super cool.

Couple suggestions:

  1. Can we use HyperLogLogPlus? It's also in streamlib: https://github.com/addthis/stream-lib/blob/master/src/main/java/com/clearspring/analytics/stream/cardinality/HyperLogLogPlus.java
  2. Can we write this in a way to make it more unit testable?

Beyond this, would be cool to have count-min sketch too! (future work) In the past I had created a ticket to track streaming algorithms: https://issues.apache.org/jira/browse/SPARK-6760

@SparkQA

SparkQA commented Aug 21, 2015

Test build #41377 has finished for PR 8362 at commit e178d9e.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • case class HyperLogLog(child: Expression, relativeSD: Double = 0.05)

@hvanhovell
Contributor Author

Thanks.

I was aiming for compatibility with the existing approxCountDistinct, but we can also implement HLL++. HLL++ introduces three (orthogonal) refinements: 64-bit hashing, better low-cardinality corrections, and a sparse encoding scheme. The first two refinements are easy to add; the third will require a bit more effort.
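For readers unfamiliar with the register mechanics under discussion, here is a minimal pure-Python sketch of the dense HLL update and raw estimate (illustrative only — it follows the standard HLL formulation, not the patch's Scala code; the precision `P` is arbitrary and the low-cardinality corrections that HLL++ refines are omitted):

```python
import hashlib

P = 9          # precision (illustrative); m = 2**P registers
M = 1 << P
MASK64 = (1 << 64) - 1

def hash64(value):
    # Stand-in 64-bit hash; the patch uses a MurmurHash variant.
    digest = hashlib.sha1(str(value).encode()).digest()
    return int.from_bytes(digest[:8], "big")

def update(registers, value):
    h = hash64(value)
    idx = h >> (64 - P)                # first P bits select a register
    rest = (h << P) & MASK64           # remaining 64 - P bits
    rank = 1                           # position of the leftmost 1-bit
    while rank <= 64 - P and rest & (1 << 63) == 0:
        rank += 1
        rest = (rest << 1) & MASK64
    registers[idx] = max(registers[idx], rank)

registers = [0] * M
for v in range(100000):
    update(registers, v)

# Raw estimate; HLL++'s refinements correct its bias at low cardinalities.
alpha = 0.7213 / (1 + 1.079 / M)
estimate = alpha * M * M / sum(2.0 ** -r for r in registers)
```

With m = 512 registers the expected relative error is about 1.04/sqrt(m) ≈ 4.6%, which is why testing can only assert that results land within a tolerance of the true cardinality.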

Unit testing this is a bit of a challenge. End-to-end (blackbox) testing is no problem, as long as we know what the result should be, or if we do random testing (results should be within 5% of the actual value). Testing parts of the algorithm is a bit of a PITA:

  • It is hard to reason about the results (the updated registers) HLL produces.
  • Register access code and HLL code are intertwined.

Both the ClearSpring and AggregateKnowledge implementations resort to blackbox testing. I will create some blackbox tests.

@rxin
Contributor

rxin commented Aug 21, 2015

Thanks - I think blackbox testing is fine. But it would be great to apply that at the "unit" testing level, i.e. running directly against the aggregate function, rather than against Spark SQL end to end.

@hvanhovell
Contributor Author

Implemented the initial non-sparse HLL++. I am going to take a look at the sparse version next week. The results are still equal to the ClearSpring HLL++ implementation in non-sparse mode.

I also need to clean up the docs for the main HLL++ class a bit.

@SparkQA

SparkQA commented Aug 28, 2015

Test build #41719 has finished for PR 8362 at commit 1ea722b.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • case class HyperLogLogPlusPlus(child: Expression, relativeSD: Double = 0.05)

@rxin
Contributor

rxin commented Sep 7, 2015

@hvanhovell do you mind closing this pull request, and re-open when you feel it is ready for review again?

@hvanhovell
Contributor Author

@rxin the dense version of HLL++ is ready. We could also add this, and add the sparse logic in a follow-up PR. Let me know what you think. I'll close if you'd rather do everything in one go.

@rxin
Contributor

rxin commented Sep 8, 2015

Ah ok. Will add this to our sprint backlog and get somebody to review it soon.

@rxin
Contributor

rxin commented Sep 10, 2015

One quick note: https://github.com/twitter/algebird/pull/491/files

Anything we can learn from the above PR?

@MLnick
Contributor

MLnick commented Sep 14, 2015

@hvanhovell as discussed on the dev mailing list, perhaps it would be interesting to allow the return type to include the aggregated HLL registers. This could be (for example) in the form of StructType {'cardinality':Long, 'hll': Array[Byte]}, where the hll is in the same serialized form that can be used to instantiate say a StreamLib or Algebird HLL class for use outside of Spark.

Is it possible to specify input arguments for rsd? So SELECT APPROX DISTINCT(column, 0.1) FROM ...? If so, then another option is to add a further argument such as returnHLL: Boolean = false so that either the raw HLL or the cardinality is returned?

@MLnick
Contributor

MLnick commented Sep 14, 2015

@hvanhovell @rxin is it intended that this replace the existing approxCountDistinct implementation? And I assume this will happen automatically due to extending AggregateFunction2?

@yhuai
Contributor

yhuai commented Sep 14, 2015

@MLnick This one will replace the existing implementation. For now, we will do conversion as shown at https://github.com/apache/spark/pull/8362/files#diff-78b9b210b8cee72e7097bc1af44bd315L98. Later, we will remove the old implementation (AggregateFunction1 interface).

@hvanhovell
Contributor Author

@MLnick I am in the process of moving house, so I am a bit slow/late with my response :(...

I think it would be very useful to be able to return the HLL registers to users (it could also be nice to use in cost-based planning). I would rather give it a different name though, createHLLRegisters for instance (the name needs work), to make it clear that we are doing something different.

The UDAF should support an rsd parameter. Doesn't it? I'll add a test.


Should we round it up?

@davies
Contributor

davies commented Sep 28, 2015

I took a pass over this; it looks pretty good to me overall.

Currently, each grouping key needs 200 bytes (b=8 by default), so the sparse version could help reduce memory usage when the average number of distinct values is small (I believe that's a common case). Since we already support external aggregation (sort-based), it's not critical; it could be an optional improvement (separate PR).

I ran a small benchmark for this patch, and I'm surprised it's slower than 1.5 (using the old aggregation). The test code:

import time
from pyspark.sql.functions import approxCountDistinct

df = sqlContext.range(1 << 25).agg(approxCountDistinct("id"))
df.explain()
t = time.time()
print df.collect()
print time.time() - t

It took 3.4 seconds in 1.5, but 6.4 seconds with this patch.

The plan in 1.5:

Aggregate false, [APPROXIMATE COUNT(DISTINCT PartialApproxCountDistinct#2) AS APPROXIMATE COUNT(DISTINCT id)#1L]
 Exchange SinglePartition
  Aggregate true, [APPROXIMATE COUNT(DISTINCT id#0L) AS PartialApproxCountDistinct#2]
   Scan PhysicalRDD[id#0L]

The plan with this patch:

SortBasedAggregate(key=[], functions=[(hyperloglogplusplus(id#0L),mode=Final,isDistinct=false)], output=[APPROXIMATE COUNT(DISTINCT id)#1L])
 ConvertToSafe
  TungstenExchange SinglePartition
   ConvertToUnsafe
    SortBasedAggregate(key=[], functions=[(hyperloglogplusplus(id#0L),mode=Partial,isDistinct=false)], output=[MS[0]#30L,MS[1]#31L,MS[2]#32L,MS[3]#33L,MS[4]#34L,MS[5]#35L,MS[6]#36L,MS[7]#37L,MS[8]#38L,MS[9]#39L,MS[10]#40L,MS[11]#41L,MS[12]#42L,MS[13]#43L,MS[14]#44L,MS[15]#45L,MS[16]#46L,MS[17]#47L,MS[18]#48L,MS[19]#49L,MS[20]#50L,MS[21]#51L,MS[22]#52L,MS[23]#53L,MS[24]#54L,MS[25]#55L])
     Scan PhysicalRDD[id#0L]

Discussed this with @yhuai; the slowness may come from the new aggregation, which only supports AlgebraicAggregate in hash mode. We will fix that in 1.6.
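As an aside, the 200-bytes-per-key figure lines up with the 26 `MS[...]` buffer columns in the partial-aggregate plan above. A quick sketch of the arithmetic (the ten-registers-per-word packing is an assumption about the buffer layout, inferred from the plan output):

```python
b = 8                      # precision bits (the default mentioned above)
m = 1 << b                 # 256 six-bit registers
registers_per_word = 64 // 6               # 10 registers fit in a long
words = -(-m // registers_per_word)        # ceil(256 / 10) = 26 longs
buffer_bytes = words * 8                   # 208 bytes per grouping key
```

That is exactly the `MS[0]` through `MS[25]` columns in the partial aggregate's output, at 8 bytes each.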

@rxin
Contributor

rxin commented Sep 29, 2015

@yhuai can we make the non-codegen path use tungsten aggregate as well? Otherwise we would need to maintain two entirely separate codepaths.

@davies
Contributor

davies commented Sep 29, 2015

@hvanhovell @rxin Just realized that the tungsten aggregation does not support variable-length types in the aggregation buffer, so we can't have the sparse version without aggregation changes.


new line here.

@rxin
Contributor

rxin commented Sep 29, 2015

We can work on improving the aggregate operator.

@SparkQA

SparkQA commented Sep 30, 2015

Test build #43132 has finished for PR 8362 at commit a5fdd07.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • case class HyperLogLogPlusPlus(child: Expression, relativeSD: Double = 0.05)

@davies
Contributor

davies commented Sep 30, 2015

LGTM, merging this into master, thanks!

@asfgit asfgit closed this in 16fd2a2 Sep 30, 2015

@hvanhovell Does HLL++ require using hash64? I took a look at its implementation. Looks like we will convert the input value to a Java string in many cases (https://github.com/addthis/stream-lib/blob/master/src/main/java/com/clearspring/analytics/hash/MurmurHash.java#L135-L159). For our old function, the offer method of the HyperLogLog class uses hash internally, which has some specializations.

@hvanhovell
Contributor Author

@yhuai It doesn't. A 64-bit hash code is recommended though, especially when you want to approximate a billion or more unique values. I used the ClearSpring hash code because it enabled me to compare the results of my HLL++ implementation to theirs.

We could replace it with another, better-performing one; don't we have one in Spark? We could also scale down to 32 bits...

@hvanhovell
Contributor Author

A good article on HLL++ and the hashcode: http://research.neustar.biz/2013/01/24/hyperloglog-googles-take-on-engineering-hll

@yhuai
Contributor

yhuai commented Oct 14, 2015

Thanks for the pointer. Looks like we only have a 32-bit Murmur3 hasher in Spark's unsafe module (https://github.com/apache/spark/blob/master/unsafe/src/main/java/org/apache/spark/unsafe/hash/Murmur3_x86_32.java).
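To put numbers on the 32-vs-64-bit question: under an ideal w-bit hash, n distinct inputs collapse to roughly 2^w · (1 − e^(−n/2^w)) distinct hash values, and the sketch can only count the latter. A quick back-of-the-envelope (standard collision arithmetic, not from the patch):

```python
import math

def expected_distinct_hashes(n, bits=32):
    # Expected number of distinct hash values produced by n distinct
    # inputs under an ideal uniform hash with the given output width.
    space = 2.0 ** bits
    return space * (1.0 - math.exp(-n / space))

# At a billion distinct inputs, a 32-bit hash already collapses about
# 11% of them (biasing the estimate low by roughly that much), while
# a 64-bit hash loses a negligible fraction at the same scale.
ratio32 = expected_distinct_hashes(1_000_000_000, bits=32) / 1e9
ratio64 = expected_distinct_hashes(1_000_000_000, bits=64) / 1e9
```

So for the default 5% relative error target, 32 bits is fine well into the hundreds of millions, but past a billion the hash itself becomes the dominant error source.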

@hvanhovell
Contributor Author

Another thought on hashing: the ClearSpring hash is a generic hash function. We could use very specialized (and hopefully fast) hashing functions, since we know the type of our input.
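For a 64-bit integer column, one candidate in this spirit (an illustration, not what the patch does) is the MurmurHash3 64-bit finalizer, which mixes a fixed-width long directly, with no string conversion or byte-array loop:

```python
MASK64 = (1 << 64) - 1

def fmix64(x):
    # MurmurHash3's 64-bit finalizer: xor-shift and multiply rounds
    # that fully mix a 64-bit input; the constants are from Murmur3.
    x &= MASK64
    x ^= x >> 33
    x = (x * 0xFF51AFD7ED558CCD) & MASK64
    x ^= x >> 33
    x = (x * 0xC4CEB9FE1A85EC53) & MASK64
    x ^= x >> 33
    return x
```

Near-identical longs land far apart after mixing, which is exactly what the HLL register indexing needs.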

@rxin
Contributor

rxin commented Oct 15, 2015

We can create a hash expression and codegen that, and then just use hyperloglog(hash(field)).

