Skip to content

Commit d6614ca

Browse files
authored
Merge pull request apache#78 from criteo-forks/SPARK-22850
[SPARK-22850][CORE] Ensure queued events are delivered to all event q…
2 parents ec5cce3 + 8f463ba commit d6614ca

File tree

2 files changed

+44
-7
lines changed

2 files changed

+44
-7
lines changed

core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala

Lines changed: 38 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ import java.util.concurrent._
2222
import java.util.concurrent.atomic.{AtomicBoolean, AtomicLong}
2323

2424
import scala.collection.JavaConverters._
25+
import scala.collection.mutable
2526
import scala.reflect.ClassTag
2627
import scala.util.DynamicVariable
2728

@@ -55,6 +56,9 @@ private[spark] class LiveListenerBus(conf: SparkConf) {
5556

5657
private val queues = new CopyOnWriteArrayList[AsyncEventQueue]()
5758

59+
// Visible for testing.
60+
@volatile private[scheduler] var queuedEvents = new mutable.ListBuffer[SparkListenerEvent]()
61+
5862
/** Add a listener to queue shared by all non-internal listeners. */
5963
def addToSharedQueue(listener: SparkListenerInterface): Unit = {
6064
addToQueue(listener, SHARED_QUEUE)
@@ -116,12 +120,37 @@ private[spark] class LiveListenerBus(conf: SparkConf) {
116120

117121
/** Post an event to all queues. */
118122
def post(event: SparkListenerEvent): Unit = {
119-
if (!stopped.get()) {
120-
val it = queues.iterator()
121-
while (it.hasNext()) {
122-
it.next().post(event)
123+
if (stopped.get()) {
124+
return
125+
}
126+
127+
// If the event buffer is null, it means the bus has been started and we can avoid
128+
// synchronization and post events directly to the queues. This should be the most
129+
// common case during the life of the bus.
130+
if (queuedEvents == null) {
131+
postToQueues(event)
132+
return
133+
}
134+
135+
// Otherwise, need to synchronize to check whether the bus is started, to make sure the thread
136+
// calling start() picks up the new event.
137+
synchronized {
138+
if (!started.get()) {
139+
queuedEvents += event
140+
return
123141
}
124142
}
143+
144+
// If the bus was already started when the check above was made, just post directly to the
145+
// queues.
146+
postToQueues(event)
147+
}
148+
149+
private def postToQueues(event: SparkListenerEvent): Unit = {
150+
val it = queues.iterator()
151+
while (it.hasNext()) {
152+
it.next().post(event)
153+
}
125154
}
126155

127156
/**
@@ -138,7 +167,11 @@ private[spark] class LiveListenerBus(conf: SparkConf) {
138167
}
139168

140169
this.sparkContext = sc
141-
queues.asScala.foreach(_.start(sc))
170+
queues.asScala.foreach { q =>
171+
q.start(sc)
172+
queuedEvents.foreach(q.post)
173+
}
174+
queuedEvents = null
142175
}
143176

144177
/**

core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,8 @@ package org.apache.spark.scheduler
1919

2020
import java.util.concurrent.Semaphore
2121

22-
import scala.collection.mutable
2322
import scala.collection.JavaConverters._
23+
import scala.collection.mutable
2424

2525
import org.mockito.Mockito
2626
import org.scalatest.Matchers
@@ -58,20 +58,24 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match
5858
sc = new SparkContext("local", "SparkListenerSuite", conf)
5959
val counter = new BasicJobCounter
6060
val bus = new LiveListenerBus(conf)
61-
bus.addToSharedQueue(counter)
6261

6362
// Post five events:
6463
(1 to 5).foreach { _ => bus.post(SparkListenerJobEnd(0, jobCompletionTime, JobSucceeded)) }
6564

6665
// Five messages should be marked as received and queued, but no messages should be posted to
6766
// listeners yet because the the listener bus hasn't been started.
67+
assert(bus.queuedEvents.size === 5)
68+
bus.addToSharedQueue(counter)
6869
assert(counter.count === 0)
6970

7071
// Starting listener bus should flush all buffered events
7172
bus.start(mockSparkContext)
7273
bus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
7374
assert(counter.count === 5)
7475

76+
// After the bus is started, there should be no more queued events.
77+
assert(bus.queuedEvents === null)
78+
7579
// After listener bus has stopped, posting events should not increment counter
7680
bus.stop()
7781
(1 to 5).foreach { _ => bus.post(SparkListenerJobEnd(0, jobCompletionTime, JobSucceeded)) }

0 commit comments

Comments
 (0)