Commit c5f1cc3
[SPARK-20131][CORE] Don't use this lock in StandaloneSchedulerBackend.stop
## What changes were proposed in this pull request?

`o.a.s.streaming.StreamingContextSuite.SPARK-18560 Receiver data should be deserialized properly` is flaky because there is a potential deadlock in `StandaloneSchedulerBackend` that causes the `await` to time out. Here is the relevant stack trace:

```
"Thread-31" #211 daemon prio=5 os_prio=31 tid=0x00007fedd4808000 nid=0x16403 waiting on condition [0x00007000239b7000]
   java.lang.Thread.State: TIMED_WAITING (parking)
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for <0x000000079b49ca10> (a scala.concurrent.impl.Promise$CompletionLatch)
        at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedNanos(AbstractQueuedSynchronizer.java:1037)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer.tryAcquireSharedNanos(AbstractQueuedSynchronizer.java:1328)
        at scala.concurrent.impl.Promise$DefaultPromise.tryAwait(Promise.scala:208)
        at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:218)
        at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
        at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:201)
        at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)
        at org.apache.spark.rpc.RpcEndpointRef.askSync(RpcEndpointRef.scala:92)
        at org.apache.spark.rpc.RpcEndpointRef.askSync(RpcEndpointRef.scala:76)
        at org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend.stop(CoarseGrainedSchedulerBackend.scala:402)
        at org.apache.spark.scheduler.cluster.StandaloneSchedulerBackend.org$apache$spark$scheduler$cluster$StandaloneSchedulerBackend$$stop(StandaloneSchedulerBackend.scala:213)
        - locked <0x00000007066fca38> (a org.apache.spark.scheduler.cluster.StandaloneSchedulerBackend)
        at org.apache.spark.scheduler.cluster.StandaloneSchedulerBackend.stop(StandaloneSchedulerBackend.scala:116)
        - locked <0x00000007066fca38> (a org.apache.spark.scheduler.cluster.StandaloneSchedulerBackend)
        at org.apache.spark.scheduler.TaskSchedulerImpl.stop(TaskSchedulerImpl.scala:517)
        at org.apache.spark.scheduler.DAGScheduler.stop(DAGScheduler.scala:1657)
        at org.apache.spark.SparkContext$$anonfun$stop$8.apply$mcV$sp(SparkContext.scala:1921)
        at org.apache.spark.util.Utils$.tryLogNonFatalError(Utils.scala:1302)
        at org.apache.spark.SparkContext.stop(SparkContext.scala:1920)
        at org.apache.spark.streaming.StreamingContext.stop(StreamingContext.scala:708)
        at org.apache.spark.streaming.StreamingContextSuite$$anonfun$43$$anonfun$apply$mcV$sp$66$$anon$3.run(StreamingContextSuite.scala:827)

"dispatcher-event-loop-3" #18 daemon prio=5 os_prio=31 tid=0x00007fedd603a000 nid=0x6203 waiting for monitor entry [0x0000700003be4000]
   java.lang.Thread.State: BLOCKED (on object monitor)
        at org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend$DriverEndpoint.org$apache$spark$scheduler$cluster$CoarseGrainedSchedulerBackend$DriverEndpoint$$makeOffers(CoarseGrainedSchedulerBackend.scala:253)
        - waiting to lock <0x00000007066fca38> (a org.apache.spark.scheduler.cluster.StandaloneSchedulerBackend)
        at org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend$DriverEndpoint$$anonfun$receive$1.applyOrElse(CoarseGrainedSchedulerBackend.scala:124)
        at org.apache.spark.rpc.netty.Inbox$$anonfun$process$1.apply$mcV$sp(Inbox.scala:117)
        at org.apache.spark.rpc.netty.Inbox.safelyCall(Inbox.scala:205)
        at org.apache.spark.rpc.netty.Inbox.process(Inbox.scala:101)
        at org.apache.spark.rpc.netty.Dispatcher$MessageLoop.run(Dispatcher.scala:213)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
```

This PR removes `synchronized` and changes `stopping` to an `AtomicBoolean` to make `stop` idempotent and fix the deadlock.

## How was this patch tested?

Jenkins

Author: Shixiong Zhu <[email protected]>

Closes apache#17610 from zsxwing/SPARK-20131.
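To make the failure mode above concrete: `Thread-31` holds the `StandaloneSchedulerBackend` monitor while blocking on a synchronous RPC, and the dispatcher event loop that must answer that RPC is blocked trying to enter `makeOffers`, which needs the same monitor. Below is a minimal, self-contained sketch of that lock pattern; it is not Spark code, and names like `DeadlockSketch` and the stand-in `makeOffers`/`stop` methods are purely illustrative:

```scala
import java.util.concurrent.{Executors, TimeUnit, TimeoutException}

// Illustrative reproduction of the lock pattern in the stack trace above:
// stop() holds this object's monitor while synchronously waiting on a
// single-threaded event loop whose next task also needs that monitor.
object DeadlockSketch {
  private val eventLoop = Executors.newSingleThreadExecutor()

  // Stand-in for DriverEndpoint.makeOffers: runs on the event loop and
  // synchronizes on the same object as stop().
  private def makeOffers(): Unit = synchronized {
    // would make resource offers here
  }

  // Stand-in for the old `stop(): Unit = synchronized { ... }`: it keeps
  // the monitor while blocking on the event loop, like the blocking
  // askSync in CoarseGrainedSchedulerBackend.stop.
  def stop(): Unit = synchronized {
    val pending = eventLoop.submit(new Runnable {
      override def run(): Unit = makeOffers()
    })
    // Times out: the event loop is parked waiting for the monitor we hold.
    pending.get(2, TimeUnit.SECONDS)
  }

  def main(args: Array[String]): Unit = {
    try stop()
    catch { case _: TimeoutException => println("deadlocked: await timed out") }
    finally eventLoop.shutdownNow()
  }
}
```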
Parent: a7b430b

1 file changed: +17 −16 lines

core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala

Lines changed: 17 additions & 16 deletions
```diff
@@ -18,6 +18,7 @@
 package org.apache.spark.scheduler.cluster

 import java.util.concurrent.Semaphore
+import java.util.concurrent.atomic.AtomicBoolean

 import scala.concurrent.Future

@@ -42,7 +43,7 @@ private[spark] class StandaloneSchedulerBackend(
   with Logging {

   private var client: StandaloneAppClient = null
-  private var stopping = false
+  private val stopping = new AtomicBoolean(false)
   private val launcherBackend = new LauncherBackend() {
     override protected def onStopRequest(): Unit = stop(SparkAppHandle.State.KILLED)
   }
@@ -112,7 +113,7 @@ private[spark] class StandaloneSchedulerBackend(
     launcherBackend.setState(SparkAppHandle.State.RUNNING)
   }

-  override def stop(): Unit = synchronized {
+  override def stop(): Unit = {
     stop(SparkAppHandle.State.FINISHED)
   }

@@ -125,14 +126,14 @@ private[spark] class StandaloneSchedulerBackend(

   override def disconnected() {
     notifyContext()
-    if (!stopping) {
+    if (!stopping.get) {
       logWarning("Disconnected from Spark cluster! Waiting for reconnection...")
     }
   }

   override def dead(reason: String) {
     notifyContext()
-    if (!stopping) {
+    if (!stopping.get) {
       launcherBackend.setState(SparkAppHandle.State.KILLED)
       logError("Application has been killed. Reason: " + reason)
       try {
@@ -206,20 +207,20 @@ private[spark] class StandaloneSchedulerBackend(
     registrationBarrier.release()
   }

-  private def stop(finalState: SparkAppHandle.State): Unit = synchronized {
-    try {
-      stopping = true
-
-      super.stop()
-      client.stop()
+  private def stop(finalState: SparkAppHandle.State): Unit = {
+    if (stopping.compareAndSet(false, true)) {
+      try {
+        super.stop()
+        client.stop()

-      val callback = shutdownCallback
-      if (callback != null) {
-        callback(this)
+        val callback = shutdownCallback
+        if (callback != null) {
+          callback(this)
+        }
+      } finally {
+        launcherBackend.setState(finalState)
+        launcherBackend.close()
       }
-    } finally {
-      launcherBackend.setState(finalState)
-      launcherBackend.close()
     }
   }

```
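The patched `stop` above follows the standard idempotent-shutdown idiom: a `compareAndSet` on an `AtomicBoolean` replaces the monitor lock, so the shutdown body runs exactly once and no lock is held while `super.stop()` blocks on the RPC event loop. Here is a hedged, generic sketch of that idiom, separate from the Spark code; `StoppableService` and `StoppableServiceDemo` are made-up names for illustration:

```scala
import java.util.concurrent.atomic.AtomicBoolean

// Generic sketch of the idempotent-shutdown idiom used by the patch:
// the first caller wins the compareAndSet and runs the shutdown body once;
// every other caller returns immediately, and no monitor is held while the
// body blocks, so event-loop threads can keep making progress.
class StoppableService {
  private val stopping = new AtomicBoolean(false)

  def stop(): Unit = {
    if (stopping.compareAndSet(false, true)) {
      try {
        // release resources; this may safely block on other threads,
        // because no lock is held here
        println("stopping once")
      } finally {
        println("final state recorded")
      }
    }
  }
}

object StoppableServiceDemo extends App {
  val s = new StoppableService
  s.stop() // runs the shutdown body
  s.stop() // no-op: the CAS fails because we are already stopping
}
```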