3 changes: 2 additions & 1 deletion core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -2350,7 +2350,8 @@ class SparkContext(config: SparkConf) extends Logging {
    // Note: this code assumes that the task scheduler has been initialized and has contacted
    // the cluster manager to get an application ID (in case the cluster manager provides one).
    listenerBus.post(SparkListenerApplicationStart(appName, Some(applicationId),
-     startTime, sparkUser, applicationAttemptId, schedulerBackend.getDriverLogUrls))
+     startTime, sparkUser, applicationAttemptId, schedulerBackend.getDriverLogUrls,
+     schedulerBackend.getMasterWebUiUrl))
  }

  /** Post the application end event */
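For context, a minimal sketch of how a SparkListener could consume the new field once this event is posted. The listener class name and the logging are illustrative, not part of this PR:

import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationStart}

class MasterUrlLoggingListener extends SparkListener {
  override def onApplicationStart(event: SparkListenerApplicationStart): Unit = {
    // masterWebUiUrl stays None on cluster managers that do not report a web UI.
    event.masterWebUiUrl.foreach(url => println(s"Master web UI at $url"))
  }
}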
@@ -127,7 +127,8 @@ private[deploy] object DeployMessages {

  // Master to AppClient

- case class RegisteredApplication(appId: String, master: RpcEndpointRef) extends DeployMessage
+ case class RegisteredApplication(appId: String, master: RpcEndpointRef,
+   masterWebUiUrl: String) extends DeployMessage

  // TODO(matei): replace hostPort with host
  case class ExecutorAdded(id: Int, workerId: String, hostPort: String, cores: Int, memory: Int) {
@@ -153,7 +153,7 @@ private[spark] class StandaloneAppClient(
    }

    override def receive: PartialFunction[Any, Unit] = {
-     case RegisteredApplication(appId_, masterRef) =>
+     case RegisteredApplication(appId_, masterRef, masterWebUiUrl) =>
        // FIXME How to handle the following cases?
        // 1. A master receives multiple registrations and sends back multiple
        // RegisteredApplications due to an unstable network.
@@ -163,6 +163,7 @@
        registered.set(true)
        master = Some(masterRef)
        listener.connected(appId.get)
+       listener.masterChanged(masterWebUiUrl)

      case ApplicationRemoved(message) =>
        markDead("Master removed our application: %s".format(message))
@@ -185,6 +186,7 @@
      case MasterChanged(masterRef, masterWebUiUrl) =>
        logInfo("Master has changed, new master is at " + masterRef.address.toSparkURL)
        master = Some(masterRef)
+       listener.masterChanged(masterWebUiUrl)
        alreadyDisconnected = false
        masterRef.send(MasterChangeAcknowledged(appId.get))
    }
@@ -38,4 +38,6 @@ private[spark] trait StandaloneAppClientListener {

  def executorRemoved(
      fullId: String, message: String, exitStatus: Option[Int], workerLost: Boolean): Unit
+
+ def masterChanged(masterWebUiUrl: String): Unit
}
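A no-op sketch of the expanded contract, assuming the trait's remaining methods keep their existing signatures (only masterChanged is new in this PR; the class name is illustrative):

private[spark] class NoOpAppClientListener extends StandaloneAppClientListener {
  def connected(appId: String): Unit = ()
  def disconnected(): Unit = ()
  def dead(reason: String): Unit = ()
  def executorAdded(
      fullId: String, workerId: String, hostPort: String, cores: Int, memory: Int): Unit = ()
  def executorRemoved(
      fullId: String, message: String, exitStatus: Option[Int], workerLost: Boolean): Unit = ()
  // New callback: fires on initial registration and again on master failover.
  def masterChanged(masterWebUiUrl: String): Unit = ()
}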
@@ -264,7 +264,7 @@ private[deploy] class Master(
    registerApplication(app)
    logInfo("Registered app " + description.name + " with ID " + app.id)
    persistenceEngine.addApplication(app)
-   driver.send(RegisteredApplication(app.id, self))
+   driver.send(RegisteredApplication(app.id, self, masterWebUiUrl))
    schedule()
  }

@@ -56,4 +56,6 @@ private[spark] trait SchedulerBackend {
   */
  def getDriverLogUrls: Option[Map[String, String]] = None

+ def getMasterWebUiUrl: Option[String] = None
+
}
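Because the new method has a default body, existing backends need no changes. A sketch, assuming Spark-internal visibility and that start/stop/reviveOffers/defaultParallelism are the trait's abstract members (the class name is made up):

private[spark] class MinimalBackend extends SchedulerBackend {
  def start(): Unit = ()
  def stop(): Unit = ()
  def reviveOffers(): Unit = ()
  def defaultParallelism(): Int = 1
  // getMasterWebUiUrl is inherited and returns None, so no UI link is shown.
}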
@@ -126,7 +126,8 @@ case class SparkListenerApplicationStart(
    time: Long,
    sparkUser: String,
    appAttemptId: Option[String],
-   driverLogs: Option[Map[String, String]] = None) extends SparkListenerEvent
+   driverLogs: Option[Map[String, String]] = None,
+   masterWebUiUrl: Option[String] = None) extends SparkListenerEvent

@DeveloperApi
case class SparkListenerApplicationEnd(time: Long) extends SparkListenerEvent
Review comment (Contributor), on the added masterWebUiUrl parameter:

Do you need this? It seems the standalone registration messages already take care of this.

If you want this, are you intentionally not saving this in event logs? The way it is, this is only ever useful when the application is starting but hasn't yet registered with the master. That's a very short period of time and probably not worth it.

If you add code to save to event logs, I'd rename this to be more cluster manager-agnostic, e.g. "clusterWebUiUrl".
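What the suggested rename would look like, as a sketch of the reviewer's alternative rather than code from this PR:

@DeveloperApi
case class SparkListenerApplicationStart(
    appName: String,
    appId: Option[String],
    time: Long,
    sparkUser: String,
    appAttemptId: Option[String],
    driverLogs: Option[Map[String, String]] = None,
    clusterWebUiUrl: Option[String] = None) extends SparkListenerEvent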
@@ -160,6 +160,16 @@ private[spark] class StandaloneSchedulerBackend(
      removeExecutor(fullId.split("/")(1), reason)
  }

+ var masterWebUiUrl: Option[String] = None
+
+ override def masterChanged(webUiUrl: String): Unit = {
+   masterWebUiUrl = Some(webUiUrl)
+ }
+
+ override def getMasterWebUiUrl(): Option[String] = {
+   masterWebUiUrl
+ }
+
  override def sufficientResourcesRegistered(): Boolean = {
    totalCoreCount.get() >= totalExpectedCores * minRegisteredRatio
  }
@@ -50,6 +50,10 @@ private[ui] class ExecutorsPage(
  private val listener = parent.listener

  def render(request: HttpServletRequest): Seq[Node] = {
+   val masterWebUiUrl = listener.masterWebUiUrl
+   val backLink = masterWebUiUrl.map(
+     link => <p><a href={link}>Back to Master</a></p>
+   ).getOrElse(Seq.empty)
    val content =
      <div>
        {

Review comment (Contributor), on the masterWebUiUrl line: I think this information makes way more sense in the "Environment" tab.
@@ -60,7 +64,7 @@
        }
      </div>;

-   UIUtils.headerSparkPage("Executors", content, parent, useDataTables = true)
+   UIUtils.headerSparkPage("Executors", backLink++content, parent, useDataTables = true)
  }
}

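Why Seq.empty is a safe default above: scala.xml.Elem is itself a Seq[Node], so the Option collapses to Seq[Node] and concatenates cleanly with the page body. A standalone illustration, with made-up values and the scala-xml library on the classpath:

import scala.xml.Node

val masterWebUiUrl: Option[String] = Some("http://master:8080")
val backLink: Seq[Node] = masterWebUiUrl
  .map(link => <p><a href={link}>Back to Master</a></p>)
  .getOrElse(Seq.empty)
val content: Seq[Node] = <div>executor table</div>
val page: Seq[Node] = backLink ++ content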
@@ -63,6 +63,8 @@ private[ui] case class ExecutorTaskSummary(
@DeveloperApi
class ExecutorsListener(storageStatusListener: StorageStatusListener, conf: SparkConf)
    extends SparkListener {
+ var masterWebUiUrl: Option[String] = None
+
  var executorToTaskSummary = LinkedHashMap[String, ExecutorTaskSummary]()
  var executorEvents = new ListBuffer[SparkListenerEvent]()

@@ -112,6 +114,7 @@ class ExecutorsListener(storageStatusListener: StorageStatusListener, conf: SparkConf)
        taskSummary.executorLogs = logs.toMap
      }
    }
+   masterWebUiUrl = applicationStart.masterWebUiUrl
  }

  override def onTaskStart(taskStart: SparkListenerTaskStart): Unit = synchronized {
@@ -201,6 +201,8 @@ class AppClientSuite
      deadReasonList.add(reason)
    }

+   def masterChanged(masterWebUiUrl: String): Unit = { }
+
    def executorAdded(
        id: String,
        workerId: String,
9 changes: 8 additions & 1 deletion project/MimaExcludes.scala
@@ -54,9 +54,16 @@ object MimaExcludes {
      // [SPARK-14743] Improve delegation token handling in secure cluster
      ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.deploy.SparkHadoopUtil.getTimeFromNowToRenewal"),
      // [SPARK-16199][SQL] Add a method to list the referenced columns in data source Filter
-     ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.sql.sources.Filter.references"),
+     ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.sql.sources.Filter.references")
+   ) ++
+   Seq(
      // [SPARK-16853][SQL] Fixes encoder error in DataSet typed select
      ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.spark.sql.Dataset.select"),
+     // [SPARK-16856] [WEBUI] [CORE] Link the application's executor page to the master's UI
+     ProblemFilters.exclude[MissingTypesProblem]("org.apache.spark.scheduler.SparkListenerApplicationStart$"),
+     ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.scheduler.SparkListenerApplicationStart.apply"),
+     ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.scheduler.SparkListenerApplicationStart.copy"),
+     ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.scheduler.SparkListenerApplicationStart.this"),
      // [SPARK-16967] Move Mesos to Module
      ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.SparkMasterRegex.MESOS_REGEX"),
      // [SPARK-16240] ML persistence backward compatibility for LDA
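For context on why these filters are needed: adding a parameter to a case class preserves source compatibility (the default value fills the gap at compile time) but changes the JVM-level apply/copy/constructor signatures, which MiMa flags. A self-contained sketch with made-up names:

// Old shape, as previously published:
//   case class Event(name: String)
// New shape after adding a defaulted field:
case class Event(name: String, url: Option[String] = None)

object Demo extends App {
  // Source code recompiles unchanged because the default fills the gap...
  val e = Event("app")
  // ...but bytecode compiled against the old Event.apply(String) no longer
  // links, since only Event.apply(String, Option[String]) exists now.
  println(e)
}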