diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java index 5da709c49dc2b..1cb86f9772898 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java @@ -783,15 +783,11 @@ public void removeVeryOldStoppedContainersFromCache() { break; } if (!context.getContainers().containsKey(cid)) { - ApplicationId appId = - cid.getApplicationAttemptId().getApplicationId(); - if (isApplicationStopped(appId)) { - i.remove(); - try { - context.getNMStateStore().removeContainer(cid); - } catch (IOException e) { - LOG.error("Unable to remove container {} in store.", cid, e); - } + i.remove(); + try { + context.getNMStateStore().removeContainer(cid); + } catch (IOException e) { + LOG.error("Unable to remove container {} in store.", cid, e); } } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java index e07a0e1cc18e1..fd579b68bcc9a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java @@ -151,6 +151,7 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.LogAggregationService; import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.LogHandler; import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.NonAggregatingLogHandler; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerContainerRecoveredEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerEventType; import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitor; import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorEventType; @@ -165,6 +166,7 @@ import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredContainerState; import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredContainerStatus; import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredContainerType; +import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredLogAggregatorState; import org.apache.hadoop.yarn.server.nodemanager.security.authorize.NMPolicyProvider; import org.apache.hadoop.yarn.server.nodemanager.timelineservice.NMTimelinePublisher; import org.apache.hadoop.yarn.server.utils.BuilderUtils; @@ -446,6 +448,11 @@ private void recover() throws IOException, URISyntaxException { } } + RecoveredLogAggregatorState logAggregatorState = stateStore.loadLogAggregatorState(); + for (ContainerId containerId: logAggregatorState.getLogAggregators()) { + recoverLogAggregator(containerId); + } + // Recovery AMRMProxy state after apps and containers are recovered if (this.amrmProxyEnabled) { this.getAMRMProxyService().recover(); @@ -595,6 +602,11 @@ private void waitForRecoveredContainers() throws InterruptedException { } } + private void recoverLogAggregator(ContainerId containerId) { + LOG.info("Recovering log aggregator for " + containerId); + dispatcher.getEventHandler().handle(new LogHandlerContainerRecoveredEvent(containerId)); + } + protected LogHandler createLogHandler(Configuration conf, Context context, DeletionService deletionService) { if (conf.getBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED, diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregator.java index 93436fa96da75..c8004d4c7b599 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregator.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregator.java @@ -20,12 +20,15 @@ import org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.server.api.ContainerLogContext; public interface AppLogAggregator extends Runnable { void startContainerLogAggregation(ContainerLogContext logContext); + void recoverContainerLogAggregation(ContainerId containerId); + void abortLogAggregation(); void finishLogAggregation(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java index ce6397e390481..04137b1e42fb5 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java @@ -383,6 +383,11 @@ private void uploadLogsForContainers(boolean appFinished) // remove it from containerLogAggregators. if (finishedContainers.contains(container)) { containerLogAggregators.remove(container); + try { + context.getNMStateStore().removeLogAggregator(container); + } catch (IOException e) { + LOG.error("Unable to remove log aggregator {} from store.", container, e); + } } } @@ -616,10 +621,20 @@ public void startContainerLogAggregation(ContainerLogContext logContext) { if (shouldUploadLogs(logContext)) { LOG.info("Considering container " + logContext.getContainerId() + " for log-aggregation"); + try { + context.getNMStateStore().storeLogAggregator(logContext.getContainerId()); + } catch (IOException e) { + LOG.error("Unable to add log aggregator {} to store.", logContext.getContainerId(), e); + } this.pendingContainers.add(logContext.getContainerId()); } } + @Override + public void recoverContainerLogAggregation(ContainerId containerId) { + this.pendingContainers.add(containerId); + } + @Override public synchronized void finishLogAggregation() { LOG.info("Application just finished : " + this.applicationId); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java index 0d3ea75f38fc5..ac105c57eb1c8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java @@ -60,6 +60,7 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerAppFinishedEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerAppStartedEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerContainerFinishedEvent; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerContainerRecoveredEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerEvent; @@ -336,6 +337,22 @@ private void stopContainer(ContainerId containerId, new ContainerLogContext(containerId, containerType, exitCode)); } + private void recoverContainer(ContainerId containerId) { + AppLogAggregator aggregator = this.appLogAggregators.get( + containerId.getApplicationAttemptId().getApplicationId()); + if (aggregator == null) { + LOG.warn("Log aggregation is not initialized for " + containerId + + " during recovery, removing from store."); + try { + context.getNMStateStore().removeLogAggregator(containerId); + } catch (IOException e) { + LOG.error("Unable to remove log aggregator {} from store.", containerId, e); + } + return; + } + aggregator.recoverContainerLogAggregation(containerId); + } + @SuppressWarnings("unchecked") private void stopApp(ApplicationId appId) { @@ -381,6 +398,11 @@ public void handle(LogHandlerEvent event) { (LogHandlerAppFinishedEvent) event; stopApp(appFinishedEvent.getApplicationId()); break; + case CONTAINER_RECOVERED: + LogHandlerContainerRecoveredEvent containerRecoveredEvent = + (LogHandlerContainerRecoveredEvent) event; + recoverContainer(containerRecoveredEvent.getContainerId()); + break; case LOG_AGG_TOKEN_UPDATE: checkAndEnableAppAggregators(); break; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/event/LogHandlerContainerRecoveredEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/event/LogHandlerContainerRecoveredEvent.java new file mode 100644 index 0000000000000..c424cd2a74da6 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/event/LogHandlerContainerRecoveredEvent.java @@ -0,0 +1,35 @@ +/** +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +package org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event; + +import org.apache.hadoop.yarn.api.records.ContainerId; + +public class LogHandlerContainerRecoveredEvent extends LogHandlerEvent { + + private final ContainerId containerId; + + public LogHandlerContainerRecoveredEvent(ContainerId containerId) { + super(LogHandlerEventType.CONTAINER_RECOVERED); + this.containerId = containerId; + } + + public ContainerId getContainerId() { + return this.containerId; + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/event/LogHandlerEventType.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/event/LogHandlerEventType.java index ec477c2b00f59..2d81a0d207715 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/event/LogHandlerEventType.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/event/LogHandlerEventType.java @@ -21,5 +21,7 @@ public enum LogHandlerEventType { APPLICATION_STARTED, CONTAINER_FINISHED, - APPLICATION_FINISHED, LOG_AGG_TOKEN_UPDATE + APPLICATION_FINISHED, + CONTAINER_RECOVERED, + LOG_AGG_TOKEN_UPDATE } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java index 81cfb2e743b7a..bf60339423455 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java @@ -152,6 +152,7 @@ public class NMLeveldbStateStoreService extends NMStateStoreService { CONTAINER_TOKENS_KEY_PREFIX + PREV_MASTER_KEY_SUFFIX; private static final String LOG_DELETER_KEY_PREFIX = "LogDeleters/"; + private static final String LOG_AGGREGATOR_KEY_PREFIX = "LogAggregators/"; private static final String AMRMPROXY_KEY_PREFIX = "AMRMProxy/"; @@ -1410,6 +1411,65 @@ public void removeLogDeleter(ApplicationId appId) throws IOException { } } + @Override + public RecoveredLogAggregatorState loadLogAggregatorState() throws IOException { + RecoveredLogAggregatorState state = new RecoveredLogAggregatorState(); + state.logAggregators = new ArrayList(); + LeveldbIterator iter = null; + try { + iter = new LeveldbIterator(db); + iter.seek(bytes(LOG_AGGREGATOR_KEY_PREFIX)); + final int logAggregatorKeyPrefixLength = LOG_AGGREGATOR_KEY_PREFIX.length(); + while (iter.hasNext()) { + Entry entry = iter.next(); + String fullKey = asString(entry.getKey()); + if (!fullKey.startsWith(LOG_AGGREGATOR_KEY_PREFIX)) { + break; + } + + String containerIdStr = fullKey.substring(logAggregatorKeyPrefixLength); + ContainerId containerId = null; + try { + containerId = ContainerId.fromString(containerIdStr); + } catch (IllegalArgumentException e) { + LOG.warn("Skipping unknown log aggregator key " + fullKey); + continue; + } + state.logAggregators.add(containerId); + } + } catch (DBException e) { + throw new IOException(e); + } finally { + if (iter != null) { + iter.close(); + } + } + return state; + } + + @Override + public void storeLogAggregator(ContainerId containerId) + throws IOException { + String key = getLogAggregatorKey(containerId); + try { + db.put(bytes(key), new byte[0]); + } catch (DBException e) { + markStoreUnHealthy(e); + throw new IOException(e); + } + } + + @Override + public void removeLogAggregator(ContainerId containerId) throws IOException { + String key = getLogAggregatorKey(containerId); + try { + db.delete(bytes(key)); + } catch (DBException e) { + markStoreUnHealthy(e); + throw new IOException(e); + } + } + @Override public void storeAssignedResources(Container container, String resourceType, List assignedResources) @@ -1490,6 +1550,10 @@ private String getLogDeleterKey(ApplicationId appId) { return LOG_DELETER_KEY_PREFIX + appId; } + private String getLogAggregatorKey(ContainerId containerId) { + return LOG_AGGREGATOR_KEY_PREFIX + containerId; + } + @Override public RecoveredAMRMProxyState loadAMRMProxyState() throws IOException { RecoveredAMRMProxyState result = new RecoveredAMRMProxyState(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMNullStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMNullStateStoreService.java index 3ae00f72a945d..441a04685e4aa 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMNullStateStoreService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMNullStateStoreService.java @@ -244,6 +244,22 @@ public void storeLogDeleter(ApplicationId appId, LogDeleterProto proto) public void removeLogDeleter(ApplicationId appId) throws IOException { } + @Override + public RecoveredLogAggregatorState loadLogAggregatorState() throws IOException { + throw new UnsupportedOperationException( + "Recovery not supported by this state store"); + } + + @Override + public void storeLogAggregator(ContainerId containerId) + throws IOException { + } + + @Override + public void removeLogAggregator(ContainerId containerId) + throws IOException { + } + @Override public RecoveredAMRMProxyState loadAMRMProxyState() throws IOException { throw new UnsupportedOperationException( diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java index ee41eab77ed83..7b91941df6ad2 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java @@ -325,6 +325,14 @@ public Map getLogDeleterMap() { } } + public static class RecoveredLogAggregatorState { + List logAggregators; + + public List getLogAggregators() { + return logAggregators; + } + } + /** * Recovered states for AMRMProxy. */ @@ -722,6 +730,30 @@ public abstract void storeLogDeleter(ApplicationId appId, public abstract void removeLogDeleter(ApplicationId appId) throws IOException; + /** + * Load the state of the log aggregators + * @return recovered log aggregator state + * @throws IOException if fails + */ + public abstract RecoveredLogAggregatorState loadLogAggregatorState() + throws IOException; + + /** + * Store the state of a log aggregator + * @param containerId the container ID for the log aggregator + * @throws IOException if fails + */ + public abstract void storeLogAggregator(ContainerId containerId) + throws IOException; + + /** + * Remove the state of a log aggregator + * @param containerId the container ID for the log aggregator + * @throws IOException if fails + */ + public abstract void removeLogAggregator(ContainerId containerId) + throws IOException; + /** * Load the state of AMRMProxy. * @return recovered state of AMRMProxy diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java index 89010bb3342e9..876070b00163b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java @@ -965,18 +965,6 @@ public void testRecentlyFinishedContainers() throws Exception { nodeStatusUpdater.addCompletedContainer(cId); assertTrue(nodeStatusUpdater.isContainerRecentlyStopped(cId)); - - // verify container remains even after expiration if app - // is still active - nm.getNMContext().getContainers().remove(cId); - Thread.sleep(10); - nodeStatusUpdater.removeVeryOldStoppedContainersFromCache(); - assertTrue(nodeStatusUpdater.isContainerRecentlyStopped(cId)); - - // complete the application and verify container is removed - nm.getNMContext().getApplications().remove(appId); - nodeStatusUpdater.removeVeryOldStoppedContainersFromCache(); - assertFalse(nodeStatusUpdater.isContainerRecentlyStopped(cId)); } @Test diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java index a90172a096007..2d680c8b7b73c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java @@ -144,6 +144,7 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerAppFinishedEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerAppStartedEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerContainerFinishedEvent; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerContainerRecoveredEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerTokenUpdatedEvent; import org.apache.hadoop.yarn.server.nodemanager.executor.DeletionAsUserContext; import org.apache.hadoop.yarn.server.utils.BuilderUtils; @@ -2863,4 +2864,44 @@ public void testRollingMonitorIntervalLessThanSet() { long interval = logAggregationService.getRollingMonitorInterval(); assertEquals(1800L, interval); } + + @Test + public void testLogAggregationRecovery() throws Exception { + LogAggregationService logAggregationService = + new LogAggregationService(dispatcher, this.context, this.delSrvc, + super.dirsHandler); + logAggregationService.init(this.conf); + logAggregationService.start(); + + ApplicationId application = BuilderUtils.newApplicationId(1234, 1); + + ApplicationAttemptId appAttemptId = + BuilderUtils.newApplicationAttemptId(application, 1); + ContainerId containerId = ContainerId.newContainerId(appAttemptId, 1); + + File appLogDir1 = + new File(localLogDir, application.toString()); + appLogDir1.mkdir(); + + // Simulate log-file creation + writeContainerLogs(appLogDir1, containerId, new String[] { "stdout", + "syslog" }, new String[] {}); + + LogAggregationContext logAggregationContext = + Records.newRecord(LogAggregationContext.class); + + logAggregationService.handle(new LogHandlerAppStartedEvent(application, + this.user, null, this.acls, + logAggregationContext)); + + logAggregationService.handle(new LogHandlerContainerRecoveredEvent(containerId)); + + logAggregationService.handle(new LogHandlerAppFinishedEvent(application)); + logAggregationService.stop(); + assertEquals(0, logAggregationService.getNumAggregators()); + + String[] logFiles = new String[] { "stdout", "syslog" }; + verifyContainerLogs(logAggregationService, application, + new ContainerId[] {containerId}, logFiles, 2, false, new String[] {}); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMMemoryStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMMemoryStateStoreService.java index 16abe46dca031..35fe3b9cae5ee 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMMemoryStateStoreService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMMemoryStateStoreService.java @@ -23,9 +23,11 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; +import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Set; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; @@ -60,6 +62,7 @@ public class NMMemoryStateStoreService extends NMStateStoreService { private Map applicationMasterKeys; private Map activeTokens; private Map logDeleterState; + private Set logAggregatorState; private RecoveredAMRMProxyState amrmProxyState; public NMMemoryStateStoreService() { @@ -77,6 +80,7 @@ protected void initStorage(Configuration conf) { trackerStates = new HashMap(); deleteTasks = new HashMap(); logDeleterState = new HashMap(); + logAggregatorState = new HashSet(); amrmProxyState = new RecoveredAMRMProxyState(); } @@ -513,6 +517,26 @@ public synchronized void removeLogDeleter(ApplicationId appId) logDeleterState.remove(appId); } + @Override + public synchronized RecoveredLogAggregatorState loadLogAggregatorState() + throws IOException { + RecoveredLogAggregatorState state = new RecoveredLogAggregatorState(); + state.logAggregators = new ArrayList<>(logAggregatorState); + return state; + } + + @Override + public synchronized void storeLogAggregator(ContainerId containerId) + throws IOException { + logAggregatorState.add(containerId); + } + + @Override + public synchronized void removeLogAggregator(ContainerId containerId) + throws IOException { + logAggregatorState.remove(containerId); + } + @Override public synchronized RecoveredAMRMProxyState loadAMRMProxyState() throws IOException { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java index d88f2211e9e7b..2608ab3bc3ec4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java @@ -90,6 +90,7 @@ import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredContainerType; import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredDeletionServiceState; import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredLocalizationState; +import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredLogAggregatorState; import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredLogDeleterState; import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredNMTokensState; import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredUserResources; @@ -1547,6 +1548,49 @@ public void testLogDeleterStorage() throws IOException { assertTrue(state.getLogDeleterMap().isEmpty()); } + @Test + public void testLogAggregatorStorage() throws IOException { + // test empty when no state + RecoveredLogAggregatorState state = stateStore.loadLogAggregatorState(); + assertTrue(state.getLogAggregators().isEmpty()); + + // store log deleter state + final ApplicationId appId1 = ApplicationId.newInstance(1, 1); + ApplicationAttemptId appAttemptId1 = ApplicationAttemptId.newInstance(appId1, 1); + final ContainerId containerId1 = ContainerId.newContainerId(appAttemptId1, 1); + stateStore.storeLogAggregator(containerId1); + + // restart state store and verify recovered + restartStateStore(); + state = stateStore.loadLogAggregatorState(); + assertEquals(1, state.getLogAggregators().size()); + assertEquals(containerId1, state.getLogAggregators().get(0)); + + // store another log aggregator + final ApplicationId appId2 = ApplicationId.newInstance(2, 2); + ApplicationAttemptId appAttemptId2 = ApplicationAttemptId.newInstance(appId2, 1); + final ContainerId containerId2 = ContainerId.newContainerId(appAttemptId2, 1); + stateStore.storeLogAggregator(containerId2); + + // restart state store and verify recovered + restartStateStore(); + state = stateStore.loadLogAggregatorState(); + assertEquals(2, state.getLogAggregators().size()); + + // remove a deleter and verify removed after restart and recovery + stateStore.removeLogAggregator(containerId1); + restartStateStore(); + state = stateStore.loadLogAggregatorState(); + assertEquals(1, state.getLogAggregators().size()); + assertEquals(containerId2, state.getLogAggregators().get(0)); + + // remove last deleter and verify empty after restart and recovery + stateStore.removeLogAggregator(containerId2); + restartStateStore(); + state = stateStore.loadLogAggregatorState(); + assertTrue(state.getLogAggregators().isEmpty()); + } + @Test public void testCompactionCycle() throws IOException { final DB mockdb = mock(DB.class);