@@ -38,29 +38,35 @@ object ShuffleOutputCoordinator extends Logging {
3838 shuffleId : Int ,
3939 partitionId : Int , tmpToDest : Seq [(File , File )]): Boolean = synchronized {
4040 logInfo(s " renaming: $tmpToDest" )
41- val someDestAlreadyExists = tmpToDest.exists(_._2.exists)
42- if (! someDestAlreadyExists) {
41+
42+ // HashShuffleWriter might not write any records to some of its files -- that's OK, we only
43+ // move the files that do exist
44+ val toMove = tmpToDest.filter{_._1.exists()}
45+
46+ val destAlreadyExists = toMove.forall(_._2.exists)
47+ if (! destAlreadyExists) {
4348 // if any of the renames fail, delete all the dest files. otherwise, future
4449 // attempts have no hope of succeeding
45- val renamesSucceeded = tmpToDest.map { case (tmp, dest) =>
46- logInfo(s " trying to rename: $tmp -> $dest. ${tmp.exists()}; ${dest.exists()}" )
50+ val renamesSucceeded = toMove.map { case (tmp, dest) =>
51+ if (dest.exists()) {
52+ dest.delete()
53+ }
4754 val r = tmp.renameTo(dest)
4855 if (! r) {
4956 logInfo(s " failed to rename $tmp to $dest. ${tmp.exists()}; ${dest.exists()}" )
5057 }
5158 r
5259 }.forall{identity}
5360 if (! renamesSucceeded) {
54- tmpToDest .foreach { case (tmp, dest) => if (dest.exists()) dest.delete() }
61+ toMove .foreach { case (tmp, dest) => if (dest.exists()) dest.delete() }
5562 false
5663 } else {
5764 true
5865 }
5966 } else {
6067 logInfo(s " shuffle output for shuffle $shuffleId, partition $partitionId already exists, " +
61- s " not overwriting. Another task must have created this shuffle output: " +
62- tmpToDest.map{_._2}.filter{_.exists()})
63- tmpToDest.foreach{ case (tmp, _) => tmp.delete()}
68+ s " not overwriting. Another task must have created this shuffle output. " )
69+ toMove.foreach{ case (tmp, _) => tmp.delete()}
6470 false
6571 }
6672 }
0 commit comments