Conversation

@tdas
Contributor

@tdas tdas commented Oct 30, 2014

As part of the initiative to prevent data loss on driver failure, this JIRA tracks the sub-task of modifying the streaming driver to reliably save received block metadata and recover it on driver restart.

This was solved by introducing a ReceivedBlockTracker that takes over all responsibility for managing the metadata of received blocks (i.e., ReceivedBlockInfo) and any actions on them (e.g., allocating blocks to batches). All actions on block info get written out to a write ahead log (using WriteAheadLogManager). On recovery, all the actions are replayed to recreate the pre-failure state of the ReceivedBlockTracker, which includes the batch-to-block allocations and the unallocated blocks.

Furthermore, the ReceiverInputDStream was modified to create WriteAheadLogBackedBlockRDDs when file segment info is present in the ReceivedBlockInfo. After all the block info is recovered (through the recovered ReceivedBlockTracker), the WriteAheadLogBackedBlockRDDs get recreated with the recovered info and jobs are submitted. The data of the blocks gets pulled from the write ahead logs, thanks to the segment info present in the ReceivedBlockInfo.

This is still a WIP. Things that are missing here:

  • End-to-end integration tests: Unit tests that test driver recovery by killing and restarting the streaming context, and verifying that all the input data gets processed. This has been implemented but not included in this PR yet. A sneak peek of that DriverFailureSuite can be found in this PR (on my personal repo): Integration test that tests driver failure with receivers in an end-to-end manner tdas/spark#25. I can either include it in this PR, or submit it as a separate PR after this gets in.
  • WAL cleanup: Cleaning up the received-data write ahead log by calling ReceivedBlockHandler.cleanupOldBlocks. This is being worked on.
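The write-ahead-then-apply pattern described above can be sketched roughly as follows. This is an illustrative toy, not the actual Spark API: the names below (TrackerLogEvent, SimpleTracker, addBlock, allocateBlocksToBatch, etc.) are made up for the sketch.

```scala
import scala.collection.mutable

// Hypothetical stand-ins for the tracker's log events.
sealed trait TrackerLogEvent
case class BlockAddition(streamId: Int, blockId: Long) extends TrackerLogEvent
case class BatchAllocation(batchTime: Long, blockIds: Seq[Long]) extends TrackerLogEvent

// Every mutation is appended to the log before being applied in memory,
// so replaying the log alone recreates the pre-failure state.
class SimpleTracker(log: mutable.Buffer[TrackerLogEvent]) {
  private val unallocated = mutable.Queue[Long]()
  val batchToBlocks = mutable.Map[Long, Seq[Long]]()

  def addBlock(streamId: Int, blockId: Long): Unit =
    writeAndApply(BlockAddition(streamId, blockId))

  def allocateBlocksToBatch(batchTime: Long): Unit =
    writeAndApply(BatchAllocation(batchTime, unallocated.toList))

  // On driver restart, replay the recovered log in order.
  def recoverFrom(oldLog: Seq[TrackerLogEvent]): Unit = oldLog.foreach(applyEvent)

  private def writeAndApply(event: TrackerLogEvent): Unit = {
    log += event   // write-ahead first
    applyEvent(event)
  }

  private def applyEvent(event: TrackerLogEvent): Unit = event match {
    case BlockAddition(_, id) => unallocated.enqueue(id)
    case BatchAllocation(t, ids) =>
      batchToBlocks(t) = ids
      unallocated.dequeueAll(ids.contains)
  }
}
```

Note that the replayed BatchAllocation carries its block IDs inside the event rather than reading the live queue, which is what makes replay deterministic.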

@tdas
Contributor Author

tdas commented Oct 30, 2014

@pwendell @JoshRosen @harishreedharan
This is the final PR of the driver HA core feature. Please take a look!
Also, since this is a WIP, there will be missing docs, style issues, and extra printlns here and there. Please try to focus on the high-level logic in the first pass.

@SparkQA

SparkQA commented Oct 30, 2014

Test build #22571 has started for PR 3026 at commit 25611d6.

  • This patch merges cleanly.

@SparkQA

SparkQA commented Oct 30, 2014

Test build #22571 has finished for PR 3026 at commit 25611d6.

  • This patch fails RAT tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • class WriteAheadLogBackedBlockRDDPartition(
    • class WriteAheadLogBackedBlockRDD[T: ClassTag](
    • case class AllocatedBlocks(streamIdToAllocatedBlocks: Map[Int, Seq[ReceivedBlockInfo]])
    • class ReceivedBlockTracker(
    • class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false) extends Logging

@AmplabJenkins

Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/22571/
Test FAILed.

@SparkQA

SparkQA commented Oct 30, 2014

Test build #22572 has started for PR 3026 at commit cda62ee.

  • This patch merges cleanly.

@SparkQA

SparkQA commented Oct 30, 2014

Test build #22572 has finished for PR 3026 at commit cda62ee.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • case class AllocatedBlocks(streamIdToAllocatedBlocks: Map[Int, Seq[ReceivedBlockInfo]])
    • class ReceivedBlockTracker(
    • class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false) extends Logging

@AmplabJenkins

Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/22572/
Test FAILed.

@SparkQA

SparkQA commented Oct 30, 2014

Test build #22574 has started for PR 3026 at commit f66d277.

  • This patch merges cleanly.

@SparkQA

SparkQA commented Oct 31, 2014

Test build #22574 has finished for PR 3026 at commit f66d277.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • case class AllocatedBlocks(streamIdToAllocatedBlocks: Map[Int, Seq[ReceivedBlockInfo]])
    • class ReceivedBlockTracker(
    • class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false) extends Logging

@AmplabJenkins

Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/22574/
Test FAILed.

@tdas
Contributor Author

tdas commented Oct 31, 2014

Jenkins, test this please.

Contributor Author

All the functionality for keeping track of block-to-batch allocations has been moved from ReceiverInputDStream to ReceivedBlockTracker, so that all actions on the block metadata (including block-to-batch allocations) can be logged at a central location.

@SparkQA

SparkQA commented Oct 31, 2014

Test build #22585 has started for PR 3026 at commit 19aec7d.

  • This patch merges cleanly.

@SparkQA

SparkQA commented Oct 31, 2014

Test build #22587 has started for PR 3026 at commit 19aec7d.

  • This patch merges cleanly.

Contributor Author

All the functionality for keeping track of received block metadata has been moved from ReceiverTracker to ReceivedBlockTracker, so that all actions on the block metadata (including block-to-batch allocations) can be logged at a central location.

@SparkQA

SparkQA commented Oct 31, 2014

Test build #22585 has finished for PR 3026 at commit 19aec7d.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • case class AllocatedBlocks(streamIdToAllocatedBlocks: Map[Int, Seq[ReceivedBlockInfo]])
    • class ReceivedBlockTracker(
    • class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false) extends Logging

@AmplabJenkins

Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/22585/
Test PASSed.

@SparkQA

SparkQA commented Oct 31, 2014

Test build #22587 has finished for PR 3026 at commit 19aec7d.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • case class AllocatedBlocks(streamIdToAllocatedBlocks: Map[Int, Seq[ReceivedBlockInfo]])
    • class ReceivedBlockTracker(
    • class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false) extends Logging

@AmplabJenkins

Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/22587/
Test PASSed.

Contributor

What about naming this something like LogEvent? It wasn't clear to me when I looked at this what "Action" meant.

Contributor Author

How about ReceivedBlockTrackerLogEvent? I am not so sure about giving it such a generic name as LogEvent; it becomes hard to immediately identify which module the class is related to.

Contributor

Sure - having LogEvent somewhere in there would just be helpful.

Contributor

Is this exposed only for testing? If so, can you note that down?

Contributor Author

Added.

Contributor

This is a really dense expression. Can it be broken out into simpler expressions that make it easier to read?

val streamsWithBlocks = streamIds.map { streamId =>
  (streamId, getReceivedBlockQueue(streamId).dequeueAll(_ => true))
}
val streamToBlocks = streamsWithBlocks.toMap
val allocatedBlocks = AllocatedBlocks(streamToBlocks)
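As a self-contained illustration of what the broken-out steps do (the queue contents and the getReceivedBlockQueue stub below are hypothetical), dequeueAll(_ => true) drains each per-stream queue and returns the removed blocks in order:

```scala
import scala.collection.mutable

// Hypothetical stand-in for the tracker's per-stream block queues.
val queues = Map(
  1 -> mutable.Queue("blk-a", "blk-b"),
  2 -> mutable.Queue("blk-c")
)
def getReceivedBlockQueue(streamId: Int): mutable.Queue[String] = queues(streamId)

val streamIds = Seq(1, 2)
val streamsWithBlocks = streamIds.map { streamId =>
  // dequeueAll(_ => true) empties the queue, yielding its elements in order
  (streamId, getReceivedBlockQueue(streamId).dequeueAll(_ => true))
}
val streamToBlocks = streamsWithBlocks.toMap
// streamToBlocks maps each stream ID to its drained blocks; queues are now empty
```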

Contributor Author

Haha, I had felt the same, so I had changed it. With your other suggestion incorporated, it's cleaner.

@pwendell
Contributor

This looks good overall. I like the clean-up on the allocation code path. Just left minor comments.

@SparkQA

SparkQA commented Oct 31, 2014

Test build #22652 has started for PR 3026 at commit 47fc1e3.

  • This patch merges cleanly.

@SparkQA

SparkQA commented Oct 31, 2014

Test build #22654 has started for PR 3026 at commit 2ee2484.

  • This patch merges cleanly.

Contributor

Does the receiver tracker read from HDFS each time getBlocksOfBatch is called (sorry, I don't remember if it does)? If it does, then this call incurs more HDFS reads than required when there are several streams in the same app, correct?

Contributor

Ignore this. Verified it does not.

@harishreedharan
Contributor

This looks good. Apart from the one question I had above, this looks good to go.

@SparkQA

SparkQA commented Oct 31, 2014

Test build #22654 has finished for PR 3026 at commit 2ee2484.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • case class AllocatedBlocks(streamIdToAllocatedBlocks: Map[Int, Seq[ReceivedBlockInfo]])
    • class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false) extends Logging

@AmplabJenkins

Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/22654/
Test PASSed.

@SparkQA

SparkQA commented Nov 1, 2014

Test build #22652 timed out for PR 3026 at commit 47fc1e3 after a configured wait of 120m.

@AmplabJenkins

Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/22652/
Test FAILed.

@SparkQA

SparkQA commented Nov 4, 2014

Test build #22895 has started for PR 3026 at commit 1d704bb.

  • This patch merges cleanly.

Contributor

Just wondering - why does this need to be set here? Who consumes this?

Contributor Author

This was added when SparkEnv needed to be set for launching jobs from non-main threads. Since the JobGenerator is a background thread that actually submits the jobs, the SparkEnv needed to be set there. But since we have removed the whole thread-local stuff from SparkEnv, this is probably not needed any more. We can either remove this (scary), or document it as potentially removable.

@pwendell
Contributor

pwendell commented Nov 4, 2014

Hey @tdas this LGTM. The only question was around setting of the SparkEnv... it might be good to document what consumes that downstream.

@SparkQA

SparkQA commented Nov 4, 2014

Test build #22895 has finished for PR 3026 at commit 1d704bb.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • case class AllocatedBlocks(streamIdToAllocatedBlocks: Map[Int, Seq[ReceivedBlockInfo]])
    • class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false) extends Logging

@AmplabJenkins

Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/22895/
Test PASSed.

@tdas
Contributor Author

tdas commented Nov 5, 2014

@pwendell added comment. Merging this. Thanks @pwendell and @JoshRosen for reviewing this PR and the previous ones for the streaming driver HA.

@SparkQA

SparkQA commented Nov 5, 2014

Test build #22906 has started for PR 3026 at commit a8009ed.

  • This patch merges cleanly.

@SparkQA

SparkQA commented Nov 5, 2014

Test build #22906 has finished for PR 3026 at commit a8009ed.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • case class AllocatedBlocks(streamIdToAllocatedBlocks: Map[Int, Seq[ReceivedBlockInfo]])
    • class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false) extends Logging

@AmplabJenkins

Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/22906/
Test PASSed.

@asfgit asfgit closed this in 5f13759 Nov 5, 2014
asfgit pushed a commit that referenced this pull request Nov 5, 2014
…recover received block metadata on driver failures

As part of the initiative to prevent data loss on driver failure, this JIRA tracks the sub-task of modifying the streaming driver to reliably save received block metadata and recover it on driver restart.

This was solved by introducing a `ReceivedBlockTracker` that takes over all responsibility for managing the metadata of received blocks (i.e., `ReceivedBlockInfo`) and any actions on them (e.g., allocating blocks to batches). All actions on block info get written out to a write ahead log (using `WriteAheadLogManager`). On recovery, all the actions are replayed to recreate the pre-failure state of the `ReceivedBlockTracker`, which includes the batch-to-block allocations and the unallocated blocks.

Furthermore, the `ReceiverInputDStream` was modified to create `WriteAheadLogBackedBlockRDD`s when file segment info is present in the `ReceivedBlockInfo`. After all the block info is recovered (through the recovered `ReceivedBlockTracker`), the `WriteAheadLogBackedBlockRDD`s get recreated with the recovered info and jobs are submitted. The data of the blocks gets pulled from the write ahead logs, thanks to the segment info present in the `ReceivedBlockInfo`.

This is still a WIP. Things that are missing here:

- *End-to-end integration tests:* Unit tests that test driver recovery by killing and restarting the streaming context, and verifying that all the input data gets processed. This has been implemented but not included in this PR yet. A sneak peek of that DriverFailureSuite can be found in this PR (on my personal repo): tdas#25. I can either include it in this PR, or submit it as a separate PR after this gets in.

- *WAL cleanup:* Cleaning up the received-data write ahead log by calling `ReceivedBlockHandler.cleanupOldBlocks`. This is being worked on.

Author: Tathagata Das <[email protected]>

Closes #3026 from tdas/driver-ha-rbt and squashes the following commits:

a8009ed [Tathagata Das] Added comment
1d704bb [Tathagata Das] Enabled storing recovered WAL-backed blocks to BM
2ee2484 [Tathagata Das] More minor changes based on PR
47fc1e3 [Tathagata Das] Addressed PR comments.
9a7e3e4 [Tathagata Das] Refactored ReceivedBlockTracker API a bit to make things a little cleaner for users of the tracker.
af63655 [Tathagata Das] Minor changes.
fce2b21 [Tathagata Das] Removed commented lines
59496d3 [Tathagata Das] Changed class names, made allocation more explicit and added cleanup
19aec7d [Tathagata Das] Fixed casting bug.
f66d277 [Tathagata Das] Fix line lengths.
cda62ee [Tathagata Das] Added license
25611d6 [Tathagata Das] Minor changes before submitting PR
7ae0a7f [Tathagata Das] Transferred changes from driver-ha-working branch

(cherry picked from commit 5f13759)
Signed-off-by: Tathagata Das <[email protected]>