Closed

Changes from all commits (27 commits)
ed10348
Expose application id to spark context.
Jun 14, 2014
2fb7de4
Expose the application ID in the ApplicationStart event.
Jun 14, 2014
b3f3664
[yarn] Make the RM link point to the app directly in the HS.
Jun 14, 2014
26b266e
Use Mesos framework ID as Spark application ID.
Jun 16, 2014
abc4697
Make FsHistoryProvider keep a map of applications by id.
Jun 17, 2014
0afd696
Wait until master responds before returning from start().
Jun 25, 2014
36dc362
Don't use Iterator::takeWhile().
Jun 30, 2014
d35d86f
Fix yarn backend after rebase.
Jul 16, 2014
311e49d
Merge branch 'master' into yarn-hs-link-2
Jul 29, 2014
57517b8
Review feedback. Mostly, more consistent use of Scala's Option.
Jul 29, 2014
4e3483f
Fix test.
Jul 29, 2014
c6d7478
Merge branch 'master' into yarn-hs-link-2
Aug 11, 2014
b022bae
Undo SparkContext cleanup.
Aug 11, 2014
21aa71b
Fix JSON test.
Aug 12, 2014
3f8ec66
Review feedback.
Aug 13, 2014
c90a08d
Remove unused code.
Aug 14, 2014
f0ba90f
Use BufferedIterator.
Aug 14, 2014
25f2826
Add MIMA excludes.
Aug 14, 2014
cb0cab2
Merge branch 'master' into yarn-hs-link-2
Aug 14, 2014
4b10cfd
Merge branch 'master' into yarn-hs-link-2
Aug 19, 2014
a0056e6
Unbreak test.
Aug 19, 2014
a86bbcf
Merge branch 'master' into yarn-hs-link-2
Aug 20, 2014
8278316
Merge branch 'master' into yarn-hs-link-2
Aug 20, 2014
44112a8
Merge branch 'master' into yarn-hs-link-2
Aug 27, 2014
56fe42e
Fix cluster mode history address, plus a cleanup.
Aug 27, 2014
6706d3a
Implement applicationId() in base classes.
Sep 2, 2014
2d19f3c
Review feedback.
Sep 2, 2014
core/src/main/scala/org/apache/spark/SparkContext.scala (5 changes: 4 additions & 1 deletion)
@@ -1279,7 +1279,10 @@ class SparkContext(config: SparkConf) extends Logging {

/** Post the application start event */
private def postApplicationStart() {
listenerBus.post(SparkListenerApplicationStart(appName, startTime, sparkUser))
// Note: this code assumes that the task scheduler has been initialized and has contacted
// the cluster manager to get an application ID (in case the cluster manager provides one).
listenerBus.post(SparkListenerApplicationStart(appName, taskScheduler.applicationId(),
Contributor: Have you verified the initialization order? This imposes a new ordering constraint where the taskScheduler must be initialized before posting the application start event; otherwise we throw an NPE. It would be good to at least add a comment to document this.

Contributor Author: I'll add a note. I did verify the order, and that's why I had to add some synchronization for the standalone case, since it was returning "too early" for this code to work. See: vanzin@0afd696

startTime, sparkUser))
}
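
The ordering constraint discussed above is easiest to see in a sketch. This is not the PR's actual standalone backend code; HypotheticalBackend, the latch field, and the 30-second timeout are all illustrative:

```scala
import java.util.concurrent.{CountDownLatch, TimeUnit}

// Sketch of "wait until the master responds before returning from start()"
// (commit 0afd696). All names below are made up for illustration.
class HypotheticalBackend {
  private val registered = new CountDownLatch(1)
  @volatile private var appId: Option[String] = None

  def start(): Unit = {
    // ... send the registration request to the cluster manager here ...
    // Block until the master's reply arrives, so callers such as
    // postApplicationStart() can rely on applicationId() being populated.
    if (!registered.await(30, TimeUnit.SECONDS)) {
      throw new IllegalStateException("Cluster manager did not respond in time")
    }
  }

  // Invoked from the callback that handles the master's "registered" reply.
  def onRegistered(id: String): Unit = {
    appId = Some(id)
    registered.countDown()
  }

  def applicationId(): Option[String] = appId
}
```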

/** Post the application end event */
core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala
@@ -34,15 +34,15 @@ private[spark] abstract class ApplicationHistoryProvider {
*
* @return List of all known applications.
*/
def getListing(): Seq[ApplicationHistoryInfo]
def getListing(): Iterable[ApplicationHistoryInfo]

/**
* Returns the Spark UI for a specific application.
*
* @param appId The application ID.
* @return The application's UI, or null if application is not found.
* @return The application's UI, or None if application is not found.
*/
def getAppUI(appId: String): SparkUI
def getAppUI(appId: String): Option[SparkUI]

/**
* Called when the server is shutting down.
core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
@@ -31,6 +31,8 @@ import org.apache.spark.util.Utils
private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHistoryProvider
with Logging {

private val NOT_STARTED = "<Not Started>"

// Interval between each check for event log updates
private val UPDATE_INTERVAL_MS = conf.getInt("spark.history.fs.updateInterval",
conf.getInt("spark.history.updateInterval", 10)) * 1000
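
Since the value above is read in seconds and multiplied by 1000, a deployment that wants faster refreshes could lower it. A hedged example (the property name is taken from the code above; the value is made up):

```scala
import org.apache.spark.SparkConf

// Rescan the event log directory every 30 seconds instead of the
// 10-second default resolved above.
val conf = new SparkConf().set("spark.history.fs.updateInterval", "30")
```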
@@ -45,8 +47,15 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
// A timestamp of when the disk was last accessed to check for log updates
private var lastLogCheckTimeMs = -1L

// List of applications, in order from newest to oldest.
@volatile private var appList: Seq[ApplicationHistoryInfo] = Nil
// The modification time of the newest log detected during the last scan. This is used
// to ignore logs that are older during subsequent scans, to avoid processing data that
// is already known.
private var lastModifiedTime = -1L

// Mapping of application IDs to their metadata, in descending end time order. Apps are inserted
// into the map in order, so the LinkedHashMap maintains the correct ordering.
@volatile private var applications: mutable.LinkedHashMap[String, FsApplicationHistoryInfo]
= new mutable.LinkedHashMap()

/**
* A background thread that periodically checks for event log updates on disk.
@@ -91,15 +100,35 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
logCheckingThread.start()
}

override def getListing() = appList
override def getListing() = applications.values

override def getAppUI(appId: String): SparkUI = {
override def getAppUI(appId: String): Option[SparkUI] = {
try {
val appLogDir = fs.getFileStatus(new Path(resolvedLogDir.toString, appId))
val (_, ui) = loadAppInfo(appLogDir, renderUI = true)
ui
applications.get(appId).map { info =>
val (replayBus, appListener) = createReplayBus(fs.getFileStatus(
new Path(logDir, info.logDir)))
val ui = {
val conf = this.conf.clone()
val appSecManager = new SecurityManager(conf)
new SparkUI(conf, appSecManager, replayBus, appId,
s"${HistoryServer.UI_PATH_PREFIX}/$appId")
// Do not call ui.bind() to avoid creating a new server for each application
}

replayBus.replay()

ui.setAppName(s"${appListener.appName.getOrElse(NOT_STARTED)} ($appId)")
Contributor: Should we leave the app name as just appId if appListener.appName is not set?

Contributor Author: To be fair, since the code goes to great lengths to only look at finished applications, I'm not sure that will ever happen.


val uiAclsEnabled = conf.getBoolean("spark.history.ui.acls.enable", false)
ui.getSecurityManager.setAcls(uiAclsEnabled)
// make sure to set admin acls before view acls so they are properly picked up
ui.getSecurityManager.setAdminAcls(appListener.adminAcls.getOrElse(""))
ui.getSecurityManager.setViewAcls(appListener.sparkUser.getOrElse(NOT_STARTED),
appListener.viewAcls.getOrElse(""))
Contributor: Maybe we should set these acls only if the respective things are defined (adminAcls, sparkUser etc.)

Contributor Author: This matches the behavior in the current code, so I'd rather not change it here.
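
For context, a hedged sketch of the flag that gates this block (the property name appears a few lines above; the SparkConf usage is illustrative):

```scala
import org.apache.spark.SparkConf

// Enable ACL enforcement in the history server UI; only then do the
// admin/view ACLs replayed from the application's event log take effect.
val conf = new SparkConf().set("spark.history.ui.acls.enable", "true")
```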

ui
}
} catch {
case e: FileNotFoundException => null
case e: FileNotFoundException => None
}
}

@@ -117,84 +146,79 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
try {
val logStatus = fs.listStatus(new Path(resolvedLogDir))
val logDirs = if (logStatus != null) logStatus.filter(_.isDir).toSeq else Seq[FileStatus]()
val logInfos = logDirs.filter { dir =>
fs.isFile(new Path(dir.getPath, EventLoggingListener.APPLICATION_COMPLETE))
}

val currentApps = Map[String, ApplicationHistoryInfo](
appList.map(app => app.id -> app):_*)

// For any application that either (i) is not listed or (ii) has changed since the last time
// the listing was created (defined by the log dir's modification time), load the app's info.
// Otherwise just reuse what's already in memory.
val newApps = new mutable.ArrayBuffer[ApplicationHistoryInfo](logInfos.size)
for (dir <- logInfos) {
val curr = currentApps.getOrElse(dir.getPath().getName(), null)
if (curr == null || curr.lastUpdated < getModificationTime(dir)) {
// Load all new logs from the log directory. Only directories that have a modification time
// later than the last known log directory will be loaded.
var newLastModifiedTime = lastModifiedTime
val logInfos = logDirs
.filter { dir =>
if (fs.isFile(new Path(dir.getPath(), EventLoggingListener.APPLICATION_COMPLETE))) {
val modTime = getModificationTime(dir)
newLastModifiedTime = math.max(newLastModifiedTime, modTime)
modTime > lastModifiedTime
} else {
false
}
}
.flatMap { dir =>
try {
val (app, _) = loadAppInfo(dir, renderUI = false)
newApps += app
val (replayBus, appListener) = createReplayBus(dir)
replayBus.replay()
Some(new FsApplicationHistoryInfo(
dir.getPath().getName(),
appListener.appId.getOrElse(dir.getPath().getName()),
appListener.appName.getOrElse(NOT_STARTED),
appListener.startTime.getOrElse(-1L),
appListener.endTime.getOrElse(-1L),
getModificationTime(dir),
appListener.sparkUser.getOrElse(NOT_STARTED)))
} catch {
case e: Exception => logError(s"Failed to load app info from directory $dir.")
case e: Exception =>
logInfo(s"Failed to load application log data from $dir.", e)
None
}
}
.sortBy { info => -info.endTime }
Contributor: This will throw an NPE if an exception is thrown in the map.


lastModifiedTime = newLastModifiedTime

// When there are new logs, merge the new list with the existing one, maintaining
// the expected ordering (descending end time). Maintaining the order is important
// to avoid having to sort the list every time there is a request for the log list.
if (!logInfos.isEmpty) {
Contributor: I would add a short comment here explaining why we need to do this whole block (i.e. the UI should display the apps in decreasing end time, and we want to get that order quickly every time the user refreshes).

val newApps = new mutable.LinkedHashMap[String, FsApplicationHistoryInfo]()
def addIfAbsent(info: FsApplicationHistoryInfo) = {
if (!newApps.contains(info.id)) {
newApps += (info.id -> info)
}
} else {
newApps += curr
}
}

appList = newApps.sortBy { info => -info.endTime }
val newIterator = logInfos.iterator.buffered
val oldIterator = applications.values.iterator.buffered
while (newIterator.hasNext && oldIterator.hasNext) {
if (newIterator.head.endTime > oldIterator.head.endTime) {
addIfAbsent(newIterator.next)
} else {
addIfAbsent(oldIterator.next)
}
}
newIterator.foreach(addIfAbsent)
oldIterator.foreach(addIfAbsent)

Contributor: minor: there are a lot of random empty lines in this if block. I think it would be clearer that this whole block is trying to do one thing if we get rid of these lines.

applications = newApps
}
Contributor: This chunk doesn't seem related to the original issue (of exposing the application ID). Is this just refactoring, or is there an intentional functionality change?

Contributor Author: This is vanzin@abc4697, in case you want to look at the change in isolation. Basically, since we now need to maintain a map of app ID -> app metadata, we need slightly different code to keep the optimization where we only load data for new applications from HDFS.
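
A standalone sketch of that merge step may help. AppInfo and merge are simplified stand-ins for the PR's FsApplicationHistoryInfo and listing-merge logic:

```scala
import scala.collection.mutable

// Both inputs are sorted by descending endTime; the merge preserves that
// order and skips duplicate IDs, so the listing never needs a full re-sort
// when the user refreshes the page.
case class AppInfo(id: String, endTime: Long)

def merge(newApps: Seq[AppInfo], oldApps: Seq[AppInfo]): Seq[AppInfo] = {
  val out = new mutable.LinkedHashMap[String, AppInfo]()
  def addIfAbsent(info: AppInfo): Unit =
    if (!out.contains(info.id)) out += (info.id -> info)

  val newIt = newApps.iterator.buffered
  val oldIt = oldApps.iterator.buffered
  while (newIt.hasNext && oldIt.hasNext) {
    if (newIt.head.endTime > oldIt.head.endTime) addIfAbsent(newIt.next())
    else addIfAbsent(oldIt.next())
  }
  newIt.foreach(addIfAbsent)
  oldIt.foreach(addIfAbsent)
  out.values.toSeq
}

// merge(Seq(AppInfo("c", 30)), Seq(AppInfo("b", 20), AppInfo("a", 10)))
// => List(AppInfo(c,30), AppInfo(b,20), AppInfo(a,10))
```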

} catch {
case t: Throwable => logError("Exception in checking for event log updates", t)
}
}

/**
* Parse the application's logs to find out the information we need to build the
* listing page.
*
* When creating the listing of available apps, there is no need to load the whole UI for the
* application. The UI is requested by the HistoryServer (by calling getAppInfo()) when the user
* clicks on a specific application.
*
* @param logDir Directory with application's log files.
* @param renderUI Whether to create the SparkUI for the application.
* @return A 2-tuple `(app info, ui)`. `ui` will be null if `renderUI` is false.
*/
private def loadAppInfo(logDir: FileStatus, renderUI: Boolean) = {
val path = logDir.getPath
val appId = path.getName
private def createReplayBus(logDir: FileStatus): (ReplayListenerBus, ApplicationEventListener) = {
val path = logDir.getPath()
val elogInfo = EventLoggingListener.parseLoggingInfo(path, fs)
val replayBus = new ReplayListenerBus(elogInfo.logPaths, fs, elogInfo.compressionCodec)
val appListener = new ApplicationEventListener
replayBus.addListener(appListener)

val ui: SparkUI = if (renderUI) {
val conf = this.conf.clone()
val appSecManager = new SecurityManager(conf)
new SparkUI(conf, appSecManager, replayBus, appId,
HistoryServer.UI_PATH_PREFIX + s"/$appId")
// Do not call ui.bind() to avoid creating a new server for each application
} else {
null
}

replayBus.replay()
val appInfo = ApplicationHistoryInfo(
appId,
appListener.appName,
appListener.startTime,
appListener.endTime,
getModificationTime(logDir),
appListener.sparkUser)

if (ui != null) {
val uiAclsEnabled = conf.getBoolean("spark.history.ui.acls.enable", false)
ui.getSecurityManager.setAcls(uiAclsEnabled)
// make sure to set admin acls before view acls so properly picked up
ui.getSecurityManager.setAdminAcls(appListener.adminAcls)
ui.getSecurityManager.setViewAcls(appListener.sparkUser, appListener.viewAcls)
}
(appInfo, ui)
(replayBus, appListener)
Contributor: Great. This abstraction is much better than before.

}

/** Return when this directory was last modified. */
@@ -217,3 +241,13 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
private def getMonotonicTimeMs() = System.nanoTime() / (1000 * 1000)

}

private class FsApplicationHistoryInfo(
Contributor: This looks a lot like a case class but it can't be one because case classes can't extend each other. At the very least we should add a comment explaining that.

Contributor Author: Actually I'm not sure why the original is a case class to start with...

val logDir: String,
id: String,
name: String,
startTime: Long,
endTime: Long,
lastUpdated: Long,
sparkUser: String)
extends ApplicationHistoryInfo(id, name, startTime, endTime, lastUpdated, sparkUser)
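
The constraint behind this design is a Scala language rule, not a style choice. A minimal illustration (Base and Derived are made-up names):

```scala
case class Base(id: String)

// This does not compile:
//   case class Derived(id: String, extra: String) extends Base(id)
// scalac rejects it: "case class Derived has case ancestor Base, but
// case-to-case inheritance is prohibited." A plain class extending the
// case class, as FsApplicationHistoryInfo does above, is the workaround.
class Derived(id: String, val extra: String) extends Base(id)
```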
core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala
@@ -52,10 +52,7 @@ class HistoryServer(

private val appLoader = new CacheLoader[String, SparkUI] {
override def load(key: String): SparkUI = {
val ui = provider.getAppUI(key)
if (ui == null) {
throw new NoSuchElementException()
}
val ui = provider.getAppUI(key).getOrElse(throw new NoSuchElementException())
attachSparkUI(ui)
ui
}
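
An aside on the idiom, not PR code: `getOrElse(throw ...)` type-checks because a throw expression has type Nothing, which conforms to any expected type, so the result type stays SparkUI (or String in this toy version):

```scala
// Illustrative only.
def demo(maybe: Option[String]): String =
  maybe.getOrElse(throw new NoSuchElementException("no value"))
```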
core/src/main/scala/org/apache/spark/scheduler/ApplicationEventListener.scala
@@ -24,38 +24,31 @@ package org.apache.spark.scheduler
* from multiple applications are seen, the behavior is unspecified.
*/
private[spark] class ApplicationEventListener extends SparkListener {
var appName = "<Not Started>"
var sparkUser = "<Not Started>"
var startTime = -1L
var endTime = -1L
var viewAcls = ""
var adminAcls = ""

def applicationStarted = startTime != -1

def applicationCompleted = endTime != -1

def applicationDuration: Long = {
val difference = endTime - startTime
if (applicationStarted && applicationCompleted && difference > 0) difference else -1L
}
var appName: Option[String] = None
var appId: Option[String] = None
Contributor: This is inconsistent with appName and the other fields. Either make all of them default to <Not Started> or make all of them Option[String].

var sparkUser: Option[String] = None
var startTime: Option[Long] = None
var endTime: Option[Long] = None
var viewAcls: Option[String] = None
var adminAcls: Option[String] = None

override def onApplicationStart(applicationStart: SparkListenerApplicationStart) {
appName = applicationStart.appName
startTime = applicationStart.time
sparkUser = applicationStart.sparkUser
appName = Some(applicationStart.appName)
appId = applicationStart.appId
startTime = Some(applicationStart.time)
sparkUser = Some(applicationStart.sparkUser)
}

override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd) {
endTime = applicationEnd.time
endTime = Some(applicationEnd.time)
}

override def onEnvironmentUpdate(environmentUpdate: SparkListenerEnvironmentUpdate) {
synchronized {
val environmentDetails = environmentUpdate.environmentDetails
val allProperties = environmentDetails("Spark Properties").toMap
viewAcls = allProperties.getOrElse("spark.ui.view.acls", "")
adminAcls = allProperties.getOrElse("spark.admin.acls", "")
viewAcls = allProperties.get("spark.ui.view.acls")
adminAcls = allProperties.get("spark.admin.acls")
}
}
}
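
The reviewer's consistency point is easier to see with made-up values: with Option fields the "unset" state is explicit, and the placeholder becomes a display-time decision (as FsHistoryProvider's NOT_STARTED shows above):

```scala
// Illustrative only.
val unset: Option[String] = None
val set: Option[String] = Some("MyApp")
val a = unset.getOrElse("<Not Started>") // "<Not Started>"
val b = set.getOrElse("<Not Started>")   // "MyApp"
```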
core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala
@@ -31,4 +31,12 @@ private[spark] trait SchedulerBackend {
def killTask(taskId: Long, executorId: String, interruptThread: Boolean): Unit =
throw new UnsupportedOperationException
def isReady(): Boolean = true

/**
* The application ID associated with the job, if any.
*
* @return The application ID, or None if the backend does not provide an ID.
*/
def applicationId(): Option[String] = None

}
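
A hedged sketch of a backend satisfying the new contract. MesosLikeBackend and registered() are illustrative names (commit 26b266e does this with the real Mesos framework ID); the stubs exist only to make the sketch compile:

```scala
// Illustrative only: an ID that becomes available once the cluster manager
// acknowledges registration.
class MesosLikeBackend extends SchedulerBackend {
  @volatile private var frameworkId: Option[String] = None

  override def start(): Unit = { /* register with the cluster manager */ }
  override def stop(): Unit = {}
  override def reviveOffers(): Unit = {}
  override def defaultParallelism(): Int = 2

  // Called from the registration callback with the manager-assigned ID.
  def registered(id: String): Unit = { frameworkId = Some(id) }

  override def applicationId(): Option[String] = frameworkId
}
```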
core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
@@ -89,8 +89,8 @@ case class SparkListenerExecutorMetricsUpdate(
extends SparkListenerEvent

@DeveloperApi
case class SparkListenerApplicationStart(appName: String, time: Long, sparkUser: String)
extends SparkListenerEvent
case class SparkListenerApplicationStart(appName: String, appId: Option[String], time: Long,
sparkUser: String) extends SparkListenerEvent

@DeveloperApi
case class SparkListenerApplicationEnd(time: Long) extends SparkListenerEvent
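
A hedged example of constructing the updated event (the ID string and user are made up). Event logs written before this change carry no appId, which is presumably one reason the new field is Option[String] rather than String:

```scala
// Illustrative values only.
val started = SparkListenerApplicationStart(
  appName = "MyApp",
  appId = Some("application_1409000000000_0001"),
  time = System.currentTimeMillis(),
  sparkUser = "alice")
```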
core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala
@@ -64,4 +64,12 @@ private[spark] trait TaskScheduler {
*/
def executorHeartbeatReceived(execId: String, taskMetrics: Array[(Long, TaskMetrics)],
blockManagerId: BlockManagerId): Boolean

/**
* The application ID associated with the job, if any.
*
* @return The application ID, or None if the backend does not provide an ID.
*/
def applicationId(): Option[String] = None

}
core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -491,6 +491,9 @@ private[spark] class TaskSchedulerImpl(
}
}
}

override def applicationId(): Option[String] = backend.applicationId()

}


@@ -535,4 +538,5 @@ private[spark] object TaskSchedulerImpl {

retval.toList
}

}
core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -51,12 +51,12 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A
val conf = scheduler.sc.conf
private val timeout = AkkaUtils.askTimeout(conf)
private val akkaFrameSize = AkkaUtils.maxFrameSizeBytes(conf)
// Submit tasks only after (registered resources / total expected resources)
// is equal to at least this value, which is a double between 0 and 1.
var minRegisteredRatio =
math.min(1, conf.getDouble("spark.scheduler.minRegisteredResourcesRatio", 0))
// Submit tasks after maxRegisteredWaitingTime milliseconds
// if minRegisteredRatio has not yet been reached
val maxRegisteredWaitingTime =
conf.getInt("spark.scheduler.maxRegisteredResourcesWaitingTime", 30000)
val createTime = System.currentTimeMillis()
core/src/main/scala/org/apache/spark/scheduler/cluster/SimrSchedulerBackend.scala
@@ -68,4 +68,5 @@ private[spark] class SimrSchedulerBackend(
fs.delete(new Path(driverFilePath), false)
super.stop()
}

}