From eb929e32f109e5a45cc44ffde950c26d663d0c72 Mon Sep 17 00:00:00 2001 From: Rahul Kumar Date: Thu, 3 Jun 2021 18:42:00 +0530 Subject: [PATCH 01/12] HBASE-25881: Create a chore to update age related metrics --- .../hbase/regionserver/HRegionServer.java | 14 ++++ .../MetricsSourceRefresherChore.java | 70 +++++++++++++++++++ .../regionserver/ReplicationSource.java | 2 +- .../ReplicationSourceInterface.java | 4 ++ .../ReplicationSourceLogQueue.java | 38 ---------- .../replication/ReplicationSourceDummy.java | 7 ++ .../TestReplicationSourceLogQueue.java | 4 -- 7 files changed, 96 insertions(+), 43 deletions(-) create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsSourceRefresherChore.java diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java index 7783bff32c49..04268fdd822c 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java @@ -147,6 +147,7 @@ import org.apache.hadoop.hbase.regionserver.throttle.FlushThroughputControllerFactory; import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController; import org.apache.hadoop.hbase.replication.regionserver.ReplicationLoad; +import org.apache.hadoop.hbase.replication.regionserver.ReplicationSink; import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceInterface; import org.apache.hadoop.hbase.replication.regionserver.ReplicationStatus; import org.apache.hadoop.hbase.security.SecurityConstants; @@ -455,6 +456,9 @@ public class HRegionServer extends Thread implements /** The nonce manager chore. */ private ScheduledChore nonceManagerChore; + // The metrics source chore + private MetricsSourceRefresherChore metricsSourceChore; + private Map coprocessorServiceHandlers = Maps.newHashMap(); /** @@ -1980,16 +1984,26 @@ private void setupWALAndReplication() throws IOException { * Start up replication source and sink handlers. */ private void startReplicationService() throws IOException { + boolean is_replication_enabled = false; if (sameReplicationSourceAndSink && this.replicationSourceHandler != null) { + is_replication_enabled = true; this.replicationSourceHandler.startReplicationService(); } else { if (this.replicationSourceHandler != null) { + is_replication_enabled = true; this.replicationSourceHandler.startReplicationService(); } if (this.replicationSinkHandler != null) { this.replicationSinkHandler.startReplicationService(); } } + if (is_replication_enabled) { + int duration = this.conf.getInt(MetricsSourceRefresherChore.DURATION, + MetricsSourceRefresherChore.DEFAULT_DURATION); + for (ReplicationSourceInterface source: this.getReplicationSourceService().getReplicationManager().getSources()) { + this.metricsSourceChore = new MetricsSourceRefresherChore(duration, this, source); + } + } } /** diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsSourceRefresherChore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsSourceRefresherChore.java new file mode 100644 index 000000000000..47bbf40b767c --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsSourceRefresherChore.java @@ -0,0 +1,70 @@ +package org.apache.hadoop.hbase.regionserver; + +import java.util.Map; +import java.util.concurrent.PriorityBlockingQueue; + +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.ScheduledChore; +import org.apache.hadoop.hbase.Stoppable; +import org.apache.hadoop.hbase.replication.regionserver.MetricsSource; +import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceInterface; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; +import org.apache.hadoop.hbase.wal.AbstractFSWALProvider; +import org.apache.yetus.audience.InterfaceAudience; + +/** + * The Class MetricsSourceRefresherChore for refreshing source metrics + */ +@InterfaceAudience.Private +public class MetricsSourceRefresherChore extends ScheduledChore{ + + private ReplicationSourceInterface replicationSource; + + private MetricsSource metrics; + + public static final String DURATION = "hbase.metrics.source.refresher.duration"; + public static final int DEFAULT_DURATION = 60000; + + public MetricsSourceRefresherChore(int duration, Stoppable stopper, ReplicationSourceInterface replicationSource) { + super("MetricsSourceRefresherChore", stopper, duration); + this.replicationSource = replicationSource; + this.metrics = this.replicationSource.getSourceMetrics(); + } + + @Override + protected void chore() { + this.metrics.setOldestWalAge(getOldestWalAge()); + } + + /* + Returns the age of oldest wal. + */ + long getOldestWalAge() { + long now = EnvironmentEdgeManager.currentTime(); + long timestamp = getOldestWalTimestamp(); + if (timestamp == Long.MAX_VALUE) { + // If there are no wals in the queue then set the oldest wal timestamp to current time + // so that the oldest wal age will be 0. + timestamp = now; + } + long age = now - timestamp; + return age; + } + + /* + Get the oldest wal timestamp from all the queues. + */ + private long getOldestWalTimestamp() { + long oldestWalTimestamp = Long.MAX_VALUE; + for (Map.Entry> entry : this.replicationSource.getQueues().entrySet()) { + PriorityBlockingQueue queue = entry.getValue(); + Path path = queue.peek(); + // Can path ever be null ? + if (path != null) { + oldestWalTimestamp = Math.min(oldestWalTimestamp, + AbstractFSWALProvider.WALStartTimeComparator.getTS(path)); + } + } + return oldestWalTimestamp; + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java index d1268fab94cf..2ca7e0281d72 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java @@ -259,7 +259,7 @@ public void enqueueLog(Path wal) { } } - @InterfaceAudience.Private + @Override public Map> getQueues() { return logQueue.getQueues(); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java index 27e4b79c141b..5c85ac42ae55 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java @@ -23,6 +23,8 @@ import java.util.List; import java.util.Map; import java.util.UUID; +import java.util.concurrent.PriorityBlockingQueue; + import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -220,4 +222,6 @@ default boolean isRecovered() { default void logPositionAndCleanOldLogs(WALEntryBatch entryBatch) { getSourceManager().logPositionAndCleanOldLogs(this, entryBatch); } + + public Map> getQueues(); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceLogQueue.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceLogQueue.java index 4d89edef5fdc..294c22de02d1 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceLogQueue.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceLogQueue.java @@ -23,7 +23,6 @@ import java.util.concurrent.PriorityBlockingQueue; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.wal.AbstractFSWALProvider; import org.apache.yetus.audience.InterfaceAudience; import org.apache.yetus.audience.InterfaceStability; @@ -80,8 +79,6 @@ public boolean enqueueLog(Path wal, String walGroupId) { } // Increment size of logQueue this.metrics.incrSizeOfLogQueue(); - // Compute oldest wal age - this.metrics.setOldestWalAge(getOldestWalAge()); // This will wal a warning for each new wal that gets created above the warn threshold int queueSize = queue.size(); if (queueSize > this.logQueueWarnThreshold) { @@ -137,8 +134,6 @@ public void remove(String walGroupId) { queue.remove(); // Decrease size logQueue. this.metrics.decrSizeOfLogQueue(); - // Re-compute age of oldest wal metric. - this.metrics.setOldestWalAge(getOldestWalAge()); } /** @@ -152,39 +147,6 @@ public void clear(String walGroupId) { queue.remove(); metrics.decrSizeOfLogQueue(); } - this.metrics.setOldestWalAge(getOldestWalAge()); - } - - /* - Returns the age of oldest wal. - */ - long getOldestWalAge() { - long now = EnvironmentEdgeManager.currentTime(); - long timestamp = getOldestWalTimestamp(); - if (timestamp == Long.MAX_VALUE) { - // If there are no wals in the queue then set the oldest wal timestamp to current time - // so that the oldest wal age will be 0. - timestamp = now; - } - long age = now - timestamp; - return age; - } - - /* - Get the oldest wal timestamp from all the queues. - */ - private long getOldestWalTimestamp() { - long oldestWalTimestamp = Long.MAX_VALUE; - for (Map.Entry> entry : queues.entrySet()) { - PriorityBlockingQueue queue = entry.getValue(); - Path path = queue.peek(); - // Can path ever be null ? - if (path != null) { - oldestWalTimestamp = Math.min(oldestWalTimestamp, - AbstractFSWALProvider.WALStartTimeComparator.getTS(path)); - } - } - return oldestWalTimestamp; } public MetricsSource getMetrics() { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java index cab01d6fa6e6..e536f998a8f1 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java @@ -19,7 +19,9 @@ import java.io.IOException; import java.util.List; +import java.util.Map; import java.util.UUID; +import java.util.concurrent.PriorityBlockingQueue; import java.util.concurrent.atomic.AtomicBoolean; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; @@ -172,4 +174,9 @@ public ReplicationQueueStorage getReplicationQueueStorage() { public ReplicationPeer getPeer() { return replicationPeer; } + + @Override + public Map> getQueues() { + return null; + } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceLogQueue.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceLogQueue.java index c28b18003c5b..49292b7aa335 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceLogQueue.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceLogQueue.java @@ -60,22 +60,18 @@ public void testEnqueueDequeue() { manualEdge.setValue(10); // Diff of current time (10) and log-walgroup-a.8 timestamp will be 2. logQueue.enqueueLog(log1, walGroupId1); - assertEquals(2, logQueue.getOldestWalAge()); final Path log2 = new Path("log-walgroup-b.4"); // Diff of current time (10) and log-walgroup-b.4 will be 6 so oldestWalAge should be 6 logQueue.enqueueLog(log2, walGroupId2); - assertEquals(6, logQueue.getOldestWalAge()); // Remove an element from walGroupId2. // After this op, there will be only one element in the queue log-walgroup-a.8 logQueue.remove(walGroupId2); - assertEquals(2, logQueue.getOldestWalAge()); // Remove last element from the queue. logQueue.remove(walGroupId1); // This will test the case where there are no elements in the queue. - assertEquals(0, logQueue.getOldestWalAge()); } finally { EnvironmentEdgeManager.reset(); } From 9fd580bb7b57f4feefdf52d8e185ef49b15e720f Mon Sep 17 00:00:00 2001 From: Rahul Kumar Date: Thu, 1 Jul 2021 14:06:16 +0530 Subject: [PATCH 02/12] schedulingCore as part of ReplicationSource setup --- .../hadoop/hbase/regionserver/HRegionServer.java | 14 -------------- .../MetricsReplicationSourceRefresherChore.java} | 12 +++++------- .../regionserver/ReplicationSourceInterface.java | 3 --- .../hbase/replication/ReplicationSourceDummy.java | 6 ------ 4 files changed, 5 insertions(+), 30 deletions(-) rename hbase-server/src/main/java/org/apache/hadoop/hbase/{regionserver/MetricsSourceRefresherChore.java => replication/regionserver/MetricsReplicationSourceRefresherChore.java} (77%) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java index 04268fdd822c..c697604aa49e 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java @@ -456,9 +456,6 @@ public class HRegionServer extends Thread implements /** The nonce manager chore. */ private ScheduledChore nonceManagerChore; - // The metrics source chore - private MetricsSourceRefresherChore metricsSourceChore; - private Map coprocessorServiceHandlers = Maps.newHashMap(); /** @@ -1984,26 +1981,16 @@ private void setupWALAndReplication() throws IOException { * Start up replication source and sink handlers. */ private void startReplicationService() throws IOException { - boolean is_replication_enabled = false; if (sameReplicationSourceAndSink && this.replicationSourceHandler != null) { - is_replication_enabled = true; this.replicationSourceHandler.startReplicationService(); } else { if (this.replicationSourceHandler != null) { - is_replication_enabled = true; this.replicationSourceHandler.startReplicationService(); } if (this.replicationSinkHandler != null) { this.replicationSinkHandler.startReplicationService(); } } - if (is_replication_enabled) { - int duration = this.conf.getInt(MetricsSourceRefresherChore.DURATION, - MetricsSourceRefresherChore.DEFAULT_DURATION); - for (ReplicationSourceInterface source: this.getReplicationSourceService().getReplicationManager().getSources()) { - this.metricsSourceChore = new MetricsSourceRefresherChore(duration, this, source); - } - } } /** @@ -2151,7 +2138,6 @@ private void startServices() throws IOException { if (this.slowLogTableOpsChore != null) { choreService.scheduleChore(slowLogTableOpsChore); } - // Leases is not a Thread. Internally it runs a daemon thread. If it gets // an unhandled exception, it will just exit. Threads.setDaemonThreadRunning(this.leaseManager, getName() + ".leaseChecker", diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsSourceRefresherChore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceRefresherChore.java similarity index 77% rename from hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsSourceRefresherChore.java rename to hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceRefresherChore.java index 47bbf40b767c..64b6f1ab9196 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsSourceRefresherChore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceRefresherChore.java @@ -1,4 +1,4 @@ -package org.apache.hadoop.hbase.regionserver; +package org.apache.hadoop.hbase.replication.regionserver; import java.util.Map; import java.util.concurrent.PriorityBlockingQueue; @@ -6,8 +6,6 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.ScheduledChore; import org.apache.hadoop.hbase.Stoppable; -import org.apache.hadoop.hbase.replication.regionserver.MetricsSource; -import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceInterface; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.wal.AbstractFSWALProvider; import org.apache.yetus.audience.InterfaceAudience; @@ -16,16 +14,16 @@ * The Class MetricsSourceRefresherChore for refreshing source metrics */ @InterfaceAudience.Private -public class MetricsSourceRefresherChore extends ScheduledChore{ +public class MetricsReplicationSourceRefresherChore extends ScheduledChore { - private ReplicationSourceInterface replicationSource; + private ReplicationSource replicationSource; private MetricsSource metrics; - public static final String DURATION = "hbase.metrics.source.refresher.duration"; + public static final String DURATION = "hbase.metrics.replication.source.refresher.duration"; public static final int DEFAULT_DURATION = 60000; - public MetricsSourceRefresherChore(int duration, Stoppable stopper, ReplicationSourceInterface replicationSource) { + public MetricsReplicationSourceRefresherChore(int duration, Stoppable stopper, ReplicationSource replicationSource) { super("MetricsSourceRefresherChore", stopper, duration); this.replicationSource = replicationSource; this.metrics = this.replicationSource.getSourceMetrics(); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java index 5c85ac42ae55..a790ce0d2191 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java @@ -23,7 +23,6 @@ import java.util.List; import java.util.Map; import java.util.UUID; -import java.util.concurrent.PriorityBlockingQueue; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; @@ -222,6 +221,4 @@ default boolean isRecovered() { default void logPositionAndCleanOldLogs(WALEntryBatch entryBatch) { getSourceManager().logPositionAndCleanOldLogs(this, entryBatch); } - - public Map> getQueues(); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java index e536f998a8f1..be6b62b1a6ad 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java @@ -21,7 +21,6 @@ import java.util.List; import java.util.Map; import java.util.UUID; -import java.util.concurrent.PriorityBlockingQueue; import java.util.concurrent.atomic.AtomicBoolean; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; @@ -174,9 +173,4 @@ public ReplicationQueueStorage getReplicationQueueStorage() { public ReplicationPeer getPeer() { return replicationPeer; } - - @Override - public Map> getQueues() { - return null; - } } From 02a3c5971d1c704c0801304d1e32cccb9eca6efc Mon Sep 17 00:00:00 2001 From: Rahul Kumar Date: Thu, 1 Jul 2021 14:08:05 +0530 Subject: [PATCH 03/12] add chore --- .../replication/regionserver/ReplicationSource.java | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java index 2ca7e0281d72..504c1816e544 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java @@ -46,6 +46,8 @@ import org.apache.hadoop.hbase.TableDescriptors; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.regionserver.HRegionServer; +import org.apache.hadoop.hbase.regionserver.MetricsReplicationSourceRefresherChore; +import org.apache.hadoop.hbase.regionserver.MetricsSourceRefresherChore; import org.apache.hadoop.hbase.regionserver.RSRpcServices; import org.apache.hadoop.hbase.regionserver.RegionServerCoprocessorHost; import org.apache.hadoop.hbase.replication.ChainWALEntryFilter; @@ -57,6 +59,7 @@ import org.apache.hadoop.hbase.replication.ReplicationQueueStorage; import org.apache.hadoop.hbase.replication.SystemTableWALEntryFilter; import org.apache.hadoop.hbase.replication.WALEntryFilter; +import org.apache.hadoop.hbase.replication.regionserver.Replication.ReplicationStatisticsChore; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.util.Threads; @@ -223,7 +226,9 @@ public void init(Configuration conf, FileSystem fs, ReplicationSourceManager man this.abortOnError = this.conf.getBoolean("replication.source.regionserver.abort", true); - + int duration = this.conf.getInt(MetricsReplicationSourceRefresherChore.DURATION, + MetricsReplicationSourceRefresherChore.DEFAULT_DURATION); + this.server.getChoreService().scheduleChore(new MetricsReplicationSourceRefresherChore(duration, server, this)); LOG.info("queueId={}, ReplicationSource: {}, currentBandwidth={}", queueId, replicationPeer.getId(), this.currentBandwidth); } @@ -259,7 +264,7 @@ public void enqueueLog(Path wal) { } } - @Override + @InterfaceAudience.Private public Map> getQueues() { return logQueue.getQueues(); } @@ -810,7 +815,7 @@ Server getServer() { public ReplicationQueueStorage getReplicationQueueStorage() { return queueStorage; } - + void removeWorker(ReplicationSourceShipper worker) { workerThreads.remove(worker.walGroupId, worker); } From 34ba2b547d15ea9b2dac2cc3bb0b42c67a8b8dfe Mon Sep 17 00:00:00 2001 From: Rahul Kumar Date: Thu, 22 Jul 2021 14:35:05 +0530 Subject: [PATCH 04/12] Add test case --- ...etricsReplicationSourceRefresherChore.java | 6 +- .../regionserver/ReplicationSource.java | 1 - ...etricsReplicationSourceRefresherChore.java | 61 +++++++++++++++++++ 3 files changed, 66 insertions(+), 2 deletions(-) create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestMetricsReplicationSourceRefresherChore.java diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceRefresherChore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceRefresherChore.java index 64b6f1ab9196..0e7352cfc470 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceRefresherChore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceRefresherChore.java @@ -22,7 +22,11 @@ public class MetricsReplicationSourceRefresherChore extends ScheduledChore { public static final String DURATION = "hbase.metrics.replication.source.refresher.duration"; public static final int DEFAULT_DURATION = 60000; - + + public MetricsReplicationSourceRefresherChore(Stoppable stopper, ReplicationSource replicationSource) { + this(DEFAULT_DURATION, stopper, replicationSource); + } + public MetricsReplicationSourceRefresherChore(int duration, Stoppable stopper, ReplicationSource replicationSource) { super("MetricsSourceRefresherChore", stopper, duration); this.replicationSource = replicationSource; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java index 504c1816e544..456d5ad8f72d 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java @@ -47,7 +47,6 @@ import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.regionserver.HRegionServer; import org.apache.hadoop.hbase.regionserver.MetricsReplicationSourceRefresherChore; -import org.apache.hadoop.hbase.regionserver.MetricsSourceRefresherChore; import org.apache.hadoop.hbase.regionserver.RSRpcServices; import org.apache.hadoop.hbase.regionserver.RegionServerCoprocessorHost; import org.apache.hadoop.hbase.replication.ChainWALEntryFilter; diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestMetricsReplicationSourceRefresherChore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestMetricsReplicationSourceRefresherChore.java new file mode 100644 index 000000000000..618bc57af096 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestMetricsReplicationSourceRefresherChore.java @@ -0,0 +1,61 @@ +package org.apache.hadoop.hbase.replication.regionserver; + +import static org.junit.Assert.assertEquals; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.PriorityBlockingQueue; + +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.regionserver.HRegionServer; +import org.apache.hadoop.hbase.testclassification.SmallTests; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; +import org.apache.hadoop.hbase.util.ManualEnvironmentEdge; +import org.apache.hadoop.hbase.wal.AbstractFSWALProvider; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.mockito.Mockito; + +@Category({SmallTests.class}) +public class TestMetricsReplicationSourceRefresherChore { + + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = HBaseClassTestRule + .forClass(TestMetricsReplicationSourceRefresherChore.class); + + @Test + public void testOldestWalAgeMetricsRefresherCore() { + HRegionServer rs = mock(HRegionServer.class); + ReplicationSource mockSource = Mockito.mock(ReplicationSource.class); + MetricsSource mockMetricsSource = Mockito.mock(MetricsSource.class); + Mockito.when(mockSource.getSourceMetrics()).thenReturn(mockMetricsSource); + ManualEnvironmentEdge manualEdge = new ManualEnvironmentEdge(); + EnvironmentEdgeManager.injectEdge(manualEdge); + MetricsReplicationSourceRefresherChore mrsrChore = new MetricsReplicationSourceRefresherChore(10, rs, mockSource); + + manualEdge.setValue(10); + final Path log1 = new Path("log-walgroup-a.8"); + String walGroupId1 = "fake-walgroup-id-1"; + Map> queues = new ConcurrentHashMap<>(); + PriorityBlockingQueue queue = new PriorityBlockingQueue<>(1, new AbstractFSWALProvider.WALStartTimeComparator()); + queue.put(log1); + queues.put(walGroupId1, queue); + Mockito.when(mockSource.getQueues()).thenReturn(queues); + mrsrChore.chore(); + verify(mockMetricsSource, times(1)).setOldestWalAge(2); + + manualEdge.setValue(20); + final Path log2 = new Path("log-walgroup-b.8"); + String walGroupId2 = "fake-walgroup-id-2"; + queue.put(log2); + queues.put(walGroupId2, queue); + mrsrChore.chore(); + verify(mockMetricsSource, times(1)).setOldestWalAge(12); + + } +} From 46750502be3d457b1bc2d96e70106cc8c85d0ccb Mon Sep 17 00:00:00 2001 From: Rahul Kumar Date: Thu, 22 Jul 2021 14:50:22 +0530 Subject: [PATCH 05/12] fix indentation --- .../hbase/regionserver/HRegionServer.java | 2 +- ...etricsReplicationSourceRefresherChore.java | 7 +- ...etricsReplicationSourceRefresherChore.java | 72 ++++++++++--------- 3 files changed, 41 insertions(+), 40 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java index c697604aa49e..7783bff32c49 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java @@ -147,7 +147,6 @@ import org.apache.hadoop.hbase.regionserver.throttle.FlushThroughputControllerFactory; import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController; import org.apache.hadoop.hbase.replication.regionserver.ReplicationLoad; -import org.apache.hadoop.hbase.replication.regionserver.ReplicationSink; import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceInterface; import org.apache.hadoop.hbase.replication.regionserver.ReplicationStatus; import org.apache.hadoop.hbase.security.SecurityConstants; @@ -2138,6 +2137,7 @@ private void startServices() throws IOException { if (this.slowLogTableOpsChore != null) { choreService.scheduleChore(slowLogTableOpsChore); } + // Leases is not a Thread. Internally it runs a daemon thread. If it gets // an unhandled exception, it will just exit. Threads.setDaemonThreadRunning(this.leaseManager, getName() + ".leaseChecker", diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceRefresherChore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceRefresherChore.java index 0e7352cfc470..c6150623287a 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceRefresherChore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceRefresherChore.java @@ -38,9 +38,9 @@ protected void chore() { this.metrics.setOldestWalAge(getOldestWalAge()); } - /* + /* Returns the age of oldest wal. - */ + */ long getOldestWalAge() { long now = EnvironmentEdgeManager.currentTime(); long timestamp = getOldestWalTimestamp(); @@ -52,11 +52,10 @@ long getOldestWalAge() { long age = now - timestamp; return age; } - /* Get the oldest wal timestamp from all the queues. */ - private long getOldestWalTimestamp() { + private long getOldestWalTimestamp() { long oldestWalTimestamp = Long.MAX_VALUE; for (Map.Entry> entry : this.replicationSource.getQueues().entrySet()) { PriorityBlockingQueue queue = entry.getValue(); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestMetricsReplicationSourceRefresherChore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestMetricsReplicationSourceRefresherChore.java index 618bc57af096..c5b5c23ab4ba 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestMetricsReplicationSourceRefresherChore.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestMetricsReplicationSourceRefresherChore.java @@ -21,41 +21,43 @@ import org.junit.experimental.categories.Category; import org.mockito.Mockito; -@Category({SmallTests.class}) +@Category({ SmallTests.class }) public class TestMetricsReplicationSourceRefresherChore { - @ClassRule - public static final HBaseClassTestRule CLASS_RULE = HBaseClassTestRule - .forClass(TestMetricsReplicationSourceRefresherChore.class); - - @Test - public void testOldestWalAgeMetricsRefresherCore() { - HRegionServer rs = mock(HRegionServer.class); - ReplicationSource mockSource = Mockito.mock(ReplicationSource.class); - MetricsSource mockMetricsSource = Mockito.mock(MetricsSource.class); - Mockito.when(mockSource.getSourceMetrics()).thenReturn(mockMetricsSource); - ManualEnvironmentEdge manualEdge = new ManualEnvironmentEdge(); - EnvironmentEdgeManager.injectEdge(manualEdge); - MetricsReplicationSourceRefresherChore mrsrChore = new MetricsReplicationSourceRefresherChore(10, rs, mockSource); - - manualEdge.setValue(10); - final Path log1 = new Path("log-walgroup-a.8"); - String walGroupId1 = "fake-walgroup-id-1"; - Map> queues = new ConcurrentHashMap<>(); - PriorityBlockingQueue queue = new PriorityBlockingQueue<>(1, new AbstractFSWALProvider.WALStartTimeComparator()); - queue.put(log1); - queues.put(walGroupId1, queue); - Mockito.when(mockSource.getQueues()).thenReturn(queues); - mrsrChore.chore(); - verify(mockMetricsSource, times(1)).setOldestWalAge(2); - - manualEdge.setValue(20); - final Path log2 = new Path("log-walgroup-b.8"); - String walGroupId2 = "fake-walgroup-id-2"; - queue.put(log2); - queues.put(walGroupId2, queue); - mrsrChore.chore(); - verify(mockMetricsSource, times(1)).setOldestWalAge(12); - - } + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestMetricsReplicationSourceRefresherChore.class); + + @Test + public void testOldestWalAgeMetricsRefresherCore() { + HRegionServer rs = mock(HRegionServer.class); + ReplicationSource mockSource = Mockito.mock(ReplicationSource.class); + MetricsSource mockMetricsSource = Mockito.mock(MetricsSource.class); + Mockito.when(mockSource.getSourceMetrics()).thenReturn(mockMetricsSource); + ManualEnvironmentEdge manualEdge = new ManualEnvironmentEdge(); + EnvironmentEdgeManager.injectEdge(manualEdge); + MetricsReplicationSourceRefresherChore mrsrChore = + new MetricsReplicationSourceRefresherChore(10, rs, mockSource); + + manualEdge.setValue(10); + final Path log1 = new Path("log-walgroup-a.8"); + String walGroupId1 = "fake-walgroup-id-1"; + Map> queues = new ConcurrentHashMap<>(); + PriorityBlockingQueue queue = + new PriorityBlockingQueue<>(1, new AbstractFSWALProvider.WALStartTimeComparator()); + queue.put(log1); + queues.put(walGroupId1, queue); + Mockito.when(mockSource.getQueues()).thenReturn(queues); + mrsrChore.chore(); + verify(mockMetricsSource, times(1)).setOldestWalAge(2); + + manualEdge.setValue(20); + final Path log2 = new Path("log-walgroup-b.8"); + String walGroupId2 = "fake-walgroup-id-2"; + queue.put(log2); + queues.put(walGroupId2, queue); + mrsrChore.chore(); + verify(mockMetricsSource, times(1)).setOldestWalAge(12); + + } } From 32a77d87600a1afa2abfd869a2795eec3af0c9e2 Mon Sep 17 00:00:00 2001 From: Rahul Kumar Date: Thu, 22 Jul 2021 15:08:48 +0530 Subject: [PATCH 06/12] fix test cases --- ...etricsReplicationSourceRefresherChore.java | 102 +++++++++--------- .../regionserver/ReplicationSource.java | 3 +- .../ReplicationSourceInterface.java | 1 - .../replication/ReplicationSourceDummy.java | 1 - ...etricsReplicationSourceRefresherChore.java | 59 +++++----- .../TestReplicationSourceLogQueue.java | 83 ++++++-------- 6 files changed, 119 insertions(+), 130 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceRefresherChore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceRefresherChore.java index c6150623287a..fa9a66b8103e 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceRefresherChore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceRefresherChore.java @@ -11,61 +11,65 @@ import org.apache.yetus.audience.InterfaceAudience; /** - * The Class MetricsSourceRefresherChore for refreshing source metrics + * The Class MetricsReplicationSourceRefresherChore for refreshing age related replication source metrics */ @InterfaceAudience.Private public class MetricsReplicationSourceRefresherChore extends ScheduledChore { - private ReplicationSource replicationSource; + private ReplicationSource replicationSource; - private MetricsSource metrics; - - public static final String DURATION = "hbase.metrics.replication.source.refresher.duration"; - public static final int DEFAULT_DURATION = 60000; + private MetricsSource metrics; - public MetricsReplicationSourceRefresherChore(Stoppable stopper, ReplicationSource replicationSource) { - this(DEFAULT_DURATION, stopper, replicationSource); - } + public static final String DURATION = "hbase.metrics.replication.source.refresher.duration"; + public static final int DEFAULT_DURATION = 60000; - public MetricsReplicationSourceRefresherChore(int duration, Stoppable stopper, ReplicationSource replicationSource) { - super("MetricsSourceRefresherChore", stopper, duration); - this.replicationSource = replicationSource; - this.metrics = this.replicationSource.getSourceMetrics(); - } - - @Override - protected void chore() { - this.metrics.setOldestWalAge(getOldestWalAge()); - } + public MetricsReplicationSourceRefresherChore(Stoppable stopper, + ReplicationSource replicationSource) { + this(DEFAULT_DURATION, stopper, replicationSource); + } - /* - Returns the age of oldest wal. - */ - long getOldestWalAge() { - long now = EnvironmentEdgeManager.currentTime(); - long timestamp = getOldestWalTimestamp(); - if (timestamp == Long.MAX_VALUE) { - // If there are no wals in the queue then set the oldest wal timestamp to current time - // so that the oldest wal age will be 0. - timestamp = now; - } - long age = now - timestamp; - return age; - } - /* - Get the oldest wal timestamp from all the queues. - */ - private long getOldestWalTimestamp() { - long oldestWalTimestamp = Long.MAX_VALUE; - for (Map.Entry> entry : this.replicationSource.getQueues().entrySet()) { - PriorityBlockingQueue queue = entry.getValue(); - Path path = queue.peek(); - // Can path ever be null ? - if (path != null) { - oldestWalTimestamp = Math.min(oldestWalTimestamp, - AbstractFSWALProvider.WALStartTimeComparator.getTS(path)); - } - } - return oldestWalTimestamp; - } + public MetricsReplicationSourceRefresherChore(int duration, Stoppable stopper, + ReplicationSource replicationSource) { + super("MetricsSourceRefresherChore", stopper, duration); + this.replicationSource = replicationSource; + this.metrics = this.replicationSource.getSourceMetrics(); + } + + @Override + protected void chore() { + this.metrics.setOldestWalAge(getOldestWalAge()); + } + + /* + * Returns the age of oldest wal. + */ + long getOldestWalAge() { + long now = EnvironmentEdgeManager.currentTime(); + long timestamp = getOldestWalTimestamp(); + if (timestamp == Long.MAX_VALUE) { + // If there are no wals in the queue then set the oldest wal timestamp to current time + // so that the oldest wal age will be 0. + timestamp = now; + } + long age = now - timestamp; + return age; + } + + /* + * Get the oldest wal timestamp from all the queues. + */ + private long getOldestWalTimestamp() { + long oldestWalTimestamp = Long.MAX_VALUE; + for (Map.Entry> entry : this.replicationSource.getQueues() + .entrySet()) { + PriorityBlockingQueue queue = entry.getValue(); + Path path = queue.peek(); + // Can path ever be null ? + if (path != null) { + oldestWalTimestamp = + Math.min(oldestWalTimestamp, AbstractFSWALProvider.WALStartTimeComparator.getTS(path)); + } + } + return oldestWalTimestamp; + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java index 456d5ad8f72d..789c1b76d937 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java @@ -58,7 +58,6 @@ import org.apache.hadoop.hbase.replication.ReplicationQueueStorage; import org.apache.hadoop.hbase.replication.SystemTableWALEntryFilter; import org.apache.hadoop.hbase.replication.WALEntryFilter; -import org.apache.hadoop.hbase.replication.regionserver.Replication.ReplicationStatisticsChore; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.util.Threads; @@ -814,7 +813,7 @@ Server getServer() { public ReplicationQueueStorage getReplicationQueueStorage() { return queueStorage; } - + void removeWorker(ReplicationSourceShipper worker) { workerThreads.remove(worker.walGroupId, worker); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java index a790ce0d2191..27e4b79c141b 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java @@ -23,7 +23,6 @@ import java.util.List; import java.util.Map; import java.util.UUID; - import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java index be6b62b1a6ad..cab01d6fa6e6 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java @@ -19,7 +19,6 @@ import java.io.IOException; import java.util.List; -import java.util.Map; import java.util.UUID; import java.util.concurrent.atomic.AtomicBoolean; import org.apache.hadoop.conf.Configuration; diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestMetricsReplicationSourceRefresherChore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestMetricsReplicationSourceRefresherChore.java index c5b5c23ab4ba..6d581e893b2e 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestMetricsReplicationSourceRefresherChore.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestMetricsReplicationSourceRefresherChore.java @@ -30,34 +30,37 @@ public class TestMetricsReplicationSourceRefresherChore { @Test public void testOldestWalAgeMetricsRefresherCore() { - HRegionServer rs = mock(HRegionServer.class); - ReplicationSource mockSource = Mockito.mock(ReplicationSource.class); - MetricsSource mockMetricsSource = Mockito.mock(MetricsSource.class); - Mockito.when(mockSource.getSourceMetrics()).thenReturn(mockMetricsSource); - ManualEnvironmentEdge manualEdge = new ManualEnvironmentEdge(); - EnvironmentEdgeManager.injectEdge(manualEdge); - MetricsReplicationSourceRefresherChore mrsrChore = - new MetricsReplicationSourceRefresherChore(10, rs, mockSource); - - manualEdge.setValue(10); - final Path log1 = new Path("log-walgroup-a.8"); - String walGroupId1 = "fake-walgroup-id-1"; - Map> queues = new ConcurrentHashMap<>(); - PriorityBlockingQueue queue = - new PriorityBlockingQueue<>(1, new AbstractFSWALProvider.WALStartTimeComparator()); - queue.put(log1); - queues.put(walGroupId1, queue); - Mockito.when(mockSource.getQueues()).thenReturn(queues); - mrsrChore.chore(); - verify(mockMetricsSource, times(1)).setOldestWalAge(2); - - manualEdge.setValue(20); - final Path log2 = new Path("log-walgroup-b.8"); - String walGroupId2 = "fake-walgroup-id-2"; - queue.put(log2); - queues.put(walGroupId2, queue); - mrsrChore.chore(); - verify(mockMetricsSource, times(1)).setOldestWalAge(12); + try { + HRegionServer rs = mock(HRegionServer.class); + ReplicationSource mockSource = Mockito.mock(ReplicationSource.class); + MetricsSource mockMetricsSource = Mockito.mock(MetricsSource.class); + Mockito.when(mockSource.getSourceMetrics()).thenReturn(mockMetricsSource); + ManualEnvironmentEdge manualEdge = new ManualEnvironmentEdge(); + EnvironmentEdgeManager.injectEdge(manualEdge); + MetricsReplicationSourceRefresherChore mrsrChore = + new MetricsReplicationSourceRefresherChore(10, rs, mockSource); + manualEdge.setValue(10); + final Path log1 = new Path("log-walgroup-a.8"); + String walGroupId1 = "fake-walgroup-id-1"; + Map> queues = new ConcurrentHashMap<>(); + PriorityBlockingQueue queue = + new PriorityBlockingQueue<>(1, new AbstractFSWALProvider.WALStartTimeComparator()); + queue.put(log1); + queues.put(walGroupId1, queue); + Mockito.when(mockSource.getQueues()).thenReturn(queues); + mrsrChore.chore(); + verify(mockMetricsSource, times(1)).setOldestWalAge(2); + + manualEdge.setValue(20); + final Path log2 = new Path("log-walgroup-b.8"); + String walGroupId2 = "fake-walgroup-id-2"; + queue.put(log2); + queues.put(walGroupId2, queue); + mrsrChore.chore(); + verify(mockMetricsSource, times(1)).setOldestWalAge(12); + } finally { + EnvironmentEdgeManager.reset(); + } } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceLogQueue.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceLogQueue.java index 49292b7aa335..ffc605bf72ed 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceLogQueue.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceLogQueue.java @@ -1,19 +1,12 @@ /* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable + * law or agreed to in writing, software distributed under the License is distributed on an "AS IS" + * BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License + * for the specific language governing permissions and limitations under the License. */ package org.apache.hadoop.hbase.replication.regionserver; @@ -25,55 +18,47 @@ import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.testclassification.ReplicationTests; import org.apache.hadoop.hbase.testclassification.SmallTests; -import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; -import org.apache.hadoop.hbase.util.ManualEnvironmentEdge; import org.junit.ClassRule; import org.junit.Test; import org.junit.experimental.categories.Category; import org.mockito.Mockito; -@Category({SmallTests.class,ReplicationTests.class}) +@Category({ SmallTests.class, ReplicationTests.class }) public class TestReplicationSourceLogQueue { @ClassRule public static final HBaseClassTestRule CLASS_RULE = - HBaseClassTestRule.forClass(TestReplicationSourceLogQueue.class); + HBaseClassTestRule.forClass(TestReplicationSourceLogQueue.class); /* - Testing enqueue and dequeuing of wal and check age of oldest wal. - */ + * Testing enqueue and dequeuing of wal + */ @Test public void testEnqueueDequeue() { - try { - String walGroupId1 = "fake-walgroup-id-1"; - String walGroupId2 = "fake-walgroup-id-2"; + String walGroupId1 = "fake-walgroup-id-1"; + String walGroupId2 = "fake-walgroup-id-2"; - ManualEnvironmentEdge manualEdge = new ManualEnvironmentEdge(); - EnvironmentEdgeManager.injectEdge(manualEdge); + MetricsSource metrics = new MetricsSource("1"); + Configuration conf = HBaseConfiguration.create(); + ReplicationSource source = mock(ReplicationSource.class); + Mockito.doReturn("peer").when(source).logPeerId(); + ReplicationSourceLogQueue logQueue = new ReplicationSourceLogQueue(conf, metrics, source); + final Path log1 = new Path("log-walgroup-a.8"); - MetricsSource metrics = new MetricsSource("1"); - Configuration conf = HBaseConfiguration.create(); - ReplicationSource source = mock(ReplicationSource.class); - Mockito.doReturn("peer").when(source).logPeerId(); - ReplicationSourceLogQueue logQueue = new ReplicationSourceLogQueue(conf, metrics, source); - final Path log1 = new Path("log-walgroup-a.8"); - manualEdge.setValue(10); - // Diff of current time (10) and log-walgroup-a.8 timestamp will be 2. - logQueue.enqueueLog(log1, walGroupId1); + logQueue.enqueueLog(log1, walGroupId1); + assertEquals(1, logQueue.getQueue(walGroupId1).size()); + final Path log2 = new Path("log-walgroup-b.4"); + logQueue.enqueueLog(log2, walGroupId2); + assertEquals(1, logQueue.getQueue(walGroupId2).size()); + assertEquals(2, logQueue.getNumQueues()); - final Path log2 = new Path("log-walgroup-b.4"); - // Diff of current time (10) and log-walgroup-b.4 will be 6 so oldestWalAge should be 6 - logQueue.enqueueLog(log2, walGroupId2); - - // Remove an element from walGroupId2. - // After this op, there will be only one element in the queue log-walgroup-a.8 - logQueue.remove(walGroupId2); - - // Remove last element from the queue. - logQueue.remove(walGroupId1); - // This will test the case where there are no elements in the queue. - } finally { - EnvironmentEdgeManager.reset(); - } + // Remove an element from walGroupId2. + // After this op, there will be only one element in the queue log-walgroup-a.8 + logQueue.remove(walGroupId2); + assertEquals(0, logQueue.getQueue(walGroupId2).size()); + // Remove last element from the queue. + logQueue.remove(walGroupId1); + assertEquals(0, logQueue.getQueue(walGroupId1).size()); + // This will test the case where there are no elements in the queue. } } From 5fef9118999476ee17c36d85c2ce8b686ca80273 Mon Sep 17 00:00:00 2001 From: Rahul Kumar Date: Thu, 22 Jul 2021 15:16:03 +0530 Subject: [PATCH 07/12] use milliseconds --- ...etricsReplicationSourceRefresherChore.java | 21 +++++++++++++++++-- .../ReplicationSourceLogQueue.java | 3 +-- 2 files changed, 20 insertions(+), 4 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceRefresherChore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceRefresherChore.java index fa9a66b8103e..0629617189bf 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceRefresherChore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceRefresherChore.java @@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.apache.hadoop.hbase.replication.regionserver; import java.util.Map; @@ -21,11 +38,11 @@ public class MetricsReplicationSourceRefresherChore extends ScheduledChore { private MetricsSource metrics; public static final String DURATION = "hbase.metrics.replication.source.refresher.duration"; - public static final int DEFAULT_DURATION = 60000; + public static final int DEFAULT_DURATION_MILLISECONDS = 60000; public MetricsReplicationSourceRefresherChore(Stoppable stopper, ReplicationSource replicationSource) { - this(DEFAULT_DURATION, stopper, replicationSource); + this(DEFAULT_DURATION_MILLISECONDS, stopper, replicationSource); } public MetricsReplicationSourceRefresherChore(int duration, Stoppable stopper, diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceLogQueue.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceLogQueue.java index 294c22de02d1..5a6ffbfa70ff 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceLogQueue.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceLogQueue.java @@ -30,8 +30,7 @@ import org.slf4j.LoggerFactory; /* - Class that does enqueueing/dequeuing of wal at one place so that we can update the metrics - just at one place. + Class that does enqueueing/dequeuing of wal at one place */ @InterfaceAudience.Private @InterfaceStability.Evolving From 6a32ddcede3825eae1dff5057b4143936eace481 Mon Sep 17 00:00:00 2001 From: Rahul Kumar Date: Thu, 22 Jul 2021 15:18:08 +0530 Subject: [PATCH 08/12] indent fix --- .../TestReplicationSourceLogQueue.java | 23 ++++++++++++------- 1 file changed, 15 insertions(+), 8 deletions(-) diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceLogQueue.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceLogQueue.java index ffc605bf72ed..c46004933c88 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceLogQueue.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceLogQueue.java @@ -1,12 +1,19 @@ /* - * Licensed to the Apache Software Foundation (ASF) under one or more contributor license - * agreements. See the NOTICE file distributed with this work for additional information regarding - * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance with the License. You may obtain a - * copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable - * law or agreed to in writing, software distributed under the License is distributed on an "AS IS" - * BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License - * for the specific language governing permissions and limitations under the License. + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ package org.apache.hadoop.hbase.replication.regionserver; From 9bca6183a811cb6486e0f5c519e7fa02c42d9202 Mon Sep 17 00:00:00 2001 From: Rahul Kumar Date: Thu, 22 Jul 2021 16:45:14 +0530 Subject: [PATCH 09/12] fix checks --- .../MetricsReplicationSourceRefresherChore.java | 5 ++++- .../hbase/replication/regionserver/ReplicationSource.java | 6 +++--- .../TestMetricsReplicationSourceRefresherChore.java | 1 - 3 files changed, 7 insertions(+), 5 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceRefresherChore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceRefresherChore.java index 0629617189bf..493cf6c3f020 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceRefresherChore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceRefresherChore.java @@ -26,11 +26,14 @@ import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.wal.AbstractFSWALProvider; import org.apache.yetus.audience.InterfaceAudience; +import org.apache.yetus.audience.InterfaceStability; /** - * The Class MetricsReplicationSourceRefresherChore for refreshing age related replication source metrics + * The Class MetricsReplicationSourceRefresherChore for + * refreshing age related replication source metrics */ @InterfaceAudience.Private +@InterfaceStability.Evolving public class MetricsReplicationSourceRefresherChore extends ScheduledChore { private ReplicationSource replicationSource; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java index 789c1b76d937..c066bc7a5219 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java @@ -46,7 +46,6 @@ import org.apache.hadoop.hbase.TableDescriptors; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.regionserver.HRegionServer; -import org.apache.hadoop.hbase.regionserver.MetricsReplicationSourceRefresherChore; import org.apache.hadoop.hbase.regionserver.RSRpcServices; import org.apache.hadoop.hbase.regionserver.RegionServerCoprocessorHost; import org.apache.hadoop.hbase.replication.ChainWALEntryFilter; @@ -225,8 +224,9 @@ public void init(Configuration conf, FileSystem fs, ReplicationSourceManager man this.abortOnError = this.conf.getBoolean("replication.source.regionserver.abort", true); int duration = this.conf.getInt(MetricsReplicationSourceRefresherChore.DURATION, - MetricsReplicationSourceRefresherChore.DEFAULT_DURATION); - this.server.getChoreService().scheduleChore(new MetricsReplicationSourceRefresherChore(duration, server, this)); + MetricsReplicationSourceRefresherChore.DEFAULT_DURATION_MILLISECONDS); + this.server.getChoreService() + .scheduleChore(new MetricsReplicationSourceRefresherChore(duration, server, this)); LOG.info("queueId={}, ReplicationSource: {}, currentBandwidth={}", queueId, replicationPeer.getId(), this.currentBandwidth); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestMetricsReplicationSourceRefresherChore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestMetricsReplicationSourceRefresherChore.java index 6d581e893b2e..6687b3601500 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestMetricsReplicationSourceRefresherChore.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestMetricsReplicationSourceRefresherChore.java @@ -1,6 +1,5 @@ package org.apache.hadoop.hbase.replication.regionserver; -import static org.junit.Assert.assertEquals; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; From 35c10a4ac068528d5a730616f4a989af6e790382 Mon Sep 17 00:00:00 2001 From: Rahul Kumar Date: Thu, 22 Jul 2021 19:13:11 +0530 Subject: [PATCH 10/12] fix checks --- .../MetricsReplicationSourceRefresherChore.java | 4 ++-- .../regionserver/ReplicationSource.java | 2 +- ...tMetricsReplicationSourceRefresherChore.java | 17 +++++++++++++++++ 3 files changed, 20 insertions(+), 3 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceRefresherChore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceRefresherChore.java index 493cf6c3f020..4abbc7ae38d1 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceRefresherChore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceRefresherChore.java @@ -29,8 +29,8 @@ import org.apache.yetus.audience.InterfaceStability; /** - * The Class MetricsReplicationSourceRefresherChore for - * refreshing age related replication source metrics + * The Class MetricsReplicationSourceRefresherChore for refreshing age related replication source + * metrics */ @InterfaceAudience.Private @InterfaceStability.Evolving diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java index c066bc7a5219..f4d84dfef76d 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java @@ -224,7 +224,7 @@ public void init(Configuration conf, FileSystem fs, ReplicationSourceManager man this.abortOnError = this.conf.getBoolean("replication.source.regionserver.abort", true); int duration = this.conf.getInt(MetricsReplicationSourceRefresherChore.DURATION, - MetricsReplicationSourceRefresherChore.DEFAULT_DURATION_MILLISECONDS); + MetricsReplicationSourceRefresherChore.DEFAULT_DURATION_MILLISECONDS); this.server.getChoreService() .scheduleChore(new MetricsReplicationSourceRefresherChore(duration, server, this)); LOG.info("queueId={}, ReplicationSource: {}, currentBandwidth={}", queueId, diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestMetricsReplicationSourceRefresherChore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestMetricsReplicationSourceRefresherChore.java index 6687b3601500..c510e7b397c2 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestMetricsReplicationSourceRefresherChore.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestMetricsReplicationSourceRefresherChore.java @@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.apache.hadoop.hbase.replication.regionserver; import static org.mockito.Mockito.mock; From 4cba9045abfe9fcb0e1d9d210320c3bcd3461fa5 Mon Sep 17 00:00:00 2001 From: Rahul Kumar Date: Fri, 30 Jul 2021 16:43:46 +0530 Subject: [PATCH 11/12] Fix RelicationSOource tests --- .../hadoop/hbase/MockRegionServerServices.java | 2 +- .../regionserver/TestReplicationSource.java | 15 +++++++++++---- 2 files changed, 12 insertions(+), 5 deletions(-) diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/MockRegionServerServices.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/MockRegionServerServices.java index f0a85b580d4b..d4ddc6140784 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/MockRegionServerServices.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/MockRegionServerServices.java @@ -251,7 +251,7 @@ public ExecutorService getExecutorService() { @Override public ChoreService getChoreService() { - return null; + return new ChoreService(getClass().getSimpleName()); } @Override diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSource.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSource.java index f5d4f7782947..9b16e4d50d57 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSource.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSource.java @@ -265,7 +265,9 @@ public void testTerminateTimeout() throws Exception { testConf.setInt("replication.source.maxretriesmultiplier", 1); ReplicationSourceManager manager = Mockito.mock(ReplicationSourceManager.class); Mockito.when(manager.getTotalBufferUsed()).thenReturn(new AtomicLong()); - source.init(testConf, null, manager, null, mockPeer, null, "testPeer", + RegionServerServices rss = + TEST_UTIL.createMockRegionServerService(ServerName.parseServerName("a.b.c,1,1")); + source.init(testConf, null, manager, null, mockPeer, rss, "testPeer", null, p -> OptionalLong.empty(), null); ExecutorService executor = Executors.newSingleThreadExecutor(); Future future = executor.submit( @@ -289,7 +291,9 @@ public void testTerminateClearsBuffer() throws Exception { ReplicationPeer mockPeer = mock(ReplicationPeer.class); Mockito.when(mockPeer.getPeerBandwidth()).thenReturn(0L); Configuration testConf = HBaseConfiguration.create(); - source.init(testConf, null, mockManager, null, mockPeer, null, + RegionServerServices rss = + TEST_UTIL.createMockRegionServerService(ServerName.parseServerName("a.b.c,1,1")); + source.init(testConf, null, mockManager, null, mockPeer, rss, "testPeer", null, p -> OptionalLong.empty(), mock(MetricsSource.class)); ReplicationSourceWALReader reader = new ReplicationSourceWALReader(null, conf, null, 0, null, source, null); @@ -635,10 +639,10 @@ public void testAgeOfOldestWal() throws Exception { try { ManualEnvironmentEdge manualEdge = new ManualEnvironmentEdge(); EnvironmentEdgeManager.injectEdge(manualEdge); - String id = "1"; MetricsSource metrics = new MetricsSource(id); Configuration conf = new Configuration(TEST_UTIL.getConfiguration()); + conf.setInt(MetricsReplicationSourceRefresherChore.DURATION, 1000); conf.setInt("replication.source.maxretriesmultiplier", 1); ReplicationPeer mockPeer = Mockito.mock(ReplicationPeer.class); Mockito.when(mockPeer.getConfiguration()).thenReturn(conf); @@ -653,7 +657,6 @@ public void testAgeOfOldestWal() throws Exception { thenReturn(mock(MetricsReplicationGlobalSourceSource.class)); RegionServerServices rss = TEST_UTIL.createMockRegionServerService(ServerName.parseServerName("a.b.c,1,1")); - ReplicationSource source = new ReplicationSource(); source.init(conf, null, manager, null, mockPeer, rss, id, null, p -> OptionalLong.empty(), metrics); @@ -662,12 +665,16 @@ public void testAgeOfOldestWal() throws Exception { manualEdge.setValue(10); // Diff of current time (10) and log-walgroup-a.8 timestamp will be 2. source.enqueueLog(log1); + // Sleep for chore to update WAL age + Thread.sleep(1000); MetricsReplicationSourceSource metricsSource1 = getSourceMetrics(id); assertEquals(2, metricsSource1.getOldestWalAge()); final Path log2 = new Path(logDir, "log-walgroup-b.4"); // Diff of current time (10) and log-walgroup-b.4 will be 6 so oldestWalAge should be 6 source.enqueueLog(log2); + // Sleep for chore to update WAL age + Thread.sleep(1000); assertEquals(6, metricsSource1.getOldestWalAge()); // Clear all metrics. metrics.clear(); From 00f87f7691d9350dae4bea3ae9ab59b7cb0124db Mon Sep 17 00:00:00 2001 From: Rahul Kumar Date: Fri, 30 Jul 2021 19:53:20 +0530 Subject: [PATCH 12/12] fix tests --- .../hbase/replication/regionserver/ReplicationSource.java | 1 + .../hbase/replication/regionserver/ReplicationSyncUp.java | 2 +- .../replication/regionserver/TestReplicationSource.java | 1 + .../regionserver/TestReplicationSourceLogQueue.java | 8 ++++---- 4 files changed, 7 insertions(+), 5 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java index f4d84dfef76d..25e6f564612b 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java @@ -223,6 +223,7 @@ public void init(Configuration conf, FileSystem fs, ReplicationSourceManager man this.abortOnError = this.conf.getBoolean("replication.source.regionserver.abort", true); + int duration = this.conf.getInt(MetricsReplicationSourceRefresherChore.DURATION, MetricsReplicationSourceRefresherChore.DEFAULT_DURATION_MILLISECONDS); this.server.getChoreService() diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSyncUp.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSyncUp.java index a6362f99d3a9..e90d5e6d264e 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSyncUp.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSyncUp.java @@ -162,7 +162,7 @@ public Connection getConnection() { @Override public ChoreService getChoreService() { - return null; + return new ChoreService(getClass().getSimpleName()); } @Override diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSource.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSource.java index 9b16e4d50d57..6759ec771aac 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSource.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSource.java @@ -639,6 +639,7 @@ public void testAgeOfOldestWal() throws Exception { try { ManualEnvironmentEdge manualEdge = new ManualEnvironmentEdge(); EnvironmentEdgeManager.injectEdge(manualEdge); + String id = "1"; MetricsSource metrics = new MetricsSource(id); Configuration conf = new Configuration(TEST_UTIL.getConfiguration()); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceLogQueue.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceLogQueue.java index c46004933c88..9724f1d03907 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceLogQueue.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceLogQueue.java @@ -30,16 +30,16 @@ import org.junit.experimental.categories.Category; import org.mockito.Mockito; -@Category({ SmallTests.class, ReplicationTests.class }) +@Category({SmallTests.class,ReplicationTests.class}) public class TestReplicationSourceLogQueue { @ClassRule public static final HBaseClassTestRule CLASS_RULE = - HBaseClassTestRule.forClass(TestReplicationSourceLogQueue.class); + HBaseClassTestRule.forClass(TestReplicationSourceLogQueue.class); /* - * Testing enqueue and dequeuing of wal - */ + Testing enqueue and dequeuing of wal + */ @Test public void testEnqueueDequeue() { String walGroupId1 = "fake-walgroup-id-1";