diff --git a/streaming/src/main/scala/org/apache/spark/streaming/rdd/HDFSBackedBlockRDD.scala b/streaming/src/main/scala/org/apache/spark/streaming/rdd/HDFSBackedBlockRDD.scala
index 20b3992083ae6..c893c800181a4 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/rdd/HDFSBackedBlockRDD.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/rdd/HDFSBackedBlockRDD.scala
@@ -20,7 +20,6 @@ import scala.reflect.ClassTag
 
 import org.apache.hadoop.conf.Configuration
 
-import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.rdd.BlockRDD
 import org.apache.spark.storage.{BlockId, StorageLevel}
 import org.apache.spark.streaming.util.{WriteAheadLogFileSegment, HdfsUtils, WriteAheadLogRandomReader}
@@ -28,9 +27,10 @@ import org.apache.spark._
 
 private[streaming]
 class HDFSBackedBlockRDDPartition(
-    val blockId: BlockId, idx: Int, val segment: WriteAheadLogFileSegment) extends Partition {
-  val index = idx
-}
+    val blockId: BlockId,
+    val index: Int,
+    val segment: WriteAheadLogFileSegment
+  ) extends Partition
 
 private[streaming]
 class HDFSBackedBlockRDD[T: ClassTag](
@@ -42,13 +42,12 @@ class HDFSBackedBlockRDD[T: ClassTag](
     val storageLevel: StorageLevel
   ) extends BlockRDD[T](sc, blockIds) {
 
-  if (blockIds.length != segments.length) {
-    throw new IllegalStateException("Number of block ids must be the same as number of segments!")
-  }
+  require(blockIds.length == segments.length,
+    "Number of block ids must be the same as number of segments!")
 
   // Hadoop Configuration is not serializable, so broadcast it as a serializable.
   val broadcastedHadoopConf = sc.broadcast(new SerializableWritable(hadoopConfiguration))
-    .asInstanceOf[Broadcast[SerializableWritable[Configuration]]]
+
   override def getPartitions: Array[Partition] = {
     assertValid()
     (0 until blockIds.size).map { i =>
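Reviewer note on the `broadcastedHadoopConf` change above: Hadoop's `Configuration` is not `java.io.Serializable`, so the RDD wraps it in `SerializableWritable` before broadcasting, and the `.asInstanceOf[Broadcast[SerializableWritable[Configuration]]]` cast can be dropped because `sc.broadcast` already infers that exact type. Below is a minimal, self-contained sketch of the same pattern against the Spark 1.x API; the object name, master URL, and the `fs.defaultFS` key are illustrative assumptions, not part of this patch.

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.spark.{SerializableWritable, SparkConf, SparkContext}

object BroadcastHadoopConfSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setMaster("local[2]").setAppName("broadcast-hadoop-conf"))

    val hadoopConf = new Configuration()
    hadoopConf.set("fs.defaultFS", "file:///")  // illustrative setting only

    // Configuration is a Writable but not Serializable, so wrap it before
    // broadcasting; the inferred type is Broadcast[SerializableWritable[Configuration]].
    val broadcastedConf = sc.broadcast(new SerializableWritable(hadoopConf))

    val values = sc.parallelize(1 to 2).map { _ =>
      // Unwrap on the executor: .value on the Broadcast, then .value on the wrapper.
      val conf: Configuration = broadcastedConf.value.value
      conf.get("fs.defaultFS")
    }.collect()

    println(values.mkString(", "))
    sc.stop()
  }
}
```

The same two-step unwrap (`.value` on the `Broadcast`, then `.value` on the `SerializableWritable`) is what tasks of `HDFSBackedBlockRDD` would perform with `broadcastedHadoopConf` when reading segments back from HDFS.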
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/rdd/HDFSBackedBlockRDDSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/rdd/HDFSBackedBlockRDDSuite.scala
index 9e1ad312e52fb..7bada0c83f0a3 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/rdd/HDFSBackedBlockRDDSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/rdd/HDFSBackedBlockRDDSuite.scala
@@ -19,17 +19,18 @@ package org.apache.spark.streaming.rdd
 import java.io.File
 import java.util.concurrent.atomic.AtomicInteger
 
+import org.apache.spark.{SparkConf, SparkContext}
+
 import scala.collection.mutable.ArrayBuffer
 
-import org.scalatest.{BeforeAndAfter, FunSuite}
+import org.scalatest.{BeforeAndAfterAll, BeforeAndAfter, FunSuite}
 import com.google.common.io.Files
 import org.apache.hadoop.conf.Configuration
 
 import org.apache.spark.storage.{BlockId, StorageLevel, StreamBlockId}
 import org.apache.spark.streaming.util.{WriteAheadLogFileSegment, WriteAheadLogWriter}
-import org.apache.spark.{SparkConf, SparkContext}
 
-class HDFSBackedBlockRDDSuite extends FunSuite with BeforeAndAfter {
+class HDFSBackedBlockRDDSuite extends FunSuite with BeforeAndAfter with BeforeAndAfterAll {
   val conf = new SparkConf()
     .setMaster("local[2]")
     .setAppName(this.getClass.getSimpleName)
@@ -51,6 +52,13 @@ class HDFSBackedBlockRDDSuite extends FunSuite with BeforeAndAfter {
     dir.delete()
   }
 
+  override def afterAll(): Unit = {
+    // Copied from LocalSparkContext which can't be imported since spark-core test-jar does not
+    // get imported properly by sbt even if it is created.
+    sparkContext.stop()
+    System.clearProperty("spark.driver.port")
+  }
+
   test("Data available in BM and HDFS") {
     doTestHDFSBackedRDD(5, 5, 20, 5)
   }
@@ -70,8 +78,8 @@ class HDFSBackedBlockRDDSuite extends FunSuite with BeforeAndAfter {
   /**
    * Write a bunch of events into the HDFS Block RDD. Put a part of all of them to the
    * BlockManager, so all reads need not happen from HDFS.
-   * @param total - Total number of Strings to write
-   * @param blockCount - Number of blocks to write (therefore, total # of events per block =
+   * @param total Total number of Strings to write
+   * @param blockCount Number of blocks to write (therefore, total # of events per block =
    *                   total/blockCount
    */
   private def doTestHDFSBackedRDD(
@@ -81,8 +89,7 @@ class HDFSBackedBlockRDDSuite extends FunSuite with BeforeAndAfter {
       blockCount: Int
     ) {
     val countPerBlock = total / blockCount
-    val blockIds = (0 until blockCount).map {
-      i =>
+    val blockIds = (0 until blockCount).map { i =>
         StreamBlockId(idGenerator.incrementAndGet(), idGenerator.incrementAndGet())
     }
 
@@ -95,16 +102,17 @@ class HDFSBackedBlockRDDSuite extends FunSuite with BeforeAndAfter {
       }
     }
 
-    val segments = new ArrayBuffer[WriteAheadLogFileSegment]
-    if (writeToHDFSCount != 0) {
-      // Generate some fake segments for the blocks in BM so the RDD does not complain
-      segments ++= generateFakeSegments(writeToBMCount)
-      segments ++= writeDataToHDFS(writtenStrings.slice(writeToBMCount, blockCount),
-        blockIds.slice(writeToBMCount, blockCount))
-
-    } else {
-      segments ++= generateFakeSegments(blockCount)
+    val segments = {
+      if (writeToHDFSCount != 0) {
+        // Generate some fake segments for the blocks in BM so the RDD does not complain
+        generateFakeSegments(writeToBMCount) ++
+          writeDataToHDFS(writtenStrings.slice(writeToBMCount, blockCount),
+            blockIds.slice(writeToBMCount, blockCount))
+      } else {
+        generateFakeSegments(blockCount)
+      }
     }
+
     val rdd = new HDFSBackedBlockRDD[String](sparkContext, hadoopConf, blockIds.toArray,
       segments.toArray, false, StorageLevel.MEMORY_ONLY)
 
@@ -116,10 +124,9 @@ class HDFSBackedBlockRDDSuite extends FunSuite with BeforeAndAfter {
   /**
    * Write data to HDFS and get a list of Seq of Seqs in which each Seq represents the data that
    * went into one block.
-   * @param count - Number of Strings to write
-   * @param countPerBlock - Number of Strings per block
-   * @return - Tuple of (Seq of Seqs, each of these Seqs is one block, Seq of WriteAheadLogFileSegments,
-   *           each representing the block being written to HDFS.
+   * @param count Number of Strings to write
+   * @param countPerBlock Number of Strings per block
+   * @return Seq of Seqs, each of these Seqs is one block
    */
   private def generateData(
       count: Int,
@@ -130,8 +137,8 @@ class HDFSBackedBlockRDDSuite extends FunSuite with BeforeAndAfter {
   }
 
   private def writeDataToHDFS(
-      blockData: Seq[Seq[String]],
-      blockIds: Seq[BlockId]
+    blockData: Seq[Seq[String]],
+    blockIds: Seq[BlockId]
     ): Seq[WriteAheadLogFileSegment] = {
     assert(blockData.size === blockIds.size)
     val segments = new ArrayBuffer[WriteAheadLogFileSegment]()
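The new `afterAll` duplicates spark-core's `LocalSparkContext` cleanup because, per the inline comment, the spark-core test-jar could not be imported. For reference, here is a hedged sketch of that cleanup written as a reusable trait; the name `LocalSparkContextLike` is hypothetical, and the ScalaTest APIs (`Suite`, `BeforeAndAfterAll`) match the versions Spark used at the time.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.scalatest.{BeforeAndAfterAll, Suite}

// Hypothetical stand-in for spark-core's LocalSparkContext test trait.
trait LocalSparkContextLike extends BeforeAndAfterAll { self: Suite =>
  @transient var sc: SparkContext = _

  override def beforeAll(): Unit = {
    sc = new SparkContext(
      new SparkConf().setMaster("local[2]").setAppName(suiteName))
  }

  override def afterAll(): Unit = {
    if (sc != null) {
      sc.stop()
      sc = null
    }
    // SparkContext records the driver's port in this system property; clear it
    // so the next suite in the same JVM does not pick up a stale value.
    System.clearProperty("spark.driver.port")
  }
}
```

Stopping the context and clearing `spark.driver.port` keeps suites isolated when several run in one JVM, which is why `HDFSBackedBlockRDDSuite` mixes in `BeforeAndAfterAll` and performs the same two steps inline.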