This repository was archived by the owner on Oct 23, 2024. It is now read-only.
Merged
CoarseGrainedClusterMessages.scala
@@ -36,8 +36,6 @@ private[spark] object CoarseGrainedClusterMessages {
       hadoopDelegationCreds: Option[Array[Byte]])
     extends CoarseGrainedClusterMessage
 
-  case object RetrieveLastAllocatedExecutorId extends CoarseGrainedClusterMessage
-
   // Driver to executors
   case class LaunchTask(data: SerializableBuffer) extends CoarseGrainedClusterMessage
 
CoarseGrainedSchedulerBackend.scala
@@ -93,9 +93,6 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
   @GuardedBy("CoarseGrainedSchedulerBackend.this")
   protected var localityAwareTasks = 0
 
-  // The num of current max ExecutorId used to re-register appMaster
-  @volatile protected var currentExecutorIdCounter = 0
-
   class DriverEndpoint(override val rpcEnv: RpcEnv, sparkProperties: Seq[(String, String)])
     extends ThreadSafeRpcEndpoint with Logging {
 
@@ -189,9 +186,6 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
           // in this block are read when requesting executors
           CoarseGrainedSchedulerBackend.this.synchronized {
             executorDataMap.put(executorId, data)
-            if (currentExecutorIdCounter < executorId.toInt) {
-              currentExecutorIdCounter = executorId.toInt
-            }
             if (numPendingExecutors > 0) {
               numPendingExecutors -= 1
               logDebug(s"Decremented number of pending executors ($numPendingExecutors left)")
MesosCoarseGrainedSchedulerBackend.scala
@@ -18,7 +18,7 @@
 package org.apache.spark.scheduler.cluster.mesos
 
 import java.io.File
-import java.util.{Collections, List => JList}
+import java.util.{Collections, UUID, List => JList}
 import java.util.concurrent.atomic.{AtomicBoolean, AtomicLong}
 import java.util.concurrent.locks.ReentrantLock
 
@@ -162,6 +162,8 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
 
   private val metricsSource = new MesosCoarseGrainedSchedulerSource(this)
 
+  private val schedulerUuid: String = UUID.randomUUID().toString
+
   private var nextMesosTaskId = 0
 
   @volatile var appId: String = _
Expand Down Expand Up @@ -469,7 +471,7 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
partitionTaskResources(resources, taskCPUs, taskMemory, taskGPUs)

val taskBuilder = MesosTaskInfo.newBuilder()
.setTaskId(TaskID.newBuilder().setValue(taskId.toString).build())
.setTaskId(TaskID.newBuilder().setValue( s"$schedulerUuid-$taskId").build())
.setSlaveId(offer.getSlaveId)
.setCommand(createCommand(offer, taskCPUs + extraCoresPerExecutor, taskId))
.setName(s"${sc.appName} $taskId")
MesosCoarseGrainedSchedulerBackendSuite.scala
@@ -17,11 +17,12 @@
 
 package org.apache.spark.scheduler.cluster.mesos
 
+import java.util.UUID
 import java.util.concurrent.TimeUnit
 
 import scala.collection.JavaConverters._
 import scala.concurrent.duration._
-
+import scala.util.Try
 import org.apache.mesos.{Protos, Scheduler, SchedulerDriver}
 import org.apache.mesos.Protos._
 import org.mockito.Matchers
@@ -30,14 +31,13 @@ import org.mockito.Mockito._
 import org.scalatest.concurrent.ScalaFutures
 import org.scalatest.mock.MockitoSugar
 import org.scalatest.BeforeAndAfter
-
 import org.apache.spark.{LocalSparkContext, SecurityManager, SparkConf, SparkContext, SparkFunSuite}
 import org.apache.spark.deploy.mesos.config._
 import org.apache.spark.internal.config._
 import org.apache.spark.network.shuffle.mesos.MesosExternalShuffleClient
 import org.apache.spark.rpc.{RpcAddress, RpcEndpointRef}
 import org.apache.spark.scheduler.TaskSchedulerImpl
-import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.{RegisterExecutor}
+import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.RegisterExecutor
 import org.apache.spark.scheduler.cluster.mesos.Utils._
 
 class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
@@ -598,6 +598,21 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
     assert(launchedTasks.head.getName == "test-mesos-dynamic-alloc 0")
   }
 
+  test("mesos sets different task ids across executions") {
+    setBackend()
+    var offers = List(Resources(backend.executorMemory(sc), 1))
+    offerResources(offers)
+    val firstLaunchTaskId = verifyTaskLaunched(driver, "o1").head.getTaskId.getValue
+    sc.stop()
+
+    setBackend()
+    offers = List(Resources(backend.executorMemory(sc), 1))
+    offerResources(offers)
+    val secondLaunchTaskId = verifyTaskLaunched(driver, "o1").head.getTaskId.getValue
+
+    assert(firstLaunchTaskId != secondLaunchTaskId)
+  }
+
   test("mesos sets configurable labels on tasks") {
     val taskLabelsString = "mesos:test,label:test"
     setBackend(Map(
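The new test above exercises exactly the restart scenario: each `setBackend()` creates a fresh backend whose task counter starts at 0, so without the `schedulerUuid` prefix both incarnations would launch a task with ID "0" and the assertion would fail.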
YarnAllocator.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.deploy.yarn
 
-import java.util.Collections
+import java.util.{Collections, UUID}
 import java.util.concurrent._
 import java.util.concurrent.atomic.AtomicInteger
 import java.util.regex.Pattern
@@ -26,12 +26,10 @@ import scala.collection.mutable
 import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet, Queue}
 import scala.collection.JavaConverters._
 import scala.util.control.NonFatal
-
 import org.apache.hadoop.yarn.api.records._
 import org.apache.hadoop.yarn.client.api.AMRMClient
 import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest
 import org.apache.hadoop.yarn.conf.YarnConfiguration
-
 import org.apache.spark.{SecurityManager, SparkConf, SparkException}
 import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil._
 import org.apache.spark.deploy.yarn.config._
@@ -40,7 +38,6 @@ import org.apache.spark.internal.config._
 import org.apache.spark.rpc.{RpcCallContext, RpcEndpointRef}
 import org.apache.spark.scheduler.{ExecutorExited, ExecutorLossReason}
 import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.RemoveExecutor
-import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.RetrieveLastAllocatedExecutorId
 import org.apache.spark.util.{Clock, SystemClock, ThreadUtils}
 
 /**
@@ -84,21 +81,9 @@ private[yarn] class YarnAllocator(
 
   private val numExecutorsStarting = new AtomicInteger(0)
 
-  /**
-   * Used to generate a unique ID per executor
-   *
-   * Init `executorIdCounter`. when AM restart, `executorIdCounter` will reset to 0. Then
-   * the id of new executor will start from 1, this will conflict with the executor has
-   * already created before. So, we should initialize the `executorIdCounter` by getting
-   * the max executorId from driver.
-   *
-   * And this situation of executorId conflict is just in yarn client mode, so this is an issue
-   * in yarn client mode. For more details, can check in jira.
-   *
-   * @see SPARK-12864
-   */
-  private var executorIdCounter: Int =
-    driverRef.askSync[Int](RetrieveLastAllocatedExecutorId)
+  // Used to generate a unique ID per executor
+  private val allocatorUuid: String = UUID.randomUUID().toString
+  private var executorIdCounter: Int = 0
 
   // Queue to store the timestamp of failed executors
   private val failedExecutorsTimeStamps = new Queue[Long]()
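With a fresh `allocatorUuid` per allocator instance, an ApplicationMaster restart can safely reset `executorIdCounter` to 0: the new `<uuid>-<n>` executor IDs cannot collide with those issued by a previous attempt, which is the SPARK-12864 conflict (described in the removed scaladoc above) that the `RetrieveLastAllocatedExecutorId` round-trip to the driver existed to prevent.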
@@ -495,7 +480,7 @@ private[yarn] class YarnAllocator(
       executorIdCounter += 1
       val executorHostname = container.getNodeId.getHost
       val containerId = container.getId
-      val executorId = executorIdCounter.toString
+      val executorId = s"$allocatorUuid-$executorIdCounter"
       assert(container.getResource.getMemory >= resource.getMemory)
       logInfo(s"Launching container $containerId on host $executorHostname " +
         s"for executor with ID $executorId")
YarnSchedulerBackend.scala
@@ -311,9 +311,6 @@ private[spark] abstract class YarnSchedulerBackend(
             logWarning("Attempted to kill executors before the AM has registered!")
             context.reply(false)
         }
-
-      case RetrieveLastAllocatedExecutorId =>
-        context.reply(currentExecutorIdCounter)
     }
 
   override def onDisconnected(remoteAddress: RpcAddress): Unit = {