[SPARK-3388] Expose application ID in ApplicationStart event, use it in history server. #1218
Changes from all commits: ed10348, 2fb7de4, b3f3664, 26b266e, abc4697, 0afd696, 36dc362, d35d86f, 311e49d, 57517b8, 4e3483f, c6d7478, b022bae, 21aa71b, 3f8ec66, c90a08d, f0ba90f, 25f2826, cb0cab2, 4b10cfd, a0056e6, a86bbcf, 8278316, 44112a8, 56fe42e, 6706d3a, 2d19f3c
**FsHistoryProvider.scala**
```diff
@@ -31,6 +31,8 @@ import org.apache.spark.util.Utils
 private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHistoryProvider
   with Logging {
 
+  private val NOT_STARTED = "<Not Started>"
+
   // Interval between each check for event log updates
   private val UPDATE_INTERVAL_MS = conf.getInt("spark.history.fs.updateInterval",
     conf.getInt("spark.history.updateInterval", 10)) * 1000
@@ -45,8 +47,15 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHistoryProvider
   // A timestamp of when the disk was last accessed to check for log updates
   private var lastLogCheckTimeMs = -1L
 
-  // List of applications, in order from newest to oldest.
-  @volatile private var appList: Seq[ApplicationHistoryInfo] = Nil
+  // The modification time of the newest log detected during the last scan. This is used
+  // to ignore logs that are older during subsequent scans, to avoid processing data that
+  // is already known.
+  private var lastModifiedTime = -1L
+
+  // Mapping of application IDs to their metadata, in descending end time order. Apps are inserted
+  // into the map in order, so the LinkedHashMap maintains the correct ordering.
+  @volatile private var applications: mutable.LinkedHashMap[String, FsApplicationHistoryInfo]
+    = new mutable.LinkedHashMap()
 
   /**
    * A background thread that periodically checks for event log updates on disk.
@@ -91,15 +100,35 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHistoryProvider
     logCheckingThread.start()
   }
 
-  override def getListing() = appList
+  override def getListing() = applications.values
 
-  override def getAppUI(appId: String): SparkUI = {
+  override def getAppUI(appId: String): Option[SparkUI] = {
     try {
-      val appLogDir = fs.getFileStatus(new Path(resolvedLogDir.toString, appId))
-      val (_, ui) = loadAppInfo(appLogDir, renderUI = true)
-      ui
+      applications.get(appId).map { info =>
+        val (replayBus, appListener) = createReplayBus(fs.getFileStatus(
+          new Path(logDir, info.logDir)))
+        val ui = {
+          val conf = this.conf.clone()
+          val appSecManager = new SecurityManager(conf)
+          new SparkUI(conf, appSecManager, replayBus, appId,
+            s"${HistoryServer.UI_PATH_PREFIX}/$appId")
+          // Do not call ui.bind() to avoid creating a new server for each application
+        }
+
+        replayBus.replay()
+
+        ui.setAppName(s"${appListener.appName.getOrElse(NOT_STARTED)} ($appId)")
```
**Review comment:** Should we leave the app name as just the app ID when the application never started?

**Reply:** To be fair, since the code goes to great lengths to only look at finished applications, I'm not sure that will ever happen.
```diff
+
+        val uiAclsEnabled = conf.getBoolean("spark.history.ui.acls.enable", false)
+        ui.getSecurityManager.setAcls(uiAclsEnabled)
+        // make sure to set admin acls before view acls so they are properly picked up
+        ui.getSecurityManager.setAdminAcls(appListener.adminAcls.getOrElse(""))
+        ui.getSecurityManager.setViewAcls(appListener.sparkUser.getOrElse(NOT_STARTED),
+          appListener.viewAcls.getOrElse(""))
```
**Review comment:** Maybe we should set these acls only if the respective values are defined?

**Reply:** This matches the behavior in the current code, so I'd rather not change it here.
```diff
+        ui
+      }
     } catch {
-      case e: FileNotFoundException => null
+      case e: FileNotFoundException => None
     }
   }
```
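The new return type makes the "not found" case explicit instead of signaling it with `null`. Below is a minimal, self-contained sketch of the same pattern — a map lookup followed by `map` over the `Option`, with I/O failures collapsed to `None`. The `AppInfo`, `Ui`, and `loadUi` names are illustrative stand-ins, not code from the patch:

```scala
import java.io.FileNotFoundException

object OptionLookupSketch {
  case class AppInfo(logDir: String)
  case class Ui(name: String)

  private val applications = Map("app-1" -> AppInfo("/logs/app-1"))

  // Stand-in for the expensive UI construction; may throw if the logs vanished.
  private def loadUi(info: AppInfo): Ui = Ui(s"UI for ${info.logDir}")

  def getAppUI(appId: String): Option[Ui] =
    try {
      // Unknown id => None; known id => Some(loaded UI).
      applications.get(appId).map(loadUi)
    } catch {
      case _: FileNotFoundException => None // logs deleted between scan and click
    }

  def main(args: Array[String]): Unit = {
    println(getAppUI("app-1")) // Some(Ui(UI for /logs/app-1))
    println(getAppUI("nope"))  // None
  }
}
```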
```diff
@@ -117,84 +146,79 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHistoryProvider
     try {
       val logStatus = fs.listStatus(new Path(resolvedLogDir))
       val logDirs = if (logStatus != null) logStatus.filter(_.isDir).toSeq else Seq[FileStatus]()
-      val logInfos = logDirs.filter { dir =>
-        fs.isFile(new Path(dir.getPath, EventLoggingListener.APPLICATION_COMPLETE))
-      }
-
-      val currentApps = Map[String, ApplicationHistoryInfo](
-        appList.map(app => app.id -> app):_*)
-
-      // For any application that either (i) is not listed or (ii) has changed since the last time
-      // the listing was created (defined by the log dir's modification time), load the app's info.
-      // Otherwise just reuse what's already in memory.
-      val newApps = new mutable.ArrayBuffer[ApplicationHistoryInfo](logInfos.size)
-      for (dir <- logInfos) {
-        val curr = currentApps.getOrElse(dir.getPath().getName(), null)
-        if (curr == null || curr.lastUpdated < getModificationTime(dir)) {
-          try {
-            val (app, _) = loadAppInfo(dir, renderUI = false)
-            newApps += app
-          } catch {
-            case e: Exception => logError(s"Failed to load app info from directory $dir.")
-          }
-        } else {
-          newApps += curr
-        }
-      }
-
-      appList = newApps.sortBy { info => -info.endTime }
+      // Load all new logs from the log directory. Only directories that have a modification time
+      // later than the last known log directory will be loaded.
+      var newLastModifiedTime = lastModifiedTime
+      val logInfos = logDirs
+        .filter { dir =>
+          if (fs.isFile(new Path(dir.getPath(), EventLoggingListener.APPLICATION_COMPLETE))) {
+            val modTime = getModificationTime(dir)
+            newLastModifiedTime = math.max(newLastModifiedTime, modTime)
+            modTime > lastModifiedTime
+          } else {
+            false
+          }
+        }
+        .flatMap { dir =>
+          try {
+            val (replayBus, appListener) = createReplayBus(dir)
+            replayBus.replay()
+            Some(new FsApplicationHistoryInfo(
+              dir.getPath().getName(),
+              appListener.appId.getOrElse(dir.getPath().getName()),
+              appListener.appName.getOrElse(NOT_STARTED),
+              appListener.startTime.getOrElse(-1L),
+              appListener.endTime.getOrElse(-1L),
+              getModificationTime(dir),
+              appListener.sparkUser.getOrElse(NOT_STARTED)))
+          } catch {
+            case e: Exception =>
+              logInfo(s"Failed to load application log data from $dir.", e)
+              None
+          }
+        }
+        .sortBy { info => -info.endTime }
```
**Review comment:** This will throw NPE if an exception is thrown in the
```diff
+
+      lastModifiedTime = newLastModifiedTime
+
+      // When there are new logs, merge the new list with the existing one, maintaining
+      // the expected ordering (descending end time). Maintaining the order is important
+      // to avoid having to sort the list every time there is a request for the log list.
+      if (!logInfos.isEmpty) {
```
**Review comment:** I would add a short comment here explaining why we need to do this whole block (i.e. the UI should display the apps in decreasing end time, and we want to get that order quickly every time the user refreshes).
```diff
+        val newApps = new mutable.LinkedHashMap[String, FsApplicationHistoryInfo]()
+        def addIfAbsent(info: FsApplicationHistoryInfo) = {
+          if (!newApps.contains(info.id)) {
+            newApps += (info.id -> info)
+          }
+        }
+
+        val newIterator = logInfos.iterator.buffered
+        val oldIterator = applications.values.iterator.buffered
+        while (newIterator.hasNext && oldIterator.hasNext) {
+          if (newIterator.head.endTime > oldIterator.head.endTime) {
+            addIfAbsent(newIterator.next)
+          } else {
+            addIfAbsent(oldIterator.next)
+          }
+        }
+        newIterator.foreach(addIfAbsent)
+        oldIterator.foreach(addIfAbsent)
+
```
**Review comment:** minor: there are a lot of random empty lines in this diff.
```diff
+        applications = newApps
+      }
```
**Review comment:** This chunk doesn't seem related to the original issue (of exposing the application ID). Is this just refactoring, or is there an intentional functionality change?

**Reply:** This is vanzin@abc4697, in case you want to look at the change in isolation. Basically, since we now need to maintain a map of app ID -> app metadata, we need slightly different code to keep the optimization where only data for new applications is loaded from HDFS.
```diff
     } catch {
       case t: Throwable => logError("Exception in checking for event log updates", t)
     }
   }
```
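The merge above relies on both inputs already being sorted by descending end time, so a single linear pass keeps the `LinkedHashMap` insertion order sorted without ever re-sorting the full listing. Here is a standalone sketch of the same algorithm, with a simplified `Info` type standing in for `FsApplicationHistoryInfo`:

```scala
import scala.collection.mutable

object MergeSketch {
  case class Info(id: String, endTime: Long)

  // Merge two endTime-descending sequences into a LinkedHashMap whose insertion
  // order is also endTime-descending. On duplicate ids, whichever entry is
  // reached first (i.e. the one with the larger endTime) wins.
  def merge(newInfos: Seq[Info], oldInfos: Seq[Info]): mutable.LinkedHashMap[String, Info] = {
    val merged = new mutable.LinkedHashMap[String, Info]()
    def addIfAbsent(info: Info): Unit = {
      if (!merged.contains(info.id)) merged += (info.id -> info)
    }

    val newIt = newInfos.iterator.buffered
    val oldIt = oldInfos.iterator.buffered
    // Classic sorted-merge: always take the entry with the larger endTime next.
    while (newIt.hasNext && oldIt.hasNext) {
      if (newIt.head.endTime > oldIt.head.endTime) addIfAbsent(newIt.next())
      else addIfAbsent(oldIt.next())
    }
    newIt.foreach(addIfAbsent)
    oldIt.foreach(addIfAbsent)
    merged
  }

  def main(args: Array[String]): Unit = {
    val existing = Seq(Info("a", 30L), Info("b", 10L))
    val justScanned = Seq(Info("c", 40L), Info("a", 20L)) // "a" was re-scanned
    println(merge(justScanned, existing).values.toList)
    // List(Info(c,40), Info(a,30), Info(b,10)) -- ordered, deduplicated, no re-sort
  }
}
```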
```diff
-  /**
-   * Parse the application's logs to find out the information we need to build the
-   * listing page.
-   *
-   * When creating the listing of available apps, there is no need to load the whole UI for the
-   * application. The UI is requested by the HistoryServer (by calling getAppInfo()) when the user
-   * clicks on a specific application.
-   *
-   * @param logDir Directory with application's log files.
-   * @param renderUI Whether to create the SparkUI for the application.
-   * @return A 2-tuple `(app info, ui)`. `ui` will be null if `renderUI` is false.
-   */
-  private def loadAppInfo(logDir: FileStatus, renderUI: Boolean) = {
-    val path = logDir.getPath
-    val appId = path.getName
+  private def createReplayBus(logDir: FileStatus): (ReplayListenerBus, ApplicationEventListener) = {
+    val path = logDir.getPath()
     val elogInfo = EventLoggingListener.parseLoggingInfo(path, fs)
     val replayBus = new ReplayListenerBus(elogInfo.logPaths, fs, elogInfo.compressionCodec)
     val appListener = new ApplicationEventListener
     replayBus.addListener(appListener)
-
-    val ui: SparkUI = if (renderUI) {
-      val conf = this.conf.clone()
-      val appSecManager = new SecurityManager(conf)
-      new SparkUI(conf, appSecManager, replayBus, appId,
-        HistoryServer.UI_PATH_PREFIX + s"/$appId")
-      // Do not call ui.bind() to avoid creating a new server for each application
-    } else {
-      null
-    }
-
-    replayBus.replay()
-    val appInfo = ApplicationHistoryInfo(
-      appId,
-      appListener.appName,
-      appListener.startTime,
-      appListener.endTime,
-      getModificationTime(logDir),
-      appListener.sparkUser)
-
-    if (ui != null) {
-      val uiAclsEnabled = conf.getBoolean("spark.history.ui.acls.enable", false)
-      ui.getSecurityManager.setAcls(uiAclsEnabled)
-      // make sure to set admin acls before view acls so properly picked up
-      ui.getSecurityManager.setAdminAcls(appListener.adminAcls)
-      ui.getSecurityManager.setViewAcls(appListener.sparkUser, appListener.viewAcls)
-    }
-    (appInfo, ui)
+    (replayBus, appListener)
```
**Review comment:** Great. This abstraction is much better than before.
```diff
   }
 
   /** Return when this directory was last modified. */
```
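The refactor splits the old `loadAppInfo` into "wire a listener to a bus" and "replay when (and if) you need to". A toy sketch of that pattern follows, with simplified stand-ins for `ReplayListenerBus` and `ApplicationEventListener` — none of these types mirror the real Spark signatures exactly:

```scala
object ReplayBusSketch {
  sealed trait Event
  case class AppStart(name: String) extends Event
  case class AppEnd(time: Long) extends Event

  trait Listener { def onEvent(e: Event): Unit }

  // Simplified stand-in for ReplayListenerBus: collects listeners, replays a log later.
  class ReplayBus(log: Seq[Event]) {
    private var listeners = List.empty[Listener]
    def addListener(l: Listener): Unit = listeners ::= l
    def replay(): Unit = for (e <- log; l <- listeners) l.onEvent(e)
  }

  class AppListener extends Listener {
    var appName: Option[String] = None
    var endTime: Option[Long] = None
    def onEvent(e: Event): Unit = e match {
      case AppStart(n) => appName = Some(n)
      case AppEnd(t)   => endTime = Some(t)
    }
  }

  // Mirror of createReplayBus: wire the listener to the bus, and let the caller
  // decide when to pay the cost of replay().
  def createReplayBus(log: Seq[Event]): (ReplayBus, AppListener) = {
    val bus = new ReplayBus(log)
    val listener = new AppListener
    bus.addListener(listener)
    (bus, listener)
  }

  def main(args: Array[String]): Unit = {
    val (bus, listener) = createReplayBus(Seq(AppStart("demo"), AppEnd(42L)))
    bus.replay()
    println(listener.appName -> listener.endTime) // (Some(demo),Some(42))
  }
}
```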
```diff
@@ -217,3 +241,13 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHistoryProvider
   private def getMonotonicTimeMs() = System.nanoTime() / (1000 * 1000)
 
 }
+
+private class FsApplicationHistoryInfo(
```
**Review comment:** This looks a lot like a case class, but it can't be one because case classes can't extend each other. At the very least we should add a comment explaining that.

**Reply:** Actually, I'm not sure why the original is a case class to start with...
```diff
+    val logDir: String,
+    id: String,
+    name: String,
+    startTime: Long,
+    endTime: Long,
+    lastUpdated: Long,
+    sparkUser: String)
+  extends ApplicationHistoryInfo(id, name, startTime, endTime, lastUpdated, sparkUser)
```
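As the review notes, `FsApplicationHistoryInfo` cannot itself be a case class because Scala rejects case-to-case inheritance; a plain class extending the case class is the usual workaround. A compilable sketch, with simplified `AppInfo`/`FsAppInfo` names standing in for the real types:

```scala
object CaseInheritanceSketch {
  // Mirrors ApplicationHistoryInfo in the patch (fields simplified).
  case class AppInfo(id: String, endTime: Long)

  // This would NOT compile -- Scala forbids case-to-case inheritance:
  //   case class FsAppInfo(logDir: String, id: String, endTime: Long)
  //     extends AppInfo(id, endTime)

  // A plain class extending the case class is allowed, which is the
  // workaround used by FsApplicationHistoryInfo above.
  class FsAppInfo(val logDir: String, id: String, endTime: Long)
    extends AppInfo(id, endTime)

  def main(args: Array[String]): Unit = {
    val info = new FsAppInfo("/logs/app-1", "app-1", 42L)
    val base: AppInfo = info // usable wherever the base type is expected
    println(s"${base.id} ended at ${base.endTime}; logs in ${info.logDir}")
  }
}
```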
**ApplicationEventListener.scala**
```diff
@@ -24,38 +24,31 @@ package org.apache.spark.scheduler
  * from multiple applications are seen, the behavior is unspecified.
  */
 private[spark] class ApplicationEventListener extends SparkListener {
-  var appName = "<Not Started>"
-  var sparkUser = "<Not Started>"
-  var startTime = -1L
-  var endTime = -1L
-  var viewAcls = ""
-  var adminAcls = ""
-
-  def applicationStarted = startTime != -1
-
-  def applicationCompleted = endTime != -1
-
-  def applicationDuration: Long = {
-    val difference = endTime - startTime
-    if (applicationStarted && applicationCompleted && difference > 0) difference else -1L
-  }
+  var appName: Option[String] = None
+  var appId: Option[String] = None
```
**Review comment:** This is inconsistent with appName and the other fields. Either back all of them with `Option`s or none of them.
```diff
+  var sparkUser: Option[String] = None
+  var startTime: Option[Long] = None
+  var endTime: Option[Long] = None
+  var viewAcls: Option[String] = None
+  var adminAcls: Option[String] = None
 
   override def onApplicationStart(applicationStart: SparkListenerApplicationStart) {
-    appName = applicationStart.appName
-    startTime = applicationStart.time
-    sparkUser = applicationStart.sparkUser
+    appName = Some(applicationStart.appName)
+    appId = applicationStart.appId
+    startTime = Some(applicationStart.time)
+    sparkUser = Some(applicationStart.sparkUser)
   }
 
   override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd) {
-    endTime = applicationEnd.time
+    endTime = Some(applicationEnd.time)
   }
 
   override def onEnvironmentUpdate(environmentUpdate: SparkListenerEnvironmentUpdate) {
     synchronized {
       val environmentDetails = environmentUpdate.environmentDetails
       val allProperties = environmentDetails("Spark Properties").toMap
-      viewAcls = allProperties.getOrElse("spark.ui.view.acls", "")
-      adminAcls = allProperties.getOrElse("spark.admin.acls", "")
+      viewAcls = allProperties.get("spark.ui.view.acls")
+      adminAcls = allProperties.get("spark.admin.acls")
     }
   }
 }
```
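Switching the listener fields from sentinel defaults (`"<Not Started>"`, `-1L`, `""`) to `Option` moves the choice of fallback to each call site. A small sketch of the consumption side, mirroring the `getOrElse` calls in `FsHistoryProvider` (the specific fallback values here are hypothetical):

```scala
object OptionListenerSketch {
  val NOT_STARTED = "<Not Started>"

  // Listener state after a replay that saw no ApplicationStart event.
  var appName: Option[String] = None
  var appId: Option[String] = None
  var startTime: Option[Long] = None

  def main(args: Array[String]): Unit = {
    // Callers pick a fallback at the point of use instead of relying on
    // in-band sentinel values baked into the listener:
    println(appName.getOrElse(NOT_STARTED))       // <Not Started>
    println(appId.getOrElse("fallback-dir-name")) // e.g. the log dir name
    println(startTime.getOrElse(-1L))             // -1

    // And "has the app started?" becomes a plain isDefined check
    // rather than a comparison against -1:
    println(startTime.isDefined)                  // false
  }
}
```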
**SimrSchedulerBackend.scala**
```diff
@@ -68,4 +68,5 @@ private[spark] class SimrSchedulerBackend(
     fs.delete(new Path(driverFilePath), false)
     super.stop()
   }
+
 }
```
**Review comment:** Have you verified the initialization order? This imposes a new ordering constraint where the `taskScheduler` must be initialized before posting application start, otherwise we throw an NPE. It would be good to at least add a comment to document this.

**Reply:** I'll add a note. I did verify the order, and that's why I had to add some synchronization for the standalone case, since it was returning "too early" for this code to work. See: vanzin@0afd696
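The NPE risk the reviewer describes is a read of a field that is only assigned later during startup. A minimal illustration of that ordering constraint follows — simplified stand-ins, not the actual `SchedulerBackend` code:

```scala
object InitOrderSketch {
  class Scheduler { def applicationId(): String = "app-123" }

  class Backend {
    // Assigned during startup; null until then, like a late-initialized field.
    var scheduler: Scheduler = _

    def postApplicationStart(): Unit = {
      // Reading scheduler.applicationId() before start() runs would throw an
      // NPE; the require makes the ordering constraint explicit and documented.
      require(scheduler != null, "scheduler must be initialized before posting ApplicationStart")
      println(s"posting ApplicationStart for ${scheduler.applicationId()}")
    }

    def start(): Unit = {
      scheduler = new Scheduler
      postApplicationStart() // safe: the ordering is now guaranteed
    }
  }

  def main(args: Array[String]): Unit = {
    val backend = new Backend
    // backend.postApplicationStart()  // would fail: scheduler not yet set
    backend.start()                    // posting ApplicationStart for app-123
  }
}
```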