@@ -103,7 +103,7 @@ class PrefixSpan private (
103103 // Convert min support to a min number of transactions for this dataset
104104 val minCount = if (minSupport == 0 ) 0L else math.ceil(sequences.count() * minSupport).toLong
105105
106- // Frequent items -> number of occurrences, all items here satisfy the `minSupport` threshold
106+ // (Frequent item -> number of occurrences); all items here satisfy the `minSupport` threshold
107107 val freqItemCounts = sequences
108108 .flatMap(seq => seq.distinct.map(item => (item, 1L )))
109109 .reduceByKey(_ + _)
@@ -113,8 +113,9 @@ class PrefixSpan private (
113113 val itemSuffixPairs = {
114114 val freqItems = freqItemCounts.keys.collect().toSet
115115 sequences.flatMap { seq =>
116+ val filteredSeq = seq.filter(freqItems.contains(_))
116117 freqItems.flatMap { item =>
117- val candidateSuffix = LocalPrefixSpan .getSuffix(item, seq.filter(freqItems.contains(_)) )
118+ val candidateSuffix = LocalPrefixSpan .getSuffix(item, filteredSeq )
118119 candidateSuffix match {
119120 case suffix if ! suffix.isEmpty => Some ((List (item), suffix))
120121 case _ => None
@@ -123,7 +124,8 @@ class PrefixSpan private (
123124 }
124125 }
125126
126- // Accumulator for the computed results to be returned
127+ // Accumulator for the computed results to be returned, initialized to the frequent items (i.e.
128+ // frequent length-one prefixes)
127129 var resultsAccumulator = freqItemCounts.map(x => (List (x._1), x._2))
128130
129131 // Remaining work to be locally and distributively processed respectively
@@ -133,7 +135,7 @@ class PrefixSpan private (
133135 // projected database sizes <= `maxLocalProjDBSize`)
134136 while (pairsForDistributed.count() != 0 ) {
135137 val (nextPatternAndCounts, nextPrefixSuffixPairs) =
136- getPatternCountsAndPrefixSuffixPairs (minCount, pairsForDistributed)
138+ extendPrefixes (minCount, pairsForDistributed)
137139 pairsForDistributed.unpersist()
138140 val (smallerPairsPart, largerPairsPart) = partitionByProjDBSize(nextPrefixSuffixPairs)
139141 pairsForDistributed = largerPairsPart
@@ -151,7 +153,6 @@ class PrefixSpan private (
151153
152154 /**
153155 * Partitions the prefix-suffix pairs by projected database size.
154- *
155156 * @param prefixSuffixPairs prefix (length n) and suffix pairs,
156157 * @return prefix-suffix pairs partitioned by whether their projected database size is <= or
157158 * greater than [[maxLocalProjDBSize]]
@@ -173,44 +174,57 @@ class PrefixSpan private (
173174 }
174175
175176 /**
176- * Get the pattern and counts, and prefix suffix pairs
177+ * Extends all prefixes by one item from their suffix and computes the resulting frequent prefixes
178+ * and remaining work.
177179 * @param minCount minimum count
178- * @param prefixSuffixPairs prefix (length n ) and suffix pairs,
179- * @return pattern ( length n+1) and counts, and prefix ( length n+1) and suffix pairs
180- * (RDD[pattern, count], RDD[ prefix, suffix ])
180+ * @param prefixSuffixPairs (length N prefix, suffix) pairs
181+ * @return (frequent length N+1 extended prefix, count) pairs and (frequent length N+1 extended
182+ * prefix, corresponding suffix) pairs.
181183 */
182- private def getPatternCountsAndPrefixSuffixPairs (
184+ private def extendPrefixes (
183185 minCount : Long ,
184186 prefixSuffixPairs : RDD [(List [Int ], Array [Int ])])
185187 : (RDD [(List [Int ], Long )], RDD [(List [Int ], Array [Int ])]) = {
186- val prefixAndFrequentItemAndCounts = prefixSuffixPairs
188+
189+ // (length N prefix, item from suffix) pairs and their corresponding number of occurrences
190+ // Every (prefix, item) pair kept here occurs in at least `minCount` suffixes
191+ val prefixItemPairAndCounts = prefixSuffixPairs
187192 .flatMap { case (prefix, suffix) => suffix.distinct.map(y => ((prefix, y), 1L )) }
188193 .reduceByKey(_ + _)
189194 .filter(_._2 >= minCount)
190- val patternAndCounts = prefixAndFrequentItemAndCounts
191- .map { case (( prefix, item), count) => (item :: prefix, count) }
192- val prefixToFrequentNextItemsMap = prefixAndFrequentItemAndCounts
195+
196+ // Map from prefix to set of possible next items from suffix
197+ val prefixToNextItems = prefixItemPairAndCounts
193198 .keys
194199 .groupByKey()
195200 .mapValues(_.toSet)
196201 .collect()
197202 .toMap
198- val nextPrefixSuffixPairs = prefixSuffixPairs
199- .filter(x => prefixToFrequentNextItemsMap.contains(x._1))
203+
204+
205+ // Frequent patterns with length N+1 and their corresponding counts
206+ val extendedPrefixAndCounts = prefixItemPairAndCounts
207+ .map { case ((prefix, item), count) => (item :: prefix, count) }
208+
209+ // Remaining work, all prefixes will have length N+1
210+ val extendedPrefixAndSuffix = prefixSuffixPairs
211+ .filter(x => prefixToNextItems.contains(x._1))
200212 .flatMap { case (prefix, suffix) =>
201- val frequentNextItems = prefixToFrequentNextItemsMap(prefix)
202- val filteredSuffix = suffix.filter(frequentNextItems.contains(_))
203- frequentNextItems.flatMap { item =>
204- val suffix = LocalPrefixSpan .getSuffix(item, filteredSuffix)
205- if (suffix.isEmpty) None
206- else Some (item :: prefix, suffix)
213+ val frequentNextItems = prefixToNextItems(prefix)
214+ val filteredSuffix = suffix.filter(frequentNextItems.contains(_))
215+ frequentNextItems.flatMap { item =>
216+ LocalPrefixSpan .getSuffix(item, filteredSuffix) match {
217+ case suffix if ! suffix.isEmpty => Some (item :: prefix, suffix)
218+ case _ => None
219+ }
220+ }
207221 }
208- }
209- (patternAndCounts, nextPrefixSuffixPairs )
222+
223+ (extendedPrefixAndCounts, extendedPrefixAndSuffix )
210224 }
211225
212226 /**
213- * calculate the patterns in local.
227+ * Calculate the patterns locally.
214228 * @param minCount the absolute minimum count
215229 * @param data prefixes and projected sequences data
216230 * @return patterns
0 commit comments