@@ -45,7 +45,7 @@ class PrefixSpan private (
4545 private var minSupport : Double ,
4646 private var maxPatternLength : Int ) extends Logging with Serializable {
4747
48- private val maxSuffixesBeforeLocalProcessing : Long = 10000
48+ private val maxProjectedDBSizeBeforeLocalProcessing : Long = 10000
4949
5050 /**
5151 * Constructs a default instance with default parameters
@@ -89,41 +89,36 @@ class PrefixSpan private (
8989 val lengthOnePatternsAndCounts = getFreqItemAndCounts(minCount, sequences)
9090 val prefixSuffixPairs = getPrefixSuffixPairs(
9191 lengthOnePatternsAndCounts.map(_._1).collect(), sequences)
92- var patternsCount : Long = lengthOnePatternsAndCounts.count( )
92+ prefixSuffixPairs.persist( StorageLevel . MEMORY_AND_DISK )
9393 var allPatternAndCounts = lengthOnePatternsAndCounts.map(x => (ArrayBuffer (x._1), x._2))
9494 var (smallPrefixSuffixPairs, largePrefixSuffixPairs) =
9595 splitPrefixSuffixPairs(prefixSuffixPairs)
96- largePrefixSuffixPairs.persist(StorageLevel .MEMORY_AND_DISK )
97- var patternLength : Int = 1
98- while (patternLength < maxPatternLength &&
99- largePrefixSuffixPairs.count() != 0 ) {
96+ while (largePrefixSuffixPairs.count() != 0 ) {
10097 val (nextPatternAndCounts, nextPrefixSuffixPairs) =
10198 getPatternCountsAndPrefixSuffixPairs(minCount, largePrefixSuffixPairs)
102- patternsCount = nextPatternAndCounts.count()
10399 largePrefixSuffixPairs.unpersist()
104- val splitedPrefixSuffixPairs = splitPrefixSuffixPairs(nextPrefixSuffixPairs)
105- largePrefixSuffixPairs = splitedPrefixSuffixPairs._2
100+ val (smallerPairsPart, largerPairsPart) = splitPrefixSuffixPairs(nextPrefixSuffixPairs)
101+ largePrefixSuffixPairs = largerPairsPart
106102 largePrefixSuffixPairs.persist(StorageLevel .MEMORY_AND_DISK )
107- smallPrefixSuffixPairs = smallPrefixSuffixPairs ++ splitedPrefixSuffixPairs._1
108- allPatternAndCounts = allPatternAndCounts ++ nextPatternAndCounts
109- patternLength = patternLength + 1
103+ smallPrefixSuffixPairs ++= smallerPairsPart
104+ allPatternAndCounts ++= nextPatternAndCounts
110105 }
111106 if (smallPrefixSuffixPairs.count() > 0 ) {
112107 val projectedDatabase = smallPrefixSuffixPairs
113108 .map(x => (x._1.toSeq, x._2))
114109 .groupByKey()
115110 .map(x => (x._1.toArray, x._2.toArray))
116111 val nextPatternAndCounts = getPatternsInLocal(minCount, projectedDatabase)
117- allPatternAndCounts = allPatternAndCounts ++ nextPatternAndCounts
112+ allPatternAndCounts ++= nextPatternAndCounts
118113 }
119114 allPatternAndCounts.map { case (pattern, count) => (pattern.toArray, count) }
120115 }
121116
122117
123118 /**
124119 * Split prefix suffix pairs to two parts:
125- * suffixes' size less than maxSuffixesBeforeLocalProcessing and
126- * suffixes' size more than maxSuffixesBeforeLocalProcessing
120+ * Prefixes with projected databases smaller than maxSuffixesBeforeLocalProcessing and
121+ * Prefixes with projected databases larger than maxSuffixesBeforeLocalProcessing
127122 * @param prefixSuffixPairs prefix (length n) and suffix pairs,
128123 * @return small size prefix suffix pairs and big size prefix suffix pairs
129124 * (RDD[prefix, suffix], RDD[prefix, suffix ])
@@ -134,7 +129,7 @@ class PrefixSpan private (
134129 val suffixSizeMap = prefixSuffixPairs
135130 .map(x => (x._1, x._2.length))
136131 .reduceByKey(_ + _)
137- .map(x => (x._2 <= maxSuffixesBeforeLocalProcessing , Set (x._1)))
132+ .map(x => (x._2 <= maxProjectedDBSizeBeforeLocalProcessing , Set (x._1)))
138133 .reduceByKey(_ ++ _)
139134 .collect
140135 .toMap
0 commit comments