Skip to content

Commit b9345a5

Browse files
authored
HBASE-27923 NettyRpcServer may hang if it should skip initial sasl handshake (#5282)
Signed-off-by: Duo Zhang <[email protected]> Signed-off-by: Wellington Chevreuil <[email protected]>
1 parent b6aef44 commit b9345a5

File tree

7 files changed

+207
-14
lines changed

7 files changed

+207
-14
lines changed

hbase-client/src/main/java/org/apache/hadoop/hbase/security/NettyHBaseSaslRpcClientHandler.java

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -87,16 +87,20 @@ private void tryComplete(ChannelHandlerContext ctx) {
8787
return;
8888
}
8989

90-
ChannelPipeline p = ctx.pipeline();
91-
saslRpcClient.setupSaslHandler(p, HANDLER_NAME);
92-
p.remove(SaslChallengeDecoder.class);
93-
p.remove(this);
90+
saslRpcClient.setupSaslHandler(ctx.pipeline(), HANDLER_NAME);
91+
removeHandlers(ctx);
9492

9593
setCryptoAESOption();
9694

9795
saslPromise.setSuccess(true);
9896
}
9997

98+
private void removeHandlers(ChannelHandlerContext ctx) {
99+
ChannelPipeline p = ctx.pipeline();
100+
p.remove(SaslChallengeDecoder.class);
101+
p.remove(this);
102+
}
103+
100104
private void setCryptoAESOption() {
101105
boolean saslEncryptionEnabled = SaslUtil.QualityOfProtection.PRIVACY.getSaslQop()
102106
.equalsIgnoreCase(saslRpcClient.getSaslQOP());
@@ -142,6 +146,9 @@ protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Excep
142146
} else {
143147
saslPromise.tryFailure(new FallbackDisallowedException());
144148
}
149+
// When we switch to simple auth, we should also remove SaslChallengeDecoder and
150+
// NettyHBaseSaslRpcClientHandler.
151+
removeHandlers(ctx);
145152
return;
146153
}
147154
LOG.trace("Reading input token size={} for processing by initSASLContext", len);

hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyHBaseSaslRpcServerHandler.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -89,9 +89,11 @@ protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Excep
8989
boolean useWrap = qop != null && !"auth".equalsIgnoreCase(qop);
9090
ChannelPipeline p = ctx.pipeline();
9191
if (useWrap) {
92-
p.addBefore(DECODER_NAME, null, new SaslWrapHandler(saslServer::wrap)).addLast(
93-
new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4),
94-
new SaslUnwrapHandler(saslServer::unwrap));
92+
p.addBefore(DECODER_NAME, null, new SaslWrapHandler(saslServer::wrap))
93+
.addBefore(NettyRpcServerResponseEncoder.NAME, null,
94+
new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4))
95+
.addBefore(NettyRpcServerResponseEncoder.NAME, null,
96+
new SaslUnwrapHandler(saslServer::unwrap));
9597
}
9698
conn.setupHandler();
9799
p.remove(this);

hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcServer.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -132,7 +132,11 @@ protected void initChannel(Channel ch) throws Exception {
132132
initSSL(pipeline, conf.getBoolean(HBASE_SERVER_NETTY_TLS_SUPPORTPLAINTEXT, true));
133133
}
134134
pipeline.addLast(NettyRpcServerPreambleHandler.DECODER_NAME, preambleDecoder)
135-
.addLast(createNettyRpcServerPreambleHandler());
135+
.addLast(createNettyRpcServerPreambleHandler())
136+
// We need NettyRpcServerResponseEncoder here because NettyRpcServerPreambleHandler may
137+
// send RpcResponse to client.
138+
.addLast(NettyRpcServerResponseEncoder.NAME,
139+
new NettyRpcServerResponseEncoder(metrics));
136140
}
137141
});
138142
try {

hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcServerPreambleHandler.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -58,8 +58,9 @@ protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Excep
5858
LengthFieldBasedFrameDecoder decoder =
5959
new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4);
6060
decoder.setSingleDecode(true);
61-
p.addLast(NettyHBaseSaslRpcServerHandler.DECODER_NAME, decoder);
62-
p.addLast(new NettyHBaseSaslRpcServerHandler(rpcServer, conn));
61+
p.addBefore(NettyRpcServerResponseEncoder.NAME, NettyHBaseSaslRpcServerHandler.DECODER_NAME,
62+
decoder).addBefore(NettyRpcServerResponseEncoder.NAME, null,
63+
new NettyHBaseSaslRpcServerHandler(rpcServer, conn));
6364
} else {
6465
conn.setupHandler();
6566
}

hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcServerResponseEncoder.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,8 @@
3131
@InterfaceAudience.Private
3232
class NettyRpcServerResponseEncoder extends ChannelOutboundHandlerAdapter {
3333

34+
static final String NAME = "NettyRpcServerResponseEncoder";
35+
3436
private final MetricsHBaseServer metrics;
3537

3638
NettyRpcServerResponseEncoder(MetricsHBaseServer metrics) {

hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyServerRpcConnection.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -71,9 +71,10 @@ class NettyServerRpcConnection extends ServerRpcConnection {
7171

7272
void setupHandler() {
7373
channel.pipeline()
74-
.addLast("frameDecoder", new NettyRpcFrameDecoder(rpcServer.maxRequestSize, this))
75-
.addLast("decoder", new NettyRpcServerRequestDecoder(rpcServer.metrics, this))
76-
.addLast("encoder", new NettyRpcServerResponseEncoder(rpcServer.metrics));
74+
.addBefore(NettyRpcServerResponseEncoder.NAME, "frameDecoder",
75+
new NettyRpcFrameDecoder(rpcServer.maxRequestSize, this))
76+
.addBefore(NettyRpcServerResponseEncoder.NAME, "decoder",
77+
new NettyRpcServerRequestDecoder(rpcServer.metrics, this));
7778
}
7879

7980
void process(ByteBuf buf) throws IOException, InterruptedException {
@@ -115,6 +116,6 @@ public NettyServerCall createCall(int id, final BlockingService service,
115116

116117
@Override
117118
protected void doRespond(RpcResponse resp) {
118-
channel.writeAndFlush(resp);
119+
NettyFutureUtils.safeWriteAndFlush(channel, resp);
119120
}
120121
}
Lines changed: 176 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,176 @@
1+
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.ipc;

import static org.apache.hadoop.hbase.ipc.TestProtobufRpcServiceImpl.SERVICE;
import static org.apache.hadoop.hbase.ipc.TestProtobufRpcServiceImpl.newBlockingStub;
import static org.apache.hadoop.hbase.security.HBaseKerberosUtils.getKeytabFileForTesting;
import static org.apache.hadoop.hbase.security.HBaseKerberosUtils.getPrincipalForTesting;
import static org.apache.hadoop.hbase.security.HBaseKerberosUtils.loginKerberosPrincipal;
import static org.apache.hadoop.hbase.security.HBaseKerberosUtils.setSecuredConfiguration;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;

import java.io.File;
import java.net.InetSocketAddress;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.security.HBaseKerberosUtils;
import org.apache.hadoop.hbase.security.SecurityInfo;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.RPCTests;
import org.apache.hadoop.minikdc.MiniKdc;
import org.apache.hadoop.security.UserGroupInformation;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.mockito.Mockito;

import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
import org.apache.hbase.thirdparty.io.netty.buffer.ByteBuf;
import org.apache.hbase.thirdparty.io.netty.channel.Channel;
import org.apache.hbase.thirdparty.io.netty.channel.ChannelHandlerContext;

import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestProtos;
import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface;

/**
 * Test for HBASE-27923, where NettyRpcServer may hang if it should skip the initial SASL
 * handshake. A Kerberos-enabled client that is allowed to fall back to simple authentication
 * talks to a server running without security; the server connection must not use SASL, and the
 * RPC must still complete instead of hanging.
 */
@Category({ RPCTests.class, MediumTests.class })
public class TestRpcSkipInitialSaslHandshake {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestRpcSkipInitialSaslHandshake.class);

  protected static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();

  protected static final File KEYTAB_FILE =
    new File(TEST_UTIL.getDataTestDir("keytab").toUri().getPath());

  protected static MiniKdc KDC;
  protected static String HOST = "localhost";
  protected static String PRINCIPAL;

  protected String krbKeytab;
  protected String krbPrincipal;
  protected UserGroupInformation ugi;
  // Client configuration: secured (Kerberos) with fallback-to-simple-auth allowed.
  protected Configuration clientConf;
  // Server configuration: deliberately left unsecured so the server skips the SASL handshake.
  protected Configuration serverConf;

  /** Starts a MiniKdc, creates the test principal, and tunes RPC settings for the test. */
  protected static void initKDCAndConf() throws Exception {
    KDC = TEST_UTIL.setupMiniKdc(KEYTAB_FILE);
    PRINCIPAL = "hbase/" + HOST;
    KDC.createPrincipal(KEYTAB_FILE, PRINCIPAL);
    HBaseKerberosUtils.setPrincipalForTesting(PRINCIPAL + "@" + KDC.getRealm());
    // NOTE(review): the original comment claimed "a smaller timeout ... to speed up tests", but
    // 2000000000 ms is ~23 days — this looks like a leftover debugging value. Kept as-is to
    // preserve behavior; confirm intent before shrinking it.
    TEST_UTIL.getConfiguration().setInt(RpcClient.SOCKET_TIMEOUT_READ, 2000000000);
    TEST_UTIL.getConfiguration().setInt("hbase.security.relogin.maxretries", 1);
  }

  /** Stops the MiniKdc if it was started. */
  protected static void stopKDC() throws InterruptedException {
    if (KDC != null) {
      KDC.stop();
    }
  }

  /** Logs in the Kerberos principal and builds the per-test client/server configurations. */
  protected final void setUpPrincipalAndConf() throws Exception {
    krbKeytab = getKeytabFileForTesting();
    krbPrincipal = getPrincipalForTesting();
    ugi = loginKerberosPrincipal(krbKeytab, krbPrincipal);
    clientConf = new Configuration(TEST_UTIL.getConfiguration());
    setSecuredConfiguration(clientConf);
    clientConf.setBoolean(RpcClient.IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_KEY, true);
    serverConf = new Configuration(TEST_UTIL.getConfiguration());
  }

  @BeforeClass
  public static void setUp() throws Exception {
    initKDCAndConf();
  }

  @AfterClass
  public static void tearDown() throws Exception {
    stopKDC();
    TEST_UTIL.cleanupTestDir();
  }

  @Before
  public void setUpTest() throws Exception {
    setUpPrincipalAndConf();
  }

  /**
   * This test is for HBASE-27923, where NettyRpcServer may hang if it should skip the initial
   * SASL handshake. The echo call must complete (no hang) and the server-side connection must
   * report {@code useSasl == false}.
   */
  @Test
  public void test() throws Exception {
    SecurityInfo securityInfoMock = Mockito.mock(SecurityInfo.class);
    Mockito.when(securityInfoMock.getServerPrincipal())
      .thenReturn(HBaseKerberosUtils.KRB_PRINCIPAL);
    SecurityInfo.addInfo("TestProtobufRpcProto", securityInfoMock);

    // Captures whether the server-side connection decided to use SASL after the preamble.
    final AtomicBoolean useSaslRef = new AtomicBoolean(false);
    NettyRpcServer rpcServer = new NettyRpcServer(null, getClass().getSimpleName(),
      Lists.newArrayList(new RpcServer.BlockingServiceAndInterface(SERVICE, null)),
      new InetSocketAddress(HOST, 0), serverConf, new FifoRpcScheduler(serverConf, 1), true) {

      @Override
      protected NettyRpcServerPreambleHandler createNettyRpcServerPreambleHandler() {
        // Wrap the preamble handler so we can observe the connection's useSasl flag once the
        // preamble has been processed.
        return new NettyRpcServerPreambleHandler(this) {
          private NettyServerRpcConnection conn;

          @Override
          protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
            super.channelRead0(ctx, msg);
            useSaslRef.set(conn.useSasl);
          }

          @Override
          protected NettyServerRpcConnection createNettyServerRpcConnection(Channel channel) {
            conn = super.createNettyServerRpcConnection(channel);
            return conn;
          }
        };
      }
    };

    rpcServer.start();
    try (NettyRpcClient rpcClient =
      new NettyRpcClient(clientConf, HConstants.DEFAULT_CLUSTER_ID.toString(), null, null)) {
      BlockingInterface stub = newBlockingStub(rpcClient, rpcServer.getListenerAddress(),
        User.create(UserGroupInformation.getCurrentUser()));

      String response =
        stub.echo(null, TestProtos.EchoRequestProto.newBuilder().setMessage("test").build())
          .getMessage();
      // assertEquals gives an expected-vs-actual failure message, unlike assertTrue(equals).
      assertEquals("test", response);
      assertFalse(useSaslRef.get());
    } finally {
      rpcServer.stop();
    }
  }
}

0 commit comments

Comments
 (0)