Skip to content

Commit a5c0476

Browse files
committed
MAPREDUCE-6697. Concurrent task limits should only be applied when necessary. Contributed by Nathan Roberts.
1 parent 7e031c2 commit a5c0476

File tree

2 files changed

+73
-6
lines changed

2 files changed

+73
-6
lines changed

hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -919,7 +919,8 @@ void processFinishedContainer(ContainerStatus container) {
919919

920920
private void applyConcurrentTaskLimits() {
921921
int numScheduledMaps = scheduledRequests.maps.size();
922-
if (maxRunningMaps > 0 && numScheduledMaps > 0) {
922+
if (maxRunningMaps > 0 && numScheduledMaps > 0 &&
923+
getJob().getTotalMaps() > maxRunningMaps) {
923924
int maxRequestedMaps = Math.max(0,
924925
maxRunningMaps - assignedRequests.maps.size());
925926
int numScheduledFailMaps = scheduledRequests.earlierFailedMaps.size();
@@ -936,7 +937,8 @@ private void applyConcurrentTaskLimits() {
936937
}
937938

938939
int numScheduledReduces = scheduledRequests.reduces.size();
939-
if (maxRunningReduces > 0 && numScheduledReduces > 0) {
940+
if (maxRunningReduces > 0 && numScheduledReduces > 0 &&
941+
getJob().getTotalReduces() > maxRunningReduces) {
940942
int maxRequestedReduces = Math.max(0,
941943
maxRunningReduces - assignedRequests.reduces.size());
942944
int reduceRequestLimit = Math.min(maxRequestedReduces,

hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java

Lines changed: 69 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2781,15 +2781,78 @@ public Token<AMRMTokenIdentifier> run() throws Exception {
27812781
new Text(rmAddr), ugiToken.getService());
27822782
}
27832783

2784+
@Test
2785+
public void testConcurrentTaskLimitsDisabledIfSmaller() throws Exception {
2786+
final int MAP_COUNT = 1;
2787+
final int REDUCE_COUNT = 1;
2788+
final int MAP_LIMIT = 1;
2789+
final int REDUCE_LIMIT = 1;
2790+
Configuration conf = new Configuration();
2791+
conf.setInt(MRJobConfig.JOB_RUNNING_MAP_LIMIT, MAP_LIMIT);
2792+
conf.setInt(MRJobConfig.JOB_RUNNING_REDUCE_LIMIT, REDUCE_LIMIT);
2793+
conf.setFloat(MRJobConfig.COMPLETED_MAPS_FOR_REDUCE_SLOWSTART, 0.0f);
2794+
ApplicationId appId = ApplicationId.newInstance(1, 1);
2795+
ApplicationAttemptId appAttemptId =
2796+
ApplicationAttemptId.newInstance(appId, 1);
2797+
JobId jobId = MRBuilderUtils.newJobId(appAttemptId.getApplicationId(), 0);
2798+
Job mockJob = mock(Job.class);
2799+
when(mockJob.getReport()).thenReturn(
2800+
MRBuilderUtils.newJobReport(jobId, "job", "user", JobState.RUNNING, 0,
2801+
0, 0, 0, 0, 0, 0, "jobfile", null, false, ""));
2802+
when(mockJob.getTotalMaps()).thenReturn(MAP_COUNT);
2803+
when(mockJob.getTotalReduces()).thenReturn(REDUCE_COUNT);
2804+
2805+
final MockScheduler mockScheduler = new MockScheduler(appAttemptId);
2806+
MyContainerAllocator allocator =
2807+
new MyContainerAllocator(null, conf, appAttemptId, mockJob,
2808+
SystemClock.getInstance()) {
2809+
@Override
2810+
protected void register() {
2811+
}
2812+
2813+
@Override
2814+
protected ApplicationMasterProtocol createSchedulerProxy() {
2815+
return mockScheduler;
2816+
}
2817+
2818+
@Override
2819+
protected void setRequestLimit(Priority priority,
2820+
Resource capability, int limit) {
2821+
Assert.fail("setRequestLimit() should not be invoked");
2822+
}
2823+
};
2824+
2825+
// create some map requests
2826+
ContainerRequestEvent[] reqMapEvents = new ContainerRequestEvent[MAP_COUNT];
2827+
for (int i = 0; i < reqMapEvents.length; ++i) {
2828+
reqMapEvents[i] = createReq(jobId, i, 1024, new String[] { "h" + i });
2829+
}
2830+
allocator.sendRequests(Arrays.asList(reqMapEvents));
2831+
// create some reduce requests
2832+
ContainerRequestEvent[] reqReduceEvents =
2833+
new ContainerRequestEvent[REDUCE_COUNT];
2834+
for (int i = 0; i < reqReduceEvents.length; ++i) {
2835+
reqReduceEvents[i] =
2836+
createReq(jobId, i, 1024, new String[] {}, false, true);
2837+
}
2838+
allocator.sendRequests(Arrays.asList(reqReduceEvents));
2839+
allocator.schedule();
2840+
allocator.schedule();
2841+
allocator.schedule();
2842+
allocator.close();
2843+
}
2844+
27842845
@Test
27852846
public void testConcurrentTaskLimits() throws Exception {
2847+
final int MAP_COUNT = 5;
2848+
final int REDUCE_COUNT = 2;
27862849
final int MAP_LIMIT = 3;
27872850
final int REDUCE_LIMIT = 1;
27882851
LOG.info("Running testConcurrentTaskLimits");
27892852
Configuration conf = new Configuration();
27902853
conf.setInt(MRJobConfig.JOB_RUNNING_MAP_LIMIT, MAP_LIMIT);
27912854
conf.setInt(MRJobConfig.JOB_RUNNING_REDUCE_LIMIT, REDUCE_LIMIT);
2792-
conf.setFloat(MRJobConfig.COMPLETED_MAPS_FOR_REDUCE_SLOWSTART, 1.0f);
2855+
conf.setFloat(MRJobConfig.COMPLETED_MAPS_FOR_REDUCE_SLOWSTART, 0.0f);
27932856
ApplicationId appId = ApplicationId.newInstance(1, 1);
27942857
ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(
27952858
appId, 1);
@@ -2798,6 +2861,9 @@ public void testConcurrentTaskLimits() throws Exception {
27982861
when(mockJob.getReport()).thenReturn(
27992862
MRBuilderUtils.newJobReport(jobId, "job", "user", JobState.RUNNING, 0,
28002863
0, 0, 0, 0, 0, 0, "jobfile", null, false, ""));
2864+
when(mockJob.getTotalMaps()).thenReturn(MAP_COUNT);
2865+
when(mockJob.getTotalReduces()).thenReturn(REDUCE_COUNT);
2866+
28012867
final MockScheduler mockScheduler = new MockScheduler(appAttemptId);
28022868
MyContainerAllocator allocator = new MyContainerAllocator(null, conf,
28032869
appAttemptId, mockJob, SystemClock.getInstance()) {
@@ -2812,14 +2878,13 @@ protected ApplicationMasterProtocol createSchedulerProxy() {
28122878
};
28132879

28142880
// create some map requests
2815-
ContainerRequestEvent[] reqMapEvents = new ContainerRequestEvent[5];
2881+
ContainerRequestEvent[] reqMapEvents = new ContainerRequestEvent[MAP_COUNT];
28162882
for (int i = 0; i < reqMapEvents.length; ++i) {
28172883
reqMapEvents[i] = createReq(jobId, i, 1024, new String[] { "h" + i });
28182884
}
28192885
allocator.sendRequests(Arrays.asList(reqMapEvents));
2820-
28212886
// create some reduce requests
2822-
ContainerRequestEvent[] reqReduceEvents = new ContainerRequestEvent[2];
2887+
ContainerRequestEvent[] reqReduceEvents = new ContainerRequestEvent[REDUCE_COUNT];
28232888
for (int i = 0; i < reqReduceEvents.length; ++i) {
28242889
reqReduceEvents[i] = createReq(jobId, i, 1024, new String[] {},
28252890
false, true);

0 commit comments

Comments
 (0)