Skip to content

Commit 4145651

Browse files
committed
ShuffleOutputCoordinatorSuite
1 parent 89063dd commit 4145651

File tree

1 file changed

+126
-0
lines changed

1 file changed

+126
-0
lines changed
Lines changed: 126 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,126 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.spark.shuffle
18+
19+
import java.io.{FileInputStream, FileOutputStream, File}
20+
21+
import org.scalatest.BeforeAndAfterEach
22+
23+
import org.apache.spark.SparkFunSuite
24+
import org.apache.spark.util.Utils
25+
26+
class ShuffleOutputCoordinatorSuite extends SparkFunSuite with BeforeAndAfterEach {
27+
28+
var tempDir: File = _
29+
30+
override def beforeEach(): Unit = {
31+
tempDir = Utils.createTempDir()
32+
}
33+
34+
def writeFile(filename: String, data: Int): File = {
35+
val f = new File(tempDir, filename)
36+
val out = new FileOutputStream(f)
37+
out.write(data)
38+
out.close()
39+
f
40+
}
41+
42+
def verifyFiles(successfulAttempt: Int): Unit = {
43+
(0 until 3).foreach { idx =>
44+
val exp = successfulAttempt* 3 + idx
45+
val file = new File(tempDir, s"d$idx")
46+
withClue(s"checking dest file $file") {
47+
assert(file.length === 1)
48+
val in = new FileInputStream(file)
49+
assert(in.read() === exp)
50+
in.close()
51+
52+
}
53+
}
54+
}
55+
56+
override def afterEach(): Unit = {
57+
Utils.deleteRecursively(tempDir)
58+
}
59+
60+
61+
def generateAttempt(attempt: Int): Seq[(File, File)] = {
62+
(0 until 3).map { idx =>
63+
val j = attempt * 3 + idx
64+
writeFile(s"t$j", j) -> new File(tempDir, s"d$idx")
65+
}
66+
}
67+
68+
test("move files if dest missing") {
69+
val firstAttempt = generateAttempt(0)
70+
assert(ShuffleOutputCoordinator.commitOutputs(0, 0, firstAttempt))
71+
verifyFiles(0)
72+
firstAttempt.foreach{ case (t, d) => assert(!t.exists())}
73+
74+
val secondAttempt = generateAttempt(1)
75+
// second commit fails, and also deletes the tmp files
76+
assert(!ShuffleOutputCoordinator.commitOutputs(0, 0, secondAttempt))
77+
verifyFiles(0)
78+
// make sure we delete the temp files if the dest exists
79+
secondAttempt.foreach{ case (t, d) => assert(!t.exists())}
80+
}
81+
82+
test("move files if dest partially missing") {
83+
val firstAttempt = generateAttempt(0)
84+
assert(ShuffleOutputCoordinator.commitOutputs(0, 0, firstAttempt))
85+
verifyFiles(0)
86+
firstAttempt.foreach{ case (t, d) => assert(!t.exists())}
87+
88+
val secondAttempt = generateAttempt(1)
89+
firstAttempt(0)._2.delete()
90+
// second commit now succeeds since one destination file is missing
91+
assert(ShuffleOutputCoordinator.commitOutputs(0, 0, secondAttempt))
92+
verifyFiles(1)
93+
secondAttempt.foreach{ case (t, d) => assert(!t.exists())}
94+
}
95+
96+
test("ignore missing tmp files") {
97+
// HashShuffle doesn't necessarily even create 0 length files for all of its output,
98+
// so just ignore tmp files that are missing
99+
val firstAttempt = generateAttempt(0) ++
100+
Seq(new File(tempDir, "bogus") -> new File(tempDir, "blah"))
101+
assert(ShuffleOutputCoordinator.commitOutputs(0, 0, firstAttempt))
102+
verifyFiles(0)
103+
assert(!new File(tempDir, "blah").exists())
104+
firstAttempt.foreach{ case (t, d) => assert(!t.exists())}
105+
106+
// if we try again, once more with the missing tmp file, commit fails even though dest
107+
// is "partially missing"
108+
// TODO figure out right semantics, esp wrt non-determinstic data
109+
val secondAttempt = generateAttempt(1) ++
110+
Seq(new File(tempDir, "bogus") -> new File(tempDir, "blah"))
111+
assert(!ShuffleOutputCoordinator.commitOutputs(0, 0, secondAttempt))
112+
verifyFiles(0)
113+
assert(!new File(tempDir, "blah").exists())
114+
secondAttempt.foreach{ case (t, d) => assert(!t.exists())}
115+
116+
// but now if we delete one of the real dest files, and try again, it goes through
117+
val thirdAttempt = generateAttempt(2) ++
118+
Seq(new File(tempDir, "bogus") -> new File(tempDir, "blah"))
119+
firstAttempt(0)._2.delete()
120+
assert(ShuffleOutputCoordinator.commitOutputs(0, 0, thirdAttempt))
121+
verifyFiles(2)
122+
assert(!new File(tempDir, "blah").exists())
123+
thirdAttempt.foreach{ case (t, d) => assert(!t.exists())}
124+
}
125+
126+
}

0 commit comments

Comments
 (0)