Conversation

@vanzin vanzin (Contributor) commented Nov 14, 2017

This change restores the functionality that keeps a limited number of
elements of each type (jobs, stages, etc.) in the store, depending on
configuration, to avoid the store growing indefinitely over time.

The feature is implemented by creating a new type (ElementTrackingStore)
that wraps a KVStore and allows triggers to be set up for when elements
of a certain type reach a certain threshold. Triggers don't necessarily
need to delete elements, but the current API is set up in a way that
makes that use case easier.

The new store also has a trigger for the "close" call, which makes it
easier for listeners to register code for cleaning things up and flushing
partial state to the store.

The old configurations for cleaning up the stored elements from the core
and SQL UIs are now active again, and the old unit tests are re-enabled.

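As a rough illustration of the trigger mechanism described above, here is a minimal, self-contained Scala sketch. None of the names or signatures below are taken from the patch; the real ElementTrackingStore wraps a KVStore and runs its trigger actions asynchronously on a separate thread, while this sketch keeps everything in memory and synchronous.

  import scala.collection.mutable

  class TrackingStoreSketch {
    private case class Trigger(threshold: Long, action: Long => Unit)

    private val triggers = mutable.HashMap[Class[_], Seq[Trigger]]()
    private val counts = mutable.HashMap[Class[_], Long]().withDefaultValue(0L)
    private val flushTriggers = mutable.ListBuffer[() => Unit]()

    /** Run `action` whenever the number of stored elements of `klass` exceeds `threshold`. */
    def addTrigger(klass: Class[_], threshold: Long)(action: Long => Unit): Unit = {
      triggers(klass) = triggers.getOrElse(klass, Seq.empty) :+ Trigger(threshold, action)
    }

    /** Register code to run when the store is closed, e.g. to flush partial state. */
    def onFlush(action: => Unit): Unit = flushTriggers += (() => action)

    /** Store an element; optionally check the triggers registered for its type. */
    def write(value: AnyRef, checkTriggers: Boolean): Unit = {
      counts(value.getClass) += 1
      if (checkTriggers) {
        val count = counts(value.getClass)
        triggers.getOrElse(value.getClass, Nil).foreach { t =>
          if (count > t.threshold) t.action(count)
        }
      }
    }

    /** Close the store, running all registered flush triggers. */
    def close(): Unit = flushTriggers.foreach(_.apply())
  }

A usage sketch appears later in the thread, showing a trigger firing once the count for a type crosses its threshold.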
@vanzin vanzin (Contributor, Author) commented Nov 14, 2017

SparkQA commented Nov 15, 2017

Test build #83867 has finished for PR 19751 at commit 8c346a1.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@gengliangwang (Member)

@vanzin looks like this PR has conflicts now.

write(value)

if (checkTriggers && !stopped) {
triggers.get(value.getClass()).foreach { list =>
Member

we should remove the empty parens after getClass

Contributor Author

I really prefer this style (since the original method is declared with the parentheses - it's a Java method after all).

SparkQA commented Nov 16, 2017

Test build #83927 has finished for PR 19751 at commit 8b150e0.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

doAsync {
val count = store.count(value.getClass())
list.foreach { t =>
if (count > t.threshold) {
Member

indent two spaces

val retainedStages = conf.getInt("spark.ui.retainedStages", SparkUI.DEFAULT_RETAINED_STAGES)
val retainedJobs = conf.getInt("spark.ui.retainedJobs", SparkUI.DEFAULT_RETAINED_JOBS)
val retainedTasks = conf.get(UI_RETAINED_TASKS)
val retainedStages = conf.getInt("spark.ui.retainedStages", 1000)
Member

Why hard-code the value here? Maybe make the configurations in config.scala public, so that we don't need to write the default values in two places.

Contributor Author

This class is being removed in a separate PR.


private def update(entity: LiveEntity, now: Long): Unit = {
entity.write(kvstore, now)
private def update(entity: LiveEntity, now: Long, last: Boolean = false): Unit = {
Member

nit: maybe change last to isLast ?

Contributor Author

I prefer how the current version reads on the call site, e.g.:

update(exec, now, last = true)

Also, Spark generally avoids Java-beans-style prefixes in Scala code (like "is" or "get").

Contributor

maybe checkTriggers is better than last?

def write(store: KVStore, now: Long): Unit = {
store.write(doUpdate())
def write(store: ElementTrackingStore, now: Long, checkTriggers: Boolean = false): Unit = {
store.write(doUpdate(), checkTriggers || lastWriteTime == 0L)
Member

can you explain why it checks triggers on the first write?

SparkQA commented Nov 17, 2017

Test build #83957 has finished for PR 19751 at commit e09a376.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

@vanzin vanzin (Contributor, Author) commented Nov 17, 2017

retest this please

SparkQA commented Nov 17, 2017

Test build #83975 has finished for PR 19751 at commit e09a376.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@vanzin vanzin (Contributor, Author) commented Nov 17, 2017

Not retesting since there will probably be more feedback and the failure seems unrelated, so it's better to just wait.

SparkQA commented Nov 30, 2017

Test build #84318 has finished for PR 19751 at commit 3f7c25d.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

SparkQA commented Dec 9, 2017

Test build #84665 has finished for PR 19751 at commit 2606fcd.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@squito squito (Contributor) left a comment

first pass comments, will go through again this afternoon

*/
private[spark] abstract class LiveEntity {

var lastWriteTime = 0L
Contributor

minor: can the initial value be -1 instead? doesn't matter right now, but often in tests we use a manual clock starting at time 0. That would cause problems w/ this default.
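As a small illustration of the concern (sketch only; the class name and the -1L sentinel are just for this example): the patch treats lastWriteTime == 0L as "never written", which a manual test clock starting at time 0 would collide with.

  class LiveEntitySketch {
    // -1L is the sentinel being suggested; the patch currently defaults to 0L.
    var lastWriteTime: Long = -1L

    def write(now: Long): Unit = {
      // With a 0L default, a first write at manual-clock time 0 would be
      // indistinguishable from "never written".
      val firstWrite = lastWriteTime < 0L
      lastWriteTime = now
      if (firstWrite) println(s"first write at $now")
    }
  }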

* internal state to the store (e.g. after the SHS finishes parsing an event log).
*
* The configured triggers are run on the same thread that triggered the write, after the write
* has completed.
Contributor

they are actually run in a different thread, right? (comment on addTrigger looks correct)

if (checkTriggers && !stopped) {
triggers.get(value.getClass()).foreach { list =>
doAsync {
val count = store.count(value.getClass())
Contributor

I appreciate that this is generic, but isn't this significantly more expensive than just having a special internal variable to track this for each class as you update? I'm imagining a job with tons of very quick tasks.

You could also pull the store.count() out of foreach in case there ever were multiple triggers associated with a class (though I guess there aren't right now).

Contributor

ok, on another read I see that tasks are actually tracked specially and don't use this trigger mechanism. Still, this API isn't really that useful in the end -- it's good for jobs and stages, but not actually the right count for executors, and you don't use it for tasks. It might still be easier to just track those other counts directly, without going through kvstore.count().

Contributor Author

kvstore.count is pretty cheap in-memory (it's basically a hash table lookup), and cheap enough on a disk store (its cost is dwarfed by the writes that actually trigger the call).

So while I do agree that the interface is not optimal, of the 4 call sites, it handles 3 without needing any special code, and the 4th (cleanupExecutors) would still need to call kvstore.count() or keep that count separately, so this sounds simpler.
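Building on the TrackingStoreSketch from the earlier sketch (still illustrative, not the patch's code), this is roughly the usage pattern being described: one trigger per element type, with the cleanup action handed the current count so the listener needs no extra bookkeeping of its own.

  object TriggerUsageSketch {
    def main(args: Array[String]): Unit = {
      val store = new TrackingStoreSketch
      val retainedJobs = 3

      // Hypothetical cleanup for "jobs"; plain Strings stand in for job data.
      store.addTrigger(classOf[String], retainedJobs) { count =>
        val countToDelete = count - retainedJobs
        println(s"would delete the $countToDelete oldest job(s)")
      }

      // The 4th write pushes the count past the threshold and fires the trigger.
      (1 to 4).foreach(i => store.write(s"job-$i", checkTriggers = true))
    }
  }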

// Because the limit is on the number of *dead* executors, we need to calculate whether
// there are actually enough dead executors to be deleted.
val threshold = conf.get(MAX_RETAINED_DEAD_EXECUTORS)
val dead = count - activeExecutorCount
Contributor

KVStore has this:

  /**
   * Returns the number of items of the given type which match the given indexed value.
   */
  long count(Class<?> type, String index, Object indexedValue) throws Exception;

so with an api change you could get the right number directly from the store. (though this conflicts with my other comment about not using kvstore.count() at all in the trigger, which I think is more important.)

}

/** Turns a KVStoreView into a Scala sequence, applying a filter. */
def viewToSeq[T](
Contributor

looks like you don't actually need a sequence at all in any of the call sites, you could just use an iterator. I'm thinking about the price of that, say, if you're cleaning up 100k tasks repeatedly.

This does give you a nice spot to include iter.close(), but I think you could change this to foreachWithMaxFilterClose or something to avoid ever creating the list.

Contributor Author

I create a list explicitly to avoid consistency issues when deleting these elements. If I had an iterator instead, and I then called kvstore.delete, you could get a ConcurrentModificationException.

Since the cleanup code deletes more than necessary to just respect the limit (to avoid having to do this every time you write something), hopefully the cost is amortized a little.
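As a generic illustration of the materialize-then-delete pattern described here (plain Scala collections standing in for the KVStore view; none of this is the patch's code):

  import scala.collection.mutable

  def deleteOldest(store: mutable.LinkedHashMap[Int, String], countToDelete: Int): Unit = {
    // Collect the keys to remove before mutating the map; removing entries while
    // an iterator over the same view is still open is the kind of
    // ConcurrentModificationException being avoided above.
    val toDelete = store.keysIterator.take(countToDelete).toList
    toDelete.foreach(store.remove)
  }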

// On live applications, try to delete finished tasks only; when in the SHS, treat all
// tasks as the same.
val toDelete = KVUtils.viewToSeq(view, countToDelete.toInt) { t =>
!live || t.info.status != TaskState.RUNNING.toString()
Contributor

in the SHS, wouldn't you still prefer to delete finished tasks over live ones? in all cases, should you really just try to delete finished tasks first, but still delete running tasks if need be?

I don't see any filter like this in the old code.

Contributor Author

The old code deletes tasks in the order they arrive; it would be expensive to do that here since it would involve sorting the task list (cheap for disk store, expensive for in-memory).

I can keep the same filter behavior for both.

}

// Start 3 stages, all should be kept. Stop 2 of them, the oldest stopped one should be
// deleted. Start a new attempt of the second stopped one, and verify that the stage graph
Contributor

oldest meaning smallest id? or the order they are submitted in? With a non-linear stage DAG, the ordering of ids, start time, and end time can be arbitrary. Ids will correspond to submission order, but then stage retries complicate that.

I guess I'm just trying to make sure I understand how the kvstore works, and if there is some important part I'm missing.

Contributor Author

there is no DAG here, the test controls what "oldest" means. In this case "oldest" = "first stage in the list", which is also "smallest id", which is the actual behavior of the listener.


if (needReplay) {
val trackingStore = new ElementTrackingStore(kvstore, conf)
val listener = if (needReplay) {
Contributor

listener is unused

@squito squito (Contributor) left a comment

ok, after another pass, overall I think this is good. The only somewhat important comment I have is about the filtering done on tasks for cleanup, how it's different from before, and why it's different live vs. SHS.

SparkQA commented Dec 14, 2017

Test build #84885 has finished for PR 19751 at commit b02ea2c.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

SparkQA commented Dec 14, 2017

Test build #84909 has finished for PR 19751 at commit b02ea2c.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

}

case _ =>
(new InMemoryStore(), true)
@squito squito (Contributor) Dec 14, 2017

Not related to this change; this comment is really for val _replay = !path.isDirectory(), but GitHub won't let me put a comment there ...

I know we discussed this when you made this change, but I was still confused reading this bit of code on replay. Could you maybe just add a comment above that line like "the kvstore is deleted when we decide that the loaded data is stale -- see LoadedAppUI for a more extensive discussion of the lifecycle"?

Doesn't seem worth a separate jira/PR just for that comment.

@squito squito (Contributor) left a comment

lgtm

SparkQA commented Dec 15, 2017

Test build #84931 has finished for PR 19751 at commit d384ff4.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@gengliangwang (Member)

LGTM


kvstore.onFlush {
if (!live) {
flush()
Contributor

hm, why only flush for history server?

Contributor Author

Because the store is only closed on live applications when the context is shut down. So there's no more UI for you to see this.


import config._

private val triggers = new HashMap[Class[_], Seq[Trigger[_]]]()
Contributor

use a mutable map?

Contributor Author

Man, this has already been pushed... I'd appreciate reviews more before changes are pushed.

executor.shutdownNow()
}

flushTriggers.foreach { trigger =>
Contributor

flush sounds like we would do it periodically, how about closeTriggers?

store.write(doUpdate())
def write(store: ElementTrackingStore, now: Long, checkTriggers: Boolean = false): Unit = {
// Always check triggers on the first write, since adding an element to the store may
// cause the maximum count for the element type to be exceeded.
Contributor

hmm, for the first write, how can it exceed the maximum count?

Contributor Author

Multiple jobs, multiple stages, multiple tasks, etc.: each new element's first write still adds to the count for its type, so it can push that count past the configured limit.
