Skip to content

Commit 739341f

Browse files
committed
Improve the speed of converting files to RDDs
1 parent 293a0b5 commit 739341f

File tree

1 file changed

+4
-3
lines changed

1 file changed

+4
-3
lines changed

streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ import org.apache.spark.rdd.RDD
2727
import org.apache.spark.rdd.UnionRDD
2828
import org.apache.spark.streaming.{StreamingContext, Time}
2929
import org.apache.spark.util.TimeStampedHashMap
30+
import scala.collection.mutable.ArrayBuffer
3031

3132

3233
private[streaming]
@@ -120,14 +121,14 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas
120121

121122
/** Generate one RDD from an array of files */
122123
/** Generate one RDD from an array of files.
  *
  * Each file becomes one `newAPIHadoopFile` RDD; all of them are combined
  * into a single [[UnionRDD]]. Files that yield an RDD with no partitions
  * (typically files that were written in place instead of atomically
  * "moved" into the monitored directory) are logged as errors but still
  * included, so the union always covers every requested file.
  *
  * @param files paths of the files to read for this batch
  * @return a single RDD containing the records of all the given files
  */
private def filesToRDD(files: Seq[String]): RDD[(K, V)] = {
  // Single pass: create each file's RDD and validate it in the same step
  // (the previous two-pass map + zip forced a second traversal).
  val fileRDDs = files.map { file =>
    val rdd = context.sparkContext.newAPIHadoopFile[K, V, F](file)
    if (rdd.partitions.isEmpty) {
      logError("File " + file + " has no data in it. Spark Streaming can only ingest " +
        "files that have been \"moved\" to the directory assigned to the file stream. " +
        "Refer to the streaming programming guide for more details.")
    }
    rdd
  }
  new UnionRDD(context.sparkContext, fileRDDs)
}
133134

0 commit comments

Comments
 (0)