Skip to content

Commit f9af303

Browse files
committed
HBASE-28582 ModifyTableProcedure should not reset TRSP on region node when closing unused region replicas (#5903)
Signed-off-by: Viraj Jasani <[email protected]> (cherry picked from commit c4a7606)
1 parent ad88fca commit f9af303

File tree

6 files changed

+327
-10
lines changed

6 files changed

+327
-10
lines changed

hbase-protocol-shaded/src/main/protobuf/server/master/MasterProcedure.proto

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -780,3 +780,13 @@ enum MigrateNamespaceTableProcedureState {
780780

781781
message MigrateNamespaceTableProcedureStateData {
782782
}
783+
784+
enum CloseExcessRegionReplicasProcedureState {
785+
CLOSE_EXCESS_REGION_REPLICAS_SCHEDULE = 1;
786+
CLOSE_EXCESS_REGION_REPLICAS_CONFIRM = 2;
787+
}
788+
789+
message CloseExcessRegionReplicasProcedureStateData {
790+
required TableName table_name = 1;
791+
required uint32 new_replica_count = 2;
792+
}

hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/AssignmentManager.java

Lines changed: 49 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
import java.util.concurrent.atomic.AtomicBoolean;
3434
import java.util.concurrent.locks.Condition;
3535
import java.util.concurrent.locks.ReentrantLock;
36+
import java.util.function.Consumer;
3637
import java.util.stream.Collectors;
3738
import java.util.stream.Stream;
3839
import org.apache.hadoop.conf.Configuration;
@@ -1076,14 +1077,55 @@ public TransitRegionStateProcedure[] createUnassignProceduresForDisabling(TableN
10761077
}
10771078

10781079
/**
1079-
* Called by ModifyTableProcedures to unassign all the excess region replicas for a table.
1080+
* Called by ModifyTableProcedure to unassign all the excess region replicas for a table. Will
1081+
* skip submit unassign procedure if the region is in transition, so you may need to call this
1082+
* method multiple times.
1083+
* @param tableName the table for closing excess region replicas
1084+
* @param newReplicaCount the new replica count, should be less than current replica count
1085+
* @param submit for submitting procedure
1086+
* @return the number of regions in transition that we can not schedule unassign procedures
10801087
*/
1081-
public TransitRegionStateProcedure[] createUnassignProceduresForClosingExcessRegionReplicas(
1082-
TableName tableName, int newReplicaCount) {
1083-
return regionStates.getTableRegionStateNodes(tableName).stream()
1084-
.filter(regionNode -> regionNode.getRegionInfo().getReplicaId() >= newReplicaCount)
1085-
.map(this::forceCreateUnssignProcedure).filter(p -> p != null)
1086-
.toArray(TransitRegionStateProcedure[]::new);
1088+
public int submitUnassignProcedureForClosingExcessRegionReplicas(TableName tableName,
1089+
int newReplicaCount, Consumer<TransitRegionStateProcedure> submit) {
1090+
int inTransitionCount = 0;
1091+
for (RegionStateNode regionNode : regionStates.getTableRegionStateNodes(tableName)) {
1092+
regionNode.lock();
1093+
try {
1094+
if (regionNode.getRegionInfo().getReplicaId() >= newReplicaCount) {
1095+
if (regionNode.isInTransition()) {
1096+
LOG.debug("skip scheduling unassign procedure for {} when closing excess region "
1097+
+ "replicas since it is in transition", regionNode);
1098+
inTransitionCount++;
1099+
continue;
1100+
}
1101+
if (regionNode.isInState(State.OFFLINE, State.CLOSED, State.SPLIT)) {
1102+
continue;
1103+
}
1104+
submit.accept(regionNode.setProcedure(TransitRegionStateProcedure
1105+
.unassign(getProcedureEnvironment(), regionNode.getRegionInfo())));
1106+
}
1107+
} finally {
1108+
regionNode.unlock();
1109+
}
1110+
}
1111+
return inTransitionCount;
1112+
}
1113+
1114+
public int numberOfUnclosedExcessRegionReplicas(TableName tableName, int newReplicaCount) {
1115+
int unclosed = 0;
1116+
for (RegionStateNode regionNode : regionStates.getTableRegionStateNodes(tableName)) {
1117+
regionNode.lock();
1118+
try {
1119+
if (regionNode.getRegionInfo().getReplicaId() >= newReplicaCount) {
1120+
if (!regionNode.isInState(State.OFFLINE, State.CLOSED, State.SPLIT)) {
1121+
unclosed++;
1122+
}
1123+
}
1124+
} finally {
1125+
regionNode.unlock();
1126+
}
1127+
}
1128+
return unclosed;
10871129
}
10881130

10891131
public SplitTableRegionProcedure createSplitProcedure(final RegionInfo regionToSplit,
Lines changed: 159 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,159 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package org.apache.hadoop.hbase.master.procedure;
19+
20+
import java.io.IOException;
21+
import org.apache.commons.lang3.mutable.MutableBoolean;
22+
import org.apache.hadoop.hbase.TableName;
23+
import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer;
24+
import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
25+
import org.apache.hadoop.hbase.procedure2.ProcedureUtil;
26+
import org.apache.hadoop.hbase.procedure2.ProcedureYieldException;
27+
import org.apache.hadoop.hbase.util.RetryCounter;
28+
import org.apache.yetus.audience.InterfaceAudience;
29+
import org.slf4j.Logger;
30+
import org.slf4j.LoggerFactory;
31+
32+
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
33+
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.CloseExcessRegionReplicasProcedureState;
34+
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.CloseExcessRegionReplicasProcedureStateData;
35+
import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos;
36+
37+
/**
38+
* Procedure for close excess region replicas.
39+
*/
40+
@InterfaceAudience.Private
41+
public class CloseExcessRegionReplicasProcedure
42+
extends AbstractStateMachineTableProcedure<CloseExcessRegionReplicasProcedureState> {
43+
44+
private static final Logger LOG =
45+
LoggerFactory.getLogger(CloseExcessRegionReplicasProcedure.class);
46+
47+
private TableName tableName;
48+
private int newReplicaCount;
49+
50+
private RetryCounter retryCounter;
51+
52+
public CloseExcessRegionReplicasProcedure() {
53+
}
54+
55+
public CloseExcessRegionReplicasProcedure(TableName tableName, int newReplicaCount) {
56+
this.tableName = tableName;
57+
this.newReplicaCount = newReplicaCount;
58+
}
59+
60+
@Override
61+
public TableName getTableName() {
62+
return tableName;
63+
}
64+
65+
@Override
66+
public TableOperationType getTableOperationType() {
67+
return TableOperationType.REGION_EDIT;
68+
}
69+
70+
@Override
71+
protected Flow executeFromState(MasterProcedureEnv env,
72+
CloseExcessRegionReplicasProcedureState state)
73+
throws ProcedureSuspendedException, ProcedureYieldException, InterruptedException {
74+
LOG.trace("{} execute state={}", this, state);
75+
switch (state) {
76+
case CLOSE_EXCESS_REGION_REPLICAS_SCHEDULE:
77+
MutableBoolean submitted = new MutableBoolean(false);
78+
int inTransitionCount = env.getAssignmentManager()
79+
.submitUnassignProcedureForClosingExcessRegionReplicas(tableName, newReplicaCount, p -> {
80+
submitted.setTrue();
81+
addChildProcedure(p);
82+
});
83+
if (inTransitionCount > 0 && submitted.isFalse()) {
84+
// we haven't scheduled any unassign procedures and there are still regions in
85+
// transition, sleep for a while and try again
86+
if (retryCounter == null) {
87+
retryCounter = ProcedureUtil.createRetryCounter(env.getMasterConfiguration());
88+
}
89+
long backoffMillis = retryCounter.getBackoffTimeAndIncrementAttempts();
90+
LOG.info(
91+
"There are still {} region(s) in transition for table {} when closing excess"
92+
+ " region replicas, suspend {}secs and try again later",
93+
inTransitionCount, tableName, backoffMillis / 1000);
94+
suspend((int) backoffMillis, true);
95+
}
96+
setNextState(CloseExcessRegionReplicasProcedureState.CLOSE_EXCESS_REGION_REPLICAS_CONFIRM);
97+
return Flow.HAS_MORE_STATE;
98+
case CLOSE_EXCESS_REGION_REPLICAS_CONFIRM:
99+
int unclosedCount = env.getAssignmentManager()
100+
.numberOfUnclosedExcessRegionReplicas(tableName, newReplicaCount);
101+
if (unclosedCount > 0) {
102+
LOG.info("There are still {} unclosed region(s) for table {} when closing excess"
103+
+ " region replicas, continue...");
104+
setNextState(
105+
CloseExcessRegionReplicasProcedureState.CLOSE_EXCESS_REGION_REPLICAS_SCHEDULE);
106+
return Flow.HAS_MORE_STATE;
107+
} else {
108+
return Flow.NO_MORE_STATE;
109+
}
110+
default:
111+
throw new UnsupportedOperationException("unhandled state=" + state);
112+
}
113+
}
114+
115+
@Override
116+
protected synchronized boolean setTimeoutFailure(MasterProcedureEnv env) {
117+
setState(ProcedureProtos.ProcedureState.RUNNABLE);
118+
env.getProcedureScheduler().addFront(this);
119+
return false;
120+
}
121+
122+
@Override
123+
protected void rollbackState(MasterProcedureEnv env,
124+
CloseExcessRegionReplicasProcedureState state) throws IOException, InterruptedException {
125+
throw new UnsupportedOperationException();
126+
}
127+
128+
@Override
129+
protected CloseExcessRegionReplicasProcedureState getState(int stateId) {
130+
return CloseExcessRegionReplicasProcedureState.forNumber(stateId);
131+
}
132+
133+
@Override
134+
protected int getStateId(CloseExcessRegionReplicasProcedureState state) {
135+
return state.getNumber();
136+
}
137+
138+
@Override
139+
protected CloseExcessRegionReplicasProcedureState getInitialState() {
140+
return CloseExcessRegionReplicasProcedureState.CLOSE_EXCESS_REGION_REPLICAS_SCHEDULE;
141+
}
142+
143+
@Override
144+
protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException {
145+
CloseExcessRegionReplicasProcedureStateData data = CloseExcessRegionReplicasProcedureStateData
146+
.newBuilder().setTableName(ProtobufUtil.toProtoTableName(tableName))
147+
.setNewReplicaCount(newReplicaCount).build();
148+
serializer.serialize(data);
149+
}
150+
151+
@Override
152+
protected void deserializeStateData(ProcedureStateSerializer serializer) throws IOException {
153+
CloseExcessRegionReplicasProcedureStateData data =
154+
serializer.deserialize(CloseExcessRegionReplicasProcedureStateData.class);
155+
tableName = ProtobufUtil.toTableName(data.getTableName());
156+
newReplicaCount = data.getNewReplicaCount();
157+
}
158+
159+
}

hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ModifyTableProcedure.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -525,8 +525,7 @@ private void closeExcessReplicasIfNeeded(MasterProcedureEnv env) {
525525
if (newReplicaCount >= oldReplicaCount) {
526526
return;
527527
}
528-
addChildProcedure(env.getAssignmentManager()
529-
.createUnassignProceduresForClosingExcessRegionReplicas(getTableName(), newReplicaCount));
528+
addChildProcedure(new CloseExcessRegionReplicasProcedure(getTableName(), newReplicaCount));
530529
}
531530

532531
/**
Lines changed: 107 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,107 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package org.apache.hadoop.hbase.master.assignment;
19+
20+
import static org.junit.Assert.assertEquals;
21+
import static org.junit.Assert.assertFalse;
22+
import static org.junit.Assert.assertTrue;
23+
24+
import java.util.concurrent.CompletableFuture;
25+
import org.apache.hadoop.hbase.HBaseClassTestRule;
26+
import org.apache.hadoop.hbase.HBaseTestingUtil;
27+
import org.apache.hadoop.hbase.TableName;
28+
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
29+
import org.apache.hadoop.hbase.client.TableDescriptor;
30+
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
31+
import org.apache.hadoop.hbase.master.RegionState;
32+
import org.apache.hadoop.hbase.master.procedure.CloseExcessRegionReplicasProcedure;
33+
import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
34+
import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
35+
import org.apache.hadoop.hbase.testclassification.MasterTests;
36+
import org.apache.hadoop.hbase.testclassification.MediumTests;
37+
import org.junit.AfterClass;
38+
import org.junit.BeforeClass;
39+
import org.junit.ClassRule;
40+
import org.junit.Test;
41+
import org.junit.experimental.categories.Category;
42+
43+
/**
44+
* A test to make sure that we will wait for RIT to finish while closing excess region replicas. See
45+
* HBASE-28582 and related issues for more details.
46+
*/
47+
@Category({ MasterTests.class, MediumTests.class })
48+
public class TestReduceExcessRegionReplicasBlockedByRIT {
49+
50+
@ClassRule
51+
public static final HBaseClassTestRule CLASS_RULE =
52+
HBaseClassTestRule.forClass(TestReduceExcessRegionReplicasBlockedByRIT.class);
53+
54+
private static final HBaseTestingUtil UTIL = new HBaseTestingUtil();
55+
56+
private static TableDescriptor TD =
57+
TableDescriptorBuilder.newBuilder(TableName.valueOf("CloseExcessRegionReplicas"))
58+
.setColumnFamily(ColumnFamilyDescriptorBuilder.of("family")).setRegionReplication(4).build();
59+
60+
@BeforeClass
61+
public static void setUp() throws Exception {
62+
UTIL.startMiniCluster(1);
63+
UTIL.getAdmin().createTable(TD);
64+
UTIL.waitTableAvailable(TD.getTableName());
65+
UTIL.waitUntilNoRegionsInTransition();
66+
}
67+
68+
@AfterClass
69+
public static void tearDown() throws Exception {
70+
UTIL.shutdownMiniCluster();
71+
}
72+
73+
@Test
74+
public void testRIT() throws Exception {
75+
RegionStateNode rsn = UTIL.getMiniHBaseCluster().getMaster().getAssignmentManager()
76+
.getRegionStates().getTableRegionStateNodes(TD.getTableName()).stream()
77+
.filter(rn -> rn.getRegionInfo().getReplicaId() > 1).findAny().get();
78+
// fake a TRSP to block the CloseExcessRegionReplicasProcedure
79+
TransitRegionStateProcedure trsp = new TransitRegionStateProcedure();
80+
rsn.setProcedure(trsp);
81+
TableDescriptor newTd = TableDescriptorBuilder.newBuilder(TD).setRegionReplication(2).build();
82+
CompletableFuture<Void> future = UTIL.getAsyncConnection().getAdmin().modifyTable(newTd);
83+
ProcedureExecutor<MasterProcedureEnv> procExec =
84+
UTIL.getMiniHBaseCluster().getMaster().getMasterProcedureExecutor();
85+
UTIL.waitFor(5000, () -> procExec.getProcedures().stream()
86+
.anyMatch(p -> p instanceof CloseExcessRegionReplicasProcedure && !p.isFinished()));
87+
CloseExcessRegionReplicasProcedure proc =
88+
procExec.getProcedures().stream().filter(p -> p instanceof CloseExcessRegionReplicasProcedure)
89+
.map(p -> (CloseExcessRegionReplicasProcedure) p).findFirst().get();
90+
// make sure that the procedure can not finish
91+
for (int i = 0; i < 5; i++) {
92+
Thread.sleep(3000);
93+
assertFalse(proc.isFinished());
94+
}
95+
assertTrue(rsn.isInState(RegionState.State.OPEN));
96+
// unset the procedure, so we could make progress on CloseExcessRegionReplicasProcedure
97+
rsn.unsetProcedure(trsp);
98+
UTIL.waitFor(60000, () -> proc.isFinished());
99+
100+
future.get();
101+
102+
// the region should be in CLOSED state, and should have been removed from AM
103+
assertTrue(rsn.isInState(RegionState.State.CLOSED));
104+
// only 2 replicas now
105+
assertEquals(2, UTIL.getMiniHBaseCluster().getRegions(TD.getTableName()).size());
106+
}
107+
}

hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionReplicasWithModifyTable.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -157,7 +157,7 @@ public void testRegionReplicasByEnableTableWhenReplicaCountIsDecreasedWithMultip
157157
}
158158

159159
@Test
160-
public void testRegionReplicasByEnableTableWhenReplicaCountIsIncreasedWithmultipleRegions()
160+
public void testRegionReplicasByEnableTableWhenReplicaCountIsIncreasedWithMultipleRegions()
161161
throws Exception {
162162
enableReplicationByModification(true, 2, 3, 15);
163163
}

0 commit comments

Comments
 (0)