-
Notifications
You must be signed in to change notification settings - Fork 28.9k
[WIP][SPARK-20079][yarn] Re registration of AM hangs spark cluster in yarn-client mode. #17882
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -29,7 +29,7 @@ import org.apache.spark.rpc._ | |
| import org.apache.spark.scheduler._ | ||
| import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._ | ||
| import org.apache.spark.ui.JettyUtils | ||
| import org.apache.spark.util.{RpcUtils, ThreadUtils} | ||
| import org.apache.spark.util.{RpcUtils, ThreadUtils, Utils} | ||
|
|
||
| /** | ||
| * Abstract Yarn scheduler backend that contains common logic | ||
|
|
@@ -68,6 +68,12 @@ private[spark] abstract class YarnSchedulerBackend( | |
| // Flag to specify whether this schedulerBackend should be reset. | ||
| private var shouldResetOnAmRegister = false | ||
|
|
||
| private val currentState = new CurrentAMState(0, | ||
| RequestExecutors(Utils.getDynamicAllocationInitialExecutors(conf), 0, Map.empty, Set.empty)) | ||
|
|
||
| protected class CurrentAMState( | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The way this class is used isn't really helping. You could have the two fields as mutable fields in the parent instead of declaring a separate type. I'd suggest either getting rid of this class, or turning it into a proper type to be sent as a reply to the "get" RPC, defined in Also, either suggestion means you don't need the changes around Finally, I know I suggested the name, but making the names of the classes more generic would be better (since that file is not YARN-specific). e.g. |
||
| var executorIdCounter: Int, | ||
| var requestExecutors: RequestExecutors) | ||
| /** | ||
| * Bind to YARN. This *must* be done before calling [[start()]]. | ||
| * | ||
|
|
@@ -135,7 +141,20 @@ private[spark] abstract class YarnSchedulerBackend( | |
| * This includes executors already pending or running. | ||
| */ | ||
| override def doRequestTotalExecutors(requestedTotal: Int): Future[Boolean] = { | ||
| yarnSchedulerEndpointRef.ask[Boolean](prepareRequestExecutors(requestedTotal)) | ||
| val requestExecutors = prepareRequestExecutors(requestedTotal) | ||
| val future = yarnSchedulerEndpointRef.ask[Boolean](requestExecutors) | ||
| setCurrentRequestExecutors(requestExecutors) | ||
| future | ||
| } | ||
|
|
||
| override def setCurrentExecutorIdCounter(executorId: Int): Unit = synchronized { | ||
| if (currentState.executorIdCounter < executorId.toInt) { | ||
| currentState.executorIdCounter = executorId.toInt | ||
| } | ||
| } | ||
|
|
||
| def setCurrentRequestExecutors(requestExecutors: RequestExecutors): Unit = synchronized { | ||
| currentState.requestExecutors = requestExecutors | ||
| } | ||
|
|
||
| /** | ||
|
|
@@ -305,8 +324,8 @@ private[spark] abstract class YarnSchedulerBackend( | |
| context.reply(false) | ||
| } | ||
|
|
||
| case RetrieveLastAllocatedExecutorId => | ||
| context.reply(currentExecutorIdCounter) | ||
| case GetAMInitialState => | ||
| context.reply((currentState.executorIdCounter, currentState.requestExecutors)) | ||
| } | ||
|
|
||
| override def onDisconnected(remoteAddress: RpcAddress): Unit = { | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This doesn't work; because you're using
send, the handler would have to be in thereceivemethod of the endpoint, and it's actually inreceiveAndReply(which is the handler forask).You have to call
requestTotalExecutorsWithPreferredLocalitiesdirectly here, or make that method take aRequestExecutorsmessage as an argument, but you shouldn't go through the RPC system just to make a method call.