Skip to content

Commit 18ae733

Browse files
authored
HBASE-27806 Support dynamic reinitializing replication peer storage (#5195)
Signed-off-by: Liangjun He <[email protected]>
1 parent ced4ff1 commit 18ae733

File tree

8 files changed

+204
-26
lines changed

8 files changed

+204
-26
lines changed

hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerImpl.java

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,13 +23,14 @@
2323
import java.util.Set;
2424
import org.apache.hadoop.conf.Configuration;
2525
import org.apache.hadoop.hbase.TableName;
26+
import org.apache.hadoop.hbase.conf.ConfigurationObserver;
2627
import org.apache.hadoop.hbase.util.Pair;
2728
import org.apache.yetus.audience.InterfaceAudience;
2829

2930
@InterfaceAudience.Private
30-
public class ReplicationPeerImpl implements ReplicationPeer {
31+
public class ReplicationPeerImpl implements ReplicationPeer, ConfigurationObserver {
3132

32-
private final Configuration conf;
33+
private volatile Configuration conf;
3334

3435
private final String id;
3536

@@ -151,4 +152,9 @@ public long getPeerBandwidth() {
151152
public void registerPeerConfigListener(ReplicationPeerConfigListener listener) {
152153
this.peerConfigListeners.add(listener);
153154
}
155+
156+
@Override
157+
public void onConfigurationChange(Configuration conf) {
158+
this.conf = conf;
159+
}
154160
}

hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeers.java

Lines changed: 30 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -24,25 +24,38 @@
2424
import java.util.concurrent.ConcurrentMap;
2525
import org.apache.hadoop.conf.Configuration;
2626
import org.apache.hadoop.fs.FileSystem;
27+
import org.apache.hadoop.hbase.conf.ConfigurationObserver;
2728
import org.apache.hadoop.hbase.replication.ReplicationPeer.PeerState;
2829
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
2930
import org.apache.yetus.audience.InterfaceAudience;
31+
import org.slf4j.Logger;
32+
import org.slf4j.LoggerFactory;
3033

3134
/**
3235
* This provides an class for maintaining a set of peer clusters. These peers are remote slave
3336
* clusters that data is replicated to.
37+
* <p>
38+
* We implement {@link ConfigurationObserver} mainly for recreating the
39+
* {@link ReplicationPeerStorage}, so we can change the {@link ReplicationPeerStorage} without
40+
* restarting the region server.
3441
*/
3542
@InterfaceAudience.Private
36-
public class ReplicationPeers {
43+
public class ReplicationPeers implements ConfigurationObserver {
3744

38-
private final Configuration conf;
45+
private static final Logger LOG = LoggerFactory.getLogger(ReplicationPeers.class);
46+
47+
private volatile Configuration conf;
3948

4049
// Map of peer clusters keyed by their id
4150
private final ConcurrentMap<String, ReplicationPeerImpl> peerCache;
42-
private final ReplicationPeerStorage peerStorage;
51+
private final FileSystem fs;
52+
private final ZKWatcher zookeeper;
53+
private volatile ReplicationPeerStorage peerStorage;
4354

4455
ReplicationPeers(FileSystem fs, ZKWatcher zookeeper, Configuration conf) {
4556
this.conf = conf;
57+
this.fs = fs;
58+
this.zookeeper = zookeeper;
4659
this.peerCache = new ConcurrentHashMap<>();
4760
this.peerStorage = ReplicationStorageFactory.getReplicationPeerStorage(fs, zookeeper, conf);
4861
}
@@ -145,4 +158,18 @@ private ReplicationPeerImpl createPeer(String peerId) throws ReplicationExceptio
145158
return new ReplicationPeerImpl(ReplicationUtils.getPeerClusterConfiguration(peerConfig, conf),
146159
peerId, peerConfig, enabled, syncReplicationState, newSyncReplicationState);
147160
}
161+
162+
@Override
163+
public void onConfigurationChange(Configuration conf) {
164+
this.conf = conf;
165+
this.peerStorage = ReplicationStorageFactory.getReplicationPeerStorage(fs, zookeeper, conf);
166+
for (ReplicationPeerImpl peer : peerCache.values()) {
167+
try {
168+
peer.onConfigurationChange(
169+
ReplicationUtils.getPeerClusterConfiguration(peer.getPeerConfig(), conf));
170+
} catch (ReplicationException e) {
171+
LOG.warn("failed to reload configuration for peer {}", peer.getId(), e);
172+
}
173+
}
174+
}
148175
}

hbase-server/src/main/java/org/apache/hadoop/hbase/HBaseServerBase.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -605,6 +605,12 @@ public MetaRegionLocationCache getMetaRegionLocationCache() {
605605
return this.metaRegionLocationCache;
606606
}
607607

608+
@RestrictedApi(explanation = "Should only be called in tests", link = "",
609+
allowedOnPath = ".*/src/test/.*")
610+
public ConfigurationManager getConfigurationManager() {
611+
return configurationManager;
612+
}
613+
608614
/**
609615
* Reload the configuration from disk.
610616
*/

hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -107,7 +107,6 @@
107107
import org.apache.hadoop.hbase.client.TableDescriptor;
108108
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
109109
import org.apache.hadoop.hbase.client.TableState;
110-
import org.apache.hadoop.hbase.conf.ConfigurationManager;
111110
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
112111
import org.apache.hadoop.hbase.exceptions.DeserializationException;
113112
import org.apache.hadoop.hbase.exceptions.MasterStoppedException;
@@ -791,6 +790,7 @@ private void initializeZKBasedSystemTrackers()
791790

792791
this.replicationPeerManager =
793792
ReplicationPeerManager.create(fileSystemManager.getFileSystem(), zooKeeper, conf, clusterId);
793+
this.configurationManager.registerObserver(replicationPeerManager);
794794
this.replicationPeerModificationStateStore =
795795
new ReplicationPeerModificationStateStore(masterRegion);
796796

@@ -4293,12 +4293,6 @@ static void setDisableBalancerChoreForTest(boolean disable) {
42934293
disableBalancerChoreForTest = disable;
42944294
}
42954295

4296-
@RestrictedApi(explanation = "Should only be called in tests", link = "",
4297-
allowedOnPath = ".*/src/test/.*")
4298-
public ConfigurationManager getConfigurationManager() {
4299-
return configurationManager;
4300-
}
4301-
43024296
private void setQuotasObserver(Configuration conf) {
43034297
// Add the Observer to delete quotas on table deletion before starting all CPs by
43044298
// default with quota support, avoiding if user specifically asks to not load this Observer.

hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java

Lines changed: 24 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@
4141
import org.apache.hadoop.hbase.ServerName;
4242
import org.apache.hadoop.hbase.TableName;
4343
import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil;
44+
import org.apache.hadoop.hbase.conf.ConfigurationObserver;
4445
import org.apache.hadoop.hbase.replication.BaseReplicationEndpoint;
4546
import org.apache.hadoop.hbase.replication.HBaseReplicationEndpoint;
4647
import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
@@ -69,13 +70,16 @@
6970
* Manages and performs all replication admin operations.
7071
* <p>
7172
* Used to add/remove a replication peer.
73+
* <p>
74+
* Implement {@link ConfigurationObserver} mainly for recreating {@link ReplicationPeerStorage}, for
75+
* supporting migrating across different replication peer storages without restarting master.
7276
*/
7377
@InterfaceAudience.Private
74-
public class ReplicationPeerManager {
78+
public class ReplicationPeerManager implements ConfigurationObserver {
7579

7680
private static final Logger LOG = LoggerFactory.getLogger(ReplicationPeerManager.class);
7781

78-
private final ReplicationPeerStorage peerStorage;
82+
private volatile ReplicationPeerStorage peerStorage;
7983

8084
private final ReplicationQueueStorage queueStorage;
8185

@@ -94,10 +98,18 @@ public class ReplicationPeerManager {
9498

9599
private final String clusterId;
96100

97-
private final Configuration conf;
101+
private volatile Configuration conf;
102+
103+
// for dynamic recreating ReplicationPeerStorage.
104+
private final FileSystem fs;
105+
106+
private final ZKWatcher zk;
98107

99-
ReplicationPeerManager(ReplicationPeerStorage peerStorage, ReplicationQueueStorage queueStorage,
100-
ConcurrentMap<String, ReplicationPeerDescription> peers, Configuration conf, String clusterId) {
108+
ReplicationPeerManager(FileSystem fs, ZKWatcher zk, ReplicationPeerStorage peerStorage,
109+
ReplicationQueueStorage queueStorage, ConcurrentMap<String, ReplicationPeerDescription> peers,
110+
Configuration conf, String clusterId) {
111+
this.fs = fs;
112+
this.zk = zk;
101113
this.peerStorage = peerStorage;
102114
this.queueStorage = queueStorage;
103115
this.peers = peers;
@@ -582,7 +594,7 @@ public static ReplicationPeerManager create(FileSystem fs, ZKWatcher zk, Configu
582594
SyncReplicationState state = peerStorage.getPeerSyncReplicationState(peerId);
583595
peers.put(peerId, new ReplicationPeerDescription(peerId, enabled, peerConfig, state));
584596
}
585-
return new ReplicationPeerManager(peerStorage,
597+
return new ReplicationPeerManager(fs, zk, peerStorage,
586598
ReplicationStorageFactory.getReplicationQueueStorage(zk, conf), peers, conf, clusterId);
587599
}
588600

@@ -604,4 +616,10 @@ public boolean tryAcquireSyncReplicationPeerLock() {
604616
public void releaseSyncReplicationPeerLock() {
605617
syncReplicationPeerLock.release();
606618
}
619+
620+
@Override
621+
public void onConfigurationChange(Configuration conf) {
622+
this.conf = conf;
623+
this.peerStorage = ReplicationStorageFactory.getReplicationPeerStorage(fs, zk, conf);
624+
}
607625
}

hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -99,7 +99,7 @@
9999
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
100100
import org.apache.hadoop.hbase.client.locking.EntityLock;
101101
import org.apache.hadoop.hbase.client.locking.LockServiceClient;
102-
import org.apache.hadoop.hbase.conf.ConfigurationManager;
102+
import org.apache.hadoop.hbase.conf.ConfigurationObserver;
103103
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
104104
import org.apache.hadoop.hbase.exceptions.RegionMovedException;
105105
import org.apache.hadoop.hbase.exceptions.RegionOpeningException;
@@ -2065,6 +2065,14 @@ private void initializeThreads() {
20652065
}
20662066

20672067
private void registerConfigurationObservers() {
2068+
// Register Replication if possible, as now we support recreating replication peer storage, for
2069+
// migrating across different replication peer storages online
2070+
if (replicationSourceHandler instanceof ConfigurationObserver) {
2071+
configurationManager.registerObserver((ConfigurationObserver) replicationSourceHandler);
2072+
}
2073+
if (!sameReplicationSourceAndSink && replicationSinkHandler instanceof ConfigurationObserver) {
2074+
configurationManager.registerObserver((ConfigurationObserver) replicationSinkHandler);
2075+
}
20682076
// Registering the compactSplitThread object with the ConfigurationManager.
20692077
configurationManager.registerObserver(this.compactSplitThread);
20702078
configurationManager.registerObserver(this.rpcServices);
@@ -3315,11 +3323,6 @@ public Optional<MobFileCache> getMobFileCache() {
33153323
return Optional.ofNullable(this.mobFileCache);
33163324
}
33173325

3318-
/** Returns : Returns the ConfigurationManager object for testing purposes. */
3319-
ConfigurationManager getConfigurationManager() {
3320-
return configurationManager;
3321-
}
3322-
33233326
CacheEvictionStats clearRegionBlockCache(Region region) {
33243327
long evictedBlocks = 0;
33253328

hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java

Lines changed: 22 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,8 @@
3131
import org.apache.hadoop.hbase.Server;
3232
import org.apache.hadoop.hbase.Stoppable;
3333
import org.apache.hadoop.hbase.TableName;
34+
import org.apache.hadoop.hbase.conf.ConfigurationManager;
35+
import org.apache.hadoop.hbase.conf.PropagatingConfigurationObserver;
3436
import org.apache.hadoop.hbase.regionserver.ReplicationSourceService;
3537
import org.apache.hadoop.hbase.replication.ReplicationFactory;
3638
import org.apache.hadoop.hbase.replication.ReplicationPeers;
@@ -50,15 +52,18 @@
5052

5153
/**
5254
* Gateway to Replication. Used by {@link org.apache.hadoop.hbase.regionserver.HRegionServer}.
55+
* <p>
56+
* Implement {@link PropagatingConfigurationObserver} mainly for registering
57+
* {@link ReplicationPeers}, so we can recreating the replication peer storage.
5358
*/
5459
@InterfaceAudience.Private
55-
public class Replication implements ReplicationSourceService {
60+
public class Replication implements ReplicationSourceService, PropagatingConfigurationObserver {
5661
private static final Logger LOG = LoggerFactory.getLogger(Replication.class);
5762
private boolean isReplicationForBulkLoadDataEnabled;
5863
private ReplicationSourceManager replicationManager;
5964
private ReplicationQueueStorage queueStorage;
6065
private ReplicationPeers replicationPeers;
61-
private Configuration conf;
66+
private volatile Configuration conf;
6267
private SyncReplicationPeerInfoProvider syncReplicationPeerInfoProvider;
6368
// Hosting server
6469
private Server server;
@@ -229,4 +234,19 @@ public SyncReplicationPeerInfoProvider getSyncReplicationPeerInfoProvider() {
229234
public ReplicationPeers getReplicationPeers() {
230235
return replicationPeers;
231236
}
237+
238+
@Override
239+
public void onConfigurationChange(Configuration conf) {
240+
this.conf = conf;
241+
}
242+
243+
@Override
244+
public void registerChildren(ConfigurationManager manager) {
245+
manager.registerObserver(replicationPeers);
246+
}
247+
248+
@Override
249+
public void deregisterChildren(ConfigurationManager manager) {
250+
manager.deregisterObserver(replicationPeers);
251+
}
232252
}
Lines changed: 104 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,104 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package org.apache.hadoop.hbase.replication;
19+
20+
import static org.hamcrest.MatcherAssert.assertThat;
21+
import static org.hamcrest.Matchers.empty;
22+
import static org.junit.Assert.assertEquals;
23+
import static org.junit.Assert.assertNotNull;
24+
25+
import java.io.IOException;
26+
import org.apache.hadoop.conf.Configuration;
27+
import org.apache.hadoop.hbase.HBaseClassTestRule;
28+
import org.apache.hadoop.hbase.HBaseTestingUtil;
29+
import org.apache.hadoop.hbase.client.Admin;
30+
import org.apache.hadoop.hbase.testclassification.LargeTests;
31+
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
32+
import org.apache.hadoop.hbase.util.JVMClusterUtil.MasterThread;
33+
import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
34+
import org.apache.hadoop.util.ToolRunner;
35+
import org.junit.AfterClass;
36+
import org.junit.BeforeClass;
37+
import org.junit.ClassRule;
38+
import org.junit.Test;
39+
import org.junit.experimental.categories.Category;
40+
41+
@Category({ ReplicationTests.class, LargeTests.class })
42+
public class TestMigrateRepliationPeerStorageOnline {
43+
44+
@ClassRule
45+
public static final HBaseClassTestRule CLASS_RULE =
46+
HBaseClassTestRule.forClass(TestMigrateRepliationPeerStorageOnline.class);
47+
48+
private static final HBaseTestingUtil UTIL = new HBaseTestingUtil();
49+
50+
@BeforeClass
51+
public static void setUp() throws Exception {
52+
// use zookeeper first, and then migrate to filesystem
53+
UTIL.getConfiguration().set(ReplicationStorageFactory.REPLICATION_PEER_STORAGE_IMPL,
54+
ReplicationPeerStorageType.ZOOKEEPER.name());
55+
UTIL.startMiniCluster(1);
56+
}
57+
58+
@AfterClass
59+
public static void tearDown() throws IOException {
60+
UTIL.shutdownMiniCluster();
61+
}
62+
63+
@Test
64+
public void testMigrate() throws Exception {
65+
Admin admin = UTIL.getAdmin();
66+
ReplicationPeerConfig rpc =
67+
ReplicationPeerConfig.newBuilder().setClusterKey(UTIL.getClusterKey() + "-test")
68+
.setReplicationEndpointImpl(DummyReplicationEndpoint.class.getName()).build();
69+
admin.addReplicationPeer("1", rpc);
70+
71+
// disable peer modification
72+
admin.replicationPeerModificationSwitch(false, true);
73+
74+
// migrate replication peer data
75+
Configuration conf = new Configuration(UTIL.getConfiguration());
76+
assertEquals(0, ToolRunner.run(conf, new CopyReplicationPeers(conf),
77+
new String[] { "zookeeper", "filesystem" }));
78+
conf.set(ReplicationStorageFactory.REPLICATION_PEER_STORAGE_IMPL,
79+
ReplicationPeerStorageType.FILESYSTEM.name());
80+
// confirm that we have copied the data
81+
ReplicationPeerStorage fsPeerStorage = ReplicationStorageFactory
82+
.getReplicationPeerStorage(UTIL.getTestFileSystem(), UTIL.getZooKeeperWatcher(), conf);
83+
assertNotNull(fsPeerStorage.getPeerConfig("1"));
84+
85+
for (MasterThread mt : UTIL.getMiniHBaseCluster().getMasterThreads()) {
86+
Configuration newConf = new Configuration(mt.getMaster().getConfiguration());
87+
newConf.set(ReplicationStorageFactory.REPLICATION_PEER_STORAGE_IMPL,
88+
ReplicationPeerStorageType.FILESYSTEM.name());
89+
mt.getMaster().getConfigurationManager().notifyAllObservers(newConf);
90+
}
91+
for (RegionServerThread rt : UTIL.getMiniHBaseCluster().getRegionServerThreads()) {
92+
Configuration newConf = new Configuration(rt.getRegionServer().getConfiguration());
93+
newConf.set(ReplicationStorageFactory.REPLICATION_PEER_STORAGE_IMPL,
94+
ReplicationPeerStorageType.FILESYSTEM.name());
95+
rt.getRegionServer().getConfigurationManager().notifyAllObservers(newConf);
96+
}
97+
98+
admin.replicationPeerModificationSwitch(true);
99+
admin.removeReplicationPeer("1");
100+
101+
// confirm that we will operation on the new peer storage
102+
assertThat(fsPeerStorage.listPeerIds(), empty());
103+
}
104+
}

0 commit comments

Comments
 (0)