Skip to content

Commit f757461

Browse files
committed
remove usage of S2AGrpcChannelPool + avoid creating Channel ref per conn.
1 parent a15421b commit f757461

File tree

7 files changed

+27
-338
lines changed

7 files changed

+27
-338
lines changed

s2a/src/main/java/io/grpc/s2a/internal/channel/S2AChannelPool.java

Lines changed: 0 additions & 43 deletions
This file was deleted.

s2a/src/main/java/io/grpc/s2a/internal/channel/S2AGrpcChannelPool.java

Lines changed: 0 additions & 109 deletions
This file was deleted.

s2a/src/main/java/io/grpc/s2a/internal/channel/S2AHandshakerServiceChannel.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,7 @@ public ChannelResource(String targetAddress, ChannelCredentials channelCredentia
9191
public Channel create() {
9292
return NettyChannelBuilder.forTarget(targetAddress, channelCredentials)
9393
.directExecutor()
94+
.idleTimeout(5, SECONDS)
9495
.build();
9596
}
9697

s2a/src/main/java/io/grpc/s2a/internal/handshaker/S2AProtocolNegotiatorFactory.java

Lines changed: 15 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -37,8 +37,6 @@
3737
import io.grpc.netty.InternalProtocolNegotiator.ProtocolNegotiator;
3838
import io.grpc.netty.InternalProtocolNegotiators;
3939
import io.grpc.netty.InternalProtocolNegotiators.ProtocolNegotiationHandler;
40-
import io.grpc.s2a.internal.channel.S2AChannelPool;
41-
import io.grpc.s2a.internal.channel.S2AGrpcChannelPool;
4240
import io.grpc.s2a.internal.handshaker.S2AIdentity;
4341
import io.netty.channel.ChannelHandler;
4442
import io.netty.channel.ChannelHandlerAdapter;
@@ -70,17 +68,16 @@ public final class S2AProtocolNegotiatorFactory {
7068
public static InternalProtocolNegotiator.ClientFactory createClientFactory(
7169
@Nullable S2AIdentity localIdentity, ObjectPool<Channel> s2aChannelPool) {
7270
checkNotNull(s2aChannelPool, "S2A channel pool should not be null.");
73-
S2AChannelPool channelPool = S2AGrpcChannelPool.create(s2aChannelPool);
74-
return new S2AClientProtocolNegotiatorFactory(localIdentity, channelPool);
71+
return new S2AClientProtocolNegotiatorFactory(localIdentity, s2aChannelPool);
7572
}
7673

7774
static final class S2AClientProtocolNegotiatorFactory
7875
implements InternalProtocolNegotiator.ClientFactory {
7976
private final @Nullable S2AIdentity localIdentity;
80-
private final S2AChannelPool channelPool;
77+
private final ObjectPool<Channel> channelPool;
8178

8279
S2AClientProtocolNegotiatorFactory(
83-
@Nullable S2AIdentity localIdentity, S2AChannelPool channelPool) {
80+
@Nullable S2AIdentity localIdentity, ObjectPool<Channel> channelPool) {
8481
this.localIdentity = localIdentity;
8582
this.channelPool = channelPool;
8683
}
@@ -100,13 +97,14 @@ public int getDefaultPort() {
10097
@VisibleForTesting
10198
static final class S2AProtocolNegotiator implements ProtocolNegotiator {
10299

103-
private final S2AChannelPool channelPool;
100+
private final ObjectPool<Channel> channelPool;
101+
private final Channel channel;
104102
private final Optional<S2AIdentity> localIdentity;
105103
private final ListeningExecutorService service =
106104
MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(1));
107105

108106
static S2AProtocolNegotiator createForClient(
109-
S2AChannelPool channelPool, @Nullable S2AIdentity localIdentity) {
107+
ObjectPool<Channel> channelPool, @Nullable S2AIdentity localIdentity) {
110108
checkNotNull(channelPool, "Channel pool should not be null.");
111109
if (localIdentity == null) {
112110
return new S2AProtocolNegotiator(channelPool, Optional.empty());
@@ -123,9 +121,11 @@ static S2AProtocolNegotiator createForClient(
123121
return HostAndPort.fromString(authority).getHost();
124122
}
125123

126-
private S2AProtocolNegotiator(S2AChannelPool channelPool, Optional<S2AIdentity> localIdentity) {
124+
private S2AProtocolNegotiator(ObjectPool<Channel> channelPool,
125+
Optional<S2AIdentity> localIdentity) {
127126
this.channelPool = channelPool;
128127
this.localIdentity = localIdentity;
128+
this.channel = channelPool.getObject();
129129
}
130130

131131
@Override
@@ -139,13 +139,13 @@ public ChannelHandler newHandler(GrpcHttp2ConnectionHandler grpcHandler) {
139139
String hostname = getHostNameFromAuthority(grpcHandler.getAuthority());
140140
checkArgument(!isNullOrEmpty(hostname), "hostname should not be null or empty.");
141141
return new S2AProtocolNegotiationHandler(
142-
grpcHandler, channelPool, localIdentity, hostname, service);
142+
grpcHandler, channel, localIdentity, hostname, service);
143143
}
144144

145145
@Override
146146
public void close() {
147147
service.shutdown();
148-
channelPool.close();
148+
channelPool.returnObject(channel);
149149
}
150150
}
151151

@@ -180,15 +180,15 @@ public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
180180
}
181181

182182
private static final class S2AProtocolNegotiationHandler extends ProtocolNegotiationHandler {
183-
private final S2AChannelPool channelPool;
183+
private final Channel channel;
184184
private final Optional<S2AIdentity> localIdentity;
185185
private final String hostname;
186186
private final GrpcHttp2ConnectionHandler grpcHandler;
187187
private final ListeningExecutorService service;
188188

189189
private S2AProtocolNegotiationHandler(
190190
GrpcHttp2ConnectionHandler grpcHandler,
191-
S2AChannelPool channelPool,
191+
Channel channel,
192192
Optional<S2AIdentity> localIdentity,
193193
String hostname,
194194
ListeningExecutorService service) {
@@ -204,7 +204,7 @@ public void handlerAdded(ChannelHandlerContext ctx) {
204204
},
205205
grpcHandler.getNegotiationLogger());
206206
this.grpcHandler = grpcHandler;
207-
this.channelPool = channelPool;
207+
this.channel = channel;
208208
this.localIdentity = localIdentity;
209209
this.hostname = hostname;
210210
checkNotNull(service, "service should not be null.");
@@ -217,8 +217,7 @@ protected void handlerAdded0(ChannelHandlerContext ctx) {
217217
BufferReadsHandler bufferReads = new BufferReadsHandler();
218218
ctx.pipeline().addBefore(ctx.name(), /* name= */ null, bufferReads);
219219

220-
Channel ch = channelPool.getChannel();
221-
S2AServiceGrpc.S2AServiceStub stub = S2AServiceGrpc.newStub(ch);
220+
S2AServiceGrpc.S2AServiceStub stub = S2AServiceGrpc.newStub(channel);
222221
S2AStub s2aStub = S2AStub.newInstance(stub);
223222

224223
ListenableFuture<SslContext> sslContextFuture =

0 commit comments

Comments
 (0)