Skip to content

Commit 874b837

Browse files
RocMarshalWeiZhong94
authored andcommitted
[FLINK-33390][runtime] Support slot balancing at TM level for Adaptive Scheduler
1 parent ca20e1d commit 874b837

20 files changed

+626
-58
lines changed

flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerFactory.java

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import org.apache.flink.configuration.DeploymentOptions;
2323
import org.apache.flink.configuration.JobManagerOptions;
2424
import org.apache.flink.configuration.StateRecoveryOptions;
25+
import org.apache.flink.configuration.TaskManagerOptions;
2526
import org.apache.flink.core.failure.FailureEnricher;
2627
import org.apache.flink.runtime.blob.BlobWriter;
2728
import org.apache.flink.runtime.blocklist.BlocklistOperations;
@@ -123,7 +124,9 @@ public SchedulerNG createInstance(
123124
jobMasterConfiguration.get(StateRecoveryOptions.LOCAL_RECOVERY),
124125
jobMasterConfiguration.get(DeploymentOptions.TARGET),
125126
jobMasterConfiguration.get(
126-
JobManagerOptions.SCHEDULER_PREFER_MINIMAL_TASKMANAGERS_ENABLED));
127+
JobManagerOptions.SCHEDULER_PREFER_MINIMAL_TASKMANAGERS_ENABLED),
128+
jobMasterConfiguration.get(
129+
TaskManagerOptions.TASK_MANAGER_LOAD_BALANCE_MODE));
127130

128131
final ExecutionGraphFactory executionGraphFactory =
129132
new DefaultExecutionGraphFactory(
@@ -169,13 +172,15 @@ public static SlotSharingSlotAllocator createSlotSharingSlotAllocator(
169172
DeclarativeSlotPool declarativeSlotPool,
170173
boolean localRecoveryEnabled,
171174
@Nullable String executionTarget,
172-
boolean minimalTaskManagerPreferred) {
175+
boolean minimalTaskManagerPreferred,
176+
TaskManagerOptions.TaskManagerLoadBalanceMode taskManagerLoadBalanceMode) {
173177
return SlotSharingSlotAllocator.createSlotSharingSlotAllocator(
174178
declarativeSlotPool::reserveFreeSlot,
175179
declarativeSlotPool::freeReservedSlot,
176180
declarativeSlotPool::containsFreeSlot,
177181
localRecoveryEnabled,
178182
executionTarget,
179-
minimalTaskManagerPreferred);
183+
minimalTaskManagerPreferred,
184+
taskManagerLoadBalanceMode);
180185
}
181186
}

flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/AllocatorUtil.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,11 +18,15 @@
1818

1919
package org.apache.flink.runtime.scheduler.adaptive.allocator;
2020

21+
import org.apache.flink.runtime.clusterframework.types.ResourceID;
2122
import org.apache.flink.runtime.instance.SlotSharingGroupId;
2223
import org.apache.flink.runtime.jobmaster.SlotInfo;
24+
import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlot;
2325

2426
import java.util.Collection;
2527
import java.util.Map;
28+
import java.util.Set;
29+
import java.util.stream.Collectors;
2630

2731
import static org.apache.flink.util.Preconditions.checkState;
2832

@@ -54,4 +58,13 @@ static void checkMinimumRequiredSlots(
5458
freeSlots.size(),
5559
minimumRequiredSlots);
5660
}
61+
62+
static Map<ResourceID, Set<PhysicalSlot>> getSlotsPerTaskExecutor(
63+
Collection<PhysicalSlot> physicalSlots) {
64+
return physicalSlots.stream()
65+
.collect(
66+
Collectors.groupingBy(
67+
slot -> slot.getTaskManagerLocation().getResourceID(),
68+
Collectors.toSet()));
69+
}
5770
}

flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/DefaultSlotAssigner.java

Lines changed: 18 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020

2121
import org.apache.flink.annotation.VisibleForTesting;
2222
import org.apache.flink.runtime.jobmaster.SlotInfo;
23+
import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlot;
2324
import org.apache.flink.runtime.scheduler.adaptive.JobSchedulingPlan.SlotAssignment;
2425
import org.apache.flink.runtime.scheduler.adaptive.allocator.SlotSharingSlotAllocator.ExecutionSlotSharingGroup;
2526
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
@@ -52,20 +53,23 @@ public class DefaultSlotAssigner implements SlotAssigner {
5253
private final @Nullable String executionTarget;
5354
private final boolean minimalTaskManagerPreferred;
5455
private final SlotSharingResolver slotSharingResolver;
56+
private final SlotMatchingResolver slotMatchingResolver;
5557

5658
DefaultSlotAssigner(
5759
@Nullable String executionTarget,
5860
boolean minimalTaskManagerPreferred,
59-
SlotSharingResolver slotSharingResolver) {
61+
SlotSharingResolver slotSharingResolver,
62+
SlotMatchingResolver slotMatchingResolver) {
6063
this.executionTarget = executionTarget;
6164
this.minimalTaskManagerPreferred = minimalTaskManagerPreferred;
6265
this.slotSharingResolver = slotSharingResolver;
66+
this.slotMatchingResolver = slotMatchingResolver;
6367
}
6468

6569
@Override
6670
public Collection<SlotAssignment> assignSlots(
6771
JobInformation jobInformation,
68-
Collection<? extends SlotInfo> freeSlots,
72+
Collection<PhysicalSlot> freeSlots,
6973
VertexParallelism vertexParallelism,
7074
JobAllocationsInformation previousAllocations) {
7175
checkMinimumRequiredSlots(jobInformation, freeSlots);
@@ -74,26 +78,20 @@ public Collection<SlotAssignment> assignSlots(
7478
slotSharingResolver.getExecutionSlotSharingGroups(
7579
jobInformation, vertexParallelism);
7680

77-
final Collection<? extends SlotInfo> pickedSlots =
78-
pickSlotsIfNeeded(allGroups.size(), freeSlots);
81+
final Collection<PhysicalSlot> pickedSlots = pickSlotsIfNeeded(allGroups.size(), freeSlots);
7982

80-
Iterator<? extends SlotInfo> iterator = pickedSlots.iterator();
81-
Collection<SlotAssignment> assignments = new ArrayList<>();
82-
for (ExecutionSlotSharingGroup group : allGroups) {
83-
assignments.add(new SlotAssignment(iterator.next(), group));
84-
}
85-
return assignments;
83+
return slotMatchingResolver.matchSlotSharingGroupWithSlots(allGroups, pickedSlots);
8684
}
8785

8886
@VisibleForTesting
89-
Collection<? extends SlotInfo> pickSlotsIfNeeded(
90-
int requestExecutionSlotSharingGroups, Collection<? extends SlotInfo> freeSlots) {
91-
Collection<? extends SlotInfo> pickedSlots = freeSlots;
87+
Collection<PhysicalSlot> pickSlotsIfNeeded(
88+
int requestExecutionSlotSharingGroups, Collection<PhysicalSlot> freeSlots) {
89+
Collection<PhysicalSlot> pickedSlots = freeSlots;
9290
if (APPLICATION_MODE_EXECUTION_TARGET.equalsIgnoreCase(executionTarget)
9391
&& minimalTaskManagerPreferred
9492
// To avoid the sort-work loading.
9593
&& freeSlots.size() > requestExecutionSlotSharingGroups) {
96-
final Map<TaskManagerLocation, ? extends Set<? extends SlotInfo>> slotsPerTaskExecutor =
94+
final Map<TaskManagerLocation, Set<PhysicalSlot>> slotsPerTaskExecutor =
9795
getSlotsPerTaskExecutor(freeSlots);
9896
pickedSlots =
9997
pickSlotsInMinimalTaskExecutors(
@@ -127,21 +125,20 @@ private Iterator<TaskManagerLocation> getSortedTaskExecutors(
127125
* @param requestedGroups the number of the request execution slot sharing groups.
128126
* @return the target slots that are distributed on the minimal task executors.
129127
*/
130-
private Collection<? extends SlotInfo> pickSlotsInMinimalTaskExecutors(
131-
Map<TaskManagerLocation, ? extends Set<? extends SlotInfo>> slotsByTaskExecutor,
132-
int requestedGroups) {
133-
final List<SlotInfo> pickedSlots = new ArrayList<>();
128+
private Collection<PhysicalSlot> pickSlotsInMinimalTaskExecutors(
129+
Map<TaskManagerLocation, Set<PhysicalSlot>> slotsByTaskExecutor, int requestedGroups) {
130+
final List<PhysicalSlot> pickedSlots = new ArrayList<>();
134131
final Iterator<TaskManagerLocation> sortedTaskExecutors =
135132
getSortedTaskExecutors(slotsByTaskExecutor);
136133
while (pickedSlots.size() < requestedGroups) {
137-
Set<? extends SlotInfo> slotInfos = slotsByTaskExecutor.get(sortedTaskExecutors.next());
134+
Set<PhysicalSlot> slotInfos = slotsByTaskExecutor.get(sortedTaskExecutors.next());
138135
pickedSlots.addAll(slotInfos);
139136
}
140137
return pickedSlots;
141138
}
142139

143-
private Map<TaskManagerLocation, ? extends Set<? extends SlotInfo>> getSlotsPerTaskExecutor(
144-
Collection<? extends SlotInfo> slots) {
140+
private Map<TaskManagerLocation, Set<PhysicalSlot>> getSlotsPerTaskExecutor(
141+
Collection<PhysicalSlot> slots) {
145142
return slots.stream()
146143
.collect(
147144
Collectors.groupingBy(
Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.flink.runtime.scheduler.adaptive.allocator;
19+
20+
import org.apache.flink.runtime.jobmaster.SlotInfo;
21+
import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlot;
22+
import org.apache.flink.runtime.scheduler.adaptive.JobSchedulingPlan;
23+
import org.apache.flink.runtime.scheduler.adaptive.allocator.SlotSharingSlotAllocator.ExecutionSlotSharingGroup;
24+
25+
import java.util.ArrayList;
26+
import java.util.Collection;
27+
import java.util.Iterator;
28+
29+
/** The simple slot matching resolver implementation. */
30+
public enum SimpleSlotMatchingResolver implements SlotMatchingResolver {
31+
INSTANCE;
32+
33+
@Override
34+
public Collection<JobSchedulingPlan.SlotAssignment> matchSlotSharingGroupWithSlots(
35+
Collection<ExecutionSlotSharingGroup> requestGroups,
36+
Collection<PhysicalSlot> freeSlots) {
37+
Iterator<? extends SlotInfo> iterator = freeSlots.iterator();
38+
Collection<JobSchedulingPlan.SlotAssignment> assignments = new ArrayList<>();
39+
for (SlotSharingSlotAllocator.ExecutionSlotSharingGroup group : requestGroups) {
40+
assignments.add(new JobSchedulingPlan.SlotAssignment(iterator.next(), group));
41+
}
42+
return assignments;
43+
}
44+
}

flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/SlotAllocator.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
package org.apache.flink.runtime.scheduler.adaptive.allocator;
1919

2020
import org.apache.flink.runtime.jobmaster.SlotInfo;
21+
import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlot;
2122
import org.apache.flink.runtime.scheduler.adaptive.JobSchedulingPlan;
2223
import org.apache.flink.runtime.util.ResourceCounter;
2324

@@ -61,7 +62,7 @@ Optional<VertexParallelism> determineParallelism(
6162
*/
6263
Optional<JobSchedulingPlan> determineParallelismAndCalculateAssignment(
6364
JobInformation jobInformation,
64-
Collection<? extends SlotInfo> slots,
65+
Collection<PhysicalSlot> slots,
6566
JobAllocationsInformation jobAllocationsInformation);
6667

6768
/**

flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/SlotAssigner.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818
package org.apache.flink.runtime.scheduler.adaptive.allocator;
1919

2020
import org.apache.flink.annotation.Internal;
21-
import org.apache.flink.runtime.jobmaster.SlotInfo;
21+
import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlot;
2222
import org.apache.flink.runtime.scheduler.adaptive.JobSchedulingPlan.SlotAssignment;
2323

2424
import java.util.Collection;
@@ -29,7 +29,7 @@ public interface SlotAssigner {
2929

3030
Collection<SlotAssignment> assignSlots(
3131
JobInformation jobInformation,
32-
Collection<? extends SlotInfo> freeSlots,
32+
Collection<PhysicalSlot> freeSlots,
3333
VertexParallelism vertexParallelism,
3434
JobAllocationsInformation previousAllocations);
3535
}
Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.flink.runtime.scheduler.adaptive.allocator;
19+
20+
import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlot;
21+
import org.apache.flink.runtime.scheduler.adaptive.JobSchedulingPlan.SlotAssignment;
22+
import org.apache.flink.runtime.scheduler.adaptive.allocator.SlotSharingSlotAllocator.ExecutionSlotSharingGroup;
23+
import org.apache.flink.util.FlinkRuntimeException;
24+
25+
import java.util.Collection;
26+
import java.util.function.Supplier;
27+
28+
/** The interface to define the methods for request slot matching resolver. */
29+
public interface SlotMatchingResolver {
30+
31+
Supplier<FlinkRuntimeException> NO_SLOTS_EXCEPTION_GETTER =
32+
() -> new FlinkRuntimeException("No suitable slots enough.");
33+
34+
/**
35+
* Match slots from the free slots with the given collection of requests execution groups.
36+
*
37+
* @param requestGroups the requested execution slot sharing groups.
38+
* @param freeSlots the free slots.
39+
* @return The assignment result.
40+
*/
41+
Collection<SlotAssignment> matchSlotSharingGroupWithSlots(
42+
Collection<ExecutionSlotSharingGroup> requestGroups,
43+
Collection<PhysicalSlot> freeSlots);
44+
}

flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/SlotSharingSlotAllocator.java

Lines changed: 28 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
package org.apache.flink.runtime.scheduler.adaptive.allocator;
1919

2020
import org.apache.flink.annotation.VisibleForTesting;
21+
import org.apache.flink.configuration.TaskManagerOptions;
2122
import org.apache.flink.runtime.clusterframework.types.AllocationID;
2223
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
2324
import org.apache.flink.runtime.instance.SlotSharingGroupId;
@@ -62,20 +63,23 @@ public class SlotSharingSlotAllocator implements SlotAllocator {
6263
private final @Nullable String executionTarget;
6364
private final boolean minimalTaskManagerPreferred;
6465
private final SlotSharingResolver slotSharingResolver = DefaultSlotSharingResolver.INSTANCE;
66+
private final SlotMatchingResolver slotMatchingResolver;
6567

6668
private SlotSharingSlotAllocator(
6769
ReserveSlotFunction reserveSlot,
6870
FreeSlotFunction freeSlotFunction,
6971
IsSlotAvailableAndFreeFunction isSlotAvailableAndFreeFunction,
7072
boolean localRecoveryEnabled,
7173
@Nullable String executionTarget,
72-
boolean minimalTaskManagerPreferred) {
74+
boolean minimalTaskManagerPreferred,
75+
TaskManagerOptions.TaskManagerLoadBalanceMode taskManagerLoadBalanceMode) {
7376
this.reserveSlotFunction = reserveSlot;
7477
this.freeSlotFunction = freeSlotFunction;
7578
this.isSlotAvailableAndFreeFunction = isSlotAvailableAndFreeFunction;
7679
this.localRecoveryEnabled = localRecoveryEnabled;
7780
this.executionTarget = executionTarget;
7881
this.minimalTaskManagerPreferred = minimalTaskManagerPreferred;
82+
this.slotMatchingResolver = getSlotMatchingResolver(taskManagerLoadBalanceMode);
7983
}
8084

8185
public static SlotSharingSlotAllocator createSlotSharingSlotAllocator(
@@ -84,14 +88,16 @@ public static SlotSharingSlotAllocator createSlotSharingSlotAllocator(
8488
IsSlotAvailableAndFreeFunction isSlotAvailableAndFreeFunction,
8589
boolean localRecoveryEnabled,
8690
@Nullable String executionTarget,
87-
boolean minimalTaskManagerPreferred) {
91+
boolean minimalTaskManagerPreferred,
92+
TaskManagerOptions.TaskManagerLoadBalanceMode taskManagerLoadBalanceMode) {
8893
return new SlotSharingSlotAllocator(
8994
reserveSlot,
9095
freeSlotFunction,
9196
isSlotAvailableAndFreeFunction,
9297
localRecoveryEnabled,
9398
executionTarget,
94-
minimalTaskManagerPreferred);
99+
minimalTaskManagerPreferred,
100+
taskManagerLoadBalanceMode);
95101
}
96102

97103
@Override
@@ -146,7 +152,7 @@ public Optional<VertexParallelism> determineParallelism(
146152
@Override
147153
public Optional<JobSchedulingPlan> determineParallelismAndCalculateAssignment(
148154
JobInformation jobInformation,
149-
Collection<? extends SlotInfo> slots,
155+
Collection<PhysicalSlot> slots,
150156
JobAllocationsInformation jobAllocationsInformation) {
151157
return determineParallelism(jobInformation, slots)
152158
.map(
@@ -157,7 +163,8 @@ public Optional<JobSchedulingPlan> determineParallelismAndCalculateAssignment(
157163
: new DefaultSlotAssigner(
158164
executionTarget,
159165
minimalTaskManagerPreferred,
160-
slotSharingResolver);
166+
slotSharingResolver,
167+
slotMatchingResolver);
161168
return new JobSchedulingPlan(
162169
parallelism,
163170
slotAssigner.assignSlots(
@@ -168,6 +175,22 @@ public Optional<JobSchedulingPlan> determineParallelismAndCalculateAssignment(
168175
});
169176
}
170177

178+
private SlotMatchingResolver getSlotMatchingResolver(
179+
TaskManagerOptions.TaskManagerLoadBalanceMode taskManagerLoadBalanceMode) {
180+
switch (taskManagerLoadBalanceMode) {
181+
case NONE:
182+
case MIN_RESOURCES:
183+
return SimpleSlotMatchingResolver.INSTANCE;
184+
case SLOTS:
185+
return SlotsBalancedSlotMatchingResolver.INSTANCE;
186+
default:
187+
throw new UnsupportedOperationException(
188+
String.format(
189+
"Unsupported task manager load mode: %s",
190+
taskManagerLoadBalanceMode));
191+
}
192+
}
193+
171194
/**
172195
* Distributes free slots across the slot-sharing groups of the job. Slots are distributed as
173196
* evenly as possible. If a group requires less than an even share of slots the remainder is

0 commit comments

Comments
 (0)