4 changes: 3 additions & 1 deletion core/src/main/scala/org/apache/spark/SparkContext.scala

@@ -185,6 +185,8 @@ class SparkContext(config: SparkConf) extends Logging {

   val master = conf.get("spark.master")
   val appName = conf.get("spark.app.name")
+  val appUniqueName = Utils.generateUniqueName(appName.replaceAll("[ :/]", "-").toLowerCase)
+  conf.set("spark.app.uniqueName", appUniqueName)

   // Generate the random name for a temp folder in Tachyon
   // Add a timestamp as the suffix here to make it more safe
@@ -247,7 +249,7 @@
   // Optionally log Spark events
   private[spark] val eventLogger: Option[EventLoggingListener] = {
     if (conf.getBoolean("spark.eventLog.enabled", false)) {
-      val logger = new EventLoggingListener(appName, conf, hadoopConfiguration)
+      val logger = new EventLoggingListener(appUniqueName, conf, hadoopConfiguration)
       logger.start()
       listenerBus.addListener(logger)
       Some(logger)
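
For reference, a rough sketch of the unique name this produces for a user-supplied app name (assuming SystemClock.getTime() in the new Utils.generateUniqueName returns a millisecond timestamp, like System.currentTimeMillis; the app name is a made-up example):

    val appName = "My App: Test"
    val normalized = appName.replaceAll("[ :/]", "-").toLowerCase  // "my-app--test"
    val appUniqueName = Utils.generateUniqueName(normalized)       // e.g. "my-app--test-1401234567890"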

core/src/main/scala/org/apache/spark/deploy/master/Master.scala

@@ -54,6 +54,8 @@ private[spark] class Master(
   import context.dispatcher // to use Akka's scheduler.schedule()

   val conf = new SparkConf
+  conf.set("spark.app.name", "Master")
+  conf.set("spark.app.uniqueName", Utils.generateUniqueName(conf.get("spark.app.name")))

   def createDateFormat = new SimpleDateFormat("yyyyMMddHHmmss") // For application IDs
   val WORKER_TIMEOUT = conf.getLong("spark.worker.timeout", 60) * 1000

core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala

@@ -57,6 +57,9 @@ private[spark] class Worker(
   Utils.checkHost(host, "Expected hostname")
   assert (port > 0)

+  conf.set("spark.app.name", "Worker")
+  conf.set("spark.app.uniqueName", Utils.generateUniqueName(conf.get("spark.app.name")))
+
   def createDateFormat = new SimpleDateFormat("yyyyMMddHHmmss") // For worker and executor IDs

   // Send a heartbeat every (heartbeat timeout) / 4 milliseconds
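
Master and Worker never construct a SparkContext, so they seed spark.app.name and spark.app.uniqueName themselves before their MetricsSystem starts. Note that, unlike SparkContext, the name is not sanitized or lowercased here; a minimal sketch of the resulting value (timestamp assumed):

    val conf = new SparkConf
    conf.set("spark.app.name", "Worker")
    conf.set("spark.app.uniqueName", Utils.generateUniqueName(conf.get("spark.app.name")))
    conf.get("spark.app.uniqueName")  // e.g. "Worker-1401234567890"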

core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala

@@ -132,8 +132,9 @@ private[spark] class MetricsSystem private (val instance: String,
       if (null != classPath) {
         try {
           val sink = Class.forName(classPath)
-            .getConstructor(classOf[Properties], classOf[MetricRegistry], classOf[SecurityManager])
-            .newInstance(kv._2, registry, securityMgr)
+            .getConstructor(classOf[Properties], classOf[MetricRegistry],
+              classOf[SecurityManager], classOf[SparkConf])
+            .newInstance(kv._2, registry, securityMgr, conf)
           if (kv._1 == "servlet") {
             metricsServlet = Some(sink.asInstanceOf[MetricsServlet])
           } else {
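
Because sinks are instantiated reflectively, this changes the constructor contract for every sink named in the metrics configuration, including third-party ones: a sink that still exposes only the three-argument constructor will now fail the getConstructor lookup with a NoSuchMethodException. A minimal sketch of a custom sink under the new contract (hypothetical class, and assuming the Sink trait requires only start() and stop()):

    package org.apache.spark.metrics.sink

    import java.util.Properties

    import com.codahale.metrics.MetricRegistry

    import org.apache.spark.{SecurityManager, SparkConf}

    // The SparkConf parameter is now required even if the sink ignores it.
    private[spark] class NoOpSink(val property: Properties, val registry: MetricRegistry,
        securityMgr: SecurityManager, conf: SparkConf) extends Sink {
      override def start() { }
      override def stop() { }
    }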

core/src/main/scala/org/apache/spark/metrics/sink/ConsoleSink.scala

@@ -22,11 +22,11 @@ import java.util.concurrent.TimeUnit

 import com.codahale.metrics.{ConsoleReporter, MetricRegistry}

-import org.apache.spark.SecurityManager
+import org.apache.spark.{SecurityManager, SparkConf}
 import org.apache.spark.metrics.MetricsSystem

 private[spark] class ConsoleSink(val property: Properties, val registry: MetricRegistry,
-    securityMgr: SecurityManager) extends Sink {
+    securityMgr: SecurityManager, conf: SparkConf) extends Sink {
   val CONSOLE_DEFAULT_PERIOD = 10
   val CONSOLE_DEFAULT_UNIT = "SECONDS"


17 changes: 10 additions & 7 deletions core/src/main/scala/org/apache/spark/metrics/sink/CsvSink.scala

@@ -23,11 +23,11 @@ import java.util.concurrent.TimeUnit

 import com.codahale.metrics.{CsvReporter, MetricRegistry}

-import org.apache.spark.SecurityManager
+import org.apache.spark.{Logging, SecurityManager, SparkConf}
 import org.apache.spark.metrics.MetricsSystem

 private[spark] class CsvSink(val property: Properties, val registry: MetricRegistry,
-    securityMgr: SecurityManager) extends Sink {
+    securityMgr: SecurityManager, conf: SparkConf) extends Sink with Logging {
   val CSV_KEY_PERIOD = "period"
   val CSV_KEY_UNIT = "unit"
   val CSV_KEY_DIR = "directory"
@@ -48,16 +48,19 @@ private[spark] class CsvSink(val property: Properties, val registry: MetricRegistry,

   MetricsSystem.checkMinimalPollingPeriod(pollUnit, pollPeriod)

-  val pollDir = Option(property.getProperty(CSV_KEY_DIR)) match {
-    case Some(s) => s
-    case None => CSV_DEFAULT_DIR
-  }
+  val pollDir = Option(property.getProperty(CSV_KEY_DIR)).getOrElse(CSV_DEFAULT_DIR) +
+    conf.get("spark.app.uniqueName")
+
+  val file = new File(pollDir)
+  file.mkdirs
+
+  logInfo("Dumping csv metrics to " + pollDir)

   val reporter: CsvReporter = CsvReporter.forRegistry(registry)
     .formatFor(Locale.US)
     .convertDurationsTo(TimeUnit.MILLISECONDS)
     .convertRatesTo(TimeUnit.SECONDS)
-    .build(new File(pollDir))
+    .build(file)

   override def start() {
     reporter.start(pollPeriod, pollUnit)
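
Each application run now gets its own CSV directory. A sketch of the resulting path, assuming CSV_DEFAULT_DIR is "/tmp/" (the unique name is appended by plain string concatenation, so a user-supplied directory needs a trailing separator to avoid producing a sibling path instead of a subdirectory):

    val defaultDir = "/tmp/"                       // CSV_DEFAULT_DIR (assumed)
    val uniqueName = "my-app--test-1401234567890"  // spark.app.uniqueName (example)
    val pollDir = defaultDir + uniqueName          // "/tmp/my-app--test-1401234567890"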

core/src/main/scala/org/apache/spark/metrics/sink/GraphiteSink.scala

@@ -24,11 +24,11 @@ import java.util.concurrent.TimeUnit
 import com.codahale.metrics.MetricRegistry
 import com.codahale.metrics.graphite.{Graphite, GraphiteReporter}

-import org.apache.spark.SecurityManager
+import org.apache.spark.{SecurityManager, SparkConf}
 import org.apache.spark.metrics.MetricsSystem

 private[spark] class GraphiteSink(val property: Properties, val registry: MetricRegistry,
-    securityMgr: SecurityManager) extends Sink {
+    securityMgr: SecurityManager, conf: SparkConf) extends Sink {
   val GRAPHITE_DEFAULT_PERIOD = 10
   val GRAPHITE_DEFAULT_UNIT = "SECONDS"
   val GRAPHITE_DEFAULT_PREFIX = ""

core/src/main/scala/org/apache/spark/metrics/sink/JmxSink.scala

@@ -20,10 +20,11 @@ package org.apache.spark.metrics.sink
 import java.util.Properties

 import com.codahale.metrics.{JmxReporter, MetricRegistry}
-import org.apache.spark.SecurityManager
+
+import org.apache.spark.{SecurityManager, SparkConf}

 private[spark] class JmxSink(val property: Properties, val registry: MetricRegistry,
-    securityMgr: SecurityManager) extends Sink {
+    securityMgr: SecurityManager, conf: SparkConf) extends Sink {

   val reporter: JmxReporter = JmxReporter.forRegistry(registry).build()


core/src/main/scala/org/apache/spark/metrics/sink/MetricsServlet.scala

@@ -27,11 +27,11 @@ import com.codahale.metrics.json.MetricsModule
 import com.fasterxml.jackson.databind.ObjectMapper
 import org.eclipse.jetty.servlet.ServletContextHandler

-import org.apache.spark.SecurityManager
+import org.apache.spark.{SecurityManager, SparkConf}
 import org.apache.spark.ui.JettyUtils._

 private[spark] class MetricsServlet(val property: Properties, val registry: MetricRegistry,
-    securityMgr: SecurityManager) extends Sink {
+    securityMgr: SecurityManager, conf: SparkConf) extends Sink {
   val SERVLET_KEY_PATH = "path"
   val SERVLET_KEY_SAMPLE = "sample"


core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala

@@ -42,7 +42,7 @@ import org.apache.spark.util.{FileLogger, JsonProtocol}
  * spark.eventLog.buffer.kb - Buffer size to use when writing to output streams
  */
 private[spark] class EventLoggingListener(
-    appName: String,
+    appUniqueName: String,
     sparkConf: SparkConf,
     hadoopConf: Configuration = SparkHadoopUtil.get.newConfiguration())
   extends SparkListener with Logging {
@@ -54,7 +54,7 @@ private[spark] class EventLoggingListener(
   private val testing = sparkConf.getBoolean("spark.eventLog.testing", false)
   private val outputBufferSize = sparkConf.getInt("spark.eventLog.buffer.kb", 100) * 1024
   private val logBaseDir = sparkConf.get("spark.eventLog.dir", DEFAULT_LOG_DIR).stripSuffix("/")
-  private val name = appName.replaceAll("[ :/]", "-").toLowerCase + "-" + System.currentTimeMillis
+  private val name = appUniqueName
   val logDir = logBaseDir + "/" + name

   protected val logger = new FileLogger(logDir, sparkConf, hadoopConf, outputBufferSize,
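
The listener no longer normalizes or timestamps the name itself; uniqueness is now the caller's responsibility. A sketch of the resulting layout (base directory assumed left at its default):

    val logBaseDir = "/tmp/spark-events"     // spark.eventLog.dir (assumed default)
    val name = "my-app--test-1401234567890"  // appUniqueName passed in by SparkContext
    val logDir = logBaseDir + "/" + name     // "/tmp/spark-events/my-app--test-1401234567890"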

5 changes: 5 additions & 0 deletions core/src/main/scala/org/apache/spark/util/Utils.scala

@@ -1306,4 +1306,9 @@ private[spark] object Utils extends Logging {
     s"$className: $desc\n$st"
   }

+  /** Generate a unique name from the input string by appending a unique identifier. */
+  def generateUniqueName(name: String): String = {
+    name + "-" + SystemClock.getTime()
+  }
+
 }
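
A minimal usage sketch (assuming SystemClock.getTime() returns the current time in milliseconds, as System.currentTimeMillis does, which means two calls within the same millisecond would still collide):

    Utils.generateUniqueName("my-app")  // e.g. "my-app-1401234567890"
    Utils.generateUniqueName("Master")  // e.g. "Master-1401234567891"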

core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala

@@ -166,7 +166,7 @@ class EventLoggingListenerSuite extends FunSuite with BeforeAndAfter {

     // Verify logging directory exists
     val conf = getLoggingConf(logDirPath, compressionCodec)
-    val eventLogger = new EventLoggingListener("test", conf)
+    val eventLogger = new EventLoggingListener(Utils.generateUniqueName("test"), conf)
     eventLogger.start()
     val logPath = new Path(eventLogger.logDir)
     assert(fileSystem.exists(logPath))
@@ -206,7 +206,7 @@ class EventLoggingListenerSuite extends FunSuite with BeforeAndAfter {

     // Verify that all information is correctly parsed before stop()
     val conf = getLoggingConf(logDirPath, compressionCodec)
-    val eventLogger = new EventLoggingListener("test", conf)
+    val eventLogger = new EventLoggingListener(Utils.generateUniqueName("test"), conf)
     eventLogger.start()
     var eventLoggingInfo = EventLoggingListener.parseLoggingInfo(eventLogger.logDir, fileSystem)
     assertInfoCorrect(eventLoggingInfo, loggerStopped = false)
@@ -225,7 +225,7 @@ class EventLoggingListenerSuite extends FunSuite with BeforeAndAfter {
    */
   private def testEventLogging(compressionCodec: Option[String] = None) {
     val conf = getLoggingConf(logDirPath, compressionCodec)
-    val eventLogger = new EventLoggingListener("test", conf)
+    val eventLogger = new EventLoggingListener(Utils.generateUniqueName("test"), conf)
     val listenerBus = new LiveListenerBus
     val applicationStart = SparkListenerApplicationStart("Greatest App (N)ever", 125L, "Mickey")
     val applicationEnd = SparkListenerApplicationEnd(1000L)

extras/spark-ganglia-lgpl/src/main/scala/org/apache/spark/metrics/sink/GangliaSink.scala

@@ -25,11 +25,11 @@ import com.codahale.metrics.ganglia.GangliaReporter
 import info.ganglia.gmetric4j.gmetric.GMetric
 import info.ganglia.gmetric4j.gmetric.GMetric.UDPAddressingMode

-import org.apache.spark.SecurityManager
+import org.apache.spark.{SecurityManager, SparkConf}
 import org.apache.spark.metrics.MetricsSystem

 class GangliaSink(val property: Properties, val registry: MetricRegistry,
-    securityMgr: SecurityManager) extends Sink {
+    securityMgr: SecurityManager, conf: SparkConf) extends Sink {
   val GANGLIA_KEY_PERIOD = "period"
   val GANGLIA_DEFAULT_PERIOD = 10
