Skip to content

Commit 2eaeafe

Browse files
drcrallen authored and Andrew Or committed
[SPARK-12330][MESOS] Fix mesos coarse mode cleanup
In the current implementation the mesos coarse scheduler does not wait for the mesos tasks to complete before ending the driver. This causes a race where the task has to finish cleaning up before the mesos driver terminates it with a SIGINT (and SIGKILL after 3 seconds if the SIGINT doesn't work). This PR causes the mesos coarse scheduler to wait for the mesos tasks to finish (with a timeout defined by `spark.mesos.coarse.shutdown.ms`) This PR also fixes a regression caused by [SPARK-10987] whereby submitting a shutdown causes a race between the local shutdown procedure and the notification of the scheduler driver disconnection. If the scheduler driver disconnection wins the race, the coarse executor incorrectly exits with status 1 (instead of the proper status 0) With this patch the mesos coarse scheduler terminates properly, the executors clean up, and the tasks are reported as `FINISHED` in the Mesos console (as opposed to `KILLED` in < 1.6 or `FAILED` in 1.6 and later) Author: Charles Allen <[email protected]> Closes #10319 from drcrallen/SPARK-12330.
1 parent dee801a commit 2eaeafe

File tree

2 files changed

+45
-2
lines changed

2 files changed

+45
-2
lines changed

core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ package org.apache.spark.executor
1919

2020
import java.net.URL
2121
import java.nio.ByteBuffer
22+
import java.util.concurrent.atomic.AtomicBoolean
2223

2324
import scala.collection.mutable
2425
import scala.util.{Failure, Success}
@@ -42,6 +43,7 @@ private[spark] class CoarseGrainedExecutorBackend(
4243
env: SparkEnv)
4344
extends ThreadSafeRpcEndpoint with ExecutorBackend with Logging {
4445

46+
private[this] val stopping = new AtomicBoolean(false)
4547
var executor: Executor = null
4648
@volatile var driver: Option[RpcEndpointRef] = None
4749

@@ -102,19 +104,23 @@ private[spark] class CoarseGrainedExecutorBackend(
102104
}
103105

104106
case StopExecutor =>
107+
stopping.set(true)
105108
logInfo("Driver commanded a shutdown")
106109
// Cannot shutdown here because an ack may need to be sent back to the caller. So send
107110
// a message to self to actually do the shutdown.
108111
self.send(Shutdown)
109112

110113
case Shutdown =>
114+
stopping.set(true)
111115
executor.stop()
112116
stop()
113117
rpcEnv.shutdown()
114118
}
115119

116120
override def onDisconnected(remoteAddress: RpcAddress): Unit = {
117-
if (driver.exists(_.address == remoteAddress)) {
121+
if (stopping.get()) {
122+
logInfo(s"Driver from $remoteAddress disconnected during shutdown")
123+
} else if (driver.exists(_.address == remoteAddress)) {
118124
logError(s"Driver $remoteAddress disassociated! Shutting down.")
119125
System.exit(1)
120126
} else {

core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala

Lines changed: 38 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,11 +19,13 @@ package org.apache.spark.scheduler.cluster.mesos
1919

2020
import java.io.File
2121
import java.util.{Collections, List => JList}
22+
import java.util.concurrent.TimeUnit
2223
import java.util.concurrent.locks.ReentrantLock
2324

2425
import scala.collection.JavaConverters._
2526
import scala.collection.mutable.{HashMap, HashSet}
2627

28+
import com.google.common.base.Stopwatch
2729
import com.google.common.collect.HashBiMap
2830
import org.apache.mesos.{Scheduler => MScheduler, SchedulerDriver}
2931
import org.apache.mesos.Protos.{TaskInfo => MesosTaskInfo, _}
@@ -60,6 +62,12 @@ private[spark] class CoarseMesosSchedulerBackend(
6062
// Maximum number of cores to acquire (TODO: we'll need more flexible controls here)
6163
val maxCores = conf.get("spark.cores.max", Int.MaxValue.toString).toInt
6264

65+
private[this] val shutdownTimeoutMS = conf.getTimeAsMs("spark.mesos.coarse.shutdown.ms", "10s")
66+
.ensuring(_ >= 0, "spark.mesos.coarse.shutdown.ms must be >= 0")
67+
68+
// Synchronization protected by stateLock
69+
private[this] var stopCalled: Boolean = false
70+
6371
// If shuffle service is enabled, the Spark driver will register with the shuffle service.
6472
// This is for cleaning up shuffle files reliably.
6573
private val shuffleServiceEnabled = conf.getBoolean("spark.shuffle.service.enabled", false)
@@ -245,6 +253,13 @@ private[spark] class CoarseMesosSchedulerBackend(
245253
*/
246254
override def resourceOffers(d: SchedulerDriver, offers: JList[Offer]) {
247255
stateLock.synchronized {
256+
if (stopCalled) {
257+
logDebug("Ignoring offers during shutdown")
258+
// Driver should simply return a stopped status on race
259+
// condition between this.stop() and completing here
260+
offers.asScala.map(_.getId).foreach(d.declineOffer)
261+
return
262+
}
248263
val filters = Filters.newBuilder().setRefuseSeconds(5).build()
249264
for (offer <- offers.asScala) {
250265
val offerAttributes = toAttributeMap(offer.getAttributesList)
@@ -364,7 +379,29 @@ private[spark] class CoarseMesosSchedulerBackend(
364379
}
365380

366381
override def stop() {
367-
super.stop()
382+
// Make sure we're not launching tasks during shutdown
383+
stateLock.synchronized {
384+
if (stopCalled) {
385+
logWarning("Stop called multiple times, ignoring")
386+
return
387+
}
388+
stopCalled = true
389+
super.stop()
390+
}
391+
// Wait for executors to report done, or else mesosDriver.stop() will forcefully kill them.
392+
// See SPARK-12330
393+
val stopwatch = new Stopwatch()
394+
stopwatch.start()
395+
// slaveIdsWithExecutors has no memory barrier, so this is eventually consistent
396+
while (slaveIdsWithExecutors.nonEmpty &&
397+
stopwatch.elapsed(TimeUnit.MILLISECONDS) < shutdownTimeoutMS) {
398+
Thread.sleep(100)
399+
}
400+
if (slaveIdsWithExecutors.nonEmpty) {
401+
logWarning(s"Timed out waiting for ${slaveIdsWithExecutors.size} remaining executors "
402+
+ s"to terminate within $shutdownTimeoutMS ms. This may leave temporary files "
403+
+ "on the mesos nodes.")
404+
}
368405
if (mesosDriver != null) {
369406
mesosDriver.stop()
370407
}

0 commit comments

Comments (0)