Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -171,9 +171,10 @@ public Path localizeClasspathJar(Path jarPath, Path target, String owner)
* for starting a localizer.
* @throws IOException for most application init failures
* @throws InterruptedException if application init thread is halted by NM
* @throws ConfigurationException if config error was found
*/
public abstract void startLocalizer(LocalizerStartContext ctx)
throws IOException, InterruptedException;
throws IOException, InterruptedException, ConfigurationException;

/**
* Prepare the container prior to the launch environment being written.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -389,7 +389,7 @@ public void stop() {

@Override
public void startLocalizer(LocalizerStartContext ctx)
throws IOException, InterruptedException {
throws IOException, InterruptedException, ConfigurationException {
Path nmPrivateContainerTokensPath = ctx.getNmPrivateContainerTokens();
InetSocketAddress nmAddr = ctx.getNmAddr();
String user = ctx.getUser();
Expand Down Expand Up @@ -440,9 +440,9 @@ public void startLocalizer(LocalizerStartContext ctx)
localizerArgs = replaceWithContainerLogDir(localizerArgs, containerLogDir);

initializeContainerOp.appendArgs(localizerArgs);
Configuration conf = super.getConf();

try {
Configuration conf = super.getConf();
PrivilegedOperationExecutor privilegedOperationExecutor =
getPrivilegedOperationExecutor();

Expand All @@ -452,7 +452,26 @@ public void startLocalizer(LocalizerStartContext ctx)
} catch (PrivilegedOperationException e) {
int exitCode = e.getExitCode();
LOG.warn("Exit code from container {} startLocalizer is : {}",
locId, exitCode, e);
locId, exitCode, e);

if (exitCode ==
ExitCode.INVALID_CONTAINER_EXEC_PERMISSIONS.getExitCode() ||
exitCode == ExitCode.INVALID_CONFIG_FILE.getExitCode()) {
throw new ConfigurationException("Application " + appId + " initialization failed" +
" (exitCode=" + exitCode + ") with an unrecoverable config error. " +
"Output: " + e.getOutput(), e);
}

// Check if the failure was due to a missing container-executor binary
Throwable cause = e.getCause() != null ? e.getCause() : e;
if (cause instanceof IOException) {
IOException io = (IOException) cause;
if (io.getMessage().contains("No such file or directory")) {
throw new ConfigurationException("Application " + appId + " initialization failed" +
"(exitCode=" + exitCode + "). Container executor not found at "
+ getContainerExecutorExecutablePath(conf), e);
}
}

throw new IOException("Application " + appId + " initialization failed" +
" (exitCode=" + exitCode + ") with output: " + e.getOutput(), e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import static org.apache.hadoop.fs.CreateFlag.CREATE;
import static org.apache.hadoop.fs.CreateFlag.OVERWRITE;

import org.apache.hadoop.yarn.exceptions.ConfigurationException;
import org.apache.hadoop.yarn.server.nodemanager.recovery.RecoveryIterator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -1255,7 +1256,7 @@ public void run() {
try {
// Get nmPrivateDir
nmPrivateCTokensPath = dirsHandler.getLocalPathForWrite(
NM_PRIVATE_DIR + Path.SEPARATOR + tokenFileName);
NM_PRIVATE_DIR + Path.SEPARATOR + tokenFileName);

// 0) init queue, etc.
// 1) write credentials to private dir
Expand All @@ -1275,10 +1276,13 @@ public void run() {
throw new IOException("All disks failed. "
+ dirsHandler.getDisksHealthReport(false));
}
// TODO handle ExitCodeException separately?
} catch (FSError fe) {
exception = fe;
} catch (Exception e) {
// TODO handle ExitCodeException separately?
} catch (ConfigurationException e) {
exception = e;
LOG.error("Failed to launch localizer for {}, due to configuration error. " +
"Marking the node unhealthy.", localizerId, e);
nmContext.getNodeStatusUpdater().reportException(e);
} catch (Exception | FSError e) {
exception = e;
} finally {
if (exception != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -336,7 +336,7 @@ public void testStartLocalizer() throws IOException {
assertThat(result.get(23)).isEqualTo("8040");
assertThat(result.get(24)).isEqualTo("nmPrivateCTokensPath");

} catch (InterruptedException e) {
} catch (ConfigurationException | InterruptedException e) {
LOG.error("Error:"+e.getMessage(),e);
Assert.fail();
}
Expand Down Expand Up @@ -643,6 +643,61 @@ protected PrivilegedOperationExecutor getPrivilegedOperationExecutor() {
e.getMessage().contains("exitCode"));
}

final int[] exitCodesToThrow = {
LinuxContainerExecutor.ExitCode.INVALID_CONTAINER_EXEC_PERMISSIONS.getExitCode(),
LinuxContainerExecutor.ExitCode.INVALID_CONFIG_FILE.getExitCode(),
};

for (int i = 0; i < exitCodesToThrow.length; i++) {
int exitCode = exitCodesToThrow[i];
doThrow(new PrivilegedOperationException("invalid config", exitCode, null, null))
.when(spyPrivilegedExecutor).executePrivilegedOperation(
any(), any(PrivilegedOperation.class),
any(), any(), anyBoolean(), anyBoolean());

try {
lce.startLocalizer(new LocalizerStartContext.Builder()
.setNmPrivateContainerTokens(nmPrivateCTokensPath)
.setNmAddr(address)
.setUser(appSubmitter)
.setAppId(appId.toString())
.setLocId("12345")
.setDirsHandler(dirService)
.build());
Assert.fail("startLocalizer should have thrown a ConfigurationException");
} catch (ConfigurationException e) {
assertTrue("Unexpected exception " + e,
e.getMessage().contains("exitCode=" + exitCode));
}
}

doThrow(new PrivilegedOperationException("IO error",
new IOException("No such file or directory")))
.when(spyPrivilegedExecutor).executePrivilegedOperation(
any(), any(PrivilegedOperation.class),
any(), any(), anyBoolean(), anyBoolean());

try {
lce.startLocalizer(new LocalizerStartContext.Builder()
.setNmPrivateContainerTokens(nmPrivateCTokensPath)
.setNmAddr(address)
.setUser(appSubmitter)
.setAppId(appId.toString())
.setLocId("12345")
.setDirsHandler(dirService)
.build());
Assert.fail("startLocalizer should have thrown a ConfigurationException");
} catch (ConfigurationException e) {
assertTrue("Unexpected exception " + e,
e.getMessage().contains("Container executor not found"));
}


doThrow(new PrivilegedOperationException("interrupted"))
.when(spyPrivilegedExecutor).executePrivilegedOperation(
any(), any(PrivilegedOperation.class),
any(), any(), anyBoolean(), anyBoolean());

lce.activateContainer(cid, new Path(workDir, "pid.txt"));
lce.launchContainer(new ContainerStartContext.Builder()
.setContainer(container)
Expand Down