Skip to content

Commit d98c63a

Browse files
authored
HBASE-29168 Add configurable throttling of region moves in CacheAwareLoadBalancer. (#6763)
Signed-off-by: Tak Lon (Stephen) Wu <[email protected]>
1 parent b625bda commit d98c63a

File tree

7 files changed

+240
-2
lines changed

7 files changed

+240
-2
lines changed

hbase-balancer/src/main/java/org/apache/hadoop/hbase/master/LoadBalancer.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
import edu.umd.cs.findbugs.annotations.NonNull;
2121
import java.io.IOException;
22+
import java.time.Duration;
2223
import java.util.List;
2324
import java.util.Map;
2425
import org.apache.hadoop.conf.Configuration;
@@ -76,6 +77,18 @@ public interface LoadBalancer extends Stoppable, ConfigurationObserver {
7677
@Deprecated
7778
String HBASE_RSGROUP_LOADBALANCER_CLASS = "hbase.rsgroup.grouploadbalancer.class";
7879

80+
/**
81+
* Configuration to determine the time to sleep when throttling (if throttling is implemented by
82+
* the underlying implementation).
83+
*/
84+
String MOVE_THROTTLING = "hbase.master.balancer.move.throttlingMillis";
85+
86+
/**
87+
* The default value, in milliseconds, for the hbase.master.balancer.move.throttlingMillis if
88+
* throttling is implemented.
89+
*/
90+
Duration MOVE_THROTTLING_DEFAULT = Duration.ofMillis(60 * 1000);
91+
7992
/**
8093
* Set the current cluster status. This allows a LoadBalancer to map host name to a server
8194
*/
@@ -154,4 +167,8 @@ Map<ServerName, List<RegionInfo>> retainAssignment(Map<RegionInfo, ServerName> r
154167
default void
155168
updateBalancerLoadInfo(Map<TableName, Map<ServerName, List<RegionInfo>>> loadOfAllTable) {
156169
}
170+
171+
default void throttle(RegionPlan plan) throws Exception {
172+
// noop
173+
}
157174
}

hbase-balancer/src/main/java/org/apache/hadoop/hbase/master/balancer/CacheAwareLoadBalancer.java

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,9 @@
4343
import org.apache.hadoop.hbase.ServerMetrics;
4444
import org.apache.hadoop.hbase.ServerName;
4545
import org.apache.hadoop.hbase.Size;
46+
import org.apache.hadoop.hbase.TableName;
4647
import org.apache.hadoop.hbase.client.RegionInfo;
48+
import org.apache.hadoop.hbase.master.RegionPlan;
4749
import org.apache.hadoop.hbase.util.Pair;
4850
import org.apache.yetus.audience.InterfaceAudience;
4951
import org.slf4j.Logger;
@@ -53,6 +55,13 @@
5355
public class CacheAwareLoadBalancer extends StochasticLoadBalancer {
5456
private static final Logger LOG = LoggerFactory.getLogger(CacheAwareLoadBalancer.class);
5557

58+
public static final String CACHE_RATIO_THRESHOLD =
59+
"hbase.master.balancer.stochastic.throttling.cacheRatio";
60+
public static final float CACHE_RATIO_THRESHOLD_DEFAULT = 0.8f;
61+
62+
public Float ratioThreshold;
63+
64+
private Long sleepTime;
5665
private Configuration configuration;
5766

5867
public enum GeneratorFunctionType {
@@ -65,6 +74,9 @@ public synchronized void loadConf(Configuration configuration) {
6574
this.configuration = configuration;
6675
this.costFunctions = new ArrayList<>();
6776
super.loadConf(configuration);
77+
ratioThreshold =
78+
this.configuration.getFloat(CACHE_RATIO_THRESHOLD, CACHE_RATIO_THRESHOLD_DEFAULT);
79+
sleepTime = configuration.getLong(MOVE_THROTTLING, MOVE_THROTTLING_DEFAULT.toMillis());
6880
}
6981

7082
@Override
@@ -160,6 +172,53 @@ private RegionInfo getRegionInfoByEncodedName(BalancerClusterState cluster, Stri
160172
return null;
161173
}
162174

175+
@Override
176+
public void throttle(RegionPlan plan) {
177+
Pair<ServerName, Float> rsRatio = this.regionCacheRatioOnOldServerMap.get(plan.getRegionName());
178+
if (
179+
rsRatio != null && plan.getDestination().equals(rsRatio.getFirst())
180+
&& rsRatio.getSecond() >= ratioThreshold
181+
) {
182+
LOG.debug("Moving region {} to server {} with cache ratio {}. No throttling needed.",
183+
plan.getRegionInfo().getEncodedName(), plan.getDestination(), rsRatio.getSecond());
184+
} else {
185+
if (rsRatio != null) {
186+
LOG.debug("Moving region {} to server {} with cache ratio: {}. Throttling move for {}ms.",
187+
plan.getRegionInfo().getEncodedName(), plan.getDestination(),
188+
plan.getDestination().equals(rsRatio.getFirst()) ? rsRatio.getSecond() : "unknown",
189+
sleepTime);
190+
} else {
191+
LOG.debug(
192+
"Moving region {} to server {} with no cache ratio info for the region. "
193+
+ "Throttling move for {}ms.",
194+
plan.getRegionInfo().getEncodedName(), plan.getDestination(), sleepTime);
195+
}
196+
try {
197+
Thread.sleep(sleepTime);
198+
} catch (InterruptedException e) {
199+
throw new RuntimeException(e);
200+
}
201+
}
202+
}
203+
204+
@Override
205+
protected List<RegionPlan> balanceTable(TableName tableName,
206+
Map<ServerName, List<RegionInfo>> loadOfOneTable) {
207+
final Map<String, Pair<ServerName, Float>> snapshot = new HashMap<>();
208+
snapshot.putAll(this.regionCacheRatioOnOldServerMap);
209+
List<RegionPlan> plans = super.balanceTable(tableName, loadOfOneTable);
210+
plans.sort((p1, p2) -> {
211+
Pair<ServerName, Float> pair1 = snapshot.get(p1.getRegionName());
212+
Float ratio1 =
213+
pair1 == null ? 0 : pair1.getFirst().equals(p1.getDestination()) ? pair1.getSecond() : 0f;
214+
Pair<ServerName, Float> pair2 = snapshot.get(p2.getRegionName());
215+
Float ratio2 =
216+
pair2 == null ? 0 : pair2.getFirst().equals(p2.getDestination()) ? pair2.getSecond() : 0f;
217+
return ratio1.compareTo(ratio2) * (-1);
218+
});
219+
return plans;
220+
}
221+
163222
private class CacheAwareCandidateGenerator extends CandidateGenerator {
164223
@Override
165224
protected BalanceAction generate(BalancerClusterState cluster) {

hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -621,6 +621,7 @@ protected void cacheBlockWithWaitInternal(BlockCacheKey cacheKey, Cacheable cach
621621
try {
622622
successfulAddition = bq.offer(re, queueAdditionWaitTime, TimeUnit.MILLISECONDS);
623623
} catch (InterruptedException e) {
624+
LOG.error("Thread interrupted: ", e);
624625
Thread.currentThread().interrupt();
625626
}
626627
} else {

hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2188,10 +2188,14 @@ public List<RegionPlan> executeRegionPlansWithThrottling(List<RegionPlan> plans)
21882188
// TODO: bulk assign
21892189
try {
21902190
this.assignmentManager.balance(plan);
2191+
this.balancer.updateClusterMetrics(getClusterMetricsWithoutCoprocessor());
2192+
this.balancer.throttle(plan);
21912193
} catch (HBaseIOException hioe) {
21922194
// should ignore failed plans here, avoiding the whole balance plans be aborted
21932195
// later calls of balance() can fetch up the failed and skipped plans
21942196
LOG.warn("Failed balance plan {}, skipping...", plan, hioe);
2197+
} catch (Exception e) {
2198+
LOG.warn("Failed throttling assigning a new plan.", e);
21952199
}
21962200
// rpCount records balance plans processed, does not care if a plan succeeds
21972201
rpCount++;

hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestBlockEvictionOnRegionMovement.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
package org.apache.hadoop.hbase.io.hfile;
1919

2020
import static org.apache.hadoop.hbase.HConstants.BUCKET_CACHE_IOENGINE_KEY;
21+
import static org.apache.hadoop.hbase.io.hfile.CacheConfig.CACHE_BLOCKS_ON_WRITE_KEY;
2122
import static org.junit.Assert.assertEquals;
2223
import static org.junit.Assert.assertNotEquals;
2324
import static org.junit.Assert.assertTrue;
@@ -82,6 +83,7 @@ public void setup() throws Exception {
8283
conf.set("hbase.bucketcache.persistent.path", testDir + "/bucket.persistence");
8384
conf.setLong(CacheConfig.BUCKETCACHE_PERSIST_INTERVAL_KEY, 100);
8485
conf.setBoolean(CacheConfig.EVICT_BLOCKS_ON_CLOSE_KEY, true);
86+
conf.setBoolean(CACHE_BLOCKS_ON_WRITE_KEY, true);
8587
zkCluster = TEST_UTIL.startMiniZKCluster();
8688
cluster = TEST_UTIL.startMiniHBaseCluster(option);
8789
cluster.setConf(conf);
@@ -99,7 +101,7 @@ public void testBlockEvictionOnRegionMove() throws Exception {
99101
assertTrue(regionServingRS.getBlockCache().isPresent());
100102
long oldUsedCacheSize =
101103
regionServingRS.getBlockCache().get().getBlockCaches()[1].getCurrentSize();
102-
assertNotEquals(0, regionServingRS.getBlockCache().get().getBlockCaches()[1].getBlockCount());
104+
assertNotEquals(0, oldUsedCacheSize);
103105

104106
Admin admin = TEST_UTIL.getAdmin();
105107
RegionInfo regionToMove = regionServingRS.getRegions(tableRegionMove).get(0).getRegionInfo();

hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCache.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
import static org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.ACCEPT_FACTOR_CONFIG_NAME;
2121
import static org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.BLOCK_ORPHAN_GRACE_PERIOD;
22+
import static org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.DEFAULT_ERROR_TOLERATION_DURATION;
2223
import static org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.EXTRA_FREE_FACTOR_CONFIG_NAME;
2324
import static org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.MIN_FACTOR_CONFIG_NAME;
2425
import static org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.QUEUE_ADDITION_WAIT_TIME;
@@ -870,7 +871,7 @@ public void testBlockAdditionWaitWhenCache() throws Exception {
870871
config.setLong(QUEUE_ADDITION_WAIT_TIME, 1000);
871872

872873
bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize,
873-
constructedBlockSizes, 1, 1, persistencePath);
874+
constructedBlockSizes, 1, 1, persistencePath, DEFAULT_ERROR_TOLERATION_DURATION, config);
874875
assertTrue(bucketCache.waitForCacheInitialization(10000));
875876
long usedByteSize = bucketCache.getAllocator().getUsedSize();
876877
assertEquals(0, usedByteSize);

hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestCacheAwareLoadBalancer.java

Lines changed: 154 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
package org.apache.hadoop.hbase.master.balancer;
1919

2020
import static org.junit.Assert.assertEquals;
21+
import static org.junit.Assert.assertFalse;
2122
import static org.junit.Assert.assertTrue;
2223
import static org.mockito.Mockito.mock;
2324
import static org.mockito.Mockito.when;
@@ -47,6 +48,8 @@
4748
import org.apache.hadoop.hbase.master.RegionPlan;
4849
import org.apache.hadoop.hbase.testclassification.LargeTests;
4950
import org.apache.hadoop.hbase.util.Bytes;
51+
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
52+
import org.apache.hadoop.hbase.util.Pair;
5053
import org.junit.BeforeClass;
5154
import org.junit.ClassRule;
5255
import org.junit.Test;
@@ -234,6 +237,157 @@ public void testRegionsPartiallyCachedOnOldServerAndNotCachedOnCurrentServer() t
234237
assertTrue(targetServers.get(server1).containsAll(oldCachedRegions));
235238
}
236239

240+
@Test
241+
public void testThrottlingRegionBeyondThreshold() throws Exception {
242+
Configuration conf = HBaseConfiguration.create();
243+
CacheAwareLoadBalancer balancer = new CacheAwareLoadBalancer();
244+
balancer.setClusterInfoProvider(new DummyClusterInfoProvider(conf));
245+
balancer.loadConf(conf);
246+
balancer.initialize();
247+
ServerName server0 = servers.get(0);
248+
ServerName server1 = servers.get(1);
249+
Pair<ServerName, Float> regionRatio = new Pair<>();
250+
regionRatio.setFirst(server0);
251+
regionRatio.setSecond(1.0f);
252+
balancer.regionCacheRatioOnOldServerMap.put("region1", regionRatio);
253+
RegionInfo mockedInfo = mock(RegionInfo.class);
254+
when(mockedInfo.getEncodedName()).thenReturn("region1");
255+
RegionPlan plan = new RegionPlan(mockedInfo, server1, server0);
256+
long startTime = EnvironmentEdgeManager.currentTime();
257+
balancer.throttle(plan);
258+
long endTime = EnvironmentEdgeManager.currentTime();
259+
assertTrue((endTime - startTime) < 10);
260+
}
261+
262+
@Test
263+
public void testThrottlingRegionBelowThreshold() throws Exception {
264+
Configuration conf = HBaseConfiguration.create();
265+
conf.setLong(CacheAwareLoadBalancer.MOVE_THROTTLING, 100);
266+
CacheAwareLoadBalancer balancer = new CacheAwareLoadBalancer();
267+
balancer.setClusterInfoProvider(new DummyClusterInfoProvider(conf));
268+
balancer.loadConf(conf);
269+
balancer.initialize();
270+
ServerName server0 = servers.get(0);
271+
ServerName server1 = servers.get(1);
272+
Pair<ServerName, Float> regionRatio = new Pair<>();
273+
regionRatio.setFirst(server0);
274+
regionRatio.setSecond(0.1f);
275+
balancer.regionCacheRatioOnOldServerMap.put("region1", regionRatio);
276+
RegionInfo mockedInfo = mock(RegionInfo.class);
277+
when(mockedInfo.getEncodedName()).thenReturn("region1");
278+
RegionPlan plan = new RegionPlan(mockedInfo, server1, server0);
279+
long startTime = EnvironmentEdgeManager.currentTime();
280+
balancer.throttle(plan);
281+
long endTime = EnvironmentEdgeManager.currentTime();
282+
assertTrue((endTime - startTime) >= 100);
283+
}
284+
285+
@Test
286+
public void testThrottlingCacheRatioUnknownOnTarget() throws Exception {
287+
Configuration conf = HBaseConfiguration.create();
288+
conf.setLong(CacheAwareLoadBalancer.MOVE_THROTTLING, 100);
289+
CacheAwareLoadBalancer balancer = new CacheAwareLoadBalancer();
290+
balancer.setClusterInfoProvider(new DummyClusterInfoProvider(conf));
291+
balancer.loadConf(conf);
292+
balancer.initialize();
293+
ServerName server0 = servers.get(0);
294+
ServerName server1 = servers.get(1);
295+
ServerName server3 = servers.get(2);
296+
// setting region cache ratio 100% on server 3, though this is not the target in the region plan
297+
Pair<ServerName, Float> regionRatio = new Pair<>();
298+
regionRatio.setFirst(server3);
299+
regionRatio.setSecond(1.0f);
300+
balancer.regionCacheRatioOnOldServerMap.put("region1", regionRatio);
301+
RegionInfo mockedInfo = mock(RegionInfo.class);
302+
when(mockedInfo.getEncodedName()).thenReturn("region1");
303+
RegionPlan plan = new RegionPlan(mockedInfo, server1, server0);
304+
long startTime = EnvironmentEdgeManager.currentTime();
305+
balancer.throttle(plan);
306+
long endTime = EnvironmentEdgeManager.currentTime();
307+
assertTrue((endTime - startTime) >= 100);
308+
}
309+
310+
@Test
311+
public void testThrottlingCacheRatioUnknownForRegion() throws Exception {
312+
Configuration conf = HBaseConfiguration.create();
313+
conf.setLong(CacheAwareLoadBalancer.MOVE_THROTTLING, 100);
314+
CacheAwareLoadBalancer balancer = new CacheAwareLoadBalancer();
315+
balancer.setClusterInfoProvider(new DummyClusterInfoProvider(conf));
316+
balancer.loadConf(conf);
317+
balancer.initialize();
318+
ServerName server0 = servers.get(0);
319+
ServerName server1 = servers.get(1);
320+
ServerName server3 = servers.get(2);
321+
// No cache ratio available for region1
322+
RegionInfo mockedInfo = mock(RegionInfo.class);
323+
when(mockedInfo.getEncodedName()).thenReturn("region1");
324+
RegionPlan plan = new RegionPlan(mockedInfo, server1, server0);
325+
long startTime = EnvironmentEdgeManager.currentTime();
326+
balancer.throttle(plan);
327+
long endTime = EnvironmentEdgeManager.currentTime();
328+
assertTrue((endTime - startTime) >= 100);
329+
}
330+
331+
@Test
332+
public void testRegionPlansSortedByCacheRatioOnTarget() throws Exception {
333+
// The regions are fully cached on old server
334+
335+
Map<ServerName, List<RegionInfo>> clusterState = new HashMap<>();
336+
ServerName server0 = servers.get(0);
337+
ServerName server1 = servers.get(1);
338+
ServerName server2 = servers.get(2);
339+
340+
// Simulate on RS with all regions, and two RSes with no regions
341+
List<RegionInfo> regionsOnServer0 = randomRegions(15);
342+
List<RegionInfo> regionsOnServer1 = randomRegions(0);
343+
List<RegionInfo> regionsOnServer2 = randomRegions(0);
344+
345+
clusterState.put(server0, regionsOnServer0);
346+
clusterState.put(server1, regionsOnServer1);
347+
clusterState.put(server2, regionsOnServer2);
348+
349+
// Mock cluster metrics
350+
// Mock 5 regions from server0 were previously hosted on server1
351+
List<RegionInfo> oldCachedRegions1 = regionsOnServer0.subList(5, 10);
352+
List<RegionInfo> oldCachedRegions2 = regionsOnServer0.subList(10, regionsOnServer0.size());
353+
Map<ServerName, ServerMetrics> serverMetricsMap = new TreeMap<>();
354+
// mock server metrics to set cache ratio as 0 in the RS 0
355+
serverMetricsMap.put(server0, mockServerMetricsWithRegionCacheInfo(server0, regionsOnServer0,
356+
0.0f, new ArrayList<>(), 0, 10));
357+
// mock server metrics to set cache ratio as 1 in the RS 1
358+
serverMetricsMap.put(server1, mockServerMetricsWithRegionCacheInfo(server1, regionsOnServer1,
359+
0.0f, oldCachedRegions1, 10, 10));
360+
// mock server metrics to set cache ratio as .8 in the RS 2
361+
serverMetricsMap.put(server2, mockServerMetricsWithRegionCacheInfo(server2, regionsOnServer2,
362+
0.0f, oldCachedRegions2, 8, 10));
363+
ClusterMetrics clusterMetrics = mock(ClusterMetrics.class);
364+
when(clusterMetrics.getLiveServerMetrics()).thenReturn(serverMetricsMap);
365+
loadBalancer.updateClusterMetrics(clusterMetrics);
366+
367+
Map<TableName, Map<ServerName, List<RegionInfo>>> LoadOfAllTable =
368+
(Map) mockClusterServersWithTables(clusterState);
369+
List<RegionPlan> plans = loadBalancer.balanceCluster(LoadOfAllTable);
370+
LOG.debug("plans size: {}", plans.size());
371+
LOG.debug("plans: {}", plans);
372+
LOG.debug("server1 name: {}", server1.getServerName());
373+
// assert the plans are in descending order from the most cached to the least cached
374+
int highCacheCount = 0;
375+
for (RegionPlan plan : plans) {
376+
LOG.debug("plan region: {}, target server: {}", plan.getRegionInfo().getEncodedName(),
377+
plan.getDestination().getServerName());
378+
if (highCacheCount < 5) {
379+
LOG.debug("Count: {}", highCacheCount);
380+
assertTrue(oldCachedRegions1.contains(plan.getRegionInfo()));
381+
assertFalse(oldCachedRegions2.contains(plan.getRegionInfo()));
382+
highCacheCount++;
383+
} else {
384+
assertTrue(oldCachedRegions2.contains(plan.getRegionInfo()));
385+
assertFalse(oldCachedRegions1.contains(plan.getRegionInfo()));
386+
}
387+
}
388+
389+
}
390+
237391
@Test
238392
public void testRegionsFullyCachedOnOldServerAndNotCachedOnCurrentServers() throws Exception {
239393
// The regions are fully cached on old server

0 commit comments

Comments
 (0)