
Commit efcb6e1

Modified to add application id to metrics name
1 parent 2ec848a commit efcb6e1

File tree

9 files changed (+38, -19 lines)

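Taken together, these changes drop the driver-generated spark.unique.app.name (app name plus a timestamp) and instead prefix each metric with the application ID reported by the cluster manager. A minimal sketch of the resulting naming scheme, using the same Codahale MetricRegistry.name helper the diff uses; all values below are hypothetical placeholders:

    import com.codahale.metrics.MetricRegistry

    // Hypothetical values for illustration; the real ones come from SparkConf.
    val appId = "application_1412345678901_0001" // e.g. a YARN application ID
    val appName = "MyApp"
    val executorId = "driver"
    val sourceName = "jvm"

    // New scheme: <appId>.<appName>.<executorId>.<sourceName>
    MetricRegistry.name(appId, appName, executorId, sourceName)
    // => "application_1412345678901_0001.MyApp.driver.jvm"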

core/src/main/scala/org/apache/spark/SparkContext.scala

Lines changed: 8 additions & 2 deletions
@@ -186,8 +186,6 @@ class SparkContext(config: SparkConf) extends Logging {
 
   val master = conf.get("spark.master")
   val appName = conf.get("spark.app.name")
-  val uniqueAppName = appName + "-" + System.currentTimeMillis()
-  conf.set("spark.unique.app.name", uniqueAppName)
 
   // Generate the random name for a temp folder in Tachyon
   // Add a timestamp as the suffix here to make it more safe
@@ -312,6 +310,14 @@ class SparkContext(config: SparkConf) extends Logging {
   // constructor
   taskScheduler.start()
 
+  val appId = taskScheduler.applicationId().getOrElse(System.currentTimeMillis().toString)
+  conf.set("spark.app.id", appId)
+
+  val metricsSystem = env.metricsSystem
+  metricsSystem.registerSources()
+  metricsSystem.registerSinks()
+  metricsSystem.start()
+
   private[spark] val cleaner: Option[ContextCleaner] = {
     if (conf.getBoolean("spark.cleaner.referenceTracking", true)) {
       Some(new ContextCleaner(this))
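
Why the move matters: registerSources() and registerSinks() can no longer run in the MetricsSystem constructor, because registry names now embed spark.app.id, which is only known once taskScheduler.start() has run and applicationId() can be queried. A sketch of the resulting driver-side ordering (method names as in the diff; the comments are mine):

    // Inside SparkContext:
    taskScheduler.start()

    // The scheduler reports the cluster manager's application ID; fall back
    // to a timestamp when no backend-provided ID exists.
    val appId = taskScheduler.applicationId().getOrElse(System.currentTimeMillis().toString)
    conf.set("spark.app.id", appId)

    // Register only after spark.app.id is set, so every source name
    // picks up the application-ID prefix.
    val metricsSystem = env.metricsSystem
    metricsSystem.registerSources()
    metricsSystem.registerSinks()
    metricsSystem.start()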

core/src/main/scala/org/apache/spark/SparkEnv.scala

Lines changed: 6 additions & 2 deletions
@@ -261,9 +261,13 @@ object SparkEnv extends Logging {
     val metricsSystem = if (isDriver) {
       MetricsSystem.createMetricsSystem("driver", conf, securityManager)
     } else {
-      MetricsSystem.createMetricsSystem("executor", conf, securityManager)
+      val ms = MetricsSystem.createMetricsSystem("executor", conf, securityManager)
+      ms.registerSources()
+      ms.registerSinks()
+      ms.start()
+      ms
     }
-    metricsSystem.start()
+
 
     // Set the sparkFiles directory, used when downloading dependencies. In local mode,
     // this is a temporary directory; in distributed mode, this is the executor's current working
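
Executors do not need the driver's deferral: by the time SparkEnv is created on an executor, spark.app.id has already arrived with the driver's properties (see the CoarseGrainedExecutorBackend change below), so the executor branch registers and starts its metrics system immediately, while the driver's instance is completed later in SparkContext. The split, restated as a sketch with my comments:

    // Inside SparkEnv.create:
    // Driver: create only; SparkContext registers and starts it once the
    // application ID is known. Executor: full lifecycle right away.
    val metricsSystem = if (isDriver) {
      MetricsSystem.createMetricsSystem("driver", conf, securityManager)
    } else {
      val ms = MetricsSystem.createMetricsSystem("executor", conf, securityManager)
      ms.registerSources(); ms.registerSinks(); ms.start()
      ms
    }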

core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala

Lines changed: 9 additions & 7 deletions
@@ -106,7 +106,8 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {
       executorId: String,
       hostname: String,
       cores: Int,
-      workerUrl: Option[String]) {
+      workerUrl: Option[String],
+      appId: String) {
 
     SignalLogger.register(log)
 
@@ -122,7 +123,8 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {
     val driver = fetcher.actorSelection(driverUrl)
     val timeout = AkkaUtils.askTimeout(executorConf)
     val fut = Patterns.ask(driver, RetrieveSparkProps, timeout)
-    val props = Await.result(fut, timeout).asInstanceOf[Seq[(String, String)]]
+    val props = Await.result(fut, timeout).asInstanceOf[Seq[(String, String)]] ++
+      Seq[(String, String)](("spark.app.id", appId))
     fetcher.shutdown()
 
     // Create a new ActorSystem using driver's Spark properties to run the backend.
@@ -144,16 +146,16 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {
 
   def main(args: Array[String]) {
     args.length match {
-      case x if x < 4 =>
+      case x if x < 5 =>
         System.err.println(
           // Worker url is used in spark standalone mode to enforce fate-sharing with worker
           "Usage: CoarseGrainedExecutorBackend <driverUrl> <executorId> <hostname> " +
           "<cores> [<workerUrl>]")
         System.exit(1)
-      case 4 =>
-        run(args(0), args(1), args(2), args(3).toInt, None)
-      case x if x > 4 =>
-        run(args(0), args(1), args(2), args(3).toInt, Some(args(4)))
+      case 5 =>
+        run(args(0), args(1), args(2), args(3).toInt, None, args(4))
+      case x if x > 5 =>
+        run(args(0), args(1), args(2), args(3).toInt, Some(args(4)), args(5))
     }
   }
 }
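
The backend now takes the application ID as an extra trailing argument and forwards it to the executor's configuration by appending it to the properties fetched from the driver. One apparent loose end: the usage string still reads "<cores> [<workerUrl>]" and does not mention the new argument. A hedged sketch of the new invocation contract, with placeholder values:

    // 5 args: no worker URL, so appId is args(4).
    CoarseGrainedExecutorBackend.main(Array(
      "akka.tcp://spark@driver-host:7077/user/CoarseGrainedScheduler", // driverUrl (placeholder)
      "1",            // executorId
      "worker-host",  // hostname
      "4",            // cores
      "app-20141001120000-0001")) // appId (hypothetical)

    // 6+ args: workerUrl is args(4) and appId shifts to args(5):
    //   run(driverUrl, executorId, hostname, cores, Some(workerUrl), appId)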

core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala

Lines changed: 6 additions & 5 deletions
@@ -80,8 +80,6 @@ private[spark] class MetricsSystem private (val instance: String,
   def getServletHandlers = metricsServlet.map(_.getHandlers).getOrElse(Array())
 
   metricsConfig.initialize()
-  registerSources()
-  registerSinks()
 
   def start() {
     sinks.foreach(_.start)
@@ -96,11 +94,14 @@ private[spark] class MetricsSystem private (val instance: String,
   }
 
   def buildRegistryName(source: Source) = {
-    val appNameOpt = conf.getOption("spark.unique.app.name")
+    val appNameOpt = conf.getOption("spark.app.name")
+    val appIdOpt = conf.getOption("spark.app.id")
     val executorIdOpt = conf.getOption("spark.executor.id")
     val registryName = {
-      if (appNameOpt.isDefined && executorIdOpt.isDefined) {
-        MetricRegistry.name(appNameOpt.get, executorIdOpt.get, source.sourceName)
+      if (appNameOpt.isDefined && appIdOpt.isDefined &&
+          executorIdOpt.isDefined) {
+        MetricRegistry.name(appIdOpt.get, appNameOpt.get,
+          executorIdOpt.get, source.sourceName)
       } else {
         MetricRegistry.name(source.sourceName)
      }
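
buildRegistryName now prefixes with the stable spark.app.id rather than the timestamped spark.unique.app.name, and still falls back to the bare source name when any piece is missing. A standalone sketch of that fallback, taking plain Options instead of reading a live SparkConf:

    import com.codahale.metrics.MetricRegistry

    // Mirrors the conditional in buildRegistryName.
    def registryName(appId: Option[String], appName: Option[String],
        executorId: Option[String], sourceName: String): String =
      (appId, appName, executorId) match {
        case (Some(id), Some(name), Some(exec)) =>
          MetricRegistry.name(id, name, exec, sourceName)
        case _ =>
          MetricRegistry.name(sourceName) // no prefix if anything is missing
      }

    registryName(Some("app-123"), Some("MyApp"), Some("driver"), "jvm")
    // => "app-123.MyApp.driver.jvm"
    registryName(None, Some("MyApp"), Some("driver"), "jvm")
    // => "jvm"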

yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala

Lines changed: 2 additions & 1 deletion
@@ -47,6 +47,7 @@ class ExecutorRunnable(
     hostname: String,
     executorMemory: Int,
     executorCores: Int,
+    appAttemptId: String,
     securityMgr: SecurityManager)
   extends Runnable with ExecutorRunnableUtil with Logging {
 
@@ -83,7 +84,7 @@ class ExecutorRunnable(
     ctx.setContainerTokens(ByteBuffer.wrap(dob.getData()))
 
     val commands = prepareCommand(masterAddress, slaveId, hostname, executorMemory, executorCores,
-      localResources)
+      appAttemptId, localResources)
     logInfo("Setting up executor with commands: " + commands)
     ctx.setCommands(commands)

yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnableUtil.scala

Lines changed: 2 additions & 0 deletions
@@ -44,6 +44,7 @@ trait ExecutorRunnableUtil extends Logging {
       hostname: String,
       executorMemory: Int,
       executorCores: Int,
+      appId: String,
       localResources: HashMap[String, LocalResource]): List[String] = {
     // Extra options for the JVM
     val javaOpts = ListBuffer[String]()
@@ -112,6 +113,7 @@ trait ExecutorRunnableUtil extends Logging {
       slaveId.toString,
       hostname.toString,
       executorCores.toString,
+      appId,
       "1>", ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout",
       "2>", ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr")

yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala

Lines changed: 2 additions & 0 deletions
@@ -56,6 +56,7 @@ object AllocationType extends Enumeration {
 private[yarn] abstract class YarnAllocator(
     conf: Configuration,
     sparkConf: SparkConf,
+    appAttemptId: ApplicationAttemptId,
     args: ApplicationMasterArguments,
     preferredNodes: collection.Map[String, collection.Set[SplitInfo]],
     securityMgr: SecurityManager)
@@ -295,6 +296,7 @@ private[yarn] abstract class YarnAllocator(
         executorHostname,
         executorMemory,
         executorCores,
+        appAttemptId.getApplicationId.toString,
         securityMgr)
       launcherPool.execute(executorRunnable)
     }

yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala

Lines changed: 2 additions & 1 deletion
@@ -47,6 +47,7 @@ class ExecutorRunnable(
     hostname: String,
     executorMemory: Int,
     executorCores: Int,
+    appId: String,
     securityMgr: SecurityManager)
   extends Runnable with ExecutorRunnableUtil with Logging {
 
@@ -80,7 +81,7 @@ class ExecutorRunnable(
     ctx.setTokens(ByteBuffer.wrap(dob.getData()))
 
     val commands = prepareCommand(masterAddress, slaveId, hostname, executorMemory, executorCores,
-      localResources)
+      appId, localResources)
 
     logInfo(s"Setting up executor with environment: $env")
     logInfo("Setting up executor with commands: " + commands)

yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala

Lines changed: 1 addition & 1 deletion
@@ -41,7 +41,7 @@ private[yarn] class YarnAllocationHandler(
     args: ApplicationMasterArguments,
     preferredNodes: collection.Map[String, collection.Set[SplitInfo]],
     securityMgr: SecurityManager)
-  extends YarnAllocator(conf, sparkConf, args, preferredNodes, securityMgr) {
+  extends YarnAllocator(conf, sparkConf, appAttemptId, args, preferredNodes, securityMgr) {
 
   override protected def releaseContainer(container: Container) = {
     amClient.releaseAssignedContainer(container.getId())
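
Across the YARN files, the ID is threaded from the allocator down to the executor launch command: YarnAllocator gains the ApplicationAttemptId, passes appAttemptId.getApplicationId.toString to ExecutorRunnable, and ExecutorRunnableUtil.prepareCommand appends it to the CoarseGrainedExecutorBackend argument list right after the core count. A sketch of the command tail with hypothetical placeholder values (getApplicationId is part of YARN's ApplicationAttemptId API):

    // Placeholder values; the real ones come from the ExecutorRunnable
    // constructor and YARN's ApplicationConstants.
    val masterAddress = "akka.tcp://spark@driver-host:7077/user/CoarseGrainedScheduler"
    val slaveId = "1"
    val hostname = "nm-host"
    val executorCores = 4
    val appId = "application_1412345678901_0001" // appAttemptId.getApplicationId.toString
    val logDir = "<LOG_DIR>" // stands in for ApplicationConstants.LOG_DIR_EXPANSION_VAR

    // Tail of the list built in prepareCommand; appId is consumed as
    // args(4) (or args(5) when a worker URL is present) by
    // CoarseGrainedExecutorBackend.main.
    val commandTail = List(
      masterAddress,
      slaveId,
      hostname,
      executorCores.toString,
      appId,
      "1>", logDir + "/stdout",
      "2>", logDir + "/stderr")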
