Skip to content

Commit 0c4263a

Browse files
authored
HBASE-27185 Rewrite NettyRpcServer to decode rpc request with netty handler (#4624)
Signed-off-by: Xin Sun <[email protected]>
1 parent ac8b3a7 commit 0c4263a

19 files changed

+507
-466
lines changed

hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcConnection.java

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -223,9 +223,6 @@ private void saslNegotiate(final Channel ch) {
223223
public void operationComplete(Future<Boolean> future) throws Exception {
224224
if (future.isSuccess()) {
225225
ChannelPipeline p = ch.pipeline();
226-
p.remove(SaslChallengeDecoder.class);
227-
p.remove(NettyHBaseSaslRpcClientHandler.class);
228-
229226
// check if negotiate with server for connection header is necessary
230227
if (saslHandler.isNeedProcessConnectionHeader()) {
231228
Promise<Boolean> connectionHeaderPromise = ch.eventLoop().newPromise();

hbase-client/src/main/java/org/apache/hadoop/hbase/security/CryptoAESUnwrapHandler.java

Lines changed: 0 additions & 47 deletions
This file was deleted.

hbase-client/src/main/java/org/apache/hadoop/hbase/security/CryptoAESWrapHandler.java

Lines changed: 0 additions & 48 deletions
This file was deleted.

hbase-client/src/main/java/org/apache/hadoop/hbase/security/NettyHBaseRpcConnectionHeaderHandler.java

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,6 @@
2525
import org.apache.hbase.thirdparty.io.netty.channel.ChannelHandlerContext;
2626
import org.apache.hbase.thirdparty.io.netty.channel.ChannelPipeline;
2727
import org.apache.hbase.thirdparty.io.netty.channel.SimpleChannelInboundHandler;
28-
import org.apache.hbase.thirdparty.io.netty.handler.codec.LengthFieldBasedFrameDecoder;
2928
import org.apache.hbase.thirdparty.io.netty.util.concurrent.Promise;
3029

3130
import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos;
@@ -92,10 +91,7 @@ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
9291
* Remove handlers for sasl encryption and add handlers for Crypto AES encryption
9392
*/
9493
private void setupCryptoAESHandler(ChannelPipeline p, CryptoAES cryptoAES) {
95-
p.remove(SaslWrapHandler.class);
96-
p.remove(SaslUnwrapHandler.class);
97-
String lengthDecoder = p.context(LengthFieldBasedFrameDecoder.class).name();
98-
p.addAfter(lengthDecoder, null, new CryptoAESUnwrapHandler(cryptoAES));
99-
p.addAfter(lengthDecoder, null, new CryptoAESWrapHandler(cryptoAES));
94+
p.replace(SaslWrapHandler.class, null, new SaslWrapHandler(cryptoAES::wrap));
95+
p.replace(SaslUnwrapHandler.class, null, new SaslUnwrapHandler(cryptoAES::unwrap));
10096
}
10197
}

hbase-client/src/main/java/org/apache/hadoop/hbase/security/NettyHBaseSaslRpcClient.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -52,9 +52,9 @@ public void setupSaslHandler(ChannelPipeline p) {
5252
return;
5353
}
5454
// add wrap and unwrap handlers to pipeline.
55-
p.addFirst(new SaslWrapHandler(saslClient),
55+
p.addFirst(new SaslWrapHandler(saslClient::wrap),
5656
new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4),
57-
new SaslUnwrapHandler(saslClient));
57+
new SaslUnwrapHandler(saslClient::unwrap));
5858
}
5959

6060
public String getSaslQOP() {

hbase-client/src/main/java/org/apache/hadoop/hbase/security/NettyHBaseSaslRpcClientHandler.java

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import org.apache.hadoop.hbase.exceptions.ConnectionClosedException;
2525
import org.apache.hadoop.hbase.ipc.FallbackDisallowedException;
2626
import org.apache.hadoop.hbase.security.provider.SaslClientAuthenticationProvider;
27+
import org.apache.hadoop.hbase.util.NettyFutureUtils;
2728
import org.apache.hadoop.security.UserGroupInformation;
2829
import org.apache.hadoop.security.token.Token;
2930
import org.apache.hadoop.security.token.TokenIdentifier;
@@ -33,6 +34,7 @@
3334

3435
import org.apache.hbase.thirdparty.io.netty.buffer.ByteBuf;
3536
import org.apache.hbase.thirdparty.io.netty.channel.ChannelHandlerContext;
37+
import org.apache.hbase.thirdparty.io.netty.channel.ChannelPipeline;
3638
import org.apache.hbase.thirdparty.io.netty.channel.SimpleChannelInboundHandler;
3739
import org.apache.hbase.thirdparty.io.netty.util.concurrent.Promise;
3840

@@ -77,7 +79,7 @@ public NettyHBaseSaslRpcClientHandler(Promise<Boolean> saslPromise, UserGroupInf
7779

7880
private void writeResponse(ChannelHandlerContext ctx, byte[] response) {
7981
LOG.trace("Sending token size={} from initSASLContext.", response.length);
80-
ctx.writeAndFlush(
82+
NettyFutureUtils.safeWriteAndFlush(ctx,
8183
ctx.alloc().buffer(4 + response.length).writeInt(response.length).writeBytes(response));
8284
}
8385

@@ -90,8 +92,11 @@ private void tryComplete(ChannelHandlerContext ctx) {
9092
if (LOG.isTraceEnabled()) {
9193
LOG.trace("SASL negotiation for {} is complete", provider.getSaslAuthMethod().getName());
9294
}
93-
95+
ChannelPipeline p = ctx.pipeline();
9496
saslRpcClient.setupSaslHandler(ctx.pipeline());
97+
p.remove(SaslChallengeDecoder.class);
98+
p.remove(this);
99+
95100
setCryptoAESOption();
96101

97102
saslPromise.setSuccess(true);
@@ -110,6 +115,9 @@ public boolean isNeedProcessConnectionHeader() {
110115

111116
@Override
112117
public void handlerAdded(ChannelHandlerContext ctx) {
118+
// dispose the saslRpcClient when the channel is closed, since saslRpcClient is final, it is
119+
// safe to reference it in lambda expr.
120+
NettyFutureUtils.addListener(ctx.channel().closeFuture(), f -> saslRpcClient.dispose());
113121
try {
114122
byte[] initialResponse = ugi.doAs(new PrivilegedExceptionAction<byte[]>() {
115123

@@ -170,14 +178,12 @@ public byte[] run() throws Exception {
170178

171179
@Override
172180
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
173-
saslRpcClient.dispose();
174181
saslPromise.tryFailure(new ConnectionClosedException("Connection closed"));
175182
ctx.fireChannelInactive();
176183
}
177184

178185
@Override
179186
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
180-
saslRpcClient.dispose();
181187
saslPromise.tryFailure(cause);
182188
}
183189
}

hbase-client/src/main/java/org/apache/hadoop/hbase/security/SaslUnwrapHandler.java

Lines changed: 8 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717
*/
1818
package org.apache.hadoop.hbase.security;
1919

20-
import javax.security.sasl.SaslClient;
20+
import javax.security.sasl.SaslException;
2121
import org.apache.yetus.audience.InterfaceAudience;
2222

2323
import org.apache.hbase.thirdparty.io.netty.buffer.ByteBuf;
@@ -32,22 +32,20 @@
3232
@InterfaceAudience.Private
3333
public class SaslUnwrapHandler extends SimpleChannelInboundHandler<ByteBuf> {
3434

35-
private final SaslClient saslClient;
36-
37-
public SaslUnwrapHandler(SaslClient saslClient) {
38-
this.saslClient = saslClient;
35+
public interface Unwrapper {
36+
byte[] unwrap(byte[] incoming, int offset, int len) throws SaslException;
3937
}
4038

41-
@Override
42-
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
43-
SaslUtil.safeDispose(saslClient);
44-
ctx.fireChannelInactive();
39+
private final Unwrapper unwrapper;
40+
41+
public SaslUnwrapHandler(Unwrapper unwrapper) {
42+
this.unwrapper = unwrapper;
4543
}
4644

4745
@Override
4846
protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
4947
byte[] bytes = new byte[msg.readableBytes()];
5048
msg.readBytes(bytes);
51-
ctx.fireChannelRead(Unpooled.wrappedBuffer(saslClient.unwrap(bytes, 0, bytes.length)));
49+
ctx.fireChannelRead(Unpooled.wrappedBuffer(unwrapper.unwrap(bytes, 0, bytes.length)));
5250
}
5351
}

hbase-client/src/main/java/org/apache/hadoop/hbase/security/SaslWrapHandler.java

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717
*/
1818
package org.apache.hadoop.hbase.security;
1919

20-
import javax.security.sasl.SaslClient;
20+
import javax.security.sasl.SaslException;
2121
import org.apache.yetus.audience.InterfaceAudience;
2222

2323
import org.apache.hbase.thirdparty.io.netty.buffer.ByteBuf;
@@ -30,17 +30,21 @@
3030
@InterfaceAudience.Private
3131
public class SaslWrapHandler extends MessageToByteEncoder<ByteBuf> {
3232

33-
private final SaslClient saslClient;
33+
public interface Wrapper {
34+
byte[] wrap(byte[] outgoing, int offset, int len) throws SaslException;
35+
}
36+
37+
private final Wrapper wrapper;
3438

35-
public SaslWrapHandler(SaslClient saslClient) {
36-
this.saslClient = saslClient;
39+
public SaslWrapHandler(Wrapper wrapper) {
40+
this.wrapper = wrapper;
3741
}
3842

3943
@Override
4044
protected void encode(ChannelHandlerContext ctx, ByteBuf msg, ByteBuf out) throws Exception {
4145
byte[] bytes = new byte[msg.readableBytes()];
4246
msg.readBytes(bytes);
43-
byte[] wrapperBytes = saslClient.wrap(bytes, 0, bytes.length);
47+
byte[] wrapperBytes = wrapper.wrap(bytes, 0, bytes.length);
4448
out.ensureWritable(4 + wrapperBytes.length);
4549
out.writeInt(wrapperBytes.length);
4650
out.writeBytes(wrapperBytes);
Lines changed: 115 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,115 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package org.apache.hadoop.hbase.ipc;
19+
20+
import java.io.IOException;
21+
import org.apache.hadoop.hbase.security.HBaseSaslRpcServer;
22+
import org.apache.hadoop.hbase.security.SaslStatus;
23+
import org.apache.hadoop.hbase.security.SaslUnwrapHandler;
24+
import org.apache.hadoop.hbase.security.SaslWrapHandler;
25+
import org.apache.hadoop.hbase.util.NettyFutureUtils;
26+
import org.apache.hadoop.io.BytesWritable;
27+
import org.apache.hadoop.io.Writable;
28+
import org.apache.hadoop.io.WritableUtils;
29+
import org.slf4j.Logger;
30+
import org.slf4j.LoggerFactory;
31+
32+
import org.apache.hbase.thirdparty.io.netty.buffer.ByteBuf;
33+
import org.apache.hbase.thirdparty.io.netty.buffer.ByteBufOutputStream;
34+
import org.apache.hbase.thirdparty.io.netty.channel.ChannelHandlerContext;
35+
import org.apache.hbase.thirdparty.io.netty.channel.ChannelPipeline;
36+
import org.apache.hbase.thirdparty.io.netty.channel.SimpleChannelInboundHandler;
37+
import org.apache.hbase.thirdparty.io.netty.handler.codec.LengthFieldBasedFrameDecoder;
38+
39+
/**
40+
* Implement SASL negotiation logic for rpc server.
41+
*/
42+
class NettyHBaseSaslRpcServerHandler extends SimpleChannelInboundHandler<ByteBuf> {
43+
44+
private static final Logger LOG = LoggerFactory.getLogger(NettyHBaseSaslRpcServerHandler.class);
45+
46+
static final String DECODER_NAME = "SaslNegotiationDecoder";
47+
48+
private final NettyRpcServer rpcServer;
49+
50+
private final NettyServerRpcConnection conn;
51+
52+
NettyHBaseSaslRpcServerHandler(NettyRpcServer rpcServer, NettyServerRpcConnection conn) {
53+
this.rpcServer = rpcServer;
54+
this.conn = conn;
55+
}
56+
57+
private void doResponse(ChannelHandlerContext ctx, SaslStatus status, Writable rv,
58+
String errorClass, String error) throws IOException {
59+
// In my testing, have noticed that sasl messages are usually
60+
// in the ballpark of 100-200. That's why the initial capacity is 256.
61+
ByteBuf resp = ctx.alloc().buffer(256);
62+
try (ByteBufOutputStream out = new ByteBufOutputStream(resp)) {
63+
out.writeInt(status.state); // write status
64+
if (status == SaslStatus.SUCCESS) {
65+
rv.write(out);
66+
} else {
67+
WritableUtils.writeString(out, errorClass);
68+
WritableUtils.writeString(out, error);
69+
}
70+
}
71+
NettyFutureUtils.safeWriteAndFlush(ctx, resp);
72+
}
73+
74+
@Override
75+
protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
76+
LOG.debug("Read input token of size={} for processing by saslServer.evaluateResponse()",
77+
msg.readableBytes());
78+
HBaseSaslRpcServer saslServer = conn.getOrCreateSaslServer();
79+
byte[] saslToken = new byte[msg.readableBytes()];
80+
msg.readBytes(saslToken, 0, saslToken.length);
81+
byte[] replyToken = saslServer.evaluateResponse(saslToken);
82+
if (replyToken != null) {
83+
LOG.debug("Will send token of size {} from saslServer.", replyToken.length);
84+
doResponse(ctx, SaslStatus.SUCCESS, new BytesWritable(replyToken), null, null);
85+
}
86+
if (saslServer.isComplete()) {
87+
conn.finishSaslNegotiation();
88+
String qop = saslServer.getNegotiatedQop();
89+
boolean useWrap = qop != null && !"auth".equalsIgnoreCase(qop);
90+
ChannelPipeline p = ctx.pipeline();
91+
if (useWrap) {
92+
p.addFirst(new SaslWrapHandler(saslServer::wrap));
93+
p.addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4),
94+
new SaslUnwrapHandler(saslServer::unwrap));
95+
}
96+
conn.setupDecoder();
97+
p.remove(this);
98+
p.remove(DECODER_NAME);
99+
}
100+
}
101+
102+
@Override
103+
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
104+
LOG.error("Error when doing SASL handshade, provider={}", conn.provider, cause);
105+
Throwable sendToClient = HBaseSaslRpcServer.unwrap(cause);
106+
doResponse(ctx, SaslStatus.ERROR, null, sendToClient.getClass().getName(),
107+
sendToClient.getLocalizedMessage());
108+
rpcServer.metrics.authenticationFailure();
109+
String clientIP = this.toString();
110+
// attempting user could be null
111+
RpcServer.AUDITLOG.warn("{}{}: {}", RpcServer.AUTH_FAILED_FOR, clientIP,
112+
conn.saslServer != null ? conn.saslServer.getAttemptingUser() : "Unknown");
113+
NettyFutureUtils.safeClose(ctx);
114+
}
115+
}

0 commit comments

Comments
 (0)