Merged
HMaster.java
@@ -168,6 +168,7 @@
import org.apache.hadoop.hbase.master.procedure.ModifyTableProcedure;
import org.apache.hadoop.hbase.master.procedure.ProcedurePrepareLatch;
import org.apache.hadoop.hbase.master.procedure.ProcedureSyncWait;
import org.apache.hadoop.hbase.master.procedure.RSProcedureDispatcher;
import org.apache.hadoop.hbase.master.procedure.ReopenTableRegionsProcedure;
import org.apache.hadoop.hbase.master.procedure.ServerCrashProcedure;
import org.apache.hadoop.hbase.master.procedure.TruncateRegionProcedure;
@@ -260,6 +261,7 @@
import org.apache.hadoop.hbase.util.JsonMapper;
import org.apache.hadoop.hbase.util.ModifyRegionUtils;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.ReflectionUtils;
import org.apache.hadoop.hbase.util.RetryCounter;
import org.apache.hadoop.hbase.util.RetryCounterFactory;
import org.apache.hadoop.hbase.util.TableDescriptorChecker;
@@ -489,6 +491,15 @@ public class HMaster extends HBaseServerBase<MasterRpcServices> implements Maste
public static final String WARMUP_BEFORE_MOVE = "hbase.master.warmup.before.move";
private static final boolean DEFAULT_WARMUP_BEFORE_MOVE = true;

/**
* An RSProcedureDispatcher instance is used to initiate master -> rs remote procedure
* execution. Use this config to provide a custom subclass of RSProcedureDispatcher (mainly
* for testing purposes).
*/
public static final String HBASE_MASTER_RSPROC_DISPATCHER_CLASS =
"hbase.master.rsproc.dispatcher.class";
private static final String DEFAULT_HBASE_MASTER_RSPROC_DISPATCHER_CLASS =
RSProcedureDispatcher.class.getName();

private TaskGroup startupTaskGroup;

/**
@@ -1833,7 +1844,11 @@ protected void stopServiceThreads() {
}

private void createProcedureExecutor() throws IOException {
MasterProcedureEnv procEnv = new MasterProcedureEnv(this);
final String procedureDispatcherClassName =
conf.get(HBASE_MASTER_RSPROC_DISPATCHER_CLASS, DEFAULT_HBASE_MASTER_RSPROC_DISPATCHER_CLASS);
Contributor: Is this for extension of RSProcedureDispatcher, or can there be a new implementation of RemoteProcedureDispatcher?

Contributor Author: This allows only extension of RSProcedureDispatcher, mainly to be used for testing purposes.

final RSProcedureDispatcher procedureDispatcher = ReflectionUtils.instantiateWithCustomCtor(
procedureDispatcherClassName, new Class[] { MasterServices.class }, new Object[] { this });
final MasterProcedureEnv procEnv = new MasterProcedureEnv(this, procedureDispatcher);
procedureStore = new RegionProcedureStore(this, masterRegion,
new MasterProcedureEnv.FsUtilsLeaseRecovery(this));
procedureStore.registerListener(new ProcedureStoreListener() {
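For illustration, the only contract the reflective instantiation above imposes on the configured class is a public constructor taking a MasterServices argument. A minimal hypothetical subclass (MyDispatcher is not part of this change) would look like this sketch:

import org.apache.hadoop.hbase.master.MasterServices;
import org.apache.hadoop.hbase.master.procedure.RSProcedureDispatcher;

// Hypothetical subclass illustrating the extension point; instantiateWithCustomCtor()
// above requires exactly this constructor shape.
public class MyDispatcher extends RSProcedureDispatcher {
  public MyDispatcher(MasterServices master) {
    super(master);
  }
}

A test would then select it with conf.set(HBASE_MASTER_RSPROC_DISPATCHER_CLASS, MyDispatcher.class.getName()).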
MasterProcedureEnv.java
@@ -75,10 +75,6 @@ private boolean isRunning() {
private final MasterProcedureScheduler procSched;
private final MasterServices master;

public MasterProcedureEnv(final MasterServices master) {
this(master, new RSProcedureDispatcher(master));
}

public MasterProcedureEnv(final MasterServices master,
final RSProcedureDispatcher remoteDispatcher) {
this.master = master;
RSProcedureDispatcher.java
@@ -28,14 +28,14 @@
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.client.AsyncRegionServerAdmin;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.exceptions.ConnectionClosedException;
import org.apache.hadoop.hbase.ipc.RpcConnectionConstants;
import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
import org.apache.hadoop.hbase.master.MasterServices;
import org.apache.hadoop.hbase.master.ServerListener;
import org.apache.hadoop.hbase.master.ServerManager;
import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
import org.apache.hadoop.hbase.procedure2.RemoteProcedureDispatcher;
import org.apache.hadoop.hbase.regionserver.RegionServerAbortedException;
import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.FutureUtils;
@@ -249,6 +249,22 @@ protected class ExecuteProceduresRemoteCall implements RemoteProcedureResolver,
"hbase.regionserver.rpc.retry.interval";
private static final int DEFAULT_RS_RPC_RETRY_INTERVAL = 100;

/**
* Config to determine the retry limit while executing remote regionserver procedures. This
* retry limit applies only to specific errors that could otherwise leave the remote
* procedure stuck for several minutes.
*/
private static final String RS_REMOTE_PROC_FAIL_FAST_LIMIT =
"hbase.master.rs.remote.proc.fail.fast.limit";
/**
* The default retry limit. Waiting for more than {@value} attempts is not going to help much
* for genuine connectivity errors, so we consider failing fast after {@value} retries. Value
* = {@value}
*/
private static final int DEFAULT_RS_REMOTE_PROC_RETRY_LIMIT = 5;
Contributor: Why 5? Why not 3? Let's have a comment with the justification for the default value.

Contributor Author: Done.


private final int failFastRetryLimit;

private ExecuteProceduresRequest.Builder request = null;

public ExecuteProceduresRemoteCall(final ServerName serverName,
@@ -257,6 +273,8 @@ public ExecuteProceduresRemoteCall(final ServerName serverName,
this.remoteProcedures = remoteProcedures;
this.rsRpcRetryInterval = master.getConfiguration().getLong(RS_RPC_RETRY_INTERVAL_CONF_KEY,
DEFAULT_RS_RPC_RETRY_INTERVAL);
this.failFastRetryLimit = master.getConfiguration().getInt(RS_REMOTE_PROC_FAIL_FAST_LIMIT,
DEFAULT_RS_REMOTE_PROC_RETRY_LIMIT);
}

private AsyncRegionServerAdmin getRsAdmin() throws IOException {
@@ -300,13 +318,28 @@ private boolean scheduleForRetry(IOException e) {
if (numberOfAttemptsSoFar == 0 && unableToConnectToServer(e)) {
return false;
}

// Check if the number of attempts has crossed the retry limit, and if the error type can
// fail fast.
if (numberOfAttemptsSoFar >= failFastRetryLimit - 1 && isErrorTypeFailFast(e)) {
LOG
.warn("Number of retries {} exceeded limit {} for the given error type. Scheduling server"
+ " crash for {}", numberOfAttemptsSoFar + 1, failFastRetryLimit, serverName, e);
// Expiring the server will schedule SCP and also reject the regionserver report from the
// regionserver if regionserver is somehow able to send the regionserver report to master.
// The master rejects the report by throwing YouAreDeadException, which would eventually
// result in the regionserver abort.
// This will also remove "serverName" from the ServerManager's onlineServers map.
master.getServerManager().expireServer(serverName);
return false;
}
// Always retry for other exception types if the region server is not dead yet.
if (!master.getServerManager().isServerOnline(serverName)) {
LOG.warn("Request to {} failed due to {}, try={} and the server is not online, give up",
serverName, e.toString(), numberOfAttemptsSoFar);
return false;
}
if (e instanceof RegionServerAbortedException || e instanceof RegionServerStoppedException) {
if (e instanceof RegionServerStoppedException) {
// A better way is to return true here to let the upper layer quit, and then schedule a
// background task to check whether the region server is dead. And if it is dead, call
// remoteCallFailed to tell the upper layer. Keep retrying here does not lead to incorrect
Expand All @@ -324,7 +357,8 @@ private boolean scheduleForRetry(IOException e) {
// retry^2 on each try
// up to max of 10 seconds (don't want to back off too much in case of situation change).
submitTask(this,
Math.min(rsRpcRetryInterval * (this.numberOfAttemptsSoFar * this.numberOfAttemptsSoFar),
Math.min(
rsRpcRetryInterval * ((long) this.numberOfAttemptsSoFar * this.numberOfAttemptsSoFar),
10 * 1000),
TimeUnit.MILLISECONDS);
return true;
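To make the backoff concrete: with the default 100 ms base interval, the delay before retry n is min(100 * n^2, 10000) ms, so with the default fail-fast limit of 5 a connection-closed error accrues only a few seconds of backoff before the server is expired. A standalone sketch of the same arithmetic (values assume the defaults above):

// Sketch of the quadratic backoff computed by scheduleForRetry() above.
long rsRpcRetryInterval = 100; // DEFAULT_RS_RPC_RETRY_INTERVAL, in ms
for (int n = 1; n <= 12; n++) {
  long delayMs = Math.min(rsRpcRetryInterval * ((long) n * n), 10 * 1000);
  System.out.println("attempt " + n + " -> backoff " + delayMs + " ms");
}
// attempts 1..4 -> 100, 400, 900, 1600 ms; from attempt 10 on, capped at 10000 ms

The (long) cast added in the diff also guards the multiplication against int overflow for very large attempt counts.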
@@ -371,6 +405,39 @@ private boolean isSaslError(IOException e) {
}
}

/**
* Returns true if the error or its cause is of type ConnectionClosedException.
* @param e IOException thrown by the underlying rpc framework.
* @return True if the error or its cause is of type ConnectionClosedException.
*/
private boolean isConnectionClosedError(IOException e) {
if (e instanceof ConnectionClosedException) {
return true;
}
Throwable cause = e;
while (true) {
if (cause instanceof IOException) {
Contributor: I think we can only unwrap RemoteException, so here let's just test RemoteException directly?

Contributor Author: RemoteException is of type IOE, and I kept this to be consistent with the other exception check logic we have.

Contributor Author: Is this fine @Apache9? I can change it if you have a strong opinion on this.

Contributor: OK, I saw you have already used this style in isSaslError; I should have stopped it in the first place... Let's file a new issue to polish it. unwrapException is not designed to be used here: it just puts the instanceof test inside the method so we do not need to write it everywhere, but here we already have the instanceof, so calling this method again does not make sense...

Contributor Author: Sure, isConnectionClosedError() also follows the same pattern. Logically there is no difference, but your suggestion will make it look cleaner, so let's do it in a separate Jira?

IOException unwrappedCause = unwrapException((IOException) cause);
if (unwrappedCause instanceof ConnectionClosedException) {
return true;
}
}
cause = cause.getCause();
if (cause == null) {
return false;
}
}
}
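
For reference, the two error shapes this walk is meant to catch look like the following sketch (illustrative values, not from the patch):

// Caught by the direct instanceof check at the top of the method:
IOException direct = new ConnectionClosedException("connection closed");
// Caught while walking getCause() (each IOException in the chain is also unwrapped):
IOException nested = new IOException("call failed",
  new ConnectionClosedException("connection closed"));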

/**
* Returns true if the error type can allow fail-fast.
* @param e IOException thrown by the underlying rpc framework.
* @return True if the error type can allow fail-fast.
*/
private boolean isErrorTypeFailFast(IOException e) {
return e instanceof CallQueueTooBigException || isSaslError(e) || isConnectionClosedError(e);
}

private long getMaxWaitTime() {
if (this.maxWaitTime < 0) {
// This is the max attempts, not retries, so it should be at least 1.
@@ -146,6 +146,7 @@ protected void setupConfiguration(Configuration conf) throws Exception {
// make retry for TRSP more frequent
conf.setLong(ProcedureUtil.PROCEDURE_RETRY_SLEEP_INTERVAL_MS, 10);
conf.setLong(ProcedureUtil.PROCEDURE_RETRY_MAX_SLEEP_TIME_MS, 100);
conf.setInt("hbase.master.rs.remote.proc.fail.fast.limit", Integer.MAX_VALUE);
}

@Before
RSProcDispatcher.java (new file)
@@ -0,0 +1,105 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.util;

import java.io.IOException;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.client.AsyncRegionServerAdmin;
import org.apache.hadoop.hbase.exceptions.ConnectionClosedException;
import org.apache.hadoop.hbase.master.MasterServices;
import org.apache.hadoop.hbase.master.procedure.RSProcedureDispatcher;
import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos;

/**
* Test implementation of RSProcedureDispatcher that throws desired errors for testing purpose.
*/
public class RSProcDispatcher extends RSProcedureDispatcher {
Contributor Author: This extension of RSProcedureDispatcher is under the hbase-server/src/test package.

private static final Logger LOG = LoggerFactory.getLogger(RSProcDispatcher.class);

private static final AtomicInteger i = new AtomicInteger();

public RSProcDispatcher(MasterServices master) {
super(master);
}

@Override
protected void remoteDispatch(final ServerName serverName,
final Set<RemoteProcedure> remoteProcedures) {
if (!master.getServerManager().isServerOnline(serverName)) {
// fail fast
submitTask(new DeadRSRemoteCall(serverName, remoteProcedures));
} else {
submitTask(new TestExecuteProceduresRemoteCall(serverName, remoteProcedures));
}
}

class TestExecuteProceduresRemoteCall extends ExecuteProceduresRemoteCall {

public TestExecuteProceduresRemoteCall(ServerName serverName,
Set<RemoteProcedure> remoteProcedures) {
super(serverName, remoteProcedures);
}

@Override
public AdminProtos.ExecuteProceduresResponse sendRequest(final ServerName serverName,
final AdminProtos.ExecuteProceduresRequest request) throws IOException {
int j = i.addAndGet(1);
LOG.info("sendRequest() req: {} , j: {}", request, j);
if (j == 12 || j == 22) {
// Execute the remote close and open region requests in the last (5th) retry before
// throwing ConnectionClosedException. This ensures that even if the region open/close
// is successfully completed by the regionserver, the master still schedules an SCP,
// because sendRequest() throws an error once the retry limit is exhausted.
FutureUtils.get(getRsAdmin().executeProcedures(request));
}
// For one of the close region requests and one of the open region requests, throw
// ConnectionClosedException until the retry limit is exhausted and the master schedules
// a recovery for the server.
// We will have ABNORMALLY_CLOSED regions, and they are expected to recover on their own.
if (j >= 8 && j <= 13 || j >= 18 && j <= 23) {
throw new ConnectionClosedException("test connection closed error...");
}
return FutureUtils.get(getRsAdmin().executeProcedures(request));
}

private AsyncRegionServerAdmin getRsAdmin() {
return master.getAsyncClusterConnection().getRegionServerAdmin(getServerName());
}
}

private class DeadRSRemoteCall extends ExecuteProceduresRemoteCall {

public DeadRSRemoteCall(ServerName serverName, Set<RemoteProcedure> remoteProcedures) {
super(serverName, remoteProcedures);
}

@Override
public void run() {
remoteCallFailed(master.getMasterProcedureExecutor().getEnvironment(),
new RegionServerStoppedException("Server " + getServerName() + " is not online"));
}
}

}
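For context, a test would route the master through this dispatcher via the config introduced in HMaster (a sketch; the surrounding mini-cluster setup is assumed):

conf.set("hbase.master.rsproc.dispatcher.class", RSProcDispatcher.class.getName());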