Skip to content

Commit 3637bbb

Browse files
committed
HBASE-22261 Make use of ClusterStatusListener for async client
1 parent 353f922 commit 3637bbb

File tree

5 files changed

+172
-3
lines changed

5 files changed

+172
-3
lines changed

hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,10 @@
1717
*/
1818
package org.apache.hadoop.hbase.client;
1919

20+
import static org.apache.hadoop.hbase.HConstants.STATUS_PUBLISHED;
21+
import static org.apache.hadoop.hbase.HConstants.STATUS_PUBLISHED_DEFAULT;
22+
import static org.apache.hadoop.hbase.client.ClusterStatusListener.DEFAULT_STATUS_LISTENER_CLASS;
23+
import static org.apache.hadoop.hbase.client.ClusterStatusListener.STATUS_LISTENER_CLASS;
2024
import static org.apache.hadoop.hbase.client.ConnectionUtils.NO_NONCE_GENERATOR;
2125
import static org.apache.hadoop.hbase.client.ConnectionUtils.getStubKey;
2226
import static org.apache.hadoop.hbase.client.MetricsConnection.CLIENT_SIDE_METRICS_ENABLED_KEY;
@@ -107,6 +111,8 @@ class AsyncConnectionImpl implements AsyncConnection {
107111

108112
private final Optional<MetricsConnection> metrics;
109113

114+
private final ClusterStatusListener clusterStatusListener;
115+
110116
public AsyncConnectionImpl(Configuration conf, AsyncRegistry registry, String clusterId,
111117
User user) {
112118
this.conf = conf;
@@ -133,6 +139,31 @@ public AsyncConnectionImpl(Configuration conf, AsyncRegistry registry, String cl
133139
} else {
134140
nonceGenerator = NO_NONCE_GENERATOR;
135141
}
142+
ClusterStatusListener listener = null;
143+
if (conf.getBoolean(STATUS_PUBLISHED, STATUS_PUBLISHED_DEFAULT)) {
144+
// XXX: this maybe a blocking operation, better to create it outside the constructor and pass
145+
// it in, just like clusterId. Not a big problem for now as the default value is false.
146+
Class<? extends ClusterStatusListener.Listener> listenerClass = conf.getClass(
147+
STATUS_LISTENER_CLASS, DEFAULT_STATUS_LISTENER_CLASS, ClusterStatusListener.Listener.class);
148+
if (listenerClass == null) {
149+
LOG.warn("{} is true, but {} is not set", STATUS_PUBLISHED, STATUS_LISTENER_CLASS);
150+
} else {
151+
try {
152+
listener = new ClusterStatusListener(
153+
new ClusterStatusListener.DeadServerHandler() {
154+
@Override
155+
public void newDead(ServerName sn) {
156+
locator.clearCache(sn);
157+
rpcClient.cancelConnections(sn);
158+
}
159+
}, conf, listenerClass);
160+
} catch (IOException e) {
161+
LOG.warn("Failed to create ClusterStatusListener, not a critical problem, ignoring...",
162+
e);
163+
}
164+
}
165+
}
166+
this.clusterStatusListener = listener;
136167
}
137168

138169
private void spawnRenewalChore(final UserGroupInformation user) {
@@ -152,6 +183,7 @@ public void close() {
152183
if (closed) {
153184
return;
154185
}
186+
IOUtils.closeQuietly(clusterStatusListener);
155187
IOUtils.closeQuietly(rpcClient);
156188
IOUtils.closeQuietly(registry);
157189
if (authService != null) {

hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncMetaRegionLocator.java

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import java.util.concurrent.atomic.AtomicReference;
2929
import org.apache.hadoop.hbase.HRegionLocation;
3030
import org.apache.hadoop.hbase.RegionLocations;
31+
import org.apache.hadoop.hbase.ServerName;
3132
import org.apache.yetus.audience.InterfaceAudience;
3233

3334
/**
@@ -113,4 +114,23 @@ void updateCachedLocationOnError(HRegionLocation loc, Throwable exception) {
113114
void clearCache() {
114115
metaRegionLocations.set(null);
115116
}
117+
118+
void clearCache(ServerName serverName) {
119+
for (;;) {
120+
RegionLocations locs = metaRegionLocations.get();
121+
if (locs == null) {
122+
return;
123+
}
124+
RegionLocations newLocs = locs.removeByServer(serverName);
125+
if (locs == newLocs) {
126+
return;
127+
}
128+
if (newLocs.isEmpty()) {
129+
newLocs = null;
130+
}
131+
if (metaRegionLocations.compareAndSet(locs, newLocs)) {
132+
return;
133+
}
134+
}
135+
}
116136
}

hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncNonMetaRegionLocator.java

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@
5050
import org.apache.hadoop.hbase.HRegionLocation;
5151
import org.apache.hadoop.hbase.MetaTableAccessor;
5252
import org.apache.hadoop.hbase.RegionLocations;
53+
import org.apache.hadoop.hbase.ServerName;
5354
import org.apache.hadoop.hbase.TableName;
5455
import org.apache.hadoop.hbase.TableNotFoundException;
5556
import org.apache.hadoop.hbase.client.Scan.ReadType;
@@ -617,6 +618,24 @@ void clearCache() {
617618
cache.clear();
618619
}
619620

621+
void clearCache(ServerName serverName) {
622+
for (TableCache tableCache : cache.values()) {
623+
for (Map.Entry<byte[], RegionLocations> entry : tableCache.cache.entrySet()) {
624+
byte[] regionName = entry.getKey();
625+
RegionLocations locs = entry.getValue();
626+
RegionLocations newLocs = locs.removeByServer(serverName);
627+
if (locs == newLocs) {
628+
continue;
629+
}
630+
if (newLocs.isEmpty()) {
631+
tableCache.cache.remove(regionName, locs);
632+
} else {
633+
tableCache.cache.replace(regionName, locs, newLocs);
634+
}
635+
}
636+
}
637+
}
638+
620639
// only used for testing whether we have cached the location for a region.
621640
@VisibleForTesting
622641
RegionLocations getRegionLocationInCache(TableName tableName, byte[] row) {

hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionLocator.java

Lines changed: 18 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import java.util.function.Supplier;
2626
import org.apache.hadoop.hbase.HRegionLocation;
2727
import org.apache.hadoop.hbase.RegionLocations;
28+
import org.apache.hadoop.hbase.ServerName;
2829
import org.apache.hadoop.hbase.TableName;
2930
import org.apache.hadoop.hbase.exceptions.TimeoutIOException;
3031
import org.apache.hadoop.hbase.util.Bytes;
@@ -33,6 +34,7 @@
3334
import org.slf4j.Logger;
3435
import org.slf4j.LoggerFactory;
3536

37+
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
3638
import org.apache.hbase.thirdparty.io.netty.util.HashedWheelTimer;
3739
import org.apache.hbase.thirdparty.io.netty.util.Timeout;
3840

@@ -46,11 +48,14 @@ class AsyncRegionLocator {
4648

4749
private final HashedWheelTimer retryTimer;
4850

51+
private final AsyncConnectionImpl conn;
52+
4953
private final AsyncMetaRegionLocator metaRegionLocator;
5054

5155
private final AsyncNonMetaRegionLocator nonMetaRegionLocator;
5256

5357
AsyncRegionLocator(AsyncConnectionImpl conn, HashedWheelTimer retryTimer) {
58+
this.conn = conn;
5459
this.metaRegionLocator = new AsyncMetaRegionLocator(conn.registry);
5560
this.nonMetaRegionLocator = new AsyncNonMetaRegionLocator(conn);
5661
this.retryTimer = retryTimer;
@@ -150,18 +155,28 @@ void updateCachedLocationOnError(HRegionLocation loc, Throwable exception) {
150155
}
151156

152157
void clearCache(TableName tableName) {
153-
if (LOG.isDebugEnabled()) {
154-
LOG.debug("Clear meta cache for " + tableName);
155-
}
158+
LOG.debug("Clear meta cache for {}", tableName);
156159
if (tableName.equals(META_TABLE_NAME)) {
157160
metaRegionLocator.clearCache();
158161
} else {
159162
nonMetaRegionLocator.clearCache(tableName);
160163
}
161164
}
162165

166+
void clearCache(ServerName serverName) {
167+
LOG.debug("Clear meta cache for {}", serverName);
168+
metaRegionLocator.clearCache(serverName);
169+
nonMetaRegionLocator.clearCache(serverName);
170+
conn.getConnectionMetrics().ifPresent(MetricsConnection::incrMetaCacheNumClearServer);
171+
}
172+
163173
void clearCache() {
164174
metaRegionLocator.clearCache();
165175
nonMetaRegionLocator.clearCache();
166176
}
177+
178+
@VisibleForTesting
179+
AsyncNonMetaRegionLocator getNonMetaRegionLocator() {
180+
return nonMetaRegionLocator;
181+
}
167182
}
Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,83 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package org.apache.hadoop.hbase.client;
19+
20+
import static org.junit.Assert.*;
21+
import java.io.IOException;
22+
import org.apache.hadoop.hbase.HBaseClassTestRule;
23+
import org.apache.hadoop.hbase.HBaseTestingUtility;
24+
import org.apache.hadoop.hbase.HConstants;
25+
import org.apache.hadoop.hbase.ServerName;
26+
import org.apache.hadoop.hbase.TableName;
27+
import org.apache.hadoop.hbase.testclassification.ClientTests;
28+
import org.apache.hadoop.hbase.testclassification.MediumTests;
29+
import org.apache.hadoop.hbase.util.Bytes;
30+
import org.junit.AfterClass;
31+
import org.junit.BeforeClass;
32+
import org.junit.ClassRule;
33+
import org.junit.Test;
34+
import org.junit.experimental.categories.Category;
35+
36+
import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
37+
38+
@Category({ MediumTests.class, ClientTests.class })
39+
public class TestAsyncTableRSCrashPublish {
40+
41+
@ClassRule
42+
public static final HBaseClassTestRule CLASS_RULE =
43+
HBaseClassTestRule.forClass(TestAsyncTableRSCrashPublish.class);
44+
45+
private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
46+
47+
private static AsyncConnectionImpl CONN;
48+
49+
private static TableName TABLE_NAME = TableName.valueOf("Publish");
50+
51+
private static byte[] FAMILY = Bytes.toBytes("family");
52+
53+
@BeforeClass
54+
public static void setUp() throws Exception {
55+
UTIL.getConfiguration().setBoolean(HConstants.STATUS_PUBLISHED, true);
56+
UTIL.startMiniCluster(2);
57+
UTIL.createTable(TABLE_NAME, FAMILY);
58+
UTIL.waitTableAvailable(TABLE_NAME);
59+
CONN =
60+
(AsyncConnectionImpl) ConnectionFactory.createAsyncConnection(UTIL.getConfiguration()).get();
61+
}
62+
63+
@AfterClass
64+
public static void tearDown() throws Exception {
65+
Closeables.close(CONN, true);
66+
UTIL.shutdownMiniCluster();
67+
}
68+
69+
@Test
70+
public void test() throws IOException {
71+
AsyncNonMetaRegionLocator locator = CONN.getLocator().getNonMetaRegionLocator();
72+
CONN.getTable(TABLE_NAME).get(new Get(Bytes.toBytes(0))).join();
73+
ServerName serverName = locator.getRegionLocationInCache(TABLE_NAME, HConstants.EMPTY_START_ROW)
74+
.getDefaultRegionLocation().getServerName();
75+
UTIL.getMiniHBaseCluster().stopRegionServer(serverName);
76+
UTIL.waitFor(60000,
77+
() -> locator.getRegionLocationInCache(TABLE_NAME, HConstants.EMPTY_START_ROW) == null);
78+
CONN.getTable(TABLE_NAME).get(new Get(Bytes.toBytes(0))).join();
79+
assertNotEquals(serverName,
80+
locator.getRegionLocationInCache(TABLE_NAME, HConstants.EMPTY_START_ROW)
81+
.getDefaultRegionLocation().getServerName());
82+
}
83+
}

0 commit comments

Comments
 (0)