Skip to content

Commit f580ce1

Browse files
committed
Log how many Spark events got dropped in LiveListenerBus
1 parent f89808b commit f580ce1

File tree

1 file changed

+24
-1
lines changed

1 file changed

+24
-1
lines changed

core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818
package org.apache.spark.scheduler
1919

2020
import java.util.concurrent._
21-
import java.util.concurrent.atomic.AtomicBoolean
21+
import java.util.concurrent.atomic.{AtomicBoolean, AtomicLong}
2222

2323
import scala.util.DynamicVariable
2424

@@ -57,6 +57,12 @@ private[spark] class LiveListenerBus(val sparkContext: SparkContext) extends Spa
5757
// Indicate if `stop()` is called
5858
private val stopped = new AtomicBoolean(false)
5959

60+
/** A counter for dropped events. It will be reset every time we log it. */
61+
private val droppedEventsCounter = new AtomicLong(0L)
62+
63+
/** When `droppedEventsCounter` was logged last time. */
64+
@volatile private var lastReportTimestamp = 0L
65+
6066
// Indicate if we are processing some event
6167
// Guarded by `self`
6268
private var processingEvent = false
@@ -123,6 +129,23 @@ private[spark] class LiveListenerBus(val sparkContext: SparkContext) extends Spa
123129
eventLock.release()
124130
} else {
125131
onDropEvent(event)
132+
droppedEventsCounter.incrementAndGet()
133+
}
134+
// Don't log too frequently
135+
if (System.currentTimeMillis() - lastReportTimestamp >= 60 * 1000) {
136+
var droppedEvents = droppedEventsCounter.get
137+
while (droppedEvents > 0) {
138+
// There may be multiple threads trying to decrease droppedEventsCounter.
139+
// Use "compareAndSet" to make sure only one thread can win.
140+
// And if another thread is increasing droppedEventsCounter, "compareAndSet" will fail
141+
// and we will try again.
142+
if (droppedEventsCounter.compareAndSet(droppedEvents, 0)) {
143+
lastReportTimestamp = System.currentTimeMillis()
144+
logWarning(s"Dropped $droppedEvents SparkListenerEvents")
145+
return
146+
}
147+
droppedEvents = droppedEventsCounter.get
148+
}
126149
}
127150
}
128151

0 commit comments

Comments
 (0)