16 changes: 6 additions & 10 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -53,7 +53,7 @@ import org.apache.spark.rpc.RpcEndpointRef
import org.apache.spark.scheduler._
import org.apache.spark.scheduler.cluster.{CoarseGrainedSchedulerBackend, StandaloneSchedulerBackend}
import org.apache.spark.scheduler.local.LocalSchedulerBackend
import org.apache.spark.status.{AppStatusPlugin, AppStatusStore}
import org.apache.spark.status.AppStatusStore
import org.apache.spark.storage._
import org.apache.spark.storage.BlockManagerMessages.TriggerThreadDump
import org.apache.spark.ui.{ConsoleProgressBar, SparkUI}
@@ -416,7 +416,8 @@ class SparkContext(config: SparkConf) extends Logging {

// Initialize the app status store and listener before SparkEnv is created so that it gets
// all events.
_statusStore = AppStatusStore.createLiveStore(conf, l => listenerBus.addToStatusQueue(l))
_statusStore = AppStatusStore.createLiveStore(conf)
listenerBus.addToStatusQueue(_statusStore.listener.get)

// Create the Spark execution environment (cache, map output tracker, etc)
_env = createSparkEnv(_conf, isLocal, listenerBus)
@@ -445,14 +446,9 @@
// For tests, do not enable the UI
None
}
_ui.foreach { ui =>
// Load any plugins that might want to modify the UI.
AppStatusPlugin.loadPlugins().foreach(_.setupUI(ui))

// Bind the UI before starting the task scheduler to communicate
// the bound port to the cluster manager properly
ui.bind()
}
// Bind the UI before starting the task scheduler to communicate
// the bound port to the cluster manager properly
_ui.foreach(_.bind())

_hadoopConfiguration = SparkHadoopUtil.get.newConfiguration(_conf)

core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
@@ -44,7 +44,6 @@ import org.apache.spark.scheduler.ReplayListenerBus._
import org.apache.spark.status._
import org.apache.spark.status.KVUtils._
import org.apache.spark.status.api.v1.{ApplicationAttemptInfo, ApplicationInfo}
import org.apache.spark.status.config._
import org.apache.spark.ui.SparkUI
import org.apache.spark.util.{Clock, SystemClock, ThreadUtils, Utils}
import org.apache.spark.util.kvstore._
@@ -322,15 +321,18 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
(new InMemoryStore(), true)
}

val plugins = ServiceLoader.load(
classOf[AppHistoryServerPlugin], Utils.getContextOrSparkClassLoader).asScala
val trackingStore = new ElementTrackingStore(kvstore, conf)
Contributor Author: do we really need to limit the UI data for history server? cc @vanzin

Contributor: Yes, both because it's the old behavior, and to limit the app's history data growth. Also because the UI code itself doesn't scale to arbitrarily large lists of things like jobs and stages.
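For context, the ElementTrackingStore wrapped around the kvstore above is the mechanism that enforces such limits: a listener registers a trigger for a tracked class, and once the number of stored instances passes a threshold the callback runs and can evict the oldest entries. A minimal sketch of that pattern, assuming a hypothetical tracked type MyUIData (the SQLAppStatusListener hunk further down registers the same kind of trigger for SQLExecutionUIData):

package org.apache.spark.status

import org.apache.spark.SparkConf
import org.apache.spark.util.kvstore.InMemoryStore

// Hypothetical class whose stored instances should stay bounded.
class MyUIData(val id: Long)

object TrackingStoreSketch {
  def install(conf: SparkConf): ElementTrackingStore = {
    val store = new ElementTrackingStore(new InMemoryStore(), conf)
    // Runs once more than 100 MyUIData entries have been written; the callback
    // receives the current count and is expected to delete the oldest entries.
    store.addTrigger(classOf[MyUIData], 100) { count =>
      // A real trigger would delete roughly (count - 100) of the oldest entries.
      ()
    }
    store
  }
}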

if (needReplay) {
val replayBus = new ReplayListenerBus()
val listener = new AppStatusListener(trackingStore, conf, false,
lastUpdateTime = Some(attempt.info.lastUpdated.getTime()))
replayBus.addListener(listener)
AppStatusPlugin.loadPlugins().foreach { plugin =>
plugin.setupListeners(conf, trackingStore, l => replayBus.addListener(l), false)
}
for {
plugin <- plugins
listener <- plugin.createListeners(conf, trackingStore)
} replayBus.addListener(listener)
try {
val fileStatus = fs.getFileStatus(new Path(logDir, attempt.logPath))
replay(fileStatus, isApplicationCompleted(fileStatus), replayBus)
@@ -353,9 +355,7 @@
HistoryServer.getAttemptURI(appId, attempt.info.attemptId),
attempt.info.startTime.getTime(),
attempt.info.appSparkVersion)
AppStatusPlugin.loadPlugins().foreach { plugin =>
plugin.setupUI(ui)
}
plugins.foreach(_.setupUI(ui))

val loadedUI = LoadedAppUI(ui)

38 changes: 38 additions & 0 deletions core/src/main/scala/org/apache/spark/status/AppHistoryServerPlugin.scala
@@ -0,0 +1,38 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.status

import org.apache.spark.SparkConf
import org.apache.spark.scheduler.SparkListener
import org.apache.spark.ui.SparkUI

/**
* An interface for creating history listeners (to replay event logs) defined in other modules
* like SQL, and for setting up the plugin's UI when rebuilding the history UI.
*/
private[spark] trait AppHistoryServerPlugin {
/**
* Creates listeners to replay the event logs.
*/
def createListeners(conf: SparkConf, store: ElementTrackingStore): Seq[SparkListener]

/**
* Sets up the UI of this plugin to rebuild the history UI.
*/
def setupUI(ui: SparkUI): Unit
}
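To make the contract concrete, here is a hedged sketch of what an implementation in another module could look like; MyModuleListener and MyModuleTab are illustrative placeholders rather than classes from this PR, and a real plugin also needs a java.util.ServiceLoader registration like the services file added later in this diff:

package org.apache.spark.mymodule

import org.apache.spark.SparkConf
import org.apache.spark.scheduler.SparkListener
import org.apache.spark.status.{AppHistoryServerPlugin, ElementTrackingStore}
import org.apache.spark.ui.SparkUI

// Placeholder listener that would write this module's replayed data into the store.
class MyModuleListener(conf: SparkConf, store: ElementTrackingStore) extends SparkListener

// Placeholder for a UI tab; a real one would extend SparkUITab and register pages.
class MyModuleTab(ui: SparkUI)

class MyModuleHistoryServerPlugin extends AppHistoryServerPlugin {

  // Invoked while the history server replays an event log; the returned
  // listeners are attached to the ReplayListenerBus.
  override def createListeners(
      conf: SparkConf,
      store: ElementTrackingStore): Seq[SparkListener] = {
    Seq(new MyModuleListener(conf, store))
  }

  // Invoked after the replayed SparkUI has been created, so the plugin can attach its tab.
  override def setupUI(ui: SparkUI): Unit = {
    new MyModuleTab(ui)
  }
}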
core/src/main/scala/org/apache/spark/status/AppStatusListener.scala
@@ -48,7 +48,7 @@ private[spark] class AppStatusListener(

import config._

private var sparkVersion = SPARK_VERSION
private val sparkVersion = SPARK_VERSION
Contributor: Actually this is a bug; the version should be read from SparkListenerLogStart when it's in the event log. Feel free to file a separate bug.

private var appInfo: v1.ApplicationInfo = null
private var appSummary = new AppSummary(0, 0)
private var coresPerTask: Int = 1
71 changes: 0 additions & 71 deletions core/src/main/scala/org/apache/spark/status/AppStatusPlugin.scala

This file was deleted.

17 changes: 4 additions & 13 deletions core/src/main/scala/org/apache/spark/status/AppStatusStore.scala
@@ -17,24 +17,22 @@

package org.apache.spark.status

import java.io.File
import java.util.{Arrays, List => JList}
import java.util.{List => JList}

import scala.collection.JavaConverters._

import org.apache.spark.{JobExecutionStatus, SparkConf}
import org.apache.spark.scheduler.SparkListener
import org.apache.spark.status.api.v1
import org.apache.spark.ui.scope._
import org.apache.spark.util.{Distribution, Utils}
import org.apache.spark.util.Distribution
import org.apache.spark.util.kvstore.{InMemoryStore, KVStore}

/**
* A wrapper around a KVStore that provides methods for accessing the API data stored within.
*/
private[spark] class AppStatusStore(
val store: KVStore,
listener: Option[AppStatusListener] = None) {
val listener: Option[AppStatusListener] = None) {

def applicationInfo(): v1.ApplicationInfo = {
store.view(classOf[ApplicationInfoWrapper]).max(1).iterator().next().info
@@ -346,17 +344,10 @@ private[spark] object AppStatusStore {

/**
* Create an in-memory store for a live application.
*
* @param conf Configuration.
* @param addListenerFn Function to register a listener with a bus.
*/
def createLiveStore(conf: SparkConf, addListenerFn: SparkListener => Unit): AppStatusStore = {
def createLiveStore(conf: SparkConf): AppStatusStore = {
val store = new ElementTrackingStore(new InMemoryStore(), conf)
val listener = new AppStatusListener(store, conf, true)
addListenerFn(listener)
AppStatusPlugin.loadPlugins().foreach { p =>
p.setupListeners(conf, store, addListenerFn, true)
}
new AppStatusStore(store, listener = Some(listener))
}

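With the addListenerFn parameter gone, callers now obtain the listener from the returned store and register it themselves. A minimal sketch of the new wiring (SparkContext above pushes it onto the status queue of its listener bus; the StagePageSuite change below drives the listener directly):

package org.apache.spark.status

import org.apache.spark.SparkConf

object LiveStoreWiringSketch {
  def create(conf: SparkConf): AppStatusStore = {
    val statusStore = AppStatusStore.createLiveStore(conf)
    // The caller now decides where the listener is registered, for example:
    //   listenerBus.addToStatusQueue(statusStore.listener.get)
    statusStore
  }
}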
20 changes: 9 additions & 11 deletions core/src/test/scala/org/apache/spark/ui/StagePageSuite.scala
@@ -22,15 +22,13 @@ import javax.servlet.http.HttpServletRequest

import scala.xml.Node

import org.mockito.Matchers.anyString
import org.mockito.Mockito.{mock, when, RETURNS_SMART_NULLS}

import org.apache.spark._
import org.apache.spark.executor.TaskMetrics
import org.apache.spark.scheduler._
import org.apache.spark.status.AppStatusStore
import org.apache.spark.ui.jobs.{StagePage, StagesTab}
import org.apache.spark.util.Utils

class StagePageSuite extends SparkFunSuite with LocalSparkContext {

@@ -55,20 +53,20 @@ class StagePageSuite extends SparkFunSuite with LocalSparkContext {
* This also runs a dummy stage to populate the page with useful content.
*/
private def renderStagePage(conf: SparkConf): Seq[Node] = {
val bus = new ReplayListenerBus()
val store = AppStatusStore.createLiveStore(conf, l => bus.addListener(l))
val statusStore = AppStatusStore.createLiveStore(conf)
val listener = statusStore.listener.get

try {
val tab = mock(classOf[StagesTab], RETURNS_SMART_NULLS)
when(tab.store).thenReturn(store)
when(tab.store).thenReturn(statusStore)

val request = mock(classOf[HttpServletRequest])
when(tab.conf).thenReturn(conf)
when(tab.appName).thenReturn("testing")
when(tab.headerTabs).thenReturn(Seq.empty)
when(request.getParameter("id")).thenReturn("0")
when(request.getParameter("attempt")).thenReturn("0")
val page = new StagePage(tab, store)
val page = new StagePage(tab, statusStore)

// Simulate a stage in job progress listener
val stageInfo = new StageInfo(0, 0, "dummy", 1, Seq.empty, Seq.empty, "details")
@@ -77,17 +75,17 @@
taskId =>
val taskInfo = new TaskInfo(taskId, taskId, 0, 0, "0", "localhost", TaskLocality.ANY,
false)
bus.postToAll(SparkListenerStageSubmitted(stageInfo))
bus.postToAll(SparkListenerTaskStart(0, 0, taskInfo))
listener.onStageSubmitted(SparkListenerStageSubmitted(stageInfo))
listener.onTaskStart(SparkListenerTaskStart(0, 0, taskInfo))
taskInfo.markFinished(TaskState.FINISHED, System.currentTimeMillis())
val taskMetrics = TaskMetrics.empty
taskMetrics.incPeakExecutionMemory(peakExecutionMemory)
bus.postToAll(SparkListenerTaskEnd(0, 0, "result", Success, taskInfo, taskMetrics))
listener.onTaskEnd(SparkListenerTaskEnd(0, 0, "result", Success, taskInfo, taskMetrics))
}
bus.postToAll(SparkListenerStageCompleted(stageInfo))
listener.onStageCompleted(SparkListenerStageCompleted(stageInfo))
page.render(request)
} finally {
store.close()
statusStore.close()
}
}

1 change: 1 addition & 0 deletions sql/core/src/main/resources/META-INF/services/org.apache.spark.status.AppHistoryServerPlugin
@@ -0,0 +1 @@
org.apache.spark.sql.execution.ui.SQLHistoryServerPlugin

This file was deleted.
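The one-line file above is a standard java.util.ServiceLoader registration: the resource is named after the trait, and each line names an implementation class, which lets the history server find SQL plugins without a compile-time dependency on the SQL module. A sketch of the discovery side, mirroring the ServiceLoader.load call in the FsHistoryProvider hunk above (which uses Utils.getContextOrSparkClassLoader rather than the thread's context class loader):

package org.apache.spark.status

import java.util.ServiceLoader

import scala.collection.JavaConverters._

object PluginDiscoverySketch {
  // Returns every implementation listed in a classpath resource named
  // META-INF/services/org.apache.spark.status.AppHistoryServerPlugin.
  def loadHistoryPlugins(): Seq[AppHistoryServerPlugin] = {
    ServiceLoader.load(
      classOf[AppHistoryServerPlugin],
      Thread.currentThread().getContextClassLoader).asScala.toSeq
  }
}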

sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala
@@ -30,15 +30,11 @@ import org.apache.spark.sql.execution.metric._
import org.apache.spark.sql.internal.StaticSQLConf._
import org.apache.spark.status.{ElementTrackingStore, KVUtils, LiveEntity}
import org.apache.spark.status.config._
import org.apache.spark.ui.SparkUI
import org.apache.spark.util.kvstore.KVStore

private[sql] class SQLAppStatusListener(
class SQLAppStatusListener(
conf: SparkConf,
kvstore: ElementTrackingStore,
live: Boolean,
ui: Option[SparkUI] = None)
extends SparkListener with Logging {
live: Boolean) extends SparkListener with Logging {

// How often to flush intermediate state of a live execution to the store. When replaying logs,
// never flush (only do the very last write).
@@ -50,7 +46,10 @@ private[sql] class SQLAppStatusListener(
private val liveExecutions = new ConcurrentHashMap[Long, LiveExecutionData]()
private val stageMetrics = new ConcurrentHashMap[Int, LiveStageMetrics]()

private var uiInitialized = false
// Returns true if this listener has no live data. Exposed for tests only.
private[sql] def noLiveData(): Boolean = {
liveExecutions.isEmpty && stageMetrics.isEmpty
}

kvstore.addTrigger(classOf[SQLExecutionUIData], conf.get(UI_RETAINED_EXECUTIONS)) { count =>
cleanupExecutions(count)
@@ -230,14 +229,6 @@ private[sql] class SQLAppStatusListener(
}

private def onExecutionStart(event: SparkListenerSQLExecutionStart): Unit = {
// Install the SQL tab in a live app if it hasn't been initialized yet.
if (!uiInitialized) {
ui.foreach { _ui =>
new SQLTab(new SQLAppStatusStore(kvstore, Some(this)), _ui)
}
uiInitialized = true
}

val SparkListenerSQLExecutionStart(executionId, description, details,
physicalPlanDescription, sparkPlanInfo, time) = event

Expand Down Expand Up @@ -389,7 +380,7 @@ private class LiveStageMetrics(
val accumulatorIds: Array[Long],
val taskMetrics: ConcurrentHashMap[Long, LiveTaskMetrics])

private[sql] class LiveTaskMetrics(
private class LiveTaskMetrics(
val ids: Array[Long],
val values: Array[Long],
val succeeded: Boolean)
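Since onExecutionStart no longer installs the SQL tab, the history-server path now depends on the SQLHistoryServerPlugin registered in the services file above. That class is not part of this excerpt, so the following is only a plausible sketch assembled from pieces shown here: a replay-mode SQLAppStatusListener (live = false) returned from createListeners, and a SQLTab built over a SQLAppStatusStore in setupUI. The real implementation may differ, for instance by skipping the tab when the replayed application ran no SQL queries.

package org.apache.spark.sql.execution.ui

import org.apache.spark.SparkConf
import org.apache.spark.scheduler.SparkListener
import org.apache.spark.status.{AppHistoryServerPlugin, ElementTrackingStore}
import org.apache.spark.ui.SparkUI

// Hedged sketch only; the actual SQLHistoryServerPlugin added by this PR may differ.
class SQLHistoryServerPluginSketch extends AppHistoryServerPlugin {

  override def createListeners(
      conf: SparkConf,
      store: ElementTrackingStore): Seq[SparkListener] = {
    // Replay mode: live = false, so intermediate state is never flushed.
    Seq(new SQLAppStatusListener(conf, store, live = false))
  }

  override def setupUI(ui: SparkUI): Unit = {
    // Assumes SparkUI exposes its backing AppStatusStore as `ui.store`.
    val sqlStore = new SQLAppStatusStore(ui.store.store, None)
    new SQLTab(sqlStore, ui)
  }
}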