Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MasterService;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationServerProtos.ReplicationServerService;

/**
* The implementation of AsyncConnection.
Expand Down Expand Up @@ -101,6 +102,8 @@ class AsyncConnectionImpl implements AsyncConnection {

private final ConcurrentMap<String, ClientService.Interface> rsStubs = new ConcurrentHashMap<>();
private final ConcurrentMap<String, AdminService.Interface> adminSubs = new ConcurrentHashMap<>();
private final ConcurrentMap<String, ReplicationServerService.Interface> replStubs =
new ConcurrentHashMap<>();

private final AtomicReference<MasterService.Interface> masterStub = new AtomicReference<>();

Expand Down Expand Up @@ -278,12 +281,25 @@ private AdminService.Interface createAdminServerStub(ServerName serverName) thro
return AdminService.newStub(rpcClient.createRpcChannel(serverName, user, rpcTimeout));
}

private ReplicationServerService.Interface createReplicationServerStub(ServerName serverName)
throws IOException {
return ReplicationServerService.newStub(
rpcClient.createRpcChannel(serverName, user, rpcTimeout));
}

AdminService.Interface getAdminStub(ServerName serverName) throws IOException {
return ConcurrentMapUtils.computeIfAbsentEx(adminSubs,
getStubKey(AdminService.getDescriptor().getName(), serverName),
() -> createAdminServerStub(serverName));
}

ReplicationServerService.Interface getReplicationServerStub(ServerName serverName)
throws IOException {
return ConcurrentMapUtils.computeIfAbsentEx(replStubs,
getStubKey(ReplicationServerService.Interface.class.getSimpleName(), serverName),
() -> createReplicationServerStub(serverName));
}

CompletableFuture<MasterService.Interface> getMasterStub() {
return ConnectionUtils.getOrFetch(masterStub, masterStubMakeFuture, false, () -> {
CompletableFuture<MasterService.Interface> future = new CompletableFuture<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MasterService;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationServerProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationServerStatusProtos;

/**
* Maps RPC protocol interfaces to required configuration
Expand All @@ -51,6 +53,11 @@ public class SecurityInfo {
new SecurityInfo(SecurityConstants.MASTER_KRB_PRINCIPAL, Kind.HBASE_AUTH_TOKEN));
infos.put(MasterProtos.ClientMetaService.getDescriptor().getName(),
new SecurityInfo(SecurityConstants.MASTER_KRB_PRINCIPAL, Kind.HBASE_AUTH_TOKEN));
infos.put(ReplicationServerStatusProtos.ReplicationServerStatusService.getDescriptor()
.getName(),
new SecurityInfo(SecurityConstants.MASTER_KRB_PRINCIPAL, Kind.HBASE_AUTH_TOKEN));
infos.put(ReplicationServerProtos.ReplicationServerService.getDescriptor().getName(),
new SecurityInfo(SecurityConstants.REPLICATION_SERVER_KRB_PRINCIPAL, Kind.HBASE_AUTH_TOKEN));
// NOTE: IF ADDING A NEW SERVICE, BE SURE TO UPDATE HBasePolicyProvider ALSO ELSE
// new Service will not be found when all is Kerberized!!!!
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -998,6 +998,8 @@ public enum OperationStatusCode {
/*
* cluster replication constants.
*/
public static final String REPLICATION_OFFLOAD_ENABLE_KEY = "hbase.replication.offload.enabled";
public static final boolean REPLICATION_OFFLOAD_ENABLE_DEFAULT = false;
public static final String
REPLICATION_SOURCE_SERVICE_CLASSNAME = "hbase.replication.source.service";
public static final String REPLICATION_SERVICE_CLASSNAME_DEFAULT =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,10 @@ public final class SecurityConstants {
public static final String MASTER_KRB_KEYTAB_FILE = "hbase.master.keytab.file";
public static final String REGIONSERVER_KRB_PRINCIPAL = "hbase.regionserver.kerberos.principal";
public static final String REGIONSERVER_KRB_KEYTAB_FILE = "hbase.regionserver.keytab.file";
public static final String REPLICATION_SERVER_KRB_PRINCIPAL =
"hbase.replication.server.kerberos.principal";
public static final String REPLICATION_SERVER_KRB_KEYTAB_FILE =
"hbase.replication.server.keytab.file";

private SecurityConstants() {
// Can't be instantiated with this ctor.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,8 @@ public final class DNS {

public enum ServerType {
MASTER("master"),
REGIONSERVER("regionserver");
REGIONSERVER("regionserver"),
REPLICATIONSERVER("replicationserver");

private String name;
ServerType(String name) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -717,6 +717,13 @@ message BalancerDecisionsResponse {
repeated BalancerDecision balancer_decision = 1;
}

message ListReplicationSinkServersRequest {
}

message ListReplicationSinkServersResponse {
repeated ServerName server_name = 1;
}

service MasterService {
/** Used by the client to get the number of regions that have received the updated schema */
rpc GetSchemaAlterStatus(GetSchemaAlterStatusRequest)
Expand Down Expand Up @@ -1146,10 +1153,13 @@ service MasterService {
returns (RenameRSGroupResponse);

rpc UpdateRSGroupConfig(UpdateRSGroupConfigRequest)
returns (UpdateRSGroupConfigResponse);
returns (UpdateRSGroupConfigResponse);

rpc GetLogEntries(LogRequest)
returns(LogEntry);

rpc ListReplicationSinkServers(ListReplicationSinkServersRequest)
returns (ListReplicationSinkServersResponse);
}

// HBCK Service definitions.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
syntax = "proto2";

package hbase.pb;

option java_package = "org.apache.hadoop.hbase.shaded.protobuf.generated";
option java_outer_classname = "ReplicationServerStatusProtos";
option java_generic_services = true;
option java_generate_equals_and_hash = true;
option optimize_for = SPEED;

import "HBase.proto";
import "server/ClusterStatus.proto";

message ReplicationServerReportRequest {
required ServerName server = 1;

/** load the server is under */
optional ServerLoad load = 2;

/** The replication queues which this replication server is responsible for. */
repeated string queue_node = 3;
}

message ReplicationServerReportResponse {
}

service ReplicationServerStatusService {
rpc ReplicationServerReport(ReplicationServerReportRequest)
returns(ReplicationServerReportResponse);
}
12 changes: 12 additions & 0 deletions hbase-protocol-shaded/src/main/protobuf/server/region/Admin.proto
Original file line number Diff line number Diff line change
Expand Up @@ -328,6 +328,15 @@ message ClearSlowLogResponses {
required bool is_cleaned = 1;
}

message GetLogFileSizeIfBeingWrittenRequest {
required string wal_path = 1;
}

message GetLogFileSizeIfBeingWrittenResponse {
required bool is_being_written = 1;
optional uint64 length = 2;
}

service AdminService {
rpc GetRegionInfo(GetRegionInfoRequest)
returns(GetRegionInfoResponse);
Expand Down Expand Up @@ -399,4 +408,7 @@ service AdminService {
rpc GetLogEntries(LogRequest)
returns(LogEntry);

rpc GetLogFileSizeIfBeingWritten(GetLogFileSizeIfBeingWrittenRequest)
returns(GetLogFileSizeIfBeingWrittenResponse);

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
syntax = "proto2";
package hbase.pb;

option java_package = "org.apache.hadoop.hbase.shaded.protobuf.generated";
option java_outer_classname = "ReplicationServerProtos";
option java_generic_services = true;
option java_generate_equals_and_hash = true;
option optimize_for = SPEED;

import "HBase.proto";
import "server/region/Admin.proto";

message StartReplicationSourceRequest {
required ServerName server_name = 1;
required string queue_id = 2;
}

message StartReplicationSourceResponse {
}

service ReplicationServerService {
rpc ReplicateWALEntry(ReplicateWALEntryRequest)
returns(ReplicateWALEntryResponse);

rpc StartReplicationSource(StartReplicationSourceRequest)
returns(StartReplicationSourceResponse);
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,5 +33,5 @@ public interface ReplicationListener {
* A region server has been removed from the local cluster
* @param regionServer the removed region server
*/
public void regionServerRemoved(String regionServer);
void regionServerRemoved(String regionServer);
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Objects;

import org.apache.hadoop.hbase.ServerName;
import org.apache.yetus.audience.InterfaceAudience;
Expand All @@ -31,11 +32,16 @@
* This class is responsible for the parsing logic for a queue id representing a queue.
* It will extract the peerId if it's recovered as well as the dead region servers
* that were part of the queue's history.
* One replication queue has only one owner. And the owner must be one region server. When enable
* replication offload feature, region server will not start replication source thread to replicate
* data. The replication queue will be used by replication server which is responsible for
* replicating data.
*/
@InterfaceAudience.Private
public class ReplicationQueueInfo {
private static final Logger LOG = LoggerFactory.getLogger(ReplicationQueueInfo.class);

private final ServerName owner;
private final String peerId;
private final String queueId;
private boolean queueRecovered;
Expand All @@ -46,7 +52,8 @@ public class ReplicationQueueInfo {
* The passed queueId will be either the id of the peer or the handling story of that queue
* in the form of id-servername-*
*/
public ReplicationQueueInfo(String queueId) {
public ReplicationQueueInfo(ServerName owner, String queueId) {
this.owner = owner;
this.queueId = queueId;
String[] parts = queueId.split("-", 2);
this.queueRecovered = parts.length != 1;
Expand All @@ -57,6 +64,22 @@ public ReplicationQueueInfo(String queueId) {
}
}

/**
* A util method to parse the peerId from queueId.
*/
public static String parsePeerId(String queueId) {
String[] parts = queueId.split("-", 2);
return parts.length != 1 ? parts[0] : queueId;
}

/**
* A util method to check whether a queue is recovered.
*/
public static boolean isQueueRecovered(String queueId) {
String[] parts = queueId.split("-", 2);
return parts.length != 1;
}

/**
* Parse dead server names from queue id. servername can contain "-" such as
* "ip-10-46-221-101.ec2.internal", so we need skip some "-" during parsing for the following
Expand Down Expand Up @@ -114,6 +137,10 @@ public List<ServerName> getDeadRegionServers() {
return Collections.unmodifiableList(this.deadRegionServers);
}

public ServerName getOwner() {
return this.owner;
}

public String getPeerId() {
return this.peerId;
}
Expand All @@ -125,4 +152,18 @@ public String getQueueId() {
public boolean isQueueRecovered() {
return queueRecovered;
}

@Override
public boolean equals(Object o) {
if (o instanceof ReplicationQueueInfo) {
ReplicationQueueInfo other = (ReplicationQueueInfo) o;
return Objects.equals(this.owner, other.owner) && Objects.equals(this.queueId, other.queueId);
}
return false;
}

@Override
public int hashCode() {
return Objects.hash(this.owner, this.queueId);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.AsyncAdmin;
import org.apache.hadoop.hbase.client.AsyncConnection;
import org.apache.hadoop.hbase.util.FutureUtils;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -83,8 +86,7 @@ public static void removeAllQueues(ReplicationQueueStorage queueStorage, String
for (ServerName replicator : queueStorage.getListOfReplicators()) {
List<String> queueIds = queueStorage.getAllQueues(replicator);
for (String queueId : queueIds) {
ReplicationQueueInfo queueInfo = new ReplicationQueueInfo(queueId);
if (queueInfo.getPeerId().equals(peerId)) {
if (ReplicationQueueInfo.parsePeerId(queueId).equals(peerId)) {
queueStorage.removeQueue(replicator, queueId);
}
}
Expand Down Expand Up @@ -212,4 +214,25 @@ public static int getAdaptiveTimeout(final int initialValue, final int retries)
}
return initialValue * HConstants.RETRY_BACKOFF[ntries];
}

/**
* Check whether peer cluster supports replication offload.
* @param peerConn connection for peer cluster
* @return true if peer cluster version >= 3
* @throws IOException exception
*/
public static boolean isPeerClusterSupportReplicationOffload(AsyncConnection peerConn)
throws IOException {
AsyncAdmin admin = peerConn.getAdmin();
String version = FutureUtils.get(admin.getClusterMetrics()).getHBaseVersion();
if (Integer.parseInt(version.split("\\.")[0]) >= 3) {
return true;
}
return false;
}

public static boolean isReplicationOffloadEnabled(Configuration conf) {
return conf.getBoolean(HConstants.REPLICATION_OFFLOAD_ENABLE_KEY,
HConstants.REPLICATION_OFFLOAD_ENABLE_DEFAULT);
}
}
Loading