diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 8052499ab752..f44e5029a8b9 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -185,6 +185,8 @@ class SparkContext(config: SparkConf) extends Logging {
 
   val master = conf.get("spark.master")
   val appName = conf.get("spark.app.name")
+  val appUniqueName = Utils.generateUniqueName(appName.replaceAll("[ :/]", "-").toLowerCase)
+  conf.set("spark.app.uniqueName", appUniqueName)
 
   // Generate the random name for a temp folder in Tachyon
   // Add a timestamp as the suffix here to make it more safe
@@ -247,7 +249,7 @@ class SparkContext(config: SparkConf) extends Logging {
   // Optionally log Spark events
   private[spark] val eventLogger: Option[EventLoggingListener] = {
     if (conf.getBoolean("spark.eventLog.enabled", false)) {
-      val logger = new EventLoggingListener(appName, conf, hadoopConfiguration)
+      val logger = new EventLoggingListener(appUniqueName, conf, hadoopConfiguration)
       logger.start()
       listenerBus.addListener(logger)
       Some(logger)
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
index 21f8667819c4..e4e879ec1053 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -54,6 +54,8 @@ private[spark] class Master(
   import context.dispatcher   // to use Akka's scheduler.schedule()
 
   val conf = new SparkConf
+  conf.set("spark.app.name", "Master")
+  conf.set("spark.app.uniqueName", Utils.generateUniqueName(conf.get("spark.app.name")))
 
   def createDateFormat = new SimpleDateFormat("yyyyMMddHHmmss")  // For application IDs
   val WORKER_TIMEOUT = conf.getLong("spark.worker.timeout", 60) * 1000
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
index ce425443051b..f589e940144d 100755
--- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
@@ -57,6 +57,9 @@ private[spark] class Worker(
   Utils.checkHost(host, "Expected hostname")
   assert (port > 0)
 
+  conf.set("spark.app.name", "Worker")
+  conf.set("spark.app.uniqueName", Utils.generateUniqueName(conf.get("spark.app.name")))
+
   def createDateFormat = new SimpleDateFormat("yyyyMMddHHmmss")  // For worker and executor IDs
 
   // Send a heartbeat every (heartbeat timeout) / 4 milliseconds
diff --git a/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala b/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala
index 651511da1b7f..3531ade64e72 100644
--- a/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala
+++ b/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala
@@ -132,8 +132,9 @@ private[spark] class MetricsSystem private (val instance: String,
       if (null != classPath) {
         try {
           val sink = Class.forName(classPath)
-            .getConstructor(classOf[Properties], classOf[MetricRegistry], classOf[SecurityManager])
-            .newInstance(kv._2, registry, securityMgr)
+            .getConstructor(classOf[Properties], classOf[MetricRegistry],
+              classOf[SecurityManager], classOf[SparkConf])
+            .newInstance(kv._2, registry, securityMgr, conf)
           if (kv._1 == "servlet") {
             metricsServlet = Some(sink.asInstanceOf[MetricsServlet])
           } else {
diff --git a/core/src/main/scala/org/apache/spark/metrics/sink/ConsoleSink.scala b/core/src/main/scala/org/apache/spark/metrics/sink/ConsoleSink.scala
index 05852f1f9899..d37ffbf29540 100644
--- a/core/src/main/scala/org/apache/spark/metrics/sink/ConsoleSink.scala
+++ b/core/src/main/scala/org/apache/spark/metrics/sink/ConsoleSink.scala
@@ -22,11 +22,11 @@ import java.util.concurrent.TimeUnit
 
 import com.codahale.metrics.{ConsoleReporter, MetricRegistry}
 
-import org.apache.spark.SecurityManager
+import org.apache.spark.{SecurityManager, SparkConf}
 import org.apache.spark.metrics.MetricsSystem
 
 private[spark] class ConsoleSink(val property: Properties, val registry: MetricRegistry,
-    securityMgr: SecurityManager) extends Sink {
+    securityMgr: SecurityManager, conf: SparkConf) extends Sink {
   val CONSOLE_DEFAULT_PERIOD = 10
   val CONSOLE_DEFAULT_UNIT = "SECONDS"
 
diff --git a/core/src/main/scala/org/apache/spark/metrics/sink/CsvSink.scala b/core/src/main/scala/org/apache/spark/metrics/sink/CsvSink.scala
index 542dce65366b..f22d0a881a07 100644
--- a/core/src/main/scala/org/apache/spark/metrics/sink/CsvSink.scala
+++ b/core/src/main/scala/org/apache/spark/metrics/sink/CsvSink.scala
@@ -23,11 +23,11 @@ import java.util.concurrent.TimeUnit
 
 import com.codahale.metrics.{CsvReporter, MetricRegistry}
 
-import org.apache.spark.SecurityManager
+import org.apache.spark.{Logging, SecurityManager, SparkConf}
 import org.apache.spark.metrics.MetricsSystem
 
 private[spark] class CsvSink(val property: Properties, val registry: MetricRegistry,
-    securityMgr: SecurityManager) extends Sink {
+    securityMgr: SecurityManager, conf: SparkConf) extends Sink with Logging {
   val CSV_KEY_PERIOD = "period"
   val CSV_KEY_UNIT = "unit"
   val CSV_KEY_DIR = "directory"
@@ -48,16 +48,18 @@ private[spark] class CsvSink(val property: Properties, val registry: MetricRegistry,
 
   MetricsSystem.checkMinimalPollingPeriod(pollUnit, pollPeriod)
 
-  val pollDir = Option(property.getProperty(CSV_KEY_DIR)) match {
-    case Some(s) => s
-    case None => CSV_DEFAULT_DIR
-  }
+  val pollDir = Option(property.getProperty(CSV_KEY_DIR)).getOrElse(CSV_DEFAULT_DIR)
+
+  val file = new File(pollDir, conf.get("spark.app.uniqueName"))
+  file.mkdirs()
+
+  logInfo("Dumping csv metrics to " + file.getAbsolutePath)
 
   val reporter: CsvReporter = CsvReporter.forRegistry(registry)
       .formatFor(Locale.US)
       .convertDurationsTo(TimeUnit.MILLISECONDS)
       .convertRatesTo(TimeUnit.SECONDS)
-      .build(new File(pollDir))
+      .build(file)
 
   override def start() {
     reporter.start(pollPeriod, pollUnit)
diff --git a/core/src/main/scala/org/apache/spark/metrics/sink/GraphiteSink.scala b/core/src/main/scala/org/apache/spark/metrics/sink/GraphiteSink.scala
index aeb4ad44a064..a8c2df77077a 100644
--- a/core/src/main/scala/org/apache/spark/metrics/sink/GraphiteSink.scala
+++ b/core/src/main/scala/org/apache/spark/metrics/sink/GraphiteSink.scala
@@ -24,11 +24,11 @@ import java.util.concurrent.TimeUnit
 import com.codahale.metrics.MetricRegistry
 import com.codahale.metrics.graphite.{Graphite, GraphiteReporter}
 
-import org.apache.spark.SecurityManager
+import org.apache.spark.{SecurityManager, SparkConf}
 import org.apache.spark.metrics.MetricsSystem
 
 private[spark] class GraphiteSink(val property: Properties, val registry: MetricRegistry,
-    securityMgr: SecurityManager) extends Sink {
+    securityMgr: SecurityManager, conf: SparkConf) extends Sink {
   val GRAPHITE_DEFAULT_PERIOD = 10
   val GRAPHITE_DEFAULT_UNIT = "SECONDS"
   val GRAPHITE_DEFAULT_PREFIX = ""
diff --git a/core/src/main/scala/org/apache/spark/metrics/sink/JmxSink.scala b/core/src/main/scala/org/apache/spark/metrics/sink/JmxSink.scala
index ed27234b4e76..059cf2ad9df9 100644
--- a/core/src/main/scala/org/apache/spark/metrics/sink/JmxSink.scala
+++ b/core/src/main/scala/org/apache/spark/metrics/sink/JmxSink.scala
@@ -20,10 +20,11 @@ package org.apache.spark.metrics.sink
 
 import java.util.Properties
 
 import com.codahale.metrics.{JmxReporter, MetricRegistry}
-import org.apache.spark.SecurityManager
+
+import org.apache.spark.{SecurityManager, SparkConf}
 
 private[spark] class JmxSink(val property: Properties, val registry: MetricRegistry,
-    securityMgr: SecurityManager) extends Sink {
+    securityMgr: SecurityManager, conf: SparkConf) extends Sink {
 
   val reporter: JmxReporter = JmxReporter.forRegistry(registry).build()
diff --git a/core/src/main/scala/org/apache/spark/metrics/sink/MetricsServlet.scala b/core/src/main/scala/org/apache/spark/metrics/sink/MetricsServlet.scala
index 571539ba5e46..2464454cfaf4 100644
--- a/core/src/main/scala/org/apache/spark/metrics/sink/MetricsServlet.scala
+++ b/core/src/main/scala/org/apache/spark/metrics/sink/MetricsServlet.scala
@@ -27,11 +27,11 @@ import com.codahale.metrics.json.MetricsModule
 import com.fasterxml.jackson.databind.ObjectMapper
 import org.eclipse.jetty.servlet.ServletContextHandler
 
-import org.apache.spark.SecurityManager
+import org.apache.spark.{SecurityManager, SparkConf}
 import org.apache.spark.ui.JettyUtils._
 
 private[spark] class MetricsServlet(val property: Properties, val registry: MetricRegistry,
-    securityMgr: SecurityManager) extends Sink {
+    securityMgr: SecurityManager, conf: SparkConf) extends Sink {
   val SERVLET_KEY_PATH = "path"
   val SERVLET_KEY_SAMPLE = "sample"
 
diff --git a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
index ae6ca9f4e7bf..29068cfec42f 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
@@ -42,7 +42,7 @@ import org.apache.spark.util.{FileLogger, JsonProtocol}
  *   spark.eventLog.buffer.kb - Buffer size to use when writing to output streams
  */
 private[spark] class EventLoggingListener(
-    appName: String,
+    appUniqueName: String,
     sparkConf: SparkConf,
     hadoopConf: Configuration = SparkHadoopUtil.get.newConfiguration())
   extends SparkListener with Logging {
@@ -54,7 +54,7 @@ private[spark] class EventLoggingListener(
   private val testing = sparkConf.getBoolean("spark.eventLog.testing", false)
   private val outputBufferSize = sparkConf.getInt("spark.eventLog.buffer.kb", 100) * 1024
   private val logBaseDir = sparkConf.get("spark.eventLog.dir", DEFAULT_LOG_DIR).stripSuffix("/")
-  private val name = appName.replaceAll("[ :/]", "-").toLowerCase + "-" + System.currentTimeMillis
+  private val name = appUniqueName
   val logDir = logBaseDir + "/" + name
 
   protected val logger = new FileLogger(logDir, sparkConf, hadoopConf, outputBufferSize,
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 5784e974fbb6..95ede8708905 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -1306,4 +1306,9 @@ private[spark] object Utils extends Logging {
     s"$className: $desc\n$st"
   }
 
+  /** Generate a unique name from the input string by appending a unique identifier. */
+  def generateUniqueName(name: String): String = {
+    name + "-" + SystemClock.getTime()
+  }
+
 }
diff --git a/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala
index 21e3db34b8b7..510e17579b6e 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala
@@ -166,7 +166,7 @@ class EventLoggingListenerSuite extends FunSuite with BeforeAndAfter {
     // Verify logging directory exists
     val conf = getLoggingConf(logDirPath, compressionCodec)
-    val eventLogger = new EventLoggingListener("test", conf)
+    val eventLogger = new EventLoggingListener(Utils.generateUniqueName("test"), conf)
     eventLogger.start()
 
     val logPath = new Path(eventLogger.logDir)
     assert(fileSystem.exists(logPath))
@@ -206,7 +206,7 @@ class EventLoggingListenerSuite extends FunSuite with BeforeAndAfter {
 
     // Verify that all information is correctly parsed before stop()
     val conf = getLoggingConf(logDirPath, compressionCodec)
-    val eventLogger = new EventLoggingListener("test", conf)
+    val eventLogger = new EventLoggingListener(Utils.generateUniqueName("test"), conf)
     eventLogger.start()
     var eventLoggingInfo = EventLoggingListener.parseLoggingInfo(eventLogger.logDir, fileSystem)
     assertInfoCorrect(eventLoggingInfo, loggerStopped = false)
@@ -225,7 +225,7 @@ class EventLoggingListenerSuite extends FunSuite with BeforeAndAfter {
    */
   private def testEventLogging(compressionCodec: Option[String] = None) {
     val conf = getLoggingConf(logDirPath, compressionCodec)
-    val eventLogger = new EventLoggingListener("test", conf)
+    val eventLogger = new EventLoggingListener(Utils.generateUniqueName("test"), conf)
     val listenerBus = new LiveListenerBus
     val applicationStart = SparkListenerApplicationStart("Greatest App (N)ever", 125L, "Mickey")
     val applicationEnd = SparkListenerApplicationEnd(1000L)
diff --git a/extras/spark-ganglia-lgpl/src/main/scala/org/apache/spark/metrics/sink/GangliaSink.scala b/extras/spark-ganglia-lgpl/src/main/scala/org/apache/spark/metrics/sink/GangliaSink.scala
index d03d7774e8c8..b57ee9fa7bb8 100644
--- a/extras/spark-ganglia-lgpl/src/main/scala/org/apache/spark/metrics/sink/GangliaSink.scala
+++ b/extras/spark-ganglia-lgpl/src/main/scala/org/apache/spark/metrics/sink/GangliaSink.scala
@@ -25,11 +25,11 @@ import com.codahale.metrics.ganglia.GangliaReporter
 import info.ganglia.gmetric4j.gmetric.GMetric
 import info.ganglia.gmetric4j.gmetric.GMetric.UDPAddressingMode
 
-import org.apache.spark.SecurityManager
+import org.apache.spark.{SecurityManager, SparkConf}
 import org.apache.spark.metrics.MetricsSystem
 
 class GangliaSink(val property: Properties, val registry: MetricRegistry,
-    securityMgr: SecurityManager) extends Sink {
+    securityMgr: SecurityManager, conf: SparkConf) extends Sink {
   val GANGLIA_KEY_PERIOD = "period"
   val GANGLIA_DEFAULT_PERIOD = 10
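
Note for sink implementors: MetricsSystem instantiates every sink listed in metrics.properties reflectively, and after this change it looks up the four-argument constructor (Properties, MetricRegistry, SecurityManager, SparkConf). Any sink maintained outside this patch must add the SparkConf parameter, or getConstructor will throw NoSuchMethodException at startup. A minimal sketch of a conforming sink follows; the EchoSink class and its body are illustrative only, not part of the patch:

package org.apache.spark.metrics.sink

import java.util.Properties

import com.codahale.metrics.MetricRegistry

import org.apache.spark.{SecurityManager, SparkConf}

// Hypothetical sink: only the constructor signature matters to
// MetricsSystem's reflective lookup. Like GangliaSink, it lives in
// org.apache.spark.metrics.sink so it can extend the Sink trait.
class EchoSink(val property: Properties, val registry: MetricRegistry,
    securityMgr: SecurityManager, conf: SparkConf) extends Sink {

  override def start() {
    // The new conf handle exposes per-application values to the sink,
    // such as the unique name set by SparkContext, Master, or Worker.
    println("Metrics started for " + conf.get("spark.app.uniqueName", "unknown"))
  }

  override def stop() { }
}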
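
The unique name is built in two steps: the caller sanitizes the raw app name (SparkContext replaces spaces, colons, and slashes with dashes, then lowercases) and Utils.generateUniqueName appends a clock value. A rough sketch of the intended effect, usable only inside Spark since Utils is private[spark]; the app name and timestamp are made up:

// "Log Processor: v2" sanitized as in SparkContext:
val appName = "Log Processor: v2"
val sanitized = appName.replaceAll("[ :/]", "-").toLowerCase
// sanitized == "log-processor--v2"

val appUniqueName = Utils.generateUniqueName(sanitized)
// e.g. "log-processor--v2-1402592400000": two submissions of the same
// application now get distinct names instead of colliding.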
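
With spark.app.uniqueName threaded through, the event log directory and the CSV metrics directory are keyed by the same name. A sketch of how the paths compose under assumed default base directories; the literal values are illustrative only:

// Assumed values for illustration.
val logBaseDir = "/tmp/spark-events"    // spark.eventLog.dir
val csvBaseDir = "/tmp"                 // CsvSink's CSV_DEFAULT_DIR
val appUniqueName = "log-processor--v2-1402592400000"

// EventLoggingListener: the unique name is used verbatim, so no second
// timestamp is appended to the directory name anymore.
val eventLogDir = logBaseDir + "/" + appUniqueName

// CsvSink: metrics land in a per-application subdirectory, created
// eagerly with mkdirs() before the reporter starts.
val csvMetricsDir = new java.io.File(csvBaseDir, appUniqueName)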