Skip to content
This repository was archived by the owner on Oct 23, 2024. It is now read-only.

Commit ac97db0

Browse files
author
Anton Kirillov
committed
[DCOS-51158] Improved Task ID assignment for Executor tasks
1 parent 43ec23f commit ac97db0

File tree

3 files changed

+8
-15
lines changed

3 files changed

+8
-15
lines changed

resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -611,14 +611,12 @@ private[spark] class MesosClusterScheduler(
611611
partitionResources(remainingResources.asJava, "mem", desc.mem)
612612
offer.remainingResources = finalResources.asJava
613613

614-
val appName = desc.conf.get("spark.app.name")
615-
616614
val driverLabels = MesosProtoUtils.mesosLabels(desc.conf.get(config.DRIVER_LABELS)
617615
.getOrElse(""))
618616

619617
TaskInfo.newBuilder()
620618
.setTaskId(taskId)
621-
.setName(s"Driver for ${appName}")
619+
.setName(s"Driver for ${desc.name}")
622620
.setSlaveId(offer.offer.getSlaveId)
623621
.setCommand(buildDriverCommand(desc))
624622
.setContainer(getContainerInfo(desc))

resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala

Lines changed: 6 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -182,19 +182,12 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
182182

183183
private val metricsSource = new MesosCoarseGrainedSchedulerSource(this)
184184

185-
private val schedulerUuid: String = UUID.randomUUID().toString
186-
187-
private var nextMesosTaskId = 0
188-
189185
@volatile var appId: String = _
190186

191187
private var schedulerDriver: SchedulerDriver = _
192188

193-
def newMesosTaskId(): String = {
194-
val id = nextMesosTaskId
195-
nextMesosTaskId += 1
196-
id.toString
197-
}
189+
private val schedulerUuid: String = UUID.randomUUID().toString
190+
private val nextExecutorNumber = new AtomicLong()
198191

199192
override def start() {
200193
super.start()
@@ -527,7 +520,8 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
527520
if (canLaunchTask(slaveId, offer.getHostname, resources)) {
528521
// Create a task
529522
launchTasks = true
530-
val taskId = newMesosTaskId()
523+
val taskSeqNumber = nextExecutorNumber.getAndIncrement()
524+
val taskId = s"${schedulerUuid}-$taskSeqNumber"
531525
val offerCPUs = getResource(resources, "cpus").toInt
532526
val offerGPUs = getResource(resources, "gpus").toInt
533527
var taskGPUs = executorGpus(offerGPUs)
@@ -540,10 +534,10 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
540534
partitionTaskResources(resources, taskCPUs, taskMemory, taskGPUs)
541535

542536
val taskBuilder = MesosTaskInfo.newBuilder()
543-
.setTaskId(TaskID.newBuilder().setValue( s"$schedulerUuid-$taskId").build())
537+
.setTaskId(TaskID.newBuilder().setValue(taskId).build())
544538
.setSlaveId(offer.getSlaveId)
545539
.setCommand(createCommand(offer, taskCPUs + extraCoresPerExecutor, taskId))
546-
.setName(s"${sc.appName} $taskId")
540+
.setName(s"${sc.appName} $taskSeqNumber")
547541
.setLabels(MesosProtoUtils.mesosLabels(taskLabels))
548542
.addAllResources(resourcesToUse.asJava)
549543
.setContainer(getContainerInfo(sc.conf))

resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -557,6 +557,7 @@ trait MesosSchedulerUtils extends Logging {
557557
* the same frameworkID. To enforce that only the first driver registers with the configured
558558
* framework ID, the driver calls this method after the first registration.
559559
*/
560+
@deprecated("Multiple Spark Contexts and fine-grained scheduler are deprecated")
560561
def unsetFrameworkID(sc: SparkContext) {
561562
sc.conf.remove("spark.mesos.driver.frameworkId")
562563
System.clearProperty("spark.mesos.driver.frameworkId")

0 commit comments

Comments
 (0)