Skip to content

Commit 076e2f1

Browse files
committed
HBASE-24998 Introduce a ReplicationSourceOverallController interface and decouple ReplicationSourceManager and ReplicationSource
1 parent e053a00 commit 076e2f1

File tree

11 files changed

+198
-135
lines changed

11 files changed

+198
-135
lines changed

hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -979,6 +979,8 @@ public enum OperationStatusCode {
979979
/*
980980
* cluster replication constants.
981981
*/
982+
public static final String REPLICATION_OFFLOAD_ENABLE_KEY = "hbase.replication.offload.enabled";
983+
public static final boolean REPLICATION_OFFLOAD_ENABLE_DEFAULT = false;
982984
public static final String
983985
REPLICATION_SOURCE_SERVICE_CLASSNAME = "hbase.replication.source.service";
984986
public static final String

hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationListener.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,5 +33,5 @@ public interface ReplicationListener {
3333
* A region server has been removed from the local cluster
3434
* @param regionServer the removed region server
3535
*/
36-
public void regionServerRemoved(String regionServer);
36+
void regionServerRemoved(String regionServer);
3737
}
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
package org.apache.hadoop.hbase.replication;
2+
3+
import org.apache.hadoop.hbase.replication.regionserver.MetricsReplicationGlobalSourceSource;
4+
import org.apache.hadoop.hbase.replication.regionserver.RecoveredReplicationSource;
5+
import org.apache.yetus.audience.InterfaceAudience;
6+
7+
import java.util.concurrent.atomic.AtomicLong;
8+
9+
/**
10+
* Used to control all replication sources inside one RegionServer or ReplicationServer.
11+
* Used by {@link ReplicationSource} or {@link RecoveredReplicationSource}.
12+
*/
13+
@InterfaceAudience.Private
14+
public interface ReplicationSourceOverallController {
15+
16+
/**
17+
* Returns the maximum size in bytes of edits held in memory which are pending replication
18+
* across all sources inside this RegionServer or ReplicationServer.
19+
*/
20+
long getTotalBufferLimit();
21+
22+
AtomicLong getTotalBufferUsed();
23+
24+
MetricsReplicationGlobalSourceSource getGlobalMetrics();
25+
26+
/**
27+
* Called this when the recovered replication source replicated all WALs.
28+
* @param src
29+
*/
30+
void finishRecoveredSource(RecoveredReplicationSource src);
31+
}

hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java

Lines changed: 8 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import org.apache.hadoop.hbase.ServerName;
3131
import org.apache.hadoop.hbase.replication.ReplicationPeer;
3232
import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
33+
import org.apache.hadoop.hbase.replication.ReplicationSourceOverallController;
3334
import org.apache.hadoop.hbase.util.CommonFSUtils;
3435
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
3536
import org.apache.yetus.audience.InterfaceAudience;
@@ -45,18 +46,15 @@ public class RecoveredReplicationSource extends ReplicationSource {
4546

4647
private static final Logger LOG = LoggerFactory.getLogger(RecoveredReplicationSource.class);
4748

48-
private Path walDir;
49-
5049
private String actualPeerId;
5150

5251
@Override
53-
public void init(Configuration conf, FileSystem fs, Path walDir, ReplicationSourceManager manager,
54-
ReplicationQueueStorage queueStorage, ReplicationPeer replicationPeer, Server server,
55-
String peerClusterZnode, UUID clusterId, WALFileLengthProvider walFileLengthProvider,
56-
MetricsSource metrics) throws IOException {
57-
super.init(conf, fs, walDir, manager, queueStorage, replicationPeer, server, peerClusterZnode,
58-
clusterId, walFileLengthProvider, metrics);
59-
this.walDir = walDir;
52+
public void init(Configuration conf, FileSystem fs, Path walDir,
53+
ReplicationSourceOverallController overallController, ReplicationQueueStorage queueStorage,
54+
ReplicationPeer replicationPeer, Server server, String peerClusterZnode, UUID clusterId,
55+
WALFileLengthProvider walFileLengthProvider, MetricsSource metrics) throws IOException {
56+
super.init(conf, fs, walDir, overallController, queueStorage, replicationPeer, server,
57+
peerClusterZnode, clusterId, walFileLengthProvider, metrics);
6058
this.actualPeerId = this.replicationQueueInfo.getPeerId();
6159
}
6260

@@ -149,7 +147,7 @@ private Path getReplSyncUpPath(Path path) throws IOException {
149147
void tryFinish() {
150148
if (workerThreads.isEmpty()) {
151149
this.getSourceMetrics().clear();
152-
manager.finishRecoveredSource(this);
150+
overallController.finishRecoveredSource(this);
153151
}
154152
}
155153

hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java

Lines changed: 13 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@
5959
import org.apache.hadoop.hbase.replication.ReplicationPeer;
6060
import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
6161
import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
62+
import org.apache.hadoop.hbase.replication.ReplicationSourceOverallController;
6263
import org.apache.hadoop.hbase.replication.ReplicationUtils;
6364
import org.apache.hadoop.hbase.replication.SystemTableWALEntryFilter;
6465
import org.apache.hadoop.hbase.replication.WALEntryFilter;
@@ -99,8 +100,9 @@ public class ReplicationSource implements ReplicationSourceInterface {
99100
protected Configuration conf;
100101
protected ReplicationQueueInfo replicationQueueInfo;
101102

102-
// The manager of all sources to which we ping back our progress
103-
ReplicationSourceManager manager;
103+
protected Path walDir;
104+
105+
protected ReplicationSourceOverallController overallController;
104106
// Should we stop everything?
105107
protected Server server;
106108
// How long should we sleep for each retry
@@ -177,23 +179,14 @@ public class ReplicationSource implements ReplicationSourceInterface {
177179
this.baseFilterOutWALEntries = Collections.unmodifiableList(baseFilterOutWALEntries);
178180
}
179181

180-
/**
181-
* Instantiation method used by region servers
182-
* @param conf configuration to use
183-
* @param fs file system to use
184-
* @param manager replication manager to ping to
185-
* @param server the server for this region server
186-
* @param queueId the id of our replication queue
187-
* @param clusterId unique UUID for the cluster
188-
* @param metrics metrics for replication source
189-
*/
190182
@Override
191-
public void init(Configuration conf, FileSystem fs, Path walDir, ReplicationSourceManager manager,
192-
ReplicationQueueStorage queueStorage, ReplicationPeer replicationPeer, Server server,
193-
String queueId, UUID clusterId, WALFileLengthProvider walFileLengthProvider,
194-
MetricsSource metrics) throws IOException {
183+
public void init(Configuration conf, FileSystem fs, Path walDir,
184+
ReplicationSourceOverallController overallController, ReplicationQueueStorage queueStorage,
185+
ReplicationPeer replicationPeer, Server server, String queueId, UUID clusterId,
186+
WALFileLengthProvider walFileLengthProvider, MetricsSource metrics) throws IOException {
195187
this.server = server;
196188
this.conf = HBaseConfiguration.create(conf);
189+
this.walDir = walDir;
197190
this.waitOnEndpointSeconds =
198191
this.conf.getInt(WAIT_ON_ENDPOINT_SECONDS, DEFAULT_WAIT_ON_ENDPOINT_SECONDS);
199192
decorateConf();
@@ -204,7 +197,7 @@ public void init(Configuration conf, FileSystem fs, Path walDir, ReplicationSour
204197
this.queueSizePerGroup = this.conf.getInt("hbase.regionserver.maxlogs", 32);
205198
this.queueStorage = queueStorage;
206199
this.replicationPeer = replicationPeer;
207-
this.manager = manager;
200+
this.overallController = overallController;
208201
this.fs = fs;
209202
this.metrics = metrics;
210203
this.clusterId = clusterId;
@@ -217,6 +210,7 @@ public void init(Configuration conf, FileSystem fs, Path walDir, ReplicationSour
217210
currentBandwidth = getCurrentBandwidth();
218211
this.throttler = new ReplicationThrottler((double) currentBandwidth / 10.0);
219212
this.walFileLengthProvider = walFileLengthProvider;
213+
220214
LOG.info("queueId={}, ReplicationSource: {}, currentBandwidth={}", queueId,
221215
replicationPeer.getId(), this.currentBandwidth);
222216
}
@@ -734,9 +728,9 @@ public void postShipEdits(List<Entry> entries, int batchSize) {
734728
throttler.addPushSize(batchSize);
735729
}
736730
totalReplicatedEdits.addAndGet(entries.size());
737-
long newBufferUsed = manager.getTotalBufferUsed().addAndGet(-batchSize);
731+
long newBufferUsed = overallController.getTotalBufferUsed().addAndGet(-batchSize);
738732
// Record the new buffer usage
739-
this.manager.getGlobalMetrics().setWALReaderEditsBufferBytes(newBufferUsed);
733+
overallController.getGlobalMetrics().setWALReaderEditsBufferBytes(newBufferUsed);
740734
}
741735

742736
@Override

hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java

Lines changed: 17 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
3333
import org.apache.hadoop.hbase.replication.ReplicationPeer;
3434
import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
35+
import org.apache.hadoop.hbase.replication.ReplicationSourceOverallController;
3536
import org.apache.hadoop.hbase.wal.WAL.Entry;
3637
import org.apache.yetus.audience.InterfaceAudience;
3738

@@ -44,14 +45,22 @@ public interface ReplicationSourceInterface {
4445
/**
4546
* Initializer for the source
4647
*
47-
* @param conf the configuration to use
48-
* @param fs the file system to use
49-
* @param server the server for this region server
50-
*/
51-
void init(Configuration conf, FileSystem fs, Path walDir, ReplicationSourceManager manager,
52-
ReplicationQueueStorage queueStorage, ReplicationPeer replicationPeer, Server server,
53-
String queueId, UUID clusterId, WALFileLengthProvider walFileLengthProvider,
54-
MetricsSource metrics) throws IOException;
48+
* @param conf configuration to use
49+
* @param fs file system to use
50+
* @param walDir the directory where the WAL is located
51+
* @param overallController the overall controller of all replication sources
52+
* @param queueStorage the replication queue storage
53+
* @param replicationPeer the replication peer
54+
* @param server the server which start and run this replication source
55+
* @param queueId the id of our replication queue
56+
* @param clusterId unique UUID for the cluster
57+
* @param walFileLengthProvider used to get the WAL length
58+
* @param metrics metrics for this replication source
59+
*/
60+
void init(Configuration conf, FileSystem fs, Path walDir,
61+
ReplicationSourceOverallController overallController, ReplicationQueueStorage queueStorage,
62+
ReplicationPeer replicationPeer, Server server, String queueId, UUID clusterId,
63+
WALFileLengthProvider walFileLengthProvider, MetricsSource metrics) throws IOException;
5564

5665
/**
5766
* Add a log to the list of logs to replicate

0 commit comments

Comments
 (0)