@@ -82,10 +82,15 @@ class PrefixSpan private (
8282 logWarning(" Input data is not cached." )
8383 }
8484 val minCount = getMinCount(sequences)
85- val (lengthOnePatternsAndCounts, prefixAndCandidates) =
86- findLengthOnePatterns(minCount, sequences)
87- val projectedDatabase = makePrefixProjectedDatabases(prefixAndCandidates)
88- val nextPatterns = getPatternsInLocal(minCount, projectedDatabase)
85+ val lengthOnePatternsAndCounts =
86+ getFreqItemAndCounts(minCount, sequences).collect()
87+ val prefixAndProjectedDatabase = getPrefixAndProjectedDatabase(
88+ lengthOnePatternsAndCounts.map(_._1), sequences)
89+ val groupedProjectedDatabase = prefixAndProjectedDatabase
90+ .map(x => (x._1.toSeq, x._2))
91+ .groupByKey()
92+ .map(x => (x._1.toArray, x._2.toArray))
93+ val nextPatterns = getPatternsInLocal(minCount, groupedProjectedDatabase)
8994 val lengthOnePatternsAndCountsRdd =
9095 sequences.sparkContext.parallelize(
9196 lengthOnePatternsAndCounts.map(x => (Array (x._1), x._2)))
@@ -122,7 +127,7 @@ class PrefixSpan private (
122127 * @param sequences sequences data
123128 * @return prefixes and projected database
124129 */
125- private def getPatternAndProjectedDatabase (
130+ private def getPrefixAndProjectedDatabase (
126131 frequentPrefixes : Array [Int ],
127132 sequences : RDD [Array [Int ]]): RDD [(Array [Int ], Array [Int ])] = {
128133 val filteredSequences = sequences.map { p =>
@@ -136,33 +141,6 @@ class PrefixSpan private (
136141 }
137142 }
138143
139- /**
140- * Find the patterns that it's length is one
141- * @param minCount the minimum count
142- * @param sequences original sequences data
143- * @return length-one patterns and projection table
144- */
145- private def findLengthOnePatterns (
146- minCount : Long ,
147- sequences : RDD [Array [Int ]]): (Array [(Int , Long )], RDD [(Array [Int ], Array [Int ])]) = {
148- val frequentLengthOnePatternAndCounts = getFreqItemAndCounts(minCount, sequences)
149- val prefixAndProjectedDatabase = getPatternAndProjectedDatabase(
150- frequentLengthOnePatternAndCounts.keys.collect(), sequences)
151- (frequentLengthOnePatternAndCounts.collect(), prefixAndProjectedDatabase)
152- }
153-
154- /**
155- * Constructs prefix-projected databases from (prefix, suffix) pairs.
156- * @param data patterns and projected sequences data before re-partition
157- * @return patterns and projected sequences data after re-partition
158- */
159- private def makePrefixProjectedDatabases (
160- data : RDD [(Array [Int ], Array [Int ])]): RDD [(Array [Int ], Array [Array [Int ]])] = {
161- data.map(x => (x._1.toSeq, x._2))
162- .groupByKey()
163- .map(x => (x._1.toArray, x._2.toArray))
164- }
165-
166144 /**
167145 * calculate the patterns in local.
168146 * @param minCount the absolute minimum count
0 commit comments