Skip to content

Commit 53f39f8

Browse files
JoshRosen authored and zzcclp committed
[SPARK-17547] Ensure temp shuffle data file is cleaned up after error
SPARK-8029 (apache#9610) modified shuffle writers to first stage their data to a temporary file in the same directory as the final destination file and then to atomically rename this temporary file at the end of the write job. However, this change introduced the potential for the temporary output file to be leaked if an exception occurs during the write because the shuffle writers' existing error cleanup code doesn't handle deletion of the temp file. This patch avoids this potential cause of disk-space leaks by adding `finally` blocks to ensure that temp files are always deleted if they haven't been renamed. Author: Josh Rosen <[email protected]> Closes apache#15104 from JoshRosen/cleanup-tmp-data-file-in-shuffle-writer. (cherry picked from commit 5b8f737) Signed-off-by: Josh Rosen <[email protected]> (cherry picked from commit 8646b84)
1 parent 7a8c85b commit 53f39f8

File tree

4 files changed

+73
-49
lines changed

4 files changed

+73
-49
lines changed

core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -157,8 +157,14 @@ public void write(Iterator<Product2<K, V>> records) throws IOException {
157157

158158
File output = shuffleBlockResolver.getDataFile(shuffleId, mapId);
159159
File tmp = Utils.tempFileWith(output);
160-
partitionLengths = writePartitionedFile(tmp);
161-
shuffleBlockResolver.writeIndexFileAndCommit(shuffleId, mapId, partitionLengths, tmp);
160+
try {
161+
partitionLengths = writePartitionedFile(tmp);
162+
shuffleBlockResolver.writeIndexFileAndCommit(shuffleId, mapId, partitionLengths, tmp);
163+
} finally {
164+
if (tmp.exists() && !tmp.delete()) {
165+
logger.error("Error while deleting temp file {}", tmp.getAbsolutePath());
166+
}
167+
}
162168
mapStatus = MapStatus$.MODULE$.apply(blockManager.shuffleServerId(), partitionLengths);
163169
}
164170

core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -209,15 +209,21 @@ void closeAndWriteOutput() throws IOException {
209209
final File output = shuffleBlockResolver.getDataFile(shuffleId, mapId);
210210
final File tmp = Utils.tempFileWith(output);
211211
try {
212-
partitionLengths = mergeSpills(spills, tmp);
213-
} finally {
214-
for (SpillInfo spill : spills) {
215-
if (spill.file.exists() && ! spill.file.delete()) {
216-
logger.error("Error while deleting spill file {}", spill.file.getPath());
212+
try {
213+
partitionLengths = mergeSpills(spills, tmp);
214+
} finally {
215+
for (SpillInfo spill : spills) {
216+
if (spill.file.exists() && ! spill.file.delete()) {
217+
logger.error("Error while deleting spill file {}", spill.file.getPath());
218+
}
217219
}
218220
}
221+
shuffleBlockResolver.writeIndexFileAndCommit(shuffleId, mapId, partitionLengths, tmp);
222+
} finally {
223+
if (tmp.exists() && !tmp.delete()) {
224+
logger.error("Error while deleting temp file {}", tmp.getAbsolutePath());
225+
}
219226
}
220-
shuffleBlockResolver.writeIndexFileAndCommit(shuffleId, mapId, partitionLengths, tmp);
221227
mapStatus = MapStatus$.MODULE$.apply(blockManager.shuffleServerId(), partitionLengths);
222228
}
223229

core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala

Lines changed: 43 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -138,48 +138,54 @@ private[spark] class IndexShuffleBlockResolver(
138138
dataTmp: File): Unit = {
139139
val indexFile = getIndexFile(shuffleId, mapId)
140140
val indexTmp = Utils.tempFileWith(indexFile)
141-
val out = new DataOutputStream(new BufferedOutputStream(new FileOutputStream(indexTmp)))
142-
Utils.tryWithSafeFinally {
143-
// We take in lengths of each block, need to convert it to offsets.
144-
var offset = 0L
145-
out.writeLong(offset)
146-
for (length <- lengths) {
147-
offset += length
141+
try {
142+
val out = new DataOutputStream(new BufferedOutputStream(new FileOutputStream(indexTmp)))
143+
Utils.tryWithSafeFinally {
144+
// We take in lengths of each block, need to convert it to offsets.
145+
var offset = 0L
148146
out.writeLong(offset)
147+
for (length <- lengths) {
148+
offset += length
149+
out.writeLong(offset)
150+
}
151+
} {
152+
out.close()
149153
}
150-
} {
151-
out.close()
152-
}
153154

154-
val dataFile = getDataFile(shuffleId, mapId)
155-
// There is only one IndexShuffleBlockResolver per executor, this synchronization make sure
156-
// the following check and rename are atomic.
157-
synchronized {
158-
val existingLengths = checkIndexAndDataFile(indexFile, dataFile, lengths.length)
159-
if (existingLengths != null) {
160-
// Another attempt for the same task has already written our map outputs successfully,
161-
// so just use the existing partition lengths and delete our temporary map outputs.
162-
System.arraycopy(existingLengths, 0, lengths, 0, lengths.length)
163-
if (dataTmp != null && dataTmp.exists()) {
164-
dataTmp.delete()
165-
}
166-
indexTmp.delete()
167-
} else {
168-
// This is the first successful attempt in writing the map outputs for this task,
169-
// so override any existing index and data files with the ones we wrote.
170-
if (indexFile.exists()) {
171-
indexFile.delete()
172-
}
173-
if (dataFile.exists()) {
174-
dataFile.delete()
175-
}
176-
if (!indexTmp.renameTo(indexFile)) {
177-
throw new IOException("fail to rename file " + indexTmp + " to " + indexFile)
178-
}
179-
if (dataTmp != null && dataTmp.exists() && !dataTmp.renameTo(dataFile)) {
180-
throw new IOException("fail to rename file " + dataTmp + " to " + dataFile)
155+
val dataFile = getDataFile(shuffleId, mapId)
156+
// There is only one IndexShuffleBlockResolver per executor, this synchronization make sure
157+
// the following check and rename are atomic.
158+
synchronized {
159+
val existingLengths = checkIndexAndDataFile(indexFile, dataFile, lengths.length)
160+
if (existingLengths != null) {
161+
// Another attempt for the same task has already written our map outputs successfully,
162+
// so just use the existing partition lengths and delete our temporary map outputs.
163+
System.arraycopy(existingLengths, 0, lengths, 0, lengths.length)
164+
if (dataTmp != null && dataTmp.exists()) {
165+
dataTmp.delete()
166+
}
167+
indexTmp.delete()
168+
} else {
169+
// This is the first successful attempt in writing the map outputs for this task,
170+
// so override any existing index and data files with the ones we wrote.
171+
if (indexFile.exists()) {
172+
indexFile.delete()
173+
}
174+
if (dataFile.exists()) {
175+
dataFile.delete()
176+
}
177+
if (!indexTmp.renameTo(indexFile)) {
178+
throw new IOException("fail to rename file " + indexTmp + " to " + indexFile)
179+
}
180+
if (dataTmp != null && dataTmp.exists() && !dataTmp.renameTo(dataFile)) {
181+
throw new IOException("fail to rename file " + dataTmp + " to " + dataFile)
182+
}
181183
}
182184
}
185+
} finally {
186+
if (indexTmp.exists() && !indexTmp.delete()) {
187+
logError(s"Failed to delete temporary index file at ${indexTmp.getAbsolutePath}")
188+
}
183189
}
184190
}
185191

core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -68,10 +68,16 @@ private[spark] class SortShuffleWriter[K, V, C](
6868
// (see SPARK-3570).
6969
val output = shuffleBlockResolver.getDataFile(dep.shuffleId, mapId)
7070
val tmp = Utils.tempFileWith(output)
71-
val blockId = ShuffleBlockId(dep.shuffleId, mapId, IndexShuffleBlockResolver.NOOP_REDUCE_ID)
72-
val partitionLengths = sorter.writePartitionedFile(blockId, tmp)
73-
shuffleBlockResolver.writeIndexFileAndCommit(dep.shuffleId, mapId, partitionLengths, tmp)
74-
mapStatus = MapStatus(blockManager.shuffleServerId, partitionLengths)
71+
try {
72+
val blockId = ShuffleBlockId(dep.shuffleId, mapId, IndexShuffleBlockResolver.NOOP_REDUCE_ID)
73+
val partitionLengths = sorter.writePartitionedFile(blockId, tmp)
74+
shuffleBlockResolver.writeIndexFileAndCommit(dep.shuffleId, mapId, partitionLengths, tmp)
75+
mapStatus = MapStatus(blockManager.shuffleServerId, partitionLengths)
76+
} finally {
77+
if (tmp.exists() && !tmp.delete()) {
78+
logError(s"Error while deleting temp file ${tmp.getAbsolutePath}")
79+
}
80+
}
7581
}
7682

7783
/** Close this writer, passing along whether the map completed */

0 commit comments

Comments
 (0)