Skip to content

Conversation

@witgo
Copy link
Contributor

@witgo witgo commented May 6, 2017

When AM re-registers, the changes are::

  1. The new AM sends a RetrieveLastRequestExecutors message to the driver, getting the latest executor number lastNumExecutors.
  2. The new AM requests to allocate lastNumExecutors executor instance.
  3. The driver no longer calls ExecutorAllocationManager#reset.

See #17480

@SparkQA
Copy link

SparkQA commented May 6, 2017

Test build #76523 has finished for PR 17882 at commit 917cf43.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA
Copy link

SparkQA commented May 7, 2017

Test build #76533 has finished for PR 17882 at commit 53d0c25.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@HyukjinKwon
Copy link
Member

ping @witgo how it is going?

@witgo
Copy link
Contributor Author

witgo commented Jun 2, 2017

@jerryshao @vanzin
Would you take some time to review this PR?

@witgo witgo changed the title [WIP][SPARK-20079][try 2][yarn] Re registration of AM hangs spark cluster in yarn-client mode. [SPARK-20079][try 2][yarn] Re registration of AM hangs spark cluster in yarn-client mode. Jun 2, 2017
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would suggest to change the first -1 to Utils#getDynamicAllocationInitialExecutors, and the second to be "0".

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why would you remove this method, AFAIK it is still necessary, otherwise who will reset the state of ExecutorAllocationManager?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

From the current code, we reset the state of ExecutorAllocationManager is not correct. Iy causes the RetrieveLastRequestExecutors message to not work properly

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK, then if ExecutorAllocationManager#reset is not required, then why would you still keep that method?

Also should we clean this two fields during AM restart in ExecutorAllocationManager?

    executorsPendingToRemove.clear()
    removeTimes.clear()

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think ExecutorAllocationManager#reset is still necessary, but the following code should be removed

    initializing = true
    numExecutorsTarget = initialNumExecutors
    numExecutorsToAdd = 1

@jerryshao
Copy link
Contributor

CC @mridulm , can you please review this PR?

@vanzin
Copy link
Contributor

vanzin commented Jun 6, 2017

Could you remove [try 2] from the title and write a proper commit message? I'll try to review the code later.

@witgo witgo changed the title [SPARK-20079][try 2][yarn] Re registration of AM hangs spark cluster in yarn-client mode. [SPARK-20079][yarn] Re registration of AM hangs spark cluster in yarn-client mode. Jun 7, 2017
@witgo witgo force-pushed the SPARK-20079_try2 branch from 53d0c25 to bce09cb Compare June 7, 2017 15:03
@witgo
Copy link
Contributor Author

witgo commented Jun 7, 2017

@vanzin Done.

@SparkQA
Copy link

SparkQA commented Jun 7, 2017

Test build #77797 has finished for PR 17882 at commit bce09cb.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

Copy link
Contributor

@vanzin vanzin Jun 7, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why make a new call for this? What I had in mind would re-use RetrieveLastAllocatedExecutorId adding new properties that tell the AM what's the initial state. Might be good to change the name of the message at that point to (e.g. GetAMInitialState or something).

That call is already made during initialization of YarnAllocator, so it should be a smaller change overall.

So, you'd have one already existing RPC instead of a new RPC from the AM to the driver that may cause the AM to send another RPC to itself.

@witgo witgo changed the title [SPARK-20079][yarn] Re registration of AM hangs spark cluster in yarn-client mode. [WIP][SPARK-20079][yarn] Re registration of AM hangs spark cluster in yarn-client mode. Jun 14, 2017
@witgo witgo force-pushed the SPARK-20079_try2 branch from f019209 to 45aac34 Compare June 14, 2017 15:08
@SparkQA
Copy link

SparkQA commented Jun 14, 2017

Test build #78055 has finished for PR 17882 at commit 45aac34.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • protected class CurrentAMState(

@SparkQA
Copy link

SparkQA commented Jun 14, 2017

Test build #78052 has finished for PR 17882 at commit f019209.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • protected class CurrentAMState(

localResources,
executorIdCounter)
if (requestExecutors.requestedTotal != allocator.getTargetNumExecutors) {
amEndpoint.send(requestExecutors)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This doesn't work; because you're using send, the handler would have to be in the receive method of the endpoint, and it's actually in receiveAndReply (which is the handler for ask).

You have to call requestTotalExecutorsWithPreferredLocalities directly here, or make that method take a RequestExecutors message as an argument, but you shouldn't go through the RPC system just to make a method call.

private val currentState = new CurrentAMState(0,
RequestExecutors(Utils.getDynamicAllocationInitialExecutors(conf), 0, Map.empty, Set.empty))

protected class CurrentAMState(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The way this class is used isn't really helping. You could have the two fields as mutable fields in the parent instead of declaring a separate type.

I'd suggest either getting rid of this class, or turning it into a proper type to be sent as a reply to the "get" RPC, defined in CoarseGrainedClusterMessage.scala (meaning you'd keep state in fields here, and the message itself would be instantiated only when replying to the RPC, and would be immutable).

Also, either suggestion means you don't need the changes around setCurrentExecutorIdCounter. The old code was fine.

Finally, I know I suggested the name, but making the names of the classes more generic would be better (since that file is not YARN-specific). e.g. GetExecutorAllocatorState and ExecutorAllocatorState for the reply.

@HyukjinKwon
Copy link
Member

gentle ping @witgo for review comments above.

@jerryshao
Copy link
Contributor

I think this could be closed, @vanzin already created a new PR based on this (#18663).

@HyukjinKwon
Copy link
Member

Thank you @jerryshao.

@witgo
Copy link
Contributor Author

witgo commented Jul 26, 2017

I'm very sorry, I haven't taken the time to update this PR recently.
@vanzin , thank you for your work. I'll close this PR.

@witgo witgo closed this Jul 26, 2017
asfgit pushed a commit that referenced this pull request Aug 1, 2017
…art.

The main goal of this change is to avoid the situation described
in the bug, where an AM restart in the middle of a job may cause
no new executors to be allocated because of faulty logic in the
reset path.

The change does two things:

- fixes the executor alloc manager's reset() so that it does not
  stop allocation after a reset() in the middle of a job
- re-orders the initialization of the YarnAllocator class so that
  it fetches the current executor ID before triggering the reset()
  above.

This ensures both that the new allocator gets new requests for executors,
and that it starts from the correct executor id.

Tested with unit tests and by manually causing AM restarts while
running jobs using spark-shell in YARN mode.

Closes #17882

Author: Marcelo Vanzin <[email protected]>
Author: Guoqiang Li <[email protected]>

Closes #18663 from vanzin/SPARK-20079.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

5 participants