[SPARK-4029][Streaming] Update streaming driver to reliably save and recover received block metadata on driver failures #3026
Conversation
@pwendell @JoshRosen @harishreedharan

Test build #22571 has started for PR 3026 at commit

Test build #22571 has finished for PR 3026 at commit

Test FAILed.

Test build #22572 has started for PR 3026 at commit

Test build #22572 has finished for PR 3026 at commit

Test FAILed.

Test build #22574 has started for PR 3026 at commit

Test build #22574 has finished for PR 3026 at commit

Test FAILed.

Jenkins, test this please.
All the functionality to keep track of block-to-batch allocations has been moved from ReceiverInputDStream to ReceivedBlockTracker, so that all actions on the block metadata (including block-to-batch allocations) can be logged at a central location.
Test build #22585 has started for PR 3026 at commit

Test build #22587 has started for PR 3026 at commit
All the functionality to keep track of received block metadata has been moved from ReceiverTracker to ReceivedBlockTracker, so that all actions on the block metadata (including block-to-batch allocations) can be logged at a central location.
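A minimal sketch of the write-then-apply pattern this centralization enables. The names below (TrackerLogEvent, SimpleTracker, and its methods) are hypothetical stand-ins for the real ReceivedBlockTracker and its write ahead log; the point is only that every state change is logged before it is applied, so replaying the log after a driver restart rebuilds the same state.

import scala.collection.mutable

// Hypothetical event types; one per kind of state change the tracker makes.
sealed trait TrackerLogEvent
case class BlockAdded(streamId: Int, blockId: String) extends TrackerLogEvent
case class BlocksAllocatedToBatch(batchTime: Long, blockIds: Seq[String]) extends TrackerLogEvent

class SimpleTracker {
  // Stands in for the write ahead log; the real tracker persists this durably.
  private val log = mutable.ArrayBuffer[TrackerLogEvent]()
  private val unallocatedBlocks = mutable.Queue[String]()
  private val batchToBlocks = mutable.Map[Long, Seq[String]]()

  def addBlock(streamId: Int, blockId: String): Unit = {
    log += BlockAdded(streamId, blockId)     // log the action first...
    unallocatedBlocks.enqueue(blockId)       // ...then apply it in memory
  }

  def allocateBlocksToBatch(batchTime: Long): Unit = {
    val blocks = unallocatedBlocks.dequeueAll(_ => true)
    log += BlocksAllocatedToBatch(batchTime, blocks)
    batchToBlocks(batchTime) = blocks
  }

  // On restart, replaying the logged events in order reconstructs both the
  // unallocated-block queue and the batch-to-block allocations.
  def replay(events: Seq[TrackerLogEvent]): Unit = events.foreach {
    case BlockAdded(_, blockId) =>
      unallocatedBlocks.enqueue(blockId)
    case BlocksAllocatedToBatch(batchTime, blockIds) =>
      unallocatedBlocks.dequeueAll(b => blockIds.contains(b))
      batchToBlocks(batchTime) = blockIds
  }
}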
Test build #22585 has finished for PR 3026 at commit

Test PASSed.

Test build #22587 has finished for PR 3026 at commit

Test PASSed.
What about naming this something like LogEvent? It wasn't clear to me when I looked at this what it meant by "Action".
How about ReceivedBlockTrackerLogEvent? I am not so sure about giving it such a generic name as LogEvent; it becomes hard to immediately identify what module the class is related to.
Sure - having LogEvent somewhere in there would just be helpful.
Is this exposed only for testing? If so, can you note that down?
Added.
Refactored ReceivedBlockTracker API a bit to make things a little cleaner for users of the tracker.
This is a really dense expression. Can this be broken out into simpler expressions that make it easier to read?

val streamsWithBlocks = streamIds.map { streamId =>
  (streamId, getReceivedBlockQueue(streamId).dequeueAll(_ => true))
}
val streamToBlocks = streamsWithBlocks.toMap
val allocatedBlocks = AllocatedBlocks(streamToBlocks)
Haha, I had felt the same, so I had changed it. With your other suggestion incorporated, it's cleaner.
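For reference, the combined form of the suggestion might look something like this (a sketch based on the snippet above, not necessarily the exact code that was committed):

// Build the stream-id-to-blocks map in one readable pipeline, then wrap it.
val streamIdToBlocks = streamIds.map { streamId =>
  (streamId, getReceivedBlockQueue(streamId).dequeueAll(_ => true))
}.toMap
val allocatedBlocks = AllocatedBlocks(streamIdToBlocks)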
This looks good overall. I like the clean-up on the allocation code path. Just left minor comments.
Test build #22652 has started for PR 3026 at commit

Test build #22654 has started for PR 3026 at commit
Does the receiver tracker read from HDFS each time getBlocksOfBatch is called (sorry, I don't remember if it does)? If it does, then this call incurs more HDFS reads than required when there are several streams in the same app, correct?
Ignore this. Verified it does not.
This looks good. Apart from the one question I had above, it's good to go.

Test build #22654 has finished for PR 3026 at commit

Test PASSed.

Test build #22652 timed out for PR 3026 at commit

Test FAILed.

Test build #22895 has started for PR 3026 at commit
Just wondering - why does this need to be set here? Who consumes this?
This was added when SparkEnv needed to be set for launching jobs on non-main threads. Since the JobGenerator is a background thread that actually submits the jobs, the SparkEnv needed to be set on it. But since we have removed the whole thread-local stuff from SparkEnv, this is probably not needed any more. We can either remove this (scary) or document it as potentially removable.
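To illustrate the concern, here is a generic, self-contained sketch of the thread-local pattern involved (this is not the actual JobGenerator or SparkEnv code; Env and ThreadLocalDemo are made up for the example): a value set on the main thread via a ThreadLocal is not visible from a background thread unless it is set again on that thread, which is why the env had to be set where the jobs are actually submitted.

object Env {
  private val local = new ThreadLocal[String]
  def set(v: String): Unit = local.set(v)
  def get: Option[String] = Option(local.get)
}

object ThreadLocalDemo extends App {
  Env.set("driver-env")                    // set on the main thread
  val background = new Thread {
    override def run(): Unit = {
      println(s"before set: ${Env.get}")   // prints: before set: None
      Env.set("driver-env")                // must be re-set on this thread
      println(s"after set: ${Env.get}")    // prints: after set: Some(driver-env)
    }
  }
  background.start()
  background.join()
}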
Hey @tdas, this LGTM. The only question was around the setting of the SparkEnv... it might be good to document what consumes that downstream.
Test build #22895 has finished for PR 3026 at commit

Test PASSed.
@pwendell, added the comment. Merging this. Thanks @pwendell and @JoshRosen for reviewing this PR and the previous ones for the streaming driver HA.
Test build #22906 has started for PR 3026 at commit

Test build #22906 has finished for PR 3026 at commit

Test PASSed.
[SPARK-4029][Streaming] Update streaming driver to reliably save and recover received block metadata on driver failures

As part of the initiative of preventing data loss on driver failure, this JIRA tracks the sub-task of modifying the streaming driver to reliably save received block metadata and recover it on driver restart.

This was solved by introducing a `ReceivedBlockTracker` that takes all the responsibility of managing the metadata of received blocks (i.e. `ReceivedBlockInfo`), and any actions on them (e.g., allocating blocks to batches). All actions on block info get written out to a write ahead log (using `WriteAheadLogManager`). On recovery, all the actions are replayed to recreate the pre-failure state of the `ReceivedBlockTracker`, which includes the batch-to-block allocations and the unallocated blocks.

Furthermore, the `ReceiverInputDStream` was modified to create `WriteAheadLogBackedBlockRDD`s when file segment info is present in the `ReceivedBlockInfo`. After recovery of all the block info (through recovery of the `ReceivedBlockTracker`), the `WriteAheadLogBackedBlockRDD`s get recreated with the recovered info, and jobs are submitted. The data of the blocks gets pulled from the write ahead logs, thanks to the segment info present in the `ReceivedBlockInfo`.

This is still a WIP. Things that are still missing:
- *End-to-end integration tests:* Unit tests that test driver recovery, by killing and restarting the streaming context, and verifying that all the input data gets processed. This has been implemented but not included in this PR yet. A sneak peek of that DriverFailureSuite can be found in this PR (on my personal repo): tdas#25. I can either include it in this PR, or submit that as a separate PR after this gets in.
- *WAL cleanup:* Cleaning up the received data write ahead log, by calling `ReceivedBlockHandler.cleanupOldBlocks`. This is being worked on.

Author: Tathagata Das <[email protected]>

Closes #3026 from tdas/driver-ha-rbt and squashes the following commits:

a8009ed [Tathagata Das] Added comment
1d704bb [Tathagata Das] Enabled storing recovered WAL-backed blocks to BM
2ee2484 [Tathagata Das] More minor changes based on PR
47fc1e3 [Tathagata Das] Addressed PR comments.
9a7e3e4 [Tathagata Das] Refactored ReceivedBlockTracker API a bit to make things a little cleaner for users of the tracker.
af63655 [Tathagata Das] Minor changes.
fce2b21 [Tathagata Das] Removed commented lines
59496d3 [Tathagata Das] Changed class names, made allocation more explicit and added cleanup
19aec7d [Tathagata Das] Fixed casting bug.
f66d277 [Tathagata Das] Fix line lengths.
cda62ee [Tathagata Das] Added license
25611d6 [Tathagata Das] Minor changes before submitting PR
7ae0a7f [Tathagata Das] Transferred changes from driver-ha-working branch

(cherry picked from commit 5f13759)

Signed-off-by: Tathagata Das <[email protected]>
As part of the initiative of preventing data loss on driver failure, this JIRA tracks the sub-task of modifying the streaming driver to reliably save received block metadata and recover it on driver restart.

This was solved by introducing a `ReceivedBlockTracker` that takes all the responsibility of managing the metadata of received blocks (i.e. `ReceivedBlockInfo`), and any actions on them (e.g., allocating blocks to batches). All actions on block info get written out to a write ahead log (using `WriteAheadLogManager`). On recovery, all the actions are replayed to recreate the pre-failure state of the `ReceivedBlockTracker`, which includes the batch-to-block allocations and the unallocated blocks.

Furthermore, the `ReceiverInputDStream` was modified to create `WriteAheadLogBackedBlockRDD`s when file segment info is present in the `ReceivedBlockInfo`. After recovery of all the block info (through recovery of the `ReceivedBlockTracker`), the `WriteAheadLogBackedBlockRDD`s get recreated with the recovered info, and jobs are submitted. The data of the blocks gets pulled from the write ahead logs, thanks to the segment info present in the `ReceivedBlockInfo` (a sketch of this decision follows below).

This is still a WIP. Things that are still missing:
- *End-to-end integration tests:* Unit tests that test driver recovery, by killing and restarting the streaming context, and verifying that all the input data gets processed. This has been implemented but not included in this PR yet. A sneak peek of that DriverFailureSuite can be found in this PR (on my personal repo): tdas#25. I can either include it in this PR, or submit that as a separate PR after this gets in.
- *WAL cleanup:* Cleaning up the received data write ahead log, by calling `ReceivedBlockHandler.cleanupOldBlocks`. This is being worked on.
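A rough, self-contained sketch of the recovery-time decision described above. The class and field names here (ReceivedBlockMeta, LogSegmentInfo, and the RDD placeholders) are hypothetical and only illustrate the idea: when recovered metadata carries write ahead log segment info, the block data can be rebuilt from the log files; otherwise the code falls back to blocks still held by the BlockManager.

// Hypothetical stand-ins for the real metadata and RDD types.
case class LogSegmentInfo(path: String, offset: Long, length: Int)
case class ReceivedBlockMeta(blockId: String, segment: Option[LogSegmentInfo])

sealed trait BatchRDD
case class BlockManagerBackedRDD(blockIds: Seq[String]) extends BatchRDD
case class WALBackedRDD(blockIds: Seq[String], segments: Seq[LogSegmentInfo]) extends BatchRDD

def rddForBatch(blockInfos: Seq[ReceivedBlockMeta]): BatchRDD = {
  if (blockInfos.nonEmpty && blockInfos.forall(_.segment.isDefined)) {
    // Segment info survived the failure in the tracker's log, so the block
    // data itself can be read back out of the write ahead log files.
    WALBackedRDD(blockInfos.map(_.blockId), blockInfos.flatMap(_.segment))
  } else {
    // No segment info: rely on the blocks still being in the BlockManager.
    BlockManagerBackedRDD(blockInfos.map(_.blockId))
  }
}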