@@ -18,23 +18,22 @@
package org.apache.spark.scheduler.cluster.mesos

import java.io.File
import java.util
import java.util.concurrent.locks.ReentrantLock
import java.util.{Collections, List => JList}

import scala.collection.JavaConverters._
import scala.collection.mutable.{HashMap, HashSet}

import com.google.common.collect.HashBiMap
import org.apache.mesos.Protos.{TaskInfo => MesosTaskInfo, _}
import org.apache.mesos.{Scheduler => MScheduler, SchedulerDriver}

import org.apache.spark.{SecurityManager, SparkContext, SparkEnv, SparkException, TaskState}
import org.apache.spark.network.netty.SparkTransportConf
import org.apache.spark.network.shuffle.mesos.MesosExternalShuffleClient
import org.apache.spark.rpc.RpcAddress
import org.apache.spark.scheduler.{SlaveLost, TaskSchedulerImpl}
import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
import org.apache.spark.util.Utils
import org.apache.spark.{SecurityManager, SparkContext, SparkEnv, SparkException, TaskState}

/**
* A SchedulerBackend that runs tasks on Mesos, but uses "coarse-grained" tasks, where it holds
@@ -64,16 +63,31 @@ private[spark] class CoarseMesosSchedulerBackend(
// This is for cleaning up shuffle files reliably.
private val shuffleServiceEnabled = conf.getBoolean("spark.shuffle.service.enabled", false)

private val maxExecutorsPerSlave = conf.getInt("spark.mesos.coarse.executors.max", 1)

private val maxCpusPerExecutor =
conf.getOption("spark.mesos.coarse.coresPerExecutor.max").map { m => m.toInt }

if (conf.getOption("spark.mesos.coarse.executors.max").isDefined && maxCpusPerExecutor.isEmpty) {
throw new IllegalArgumentException(
"Must configure spark.mesos.coarse.coresPerExecutor.max when " +
"spark.mesos.coarse.executors.max is set")
}

// Cores we have acquired with each Mesos task ID
val coresByTaskId = new HashMap[Int, Int]
val coresByTaskId = new HashMap[String, Int]
var totalCoresAcquired = 0

val slaveIdsWithExecutors = new HashSet[String]

// Mapping from slave ID to hostname
private val slaveIdToHost = new HashMap[String, String]

val taskIdToSlaveId: HashBiMap[Int, String] = HashBiMap.create[Int, String]
// Slave IDs that we have already connected the shuffle service to
private val existingSlaveShuffleConnections = new HashSet[String]

// Contains a mapping of slave ids to the number of executors launched.
val slaveIdsWithExecutors = new HashMap[String, Int]

val taskIdToSlaveId: HashMap[String, String] = new HashMap[String, String]
// How many times tasks on each slave failed
val failuresBySlaveId: HashMap[String, Int] = new HashMap[String, Int]

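A minimal standalone sketch of how the two settings introduced above might be supplied; the application name and the concrete values are assumptions for illustration, not taken from this patch. It also notes the validation enforced in the diff: executors.max without coresPerExecutor.max is rejected.

import org.apache.spark.SparkConf

object CoarseMesosConfSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("coarse-mesos-example")                    // hypothetical app name
      .set("spark.mesos.coarse.executors.max", "4")          // at most 4 executors per slave
      .set("spark.mesos.coarse.coresPerExecutor.max", "2")   // cap each executor at 2 cores
    // Setting only spark.mesos.coarse.executors.max would make the backend throw
    // IllegalArgumentException, per the check introduced in this change.
    println(conf.get("spark.mesos.coarse.executors.max"))
  }
}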
@@ -89,8 +103,6 @@ private[spark] class CoarseMesosSchedulerBackend(
*/
private[mesos] def executorLimit: Int = executorLimitOption.getOrElse(Int.MaxValue)

private val pendingRemovedSlaveIds = new HashSet[String]

// private lock object protecting mutable state above. Using the intrinsic lock
// may lead to deadlocks since the superclass might also try to lock
private val stateLock = new ReentrantLock
@@ -118,10 +130,10 @@ private[spark] class CoarseMesosSchedulerBackend(

@volatile var appId: String = _

def newMesosTaskId(): Int = {
def newMesosTaskId(slaveId: String): String = {
val id = nextMesosTaskId
nextMesosTaskId += 1
id
slaveId + "/" + id
}

override def start() {
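To show the new task ID scheme in isolation, here is a standalone sketch of the counter above; the slave ID string is hypothetical. Task IDs now embed the slave ID, so they remain unique strings when several executors run on the same slave.

object TaskIdSketch {
  private var nextMesosTaskId = 0

  // Same shape as newMesosTaskId above: "<slaveId>/<monotonic counter>".
  def newMesosTaskId(slaveId: String): String = {
    val id = nextMesosTaskId
    nextMesosTaskId += 1
    slaveId + "/" + id
  }

  def main(args: Array[String]): Unit = {
    println(newMesosTaskId("20151001-0001-S1"))  // 20151001-0001-S1/0
    println(newMesosTaskId("20151001-0001-S1"))  // 20151001-0001-S1/1
  }
}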
@@ -136,7 +148,7 @@
startScheduler(driver)
}

def createCommand(offer: Offer, numCores: Int, taskId: Int): CommandInfo = {
def createCommand(offer: Offer, numCores: Int, taskId: String): CommandInfo = {
val executorSparkHome = conf.getOption("spark.mesos.executor.home")
.orElse(sc.getSparkHome())
.getOrElse {
@@ -180,20 +192,20 @@
"%s \"%s\" org.apache.spark.executor.CoarseGrainedExecutorBackend"
.format(prefixEnv, runScript) +
s" --driver-url $driverURL" +
s" --executor-id ${offer.getSlaveId.getValue}" +
s" --executor-id $taskId" +
s" --hostname ${offer.getHostname}" +
s" --cores $numCores" +
s" --app-id $appId")
} else {
// Grab everything to the first '.'. We'll use that and '*' to
// glob the directory "correctly".
val basename = uri.get.split('/').last.split('.').head
val executorId = sparkExecutorId(offer.getSlaveId.getValue, taskId.toString)

command.setValue(
s"cd $basename*; $prefixEnv " +
"./bin/spark-class org.apache.spark.executor.CoarseGrainedExecutorBackend" +
s" --driver-url $driverURL" +
s" --executor-id $executorId" +
s" --executor-id $taskId" +
s" --hostname ${offer.getHostname}" +
s" --cores $numCores" +
s" --app-id $appId")
@@ -240,38 +252,47 @@ private[spark] class CoarseMesosSchedulerBackend(
* unless we've already launched more than we wanted to.
*/
override def resourceOffers(d: SchedulerDriver, offers: JList[Offer]) {
val memoryPerExecutor = calculateTotalMemory(sc)
stateLock.synchronized {
val filters = Filters.newBuilder().setRefuseSeconds(5).build()
for (offer <- offers.asScala) {
val mem = getResource(offer.getResourcesList, "mem")
val cpus = getResource(offer.getResourcesList, "cpus").toInt
var remainingMem = mem
var remainingCores = cpus
val tasks = new util.ArrayList[MesosTaskInfo]()
val offerAttributes = toAttributeMap(offer.getAttributesList)
val meetsConstraints = matchesAttributeRequirements(slaveOfferConstraints, offerAttributes)
val slaveId = offer.getSlaveId.getValue
val mem = getResource(offer.getResourcesList, "mem")
val cpus = getResource(offer.getResourcesList, "cpus").toInt
val id = offer.getId.getValue
if (taskIdToSlaveId.size < executorLimit &&
var executorCount = slaveIdsWithExecutors.getOrElse(slaveId, 0)
while (taskIdToSlaveId.size < executorLimit &&
totalCoresAcquired < maxCores &&
meetsConstraints &&
mem >= calculateTotalMemory(sc) &&
cpus >= 1 &&
remainingMem >= calculateTotalMemory(sc) &&
remainingCores >= 1 &&
failuresBySlaveId.getOrElse(slaveId, 0) < MAX_SLAVE_FAILURES &&
!slaveIdsWithExecutors.contains(slaveId)) {
// Launch an executor on the slave
val cpusToUse = math.min(cpus, maxCores - totalCoresAcquired)
totalCoresAcquired += cpusToUse
val taskId = newMesosTaskId()
taskIdToSlaveId.put(taskId, slaveId)
slaveIdsWithExecutors += slaveId
coresByTaskId(taskId) = cpusToUse
executorCount < maxExecutorsPerSlave) {
val coresToUse =
math.min(maxCpusPerExecutor.getOrElse(Int.MaxValue),
math.min(remainingCores, maxCores - totalCoresAcquired))
totalCoresAcquired += coresToUse
remainingCores -= coresToUse
remainingMem -= memoryPerExecutor
val taskId = newMesosTaskId(slaveId)
taskIdToSlaveId(taskId) = slaveId
executorCount += 1
slaveIdsWithExecutors(slaveId) = executorCount
coresByTaskId(taskId) = coresToUse
// Gather cpu resources from the available resources and use them in the task.
val (remainingResources, cpuResourcesToUse) =
partitionResources(offer.getResourcesList, "cpus", cpusToUse)
partitionResources(offer.getResourcesList, "cpus", coresToUse)
val (_, memResourcesToUse) =
partitionResources(remainingResources.asJava, "mem", calculateTotalMemory(sc))
val taskBuilder = MesosTaskInfo.newBuilder()
.setTaskId(TaskID.newBuilder().setValue(taskId.toString).build())
.setSlaveId(offer.getSlaveId)
.setCommand(createCommand(offer, cpusToUse + extraCoresPerSlave, taskId))
.setCommand(createCommand(offer, coresToUse + extraCoresPerSlave, taskId))
.setName("Task " + taskId)
.addAllResources(cpuResourcesToUse.asJava)
.addAllResources(memResourcesToUse.asJava)
@@ -280,13 +301,14 @@ private[spark] class CoarseMesosSchedulerBackend(
MesosSchedulerBackendUtil
.setupContainerBuilderDockerInfo(image, sc.conf, taskBuilder.getContainerBuilder())
}
tasks.add(taskBuilder.build())
}

if (!tasks.isEmpty) {
// accept the offer and launch the task
logDebug(s"Accepting offer: $id with attributes: $offerAttributes mem: $mem cpu: $cpus")
slaveIdToHost(offer.getSlaveId.getValue) = offer.getHostname
d.launchTasks(
Collections.singleton(offer.getId),
Collections.singleton(taskBuilder.build()), filters)
d.launchTasks(Collections.singleton(offer.getId()), tasks, filters)
} else {
// Decline the offer
logDebug(s"Declining offer: $id with attributes: $offerAttributes mem: $mem cpu: $cpus")
@@ -298,7 +320,7 @@ private[spark] class CoarseMesosSchedulerBackend(


override def statusUpdate(d: SchedulerDriver, status: TaskStatus) {
val taskId = status.getTaskId.getValue.toInt
val taskId = status.getTaskId.getValue
val state = status.getState
logInfo(s"Mesos task $taskId is now $state")
val slaveId: String = status.getSlaveId.getValue
@@ -309,7 +331,8 @@ private[spark] class CoarseMesosSchedulerBackend(
// this through Mesos, since the shuffle services are set up independently.
if (TaskState.fromMesos(state).equals(TaskState.RUNNING) &&
slaveIdToHost.contains(slaveId) &&
shuffleServiceEnabled) {
shuffleServiceEnabled &&
!existingSlaveShuffleConnections.contains(slaveId)) {
assume(mesosExternalShuffleClient.isDefined,
"External shuffle client was not instantiated even though shuffle service is enabled.")
// TODO: Remove this and allow the MesosExternalShuffleService to detect
@@ -320,12 +343,10 @@ private[spark] class CoarseMesosSchedulerBackend(
s"host $hostname, port $externalShufflePort for app ${conf.getAppId}")
mesosExternalShuffleClient.get
.registerDriverWithShuffleService(hostname, externalShufflePort)
}
existingSlaveShuffleConnections += slaveId
} else if (TaskState.isFinished(TaskState.fromMesos(state))) {
val slaveId = taskIdToSlaveId(taskId)

if (TaskState.isFinished(TaskState.fromMesos(state))) {
val slaveId = taskIdToSlaveId.get(taskId)
slaveIdsWithExecutors -= slaveId
taskIdToSlaveId.remove(taskId)
// Remove the cores we have remembered for this task, if it's in the hashmap
for (cores <- coresByTaskId.get(taskId)) {
totalCoresAcquired -= cores
@@ -339,7 +360,7 @@ private[spark] class CoarseMesosSchedulerBackend(
"is Spark installed on it?")
}
}
executorTerminated(d, slaveId, s"Executor finished with state $state")
executorTerminated(d, taskId, slaveId, s"Executor finished with state $state")
// In case we'd rejected everything before but have now lost a node
d.reviveOffers()
}
@@ -366,35 +387,39 @@ private[spark] class CoarseMesosSchedulerBackend(
* slave IDs that we might have asked to be killed. It also notifies the driver
* that an executor was removed.
*/
private def executorTerminated(d: SchedulerDriver, slaveId: String, reason: String): Unit = {
private def executorTerminated(
d: SchedulerDriver,
executorId: String,
slaveId: String,
reason: String): Unit = {
stateLock.synchronized {
if (slaveIdsWithExecutors.contains(slaveId)) {
val slaveIdToTaskId = taskIdToSlaveId.inverse()
if (slaveIdToTaskId.containsKey(slaveId)) {
val taskId: Int = slaveIdToTaskId.get(slaveId)
taskIdToSlaveId.remove(taskId)
removeExecutor(sparkExecutorId(slaveId, taskId.toString), SlaveLost(reason))
if (slaveIdsWithExecutors.contains(slaveId) && taskIdToSlaveId.contains(executorId)) {
taskIdToSlaveId.remove(executorId)
removeExecutor(executorId, SlaveLost(reason))
val newCount = slaveIdsWithExecutors(slaveId) - 1
if (newCount == 0) {
slaveIdsWithExecutors.remove(slaveId)
} else {
slaveIdsWithExecutors(slaveId) = newCount
}
// TODO: This assumes one Spark executor per Mesos slave,
// which may no longer be true after SPARK-5095
pendingRemovedSlaveIds -= slaveId
slaveIdsWithExecutors -= slaveId
}
}
}

private def sparkExecutorId(slaveId: String, taskId: String): String = {
s"$slaveId/$taskId"
}

override def slaveLost(d: SchedulerDriver, slaveId: SlaveID): Unit = {
logInfo(s"Mesos slave lost: ${slaveId.getValue}")
executorTerminated(d, slaveId.getValue, "Mesos slave lost: " + slaveId.getValue)
// Terminate all executors on the slave
stateLock.synchronized {
val lostExecutors = taskIdToSlaveId.filter(_._2.equals(slaveId.getValue)).map(_._1)
lostExecutors.foreach { taskId =>
executorTerminated(d, taskId, slaveId.getValue, "Mesos slave lost: " + slaveId.getValue)
}
}
}

override def executorLost(d: SchedulerDriver, e: ExecutorID, s: SlaveID, status: Int): Unit = {
logInfo("Executor lost: %s, marking slave %s as lost".format(e.getValue, s.getValue))
slaveLost(d, s)
logInfo("Executor lost: %s".format(e.getValue))
executorTerminated(d, e.getValue, s.getValue, "Mesos Executor lost: " + e.getValue)
}

override def applicationId(): String =
@@ -417,13 +442,9 @@ private[spark] class CoarseMesosSchedulerBackend(
return false
}

val slaveIdToTaskId = taskIdToSlaveId.inverse()
for (executorId <- executorIds) {
val slaveId = executorId.split("/")(0)
if (slaveIdToTaskId.containsKey(slaveId)) {
mesosDriver.killTask(
TaskID.newBuilder().setValue(slaveIdToTaskId.get(slaveId).toString).build())
pendingRemovedSlaveIds += slaveId
if (taskIdToSlaveId.contains(executorId)) {
mesosDriver.killTask(TaskID.newBuilder().setValue(executorId).build())
} else {
logWarning("Unable to find executor Id '" + executorId + "' in Mesos scheduler")
}