Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2986,11 +2986,13 @@ public static CloseRegionRequest buildCloseRegionRequest(ServerName server, byte

public static CloseRegionRequest buildCloseRegionRequest(ServerName server, byte[] regionName,
ServerName destinationServer) {
return buildCloseRegionRequest(server, regionName, destinationServer, -1);
// this method is used when we are bypassing active HMaster, so we don't have procId or master
// active time.
return buildCloseRegionRequest(server, regionName, destinationServer, -1, -1);
}

public static CloseRegionRequest buildCloseRegionRequest(ServerName server, byte[] regionName,
ServerName destinationServer, long closeProcId) {
ServerName destinationServer, long closeProcId, long initiatingMasterActiveTime) {
CloseRegionRequest.Builder builder = CloseRegionRequest.newBuilder();
RegionSpecifier region =
RequestConverter.buildRegionSpecifier(RegionSpecifierType.REGION_NAME, regionName);
Expand All @@ -3001,6 +3003,7 @@ public static CloseRegionRequest buildCloseRegionRequest(ServerName server, byte
if (server != null) {
builder.setServerStartCode(server.getStartcode());
}
builder.setInitiatingMasterActiveTime(initiatingMasterActiveTime);
builder.setCloseProcId(closeProcId);
return builder.build();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -222,14 +222,22 @@ protected final void submitTask(Runnable task, long delay, TimeUnit unit) {
*/
public static abstract class RemoteOperation {
private final RemoteProcedure remoteProcedure;
// active time of the master that sent this request, used for fencing
private final long initiatingMasterActiveTime;

protected RemoteOperation(final RemoteProcedure remoteProcedure) {
protected RemoteOperation(final RemoteProcedure remoteProcedure,
long initiatingMasterActiveTime) {
this.remoteProcedure = remoteProcedure;
this.initiatingMasterActiveTime = initiatingMasterActiveTime;
}

/** Returns the {@link RemoteProcedure} that was supplied when this operation was constructed. */
public RemoteProcedure getRemoteProcedure() {
  return this.remoteProcedure;
}

/**
 * Returns the active time of the master that created this operation, as passed to the
 * constructor. It accompanies the outgoing request so that it can be used for fencing.
 */
public long getInitiatingMasterActiveTime() {
  return this.initiatingMasterActiveTime;
}
}

/**
Expand Down
6 changes: 6 additions & 0 deletions hbase-protocol-shaded/src/main/protobuf/Admin.proto
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,8 @@ message OpenRegionRequest {
repeated RegionOpenInfo open_info = 1;
// the intended server for this RPC.
optional uint64 serverStartCode = 2;
// Master active time as fencing token
optional int64 initiating_master_active_time = 3;
// wall clock time from master
optional uint64 master_system_time = 5;

Expand Down Expand Up @@ -122,6 +124,8 @@ message CloseRegionRequest {
// the intended server for this RPC.
optional uint64 serverStartCode = 5;
optional int64 close_proc_id = 6 [default = -1];
// Master active time as fencing token
optional int64 initiating_master_active_time = 8;
}

message CloseRegionResponse {
Expand Down Expand Up @@ -271,6 +275,8 @@ message RemoteProcedureRequest {
required uint64 proc_id = 1;
required string proc_class = 2;
optional bytes proc_data = 3;
// Master active time as fencing token
optional int64 initiating_master_active_time = 4;
}

message ExecuteProceduresRequest {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,9 @@ message RegionStateTransition {
optional uint64 open_seq_num = 3;

repeated int64 proc_id = 4;

// Master active time as fencing token
optional int64 initiating_master_active_time = 5;
enum TransitionCode {
OPENED = 0;
FAILED_OPEN = 1;
Expand Down Expand Up @@ -155,6 +158,8 @@ message RemoteProcedureResult {
}
required Status status = 2;
optional ForeignExceptionMessage error = 3;
// Master active time as fencing token
optional int64 initiating_master_active_time = 4;
}
message ReportProcedureDoneRequest {
repeated RemoteProcedureResult result = 1;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3036,6 +3036,7 @@ public long getMasterStartTime() {
}

/** Returns the timestamp, in milliseconds, at which this HMaster became the active master. */
@Override
public long getMasterActiveTime() {
  return this.masterActiveTime;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import org.apache.hadoop.hbase.ClusterMetricsBuilder;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.MasterNotRunningException;
import org.apache.hadoop.hbase.MetaTableAccessor;
import org.apache.hadoop.hbase.NamespaceDescriptor;
import org.apache.hadoop.hbase.Server;
Expand Down Expand Up @@ -69,7 +70,6 @@
import org.apache.hadoop.hbase.ipc.RpcServer.BlockingServiceAndInterface;
import org.apache.hadoop.hbase.ipc.RpcServerFactory;
import org.apache.hadoop.hbase.ipc.RpcServerInterface;
import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
import org.apache.hadoop.hbase.ipc.ServerRpcController;
import org.apache.hadoop.hbase.master.assignment.AssignmentManager;
import org.apache.hadoop.hbase.master.assignment.RegionStateNode;
Expand Down Expand Up @@ -327,6 +327,7 @@
import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaRegionSizesResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaRegionSizesResponse.RegionSizes;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RecentLogs;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.FileArchiveNotificationRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.FileArchiveNotificationResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.GetLastFlushedSequenceIdRequest;
Expand Down Expand Up @@ -1695,6 +1696,15 @@ public ReportRegionStateTransitionResponse reportRegionStateTransition(RpcContro
ReportRegionStateTransitionRequest req) throws ServiceException {
try {
master.checkServiceStarted();
for (RegionServerStatusProtos.RegionStateTransition transition : req.getTransitionList()) {
long procId =
transition.getProcIdCount() > 0 ? transition.getProcId(0) : Procedure.NO_PROC_ID;
// -1 is less than any possible master active time, so a report without the field is never fenced
long initiatingMasterActiveTime = transition.hasInitiatingMasterActiveTime()
? transition.getInitiatingMasterActiveTime()
: -1;
throwOnOldMaster(procId, initiatingMasterActiveTime);
}
return master.getAssignmentManager().reportRegionStateTransition(req);
} catch (IOException ioe) {
throw new ServiceException(ioe);
Expand Down Expand Up @@ -2379,8 +2389,14 @@ public ReportProcedureDoneResponse reportProcedureDone(RpcController controller,
// Check Masters is up and ready for duty before progressing. Remote side will keep trying.
try {
this.master.checkServiceStarted();
} catch (ServerNotRunningYetException snrye) {
throw new ServiceException(snrye);
for (RemoteProcedureResult result : request.getResultList()) {
// -1 is less than any possible master active time, so a report without the field is never fenced
long initiatingMasterActiveTime =
result.hasInitiatingMasterActiveTime() ? result.getInitiatingMasterActiveTime() : -1;
throwOnOldMaster(result.getProcId(), initiatingMasterActiveTime);
}
} catch (IOException ioe) {
throw new ServiceException(ioe);
}
request.getResultList().forEach(result -> {
if (result.getStatus() == RemoteProcedureResult.Status.SUCCESS) {
Expand All @@ -2393,6 +2409,18 @@ public ReportProcedureDoneResponse reportProcedureDone(RpcController controller,
return ReportProcedureDoneResponse.getDefaultInstance();
}

/**
 * Fencing check applied to reports received from region servers. A report is rejected when the
 * master that initiated the procedure has a greater active time than this master, i.e. the
 * procedure was started by a master that became active after this one did.
 *
 * @param procId                     id of the procedure the report refers to; only used in the
 *                                   warning message
 * @param initiatingMasterActiveTime active time of the master that initiated the procedure;
 *                                   callers pass -1 when the report does not carry the field
 * @throws MasterNotRunningException if {@code initiatingMasterActiveTime} is greater than this
 *                                   master's active time
 */
private void throwOnOldMaster(long procId, long initiatingMasterActiveTime)
  throws MasterNotRunningException {
  if (initiatingMasterActiveTime <= master.getMasterActiveTime()) {
    // Initiated by this master or by an earlier one: nothing to fence.
    return;
  }
  // The procedure was initiated by a newer active master, yet the report arrived at this master,
  // whose active time is older.
  LOG.warn(
    "Report for procId: {} and initiatingMasterAT {} received on master with activeTime {}",
    procId, initiatingMasterActiveTime, master.getMasterActiveTime());
  throw new MasterNotRunningException("Another master is active");
}

// HBCK Services

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -261,6 +261,9 @@ long splitRegion(final RegionInfo regionInfo, final byte[] splitRow, final long
/** Returns true if master is the active one */
boolean isActiveMaster();

/** Returns timestamp in millis when this master became the active one. */
long getMasterActiveTime();

/** Returns true if master is initialized */
boolean isInitialized();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,9 @@ public TableOperationType getTableOperationType() {
}

@Override
public RemoteOperation newRemoteOperation() {
return new RegionCloseOperation(this, region, getProcId(), assignCandidate);
public RemoteOperation newRemoteOperation(MasterProcedureEnv env) {
return new RegionCloseOperation(this, region, getProcId(), assignCandidate,
env.getMasterServices().getMasterActiveTime());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,9 @@ public TableOperationType getTableOperationType() {
}

@Override
public RemoteOperation newRemoteOperation() {
return new RegionOpenOperation(this, region, getProcId());
public RemoteOperation newRemoteOperation(MasterProcedureEnv env) {
return new RegionOpenOperation(this, region, getProcId(),
env.getMasterServices().getMasterActiveTime());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,10 +92,11 @@ public Optional<RemoteProcedureDispatcher.RemoteOperation> remoteCallBuild(Maste
if (state == RegionRemoteProcedureBaseState.REGION_REMOTE_PROCEDURE_REPORT_SUCCEED) {
return Optional.empty();
}
return Optional.of(newRemoteOperation());
return Optional.of(newRemoteOperation(env));
}

protected abstract RemoteProcedureDispatcher.RemoteOperation newRemoteOperation();
protected abstract RemoteProcedureDispatcher.RemoteOperation
newRemoteOperation(MasterProcedureEnv env);

@Override
public void remoteOperationCompleted(MasterProcedureEnv env) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -427,7 +427,7 @@ public void dispatchCloseRequests(final MasterProcedureEnv env,

@Override
public void dispatchServerOperations(MasterProcedureEnv env, List<ServerOperation> operations) {
operations.stream().map(o -> o.buildRequest()).forEachOrdered(request::addProc);
operations.stream().map(ServerOperation::buildRequest).forEachOrdered(request::addProc);
}

// will be overridden in test.
Expand All @@ -450,7 +450,9 @@ protected final void remoteCallFailed(final MasterProcedureEnv env, final IOExce
private static OpenRegionRequest buildOpenRegionRequest(final MasterProcedureEnv env,
final ServerName serverName, final List<RegionOpenOperation> operations) {
final OpenRegionRequest.Builder builder = OpenRegionRequest.newBuilder();
builder.setServerStartCode(serverName.getStartcode());
builder.setServerStartCode(serverName.getStartCode());
operations.stream().map(RemoteOperation::getInitiatingMasterActiveTime).findAny()
.ifPresent(builder::setInitiatingMasterActiveTime);
builder.setMasterSystemTime(EnvironmentEdgeManager.currentTime());
for (RegionOpenOperation op : operations) {
builder.addOpenInfo(op.buildRegionOpenInfoRequest(env));
Expand All @@ -473,35 +475,37 @@ public static final class ServerOperation extends RemoteOperation {
private final byte[] rsProcData;

public ServerOperation(RemoteProcedure remoteProcedure, long procId, Class<?> rsProcClass,
byte[] rsProcData) {
super(remoteProcedure);
byte[] rsProcData, long initiatingMasterActiveTime) {
super(remoteProcedure, initiatingMasterActiveTime);
this.procId = procId;
this.rsProcClass = rsProcClass;
this.rsProcData = rsProcData;
}

public RemoteProcedureRequest buildRequest() {
return RemoteProcedureRequest.newBuilder().setProcId(procId)
.setProcClass(rsProcClass.getName()).setProcData(ByteString.copyFrom(rsProcData)).build();
.setProcClass(rsProcClass.getName()).setProcData(ByteString.copyFrom(rsProcData))
.setInitiatingMasterActiveTime(getInitiatingMasterActiveTime()).build();
}
}

public static abstract class RegionOperation extends RemoteOperation {
protected final RegionInfo regionInfo;
protected final long procId;

protected RegionOperation(RemoteProcedure remoteProcedure, RegionInfo regionInfo, long procId) {
super(remoteProcedure);
protected RegionOperation(RemoteProcedure remoteProcedure, RegionInfo regionInfo, long procId,
long initiatingMasterActiveTime) {
super(remoteProcedure, initiatingMasterActiveTime);
this.regionInfo = regionInfo;
this.procId = procId;
}
}

public static class RegionOpenOperation extends RegionOperation {

public RegionOpenOperation(RemoteProcedure remoteProcedure, RegionInfo regionInfo,
long procId) {
super(remoteProcedure, regionInfo, procId);
public RegionOpenOperation(RemoteProcedure remoteProcedure, RegionInfo regionInfo, long procId,
long initiatingMasterActiveTime) {
super(remoteProcedure, regionInfo, procId, initiatingMasterActiveTime);
}

public OpenRegionRequest.RegionOpenInfo
Expand All @@ -515,8 +519,8 @@ public static class RegionCloseOperation extends RegionOperation {
private final ServerName destinationServer;

public RegionCloseOperation(RemoteProcedure remoteProcedure, RegionInfo regionInfo, long procId,
ServerName destinationServer) {
super(remoteProcedure, regionInfo, procId);
ServerName destinationServer, long initiatingMasterActiveTime) {
super(remoteProcedure, regionInfo, procId, initiatingMasterActiveTime);
this.destinationServer = destinationServer;
}

Expand All @@ -526,7 +530,7 @@ public ServerName getDestinationServer() {

public CloseRegionRequest buildCloseRegionRequest(final ServerName serverName) {
return ProtobufUtil.buildCloseRegionRequest(serverName, regionInfo.getRegionName(),
getDestinationServer(), procId);
getDestinationServer(), procId, getInitiatingMasterActiveTime());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -97,9 +97,10 @@ protected void deserializeStateData(ProcedureStateSerializer serializer) throws
@Override
public Optional<RemoteProcedureDispatcher.RemoteOperation> remoteCallBuild(MasterProcedureEnv env,
ServerName serverName) {
return Optional.of(new RSProcedureDispatcher.ServerOperation(this, getProcId(),
SplitWALCallable.class, MasterProcedureProtos.SplitWALParameter.newBuilder()
.setWalPath(walPath).build().toByteArray()));
return Optional.of(new RSProcedureDispatcher.ServerOperation(
this, getProcId(), SplitWALCallable.class, MasterProcedureProtos.SplitWALParameter
.newBuilder().setWalPath(walPath).build().toByteArray(),
env.getMasterServices().getMasterActiveTime()));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,8 @@ protected void deserializeStateData(ProcedureStateSerializer serializer) throws
SwitchRpcThrottleRemoteCallable.class,
SwitchRpcThrottleRemoteStateData.newBuilder()
.setTargetServer(ProtobufUtil.toServerName(remote))
.setRpcThrottleEnabled(rpcThrottleEnabled).build().toByteArray()));
.setRpcThrottleEnabled(rpcThrottleEnabled).build().toByteArray(),
masterProcedureEnv.getMasterServices().getMasterActiveTime()));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,8 @@ public Optional<RemoteOperation> remoteCallBuild(MasterProcedureEnv env, ServerN
return Optional.of(new ServerOperation(this, getProcId(), ClaimReplicationQueueCallable.class,
ClaimReplicationQueueRemoteParameter.newBuilder()
.setCrashedServer(ProtobufUtil.toServerName(crashedServer)).setQueue(queue).build()
.toByteArray()));
.toByteArray(),
env.getMasterServices().getMasterActiveTime()));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,8 @@ public Optional<RemoteOperation> remoteCallBuild(MasterProcedureEnv env, ServerN
assert targetServer.equals(remote);
return Optional.of(new ServerOperation(this, getProcId(), RefreshPeerCallable.class,
RefreshPeerParameter.newBuilder().setPeerId(peerId).setType(toPeerModificationType(type))
.setTargetServer(ProtobufUtil.toServerName(remote)).build().toByteArray()));
.setTargetServer(ProtobufUtil.toServerName(remote)).build().toByteArray(),
env.getMasterServices().getMasterActiveTime()));
}

@Override
Expand Down
Loading