Commit 26faa6a

[SPARK-7155] [CORE] Support comma-separated list of files as input for newAPIHadoopFile, wholeTextFiles, and binaryFiles. Use setInputPaths for consistency.
1 parent 73e1f16 commit 26faa6a
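
In short: after this commit, the comma-separated path syntax that textFile and hadoopFile already accept also works for newAPIHadoopFile, wholeTextFiles, and binaryFiles. A minimal sketch of the user-facing behavior (the /data paths and local master are illustrative, not part of the commit):

    import org.apache.spark.{SparkConf, SparkContext}

    val sc = new SparkContext(new SparkConf().setAppName("comma-paths").setMaster("local"))

    // Each call now splits its argument on commas and reads every listed input.
    val texts = sc.wholeTextFiles("/data/dir1,/data/dir2")   // (filename, content) records
    val bins  = sc.binaryFiles("/data/dir1,/data/dir2")      // (filename, PortableDataStream) records
    println(s"${texts.count()} text files, ${bins.count()} binary files")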

File tree

2 files changed: +68, -16 lines


core/src/main/scala/org/apache/spark/SparkContext.scala

Lines changed: 8 additions & 4 deletions
@@ -704,7 +704,9 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
       RDD[(String, String)] = {
     assertNotStopped()
     val job = new NewHadoopJob(hadoopConfiguration)
-    NewFileInputFormat.addInputPath(job, new Path(path))
+    // Use setInputPaths so that wholeTextFiles aligns with hadoopFile/textFile in taking
+    // comma separated files as input. (see SPARK-7155)
+    NewFileInputFormat.setInputPaths(job, path)
     val updateConf = job.getConfiguration
     new WholeTextFileRDD(
       this,
@@ -750,7 +752,9 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
       RDD[(String, PortableDataStream)] = {
     assertNotStopped()
     val job = new NewHadoopJob(hadoopConfiguration)
-    NewFileInputFormat.addInputPath(job, new Path(path))
+    // Use setInputPaths so that binaryFiles aligns with hadoopFile/textFile in taking
+    // comma separated files as input. (see SPARK-7155)
+    NewFileInputFormat.setInputPaths(job, path)
     val updateConf = job.getConfiguration
     new BinaryFileRDD(
       this,
@@ -926,9 +930,9 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
     // The call to new NewHadoopJob automatically adds security credentials to conf,
     // so we don't need to explicitly add them ourselves
     val job = new NewHadoopJob(conf)
-    // Use addInputPaths so that newAPIHadoopFile aligns with hadoopFile in taking
+    // Use setInputPaths so that newAPIHadoopFile aligns with hadoopFile/textFile in taking
     // comma separated files as input. (see SPARK-7155)
-    NewFileInputFormat.addInputPaths(job, path)
+    NewFileInputFormat.setInputPaths(job, path)
     val updatedConf = job.getConfiguration
     new NewHadoopRDD(this, fClass, kClass, vClass, updatedConf).setName(path)
   }
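
Why the switch matters: addInputPath takes a single Path, so with `new Path(path)` a comma is escaped and treated as part of one literal path name, whereas setInputPaths(Job, String) splits its string argument on commas, and (unlike addInputPaths) replaces any previously registered paths rather than appending. A standalone sketch of the distinction using Hadoop's new-API FileInputFormat (the /in paths are placeholders):

    import org.apache.hadoop.fs.Path
    import org.apache.hadoop.mapreduce.Job
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat

    val job = Job.getInstance()

    // One literal path named "/in/a,/in/b" -- the comma is not a separator here.
    FileInputFormat.addInputPath(job, new Path("/in/a,/in/b"))

    // Two input paths, /in/a and /in/b, replacing anything set before.
    FileInputFormat.setInputPaths(job, "/in/a,/in/b")
    println(FileInputFormat.getInputPaths(job).mkString(", "))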

core/src/test/scala/org/apache/spark/SparkContextSuite.scala

Lines changed: 60 additions & 12 deletions
@@ -216,29 +216,77 @@ class SparkContextSuite extends FunSuite with LocalSparkContext {
     }
   }
 
-  test("Comma separated paths could be used for hadoopFile and newAPIHadoopFile (SPARK-7155)") {
+  test("Comma separated paths for newAPIHadoopFile/wholeTextFiles/binaryFiles (SPARK-7155)") {
     // Regression test for SPARK-7155
-    val dir = Utils.createTempDir()
+    // dir1 and dir2 are used for wholeTextFiles and binaryFiles
+    val dir1 = Utils.createTempDir()
+    val dir2 = Utils.createTempDir()
+
+    val dirpath1 = dir1.getAbsolutePath
+    val dirpath2 = dir2.getAbsolutePath
+
+    // file1 and file2 are placed inside dir1; they are also used for
+    // textFile, hadoopFile, and newAPIHadoopFile.
+    // file3, file4, and file5 are placed inside dir2; they are used for
+    // textFile, hadoopFile, and newAPIHadoopFile as well.
+    val file1 = new File(dir1, "part-00000")
+    val file2 = new File(dir1, "part-00001")
+    val file3 = new File(dir2, "part-00000")
+    val file4 = new File(dir2, "part-00001")
+    val file5 = new File(dir2, "part-00002")
+
+    val filepath1 = file1.getAbsolutePath
+    val filepath2 = file2.getAbsolutePath
+    val filepath3 = file3.getAbsolutePath
+    val filepath4 = file4.getAbsolutePath
+    val filepath5 = file5.getAbsolutePath
 
-    val file1 = File.createTempFile("someprefix1", "somesuffix1", dir)
-    val absolutePath1 = file1.getAbsolutePath
-
-    val file2 = File.createTempFile("someprefix2", "somesuffix2", dir)
-    val absolutePath2 = file2.getAbsolutePath
 
     try {
-      // Create two text files.
+      // Create 5 text files.
       Files.write("someline1 in file1\nsomeline2 in file1\nsomeline3 in file1", file1, UTF_8)
       Files.write("someline1 in file2\nsomeline2 in file2", file2, UTF_8)
+      Files.write("someline1 in file3", file3, UTF_8)
+      Files.write("someline1 in file4\nsomeline2 in file4", file4, UTF_8)
+      Files.write("someline1 in file5\nsomeline2 in file5", file5, UTF_8)
 
       sc = new SparkContext(new SparkConf().setAppName("test").setMaster("local"))
 
-      // Test textFile, hadoopFile, and newAPIHadoopFile
-      assert(sc.textFile(absolutePath1+","+absolutePath2).count() == 5L)
-      assert(sc.hadoopFile(absolutePath1+","+absolutePath2, classOf[TextInputFormat], classOf[LongWritable], classOf[Text]).count() == 5L)
-      assert(sc.newAPIHadoopFile(absolutePath1+","+absolutePath2, classOf[NewTextInputFormat], classOf[LongWritable], classOf[Text]).count() == 5L)
+      // Test textFile, hadoopFile, and newAPIHadoopFile for file1 and file2
+      assert(sc.textFile(filepath1 + "," + filepath2).count() == 5L)
+      assert(sc.hadoopFile(filepath1 + "," + filepath2,
+        classOf[TextInputFormat], classOf[LongWritable], classOf[Text]).count() == 5L)
+      assert(sc.newAPIHadoopFile(filepath1 + "," + filepath2,
+        classOf[NewTextInputFormat], classOf[LongWritable], classOf[Text]).count() == 5L)
+
+      // Test textFile, hadoopFile, and newAPIHadoopFile for file3, file4, and file5
+      assert(sc.textFile(filepath3 + "," + filepath4 + "," + filepath5).count() == 5L)
+      assert(sc.hadoopFile(filepath3 + "," + filepath4 + "," + filepath5,
+        classOf[TextInputFormat], classOf[LongWritable], classOf[Text]).count() == 5L)
+      assert(sc.newAPIHadoopFile(filepath3 + "," + filepath4 + "," + filepath5,
+        classOf[NewTextInputFormat], classOf[LongWritable], classOf[Text]).count() == 5L)
+
+      // Test wholeTextFiles and binaryFiles for dir1 and dir2
+      assert(sc.wholeTextFiles(dirpath1 + "," + dirpath2).count() == 5L)
+      assert(sc.binaryFiles(dirpath1 + "," + dirpath2).count() == 5L)
+
     } finally {
       sc.stop()
+      if (file1.exists()) {
+        file1.delete()
+      }
+      if (file2.exists()) {
+        file2.delete()
+      }
+      if (file3.exists()) {
+        file3.delete()
+      }
+      if (file4.exists()) {
+        file4.delete()
+      }
+      if (file5.exists()) {
+        file5.delete()
+      }
     }
   }
 }
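
A note on the asserted counts: the line-oriented APIs (textFile, hadoopFile, newAPIHadoopFile) produce one record per line, so file1 + file2 gives 3 + 2 = 5 and file3 + file4 + file5 gives 1 + 2 + 2 = 5, while the file-oriented APIs (wholeTextFiles, binaryFiles) produce one record per file, so dir1 + dir2 gives 2 + 3 = 5. A minimal standalone version of the same pattern (assumes an active SparkContext named sc; Guava's Files helper stands in for the suite's imports):

    import java.io.File
    import java.nio.charset.StandardCharsets.UTF_8
    import com.google.common.io.Files

    val dir = Files.createTempDir()
    val f1 = new File(dir, "part-00000")
    val f2 = new File(dir, "part-00001")
    Files.write("line1\nline2", f1, UTF_8)
    Files.write("line3", f2, UTF_8)

    // 3 records (one per line) vs. 2 records (one per file).
    assert(sc.textFile(f1.getAbsolutePath + "," + f2.getAbsolutePath).count() == 3L)
    assert(sc.wholeTextFiles(dir.getAbsolutePath).count() == 2L)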
