Skip to content

Commit 1553b39

Browse files
ddupgsunxin
authored andcommitted
HBASE-25113 [testing] HBaseCluster support ReplicationServer for UTs (#2662)
Signed-off-by: Guanghao Zhang <[email protected]>
1 parent c8d8782 commit 1553b39

File tree

8 files changed

+242
-44
lines changed

8 files changed

+242
-44
lines changed

hbase-server/src/main/java/org/apache/hadoop/hbase/LocalHBaseCluster.java

Lines changed: 61 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,9 +32,11 @@
3232
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
3333
import org.apache.hadoop.hbase.master.HMaster;
3434
import org.apache.hadoop.hbase.regionserver.HRegionServer;
35+
import org.apache.hadoop.hbase.replication.HReplicationServer;
3536
import org.apache.hadoop.hbase.security.User;
3637
import org.apache.hadoop.hbase.util.JVMClusterUtil;
3738
import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
39+
import org.apache.hadoop.hbase.util.JVMClusterUtil.ReplicationServerThread;
3840
import org.apache.hadoop.hbase.util.Threads;
3941
import org.apache.yetus.audience.InterfaceAudience;
4042
import org.slf4j.Logger;
@@ -60,7 +62,10 @@
6062
public class LocalHBaseCluster {
6163
private static final Logger LOG = LoggerFactory.getLogger(LocalHBaseCluster.class);
6264
private final List<JVMClusterUtil.MasterThread> masterThreads = new CopyOnWriteArrayList<>();
63-
private final List<JVMClusterUtil.RegionServerThread> regionThreads = new CopyOnWriteArrayList<>();
65+
private final List<JVMClusterUtil.RegionServerThread> regionThreads =
66+
new CopyOnWriteArrayList<>();
67+
private final List<JVMClusterUtil.ReplicationServerThread> replicationThreads =
68+
new CopyOnWriteArrayList<>();
6469
private final static int DEFAULT_NO = 1;
6570
/** local mode */
6671
public static final String LOCAL = "local";
@@ -259,6 +264,26 @@ public JVMClusterUtil.MasterThread run() throws Exception {
259264
});
260265
}
261266

267+
@SuppressWarnings("unchecked")
268+
public JVMClusterUtil.ReplicationServerThread addReplicationServer(
269+
Configuration config, final int index) throws IOException {
270+
// Create each replication server with its own Configuration instance so each has
271+
// its Connection instance rather than share (see HBASE_INSTANCES down in
272+
// the guts of ConnectionManager).
273+
JVMClusterUtil.ReplicationServerThread rst =
274+
JVMClusterUtil.createReplicationServerThread(config, index);
275+
this.replicationThreads.add(rst);
276+
return rst;
277+
}
278+
279+
public JVMClusterUtil.ReplicationServerThread addReplicationServer(
280+
final Configuration config, final int index, User user)
281+
throws IOException, InterruptedException {
282+
return user.runAs(
283+
(PrivilegedExceptionAction<ReplicationServerThread>) () -> addReplicationServer(config,
284+
index));
285+
}
286+
262287
/**
263288
* @param serverNumber
264289
* @return region server
@@ -289,6 +314,40 @@ public List<JVMClusterUtil.RegionServerThread> getLiveRegionServers() {
289314
return liveServers;
290315
}
291316

317+
/**
318+
* @param serverNumber replication server number
319+
* @return replication server
320+
*/
321+
public HReplicationServer getReplicationServer(int serverNumber) {
322+
return replicationThreads.get(serverNumber).getReplicationServer();
323+
}
324+
325+
/**
326+
* @return Read-only list of replication server threads.
327+
*/
328+
public List<JVMClusterUtil.ReplicationServerThread> getReplicationServers() {
329+
return Collections.unmodifiableList(this.replicationThreads);
330+
}
331+
332+
/**
333+
* @return List of running servers (Some servers may have been killed or
334+
* aborted during lifetime of cluster; these servers are not included in this
335+
* list).
336+
*/
337+
public List<JVMClusterUtil.ReplicationServerThread> getLiveReplicationServers() {
338+
List<JVMClusterUtil.ReplicationServerThread> liveServers = new ArrayList<>();
339+
List<ReplicationServerThread> list = getReplicationServers();
340+
for (JVMClusterUtil.ReplicationServerThread rst: list) {
341+
if (rst.isAlive()) {
342+
liveServers.add(rst);
343+
}
344+
else {
345+
LOG.info("Not alive {}", rst.getName());
346+
}
347+
}
348+
return liveServers;
349+
}
350+
292351
/**
293352
* @return the Configuration used by this LocalHBaseCluster
294353
*/
@@ -430,7 +489,7 @@ public void join() {
430489
* Start the cluster.
431490
*/
432491
public void startup() throws IOException {
433-
JVMClusterUtil.startup(this.masterThreads, this.regionThreads);
492+
JVMClusterUtil.startup(this.masterThreads, this.regionThreads, this.replicationThreads);
434493
}
435494

436495
/**

hbase-server/src/main/java/org/apache/hadoop/hbase/replication/HReplicationServer.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -445,6 +445,19 @@ public boolean isStopped() {
445445
return this.stopped;
446446
}
447447

448+
public void waitForServerOnline(){
449+
while (!isStopped() && !isOnline()) {
450+
synchronized (online) {
451+
try {
452+
online.wait(msgInterval);
453+
} catch (InterruptedException ie) {
454+
Thread.currentThread().interrupt();
455+
break;
456+
}
457+
}
458+
}
459+
}
460+
448461
/**
449462
* Setup WAL log and replication if enabled. Replication setup is done in here because it wants to
450463
* be hooked up to WAL.

hbase-server/src/main/java/org/apache/hadoop/hbase/util/JVMClusterUtil.java

Lines changed: 55 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import java.util.function.Supplier;
2828

2929
import org.apache.hadoop.hbase.HConstants;
30+
import org.apache.hadoop.hbase.replication.HReplicationServer;
3031
import org.apache.yetus.audience.InterfaceAudience;
3132
import org.slf4j.Logger;
3233
import org.slf4j.LoggerFactory;
@@ -71,6 +72,33 @@ public void waitForServerOnline() {
7172
}
7273
}
7374

75+
/**
76+
* Datastructure to hold ReplicationServer Thread and ReplicationServer instance
77+
*/
78+
public static class ReplicationServerThread extends Thread {
79+
private final HReplicationServer replicationServer;
80+
81+
public ReplicationServerThread(final HReplicationServer r, final int index) {
82+
super(r, "ReplicationServer:" + index + ";" + r.getServerName().toShortString());
83+
this.replicationServer = r;
84+
}
85+
86+
/**
87+
* @return the replication server
88+
*/
89+
public HReplicationServer getReplicationServer() {
90+
return this.replicationServer;
91+
}
92+
93+
/**
94+
* Block until the replication server has come online, indicating it is ready
95+
* to be used.
96+
*/
97+
public void waitForServerOnline() {
98+
replicationServer.waitForServerOnline();
99+
}
100+
}
101+
74102
/**
75103
* Creates a {@link RegionServerThread}.
76104
* Call 'start' on the returned thread to make it run.
@@ -98,6 +126,24 @@ public static JVMClusterUtil.RegionServerThread createRegionServerThread(final C
98126
return new JVMClusterUtil.RegionServerThread(server, index);
99127
}
100128

129+
/**
130+
* Creates a {@link ReplicationServerThread}.
131+
* Call 'start' on the returned thread to make it run.
132+
* @param c Configuration to use.
133+
* @param index Used distinguishing the object returned.
134+
* @throws IOException
135+
* @return Replication server added.
136+
*/
137+
public static JVMClusterUtil.ReplicationServerThread createReplicationServerThread(
138+
final Configuration c, final int index) throws IOException {
139+
HReplicationServer server;
140+
try {
141+
server = new HReplicationServer(c);
142+
} catch (Exception e) {
143+
throw new IOException(e);
144+
}
145+
return new JVMClusterUtil.ReplicationServerThread(server, index);
146+
}
101147

102148
/**
103149
* Datastructure to hold Master Thread and Master instance
@@ -122,7 +168,7 @@ public HMaster getMaster() {
122168
* @param c Configuration to use.
123169
* @param hmc Class to create.
124170
* @param index Used distinguishing the object returned.
125-
* @throws IOException
171+
* @throws IOException exception
126172
* @return Master added.
127173
*/
128174
public static JVMClusterUtil.MasterThread createMasterThread(final Configuration c,
@@ -165,7 +211,8 @@ private static JVMClusterUtil.MasterThread findActiveMaster(
165211
* @return Address to use contacting primary master.
166212
*/
167213
public static String startup(final List<JVMClusterUtil.MasterThread> masters,
168-
final List<JVMClusterUtil.RegionServerThread> regionservers) throws IOException {
214+
final List<JVMClusterUtil.RegionServerThread> regionservers,
215+
final List<JVMClusterUtil.ReplicationServerThread> replicationServers) throws IOException {
169216
// Implementation note: This method relies on timed sleeps in a loop. It's not great, and
170217
// should probably be re-written to use actual synchronization objects, but it's ok for now
171218

@@ -193,6 +240,12 @@ public static String startup(final List<JVMClusterUtil.MasterThread> masters,
193240
}
194241
}
195242

243+
if (replicationServers != null) {
244+
for (JVMClusterUtil.ReplicationServerThread t: replicationServers) {
245+
t.start();
246+
}
247+
}
248+
196249
// Wait for an active master to be initialized (implies being master)
197250
// with this, when we return the cluster is complete
198251
final int initTimeout = configuration != null ? Integer.parseInt(

hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1115,8 +1115,8 @@ public MiniHBaseCluster startMiniHBaseCluster(StartMiniClusterOption option)
11151115
Configuration c = new Configuration(this.conf);
11161116
TraceUtil.initTracer(c);
11171117
this.hbaseCluster = new MiniHBaseCluster(c, option.getNumMasters(),
1118-
option.getNumAlwaysStandByMasters(), option.getNumRegionServers(), option.getRsPorts(),
1119-
option.getMasterClass(), option.getRsClass());
1118+
option.getNumAlwaysStandByMasters(), option.getNumRegionServers(), option.getRsPorts(),
1119+
option.getNumReplicationServers(), option.getMasterClass(), option.getRsClass());
11201120
// Populate the master address configuration from mini cluster configuration.
11211121
conf.set(HConstants.MASTER_ADDRS_KEY, MasterRegistry.getMasterAddr(c));
11221122
// Don't leave here till we've done a successful scan of the hbase:meta
@@ -1241,8 +1241,8 @@ public void restartHBaseCluster(StartMiniClusterOption option)
12411241
closeConnection();
12421242
this.hbaseCluster =
12431243
new MiniHBaseCluster(this.conf, option.getNumMasters(), option.getNumAlwaysStandByMasters(),
1244-
option.getNumRegionServers(), option.getRsPorts(), option.getMasterClass(),
1245-
option.getRsClass());
1244+
option.getNumRegionServers(), option.getRsPorts(), option.getNumReplicationServers(),
1245+
option.getMasterClass(), option.getRsClass());
12461246
// Don't leave here till we've done a successful scan of the hbase:meta
12471247
Connection conn = ConnectionFactory.createConnection(this.conf);
12481248
Table t = conn.getTable(TableName.META_TABLE_NAME);

hbase-server/src/test/java/org/apache/hadoop/hbase/MiniHBaseCluster.java

Lines changed: 59 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -32,11 +32,13 @@
3232
import org.apache.hadoop.hbase.regionserver.HRegion.FlushResult;
3333
import org.apache.hadoop.hbase.regionserver.HRegionServer;
3434
import org.apache.hadoop.hbase.regionserver.Region;
35+
import org.apache.hadoop.hbase.replication.HReplicationServer;
3536
import org.apache.hadoop.hbase.security.User;
3637
import org.apache.hadoop.hbase.test.MetricsAssertHelper;
3738
import org.apache.hadoop.hbase.util.JVMClusterUtil;
3839
import org.apache.hadoop.hbase.util.JVMClusterUtil.MasterThread;
3940
import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
41+
import org.apache.hadoop.hbase.util.JVMClusterUtil.ReplicationServerThread;
4042
import org.apache.hadoop.hbase.util.Threads;
4143
import org.apache.yetus.audience.InterfaceAudience;
4244
import org.slf4j.Logger;
@@ -86,31 +88,33 @@ public MiniHBaseCluster(Configuration conf, int numMasters, int numRegionServers
8688
* @param numRegionServers initial number of region servers to start.
8789
*/
8890
public MiniHBaseCluster(Configuration conf, int numMasters, int numRegionServers,
89-
Class<? extends HMaster> masterClass,
90-
Class<? extends MiniHBaseCluster.MiniHBaseClusterRegionServer> regionserverClass)
91+
Class<? extends HMaster> masterClass,
92+
Class<? extends MiniHBaseCluster.MiniHBaseClusterRegionServer> regionserverClass)
9193
throws IOException, InterruptedException {
92-
this(conf, numMasters, 0, numRegionServers, null, masterClass, regionserverClass);
94+
this(conf, numMasters, 0, numRegionServers, null, 0, masterClass, regionserverClass);
9395
}
9496

9597
/**
9698
* @param rsPorts Ports that RegionServer should use; pass ports if you want to test cluster
9799
* restart where for sure the regionservers come up on same address+port (but
98100
* just with different startcode); by default mini hbase clusters choose new
99101
* arbitrary ports on each cluster start.
102+
* @param numReplicationServers initial number of replication servers to start.
100103
* @throws IOException
101104
* @throws InterruptedException
102105
*/
103106
public MiniHBaseCluster(Configuration conf, int numMasters, int numAlwaysStandByMasters,
104-
int numRegionServers, List<Integer> rsPorts, Class<? extends HMaster> masterClass,
105-
Class<? extends MiniHBaseCluster.MiniHBaseClusterRegionServer> regionserverClass)
107+
int numRegionServers, List<Integer> rsPorts, int numReplicationServers,
108+
Class<? extends HMaster> masterClass,
109+
Class<? extends MiniHBaseCluster.MiniHBaseClusterRegionServer> regionserverClass)
106110
throws IOException, InterruptedException {
107111
super(conf);
108112

109113
// Hadoop 2
110114
CompatibilityFactory.getInstance(MetricsAssertHelper.class).init();
111115

112-
init(numMasters, numAlwaysStandByMasters, numRegionServers, rsPorts, masterClass,
113-
regionserverClass);
116+
init(numMasters, numAlwaysStandByMasters, numRegionServers, rsPorts, numReplicationServers,
117+
masterClass, regionserverClass);
114118
this.initialClusterStatus = getClusterMetrics();
115119
}
116120

@@ -227,7 +231,8 @@ public void run() {
227231
}
228232

229233
private void init(final int nMasterNodes, final int numAlwaysStandByMasters,
230-
final int nRegionNodes, List<Integer> rsPorts, Class<? extends HMaster> masterClass,
234+
final int nRegionNodes, List<Integer> rsPorts, int numReplicationServers,
235+
Class<? extends HMaster> masterClass,
231236
Class<? extends MiniHBaseCluster.MiniHBaseClusterRegionServer> regionserverClass)
232237
throws IOException, InterruptedException {
233238
try {
@@ -248,11 +253,17 @@ private void init(final int nMasterNodes, final int numAlwaysStandByMasters,
248253
if (rsPorts != null) {
249254
rsConf.setInt(HConstants.REGIONSERVER_PORT, rsPorts.get(i));
250255
}
251-
User user = HBaseTestingUtility.getDifferentUser(rsConf,
252-
".hfs."+index++);
256+
User user = HBaseTestingUtility.getDifferentUser(rsConf, ".hfs." + index++);
253257
hbaseCluster.addRegionServer(rsConf, i, user);
254258
}
255259

260+
// manually add the replication servers as other users
261+
for (int i = 0; i < numReplicationServers; i++) {
262+
Configuration rsConf = HBaseConfiguration.create(conf);
263+
User user = HBaseTestingUtility.getDifferentUser(rsConf, ".hfs." + index++);
264+
hbaseCluster.addReplicationServer(rsConf, i, user);
265+
}
266+
256267
hbaseCluster.startup();
257268
} catch (IOException e) {
258269
shutdown();
@@ -791,7 +802,7 @@ public List<JVMClusterUtil.RegionServerThread> getLiveRegionServerThreads() {
791802

792803
/**
793804
* Grab a numbered region server of your choice.
794-
* @param serverNumber
805+
* @param serverNumber region server number
795806
* @return region server
796807
*/
797808
public HRegionServer getRegionServer(int serverNumber) {
@@ -805,6 +816,43 @@ public HRegionServer getRegionServer(ServerName serverName) {
805816
.findFirst().orElse(null);
806817
}
807818

819+
/**
820+
* @return Number of live replication servers in the cluster currently.
821+
*/
822+
public int getNumLiveReplicationServers() {
823+
return this.hbaseCluster.getLiveReplicationServers().size();
824+
}
825+
826+
/**
827+
* @return List of replication server threads.
828+
*/
829+
public List<JVMClusterUtil.ReplicationServerThread> getReplicationServerThreads() {
830+
return this.hbaseCluster.getReplicationServers();
831+
}
832+
833+
/**
834+
* @return List of live replication server threads (skips the aborted and the killed)
835+
*/
836+
public List<JVMClusterUtil.ReplicationServerThread> getLiveReplicationServerThreads() {
837+
return this.hbaseCluster.getLiveReplicationServers();
838+
}
839+
840+
/**
841+
* Grab a numbered replication server of your choice.
842+
* @param serverNumber
843+
* @return replication server
844+
*/
845+
public HReplicationServer getReplicationServer(int serverNumber) {
846+
return hbaseCluster.getReplicationServer(serverNumber);
847+
}
848+
849+
public HReplicationServer getReplicationServer(ServerName serverName) {
850+
return hbaseCluster.getReplicationServers().stream()
851+
.map(ReplicationServerThread::getReplicationServer)
852+
.filter(r -> r.getServerName().equals(serverName))
853+
.findFirst().orElse(null);
854+
}
855+
808856
public List<HRegion> getRegions(byte[] tableName) {
809857
return getRegions(TableName.valueOf(tableName));
810858
}

0 commit comments

Comments
 (0)