Skip to content

Commit 7cc3eb8

Browse files
committed
HBASE-27185 Rewrite NettyRpcServer to decode rpc request with netty handler
1 parent 02f2636 commit 7cc3eb8

19 files changed

+502
-467
lines changed

hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcConnection.java

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -223,9 +223,6 @@ private void saslNegotiate(final Channel ch) {
223223
public void operationComplete(Future<Boolean> future) throws Exception {
224224
if (future.isSuccess()) {
225225
ChannelPipeline p = ch.pipeline();
226-
p.remove(SaslChallengeDecoder.class);
227-
p.remove(NettyHBaseSaslRpcClientHandler.class);
228-
229226
// check if negotiate with server for connection header is necessary
230227
if (saslHandler.isNeedProcessConnectionHeader()) {
231228
Promise<Boolean> connectionHeaderPromise = ch.eventLoop().newPromise();

hbase-client/src/main/java/org/apache/hadoop/hbase/security/CryptoAESUnwrapHandler.java

Lines changed: 0 additions & 47 deletions
This file was deleted.

hbase-client/src/main/java/org/apache/hadoop/hbase/security/CryptoAESWrapHandler.java

Lines changed: 0 additions & 48 deletions
This file was deleted.

hbase-client/src/main/java/org/apache/hadoop/hbase/security/NettyHBaseRpcConnectionHeaderHandler.java

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,6 @@
2525
import org.apache.hbase.thirdparty.io.netty.channel.ChannelHandlerContext;
2626
import org.apache.hbase.thirdparty.io.netty.channel.ChannelPipeline;
2727
import org.apache.hbase.thirdparty.io.netty.channel.SimpleChannelInboundHandler;
28-
import org.apache.hbase.thirdparty.io.netty.handler.codec.LengthFieldBasedFrameDecoder;
2928
import org.apache.hbase.thirdparty.io.netty.util.concurrent.Promise;
3029

3130
import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos;
@@ -92,10 +91,7 @@ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
9291
* Remove handlers for sasl encryption and add handlers for Crypto AES encryption
9392
*/
9493
private void setupCryptoAESHandler(ChannelPipeline p, CryptoAES cryptoAES) {
95-
p.remove(SaslWrapHandler.class);
96-
p.remove(SaslUnwrapHandler.class);
97-
String lengthDecoder = p.context(LengthFieldBasedFrameDecoder.class).name();
98-
p.addAfter(lengthDecoder, null, new CryptoAESUnwrapHandler(cryptoAES));
99-
p.addAfter(lengthDecoder, null, new CryptoAESWrapHandler(cryptoAES));
94+
p.replace(SaslWrapHandler.class, null, new SaslWrapHandler(cryptoAES::wrap));
95+
p.replace(SaslUnwrapHandler.class, null, new SaslUnwrapHandler(cryptoAES::unwrap));
10096
}
10197
}

hbase-client/src/main/java/org/apache/hadoop/hbase/security/NettyHBaseSaslRpcClient.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -52,9 +52,9 @@ public void setupSaslHandler(ChannelPipeline p) {
5252
return;
5353
}
5454
// add wrap and unwrap handlers to pipeline.
55-
p.addFirst(new SaslWrapHandler(saslClient),
55+
p.addFirst(new SaslWrapHandler(saslClient::wrap),
5656
new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4),
57-
new SaslUnwrapHandler(saslClient));
57+
new SaslUnwrapHandler(saslClient::unwrap));
5858
}
5959

6060
public String getSaslQOP() {

hbase-client/src/main/java/org/apache/hadoop/hbase/security/NettyHBaseSaslRpcClientHandler.java

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333

3434
import org.apache.hbase.thirdparty.io.netty.buffer.ByteBuf;
3535
import org.apache.hbase.thirdparty.io.netty.channel.ChannelHandlerContext;
36+
import org.apache.hbase.thirdparty.io.netty.channel.ChannelPipeline;
3637
import org.apache.hbase.thirdparty.io.netty.channel.SimpleChannelInboundHandler;
3738
import org.apache.hbase.thirdparty.io.netty.util.concurrent.Promise;
3839

@@ -90,8 +91,11 @@ private void tryComplete(ChannelHandlerContext ctx) {
9091
if (LOG.isTraceEnabled()) {
9192
LOG.trace("SASL negotiation for {} is complete", provider.getSaslAuthMethod().getName());
9293
}
93-
94+
ChannelPipeline p = ctx.pipeline();
9495
saslRpcClient.setupSaslHandler(ctx.pipeline());
96+
p.remove(SaslChallengeDecoder.class);
97+
p.remove(this);
98+
9599
setCryptoAESOption();
96100

97101
saslPromise.setSuccess(true);
@@ -110,6 +114,9 @@ public boolean isNeedProcessConnectionHeader() {
110114

111115
@Override
112116
public void handlerAdded(ChannelHandlerContext ctx) {
117+
// dispose the saslRpcClient when the channel is closed, since saslRpcClient is final, it is
118+
// safe to reference it in lambda expr.
119+
ctx.channel().closeFuture().addListener(f -> saslRpcClient.dispose());
113120
try {
114121
byte[] initialResponse = ugi.doAs(new PrivilegedExceptionAction<byte[]>() {
115122

@@ -170,14 +177,12 @@ public byte[] run() throws Exception {
170177

171178
@Override
172179
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
173-
saslRpcClient.dispose();
174180
saslPromise.tryFailure(new ConnectionClosedException("Connection closed"));
175181
ctx.fireChannelInactive();
176182
}
177183

178184
@Override
179185
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
180-
saslRpcClient.dispose();
181186
saslPromise.tryFailure(cause);
182187
}
183188
}

hbase-client/src/main/java/org/apache/hadoop/hbase/security/SaslUnwrapHandler.java

Lines changed: 8 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717
*/
1818
package org.apache.hadoop.hbase.security;
1919

20-
import javax.security.sasl.SaslClient;
20+
import javax.security.sasl.SaslException;
2121
import org.apache.yetus.audience.InterfaceAudience;
2222

2323
import org.apache.hbase.thirdparty.io.netty.buffer.ByteBuf;
@@ -32,22 +32,20 @@
3232
@InterfaceAudience.Private
3333
public class SaslUnwrapHandler extends SimpleChannelInboundHandler<ByteBuf> {
3434

35-
private final SaslClient saslClient;
36-
37-
public SaslUnwrapHandler(SaslClient saslClient) {
38-
this.saslClient = saslClient;
35+
public interface Unwrapper {
36+
byte[] unwrap(byte[] incoming, int offset, int len) throws SaslException;
3937
}
4038

41-
@Override
42-
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
43-
SaslUtil.safeDispose(saslClient);
44-
ctx.fireChannelInactive();
39+
private final Unwrapper unwrapper;
40+
41+
public SaslUnwrapHandler(Unwrapper unwrapper) {
42+
this.unwrapper = unwrapper;
4543
}
4644

4745
@Override
4846
protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
4947
byte[] bytes = new byte[msg.readableBytes()];
5048
msg.readBytes(bytes);
51-
ctx.fireChannelRead(Unpooled.wrappedBuffer(saslClient.unwrap(bytes, 0, bytes.length)));
49+
ctx.fireChannelRead(Unpooled.wrappedBuffer(unwrapper.unwrap(bytes, 0, bytes.length)));
5250
}
5351
}

hbase-client/src/main/java/org/apache/hadoop/hbase/security/SaslWrapHandler.java

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717
*/
1818
package org.apache.hadoop.hbase.security;
1919

20-
import javax.security.sasl.SaslClient;
20+
import javax.security.sasl.SaslException;
2121
import org.apache.yetus.audience.InterfaceAudience;
2222

2323
import org.apache.hbase.thirdparty.io.netty.buffer.ByteBuf;
@@ -30,17 +30,21 @@
3030
@InterfaceAudience.Private
3131
public class SaslWrapHandler extends MessageToByteEncoder<ByteBuf> {
3232

33-
private final SaslClient saslClient;
33+
public interface Wrapper {
34+
byte[] wrap(byte[] outgoing, int offset, int len) throws SaslException;
35+
}
36+
37+
private final Wrapper wrapper;
3438

35-
public SaslWrapHandler(SaslClient saslClient) {
36-
this.saslClient = saslClient;
39+
public SaslWrapHandler(Wrapper wrapper) {
40+
this.wrapper = wrapper;
3741
}
3842

3943
@Override
4044
protected void encode(ChannelHandlerContext ctx, ByteBuf msg, ByteBuf out) throws Exception {
4145
byte[] bytes = new byte[msg.readableBytes()];
4246
msg.readBytes(bytes);
43-
byte[] wrapperBytes = saslClient.wrap(bytes, 0, bytes.length);
47+
byte[] wrapperBytes = wrapper.wrap(bytes, 0, bytes.length);
4448
out.ensureWritable(4 + wrapperBytes.length);
4549
out.writeInt(wrapperBytes.length);
4650
out.writeBytes(wrapperBytes);
Lines changed: 114 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,114 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package org.apache.hadoop.hbase.ipc;
19+
20+
import java.io.IOException;
21+
import org.apache.hadoop.hbase.security.HBaseSaslRpcServer;
22+
import org.apache.hadoop.hbase.security.SaslStatus;
23+
import org.apache.hadoop.hbase.security.SaslUnwrapHandler;
24+
import org.apache.hadoop.hbase.security.SaslWrapHandler;
25+
import org.apache.hadoop.io.BytesWritable;
26+
import org.apache.hadoop.io.Writable;
27+
import org.apache.hadoop.io.WritableUtils;
28+
import org.slf4j.Logger;
29+
import org.slf4j.LoggerFactory;
30+
31+
import org.apache.hbase.thirdparty.io.netty.buffer.ByteBuf;
32+
import org.apache.hbase.thirdparty.io.netty.buffer.ByteBufOutputStream;
33+
import org.apache.hbase.thirdparty.io.netty.channel.ChannelHandlerContext;
34+
import org.apache.hbase.thirdparty.io.netty.channel.ChannelPipeline;
35+
import org.apache.hbase.thirdparty.io.netty.channel.SimpleChannelInboundHandler;
36+
import org.apache.hbase.thirdparty.io.netty.handler.codec.LengthFieldBasedFrameDecoder;
37+
38+
/**
39+
* Implement SASL negotiation logic for rpc server.
40+
*/
41+
class NettyHBaseSaslRpcServerHandler extends SimpleChannelInboundHandler<ByteBuf> {
42+
43+
private static final Logger LOG = LoggerFactory.getLogger(NettyHBaseSaslRpcServerHandler.class);
44+
45+
static final String DECODER_NAME = "SaslNegotiationDecoder";
46+
47+
private final NettyRpcServer rpcServer;
48+
49+
private final NettyServerRpcConnection conn;
50+
51+
NettyHBaseSaslRpcServerHandler(NettyRpcServer rpcServer, NettyServerRpcConnection conn) {
52+
this.rpcServer = rpcServer;
53+
this.conn = conn;
54+
}
55+
56+
private void doResponse(ChannelHandlerContext ctx, SaslStatus status, Writable rv,
57+
String errorClass, String error) throws IOException {
58+
// In my testing, have noticed that sasl messages are usually
59+
// in the ballpark of 100-200. That's why the initial capacity is 256.
60+
ByteBuf resp = ctx.alloc().buffer(256);
61+
try (ByteBufOutputStream out = new ByteBufOutputStream(resp)) {
62+
out.writeInt(status.state); // write status
63+
if (status == SaslStatus.SUCCESS) {
64+
rv.write(out);
65+
} else {
66+
WritableUtils.writeString(out, errorClass);
67+
WritableUtils.writeString(out, error);
68+
}
69+
}
70+
ctx.writeAndFlush(resp);
71+
}
72+
73+
@Override
74+
protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
75+
LOG.debug("Read input token of size={} for processing by saslServer.evaluateResponse()",
76+
msg.readableBytes());
77+
HBaseSaslRpcServer saslServer = conn.getOrCreateSaslServer();
78+
byte[] saslToken = new byte[msg.readableBytes()];
79+
msg.readBytes(saslToken, 0, saslToken.length);
80+
byte[] replyToken = saslServer.evaluateResponse(saslToken);
81+
if (replyToken != null) {
82+
LOG.debug("Will send token of size {} from saslServer.", replyToken.length);
83+
doResponse(ctx, SaslStatus.SUCCESS, new BytesWritable(replyToken), null, null);
84+
}
85+
if (saslServer.isComplete()) {
86+
conn.finishSaslNegotiation();
87+
String qop = saslServer.getNegotiatedQop();
88+
boolean useWrap = qop != null && !"auth".equalsIgnoreCase(qop);
89+
ChannelPipeline p = ctx.pipeline();
90+
if (useWrap) {
91+
p.addFirst(new SaslWrapHandler(saslServer::wrap));
92+
p.addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4),
93+
new SaslUnwrapHandler(saslServer::unwrap));
94+
}
95+
conn.setupDecoder();
96+
p.remove(this);
97+
p.remove(DECODER_NAME);
98+
}
99+
}
100+
101+
@Override
102+
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
103+
LOG.error("Error when doing SASL handshade, provider={}", conn.provider, cause);
104+
Throwable sendToClient = HBaseSaslRpcServer.unwrap(cause);
105+
doResponse(ctx, SaslStatus.ERROR, null, sendToClient.getClass().getName(),
106+
sendToClient.getLocalizedMessage());
107+
rpcServer.metrics.authenticationFailure();
108+
String clientIP = this.toString();
109+
// attempting user could be null
110+
RpcServer.AUDITLOG.warn("{}{}: {}", RpcServer.AUTH_FAILED_FOR, clientIP,
111+
conn.saslServer != null ? conn.saslServer.getAttemptingUser() : "Unknown");
112+
ctx.close();
113+
}
114+
}

0 commit comments

Comments
 (0)