Skip to content

Commit 28f025d

Browse files
committed
HBASE-24998 Introduce a ReplicationSourceOverallController interface and decouple ReplicationSourceManager and ReplicationSource (#2364)
Signed-off-by: meiyi <[email protected]>
1 parent 788772c commit 28f025d

File tree

11 files changed

+194
-133
lines changed

11 files changed

+194
-133
lines changed

hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -979,6 +979,8 @@ public enum OperationStatusCode {
979979
/*
980980
* cluster replication constants.
981981
*/
982+
public static final String REPLICATION_OFFLOAD_ENABLE_KEY = "hbase.replication.offload.enabled";
983+
public static final boolean REPLICATION_OFFLOAD_ENABLE_DEFAULT = false;
982984
public static final String
983985
REPLICATION_SOURCE_SERVICE_CLASSNAME = "hbase.replication.source.service";
984986
public static final String

hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationListener.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,5 +33,5 @@ public interface ReplicationListener {
3333
* A region server has been removed from the local cluster
3434
* @param regionServer the removed region server
3535
*/
36-
public void regionServerRemoved(String regionServer);
36+
void regionServerRemoved(String regionServer);
3737
}
Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package org.apache.hadoop.hbase.replication;
19+
20+
import java.util.concurrent.atomic.AtomicLong;
21+
22+
import org.apache.hadoop.hbase.replication.regionserver.MetricsReplicationGlobalSourceSource;
23+
import org.apache.hadoop.hbase.replication.regionserver.RecoveredReplicationSource;
24+
import org.apache.yetus.audience.InterfaceAudience;
25+
26+
/**
27+
* Used to control all replication sources inside one RegionServer or ReplicationServer.
28+
* Used by {@link org.apache.hadoop.hbase.replication.regionserver.ReplicationSource} or
29+
* {@link RecoveredReplicationSource}.
30+
*/
31+
@InterfaceAudience.Private
32+
public interface ReplicationSourceController {
33+
34+
/**
35+
* Returns the maximum size in bytes of edits held in memory which are pending replication
36+
* across all sources inside this RegionServer or ReplicationServer.
37+
*/
38+
long getTotalBufferLimit();
39+
40+
AtomicLong getTotalBufferUsed();
41+
42+
MetricsReplicationGlobalSourceSource getGlobalMetrics();
43+
44+
/**
45+
* Call this when the recovered replication source replicated all WALs.
46+
*/
47+
void finishRecoveredSource(RecoveredReplicationSource src);
48+
}

hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java

Lines changed: 8 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import org.apache.hadoop.hbase.ServerName;
3131
import org.apache.hadoop.hbase.replication.ReplicationPeer;
3232
import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
33+
import org.apache.hadoop.hbase.replication.ReplicationSourceController;
3334
import org.apache.hadoop.hbase.util.CommonFSUtils;
3435
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
3536
import org.apache.yetus.audience.InterfaceAudience;
@@ -45,18 +46,15 @@ public class RecoveredReplicationSource extends ReplicationSource {
4546

4647
private static final Logger LOG = LoggerFactory.getLogger(RecoveredReplicationSource.class);
4748

48-
private Path walDir;
49-
5049
private String actualPeerId;
5150

5251
@Override
53-
public void init(Configuration conf, FileSystem fs, Path walDir, ReplicationSourceManager manager,
54-
ReplicationQueueStorage queueStorage, ReplicationPeer replicationPeer, Server server,
55-
String peerClusterZnode, UUID clusterId, WALFileLengthProvider walFileLengthProvider,
56-
MetricsSource metrics) throws IOException {
57-
super.init(conf, fs, walDir, manager, queueStorage, replicationPeer, server, peerClusterZnode,
58-
clusterId, walFileLengthProvider, metrics);
59-
this.walDir = walDir;
52+
public void init(Configuration conf, FileSystem fs, Path walDir,
53+
ReplicationSourceController overallController, ReplicationQueueStorage queueStorage,
54+
ReplicationPeer replicationPeer, Server server, String peerClusterZnode, UUID clusterId,
55+
WALFileLengthProvider walFileLengthProvider, MetricsSource metrics) throws IOException {
56+
super.init(conf, fs, walDir, overallController, queueStorage, replicationPeer, server,
57+
peerClusterZnode, clusterId, walFileLengthProvider, metrics);
6058
this.actualPeerId = this.replicationQueueInfo.getPeerId();
6159
}
6260

@@ -149,7 +147,7 @@ private Path getReplSyncUpPath(Path path) throws IOException {
149147
void tryFinish() {
150148
if (workerThreads.isEmpty()) {
151149
this.getSourceMetrics().clear();
152-
manager.finishRecoveredSource(this);
150+
controller.finishRecoveredSource(this);
153151
}
154152
}
155153

hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java

Lines changed: 12 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,7 @@
6161
import org.apache.hadoop.hbase.replication.ReplicationPeer;
6262
import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
6363
import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
64+
import org.apache.hadoop.hbase.replication.ReplicationSourceController;
6465
import org.apache.hadoop.hbase.replication.ReplicationUtils;
6566
import org.apache.hadoop.hbase.replication.SystemTableWALEntryFilter;
6667
import org.apache.hadoop.hbase.replication.WALEntryFilter;
@@ -101,8 +102,9 @@ public class ReplicationSource implements ReplicationSourceInterface {
101102
protected Configuration conf;
102103
protected ReplicationQueueInfo replicationQueueInfo;
103104

104-
// The manager of all sources to which we ping back our progress
105-
ReplicationSourceManager manager;
105+
protected Path walDir;
106+
107+
protected ReplicationSourceController controller;
106108
// Should we stop everything?
107109
protected Server server;
108110
// How long should we sleep for each retry
@@ -187,23 +189,14 @@ public class ReplicationSource implements ReplicationSourceInterface {
187189
this.baseFilterOutWALEntries = Collections.unmodifiableList(baseFilterOutWALEntries);
188190
}
189191

190-
/**
191-
* Instantiation method used by region servers
192-
* @param conf configuration to use
193-
* @param fs file system to use
194-
* @param manager replication manager to ping to
195-
* @param server the server for this region server
196-
* @param queueId the id of our replication queue
197-
* @param clusterId unique UUID for the cluster
198-
* @param metrics metrics for replication source
199-
*/
200192
@Override
201-
public void init(Configuration conf, FileSystem fs, Path walDir, ReplicationSourceManager manager,
202-
ReplicationQueueStorage queueStorage, ReplicationPeer replicationPeer, Server server,
203-
String queueId, UUID clusterId, WALFileLengthProvider walFileLengthProvider,
204-
MetricsSource metrics) throws IOException {
193+
public void init(Configuration conf, FileSystem fs, Path walDir,
194+
ReplicationSourceController overallController, ReplicationQueueStorage queueStorage,
195+
ReplicationPeer replicationPeer, Server server, String queueId, UUID clusterId,
196+
WALFileLengthProvider walFileLengthProvider, MetricsSource metrics) throws IOException {
205197
this.server = server;
206198
this.conf = HBaseConfiguration.create(conf);
199+
this.walDir = walDir;
207200
this.waitOnEndpointSeconds =
208201
this.conf.getInt(WAIT_ON_ENDPOINT_SECONDS, DEFAULT_WAIT_ON_ENDPOINT_SECONDS);
209202
decorateConf();
@@ -214,7 +207,7 @@ public void init(Configuration conf, FileSystem fs, Path walDir, ReplicationSour
214207
this.queueSizePerGroup = this.conf.getInt("hbase.regionserver.maxlogs", 32);
215208
this.queueStorage = queueStorage;
216209
this.replicationPeer = replicationPeer;
217-
this.manager = manager;
210+
this.controller = overallController;
218211
this.fs = fs;
219212
this.metrics = metrics;
220213
this.clusterId = clusterId;
@@ -782,9 +775,9 @@ public void postShipEdits(List<Entry> entries, int batchSize) {
782775
throttler.addPushSize(batchSize);
783776
}
784777
totalReplicatedEdits.addAndGet(entries.size());
785-
long newBufferUsed = manager.getTotalBufferUsed().addAndGet(-batchSize);
778+
long newBufferUsed = controller.getTotalBufferUsed().addAndGet(-batchSize);
786779
// Record the new buffer usage
787-
this.manager.getGlobalMetrics().setWALReaderEditsBufferBytes(newBufferUsed);
780+
controller.getGlobalMetrics().setWALReaderEditsBufferBytes(newBufferUsed);
788781
}
789782

790783
@Override

hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java

Lines changed: 17 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
3333
import org.apache.hadoop.hbase.replication.ReplicationPeer;
3434
import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
35+
import org.apache.hadoop.hbase.replication.ReplicationSourceController;
3536
import org.apache.hadoop.hbase.wal.WAL.Entry;
3637
import org.apache.yetus.audience.InterfaceAudience;
3738

@@ -44,14 +45,22 @@ public interface ReplicationSourceInterface {
4445
/**
4546
* Initializer for the source
4647
*
47-
* @param conf the configuration to use
48-
* @param fs the file system to use
49-
* @param server the server for this region server
50-
*/
51-
void init(Configuration conf, FileSystem fs, Path walDir, ReplicationSourceManager manager,
52-
ReplicationQueueStorage queueStorage, ReplicationPeer replicationPeer, Server server,
53-
String queueId, UUID clusterId, WALFileLengthProvider walFileLengthProvider,
54-
MetricsSource metrics) throws IOException;
48+
* @param conf configuration to use
49+
* @param fs file system to use
50+
* @param walDir the directory where the WAL is located
51+
* @param overallController the overall controller of all replication sources
52+
* @param queueStorage the replication queue storage
53+
* @param replicationPeer the replication peer
54+
* @param server the server which start and run this replication source
55+
* @param queueId the id of our replication queue
56+
* @param clusterId unique UUID for the cluster
57+
* @param walFileLengthProvider used to get the WAL length
58+
* @param metrics metrics for this replication source
59+
*/
60+
void init(Configuration conf, FileSystem fs, Path walDir,
61+
ReplicationSourceController overallController, ReplicationQueueStorage queueStorage,
62+
ReplicationPeer replicationPeer, Server server, String queueId, UUID clusterId,
63+
WALFileLengthProvider walFileLengthProvider, MetricsSource metrics) throws IOException;
5564

5665
/**
5766
* Add a log to the list of logs to replicate

0 commit comments

Comments
 (0)