core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
@@ -217,7 +217,7 @@ private[spark] class ExecutorAllocationManager(
* the scheduling task.
*/
def start(): Unit = {
-    listenerBus.addListener(listener)
+    listenerBus.addToManagementQueue(listener)

val scheduleTask = new Runnable() {
override def run(): Unit = {
core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala
@@ -63,7 +63,7 @@ private[spark] class HeartbeatReceiver(sc: SparkContext, clock: Clock)
this(sc, new SystemClock)
}

-  sc.addSparkListener(this)
+  sc.listenerBus.addToManagementQueue(this)

override val rpcEnv: RpcEnv = sc.env.rpcEnv

13 changes: 6 additions & 7 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -419,7 +419,7 @@ class SparkContext(config: SparkConf) extends Logging {
// "_jobProgressListener" should be set up before creating SparkEnv because when creating
// "SparkEnv", some messages will be posted to "listenerBus" and we should not miss them.
_jobProgressListener = new JobProgressListener(_conf)
-    listenerBus.addListener(jobProgressListener)
+    listenerBus.addToStatusQueue(jobProgressListener)

// Create the Spark execution environment (cache, map output tracker, etc)
_env = createSparkEnv(_conf, isLocal, listenerBus)
@@ -442,7 +442,7 @@ class SparkContext(config: SparkConf) extends Logging {

_ui =
if (conf.getBoolean("spark.ui.enabled", true)) {
-        Some(SparkUI.createLiveUI(this, _conf, listenerBus, _jobProgressListener,
+        Some(SparkUI.createLiveUI(this, _conf, _jobProgressListener,
_env.securityManager, appName, startTime = startTime))
} else {
// For tests, do not enable the UI
@@ -522,7 +522,7 @@ class SparkContext(config: SparkConf) extends Logging {
new EventLoggingListener(_applicationId, _applicationAttemptId, _eventLogDir.get,
_conf, _hadoopConfiguration)
logger.start()
-        listenerBus.addListener(logger)
+        listenerBus.addToEventLogQueue(logger)
Some(logger)
} else {
None
@@ -1563,7 +1563,7 @@ class SparkContext(config: SparkConf) extends Logging {
*/
@DeveloperApi
def addSparkListener(listener: SparkListenerInterface) {
-    listenerBus.addListener(listener)
+    listenerBus.addToSharedQueue(listener)
}

/**
@@ -1879,8 +1879,7 @@ class SparkContext(config: SparkConf) extends Logging {
*/
def stop(): Unit = {
if (LiveListenerBus.withinListenerThread.value) {
-      throw new SparkException(
-        s"Cannot stop SparkContext within listener thread of ${LiveListenerBus.name}")
+      throw new SparkException(s"Cannot stop SparkContext within listener bus thread.")
}
// Use the stopping variable to ensure no contention for the stop scenario.
// Still track the stopped variable for use elsewhere in the code.
@@ -2378,7 +2377,7 @@ class SparkContext(config: SparkConf) extends Logging {
" parameter from breaking Spark's ability to find a valid constructor.")
}
}
-          listenerBus.addListener(listener)
+          listenerBus.addToSharedQueue(listener)
logInfo(s"Registered listener $className")
}
} catch {
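Taken together, the call-site changes above replace the single addListener() entry point with queue-specific registration: executor-management listeners go to the management queue, the UI's JobProgressListener to the status queue, the event logger to its own queue, and user-supplied listeners to the shared queue. As a rough mental model of that routing, here is a minimal self-contained sketch (not Spark source: Bus, NamedQueue, and the String events are invented for illustration, and the queue names merely mirror the method names in this diff):

import java.util.concurrent.LinkedBlockingQueue

// Toy model of the per-queue bus this PR introduces. Each named queue gets
// its own daemon dispatch thread, so a slow listener in one group can no
// longer stall listeners in the other groups.
object QueueRoutingSketch {

  trait Listener { def onEvent(event: String): Unit }

  final class NamedQueue(name: String, capacity: Int) {
    private val queue = new LinkedBlockingQueue[String](capacity)
    @volatile private var listeners = List.empty[Listener]

    private val dispatcher = new Thread(s"listener-group-$name") {
      setDaemon(true)
      override def run(): Unit = {
        while (true) {
          val event = queue.take() // block until an event arrives
          listeners.foreach(_.onEvent(event))
        }
      }
    }
    dispatcher.start()

    def add(listener: Listener): Unit = listeners ::= listener

    // offer() drops the event instead of blocking when the queue is full,
    // the same trade-off AsyncEventQueue.post() makes in the new file below.
    def post(event: String): Boolean = queue.offer(event)
  }

  final class Bus(capacity: Int = 10000) {
    private var queues = Map.empty[String, NamedQueue]

    private def queue(name: String): NamedQueue = synchronized {
      queues.getOrElse(name, {
        val q = new NamedQueue(name, capacity)
        queues += name -> q
        q
      })
    }

    def addToSharedQueue(l: Listener): Unit = queue("shared").add(l)
    def addToStatusQueue(l: Listener): Unit = queue("appStatus").add(l)
    def addToManagementQueue(l: Listener): Unit = queue("executorManagement").add(l)
    def addToEventLogQueue(l: Listener): Unit = queue("eventLog").add(l)

    def post(event: String): Unit = queues.values.foreach(_.post(event))
  }
}

The real implementation keeps one AsyncEventQueue per name inside LiveListenerBus; the new file below shows that queue in full.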
196 changes: 196 additions & 0 deletions core/src/main/scala/org/apache/spark/scheduler/AsyncEventQueue.scala
@@ -0,0 +1,196 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.scheduler

import java.util.concurrent.LinkedBlockingQueue
import java.util.concurrent.atomic.{AtomicBoolean, AtomicLong}

import com.codahale.metrics.{Gauge, Timer}

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config._
import org.apache.spark.util.Utils

/**
* An asynchronous queue for events. All events posted to this queue will be delivered to the child
* listeners in a separate thread.
*
* Delivery will only begin when the `start()` method is called. The `stop()` method should be
* called when no more events need to be delivered.
*/
private class AsyncEventQueue(val name: String, conf: SparkConf, metrics: LiveListenerBusMetrics)
extends SparkListenerBus
with Logging {

import AsyncEventQueue._

// Cap the capacity of the queue so we get an explicit error (rather than an OOM exception) if
// it's perpetually being added to more quickly than it's being drained.
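  // (LISTENER_BUS_EVENT_QUEUE_CAPACITY corresponds to the
  // spark.scheduler.listenerbus.eventqueue.capacity setting.)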
private val eventQueue = new LinkedBlockingQueue[SparkListenerEvent](
conf.get(LISTENER_BUS_EVENT_QUEUE_CAPACITY))

// Keep the event count separately, so that waitUntilEmpty() can be implemented properly;
// this allows that method to return only when the events in the queue have been fully
// processed (instead of just dequeued).
private val eventCount = new AtomicLong()

/** A counter for dropped events. It will be reset every time we log it. */
private val droppedEventsCounter = new AtomicLong(0L)

/** When `droppedEventsCounter` was logged last time in milliseconds. */
@volatile private var lastReportTimestamp = 0L

private val logDroppedEvent = new AtomicBoolean(false)

private var sc: SparkContext = null

private val started = new AtomicBoolean(false)
private val stopped = new AtomicBoolean(false)

private val droppedEvents = metrics.metricRegistry.counter(s"queue.$name.numDroppedEvents")
private val processingTime = metrics.metricRegistry.timer(s"queue.$name.listenerProcessingTime")

// Remove the queue size gauge first, in case it was created by a previous incarnation of
// this queue that was removed from the listener bus.
metrics.metricRegistry.remove(s"queue.$name.size")
metrics.metricRegistry.register(s"queue.$name.size", new Gauge[Int] {
override def getValue: Int = eventQueue.size()
})

private val dispatchThread = new Thread(s"spark-listener-group-$name") {
setDaemon(true)
override def run(): Unit = Utils.tryOrStopSparkContext(sc) {
dispatch()
}
}

private def dispatch(): Unit = LiveListenerBus.withinListenerThread.withValue(true) {
try {
var next: SparkListenerEvent = eventQueue.take()
while (next != POISON_PILL) {
val ctx = processingTime.time()
try {
super.postToAll(next)
} finally {
ctx.stop()
}
eventCount.decrementAndGet()
next = eventQueue.take()
}
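      // stop() counted the poison pill when it enqueued it; balance that here.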
eventCount.decrementAndGet()
} catch {
case ie: InterruptedException =>
logInfo(s"Stopping listener queue $name.", ie)
}
}

override protected def getTimer(listener: SparkListenerInterface): Option[Timer] = {
metrics.getTimerForListenerClass(listener.getClass.asSubclass(classOf[SparkListenerInterface]))
}

/**
* Start an asynchronous thread to dispatch events to the underlying listeners.
*
* @param sc Used to stop the SparkContext in case the async dispatcher fails.
*/
private[scheduler] def start(sc: SparkContext): Unit = {
if (started.compareAndSet(false, true)) {
this.sc = sc
dispatchThread.start()
} else {
throw new IllegalStateException(s"$name already started!")

Contributor: Is it better to be a no-op here, or a warning log instead?

Contributor: Seems fine to me -- it's only called in LiveListenerBus, where we guarantee this is true. Seems better to fail fast if it's messed up.

}
}

/**
* Stop the listener bus. It will wait until the queued events have been processed, but new
* events will be dropped.
*/
private[scheduler] def stop(): Unit = {
if (!started.get()) {
throw new IllegalStateException(s"Attempted to stop $name that has not yet started!")
}
if (stopped.compareAndSet(false, true)) {
eventQueue.put(POISON_PILL)
eventCount.incrementAndGet()
}
dispatchThread.join()
}

def post(event: SparkListenerEvent): Unit = {
if (stopped.get()) {
return
}

eventCount.incrementAndGet()
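    // (Counted up front; rolled back below if the bounded queue rejects the event.)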
if (eventQueue.offer(event)) {
return
}

eventCount.decrementAndGet()
droppedEvents.inc()
droppedEventsCounter.incrementAndGet()
if (logDroppedEvent.compareAndSet(false, true)) {
// Only log the following message once to avoid duplicated annoying logs.
logError(s"Dropping event from queue $name. " +
"This likely means one of the listeners is too slow and cannot keep up with " +
"the rate at which tasks are being started by the scheduler.")
}
logTrace(s"Dropping event $event")

val droppedCount = droppedEventsCounter.get
if (droppedCount > 0) {
// Don't log too frequently
if (System.currentTimeMillis() - lastReportTimestamp >= 60 * 1000) {
// There may be multiple threads trying to decrease droppedEventsCounter.
// Use "compareAndSet" to make sure only one thread can win.
// And if another thread is increasing droppedEventsCounter, "compareAndSet" will fail and
// then that thread will update it.
if (droppedEventsCounter.compareAndSet(droppedCount, 0)) {
val prevLastReportTimestamp = lastReportTimestamp
lastReportTimestamp = System.currentTimeMillis()
val previous = new java.util.Date(prevLastReportTimestamp)
logWarning(s"Dropped $droppedEvents events from $name since $previous.")
}
}
}
}

/**
* For testing only. Wait until there are no more events in the queue.
*
* @return true if the queue is empty.
*/
def waitUntilEmpty(deadline: Long): Boolean = {
while (eventCount.get() != 0) {
if (System.currentTimeMillis > deadline) {
return false
}
Thread.sleep(10)
}
true
}

}

private object AsyncEventQueue {

val POISON_PILL = new SparkListenerEvent() { }

}
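
A note on the shutdown handshake above: POISON_PILL lets stop() unblock the dispatch thread without checking a shutdown flag around every take(). stop() enqueues the sentinel, the dispatch loop exits when it dequeues it, and join() then completes. The same pattern in isolation, as a stand-alone illustrative sketch (not Spark source):

import java.util.concurrent.LinkedBlockingQueue

object PoisonPillSketch extends App {
  sealed trait Msg
  final case class Work(id: Int) extends Msg
  case object PoisonPill extends Msg // sentinel that ends the dispatch loop

  val queue = new LinkedBlockingQueue[Msg]()
  val worker = new Thread("dispatcher") {
    override def run(): Unit = {
      var next = queue.take()
      while (next != PoisonPill) {
        println(s"processing $next")
        next = queue.take()
      }
    }
  }
  worker.start()
  (1 to 3).foreach(i => queue.put(Work(i)))
  queue.put(PoisonPill) // like stop(): enqueue the sentinel, then join
  worker.join()
}

The extra eventCount increment in stop() mirrors this: the sentinel occupies a queue slot like any other event, which is why dispatch() decrements the count once more after the loop.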