Skip to content

Commit dbf3da1

Browse files
authored
HBASE-27782 During SSL handshake error, netty complains that exceptionCaught() was not handled (#5305)
Signed-off-by: Bryan Beaudreault <[email protected]>
1 parent da171c3 commit dbf3da1

File tree

2 files changed

+200
-2
lines changed

2 files changed

+200
-2
lines changed

hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/BufferCallBeforeInitHandler.java

Lines changed: 37 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,10 @@
2020
import java.io.IOException;
2121
import java.util.HashMap;
2222
import java.util.Map;
23+
import javax.net.ssl.SSLException;
2324
import org.apache.yetus.audience.InterfaceAudience;
25+
import org.slf4j.Logger;
26+
import org.slf4j.LoggerFactory;
2427

2528
import org.apache.hbase.thirdparty.io.netty.channel.ChannelDuplexHandler;
2629
import org.apache.hbase.thirdparty.io.netty.channel.ChannelHandlerContext;
@@ -33,6 +36,8 @@
3336
@InterfaceAudience.Private
3437
class BufferCallBeforeInitHandler extends ChannelDuplexHandler {
3538

39+
private static final Logger LOG = LoggerFactory.getLogger(BufferCallBeforeInitHandler.class);
40+
3641
static final String NAME = "BufferCall";
3742

3843
private enum BufferCallAction {
@@ -93,20 +98,50 @@ public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exc
9398
for (Call call : id2Call.values()) {
9499
ctx.write(call);
95100
}
101+
ctx.flush();
102+
ctx.pipeline().remove(this);
96103
break;
97104
case FAIL:
98105
for (Call call : id2Call.values()) {
99106
call.setException(bcEvt.error);
100107
}
108+
// here we do not remove us from the pipeline, for receiving possible exceptions and log
109+
// it, especially the ssl exceptions, to prevent it reaching the tail of the pipeline and
110+
// generate a confusing netty WARN
101111
break;
102112
}
103-
ctx.flush();
104-
ctx.pipeline().remove(this);
105113
} else if (evt instanceof CallEvent) {
106114
// just remove the call for now until we add other call event other than timeout and cancel.
107115
id2Call.remove(((CallEvent) evt).call.id);
108116
} else {
109117
ctx.fireUserEventTriggered(evt);
110118
}
111119
}
120+
121+
private boolean isSslError(Throwable cause) {
122+
Throwable error = cause;
123+
do {
124+
if (error instanceof SSLException) {
125+
return true;
126+
}
127+
error = error.getCause();
128+
} while (error != null);
129+
return false;
130+
}
131+
132+
@Override
133+
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
134+
if (isSslError(cause)) {
135+
// this should have been logged in other places, see HBASE-27782 for more details.
136+
// here we just log it with debug and tell users that this is not a critical problem,
137+
// otherwise if we just pass it through the pipeline, it will lead to a confusing
138+
// "An exceptionCaught() event was fired, and it reached at the tail of the pipeline"
139+
LOG.debug(
140+
"got ssl exception, which should have already been proceeded, log it here to"
141+
+ " prevent it being passed to netty's TailContext and trigger a confusing WARN message",
142+
cause);
143+
} else {
144+
ctx.fireExceptionCaught(cause);
145+
}
146+
}
112147
}
Lines changed: 163 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,163 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package org.apache.hadoop.hbase.ipc;
19+
20+
import static org.hamcrest.MatcherAssert.assertThat;
21+
import static org.hamcrest.Matchers.instanceOf;
22+
import static org.hamcrest.Matchers.startsWith;
23+
import static org.junit.Assert.assertEquals;
24+
import static org.mockito.ArgumentMatchers.any;
25+
import static org.mockito.Mockito.doAnswer;
26+
import static org.mockito.Mockito.mock;
27+
import static org.mockito.Mockito.verify;
28+
import static org.mockito.Mockito.when;
29+
30+
import java.io.IOException;
31+
import java.net.ServerSocket;
32+
import java.net.Socket;
33+
import java.util.Random;
34+
import java.util.concurrent.atomic.AtomicReference;
35+
import org.apache.hadoop.conf.Configuration;
36+
import org.apache.hadoop.hbase.HBaseClassTestRule;
37+
import org.apache.hadoop.hbase.HBaseConfiguration;
38+
import org.apache.hadoop.hbase.Waiter;
39+
import org.apache.hadoop.hbase.client.MetricsConnection.CallStats;
40+
import org.apache.hadoop.hbase.io.crypto.tls.X509Util;
41+
import org.apache.hadoop.hbase.net.Address;
42+
import org.apache.hadoop.hbase.security.User;
43+
import org.apache.hadoop.hbase.testclassification.ClientTests;
44+
import org.apache.hadoop.hbase.testclassification.SmallTests;
45+
import org.junit.After;
46+
import org.junit.Before;
47+
import org.junit.ClassRule;
48+
import org.junit.Test;
49+
import org.junit.experimental.categories.Category;
50+
import org.mockito.invocation.InvocationOnMock;
51+
import org.mockito.stubbing.Answer;
52+
import org.slf4j.Logger;
53+
import org.slf4j.LoggerFactory;
54+
55+
import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
56+
import org.apache.hbase.thirdparty.io.netty.handler.ssl.NotSslRecordException;
57+
58+
/**
59+
* A simple UT to make sure that we do not leak the SslExceptions to netty's TailContext, where it
60+
* will generate a confusing WARN message.
61+
* <p>
62+
* See HBASE-27782 for more details.
63+
*/
64+
@Category({ ClientTests.class, SmallTests.class })
65+
public class TestTLSHandshadeFailure {
66+
67+
@ClassRule
68+
public static final HBaseClassTestRule CLASS_RULE =
69+
HBaseClassTestRule.forClass(TestTLSHandshadeFailure.class);
70+
71+
private static final Logger LOG = LoggerFactory.getLogger(TestTLSHandshadeFailure.class);
72+
73+
private final Configuration conf = HBaseConfiguration.create();
74+
75+
// use a pre set seed to make the random bytes stable
76+
private final Random rand = new Random(1);
77+
78+
private ServerSocket server;
79+
80+
private Thread serverThread;
81+
82+
private NettyRpcClient client;
83+
84+
private org.apache.logging.log4j.core.Appender mockAppender;
85+
86+
private void serve() {
87+
Socket socket = null;
88+
try {
89+
socket = server.accept();
90+
byte[] bytes = new byte[128];
91+
rand.nextBytes(bytes);
92+
socket.getOutputStream().write(bytes);
93+
socket.getOutputStream().flush();
94+
} catch (Exception e) {
95+
LOG.warn("failed to process request", e);
96+
} finally {
97+
if (socket != null) {
98+
try {
99+
socket.close();
100+
} catch (IOException e1) {
101+
LOG.warn("failed to close socket");
102+
}
103+
}
104+
}
105+
}
106+
107+
@Before
108+
public void setUp() throws IOException {
109+
server = new ServerSocket(0);
110+
serverThread = new Thread(this::serve);
111+
serverThread.setDaemon(true);
112+
serverThread.setName("Error-Server-Thread");
113+
serverThread.start();
114+
conf.setBoolean(X509Util.HBASE_CLIENT_NETTY_TLS_ENABLED, true);
115+
client = new NettyRpcClient(conf);
116+
117+
mockAppender = mock(org.apache.logging.log4j.core.Appender.class);
118+
when(mockAppender.getName()).thenReturn("mockAppender");
119+
when(mockAppender.isStarted()).thenReturn(true);
120+
((org.apache.logging.log4j.core.Logger) org.apache.logging.log4j.LogManager
121+
.getLogger(BufferCallBeforeInitHandler.class)).addAppender(mockAppender);
122+
}
123+
124+
@After
125+
public void tearDown() throws IOException {
126+
((org.apache.logging.log4j.core.Logger) org.apache.logging.log4j.LogManager
127+
.getLogger(BufferCallBeforeInitHandler.class)).removeAppender(mockAppender);
128+
Closeables.close(client, true);
129+
Closeables.close(server, true);
130+
}
131+
132+
@Test
133+
public void test() throws Exception {
134+
AtomicReference<org.apache.logging.log4j.Level> level = new AtomicReference<>();
135+
AtomicReference<String> msg = new AtomicReference<String>();
136+
doAnswer(new Answer<Void>() {
137+
138+
@Override
139+
public Void answer(InvocationOnMock invocation) throws Throwable {
140+
org.apache.logging.log4j.core.LogEvent logEvent =
141+
invocation.getArgument(0, org.apache.logging.log4j.core.LogEvent.class);
142+
level.set(logEvent.getLevel());
143+
msg.set(logEvent.getMessage().getFormattedMessage());
144+
return null;
145+
}
146+
}).when(mockAppender).append(any());
147+
ConnectionId id = new ConnectionId(User.getCurrent(), "test",
148+
Address.fromParts("127.0.0.1", server.getLocalPort()));
149+
NettyRpcConnection conn = client.createConnection(id);
150+
BlockingRpcCallback<Call> done = new BlockingRpcCallback<>();
151+
Call call = new Call(1, null, null, null, null, 0, 0, done, new CallStats());
152+
HBaseRpcController hrc = new HBaseRpcControllerImpl();
153+
conn.sendRequest(call, hrc);
154+
done.get();
155+
assertThat(call.error, instanceOf(NotSslRecordException.class));
156+
Waiter.waitFor(conf, 5000, () -> msg.get() != null);
157+
verify(mockAppender).append(any());
158+
// make sure that it has been logged by BufferCallBeforeInitHandler
159+
assertEquals(org.apache.logging.log4j.Level.DEBUG, level.get());
160+
assertThat(msg.get(),
161+
startsWith("got ssl exception, which should have already been proceeded"));
162+
}
163+
}

0 commit comments

Comments
 (0)