diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java index 6d1ffb44d982..05a0c2cf649a 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java @@ -828,10 +828,12 @@ public void setFailureResultForNonce(NonceKey nonceKey, String procName, User pr return; } - Procedure proc = - new FailedProcedure<>(procId.longValue(), procName, procOwner, nonceKey, exception); + completed.computeIfAbsent(procId, (key) -> { + Procedure proc = new FailedProcedure<>(procId.longValue(), + procName, procOwner, nonceKey, exception); - completed.putIfAbsent(procId, new CompletedProcedureRetainer<>(proc)); + return new CompletedProcedureRetainer<>(proc); + }); } // ========================================================================== diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/RemoteProcedureDispatcher.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/RemoteProcedureDispatcher.java index 02f6d7d524ce..7d1bd4e228b4 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/RemoteProcedureDispatcher.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/RemoteProcedureDispatcher.java @@ -147,8 +147,7 @@ public void join() { */ public void addNode(final TRemote key) { assert key != null: "Tried to add a node with a null key"; - final BufferNode newNode = new BufferNode(key); - nodeMap.putIfAbsent(key, newNode); + nodeMap.computeIfAbsent(key, k -> new BufferNode(k)); } /** diff --git a/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupAdminServer.java b/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupAdminServer.java index f3ef4fb96d2a..e6bf69ec5920 100644 --- a/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupAdminServer.java +++ b/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupAdminServer.java @@ -581,9 +581,9 @@ private Map rsGroupGetRegionsInTransition(String groupName) ServerName currServer = entry.getValue(); RegionInfo currRegion = entry.getKey(); if (rsGroupInfo.getTables().contains(currTable)) { - assignments.putIfAbsent(currTable, new HashMap<>()); - assignments.get(currTable).putIfAbsent(currServer, new ArrayList<>()); - assignments.get(currTable).get(currServer).add(currRegion); + assignments.computeIfAbsent(currTable, key -> new HashMap<>()) + .computeIfAbsent(currServer, key -> new ArrayList<>()) + .add(currRegion); } } @@ -595,12 +595,16 @@ private Map rsGroupGetRegionsInTransition(String groupName) } // add all tables that are members of the group - for(TableName tableName : rsGroupInfo.getTables()) { - if(assignments.containsKey(tableName)) { - result.put(tableName, new HashMap<>()); - result.get(tableName).putAll(serverMap); - result.get(tableName).putAll(assignments.get(tableName)); - LOG.debug("Adding assignments for {}: {}", tableName, assignments.get(tableName)); + for (TableName tableName : rsGroupInfo.getTables()) { + if (assignments.containsKey(tableName)) { + Map> tableResults = new HashMap<>(serverMap); + + Map> tableAssignments = assignments.get(tableName); + tableResults.putAll(tableAssignments); + + result.put(tableName, tableResults); + + LOG.debug("Adding assignments for {}: {}", tableName, tableAssignments); } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/executor/ExecutorService.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/executor/ExecutorService.java index ea788acc8c54..c12c30aad67f 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/executor/ExecutorService.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/executor/ExecutorService.java @@ -63,7 +63,7 @@ public class ExecutorService { private static final Logger LOG = LoggerFactory.getLogger(ExecutorService.class); // hold the all the executors created in a map addressable by their names - private final ConcurrentHashMap executorMap = new ConcurrentHashMap<>(); + private final ConcurrentMap executorMap = new ConcurrentHashMap<>(); // Name of the server hosting this executor service. private final String servername; @@ -87,18 +87,18 @@ public ExecutorService(final String servername) { */ @VisibleForTesting public void startExecutorService(String name, int maxThreads) { - if (this.executorMap.get(name) != null) { - throw new RuntimeException("An executor service with the name " + name + - " is already running!"); - } - Executor hbes = new Executor(name, maxThreads); - if (this.executorMap.putIfAbsent(name, hbes) != null) { - throw new RuntimeException("An executor service with the name " + name + - " is already running (2)!"); - } - LOG.debug("Starting executor service name=" + name + - ", corePoolSize=" + hbes.threadPoolExecutor.getCorePoolSize() + - ", maxPoolSize=" + hbes.threadPoolExecutor.getMaximumPoolSize()); + Executor hbes = this.executorMap.compute(name, (key, value) -> { + if (value != null) { + throw new RuntimeException("An executor service with the name " + key + + " is already running!"); + } + return new Executor(key, maxThreads); + }); + + LOG.debug( + "Starting executor service name={}, corePoolSize={}, maxPoolSize={}", + name, hbes.threadPoolExecutor.getCorePoolSize(), + hbes.threadPoolExecutor.getMaximumPoolSize()); } boolean isExecutorServiceRunning(String name) { @@ -134,7 +134,8 @@ public ThreadPoolExecutor getExecutorThreadPool(final ExecutorType type) { public void startExecutorService(final ExecutorType type, final int maxThreads) { String name = type.getExecutorName(this.servername); if (isExecutorServiceRunning(name)) { - LOG.debug("Executor service " + toString() + " already running on " + this.servername); + LOG.debug("Executor service {} already running on {}", this, + this.servername); return; } startExecutorService(name, maxThreads); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionsRecoveryChore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionsRecoveryChore.java index c5ad8676756c..9f204343bc08 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionsRecoveryChore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionsRecoveryChore.java @@ -161,9 +161,8 @@ private void prepareTableToReopenRegionsMap( } LOG.warn("Region {} for Table {} has high storeFileRefCount {}, considering it for reopen..", regionInfo.getRegionNameAsString(), tableName, regionStoreRefCount); - tableToReopenRegionsMap.putIfAbsent(tableName, new ArrayList<>()); - tableToReopenRegionsMap.get(tableName).add(regionName); - + tableToReopenRegionsMap + .computeIfAbsent(tableName, (key) -> new ArrayList<>()).add(regionName); } // hashcode/equals implementation to ensure at-most one object of RegionsRecoveryChore diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/RegionStates.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/RegionStates.java index 6654620fee78..f245500ce886 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/RegionStates.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/RegionStates.java @@ -112,9 +112,8 @@ public boolean isRegionInRegionStates(final RegionInfo hri) { // ========================================================================== @VisibleForTesting RegionStateNode createRegionStateNode(RegionInfo regionInfo) { - RegionStateNode newNode = new RegionStateNode(regionInfo, regionInTransition); - RegionStateNode oldNode = regionsMap.putIfAbsent(regionInfo.getRegionName(), newNode); - return oldNode != null ? oldNode : newNode; + return regionsMap.computeIfAbsent(regionInfo.getRegionName(), + key -> new RegionStateNode(regionInfo, regionInTransition)); } public RegionStateNode getOrCreateRegionStateNode(RegionInfo regionInfo) { @@ -556,7 +555,7 @@ public Map>> getAssignmentsForBalanc // Add online servers with no assignment for the table. for (Map> table : result.values()) { for (ServerName serverName : serverMap.keySet()) { - table.putIfAbsent(serverName, new ArrayList<>()); + table.computeIfAbsent(serverName, key -> new ArrayList<>()); } } } else { @@ -677,13 +676,7 @@ public Exception getException() { public RegionFailedOpen addToFailedOpen(final RegionStateNode regionNode) { final byte[] key = regionNode.getRegionInfo().getRegionName(); - RegionFailedOpen node = regionFailedOpen.get(key); - if (node == null) { - RegionFailedOpen newNode = new RegionFailedOpen(regionNode); - RegionFailedOpen oldNode = regionFailedOpen.putIfAbsent(key, newNode); - node = oldNode != null ? oldNode : newNode; - } - return node; + return regionFailedOpen.computeIfAbsent(key, (k) -> new RegionFailedOpen(regionNode)); } public RegionFailedOpen getFailedOpen(final RegionInfo regionInfo) { @@ -714,13 +707,7 @@ public List getRegionFailedOpen() { * to {@link #getServerNode(ServerName)} where we can. */ public ServerStateNode getOrCreateServer(final ServerName serverName) { - ServerStateNode node = serverMap.get(serverName); - if (node == null) { - node = new ServerStateNode(serverName); - ServerStateNode oldNode = serverMap.putIfAbsent(serverName, node); - node = oldNode != null ? oldNode : node; - } - return node; + return serverMap.computeIfAbsent(serverName, key -> new ServerStateNode(key)); } public void removeServer(final ServerName serverName) { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/FileArchiverNotifierFactoryImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/FileArchiverNotifierFactoryImpl.java index 5b6d8c14bf9c..bbe53b267afa 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/FileArchiverNotifierFactoryImpl.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/FileArchiverNotifierFactoryImpl.java @@ -18,6 +18,7 @@ import java.util.Objects; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import org.apache.commons.lang3.builder.HashCodeBuilder; import org.apache.hadoop.conf.Configuration; @@ -36,7 +37,7 @@ public final class FileArchiverNotifierFactoryImpl implements FileArchiverNotifi private static final FileArchiverNotifierFactoryImpl DEFAULT_INSTANCE = new FileArchiverNotifierFactoryImpl(); private static volatile FileArchiverNotifierFactory CURRENT_INSTANCE = DEFAULT_INSTANCE; - private final ConcurrentHashMap CACHE; + private final ConcurrentMap CACHE; private FileArchiverNotifierFactoryImpl() { CACHE = new ConcurrentHashMap<>(); @@ -62,15 +63,10 @@ static void reset() { * @param tn The table to obtain a notifier for * @return The notifier for the given {@code tablename}. */ - public FileArchiverNotifier get( - Connection conn, Configuration conf, FileSystem fs, TableName tn) { + public FileArchiverNotifier get(Connection conn, Configuration conf, FileSystem fs, + TableName tn) { // Ensure that only one instance is exposed to callers - final FileArchiverNotifier newMapping = new FileArchiverNotifierImpl(conn, conf, fs, tn); - final FileArchiverNotifier previousMapping = CACHE.putIfAbsent(tn, newMapping); - if (previousMapping == null) { - return newMapping; - } - return previousMapping; + return CACHE.computeIfAbsent(tn, key -> new FileArchiverNotifierImpl(conn, conf, fs, key)); } public int getCacheSize() { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaCache.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaCache.java index ce26366b6f98..1c97b2012c09 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaCache.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaCache.java @@ -32,6 +32,7 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.ScheduledChore; @@ -69,10 +70,10 @@ public class QuotaCache implements Stoppable { // for testing purpose only, enforce the cache to be always refreshed static boolean TEST_FORCE_REFRESH = false; - private final ConcurrentHashMap namespaceQuotaCache = new ConcurrentHashMap<>(); - private final ConcurrentHashMap tableQuotaCache = new ConcurrentHashMap<>(); - private final ConcurrentHashMap userQuotaCache = new ConcurrentHashMap<>(); - private final ConcurrentHashMap regionServerQuotaCache = + private final ConcurrentMap namespaceQuotaCache = new ConcurrentHashMap<>(); + private final ConcurrentMap tableQuotaCache = new ConcurrentHashMap<>(); + private final ConcurrentMap userQuotaCache = new ConcurrentHashMap<>(); + private final ConcurrentMap regionServerQuotaCache = new ConcurrentHashMap<>(); private volatile boolean exceedThrottleQuotaEnabled = false; // factors used to divide cluster scope quota into machine scope quota @@ -174,7 +175,7 @@ protected boolean isExceedThrottleQuotaEnabled() { * Returns the QuotaState requested. If the quota info is not in cache an empty one will be * returned and the quota request will be enqueued for the next cache refresh. */ - private QuotaState getQuotaState(final ConcurrentHashMap quotasMap, + private QuotaState getQuotaState(final ConcurrentMap quotasMap, final K key) { return computeIfAbsent(quotasMap, key, QuotaState::new, this::triggerCacheRefresh); } @@ -223,17 +224,18 @@ public QuotaRefresherChore(final int period, final Stoppable stoppable) { protected void chore() { // Prefetch online tables/namespaces for (TableName table: ((HRegionServer)QuotaCache.this.rsServices).getOnlineTables()) { - if (table.isSystemTable()) continue; - if (!QuotaCache.this.tableQuotaCache.containsKey(table)) { - QuotaCache.this.tableQuotaCache.putIfAbsent(table, new QuotaState()); - } - String ns = table.getNamespaceAsString(); - if (!QuotaCache.this.namespaceQuotaCache.containsKey(ns)) { - QuotaCache.this.namespaceQuotaCache.putIfAbsent(ns, new QuotaState()); + if (table.isSystemTable()) { + continue; } + QuotaCache.this.tableQuotaCache.computeIfAbsent(table, key -> new QuotaState()); + + final String ns = table.getNamespaceAsString(); + + QuotaCache.this.namespaceQuotaCache.computeIfAbsent(ns, key -> new QuotaState()); } - QuotaCache.this.regionServerQuotaCache.putIfAbsent(QuotaTableUtil.QUOTA_REGION_SERVER_ROW_KEY, - new QuotaState()); + + QuotaCache.this.regionServerQuotaCache.computeIfAbsent( + QuotaTableUtil.QUOTA_REGION_SERVER_ROW_KEY, key -> new QuotaState()); updateQuotaFactors(); fetchNamespaceQuotaState(); @@ -319,7 +321,7 @@ private void fetchExceedThrottleQuota() { } private void fetch(final String type, - final ConcurrentHashMap quotasMap, final Fetcher fetcher) { + final ConcurrentMap quotasMap, final Fetcher fetcher) { long now = EnvironmentEdgeManager.currentTime(); long refreshPeriod = getPeriod(); long evictPeriod = refreshPeriod * EVICT_PERIOD_FACTOR; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/throttle/StoreHotnessProtector.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/throttle/StoreHotnessProtector.java index 71fd89b5c957..ee6db3110bb8 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/throttle/StoreHotnessProtector.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/throttle/StoreHotnessProtector.java @@ -19,6 +19,7 @@ import java.util.List; import java.util.Map; +import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.atomic.AtomicInteger; import org.apache.hadoop.conf.Configuration; @@ -78,7 +79,7 @@ public class StoreHotnessProtector { private final static int DEFAULT_PARALLEL_PUT_STORE_THREADS_LIMIT_MIN_COLUMN_NUM = 100; private final static int DEFAULT_PARALLEL_PREPARE_PUT_STORE_MULTIPLIER = 2; - private final Map preparePutToStoreMap = + private final ConcurrentMap preparePutToStoreMap = new ConcurrentSkipListMap<>(Bytes.BYTES_RAWCOMPARATOR); private final Region region; @@ -101,7 +102,7 @@ public void init(Configuration conf) { public void update(Configuration conf) { init(conf); preparePutToStoreMap.clear(); - LOG.debug("update config: " + toString()); + LOG.debug("update config: {}", this); } public void start(Map> familyMaps) throws RegionTooBusyException { @@ -121,13 +122,9 @@ public void start(Map> familyMaps) throws RegionTooBusyExcept //we need to try to add #preparePutCount at first because preparePutToStoreMap will be //cleared when changing the configuration. - preparePutToStoreMap.putIfAbsent(e.getKey(), new AtomicInteger()); - AtomicInteger preparePutCounter = preparePutToStoreMap.get(e.getKey()); - if (preparePutCounter == null) { - preparePutCounter = new AtomicInteger(); - preparePutToStoreMap.putIfAbsent(e.getKey(), preparePutCounter); - } - int preparePutCount = preparePutCounter.incrementAndGet(); + int preparePutCount = preparePutToStoreMap + .computeIfAbsent(e.getKey(), key -> new AtomicInteger()) + .incrementAndGet(); if (store.getCurrentParallelPutCount() > this.parallelPutToStoreThreadLimit || preparePutCount > this.parallelPreparePutToStoreThreadLimit) { tooBusyStore = (tooBusyStore == null ? @@ -146,9 +143,7 @@ public void start(Map> familyMaps) throws RegionTooBusyExcept String msg = "StoreTooBusy," + this.region.getRegionInfo().getRegionNameAsString() + ":" + tooBusyStore + " Above parallelPutToStoreThreadLimit(" + this.parallelPutToStoreThreadLimit + ")"; - if (LOG.isTraceEnabled()) { - LOG.trace(msg); - } + LOG.trace(msg); throw new RegionTooBusyException(msg); } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java index 683a9ab8b558..32739b1b4318 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java @@ -305,24 +305,30 @@ private void initializeWALEntryFilter(UUID peerClusterId) { } private void tryStartNewShipper(String walGroupId, PriorityBlockingQueue queue) { - ReplicationSourceShipper worker = createNewShipper(walGroupId, queue); - ReplicationSourceShipper extant = workerThreads.putIfAbsent(walGroupId, worker); - if (extant != null) { - if(LOG.isDebugEnabled()) { - LOG.debug("{} Someone has beat us to start a worker thread for wal group {}", logPeerId(), - walGroupId); - } - } else { - if(LOG.isDebugEnabled()) { - LOG.debug("{} Starting up worker for wal group {}", logPeerId(), walGroupId); + workerThreads.compute(walGroupId, (key, value) -> { + if (value != null) { + if (LOG.isDebugEnabled()) { + LOG.debug( + "{} Someone has beat us to start a worker thread for wal group {}", + logPeerId(), key); + } + return value; + } else { + if (LOG.isDebugEnabled()) { + LOG.debug("{} Starting up worker for wal group {}", logPeerId(), key); + } + ReplicationSourceShipper worker = createNewShipper(walGroupId, queue); + ReplicationSourceWALReader walReader = + createNewWALReader(walGroupId, queue, worker.getStartPosition()); + Threads.setDaemonThreadRunning( + walReader, Thread.currentThread().getName() + + ".replicationSource.wal-reader." + walGroupId + "," + queueId, + this::uncaughtException); + worker.setWALReader(walReader); + worker.startup(this::uncaughtException); + return worker; } - ReplicationSourceWALReader walReader = - createNewWALReader(walGroupId, queue, worker.getStartPosition()); - Threads.setDaemonThreadRunning(walReader, Thread.currentThread().getName() + - ".replicationSource.wal-reader." + walGroupId + "," + queueId, this::uncaughtException); - worker.setWALReader(walReader); - worker.startup(this::uncaughtException); - } + }); } @Override