Skip to content
Merged
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,6 @@

package org.elasticsearch.common.util.concurrent;

import org.apache.lucene.store.AlreadyClosedException;

import java.util.concurrent.atomic.AtomicInteger;

/**
Expand Down Expand Up @@ -68,7 +66,7 @@ public final void decRef() {
}

protected void alreadyClosed() {
throw new AlreadyClosedException(name + " is already closed can't increment refCount current count [" + refCount.get() + "]");
throw new IllegalStateException(name + " is already closed can't increment refCount current count [" + refCount.get() + "]");
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ public interface RefCounted {
*
* @see #decRef
* @see #tryIncRef()
* @throws org.apache.lucene.store.AlreadyClosedException iff the reference counter can not be incremented.
* @throws IllegalStateException iff the reference counter can not be incremented.
*/
void incRef();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
*/
package org.elasticsearch.common.util.concurrent;

import org.apache.lucene.store.AlreadyClosedException;
import org.elasticsearch.test.ESTestCase;
import org.hamcrest.Matchers;

Expand Down Expand Up @@ -70,14 +69,14 @@ public void testRefCount() throws IOException {
try {
counted.incRef();
fail(" expected exception");
} catch (AlreadyClosedException ex) {
} catch (IllegalStateException ex) {
assertThat(ex.getMessage(), equalTo("test is already closed can't increment refCount current count [0]"));
}

try {
counted.ensureOpen();
fail(" expected exception");
} catch (AlreadyClosedException ex) {
} catch (IllegalStateException ex) {
assertThat(ex.getMessage(), equalTo("closed"));
}
}
Expand Down Expand Up @@ -116,7 +115,7 @@ public void run() {
try {
counted.ensureOpen();
fail("expected to be closed");
} catch (AlreadyClosedException ex) {
} catch (IllegalStateException ex) {
assertThat(ex.getMessage(), equalTo("closed"));
}
assertThat(counted.refCount(), is(0));
Expand All @@ -140,7 +139,7 @@ protected void closeInternal() {
public void ensureOpen() {
if (closed.get()) {
assert this.refCount() == 0;
throw new AlreadyClosedException("closed");
throw new IllegalStateException("closed");
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

package org.elasticsearch.nio;

import org.elasticsearch.common.util.concurrent.AbstractRefCounted;
import org.elasticsearch.nio.utils.ExceptionsHelper;

import java.nio.ByteBuffer;
Expand All @@ -41,6 +42,7 @@ public final class InboundChannelBuffer implements AutoCloseable {
private static final int PAGE_MASK = PAGE_SIZE - 1;
private static final int PAGE_SHIFT = Integer.numberOfTrailingZeros(PAGE_SIZE);
private static final ByteBuffer[] EMPTY_BYTE_BUFFER_ARRAY = new ByteBuffer[0];
private static final Page[] EMPTY_BYTE_PAGE_ARRAY = new Page[0];


private final ArrayDeque<Page> pages;
Expand Down Expand Up @@ -152,6 +154,46 @@ public ByteBuffer[] sliceBuffersTo(long to) {
return buffers;
}

/**
* This method will return an array of {@link Page} representing the bytes from the beginning of
* this buffer up through the index argument that was passed. The pages and buffers will be duplicates of
* the internal components, so any modifications to the markers {@link ByteBuffer#position()},
* {@link ByteBuffer#limit()}, etc will not modify the this class. Additionally, this will internally
* retain the underlying pages, so the pages returned by this method must be closed.
*
* @param to the index to slice up to
* @return the pages
*/
public Page[] sliceAndRetainPagesTo(long to) {
if (to > capacity) {
throw new IndexOutOfBoundsException("can't slice a channel buffer with capacity [" + capacity +
"], with slice parameters to [" + to + "]");
} else if (to == 0) {
return EMPTY_BYTE_PAGE_ARRAY;
}
long indexWithOffset = to + offset;
int pageCount = pageIndex(indexWithOffset);
int finalLimit = indexInPage(indexWithOffset);
if (finalLimit != 0) {
pageCount += 1;
}

Page[] pages = new Page[pageCount];
Iterator<Page> pageIterator = this.pages.iterator();
Page firstPage = pageIterator.next().duplicate();
ByteBuffer firstBuffer = firstPage.byteBuffer;
firstBuffer.position(firstBuffer.position() + offset);
pages[0] = firstPage;
for (int i = 1; i < pages.length; i++) {
pages[i] = pageIterator.next().duplicate();
}
if (finalLimit != 0) {
pages[pages.length - 1].byteBuffer.limit(finalLimit);
}

return pages;
}

/**
* This method will return an array of {@link ByteBuffer} representing the bytes from the index passed
* through the end of this buffer. The buffers will be duplicates of the internal buffers, so any
Expand Down Expand Up @@ -231,16 +273,49 @@ private int indexInPage(long index) {
public static class Page implements AutoCloseable {

private final ByteBuffer byteBuffer;
private final Runnable closeable;
// This is reference counted as some implementations want to retain the byte pages by calling
// sliceAndRetainPagesTo. With reference counting we can increment the reference count, return the
// pages, and safely close them when this channel buffer is done with them. The reference count
// would be 1 at that point, meaning that the pages will remain until the implementation closes
// theirs.
private final RefCountedCloseable refCountedCloseable;

public Page(ByteBuffer byteBuffer, Runnable closeable) {
this(byteBuffer, new RefCountedCloseable(closeable));
}

private Page(ByteBuffer byteBuffer, RefCountedCloseable refCountedCloseable) {
this.byteBuffer = byteBuffer;
this.closeable = closeable;
this.refCountedCloseable = refCountedCloseable;
}

private Page duplicate() {
refCountedCloseable.incRef();
return new Page(byteBuffer.duplicate(), refCountedCloseable);
}

public ByteBuffer getByteBuffer() {
return byteBuffer;
}

@Override
public void close() {
closeable.run();
refCountedCloseable.decRef();
}

private static class RefCountedCloseable extends AbstractRefCounted {

private final Runnable closeable;

private RefCountedCloseable(Runnable closeable) {
super("byte array page");
this.closeable = closeable;
}

@Override
protected void closeInternal() {
closeable.run();
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,8 @@ public class InboundChannelBufferTests extends ESTestCase {

private static final int PAGE_SIZE = BigArrays.PAGE_SIZE_IN_BYTES;
private final Supplier<InboundChannelBuffer.Page> defaultPageSupplier = () ->
new InboundChannelBuffer.Page(ByteBuffer.allocate(BigArrays.BYTE_PAGE_SIZE), () -> {});
new InboundChannelBuffer.Page(ByteBuffer.allocate(BigArrays.BYTE_PAGE_SIZE), () -> {
});

public void testNewBufferHasSinglePage() {
InboundChannelBuffer channelBuffer = new InboundChannelBuffer(defaultPageSupplier);
Expand Down Expand Up @@ -167,6 +168,49 @@ public void testClose() {
expectThrows(IllegalStateException.class, () -> channelBuffer.ensureCapacity(1));
}

public void testCloseRetainedPages() {
ConcurrentLinkedQueue<AtomicBoolean> queue = new ConcurrentLinkedQueue<>();
Supplier<InboundChannelBuffer.Page> supplier = () -> {
AtomicBoolean atomicBoolean = new AtomicBoolean();
queue.add(atomicBoolean);
return new InboundChannelBuffer.Page(ByteBuffer.allocate(PAGE_SIZE), () -> atomicBoolean.set(true));
};
InboundChannelBuffer channelBuffer = new InboundChannelBuffer(supplier);
channelBuffer.ensureCapacity(PAGE_SIZE * 4);

assertEquals(4, queue.size());

for (AtomicBoolean closedRef : queue) {
assertFalse(closedRef.get());
}

InboundChannelBuffer.Page[] pages = channelBuffer.sliceAndRetainPagesTo(PAGE_SIZE * 2);

pages[1].close();

for (AtomicBoolean closedRef : queue) {
assertFalse(closedRef.get());
}

channelBuffer.close();

int i = 0;
for (AtomicBoolean closedRef : queue) {
if (i < 1) {
assertFalse(closedRef.get());
} else {
assertTrue(closedRef.get());
}
++i;
}

pages[0].close();

for (AtomicBoolean closedRef : queue) {
assertTrue(closedRef.get());
}
}

public void testAccessByteBuffers() {
InboundChannelBuffer channelBuffer = new InboundChannelBuffer(defaultPageSupplier);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,8 +87,8 @@ public Map<String, Supplier<Transport>> getTransports(Settings settings, ThreadP

@Override
public Map<String, Supplier<HttpServerTransport>> getHttpTransports(Settings settings, ThreadPool threadPool, BigArrays bigArrays,
PageCacheRecycler pageCacheRecycler,
CircuitBreakerService circuitBreakerService,
NamedWriteableRegistry namedWriteableRegistry,
NamedXContentRegistry xContentRegistry,
NetworkService networkService,
HttpServerTransport.Dispatcher dispatcher) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ public class HttpReadWriteHandler implements ReadWriteHandler {

@Override
public int consumeReads(InboundChannelBuffer channelBuffer) throws IOException {
int bytesConsumed = adaptor.read(channelBuffer.sliceBuffersTo(channelBuffer.getIndex()));
int bytesConsumed = adaptor.read(channelBuffer.sliceAndRetainPagesTo(channelBuffer.getIndex()));
Object message;
while ((message = adaptor.pollInboundMessage()) != null) {
handleRequest(message);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import io.netty.channel.embedded.EmbeddedChannel;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.nio.FlushOperation;
import org.elasticsearch.nio.InboundChannelBuffer;
import org.elasticsearch.nio.WriteOperation;

import java.nio.ByteBuffer;
Expand Down Expand Up @@ -97,6 +98,13 @@ public int read(ByteBuffer[] buffers) {
return byteBuf.readerIndex() - initialReaderIndex;
}

public int read(InboundChannelBuffer.Page[] pages) {
ByteBuf byteBuf = PagedByteBuf.byteBufFromPages(pages);
int readableBytes = byteBuf.readableBytes();
nettyChannel.writeInbound(byteBuf);
return readableBytes;
}

public Object pollInboundMessage() {
return nettyChannel.readInbound();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,14 @@
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.network.NetworkAddress;
import org.elasticsearch.common.network.NetworkService;
import org.elasticsearch.common.recycler.Recycler;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.NetworkExceptionHelper;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.PageCacheRecycler;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.http.AbstractHttpServerTransport;
Expand All @@ -63,6 +65,7 @@
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.ArrayList;
Expand Down Expand Up @@ -103,6 +106,8 @@ public class NioHttpServerTransport extends AbstractHttpServerTransport {
(s) -> Integer.toString(EsExecutors.numberOfProcessors(s) * 2),
(s) -> Setting.parseInt(s, 1, "http.nio.worker_count"), Setting.Property.NodeScope);

private final PageCacheRecycler pageCacheRecycler;

private final boolean tcpNoDelay;
private final boolean tcpKeepAlive;
private final boolean reuseAddress;
Expand All @@ -115,9 +120,11 @@ public class NioHttpServerTransport extends AbstractHttpServerTransport {
private HttpChannelFactory channelFactory;
private final NioCorsConfig corsConfig;

public NioHttpServerTransport(Settings settings, NetworkService networkService, BigArrays bigArrays, ThreadPool threadPool,
NamedXContentRegistry xContentRegistry, HttpServerTransport.Dispatcher dispatcher) {
public NioHttpServerTransport(Settings settings, NetworkService networkService, BigArrays bigArrays,
PageCacheRecycler pageCacheRecycler, ThreadPool threadPool, NamedXContentRegistry xContentRegistry,
HttpServerTransport.Dispatcher dispatcher) {
super(settings, networkService, bigArrays, threadPool, xContentRegistry, dispatcher);
this.pageCacheRecycler = pageCacheRecycler;

ByteSizeValue maxChunkSize = SETTING_HTTP_MAX_CHUNK_SIZE.get(settings);
ByteSizeValue maxHeaderSize = SETTING_HTTP_MAX_HEADER_SIZE.get(settings);
Expand Down Expand Up @@ -329,11 +336,15 @@ private HttpChannelFactory() {
@Override
public NioHttpChannel createChannel(NioSelector selector, SocketChannel channel) throws IOException {
NioHttpChannel nioChannel = new NioHttpChannel(channel);
java.util.function.Supplier<InboundChannelBuffer.Page> pageSupplier = () -> {
Recycler.V<byte[]> bytes = pageCacheRecycler.bytePage(false);
return new InboundChannelBuffer.Page(ByteBuffer.wrap(bytes.v()), bytes::close);
};
HttpReadWriteHandler httpReadWritePipeline = new HttpReadWriteHandler(nioChannel,NioHttpServerTransport.this,
handlingSettings, corsConfig);
Consumer<Exception> exceptionHandler = (e) -> exceptionCaught(nioChannel, e);
SocketChannelContext context = new BytesChannelContext(nioChannel, selector, exceptionHandler, httpReadWritePipeline,
InboundChannelBuffer.allocatingInstance());
new InboundChannelBuffer(pageSupplier));
nioChannel.setContext(context);
return nioChannel;
}
Expand Down
Loading