Skip to content

Commit 6aea07a

Browse files
committed
Ensure SynchronizedBuffer is used in every TestSuiteBase
1 parent 9f1f9b1 commit 6aea07a

File tree

1 file changed

+5
-7
lines changed

1 file changed

+5
-7
lines changed

streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,7 @@ class TestInputStream[T: ClassTag](ssc_ : StreamingContext, input: Seq[Seq[T]],
7474
* The buffer contains a sequence of RDD's, each containing a sequence of items
7575
*/
7676
class TestOutputStream[T: ClassTag](parent: DStream[T],
77-
val output: ArrayBuffer[Seq[T]] = ArrayBuffer[Seq[T]]())
77+
val output: SynchronizedBuffer[Seq[T]] = new ArrayBuffer[Seq[T]] with SynchronizedBuffer[Seq[T]])
7878
extends ForEachDStream[T](parent, (rdd: RDD[T], t: Time) => {
7979
val collected = rdd.collect()
8080
output += collected
@@ -95,8 +95,10 @@ class TestOutputStream[T: ClassTag](parent: DStream[T],
9595
* The buffer contains a sequence of RDD's, each containing a sequence of partitions, each
9696
* containing a sequence of items.
9797
*/
98-
class TestOutputStreamWithPartitions[T: ClassTag](parent: DStream[T],
99-
val output: ArrayBuffer[Seq[Seq[T]]] = ArrayBuffer[Seq[Seq[T]]]())
98+
class TestOutputStreamWithPartitions[T: ClassTag](
99+
parent: DStream[T],
100+
val output: SynchronizedBuffer[Seq[Seq[T]]] =
101+
new ArrayBuffer[Seq[Seq[T]]] with SynchronizedBuffer[Seq[Seq[T]]])
100102
extends ForEachDStream[T](parent, (rdd: RDD[T], t: Time) => {
101103
val collected = rdd.glom().collect().map(_.toSeq)
102104
output += collected
@@ -108,10 +110,6 @@ class TestOutputStreamWithPartitions[T: ClassTag](parent: DStream[T],
108110
ois.defaultReadObject()
109111
output.clear()
110112
}
111-
112-
def toTestOutputStream: TestOutputStream[T] = {
113-
new TestOutputStream[T](this.parent, this.output.map(_.flatten))
114-
}
115113
}
116114

117115
/**

0 commit comments

Comments
 (0)