Skip to content

Commit 9fcd201

Browse files
author
Marcelo Vanzin
committed
[SPARK-11655] [core] Fix deadlock in handling of launcher stop().
The stop() callback was trying to close the launcher connection in the same thread that handles connection data, which ended up causing a deadlock. So avoid that by dispatching the stop() request in its own thread. On top of that, add some exception safety to a few parts of the code, and use "destroyForcibly" from Java 8 if it's available, to force kill the child process. The flip side is that "kill()" may not actually work if running Java 7.
1 parent 6e101d2 commit 9fcd201

File tree

4 files changed

+39
-13
lines changed

4 files changed

+39
-13
lines changed

core/src/main/scala/org/apache/spark/launcher/LauncherBackend.scala

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ import java.net.{InetAddress, Socket}
2121

2222
import org.apache.spark.SPARK_VERSION
2323
import org.apache.spark.launcher.LauncherProtocol._
24-
import org.apache.spark.util.ThreadUtils
24+
import org.apache.spark.util.{ThreadUtils, Utils}
2525

2626
/**
2727
* A class that can be used to talk to a launcher server. Users should extend this class to
@@ -88,12 +88,20 @@ private[spark] abstract class LauncherBackend {
8888
*/
8989
protected def onDisconnected() : Unit = { }
9090

91+
private def fireStopRequest(): Unit = {
92+
val thread = LauncherBackend.threadFactory.newThread(new Runnable() {
93+
override def run(): Unit = Utils.tryLogNonFatalError {
94+
onStopRequest()
95+
}
96+
})
97+
thread.start()
98+
}
9199

92100
private class BackendConnection(s: Socket) extends LauncherConnection(s) {
93101

94102
override protected def handle(m: Message): Unit = m match {
95103
case _: Stop =>
96-
onStopRequest()
104+
fireStopRequest()
97105

98106
case _ =>
99107
throw new IllegalArgumentException(s"Unexpected message type: ${m.getClass().getName()}")

core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala

Lines changed: 11 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -191,17 +191,19 @@ private[spark] class SparkDeploySchedulerBackend(
191191
}
192192

193193
private def stop(finalState: SparkAppHandle.State): Unit = synchronized {
194-
stopping = true
194+
try {
195+
stopping = true
195196

196-
launcherBackend.setState(finalState)
197-
launcherBackend.close()
197+
super.stop()
198+
client.stop()
198199

199-
super.stop()
200-
client.stop()
201-
202-
val callback = shutdownCallback
203-
if (callback != null) {
204-
callback(this)
200+
val callback = shutdownCallback
201+
if (callback != null) {
202+
callback(this)
203+
}
204+
} finally {
205+
launcherBackend.setState(finalState)
206+
launcherBackend.close()
205207
}
206208
}
207209

launcher/src/main/java/org/apache/spark/launcher/ChildProcAppHandle.java

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
package org.apache.spark.launcher;
1919

2020
import java.io.IOException;
21+
import java.lang.reflect.Method;
2122
import java.util.ArrayList;
2223
import java.util.List;
2324
import java.util.concurrent.ThreadFactory;
@@ -102,8 +103,20 @@ public synchronized void kill() {
102103
disconnect();
103104
}
104105
if (childProc != null) {
105-
childProc.destroy();
106-
childProc = null;
106+
try {
107+
childProc.exitValue();
108+
} catch (IllegalThreadStateException e) {
109+
// Child is still alive. Try to use Java 8's "destroyForcibly()" if available,
110+
// fall back to the old API if it's not there.
111+
try {
112+
Method destroy = childProc.getClass().getMethod("destroyForcibly");
113+
destroy.invoke(childProc);
114+
} catch (Exception inner) {
115+
childProc.destroy();
116+
}
117+
} finally {
118+
childProc = null;
119+
}
107120
}
108121
}
109122

launcher/src/main/java/org/apache/spark/launcher/SparkAppHandle.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,9 @@ public boolean isFinal() {
8989
* Tries to kill the underlying application. Implies {@link #disconnect()}. This will not send
9090
* a {@link #stop()} message to the application, so it's recommended that users first try to
9191
* stop the application cleanly and only resort to this method if that fails.
92+
* <p>
93+
* Note that if the application is running as a child process, this method may fail to kill the
94+
* process when using Java 7. This may happen if, for example, the application is deadlocked.
9295
*/
9396
void kill();
9497

0 commit comments

Comments
 (0)