[SPARK-8029][core] first successful shuffle task always wins #9214
Conversation
Test build #44124 has finished for PR 9214 at commit
Test build #44149 has finished for PR 9214 at commit
Test build #44155 has finished for PR 9214 at commit
ShuffleOutputCoordinator to atomically move w/ "first one wins"
bd815d7 to 550e198
Test build #44168 has finished for PR 9214 at commit
cc @JoshRosen
this isn't quite ready yet ... still working through test failures. I think the remaining changes are to the tests, but I need to work through those and then do some cleanup ...
Test build #44229 has finished for PR 9214 at commit
Test build #44231 has finished for PR 9214 at commit
@JoshRosen @rxin I'm going to see if I can do a little cleanup still, but the functionality is there now. However, I did realize there are some open questions in the logic, particularly wrt non-deterministic data.

(a) It's possible for shuffle files generated in one attempt to not be generated in another attempt. E.g., if the data is empty, a sort based shuffle may not generate any output file. Similarly, the hash based shuffle may not generate a file if the output for a particular (map, reduce) pair is empty. Furthermore, based on the test case for SPARK-4085, if one shuffle output file is missing, we should consider the entire shuffle output missing and regenerate. You could imagine some degenerate cases, e.g., attempt 1 creates outputs a & b, attempt 2 creates outputs b & c, and so attempt 2 overwrites attempt 1, even though it's not really the first attempt.

(b) What should the MapStatus output of the uncommitted attempt be? With deterministic data it doesn't really matter, at least to fix SPARK-8029. The output sizes will be approximately the same and that is really good enough. But with non-deterministic data, it's possible that attempt 1 gets committed, but then attempt 2 creates a MapStatus with some zero-sized blocks where attempt 1 had non-zero sized blocks. The map status from attempt 2 is used, so the shuffle-fetch side decides to completely skip reading those blocks. Some possible solutions are:
I assume (1) is not an option. I'll implement (2) but wanted to see if you have any thoughts.
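For context, later comments here mention that the change adds a "mapstatus file" alongside the shuffle output, so a losing attempt can report the committed attempt's MapStatus instead of its own sizes. A rough sketch of what persisting that status could look like (the `MapStatusFile` name and plain Java serialization are assumptions, not this PR's actual code):

```scala
package org.apache.spark.shuffle

import java.io._

import org.apache.spark.scheduler.MapStatus

// Hypothetical helpers, not this PR's API: the winning attempt writes its
// MapStatus next to the data/index files; a losing attempt reads it back and
// reports that status, so the driver never sees block sizes from output that
// was never committed. MapStatus is private[spark], so this sketch would have
// to live inside Spark itself.
private[spark] object MapStatusFile {
  def save(file: File, status: MapStatus): Unit = {
    val out = new ObjectOutputStream(new FileOutputStream(file))
    try out.writeObject(status) finally out.close()
  }

  def load(file: File): MapStatus = {
    val in = new ObjectInputStream(new FileInputStream(file))
    try in.readObject().asInstanceOf[MapStatus] finally in.close()
  }
}
```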
@JoshRosen I'm pretty sure that BypassMergeSortShuffleWriter actually wasn't writing any data file if there was an empty iterator. This test was passing just b/c the test setup created the file w/ File.createTempFile. So the change here brings it into line with the implementation (either write no file at all, or write a zero-length file), but given my comment on non-determinism I will change this to always write a zero-length file.
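To make that concrete, here is a minimal sketch of the "always write a zero-length data file" behavior (illustrative only, not the actual BypassMergeSortShuffleWriter code; the record format is made up):

```scala
import java.io.{DataOutputStream, File, FileOutputStream}

// Sketch: even when the input iterator is empty we still create the output
// file, so that a missing file unambiguously means "output was lost",
// never "output was empty".
def writeDataFile(records: Iterator[(Int, Array[Byte])], output: File): Unit = {
  val out = new FileOutputStream(output)   // creates a zero-length file up front
  val data = new DataOutputStream(out)
  try {
    records.foreach { case (key, value) =>
      data.writeInt(key)
      data.writeInt(value.length)
      data.write(value)
    }
  } finally {
    data.close()
  }
}
```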
Test build #44235 has finished for PR 9214 at commit
Test build #44251 has finished for PR 9214 at commit
Well, as I've done some more testing and taken a closer look at some of the corner cases, I don't think it's easy to get this working. The major problem comes from when executors are lost but then come back. Say executor 1 generates some shuffle output, then for whatever reason the driver thinks it's lost the executor. The driver unregisters the shuffle output, then queues up some tasks to regenerate that shuffle output. Meanwhile the executor comes back, and thanks to locality preferences it's likely to get some of those same tasks again. But the ...

We could send back the map status based on the new (but uncommitted) shuffle output, but then it could be wrong about the zero-sized blocks if the data is non-deterministic. We could have the ...

And that still wouldn't solve the problem if the shuffle output files got corrupted somehow. You'd get a fetch failure, the executor would get removed, and then it would come right back. The same tasks would get scheduled on it, but it would never replace the shuffle output. You'd just spin till you get your 4 failures, and the job would fail. To get around this, the driver could add some logic to prevent executors from re-registering, but to me that sounds like a much bigger change that needs to be thought through very carefully.

In my experience, it is actually much more common that an executor gets removed and then comes back, rather than a node being permanently unreachable. There is just a long GC or something, the executor gets removed, but then it re-registers w/ the master. (And there are unit tests in "local" mode which simulate the executor getting removed, but of course it's the same executor which comes right back.)
Hey so I'm curious about two things here:
Regarding shuffle files getting corrupted somehow, I think this is super unlikely and I haven't seen many systems try to defend against this. If this were an issue, we'd also have to worry about data cached with DISK_ONLY being corrupted, etc. I think this is considered in systems like HDFS because they store a huge amount of data for a very long time, but I don't think it's a major problem in Spark, and we can always add checksums later if we see it happen.
Hi @mateiz, thanks for taking a look
I suppose you are right that if a DISK_ONLY file gets corrupted, your entire Spark app is also doomed. But that seems unfortunate to me, not something that we want to emulate. We already see that users are running really long Spark apps, on ever increasing cluster sizes (consider #9246). Maybe it's very uncommon, so it's not worth re-engineering things just for that. But I do still feel that even though having one output per attempt is a slightly bigger change right now, the changes are mostly plumbing, and it makes for something that is safer and much easier to reason about.
Conflicts:
	core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java
	core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala
	core/src/test/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriterSuite.java
Maybe just go for option (2) above then; it seems like the simplest one. Regarding re-engineering vs not, the problem is that if you're trying to do a bug fix, you should introduce the least complexity possible. With fault tolerance in particular it's possible to imagine lots of conditions that don't really happen. For example, what if network messages get corrupted? What if DRAM gets corrupted? You just need to pick a failure model (e.g. do we trust the filesystem to be reliable or not) that fits your observations and make sure things are correct within that model.
Test build #45040 has finished for PR 9214 at commit
Test build #45036 has finished for PR 9214 at commit
Test build #45041 has finished for PR 9214 at commit
Indentation.
What's worse, I think that this locking scheme would have to coordinate across processes, since we'd need to make sure that the external shuffle service acquires the proper read locks. I've noticed that this current patch does not employ any locks for reading, but I don't think that's currently a problem:
Does that logic sound right?
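For reference, a rough sketch of the read path being described, with illustrative names rather than the real IndexShuffleBlockResolver code: the reader looks up the block's byte range in the index file and then reads that range from the data file, so as long as committed files are never replaced, the two reads stay consistent without any locking.

```scala
import java.io.{DataInputStream, File, FileInputStream, RandomAccessFile}

// Illustrative read path (not the real API): the index file holds one long
// offset per partition boundary, so the byte range of partition `reduceId`
// is [offsets(reduceId), offsets(reduceId + 1)). If the data file is never
// replaced once committed, reading it after consulting the index needs no lock.
def readBlock(indexFile: File, dataFile: File, reduceId: Int): Array[Byte] = {
  val index = new DataInputStream(new FileInputStream(indexFile))
  val (offset, nextOffset) =
    try {
      index.skipBytes(reduceId * 8)        // skip to this partition's entry
      (index.readLong(), index.readLong())
    } finally index.close()
  val buf = new Array[Byte]((nextOffset - offset).toInt)
  val data = new RandomAccessFile(dataFile, "r")
  try {
    data.seek(offset)
    data.readFully(buf)
    buf
  } finally data.close()
}
```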
I've thought about this some more and there are a few things that I still find unclear. To recap my understanding:
In the discussion above, it seems like executors that re-register after being marked as dead are cited as one possible source of read/write concurrency problems, too. I have an alternative proposal:
Summary: in response to rare failure modes, such as disk loss or concurrent writes which are never supposed to happen, we should consider hard-killing executors. It should always be safe to kill an executor; killing executors should only cause performance issues, not correctness issues, and thus we should be able to rely on the ability to kill an executor as a catch-all safety net for handling rare error scenarios or bugs. Ignoring the question of implementation complexity, would the proposal that I outlined above be a sufficient fix for SPARK-8029?
Hi @JoshRosen
Hmm, perhaps. I'd like to be totally clear: (a) assuming that there is no disk error which leads to a file going completely missing, then no files ever get replaced (so the data file can't get replaced after you've opened the index file for reading). (b) if you do consider vanishing files -- then it is possible for the data file to get overwritten if you lose your index file or your mapstatus file. So the worst case scenario is (i) the reader opens the index file and reads the offset, (ii) the index file vanishes, (iii) the conflicting attempt finishes, notices the missing index file, writes an index file and overwrites the data file, (iv) the reader opens the data file, but it reads the wrong location. And on Windows, you'll have a problem if the conflicting attempt finishes any time the index or data file is open for reading, a much wider window. With non-deterministic data, there is another more subtle issue -- even if the second attempt commits its output in between any downstream reads, you can have the downstream stage see a mixture of output for one task. E.g., stage A has two attempts for task 1 (A.1.1 & A.1.2). In stage B, task 1 reads A.1.1, and task 2 reads A.1.2. It might not seem like a big deal, but I doubt it is what a user would expect; it certainly doesn't fit the mental model I started with as a user, and there may be cases where it matters. You might have some random data generating process, then some transformations which split the data up by keys -- but where you expect some relationship to hold between your keys. Maybe some paired sampling process or something. If you read output from different attempts, that relationship could be violated. (Though as I'm writing this I'm realizing that none of the proposed solutions would help with this.)
Actually, we do not currently kill running tasks. See SPARK-2666. At one point I think there was some discussion about intentionally not canceling tasks, since they might do useful work, but I think now there is consensus they should be cancelled; it just hasn't been done. I suppose since it's just for efficiency, not correctness, it hasn't been focused on. It should be easy, but scheduler changes are always more painful than they seem ... Adding task cancellation would certainly make the issue here more rare, but it could still occur.
Not exactly. The scheduler prevents concurrent non-zombie attempts for the same stage. But since tasks aren't cancelled on zombification, you've still got concurrent attempts, with one non-zombie and an unlimited number of zombie attempts.
Well, you've still got the index and data file, and one is useless without the other, so I think the output can be partially lost. (And with this change, there would be a mapstatus file as well.)
This sounds like a big change in behavior to me. I agree that allowing executors to re-register does add complexity, and it certainly has confused me as a user in the past. But unless there is no way around it, it seems like the type of change we'd want to make configurable till we understood the full ramifications. And in the meantime, we'd still want to get correctness. E.g., I think another way you can lose an executor now is if it gets overwhelmed serving too many shuffle requests -- if one of those requests times out, you get a fetch failure and the executor gets removed. Of course, killing the executor shouldn't hurt correctness; I'm just trying to point out that there might be more cases the user needs to deal with for tuning things after that kind of change. Also, removing an executor can actually have a huge effect on performance -- if you run some big 5-stage pipeline for 10 hours, and there is some transient failure in the 10th hour, you will lose some shuffle output for each of your 5 stages and it may result in a long delay. Even if there aren't many tasks to execute, you'll still need to run the 5 stages serially.
Yikes, good call. Especially since we don't actually cancel tasks in zombie stage attempts, this might not be that rare.
Yes, I do think it would be sufficient. I think you'd want to include task cancellation (SPARK-2666) if you were to kill an executor on duplicate attempts; otherwise it would be far too common.
Test build #45098 has finished for PR 9214 at commit
Test build #45100 has finished for PR 9214 at commit
@JoshRosen @rxin @mateiz so, what is the path forward here? Josh is correct that this doesn't really handle the case of missing files correctly. But meanwhile, Spark is still not fault tolerant. The way I see it, our options are:
Test build #45592 has finished for PR 9214 at commit
Currently, all the shuffle writers write to the target path directly; the file could be corrupted by another attempt for the same partition on the same executor. They should write to a temporary file and then rename it to the target path, as we do in the output committer. In order to make the rename atomic, the temporary file should be created in the same local directory (FileSystem). This PR is based on #9214, thanks to squito. Closes #9214 Author: Davies Liu <[email protected]> Closes #9610 from davies/safe_shuffle.
Currently, all the shuffle writers write to the target path directly; the file could be corrupted by another attempt for the same partition on the same executor. They should write to a temporary file and then rename it to the target path, as we do in the output committer. In order to make the rename atomic, the temporary file should be created in the same local directory (FileSystem). This PR is based on #9214, thanks to squito. Author: Davies Liu <[email protected]> Closes #9686 from davies/writer_1.5 and squashes the following commits: e95fcf5 [Davies Liu] fix test a6d569e [Davies Liu] fix consolidate 7e83298 [Davies Liu] robust shuffle writer
Shuffle writers now write to temp files, and when they are done, they atomically move those files into the final location if those files don't already exist. This way, if one executor ends up executing more than one task to generate shuffle output for one partition, the first successful one "wins", and all others are ignored.
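As a rough illustration of that commit step (a sketch under the assumptions above, not the code from this PR or from #9610):

```scala
import java.io.File
import java.nio.file.{Files, StandardCopyOption}

// Sketch of the "first successful attempt wins" commit: each attempt writes to a
// temp file in the same directory as the final file, then moves it into place
// only if no earlier attempt has already committed output there. In a real
// implementation the check-and-move would need to run under a per-executor lock
// (or use a rename that never replaces an existing file) to be truly atomic.
def commitOrDiscard(tmp: File, dest: File): Boolean = {
  if (dest.exists()) {
    tmp.delete()                 // another attempt already won; drop this output
    false
  } else {
    // Same directory, so the move is a cheap rename rather than a copy.
    Files.move(tmp.toPath, dest.toPath, StandardCopyOption.ATOMIC_MOVE)
    true
  }
}
```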