Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1475,53 +1475,6 @@ private Resource getOrInheritMaxResource(Resource resourceByLabel, String label)
configuredMaxResource, parentMaxResource));
}

void updateMaxAppRelatedField(CapacitySchedulerConfiguration conf,
LeafQueue leafQueue) {
int maxApplications = conf.getMaximumApplicationsPerQueue(queuePath);
int maxGlobalPerQueueApps = conf.getGlobalMaximumApplicationsPerQueue();
String maxLabel = RMNodeLabelsManager.NO_LABEL;

if (maxApplications < 0) {
for (String label : configuredNodeLabels) {
int maxApplicationsByLabel = 0;
if (maxGlobalPerQueueApps > 0) {
// In absolute mode, should
// shrink when change to corresponding label capacity.
maxApplicationsByLabel = this.capacityConfigType
!= CapacityConfigType.ABSOLUTE_RESOURCE ?
maxGlobalPerQueueApps :
(int) (maxGlobalPerQueueApps * queueCapacities
.getAbsoluteCapacity(label));
} else {
maxApplicationsByLabel = (int) (conf.getMaximumSystemApplications()
* queueCapacities.getAbsoluteCapacity(label));
}
if (maxApplicationsByLabel > maxApplications) {
maxApplications = maxApplicationsByLabel;
maxLabel = label;
}
}
}
leafQueue.setMaxApplications(maxApplications);

int maxApplicationsPerUser = Math.min(maxApplications,
(int) (maxApplications
* (leafQueue.getUsersManager().getUserLimit() / 100.0f)
* leafQueue.getUsersManager().getUserLimitFactor()));
if (leafQueue.getUsersManager().getUserLimitFactor() == -1) {
maxApplicationsPerUser = maxApplications;
}

leafQueue.setMaxApplicationsPerUser(maxApplicationsPerUser);
LOG.info("LeafQueue:" + leafQueue.getQueuePath() +
"update max app related, maxApplications="
+ maxApplications + ", maxApplicationsPerUser="
+ maxApplicationsPerUser + ", Abs Cap:" + queueCapacities
.getAbsoluteCapacity(maxLabel) + ", Cap: " + queueCapacities
.getCapacity(maxLabel) + ", MaxCap : " + queueCapacities
.getMaximumCapacity(maxLabel));
}

void deriveCapacityFromAbsoluteConfigurations(String label,
Resource clusterResource) {
// Update capacity with a float calculated from the parent's minResources
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -461,6 +461,10 @@ private String getNodeLabelPrefix(String queue, String label) {
return getQueuePrefix(queue) + ACCESSIBLE_NODE_LABELS + DOT + label + DOT;
}

public void setMaximumSystemApplications(int numMaxApps) {
setInt(MAXIMUM_SYSTEM_APPLICATIONS, numMaxApps);
}

public int getMaximumSystemApplications() {
int maxApplications =
getInt(MAXIMUM_SYSTEM_APPLICATIONS, DEFAULT_MAXIMUM_SYSTEM_APPLICATIIONS);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@

import java.io.IOException;
import java.util.*;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

Expand Down Expand Up @@ -1939,8 +1938,9 @@ public void updateClusterResource(Resource clusterResource,
updateAbsoluteCapacities();

super.updateEffectiveResources(clusterResource);
super.updateMaxAppRelatedField(csContext.getConfiguration(),
this);

// Update maximum applications for the queue and for users
updateMaximumApplications(csContext.getConfiguration());

updateCurrentResourceLimits(currentResourceLimits, clusterResource);

Expand Down Expand Up @@ -2326,6 +2326,58 @@ public void stopQueue() {
}
}

void updateMaximumApplications(CapacitySchedulerConfiguration conf) {
int maxAppsForQueue = conf.getMaximumApplicationsPerQueue(getQueuePath());

int maxDefaultPerQueueApps = conf.getGlobalMaximumApplicationsPerQueue();
int maxSystemApps = conf.getMaximumSystemApplications();
int baseMaxApplications = maxDefaultPerQueueApps > 0 ?
Math.min(maxDefaultPerQueueApps, maxSystemApps)
: maxSystemApps;

String maxLabel = RMNodeLabelsManager.NO_LABEL;
if (maxAppsForQueue < 0) {
if (maxDefaultPerQueueApps > 0 && this.capacityConfigType
!= CapacityConfigType.ABSOLUTE_RESOURCE) {
maxAppsForQueue = baseMaxApplications;
} else {
for (String label : configuredNodeLabels) {
int maxApplicationsByLabel = (int) (baseMaxApplications
* queueCapacities.getAbsoluteCapacity(label));
if (maxApplicationsByLabel > maxAppsForQueue) {
maxAppsForQueue = maxApplicationsByLabel;
maxLabel = label;
}
}
}
}

setMaxApplications(maxAppsForQueue);

updateMaxAppsPerUser();

LOG.info("LeafQueue:" + getQueuePath() +
"update max app related, maxApplications="
+ maxAppsForQueue + ", maxApplicationsPerUser="
+ maxApplicationsPerUser + ", Abs Cap:" + queueCapacities
.getAbsoluteCapacity(maxLabel) + ", Cap: " + queueCapacities
.getCapacity(maxLabel) + ", MaxCap : " + queueCapacities
.getMaximumCapacity(maxLabel));
}

private void updateMaxAppsPerUser() {
int maxAppsPerUser = maxApplications;
if (getUsersManager().getUserLimitFactor() != -1) {
int maxApplicationsWithUserLimits = (int) (maxApplications
* (getUsersManager().getUserLimit() / 100.0f)
* getUsersManager().getUserLimitFactor());
maxAppsPerUser = Math.min(maxApplications,
maxApplicationsWithUserLimits);
}

setMaxApplicationsPerUser(maxAppsPerUser);
}

/**
* Get all valid users in this queue.
* @return user list
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -870,8 +870,8 @@ public RMNodeLabelsManager createNodeLabelManager() {
+ "submission of application: " + app3.getApplicationId(),
app3.getDiagnostics().toString());

// based on Global limit of queue usert application is rejected
RMApp app11 = MockRMAppSubmitter.submit(rm,
// based on per user max app settings, app should be rejected instantly
RMApp app13 = MockRMAppSubmitter.submit(rm,
MockRMAppSubmissionData.Builder.createWithMemory(GB, rm)
.withAppName("app")
.withUser("user")
Expand All @@ -880,36 +880,36 @@ public RMNodeLabelsManager createNodeLabelManager() {
.withWaitForAppAcceptedState(false)
.build());
rm.drainEvents();
rm.waitForState(app11.getApplicationId(), RMAppState.ACCEPTED);
assertEquals(RMAppState.ACCEPTED, app11.getState());
RMApp app12 = MockRMAppSubmitter.submit(rm,
rm.waitForState(app13.getApplicationId(), RMAppState.FAILED);
assertEquals(RMAppState.FAILED, app13.getState());
assertEquals(
"org.apache.hadoop.security.AccessControlException: Queue"
+ " root.d already has 0 applications from user user cannot"
+ " accept submission of application: " + app13.getApplicationId(),
app13.getDiagnostics().toString());

RMApp app11 = MockRMAppSubmitter.submit(rm,
MockRMAppSubmissionData.Builder.createWithMemory(GB, rm)
.withAppName("app")
.withUser("user")
.withUser("user2")
.withAcls(null)
.withQueue("d")
.withQueue("a2")
.withWaitForAppAcceptedState(false)
.build());
rm.drainEvents();
rm.waitForState(app12.getApplicationId(), RMAppState.ACCEPTED);
assertEquals(RMAppState.ACCEPTED, app12.getState());
RMApp app13 = MockRMAppSubmitter.submit(rm,
rm.waitForState(app11.getApplicationId(), RMAppState.ACCEPTED);
assertEquals(RMAppState.ACCEPTED, app11.getState());
RMApp app12 = MockRMAppSubmitter.submit(rm,
MockRMAppSubmissionData.Builder.createWithMemory(GB, rm)
.withAppName("app")
.withUser("user")
.withUser("user2")
.withAcls(null)
.withQueue("d")
.withQueue("a2")
.withWaitForAppAcceptedState(false)
.build());
rm.drainEvents();
rm.waitForState(app13.getApplicationId(), RMAppState.FAILED);
assertEquals(RMAppState.FAILED, app13.getState());
assertEquals(
"org.apache.hadoop.security.AccessControlException: Queue"
+ " root.d already has 2 applications from user user cannot"
+ " accept submission of application: " + app13.getApplicationId(),
app13.getDiagnostics().toString());

rm.waitForState(app12.getApplicationId(), RMAppState.ACCEPTED);
assertEquals(RMAppState.ACCEPTED, app12.getState());
// based on system max limit application is rejected
RMApp app14 = MockRMAppSubmitter.submit(rm,
MockRMAppSubmissionData.Builder.createWithMemory(GB, rm)
Expand Down Expand Up @@ -938,7 +938,6 @@ public RMNodeLabelsManager createNodeLabelManager() {
app15.getDiagnostics().toString());

rm.killApp(app2.getApplicationId());
rm.killApp(app11.getApplicationId());
rm.killApp(app13.getApplicationId());
rm.killApp(app14.getApplicationId());
rm.stop();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5136,6 +5136,13 @@ public void testSetupQueueConfigsWithSpecifiedConfiguration()
assertEquals(1.0, leafQueue.getAbsoluteMaximumCapacity(),
EPSILON);

// limit maximum apps by max system apps
csConf.setMaximumSystemApplications(15);
leafQueue.updateClusterResource(Resource.newInstance(0, 0),
new ResourceLimits(Resource.newInstance(0, 0)));

assertEquals(15, leafQueue.getMaxApplications());

} finally {
//revert config changes
csConf.setNodeLocalityDelay(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1137,7 +1137,7 @@ public void testDeriveCapacityFromAbsoluteConfigurations() throws Exception {
assertEquals(b.getMaxApplications(), b.getMaxApplicationsPerUser());

// Set GlobalMaximumApplicationsPerQueue in csConf
csConf.setGlobalMaximumApplicationsPerQueue(20000);
csConf.setGlobalMaximumApplicationsPerQueue(8000);
root.updateClusterResource(clusterResource,
new ResourceLimits(clusterResource));

Expand Down