diff --git a/hadoop-tools/hadoop-azure/pom.xml b/hadoop-tools/hadoop-azure/pom.xml
index db357598e3cb6..f7808d0c75ab8 100644
--- a/hadoop-tools/hadoop-azure/pom.xml
+++ b/hadoop-tools/hadoop-azure/pom.xml
@@ -172,6 +172,12 @@
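       <groupId>com.google.guava</groupId>
       <artifactId>guava</artifactId>
     </dependency>
+    <dependency>
+      <groupId>com.google.code.findbugs</groupId>
+      <artifactId>annotations</artifactId>
+      <version>3.0.1</version>
+      <scope>provided</scope>
+    </dependency>
     <dependency>
       <groupId>org.eclipse.jetty</groupId>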
@@ -288,6 +294,12 @@
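       <artifactId>hamcrest-library</artifactId>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>net.jodah</groupId>
+      <artifactId>concurrentunit</artifactId>
+      <version>0.4.6</version>
+      <scope>test</scope>
+    </dependency>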
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java
index 354176f5ff929..c5e65e5afd990 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java
@@ -130,10 +130,20 @@ public class AbfsConfiguration{
DefaultValue = AZURE_BLOCK_LOCATION_HOST_DEFAULT)
private String azureBlockLocationHost;
- @IntegerConfigurationValidatorAnnotation(ConfigurationKey = AZURE_CONCURRENT_CONNECTION_VALUE_OUT,
+ // When true, the store hands out the legacy AbfsOutputStreamOld instead of
+ // AbfsOutputStream.
+ @BooleanConfigurationValidatorAnnotation(ConfigurationKey = AZURE_SHOULD_USE_OLD_ABFSOUTPUTSTREAM,
+ DefaultValue = DEFAULT_AZURE_SHOULD_USE_OLD_ABFSOUTPUTSTREAM)
+ private boolean shouldUseOlderAbfsOutputStream;
+
+ // Multiplied by the number of available processors to size the thread pool
+ // and the buffer pool shared by AbfsOutputStream instances.
+ @IntegerConfigurationValidatorAnnotation(ConfigurationKey = AZURE_WRITE_CONCURRENCY_FACTOR,
MinValue = 1,
- DefaultValue = MAX_CONCURRENT_WRITE_THREADS)
- private int maxConcurrentWriteThreads;
+ DefaultValue = DEFAULT_WRITE_CONCURRENCY_FACTOR)
+ private int writeConcurrencyFactor;
+
+ // Percentage of the heap still available to the JVM that buffers issued by
+ // AbfsByteBufferPool may take up.
+ @IntegerConfigurationValidatorAnnotation(ConfigurationKey = AZURE_MAX_WRITE_MEM_USAGE_PERCENTAGE,
+ MinValue = MIN_VALUE_MAX_AZURE_WRITE_MEM_USAGE_PERCENTAGE,
+ MaxValue = MAX_VALUE_MAX_AZURE_WRITE_MEM_USAGE_PERCENTAGE,
+ DefaultValue = DEFAULT_MAX_AZURE_WRITE_MEM_USAGE_PERCENTAGE)
+ private int maxWriteMemoryUsagePercentage;
@IntegerConfigurationValidatorAnnotation(ConfigurationKey = AZURE_LIST_MAX_RESULTS,
MinValue = 1,
@@ -464,8 +474,16 @@ public String getAzureBlockLocationHost() {
return this.azureBlockLocationHost;
}
- public int getMaxConcurrentWriteThreads() {
- return this.maxConcurrentWriteThreads;
+ /**
+ * @return true if the legacy AbfsOutputStreamOld should be used for writes.
+ */
+ public boolean shouldUseOlderAbfsOutputStream() {
+ return this.shouldUseOlderAbfsOutputStream;
+ }
+
+ /**
+ * @return the configured write concurrency factor.
+ */
+ public int getWriteConcurrencyFactor() {
+ return this.writeConcurrencyFactor;
+ }
+
+ /**
+ * @return the configured maximum write memory usage percentage.
+ */
+ public int getMaxWriteMemoryUsagePercentage() {
+ return this.maxWriteMemoryUsagePercentage;
}
public int getMaxConcurrentReadThreads() {
@@ -784,4 +802,10 @@ private String appendSlashIfNeeded(String authority) {
return authority;
}
+ /**
+ * Test hook that overrides the configured choice of output stream
+ * implementation.
+ * @param shouldUseOlderAbfsOutputStream true to select the legacy stream.
+ */
+ @VisibleForTesting
+ void setShouldUseOlderAbfsOutputStream(
+ boolean shouldUseOlderAbfsOutputStream) {
+ this.shouldUseOlderAbfsOutputStream = shouldUseOlderAbfsOutputStream;
+ }
+
}
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java
index 8e0e6c1eb0b84..e326da3441be4 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java
@@ -85,6 +85,7 @@
import org.apache.hadoop.fs.azurebfs.services.AbfsInputStream;
import org.apache.hadoop.fs.azurebfs.services.AbfsInputStreamContext;
import org.apache.hadoop.fs.azurebfs.services.AbfsOutputStream;
+import org.apache.hadoop.fs.azurebfs.services.AbfsOutputStreamOld;
import org.apache.hadoop.fs.azurebfs.services.AbfsOutputStreamContext;
import org.apache.hadoop.fs.azurebfs.services.AbfsOutputStreamStatisticsImpl;
import org.apache.hadoop.fs.azurebfs.services.AbfsPermission;
@@ -424,6 +425,15 @@ public OutputStream createFile(final Path path,
isNamespaceEnabled ? getOctalNotation(umask) : null);
perfInfo.registerResult(op.getResult()).registerSuccess(true);
+ // Configured fallback: return the legacy stream implementation instead of
+ // AbfsOutputStream. The stream starts at position 0.
+ if (abfsConfiguration.shouldUseOlderAbfsOutputStream()) {
+ return new AbfsOutputStreamOld(
+ client,
+ statistics,
+ getRelativePath(path),
+ 0,
+ populateAbfsOutputStreamContext());
+ }
+
return new AbfsOutputStream(
client,
statistics,
@@ -439,6 +449,10 @@ private AbfsOutputStreamContext populateAbfsOutputStreamContext() {
.enableFlush(abfsConfiguration.isFlushEnabled())
.disableOutputStreamFlush(abfsConfiguration.isOutputStreamFlushDisabled())
.withStreamStatistics(new AbfsOutputStreamStatisticsImpl())
+ .withWriteConcurrencyFactor(abfsConfiguration
+ .getWriteConcurrencyFactor())
+ .withMaxWriteMemoryUsagePercentage(abfsConfiguration
+ .getMaxWriteMemoryUsagePercentage())
.build();
}
@@ -530,6 +544,16 @@ public OutputStream openFileForWrite(final Path path, final FileSystem.Statistic
perfInfo.registerSuccess(true);
+
+ // Configured fallback: return the legacy stream implementation instead of
+ // AbfsOutputStream.
+ // NOTE(review): the start position is hard-coded to 0 here, exactly as in
+ // createFile. Confirm that this matches the position handed to the
+ // AbfsOutputStream returned just below; if that one can start at a non-zero
+ // offset (for example when the file is not overwritten), this branch has to
+ // pass the same value.
+ if (abfsConfiguration.shouldUseOlderAbfsOutputStream()) {
+ return new AbfsOutputStreamOld(
+ client,
+ statistics,
+ getRelativePath(path),
+ 0,
+ populateAbfsOutputStreamContext());
+ }
+
return new AbfsOutputStream(
client,
statistics,
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java
index 5794d32f468f4..2a6a86a1d8356 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java
@@ -51,6 +51,9 @@ public final class ConfigurationKeys {
public static final String AZURE_BLOCK_SIZE_PROPERTY_NAME = "fs.azure.block.size";
public static final String AZURE_BLOCK_LOCATION_HOST_PROPERTY_NAME = "fs.azure.block.location.impersonatedhost";
public static final String AZURE_CONCURRENT_CONNECTION_VALUE_OUT = "fs.azure.concurrentRequestCount.out";
+ /** If true, writes use the legacy AbfsOutputStreamOld implementation. */
+ public static final String AZURE_SHOULD_USE_OLD_ABFSOUTPUTSTREAM = "fs.azure.use.olderabfsoutputstream";
+ /** Factor multiplied by the available processor count to size the shared write thread pool. */
+ public static final String AZURE_WRITE_CONCURRENCY_FACTOR = "fs.azure.write.concurrency.factor";
+ /** Percentage of the available memory that pooled write buffers may use. */
+ public static final String AZURE_MAX_WRITE_MEM_USAGE_PERCENTAGE = "fs.azure.max.write.memory.usage.percentage";
public static final String AZURE_CONCURRENT_CONNECTION_VALUE_IN = "fs.azure.concurrentRequestCount.in";
public static final String AZURE_TOLERATE_CONCURRENT_APPEND = "fs.azure.io.read.tolerate.concurrent.append";
public static final String AZURE_LIST_MAX_RESULTS = "fs.azure.list.max.results";
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java
index 01d5202cc26c2..2176e93929a83 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java
@@ -56,6 +56,11 @@ public final class FileSystemConfigurations {
public static final int MAX_CONCURRENT_READ_THREADS = 12;
public static final int MAX_CONCURRENT_WRITE_THREADS = 8;
+ /** Default value of the write concurrency factor. */
+ public static final int DEFAULT_WRITE_CONCURRENCY_FACTOR = 4;
+ /** By default the legacy AbfsOutputStreamOld is not used. */
+ public static final boolean DEFAULT_AZURE_SHOULD_USE_OLD_ABFSOUTPUTSTREAM = false;
+ /** Lowest accepted value, in percent, for the maximum write memory usage. */
+ public static final int MIN_VALUE_MAX_AZURE_WRITE_MEM_USAGE_PERCENTAGE = 20;
+ /** Highest accepted value, in percent, for the maximum write memory usage. */
+ public static final int MAX_VALUE_MAX_AZURE_WRITE_MEM_USAGE_PERCENTAGE = 90;
+ /** Default maximum write memory usage: the lowest accepted value. */
+ public static final int DEFAULT_MAX_AZURE_WRITE_MEM_USAGE_PERCENTAGE = MIN_VALUE_MAX_AZURE_WRITE_MEM_USAGE_PERCENTAGE;
public static final boolean DEFAULT_READ_TOLERATE_CONCURRENT_APPEND = false;
public static final boolean DEFAULT_AZURE_CREATE_REMOTE_FILESYSTEM_DURING_INITIALIZATION = false;
public static final boolean DEFAULT_AZURE_SKIP_USER_GROUP_METADATA_DURING_INITIALIZATION = false;
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsByteBufferPool.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsByteBufferPool.java
new file mode 100644
index 0000000000000..ea8ba3c6c1ccc
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsByteBufferPool.java
@@ -0,0 +1,188 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.services;
+
+import java.util.concurrent.ArrayBlockingQueue;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
+
+import static java.lang.Math.ceil;
+import static java.lang.Math.min;
+
+import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.MAX_VALUE_MAX_AZURE_WRITE_MEM_USAGE_PERCENTAGE;
+import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.MIN_VALUE_MAX_AZURE_WRITE_MEM_USAGE_PERCENTAGE;
+
+/**
+ * Bounded pool of equally sized {@code byte[]} buffers.
+ *
+ * <p>A buffer handed out by {@link #get()} counts as "in use" until it comes
+ * back through {@link #release(byte[])}. Released buffers are parked in a
+ * fixed-capacity queue for reuse; a buffer released while that queue is full
+ * is dropped. The number of buffers that may be in use at once is recomputed
+ * on every allocation attempt from the heap the JVM can still claim, and
+ * {@link #get()} blocks once that limit is reached and no buffer is free.
+ *
+ * <p>The in-use counter is guarded by the monitor of the pool instance.
+ */
+public class AbfsByteBufferPool {
+
+  private static final int HUNDRED = 100;
+
+  /** At least this many buffers may always be in use at the same time. */
+  private static final int MIN_BUFFERS_THAT_CAN_BE_IN_USE = 2;
+
+  private static final Runtime RT = Runtime.getRuntime();
+  private static final int AVAILABLE_PROCESSORS = RT.availableProcessors();
+
+  /**
+   * Queue holding the free buffers.
+   */
+  private final ArrayBlockingQueue<byte[]> freeBuffers;
+
+  /**
+   * Count to track the buffers issued from AbfsByteBufferPool and yet to be
+   * returned.
+   */
+  private int numBuffersInUse;
+
+  private final int bufferSize;
+  private final int maxBuffersToPool;
+  private final int maxMemUsagePercentage;
+
+  /**
+   * @param bufferSize Size of the byte[] to be returned.
+   * @param queueCapacity Maximum number of buffers that the pool can
+   *                      keep within the pool.
+   * @param maxMemUsagePercentage Maximum percentage of memory that can
+   *                              be used by the pool from the max
+   *                              available memory.
+   * @throws IllegalArgumentException if queueCapacity is less than 1 or
+   * maxMemUsagePercentage is outside the allowed range.
+   */
+  public AbfsByteBufferPool(final int bufferSize, final int queueCapacity,
+      final int maxMemUsagePercentage) {
+    validate(queueCapacity, maxMemUsagePercentage);
+    this.maxMemUsagePercentage = maxMemUsagePercentage;
+    this.bufferSize = bufferSize;
+    this.numBuffersInUse = 0;
+    this.maxBuffersToPool = queueCapacity;
+    this.freeBuffers = new ArrayBlockingQueue<>(queueCapacity);
+  }
+
+  private static void validate(final int queueCapacity,
+      final int maxWriteMemUsagePercentage) {
+    Preconditions.checkArgument(maxWriteMemUsagePercentage
+            >= MIN_VALUE_MAX_AZURE_WRITE_MEM_USAGE_PERCENTAGE
+            && maxWriteMemUsagePercentage
+            <= MAX_VALUE_MAX_AZURE_WRITE_MEM_USAGE_PERCENTAGE,
+        "maxWriteMemUsagePercentage should be in range (%s - %s)",
+        MIN_VALUE_MAX_AZURE_WRITE_MEM_USAGE_PERCENTAGE,
+        MAX_VALUE_MAX_AZURE_WRITE_MEM_USAGE_PERCENTAGE);
+    Preconditions
+        .checkArgument(queueCapacity > 0, "queueCapacity cannot be < 1");
+  }
+
+  /**
+   * Computes how many buffers may be in use right now: the smaller of what
+   * the configured share of the still-claimable heap can hold and the pool
+   * capacity plus one buffer per available processor, but never fewer than
+   * {@link #MIN_BUFFERS_THAT_CAN_BE_IN_USE}. The result is returned rather
+   * than cached in a static field, so pools of different sizes cannot
+   * overwrite each other's limit.
+   *
+   * @return the current limit on buffers in use.
+   */
+  private int getMaxBuffersThatCanBeInUse() {
+    double freeMemory = RT.maxMemory() - (RT.totalMemory() - RT.freeMemory());
+    int bufferCountByMemory = (int) ceil(
+        (freeMemory * maxMemUsagePercentage / HUNDRED) / bufferSize);
+    int bufferCountByMaxFreeBuffers = maxBuffersToPool + AVAILABLE_PROCESSORS;
+    return Math.max(MIN_BUFFERS_THAT_CAN_BE_IN_USE,
+        min(bufferCountByMemory, bufferCountByMaxFreeBuffers));
+  }
+
+  private synchronized boolean isPossibleToIssueNewBuffer() {
+    return numBuffersInUse < getMaxBuffersThatCanBeInUse();
+  }
+
+  /**
+   * @return byte[] from the pool if available otherwise new byte[] is returned.
+   * Waits if pool is empty and already maximum number of buffers are in use.
+   * Returns null if the calling thread is interrupted while waiting; the
+   * interrupt flag is set again in that case.
+   */
+  public byte[] get() {
+    synchronized (this) {
+      byte[] byteArray = freeBuffers.poll();
+      if (byteArray == null && isPossibleToIssueNewBuffer()) {
+        byteArray = new byte[bufferSize];
+      }
+      if (byteArray != null) {
+        numBuffersInUse++;
+        return byteArray;
+      }
+    }
+    // Nothing free and the limit is reached: wait outside the monitor so
+    // that release() can still get in and offer a buffer back.
+    try {
+      final byte[] byteArray = freeBuffers.take();
+      synchronized (this) {
+        numBuffersInUse++;
+      }
+      return byteArray;
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      return null;
+    }
+  }
+
+  /**
+   * @param byteArray The buffer to be offered back to the pool.
+   * @throws IllegalArgumentException if the buffer is not of the pool's
+   * buffer size.
+   */
+  @SuppressFBWarnings("RV_RETURN_VALUE_IGNORED")
+  public synchronized void release(byte[] byteArray) {
+    Preconditions.checkArgument(byteArray.length == bufferSize,
+        "Buffer size has to be %s (Received buffer length: %s)",
+        bufferSize, byteArray.length);
+    numBuffersInUse--;
+    if (numBuffersInUse < 0) {
+      // Can happen when a buffer that did not come from get() is released;
+      // keep the counter from going negative.
+      numBuffersInUse = 0;
+    }
+    // The result is deliberately ignored: when the queue is full the buffer
+    // is simply not kept.
+    freeBuffers.offer(byteArray);
+  }
+
+  @VisibleForTesting
+  public synchronized int getBuffersInUse() {
+    return this.numBuffersInUse;
+  }
+
+  @VisibleForTesting
+  public synchronized ArrayBlockingQueue<byte[]> getFreeBuffers() {
+    return freeBuffers;
+  }
+
+}
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java
index 89afca4220251..68ed42e476556 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java
@@ -20,14 +20,14 @@
import java.io.FileNotFoundException;
import java.io.IOException;
-import java.io.InterruptedIOException;
import java.io.OutputStream;
import java.net.HttpURLConnection;
-import java.nio.ByteBuffer;
import java.util.Locale;
import java.util.concurrent.ConcurrentLinkedDeque;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ExecutorCompletionService;
+import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.Callable;
import java.util.concurrent.Future;
@@ -41,7 +41,6 @@
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsRestOperationException;
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException;
import org.apache.hadoop.fs.azurebfs.utils.CachedSASToken;
-import org.apache.hadoop.io.ElasticByteBufferPool;
import org.apache.hadoop.fs.FileSystem.Statistics;
import org.apache.hadoop.fs.FSExceptionMessages;
import org.apache.hadoop.fs.StreamCapabilities;
@@ -65,26 +64,25 @@ public class AbfsOutputStream extends OutputStream implements Syncable, StreamCa
private long lastFlushOffset;
private long lastTotalAppendOffset = 0;
- private final int bufferSize;
+ private static int bufferSize;
private byte[] buffer;
private int bufferIndex;
- private final int maxConcurrentRequestCount;
- private ConcurrentLinkedDeque writeOperations;
- private final ThreadPoolExecutor threadExecutor;
- private final ExecutorCompletionService completionService;
+ private final ConcurrentLinkedDeque writeOperations = new ConcurrentLinkedDeque<>();
+ private static ThreadPoolExecutor threadExecutor;
// SAS tokens can be re-used until they expire
private CachedSASToken cachedSasToken;
/**
- * Queue storing buffers with the size of the Azure block ready for
+ * Pool storing buffers with the size of the Azure block ready for
* reuse. The pool allows reusing the blocks instead of allocating new
* blocks. After the data is sent to the service, the buffer is returned
* back to the queue
*/
- private final ElasticByteBufferPool byteBufferPool
- = new ElasticByteBufferPool();
+ private static AbfsByteBufferPool byteBufferPool;
+
+ private static volatile Boolean isFirstObj = true;
private final Statistics statistics;
private final AbfsOutputStreamStatistics outputStreamStatistics;
@@ -108,23 +106,64 @@ public AbfsOutputStream(
.isDisableOutputStreamFlush();
this.lastError = null;
this.lastFlushOffset = 0;
- this.bufferSize = abfsOutputStreamContext.getWriteBufferSize();
- this.buffer = byteBufferPool.getBuffer(false, bufferSize).array();
this.bufferIndex = 0;
- this.writeOperations = new ConcurrentLinkedDeque<>();
this.outputStreamStatistics = abfsOutputStreamContext.getStreamStatistics();
-
- this.maxConcurrentRequestCount = 4 * Runtime.getRuntime().availableProcessors();
-
- this.threadExecutor
- = new ThreadPoolExecutor(maxConcurrentRequestCount,
- maxConcurrentRequestCount,
- 10L,
- TimeUnit.SECONDS,
- new LinkedBlockingQueue<>());
- this.completionService = new ExecutorCompletionService<>(this.threadExecutor);
this.cachedSasToken = new CachedSASToken(
abfsOutputStreamContext.getSasTokenRenewPeriodForStreamsInSeconds());
+
+ init(abfsOutputStreamContext);
+ buffer = new byte[bufferSize];
+ }
+
+  /**
+   * Builds, once, the write buffer pool and the upload thread pool shared by
+   * all AbfsOutputStream instances, using the context of whichever stream
+   * gets here first. The whole check-and-build sequence runs under the class
+   * lock: a stream constructed concurrently with the first one waits here
+   * instead of carrying on with pools that do not exist yet.
+   *
+   * @param abfsOutputStreamContext settings of the stream being constructed.
+   */
+  private static void init(
+      final AbfsOutputStreamContext abfsOutputStreamContext) {
+    synchronized (AbfsOutputStream.class) {
+      if (isCommonPoolsInitialised()) {
+        return;
+      }
+
+      initWriteBufferPool(abfsOutputStreamContext);
+
+      ThreadFactory daemonThreadFactory = new ThreadFactory() {
+        @Override
+        public Thread newThread(Runnable runnable) {
+          Thread daemonThread = Executors.defaultThreadFactory()
+              .newThread(runnable);
+          daemonThread.setDaemon(true);
+          return daemonThread;
+        }
+      };
+      int maxConcurrentThreadCount =
+          abfsOutputStreamContext.getWriteConcurrencyFactor()
+              * Runtime.getRuntime().availableProcessors();
+      // Assigned last: a non-null executor is what marks the shared state
+      // as completely built.
+      threadExecutor = new ThreadPoolExecutor(maxConcurrentThreadCount,
+          maxConcurrentThreadCount, 10L, TimeUnit.SECONDS,
+          new LinkedBlockingQueue<>(), daemonThreadFactory);
+      isFirstObj = false;
+    }
+  }
+
+  /**
+   * @return true if the shared pools have already been built. Callers must
+   * hold the class lock, which init() does.
+   */
+  private static boolean isCommonPoolsInitialised() {
+    return threadExecutor != null;
+  }
+
+  /**
+   * Creates the shared buffer pool. The write buffer size of the given
+   * context becomes the shared buffer size, and the pool may keep one more
+   * free buffer than there are upload threads.
+   *
+   * @param abfsosContext settings of the stream being constructed.
+   */
+  private static synchronized void initWriteBufferPool(
+      AbfsOutputStreamContext abfsosContext) {
+    bufferSize = abfsosContext.getWriteBufferSize();
+    int corePoolSize =
+        1 + (abfsosContext.getWriteConcurrencyFactor() * Runtime.getRuntime()
+            .availableProcessors());
+    byteBufferPool = new AbfsByteBufferPool(bufferSize, corePoolSize,
+        abfsosContext.getMaxWriteMemoryUsagePercentage());
+  }
/**
@@ -275,7 +314,6 @@ public synchronized void close() throws IOException {
try {
flushInternal(true);
- threadExecutor.shutdown();
} catch (IOException e) {
// Problems surface in try-with-resources clauses if
// the exception thrown in a close == the one already thrown
@@ -288,9 +326,6 @@ public synchronized void close() throws IOException {
bufferIndex = 0;
closed = true;
writeOperations.clear();
- if (!threadExecutor.isShutdown()) {
- threadExecutor.shutdownNow();
- }
}
if (LOG.isDebugEnabled()) {
LOG.debug("Closing AbfsOutputStream ", toString());
@@ -318,18 +353,12 @@ private synchronized void writeCurrentBufferToService() throws IOException {
final byte[] bytes = buffer;
final int bytesLength = bufferIndex;
outputStreamStatistics.bytesToUpload(bytesLength);
- buffer = byteBufferPool.getBuffer(false, bufferSize).array();
+ buffer = byteBufferPool.get();
bufferIndex = 0;
final long offset = position;
position += bytesLength;
- if (threadExecutor.getQueue().size() >= maxConcurrentRequestCount * 2) {
- long start = System.currentTimeMillis();
- waitForTaskToComplete();
- outputStreamStatistics.timeSpentTaskWait(start, System.currentTimeMillis());
- }
-
- final Future job = completionService.submit(new Callable() {
+ final Future job = threadExecutor.submit(new Callable() {
@Override
public Void call() throws Exception {
AbfsPerfTracker tracker = client.getAbfsPerfTracker();
@@ -339,7 +368,7 @@ public Void call() throws Exception {
bytesLength, cachedSasToken.get());
cachedSasToken.update(op.getSasToken());
perfInfo.registerResult(op.getResult());
- byteBufferPool.putBuffer(ByteBuffer.wrap(bytes));
+ byteBufferPool.release(bytes);
perfInfo.registerSuccess(true);
return null;
}
@@ -429,22 +458,6 @@ private synchronized void shrinkWriteOperationQueue() throws IOException {
}
}
- private void waitForTaskToComplete() throws IOException {
- boolean completed;
- for (completed = false; completionService.poll() != null; completed = true) {
- // keep polling until there is no data
- }
-
- if (!completed) {
- try {
- completionService.take();
- } catch (InterruptedException e) {
- lastError = (IOException) new InterruptedIOException(e.toString()).initCause(e);
- throw lastError;
- }
- }
- }
-
private static class WriteOperation {
private final Future task;
private final long startOffset;
@@ -462,8 +475,9 @@ private static class WriteOperation {
}
+  /**
+   * Waits for the first write operation in the queue of this stream to
+   * finish. Returns immediately when the queue is empty.
+   *
+   * @throws ExecutionException if that upload failed.
+   * @throws InterruptedException if interrupted while waiting.
+   */
   @VisibleForTesting
-  public synchronized void waitForPendingUploads() throws IOException {
-    waitForTaskToComplete();
+  public synchronized void waitForPendingUploads()
+      throws ExecutionException, InterruptedException {
+    // peekFirst() rather than getFirst(): an empty queue must not raise
+    // NoSuchElementException.
+    final WriteOperation first = writeOperations.peekFirst();
+    if (first != null) {
+      first.task.get();
+    }
   }
/**
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStreamContext.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStreamContext.java
index dcd6c45981734..b7dc4d794998c 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStreamContext.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStreamContext.java
@@ -29,6 +29,10 @@ public class AbfsOutputStreamContext extends AbfsStreamContext {
private boolean disableOutputStreamFlush;
+ private int writeConcurrencyFactor;
+
+ private int maxWriteMemoryUsagePercentage;
+
private AbfsOutputStreamStatistics streamStatistics;
public AbfsOutputStreamContext(final long sasTokenRenewPeriodForStreamsInSeconds) {
@@ -58,6 +62,18 @@ public AbfsOutputStreamContext withStreamStatistics(
return this;
}
+ /**
+ * Sets the write concurrency factor.
+ * @param writeConcurrencyFactor factor to store in this context.
+ * @return this context, to allow call chaining.
+ */
+ public AbfsOutputStreamContext withWriteConcurrencyFactor(
+ final int writeConcurrencyFactor) {
+ this.writeConcurrencyFactor = writeConcurrencyFactor;
+ return this;
+ }
+
+ /**
+ * Sets the maximum write memory usage percentage.
+ * @param maxWriteMemoryUsagePercentage percentage to store in this context.
+ * @return this context, to allow call chaining.
+ */
+ public AbfsOutputStreamContext withMaxWriteMemoryUsagePercentage(
+ final int maxWriteMemoryUsagePercentage) {
+ this.maxWriteMemoryUsagePercentage = maxWriteMemoryUsagePercentage;
+ return this;
+ }
+
public AbfsOutputStreamContext build() {
// Validation of parameters to be done here.
return this;
@@ -75,6 +91,14 @@ public boolean isDisableOutputStreamFlush() {
return disableOutputStreamFlush;
}
+ /**
+ * @return the write concurrency factor stored in this context.
+ */
+ public int getWriteConcurrencyFactor() {
+ return writeConcurrencyFactor;
+ }
+
+ /**
+ * @return the maximum write memory usage percentage stored in this context.
+ */
+ public int getMaxWriteMemoryUsagePercentage() {
+ return maxWriteMemoryUsagePercentage;
+ }
+
public AbfsOutputStreamStatistics getStreamStatistics() {
return streamStatistics;
}
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStreamOld.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStreamOld.java
new file mode 100644
index 0000000000000..86e9fb0ea5927
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStreamOld.java
@@ -0,0 +1,502 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.services;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.io.OutputStream;
+import java.net.HttpURLConnection;
+import java.nio.ByteBuffer;
+import java.util.Locale;
+import java.util.concurrent.ConcurrentLinkedDeque;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ExecutorCompletionService;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.Callable;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsRestOperationException;
+import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException;
+import org.apache.hadoop.fs.azurebfs.utils.CachedSASToken;
+import org.apache.hadoop.io.ElasticByteBufferPool;
+import org.apache.hadoop.fs.FileSystem.Statistics;
+import org.apache.hadoop.fs.FSExceptionMessages;
+import org.apache.hadoop.fs.StreamCapabilities;
+import org.apache.hadoop.fs.Syncable;
+
+import static org.apache.hadoop.io.IOUtils.wrapException;
+
+/**
+ * The BlobFsOutputStream for Rest AbfsClient.
+ */
+public class AbfsOutputStreamOld extends OutputStream implements Syncable, StreamCapabilities {
+
+ private final AbfsClient client;
+ private final String path;
+ private long position;
+ private boolean closed;
+ private boolean supportFlush;
+ private boolean disableOutputStreamFlush;
+ private volatile IOException lastError;
+
+ private long lastFlushOffset;
+ private long lastTotalAppendOffset = 0;
+
+ private final int bufferSize;
+ private byte[] buffer;
+ private int bufferIndex;
+ private final int maxConcurrentRequestCount;
+
+ private ConcurrentLinkedDeque writeOperations;
+ private final ThreadPoolExecutor threadExecutor;
+ private final ExecutorCompletionService completionService;
+
+ // SAS tokens can be re-used until they expire
+ private CachedSASToken cachedSasToken;
+
+ /**
+ * Queue storing buffers with the size of the Azure block ready for
+ * reuse. The pool allows reusing the blocks instead of allocating new
+ * blocks. After the data is sent to the service, the buffer is returned
+ * back to the queue
+ */
+ private final ElasticByteBufferPool byteBufferPool
+ = new ElasticByteBufferPool();
+
+ private final Statistics statistics;
+ private final AbfsOutputStreamStatistics outputStreamStatistics;
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(AbfsOutputStreamOld.class);
+
+  /** Creates a stream that writes to {@code path} starting at {@code position}. */
+  public AbfsOutputStreamOld(
+      final AbfsClient client,
+      final Statistics statistics,
+      final String path,
+      final long position,
+      AbfsOutputStreamContext abfsOutputStreamContext) {
+    // Where the data goes and where it is accounted for.
+    this.client = client;
+    this.path = path;
+    this.position = position;
+    this.statistics = statistics;
+    this.outputStreamStatistics = abfsOutputStreamContext.getStreamStatistics();
+    // Stream state.
+    this.closed = false;
+    this.lastError = null;
+    this.lastFlushOffset = 0;
+    this.bufferIndex = 0;
+    this.writeOperations = new ConcurrentLinkedDeque<>();
+    // Flush behaviour.
+    this.supportFlush = abfsOutputStreamContext.isEnableFlush();
+    this.disableOutputStreamFlush =
+        abfsOutputStreamContext.isDisableOutputStreamFlush();
+    // Write buffer, sized from the context.
+    this.bufferSize = abfsOutputStreamContext.getWriteBufferSize();
+    this.buffer = byteBufferPool.getBuffer(false, bufferSize).array();
+    // Per-stream upload pool with four threads per available processor.
+    this.maxConcurrentRequestCount = 4 * Runtime.getRuntime().availableProcessors();
+    this.threadExecutor = new ThreadPoolExecutor(maxConcurrentRequestCount,
+        maxConcurrentRequestCount, 10L, TimeUnit.SECONDS, new LinkedBlockingQueue<>());
+    this.completionService = new ExecutorCompletionService<>(this.threadExecutor);
+    this.cachedSasToken = new CachedSASToken(
+        abfsOutputStreamContext.getSasTokenRenewPeriodForStreamsInSeconds());
+  }
+
+  /**
+   * Query the stream for a specific capability.
+   * Only {@link StreamCapabilities#HSYNC} and
+   * {@link StreamCapabilities#HFLUSH} can be reported as supported, and only
+   * when flush support is enabled for this stream; matching ignores case.
+   *
+   * @param capability string to query the stream support for.
+   * @return true for hsync and hflush when flush is supported, else false.
+   */
+  @Override
+  public boolean hasCapability(String capability) {
+    final String wanted = capability.toLowerCase(Locale.ENGLISH);
+    final boolean isFlushCapability = StreamCapabilities.HSYNC.equals(wanted)
+        || StreamCapabilities.HFLUSH.equals(wanted);
+    return isFlushCapability && supportFlush;
+  }
+
+  /**
+   * Writes the specified byte to this output stream. Only the eight
+   * low-order bits of the argument are written; the 24 high-order bits are
+   * ignored. The byte is passed on as a one-element array.
+   *
+   * @param byteVal the byteValue to write.
+   * @throws IOException if an I/O error occurs. In particular, an IOException may be
+   * thrown if the output stream has been closed.
+   */
+  @Override
+  public void write(final int byteVal) throws IOException {
+    final byte[] single = {(byte) (byteVal & 0xFF)};
+    write(single);
+  }
+
+  /**
+   * Writes length bytes from the specified byte array starting at off to
+   * this output stream. The bytes are copied into the internal buffer, which
+   * is passed to writeCurrentBufferToService() every time it fills up.
+   *
+   * @param data the byte array to write.
+   * @param off the start off in the data.
+   * @param length the number of bytes to write.
+   * @throws IOException if an I/O error occurs. In particular, an IOException may be
+   * thrown if the output stream has been closed.
+   * @throws IllegalArgumentException if data is null.
+   * @throws IndexOutOfBoundsException if off and length do not fit data.
+   */
+  @Override
+  public synchronized void write(final byte[] data, final int off, final int length)
+      throws IOException {
+    maybeThrowLastError();
+
+    Preconditions.checkArgument(data != null, "null data");
+
+    final boolean outOfRange =
+        off < 0 || length < 0 || length > data.length - off;
+    if (outOfRange) {
+      throw new IndexOutOfBoundsException();
+    }
+
+    int sourcePosition = off;
+    int remaining = length;
+    while (remaining > 0) {
+      final int room = bufferSize - bufferIndex;
+      final int chunk = Math.min(room, remaining);
+      System.arraycopy(data, sourcePosition, buffer, bufferIndex, chunk);
+      bufferIndex += chunk;
+      if (room <= remaining) {
+        // The buffer is full now: pass it on before copying more.
+        writeCurrentBufferToService();
+      }
+      sourcePosition += chunk;
+      remaining -= chunk;
+    }
+    incrementWriteOps();
+  }
+
+  /** Counts one write operation in the file system statistics, if any. */
+  private void incrementWriteOps() {
+    if (statistics == null) {
+      // No statistics object to record against.
+      return;
+    }
+    statistics.incrementWriteOps(1);
+  }
+
+  /**
+   * Throw the last error recorded if not null.
+   * After the stream is closed, this is always set to an exception, so acts
+   * as a guard against method invocation once closed.
+   * @throws IOException if lastError is set
+   */
+  private void maybeThrowLastError() throws IOException {
+    final IOException recorded = lastError;
+    if (recorded != null) {
+      throw recorded;
+    }
+  }
+
+  /**
+   * Passes any buffered bytes on for upload and flushes them through
+   * flushInternalAsync(); a no-op when flush() is disabled for this stream.
+   * @throws IOException if flushInternalAsync() fails.
+   */
+  @Override
+  public void flush() throws IOException {
+    if (disableOutputStreamFlush) {
+      return;
+    }
+    flushInternalAsync();
+  }
+
+ /** Similar to posix fsync, flush out the data in client's user buffer
+ * all the way to the service. A no-op when flush support is disabled.
+ * @throws IOException if error occurs
+ */
+ @Override
+ public void hsync() throws IOException {
+ if (supportFlush) {
+ flushInternal(false);
+ }
+ }
+
+ /**
+  * Flushes out the data in the client's user buffer so that, once this call
+  * returns, new readers will see it. Skipped entirely when
+  * {@code supportFlush} is false.
+  *
+  * @throws IOException if any error occurs
+  */
+ @Override
+ public void hflush() throws IOException {
+   if (!supportFlush) {
+     return;
+   }
+   flushInternal(false);
+ }
+
+ /**
+  * Forces all data in the output stream to be written to Azure storage and
+  * waits until that is complete, then closes the stream and shuts down the
+  * upload thread pool. Calling this on an already closed stream is a no-op.
+  * <p>
+  * Whatever the outcome, the stream is left closed: {@code lastError} is set
+  * to a "stream is closed" exception so later calls fail fast, the buffer is
+  * dropped, pending write operations are cleared and the executor is stopped.
+  *
+  * @throws IOException a wrapped copy of any failure raised by the final flush
+  */
+ @Override
+ public synchronized void close() throws IOException {
+   if (closed) {
+     return;
+   }
+
+   try {
+     flushInternal(true);
+     threadExecutor.shutdown();
+   } catch (IOException e) {
+     // Problems surface in try-with-resources clauses if
+     // the exception thrown in a close == the one already thrown
+     // -so we wrap any exception with a new one.
+     // See HADOOP-16785
+     throw wrapException(path, e.getMessage(), e);
+   } finally {
+     lastError = new IOException(FSExceptionMessages.STREAM_IS_CLOSED);
+     buffer = null;
+     bufferIndex = 0;
+     closed = true;
+     writeOperations.clear();
+     // shutdown() is skipped when the flush throws; force-stop in that case.
+     if (!threadExecutor.isShutdown()) {
+       threadExecutor.shutdownNow();
+     }
+   }
+   if (LOG.isDebugEnabled()) {
+     // The "{}" placeholder is required; without it the argument is dropped.
+     LOG.debug("Closing AbfsOutputStream {}", toString());
+   }
+ }
+
+ /**
+ * Synchronous flush path used by hsync(), hflush() and close(): fails fast
+ * if an error has been recorded, hands the current buffer to the upload
+ * path, then calls flushWrittenBytesToService to complete the flush.
+ *
+ * @param isClose passed through to flushWrittenBytesToService; close()
+ * passes true, hsync()/hflush() pass false
+ * @throws IOException if an error was already recorded or the upload/flush fails
+ */
+ private synchronized void flushInternal(boolean isClose) throws IOException {
+ maybeThrowLastError();
+ writeCurrentBufferToService();
+ flushWrittenBytesToService(isClose);
+ }
+
+ /**
+ * Flush path used by flush(): fails fast if an error has been recorded,
+ * hands the current buffer to the upload path, then calls
+ * flushWrittenBytesToServiceAsync, which does not wait on pending uploads.
+ *
+ * @throws IOException if an error was already recorded or the upload/flush fails
+ */
+ private synchronized void flushInternalAsync() throws IOException {
+ maybeThrowLastError();
+ writeCurrentBufferToService();
+ flushWrittenBytesToServiceAsync();
+ }
+
+ /**
+ * Submits the bytes accumulated in the current buffer as one append task
+ * and swaps in a fresh buffer from the pool. Returns immediately when the
+ * buffer is empty.
+ * The submitted task is tracked in writeOperations together with its
+ * starting offset and length; position is advanced by the submitted length
+ * before the task runs.
+ *
+ * @throws IOException if waiting for a free slot or shrinking the queue fails
+ */
+ private synchronized void writeCurrentBufferToService() throws IOException {
+ if (bufferIndex == 0) {
+ return;
+ }
+ outputStreamStatistics.writeCurrentBuffer();
+
+ // Capture the filled buffer and its length for the task, then replace the
+ // stream's buffer so further writes do not touch the bytes being uploaded.
+ final byte[] bytes = buffer;
+ final int bytesLength = bufferIndex;
+ outputStreamStatistics.bytesToUpload(bytesLength);
+ buffer = byteBufferPool.getBuffer(false, bufferSize).array();
+ bufferIndex = 0;
+ final long offset = position;
+ position += bytesLength;
+
+ // Back-pressure: when the executor queue holds at least twice
+ // maxConcurrentRequestCount tasks, wait for a task to complete first.
+ if (threadExecutor.getQueue().size() >= maxConcurrentRequestCount * 2) {
+ long start = System.currentTimeMillis();
+ waitForTaskToComplete();
+ outputStreamStatistics.timeSpentTaskWait(start, System.currentTimeMillis());
+ }
+
+ final Future job = completionService.submit(new Callable() {
+ @Override
+ public Void call() throws Exception {
+ AbfsPerfTracker tracker = client.getAbfsPerfTracker();
+ try (AbfsPerfInfo perfInfo = new AbfsPerfInfo(tracker,
+ "writeCurrentBufferToService", "append")) {
+ AbfsRestOperation op = client.append(path, offset, bytes, 0,
+ bytesLength, cachedSasToken.get());
+ cachedSasToken.update(op.getSasToken());
+ perfInfo.registerResult(op.getResult());
+ // NOTE(review): the buffer goes back to the pool only when append
+ // succeeds; on an exception this line is skipped - confirm intended.
+ byteBufferPool.putBuffer(ByteBuffer.wrap(bytes));
+ perfInfo.registerSuccess(true);
+ return null;
+ }
+ }
+ });
+
+ // NOTE(review): isCancelled() is checked right after submission, not after
+ // the append has run, so this does not reflect the actual upload outcome.
+ if (job.isCancelled()) {
+ outputStreamStatistics.uploadFailed(bytesLength);
+ } else {
+ outputStreamStatistics.uploadSuccessful(bytesLength);
+ }
+ writeOperations.add(new WriteOperation(job, offset, bytesLength));
+
+ // Try to shrink the queue
+ shrinkWriteOperationQueue();
+ }
+
+ /**
+  * Waits for every queued append to finish and then flushes the stream up
+  * to the current {@code position}.
+  * <p>
+  * The first append failure aborts the wait. A 404 from the service surfaces
+  * as {@link FileNotFoundException}; any other failure is recorded in
+  * {@code lastError}, so later calls fail fast, and is rethrown.
+  *
+  * @param isClose forwarded to the service flush request
+  * @throws FileNotFoundException if an append failed with HTTP 404
+  * @throws IOException if an append failed, the wait was interrupted, or
+  *     the flush itself fails
+  */
+ private synchronized void flushWrittenBytesToService(boolean isClose) throws IOException {
+   for (WriteOperation writeOperation : writeOperations) {
+     try {
+       writeOperation.task.get();
+     } catch (Exception ex) {
+       if (ex instanceof InterruptedException) {
+         // Future.get() clears the interrupt flag when it throws; set it
+         // again so code further up the stack can still see the interrupt.
+         Thread.currentThread().interrupt();
+       }
+
+       if (ex.getCause() instanceof AbfsRestOperationException) {
+         if (((AbfsRestOperationException) ex.getCause()).getStatusCode() == HttpURLConnection.HTTP_NOT_FOUND) {
+           throw new FileNotFoundException(ex.getMessage());
+         }
+       }
+
+       // Unwrap the ExecutionException when the cause is a filesystem error.
+       if (ex.getCause() instanceof AzureBlobFileSystemException) {
+         ex = (AzureBlobFileSystemException) ex.getCause();
+       }
+       lastError = new IOException(ex);
+       throw lastError;
+     }
+   }
+   flushWrittenBytesToServiceInternal(position, false, isClose);
+ }
+
+ /**
+  * Flushes without waiting on pending uploads: retires the appends that have
+  * already completed and, if that moved {@code lastTotalAppendOffset} past
+  * {@code lastFlushOffset}, flushes up to that offset while retaining
+  * uncommitted data.
+  *
+  * @throws IOException if a completed append failed or the flush fails
+  */
+ private synchronized void flushWrittenBytesToServiceAsync() throws IOException {
+   shrinkWriteOperationQueue();
+
+   final boolean nothingNewToFlush = this.lastTotalAppendOffset <= this.lastFlushOffset;
+   if (nothingNewToFlush) {
+     return;
+   }
+   this.flushWrittenBytesToServiceInternal(this.lastTotalAppendOffset, true,
+       false/*Async flush on close not permitted*/);
+ }
+
+ /**
+  * Issues the flush request to the service for the given offset and, on
+  * success, remembers that offset in {@code lastFlushOffset}.
+  *
+  * @param offset offset passed to the flush request
+  * @param retainUncommitedData forwarded to the flush request
+  * @param isClose forwarded to the flush request
+  * @throws FileNotFoundException if the service answers with HTTP 404
+  * @throws IOException wrapping any other filesystem exception
+  */
+ private synchronized void flushWrittenBytesToServiceInternal(final long offset,
+     final boolean retainUncommitedData, final boolean isClose) throws IOException {
+   final AbfsPerfTracker perfTracker = client.getAbfsPerfTracker();
+   try (AbfsPerfInfo perfInfo = new AbfsPerfInfo(perfTracker,
+       "flushWrittenBytesToServiceInternal", "flush")) {
+     final AbfsRestOperation flushOp = client.flush(path, offset,
+         retainUncommitedData, isClose, cachedSasToken.get());
+     cachedSasToken.update(flushOp.getSasToken());
+     perfInfo.registerResult(flushOp.getResult()).registerSuccess(true);
+   } catch (AzureBlobFileSystemException ex) {
+     final boolean notFound = ex instanceof AbfsRestOperationException
+         && ((AbfsRestOperationException) ex).getStatusCode() == HttpURLConnection.HTTP_NOT_FOUND;
+     if (notFound) {
+       throw new FileNotFoundException(ex.getMessage());
+     }
+     throw new IOException(ex);
+   }
+   this.lastFlushOffset = offset;
+ }
+
+ /**
+  * Retires finished write operations from the head of the FIFO queue.
+  * Stops at the first operation that is still running. Each retired
+  * operation adds its length to {@code lastTotalAppendOffset}.
+  *
+  * @throws IOException if a finished operation failed; the failure is also
+  *     stored in {@code lastError}
+  */
+ private synchronized void shrinkWriteOperationQueue() throws IOException {
+   try {
+     WriteOperation head = writeOperations.peek();
+     while (head != null && head.task.isDone()) {
+       // get() on a finished task surfaces any exception from the upload.
+       head.task.get();
+       lastTotalAppendOffset += head.length;
+       writeOperations.remove();
+       // Incrementing statistics to indicate queue has been shrunk.
+       outputStreamStatistics.queueShrunk();
+       head = writeOperations.peek();
+     }
+   } catch (Exception e) {
+     final Throwable cause = e.getCause();
+     if (cause instanceof AzureBlobFileSystemException) {
+       lastError = (AzureBlobFileSystemException) cause;
+     } else {
+       lastError = new IOException(e);
+     }
+     throw lastError;
+   }
+ }
+
+ /**
+  * Drains every already-completed task from the completion service. If none
+  * was available, blocks until one task completes.
+  *
+  * @throws IOException an {@link InterruptedIOException}, also recorded in
+  *     {@code lastError}, if the blocking wait is interrupted
+  */
+ private void waitForTaskToComplete() throws IOException {
+   boolean drainedAny = false;
+   while (completionService.poll() != null) {
+     // keep polling until there is no data
+     drainedAny = true;
+   }
+   if (drainedAny) {
+     return;
+   }
+
+   try {
+     completionService.take();
+   } catch (InterruptedException e) {
+     lastError = (IOException) new InterruptedIOException(e.toString()).initCause(e);
+     throw lastError;
+   }
+ }
+
+ /**
+  * Immutable record of one submitted append: the future tracking the upload
+  * plus the offset and length of the bytes it covers.
+  */
+ private static class WriteOperation {
+   private final Future<Void> task;
+   private final long startOffset;
+   private final long length;
+
+   /**
+    * @param task future of the submitted append; must not be null
+    * @param startOffset offset at which the bytes start; must be >= 0
+    * @param length number of bytes in the append; must be >= 0
+    */
+   WriteOperation(final Future<Void> task, final long startOffset, final long length) {
+     Preconditions.checkNotNull(task, "task");
+     Preconditions.checkArgument(startOffset >= 0, "startOffset");
+     Preconditions.checkArgument(length >= 0, "length");
+
+     this.task = task;
+     this.startOffset = startOffset;
+     this.length = length;
+   }
+ }
+
+ /**
+ * Test hook that delegates to waitForTaskToComplete() while holding the
+ * stream lock.
+ *
+ * @throws IOException if the wait is interrupted
+ */
+ @VisibleForTesting
+ public synchronized void waitForPendingUploads() throws IOException {
+ waitForTaskToComplete();
+ }
+
+ /**
+ * Getter method for AbfsOutputStream statistics.
+ * Returns the stream's own statistics object, not a copy.
+ *
+ * @return statistics for AbfsOutputStream.
+ */
+ @VisibleForTesting
+ public AbfsOutputStreamStatistics getOutputStreamStatistics() {
+ return outputStreamStatistics;
+ }
+
+ /**
+ * Getter to get the size of the write operation queue.
+ * Unlike most methods that touch writeOperations, this one is not
+ * synchronized.
+ *
+ * @return the number of writeOperations in AbfsOutputStream.
+ */
+ @VisibleForTesting
+ public int getWriteOperationsSize() {
+ return writeOperations.size();
+ }
+
+ /**
+  * Appends the AbfsOutputStream statistics to the base {@code toString()}.
+  * The result has the form
+  * {@code <super>AbfsOutputStream@(<hashCode>){<statistics>}}.
+  *
+  * @return String with AbfsOutputStream statistics.
+  */
+ @Override
+ public String toString() {
+   final StringBuilder sb = new StringBuilder(super.toString());
+   // Class name spelled correctly and the hash code wrapped in a balanced
+   // pair of parentheses.
+   sb.append("AbfsOutputStream@(").append(this.hashCode()).append("){");
+   sb.append(outputStreamStatistics.toString());
+   sb.append("}");
+   return sb.toString();
+ }
+}
\ No newline at end of file
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemFlush.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemFlush.java
index 60f7f7d23f02a..54988a51ea2ca 100644
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemFlush.java
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemFlush.java
@@ -24,6 +24,7 @@
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
@@ -31,6 +32,8 @@
import org.apache.hadoop.fs.StreamCapabilities;
import org.apache.hadoop.fs.azurebfs.services.AbfsOutputStream;
+import org.apache.hadoop.fs.azurebfs.services.AbfsOutputStreamOld;
+import org.assertj.core.api.Assertions;
import org.hamcrest.core.IsEqual;
import org.hamcrest.core.IsNot;
import org.junit.Test;
@@ -41,6 +44,9 @@
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import static org.hamcrest.core.Is.is;
+import static org.hamcrest.core.IsEqual.equalTo;
+
/**
* Test flush operation.
* This class cannot be run in parallel test mode--check comments in
@@ -53,7 +59,7 @@ public class ITestAzureBlobFileSystemFlush extends AbstractAbfsScaleTest {
private static final int ONE_MB = 1024 * 1024;
private static final int FLUSH_TIMES = 200;
private static final int THREAD_SLEEP_TIME = 1000;
-
+ private static final int CONCURRENT_STREAM_OBJS_TEST_OBJ_COUNT = 25;
private static final int TEST_FILE_LENGTH = 1024 * 1024 * 8;
private static final int WAITING_TIME = 1000;
@@ -207,6 +213,92 @@ public Void call() throws Exception {
assertEquals((long) TEST_BUFFER_SIZE * FLUSH_TIMES, fileStatus.getLen());
}
+ /**
+  * Verifies that the shouldUseOlderAbfsOutputStream setting selects the
+  * stream implementation wrapped by the FSDataOutputStream returned from
+  * create(): AbfsOutputStreamOld when true, AbfsOutputStream when false.
+  */
+ @Test
+ public void testShouldUseOlderAbfsOutputStreamConf() throws IOException {
+   AzureBlobFileSystem fs = getFileSystem();
+   Path testPath = new Path(methodName.getMethodName() + "1");
+   getFileSystem().getAbfsStore().getAbfsConfiguration()
+       .setShouldUseOlderAbfsOutputStream(true);
+   try (FSDataOutputStream stream = fs.create(testPath)) {
+     Assertions.assertThat(stream.getWrappedStream()).describedAs("When the "
+         + "shouldUseOlderAbfsOutputStream is set the wrapped stream inside "
+         + "the FSDataOutputStream object should be of class "
+         + "AbfsOutputStreamOld.").isInstanceOf(AbfsOutputStreamOld.class);
+   }
+   testPath = new Path(methodName.getMethodName());
+   getFileSystem().getAbfsStore().getAbfsConfiguration()
+       .setShouldUseOlderAbfsOutputStream(false);
+   try (FSDataOutputStream stream = fs.create(testPath)) {
+     // The flag is false here, so the failure description must say so.
+     Assertions.assertThat(stream.getWrappedStream()).describedAs("When the "
+         + "shouldUseOlderAbfsOutputStream is not set the wrapped stream "
+         + "inside the FSDataOutputStream object should be of class "
+         + "AbfsOutputStream.").isInstanceOf(AbfsOutputStream.class);
+   }
+ }
+
+ /**
+  * Opens CONCURRENT_STREAM_OBJS_TEST_OBJ_COUNT streams at once, writes a
+  * different amount of data to each one, and then checks the resulting
+  * file lengths.
+  */
+ @Test
+ public void testWriteWithMultipleOutputStreamAtTheSameTime()
+     throws IOException, InterruptedException, ExecutionException {
+   final int streamCount = CONCURRENT_STREAM_OBJS_TEST_OBJ_COUNT;
+   final AzureBlobFileSystem fs = getFileSystem();
+   final String pathPrefix = methodName.getMethodName();
+   final Path[] testPaths = new Path[streamCount];
+
+   createNStreamsAndWriteDifferentSizesConcurrently(fs, pathPrefix,
+       streamCount, testPaths);
+   assertSuccessfulWritesOnAllStreams(fs, streamCount, testPaths);
+ }
+
+ /**
+  * Asserts that the file at index {@code i} has a length of
+  * {@code (i + 1) * TEST_BUFFER_SIZE} bytes, i.e. one buffer per write made
+  * on that stream.
+  *
+  * @param fs filesystem to query
+  * @param numConcurrentObjects number of entries of {@code testPaths} to check
+  * @param testPaths paths of the files written by the concurrent streams
+  * @throws IOException if a file status cannot be retrieved
+  */
+ private void assertSuccessfulWritesOnAllStreams(final FileSystem fs,
+     final int numConcurrentObjects, final Path[] testPaths)
+     throws IOException {
+   for (int i = 0; i < numConcurrentObjects; i++) {
+     FileStatus fileStatus = fs.getFileStatus(testPaths[i]);
+     int numWritesMadeOnStream = i + 1;
+     // Widen before multiplying so the product cannot overflow int.
+     long expectedLength = (long) TEST_BUFFER_SIZE * numWritesMadeOnStream;
+     assertThat(fileStatus.getLen(), is(equalTo(expectedLength)));
+   }
+ }
+
+ /**
+  * Creates {@code numConcurrentObjects} files concurrently. The stream at
+  * index {@code i} receives {@code i + 1} writes of TEST_BUFFER_SIZE random
+  * bytes. Each created path is stored in {@code testPaths[i]}.
+  * <p>
+  * The per-stream tasks and the individual writes share one executor, so the
+  * pool must stay larger than {@code numConcurrentObjects}: each per-stream
+  * task blocks a thread while it waits for its writes.
+  *
+  * @param fs filesystem on which the files are created
+  * @param testFilePath prefix of every created path; the index is appended
+  * @param numConcurrentObjects number of streams to open
+  * @param testPaths output array receiving the created paths
+  * @throws ExecutionException if any stream task fails
+  * @throws InterruptedException if interrupted while waiting for the tasks
+  */
+ private void createNStreamsAndWriteDifferentSizesConcurrently(
+     final FileSystem fs, final String testFilePath,
+     final int numConcurrentObjects, final Path[] testPaths)
+     throws ExecutionException, InterruptedException {
+   final byte[] b = new byte[TEST_BUFFER_SIZE];
+   new Random().nextBytes(b);
+   final ExecutorService es = Executors.newFixedThreadPool(40);
+   try {
+     final List<Future<Void>> futureTasks = new ArrayList<>();
+     for (int i = 0; i < numConcurrentObjects; i++) {
+       Path testPath = new Path(testFilePath + i);
+       testPaths[i] = testPath;
+       int numWritesToBeDone = i + 1;
+       futureTasks.add(es.submit(() -> {
+         try (FSDataOutputStream stream = fs.create(testPath)) {
+           makeNWritesToStream(stream, numWritesToBeDone, b, es);
+         }
+         return null;
+       }));
+     }
+     for (Future<Void> futureTask : futureTasks) {
+       futureTask.get();
+     }
+   } finally {
+     // Stop the pool even when a task failed, so no threads are leaked.
+     es.shutdownNow();
+   }
+ }
+
+ /**
+  * Submits {@code numWrites} tasks to {@code es}, each writing the whole of
+  * {@code b} to {@code stream}, and waits for all of them to finish.
+  *
+  * @param stream stream shared by all write tasks
+  * @param numWrites number of write tasks to submit
+  * @param b bytes written by every task
+  * @param es executor running the write tasks
+  * @throws ExecutionException if any write fails
+  * @throws InterruptedException if interrupted while waiting for the writes
+  * @throws IOException declared for callers; not thrown directly here
+  */
+ private void makeNWritesToStream(final FSDataOutputStream stream,
+     final int numWrites, final byte[] b, final ExecutorService es)
+     throws ExecutionException, InterruptedException, IOException {
+   final List<Future<Void>> futureTasks = new ArrayList<>();
+   for (int i = 0; i < numWrites; i++) {
+     futureTasks.add(es.submit(() -> {
+       stream.write(b);
+       return null;
+     }));
+   }
+   for (Future<Void> futureTask : futureTasks) {
+     futureTask.get();
+   }
+ }
+
@Test
public void testFlushWithOutputStreamFlushEnabled() throws Exception {
testFlush(false);
@@ -236,10 +328,15 @@ private void testFlush(boolean disableOutputStreamFlush) throws Exception {
stream.write(buffer);
// Write asynchronously uploads data, so we must wait for completion
- AbfsOutputStream abfsStream = (AbfsOutputStream) stream
- .getWrappedStream();
- abfsStream.waitForPendingUploads();
-
+ if (stream.getWrappedStream() instanceof AbfsOutputStream) {
+ AbfsOutputStream abfsStream = (AbfsOutputStream) stream
+ .getWrappedStream();
+ abfsStream.waitForPendingUploads();
+ } else {
+ AbfsOutputStreamOld abfsStream = (AbfsOutputStreamOld) stream
+ .getWrappedStream();
+ abfsStream.waitForPendingUploads();
+ }
// Flush commits the data so it can be read.
stream.flush();
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsByteBufferPool.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsByteBufferPool.java
new file mode 100644
index 0000000000000..728ab7080312a
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsByteBufferPool.java
@@ -0,0 +1,299 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.fs.azurebfs.services;
+
+import java.lang.Thread.State;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.Executors;
+import java.util.concurrent.FutureTask;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+
+import net.jodah.concurrentunit.Waiter;
+import org.junit.Test;
+
+import static java.lang.Thread.sleep;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.hamcrest.CoreMatchers.notNullValue;
+import static org.hamcrest.Matchers.greaterThanOrEqualTo;
+import static org.hamcrest.Matchers.lessThanOrEqualTo;
+import static org.hamcrest.core.Is.is;
+
+import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.MAX_VALUE_MAX_AZURE_WRITE_MEM_USAGE_PERCENTAGE;
+import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.MIN_VALUE_MAX_AZURE_WRITE_MEM_USAGE_PERCENTAGE;
+import static org.apache.hadoop.test.LambdaTestUtils.intercept;
+
+/**
+ * Test class for AbfsByteBufferPool.
+ */
+public class TestAbfsByteBufferPool {
+
+ private static final int TWO_MB = 2 * 1024 * 1024;
+
+ private static final int MINUS_HUNDRED = -100;
+ private static final int HUNDRED_AND_ONE = 101;
+
+ private static final int MEM_USAGE_PC_20 = 20;
+ private static final int MEM_USAGE_PC_25 = 25;
+ private static final int MEM_USAGE_PC_30 = 30;
+ private static final int MEM_USAGE_PC_90 = 90;
+
+ private static final int SYNC_TEST_THREAD_CATEGORY_COUNT = 4;
+ private static final int SYNC_TEST_THREAD_COUNT_PER_CATEGORY = 4;
+ private static final int SYNC_TEST_VERIFICATION_PER_CATEGORY = 25;
+ private static final int SYNC_TEST_THREAD_POOL_SIZE = 50;
+ private static final int SYNC_TEST_MAX_FREE_BUFFER_COUNT = 4;
+ private static final long SYNC_TEST_WAITER_WAIT_TIME_6000_MS = 6000;
+ private static final long SYNC_TEST_VERIFIER_SLEEP_TIME_50_MS = 50;
+ private static final long SYNC_TEST_NON_VERIFIER_SLEEP_TIME_100_MS = 100;
+
+ private static final int BUFFER_SIZE_2 = 2;
+
+ private static final int QUEUE_CAPACITY_2 = 2;
+ private static final int QUEUE_CAPACITY_3 = 3;
+ private static final int QUEUE_CAPACITY_5 = 5;
+
+ private static final int MAX_FREE_BUFFERS_4 = 4;
+
+ private static final long SLEEP_TIME_5000_MS = 5000;
+
+ /**
+  * Constructing a pool with a max write memory usage percentage outside the
+  * allowed range must fail with an IllegalArgumentException naming the range.
+  */
+ @Test
+ public void testWithInvalidMaxWriteMemUsagePercentage() throws Exception {
+   List<Integer> invalidMaxWriteMemUsagePercentageList = Arrays
+       .asList(MIN_VALUE_MAX_AZURE_WRITE_MEM_USAGE_PERCENTAGE - 1,
+           MAX_VALUE_MAX_AZURE_WRITE_MEM_USAGE_PERCENTAGE + 1, MINUS_HUNDRED,
+           HUNDRED_AND_ONE);
+   for (int val : invalidMaxWriteMemUsagePercentageList) {
+     intercept(IllegalArgumentException.class, String
+         .format("maxWriteMemUsagePercentage should be in range (%s - %s)",
+             MIN_VALUE_MAX_AZURE_WRITE_MEM_USAGE_PERCENTAGE,
+             MAX_VALUE_MAX_AZURE_WRITE_MEM_USAGE_PERCENTAGE),
+         () -> new AbfsByteBufferPool(TWO_MB, QUEUE_CAPACITY_2, val));
+   }
+ }
+
+ /**
+  * Constructing a pool with a queue capacity below 1 must fail with an
+  * IllegalArgumentException.
+  */
+ @Test
+ public void testWithInvalidMaxFreeBuffers() throws Exception {
+   List<Integer> invalidMaxFreeBuffers = Arrays.asList(0, -1);
+   for (int val : invalidMaxFreeBuffers) {
+     // The expected text has no placeholders, so it is passed as a literal
+     // instead of going through String.format with unused arguments.
+     intercept(IllegalArgumentException.class,
+         "queueCapacity cannot be < 1",
+         () -> new AbfsByteBufferPool(TWO_MB, val, MEM_USAGE_PC_20));
+   }
+ }
+
+ /**
+  * Releasing {@code null} to the pool is expected to throw a
+  * NullPointerException.
+  */
+ @Test(expected = NullPointerException.class)
+ public void testReleaseNull() {
+   final AbfsByteBufferPool bufferPool =
+       new AbfsByteBufferPool(TWO_MB, QUEUE_CAPACITY_5, MEM_USAGE_PC_30);
+   bufferPool.release(null);
+ }
+
+ /**
+  * Releases twice as many buffers as the pool's capacity and checks after
+  * every release that the free-buffer collection never grows beyond that
+  * capacity.
+  */
+ @Test
+ public void testReleaseMoreThanPoolCapacity() {
+   final int poolCapacity = MAX_FREE_BUFFERS_4;
+   final AbfsByteBufferPool bufferPool = new AbfsByteBufferPool(TWO_MB,
+       MAX_FREE_BUFFERS_4, MEM_USAGE_PC_25);
+   final int releaseCount = poolCapacity * 2;
+   int released = 0;
+   while (released < releaseCount) {
+     bufferPool.release(new byte[TWO_MB]);
+     assertThat(bufferPool.getFreeBuffers()).describedAs(
+         "Pool size should never exceed the expected capacity irrespective "
+             + "of the number of objects released to the pool")
+         .hasSizeLessThanOrEqualTo(poolCapacity);
+     released++;
+   }
+ }
+
+ /**
+  * Releasing an array whose length equals the pool's buffer size must
+  * complete without throwing.
+  */
+ @Test
+ public void testReleaseWithSameBufferSize() {
+   final AbfsByteBufferPool bufferPool =
+       new AbfsByteBufferPool(BUFFER_SIZE_2, QUEUE_CAPACITY_3, MEM_USAGE_PC_25);
+   final byte[] sameSizedBuffer = new byte[BUFFER_SIZE_2];
+   bufferPool.release(sameSizedBuffer);
+ }
+
+ /**
+  * Releasing an array one byte larger or one byte smaller than the pool's
+  * buffer size must be rejected with an IllegalArgumentException.
+  */
+ @Test
+ public void testReleaseWithDifferentBufferSize() throws Exception {
+   final AbfsByteBufferPool bufferPool = new AbfsByteBufferPool(TWO_MB,
+       QUEUE_CAPACITY_3, MEM_USAGE_PC_25);
+   final String expectedMessage = String.format("Buffer size has to be %s", TWO_MB);
+   // The original loop ran exactly once, with an offset of 1.
+   final int sizeOffset = 1;
+
+   intercept(IllegalArgumentException.class, expectedMessage,
+       () -> bufferPool.release(new byte[TWO_MB + sizeOffset]));
+   intercept(IllegalArgumentException.class, expectedMessage,
+       () -> bufferPool.release(new byte[TWO_MB - sizeOffset]));
+ }
+
+ /**
+ * Exercises the blocking behaviour of get(): the pool hands out buffers
+ * until MAX_FREE_BUFFERS_4 + availableProcessors() have been taken, the
+ * next get() blocks, and a release() lets a get() succeed again.
+ * NOTE(review): relies on a fixed 5 second sleep before sampling the thread
+ * state, so the test is slow and timing-sensitive.
+ */
+ @Test
+ public void testGet() throws Exception {
+ int expectedMaxBuffersInUse =
+ MAX_FREE_BUFFERS_4 + Runtime.getRuntime().availableProcessors();
+ AbfsByteBufferPool pool = new AbfsByteBufferPool(TWO_MB, MAX_FREE_BUFFERS_4,
+ MEM_USAGE_PC_90);
+
+ // Getting the maximum number of byte arrays from the pool
+ byte[] byteBuffer = null;
+ for (int i = 0; i < expectedMaxBuffersInUse; i++) {
+ byteBuffer = pool.get();
+ assertThat(byteBuffer.length).describedAs("Pool has to return an object "
+ + "immediately, until maximum buffers are in use.").isEqualTo(TWO_MB);
+ }
+
+ // Already maximum number of buffers are retrieved from the pool so the
+ // next get call is going to be blocked
+ Thread getThread = new Thread(() -> pool.get());
+ getThread.start();
+ sleep(SLEEP_TIME_5000_MS);
+ assertThat(getThread.getState()).describedAs("When maximum number of "
+ + "buffers are in use and no free buffers available in the pool the "
+ + "get call is blocked until an object is released to the pool.")
+ .isEqualTo(State.WAITING);
+ // NOTE(review): assumes interrupting makes the blocked get() give up;
+ // otherwise it could consume the buffer released below - confirm.
+ getThread.interrupt();
+
+ // Releasing one byte array back to the pool post which the get call we
+ // are making should not be blocking
+ pool.release(byteBuffer);
+ byteBuffer = null;
+ byteBuffer = pool.get();
+ assertThat(byteBuffer.length).describedAs("Pool has to return an object "
+ + "immediately, if there are free buffers available in the pool.")
+ .isEqualTo(TWO_MB);
+
+ // Again trying to get one byte buffer from the pool which will be
+ // blocked as the pool has already dished out the maximum number of byte
+ // arrays. Then we release another byte array and the blocked get call
+ // gets unblocked.
+ Callable callable = () -> pool.get();
+ FutureTask futureTask = new FutureTask(callable);
+ getThread = new Thread(futureTask);
+ getThread.start();
+ // A freshly allocated array is released here, not one obtained from get().
+ pool.release(new byte[TWO_MB]);
+ byteBuffer = (byte[]) futureTask.get();
+ assertThat(byteBuffer.length).describedAs("The blocked get call unblocks "
+ + "when an object is released back to the pool.").isEqualTo(TWO_MB);
+ }
+
+ /**
+  * Runs getter, releaser and verifier tasks concurrently against one pool
+  * and checks, through the shared {@code Waiter}, that the pool's
+  * free-buffer and in-use counts stay within their limits throughout.
+  * Each loop iteration submits one getter, one releaser and two verifiers,
+  * so SYNC_TEST_THREAD_COUNT_PER_CATEGORY iterations produce
+  * {@code totalThreadCount} tasks, each of which resumes the waiter once.
+  */
+ @Test
+ public void testSynchronisation() throws Throwable {
+   final int expectedMaxBuffersInUse =
+       SYNC_TEST_MAX_FREE_BUFFER_COUNT + Runtime.getRuntime()
+           .availableProcessors();
+   final AbfsByteBufferPool pool = new AbfsByteBufferPool(TWO_MB,
+       SYNC_TEST_MAX_FREE_BUFFER_COUNT, MEM_USAGE_PC_90);
+
+   final int totalThreadCount =
+       SYNC_TEST_THREAD_CATEGORY_COUNT * SYNC_TEST_THREAD_COUNT_PER_CATEGORY;
+
+   final LinkedBlockingQueue<byte[]> objsQueue = new LinkedBlockingQueue<>();
+   final Waiter waiter = new Waiter();
+
+   // Creating 3 types of runnables
+   // getter: would make get calls
+   // releaser: would make release calls
+   // verifier: will be verifying the conditions
+   final Runnable getter = getNewGetterRunnable(pool, objsQueue, waiter,
+       expectedMaxBuffersInUse, SYNC_TEST_MAX_FREE_BUFFER_COUNT);
+   final Runnable releaser = getNewReleaserRunnable(pool, objsQueue, waiter,
+       expectedMaxBuffersInUse, SYNC_TEST_MAX_FREE_BUFFER_COUNT);
+   final Runnable verifier = getNewVerifierRunnable(pool,
+       expectedMaxBuffersInUse, SYNC_TEST_MAX_FREE_BUFFER_COUNT, waiter);
+
+   final ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors
+       .newFixedThreadPool(SYNC_TEST_THREAD_POOL_SIZE);
+   try {
+     for (int i = 0; i < SYNC_TEST_THREAD_COUNT_PER_CATEGORY; i++) {
+       // The executor supplies the threads; the runnables are submitted
+       // directly rather than wrapped in unused Thread objects.
+       executor.submit(getter);
+       executor.submit(releaser);
+       executor.submit(verifier);
+       executor.submit(verifier);
+     }
+     waiter.await(SYNC_TEST_WAITER_WAIT_TIME_6000_MS, totalThreadCount);
+   } finally {
+     // Always stop the pool, including when await() times out or an
+     // assertion fails, so blocked workers do not outlive the test.
+     executor.shutdownNow();
+   }
+ }
+
+ /**
+  * Builds a task that takes SYNC_TEST_VERIFICATION_PER_CATEGORY buffers from
+  * the pool, one per iteration, and puts each into {@code objsQueue}. The
+  * pool limits are verified before and after every get, and the waiter is
+  * resumed once when the loop ends.
+  *
+  * @param pool pool under test
+  * @param objsQueue queue receiving the buffers obtained from the pool
+  * @param waiter waiter used for assertions and completion signalling
+  * @param expectedMaxBuffersInUse upper bound for the pool's in-use count
+  * @param maxFreeBuffers upper bound for the pool's free-buffer count
+  * @return the getter task
+  */
+ private Runnable getNewGetterRunnable(final AbfsByteBufferPool pool,
+     final LinkedBlockingQueue<byte[]> objsQueue, final Waiter waiter,
+     final int expectedMaxBuffersInUse, final int maxFreeBuffers) {
+   Runnable runnable = () -> {
+     for (int i = 0; i < SYNC_TEST_VERIFICATION_PER_CATEGORY; i++) {
+       verify(pool, waiter, maxFreeBuffers, expectedMaxBuffersInUse);
+       objsQueue.offer(pool.get());
+       verify(pool, waiter, maxFreeBuffers, expectedMaxBuffersInUse);
+       try {
+         sleep(SYNC_TEST_NON_VERIFIER_SLEEP_TIME_100_MS);
+       } catch (InterruptedException e) {
+         Thread.currentThread().interrupt();
+       }
+     }
+     waiter.resume();
+   };
+   return runnable;
+ }
+
+ /**
+  * Builds a task that, SYNC_TEST_VERIFICATION_PER_CATEGORY times, takes a
+  * buffer from {@code objsQueue} (blocking until one is available) and
+  * releases it to the pool. The pool limits are verified before and after
+  * every release, and the waiter is resumed once when the loop ends.
+  *
+  * @param pool pool under test
+  * @param objsQueue queue supplying the buffers to release
+  * @param waiter waiter used for assertions and completion signalling
+  * @param expectedMaxBuffersInUse upper bound for the pool's in-use count
+  * @param maxFreeBuffers upper bound for the pool's free-buffer count
+  * @return the releaser task
+  */
+ private Runnable getNewReleaserRunnable(final AbfsByteBufferPool pool,
+     final LinkedBlockingQueue<byte[]> objsQueue, final Waiter waiter,
+     final int expectedMaxBuffersInUse, final int maxFreeBuffers) {
+   Runnable runnable = () -> {
+     for (int i = 0; i < SYNC_TEST_VERIFICATION_PER_CATEGORY; i++) {
+       try {
+         verify(pool, waiter, maxFreeBuffers, expectedMaxBuffersInUse);
+         // The typed queue makes take() return byte[], as release() needs.
+         pool.release(objsQueue.take());
+         verify(pool, waiter, maxFreeBuffers, expectedMaxBuffersInUse);
+         sleep(SYNC_TEST_NON_VERIFIER_SLEEP_TIME_100_MS);
+       } catch (InterruptedException e) {
+         Thread.currentThread().interrupt();
+       }
+     }
+     waiter.resume();
+   };
+   return runnable;
+ }
+
+ /**
+  * Builds a task that checks the pool limits
+  * SYNC_TEST_VERIFICATION_PER_CATEGORY times, sleeping between checks, and
+  * resumes the waiter once when it is done.
+  *
+  * @param pool pool under test
+  * @param expectedMaxBuffersInUse upper bound for the pool's in-use count
+  * @param maxFreeBuffers upper bound for the pool's free-buffer count
+  * @param waiter waiter used for assertions and completion signalling
+  * @return the verifier task
+  */
+ private Runnable getNewVerifierRunnable(final AbfsByteBufferPool pool,
+     final int expectedMaxBuffersInUse, final int maxFreeBuffers,
+     final Waiter waiter) {
+   return () -> {
+     int round = 0;
+     while (round < SYNC_TEST_VERIFICATION_PER_CATEGORY) {
+       verify(pool, waiter, maxFreeBuffers, expectedMaxBuffersInUse);
+       try {
+         sleep(SYNC_TEST_VERIFIER_SLEEP_TIME_50_MS);
+       } catch (InterruptedException e) {
+         Thread.currentThread().interrupt();
+       }
+       round++;
+     }
+     waiter.resume();
+   };
+ }
+
+ /**
+ * Asserts, through the waiter, that the pool is within its limits: the
+ * free-buffer collection is non-null and holds at most maxFreeBuffers
+ * entries, and the in-use count lies between 0 and expectedMaxBuffersInUse.
+ * Each assertion queries the pool again, so the values are sampled at
+ * slightly different moments when other threads are active.
+ *
+ * @param pool pool under test
+ * @param waiter waiter through which the assertions are made
+ * @param maxFreeBuffers upper bound for the free-buffer count
+ * @param expectedMaxBuffersInUse upper bound for the in-use count
+ */
+ private void verify(final AbfsByteBufferPool pool, final Waiter waiter,
+ final int maxFreeBuffers, final int expectedMaxBuffersInUse) {
+ waiter.assertThat(pool.getFreeBuffers(), is(notNullValue()));
+ waiter.assertThat(pool.getFreeBuffers().size(),
+ is(lessThanOrEqualTo(maxFreeBuffers)));
+ waiter.assertThat(pool.getBuffersInUse(),
+ is(lessThanOrEqualTo(expectedMaxBuffersInUse)));
+ waiter.assertThat(pool.getBuffersInUse(), is(greaterThanOrEqualTo(0)));
+ }
+
+}