diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java index e79612f7a5a0f..1913e4533f7a2 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java @@ -3167,6 +3167,7 @@ private void requeueCall(Call call) throws IOException, InterruptedException { try { internalQueueCall(call, false); + rpcMetrics.incrRequeueCalls(); } catch (RpcServerException rse) { call.doResponse(rse.getCause(), rse.getRpcStatusProto()); } diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/metrics/RpcMetrics.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/metrics/RpcMetrics.java index bf21e3865fa8a..ca386dc00fd68 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/metrics/RpcMetrics.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/metrics/RpcMetrics.java @@ -128,6 +128,8 @@ public static RpcMetrics create(Server server, Configuration conf) { MutableCounterLong rpcClientBackoff; @Metric("Number of Slow RPC calls") MutableCounterLong rpcSlowCalls; + @Metric("Number of requeue calls") + MutableCounterLong rpcRequeueCalls; @Metric("Number of open connections") public int numOpenConnections() { return server.getNumOpenConnections(); @@ -304,6 +306,13 @@ public void incrSlowRpc() { rpcSlowCalls.incr(); } + /** + * Increments the Requeue Calls counter. + */ + public void incrRequeueCalls() { + rpcRequeueCalls.incr(); + } + /** * Returns a MutableRate Counter. * @return Mutable Rate @@ -344,6 +353,15 @@ public long getRpcSlowCalls() { return rpcSlowCalls.value(); } + /** + * Returns the number of requeue calls; + * @return long + */ + @VisibleForTesting + public long getRpcRequeueCalls() { + return rpcRequeueCalls.value(); + } + public MutableRate getDeferredRpcProcessingTime() { return deferredRpcProcessingTime; } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java index 13894b4fecf8c..55ec0e34321d9 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java @@ -6993,6 +6993,16 @@ public synchronized void verifyToken(DelegationTokenIdentifier identifier, public EditLogTailer getEditLogTailer() { return editLogTailer; } + + @VisibleForTesting + public void startNewEditLogTailer(Configuration conf) throws IOException { + if (this.editLogTailer != null) { + this.editLogTailer.stop(); + } + + this.editLogTailer = new EditLogTailer(this, conf); + this.editLogTailer.start(); + } @VisibleForTesting public void setEditLogTailerForTests(EditLogTailer tailer) { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestObserverNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestObserverNode.java index 29cae6f13adbc..a42c787921896 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestObserverNode.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestObserverNode.java @@ -21,6 +21,7 @@ import static org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter.getServiceState; import static org.apache.hadoop.hdfs.server.namenode.ha.ObserverReadProxyProvider.*; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import static org.mockito.ArgumentMatchers.any; @@ -36,7 +37,10 @@ import java.util.List; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.Future; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import org.apache.hadoop.conf.Configuration; @@ -61,9 +65,11 @@ import org.apache.hadoop.hdfs.server.namenode.FSEditLog; import org.apache.hadoop.hdfs.server.namenode.FSNamesystem; import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter; +import org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer; import org.apache.hadoop.hdfs.server.namenode.TestFsck; import org.apache.hadoop.hdfs.tools.GetGroups; import org.apache.hadoop.ipc.ObserverRetryOnActiveException; +import org.apache.hadoop.ipc.metrics.RpcMetrics; import org.apache.hadoop.util.Time; import org.apache.hadoop.util.concurrent.HadoopExecutors; import org.junit.After; @@ -124,6 +130,42 @@ public static void shutDownCluster() throws IOException { } } + @Test + public void testObserverRequeue() throws Exception { + ScheduledExecutorService interruptor = + Executors.newScheduledThreadPool(1); + + EditLogTailer observerEditlogTailer = dfsCluster.getNameNode(2) + .getNamesystem().getEditLogTailer(); + RpcMetrics obRpcMetrics = ((NameNodeRpcServer)dfsCluster + .getNameNodeRpc(2)).getClientRpcServer().getRpcMetrics(); + + // Stop EditlogTailer of Observer NameNode. + observerEditlogTailer.stop(); + + long oldRequeueNum = obRpcMetrics.getRpcRequeueCalls(); + + ScheduledFuture scheduledFuture = interruptor.schedule( + () -> { + Path tmpTestPath = new Path("/TestObserverRequeue"); + dfs.create(tmpTestPath, (short)1).close(); + assertSentTo(0); + // This operation will be blocked in ObserverNameNode + // until EditlogTailer tailed edits from journalNode. + FileStatus fileStatus = dfs.getFileStatus(tmpTestPath); + assertSentTo(2); + return fileStatus; + }, 0, TimeUnit.SECONDS); + Thread.sleep(1000); + observerEditlogTailer.doTailEdits(); + FileStatus fileStatus = scheduledFuture.get(1000, TimeUnit.MILLISECONDS); + assertNotNull(fileStatus); + + assertTrue(obRpcMetrics.getRpcRequeueCalls() > oldRequeueNum); + dfsCluster.getNameNode(2).getNamesystem() + .startNewEditLogTailer(dfsCluster.getConfiguration(2)); + } + @Test public void testNoActiveToObserver() throws Exception { try {