Conversation

@squito (Contributor) commented Oct 22, 2015

Shuffle writers now write to temp files, and when they are done, they atomically move those files into the final location if those files don't already exist. This way, if one executor ends up executing more than one task to generate shuffle output for one partition, the first successful one "wins", and all others are ignored.
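In rough pseudocode, the commit step works like the sketch below (names are illustrative, not the exact ones in this patch; it assumes the caller synchronizes per map output so only one attempt runs the check-and-move at a time):

```scala
import java.io.File
import java.nio.file.{Files, StandardCopyOption}

// Hypothetical sketch of the "first wins" commit: move temp files into their
// final locations only if no earlier attempt has already committed them.
def commitShuffleOutput(tempToDest: Seq[(File, File)]): Boolean = {
  if (tempToDest.forall { case (_, dest) => !dest.exists() }) {
    // No prior attempt has committed: atomically move each temp file into place.
    tempToDest.foreach { case (tmp, dest) =>
      Files.move(tmp.toPath, dest.toPath, StandardCopyOption.ATOMIC_MOVE)
    }
    true  // this attempt "wins"
  } else {
    // Another attempt already committed; discard this attempt's temp files.
    tempToDest.foreach { case (tmp, _) => tmp.delete() }
    false
  }
}
```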

@SparkQA commented Oct 22, 2015

Test build #44124 has finished for PR 9214 at commit c3e4456.

  • This patch fails to build.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Oct 22, 2015

Test build #44149 has finished for PR 9214 at commit 1c12ca5.

  • This patch fails MiMa tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Oct 22, 2015

Test build #44155 has finished for PR 9214 at commit 3d96747.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@squito force-pushed the SPARK-8029_first_wins branch from bd815d7 to 550e198 on October 22, 2015 19:51
@SparkQA commented Oct 22, 2015

Test build #44168 has finished for PR 9214 at commit 550e198.

  • This patch fails to build.
  • This patch merges cleanly.
  • This patch adds no public classes.

@rxin (Contributor) commented Oct 23, 2015

cc @JoshRosen

@squito (Contributor, Author) commented Oct 23, 2015

This isn't quite ready yet ... still working through test failures. I think the remaining changes are to the tests, but I need to work through those and then do some cleanup ...

@SparkQA commented Oct 23, 2015

Test build #44229 has finished for PR 9214 at commit 4a19702.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Oct 23, 2015

Test build #44231 has finished for PR 9214 at commit 89063dd.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@squito (Contributor, Author) commented Oct 23, 2015

@JoshRosen @rxin I'm going to see if I can do a little cleanup still, but the functionality is there now. However, I did realize there are some open questions in the logic, particularly w.r.t. non-deterministic data.

(a) It's possible for shuffle files generated in one attempt to not be generated in another attempt. E.g., if the data is empty, sort-based shuffle may not generate any output file. Similarly, hash-based shuffle may not generate a file if the output for a particular (map, reduce) pair is empty. Furthermore, based on the test case for SPARK-4085, if one shuffle output file is missing, we should consider the entire shuffle output missing and regenerate it. You could imagine some degenerate cases, e.g., attempt 1 creates outputs a & b, attempt 2 creates outputs b & c, and so attempt 2 overwrites attempt 1, even though it's not really the first attempt.

(b) What should the MapStatus output of the uncommitted attempt be? With deterministic data it doesn't really matter, at least for fixing SPARK-8029. The output sizes will be approximately the same, and that is really good enough. But with non-deterministic data, it's possible that attempt 1 gets committed, but then attempt 2 creates a MapStatus with some zero-sized blocks where attempt 1 had non-zero-sized blocks. The map status from attempt 2 is used, so the shuffle-fetch side decides to completely skip reading those blocks.

Some possible solutions are:

  1. undefined behavior on non-deterministic data
  2. Shuffle writers always create the same output files, even if they are zero-length. This could really hurt the performance of HashShuffle with lots of partitions, but then again, it's already unusable with lots of partitions so it probably doesn't matter. I assume the effect on sort shuffle is negligible. Then add some new marker MapStatus which gets returned when the shuffle output is not committed.
  3. Maybe the test for SPARK-4085 could be changed, and then we can change the ShuffleOutputCoordinator so that it considers the destination pre-existing if any of the files are already there. You can still have some weird cases where attempt 1 creates outputs a & b, and attempt 2 creates c & d, but it doesn't matter if you commit files c & d as well. You'd need the same marker MapStatus for the non-committed attempts.

I assume (1) is not an option. I'll implement (2) but wanted to see if you have any thoughts.
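To make (2) a bit more concrete, here is a rough sketch (all names are hypothetical, not from this patch):

```scala
import java.io.File

// Option (2) sketch: every attempt creates the same set of output files,
// writing zero-length placeholders where it has no data, so "file exists"
// becomes a reliable signal that an attempt already committed.
def ensureAllOutputFilesExist(expectedOutputs: Seq[File]): Unit = {
  expectedOutputs.foreach { f =>
    if (!f.exists()) {
      f.createNewFile()  // zero-length file instead of no file at all
    }
  }
}

// Hypothetical marker returned by an attempt whose output lost the
// "first wins" race; the driver would use the committed attempt's MapStatus.
case object UncommittedMapStatusMarker
```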

@squito (Contributor, Author) commented on the diff:

@JoshRosen I'm pretty sure that BypassMergeSortShuffleWriter actually wasn't writing any data file if there was an empty iterator. This test was passing just b/c the test setup created the file w/ File.createTempFile. So the change here brings it into line with the implementation (either write no file at all, or write a zero-length file), but given my comment on non-determinism I will change this to always write a zero-length file.

@SparkQA commented Oct 23, 2015

Test build #44235 has finished for PR 9214 at commit 4145651.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Oct 23, 2015

Test build #44251 has finished for PR 9214 at commit 2089e12.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@squito (Contributor, Author) commented Oct 23, 2015

Well, as I've done some more testing and taken a closer look at some of the corner cases, I don't think it's easy to get this working. The major problem comes from when executors are lost but then come back. Say executor 1 generates some shuffle output, then for whatever reason the driver thinks it's lost the executor. The driver unregisters the shuffle output, then queues up some tasks to regenerate that shuffle output. Meanwhile the executor comes back, and thanks to locality preferences it's likely to get some of those same tasks again. But the ShuffleOutputCoordinator will ignore the output of those tasks, since those files already exist. If we don't send some map status back when the task completes, the driver will still think that some shuffle output needs to be generated, b/c it's already removed the original shuffle output.

We could send back the map status based on the new (but uncommitted) shuffle output, but then it could be wrong about the zero-sized blocks if the data is non-deterministic. We could have the ShuffleOutputCoordinator remember the map status of all output it commits, so that it always returns the mapstatus of the committed output -- this way the second attempt would return the map status for the committed outputs of the first attempt. This adds complexity and memory overhead, though.
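For illustration, the in-memory variant of that bookkeeping might look roughly like this (hypothetical names, and it carries exactly the memory overhead mentioned above):

```scala
import java.util.concurrent.ConcurrentHashMap

// Hypothetical sketch: the coordinator remembers the MapStatus of every
// committed (shuffleId, mapId), so a losing attempt reports the committed
// status instead of its own. `S` stands in for Spark's MapStatus type.
class CommittedStatuses[S <: AnyRef] {
  private val committed = new ConcurrentHashMap[(Int, Int), S]()

  def commitOrGet(shuffleId: Int, mapId: Int, attemptStatus: S): S = {
    // putIfAbsent makes the "first wins" decision atomic within this executor.
    val prev = committed.putIfAbsent((shuffleId, mapId), attemptStatus)
    if (prev == null) attemptStatus else prev
  }
}
```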

And, that still wouldn't solve the problem if the shuffle output files got corrupted somehow. You'd get a fetch failure, the executor would get removed, and then it would come right back. The same tasks would get scheduled on it, but it would never replace the shuffle output. You'd just spin till you get your 4 failures, and the job would fail.

To get around this, the driver could add some logic to prevent executors from re-registering, but to me that sounds like a much bigger change that needs to be thought through very carefully.

In my experience, it is actually much more common that an executor gets removed and then comes back, rather than a node being permanently unreachable. There is just a long GC or something, the executor gets removed, but then it re-registers w/ the master. (And there are unit tests in "local" mode which simulate the executor getting removed, but of course it's the same executor which comes right back.)

@mateiz (Contributor) commented Oct 23, 2015

Hey so I'm curious about two things here:

  1. If we just always replaced the output with a new one using a file rename, would we actually have a problem? I think that any thread that has a file open will still be reading from the old version of the file if you do a rename. You should double-check this, but I don't think it will switch mid-file. That might mean the "last task wins" strategy works. (A standalone check of this behavior is sketched just after this list.)
  2. Otherwise, what I would do is store the status in a separate file, similar to the .index file we have for sort-based shuffle. There's no memory overhead and it's easy to read it back again when we're given a map task and we see that an output block for it already exists.
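For what it's worth, the behavior in (1) is easy to check in isolation; a minimal sketch of such a check follows (it relies on POSIX rename(2) semantics and is not guaranteed on Windows):

```scala
import java.io.{File, FileInputStream, FileOutputStream}
import java.nio.file.{Files, StandardCopyOption}

// Standalone check of rename-while-reading: on POSIX filesystems, a reader
// that already has the file open keeps seeing the old contents after the
// rename, because rename(2) swaps the directory entry, not the open inode.
object RenameVisibilityCheck extends App {
  val dest = new File("out.bin")
  val w1 = new FileOutputStream(dest)
  w1.write(Array[Byte](1, 1, 1)); w1.close()

  val reader = new FileInputStream(dest)  // open the "old" version

  val tmp = new File("out.tmp")
  val w2 = new FileOutputStream(tmp)
  w2.write(Array[Byte](2, 2, 2)); w2.close()
  // On POSIX this maps to rename(2), which atomically replaces dest.
  Files.move(tmp.toPath, dest.toPath, StandardCopyOption.ATOMIC_MOVE)

  val buf = new Array[Byte](3)
  reader.read(buf)
  println(buf.toSeq)  // expected on POSIX: the old bytes (1, 1, 1)
  reader.close()
}
```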

Regarding shuffle files getting corrupted somehow, I think this is super unlikely and I haven't seen many systems try to defend against this. If this were an issue, we'd also have to worry about data cached with DISK_ONLY being corrupted, etc. I think this is considered in systems like HDFS because they store a huge amount of data for a very long time, but I don't think it's a major problem in Spark, and we can always add checksums later if we see it happen.

@squito (Contributor, Author) commented Oct 26, 2015

Hi @mateiz, thanks for taking a look

  1. Do you know if that works reliably on all platforms? Josh had suggested that trick as well during our brainstorm earlier, but we weren't sure if we could rely on it. I think it doesn't work on Windows, though I haven't tested it and might be remembering wrong. I'm definitely not an expert in this area though, happy to defer. I did try this and it worked on my mac, anyway: https://gist.github.com/squito/222a28f04a6517aafba2
    I think that with "last task wins" you'd still need a lock when opening files for reading & writing to make sure you don't open one task's index file and another task's data file. (a lot of work can happen between opening the data file for writing and opening the index file for writing with the current code, but that can be changed.)

  2. Yeah, that is a great point. I will change this PR to store the map status in a file, that'll be a straightforward fix.
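A minimal sketch of what that might look like (hypothetical helper, using plain Java serialization; Spark's MapStatus implementations are Serializable):

```scala
import java.io._
import java.nio.file.{Files, StandardCopyOption}

// Hypothetical sketch: persist the committed MapStatus next to the shuffle
// index file, so a later attempt on this executor can read it back instead
// of the coordinator keeping every committed status in memory.
def writeMapStatusFile(statusFile: File, status: Serializable): Unit = {
  val tmp = new File(statusFile.getPath + ".tmp")
  val out = new ObjectOutputStream(new FileOutputStream(tmp))
  try out.writeObject(status) finally out.close()
  // Commit with the same atomic-rename trick used for the data files.
  Files.move(tmp.toPath, statusFile.toPath, StandardCopyOption.ATOMIC_MOVE)
}

def readMapStatusFile(statusFile: File): AnyRef = {
  val in = new ObjectInputStream(new FileInputStream(statusFile))
  try in.readObject() finally in.close()
}
```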

I suppose you are right that if a DISK_ONLY file gets corrupted, your entire spark app is also doomed. But that seems unfortunate to me, not something that we want to emulate. We already see that users are running really long spark apps, on ever increasing cluster sizes (consider #9246). Maybe it's very uncommon so it's not worth re-engineering things just for that. But I do still feel that even though having one output per attempt is a slightly bigger change right now, the changes are mostly plumbing, and it makes for something that is safer and much easier to reason about.

Conflicts:
	core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java
	core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala
	core/src/test/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriterSuite.java
@mateiz (Contributor) commented Oct 26, 2015

Maybe just go for version 2) above then; it seems like the simplest one.

Regarding re-engineering vs not, the problem is that if you're trying to do a bug fix, you should introduce the least complexity possible. With fault tolerance in particular it's possible to imagine lots of conditions that don't really happen. For example, what if network messages get corrupted? What if DRAM gets corrupted? You just need to pick a failure model (e.g. do we trust the filesystem to be reliable or not) that fits your observations and make sure things are correct within that model.

@SparkQA commented Nov 4, 2015

Test build #45040 has finished for PR 9214 at commit 4d66df1.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • case class ShuffleMapStatusBlockId(shuffleId: Int, mapId: Int) extends BlockId
    • public class JavaAssociationRulesExample
    • public class JavaPrefixSpanExample
    • public class JavaSimpleFPGrowth
    • class StreamInterceptor implements TransportFrameDecoder.Interceptor
    • public final class ChunkFetchSuccess extends ResponseWithBody
    • public abstract class ResponseWithBody implements ResponseMessage
    • public final class StreamFailure implements ResponseMessage
    • public final class StreamRequest implements RequestMessage
    • public final class StreamResponse extends ResponseWithBody
    • public class TransportFrameDecoder extends ChannelInboundHandlerAdapter

@SparkQA commented Nov 4, 2015

Test build #45036 has finished for PR 9214 at commit 86f468a.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • case class ShuffleMapStatusBlockId(shuffleId: Int, mapId: Int) extends BlockId
    • public class JavaAssociationRulesExample
    • public class JavaPrefixSpanExample
    • public class JavaSimpleFPGrowth
    • class StreamInterceptor implements TransportFrameDecoder.Interceptor
    • public final class ChunkFetchSuccess extends ResponseWithBody
    • public abstract class ResponseWithBody implements ResponseMessage
    • public final class StreamFailure implements ResponseMessage
    • public final class StreamRequest implements RequestMessage
    • public final class StreamResponse extends ResponseWithBody
    • public class TransportFrameDecoder extends ChannelInboundHandlerAdapter

@SparkQA commented Nov 4, 2015

Test build #45041 has finished for PR 9214 at commit e59df41.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • case class ShuffleMapStatusBlockId(shuffleId: Int, mapId: Int) extends BlockId
    • public class JavaAssociationRulesExample
    • public class JavaPrefixSpanExample
    • public class JavaSimpleFPGrowth
    • class StreamInterceptor implements TransportFrameDecoder.Interceptor
    • public final class ChunkFetchSuccess extends ResponseWithBody
    • public abstract class ResponseWithBody implements ResponseMessage
    • public final class StreamFailure implements ResponseMessage
    • public final class StreamRequest implements RequestMessage
    • public final class StreamResponse extends ResponseWithBody
    • public class TransportFrameDecoder extends ChannelInboundHandlerAdapter

A contributor commented (inline review):

Indentation.

@JoshRosen (Contributor) commented:
> I think that with "last task wins" you'd still need a lock when opening files for reading & writing to make sure you don't open one task's index file and another task's data file. (a lot of work can happen between opening the data file for writing and opening the index file for writing with the current code, but that can be changed.)

What's worse, I think that this locking scheme would have to coordinate across processes, since we'd need to make sure that the external shuffle service acquires the proper read locks.

I've noticed that this current patch does not employ any locks for reading, but I don't think that's currently a problem:

  • The only case where we would need a lock is to prevent the case where we read the sort-shuffle index file and then have the data file replaced by a concurrent write.
  • Since sort-shuffle only creates one data file, that file will never be overwritten once created.

Does that logic sound right?

@JoshRosen (Contributor) commented:
I've thought about this some more and there are a few things that I still find unclear. To recap my understanding:

  • We need to guard against both concurrent writes to the same shuffle file and concurrent reads and writes to the shuffle file.
  • Concurrent writes can occur in a few ways:
    • They can't / shouldn't occur via speculation, since the DAGScheduler will not schedule speculative tasks on the same executor as original tasks.
    • They might occur if we cancelled / killed a task and then launched a new attempt of the same task. This could happen because task cancellation is asynchronous.
    • They used to be able to occur if there were multiple concurrent attempts for the same stage, but I believe that this cause has been ruled out by scheduler changes.
  • A shuffle file read that is concurrent with a write to the same file can only occur if map output was lost and recomputations were triggered.
    • In sort-based shuffle, the entire map output is stored in a single file, so an individual map output can't be "partially lost."
    • In hash-based shuffle, it would be possible for a portion of a map's output to be lost in response to a disk failure.

In the discussion above, it seems like executors that re-register after being marked as dead are cited as one possible source of read/write concurrency problems, too.

I have an alternative proposal:

  • Given that concurrent writes are not expected to happen, we should be able to add some strong assertions / checks inside of Executor to be sure of this. We can maintain a set of (mapId, shuffleId) pairs to track which map outputs are currently being computed and can kill the entire executor if we see a duplicate attempt being launched. (A rough sketch of this check appears at the end of this comment.)

  • Since the concurrent read and write problem can only occur if map output is partially lost, handle the loss of the shuffle files themselves by immediately killing the executor. The argument here is that a disk failure or filesystem problem means that we should just hard-stop the executor.

  • Once the master has marked an executor as dead, prevent that executor from re-registering: the executor should kill itself instead. There is some driver-side state associated with each executor that is cleared when an executor is marked as dead, and having to reason about the re-creation of this state when re-registering an executor (which has its own internal state) adds complexity and introduces the potential for bugs.

    The only opposing argument that I can think of is performance concerns; my rebuttal:

    • If executor disconnect and re-registration is a super-common occurrence then you should increase the heartbeat timeout.
    • If increasing the heartbeat timeout is undesirable because it means that tasks are not rescheduled soon enough and it's also the case that dead executors frequently return, then users should enable speculation. By the way: this argument also uncovers a bug in our logic for deciding when to enable the OutputCommitCoordinator: due to the ability for tasks to keep running on "failed/dead" executors, jobs which do not use speculation are still vulnerable to output commit races in certain rare failure cases (I'll file a JIRA to fix this).

Summary: in response to rare failure modes, such as disk loss or concurrent writes which are never supposed to happen, we should consider hard-killing executors. It should always be safe to kill an executor; killing executors should only cause performance issues, not correctness issues, and thus we should be able to rely on the ability to kill an executor as a catch-all safety net for handling rare error scenarios or bugs.
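To illustrate the duplicate-attempt check concretely, a rough sketch (illustrative names, not actual Spark code):

```scala
import scala.collection.mutable

// Hypothetical executor-side assertion: track which (shuffleId, mapId) map
// outputs are currently being computed, and hard-kill the executor if a
// duplicate concurrent attempt shows up, since that "should never happen".
class RunningMapOutputs {
  private val running = mutable.Set[(Int, Int)]()

  def registerOrDie(shuffleId: Int, mapId: Int): Unit = synchronized {
    if (!running.add((shuffleId, mapId))) {
      System.exit(1)  // catch-all safety net: killing an executor is always safe
    }
  }

  def deregister(shuffleId: Int, mapId: Int): Unit = synchronized {
    running.remove((shuffleId, mapId))
  }
}
```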

Ignoring the question of implementation complexity, would the proposal that I outlined above be a sufficient fix for SPARK-8029?

@squito (Contributor, Author) commented Nov 5, 2015

Hi @JoshRosen

> What's worse, I think that this locking scheme would have to coordinate across processes, since we'd need to make sure that the external shuffle service acquires the proper read locks.
>
> I've noticed that this current patch does not employ any locks for reading, but I don't think that's currently a problem:
>
> • The only case where we would need a lock is to prevent the case where we read the sort-shuffle index file and then have the data file replaced by a concurrent write.
> • Since sort-shuffle only creates one data file, that file will never be overwritten once created.
>
> Does that logic sound right?

Hmm, perhaps. I'd like to be totally clear: (a) assuming that there is no disk error which leads to a file going completely missing, then no files ever get replaced (so the data file can't get replaced after you've opened the index file for reading). (b) if you do consider vanishing files -- then it is possible for the data file to get overwritten if you lose your index file or your mapstatus file. So the worst case scenario is (i) the reader opens the index file and reads the offset, (ii) the index file vanishes, (iii) a conflicting attempt finishes, notices the missing index file, writes an index file and overwrites the data file, (iv) the reader opens the data file, but it reads the wrong location. And on Windows, you'll have a problem if the conflicting attempt finishes any time the index or data file is open for reading, a much wider window.

With non-deterministic data, there is another, more subtle issue -- even if the second attempt commits its output in between any downstream reads, you can have the downstream stage see a mixture of output for one task. E.g., stage A has two attempts for task 1 (A.1.1 & A.1.2). In stage B, task 1 reads A.1.1, and task 2 reads A.1.2. Might not seem like a big deal, but I doubt it is what a user would expect; it certainly doesn't fit the mental model I started with as a user, and there may be cases where it matters. You might have some random data-generating process, then some transformations which split the data up by keys -- but where you expect some relationship to hold between your keys. Maybe some paired sampling process or something. If you read output from different attempts, that relationship could be violated. (Though as I'm writing this I'm realizing that none of the proposed solutions would help with this.)

> Concurrent writes can occur in a few ways:
>
> • They might occur if we cancelled / killed a task and then launched a new attempt of the same task. This could happen because task cancellation is asynchronous.

Actually, we do not currently kill running tasks. See SPARK-2666. At one point I think there was some discussion about intentionally not canceling tasks, since they might do useful work, but I think now there is consensus they should be cancelled; it just hasn't been done. I suppose since it's just for efficiency, not correctness, it hasn't been focused on. It should be easy, but scheduler changes are always more painful than they seem ...

Adding task cancellation would certainly make the issue here more rare, but it could still occur.

> • They used to be able to occur if there were multiple concurrent attempts for the same stage, but I believe that this cause has been ruled out by scheduler changes.

not exactly. The scheduler prevents concurrent non-zombie attempts for the same stage. But since tasks aren't cancelled on zombification, you've still got concurrent attempts, with one non-zombie and an unlimited number of zombie attempts.

> In sort-based shuffle, the entire map output is stored in a single file, so an individual map output can't be "partially lost."

Well, you've still got the index and data file, and one is useless without the other, so I think the output can be partially lost. (and with this change, there would be a mapstatus file as well.)

> I have an alternative proposal:
> ...
>
> • Once the master has marked an executor as dead, prevent that executor from re-registering: the executor should kill itself instead.

This sounds like a big change in behavior to me. I agree that allowing executors to re-register does add complexity, and it certainly has confused me as a user in the past. But unless there is no way around it, it seems like the type of change we'd want to make configurable until we understood the full ramifications. And in the meantime, we'd still want correctness. E.g., I think another way you can lose an executor now is if it gets overwhelmed serving too many shuffle requests -- if one of those requests times out, you get a fetch failure, and the executor gets removed. Of course, killing the executor shouldn't hurt correctness; I'm just trying to point out that there might be more cases the user needs to deal with for tuning things after that kind of change. Also, removing an executor can actually have a huge effect on performance -- if you run some big 5-stage pipeline for 10 hours, and there is some transient failure in the 10th hour, you will lose some shuffle output for each of your 5 stages and it may result in a long delay. Even if there aren't many tasks to execute, you'll still need to run the 5 stages serially.

> By the way: this argument also uncovers a bug in our logic for deciding when to enable the OutputCommitCoordinator: due to the ability for tasks to keep running on "failed/dead" executors, jobs which do not use speculation are still vulnerable to output commit races in certain rare failure cases (I'll file a JIRA to fix this).

yikes, good call. Especially since we don't actually cancel tasks in zombie stage attempts, this might not be that rare.

> would the proposal that I outlined above be a sufficient fix for SPARK-8029?

Yes, I do think it would be sufficient. I think you'd want to include task cancellation, SPARK-2666, if you were to kill an executor on duplicate attempts; otherwise it would be far too common.

@SparkQA commented Nov 5, 2015

Test build #45098 has finished for PR 9214 at commit c206fc5.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • case class ShuffleMapStatusBlockId(shuffleId: Int, mapId: Int) extends BlockId

@SparkQA commented Nov 5, 2015

Test build #45100 has finished for PR 9214 at commit c0edff1.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • case class ShuffleMapStatusBlockId(shuffleId: Int, mapId: Int) extends BlockId

@squito (Contributor, Author) commented Nov 9, 2015

@JoshRosen @rxin @mateiz so, what is the path forward here? Josh is correct that this doesn't correctly handle the case of missing files. But meanwhile, Spark is still not fault-tolerant. The way I see it, our options are:

  1. Prevent executors from re-registering, as Josh has suggested above. Almost certainly out of scope for 1.6, and I am skeptical that it would even make 1.7.
  2. Use this approach despite the flaw when files go missing. It may work even if a file disappears, or it may not. But in other cases, it is still a huge improvement.
  3. Don't even try to handle the case of missing shuffle files, the same way we won't handle corrupted shuffle files. So the ShuffleOutputCoordinator logic would change so that it would not commit outputs if any output file already existed. This would be a behavior change, specifically a reversal of SPARK-4085, but would be a consistent model of fault-tolerance.
  4. Use one shuffle output per attempt. [SPARK-8029][core] shuffleoutput per attempt #6648

@SparkQA commented Nov 11, 2015

Test build #45592 has finished for PR 9214 at commit 80e037d.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • case class ShuffleMapStatusBlockId(shuffleId: Int, mapId: Int) extends BlockId
    • public class JavaLBFGSExample
    • class LDA @Since("1.6.0") (
    • case class Metadata(
    • require(className == expectedClassName, s"Error loading metadata: Expected class name" +
    • class SlidingRDDPartition[T](val idx: Int, val prev: Partition, val tail: Seq[T], val offset: Int)
    • class SlidingRDD[T: ClassTag](@transient val parent: RDD[T], val windowSize: Int, val step: Int)

asfgit pushed a commit that referenced this pull request Nov 13, 2015
Currently, all the shuffle writers write to the target path directly, and the file could be corrupted by another attempt for the same partition on the same executor. They should write to a temporary file and then rename it to the target path, as we do in the output committer. In order to make the rename atomic, the temporary file should be created in the same local directory (filesystem).

This PR is based on #9214, thanks to squito. Closes #9214

Author: Davies Liu <[email protected]>

Closes #9610 from davies/safe_shuffle.
@asfgit closed this in ad96088 on Nov 13, 2015
asfgit pushed a commit that referenced this pull request Nov 13, 2015
Currently, all the shuffle writers write to the target path directly, and the file could be corrupted by another attempt for the same partition on the same executor. They should write to a temporary file and then rename it to the target path, as we do in the output committer. In order to make the rename atomic, the temporary file should be created in the same local directory (filesystem).

This PR is based on #9214, thanks to squito

Author: Davies Liu <[email protected]>

Closes #9686 from davies/writer_1.5 and squashes the following commits:

e95fcf5 [Davies Liu] fix test
a6d569e [Davies Liu] fix consolidate
7e83298 [Davies Liu] robust shuffle writer