
Commit 2369e3a

[SPARK-24610] fix reading small files via wholeTextFiles

1 parent bc11146 · commit 2369e3a
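
For context beyond the one-line commit message: sc.wholeTextFiles reads each file as a single record through WholeTextFileInputFormat, which caps the split size at roughly totalLen / minPartitions. When the input files are small, that cap can fall below a cluster-configured mapreduce.input.fileinputformat.split.minsize.per.node (or .per.rack), and Hadoop's CombineFileInputFormat rejects the combination with an IOException. A minimal reproduction sketch, assuming a local Spark context (the input directory and app name are illustrative, not from the commit):

import org.apache.spark.{SparkConf, SparkContext}

object WholeTextFilesRepro {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local").setAppName("repro"))

    // A per-node/per-rack minimum far larger than the small input files.
    sc.hadoopConfiguration.setLong(
      "mapreduce.input.fileinputformat.split.minsize.per.node", 123456)
    sc.hadoopConfiguration.setLong(
      "mapreduce.input.fileinputformat.split.minsize.per.rack", 123456)

    // Before this commit: throws java.io.IOException from CombineFileInputFormat
    // because maxSplitSize < minSplitSizePerNode. After it: succeeds.
    val n = sc.wholeTextFiles("/tmp/small-files").count()  // hypothetical directory
    println(s"read $n files")
    sc.stop()
  }
}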

File tree

core/src/main/scala/org/apache/spark/input/WholeTextFileInputFormat.scala
core/src/test/scala/org/apache/spark/input/WholeTextFileInputFormatSuite.scala

2 files changed: +105 −0 lines changed

core/src/main/scala/org/apache/spark/input/WholeTextFileInputFormat.scala
Lines changed: 13 additions & 0 deletions

@@ -53,6 +53,19 @@ private[spark] class WholeTextFileInputFormat
     val totalLen = files.map(file => if (file.isDirectory) 0L else file.getLen).sum
     val maxSplitSize = Math.ceil(totalLen * 1.0 /
       (if (minPartitions == 0) 1 else minPartitions)).toLong
+
+    // For small files we need to ensure the min split size per node & rack <= maxSplitSize
+    val config = context.getConfiguration
+    val minSplitSizePerNode = config.getLong(CombineFileInputFormat.SPLIT_MINSIZE_PERNODE, 0L)
+    val minSplitSizePerRack = config.getLong(CombineFileInputFormat.SPLIT_MINSIZE_PERRACK, 0L)
+
+    if (maxSplitSize < minSplitSizePerNode) {
+      super.setMinSplitSizeNode(maxSplitSize)
+    }
+
+    if (maxSplitSize < minSplitSizePerRack) {
+      super.setMinSplitSizeRack(maxSplitSize)
+    }
     super.setMaxSplitSize(maxSplitSize)
   }
 }
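
A quick numeric walk-through of the clamp above, using illustrative sizes (the 10/100/1000-byte files mirror the test suite below): three files totalling 1110 bytes with minPartitions = 2 give maxSplitSize = ceil(1110 / 2) = 555 bytes; a configured per-node minimum of 123456 bytes then exceeds it, which CombineFileInputFormat would reject, so the patch lowers the per-node (and per-rack) minimum to 555. A standalone sketch of the same arithmetic:

// Illustrative sketch only; mirrors the arithmetic in setMinPartitions above.
object SplitSizeClampSketch {
  def main(args: Array[String]): Unit = {
    val fileLens = Seq(10L, 100L, 1000L)  // three small files, 1110 bytes total
    val minPartitions = 2
    val maxSplitSize =
      Math.ceil(fileLens.sum * 1.0 / minPartitions).toLong  // 555

    val minSplitSizePerNode = 123456L  // hypothetical cluster-wide default

    // Without the fix CombineFileInputFormat fails because 555 < 123456;
    // the fix clamps the per-node minimum down to maxSplitSize.
    val effectiveMinPerNode =
      if (maxSplitSize < minSplitSizePerNode) maxSplitSize else minSplitSizePerNode
    println(s"maxSplitSize=$maxSplitSize, effectiveMinPerNode=$effectiveMinPerNode")
  }
}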
core/src/test/scala/org/apache/spark/input/WholeTextFileInputFormatSuite.scala
Lines changed: 92 additions & 0 deletions (new file)

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.input

import java.io.{DataOutputStream, File, FileOutputStream}

import scala.collection.immutable.IndexedSeq

import org.scalatest.BeforeAndAfterAll

import org.apache.spark.{SparkConf, SparkContext, SparkFunSuite}
import org.apache.spark.internal.Logging
import org.apache.spark.util.Utils

/**
 * Tests the correctness of
 * [[org.apache.spark.input.WholeTextFileInputFormat WholeTextFileInputFormat]]. A temporary
 * directory containing small files is created as fake input and deleted in the end.
 */
class WholeTextFileInputFormatSuite extends SparkFunSuite with BeforeAndAfterAll with Logging {
  private var sc: SparkContext = _

  override def beforeAll(): Unit = {
    super.beforeAll()
    val conf = new SparkConf()
    sc = new SparkContext("local", "test", conf)

    // Set the per-node and per-rack minimum split sizes far above the size of
    // the test files, so WholeTextFileInputFormat must clamp them to maxSplitSize.
    sc.hadoopConfiguration.setLong("mapreduce.input.fileinputformat.split.minsize.per.node", 123456)
    sc.hadoopConfiguration.setLong("mapreduce.input.fileinputformat.split.minsize.per.rack", 123456)
  }

  override def afterAll(): Unit = {
    try {
      sc.stop()
    } finally {
      super.afterAll()
    }
  }

  // The compress flag is accepted for symmetry with similar helpers but unused here.
  private def createNativeFile(
      inputDir: File,
      fileName: String,
      contents: Array[Byte],
      compress: Boolean): Unit = {
    val path = s"${inputDir.toString}/$fileName"
    val out = new DataOutputStream(new FileOutputStream(path))
    out.write(contents, 0, contents.length)
    out.close()
  }

  test("for small files minimum split size per node and per rack should be less than or equal to " +
    "maximum split size.") {
    var dir: File = null
    try {
      dir = Utils.createTempDir()
      logInfo(s"Local disk address is ${dir.toString}.")

      WholeTextFileInputFormatSuite.files.foreach { case (filename, contents) =>
        createNativeFile(dir, filename, contents, compress = false)
      }
      // The job must run without the IOException that CombineFileInputFormat
      // raised before this fix when the configured minimums exceeded maxSplitSize.
      val fileCount = sc.wholeTextFiles(dir.toString).count()
      assert(fileCount === WholeTextFileInputFormatSuite.fileNames.length)
    } finally {
      Utils.deleteRecursively(dir)
    }
  }
}

/**
 * Files to be tested are defined here.
 */
object WholeTextFileInputFormatSuite {
  private val testWords: IndexedSeq[Byte] = "Spark is easy to use.\n".map(_.toByte)

  private val fileNames = Array("part-00000", "part-00001", "part-00002")
  private val fileLengths = Array(10, 100, 1000)

  private val files = fileLengths.zip(fileNames).map { case (upperBound, filename) =>
    filename -> Stream.continually(testWords.toList.toStream).flatten.take(upperBound).toArray
  }.toMap
}
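
To exercise just this suite from a Spark checkout, one typical invocation (assuming the sbt launcher bundled with the repo) is:

build/sbt "core/testOnly *WholeTextFileInputFormatSuite"

The test passes exactly when wholeTextFiles completes without the CombineFileInputFormat IOException, which is why it pins the per-node and per-rack minimums to 123456 bytes against files of at most 1000 bytes.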
