From b4956823c3ef7e0a7ef546522daade44b1786ac2 Mon Sep 17 00:00:00 2001 From: guoxiaojiao1 Date: Thu, 13 Apr 2023 19:52:16 +0800 Subject: [PATCH] HBASE-27763 Recover WAL encounter KeeperErrorCode = NoNode cause RegionServer crash --- .../RecoveredReplicationSource.java | 29 +++++++++++++++++++ .../regionserver/ReplicationSource.java | 10 +++++-- .../ReplicationSourceShipper.java | 8 +++++ 3 files changed, 44 insertions(+), 3 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java index 024248a3f8c9..df76f4d4bc8e 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hbase.replication.regionserver; import java.io.IOException; +import java.util.ArrayList; import java.util.List; import java.util.UUID; import java.util.concurrent.PriorityBlockingQueue; @@ -30,6 +31,7 @@ import org.apache.hadoop.hbase.replication.ReplicationPeer; import org.apache.hadoop.hbase.replication.ReplicationQueueStorage; import org.apache.hadoop.hbase.util.CommonFSUtils; +import org.apache.hadoop.hbase.util.Threads; import org.apache.hadoop.hbase.wal.AbstractFSWALProvider; import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; @@ -162,4 +164,31 @@ public ServerName getServerWALsBelongTo() { public boolean isRecovered() { return true; } + + @Override + public void startShipperWorks() { + List startWorkerThreads = new ArrayList<>(); + // 1 create shippers and add them into workerThreads. + for (String walGroupId : logQueue.getQueues().keySet()) { + // maybe shipper related to walGroupId has run. + if (workerThreads.get(walGroupId) == null) { + ReplicationSourceShipper worker = createNewShipper(walGroupId); + ReplicationSourceWALReader walReader = createNewWALReader(walGroupId, worker.getStartPosition()); + worker.setWALReader(walReader); + LOG.info("Recover queueId {} walGroup {}, create replication shipper and " + + "add to workerThreads but not run.", queueId, walGroupId); + workerThreads.put(walGroupId, worker); + startWorkerThreads.add(worker); + } + } + + // 2 run readers and shippers + for (ReplicationSourceShipper worker : startWorkerThreads) { + Threads.setDaemonThreadRunning(worker.getWALReader(), + Thread.currentThread().getName() + ".replicationSource.wal-reader." + + worker.getWalGroupId() + "," + queueId + , (t, e) -> this.uncaughtException(t, e, this.manager, this.getPeerId())); + worker.startup((t, e) -> this.uncaughtException(t, e, this.manager, this.getPeerId())); + } + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java index 74a430a7f382..93d6dfb85e36 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java @@ -405,7 +405,7 @@ protected ReplicationSourceShipper createNewShipper(String walGroupId) { return new ReplicationSourceShipper(conf, walGroupId, logQueue, this); } - private ReplicationSourceWALReader createNewWALReader(String walGroupId, long startPosition) { + protected ReplicationSourceWALReader createNewWALReader(String walGroupId, long startPosition) { return replicationPeer.getPeerConfig().isSerial() ? new SerialReplicationSourceWALReader(fs, conf, logQueue, startPosition, walEntryFilter, this, walGroupId) @@ -421,7 +421,7 @@ WALEntryFilter getWalEntryFilter() { return walEntryFilter; } - private void uncaughtException(Thread t, Throwable e, ReplicationSourceManager manager, + protected void uncaughtException(Thread t, Throwable e, ReplicationSourceManager manager, String peerId) { OOMEChecker.exitIfOOME(e, getClass().getSimpleName()); LOG.error("Unexpected exception in {} currentPath={}", t.getName(), getCurrentPath(), e); @@ -574,10 +574,14 @@ private void initialize() { this.replicationQueueInfo.getQueueId(), logQueue.getNumQueues(), clusterId, peerClusterId); initializeWALEntryFilter(peerClusterId); // Start workers + startShipperWorks(); + this.startupOngoing.set(false); + } + + protected void startShipperWorks() { for (String walGroupId : logQueue.getQueues().keySet()) { tryStartNewShipper(walGroupId); } - setSourceStartupStatus(false); } private synchronized void setSourceStartupStatus(boolean initializing) { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java index 16c54191c2b6..7b68e9a569cf 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java @@ -94,6 +94,14 @@ public ReplicationSourceShipper(Configuration conf, String walGroupId, HConstants.REPLICATION_SOURCE_SHIPEDITS_TIMEOUT_DFAULT); } + public ReplicationSourceWALReader getWALReader() { + return entryReader; + } + + public String getWalGroupId(){ + return walGroupId; + } + @Override public final void run() { setWorkerState(WorkerState.RUNNING);