diff --git a/cloudfoundry-client-reactor/src/main/java/org/cloudfoundry/reactor/doppler/MultipartCodec.java b/cloudfoundry-client-reactor/src/main/java/org/cloudfoundry/reactor/doppler/MultipartCodec.java index f985c25a2c3..aad7dfe4488 100644 --- a/cloudfoundry-client-reactor/src/main/java/org/cloudfoundry/reactor/doppler/MultipartCodec.java +++ b/cloudfoundry-client-reactor/src/main/java/org/cloudfoundry/reactor/doppler/MultipartCodec.java @@ -16,6 +16,9 @@ package org.cloudfoundry.reactor.doppler; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import io.netty.buffer.Unpooled; import io.netty.handler.codec.DelimiterBasedFrameDecoder; import io.netty.handler.codec.http.HttpHeaderNames; @@ -23,6 +26,7 @@ import reactor.netty.ByteBufFlux; import reactor.netty.http.client.HttpClientResponse; +import java.io.IOException; import java.io.InputStream; import java.nio.charset.Charset; import java.util.regex.Matcher; @@ -30,6 +34,8 @@ final class MultipartCodec { + private static final Logger LOGGER = LoggerFactory.getLogger(MultipartCodec.class); + private static final Pattern BOUNDARY_PATTERN = Pattern.compile("multipart/.+; boundary=(.*)"); private static final int MAX_PAYLOAD_SIZE = 1024 * 1024; @@ -49,7 +55,8 @@ static DelimiterBasedFrameDecoder createDecoder(HttpClientResponse response) { static Flux decode(ByteBufFlux body) { return body.asInputStream() - .skip(1); + .skip(1) + .doOnDiscard(InputStream.class, MultipartCodec::close); } private static String extractMultipartBoundary(HttpClientResponse response) { @@ -63,4 +70,12 @@ private static String extractMultipartBoundary(HttpClientResponse response) { } } + private static void close(InputStream in) { + try { + in.close(); + } catch (IOException e) { + LOGGER.warn("Could not close input stream. This will cause a direct memory leak.", e); + } + } + } diff --git a/cloudfoundry-client-reactor/src/main/java/org/cloudfoundry/reactor/util/Operator.java b/cloudfoundry-client-reactor/src/main/java/org/cloudfoundry/reactor/util/Operator.java index eb925abbf3e..bdaeb7f7685 100644 --- a/cloudfoundry-client-reactor/src/main/java/org/cloudfoundry/reactor/util/Operator.java +++ b/cloudfoundry-client-reactor/src/main/java/org/cloudfoundry/reactor/util/Operator.java @@ -164,7 +164,7 @@ public Flux parseBodyToFlux(Function connection.dispose()); }); } @@ -251,7 +251,7 @@ private Publisher handleWebsocketCommunication(WebsocketInbound inb return inbound.aggregateFrames() .receive() .asInputStream() - .doOnTerminate(outbound::sendClose); + .doFinally(signalType -> outbound.sendClose()); } }