Skip to content

Commit c5b6b39

Browse files
BryanCutler and Andrew Or
authored and committed
[SPARK-12062][CORE] Change Master to async rebuild UI when application completes
This change builds the event history of completed apps asynchronously, so that the RPC thread is not blocked and workers can still register or be removed even when the event log history is very large and takes a long time to rebuild. Author: Bryan Cutler <[email protected]> Closes #10284 from BryanCutler/async-MasterUI-SPARK-12062.
1 parent 8a215d2 commit c5b6b39

File tree

2 files changed

+52
-29
lines changed

2 files changed

+52
-29
lines changed

core/src/main/scala/org/apache/spark/deploy/master/Master.scala

Lines changed: 50 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -21,9 +21,11 @@ import java.io.FileNotFoundException
2121
import java.net.URLEncoder
2222
import java.text.SimpleDateFormat
2323
import java.util.Date
24-
import java.util.concurrent.{ScheduledFuture, TimeUnit}
24+
import java.util.concurrent.{ConcurrentHashMap, ScheduledFuture, TimeUnit}
2525

2626
import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
27+
import scala.concurrent.duration.Duration
28+
import scala.concurrent.{Await, ExecutionContext, Future}
2729
import scala.language.postfixOps
2830
import scala.util.Random
2931

@@ -56,6 +58,10 @@ private[deploy] class Master(
5658
private val forwardMessageThread =
5759
ThreadUtils.newDaemonSingleThreadScheduledExecutor("master-forward-message-thread")
5860

61+
private val rebuildUIThread =
62+
ThreadUtils.newDaemonSingleThreadExecutor("master-rebuild-ui-thread")
63+
private val rebuildUIContext = ExecutionContext.fromExecutor(rebuildUIThread)
64+
5965
private val hadoopConf = SparkHadoopUtil.get.newConfiguration(conf)
6066

6167
private def createDateFormat = new SimpleDateFormat("yyyyMMddHHmmss") // For application IDs
@@ -78,7 +84,8 @@ private[deploy] class Master(
7884
private val addressToApp = new HashMap[RpcAddress, ApplicationInfo]
7985
private val completedApps = new ArrayBuffer[ApplicationInfo]
8086
private var nextAppNumber = 0
81-
private val appIdToUI = new HashMap[String, SparkUI]
87+
// Using ConcurrentHashMap so that master-rebuild-ui-thread can add a UI after asyncRebuildSparkUI
88+
private val appIdToUI = new ConcurrentHashMap[String, SparkUI]
8289

8390
private val drivers = new HashSet[DriverInfo]
8491
private val completedDrivers = new ArrayBuffer[DriverInfo]
@@ -191,6 +198,7 @@ private[deploy] class Master(
191198
checkForWorkerTimeOutTask.cancel(true)
192199
}
193200
forwardMessageThread.shutdownNow()
201+
rebuildUIThread.shutdownNow()
194202
webUi.stop()
195203
restServer.foreach(_.stop())
196204
masterMetricsSystem.stop()
@@ -367,6 +375,10 @@ private[deploy] class Master(
367375
case CheckForWorkerTimeOut => {
368376
timeOutDeadWorkers()
369377
}
378+
379+
case AttachCompletedRebuildUI(appId) =>
380+
// An asyncRebuildSparkUI has completed, so the rebuilt UI needs to be attached to the master webUi
381+
Option(appIdToUI.get(appId)).foreach { ui => webUi.attachSparkUI(ui) }
370382
}
371383

372384
override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
@@ -809,7 +821,7 @@ private[deploy] class Master(
809821
if (completedApps.size >= RETAINED_APPLICATIONS) {
810822
val toRemove = math.max(RETAINED_APPLICATIONS / 10, 1)
811823
completedApps.take(toRemove).foreach( a => {
812-
appIdToUI.remove(a.id).foreach { ui => webUi.detachSparkUI(ui) }
824+
Option(appIdToUI.remove(a.id)).foreach { ui => webUi.detachSparkUI(ui) }
813825
applicationMetricsSystem.removeSource(a.appSource)
814826
})
815827
completedApps.trimStart(toRemove)
@@ -818,7 +830,7 @@ private[deploy] class Master(
818830
waitingApps -= app
819831

820832
// If application events are logged, use them to rebuild the UI
821-
rebuildSparkUI(app)
833+
asyncRebuildSparkUI(app)
822834

823835
for (exec <- app.executors.values) {
824836
killExecutor(exec)
@@ -923,49 +935,57 @@ private[deploy] class Master(
923935
* Return the UI if successful, else None
924936
*/
925937
private[master] def rebuildSparkUI(app: ApplicationInfo): Option[SparkUI] = {
938+
val futureUI = asyncRebuildSparkUI(app)
939+
Await.result(futureUI, Duration.Inf)
940+
}
941+
942+
/** Rebuild a new SparkUI asynchronously so as not to block the RPC event loop */
943+
private[master] def asyncRebuildSparkUI(app: ApplicationInfo): Future[Option[SparkUI]] = {
926944
val appName = app.desc.name
927945
val notFoundBasePath = HistoryServer.UI_PATH_PREFIX + "/not-found"
928-
try {
929-
val eventLogDir = app.desc.eventLogDir
930-
.getOrElse {
931-
// Event logging is not enabled for this application
932-
app.appUIUrlAtHistoryServer = Some(notFoundBasePath)
933-
return None
934-
}
935-
946+
val eventLogDir = app.desc.eventLogDir
947+
.getOrElse {
948+
// Event logging is disabled for this application
949+
app.appUIUrlAtHistoryServer = Some(notFoundBasePath)
950+
return Future.successful(None)
951+
}
952+
val futureUI = Future {
936953
val eventLogFilePrefix = EventLoggingListener.getLogPath(
937-
eventLogDir, app.id, appAttemptId = None, compressionCodecName = app.desc.eventLogCodec)
954+
eventLogDir, app.id, appAttemptId = None, compressionCodecName = app.desc.eventLogCodec)
938955
val fs = Utils.getHadoopFileSystem(eventLogDir, hadoopConf)
939956
val inProgressExists = fs.exists(new Path(eventLogFilePrefix +
940-
EventLoggingListener.IN_PROGRESS))
957+
EventLoggingListener.IN_PROGRESS))
941958

942-
if (inProgressExists) {
959+
val eventLogFile = if (inProgressExists) {
943960
// Event logging is enabled for this application, but the application is still in progress
944961
logWarning(s"Application $appName is still in progress, it may be terminated abnormally.")
945-
}
946-
947-
val (eventLogFile, status) = if (inProgressExists) {
948-
(eventLogFilePrefix + EventLoggingListener.IN_PROGRESS, " (in progress)")
962+
eventLogFilePrefix + EventLoggingListener.IN_PROGRESS
949963
} else {
950-
(eventLogFilePrefix, " (completed)")
964+
eventLogFilePrefix
951965
}
952966

953967
val logInput = EventLoggingListener.openEventLog(new Path(eventLogFile), fs)
954968
val replayBus = new ReplayListenerBus()
955969
val ui = SparkUI.createHistoryUI(new SparkConf, replayBus, new SecurityManager(conf),
956970
appName, HistoryServer.UI_PATH_PREFIX + s"/${app.id}", app.startTime)
957-
val maybeTruncated = eventLogFile.endsWith(EventLoggingListener.IN_PROGRESS)
958971
try {
959-
replayBus.replay(logInput, eventLogFile, maybeTruncated)
972+
replayBus.replay(logInput, eventLogFile, inProgressExists)
960973
} finally {
961974
logInput.close()
962975
}
963-
appIdToUI(app.id) = ui
964-
webUi.attachSparkUI(ui)
976+
977+
Some(ui)
978+
}(rebuildUIContext)
979+
980+
futureUI.onSuccess { case Some(ui) =>
981+
appIdToUI.put(app.id, ui)
982+
self.send(AttachCompletedRebuildUI(app.id))
965983
// Application UI is successfully rebuilt, so link the Master UI to it
984+
// NOTE - app.appUIUrlAtHistoryServer is volatile
966985
app.appUIUrlAtHistoryServer = Some(ui.basePath)
967-
Some(ui)
968-
} catch {
986+
}(ThreadUtils.sameThread)
987+
988+
futureUI.onFailure {
969989
case fnf: FileNotFoundException =>
970990
// Event logging is enabled for this application, but no event logs are found
971991
val title = s"Application history not found (${app.id})"
@@ -974,7 +994,7 @@ private[deploy] class Master(
974994
msg += " Did you specify the correct logging directory?"
975995
msg = URLEncoder.encode(msg, "UTF-8")
976996
app.appUIUrlAtHistoryServer = Some(notFoundBasePath + s"?msg=$msg&title=$title")
977-
None
997+
978998
case e: Exception =>
979999
// Relay exception message to application UI page
9801000
val title = s"Application history load error (${app.id})"
@@ -984,8 +1004,9 @@ private[deploy] class Master(
9841004
msg = URLEncoder.encode(msg, "UTF-8")
9851005
app.appUIUrlAtHistoryServer =
9861006
Some(notFoundBasePath + s"?msg=$msg&exception=$exception&title=$title")
987-
None
988-
}
1007+
}(ThreadUtils.sameThread)
1008+
1009+
futureUI
9891010
}
9901011

9911012
/** Generate a new app ID given an app's submission date */

core/src/main/scala/org/apache/spark/deploy/master/MasterMessages.scala

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,4 +39,6 @@ private[master] object MasterMessages {
3939
case object BoundPortsRequest
4040

4141
case class BoundPortsResponse(rpcEndpointPort: Int, webUIPort: Int, restPort: Option[Int])
42+
43+
case class AttachCompletedRebuildUI(appId: String)
4244
}

0 commit comments

Comments
 (0)