Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -3167,6 +3167,7 @@ private void requeueCall(Call call)
throws IOException, InterruptedException {
try {
internalQueueCall(call, false);
rpcMetrics.incrRequeueCalls();
} catch (RpcServerException rse) {
call.doResponse(rse.getCause(), rse.getRpcStatusProto());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,8 @@ public static RpcMetrics create(Server server, Configuration conf) {
MutableCounterLong rpcClientBackoff;
@Metric("Number of Slow RPC calls")
MutableCounterLong rpcSlowCalls;
@Metric("Number of requeue calls")
MutableCounterLong rpcRequeueCalls;

@Metric("Number of open connections") public int numOpenConnections() {
return server.getNumOpenConnections();
Expand Down Expand Up @@ -304,6 +306,13 @@ public void incrSlowRpc() {
rpcSlowCalls.incr();
}

/**
* Increments the Requeue Calls counter.
*/
public void incrRequeueCalls() {
rpcRequeueCalls.incr();
}

/**
* Returns a MutableRate Counter.
* @return Mutable Rate
Expand Down Expand Up @@ -344,6 +353,15 @@ public long getRpcSlowCalls() {
return rpcSlowCalls.value();
}

/**
* Returns the number of requeue calls;
* @return long
*/
@VisibleForTesting
public long getRpcRequeueCalls() {
return rpcRequeueCalls.value();
}

public MutableRate getDeferredRpcProcessingTime() {
return deferredRpcProcessingTime;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6993,6 +6993,16 @@ public synchronized void verifyToken(DelegationTokenIdentifier identifier,
public EditLogTailer getEditLogTailer() {
return editLogTailer;
}

@VisibleForTesting
public void startNewEditLogTailer(Configuration conf) throws IOException {
if (this.editLogTailer != null) {
this.editLogTailer.stop();
}

this.editLogTailer = new EditLogTailer(this, conf);
this.editLogTailer.start();
}

@VisibleForTesting
public void setEditLogTailerForTests(EditLogTailer tailer) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import static org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter.getServiceState;
import static org.apache.hadoop.hdfs.server.namenode.ha.ObserverReadProxyProvider.*;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.ArgumentMatchers.any;
Expand All @@ -36,7 +37,10 @@
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

import org.apache.hadoop.conf.Configuration;
Expand All @@ -61,9 +65,11 @@
import org.apache.hadoop.hdfs.server.namenode.FSEditLog;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
import org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer;
import org.apache.hadoop.hdfs.server.namenode.TestFsck;
import org.apache.hadoop.hdfs.tools.GetGroups;
import org.apache.hadoop.ipc.ObserverRetryOnActiveException;
import org.apache.hadoop.ipc.metrics.RpcMetrics;
import org.apache.hadoop.util.Time;
import org.apache.hadoop.util.concurrent.HadoopExecutors;
import org.junit.After;
Expand Down Expand Up @@ -124,6 +130,42 @@ public static void shutDownCluster() throws IOException {
}
}

@Test
public void testObserverRequeue() throws Exception {
ScheduledExecutorService interruptor =
Executors.newScheduledThreadPool(1);

EditLogTailer observerEditlogTailer = dfsCluster.getNameNode(2)
.getNamesystem().getEditLogTailer();
RpcMetrics obRpcMetrics = ((NameNodeRpcServer)dfsCluster
.getNameNodeRpc(2)).getClientRpcServer().getRpcMetrics();

// Stop EditlogTailer of Observer NameNode.
observerEditlogTailer.stop();

long oldRequeueNum = obRpcMetrics.getRpcRequeueCalls();

ScheduledFuture<FileStatus> scheduledFuture = interruptor.schedule(
() -> {
Path tmpTestPath = new Path("/TestObserverRequeue");
dfs.create(tmpTestPath, (short)1).close();
assertSentTo(0);
// This operation will be blocked in ObserverNameNode
// until EditlogTailer tailed edits from journalNode.
FileStatus fileStatus = dfs.getFileStatus(tmpTestPath);
assertSentTo(2);
return fileStatus;
}, 0, TimeUnit.SECONDS);
Thread.sleep(1000);
observerEditlogTailer.doTailEdits();
FileStatus fileStatus = scheduledFuture.get(1000, TimeUnit.MILLISECONDS);
assertNotNull(fileStatus);

assertTrue(obRpcMetrics.getRpcRequeueCalls() > oldRequeueNum);
dfsCluster.getNameNode(2).getNamesystem()
.startNewEditLogTailer(dfsCluster.getConfiguration(2));
}

@Test
public void testNoActiveToObserver() throws Exception {
try {
Expand Down