Skip to content

Commit 529e704

Browse files
authored
Unify http channels and exception handling (#31379)
This is a general cleanup of channels and exception handling in http. This commit introduces a CloseableChannel that is a superclass of TcpChannel and HttpChannel. This allows us to unify the closing logic between tcp and http transports. Additionally, the normal http channels are extracted to the abstract server transport. Finally, this commit (mostly) unifies the exception handling between nio and netty4 http server transports.
1 parent 8fd1f5f commit 529e704

File tree

28 files changed

+353
-392
lines changed

28 files changed

+353
-392
lines changed

modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpChannel.java

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import io.netty.channel.Channel;
2323
import io.netty.channel.ChannelPromise;
2424
import org.elasticsearch.action.ActionListener;
25+
import org.elasticsearch.common.concurrent.CompletableContext;
2526
import org.elasticsearch.http.HttpChannel;
2627
import org.elasticsearch.http.HttpResponse;
2728
import org.elasticsearch.transport.netty4.Netty4Utils;
@@ -31,9 +32,23 @@
3132
public class Netty4HttpChannel implements HttpChannel {
3233

3334
private final Channel channel;
35+
private final CompletableContext<Void> closeContext = new CompletableContext<>();
3436

3537
Netty4HttpChannel(Channel channel) {
3638
this.channel = channel;
39+
this.channel.closeFuture().addListener(f -> {
40+
if (f.isSuccess()) {
41+
closeContext.complete(null);
42+
} else {
43+
Throwable cause = f.cause();
44+
if (cause instanceof Error) {
45+
Netty4Utils.maybeDie(cause);
46+
closeContext.completeExceptionally(new Exception(cause));
47+
} else {
48+
closeContext.completeExceptionally((Exception) cause);
49+
}
50+
}
51+
});
3752
}
3853

3954
@Override
@@ -65,6 +80,16 @@ public InetSocketAddress getRemoteAddress() {
6580
return (InetSocketAddress) channel.remoteAddress();
6681
}
6782

83+
@Override
84+
public void addCloseListener(ActionListener<Void> listener) {
85+
closeContext.addListener(ActionListener.toBiConsumer(listener));
86+
}
87+
88+
@Override
89+
public boolean isOpen() {
90+
return channel.isOpen();
91+
}
92+
6893
@Override
6994
public void close() {
7095
channel.close();
@@ -73,4 +98,12 @@ public void close() {
7398
public Channel getNettyChannel() {
7499
return channel;
75100
}
101+
102+
@Override
103+
public String toString() {
104+
return "Netty4HttpChannel{" +
105+
"localAddress=" + getLocalAddress() +
106+
", remoteAddress=" + getRemoteAddress() +
107+
'}';
108+
}
76109
}

modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpRequestHandler.java

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,8 @@
2929
import org.elasticsearch.http.HttpPipelinedRequest;
3030
import org.elasticsearch.transport.netty4.Netty4Utils;
3131

32+
import static org.elasticsearch.http.netty4.Netty4HttpServerTransport.HTTP_CHANNEL_KEY;
33+
3234
@ChannelHandler.Sharable
3335
class Netty4HttpRequestHandler extends SimpleChannelInboundHandler<HttpPipelinedRequest<FullHttpRequest>> {
3436

@@ -40,7 +42,7 @@ class Netty4HttpRequestHandler extends SimpleChannelInboundHandler<HttpPipelined
4042

4143
@Override
4244
protected void channelRead0(ChannelHandlerContext ctx, HttpPipelinedRequest<FullHttpRequest> msg) throws Exception {
43-
Netty4HttpChannel channel = ctx.channel().attr(Netty4HttpServerTransport.HTTP_CHANNEL_KEY).get();
45+
Netty4HttpChannel channel = ctx.channel().attr(HTTP_CHANNEL_KEY).get();
4446
FullHttpRequest request = msg.getRequest();
4547

4648
try {
@@ -75,7 +77,12 @@ protected void channelRead0(ChannelHandlerContext ctx, HttpPipelinedRequest<Full
7577
@Override
7678
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
7779
Netty4Utils.maybeDie(cause);
78-
serverTransport.exceptionCaught(ctx, cause);
80+
Netty4HttpChannel httpChannel = ctx.channel().attr(HTTP_CHANNEL_KEY).get();
81+
if (cause instanceof Error) {
82+
serverTransport.onException(httpChannel, new Exception(cause));
83+
} else {
84+
serverTransport.onException(httpChannel, (Exception) cause);
85+
}
7986
}
8087

8188
}

modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpServerTransport.java

Lines changed: 24 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -40,15 +40,13 @@
4040
import io.netty.handler.timeout.ReadTimeoutException;
4141
import io.netty.handler.timeout.ReadTimeoutHandler;
4242
import io.netty.util.AttributeKey;
43-
import org.apache.logging.log4j.message.ParameterizedMessage;
44-
import org.apache.logging.log4j.util.Supplier;
4543
import org.elasticsearch.common.Strings;
44+
import org.elasticsearch.common.network.CloseableChannel;
4645
import org.elasticsearch.common.network.NetworkAddress;
4746
import org.elasticsearch.common.network.NetworkService;
4847
import org.elasticsearch.common.settings.Setting;
4948
import org.elasticsearch.common.settings.Setting.Property;
5049
import org.elasticsearch.common.settings.Settings;
51-
import org.elasticsearch.common.transport.NetworkExceptionHelper;
5250
import org.elasticsearch.common.transport.TransportAddress;
5351
import org.elasticsearch.common.unit.ByteSizeUnit;
5452
import org.elasticsearch.common.unit.ByteSizeValue;
@@ -57,14 +55,14 @@
5755
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
5856
import org.elasticsearch.http.AbstractHttpServerTransport;
5957
import org.elasticsearch.http.BindHttpException;
58+
import org.elasticsearch.http.HttpChannel;
6059
import org.elasticsearch.http.HttpHandlingSettings;
6160
import org.elasticsearch.http.HttpStats;
6261
import org.elasticsearch.http.netty4.cors.Netty4CorsConfig;
6362
import org.elasticsearch.http.netty4.cors.Netty4CorsConfigBuilder;
6463
import org.elasticsearch.http.netty4.cors.Netty4CorsHandler;
6564
import org.elasticsearch.rest.RestUtils;
6665
import org.elasticsearch.threadpool.ThreadPool;
67-
import org.elasticsearch.transport.netty4.Netty4OpenChannelsHandler;
6866
import org.elasticsearch.transport.netty4.Netty4Utils;
6967

7068
import java.io.IOException;
@@ -171,10 +169,6 @@ public class Netty4HttpServerTransport extends AbstractHttpServerTransport {
171169

172170
protected final List<Channel> serverChannels = new ArrayList<>();
173171

174-
// package private for testing
175-
Netty4OpenChannelsHandler serverOpenChannels;
176-
177-
178172
private final Netty4CorsConfig corsConfig;
179173

180174
public Netty4HttpServerTransport(Settings settings, NetworkService networkService, BigArrays bigArrays, ThreadPool threadPool,
@@ -216,8 +210,6 @@ public Settings settings() {
216210
protected void doStart() {
217211
boolean success = false;
218212
try {
219-
this.serverOpenChannels = new Netty4OpenChannelsHandler(logger);
220-
221213
serverBootstrap = new ServerBootstrap();
222214

223215
serverBootstrap.group(new NioEventLoopGroup(workerCount, daemonThreadFactory(settings,
@@ -281,10 +273,9 @@ static Netty4CorsConfig buildCorsConfig(Settings settings) {
281273
builder.allowCredentials();
282274
}
283275
String[] strMethods = Strings.tokenizeToStringArray(SETTING_CORS_ALLOW_METHODS.get(settings), ",");
284-
HttpMethod[] methods = Arrays.asList(strMethods)
285-
.stream()
276+
HttpMethod[] methods = Arrays.stream(strMethods)
286277
.map(HttpMethod::valueOf)
287-
.toArray(size -> new HttpMethod[size]);
278+
.toArray(HttpMethod[]::new);
288279
return builder.allowedRequestMethods(methods)
289280
.maxAge(SETTING_CORS_MAX_AGE.get(settings))
290281
.allowedRequestHeaders(Strings.tokenizeToStringArray(SETTING_CORS_ALLOW_HEADERS.get(settings), ","))
@@ -327,15 +318,21 @@ protected void doStop() {
327318
Netty4Utils.closeChannels(serverChannels);
328319
} catch (IOException e) {
329320
logger.trace("exception while closing channels", e);
321+
} finally {
322+
serverChannels.clear();
330323
}
331-
serverChannels.clear();
332324
}
333325
}
334326

335-
if (serverOpenChannels != null) {
336-
serverOpenChannels.close();
337-
serverOpenChannels = null;
327+
// TODO: Move all of channel closing to abstract class once server channels are handled
328+
try {
329+
CloseableChannel.closeChannels(new ArrayList<>(httpChannels), true);
330+
} catch (Exception e) {
331+
logger.warn("unexpected exception while closing http channels", e);
338332
}
333+
httpChannels.clear();
334+
335+
339336

340337
if (serverBootstrap != null) {
341338
serverBootstrap.config().group().shutdownGracefully(0, 5, TimeUnit.SECONDS).awaitUninterruptibly();
@@ -349,38 +346,18 @@ protected void doClose() {
349346

350347
@Override
351348
public HttpStats stats() {
352-
Netty4OpenChannelsHandler channels = serverOpenChannels;
353-
return new HttpStats(channels == null ? 0 : channels.numberOfOpenChannels(), channels == null ? 0 : channels.totalChannels());
349+
return new HttpStats(httpChannels.size(), totalChannelsAccepted.get());
354350
}
355351

356-
public Netty4CorsConfig getCorsConfig() {
357-
return corsConfig;
358-
}
359-
360-
protected void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
352+
@Override
353+
protected void onException(HttpChannel channel, Exception cause) {
361354
if (cause instanceof ReadTimeoutException) {
362355
if (logger.isTraceEnabled()) {
363-
logger.trace("Read timeout [{}]", ctx.channel().remoteAddress());
356+
logger.trace("Http read timeout {}", channel);
364357
}
365-
ctx.channel().close();
358+
CloseableChannel.closeChannel(channel);;
366359
} else {
367-
if (!lifecycle.started()) {
368-
// ignore
369-
return;
370-
}
371-
if (!NetworkExceptionHelper.isCloseConnectionException(cause)) {
372-
logger.warn(
373-
(Supplier<?>) () -> new ParameterizedMessage(
374-
"caught exception while handling client http traffic, closing connection {}", ctx.channel()),
375-
cause);
376-
ctx.channel().close();
377-
} else {
378-
logger.debug(
379-
(Supplier<?>) () -> new ParameterizedMessage(
380-
"caught exception while handling client http traffic, closing connection {}", ctx.channel()),
381-
cause);
382-
ctx.channel().close();
383-
}
360+
super.onException(channel, cause);
384361
}
385362
}
386363

@@ -404,9 +381,8 @@ protected HttpChannelHandler(final Netty4HttpServerTransport transport, final Ht
404381

405382
@Override
406383
protected void initChannel(Channel ch) throws Exception {
407-
Netty4HttpChannel nettyTcpChannel = new Netty4HttpChannel(ch);
408-
ch.attr(HTTP_CHANNEL_KEY).set(nettyTcpChannel);
409-
ch.pipeline().addLast("openChannels", transport.serverOpenChannels);
384+
Netty4HttpChannel nettyHttpChannel = new Netty4HttpChannel(ch);
385+
ch.attr(HTTP_CHANNEL_KEY).set(nettyHttpChannel);
410386
ch.pipeline().addLast("read_timeout", new ReadTimeoutHandler(transport.readTimeoutMillis, TimeUnit.MILLISECONDS));
411387
final HttpRequestDecoder decoder = new HttpRequestDecoder(
412388
handlingSettings.getMaxInitialLineLength(),
@@ -423,10 +399,11 @@ protected void initChannel(Channel ch) throws Exception {
423399
ch.pipeline().addLast("encoder_compress", new HttpContentCompressor(handlingSettings.getCompressionLevel()));
424400
}
425401
if (handlingSettings.isCorsEnabled()) {
426-
ch.pipeline().addLast("cors", new Netty4CorsHandler(transport.getCorsConfig()));
402+
ch.pipeline().addLast("cors", new Netty4CorsHandler(transport.corsConfig));
427403
}
428404
ch.pipeline().addLast("pipelining", new Netty4HttpPipeliningHandler(transport.logger, transport.pipeliningMaxEvents));
429405
ch.pipeline().addLast("handler", requestHandler);
406+
transport.serverAcceptedChannel(nettyHttpChannel);
430407
}
431408

432409
@Override

modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4OpenChannelsHandler.java

Lines changed: 0 additions & 96 deletions
This file was deleted.

modules/transport-netty4/src/test/java/org/elasticsearch/transport/netty4/SimpleNetty4TransportTests.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import org.elasticsearch.Version;
2323
import org.elasticsearch.cluster.node.DiscoveryNode;
2424
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
25+
import org.elasticsearch.common.network.CloseableChannel;
2526
import org.elasticsearch.common.network.NetworkService;
2627
import org.elasticsearch.common.settings.ClusterSettings;
2728
import org.elasticsearch.common.settings.Settings;
@@ -91,7 +92,7 @@ protected void closeConnectionChannel(Transport transport, Transport.Connection
9192
final Netty4Transport t = (Netty4Transport) transport;
9293
@SuppressWarnings("unchecked")
9394
final TcpTransport.NodeChannels channels = (TcpTransport.NodeChannels) connection;
94-
TcpChannel.closeChannels(channels.getChannels().subList(0, randomIntBetween(1, channels.getChannels().size())), true);
95+
CloseableChannel.closeChannels(channels.getChannels().subList(0, randomIntBetween(1, channels.getChannels().size())), true);
9596
}
9697

9798
public void testConnectException() throws UnknownHostException {

plugins/transport-nio/src/main/java/org/elasticsearch/http/nio/NioHttpChannel.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,4 +36,17 @@ public class NioHttpChannel extends NioSocketChannel implements HttpChannel {
3636
public void sendResponse(HttpResponse response, ActionListener<Void> listener) {
3737
getContext().sendMessage(response, ActionListener.toBiConsumer(listener));
3838
}
39+
40+
@Override
41+
public void addCloseListener(ActionListener<Void> listener) {
42+
addCloseListener(ActionListener.toBiConsumer(listener));
43+
}
44+
45+
@Override
46+
public String toString() {
47+
return "NioHttpChannel{" +
48+
"localAddress=" + getLocalAddress() +
49+
", remoteAddress=" + getRemoteAddress() +
50+
'}';
51+
}
3952
}

0 commit comments

Comments
 (0)