diff --git a/driver-core/src/main/com/mongodb/connection/netty/NettyStream.java b/driver-core/src/main/com/mongodb/connection/netty/NettyStream.java index 4549249002d..07384c5b8d1 100644 --- a/driver-core/src/main/com/mongodb/connection/netty/NettyStream.java +++ b/driver-core/src/main/com/mongodb/connection/netty/NettyStream.java @@ -246,21 +246,29 @@ public void readAsync(final int numBytes, final AsyncCompletionHandler readAsync(numBytes, handler, 0); } - /** - * @param additionalTimeout Must be equal to {@link #NO_SCHEDULE_TIMEOUT} when the method is called by a Netty channel handler. - * A timeout is scheduled only by the public read methods. Taking into account that concurrent pending readers - * are not allowed, there must not be a situation when threads attempt to schedule a timeout - * before the previous one is removed or completed. - */ private void readAsync(final int numBytes, final AsyncCompletionHandler handler, final int additionalTimeout) { - scheduleReadTimeout(additionalTimeout); + readAsync(numBytes, handler, additionalTimeout, null); + } + + private void readAsync(final PendingReader pendingReader) { + readAsync(pendingReader.numBytes, pendingReader.handler, 0, pendingReader); + } + + private void readAsync(final int numBytes, final AsyncCompletionHandler handler, final int additionalTimeout, + final PendingReader pendingReaderToUse) { ByteBuf buffer = null; Throwable exceptionResult = null; + TimeoutHandle preparedTimeoutHandle = null; synchronized (this) { exceptionResult = pendingException; if (exceptionResult == null) { if (!hasBytesAvailable(numBytes)) { - pendingReader = new PendingReader(numBytes, handler); + if (pendingReaderToUse == null) { + preparedTimeoutHandle = prepareTimeout(additionalTimeout); + pendingReader = new PendingReader(numBytes, handler, preparedTimeoutHandle); + } else { + pendingReader = pendingReaderToUse; + } } else { CompositeByteBuf composite = allocator.compositeBuffer(pendingInboundBuffers.size()); int bytesNeeded = numBytes; @@ -284,12 +292,21 @@ private void readAsync(final int numBytes, final AsyncCompletionHandler } } } + + if (preparedTimeoutHandle != null) { + preparedTimeoutHandle.scheduleTimeout(); + } + if (exceptionResult != null) { - disableReadTimeout(); + if (pendingReaderToUse != null) { + pendingReaderToUse.timeoutHandle.cancel(); + } handler.failed(exceptionResult); } if (buffer != null) { - disableReadTimeout(); + if (pendingReaderToUse != null) { + pendingReaderToUse.timeoutHandle.cancel(); + } handler.completed(buffer); } } @@ -320,8 +337,7 @@ private void handleReadResponse(final io.netty.buffer.ByteBuf buffer, final Thro } if (localPendingReader != null) { - //if there is a pending reader, then the reader has scheduled a timeout and we should not attempt to schedule another one - readAsync(localPendingReader.numBytes, localPendingReader.handler, NO_SCHEDULE_TIMEOUT); + readAsync(localPendingReader); } } @@ -397,10 +413,12 @@ public void exceptionCaught(final ChannelHandlerContext ctx, final Throwable t) private static final class PendingReader { private final int numBytes; private final AsyncCompletionHandler handler; + private final TimeoutHandle timeoutHandle; - private PendingReader(final int numBytes, final AsyncCompletionHandler handler) { + private PendingReader(final int numBytes, final AsyncCompletionHandler handler, final TimeoutHandle timeoutHandle) { this.numBytes = numBytes; this.handler = handler; + this.timeoutHandle = timeoutHandle; } } @@ -485,9 +503,6 @@ public void operationComplete(final ChannelFuture future) { } private void scheduleReadTimeout(final int additionalTimeout) { - if (additionalTimeout == NO_SCHEDULE_TIMEOUT) { - return; - } adjustTimeout(false, additionalTimeout); } @@ -509,4 +524,46 @@ private void adjustTimeout(final boolean disable, final int additionalTimeout) { } } } + + private TimeoutHandle prepareTimeout(final int additionalTimeout) { + return new SimpleTimeoutHandle(additionalTimeout); + } + + private final class SimpleTimeoutHandle implements TimeoutHandle { + private final int additionalTimeout; + + private boolean cancelled = false; + private boolean scheduled = false; + + private SimpleTimeoutHandle(final int additionalTimeout) { + this.additionalTimeout = additionalTimeout; + } + + @Override + public synchronized void scheduleTimeout() { + assert !scheduled : "Attempted to schedule an already scheduled timeout"; + + if (!cancelled) { + scheduleReadTimeout(additionalTimeout); + scheduled = true; + } + } + + @Override + public synchronized void cancel() { + assert !cancelled : "Attempted to cancel an already cancelled timeout"; + + if (scheduled) { + disableReadTimeout(); + scheduled = false; + } + cancelled = true; + } + } + + private interface TimeoutHandle { + void scheduleTimeout(); + + void cancel(); + } }