[SPARK-20653][core] Add cleaning of old elements from the status store. #19751
Conversation
This change restores the functionality that keeps a limited number of different types (jobs, stages, etc) depending on configuration, to avoid the store growing indefinitely over time. The feature is implemented by creating a new type (ElementTrackingStore) that wraps a KVStore and allows triggers to be set up for when elements of a certain type meet a certain threshold. Triggers don't need to necessarily only delete elements, but the current API is set up in a way that makes that use case easier. The new store also has a trigger for the "close" call, which makes it easier for listeners to register code for cleaning things up and flushing partial state to the store. The old configurations for cleaning up the stored elements from the core and SQL UIs are now active again, and the old unit tests are re-enabled.
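To make the trigger pattern concrete, here is a minimal sketch of how a count-based trigger store could work. This is NOT the real ElementTrackingStore API: the names (SketchStore, addTrigger) are simplified stand-ins, and the real store runs triggers asynchronously while this sketch runs them inline for simplicity.

```scala
import scala.collection.mutable

// Simplified model of a store that fires cleanup actions when the number of
// elements of a given type crosses a configured threshold.
class SketchStore {
  private val data = mutable.Map.empty[Class[_], mutable.Buffer[Any]]
  private val triggers = mutable.Map.empty[Class[_], mutable.Buffer[(Long, Long => Unit)]]

  // Run `action` (e.g. a cleanup) whenever the count of `klass` exceeds `threshold`.
  def addTrigger(klass: Class[_], threshold: Long)(action: Long => Unit): Unit =
    triggers.getOrElseUpdate(klass, mutable.Buffer.empty) += (threshold -> action)

  def count(klass: Class[_]): Long =
    data.get(klass).map(_.size.toLong).getOrElse(0L)

  // Write the value, then check the triggers registered for the value's type.
  def write(value: Any, checkTriggers: Boolean = true): Unit = {
    data.getOrElseUpdate(value.getClass, mutable.Buffer.empty) += value
    if (checkTriggers) {
      triggers.get(value.getClass).foreach { list =>
        val n = count(value.getClass)
        list.foreach { case (threshold, action) => if (n > threshold) action(n) }
      }
    }
  }
}

// Hypothetical element type, standing in for jobs/stages/etc.
case class Job(id: Int)
```

A listener would register one trigger per tracked type (jobs, stages, ...) with the threshold taken from the retained-elements configuration, and the action would delete the oldest elements.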
For context:
Test build #83867 has finished for PR 19751 at commit
@vanzin looks like this PR has conflicts now.
write(value)

if (checkTriggers && !stopped) {
  triggers.get(value.getClass()).foreach { list =>
we should remove the empty parens after getClass
I really prefer this style, since the original method is declared with the parentheses (it's a Java method, after all).
Test build #83927 has finished for PR 19751 at commit
doAsync {
  val count = store.count(value.getClass())
  list.foreach { t =>
    if (count > t.threshold) {
indent two spaces
val retainedStages = conf.getInt("spark.ui.retainedStages", SparkUI.DEFAULT_RETAINED_STAGES)
val retainedJobs = conf.getInt("spark.ui.retainedJobs", SparkUI.DEFAULT_RETAINED_JOBS)
val retainedTasks = conf.get(UI_RETAINED_TASKS)
val retainedStages = conf.getInt("spark.ui.retainedStages", 1000)
Why use a hard-coded value here? Maybe make the configurations in config.scala public, so that we don't need to write the default values in two places.
This class is being removed in a separate PR.
private def update(entity: LiveEntity, now: Long): Unit = {
  entity.write(kvstore, now)
private def update(entity: LiveEntity, now: Long, last: Boolean = false): Unit = {
nit: maybe change last to isLast ?
I prefer how the current version reads on the call site, e.g.:
update(exec, now, last = true)
Also, Spark generally avoids Java-beans-style prefixes in Scala code (like "is" or "get").
maybe checkTriggers is better than last?
def write(store: KVStore, now: Long): Unit = {
  store.write(doUpdate())
def write(store: ElementTrackingStore, now: Long, checkTriggers: Boolean = false): Unit = {
  store.write(doUpdate(), checkTriggers || lastWriteTime == 0L)
Can you explain why it checks triggers on the first write?
Test build #83957 has finished for PR 19751 at commit

retest this please

Test build #83975 has finished for PR 19751 at commit

Not retesting since there will probably be feedback and the failure seems unrelated, so better just wait.

Test build #84318 has finished for PR 19751 at commit

Test build #84665 has finished for PR 19751 at commit
first pass comments, will go through again this afternoon
 */
private[spark] abstract class LiveEntity {

  var lastWriteTime = 0L
minor: can the initial value be -1 instead? doesn't matter right now, but often in tests we use a manual clock starting at time 0. That would cause problems w/ this default.
 * internal state to the store (e.g. after the SHS finishes parsing an event log).
 *
 * The configured triggers are run on the same thread that triggered the write, after the write
 * has completed.
they are actually run in a different thread, right? (comment on addTrigger looks correct)
if (checkTriggers && !stopped) {
  triggers.get(value.getClass()).foreach { list =>
    doAsync {
      val count = store.count(value.getClass())
I appreciate that this is generic, but isn't this significantly more expensive than just having a special internal variable to track this for each class as you update? I'm imagining a job with tons of very quick tasks.
You could also pull the store.count() out of foreach in case there ever were multiple triggers associated with a class (though I guess there aren't right now).
ok on another read, I see that tasks are actually tracked specially and don't use this trigger mechanism. Still, this API isn't really that useful in the end -- it's good for jobs and stages, but not actually the right count for executors, and you don't use it for tasks. It still might be easier to just track those other counts directly, without going through kvstore.count().
kvstore.count is pretty cheap in-memory (it's basically a hash table lookup), and cheap enough on a disk store (its cost is dwarfed by the writes that actually trigger the call).
So while I do agree that the interface is not optimal, of the 4 call sites, it handles 3 without need for any special code, and the 4th (cleanupExecutors) would still need to call kvstore.count() or keep that count separately, so this sounds simpler.
// Because the limit is on the number of *dead* executors, we need to calculate whether
// there are actually enough dead executors to be deleted.
val threshold = conf.get(MAX_RETAINED_DEAD_EXECUTORS)
val dead = count - activeExecutorCount
KVStore has this:

/**
 * Returns the number of items of the given type which match the given indexed value.
 */
long count(Class<?> type, String index, Object indexedValue) throws Exception;

so with an API change you could get the right number directly from the store. (Though this conflicts with my other comment about not using kvstore.count() at all in the trigger, which I think is more important.)
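For illustration only, here is a minimal sketch of what such an indexed count looks like, modeled with a plain Scala collection. ExecSummary and countByIndex are hypothetical stand-ins, not the real KVStore types:

```scala
// Hypothetical stand-in for a stored executor summary.
case class ExecSummary(id: String, isActive: Boolean)

// Model of KVStore.count(type, index, indexedValue): count the elements
// whose indexed field equals the given value.
def countByIndex[T, V](items: Seq[T])(index: T => V)(value: V): Long =
  items.count(index(_) == value).toLong
```

With an API like this, the number of dead executors could be read directly from the store (count where the "active" index is false) instead of computing count - activeExecutorCount by hand.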
}

/** Turns a KVStoreView into a Scala sequence, applying a filter. */
def viewToSeq[T](
looks like you don't actually need a sequence at all in any of the call sites; you could just use an iterator. I'm thinking about the price of that if you're, say, cleaning up 100k tasks repeatedly.
This does give you a nice spot to include iter.close(), but I think you could change this to foreachWithMaxFilterClose or something to avoid ever creating the list.
I create a list explicitly to avoid consistency issues when deleting these elements. If I had an iterator instead, and I then called kvstore.delete, you could get a ConcurrentModificationException.
Since the cleanup code deletes more than necessary to just respect the limit (to avoid having to do this every time you write something), hopefully the cost is amortized a little.
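The consistency issue described above can be shown with a plain Java collection standing in for a KVStore view (illustrative only; unsafeDelete/safeDelete are hypothetical names): deleting from the backing collection while iterating it trips the fail-fast iterator, while materializing the elements first does not.

```scala
import java.util.{ArrayList, ConcurrentModificationException}
import scala.jdk.CollectionConverters._

// Unsafe pattern: delete from the backing collection mid-iteration.
// Returns true if a ConcurrentModificationException was thrown.
def unsafeDelete(tasks: ArrayList[Int]): Boolean =
  try {
    val it = tasks.iterator()
    while (it.hasNext) {
      val t = it.next()
      if (t % 2 == 0) tasks.remove(Integer.valueOf(t)) // mutates while iterating
    }
    false
  } catch {
    case _: ConcurrentModificationException => true
  }

// Safe pattern: materialize the list of elements to delete first, then delete.
def safeDelete(tasks: ArrayList[Int]): Unit = {
  val toDelete = tasks.asScala.filter(_ % 2 == 0).toList
  toDelete.foreach(t => tasks.remove(Integer.valueOf(t)))
}
```

This mirrors why viewToSeq builds a list before the cleanup code calls kvstore.delete on each element.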
// On live applications, try to delete finished tasks only; when in the SHS, treat all
// tasks as the same.
val toDelete = KVUtils.viewToSeq(view, countToDelete.toInt) { t =>
  !live || t.info.status != TaskState.RUNNING.toString()
in the SHS, wouldn't you still prefer to delete finished tasks over live ones? In all cases, shouldn't you just try to delete finished tasks first, but still delete running tasks if need be?
I don't see any filter like this in the old code.
The old code deletes tasks in the order they arrive; it would be expensive to do that here since it would involve sorting the task list (cheap for disk store, expensive for in-memory).
I can keep the same filter behavior for both.
}

// Start 3 stages, all should be kept. Stop 2 of them, the oldest stopped one should be
// deleted. Start a new attempt of the second stopped one, and verify that the stage graph
oldest meaning smallest id? Or the order they are submitted in? With a non-linear stage DAG, the ordering of ids, start-time, and end-time can be arbitrary. Ids will correspond to submission order, but stage retries complicate that.
I guess I'm just trying to make sure I understand how the kvstore works, and if there is some important part I'm missing.
there is no DAG here, the test controls what "oldest" means. In this case "oldest" = "first stage in the list", which is also "smallest id", which is the actual behavior of the listener.
if (needReplay) {
  val trackingStore = new ElementTrackingStore(kvstore, conf)
  val listener = if (needReplay) {
listener is unused
ok after another pass, overall I think this is good. The only somewhat important comment I have is about the filtering done on tasks for cleanup: how it's different from before, and why it's different live vs. SHS.
Test build #84885 has finished for PR 19751 at commit

Test build #84909 has finished for PR 19751 at commit
}

case _ =>
  (new InMemoryStore(), true)
not related to this change; this comment is really for val _replay = !path.isDirectory(), but github won't let me put a comment there ...
I know we discussed this when you made this change, but I was still confused reading this bit of code on replay. Maybe you could just add a comment above that line like "the kvstore is deleted when we decide that the loaded data is stale -- see LoadedAppUI for a more extensive discussion of the lifecycle".
Doesn't seem worth a separate jira / pr just for that comment.
lgtm
Test build #84931 has finished for PR 19751 at commit

LGTM
kvstore.onFlush {
  if (!live) {
    flush()
hm, why only flush for history server?
Because the store is only closed on live applications when the context is shut down. So there's no more UI for you to see this.
import config._

private val triggers = new HashMap[Class[_], Seq[Trigger[_]]]()
use a mutable map?
Man, this has already been pushed... I'd appreciate it if reviews happened before changes are pushed.
  executor.shutdownNow()
}

flushTriggers.foreach { trigger =>
flush sounds like we would do it periodically; how about closeTriggers?
store.write(doUpdate())
def write(store: ElementTrackingStore, now: Long, checkTriggers: Boolean = false): Unit = {
  // Always check triggers on the first write, since adding an element to the store may
  // cause the maximum count for the element type to be exceeded.
hmm, for the first write, how can it exceed the maximum count?
Multiple jobs, multiple stages, multiple tasks, etc, etc, etc.