Skip to content

Commit dba35b2

Browse files
committed
Add a test that onReceive swallows InterruptException
1 parent 460f7b3 commit dba35b2

File tree

1 file changed

+31
-1
lines changed

1 file changed

+31
-1
lines changed

core/src/test/scala/org/apache/spark/util/EventLoopSuite.scala

Lines changed: 31 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,14 +17,17 @@
1717

1818
package org.apache.spark.util
1919

20+
import java.util.concurrent.CountDownLatch
21+
2022
import scala.collection.mutable
2123
import scala.concurrent.duration._
2224
import scala.language.postfixOps
2325

2426
import org.scalatest.concurrent.Eventually._
27+
import org.scalatest.concurrent.Timeouts
2528
import org.scalatest.FunSuite
2629

27-
class EventLoopSuite extends FunSuite {
30+
class EventLoopSuite extends FunSuite with Timeouts {
2831

2932
test("EventLoop") {
3033
val buffer = new mutable.ArrayBuffer[Int] with mutable.SynchronizedBuffer[Int]
@@ -155,4 +158,31 @@ class EventLoopSuite extends FunSuite {
155158
}
156159
eventLoop.stop()
157160
}
161+
162+
test("EventLoop: onReceive swallows InterruptException") {
163+
val onReceiveLatch = new CountDownLatch(1)
164+
val eventLoop = new EventLoop[Int]("test") {
165+
166+
override def onReceive(event: Int): Unit = {
167+
onReceiveLatch.countDown()
168+
try {
169+
Thread.sleep(5000)
170+
} catch {
171+
case ie: InterruptedException => // swallow
172+
}
173+
}
174+
175+
override def onError(e: Throwable): Unit = {
176+
}
177+
178+
}
179+
eventLoop.start()
180+
eventLoop.post(1)
181+
failAfter(5 seconds) {
182+
// Wait until we enter `onReceive`
183+
onReceiveLatch.await()
184+
eventLoop.stop()
185+
}
186+
assert(false === eventLoop.isActive)
187+
}
158188
}

0 commit comments

Comments
 (0)