
Commit c302a9e

ddupginfraio authored and committed
HBASE-24982 Disassemble the method replicateWALEntry from AdminService to a new interface ReplicationServerService (#2360)
Signed-off-by: Wellington Chevreuil <[email protected]>
1 parent 2e95e75 commit c302a9e

File tree

15 files changed: +294 −230 lines


hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java

Lines changed: 16 additions & 0 deletions
@@ -66,6 +66,7 @@
 import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MasterService;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationServerProtos.ReplicationServerService;
 
 /**
  * The implementation of AsyncConnection.
@@ -107,6 +108,8 @@ class AsyncConnectionImpl implements AsyncConnection {
 
   private final ConcurrentMap<String, ClientService.Interface> rsStubs = new ConcurrentHashMap<>();
   private final ConcurrentMap<String, AdminService.Interface> adminSubs = new ConcurrentHashMap<>();
+  private final ConcurrentMap<String, ReplicationServerService.Interface> replStubs =
+    new ConcurrentHashMap<>();
 
   private final AtomicReference<MasterService.Interface> masterStub = new AtomicReference<>();
 
@@ -266,12 +269,25 @@ private AdminService.Interface createAdminServerStub(ServerName serverName) thro
     return AdminService.newStub(rpcClient.createRpcChannel(serverName, user, rpcTimeout));
   }
 
+  private ReplicationServerService.Interface createReplicationServerStub(ServerName serverName)
+      throws IOException {
+    return ReplicationServerService.newStub(
+      rpcClient.createRpcChannel(serverName, user, rpcTimeout));
+  }
+
   AdminService.Interface getAdminStub(ServerName serverName) throws IOException {
     return ConcurrentMapUtils.computeIfAbsentEx(adminSubs,
       getStubKey(AdminService.Interface.class.getSimpleName(), serverName, hostnameCanChange),
       () -> createAdminServerStub(serverName));
   }
 
+  ReplicationServerService.Interface getReplicationServerStub(ServerName serverName)
+      throws IOException {
+    return ConcurrentMapUtils.computeIfAbsentEx(replStubs,
+      getStubKey(ReplicationServerService.Interface.class.getSimpleName(), serverName,
+        hostnameCanChange), () -> createReplicationServerStub(serverName));
+  }
+
   CompletableFuture<MasterService.Interface> getMasterStub() {
     return ConnectionUtils.getOrFetch(masterStub, masterStubMakeFuture, false, () -> {
       CompletableFuture<MasterService.Interface> future = new CompletableFuture<>();
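The new ReplicationServerService stub is cached per server with the same key scheme as the existing AdminService stub. Below is a standalone sketch of that caching pattern, with plain ConcurrentHashMap.computeIfAbsent standing in for HBase's ConcurrentMapUtils.computeIfAbsentEx; the class and method names here are illustrative only, not part of the commit.

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Function;

// Sketch of the per-server stub cache used above: one stub per (service, server) key,
// created lazily on first use and reused for later RPCs to the same server.
final class StubCacheSketch<S> {
  private final ConcurrentMap<String, S> stubs = new ConcurrentHashMap<>();
  private final Function<String, S> factory; // builds a stub for a "Service@host:port" key

  StubCacheSketch(Function<String, S> factory) {
    this.factory = factory;
  }

  S get(String serviceName, String hostAndPort) {
    return stubs.computeIfAbsent(serviceName + "@" + hostAndPort, factory);
  }
}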
Lines changed: 32 additions & 0 deletions
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+syntax = "proto2";
+package hbase.pb;
+
+option java_package = "org.apache.hadoop.hbase.shaded.protobuf.generated";
+option java_outer_classname = "ReplicationServerProtos";
+option java_generic_services = true;
+option java_generate_equals_and_hash = true;
+option optimize_for = SPEED;
+
+import "server/region/Admin.proto";
+
+service ReplicationServerService {
+  rpc ReplicateWALEntry(ReplicateWALEntryRequest)
+    returns(ReplicateWALEntryResponse);
+}

hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationUtils.java

Lines changed: 19 additions & 0 deletions
@@ -30,6 +30,9 @@
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.AsyncAdmin;
+import org.apache.hadoop.hbase.client.AsyncConnection;
+import org.apache.hadoop.hbase.util.FutureUtils;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -212,4 +215,20 @@ public static int getAdaptiveTimeout(final int initialValue, final int retries)
     }
     return initialValue * HConstants.RETRY_BACKOFF[ntries];
   }
+
+  /**
+   * Check whether peer cluster supports replication offload.
+   * @param peerConn connection for peer cluster
+   * @return true if peer cluster version >= 3
+   * @throws IOException exception
+   */
+  public static boolean isPeerClusterSupportReplicationOffload(AsyncConnection peerConn)
+      throws IOException {
+    AsyncAdmin admin = peerConn.getAdmin();
+    String version = FutureUtils.get(admin.getClusterMetrics()).getHBaseVersion();
+    if (Integer.parseInt(version.split("\\.")[0]) >= 3) {
+      return true;
+    }
+    return false;
+  }
 }
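The check above only compares the major version of the peer cluster. Here is a minimal sketch of the same parse in isolation; the helper class name and sample version strings are illustrative, not from the commit.

// Sketch: extract the major version the same way isPeerClusterSupportReplicationOffload does,
// assuming version strings such as "2.4.18" or "3.0.0-alpha-1".
final class MajorVersionSketch {
  static boolean supportsReplicationOffload(String hbaseVersion) {
    // "3.0.0-alpha-1".split("\\.")[0] -> "3"
    return Integer.parseInt(hbaseVersion.split("\\.")[0]) >= 3;
  }

  public static void main(String[] args) {
    System.out.println(supportsReplicationOffload("2.4.18"));        // false
    System.out.println(supportsReplicationOffload("3.0.0-alpha-1")); // true
  }
}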

hbase-server/src/main/java/org/apache/hadoop/hbase/client/AsyncClusterConnection.java

Lines changed: 5 additions & 0 deletions
@@ -41,6 +41,11 @@ public interface AsyncClusterConnection extends AsyncConnection {
    */
   AsyncRegionServerAdmin getRegionServerAdmin(ServerName serverName);
 
+  /**
+   * Get the admin service for the given replication server.
+   */
+  AsyncReplicationServerAdmin getReplicationServerAdmin(ServerName serverName);
+
   /**
    * Get the nonce generator for this connection.
    */

hbase-server/src/main/java/org/apache/hadoop/hbase/client/AsyncClusterConnectionImpl.java

Lines changed: 5 additions & 0 deletions
@@ -70,6 +70,11 @@ public AsyncRegionServerAdmin getRegionServerAdmin(ServerName serverName) {
     return new AsyncRegionServerAdmin(serverName, this);
   }
 
+  @Override
+  public AsyncReplicationServerAdmin getReplicationServerAdmin(ServerName serverName) {
+    return new AsyncReplicationServerAdmin(serverName, this);
+  }
+
   @Override
   public CompletableFuture<FlushRegionResponse> flush(byte[] regionName,
       boolean writeFlushWALMarker) {
Lines changed: 80 additions & 0 deletions
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import java.io.IOException;
+import java.util.concurrent.CompletableFuture;
+
+import org.apache.hadoop.hbase.CellScanner;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.ipc.HBaseRpcController;
+import org.apache.yetus.audience.InterfaceAudience;
+
+import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback;
+
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationServerProtos.ReplicationServerService;
+
+/**
+ * A simple wrapper of the {@link ReplicationServerService} for a replication server.
+ * <p/>
+ * Notice that there is no retry, and this is intentional.
+ */
+@InterfaceAudience.Private
+public class AsyncReplicationServerAdmin {
+
+  private final ServerName server;
+
+  private final AsyncConnectionImpl conn;
+
+  AsyncReplicationServerAdmin(ServerName server, AsyncConnectionImpl conn) {
+    this.server = server;
+    this.conn = conn;
+  }
+
+  @FunctionalInterface
+  private interface RpcCall<RESP> {
+    void call(ReplicationServerService.Interface stub, HBaseRpcController controller,
+        RpcCallback<RESP> done);
+  }
+
+  private <RESP> CompletableFuture<RESP> call(RpcCall<RESP> rpcCall, CellScanner cellScanner) {
+    CompletableFuture<RESP> future = new CompletableFuture<>();
+    HBaseRpcController controller = conn.rpcControllerFactory.newController(cellScanner);
+    try {
+      rpcCall.call(conn.getReplicationServerStub(server), controller, resp -> {
+        if (controller.failed()) {
+          future.completeExceptionally(controller.getFailed());
+        } else {
+          future.complete(resp);
+        }
+      });
+    } catch (IOException e) {
+      future.completeExceptionally(e);
+    }
+    return future;
+  }
+
+  public CompletableFuture<AdminProtos.ReplicateWALEntryResponse> replicateWALEntry(
+      AdminProtos.ReplicateWALEntryRequest request, CellScanner cellScanner, int timeout) {
+    return call((stub, controller, done) -> {
+      controller.setCallTimeout(timeout);
+      stub.replicateWALEntry(controller, request, done);
+    }, cellScanner);
+  }
+}
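A usage sketch for the new wrapper, assuming the caller already holds an AsyncClusterConnection and the ServerName of a server exposing ReplicationServerService; the helper class, the empty request, and the 60-second timeout are illustrative only, not part of the commit.

import java.io.IOException;

import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.client.AsyncClusterConnection;
import org.apache.hadoop.hbase.client.AsyncReplicationServerAdmin;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos;
import org.apache.hadoop.hbase.util.FutureUtils;

// Sketch only: ship an (empty) ReplicateWALEntryRequest through the new
// ReplicationServerService endpoint instead of the AdminService one.
final class ReplicateWALEntrySketch {
  static void replicateEmptyBatch(AsyncClusterConnection conn, ServerName server)
      throws IOException {
    AsyncReplicationServerAdmin replAdmin = conn.getReplicationServerAdmin(server);
    AdminProtos.ReplicateWALEntryRequest request =
        AdminProtos.ReplicateWALEntryRequest.getDefaultInstance();
    // No cells to ship, so the CellScanner is null; block on the async call for simplicity.
    FutureUtils.get(replAdmin.replicateWALEntry(request, null, 60_000));
  }
}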

hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ReplicationProtbufUtil.java

Lines changed: 18 additions & 0 deletions
@@ -27,6 +27,7 @@
 import org.apache.hadoop.hbase.CellScanner;
 import org.apache.hadoop.hbase.PrivateCellUtil;
 import org.apache.hadoop.hbase.client.AsyncRegionServerAdmin;
+import org.apache.hadoop.hbase.client.AsyncReplicationServerAdmin;
 import org.apache.hadoop.hbase.io.SizedCellScanner;
 import org.apache.hadoop.hbase.regionserver.wal.WALCellCodec;
 import org.apache.hadoop.hbase.util.FutureUtils;
@@ -61,6 +62,23 @@ public static void replicateWALEntry(AsyncRegionServerAdmin admin, Entry[] entri
     FutureUtils.get(admin.replicateWALEntry(p.getFirst(), p.getSecond(), timeout));
   }
 
+  /**
+   * A helper to replicate a list of WAL entries using replication server admin
+   * @param admin the replication server admin
+   * @param entries Array of WAL entries to be replicated
+   * @param replicationClusterId Id which will uniquely identify source cluster FS client
+   *          configurations in the replication configuration directory
+   * @param sourceBaseNamespaceDir Path to source cluster base namespace directory
+   * @param sourceHFileArchiveDir Path to the source cluster hfile archive directory
+   */
+  public static void replicateWALEntry(AsyncReplicationServerAdmin admin, Entry[] entries,
+      String replicationClusterId, Path sourceBaseNamespaceDir, Path sourceHFileArchiveDir,
+      int timeout) throws IOException {
+    Pair<ReplicateWALEntryRequest, CellScanner> p = buildReplicateWALEntryRequest(entries, null,
+      replicationClusterId, sourceBaseNamespaceDir, sourceHFileArchiveDir);
+    FutureUtils.get(admin.replicateWALEntry(p.getFirst(), p.getSecond(), timeout));
+  }
+
   /**
    * Create a new ReplicateWALEntryRequest from a list of WAL entries
    * @param entries the WAL entries to be replicated
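Taken together with ReplicationUtils.isPeerClusterSupportReplicationOffload, a replication source can choose between the two replicateWALEntry overloads depending on the peer cluster version. The sketch below shows that branching under stated assumptions; the helper class, method, and parameter names are illustrative and not the actual call site in HBase.

import java.io.IOException;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.client.AsyncClusterConnection;
import org.apache.hadoop.hbase.protobuf.ReplicationProtbufUtil;
import org.apache.hadoop.hbase.replication.ReplicationUtils;
import org.apache.hadoop.hbase.wal.WAL.Entry;

// Sketch only: route WAL entries through the new ReplicationServerService when the peer
// cluster is new enough, otherwise fall back to the existing AdminService path.
final class ReplicateToPeerSketch {
  static void replicate(AsyncClusterConnection peerConn, ServerName sink, Entry[] entries,
      String replicationClusterId, Path baseNamespaceDir, Path hfileArchiveDir, int timeout)
      throws IOException {
    if (ReplicationUtils.isPeerClusterSupportReplicationOffload(peerConn)) {
      ReplicationProtbufUtil.replicateWALEntry(peerConn.getReplicationServerAdmin(sink),
          entries, replicationClusterId, baseNamespaceDir, hfileArchiveDir, timeout);
    } else {
      ReplicationProtbufUtil.replicateWALEntry(peerConn.getRegionServerAdmin(sink),
          entries, replicationClusterId, baseNamespaceDir, hfileArchiveDir, timeout);
    }
  }
}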

hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java

Lines changed: 7 additions & 3 deletions
@@ -254,6 +254,7 @@
 import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaSnapshotsResponse;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaSnapshotsResponse.TableQuotaSnapshot;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.RequestHeader;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationServerProtos.ReplicationServerService;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.TooSlowLog.SlowLogPayload;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.BulkLoadDescriptor;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.CompactionDescriptor;
@@ -267,7 +268,7 @@
 @SuppressWarnings("deprecation")
 public class RSRpcServices implements HBaseRPCErrorHandler,
   AdminService.BlockingInterface, ClientService.BlockingInterface, PriorityFunction,
-  ConfigurationObserver {
+  ConfigurationObserver, ReplicationServerService.BlockingInterface {
   protected static final Logger LOG = LoggerFactory.getLogger(RSRpcServices.class);
 
   /** RPC scheduler to use for the region server. */
@@ -1487,8 +1488,11 @@ protected List<BlockingServiceAndInterface> getServices() {
     }
     if (admin) {
       bssi.add(new BlockingServiceAndInterface(
-        AdminService.newReflectiveBlockingService(this),
-        AdminService.BlockingInterface.class));
+          AdminService.newReflectiveBlockingService(this),
+          AdminService.BlockingInterface.class));
+      bssi.add(new BlockingServiceAndInterface(
+          ReplicationServerService.newReflectiveBlockingService(this),
+          ReplicationServerService.BlockingInterface.class));
     }
     return new org.apache.hbase.thirdparty.com.google.common.collect.
         ImmutableList.Builder<BlockingServiceAndInterface>().addAll(bssi).build();
