[SPARK-4899][MESOS] Enable mesos checkpointing #26
getOption(key).map(_.toBoolean).getOrElse(defaultValue)
}
/** Get a parameter as a Option[Boolean] */
def getOptionBoolean(key: String): Option[Boolean] = {
You don't need these. Just have the defaults retain the prior behavior.
I was unsure about copying the defaults from the Mesos protos (they were previously set to None, which Mesos translates to its defaults). Also, FrameworkInfo.setCheckpoint() expects an Option[Boolean], and there was no helper function that extracted an Option[Boolean] from SparkConf.
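To illustrate the "leave it as None so Mesos applies its own defaults" idea, here is a minimal sketch. `InfoBuilder` is a hypothetical stand-in for the Mesos FrameworkInfo builder, not the real API, and `configure` is an illustrative name; the point is only that each field is set when, and only when, the option is defined.

```scala
object FrameworkInfoSketch {
  // Hypothetical stand-in for the Mesos FrameworkInfo builder. Only the two
  // optional fields discussed in this thread are modelled.
  final class InfoBuilder {
    var checkpoint: Option[Boolean] = None
    var failoverTimeout: Option[Double] = None

    def setCheckpoint(value: Boolean): InfoBuilder = {
      checkpoint = Some(value)
      this
    }

    def setFailoverTimeout(value: Double): InfoBuilder = {
      failoverTimeout = Some(value)
      this
    }
  }

  // Apply each setting only if the user supplied it; unset fields are left
  // untouched so the consumer can fall back to its own defaults.
  def configure(
      checkpoint: Option[Boolean],
      failoverTimeout: Option[Double]): InfoBuilder = {
    val builder = new InfoBuilder
    checkpoint.foreach(v => builder.setCheckpoint(v))
    failoverTimeout.foreach(v => builder.setFailoverTimeout(v))
    builder
  }
}
```

With this shape, `configure(None, None)` leaves both fields unset, which matches the prior behavior described above.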
Maybe you can do this:
sc.conf.getOption("spark.mesos.checkpoint").map(_.toBoolean)
@xanec I drifted away from this because I wasn't sure about the behavior for non-matching types (an exception), but then I noticed that getDouble and getLong also throw an exception rather than falling back to the default value (bug or design?). So if they don't handle exceptions, I can use what you suggested.
Yeah, I think it is better to throw an exception for a non-matching type than to silently revert to the default value. Silent fallback could be bug-prone in operation.
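For reference, a small sketch of how the suggested `getOption(...).map(_.toBoolean)` behaves in the three cases discussed here. `Conf` is a hypothetical map-backed stand-in for SparkConf so the snippet runs without Spark, and `checkpoint` is an illustrative helper name.

```scala
object OptionalBooleanConf {
  // Hypothetical stand-in for SparkConf: a plain key/value map.
  final case class Conf(settings: Map[String, String]) {
    def getOption(key: String): Option[String] = settings.get(key)
  }

  // None when the key is unset, Some(b) when it parses as a boolean.
  // String.toBoolean throws IllegalArgumentException on anything else,
  // so a malformed value fails loudly instead of becoming a default.
  def checkpoint(conf: Conf): Option[Boolean] =
    conf.getOption("spark.mesos.checkpoint").map(_.toBoolean)
}
```

An unset key yields `None`, `"true"` yields `Some(true)`, and a value such as `"yes"` throws rather than silently reverting to a default.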
For testing the checkpointing functionality, I think we can only test as far as checking the protobuf message (i.e., I know this is a bad practice but maybe we have to resort to using reflection to get the private
Alternatively, we could modify
@xanec I'll start off by having a simple test to check if
It would usually be called
Note: we'll probably want to backport this to the v2.1.0-mmx branch
frameworkId: Option[String] = None): SchedulerDriver = {
markRegistered()
assert(checkpoint.equals(true))
assert(failoverTimeout.equals(10))
I suppose these checks are sufficient to ensure that the values are passed correctly to createSchedulerDriver. We don't want to check if the MesosSchedulerDriver is created with the correct values? I.e., by inspecting the Protos.FrameworkInfo?
Wait, does this work? checkpoint in createSchedulerDriver is Option[Boolean], not Boolean. Similarly for failoverTimeout.
Oops, I am still trying to run dev/run-tests on the entire project, but it keeps failing on some random Hive tests
They should probably be:
assert(checkpoint.get.equals(true))
assert(failoverTimeout.get.equals(10.0))
Note that failoverTimeout when "unboxed" is a Double.
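A quick sketch of why the original assertions cannot pass as written: calling `equals` on the `Option` wrapper compares a `Some` against a bare value, which is never equal. The object and value names below are illustrative only.

```scala
object OptionAssertions {
  val checkpoint: Option[Boolean] = Some(true)
  val failoverTimeout: Option[Double] = Some(10.0)

  // Compares the Some(...) wrapper to a bare Boolean: always false.
  val wrappedEqualsBare: Boolean = checkpoint.equals(true)

  // Unwrapping first compares the underlying values.
  val unwrappedEquals: Boolean = checkpoint.get.equals(true)

  // Comparing against Some(...) avoids calling .get, which would throw
  // NoSuchElementException if the option were empty.
  val comparedToSome: Boolean = failoverTimeout == Some(10.0)
}
```

Comparing against `Some(...)` has the side benefit that an unexpectedly empty option shows up as a failed assertion instead of an exception from `.get`.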
frameworkId: Option[String] = None): SchedulerDriver = {
markRegistered()
assert(checkpoint.equals(true))
assert(failoverTimeout.equals(10))
This should probably be:
assert(failoverTimeout.get.equals(10.0))
docs/running-on-mesos.md
Outdated
<td><code>spark.mesos.failoverTimeout</code></td>
<td>0.0</td>
<td>
The amount of time (in seconds) that the master will wait for thescheduler to failover before it tears down the framework
Typo: thescheduler -> the scheduler
docs/running-on-mesos.md
Outdated
<td>0.0</td>
<td>
The amount of time (in seconds) that the master will wait for thescheduler to failover before it tears down the framework
by killing all its tasks/executors. This should be non-zero if aframework expects to reconnect after a failure and not lose
Typo: aframework -> a framework
frameworkId: Option[String] = None): SchedulerDriver = {
markRegistered()
assert(checkpoint.contains(true))
assert(failoverTimeout.contains(10))
I know this does not cause the test to fail, but we should probably use 10.0 here as well, just for conformity. Incidentally, Option.contains uses == instead of equals for comparison; that's why it passes. I did not know that 😅
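A small sketch of the difference mentioned here, assuming Scala 2.11 or later (where `Option.contains` is available). The object and value names are illustrative.

```scala
object ContainsVsEquals {
  val failoverTimeout: Option[Double] = Some(10.0)

  // Option.contains compares with ==, which treats the Int 10 and the
  // Double 10.0 as numerically equal.
  val containsInt: Boolean = failoverTimeout.contains(10)

  // Using a Double literal keeps the assertion's type in line with the field.
  val containsDouble: Boolean = failoverTimeout.contains(10.0)

  // equals takes Any, so the Int is boxed as an Integer and compared to a
  // boxed Double; the types differ and the result is false.
  val unwrappedEqualsInt: Boolean = failoverTimeout.get.equals(10)
}
```

So `contains(10)` passes only because of numeric `==`; writing `contains(10.0)` makes the intent explicit and does not rely on that.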
Spoke with @gkc2104, he's going to squash these into a different PR against
Should also put in PR against upstream spark
Squashed and backported in #28
What changes were proposed in this pull request?
https://metamarkets.atlassian.net/browse/BACKEND-655
https://issues.apache.org/jira/browse/SPARK-4899
apache#60
How was this patch tested?
Please review http://spark.apache.org/contributing.html before opening a pull request.
@jisookim0513 @xanec
I need some help figuring out how to unit test the checkpointing functionality, as it is Mesos-dependent.
I'm also not sure whether extracting the config values the way I proposed is the best way of doing it.