@@ -94,36 +94,21 @@ class EMLDAOptimizer extends LDAOptimizer {
   /**
    * Compute bipartite term/doc graph.
    */
-  private[clustering] override def initialize(docs: RDD[(Long, Vector)], lda: LDA):
-    LDAOptimizer = {
+  private[clustering] override def initialize(docs: RDD[(Long, Vector)], lda: LDA): LDAOptimizer = {
 
     val docConcentration = lda.getDocConcentration
     val topicConcentration = lda.getTopicConcentration
     val k = lda.getK
 
-    /**
-     * Note: The restriction > 1.0 may be relaxed in the future (allowing sparse solutions),
-     * but values in (0,1) are not yet supported.
-     */
+    // Note: The restriction > 1.0 may be relaxed in the future (allowing sparse solutions),
+    // but values in (0,1) are not yet supported.
     require(docConcentration > 1.0 || docConcentration == -1.0, s"LDA docConcentration must be" +
       s" > 1.0 (or -1 for auto) for EM Optimizer, but was set to $docConcentration")
     require(topicConcentration > 1.0 || topicConcentration == -1.0, s"LDA topicConcentration" +
       s" must be > 1.0 (or -1 for auto) for EM Optimizer, but was set to $topicConcentration")
 
-    /**
-     * - For EM: default = (50 / k) + 1.
-     * - The 50/k is common in LDA libraries.
-     * - The +1 follows Asuncion et al. (2009), who recommend a +1 adjustment for EM.
-     */
     this.docConcentration = if (docConcentration == -1) (50.0 / k) + 1.0 else docConcentration
-
-    /**
-     * - For EM: default = 0.1 + 1.
-     * - The 0.1 gives a small amount of smoothing.
-     * - The +1 follows Asuncion et al. (2009), who recommend a +1 adjustment for EM.
-     */
     this.topicConcentration = if (topicConcentration == -1) 1.1 else topicConcentration
-
     val randomSeed = lda.getSeed
 
     // For each document, create an edge (Document -> Term) for each unique term in the document.
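For concreteness, here is how the EM defaults above work out, as a worked example with an assumed k (values computed here, not part of the patch; the rationale follows the comments removed in this hunk: 50/k is a common LDA default, and the +1 adjustment for EM follows Asuncion et al. 2009):

    // Worked example with an assumed k = 10 (illustration only, not from the patch):
    val k = 10
    val docConcentration = (50.0 / k) + 1.0   // 6.0 when the user leaves the -1 "auto" value
    val topicConcentration = 1.1              // 0.1 smoothing plus the same +1 EM adjustment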
@@ -230,8 +215,8 @@ class EMLDAOptimizer extends LDAOptimizer {
 /**
  * :: Experimental ::
  *
- * An online optimizer for LDA. The Optimizer implements the Online LDA algorithm, which
- * processes a subset of the corpus by each call to next, and update the term-topic
+ * An online optimizer for LDA. The Optimizer implements the Online variational Bayes LDA
+ * algorithm, which processes a subset of the corpus on each iteration, and updates the term-topic
  * distribution adaptively.
  *
  * References:
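As a rough sketch of how a caller might use the knobs introduced below, assuming LDA exposes a setOptimizer hook and a setTau_0 setter mirroring setKappa (neither appears in this diff), with corpus an RDD[(Long, Vector)] of (document id, term-count vector) pairs:

    // Sketch only; setOptimizer, setTau_0 and corpus are assumptions, not part of this diff.
    val optimizer = new OnlineLDAOptimizer()
      .setTau_0(1024)
      .setKappa(0.51)
      .setMiniBatchFraction(0.05)
    val model = new LDA().setK(10).setOptimizer(optimizer).run(corpus)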
@@ -242,39 +227,35 @@ class OnlineLDAOptimizer extends LDAOptimizer {
 
   // LDA common parameters
   private var k: Int = 0
-  private var D: Int = 0
+  private var corpusSize: Long = 0
   private var vocabSize: Int = 0
   private var alpha: Double = 0
   private var eta: Double = 0
-  private var randomSeed: Long = 0
+  private var randomGenerator: java.util.Random = null
 
   // Online LDA specific parameters
-  private var tau_0: Double = -1
-  private var kappa: Double = -1
-  private var batchSize: Int = -1
+  private var tau_0: Double = 1024
+  private var kappa: Double = 0.51
+  private var minibatchFraction: Double = 0.01
 
   // internal data structure
   private var docs: RDD[(Long, Vector)] = null
   private var lambda: BDM[Double] = null
   private var Elogbeta: BDM[Double] = null
   private var expElogbeta: BDM[Double] = null
 
-  // count of invocation to next, used to help deciding the weight for each iteration
+  // count of invocations of next, which helps decide the weight for each iteration
   private var iteration = 0
 
   /**
-   * A (positive) learning parameter that downweights early iterations
+   * A (positive) learning parameter that downweights early iterations. Larger values make early
+   * iterations count less.
    */
-  def getTau_0: Double = {
-    if (this.tau_0 == -1) {
-      1024
-    } else {
-      this.tau_0
-    }
-  }
+  def getTau_0: Double = this.tau_0
 
   /**
-   * A (positive) learning parameter that downweights early iterations
+   * A (positive) learning parameter that downweights early iterations. Larger values make early
+   * iterations count less.
    * Automatic setting of parameter:
    *  - default = 1024, which follows the recommendation from OnlineLDA paper.
    */
@@ -287,18 +268,12 @@ class OnlineLDAOptimizer extends LDAOptimizer {
   /**
    * Learning rate: exponential decay rate
    */
-  def getKappa: Double = {
-    if (this.kappa == -1) {
-      0.5
-    } else {
-      this.kappa
-    }
-  }
+  def getKappa: Double = this.kappa
 
   /**
    * Learning rate: exponential decay rate---should be between
    * (0.5, 1.0] to guarantee asymptotic convergence.
-   *  - default = 0.5, which follows the recommendation from OnlineLDA paper.
+   *  - default = 0.51, which follows the recommendation from OnlineLDA paper.
    */
   def setKappa(kappa: Double): this.type = {
     require(kappa >= 0 || kappa == -1.0,
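Taken together, tau_0 and kappa define the per-iteration blending weight rho_t = (tau_0 + t)^(-kappa); kappa in (0.5, 1.0] is what makes the weights decay fast enough for asymptotic convergence. A standalone sketch (the helper name rho is ours; update() computes the same quantity inline):

    // Per-iteration weight: larger tau_0 downweights early minibatches,
    // larger kappa makes the weight decay faster over time.
    def rho(tau0: Double, kappa: Double, t: Int): Double = math.pow(tau0 + t, -kappa)

    rho(1024, 0.51, 1)     // ~0.029 on the first minibatch
    rho(1024, 0.51, 1000)  // ~0.021 after a thousand minibatches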
@@ -310,52 +285,44 @@ class OnlineLDAOptimizer extends LDAOptimizer {
   /**
    * Mini-batch size, which controls how many documents are used in each iteration
    */
-  def getBatchSize: Int = {
-    if (this.batchSize == -1) {
-      D / 100
-    } else {
-      this.batchSize
-    }
-  }
+  def getMiniBatchFraction: Double = this.minibatchFraction
 
   /**
    * Mini-batch size, which controls how many documents are used in each iteration
    * default = 1% from total documents.
    */
-  def setBatchSize(batchSize: Int): this.type = {
-    this.batchSize = batchSize
+  def setMiniBatchFraction(miniBatchFraction: Double): this.type = {
+    this.minibatchFraction = miniBatchFraction
     this
   }
 
-  private[clustering] override def initialize(docs: RDD[(Long, Vector)], lda: LDA): LDAOptimizer = {
+  private[clustering] override def initialize(docs: RDD[(Long, Vector)], lda: LDA): LDAOptimizer = {
 
     this.k = lda.getK
-    this.D = docs.count().toInt
+    this.corpusSize = docs.count()
     this.vocabSize = docs.first()._2.size
     this.alpha = if (lda.getDocConcentration == -1) 1.0 / k else lda.getDocConcentration
     this.eta = if (lda.getTopicConcentration == -1) 1.0 / k else lda.getTopicConcentration
-    this.randomSeed = randomSeed
+    this.randomGenerator = new Random(lda.getSeed)
 
     this.docs = docs
 
     // Initialize the variational distribution q(beta|lambda)
     this.lambda = getGammaMatrix(k, vocabSize)
-    this.Elogbeta = dirichlet_expectation(lambda)
+    this.Elogbeta = dirichletExpectation(lambda)
     this.expElogbeta = exp(Elogbeta)
     this.iteration = 0
     this
   }
 
   /**
-   * Submit a a subset (like 1%, decide by the batchSize) of the corpus to the Online LDA model,
-   * and it will update the topic distribution adaptively for the terms appearing in the subset.
-   *
-   * @return Inferred LDA model
+   * Submit a subset (like 1%, decided by the miniBatchFraction) of the corpus to the Online LDA
+   * model, and it will update the topic distribution adaptively for the terms appearing in the
+   * subset.
    */
   private[clustering] override def next(): OnlineLDAOptimizer = {
     iteration += 1
-    val batchSize = getBatchSize
-    val batch = docs.sample(true, batchSize.toDouble / D, randomSeed).cache()
+    val batch = docs.sample(true, minibatchFraction, randomGenerator.nextLong())
     if (batch.isEmpty()) return this
 
     val k = this.k
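One behavioral note on the sampling change in next(): sample(true, minibatchFraction, seed) draws documents with replacement, so the realized batch size only fluctuates around minibatchFraction * corpusSize; that expected size is what next() passes to update() in the following hunk. Illustration with an assumed corpus size:

    // Illustration only (corpus size assumed): the batchSize handed to update()
    // is the expected sample size, not the exact count of documents drawn.
    val corpusSize = 10000L
    val minibatchFraction = 0.01
    val batchSizeForUpdate = (minibatchFraction * corpusSize).toInt  // 100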
@@ -406,15 +373,17 @@ class OnlineLDAOptimizer extends LDAOptimizer {
     })
 
     val batchResult = stats.reduce(_ += _)
-    update(batchResult, iteration, batchSize)
-    batch.unpersist()
+    update(batchResult, iteration, (minibatchFraction * corpusSize).toInt)
     this
   }
 
   private[clustering] override def getLDAModel(iterationTimes: Array[Double]): LDAModel = {
     new LocalLDAModel(Matrices.fromBreeze(lambda).transpose)
   }
 
+  /**
+   * Update lambda based on the batch submitted. batchSize can be different for each iteration.
+   */
   private def update(raw: BDM[Double], iter: Int, batchSize: Int): Unit = {
 
     val tau_0 = this.getTau_0
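The batchSize parameter exists because update() must extrapolate from the minibatch to the whole corpus: the minibatch's sufficient statistics are scaled by corpusSize / batchSize before being blended into lambda, as the next hunk shows. A self-contained sketch of that blend with toy values (names mirror the diff; this is the standard online variational Bayes step):

    import breeze.linalg.DenseMatrix
    // Toy sizes, not from the patch: 2 topics, 3 vocabulary terms.
    val lambda = DenseMatrix.ones[Double](2, 3)
    val stat = DenseMatrix.ones[Double](2, 3)                 // minibatch statistics
    val (corpusSize, batchSize, eta) = (10000.0, 100.0, 0.5)
    val weight = math.pow(1024.0 + 1, -0.51)                  // rho_1
    val updated = lambda * (1 - weight) +
      (stat * (corpusSize / batchSize) + eta) * weight        // blend toward the minibatch estimate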
@@ -427,18 +396,26 @@ class OnlineLDAOptimizer extends LDAOptimizer {
     val stat = raw :* expElogbeta
 
     // Update lambda based on documents.
-    lambda = lambda * (1 - weight) + (stat * D.toDouble / batchSize.toDouble + eta) * weight
-    Elogbeta = dirichlet_expectation(lambda)
+    lambda = lambda * (1 - weight) +
+      (stat * (corpusSize.toDouble / batchSize.toDouble) + eta) * weight
+    Elogbeta = dirichletExpectation(lambda)
     expElogbeta = exp(Elogbeta)
   }
 
+  /**
+   * Get a random matrix to initialize lambda.
+   */
   private def getGammaMatrix(row: Int, col: Int): BDM[Double] = {
     val gammaRandomGenerator = new Gamma(100, 1.0 / 100.0)
     val temp = gammaRandomGenerator.sample(row * col).toArray
     (new BDM[Double](col, row, temp)).t
   }
 
-  private def dirichlet_expectation(alpha: BDM[Double]): BDM[Double] = {
+  /**
+   * For theta ~ Dir(alpha), computes E[log(theta)] given alpha. Currently the implementation
+   * uses digamma which is accurate but expensive.
+   */
+  private def dirichletExpectation(alpha: BDM[Double]): BDM[Double] = {
     val rowSum = sum(alpha(breeze.linalg.*, ::))
     val digAlpha = digamma(alpha)
     val digRowSum = digamma(rowSum)
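For reference, the identity that dirichletExpectation implements: if theta ~ Dir(alpha), then E[log theta_i] = digamma(alpha_i) - digamma(sum_j alpha_j); the matrix version above applies this per row of lambda. A vector-form sketch (the helper name is ours):

    import breeze.linalg.{DenseVector, sum}
    import breeze.numerics.digamma
    // E[log(theta(i))] for theta ~ Dir(alpha), one Dirichlet at a time.
    def dirichletExpectationVec(alpha: DenseVector[Double]): DenseVector[Double] =
      digamma(alpha) - digamma(sum(alpha))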