diff --git a/hadoop-tools/hadoop-azure/pom.xml b/hadoop-tools/hadoop-azure/pom.xml index db357598e3cb6..f7808d0c75ab8 100644 --- a/hadoop-tools/hadoop-azure/pom.xml +++ b/hadoop-tools/hadoop-azure/pom.xml @@ -172,6 +172,12 @@ com.google.guava guava + + com.google.code.findbugs + annotations + 3.0.1 + provided + org.eclipse.jetty @@ -288,6 +294,12 @@ hamcrest-library test + + net.jodah + concurrentunit + 0.4.6 + test + diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java index 354176f5ff929..c5e65e5afd990 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java @@ -130,10 +130,20 @@ public class AbfsConfiguration{ DefaultValue = AZURE_BLOCK_LOCATION_HOST_DEFAULT) private String azureBlockLocationHost; - @IntegerConfigurationValidatorAnnotation(ConfigurationKey = AZURE_CONCURRENT_CONNECTION_VALUE_OUT, + @BooleanConfigurationValidatorAnnotation(ConfigurationKey = AZURE_SHOULD_USE_OLD_ABFSOUTPUTSTREAM, + DefaultValue = DEFAULT_AZURE_SHOULD_USE_OLD_ABFSOUTPUTSTREAM) + private boolean shouldUseOlderAbfsOutputStream; + + @IntegerConfigurationValidatorAnnotation(ConfigurationKey = AZURE_WRITE_CONCURRENCY_FACTOR, MinValue = 1, - DefaultValue = MAX_CONCURRENT_WRITE_THREADS) - private int maxConcurrentWriteThreads; + DefaultValue = DEFAULT_WRITE_CONCURRENCY_FACTOR) + private int writeConcurrencyFactor; + + @IntegerConfigurationValidatorAnnotation(ConfigurationKey = AZURE_MAX_WRITE_MEM_USAGE_PERCENTAGE, + MinValue = MIN_VALUE_MAX_AZURE_WRITE_MEM_USAGE_PERCENTAGE, + MaxValue = MAX_VALUE_MAX_AZURE_WRITE_MEM_USAGE_PERCENTAGE, + DefaultValue = DEFAULT_MAX_AZURE_WRITE_MEM_USAGE_PERCENTAGE) + private int maxWriteMemoryUsagePercentage; @IntegerConfigurationValidatorAnnotation(ConfigurationKey = AZURE_LIST_MAX_RESULTS, MinValue = 1, @@ -464,8 +474,16 @@ public String getAzureBlockLocationHost() { return this.azureBlockLocationHost; } - public int getMaxConcurrentWriteThreads() { - return this.maxConcurrentWriteThreads; + public boolean shouldUseOlderAbfsOutputStream() { + return this.shouldUseOlderAbfsOutputStream; + } + + public int getWriteConcurrencyFactor() { + return this.writeConcurrencyFactor; + } + + public int getMaxWriteMemoryUsagePercentage() { + return this.maxWriteMemoryUsagePercentage; } public int getMaxConcurrentReadThreads() { @@ -784,4 +802,10 @@ private String appendSlashIfNeeded(String authority) { return authority; } + @VisibleForTesting + void setShouldUseOlderAbfsOutputStream( + boolean shouldUseOlderAbfsOutputStream) { + this.shouldUseOlderAbfsOutputStream = shouldUseOlderAbfsOutputStream; + } + } diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java index 8e0e6c1eb0b84..e326da3441be4 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java @@ -85,6 +85,7 @@ import org.apache.hadoop.fs.azurebfs.services.AbfsInputStream; import org.apache.hadoop.fs.azurebfs.services.AbfsInputStreamContext; import org.apache.hadoop.fs.azurebfs.services.AbfsOutputStream; +import org.apache.hadoop.fs.azurebfs.services.AbfsOutputStreamOld; import org.apache.hadoop.fs.azurebfs.services.AbfsOutputStreamContext; import org.apache.hadoop.fs.azurebfs.services.AbfsOutputStreamStatisticsImpl; import org.apache.hadoop.fs.azurebfs.services.AbfsPermission; @@ -424,6 +425,15 @@ public OutputStream createFile(final Path path, isNamespaceEnabled ? getOctalNotation(umask) : null); perfInfo.registerResult(op.getResult()).registerSuccess(true); + if (abfsConfiguration.shouldUseOlderAbfsOutputStream()) { + return new AbfsOutputStreamOld( + client, + statistics, + getRelativePath(path), + 0, + populateAbfsOutputStreamContext()); + } + return new AbfsOutputStream( client, statistics, @@ -439,6 +449,10 @@ private AbfsOutputStreamContext populateAbfsOutputStreamContext() { .enableFlush(abfsConfiguration.isFlushEnabled()) .disableOutputStreamFlush(abfsConfiguration.isOutputStreamFlushDisabled()) .withStreamStatistics(new AbfsOutputStreamStatisticsImpl()) + .withWriteConcurrencyFactor(abfsConfiguration + .getWriteConcurrencyFactor()) + .withMaxWriteMemoryUsagePercentage(abfsConfiguration + .getMaxWriteMemoryUsagePercentage()) .build(); } @@ -530,6 +544,16 @@ public OutputStream openFileForWrite(final Path path, final FileSystem.Statistic perfInfo.registerSuccess(true); + + if (abfsConfiguration.shouldUseOlderAbfsOutputStream()) { + return new AbfsOutputStreamOld( + client, + statistics, + getRelativePath(path), + 0, + populateAbfsOutputStreamContext()); + } + return new AbfsOutputStream( client, statistics, diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java index 5794d32f468f4..2a6a86a1d8356 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java @@ -51,6 +51,9 @@ public final class ConfigurationKeys { public static final String AZURE_BLOCK_SIZE_PROPERTY_NAME = "fs.azure.block.size"; public static final String AZURE_BLOCK_LOCATION_HOST_PROPERTY_NAME = "fs.azure.block.location.impersonatedhost"; public static final String AZURE_CONCURRENT_CONNECTION_VALUE_OUT = "fs.azure.concurrentRequestCount.out"; + public static final String AZURE_SHOULD_USE_OLD_ABFSOUTPUTSTREAM = "fs.azure.use.olderabfsoutputstream"; + public static final String AZURE_WRITE_CONCURRENCY_FACTOR = "fs.azure.write.concurrency.factor"; + public static final String AZURE_MAX_WRITE_MEM_USAGE_PERCENTAGE = "fs.azure.max.write.memory.usage.percentage"; public static final String AZURE_CONCURRENT_CONNECTION_VALUE_IN = "fs.azure.concurrentRequestCount.in"; public static final String AZURE_TOLERATE_CONCURRENT_APPEND = "fs.azure.io.read.tolerate.concurrent.append"; public static final String AZURE_LIST_MAX_RESULTS = "fs.azure.list.max.results"; diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java index 01d5202cc26c2..2176e93929a83 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java @@ -56,6 +56,11 @@ public final class FileSystemConfigurations { public static final int MAX_CONCURRENT_READ_THREADS = 12; public static final int MAX_CONCURRENT_WRITE_THREADS = 8; + public static final int DEFAULT_WRITE_CONCURRENCY_FACTOR = 4; + public static final boolean DEFAULT_AZURE_SHOULD_USE_OLD_ABFSOUTPUTSTREAM = false; + public static final int MIN_VALUE_MAX_AZURE_WRITE_MEM_USAGE_PERCENTAGE = 20; + public static final int MAX_VALUE_MAX_AZURE_WRITE_MEM_USAGE_PERCENTAGE = 90; + public static final int DEFAULT_MAX_AZURE_WRITE_MEM_USAGE_PERCENTAGE = MIN_VALUE_MAX_AZURE_WRITE_MEM_USAGE_PERCENTAGE; public static final boolean DEFAULT_READ_TOLERATE_CONCURRENT_APPEND = false; public static final boolean DEFAULT_AZURE_CREATE_REMOTE_FILESYSTEM_DURING_INITIALIZATION = false; public static final boolean DEFAULT_AZURE_SKIP_USER_GROUP_METADATA_DURING_INITIALIZATION = false; diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsByteBufferPool.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsByteBufferPool.java new file mode 100644 index 0000000000000..ea8ba3c6c1ccc --- /dev/null +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsByteBufferPool.java @@ -0,0 +1,161 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.azurebfs.services; + +import java.util.concurrent.ArrayBlockingQueue; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; + +import static java.lang.Math.ceil; +import static java.lang.Math.min; + +import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.MAX_VALUE_MAX_AZURE_WRITE_MEM_USAGE_PERCENTAGE; +import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.MIN_VALUE_MAX_AZURE_WRITE_MEM_USAGE_PERCENTAGE; + +/** + * Pool for byte[] + */ +public class AbfsByteBufferPool { + + private static final int HUNDRED = 100; + + /** + * Queue holding the free buffers. + */ + private ArrayBlockingQueue freeBuffers; + /** + * Count to track the buffers issued from AbfsByteBufferPool and yet to be + * returned. + */ + private int numBuffersInUse; + + private int bufferSize; + + private int maxBuffersToPool; + private int maxMemUsagePercentage; + + private static final Runtime RT = Runtime.getRuntime(); + private static final int AVAILABLE_PROCESSORS = RT.availableProcessors(); + + private static int MAX_BUFFERS_THAT_CAN_BE_IN_USE; + + /** + * @param bufferSize Size of the byte[] to be returned. + * @param queueCapacity Maximum number of buffers that the pool can + * keep within the pool. + * @param maxMemUsagePercentage Maximum percentage of memory that can + * be used by the pool from the max + * available memory. + */ + public AbfsByteBufferPool(final int bufferSize, final int queueCapacity, + final int maxMemUsagePercentage) { + validate(queueCapacity, maxMemUsagePercentage); + this.maxMemUsagePercentage = maxMemUsagePercentage; + this.bufferSize = bufferSize; + this.numBuffersInUse = 0; + this.maxBuffersToPool = queueCapacity; + freeBuffers = new ArrayBlockingQueue<>(queueCapacity); + } + + private void validate(final int queueCapacity, + final int maxWriteMemUsagePercentage) { + Preconditions.checkArgument(maxWriteMemUsagePercentage + >= MIN_VALUE_MAX_AZURE_WRITE_MEM_USAGE_PERCENTAGE + && maxWriteMemUsagePercentage + <= MAX_VALUE_MAX_AZURE_WRITE_MEM_USAGE_PERCENTAGE, + "maxWriteMemUsagePercentage should be in range (%s - %s)", + MIN_VALUE_MAX_AZURE_WRITE_MEM_USAGE_PERCENTAGE, + MAX_VALUE_MAX_AZURE_WRITE_MEM_USAGE_PERCENTAGE); + Preconditions + .checkArgument(queueCapacity > 0, "queueCapacity cannot be < 1"); + } + + private void setMaxBuffersThatCanBeInUse() { + double freeMemory = RT.maxMemory() - (RT.totalMemory() - RT.freeMemory()); + int bufferCountByMemory = (int) ceil( + (freeMemory * maxMemUsagePercentage / HUNDRED) / bufferSize); + int bufferCountByMaxFreeBuffers = maxBuffersToPool + AVAILABLE_PROCESSORS; + MAX_BUFFERS_THAT_CAN_BE_IN_USE = min(bufferCountByMemory, + bufferCountByMaxFreeBuffers); + if (MAX_BUFFERS_THAT_CAN_BE_IN_USE < 2) { + MAX_BUFFERS_THAT_CAN_BE_IN_USE = 2; + } + } + + private synchronized boolean isPossibleToIssueNewBuffer() { + setMaxBuffersThatCanBeInUse(); + return numBuffersInUse < MAX_BUFFERS_THAT_CAN_BE_IN_USE; + } + + /** + * @return byte[] from the pool if available otherwise new byte[] is returned. + * Waits if pool is empty and already maximum number of buffers are in use. + */ + public byte[] get() { + byte[] byteArray = null; + synchronized (this) { + byteArray = freeBuffers.poll(); + if (byteArray == null && isPossibleToIssueNewBuffer()) { + byteArray = new byte[bufferSize]; + } + if (byteArray != null) { + numBuffersInUse++; + return byteArray; + } + } + try { + byteArray = freeBuffers.take(); + synchronized (this) { + numBuffersInUse++; + return byteArray; + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + return null; + } + } + + /** + * @param byteArray The buffer to be offered back to the pool. + */ + @SuppressFBWarnings("RV_RETURN_VALUE_IGNORED") + public synchronized void release(byte[] byteArray) { + Preconditions.checkArgument(byteArray.length == bufferSize, + "Buffer size has" + " to be %s (Received buffer length: %s)", + bufferSize, byteArray.length); + numBuffersInUse--; + if (numBuffersInUse < 0) { + numBuffersInUse = 0; + } + freeBuffers.offer(byteArray); + } + + @VisibleForTesting + public synchronized int getBuffersInUse() { + return this.numBuffersInUse; + } + + @VisibleForTesting + public synchronized ArrayBlockingQueue getFreeBuffers() { + return freeBuffers; + } + +} diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java index 89afca4220251..68ed42e476556 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java @@ -20,14 +20,14 @@ import java.io.FileNotFoundException; import java.io.IOException; -import java.io.InterruptedIOException; import java.io.OutputStream; import java.net.HttpURLConnection; -import java.nio.ByteBuffer; import java.util.Locale; import java.util.concurrent.ConcurrentLinkedDeque; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.ExecutorCompletionService; +import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.Callable; import java.util.concurrent.Future; @@ -41,7 +41,6 @@ import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsRestOperationException; import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException; import org.apache.hadoop.fs.azurebfs.utils.CachedSASToken; -import org.apache.hadoop.io.ElasticByteBufferPool; import org.apache.hadoop.fs.FileSystem.Statistics; import org.apache.hadoop.fs.FSExceptionMessages; import org.apache.hadoop.fs.StreamCapabilities; @@ -65,26 +64,25 @@ public class AbfsOutputStream extends OutputStream implements Syncable, StreamCa private long lastFlushOffset; private long lastTotalAppendOffset = 0; - private final int bufferSize; + private static int bufferSize; private byte[] buffer; private int bufferIndex; - private final int maxConcurrentRequestCount; - private ConcurrentLinkedDeque writeOperations; - private final ThreadPoolExecutor threadExecutor; - private final ExecutorCompletionService completionService; + private final ConcurrentLinkedDeque writeOperations = new ConcurrentLinkedDeque<>(); + private static ThreadPoolExecutor threadExecutor; // SAS tokens can be re-used until they expire private CachedSASToken cachedSasToken; /** - * Queue storing buffers with the size of the Azure block ready for + * Pool storing buffers with the size of the Azure block ready for * reuse. The pool allows reusing the blocks instead of allocating new * blocks. After the data is sent to the service, the buffer is returned * back to the queue */ - private final ElasticByteBufferPool byteBufferPool - = new ElasticByteBufferPool(); + private static AbfsByteBufferPool byteBufferPool; + + private static volatile Boolean isFirstObj = true; private final Statistics statistics; private final AbfsOutputStreamStatistics outputStreamStatistics; @@ -108,23 +106,64 @@ public AbfsOutputStream( .isDisableOutputStreamFlush(); this.lastError = null; this.lastFlushOffset = 0; - this.bufferSize = abfsOutputStreamContext.getWriteBufferSize(); - this.buffer = byteBufferPool.getBuffer(false, bufferSize).array(); this.bufferIndex = 0; - this.writeOperations = new ConcurrentLinkedDeque<>(); this.outputStreamStatistics = abfsOutputStreamContext.getStreamStatistics(); - - this.maxConcurrentRequestCount = 4 * Runtime.getRuntime().availableProcessors(); - - this.threadExecutor - = new ThreadPoolExecutor(maxConcurrentRequestCount, - maxConcurrentRequestCount, - 10L, - TimeUnit.SECONDS, - new LinkedBlockingQueue<>()); - this.completionService = new ExecutorCompletionService<>(this.threadExecutor); this.cachedSasToken = new CachedSASToken( abfsOutputStreamContext.getSasTokenRenewPeriodForStreamsInSeconds()); + + init(abfsOutputStreamContext); + buffer = new byte[bufferSize]; + } + + private void init(final AbfsOutputStreamContext abfsOutputStreamContext) { + if (isCommonPoolsInitialised()) { + return; + } + + initWriteBufferPool(abfsOutputStreamContext); + + ThreadFactory daemonThreadFactory = new ThreadFactory() { + @Override + public Thread newThread(Runnable runnable) { + Thread daemonThread = Executors.defaultThreadFactory() + .newThread(runnable); + daemonThread.setDaemon(true); + return daemonThread; + } + }; + int maxConcurrentThreadCount = + abfsOutputStreamContext.getWriteConcurrencyFactor() * Runtime.getRuntime() + .availableProcessors(); + threadExecutor = new ThreadPoolExecutor(maxConcurrentThreadCount, + maxConcurrentThreadCount, 10L, TimeUnit.SECONDS, + new LinkedBlockingQueue<>(), daemonThreadFactory); + } + + private boolean isCommonPoolsInitialised() { + if (threadExecutor != null) { + return true; + } + + boolean isFirstObjTemp = isFirstObj; + if (isFirstObjTemp) { + synchronized (AbfsOutputStream.class) { + isFirstObjTemp = isFirstObj; + if (isFirstObjTemp) { + isFirstObj = false; + return false; + } + } + } + return true; + } + + private static synchronized void initWriteBufferPool(AbfsOutputStreamContext abfsosContext) { + bufferSize = abfsosContext.getWriteBufferSize(); + int corePoolSize = + 1 + (abfsosContext.getWriteConcurrencyFactor() * Runtime.getRuntime() + .availableProcessors()); + byteBufferPool = new AbfsByteBufferPool(bufferSize, corePoolSize, + abfsosContext.getMaxWriteMemoryUsagePercentage()); } /** @@ -275,7 +314,6 @@ public synchronized void close() throws IOException { try { flushInternal(true); - threadExecutor.shutdown(); } catch (IOException e) { // Problems surface in try-with-resources clauses if // the exception thrown in a close == the one already thrown @@ -288,9 +326,6 @@ public synchronized void close() throws IOException { bufferIndex = 0; closed = true; writeOperations.clear(); - if (!threadExecutor.isShutdown()) { - threadExecutor.shutdownNow(); - } } if (LOG.isDebugEnabled()) { LOG.debug("Closing AbfsOutputStream ", toString()); @@ -318,18 +353,12 @@ private synchronized void writeCurrentBufferToService() throws IOException { final byte[] bytes = buffer; final int bytesLength = bufferIndex; outputStreamStatistics.bytesToUpload(bytesLength); - buffer = byteBufferPool.getBuffer(false, bufferSize).array(); + buffer = byteBufferPool.get(); bufferIndex = 0; final long offset = position; position += bytesLength; - if (threadExecutor.getQueue().size() >= maxConcurrentRequestCount * 2) { - long start = System.currentTimeMillis(); - waitForTaskToComplete(); - outputStreamStatistics.timeSpentTaskWait(start, System.currentTimeMillis()); - } - - final Future job = completionService.submit(new Callable() { + final Future job = threadExecutor.submit(new Callable() { @Override public Void call() throws Exception { AbfsPerfTracker tracker = client.getAbfsPerfTracker(); @@ -339,7 +368,7 @@ public Void call() throws Exception { bytesLength, cachedSasToken.get()); cachedSasToken.update(op.getSasToken()); perfInfo.registerResult(op.getResult()); - byteBufferPool.putBuffer(ByteBuffer.wrap(bytes)); + byteBufferPool.release(bytes); perfInfo.registerSuccess(true); return null; } @@ -429,22 +458,6 @@ private synchronized void shrinkWriteOperationQueue() throws IOException { } } - private void waitForTaskToComplete() throws IOException { - boolean completed; - for (completed = false; completionService.poll() != null; completed = true) { - // keep polling until there is no data - } - - if (!completed) { - try { - completionService.take(); - } catch (InterruptedException e) { - lastError = (IOException) new InterruptedIOException(e.toString()).initCause(e); - throw lastError; - } - } - } - private static class WriteOperation { private final Future task; private final long startOffset; @@ -462,8 +475,9 @@ private static class WriteOperation { } @VisibleForTesting - public synchronized void waitForPendingUploads() throws IOException { - waitForTaskToComplete(); + public synchronized void waitForPendingUploads() + throws ExecutionException, InterruptedException { + writeOperations.getFirst().task.get(); } /** diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStreamContext.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStreamContext.java index dcd6c45981734..b7dc4d794998c 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStreamContext.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStreamContext.java @@ -29,6 +29,10 @@ public class AbfsOutputStreamContext extends AbfsStreamContext { private boolean disableOutputStreamFlush; + private int writeConcurrencyFactor; + + private int maxWriteMemoryUsagePercentage; + private AbfsOutputStreamStatistics streamStatistics; public AbfsOutputStreamContext(final long sasTokenRenewPeriodForStreamsInSeconds) { @@ -58,6 +62,18 @@ public AbfsOutputStreamContext withStreamStatistics( return this; } + public AbfsOutputStreamContext withWriteConcurrencyFactor( + final int writeConcurrencyFactor) { + this.writeConcurrencyFactor = writeConcurrencyFactor; + return this; + } + + public AbfsOutputStreamContext withMaxWriteMemoryUsagePercentage( + final int maxWriteMemoryUsagePercentage) { + this.maxWriteMemoryUsagePercentage = maxWriteMemoryUsagePercentage; + return this; + } + public AbfsOutputStreamContext build() { // Validation of parameters to be done here. return this; @@ -75,6 +91,14 @@ public boolean isDisableOutputStreamFlush() { return disableOutputStreamFlush; } + public int getWriteConcurrencyFactor() { + return writeConcurrencyFactor; + } + + public int getMaxWriteMemoryUsagePercentage() { + return maxWriteMemoryUsagePercentage; + } + public AbfsOutputStreamStatistics getStreamStatistics() { return streamStatistics; } diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStreamOld.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStreamOld.java new file mode 100644 index 0000000000000..86e9fb0ea5927 --- /dev/null +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStreamOld.java @@ -0,0 +1,502 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.azurebfs.services; + +import java.io.FileNotFoundException; +import java.io.IOException; +import java.io.InterruptedIOException; +import java.io.OutputStream; +import java.net.HttpURLConnection; +import java.nio.ByteBuffer; +import java.util.Locale; +import java.util.concurrent.ConcurrentLinkedDeque; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ExecutorCompletionService; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.Callable; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsRestOperationException; +import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException; +import org.apache.hadoop.fs.azurebfs.utils.CachedSASToken; +import org.apache.hadoop.io.ElasticByteBufferPool; +import org.apache.hadoop.fs.FileSystem.Statistics; +import org.apache.hadoop.fs.FSExceptionMessages; +import org.apache.hadoop.fs.StreamCapabilities; +import org.apache.hadoop.fs.Syncable; + +import static org.apache.hadoop.io.IOUtils.wrapException; + +/** + * The BlobFsOutputStream for Rest AbfsClient. + */ +public class AbfsOutputStreamOld extends OutputStream implements Syncable, StreamCapabilities { + + private final AbfsClient client; + private final String path; + private long position; + private boolean closed; + private boolean supportFlush; + private boolean disableOutputStreamFlush; + private volatile IOException lastError; + + private long lastFlushOffset; + private long lastTotalAppendOffset = 0; + + private final int bufferSize; + private byte[] buffer; + private int bufferIndex; + private final int maxConcurrentRequestCount; + + private ConcurrentLinkedDeque writeOperations; + private final ThreadPoolExecutor threadExecutor; + private final ExecutorCompletionService completionService; + + // SAS tokens can be re-used until they expire + private CachedSASToken cachedSasToken; + + /** + * Queue storing buffers with the size of the Azure block ready for + * reuse. The pool allows reusing the blocks instead of allocating new + * blocks. After the data is sent to the service, the buffer is returned + * back to the queue + */ + private final ElasticByteBufferPool byteBufferPool + = new ElasticByteBufferPool(); + + private final Statistics statistics; + private final AbfsOutputStreamStatistics outputStreamStatistics; + + private static final Logger LOG = + LoggerFactory.getLogger(AbfsOutputStream.class); + + public AbfsOutputStreamOld( + final AbfsClient client, + final Statistics statistics, + final String path, + final long position, + AbfsOutputStreamContext abfsOutputStreamContext) { + this.client = client; + this.statistics = statistics; + this.path = path; + this.position = position; + this.closed = false; + this.supportFlush = abfsOutputStreamContext.isEnableFlush(); + this.disableOutputStreamFlush = abfsOutputStreamContext + .isDisableOutputStreamFlush(); + this.lastError = null; + this.lastFlushOffset = 0; + this.bufferSize = abfsOutputStreamContext.getWriteBufferSize(); + this.buffer = byteBufferPool.getBuffer(false, bufferSize).array(); + this.bufferIndex = 0; + this.writeOperations = new ConcurrentLinkedDeque<>(); + this.outputStreamStatistics = abfsOutputStreamContext.getStreamStatistics(); + + this.maxConcurrentRequestCount = 4 * Runtime.getRuntime().availableProcessors(); + + this.threadExecutor + = new ThreadPoolExecutor(maxConcurrentRequestCount, + maxConcurrentRequestCount, + 10L, + TimeUnit.SECONDS, + new LinkedBlockingQueue<>()); + this.completionService = new ExecutorCompletionService<>(this.threadExecutor); + this.cachedSasToken = new CachedSASToken( + abfsOutputStreamContext.getSasTokenRenewPeriodForStreamsInSeconds()); + } + + /** + * Query the stream for a specific capability. + * + * @param capability string to query the stream support for. + * @return true for hsync and hflush. + */ + @Override + public boolean hasCapability(String capability) { + switch (capability.toLowerCase(Locale.ENGLISH)) { + case StreamCapabilities.HSYNC: + case StreamCapabilities.HFLUSH: + return supportFlush; + default: + return false; + } + } + + /** + * Writes the specified byte to this output stream. The general contract for + * write is that one byte is written to the output stream. The byte to be + * written is the eight low-order bits of the argument b. The 24 high-order + * bits of b are ignored. + * + * @param byteVal the byteValue to write. + * @throws IOException if an I/O error occurs. In particular, an IOException may be + * thrown if the output stream has been closed. + */ + @Override + public void write(final int byteVal) throws IOException { + write(new byte[]{(byte) (byteVal & 0xFF)}); + } + + /** + * Writes length bytes from the specified byte array starting at off to + * this output stream. + * + * @param data the byte array to write. + * @param off the start off in the data. + * @param length the number of bytes to write. + * @throws IOException if an I/O error occurs. In particular, an IOException may be + * thrown if the output stream has been closed. + */ + @Override + public synchronized void write(final byte[] data, final int off, final int length) + throws IOException { + maybeThrowLastError(); + + Preconditions.checkArgument(data != null, "null data"); + + if (off < 0 || length < 0 || length > data.length - off) { + throw new IndexOutOfBoundsException(); + } + + int currentOffset = off; + int writableBytes = bufferSize - bufferIndex; + int numberOfBytesToWrite = length; + + while (numberOfBytesToWrite > 0) { + if (writableBytes <= numberOfBytesToWrite) { + System.arraycopy(data, currentOffset, buffer, bufferIndex, writableBytes); + bufferIndex += writableBytes; + writeCurrentBufferToService(); + currentOffset += writableBytes; + numberOfBytesToWrite = numberOfBytesToWrite - writableBytes; + } else { + System.arraycopy(data, currentOffset, buffer, bufferIndex, numberOfBytesToWrite); + bufferIndex += numberOfBytesToWrite; + numberOfBytesToWrite = 0; + } + + writableBytes = bufferSize - bufferIndex; + } + incrementWriteOps(); + } + + /** + * Increment Write Operations. + */ + private void incrementWriteOps() { + if (statistics != null) { + statistics.incrementWriteOps(1); + } + } + + /** + * Throw the last error recorded if not null. + * After the stream is closed, this is always set to + * an exception, so acts as a guard against method invocation once + * closed. + * @throws IOException if lastError is set + */ + private void maybeThrowLastError() throws IOException { + if (lastError != null) { + throw lastError; + } + } + + /** + * Flushes this output stream and forces any buffered output bytes to be + * written out. If any data remains in the payload it is committed to the + * service. Data is queued for writing and forced out to the service + * before the call returns. + */ + @Override + public void flush() throws IOException { + if (!disableOutputStreamFlush) { + flushInternalAsync(); + } + } + + /** Similar to posix fsync, flush out the data in client's user buffer + * all the way to the disk device (but the disk may have it in its cache). + * @throws IOException if error occurs + */ + @Override + public void hsync() throws IOException { + if (supportFlush) { + flushInternal(false); + } + } + + /** Flush out the data in client's user buffer. After the return of + * this call, new readers will see the data. + * @throws IOException if any error occurs + */ + @Override + public void hflush() throws IOException { + if (supportFlush) { + flushInternal(false); + } + } + + /** + * Force all data in the output stream to be written to Azure storage. + * Wait to return until this is complete. Close the access to the stream and + * shutdown the upload thread pool. + * If the blob was created, its lease will be released. + * Any error encountered caught in threads and stored will be rethrown here + * after cleanup. + */ + @Override + public synchronized void close() throws IOException { + if (closed) { + return; + } + + try { + flushInternal(true); + threadExecutor.shutdown(); + } catch (IOException e) { + // Problems surface in try-with-resources clauses if + // the exception thrown in a close == the one already thrown + // -so we wrap any exception with a new one. + // See HADOOP-16785 + throw wrapException(path, e.getMessage(), e); + } finally { + lastError = new IOException(FSExceptionMessages.STREAM_IS_CLOSED); + buffer = null; + bufferIndex = 0; + closed = true; + writeOperations.clear(); + if (!threadExecutor.isShutdown()) { + threadExecutor.shutdownNow(); + } + } + if (LOG.isDebugEnabled()) { + LOG.debug("Closing AbfsOutputStream ", toString()); + } + } + + private synchronized void flushInternal(boolean isClose) throws IOException { + maybeThrowLastError(); + writeCurrentBufferToService(); + flushWrittenBytesToService(isClose); + } + + private synchronized void flushInternalAsync() throws IOException { + maybeThrowLastError(); + writeCurrentBufferToService(); + flushWrittenBytesToServiceAsync(); + } + + private synchronized void writeCurrentBufferToService() throws IOException { + if (bufferIndex == 0) { + return; + } + outputStreamStatistics.writeCurrentBuffer(); + + final byte[] bytes = buffer; + final int bytesLength = bufferIndex; + outputStreamStatistics.bytesToUpload(bytesLength); + buffer = byteBufferPool.getBuffer(false, bufferSize).array(); + bufferIndex = 0; + final long offset = position; + position += bytesLength; + + if (threadExecutor.getQueue().size() >= maxConcurrentRequestCount * 2) { + long start = System.currentTimeMillis(); + waitForTaskToComplete(); + outputStreamStatistics.timeSpentTaskWait(start, System.currentTimeMillis()); + } + + final Future job = completionService.submit(new Callable() { + @Override + public Void call() throws Exception { + AbfsPerfTracker tracker = client.getAbfsPerfTracker(); + try (AbfsPerfInfo perfInfo = new AbfsPerfInfo(tracker, + "writeCurrentBufferToService", "append")) { + AbfsRestOperation op = client.append(path, offset, bytes, 0, + bytesLength, cachedSasToken.get()); + cachedSasToken.update(op.getSasToken()); + perfInfo.registerResult(op.getResult()); + byteBufferPool.putBuffer(ByteBuffer.wrap(bytes)); + perfInfo.registerSuccess(true); + return null; + } + } + }); + + if (job.isCancelled()) { + outputStreamStatistics.uploadFailed(bytesLength); + } else { + outputStreamStatistics.uploadSuccessful(bytesLength); + } + writeOperations.add(new WriteOperation(job, offset, bytesLength)); + + // Try to shrink the queue + shrinkWriteOperationQueue(); + } + + private synchronized void flushWrittenBytesToService(boolean isClose) throws IOException { + for (WriteOperation writeOperation : writeOperations) { + try { + writeOperation.task.get(); + } catch (Exception ex) { + if (ex.getCause() instanceof AbfsRestOperationException) { + if (((AbfsRestOperationException) ex.getCause()).getStatusCode() == HttpURLConnection.HTTP_NOT_FOUND) { + throw new FileNotFoundException(ex.getMessage()); + } + } + + if (ex.getCause() instanceof AzureBlobFileSystemException) { + ex = (AzureBlobFileSystemException) ex.getCause(); + } + lastError = new IOException(ex); + throw lastError; + } + } + flushWrittenBytesToServiceInternal(position, false, isClose); + } + + private synchronized void flushWrittenBytesToServiceAsync() throws IOException { + shrinkWriteOperationQueue(); + + if (this.lastTotalAppendOffset > this.lastFlushOffset) { + this.flushWrittenBytesToServiceInternal(this.lastTotalAppendOffset, true, + false/*Async flush on close not permitted*/); + } + } + + private synchronized void flushWrittenBytesToServiceInternal(final long offset, + final boolean retainUncommitedData, final boolean isClose) throws IOException { + AbfsPerfTracker tracker = client.getAbfsPerfTracker(); + try (AbfsPerfInfo perfInfo = new AbfsPerfInfo(tracker, + "flushWrittenBytesToServiceInternal", "flush")) { + AbfsRestOperation op = client.flush(path, offset, retainUncommitedData, isClose, cachedSasToken.get()); + cachedSasToken.update(op.getSasToken()); + perfInfo.registerResult(op.getResult()).registerSuccess(true); + } catch (AzureBlobFileSystemException ex) { + if (ex instanceof AbfsRestOperationException) { + if (((AbfsRestOperationException) ex).getStatusCode() == HttpURLConnection.HTTP_NOT_FOUND) { + throw new FileNotFoundException(ex.getMessage()); + } + } + throw new IOException(ex); + } + this.lastFlushOffset = offset; + } + + /** + * Try to remove the completed write operations from the beginning of write + * operation FIFO queue. + */ + private synchronized void shrinkWriteOperationQueue() throws IOException { + try { + while (writeOperations.peek() != null && writeOperations.peek().task.isDone()) { + writeOperations.peek().task.get(); + lastTotalAppendOffset += writeOperations.peek().length; + writeOperations.remove(); + // Incrementing statistics to indicate queue has been shrunk. + outputStreamStatistics.queueShrunk(); + } + } catch (Exception e) { + if (e.getCause() instanceof AzureBlobFileSystemException) { + lastError = (AzureBlobFileSystemException) e.getCause(); + } else { + lastError = new IOException(e); + } + throw lastError; + } + } + + private void waitForTaskToComplete() throws IOException { + boolean completed; + for (completed = false; completionService.poll() != null; completed = true) { + // keep polling until there is no data + } + + if (!completed) { + try { + completionService.take(); + } catch (InterruptedException e) { + lastError = (IOException) new InterruptedIOException(e.toString()).initCause(e); + throw lastError; + } + } + } + + private static class WriteOperation { + private final Future task; + private final long startOffset; + private final long length; + + WriteOperation(final Future task, final long startOffset, final long length) { + Preconditions.checkNotNull(task, "task"); + Preconditions.checkArgument(startOffset >= 0, "startOffset"); + Preconditions.checkArgument(length >= 0, "length"); + + this.task = task; + this.startOffset = startOffset; + this.length = length; + } + } + + @VisibleForTesting + public synchronized void waitForPendingUploads() throws IOException { + waitForTaskToComplete(); + } + + /** + * Getter method for AbfsOutputStream statistics. + * + * @return statistics for AbfsOutputStream. + */ + @VisibleForTesting + public AbfsOutputStreamStatistics getOutputStreamStatistics() { + return outputStreamStatistics; + } + + /** + * Getter to get the size of the task queue. + * + * @return the number of writeOperations in AbfsOutputStream. + */ + @VisibleForTesting + public int getWriteOperationsSize() { + return writeOperations.size(); + } + + /** + * Appending AbfsOutputStream statistics to base toString(). + * + * @return String with AbfsOutputStream statistics. + */ + @Override + public String toString() { + final StringBuilder sb = new StringBuilder(super.toString()); + sb.append("AbfsOuputStream@").append(this.hashCode()).append("){"); + sb.append(outputStreamStatistics.toString()); + sb.append("}"); + return sb.toString(); + } +} \ No newline at end of file diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemFlush.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemFlush.java index 60f7f7d23f02a..54988a51ea2ca 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemFlush.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemFlush.java @@ -24,6 +24,7 @@ import java.util.Random; import java.util.UUID; import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; @@ -31,6 +32,8 @@ import org.apache.hadoop.fs.StreamCapabilities; import org.apache.hadoop.fs.azurebfs.services.AbfsOutputStream; +import org.apache.hadoop.fs.azurebfs.services.AbfsOutputStreamOld; +import org.assertj.core.api.Assertions; import org.hamcrest.core.IsEqual; import org.hamcrest.core.IsNot; import org.junit.Test; @@ -41,6 +44,9 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import static org.hamcrest.core.Is.is; +import static org.hamcrest.core.IsEqual.equalTo; + /** * Test flush operation. * This class cannot be run in parallel test mode--check comments in @@ -53,7 +59,7 @@ public class ITestAzureBlobFileSystemFlush extends AbstractAbfsScaleTest { private static final int ONE_MB = 1024 * 1024; private static final int FLUSH_TIMES = 200; private static final int THREAD_SLEEP_TIME = 1000; - + private static final int CONCURRENT_STREAM_OBJS_TEST_OBJ_COUNT = 25; private static final int TEST_FILE_LENGTH = 1024 * 1024 * 8; private static final int WAITING_TIME = 1000; @@ -207,6 +213,92 @@ public Void call() throws Exception { assertEquals((long) TEST_BUFFER_SIZE * FLUSH_TIMES, fileStatus.getLen()); } + @Test + public void testShouldUseOlderAbfsOutputStreamConf() throws IOException { + AzureBlobFileSystem fs = getFileSystem(); + Path testPath = new Path(methodName.getMethodName() + "1"); + getFileSystem().getAbfsStore().getAbfsConfiguration() + .setShouldUseOlderAbfsOutputStream(true); + try (FSDataOutputStream stream = fs.create(testPath)) { + Assertions.assertThat(stream.getWrappedStream()).describedAs("When the " + + "shouldUseOlderAbfsOutputStream is set the wrapped stream inside " + + "the FSDataOutputStream object should be of class " + + "AbfsOutputStreamOld.").isInstanceOf(AbfsOutputStreamOld.class); + } + testPath = new Path(methodName.getMethodName()); + getFileSystem().getAbfsStore().getAbfsConfiguration() + .setShouldUseOlderAbfsOutputStream(false); + try (FSDataOutputStream stream = fs.create(testPath)) { + Assertions.assertThat(stream.getWrappedStream()).describedAs("When the " + + "shouldUseOlderAbfsOutputStream is set the wrapped stream inside " + + "the FSDataOutputStream object should be of class " + + "AbfsOutputStream.").isInstanceOf(AbfsOutputStream.class); + } + } + + @Test + public void testWriteWithMultipleOutputStreamAtTheSameTime() + throws IOException, InterruptedException, ExecutionException { + AzureBlobFileSystem fs = getFileSystem(); + String testFilePath = methodName.getMethodName(); + Path[] testPaths = new Path[CONCURRENT_STREAM_OBJS_TEST_OBJ_COUNT]; + createNStreamsAndWriteDifferentSizesConcurrently(fs, testFilePath, + CONCURRENT_STREAM_OBJS_TEST_OBJ_COUNT, testPaths); + assertSuccessfulWritesOnAllStreams(fs, + CONCURRENT_STREAM_OBJS_TEST_OBJ_COUNT, testPaths); + } + + private void assertSuccessfulWritesOnAllStreams(final FileSystem fs, + final int numConcurrentObjects, final Path[] testPaths) + throws IOException { + for (int i = 0; i < numConcurrentObjects; i++) { + FileStatus fileStatus = fs.getFileStatus(testPaths[i]); + int numWritesMadeOnStream = i + 1; + long expectedLength = TEST_BUFFER_SIZE * numWritesMadeOnStream; + assertThat(fileStatus.getLen(), is(equalTo(expectedLength))); + } + } + + private void createNStreamsAndWriteDifferentSizesConcurrently( + final FileSystem fs, final String testFilePath, + final int numConcurrentObjects, final Path[] testPaths) + throws ExecutionException, InterruptedException { + final byte[] b = new byte[TEST_BUFFER_SIZE]; + new Random().nextBytes(b); + final ExecutorService es = Executors.newFixedThreadPool(40); + final List> futureTasks = new ArrayList<>(); + for (int i = 0; i < numConcurrentObjects; i++) { + Path testPath = new Path(testFilePath + i); + testPaths[i] = testPath; + int numWritesToBeDone = i + 1; + futureTasks.add(es.submit(() -> { + try (FSDataOutputStream stream = fs.create(testPath)) { + makeNWritesToStream(stream, numWritesToBeDone, b, es); + } + return null; + })); + } + for (Future futureTask : futureTasks) { + futureTask.get(); + } + es.shutdownNow(); + } + + private void makeNWritesToStream(final FSDataOutputStream stream, + final int numWrites, final byte[] b, final ExecutorService es) + throws ExecutionException, InterruptedException, IOException { + final List> futureTasks = new ArrayList<>(); + for (int i = 0; i < numWrites; i++) { + futureTasks.add(es.submit(() -> { + stream.write(b); + return null; + })); + } + for (Future futureTask : futureTasks) { + futureTask.get(); + } + } + @Test public void testFlushWithOutputStreamFlushEnabled() throws Exception { testFlush(false); @@ -236,10 +328,15 @@ private void testFlush(boolean disableOutputStreamFlush) throws Exception { stream.write(buffer); // Write asynchronously uploads data, so we must wait for completion - AbfsOutputStream abfsStream = (AbfsOutputStream) stream - .getWrappedStream(); - abfsStream.waitForPendingUploads(); - + if (stream.getWrappedStream() instanceof AbfsOutputStream) { + AbfsOutputStream abfsStream = (AbfsOutputStream) stream + .getWrappedStream(); + abfsStream.waitForPendingUploads(); + } else { + AbfsOutputStreamOld abfsStream = (AbfsOutputStreamOld) stream + .getWrappedStream(); + abfsStream.waitForPendingUploads(); + } // Flush commits the data so it can be read. stream.flush(); diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsByteBufferPool.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsByteBufferPool.java new file mode 100644 index 0000000000000..728ab7080312a --- /dev/null +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsByteBufferPool.java @@ -0,0 +1,299 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.fs.azurebfs.services; + +import java.lang.Thread.State; +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.Callable; +import java.util.concurrent.Executors; +import java.util.concurrent.FutureTask; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; + +import net.jodah.concurrentunit.Waiter; +import org.junit.Test; + +import static java.lang.Thread.sleep; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.hamcrest.CoreMatchers.notNullValue; +import static org.hamcrest.Matchers.greaterThanOrEqualTo; +import static org.hamcrest.Matchers.lessThanOrEqualTo; +import static org.hamcrest.core.Is.is; + +import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.MAX_VALUE_MAX_AZURE_WRITE_MEM_USAGE_PERCENTAGE; +import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.MIN_VALUE_MAX_AZURE_WRITE_MEM_USAGE_PERCENTAGE; +import static org.apache.hadoop.test.LambdaTestUtils.intercept; + +/** + * Test class for AbfsByteBufferPool. + */ +public class TestAbfsByteBufferPool { + + private static final int TWO_MB = 2 * 1024 * 1024; + + private static final int MINUS_HUNDRED = -100; + private static final int HUNDRED_AND_ONE = 101; + + private static final int MEM_USAGE_PC_20 = 20; + private static final int MEM_USAGE_PC_25 = 25; + private static final int MEM_USAGE_PC_30 = 30; + private static final int MEM_USAGE_PC_90 = 90; + + private static final int SYNC_TEST_THREAD_CATEGORY_COUNT = 4; + private static final int SYNC_TEST_THREAD_COUNT_PER_CATEGORY = 4; + private static final int SYNC_TEST_VERIFICATION_PER_CATEGORY = 25; + private static final int SYNC_TEST_THREAD_POOL_SIZE = 50; + private static final int SYNC_TEST_MAX_FREE_BUFFER_COUNT = 4; + private static final long SYNC_TEST_WAITER_WAIT_TIME_6000_MS = 6000; + private static final long SYNC_TEST_VERIFIER_SLEEP_TIME_50_MS = 50; + private static final long SYNC_TEST_NON_VERIFIER_SLEEP_TIME_100_MS = 100; + + private static final int BUFFER_SIZE_2 = 2; + + private static final int QUEUE_CAPACITY_2 = 2; + private static final int QUEUE_CAPACITY_3 = 3; + private static final int QUEUE_CAPACITY_5 = 5; + + private static final int MAX_FREE_BUFFERS_4 = 4; + + private static final long SLEEP_TIME_5000_MS = 5000; + + @Test + public void testWithInvalidMaxWriteMemUsagePercentage() throws Exception { + List invalidMaxWriteMemUsagePercentageList = Arrays + .asList(MIN_VALUE_MAX_AZURE_WRITE_MEM_USAGE_PERCENTAGE - 1, + MAX_VALUE_MAX_AZURE_WRITE_MEM_USAGE_PERCENTAGE + 1, MINUS_HUNDRED, + HUNDRED_AND_ONE); + for (int val : invalidMaxWriteMemUsagePercentageList) { + intercept(IllegalArgumentException.class, String + .format("maxWriteMemUsagePercentage should be in range (%s - %s)", + MIN_VALUE_MAX_AZURE_WRITE_MEM_USAGE_PERCENTAGE, + MAX_VALUE_MAX_AZURE_WRITE_MEM_USAGE_PERCENTAGE), + () -> new AbfsByteBufferPool(TWO_MB, QUEUE_CAPACITY_2, val)); + } + } + + @Test + public void testWithInvalidMaxFreeBuffers() throws Exception { + List invalidMaxFreeBuffers = Arrays.asList(0, -1); + for (int val : invalidMaxFreeBuffers) { + intercept(IllegalArgumentException.class, String + .format("queueCapacity cannot be < 1", + MIN_VALUE_MAX_AZURE_WRITE_MEM_USAGE_PERCENTAGE, + MAX_VALUE_MAX_AZURE_WRITE_MEM_USAGE_PERCENTAGE), + () -> new AbfsByteBufferPool(TWO_MB, val, MEM_USAGE_PC_20)); + } + } + + @Test(expected = NullPointerException.class) + public void testReleaseNull() { + AbfsByteBufferPool pool = new AbfsByteBufferPool(TWO_MB, QUEUE_CAPACITY_5, + MEM_USAGE_PC_30); + pool.release(null); + } + + @Test + public void testReleaseMoreThanPoolCapacity() { + AbfsByteBufferPool pool = new AbfsByteBufferPool(TWO_MB, MAX_FREE_BUFFERS_4, + MEM_USAGE_PC_25); + int expectedPoolCapacity = MAX_FREE_BUFFERS_4; + for (int i = 0; i < expectedPoolCapacity * 2; i++) { + pool.release(new byte[TWO_MB]); + assertThat(pool.getFreeBuffers()).describedAs( + "Pool size should never exceed the expected capacity irrespective " + + "of the number of objects released to the pool") + .hasSizeLessThanOrEqualTo(expectedPoolCapacity); + } + } + + @Test + public void testReleaseWithSameBufferSize() { + AbfsByteBufferPool pool = new AbfsByteBufferPool(BUFFER_SIZE_2, + QUEUE_CAPACITY_3, MEM_USAGE_PC_25); + pool.release(new byte[BUFFER_SIZE_2]); + } + + @Test + public void testReleaseWithDifferentBufferSize() throws Exception { + AbfsByteBufferPool pool = new AbfsByteBufferPool(TWO_MB, QUEUE_CAPACITY_3, + MEM_USAGE_PC_25); + for (int i = 1; i < 2; i++) { + int finalI = i; + intercept(IllegalArgumentException.class, + String.format("Buffer size has to be %s", TWO_MB), + () -> pool.release(new byte[TWO_MB + finalI])); + intercept(IllegalArgumentException.class, + String.format("Buffer size has to be %s", TWO_MB), + () -> pool.release(new byte[TWO_MB - finalI])); + } + } + + @Test + public void testGet() throws Exception { + int expectedMaxBuffersInUse = + MAX_FREE_BUFFERS_4 + Runtime.getRuntime().availableProcessors(); + AbfsByteBufferPool pool = new AbfsByteBufferPool(TWO_MB, MAX_FREE_BUFFERS_4, + MEM_USAGE_PC_90); + + // Getting the maximum number of byte arrays from the pool + byte[] byteBuffer = null; + for (int i = 0; i < expectedMaxBuffersInUse; i++) { + byteBuffer = pool.get(); + assertThat(byteBuffer.length).describedAs("Pool has to return an object " + + "immediately, until maximum buffers are in use.").isEqualTo(TWO_MB); + } + + // Already maximum number of buffers are retrieved from the pool so the + // next get call is going to be blocked + Thread getThread = new Thread(() -> pool.get()); + getThread.start(); + sleep(SLEEP_TIME_5000_MS); + assertThat(getThread.getState()).describedAs("When maximum number of " + + "buffers are in use and no free buffers available in the pool the " + + "get call is blocked until an object is released to the pool.") + .isEqualTo(State.WAITING); + getThread.interrupt(); + + // Releasing one byte array back to the pool post which the get call we + // are making should not be blocking + pool.release(byteBuffer); + byteBuffer = null; + byteBuffer = pool.get(); + assertThat(byteBuffer.length).describedAs("Pool has to return an object " + + "immediately, if there are free buffers available in the pool.") + .isEqualTo(TWO_MB); + + // Again trying to get one byte buffer from the pool which will be + // blocked as the pool has already dished out the maximum number of byte + // arrays. Then we release another byte array and the blocked get call + // gets unblocked. + Callable callable = () -> pool.get(); + FutureTask futureTask = new FutureTask(callable); + getThread = new Thread(futureTask); + getThread.start(); + pool.release(new byte[TWO_MB]); + byteBuffer = (byte[]) futureTask.get(); + assertThat(byteBuffer.length).describedAs("The blocked get call unblocks " + + "when an object is released back to the pool.").isEqualTo(TWO_MB); + } + + @Test + public void testSynchronisation() throws Throwable { + final int expectedMaxBuffersInUse = + SYNC_TEST_MAX_FREE_BUFFER_COUNT + Runtime.getRuntime() + .availableProcessors(); + final AbfsByteBufferPool pool = new AbfsByteBufferPool(TWO_MB, + SYNC_TEST_MAX_FREE_BUFFER_COUNT, MEM_USAGE_PC_90); + + final int totalThreadCount = + SYNC_TEST_THREAD_CATEGORY_COUNT * SYNC_TEST_THREAD_COUNT_PER_CATEGORY; + + final LinkedBlockingQueue objsQueue = new LinkedBlockingQueue(); + final Waiter waiter = new Waiter(); + + // Creating 3 types of runnables + // getter: would make get calls + // releaser: would make release calls + // verifier: will be verifying the conditions + final Runnable getter = getNewGetterRunnable(pool, objsQueue, waiter, + expectedMaxBuffersInUse, SYNC_TEST_MAX_FREE_BUFFER_COUNT); + final Runnable releaser = getNewReleaserRunnable(pool, objsQueue, waiter, + expectedMaxBuffersInUse, SYNC_TEST_MAX_FREE_BUFFER_COUNT); + final Runnable verifier = getNewVerifierRunnable(pool, + expectedMaxBuffersInUse, SYNC_TEST_MAX_FREE_BUFFER_COUNT, waiter); + + final ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors + .newFixedThreadPool(SYNC_TEST_THREAD_POOL_SIZE); + for (int i = 0; i < SYNC_TEST_THREAD_COUNT_PER_CATEGORY; i++) { + executor.submit(new Thread(getter)); + executor.submit(new Thread(releaser)); + executor.submit(new Thread(verifier)); + executor.submit(new Thread(verifier)); + } + waiter.await(SYNC_TEST_WAITER_WAIT_TIME_6000_MS, totalThreadCount); + executor.shutdown(); + } + + private Runnable getNewGetterRunnable(final AbfsByteBufferPool pool, + final LinkedBlockingQueue objsQueue, final Waiter waiter, + final int expectedMaxBuffersInUse, final int maxFreeBuffers) { + Runnable runnable = () -> { + for (int i = 0; i < SYNC_TEST_VERIFICATION_PER_CATEGORY; i++) { + verify(pool, waiter, maxFreeBuffers, expectedMaxBuffersInUse); + objsQueue.offer(pool.get()); + verify(pool, waiter, maxFreeBuffers, expectedMaxBuffersInUse); + try { + sleep(SYNC_TEST_NON_VERIFIER_SLEEP_TIME_100_MS); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } + waiter.resume(); + }; + return runnable; + } + + private Runnable getNewReleaserRunnable(final AbfsByteBufferPool pool, + final LinkedBlockingQueue objsQueue, final Waiter waiter, + final int expectedMaxBuffersInUse, final int maxFreeBuffers) { + Runnable runnable = () -> { + for (int i = 0; i < SYNC_TEST_VERIFICATION_PER_CATEGORY; i++) { + try { + verify(pool, waiter, maxFreeBuffers, expectedMaxBuffersInUse); + pool.release(objsQueue.take()); + verify(pool, waiter, maxFreeBuffers, expectedMaxBuffersInUse); + sleep(SYNC_TEST_NON_VERIFIER_SLEEP_TIME_100_MS); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } + waiter.resume(); + }; + return runnable; + } + + private Runnable getNewVerifierRunnable(final AbfsByteBufferPool pool, + final int expectedMaxBuffersInUse, final int maxFreeBuffers, + final Waiter waiter) { + Runnable runnable = () -> { + for (int i = 0; i < SYNC_TEST_VERIFICATION_PER_CATEGORY; i++) { + verify(pool, waiter, maxFreeBuffers, expectedMaxBuffersInUse); + try { + sleep(SYNC_TEST_VERIFIER_SLEEP_TIME_50_MS); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } + waiter.resume(); + }; + return runnable; + } + + private void verify(final AbfsByteBufferPool pool, final Waiter waiter, + final int maxFreeBuffers, final int expectedMaxBuffersInUse) { + waiter.assertThat(pool.getFreeBuffers(), is(notNullValue())); + waiter.assertThat(pool.getFreeBuffers().size(), + is(lessThanOrEqualTo(maxFreeBuffers))); + waiter.assertThat(pool.getBuffersInUse(), + is(lessThanOrEqualTo(expectedMaxBuffersInUse))); + waiter.assertThat(pool.getBuffersInUse(), is(greaterThanOrEqualTo(0))); + } + +}