Skip to content

Commit 83dcae9

Browse files
ddupginfraio
authored andcommitted
HBASE-24982 Disassemble the method replicateWALEntry from AdminService to a new interface ReplicationServerService (#2360)
Signed-off-by: Wellington Chevreuil <[email protected]>
1 parent 1bcf389 commit 83dcae9

File tree

16 files changed

+290
-226
lines changed

16 files changed

+290
-226
lines changed

hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,7 @@
6666
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService;
6767
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos;
6868
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MasterService;
69+
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationServerProtos.ReplicationServerService;
6970

7071
/**
7172
* The implementation of AsyncConnection.
@@ -107,6 +108,8 @@ class AsyncConnectionImpl implements AsyncConnection {
107108

108109
private final ConcurrentMap<String, ClientService.Interface> rsStubs = new ConcurrentHashMap<>();
109110
private final ConcurrentMap<String, AdminService.Interface> adminSubs = new ConcurrentHashMap<>();
111+
private final ConcurrentMap<String, ReplicationServerService.Interface> replStubs =
112+
new ConcurrentHashMap<>();
110113

111114
private final AtomicReference<MasterService.Interface> masterStub = new AtomicReference<>();
112115

@@ -266,12 +269,25 @@ private AdminService.Interface createAdminServerStub(ServerName serverName) thro
266269
return AdminService.newStub(rpcClient.createRpcChannel(serverName, user, rpcTimeout));
267270
}
268271

272+
private ReplicationServerService.Interface createReplicationServerStub(ServerName serverName)
273+
throws IOException {
274+
return ReplicationServerService.newStub(
275+
rpcClient.createRpcChannel(serverName, user, rpcTimeout));
276+
}
277+
269278
AdminService.Interface getAdminStub(ServerName serverName) throws IOException {
270279
return ConcurrentMapUtils.computeIfAbsentEx(adminSubs,
271280
getStubKey(AdminService.getDescriptor().getName(), serverName, hostnameCanChange),
272281
() -> createAdminServerStub(serverName));
273282
}
274283

284+
ReplicationServerService.Interface getReplicationServerStub(ServerName serverName)
285+
throws IOException {
286+
return ConcurrentMapUtils.computeIfAbsentEx(replStubs,
287+
getStubKey(ReplicationServerService.Interface.class.getSimpleName(), serverName,
288+
hostnameCanChange), () -> createReplicationServerStub(serverName));
289+
}
290+
275291
CompletableFuture<MasterService.Interface> getMasterStub() {
276292
return ConnectionUtils.getOrFetch(masterStub, masterStubMakeFuture, false, () -> {
277293
CompletableFuture<MasterService.Interface> future = new CompletableFuture<>();
Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
syntax = "proto2";
19+
package hbase.pb;
20+
21+
option java_package = "org.apache.hadoop.hbase.shaded.protobuf.generated";
22+
option java_outer_classname = "ReplicationServerProtos";
23+
option java_generic_services = true;
24+
option java_generate_equals_and_hash = true;
25+
option optimize_for = SPEED;
26+
27+
import "server/region/Admin.proto";
28+
29+
service ReplicationServerService {
30+
rpc ReplicateWALEntry(ReplicateWALEntryRequest)
31+
returns(ReplicateWALEntryResponse);
32+
}

hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationUtils.java

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,9 @@
3030
import org.apache.hadoop.hbase.HConstants;
3131
import org.apache.hadoop.hbase.ServerName;
3232
import org.apache.hadoop.hbase.TableName;
33+
import org.apache.hadoop.hbase.client.AsyncAdmin;
34+
import org.apache.hadoop.hbase.client.AsyncConnection;
35+
import org.apache.hadoop.hbase.util.FutureUtils;
3336
import org.apache.yetus.audience.InterfaceAudience;
3437
import org.slf4j.Logger;
3538
import org.slf4j.LoggerFactory;
@@ -212,4 +215,20 @@ public static int getAdaptiveTimeout(final int initialValue, final int retries)
212215
}
213216
return initialValue * HConstants.RETRY_BACKOFF[ntries];
214217
}
218+
219+
/**
220+
* Check whether peer cluster supports replication offload.
221+
* @param peerConn connection for peer cluster
222+
* @return true if peer cluster version >= 3
223+
* @throws IOException exception
224+
*/
225+
public static boolean isPeerClusterSupportReplicationOffload(AsyncConnection peerConn)
226+
throws IOException {
227+
AsyncAdmin admin = peerConn.getAdmin();
228+
String version = FutureUtils.get(admin.getClusterMetrics()).getHBaseVersion();
229+
if (Integer.parseInt(version.split("\\.")[0]) >= 3) {
230+
return true;
231+
}
232+
return false;
233+
}
215234
}

hbase-server/src/main/java/org/apache/hadoop/hbase/client/AsyncClusterConnection.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,11 @@ public interface AsyncClusterConnection extends AsyncConnection {
4141
*/
4242
AsyncRegionServerAdmin getRegionServerAdmin(ServerName serverName);
4343

44+
/**
45+
* Get the admin service for the give replication server.
46+
*/
47+
AsyncReplicationServerAdmin getReplicationServerAdmin(ServerName serverName);
48+
4449
/**
4550
* Get the nonce generator for this connection.
4651
*/

hbase-server/src/main/java/org/apache/hadoop/hbase/client/AsyncClusterConnectionImpl.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,11 @@ public AsyncRegionServerAdmin getRegionServerAdmin(ServerName serverName) {
7070
return new AsyncRegionServerAdmin(serverName, this);
7171
}
7272

73+
@Override
74+
public AsyncReplicationServerAdmin getReplicationServerAdmin(ServerName serverName) {
75+
return new AsyncReplicationServerAdmin(serverName, this);
76+
}
77+
7378
@Override
7479
public CompletableFuture<FlushRegionResponse> flush(byte[] regionName,
7580
boolean writeFlushWALMarker) {
Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package org.apache.hadoop.hbase.client;
19+
20+
import java.io.IOException;
21+
import java.util.concurrent.CompletableFuture;
22+
23+
import org.apache.hadoop.hbase.CellScanner;
24+
import org.apache.hadoop.hbase.ServerName;
25+
import org.apache.hadoop.hbase.ipc.HBaseRpcController;
26+
import org.apache.yetus.audience.InterfaceAudience;
27+
28+
import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback;
29+
30+
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos;
31+
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationServerProtos.ReplicationServerService;
32+
33+
/**
34+
* A simple wrapper of the {@link ReplicationServerService} for a replication server.
35+
* <p/>
36+
* Notice that there is no retry, and this is intentional.
37+
*/
38+
@InterfaceAudience.Private
39+
public class AsyncReplicationServerAdmin {
40+
41+
private final ServerName server;
42+
43+
private final AsyncConnectionImpl conn;
44+
45+
AsyncReplicationServerAdmin(ServerName server, AsyncConnectionImpl conn) {
46+
this.server = server;
47+
this.conn = conn;
48+
}
49+
50+
@FunctionalInterface
51+
private interface RpcCall<RESP> {
52+
void call(ReplicationServerService.Interface stub, HBaseRpcController controller,
53+
RpcCallback<RESP> done);
54+
}
55+
56+
private <RESP> CompletableFuture<RESP> call(RpcCall<RESP> rpcCall, CellScanner cellScanner) {
57+
CompletableFuture<RESP> future = new CompletableFuture<>();
58+
HBaseRpcController controller = conn.rpcControllerFactory.newController(cellScanner);
59+
try {
60+
rpcCall.call(conn.getReplicationServerStub(server), controller, resp -> {
61+
if (controller.failed()) {
62+
future.completeExceptionally(controller.getFailed());
63+
} else {
64+
future.complete(resp);
65+
}
66+
});
67+
} catch (IOException e) {
68+
future.completeExceptionally(e);
69+
}
70+
return future;
71+
}
72+
73+
public CompletableFuture<AdminProtos.ReplicateWALEntryResponse> replicateWALEntry(
74+
AdminProtos.ReplicateWALEntryRequest request, CellScanner cellScanner, int timeout) {
75+
return call((stub, controller, done) -> {
76+
controller.setCallTimeout(timeout);
77+
stub.replicateWALEntry(controller, request, done);
78+
}, cellScanner);
79+
}
80+
}

hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ReplicationProtobufUtil.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import org.apache.hadoop.hbase.CellScanner;
2828
import org.apache.hadoop.hbase.PrivateCellUtil;
2929
import org.apache.hadoop.hbase.client.AsyncRegionServerAdmin;
30+
import org.apache.hadoop.hbase.client.AsyncReplicationServerAdmin;
3031
import org.apache.hadoop.hbase.io.SizedCellScanner;
3132
import org.apache.hadoop.hbase.regionserver.wal.WALCellCodec;
3233
import org.apache.hadoop.hbase.util.FutureUtils;
@@ -61,6 +62,23 @@ public static void replicateWALEntry(AsyncRegionServerAdmin admin, Entry[] entri
6162
FutureUtils.get(admin.replicateWALEntry(p.getFirst(), p.getSecond(), timeout));
6263
}
6364

65+
/**
66+
* A helper to replicate a list of WAL entries using replication server admin
67+
* @param admin the replication server admin
68+
* @param entries Array of WAL entries to be replicated
69+
* @param replicationClusterId Id which will uniquely identify source cluster FS client
70+
* configurations in the replication configuration directory
71+
* @param sourceBaseNamespaceDir Path to source cluster base namespace directory
72+
* @param sourceHFileArchiveDir Path to the source cluster hfile archive directory
73+
*/
74+
public static void replicateWALEntry(AsyncReplicationServerAdmin admin, Entry[] entries,
75+
String replicationClusterId, Path sourceBaseNamespaceDir, Path sourceHFileArchiveDir,
76+
int timeout) throws IOException {
77+
Pair<ReplicateWALEntryRequest, CellScanner> p = buildReplicateWALEntryRequest(entries, null,
78+
replicationClusterId, sourceBaseNamespaceDir, sourceHFileArchiveDir);
79+
FutureUtils.get(admin.replicateWALEntry(p.getFirst(), p.getSecond(), timeout));
80+
}
81+
6482
/**
6583
* Create a new ReplicateWALEntryRequest from a list of WAL entries
6684
* @param entries the WAL entries to be replicated

hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -257,6 +257,7 @@
257257
import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaSnapshotsResponse;
258258
import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaSnapshotsResponse.TableQuotaSnapshot;
259259
import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.RequestHeader;
260+
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationServerProtos.ReplicationServerService;
260261
import org.apache.hadoop.hbase.shaded.protobuf.generated.TooSlowLog.SlowLogPayload;
261262
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.BulkLoadDescriptor;
262263
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.CompactionDescriptor;
@@ -1463,8 +1464,11 @@ protected List<BlockingServiceAndInterface> getServices() {
14631464
}
14641465
if (admin) {
14651466
bssi.add(new BlockingServiceAndInterface(
1466-
AdminService.newReflectiveBlockingService(this),
1467-
AdminService.BlockingInterface.class));
1467+
AdminService.newReflectiveBlockingService(this),
1468+
AdminService.BlockingInterface.class));
1469+
bssi.add(new BlockingServiceAndInterface(
1470+
ReplicationServerService.newReflectiveBlockingService(this),
1471+
ReplicationServerService.BlockingInterface.class));
14681472
}
14691473
return new org.apache.hbase.thirdparty.com.google.common.collect.
14701474
ImmutableList.Builder<BlockingServiceAndInterface>().addAll(bssi).build();

hbase-server/src/main/java/org/apache/hadoop/hbase/replication/HBaseReplicationEndpoint.java

Lines changed: 50 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -27,11 +27,15 @@
2727
import java.util.concurrent.ThreadLocalRandom;
2828

2929
import org.apache.hadoop.conf.Configuration;
30+
import org.apache.hadoop.fs.Path;
3031
import org.apache.hadoop.hbase.HBaseConfiguration;
3132
import org.apache.hadoop.hbase.client.AsyncClusterConnection;
3233
import org.apache.hadoop.hbase.client.AsyncRegionServerAdmin;
34+
import org.apache.hadoop.hbase.client.AsyncReplicationServerAdmin;
3335
import org.apache.hadoop.hbase.client.ClusterConnectionFactory;
36+
import org.apache.hadoop.hbase.protobuf.ReplicationProtobufUtil;
3437
import org.apache.hadoop.hbase.security.User;
38+
import org.apache.hadoop.hbase.wal.WAL;
3539
import org.apache.hadoop.hbase.zookeeper.ZKListener;
3640
import org.apache.yetus.audience.InterfaceAudience;
3741
import org.apache.hadoop.hbase.Abortable;
@@ -280,7 +284,7 @@ protected synchronized SinkPeer getReplicationSink() throws IOException {
280284
}
281285
ServerName serverName =
282286
sinkServers.get(ThreadLocalRandom.current().nextInt(sinkServers.size()));
283-
return new SinkPeer(serverName, conn.getRegionServerAdmin(serverName));
287+
return createSinkPeer(serverName);
284288
}
285289

286290
/**
@@ -343,21 +347,60 @@ public synchronized void nodeChildrenChanged(String path) {
343347
/**
344348
* Wraps a replication region server sink to provide the ability to identify it.
345349
*/
346-
public static class SinkPeer {
350+
public static abstract class SinkPeer {
347351
private ServerName serverName;
348-
private AsyncRegionServerAdmin regionServer;
349352

350-
public SinkPeer(ServerName serverName, AsyncRegionServerAdmin regionServer) {
353+
public SinkPeer(ServerName serverName) {
351354
this.serverName = serverName;
352-
this.regionServer = regionServer;
353355
}
354356

355357
ServerName getServerName() {
356358
return serverName;
357359
}
358360

359-
public AsyncRegionServerAdmin getRegionServer() {
360-
return regionServer;
361+
public abstract void replicateWALEntry(WAL.Entry[] entries, String replicationClusterId,
362+
Path sourceBaseNamespaceDir, Path sourceHFileArchiveDir, int timeout) throws IOException;
363+
}
364+
365+
public static class RegionServerSinkPeer extends SinkPeer {
366+
367+
private AsyncRegionServerAdmin regionServer;
368+
369+
public RegionServerSinkPeer(ServerName serverName,
370+
AsyncRegionServerAdmin replicationServer) {
371+
super(serverName);
372+
this.regionServer = replicationServer;
373+
}
374+
375+
public void replicateWALEntry(WAL.Entry[] entries, String replicationClusterId,
376+
Path sourceBaseNamespaceDir, Path sourceHFileArchiveDir, int timeout) throws IOException {
377+
ReplicationProtobufUtil.replicateWALEntry(regionServer, entries, replicationClusterId,
378+
sourceBaseNamespaceDir, sourceHFileArchiveDir, timeout);
379+
}
380+
}
381+
382+
public static class ReplicationServerSinkPeer extends SinkPeer {
383+
384+
private AsyncReplicationServerAdmin replicationServer;
385+
386+
public ReplicationServerSinkPeer(ServerName serverName,
387+
AsyncReplicationServerAdmin replicationServer) {
388+
super(serverName);
389+
this.replicationServer = replicationServer;
390+
}
391+
392+
public void replicateWALEntry(WAL.Entry[] entries, String replicationClusterId,
393+
Path sourceBaseNamespaceDir, Path sourceHFileArchiveDir, int timeout) throws IOException {
394+
ReplicationProtobufUtil.replicateWALEntry(replicationServer, entries, replicationClusterId,
395+
sourceBaseNamespaceDir, sourceHFileArchiveDir, timeout);
396+
}
397+
}
398+
399+
private SinkPeer createSinkPeer(ServerName serverName) throws IOException {
400+
if (ReplicationUtils.isPeerClusterSupportReplicationOffload(conn)) {
401+
return new ReplicationServerSinkPeer(serverName, conn.getReplicationServerAdmin(serverName));
402+
} else {
403+
return new RegionServerSinkPeer(serverName, conn.getRegionServerAdmin(serverName));
361404
}
362405
}
363406
}

0 commit comments

Comments
 (0)