From 62c982b0048252d88de27e0791cbafbbc69c6c57 Mon Sep 17 00:00:00 2001 From: xutingjun Date: Fri, 23 Oct 2015 15:52:10 +0800 Subject: [PATCH 1/2] add big event log --- .../scala/org/apache/spark/SparkContext.scala | 15 +- .../deploy/history/FsHistoryProvider.scala | 122 ++++++-- .../apache/spark/deploy/master/Master.scala | 12 +- .../spark/scheduler/EventLogGroupInfo.scala | 41 +++ .../scheduler/EventLoggingGroupListener.scala | 284 ++++++++++++++++++ .../scheduler/EventLoggingListener.scala | 167 ++-------- .../EventLoggingWriterListener.scala | 262 ++++++++++++++++ .../org/apache/spark/scheduler/TaskInfo.scala | 3 +- .../spark/scheduler/TaskSetManager.scala | 2 +- .../apache/spark/util/JsonGroupProtocol.scala | 66 ++++ .../history/FsHistoryProviderSuite.scala | 10 +- .../scheduler/EventLoggingListenerSuite.scala | 22 +- .../spark/scheduler/ReplayListenerSuite.scala | 4 +- 13 files changed, 802 insertions(+), 208 deletions(-) create mode 100644 core/src/main/scala/org/apache/spark/scheduler/EventLogGroupInfo.scala create mode 100644 core/src/main/scala/org/apache/spark/scheduler/EventLoggingGroupListener.scala create mode 100644 core/src/main/scala/org/apache/spark/scheduler/EventLoggingWriterListener.scala create mode 100644 core/src/main/scala/org/apache/spark/util/JsonGroupProtocol.scala diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index ccba3ed9e643c..9a925c850944b 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -218,7 +218,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli @volatile private var _dagScheduler: DAGScheduler = _ private var _applicationId: String = _ private var _applicationAttemptId: Option[String] = None - private var _eventLogger: Option[EventLoggingListener] = None + private var _eventLogger: Option[EventLoggingWriterListener] = None private var _executorAllocationManager: Option[ExecutorAllocationManager] = None private var _cleaner: Option[ContextCleaner] = None private var _listenerBusStarted: Boolean = false @@ -245,6 +245,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli def appName: String = _conf.get("spark.app.name") private[spark] def isEventLogEnabled: Boolean = _conf.getBoolean("spark.eventLog.enabled", false) + private[spark] def eventLogGroupSize: Int = _conf.getInt("spark.eventLog.group.size", 0) private[spark] def eventLogDir: Option[URI] = _eventLogDir private[spark] def eventLogCodec: Option[String] = _eventLogCodec @@ -333,7 +334,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli def metricsSystem: MetricsSystem = if (_env != null) _env.metricsSystem else null - private[spark] def eventLogger: Option[EventLoggingListener] = _eventLogger + private[spark] def eventLogger: Option[EventLoggingWriterListener] = _eventLogger private[spark] def executorAllocationManager: Option[ExecutorAllocationManager] = _executorAllocationManager @@ -412,8 +413,8 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli _eventLogDir = if (isEventLogEnabled) { - val unresolvedDir = conf.get("spark.eventLog.dir", EventLoggingListener.DEFAULT_LOG_DIR) - .stripSuffix("/") + val unresolvedDir = conf.get("spark.eventLog.dir", + EventLoggingWriterListener.DEFAULT_LOG_DIR).stripSuffix("/") Some(Utils.resolveURI(unresolvedDir)) } else { None @@ -527,9 +528,13 @@ class 
SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli _eventLogger = if (isEventLogEnabled) { - val logger = + val logger = if (eventLogGroupSize > 0) { + new EventLoggingGroupListener(_applicationId, _applicationAttemptId, _eventLogDir.get, + _conf, _hadoopConfiguration) + } else { new EventLoggingListener(_applicationId, _applicationAttemptId, _eventLogDir.get, _conf, _hadoopConfiguration) + } logger.start() listenerBus.addListener(logger) Some(logger) diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala index 80bfda9dddb39..c0c225d847e70 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala @@ -182,7 +182,10 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) val newLastScanTime = getNewLastScanTime() val statusList = Option(fs.listStatus(new Path(logDir))).map(_.toSeq) .getOrElse(Seq[FileStatus]()) - val logInfos: Seq[FileStatus] = statusList + + // App list should not contain meta file. + val logFiles = statusList.filter { entry => !entry.getPath.getName.contains("meta") } + val logInfos: Seq[FileStatus] = logFiles .filter { entry => try { getModificationTime(entry).map { time => @@ -307,7 +310,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) val newAttempts = logs.flatMap { fileStatus => try { val bus = new ReplayListenerBus() - val res = replay(fileStatus, bus) + val res = replayMetaEvent(fileStatus, bus) res match { case Some(r) => logDebug(s"Application log ${r.logPath} loaded successfully.") case None => logWarning(s"Failed to load application log ${fileStatus.getPath}. " + @@ -386,6 +389,17 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) now - attempt.lastUpdated > maxAge && attempt.completed } + def shouldCleanMeta(fs: FileSystem, appId: String): (Boolean, Path) = { + val metaFile = appId.replaceAll("part\\d+", "meta") + val metaPath = new Path(logDir, metaFile) + val appPrefix = appId.split("-part")(0) + if (fs.exists(metaPath) && applications.keySet.filter(_.contains(appPrefix)).size == 0) { + (true, metaPath) + } else { + (false, null) + } + } + // Scan all logs from the log directory. // Only completed applications older than the specified max age will be deleted. applications.values.foreach { app => @@ -411,6 +425,15 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) logWarning(s"Error deleting ${path}") } } + + // Determine if need to delete meta file + if (path.getName.contains("part")) { + val (needClean, metaPath) = shouldCleanMeta(fs, attempt.appId) + if (needClean) { + fs.delete(metaPath, true) + logInfo(s"Success to delete meta file: $metaPath") + } + } } catch { case e: AccessControlException => logInfo(s"No permission to delete ${attempt.logPath}, ignoring.") @@ -456,45 +479,88 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) } /** - * Replays the events in the specified log file and returns information about the associated - * application. Return `None` if the application ID cannot be located. + * Replays the meta events in the specified log file and returns information about the associated + * application. 
*/ - private def replay( + private def replayMetaEvent( eventLog: FileStatus, bus: ReplayListenerBus): Option[FsApplicationAttemptInfo] = { val logPath = eventLog.getPath() + logInfo(s"Replaying meta file of log: $logPath") + + val isGroupApp = logPath.getName.contains("part") + val input = + if (isGroupApp) { + EventLoggingWriterListener.getMetaStream(logPath, fs) + } else if (isLegacyLogDirectory(eventLog)) { + openLegacyEventLog(logPath) + } else { + EventLoggingWriterListener.openEventLog(logPath, fs) + } + + try { + val appListener = new ApplicationEventListener + val appCompleted = isApplicationCompleted(eventLog) + bus.addListener(appListener) + bus.replay(input, logPath.toString, !appCompleted) + + val appId = if (isGroupApp) { + logPath.getName.split("\\.")(0) + } else if (appListener.appId.isDefined || !sparkVersionHasAppId(eventLog)) { + appListener.appId.getOrElse(logPath.getName()) + } else { + return None + } + + Some(new FsApplicationAttemptInfo( + logPath.getName(), + appListener.appName.getOrElse(NOT_STARTED), + appId, + appListener.appAttemptId, + appListener.startTime.getOrElse(-1L), + appListener.endTime.getOrElse(-1L), + getModificationTime(eventLog).get, + appListener.sparkUser.getOrElse(NOT_STARTED), + appCompleted)) + } finally { + input.close() + } + } + + /** + * Replays the events in the specified log file and returns information about the associated + * application. Return `None` if the application ID cannot be located. + */ + private def replay(eventLog: FileStatus, bus: ReplayListenerBus): Option[String] = { + val logPath = eventLog.getPath() logInfo(s"Replaying log path: $logPath") + + val metaInput: Option[InputStream] = + if (logPath.getName.contains("part")) { + Some(EventLoggingWriterListener.getMetaStream(logPath, fs)) + } else { + None + } val logInput = if (isLegacyLogDirectory(eventLog)) { openLegacyEventLog(logPath) } else { - EventLoggingListener.openEventLog(logPath, fs) + EventLoggingWriterListener.openEventLog(logPath, fs) } try { val appListener = new ApplicationEventListener val appCompleted = isApplicationCompleted(eventLog) bus.addListener(appListener) - bus.replay(logInput, logPath.toString, !appCompleted) - - // Without an app ID, new logs will render incorrectly in the listing page, so do not list or - // try to show their UI. Some old versions of Spark generate logs without an app ID, so let - // logs generated by those versions go through. 
- if (appListener.appId.isDefined || !sparkVersionHasAppId(eventLog)) { - Some(new FsApplicationAttemptInfo( - logPath.getName(), - appListener.appName.getOrElse(NOT_STARTED), - appListener.appId.getOrElse(logPath.getName()), - appListener.appAttemptId, - appListener.startTime.getOrElse(-1L), - appListener.endTime.getOrElse(-1L), - getModificationTime(eventLog).get, - appListener.sparkUser.getOrElse(NOT_STARTED), - appCompleted)) - } else { - None + if (null != logInput) { + bus.replay(logInput, logPath.toString, !appCompleted) } + metaInput.map(bus.replay(_, logPath.toString, !appCompleted)) + Some("") } finally { - logInput.close() + metaInput.foreach(_.close) + if (null != logInput) { + logInput.close() + } } } @@ -563,7 +629,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) if (isLegacyLogDirectory(entry)) { fs.exists(new Path(entry.getPath(), APPLICATION_COMPLETE)) } else { - !entry.getPath().getName().endsWith(EventLoggingListener.IN_PROGRESS) + !entry.getPath().getName().endsWith(EventLoggingWriterListener.IN_PROGRESS) } } @@ -592,8 +658,8 @@ private[history] object FsHistoryProvider { // Constants used to parse Spark 1.0.0 log directories. val LOG_PREFIX = "EVENT_LOG_" - val SPARK_VERSION_PREFIX = EventLoggingListener.SPARK_VERSION_KEY + "_" - val COMPRESSION_CODEC_PREFIX = EventLoggingListener.COMPRESSION_CODEC_KEY + "_" + val SPARK_VERSION_PREFIX = EventLoggingWriterListener.SPARK_VERSION_KEY + "_" + val COMPRESSION_CODEC_PREFIX = EventLoggingWriterListener.COMPRESSION_CODEC_KEY + "_" val APPLICATION_COMPLETE = "APPLICATION_COMPLETE" } diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala index 6715d6c70f497..c14ecbca98ddb 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala @@ -40,7 +40,7 @@ import org.apache.spark.deploy.master.MasterMessages._ import org.apache.spark.deploy.master.ui.MasterWebUI import org.apache.spark.deploy.rest.StandaloneRestServer import org.apache.spark.metrics.MetricsSystem -import org.apache.spark.scheduler.{EventLoggingListener, ReplayListenerBus} +import org.apache.spark.scheduler.{EventLoggingWriterListener, EventLoggingListener, ReplayListenerBus} import org.apache.spark.serializer.{JavaSerializer, Serializer} import org.apache.spark.ui.SparkUI import org.apache.spark.util.{ThreadUtils, SignalLogger, Utils} @@ -924,11 +924,11 @@ private[deploy] class Master( return None } - val eventLogFilePrefix = EventLoggingListener.getLogPath( + val eventLogFilePrefix = EventLoggingWriterListener.getLogPath( eventLogDir, app.id, app.desc.eventLogCodec) val fs = Utils.getHadoopFileSystem(eventLogDir, hadoopConf) val inProgressExists = fs.exists(new Path(eventLogFilePrefix + - EventLoggingListener.IN_PROGRESS)) + EventLoggingWriterListener.IN_PROGRESS)) if (inProgressExists) { // Event logging is enabled for this application, but the application is still in progress @@ -936,16 +936,16 @@ private[deploy] class Master( } val (eventLogFile, status) = if (inProgressExists) { - (eventLogFilePrefix + EventLoggingListener.IN_PROGRESS, " (in progress)") + (eventLogFilePrefix + EventLoggingWriterListener.IN_PROGRESS, " (in progress)") } else { (eventLogFilePrefix, " (completed)") } - val logInput = EventLoggingListener.openEventLog(new Path(eventLogFile), fs) + val logInput = EventLoggingWriterListener.openEventLog(new Path(eventLogFile), fs) val replayBus = new 
ReplayListenerBus() val ui = SparkUI.createHistoryUI(new SparkConf, replayBus, new SecurityManager(conf), appName, HistoryServer.UI_PATH_PREFIX + s"/${app.id}", app.startTime) - val maybeTruncated = eventLogFile.endsWith(EventLoggingListener.IN_PROGRESS) + val maybeTruncated = eventLogFile.endsWith(EventLoggingWriterListener.IN_PROGRESS) try { replayBus.replay(logInput, eventLogFile, maybeTruncated) } finally { diff --git a/core/src/main/scala/org/apache/spark/scheduler/EventLogGroupInfo.scala b/core/src/main/scala/org/apache/spark/scheduler/EventLogGroupInfo.scala new file mode 100644 index 0000000000000..81da493ac9747 --- /dev/null +++ b/core/src/main/scala/org/apache/spark/scheduler/EventLogGroupInfo.scala @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.scheduler + +import scala.collection.mutable + +import org.apache.hadoop.fs.FSDataOutputStream + +/** + * A SparkListener that logs events to persistent storage. + * + * Event logging is specified by the following configurable parameters: + * spark.eventLog.enabled - Whether event logging is enabled. + * spark.eventLog.compress - Whether to compress logged events + * spark.eventLog.overwrite - Whether to overwrite any existing files. + * spark.eventLog.dir - Path to the directory in which events are logged. + * spark.eventLog.buffer.kb - Buffer size to use when writing to output streams + */ +private[spark] class EventLogGroupInfo { + var filePath: String = null + var hadoopDataStream: Option[FSDataOutputStream] = None + + var jobNum = 0 + var completedJobNum = 0 + val jobIdToStages = new mutable.HashMap[Int, Set[Int]] +} diff --git a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingGroupListener.scala b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingGroupListener.scala new file mode 100644 index 0000000000000..83783517918f9 --- /dev/null +++ b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingGroupListener.scala @@ -0,0 +1,284 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.scheduler + +import java.io._ +import java.net.URI + +import scala.collection.mutable +import scala.collection.mutable.ArrayBuffer + +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.{Path, FSDataOutputStream} +import org.apache.spark.deploy.SparkHadoopUtil +import org.apache.spark.io.CompressionCodec +import org.apache.spark.util.{JsonProtocol, JsonGroupProtocol, Utils} +import org.apache.spark._ +import org.json4s.JsonAST.JValue +import org.json4s.jackson.JsonMethods._ + +/** + * A SparkListener that logs events to persistent storage. + * + * Event logging is specified by the following configurable parameters: + * spark.eventLog.enabled - Whether event logging is enabled. + * spark.eventLog.compress - Whether to compress logged events + * spark.eventLog.overwrite - Whether to overwrite any existing files. + * spark.eventLog.dir - Path to the directory in which events are logged. + * spark.eventLog.buffer.kb - Buffer size to use when writing to output streams + */ +private[spark] class EventLoggingGroupListener ( + appId: String, + appAttemptId: Option[String], + logBaseDir: URI, + sparkConf: SparkConf, + hadoopConf: Configuration) + extends SparkListener with EventLoggingWriterListener with Logging { + + import EventLoggingWriterListener._ + + def this(appId: String, appAttemptId: Option[String], logBaseDir: URI, sparkConf: SparkConf) = + this(appId, appAttemptId, logBaseDir, sparkConf, + SparkHadoopUtil.get.newConfiguration(sparkConf)) + + private val groupSize = sparkConf.getInt("spark.eventLog.group.size", 0) + private val shouldCompress = sparkConf.getBoolean("spark.eventLog.compress", false) + private val shouldOverwrite = sparkConf.getBoolean("spark.eventLog.overwrite", false) + private val outputBufferSize = sparkConf.getInt("spark.eventLog.buffer.kb", 100) * 1024 + private val fileSystem = Utils.getHadoopFileSystem(logBaseDir, hadoopConf) + private val compressionCodec = + if (shouldCompress) { + Some(CompressionCodec.createCodec(sparkConf)) + } else { + None + } + private val compressionCodecName = compressionCodec.map { c => + CompressionCodec.getShortName(c.getClass.getName) + } + private val codec = compressionCodecName.map("." + _).getOrElse("") + + // The Hadoop APIs have changed over time, so we use reflection to figure out + // the correct method to use to flush a hadoop data stream. See SPARK-1518 + // for details. + private val hadoopFlushMethod = getHadoopFlushMethod + + // Only defined if the file system scheme is not local + private var metaHadoopDataStream: Option[FSDataOutputStream] = None + private var metaFilePath: String = null + private var metaWriter: PrintWriter = null + private val groupWriters = new mutable.HashMap[PrintWriter, EventLogGroupInfo] + private var lastGroupWriter: PrintWriter = null + private var lastGroupInfo: EventLogGroupInfo = null + private var groupNum = 0 + + private val logPath = getLogPath(logBaseDir, appId, appAttemptId) + + /** + * Creates the meta file and the first part file in the configured log directory. 
+ */ + def start() { + if (!fileSystem.getFileStatus(new Path(logBaseDir)).isDir) { + throw new IllegalArgumentException(s"Log directory $logBaseDir does not exist.") + } + + // Create meta file writer + metaFilePath = logPath + "-meta" + codec + val metaWriterAndStream = getFileWriter(metaFilePath, fileSystem, shouldOverwrite, + outputBufferSize, hadoopConf, compressionCodec, true) + metaWriter = metaWriterAndStream._1 + metaHadoopDataStream = metaWriterAndStream._2 + + // Create the first group file writer and update the last group info + updateLastGroupInfo() + } + + def getLogPath(logBaseDir: URI, appId: String, appAttemptId: Option[String]): String = { + val base = logBaseDir.toString.stripSuffix("/") + "/" + sanitize(appId) + if (appAttemptId.isDefined) { + base + "_" + sanitize(appAttemptId.get) + } else { + base + } + } + + /** Create a new group file and update the last groupWriter anf lastDataStream. */ + private def updateLastGroupInfo(): Unit = { + groupNum += 1 + val groupPath = logPath + "-part" + groupNum + codec + val writerAndStream = getFileWriter(groupPath, fileSystem, shouldOverwrite, outputBufferSize, + hadoopConf, compressionCodec, false) + lastGroupWriter = writerAndStream._1 + lastGroupInfo = new EventLogGroupInfo + lastGroupInfo.hadoopDataStream = writerAndStream._2 + lastGroupInfo.filePath = groupPath + groupWriters.put(writerAndStream._1, lastGroupInfo) + } + + /** Log event as JSON. */ + private def logEvent( + writer: Option[PrintWriter], + hadoopDataStream: Option[FSDataOutputStream], + json: JValue, + flushLogger: Boolean = false) { + writer.foreach(_.println(compact(render(json)))) + if (flushLogger) { + writer.foreach(_.flush()) + hadoopDataStream.foreach(hadoopFlushMethod.invoke(_)) + } + } + + /** Log meta event as JSON to meta file. */ + private def logMetaEvent(json: JValue, flushLogger: Boolean = false): Unit = { + metaWriter.println(compact(render(json))) + if (flushLogger) { + metaWriter.flush() + metaHadoopDataStream.foreach(hadoopFlushMethod.invoke(_)) + } + } + + /** Log job event as JSON to part file. 
*/ + private def logJobEvent(stageId: Int, json: JValue, isFlush: Boolean = false): Unit = { + var rightWriter: Option[PrintWriter] = None + var hadoopDstream: Option[FSDataOutputStream] = None + for (writer <- groupWriters) { + for (stages <- writer._2.jobIdToStages.values) { + if (stages.contains(stageId)) { + rightWriter = Some(writer._1) + hadoopDstream = writer._2.hadoopDataStream + } + } + } + + logEvent(rightWriter, hadoopDstream, json, isFlush) + } + + // Events that do not trigger a flush + override def onStageSubmitted(event: SparkListenerStageSubmitted): Unit = { + val jsonAndStage = JsonGroupProtocol.stageSubmittedToJson(event) + logJobEvent(jsonAndStage._2, jsonAndStage._1) + } + + override def onTaskStart(event: SparkListenerTaskStart): Unit = { + val jsonAndStage = JsonGroupProtocol.taskStartToJson(event) + logJobEvent(jsonAndStage._2, jsonAndStage._1) + } + + override def onTaskGettingResult(event: SparkListenerTaskGettingResult): Unit = { + val jsonAndStage = JsonGroupProtocol.taskGettingResultToJson(event) + logJobEvent(jsonAndStage._2, jsonAndStage._1) + } + + override def onTaskEnd(event: SparkListenerTaskEnd): Unit = { + val jsonAndStage = JsonGroupProtocol.taskEndToJson(event) + logJobEvent(jsonAndStage._2, jsonAndStage._1) + } + + // Events that trigger a flush + override def onStageCompleted(event: SparkListenerStageCompleted): Unit = { + val jsonAndStage = JsonGroupProtocol.stageCompletedToJson(event) + logJobEvent(jsonAndStage._2, jsonAndStage._1, true) + } + + override def onJobStart(event: SparkListenerJobStart): Unit = { + val jsonJobStage = JsonGroupProtocol.jobStartToJson(event) + if (lastGroupInfo.jobNum >= groupSize) { + updateLastGroupInfo() + } + logEvent( + Some(lastGroupWriter), lastGroupInfo.hadoopDataStream, jsonJobStage._1, flushLogger = true) + lastGroupInfo.jobNum += 1 + lastGroupInfo.jobIdToStages.put(jsonJobStage._2, jsonJobStage._3) + } + + override def onJobEnd(event: SparkListenerJobEnd): Unit = { + val jsonAndJobId = JsonGroupProtocol.jobEndToJson(event) + var jobWriter: Option[PrintWriter] = None + var groupInfo: EventLogGroupInfo = null + for (writer <- groupWriters) { + if (writer._2.jobIdToStages.contains(jsonAndJobId._2)) { + jobWriter = Some(writer._1) + groupInfo = writer._2 + } + } + + logEvent(jobWriter, groupInfo.hadoopDataStream, jsonAndJobId._1, flushLogger = true) + groupInfo.completedJobNum += 1 + if (groupInfo.jobNum == groupSize && groupInfo.completedJobNum == groupSize) { + jobWriter.foreach(_.close()) + groupWriters.remove(jobWriter.get) + renameFile(fileSystem, shouldOverwrite, groupInfo.filePath) + } + } + + override def onEnvironmentUpdate(event: SparkListenerEnvironmentUpdate): Unit = { + logMetaEvent(JsonProtocol.environmentUpdateToJson(event)) + } + + override def onBlockManagerAdded(event: SparkListenerBlockManagerAdded): Unit = { + logMetaEvent(JsonProtocol.blockManagerAddedToJson(event), flushLogger = true) + } + + override def onBlockManagerRemoved(event: SparkListenerBlockManagerRemoved): Unit = { + logMetaEvent(JsonProtocol.blockManagerRemovedToJson(event), flushLogger = true) + } + + override def onUnpersistRDD(event: SparkListenerUnpersistRDD): Unit = { + logMetaEvent(JsonProtocol.unpersistRDDToJson(event), flushLogger = true) + } + + override def onApplicationStart(event: SparkListenerApplicationStart): Unit = { + logMetaEvent(JsonProtocol.applicationStartToJson(event), flushLogger = true) + } + + override def onApplicationEnd(event: SparkListenerApplicationEnd): Unit = { + 
logMetaEvent(JsonProtocol.applicationEndToJson(event), flushLogger = true) + } + + override def onExecutorAdded(event: SparkListenerExecutorAdded): Unit = { + logMetaEvent(JsonProtocol.executorAddedToJson(event), flushLogger = true) + } + + override def onExecutorRemoved(event: SparkListenerExecutorRemoved): Unit = { + logMetaEvent(JsonProtocol.executorRemovedToJson(event), flushLogger = true) + } + + // No-op because logging every update would be overkill + override def onExecutorMetricsUpdate(event: SparkListenerExecutorMetricsUpdate): Unit = { } + + /** + * Stop logging events. The log file will be renamed so that it loses the + * ".inprogress" suffix. + */ + def stop(): Unit = { + metaWriter.close() + renameFile(fileSystem, shouldOverwrite, metaFilePath) + for (groupWriter <- groupWriters) { + groupWriter._1.close() + renameFile(fileSystem, shouldOverwrite, groupWriter._2.filePath) + } + } + + override def getFilePath(): String = { + logPath + } + + override def getLoggedEvent: ArrayBuffer[JValue] = { + null + } +} + diff --git a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala index 000a021a528cf..74c8dac4c5c79 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala @@ -20,17 +20,17 @@ package org.apache.spark.scheduler import java.io._ import java.net.URI +import org.apache.hadoop.fs.{FileSystem, Path, FSDataOutputStream} +import org.apache.spark.scheduler.EventLoggingWriterListener._ +import org.json4s.JsonAST.JValue + import scala.collection.mutable import scala.collection.mutable.ArrayBuffer -import com.google.common.base.Charsets import org.apache.hadoop.conf.Configuration -import org.apache.hadoop.fs.{FileSystem, FSDataOutputStream, Path} -import org.apache.hadoop.fs.permission.FsPermission -import org.json4s.JsonAST.JValue import org.json4s.jackson.JsonMethods._ -import org.apache.spark.{Logging, SparkConf, SPARK_VERSION} +import org.apache.spark.{Logging, SparkConf} import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.io.CompressionCodec import org.apache.spark.util.{JsonProtocol, Utils} @@ -51,9 +51,9 @@ private[spark] class EventLoggingListener( logBaseDir: URI, sparkConf: SparkConf, hadoopConf: Configuration) - extends SparkListener with Logging { + extends SparkListener with EventLoggingWriterListener with Logging { - import EventLoggingListener._ + import EventLoggingWriterListener._ def this(appId: String, appAttemptId : Option[String], logBaseDir: URI, sparkConf: SparkConf) = this(appId, appAttemptId, logBaseDir, sparkConf, @@ -80,10 +80,7 @@ private[spark] class EventLoggingListener( // The Hadoop APIs have changed over time, so we use reflection to figure out // the correct method to use to flush a hadoop data stream. See SPARK-1518 // for details. 
- private val hadoopFlushMethod = { - val cls = classOf[FSDataOutputStream] - scala.util.Try(cls.getMethod("hflush")).getOrElse(cls.getMethod("sync")) - } + private val hadoopFlushMethod = getHadoopFlushMethod private var writer: Option[PrintWriter] = None @@ -101,42 +98,11 @@ private[spark] class EventLoggingListener( throw new IllegalArgumentException(s"Log directory $logBaseDir does not exist.") } - val workingPath = logPath + IN_PROGRESS - val uri = new URI(workingPath) - val path = new Path(workingPath) - val defaultFs = FileSystem.getDefaultUri(hadoopConf).getScheme - val isDefaultLocal = defaultFs == null || defaultFs == "file" - - if (shouldOverwrite && fileSystem.exists(path)) { - logWarning(s"Event log $path already exists. Overwriting...") - if (!fileSystem.delete(path, true)) { - logWarning(s"Error deleting $path") - } - } + val writerAndStream = getFileWriter(logPath, fileSystem, shouldOverwrite, outputBufferSize, - /* The Hadoop LocalFileSystem (r1.0.4) has known issues with syncing (HADOOP-7844). - * Therefore, for local files, use FileOutputStream instead. */ - val dstream = - if ((isDefaultLocal && uri.getScheme == null) || uri.getScheme == "file") { - new FileOutputStream(uri.getPath) - } else { - hadoopDataStream = Some(fileSystem.create(path)) - hadoopDataStream.get - } - - try { - val cstream = compressionCodec.map(_.compressedOutputStream(dstream)).getOrElse(dstream) - val bstream = new BufferedOutputStream(cstream, outputBufferSize) - - EventLoggingListener.initEventLog(bstream) - fileSystem.setPermission(path, LOG_FILE_PERMISSIONS) - writer = Some(new PrintWriter(bstream)) - logInfo("Logging events to %s".format(logPath)) - } catch { - case e: Exception => - dstream.close() - throw e - } + hadoopConf, compressionCodec, true) + hadoopDataStream = writerAndStream._2 + writer = Some(writerAndStream._1) } /** Log the event as JSON. */ @@ -214,112 +180,15 @@ private[spark] class EventLoggingListener( def stop(): Unit = { writer.foreach(_.close()) - val target = new Path(logPath) - if (fileSystem.exists(target)) { - if (shouldOverwrite) { - logWarning(s"Event log $target already exists. Overwriting...") - if (!fileSystem.delete(target, true)) { - logWarning(s"Error deleting $target") - } - } else { - throw new IOException("Target log file already exists (%s)".format(logPath)) - } - } - fileSystem.rename(new Path(logPath + IN_PROGRESS), target) - } - -} - -private[spark] object EventLoggingListener extends Logging { - // Suffix applied to the names of files still being written by applications. - val IN_PROGRESS = ".inprogress" - val DEFAULT_LOG_DIR = "/tmp/spark-events" - val SPARK_VERSION_KEY = "SPARK_VERSION" - val COMPRESSION_CODEC_KEY = "COMPRESSION_CODEC" - - private val LOG_FILE_PERMISSIONS = new FsPermission(Integer.parseInt("770", 8).toShort) - - // A cache for compression codecs to avoid creating the same codec many times - private val codecMap = new mutable.HashMap[String, CompressionCodec] - - /** - * Write metadata about an event log to the given stream. - * The metadata is encoded in the first line of the event log as JSON. - * - * @param logStream Raw output stream to the event log file. - */ - def initEventLog(logStream: OutputStream): Unit = { - val metadata = SparkListenerLogStart(SPARK_VERSION) - val metadataJson = compact(JsonProtocol.logStartToJson(metadata)) + "\n" - logStream.write(metadataJson.getBytes(Charsets.UTF_8)) - } - - /** - * Return a file-system-safe path to the log file for the given application. 
- * - * Note that because we currently only create a single log file for each application, - * we must encode all the information needed to parse this event log in the file name - * instead of within the file itself. Otherwise, if the file is compressed, for instance, - * we won't know which codec to use to decompress the metadata needed to open the file in - * the first place. - * - * The log file name will identify the compression codec used for the contents, if any. - * For example, app_123 for an uncompressed log, app_123.lzf for an LZF-compressed log. - * - * @param logBaseDir Directory where the log file will be written. - * @param appId A unique app ID. - * @param appAttemptId A unique attempt id of appId. May be the empty string. - * @param compressionCodecName Name to identify the codec used to compress the contents - * of the log, or None if compression is not enabled. - * @return A path which consists of file-system-safe characters. - */ - def getLogPath( - logBaseDir: URI, - appId: String, - appAttemptId: Option[String], - compressionCodecName: Option[String] = None): String = { - val base = logBaseDir.toString.stripSuffix("/") + "/" + sanitize(appId) - val codec = compressionCodecName.map("." + _).getOrElse("") - if (appAttemptId.isDefined) { - base + "_" + sanitize(appAttemptId.get) + codec - } else { - base + codec - } + renameFile(fileSystem, shouldOverwrite, logPath) } - private def sanitize(str: String): String = { - str.replaceAll("[ :/]", "-").replaceAll("[.${}'\"]", "_").toLowerCase + override def getFilePath(): String = { + logPath } - /** - * Opens an event log file and returns an input stream that contains the event data. - * - * @return input stream that holds one JSON record per line. - */ - def openEventLog(log: Path, fs: FileSystem): InputStream = { - // It's not clear whether FileSystem.open() throws FileNotFoundException or just plain - // IOException when a file does not exist, so try our best to throw a proper exception. - if (!fs.exists(log)) { - throw new FileNotFoundException(s"File $log does not exist.") - } - - val in = new BufferedInputStream(fs.open(log)) - - // Compression codec is encoded as an extension, e.g. app_123.lzf - // Since we sanitize the app ID to not include periods, it is safe to split on it - val logName = log.getName.stripSuffix(IN_PROGRESS) - val codecName: Option[String] = logName.split("\\.").tail.lastOption - val codec = codecName.map { c => - codecMap.getOrElseUpdate(c, CompressionCodec.createCodec(new SparkConf, c)) - } - - try { - codec.map(_.compressedInputStream(in)).getOrElse(in) - } catch { - case e: Exception => - in.close() - throw e - } + override def getLoggedEvent: ArrayBuffer[JValue]= { + loggedEvents } - } + diff --git a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingWriterListener.scala b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingWriterListener.scala new file mode 100644 index 0000000000000..fb47fa839ba10 --- /dev/null +++ b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingWriterListener.scala @@ -0,0 +1,262 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.scheduler + +import java.io._ +import java.lang.reflect.Method +import java.net.URI + +import com.google.common.base.Charsets +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.{FSDataOutputStream, FileSystem, Path} +import org.apache.hadoop.fs.permission.FsPermission +import org.apache.spark._ +import org.apache.spark.io.CompressionCodec +import org.apache.spark.util.JsonProtocol +import org.json4s.JsonAST.JValue +import org.json4s.jackson.JsonMethods._ + +import scala.collection.mutable +import scala.collection.mutable.ArrayBuffer + +private[spark] trait EventLoggingWriterListener { + def start() + + def stop () + + def getFilePath(): String + + def getLoggedEvent: ArrayBuffer[JValue] +} + +private[spark] object EventLoggingWriterListener extends Logging { + // Suffix applied to the names of files still being written by applications. + val IN_PROGRESS = ".inprogress" + val DEFAULT_LOG_DIR = "/tmp/spark-events" + val SPARK_VERSION_KEY = "SPARK_VERSION" + val COMPRESSION_CODEC_KEY = "COMPRESSION_CODEC" + + val LOG_FILE_PERMISSIONS = new FsPermission(Integer.parseInt("770", 8).toShort) + + // A cache for compression codecs to avoid creating the same codec many times + private val codecMap = new mutable.HashMap[String, CompressionCodec] + + + def getHadoopFlushMethod: Method = { + val cls = classOf[FSDataOutputStream] + scala.util.Try(cls.getMethod("hflush")).getOrElse(cls.getMethod("sync")) + } + + /** + * Write metadata about an event log to the given stream. + * The metadata is encoded in the first line of the event log as JSON. + * + * @param logStream Raw output stream to the event log file. + */ + def initEventLog(logStream: OutputStream): Unit = { + val metadata = SparkListenerLogStart(SPARK_VERSION) + val metadataJson = compact(JsonProtocol.logStartToJson(metadata)) + "\n" + logStream.write(metadataJson.getBytes(Charsets.UTF_8)) + } + + def sanitize(str: String): String = { + str.replaceAll("[ :/]", "-").replaceAll("[.${}'\"]", "_").toLowerCase + } + + /** + * Return a file-system-safe path to the log file for the given application. + * + * Note that because we currently only create a single log file for each application, + * we must encode all the information needed to parse this event log in the file name + * instead of within the file itself. Otherwise, if the file is compressed, for instance, + * we won't know which codec to use to decompress the metadata needed to open the file in + * the first place. + * + * The log file name will identify the compression codec used for the contents, if any. + * For example, app_123 for an uncompressed log, app_123.lzf for an LZF-compressed log. + * + * @param logBaseDir Directory where the log file will be written. + * @param appId A unique app ID. + * @param appAttemptId A unique attempt id of appId. May be the empty string. + * @param compressionCodecName Name to identify the codec used to compress the contents + * of the log, or None if compression is not enabled. + * @return A path which consists of file-system-safe characters. 
+ */ + def getLogPath( + logBaseDir: URI, + appId: String, + appAttemptId: Option[String], + compressionCodecName: Option[String] = None): String = { + val base = logBaseDir.toString.stripSuffix("/") + "/" + sanitize(appId) + val codec = compressionCodecName.map("." + _).getOrElse("") + if (appAttemptId.isDefined) { + base + "_" + sanitize(appAttemptId.get) + codec + } else { + base + codec + } + } + + /** + * Create the log file and return file writer and DataStream. + * @param logPath + * @param fileSystem + * @param shouldOverwrite + * @param outputBufferSize + * @param hadoopConf + * @param compressionCodec + * @param isMetaFile + * @return + */ + def getFileWriter( + logPath: String, + fileSystem: FileSystem, + shouldOverwrite: Boolean, + outputBufferSize: Int, + hadoopConf: Configuration, + compressionCodec: Option[CompressionCodec], + isMetaFile: Boolean): (PrintWriter, Option[FSDataOutputStream]) = { + var hadoopDataStream: Option[FSDataOutputStream] = None + val workingPath = logPath + IN_PROGRESS + val uri = new URI(workingPath) + val path = new Path(workingPath) + val defaultFs = FileSystem.getDefaultUri(hadoopConf).getScheme + val isDefaultLocal = defaultFs == null || defaultFs == "file" + + if (shouldOverwrite && fileSystem.exists(path)) { + logWarning(s"Event log $path already exists. Overwriting...") + if (!fileSystem.delete(path, true)) { + logWarning(s"Error deleting $path") + } + } + + /* The Hadoop LocalFileSystem (r1.0.4) has known issues with syncing (HADOOP-7844). + * Therefore, for local files, use FileOutputStream instead. */ + val dstream = + if ((isDefaultLocal && uri.getScheme == null) || uri.getScheme == "file") { + new FileOutputStream(uri.getPath) + } else { + hadoopDataStream = Some(fileSystem.create(path)) + hadoopDataStream.get + } + + try { + val cstream = compressionCodec.map(_.compressedOutputStream(dstream)).getOrElse(dstream) + val bstream = new BufferedOutputStream(cstream, outputBufferSize) + + if (isMetaFile) { + initEventLog(bstream) + } + fileSystem.setPermission(path, LOG_FILE_PERMISSIONS) + val writer = new PrintWriter(bstream) + logInfo("Logging events to %s".format(workingPath)) + + (writer, hadoopDataStream) + } catch { + case e: Exception => + dstream.close() + throw e + } + } + + /** + * Opens an event log file and returns an input stream that contains the event data. + * + * @return input stream that holds one JSON record per line. + */ + def openEventLog(log: Path, fs: FileSystem): InputStream = { + // It's not clear whether FileSystem.open() throws FileNotFoundException or just plain + // IOException when a file does not exist, so try our best to throw a proper exception. + if (!fs.exists(log)) { + throw new FileNotFoundException(s"File $log does not exist.") + } + + val in = new BufferedInputStream(fs.open(log)) + + // Compression codec is encoded as an extension, e.g. 
app_123.lzf + // Since we sanitize the app ID to not include periods, it is safe to split on it + val logName = log.getName.stripSuffix(IN_PROGRESS) + val codecName: Option[String] = logName.split("\\.").tail.lastOption + val codec = codecName.map { c => + codecMap.getOrElseUpdate(c, CompressionCodec.createCodec(new SparkConf, c)) + } + + try { + codec.map(_.compressedInputStream(in)).getOrElse(in) + } catch { + case e: Exception => + // When using snappy codec and the part file is empty, it will occur exception + val caseMess = e.getMessage + if (caseMess.contains("empty stream")) { + return null + } + in.close() + throw e + } + } + + def renameFile(fileSystem: FileSystem, shouldOverwrite: Boolean, logPath: String): Unit = { + val target = new Path(logPath) + if (fileSystem.exists(target)) { + if (shouldOverwrite) { + logWarning(s"Event log $target already exists. Overwriting...") + if (!fileSystem.delete(target, true)) { + logWarning(s"Error deleting $target") + } + } else { + throw new IOException("Target log file already exists (%s)".format(logPath)) + } + } + fileSystem.rename(new Path(logPath + IN_PROGRESS), target) + } + + def getMetaStream(log: Path, fs: FileSystem): InputStream = { + // It's not clear whether FileSystem.open() throws FileNotFoundException or just plain + // IOException when a file does not exist, so try our best to throw a proper exception. + if (!fs.exists(log)) { + throw new FileNotFoundException(s"File $log does not exist.") + } + + val metaDir = log.toString.replaceAll("part\\d+", "meta") + val metaPath = + if (fs.exists(new Path(metaDir))) { + new Path(metaDir) + } else if (fs.exists(new Path(metaDir + IN_PROGRESS))) { + new Path(metaDir + IN_PROGRESS) + } else { + throw new FileNotFoundException(s"The meta file ${log.getName} does not exist.") + } + + val in = new BufferedInputStream(fs.open(metaPath)) + + // Compression codec is encoded as an extension, e.g. app_123.lzf + // Since we sanitize the app ID to not include periods, it is safe to split on it + val logName = log.getName.stripSuffix(IN_PROGRESS) + val codecName: Option[String] = logName.split("\\.").tail.lastOption + val codec = codecName.map { c => + codecMap.getOrElseUpdate(c, CompressionCodec.createCodec(new SparkConf, c)) + } + + try { + codec.map(_.compressedInputStream(in)).getOrElse(in) + } catch { + case e: Exception => + in.close() + throw e + } + } +} diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskInfo.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskInfo.scala index f113c2b1b8433..c5fb8db97ad59 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskInfo.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskInfo.scala @@ -34,7 +34,8 @@ class TaskInfo( val executorId: String, val host: String, val taskLocality: TaskLocality.TaskLocality, - val speculative: Boolean) { + val speculative: Boolean, + val stageId: Int = -1) { /** * The time when the task started remotely getting the result. 
Will not be set if the diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala index c02597c4365c9..0f7e0f63cb7e4 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala @@ -452,7 +452,7 @@ private[spark] class TaskSetManager( copiesRunning(index) += 1 val attemptNum = taskAttempts(index).size val info = new TaskInfo(taskId, index, attemptNum, curTime, - execId, host, taskLocality, speculative) + execId, host, taskLocality, speculative, task.stageId) taskInfos(taskId) = info taskAttempts(index) = info :: taskAttempts(index) // Update our locality level for delay scheduling diff --git a/core/src/main/scala/org/apache/spark/util/JsonGroupProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonGroupProtocol.scala new file mode 100644 index 0000000000000..27ebdddd7f1da --- /dev/null +++ b/core/src/main/scala/org/apache/spark/util/JsonGroupProtocol.scala @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.util + +import org.apache.spark.scheduler._ +import org.json4s.JsonAST._ +import org.json4s.JsonDSL._ + +private[spark] object JsonGroupProtocol { + def stageSubmittedToJson(stageSubmitted: SparkListenerStageSubmitted): (JValue, Int) = { + val stageJson = stageInfoToJson(stageSubmitted.stageInfo) + val properties = JsonProtocol.propertiesToJson(stageSubmitted.properties) + val json = ("Event" -> Utils.getFormattedClassName(stageSubmitted)) ~ + ("Stage Info" -> stageJson._1) ~ + ("Properties" -> properties) + + (json, stageJson._2) + } + + def stageCompletedToJson(stageCompleted: SparkListenerStageCompleted): (JValue, Int) = { + val stageJson = stageInfoToJson(stageCompleted.stageInfo) + val json = ("Event" -> Utils.getFormattedClassName(stageCompleted)) ~ + ("Stage Info" -> stageJson._1) + + (json, stageJson._2) + } + + def taskStartToJson(taskStart: SparkListenerTaskStart): (JValue, Int) = { + (JsonProtocol.taskStartToJson(taskStart), taskStart.stageId) + } + + def taskGettingResultToJson(taskGettingResult: SparkListenerTaskGettingResult): (JValue, Int) = { + (JsonProtocol.taskGettingResultToJson(taskGettingResult), taskGettingResult.taskInfo.stageId) + } + + def taskEndToJson(taskEnd: SparkListenerTaskEnd): (JValue, Int) = { + (JsonProtocol.taskEndToJson(taskEnd), taskEnd.stageId) + } + + def jobStartToJson(jobStart: SparkListenerJobStart): (JValue, Int, Set[Int]) = { + (JsonProtocol.jobStartToJson(jobStart), jobStart.jobId, jobStart.stageIds.toSet) + } + + def jobEndToJson(jobEnd: SparkListenerJobEnd): (JValue, Int) = { + (JsonProtocol.jobEndToJson(jobEnd), jobEnd.jobId) + } + + def stageInfoToJson(stageInfo: StageInfo): (JValue, Int) = { + (JsonProtocol.stageInfoToJson(stageInfo), stageInfo.stageId) + } +} diff --git a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala index 73cff89544dc3..5b2b23e3fb9e3 100644 --- a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala @@ -57,8 +57,8 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc appAttemptId: Option[String], inProgress: Boolean, codec: Option[String] = None): File = { - val ip = if (inProgress) EventLoggingListener.IN_PROGRESS else "" - val logUri = EventLoggingListener.getLogPath(testDir.toURI, appId, appAttemptId) + val ip = if (inProgress) EventLoggingWriterListener.IN_PROGRESS else "" + val logUri = EventLoggingWriterListener.getLogPath(testDir.toURI, appId, appAttemptId) val logPath = new URI(logUri).getPath + ip new File(logPath) } @@ -204,14 +204,14 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc updateAndCheck(provider) { list => list.size should be (1) list.head.attempts.head.asInstanceOf[FsApplicationAttemptInfo].logPath should - endWith(EventLoggingListener.IN_PROGRESS) + endWith(EventLoggingWriterListener.IN_PROGRESS) } logFile1.renameTo(newLogFile("app1", None, inProgress = false)) updateAndCheck(provider) { list => list.size should be (1) list.head.attempts.head.asInstanceOf[FsApplicationAttemptInfo].logPath should not - endWith(EventLoggingListener.IN_PROGRESS) + endWith(EventLoggingWriterListener.IN_PROGRESS) } } @@ -428,7 +428,7 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc val cstream = codec.map(_.compressedOutputStream(fstream)).getOrElse(fstream) val 
bstream = new BufferedOutputStream(cstream) if (isNewFormat) { - EventLoggingListener.initEventLog(new FileOutputStream(file)) + EventLoggingWriterListener.initEventLog(new FileOutputStream(file)) } val writer = new OutputStreamWriter(bstream, "UTF-8") Utils.tryWithSafeFinally { diff --git a/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala index 5cb2d4225d281..d048396e6c45c 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala @@ -64,7 +64,7 @@ class EventLoggingListenerSuite extends SparkFunSuite with LocalSparkContext wit val eventLogger = new EventLoggingListener("test", None, testDirPath.toUri(), conf) eventLogger.start() - val logPath = new Path(eventLogger.logPath + EventLoggingListener.IN_PROGRESS) + val logPath = new Path(eventLogger.logPath + EventLoggingWriterListener.IN_PROGRESS) assert(fileSystem.exists(logPath)) val logStatus = fileSystem.getFileStatus(logPath) assert(!logStatus.isDir) @@ -95,7 +95,7 @@ class EventLoggingListenerSuite extends SparkFunSuite with LocalSparkContext wit } test("Log overwriting") { - val logUri = EventLoggingListener.getLogPath(testDir.toURI, "test", None) + val logUri = EventLoggingWriterListener.getLogPath(testDir.toURI, "test", None) val logPath = new URI(logUri).getPath // Create file before writing the event log new FileOutputStream(new File(logPath)).close() @@ -107,18 +107,18 @@ class EventLoggingListenerSuite extends SparkFunSuite with LocalSparkContext wit test("Event log name") { // without compression - assert(s"file:/base-dir/app1" === EventLoggingListener.getLogPath( + assert(s"file:/base-dir/app1" === EventLoggingWriterListener.getLogPath( Utils.resolveURI("/base-dir"), "app1", None)) // with compression assert(s"file:/base-dir/app1.lzf" === - EventLoggingListener.getLogPath(Utils.resolveURI("/base-dir"), "app1", None, Some("lzf"))) + EventLoggingWriterListener.getLogPath(Utils.resolveURI("/base-dir"), "app1", None, Some("lzf"))) // illegal characters in app ID assert(s"file:/base-dir/a-fine-mind_dollar_bills__1" === - EventLoggingListener.getLogPath(Utils.resolveURI("/base-dir"), + EventLoggingWriterListener.getLogPath(Utils.resolveURI("/base-dir"), "a fine:mind$dollar{bills}.1", None)) // illegal characters in app ID with compression assert(s"file:/base-dir/a-fine-mind_dollar_bills__1.lz4" === - EventLoggingListener.getLogPath(Utils.resolveURI("/base-dir"), + EventLoggingWriterListener.getLogPath(Utils.resolveURI("/base-dir"), "a fine:mind$dollar{bills}.1", None, Some("lz4"))) } @@ -155,7 +155,7 @@ class EventLoggingListenerSuite extends SparkFunSuite with LocalSparkContext wit eventLogger.stop() // Verify file contains exactly the two events logged - val logData = EventLoggingListener.openEventLog(new Path(eventLogger.logPath), fileSystem) + val logData = EventLoggingWriterListener.openEventLog(new Path(eventLogger.logPath), fileSystem) try { val lines = readLines(logData) val logStart = SparkListenerLogStart(SPARK_VERSION) @@ -183,9 +183,9 @@ class EventLoggingListenerSuite extends SparkFunSuite with LocalSparkContext wit val sc = new SparkContext("local-cluster[2,2,1024]", "test", conf) assert(sc.eventLogger.isDefined) val eventLogger = sc.eventLogger.get - val eventLogPath = eventLogger.logPath + val eventLogPath = eventLogger.getFilePath() val expectedLogDir = testDir.toURI() - assert(eventLogPath 
=== EventLoggingListener.getLogPath( + assert(eventLogPath === EventLoggingWriterListener.getLogPath( expectedLogDir, sc.applicationId, None, compressionCodec.map(CompressionCodec.getShortName))) // Begin listening for events that trigger asserts @@ -200,7 +200,7 @@ class EventLoggingListenerSuite extends SparkFunSuite with LocalSparkContext wit eventExistenceListener.assertAllCallbacksInvoked() // Make sure expected events exist in the log file. - val logData = EventLoggingListener.openEventLog(new Path(eventLogger.logPath), fileSystem) + val logData = EventLoggingWriterListener.openEventLog(new Path(eventLogger.getFilePath()), fileSystem) val logStart = SparkListenerLogStart(SPARK_VERSION) val lines = readLines(logData) val eventSet = mutable.Set( @@ -238,7 +238,7 @@ class EventLoggingListenerSuite extends SparkFunSuite with LocalSparkContext wit * A listener that asserts certain events are logged by the given EventLoggingListener. * This is necessary because events are posted asynchronously in a different thread. */ - private class EventExistenceListener(eventLogger: EventLoggingListener) extends SparkListener { + private class EventExistenceListener(eventLogger: EventLoggingWriterListener) extends SparkListener { var jobStarted = false var jobEnded = false var appEnded = false diff --git a/core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala index 103fc19369c97..56882ead0e6a9 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala @@ -118,7 +118,7 @@ class ReplayListenerSuite extends SparkFunSuite with BeforeAndAfter { assert(!eventLog.isDir) // Replay events - val logData = EventLoggingListener.openEventLog(eventLog.getPath(), fileSystem) + val logData = EventLoggingWriterListener.openEventLog(eventLog.getPath(), fileSystem) val eventMonster = new EventMonster(conf) try { val replayer = new ReplayListenerBus() @@ -130,7 +130,7 @@ class ReplayListenerSuite extends SparkFunSuite with BeforeAndAfter { // Verify the same events are replayed in the same order assert(sc.eventLogger.isDefined) - val originalEvents = sc.eventLogger.get.loggedEvents + val originalEvents = sc.eventLogger.get.getLoggedEvent val replayedEvents = eventMonster.loggedEvents originalEvents.zip(replayedEvents).foreach { case (e1, e2) => assert(e1 === e2) } } From b8f2b3cddff62340107e2f1a8f8a7999cf9287b0 Mon Sep 17 00:00:00 2001 From: xutingjun Date: Fri, 23 Oct 2015 17:24:27 +0800 Subject: [PATCH 2/2] fix style --- .../apache/spark/deploy/history/FsHistoryProvider.scala | 2 +- .../spark/scheduler/EventLoggingGroupListener.scala | 4 ++++ .../apache/spark/scheduler/EventLoggingListener.scala | 2 +- .../spark/scheduler/EventLoggingListenerSuite.scala | 9 ++++++--- 4 files changed, 12 insertions(+), 5 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala index c0c225d847e70..8e1a01813decd 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala @@ -391,7 +391,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) def shouldCleanMeta(fs: FileSystem, appId: String): (Boolean, Path) = { val metaFile = appId.replaceAll("part\\d+", "meta") - val metaPath = new 
Path(logDir, metaFile) + val metaPath = new Path(logDir, metaFile) val appPrefix = appId.split("-part")(0) if (fs.exists(metaPath) && applications.keySet.filter(_.contains(appPrefix)).size == 0) { (true, metaPath) diff --git a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingGroupListener.scala b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingGroupListener.scala index 83783517918f9..62b11cd2db34e 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingGroupListener.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingGroupListener.scala @@ -135,7 +135,9 @@ private[spark] class EventLoggingGroupListener ( hadoopDataStream: Option[FSDataOutputStream], json: JValue, flushLogger: Boolean = false) { + // scalastyle:off println writer.foreach(_.println(compact(render(json)))) + // scalastyle:on println if (flushLogger) { writer.foreach(_.flush()) hadoopDataStream.foreach(hadoopFlushMethod.invoke(_)) @@ -144,7 +146,9 @@ private[spark] class EventLoggingGroupListener ( /** Log meta event as JSON to meta file. */ private def logMetaEvent(json: JValue, flushLogger: Boolean = false): Unit = { + // scalastyle:off println metaWriter.println(compact(render(json))) + // scalastyle:on println if (flushLogger) { metaWriter.flush() metaHadoopDataStream.foreach(hadoopFlushMethod.invoke(_)) diff --git a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala index 74c8dac4c5c79..ef2e2781834d9 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala @@ -187,7 +187,7 @@ private[spark] class EventLoggingListener( logPath } - override def getLoggedEvent: ArrayBuffer[JValue]= { + override def getLoggedEvent: ArrayBuffer[JValue] = { loggedEvents } } diff --git a/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala index d048396e6c45c..9bb3e9e8f31ea 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala @@ -111,7 +111,8 @@ class EventLoggingListenerSuite extends SparkFunSuite with LocalSparkContext wit Utils.resolveURI("/base-dir"), "app1", None)) // with compression assert(s"file:/base-dir/app1.lzf" === - EventLoggingWriterListener.getLogPath(Utils.resolveURI("/base-dir"), "app1", None, Some("lzf"))) + EventLoggingWriterListener.getLogPath(Utils.resolveURI("/base-dir"), "app1", None, + Some("lzf"))) // illegal characters in app ID assert(s"file:/base-dir/a-fine-mind_dollar_bills__1" === EventLoggingWriterListener.getLogPath(Utils.resolveURI("/base-dir"), @@ -200,7 +201,8 @@ class EventLoggingListenerSuite extends SparkFunSuite with LocalSparkContext wit eventExistenceListener.assertAllCallbacksInvoked() // Make sure expected events exist in the log file. 
- val logData = EventLoggingWriterListener.openEventLog(new Path(eventLogger.getFilePath()), fileSystem) + val logData = EventLoggingWriterListener.openEventLog(new Path(eventLogger.getFilePath()), + fileSystem) val logStart = SparkListenerLogStart(SPARK_VERSION) val lines = readLines(logData) val eventSet = mutable.Set( @@ -238,7 +240,8 @@ class EventLoggingListenerSuite extends SparkFunSuite with LocalSparkContext wit * A listener that asserts certain events are logged by the given EventLoggingListener. * This is necessary because events are posted asynchronously in a different thread. */ - private class EventExistenceListener(eventLogger: EventLoggingWriterListener) extends SparkListener { + private class EventExistenceListener(eventLogger: EventLoggingWriterListener) + extends SparkListener { var jobStarted = false var jobEnded = false var appEnded = false
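
A minimal usage sketch of the feature this patch series adds (assumptions flagged inline: local master, placeholder app name and jobs; the log directory must already exist, since start() rejects a missing directory). When spark.eventLog.group.size is set to a positive value, SparkContext installs EventLoggingGroupListener instead of EventLoggingListener, which writes an <app-id>-meta file plus <app-id>-partN files that FsHistoryProvider later reads back via replayMetaEvent/replay.

import org.apache.spark.{SparkConf, SparkContext}

// Illustrative sketch only: enables the grouped event log added by this patch.
// Assumes /tmp/spark-events (the patch's DEFAULT_LOG_DIR) already exists.
object GroupedEventLogExample {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setMaster("local[2]")                       // placeholder master
      .setAppName("grouped-event-log-example")     // placeholder app name
      .set("spark.eventLog.enabled", "true")
      .set("spark.eventLog.dir", "/tmp/spark-events")
      .set("spark.eventLog.group.size", "10")      // start a new part file after every 10 jobs

    val sc = new SparkContext(conf)
    // Each action submits one job; with group.size = 10 both land in the first part file.
    sc.parallelize(1 to 100).count()
    sc.parallelize(1 to 100).map(_ * 2).count()
    sc.stop()   // closes the writers and drops the ".inprogress" suffix from the meta/part files
  }
}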