Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 10 additions & 5 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
@volatile private var _dagScheduler: DAGScheduler = _
private var _applicationId: String = _
private var _applicationAttemptId: Option[String] = None
private var _eventLogger: Option[EventLoggingListener] = None
private var _eventLogger: Option[EventLoggingWriterListener] = None
private var _executorAllocationManager: Option[ExecutorAllocationManager] = None
private var _cleaner: Option[ContextCleaner] = None
private var _listenerBusStarted: Boolean = false
Expand All @@ -245,6 +245,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
def appName: String = _conf.get("spark.app.name")

// Whether event logging is turned on; reads `spark.eventLog.enabled` and defaults to false.
private[spark] def isEventLogEnabled: Boolean = _conf.getBoolean("spark.eventLog.enabled", false)
// Value of `spark.eventLog.group.size`, defaulting to 0. When event logging is enabled, a value
// greater than 0 makes this context create an EventLoggingGroupListener instead of an
// EventLoggingListener. NOTE(review): the unit of the size is not visible here — confirm
// against the group listener.
private[spark] def eventLogGroupSize: Int = _conf.getInt("spark.eventLog.group.size", 0)
// Resolved event-log directory: set from `spark.eventLog.dir` (trailing "/" stripped) when
// event logging is enabled, None otherwise.
private[spark] def eventLogDir: Option[URI] = _eventLogDir
// Accessor for the `_eventLogCodec` field.
private[spark] def eventLogCodec: Option[String] = _eventLogCodec

Expand Down Expand Up @@ -333,7 +334,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli

def metricsSystem: MetricsSystem = if (_env != null) _env.metricsSystem else null

private[spark] def eventLogger: Option[EventLoggingListener] = _eventLogger
private[spark] def eventLogger: Option[EventLoggingWriterListener] = _eventLogger

private[spark] def executorAllocationManager: Option[ExecutorAllocationManager] =
_executorAllocationManager
Expand Down Expand Up @@ -412,8 +413,8 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli

_eventLogDir =
if (isEventLogEnabled) {
val unresolvedDir = conf.get("spark.eventLog.dir", EventLoggingListener.DEFAULT_LOG_DIR)
.stripSuffix("/")
val unresolvedDir = conf.get("spark.eventLog.dir",
EventLoggingWriterListener.DEFAULT_LOG_DIR).stripSuffix("/")
Some(Utils.resolveURI(unresolvedDir))
} else {
None
Expand Down Expand Up @@ -527,9 +528,13 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli

_eventLogger =
if (isEventLogEnabled) {
val logger =
val logger = if (eventLogGroupSize > 0) {
new EventLoggingGroupListener(_applicationId, _applicationAttemptId, _eventLogDir.get,
_conf, _hadoopConfiguration)
} else {
new EventLoggingListener(_applicationId, _applicationAttemptId, _eventLogDir.get,
_conf, _hadoopConfiguration)
}
logger.start()
listenerBus.addListener(logger)
Some(logger)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,10 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
val newLastScanTime = getNewLastScanTime()
val statusList = Option(fs.listStatus(new Path(logDir))).map(_.toSeq)
.getOrElse(Seq[FileStatus]())
val logInfos: Seq[FileStatus] = statusList

// App list should not contain meta file.
val logFiles = statusList.filter { entry => !entry.getPath.getName.contains("meta") }
val logInfos: Seq[FileStatus] = logFiles
.filter { entry =>
try {
getModificationTime(entry).map { time =>
Expand Down Expand Up @@ -307,7 +310,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
val newAttempts = logs.flatMap { fileStatus =>
try {
val bus = new ReplayListenerBus()
val res = replay(fileStatus, bus)
val res = replayMetaEvent(fileStatus, bus)
res match {
case Some(r) => logDebug(s"Application log ${r.logPath} loaded successfully.")
case None => logWarning(s"Failed to load application log ${fileStatus.getPath}. " +
Expand Down Expand Up @@ -386,6 +389,17 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
now - attempt.lastUpdated > maxAge && attempt.completed
}

/**
 * Decides whether the meta file that accompanies a grouped ("part") event log can be deleted.
 *
 * The meta file name is obtained from `appId` by replacing every `part<digits>` run with
 * `meta`, and is resolved against `logDir`.
 *
 * @param fs    file system used to check that the meta file exists
 * @param appId application id of the attempt whose log is being cleaned
 * @return `(true, metaPath)` when the meta file exists and no key of `applications` contains
 *         the portion of `appId` that precedes its first "-part"; `(false, null)` otherwise.
 *         Callers must test the flag before using the path.
 */
def shouldCleanMeta(fs: FileSystem, appId: String): (Boolean, Path) = {
  val metaPath = new Path(logDir, appId.replaceAll("part\\d+", "meta"))
  val appPrefix = appId.split("-part")(0)
  // `exists` stops at the first matching key, so no intermediate filtered set is built just to
  // be counted. The file-system check stays first so the key scan is skipped when there is no
  // meta file at all.
  if (fs.exists(metaPath) && !applications.keySet.exists(_.contains(appPrefix))) {
    (true, metaPath)
  } else {
    (false, null)
  }
}

// Scan all logs from the log directory.
// Only completed applications older than the specified max age will be deleted.
applications.values.foreach { app =>
Expand All @@ -411,6 +425,15 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
logWarning(s"Error deleting ${path}")
}
}

// Determine whether the meta file needs to be deleted
if (path.getName.contains("part")) {
val (needClean, metaPath) = shouldCleanMeta(fs, attempt.appId)
if (needClean) {
fs.delete(metaPath, true)
logInfo(s"Success to delete meta file: $metaPath")
}
}
} catch {
case e: AccessControlException =>
logInfo(s"No permission to delete ${attempt.logPath}, ignoring.")
Expand Down Expand Up @@ -456,45 +479,88 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
}

/**
* Replays the events in the specified log file and returns information about the associated
* application. Return `None` if the application ID cannot be located.
* Replays the meta events in the specified log file and returns information about the associated
* application.
*/
private def replay(
private def replayMetaEvent(
    eventLog: FileStatus,
    bus: ReplayListenerBus): Option[FsApplicationAttemptInfo] = {
  val logPath = eventLog.getPath()
  logInfo(s"Replaying meta file of log: $logPath")

  // A log is treated as a grouped application when its file name contains "part"; for those
  // only the meta stream is replayed. Everything else is opened as a legacy directory or as a
  // regular event log.
  val isGroupApp = logPath.getName.contains("part")
  val input =
    if (isGroupApp) {
      EventLoggingWriterListener.getMetaStream(logPath, fs)
    } else if (isLegacyLogDirectory(eventLog)) {
      openLegacyEventLog(logPath)
    } else {
      EventLoggingWriterListener.openEventLog(logPath, fs)
    }

  try {
    val appListener = new ApplicationEventListener
    val appCompleted = isApplicationCompleted(eventLog)
    bus.addListener(appListener)
    // The third argument is `!appCompleted`, i.e. true while the application is in progress.
    bus.replay(input, logPath.toString, !appCompleted)

    // Resolve the application id as an Option instead of returning early from inside the try:
    //  - grouped logs use the file name up to its first '.';
    //  - other logs use the replayed id, falling back to the file name when the id is missing
    //    but `sparkVersionHasAppId` is false for this log;
    //  - otherwise there is no usable id and the result is None.
    val resolvedAppId: Option[String] =
      if (isGroupApp) {
        Some(logPath.getName.split("\\.")(0))
      } else if (appListener.appId.isDefined || !sparkVersionHasAppId(eventLog)) {
        Some(appListener.appId.getOrElse(logPath.getName()))
      } else {
        None
      }

    resolvedAppId.map { appId =>
      new FsApplicationAttemptInfo(
        logPath.getName(),
        appListener.appName.getOrElse(NOT_STARTED),
        appId,
        appListener.appAttemptId,
        appListener.startTime.getOrElse(-1L),
        appListener.endTime.getOrElse(-1L),
        getModificationTime(eventLog).get,
        appListener.sparkUser.getOrElse(NOT_STARTED),
        appCompleted)
    }
  } finally {
    // Runs on every path, including when no attempt info is produced.
    input.close()
  }
}

/**
* Replays the events in the specified log file and returns information about the associated
* application. Return `None` if the application ID cannot be located.
*/
private def replay(eventLog: FileStatus, bus: ReplayListenerBus): Option[String] = {
val logPath = eventLog.getPath()
logInfo(s"Replaying log path: $logPath")

val metaInput: Option[InputStream] =
if (logPath.getName.contains("part")) {
Some(EventLoggingWriterListener.getMetaStream(logPath, fs))
} else {
None
}
val logInput =
if (isLegacyLogDirectory(eventLog)) {
openLegacyEventLog(logPath)
} else {
EventLoggingListener.openEventLog(logPath, fs)
EventLoggingWriterListener.openEventLog(logPath, fs)
}
try {
val appListener = new ApplicationEventListener
val appCompleted = isApplicationCompleted(eventLog)
bus.addListener(appListener)
bus.replay(logInput, logPath.toString, !appCompleted)

// Without an app ID, new logs will render incorrectly in the listing page, so do not list or
// try to show their UI. Some old versions of Spark generate logs without an app ID, so let
// logs generated by those versions go through.
if (appListener.appId.isDefined || !sparkVersionHasAppId(eventLog)) {
Some(new FsApplicationAttemptInfo(
logPath.getName(),
appListener.appName.getOrElse(NOT_STARTED),
appListener.appId.getOrElse(logPath.getName()),
appListener.appAttemptId,
appListener.startTime.getOrElse(-1L),
appListener.endTime.getOrElse(-1L),
getModificationTime(eventLog).get,
appListener.sparkUser.getOrElse(NOT_STARTED),
appCompleted))
} else {
None
if (null != logInput) {
bus.replay(logInput, logPath.toString, !appCompleted)
}
metaInput.map(bus.replay(_, logPath.toString, !appCompleted))
Some("")
} finally {
logInput.close()
metaInput.foreach(_.close)
if (null != logInput) {
logInput.close()
}
}
}

Expand Down Expand Up @@ -563,7 +629,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
if (isLegacyLogDirectory(entry)) {
fs.exists(new Path(entry.getPath(), APPLICATION_COMPLETE))
} else {
!entry.getPath().getName().endsWith(EventLoggingListener.IN_PROGRESS)
!entry.getPath().getName().endsWith(EventLoggingWriterListener.IN_PROGRESS)
}
}

Expand Down Expand Up @@ -592,8 +658,8 @@ private[history] object FsHistoryProvider {

// Constants used to parse Spark 1.0.0 log directories.
val LOG_PREFIX = "EVENT_LOG_"
val SPARK_VERSION_PREFIX = EventLoggingListener.SPARK_VERSION_KEY + "_"
val COMPRESSION_CODEC_PREFIX = EventLoggingListener.COMPRESSION_CODEC_KEY + "_"
val SPARK_VERSION_PREFIX = EventLoggingWriterListener.SPARK_VERSION_KEY + "_"
val COMPRESSION_CODEC_PREFIX = EventLoggingWriterListener.COMPRESSION_CODEC_KEY + "_"
val APPLICATION_COMPLETE = "APPLICATION_COMPLETE"
}

Expand Down
12 changes: 6 additions & 6 deletions core/src/main/scala/org/apache/spark/deploy/master/Master.scala
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ import org.apache.spark.deploy.master.MasterMessages._
import org.apache.spark.deploy.master.ui.MasterWebUI
import org.apache.spark.deploy.rest.StandaloneRestServer
import org.apache.spark.metrics.MetricsSystem
import org.apache.spark.scheduler.{EventLoggingListener, ReplayListenerBus}
import org.apache.spark.scheduler.{EventLoggingWriterListener, EventLoggingListener, ReplayListenerBus}
import org.apache.spark.serializer.{JavaSerializer, Serializer}
import org.apache.spark.ui.SparkUI
import org.apache.spark.util.{ThreadUtils, SignalLogger, Utils}
Expand Down Expand Up @@ -924,28 +924,28 @@ private[deploy] class Master(
return None
}

val eventLogFilePrefix = EventLoggingListener.getLogPath(
val eventLogFilePrefix = EventLoggingWriterListener.getLogPath(
eventLogDir, app.id, app.desc.eventLogCodec)
val fs = Utils.getHadoopFileSystem(eventLogDir, hadoopConf)
val inProgressExists = fs.exists(new Path(eventLogFilePrefix +
EventLoggingListener.IN_PROGRESS))
EventLoggingWriterListener.IN_PROGRESS))

if (inProgressExists) {
// Event logging is enabled for this application, but the application is still in progress
logWarning(s"Application $appName is still in progress, it may be terminated abnormally.")
}

val (eventLogFile, status) = if (inProgressExists) {
(eventLogFilePrefix + EventLoggingListener.IN_PROGRESS, " (in progress)")
(eventLogFilePrefix + EventLoggingWriterListener.IN_PROGRESS, " (in progress)")
} else {
(eventLogFilePrefix, " (completed)")
}

val logInput = EventLoggingListener.openEventLog(new Path(eventLogFile), fs)
val logInput = EventLoggingWriterListener.openEventLog(new Path(eventLogFile), fs)
val replayBus = new ReplayListenerBus()
val ui = SparkUI.createHistoryUI(new SparkConf, replayBus, new SecurityManager(conf),
appName, HistoryServer.UI_PATH_PREFIX + s"/${app.id}", app.startTime)
val maybeTruncated = eventLogFile.endsWith(EventLoggingListener.IN_PROGRESS)
val maybeTruncated = eventLogFile.endsWith(EventLoggingWriterListener.IN_PROGRESS)
try {
replayBus.replay(logInput, eventLogFile, maybeTruncated)
} finally {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.scheduler

import scala.collection.mutable

import org.apache.hadoop.fs.FSDataOutputStream

/**
 * Mutable record of the state kept for one event-log group.
 *
 * This class is a plain field holder: it extends nothing and defines no behaviour of its own.
 * NOTE(review): presumably populated by the group event-logging listener — confirm against
 * that class.
 */
private[spark] class EventLogGroupInfo {
  // Path of the group's file as a string; null until assigned.
  var filePath: String = _
  // Output stream for the group, if one has been set; starts out as None.
  var hadoopDataStream: Option[FSDataOutputStream] = Option.empty

  // Counters, both starting at 0. NOTE(review): by name, jobs seen vs. jobs completed in this
  // group — verify where they are incremented.
  var jobNum: Int = 0
  var completedJobNum: Int = 0
  // Keyed by an Int job id, holding a set of Int ids per job (stage ids, judging by the name).
  val jobIdToStages: mutable.HashMap[Int, Set[Int]] = mutable.HashMap.empty[Int, Set[Int]]
}
Loading