@@ -249,10 +249,6 @@ private[spark] class ExecutorAllocationManager(
* yarn-client mode when AM re-registers after a failure.
*/
def reset(): Unit = synchronized {
initializing = true
numExecutorsTarget = initialNumExecutors
numExecutorsToAdd = 1

executorsPendingToRemove.clear()
removeTimes.clear()
}
@@ -35,7 +35,7 @@ private[spark] object CoarseGrainedClusterMessages {
ioEncryptionKey: Option[Array[Byte]])
extends CoarseGrainedClusterMessage

case object RetrieveLastAllocatedExecutorId extends CoarseGrainedClusterMessage
case object GetAMInitialState extends CoarseGrainedClusterMessage

// Driver to executors
case class LaunchTask(data: SerializableBuffer) extends CoarseGrainedClusterMessage
@@ -93,9 +93,6 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
@GuardedBy("CoarseGrainedSchedulerBackend.this")
protected var localityAwareTasks = 0

// The num of current max ExecutorId used to re-register appMaster
@volatile protected var currentExecutorIdCounter = 0

class DriverEndpoint(override val rpcEnv: RpcEnv, sparkProperties: Seq[(String, String)])
extends ThreadSafeRpcEndpoint with Logging {

@@ -184,9 +181,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
// in this block are read when requesting executors
CoarseGrainedSchedulerBackend.this.synchronized {
executorDataMap.put(executorId, data)
if (currentExecutorIdCounter < executorId.toInt) {
currentExecutorIdCounter = executorId.toInt
}
setCurrentExecutorIdCounter(executorId.toInt)
if (numPendingExecutors > 0) {
numPendingExecutors -= 1
logDebug(s"Decremented number of pending executors ($numPendingExecutors left)")
@@ -637,6 +632,9 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
defaultAskTimeout.awaitResult(response)
}

// Record the current maximum executor ID, used when the application master re-registers
protected def setCurrentExecutorIdCounter(executorId: Int): Unit = {}

/**
* Kill the given list of executors through the cluster manager.
* @return whether the kill request is acknowledged.
@@ -358,14 +358,26 @@ private[spark] class ApplicationMaster(
dummyRunner.launchContextDebugInfo()
}

/**
* (executorIdCounter, requestExecutors) should be either the initial state
* or the last state before the AM restarted.
*
* @see SPARK-12864, SPARK-20079
*/
val (executorIdCounter, requestExecutors) =
driverRef.askSync[(Int, RequestExecutors)](GetAMInitialState)
allocator = client.register(driverUrl,
driverRef,
yarnConf,
_sparkConf,
uiAddress,
historyAddress,
securityMgr,
localResources)
localResources,
executorIdCounter)
if (requestExecutors.requestedTotal != allocator.getTargetNumExecutors) {
amEndpoint.send(requestExecutors)
Contributor:
This doesn't work; because you're using send, the handler would have to be in the receive method of the endpoint, and it's actually in receiveAndReply (which is the handler for ask).

You have to call requestTotalExecutorsWithPreferredLocalities directly here, or make that method take a RequestExecutors message as an argument, but you shouldn't go through the RPC system just to make a method call.

}

allocator.allocateResources()
reporterThread = launchReporterThread()
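
One possible shape of the reviewer's suggestion, sketched as a direct method call instead of amEndpoint.send; the RequestExecutors field names and the requestTotalExecutorsWithPreferredLocalities parameter list are assumptions inferred from the reviewer's comment and this diff, not verified against the branch:

// Hypothetical sketch only: update the allocator directly rather than routing
// a RequestExecutors message through the AM endpoint's RPC layer.
if (requestExecutors.requestedTotal != allocator.getTargetNumExecutors) {
  allocator.requestTotalExecutorsWithPreferredLocalities(
    requestExecutors.requestedTotal,        // total executors the driver last asked for
    requestExecutors.localityAwareTasks,    // tasks with locality preferences
    requestExecutors.hostToLocalTaskCount,  // host -> pending task count (assumed field name)
    requestExecutors.nodeBlacklist)         // blacklisted nodes (assumed field name)
}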
@@ -40,7 +40,6 @@ import org.apache.spark.internal.config._
import org.apache.spark.rpc.{RpcCallContext, RpcEndpointRef}
import org.apache.spark.scheduler.{ExecutorExited, ExecutorLossReason}
import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.RemoveExecutor
import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.RetrieveLastAllocatedExecutorId
import org.apache.spark.util.{Clock, SystemClock, ThreadUtils}

/**
@@ -65,7 +64,8 @@ private[yarn] class YarnAllocator(
appAttemptId: ApplicationAttemptId,
securityMgr: SecurityManager,
localResources: Map[String, LocalResource],
resolver: SparkRackResolver)
resolver: SparkRackResolver,
private var executorIdCounter: Int)
extends Logging {

import YarnAllocator._
@@ -82,22 +82,6 @@

@volatile private var numExecutorsRunning = 0

/**
* Used to generate a unique ID per executor
*
* Init `executorIdCounter`. when AM restart, `executorIdCounter` will reset to 0. Then
* the id of new executor will start from 1, this will conflict with the executor has
* already created before. So, we should initialize the `executorIdCounter` by getting
* the max executorId from driver.
*
* And this situation of executorId conflict is just in yarn client mode, so this is an issue
* in yarn client mode. For more details, can check in jira.
*
* @see SPARK-12864
*/
private var executorIdCounter: Int =
driverRef.askSync[Int](RetrieveLastAllocatedExecutorId)

// Queue to store the timestamp of failed executors
private val failedExecutorsTimeStamps = new Queue[Long]()
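
For context on why the removed comment above mattered: the allocator derives each new executor ID by bumping this counter, so a sketch of the assumed usage (simplified, not taken verbatim from this file) looks like:

// Simplified illustration, not the exact allocator code: each newly allocated
// container is assigned the next value of this counter as its executor ID.
executorIdCounter += 1
val executorId = executorIdCounter.toString
// If a restarted AM reset the counter to 0, the IDs "1", "2", ... would be
// reissued and collide with executors registered before the restart, which is
// why the counter is now seeded from the driver via the constructor parameter.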

@@ -163,6 +147,8 @@
clock = newClock
}

def getTargetNumExecutors: Int = targetNumExecutors

def getNumExecutorsRunning: Int = numExecutorsRunning

def getNumExecutorsFailed: Int = synchronized {
@@ -58,7 +58,8 @@ private[spark] class YarnRMClient extends Logging {
uiAddress: Option[String],
uiHistoryAddress: String,
securityMgr: SecurityManager,
localResources: Map[String, LocalResource]
localResources: Map[String, LocalResource],
executorIdCounter: Int
): YarnAllocator = {
amClient = AMRMClient.createAMRMClient()
amClient.init(conf)
@@ -75,7 +76,7 @@
registered = true
}
new YarnAllocator(driverUrl, driverRef, conf, sparkConf, amClient, getAttemptId(), securityMgr,
localResources, new SparkRackResolver())
localResources, new SparkRackResolver(), executorIdCounter)
}

/**
@@ -29,7 +29,7 @@ import org.apache.spark.rpc._
import org.apache.spark.scheduler._
import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._
import org.apache.spark.ui.JettyUtils
import org.apache.spark.util.{RpcUtils, ThreadUtils}
import org.apache.spark.util.{RpcUtils, ThreadUtils, Utils}

/**
* Abstract Yarn scheduler backend that contains common logic
@@ -68,6 +68,12 @@ private[spark] abstract class YarnSchedulerBackend(
// Flag to specify whether this schedulerBackend should be reset.
private var shouldResetOnAmRegister = false

private val currentState = new CurrentAMState(0,
RequestExecutors(Utils.getDynamicAllocationInitialExecutors(conf), 0, Map.empty, Set.empty))

protected class CurrentAMState(
Contributor:
The way this class is used isn't really helping. You could have the two fields as mutable fields in the parent instead of declaring a separate type.

I'd suggest either getting rid of this class, or turning it into a proper type to be sent as a reply to the "get" RPC, defined in CoarseGrainedClusterMessage.scala (meaning you'd keep state in fields here, and the message itself would be instantiated only when replying to the RPC, and would be immutable).

Also, either suggestion means you don't need the changes around setCurrentExecutorIdCounter. The old code was fine.

Finally, I know I suggested the name, but making the names of the classes more generic would be better (since that file is not YARN-specific). e.g. GetExecutorAllocatorState and ExecutorAllocatorState for the reply.

var executorIdCounter: Int,
var requestExecutors: RequestExecutors)
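
A hypothetical sketch of the reviewer's preferred shape, using the names suggested in the comment above (GetExecutorAllocatorState / ExecutorAllocatorState); neither type exists in this branch, and they would be declared in CoarseGrainedClusterMessages.scala alongside the other messages:

// Hypothetical messages, per the review comment; not part of this PR's diff.
case object GetExecutorAllocatorState extends CoarseGrainedClusterMessage

// Immutable reply built from the backend's mutable fields only when answering the RPC.
case class ExecutorAllocatorState(
    executorIdCounter: Int,
    requestExecutors: RequestExecutors)
  extends CoarseGrainedClusterMessage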
/**
* Bind to YARN. This *must* be done before calling [[start()]].
*
@@ -135,7 +141,20 @@ private[spark] abstract class YarnSchedulerBackend(
* This includes executors already pending or running.
*/
override def doRequestTotalExecutors(requestedTotal: Int): Future[Boolean] = {
yarnSchedulerEndpointRef.ask[Boolean](prepareRequestExecutors(requestedTotal))
val requestExecutors = prepareRequestExecutors(requestedTotal)
val future = yarnSchedulerEndpointRef.ask[Boolean](requestExecutors)
setCurrentRequestExecutors(requestExecutors)
future
}

override def setCurrentExecutorIdCounter(executorId: Int): Unit = synchronized {
if (currentState.executorIdCounter < executorId) {
currentState.executorIdCounter = executorId
}
}

def setCurrentRequestExecutors(requestExecutors: RequestExecutors): Unit = synchronized {
currentState.requestExecutors = requestExecutors
}

/**
@@ -305,8 +324,8 @@ private[spark] abstract class YarnSchedulerBackend(
context.reply(false)
}

case RetrieveLastAllocatedExecutorId =>
context.reply(currentExecutorIdCounter)
case GetAMInitialState =>
context.reply((currentState.executorIdCounter, currentState.requestExecutors))
}

override def onDisconnected(remoteAddress: RpcAddress): Unit = {
@@ -101,7 +101,8 @@ class YarnAllocatorSuite extends SparkFunSuite with Matchers with BeforeAndAfter
appAttemptId,
new SecurityManager(sparkConf),
Map(),
new MockResolver())
new MockResolver(),
0)
}

def createContainer(host: String): Container = {