Closed · Changes from all commits · 41 commits
56cc7fb
First cut implementation of Streaming UI.
tdas Mar 28, 2014
93f1c69
Added network receiver information to the Streaming UI.
tdas Mar 31, 2014
4d86e98
Added basic stats to the StreamingUI and refactored the UI to a Page …
tdas Apr 1, 2014
db27bad
Added last batch processing time to StreamingUI.
tdas Apr 1, 2014
aef4dd5
Added Apache licenses.
tdas Apr 1, 2014
7d57444
Refactoring the UI interface to add flexibility
andrewor14 Apr 2, 2014
cd000b0
Merge github.com:apache/spark into ui-refactor
andrewor14 Apr 3, 2014
a37ad4f
Comments, imports and formatting (minor)
andrewor14 Apr 3, 2014
ed25dfc
Generalize SparkUI header to display tabs dynamically
andrewor14 Apr 3, 2014
53be2c5
Minor style updates.
tdas Apr 3, 2014
61358e3
Merge remote-tracking branch 'apache-github/master' into streaming-we…
tdas Apr 3, 2014
9a48fa1
Allow adding tabs to SparkUI dynamically + add example
andrewor14 Apr 3, 2014
0d61ee8
Merge branch 'streaming-web-ui' of github.com:tdas/spark into ui-refa…
andrewor14 Apr 3, 2014
8f7323b
End of file new lines, indentation, and imports (minor)
andrewor14 Apr 3, 2014
c78c92d
Remove outdated comment
andrewor14 Apr 7, 2014
3e986f8
Merge remote-tracking branch 'apache/master' into streaming-web-ui
tdas Apr 10, 2014
168fe86
Merge pull request #2 from andrewor14/ui-refactor
tdas Apr 10, 2014
827e81a
Merge branch 'streaming-web-ui' of github.com:tdas/spark into streami…
tdas Apr 10, 2014
1af239b
Changed streaming UI to attach itself as a tab with the Spark UI.
tdas Apr 10, 2014
1c0bcef
Refactored streaming UI into two files.
tdas Apr 10, 2014
fa760fe
Fixed long line.
tdas Apr 10, 2014
ee6543f
Minor changes based on Andrew's comments.
tdas Apr 10, 2014
6de06b0
Merge remote-tracking branch 'apache/master' into streaming-web-ui
tdas Apr 10, 2014
548c98c
Wide refactoring of WebUI, UITab, and UIPage (see commit message)
andrewor14 Apr 11, 2014
914b8ff
Moved utils functions to UIUtils.
tdas Apr 11, 2014
585cd65
Merge pull request #5 from andrewor14/ui-refactor
tdas Apr 11, 2014
caa5e05
Merge branch 'streaming-web-ui' of github.com:tdas/spark into streami…
tdas Apr 11, 2014
f8e1053
Added Spark and Streaming UI unit tests.
tdas Apr 11, 2014
aa396d4
Rename tabs and pages (No more IndexPage.scala)
andrewor14 Apr 11, 2014
2fc09c8
Added binary check exclusions
tdas Apr 11, 2014
72fe256
Merge pull request #6 from andrewor14/ui-refactor
tdas Apr 11, 2014
89dae36
Merge branch 'streaming-web-ui' of github.com:tdas/spark into streami…
tdas Apr 11, 2014
90feb8d
Address Patrick's comments
andrewor14 Apr 11, 2014
125a054
Disable serving static resources with gzip
andrewor14 Apr 11, 2014
e038b4b
Addressed Patrick's comments.
tdas Apr 11, 2014
252c566
Merge pull request #8 from andrewor14/ui-refactor
tdas Apr 11, 2014
34bb364
Merge branch 'streaming-web-ui' of github.com:tdas/spark into streami…
tdas Apr 11, 2014
f4f4cbe
More minor fixes.
tdas Apr 11, 2014
eb30517
Merge github.com:apache/spark into ui-refactor
andrewor14 Apr 12, 2014
642dd88
Merge SparkUISuite.scala into UISuite.scala
andrewor14 Apr 12, 2014
fc73ca5
Merge pull request #9 from andrewor14/ui-refactor
tdas Apr 12, 2014
1 change: 0 additions & 1 deletion core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -213,7 +213,6 @@ class SparkContext(config: SparkConf) extends Logging {
   // Initialize the Spark UI, registering all associated listeners
   private[spark] val ui = new SparkUI(this)
   ui.bind()
-  ui.start()
 
   // Optionally log Spark events
   private[spark] val eventLogger: Option[EventLoggingListener] = {
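For orientation, a hedged sketch of the lifecycle this change settles on: a UI registers its pages and handlers when it is constructed, and bind() is the only explicit step left. MyServer and MyPage below are invented for illustration (WebUI and WebUIPage are private[spark], so real code would have to live inside that package); the constructor arguments and the initialize()/attachPage calls mirror the HistoryServer diff further down.

```scala
import javax.servlet.http.HttpServletRequest
import scala.xml.Node
import org.apache.spark.{SecurityManager, SparkConf}
import org.apache.spark.ui.{WebUI, WebUIPage}

// Invented page: render() returns the XHTML body served at the page's path.
private[spark] class MyPage extends WebUIPage("") {
  def render(request: HttpServletRequest): Seq[Node] = <p>hello</p>
}

// Invented server: pages are attached at construction time, so the UI is
// fully assembled before bind() starts the Jetty server.
private[spark] class MyServer(securityManager: SecurityManager, conf: SparkConf)
  extends WebUI(securityManager, 8099, conf) {

  initialize()

  def initialize() {
    attachPage(new MyPage)
  }
}

// Usage sketch: construct first, then bind().
//   val server = new MyServer(new SecurityManager(conf), conf)
//   server.bind()
```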
50 changes: 0 additions & 50 deletions core/src/main/scala/org/apache/spark/deploy/SparkUIContainer.scala

This file was deleted.

core/src/main/scala/org/apache/spark/deploy/history/{IndexPage.scala → HistoryPage.scala}
@@ -21,9 +21,9 @@ import javax.servlet.http.HttpServletRequest
 
 import scala.xml.Node
 
-import org.apache.spark.ui.{UIUtils, WebUI}
+import org.apache.spark.ui.{WebUIPage, UIUtils}
 
-private[spark] class IndexPage(parent: HistoryServer) {
+private[spark] class HistoryPage(parent: HistoryServer) extends WebUIPage("") {
 
   def render(request: HttpServletRequest): Seq[Node] = {
     val appRows = parent.appIdToInfo.values.toSeq.sortBy { app => -app.lastUpdated }
@@ -62,13 +62,13 @@ private[spark] class IndexPage(parent: HistoryServer) {
   private def appRow(info: ApplicationHistoryInfo): Seq[Node] = {
     val appName = if (info.started) info.name else info.logDirPath.getName
     val uiAddress = parent.getAddress + info.ui.basePath
-    val startTime = if (info.started) WebUI.formatDate(info.startTime) else "Not started"
-    val endTime = if (info.completed) WebUI.formatDate(info.endTime) else "Not completed"
+    val startTime = if (info.started) UIUtils.formatDate(info.startTime) else "Not started"
+    val endTime = if (info.completed) UIUtils.formatDate(info.endTime) else "Not completed"
     val difference = if (info.started && info.completed) info.endTime - info.startTime else -1L
-    val duration = if (difference > 0) WebUI.formatDuration(difference) else "---"
+    val duration = if (difference > 0) UIUtils.formatDuration(difference) else "---"
     val sparkUser = if (info.started) info.sparkUser else "Unknown user"
     val logDirectory = info.logDirPath.getName
-    val lastUpdated = WebUI.formatDate(info.lastUpdated)
+    val lastUpdated = UIUtils.formatDate(info.lastUpdated)
     <tr>
       <td><a href={uiAddress}>{appName}</a></td>
       <td>{startTime}</td>
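A note on the base class used above: the string passed to WebUIPage is the page's path prefix under its parent, so HistoryPage with "" serves the root of the history UI, while ApplicationPage further down uses "app". A hedged sketch of a page built on the relocated helpers (AboutPage is invented; UIUtils.formatDate and UIUtils.formatDuration are the helpers this diff migrates to from WebUI):

```scala
import javax.servlet.http.HttpServletRequest
import scala.xml.Node
import org.apache.spark.ui.{UIUtils, WebUIPage}

// Invented page, served under <parent>/about because of the "about" prefix.
private[spark] class AboutPage extends WebUIPage("about") {
  def render(request: HttpServletRequest): Seq[Node] = {
    // Date and duration formatting now live in UIUtils rather than WebUI
    val now = UIUtils.formatDate(System.currentTimeMillis)
    val uptime = UIUtils.formatDuration(12345L)
    <div>
      <p>Rendered at {now}, up for {uptime}</p>
    </div>
  }
}
```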
core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala
@@ -17,17 +17,13 @@
 
 package org.apache.spark.deploy.history
 
-import javax.servlet.http.HttpServletRequest
-
 import scala.collection.mutable
 
 import org.apache.hadoop.fs.{FileStatus, Path}
-import org.eclipse.jetty.servlet.ServletContextHandler
 
 import org.apache.spark.{Logging, SecurityManager, SparkConf}
-import org.apache.spark.deploy.SparkUIContainer
 import org.apache.spark.scheduler._
-import org.apache.spark.ui.SparkUI
+import org.apache.spark.ui.{WebUI, SparkUI}
 import org.apache.spark.ui.JettyUtils._
 import org.apache.spark.util.Utils
 
@@ -46,17 +42,15 @@ import org.apache.spark.util.Utils
  */
 class HistoryServer(
     val baseLogDir: String,
+    securityManager: SecurityManager,
     conf: SparkConf)
-  extends SparkUIContainer("History Server") with Logging {
+  extends WebUI(securityManager, HistoryServer.WEB_UI_PORT, conf) with Logging {
 
   import HistoryServer._
 
   private val fileSystem = Utils.getHadoopFileSystem(baseLogDir)
   private val localHost = Utils.localHostName()
   private val publicHost = Option(System.getenv("SPARK_PUBLIC_DNS")).getOrElse(localHost)
-  private val port = WEB_UI_PORT
-  private val securityManager = new SecurityManager(conf)
-  private val indexPage = new IndexPage(this)
 
   // A timestamp of when the disk was last accessed to check for log updates
   private var lastLogCheckTime = -1L
@@ -90,37 +84,23 @@ class HistoryServer(
     }
   }
 
-  private val handlers = Seq[ServletContextHandler](
-    createStaticHandler(STATIC_RESOURCE_DIR, "/static"),
-    createServletHandler("/",
-      (request: HttpServletRequest) => indexPage.render(request), securityMgr = securityManager)
-  )
-
   // A mapping of application ID to its history information, which includes the rendered UI
   val appIdToInfo = mutable.HashMap[String, ApplicationHistoryInfo]()
 
+  initialize()
+
   /**
-   * Start the history server.
+   * Initialize the history server.
    *
    * This starts a background thread that periodically synchronizes information displayed on
    * this UI with the event logs in the provided base directory.
    */
-  def start() {
+  def initialize() {
+    attachPage(new HistoryPage(this))
+    attachHandler(createStaticHandler(STATIC_RESOURCE_DIR, "/static"))
     logCheckingThread.start()
   }
 
-  /** Bind to the HTTP server behind this web interface. */
-  override def bind() {
-    try {
-      serverInfo = Some(startJettyServer("0.0.0.0", port, handlers, conf))
-      logInfo("Started HistoryServer at http://%s:%d".format(publicHost, boundPort))
-    } catch {
-      case e: Exception =>
-        logError("Failed to bind HistoryServer", e)
-        System.exit(1)
-    }
-  }
-
   /**
    * Check for any updates to event logs in the base directory. This is only effective once
    * the server has been bound.
@@ -151,7 +131,7 @@ class HistoryServer(
     // Remove any applications that should no longer be retained
     appIdToInfo.foreach { case (appId, info) =>
       if (!retainedAppIds.contains(appId)) {
-        detachUI(info.ui)
+        detachSparkUI(info.ui)
         appIdToInfo.remove(appId)
       }
     }
@@ -186,15 +166,14 @@ class HistoryServer(
     val path = logDir.getPath
     val appId = path.getName
     val replayBus = new ReplayListenerBus(logInfo.logPaths, fileSystem, logInfo.compressionCodec)
-    val ui = new SparkUI(replayBus, appId, "/history/" + appId)
     val appListener = new ApplicationEventListener
     replayBus.addListener(appListener)
+    val ui = new SparkUI(conf, replayBus, appId, "/history/" + appId)
 
     // Do not call ui.bind() to avoid creating a new server for each application
-    ui.start()
     replayBus.replay()
     if (appListener.applicationStarted) {
-      attachUI(ui)
+      attachSparkUI(ui)
       val appName = appListener.appName
       val sparkUser = appListener.sparkUser
       val startTime = appListener.startTime
@@ -213,6 +192,18 @@ class HistoryServer(
     fileSystem.close()
   }
 
+  /** Attach a reconstructed UI to this server. Only valid after bind(). */
+  private def attachSparkUI(ui: SparkUI) {
+    assert(serverInfo.isDefined, "HistoryServer must be bound before attaching SparkUIs")
+    ui.getHandlers.foreach(attachHandler)
+  }
+
+  /** Detach a reconstructed UI from this server. Only valid after bind(). */
+  private def detachSparkUI(ui: SparkUI) {
+    assert(serverInfo.isDefined, "HistoryServer must be bound before detaching SparkUIs")
+    ui.getHandlers.foreach(detachHandler)
+  }
+
   /** Return the address of this server. */
   def getAddress: String = "http://" + publicHost + ":" + boundPort
 
@@ -262,9 +253,9 @@ object HistoryServer {
 
   def main(argStrings: Array[String]) {
     val args = new HistoryServerArguments(argStrings)
-    val server = new HistoryServer(args.logDir, conf)
+    val securityManager = new SecurityManager(conf)
+    val server = new HistoryServer(args.logDir, securityManager, conf)
     server.bind()
-    server.start()
 
     // Wait until the end of the world... or if the HistoryServer process is manually stopped
     while(true) { Thread.sleep(Int.MaxValue) }
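The rebuild-and-attach flow above compresses to a small recipe: replay the event logs through a ReplayListenerBus into a SparkUI that is never bound, then graft the UI's handlers onto the already-running server. A condensed, hedged sketch (rebuildUI is invented; logPaths, fileSystem and codec stand in for the values HistoryServer derives from a log directory):

```scala
import org.apache.spark.SparkConf
import org.apache.spark.scheduler.{ApplicationEventListener, ReplayListenerBus}
import org.apache.spark.ui.SparkUI

// Invented helper distilled from renderSparkUI above.
def rebuildUI(appId: String): Option[SparkUI] = {
  val replayBus = new ReplayListenerBus(logPaths, fileSystem, codec)
  val appListener = new ApplicationEventListener
  replayBus.addListener(appListener)

  // Construct but never bind(): the UI's handlers are later attached to the
  // history server's existing Jetty instance via attachSparkUI.
  val ui = new SparkUI(new SparkConf, replayBus, appId, "/history/" + appId)
  replayBus.replay()
  if (appListener.applicationStarted) Some(ui) else None
}
```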
core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -625,7 +625,7 @@ private[spark] class Master(
     if (completedApps.size >= RETAINED_APPLICATIONS) {
       val toRemove = math.max(RETAINED_APPLICATIONS / 10, 1)
       completedApps.take(toRemove).foreach( a => {
-        appIdToUI.remove(a.id).foreach { ui => webUi.detachUI(ui) }
+        appIdToUI.remove(a.id).foreach { ui => webUi.detachSparkUI(ui) }
         applicationMetricsSystem.removeSource(a.appSource)
       })
       completedApps.trimStart(toRemove)
@@ -667,12 +667,12 @@ private[spark] class Master(
     if (!eventLogPaths.isEmpty) {
       try {
         val replayBus = new ReplayListenerBus(eventLogPaths, fileSystem, compressionCodec)
-        val ui = new SparkUI(replayBus, appName + " (completed)", "/history/" + app.id)
-        ui.start()
+        val ui = new SparkUI(
+          new SparkConf, replayBus, appName + " (completed)", "/history/" + app.id)
         replayBus.replay()
         app.desc.appUiUrl = ui.basePath
         appIdToUI(app.id) = ui
-        webUi.attachUI(ui)
+        webUi.attachSparkUI(ui)
         return true
       } catch {
         case t: Throwable =>

core/src/main/scala/org/apache/spark/deploy/master/ui/ApplicationPage.scala
@@ -28,15 +28,16 @@ import org.json4s.JValue
 
 import org.apache.spark.deploy.JsonProtocol
 import org.apache.spark.deploy.DeployMessages.{MasterStateResponse, RequestMasterState}
 import org.apache.spark.deploy.master.ExecutorInfo
-import org.apache.spark.ui.UIUtils
+import org.apache.spark.ui.{WebUIPage, UIUtils}
 import org.apache.spark.util.Utils
 
-private[spark] class ApplicationPage(parent: MasterWebUI) {
-  val master = parent.masterActorRef
-  val timeout = parent.timeout
+private[spark] class ApplicationPage(parent: MasterWebUI) extends WebUIPage("app") {
+
+  private val master = parent.masterActorRef
+  private val timeout = parent.timeout
 
   /** Executor details for a particular application */
-  def renderJson(request: HttpServletRequest): JValue = {
+  override def renderJson(request: HttpServletRequest): JValue = {
     val appId = request.getParameter("appId")
     val stateFuture = (master ? RequestMasterState)(timeout).mapTo[MasterStateResponse]
     val state = Await.result(stateFuture, timeout)
@@ -96,7 +97,7 @@ private[spark] class ApplicationPage(parent: MasterWebUI) {
     UIUtils.basicSparkPage(content, "Application: " + app.desc.name)
   }
 
-  def executorRow(executor: ExecutorInfo): Seq[Node] = {
+  private def executorRow(executor: ExecutorInfo): Seq[Node] = {
     <tr>
       <td>{executor.id}</td>
       <td>
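Because ApplicationPage registers under the "app" prefix and reads its target from a query parameter, its JSON view should end up addressable alongside the HTML one. A hedged sketch of fetching it (the /app/json shape is assumed from the prefix convention, not taken from this diff; host and appId are placeholders):

```scala
import scala.io.Source

// Placeholders: a running Master web UI and a known application id.
val masterUi = "http://localhost:8080"
val appId = "app-20140412-0001"

// A WebUIPage gets a JSON route next to its HTML route; for a page with
// prefix "app" that would be /app/json (assumed shape).
val json = Source.fromURL(s"$masterUi/app/json?appId=$appId").mkString
println(json)
```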
core/src/main/scala/org/apache/spark/deploy/master/ui/{IndexPage.scala → MasterPage.scala}
@@ -25,17 +25,17 @@ import scala.xml.Node
 import akka.pattern.ask
 import org.json4s.JValue
 
-import org.apache.spark.deploy.{JsonProtocol}
+import org.apache.spark.deploy.JsonProtocol
 import org.apache.spark.deploy.DeployMessages.{MasterStateResponse, RequestMasterState}
 import org.apache.spark.deploy.master.{ApplicationInfo, DriverInfo, WorkerInfo}
-import org.apache.spark.ui.{WebUI, UIUtils}
+import org.apache.spark.ui.{WebUIPage, UIUtils}
 import org.apache.spark.util.Utils
 
-private[spark] class IndexPage(parent: MasterWebUI) {
-  val master = parent.masterActorRef
-  val timeout = parent.timeout
+private[spark] class MasterPage(parent: MasterWebUI) extends WebUIPage("") {
+  private val master = parent.masterActorRef
+  private val timeout = parent.timeout
 
-  def renderJson(request: HttpServletRequest): JValue = {
+  override def renderJson(request: HttpServletRequest): JValue = {
     val stateFuture = (master ? RequestMasterState)(timeout).mapTo[MasterStateResponse]
     val state = Await.result(stateFuture, timeout)
     JsonProtocol.writeMasterState(state)
@@ -139,7 +139,7 @@ private[spark] class IndexPage(parent: MasterWebUI) {
     UIUtils.basicSparkPage(content, "Spark Master at " + state.uri)
   }
 
-  def workerRow(worker: WorkerInfo): Seq[Node] = {
+  private def workerRow(worker: WorkerInfo): Seq[Node] = {
     <tr>
       <td>
         <a href={worker.webUiAddress}>{worker.id}</a>
@@ -154,8 +154,7 @@ private[spark] class IndexPage(parent: MasterWebUI) {
     </tr>
   }
 
-
-  def appRow(app: ApplicationInfo): Seq[Node] = {
+  private def appRow(app: ApplicationInfo): Seq[Node] = {
     <tr>
       <td>
         <a href={"app?appId=" + app.id}>{app.id}</a>
@@ -169,14 +168,14 @@ private[spark] class IndexPage(parent: MasterWebUI) {
       <td sorttable_customkey={app.desc.memoryPerSlave.toString}>
         {Utils.megabytesToString(app.desc.memoryPerSlave)}
       </td>
-      <td>{WebUI.formatDate(app.submitDate)}</td>
+      <td>{UIUtils.formatDate(app.submitDate)}</td>
       <td>{app.desc.user}</td>
       <td>{app.state.toString}</td>
-      <td>{WebUI.formatDuration(app.duration)}</td>
+      <td>{UIUtils.formatDuration(app.duration)}</td>
     </tr>
   }
 
-  def driverRow(driver: DriverInfo): Seq[Node] = {
+  private def driverRow(driver: DriverInfo): Seq[Node] = {
     <tr>
       <td>{driver.id} </td>
       <td>{driver.submitDate}</td>
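One convenience of the new WebUIPage base class, visible in both ApplicationPage and MasterPage: renderJson has a default implementation, so a page opts into a JSON view simply by overriding it. A hedged sketch (StatusPage and its payload are invented; org.json4s is already what these pages import):

```scala
import javax.servlet.http.HttpServletRequest
import scala.xml.Node
import org.json4s.JValue
import org.json4s.JsonDSL._
import org.apache.spark.ui.WebUIPage

// Invented page exposing both an HTML and a JSON rendering of the same state.
private[spark] class StatusPage extends WebUIPage("status") {
  def render(request: HttpServletRequest): Seq[Node] = {
    <p>alive</p>
  }

  // Overriding renderJson is all it takes to also serve JSON for this page.
  override def renderJson(request: HttpServletRequest): JValue = {
    ("status" -> "alive") ~ ("timestamp" -> System.currentTimeMillis)
  }
}
```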