@@ -73,9 +73,11 @@ class TestInputStream[T: ClassTag](ssc_ : StreamingContext, input: Seq[Seq[T]],
7373 *
7474 * The buffer contains a sequence of RDD's, each containing a sequence of items
7575 */
76- class TestOutputStream [T : ClassTag ](parent : DStream [T ],
77- val output : ArrayBuffer [Seq [T ]] = ArrayBuffer [Seq [T ]]())
78- extends ForEachDStream [T ](parent, (rdd : RDD [T ], t : Time ) => {
76+ class TestOutputStream [T : ClassTag ](
77+ parent : DStream [T ],
78+ val output : SynchronizedBuffer [Seq [T ]] =
79+ new ArrayBuffer [Seq [T ]] with SynchronizedBuffer [Seq [T ]]
80+ ) extends ForEachDStream [T ](parent, (rdd : RDD [T ], t : Time ) => {
7981 val collected = rdd.collect()
8082 output += collected
8183 }) {
@@ -95,8 +97,10 @@ class TestOutputStream[T: ClassTag](parent: DStream[T],
9597 * The buffer contains a sequence of RDD's, each containing a sequence of partitions, each
9698 * containing a sequence of items.
9799 */
98- class TestOutputStreamWithPartitions [T : ClassTag ](parent : DStream [T ],
99- val output : ArrayBuffer [Seq [Seq [T ]]] = ArrayBuffer [Seq [Seq [T ]]]())
100+ class TestOutputStreamWithPartitions [T : ClassTag ](
101+ parent : DStream [T ],
102+ val output : SynchronizedBuffer [Seq [Seq [T ]]] =
103+ new ArrayBuffer [Seq [Seq [T ]]] with SynchronizedBuffer [Seq [Seq [T ]]])
100104 extends ForEachDStream [T ](parent, (rdd : RDD [T ], t : Time ) => {
101105 val collected = rdd.glom().collect().map(_.toSeq)
102106 output += collected
@@ -108,10 +112,6 @@ class TestOutputStreamWithPartitions[T: ClassTag](parent: DStream[T],
108112 ois.defaultReadObject()
109113 output.clear()
110114 }
111-
112- def toTestOutputStream : TestOutputStream [T ] = {
113- new TestOutputStream [T ](this .parent, this .output.map(_.flatten))
114- }
115115}
116116
117117/**
@@ -425,12 +425,21 @@ trait TestSuiteBase extends FunSuite with BeforeAndAfter with Logging {
425425 logInfo(" --------------------------------" )
426426
427427 // Match the output with the expected output
428- assert(output.size === expectedOutput.size, " Number of outputs do not match" )
429428 for (i <- 0 until output.size) {
430429 if (useSet) {
431- assert(output(i).toSet === expectedOutput(i).toSet)
430+ assert(
431+ output(i).toSet === expectedOutput(i).toSet,
432+ s " Set comparison failed \n " +
433+ s " Expected output ( ${expectedOutput.size} items): \n ${expectedOutput.mkString(" \n " )}\n " +
434+ s " Generated output ( ${output.size} items): ${output.mkString(" \n " )}"
435+ )
432436 } else {
433- assert(output(i).toList === expectedOutput(i).toList)
437+ assert(
438+ output(i).toList === expectedOutput(i).toList,
439+ s " Ordered list comparison failed \n " +
440+ s " Expected output ( ${expectedOutput.size} items): \n ${expectedOutput.mkString(" \n " )}\n " +
441+ s " Generated output ( ${output.size} items): ${output.mkString(" \n " )}"
442+ )
434443 }
435444 }
436445 logInfo(" Output verified successfully" )
0 commit comments