@@ -149,7 +149,8 @@ class PrefixSpan private (
149149 }
150150
151151 // Process the small projected databases locally
152- val remainingResults = getPatternsInLocal(minCount, pairsForLocal.groupByKey())
152+ val remainingResults = getPatternsInLocal(
153+ minCount, sc.parallelize(pairsForLocal, 1 ).groupByKey())
153154
154155 (sc.parallelize(resultsAccumulator, 1 ) ++ remainingResults)
155156 .map { case (pattern, count) => (pattern.toArray, count) }
@@ -163,7 +164,7 @@ class PrefixSpan private (
163164 * greater than [[maxLocalProjDBSize ]]
164165 */
165166 private def partitionByProjDBSize (prefixSuffixPairs : RDD [(List [Int ], Array [Int ])])
166- : (RDD [(List [Int ], Array [Int ])], RDD [(List [Int ], Array [Int ])]) = {
167+ : (Array [(List [Int ], Array [Int ])], RDD [(List [Int ], Array [Int ])]) = {
167168 val prefixToSuffixSize = prefixSuffixPairs
168169 .aggregateByKey(0 )(
169170 seqOp = { case (count, suffix) => count + suffix.length },
@@ -175,7 +176,7 @@ class PrefixSpan private (
175176 .toSet
176177 val small = prefixSuffixPairs.filter { case (prefix, _) => smallPrefixes.contains(prefix) }
177178 val large = prefixSuffixPairs.filter { case (prefix, _) => ! smallPrefixes.contains(prefix) }
178- (small, large)
179+ (small.collect() , large)
179180 }
180181
181182 /**
0 commit comments