Skip to content

Commit 1fe7710

Browse files
committed
Link the spark UI to RM ui in yarn-client mode
1 parent 6555618 commit 1fe7710

File tree

7 files changed

+52
-7
lines changed

7 files changed

+52
-7
lines changed

core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,4 +66,6 @@ private[spark] object CoarseGrainedClusterMessages {
6666

6767
case class RemoveExecutor(executorId: String, reason: String) extends CoarseGrainedClusterMessage
6868

69+
case class AddWebUIFilter(filter: String, proxyBase :String) extends CoarseGrainedClusterMessage
70+
6971
}

core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -136,6 +136,9 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A
136136
removeExecutor(executorId, reason)
137137
sender ! true
138138

139+
case AddWebUIFilter(filter, proxyBase) =>
140+
addWebUIFilter(filter, proxyBase)
141+
sender ! true
139142
case DisassociatedEvent(_, address, _) =>
140143
addressToExecutorId.get(address).foreach(removeExecutor(_,
141144
"remote Akka client disassociated"))

core/src/main/scala/org/apache/spark/ui/JettyUtils.scala

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -139,7 +139,8 @@ private[spark] object JettyUtils extends Logging {
139139

140140
/** Add filters, if any, to the given list of ServletContextHandlers */
141141
def addFilters(handlers: Seq[ServletContextHandler], conf: SparkConf) {
142-
val filters: Array[String] = conf.get("spark.ui.filters", "").split(',').map(_.trim())
142+
val filters: Array[String] = sys.props.get("spark.ui.filters").
143+
getOrElse("").split(',').map(_.trim())
143144
filters.foreach {
144145
case filter : String =>
145146
if (!filter.isEmpty) {
@@ -148,7 +149,7 @@ private[spark] object JettyUtils extends Logging {
148149
holder.setClassName(filter)
149150
// Get any parameters for each filter
150151
val paramName = "spark." + filter + ".params"
151-
val params = conf.get(paramName, "").split(',').map(_.trim()).toSet
152+
val params = sys.props.get(paramName).getOrElse("").split(',').map(_.trim()).toSet
152153
params.foreach {
153154
case param : String =>
154155
if (!param.isEmpty) {

core/src/main/scala/org/apache/spark/ui/UIUtils.scala

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -136,7 +136,16 @@ private[spark] object UIUtils extends Logging {
136136
}
137137

138138
// Yarn has to go through a proxy so the base uri is provided and has to be on all links
139-
val uiRoot : String = Option(System.getenv("APPLICATION_WEB_PROXY_BASE")).getOrElse("")
139+
def uiRoot: String = {
140+
if (System.getenv("APPLICATION_WEB_PROXY_BASE") != null) {
141+
System.getenv("APPLICATION_WEB_PROXY_BASE")
142+
} else if (System.getProperty("spark.ui.proxyBase") != null) {
143+
System.getProperty("spark.ui.proxyBase")
144+
}
145+
else {
146+
""
147+
}
148+
}
140149

141150
def prependBaseUri(basePath: String = "", resource: String = "") = uiRoot + basePath + resource
142151

core/src/main/scala/org/apache/spark/ui/WebUI.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ private[spark] abstract class WebUI(
4343
extends Logging {
4444

4545
protected val tabs = ArrayBuffer[WebUITab]()
46-
protected val handlers = ArrayBuffer[ServletContextHandler]()
46+
protected[spark] val handlers = ArrayBuffer[ServletContextHandler]()
4747
protected var serverInfo: Option[ServerInfo] = None
4848
protected val localHostName = Utils.localHostName()
4949
protected val publicHostName = Option(System.getenv("SPARK_PUBLIC_DNS")).getOrElse(localHostName)

yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
package org.apache.spark.scheduler.cluster
1919

2020
import org.apache.hadoop.yarn.api.records.{ApplicationId, YarnApplicationState}
21+
import org.apache.spark.ui.JettyUtils
2122
import org.apache.spark.{SparkException, Logging, SparkContext}
2223
import org.apache.spark.deploy.yarn.{Client, ClientArguments, ExecutorLauncher}
2324
import org.apache.spark.scheduler.TaskSchedulerImpl
@@ -48,6 +49,7 @@ private[spark] class YarnClientSchedulerBackend(
4849
val driverHost = conf.get("spark.driver.host")
4950
val driverPort = conf.get("spark.driver.port")
5051
val hostport = driverHost + ":" + driverPort
52+
conf.set("spark.driver.appUIAddress", sc.ui.appUIHostPort)
5153

5254
val argsArrayBuf = new ArrayBuffer[String]()
5355
argsArrayBuf += (
@@ -115,4 +117,15 @@ private[spark] class YarnClientSchedulerBackend(
115117
logInfo("Stopped")
116118
}
117119

120+
override def addWebUIFilter(filter: String, proxyBase: String) {
121+
if (filter != null && filter.nonEmpty && proxyBase != null && proxyBase.nonEmpty) {
122+
logInfo(s"Add WebUI Filter. $filter")
123+
val amFilter = "org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter"
124+
System.setProperty("spark.ui.filters", amFilter)
125+
System.setProperty(s"spark.$amFilter.params", filter)
126+
System.setProperty("spark.ui.proxyBase", proxyBase)
127+
JettyUtils.addFilters(sc.ui.handlers, conf)
128+
}
129+
}
130+
118131
}

yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala

Lines changed: 20 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -31,10 +31,12 @@ import akka.actor.Terminated
3131
import org.apache.spark.{Logging, SecurityManager, SparkConf}
3232
import org.apache.spark.util.{Utils, AkkaUtils}
3333
import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
34+
import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.AddWebUIFilter
3435
import org.apache.spark.scheduler.SplitInfo
3536
import org.apache.hadoop.yarn.client.api.AMRMClient
3637
import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest
3738
import org.apache.spark.deploy.SparkHadoopUtil
39+
import org.apache.hadoop.yarn.webapp.util.WebAppUtils
3840

3941
/**
4042
* An application master that allocates executors on behalf of a driver that is running outside
@@ -82,6 +84,9 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp
8284
case x: DisassociatedEvent =>
8385
logInfo(s"Driver terminated or disconnected! Shutting down. $x")
8486
driverClosed = true
87+
case x: AddWebUIFilter =>
88+
logInfo(s"Add WebUI Filter. $x")
89+
driver ! x
8590
}
8691
}
8792

@@ -99,6 +104,8 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp
99104
registerApplicationMaster()
100105

101106
waitForSparkMaster()
107+
// setup AmIpFilter for the SparkUI - do this before we start the UI
108+
addAmIpFilter()
102109

103110
// Allocate all containers
104111
allocateExecutors()
@@ -142,9 +149,19 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp
142149
}
143150

144151
private def registerApplicationMaster(): RegisterApplicationMasterResponse = {
145-
logInfo("Registering the ApplicationMaster")
146-
// TODO: Find out client's Spark UI address and fill in here?
147-
amClient.registerApplicationMaster(Utils.localHostName(), 0, "")
152+
val appUIAddress = sparkConf.getOption("spark.driver.appUIAddress").getOrElse("")
153+
logInfo(s"Registering the ApplicationMaster with appUIAddress: $appUIAddress")
154+
amClient.registerApplicationMaster(Utils.localHostName(), 0, appUIAddress)
155+
}
156+
157+
// add the yarn amIpFilter that Yarn requires for properly securing the UI
158+
private def addAmIpFilter() {
159+
val proxy = WebAppUtils.getProxyHostAndPort(conf)
160+
val parts : Array[String] = proxy.split(":")
161+
val proxyBase =System.getenv(ApplicationConstants.APPLICATION_WEB_PROXY_BASE_ENV)
162+
val uriBase = "http://" + proxy + proxyBase
163+
val amFilter = "PROXY_HOST=" + parts(0) + "," + "PROXY_URI_BASE=" + uriBase
164+
actor ! AddWebUIFilter(amFilter, proxyBase)
148165
}
149166

150167
private def waitForSparkMaster() {

0 commit comments

Comments
 (0)