@@ -1121,7 +1121,7 @@ else if (methodName.equals("close")) {
11211121 return null ;
11221122 }
11231123 else {
1124- physicalClose (proxy );
1124+ physicalClose ();
11251125 return null ;
11261126 }
11271127 }
@@ -1193,24 +1193,16 @@ else if (txEnds.contains(methodName)) {
11931193 }
11941194 }
11951195
1196- private void releasePermitIfNecessary (Object proxy ) {
1196+ private void releasePermitIfNecessary () {
11971197 if (CachingConnectionFactory .this .channelCheckoutTimeout > 0 ) {
1198- /*
1199- * Only release a permit if this is a normal close; if the channel is
1200- * in the list, it means we're closing a cached channel (for which a permit
1201- * has already been released).
1202- */
1203- synchronized (this .channelList ) {
1204- if (this .channelList .contains (proxy )) {
1205- return ;
1206- }
1207- }
12081198 Semaphore permits = CachingConnectionFactory .this .checkoutPermits .get (this .theConnection );
12091199 if (permits != null ) {
1210- permits .release ();
1211- if (logger .isDebugEnabled ()) {
1212- logger .debug ("Released permit for '" + this .theConnection + "', remaining: "
1213- + permits .availablePermits ());
1200+ if (permits .availablePermits () < CachingConnectionFactory .this .channelCacheSize ) {
1201+ permits .release ();
1202+ if (logger .isDebugEnabled ()) {
1203+ logger .debug ("Released permit for '" + this .theConnection + "', remaining: "
1204+ + permits .availablePermits ());
1205+ }
12141206 }
12151207 }
12161208 else {
@@ -1235,11 +1227,8 @@ private void logicalClose(ChannelProxy proxy) throws IOException, TimeoutExcepti
12351227 if (this .target instanceof PublisherCallbackChannel ) {
12361228 this .target .close (); // emit nacks if necessary
12371229 }
1238- if (this .channelList .contains (proxy )) {
1239- this .channelList .remove (proxy );
1240- }
1241- else {
1242- releasePermitIfNecessary (proxy );
1230+ if (!this .channelList .remove (proxy )) {
1231+ releasePermitIfNecessary ();
12431232 }
12441233 this .target = null ;
12451234 return ;
@@ -1275,7 +1264,7 @@ private void returnToCache(ChannelProxy proxy) {
12751264 // The channel didn't handle confirms, so close it altogether to avoid
12761265 // memory leaks for pending confirms
12771266 try {
1278- physicalClose (this . theConnection . channelsAwaitingAcks . remove ( this . target ) );
1267+ physicalClose ();
12791268 }
12801269 catch (@ SuppressWarnings (UNUSED ) Exception e ) {
12811270 }
@@ -1298,7 +1287,7 @@ private void doReturnToCache(Channel proxy) {
12981287 else {
12991288 if (proxy .isOpen ()) {
13001289 try {
1301- physicalClose (proxy );
1290+ physicalClose ();
13021291 }
13031292 catch (@ SuppressWarnings (UNUSED ) Exception e ) {
13041293 }
@@ -1315,7 +1304,7 @@ private void cacheOrClose(Channel proxy) {
13151304 logger .trace ("Cache limit reached: " + this .target );
13161305 }
13171306 try {
1318- physicalClose (proxy );
1307+ physicalClose ();
13191308 }
13201309 catch (@ SuppressWarnings (UNUSED ) Exception e ) {
13211310 }
@@ -1324,8 +1313,8 @@ else if (!alreadyCached) {
13241313 if (logger .isTraceEnabled ()) {
13251314 logger .trace ("Returning cached Channel: " + this .target );
13261315 }
1327- releasePermitIfNecessary (proxy );
13281316 this .channelList .addLast ((ChannelProxy ) proxy );
1317+ releasePermitIfNecessary ();
13291318 setHighWaterMark ();
13301319 }
13311320 }
@@ -1342,7 +1331,7 @@ private void setHighWaterMark() {
13421331 }
13431332 }
13441333
1345- private void physicalClose (Object proxy ) throws IOException , TimeoutException {
1334+ private void physicalClose () throws IOException , TimeoutException {
13461335 if (logger .isDebugEnabled ()) {
13471336 logger .debug ("Closing cached Channel: " + this .target );
13481337 }
@@ -1356,7 +1345,7 @@ private void physicalClose(Object proxy) throws IOException, TimeoutException {
13561345 (ConfirmType .CORRELATED .equals (CachingConnectionFactory .this .confirmType ) ||
13571346 CachingConnectionFactory .this .publisherReturns )) {
13581347 async = true ;
1359- asyncClose (proxy );
1348+ asyncClose ();
13601349 }
13611350 else {
13621351 this .target .close ();
@@ -1373,12 +1362,12 @@ private void physicalClose(Object proxy) throws IOException, TimeoutException {
13731362 finally {
13741363 this .target = null ;
13751364 if (!async ) {
1376- releasePermitIfNecessary (proxy );
1365+ releasePermitIfNecessary ();
13771366 }
13781367 }
13791368 }
13801369
1381- private void asyncClose (Object proxy ) {
1370+ private void asyncClose () {
13821371 ExecutorService executorService = getChannelsExecutor ();
13831372 final Channel channel = CachedChannelInvocationHandler .this .target ;
13841373 CachingConnectionFactory .this .inFlightAsyncCloses .add (channel );
@@ -1414,7 +1403,7 @@ private void asyncClose(Object proxy) {
14141403 }
14151404 finally {
14161405 CachingConnectionFactory .this .inFlightAsyncCloses .release (channel );
1417- releasePermitIfNecessary (proxy );
1406+ releasePermitIfNecessary ();
14181407 }
14191408 }
14201409 });
0 commit comments