diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java index 71da8f9bda96e..4f903e587fff7 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java @@ -375,6 +375,9 @@ public void close() throws IOException { if (abfsApacheHttpClient != null) { abfsApacheHttpClient.close(); } + if (intercept != null) { + IOUtils.cleanupWithLogger(LOG, intercept); + } if (tokenProvider instanceof Closeable) { IOUtils.cleanupWithLogger(LOG, (Closeable) tokenProvider); diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientThrottlingAnalyzer.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientThrottlingAnalyzer.java index f1eb3a2a77476..e830a758ddf09 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientThrottlingAnalyzer.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientThrottlingAnalyzer.java @@ -18,6 +18,9 @@ package org.apache.hadoop.fs.azurebfs.services; +import java.io.Closeable; +import java.io.IOException; + import java.util.Timer; import java.util.TimerTask; import java.util.concurrent.atomic.AtomicBoolean; @@ -34,7 +37,7 @@ import static org.apache.hadoop.util.Time.now; -class AbfsClientThrottlingAnalyzer { +class AbfsClientThrottlingAnalyzer implements Closeable { private static final Logger LOG = LoggerFactory.getLogger( AbfsClientThrottlingAnalyzer.class); private static final int MIN_ANALYSIS_PERIOD_MS = 1000; @@ -172,6 +175,21 @@ public boolean suspendIfNecessary() { return false; } + /** + * Closes the throttling analyzer and releases associated resources. + * This method cancels the internal timer and cleans up any pending timer tasks. + * It is safe to call this method multiple times. + * @throws IOException if an I/O error occurs during cleanup + */ +@Override +public void close() throws IOException { + if (timer != null) { + timer.cancel(); + timer.purge(); + timer = null; + } +} + @VisibleForTesting int getSleepDuration() { return sleepDuration; diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientThrottlingIntercept.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientThrottlingIntercept.java index 39aaf34db0d57..da39231f55dd3 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientThrottlingIntercept.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientThrottlingIntercept.java @@ -18,6 +18,7 @@ package org.apache.hadoop.fs.azurebfs.services; +import java.io.IOException; import java.net.HttpURLConnection; import java.util.concurrent.locks.ReentrantLock; @@ -223,4 +224,20 @@ private static long getContentLengthIfKnown(String range) { } return contentLength; } + + /** + * Closes the throttling intercept and releases associated resources. + * This method closes both the read and write throttling analyzers. + * + * @throws IOException if an I/O error occurs during cleanup + */ + @Override + public void close() throws IOException { + if (readThrottler != null) { + readThrottler.close(); + } + if (writeThrottler != null) { + writeThrottler.close(); + } + } } diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsNoOpThrottlingIntercept.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsNoOpThrottlingIntercept.java index 58e50592997dc..ef6c74cef0d3d 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsNoOpThrottlingIntercept.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsNoOpThrottlingIntercept.java @@ -18,6 +18,8 @@ package org.apache.hadoop.fs.azurebfs.services; +import java.io.IOException; + /** * Implementation of {@link AbfsThrottlingIntercept} that does not throttle * the ABFS process. @@ -40,4 +42,14 @@ public void updateMetrics(final AbfsRestOperationType operationType, public void sendingRequest(final AbfsRestOperationType operationType, final AbfsCounters abfsCounters) { } + + /** + * No-op implementation of close method. + * + * @throws IOException if an I/O error occurs during cleanup + */ + @Override + public void close() throws IOException { + // No resources to clean up in no-op implementation + } } diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsThrottlingIntercept.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsThrottlingIntercept.java index 725377714642b..5d516a41b7acc 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsThrottlingIntercept.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsThrottlingIntercept.java @@ -18,6 +18,8 @@ package org.apache.hadoop.fs.azurebfs.services; +import java.io.Closeable; +import java.io.IOException; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; @@ -26,7 +28,7 @@ */ @InterfaceAudience.Private @InterfaceStability.Unstable -public interface AbfsThrottlingIntercept { +public interface AbfsThrottlingIntercept extends Closeable { /** * Updates the metrics for successful and failed read and write operations. diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsClientThrottlingAnalyzer.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsClientThrottlingAnalyzer.java index 69dc0a607cbf2..69e6a587935d4 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsClientThrottlingAnalyzer.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsClientThrottlingAnalyzer.java @@ -20,6 +20,10 @@ import java.io.IOException; +import java.lang.management.ManagementFactory; +import java.lang.management.ThreadInfo; +import java.lang.management.ThreadMXBean; + import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.azurebfs.AbfsConfiguration; import org.apache.hadoop.fs.contract.ContractTestUtils; @@ -180,4 +184,103 @@ public void testManySuccessAndErrorsAndWaiting() { sleep(10 * ANALYSIS_PERIOD); validate(0, analyzer.getSleepDuration()); } + + /** + * Test that timer thread is properly cleaned up when analyzer is closed. + * This validates the fix for HADOOP-19624. + */ + @Test + public void testAnalyzerTimerCleanup() throws Exception { + int initialTimerThreads = countAbfsTimerThreads(); + + // Create analyzer - should create one timer thread + AbfsClientThrottlingAnalyzer analyzer = + new AbfsClientThrottlingAnalyzer("test-cleanup", abfsConfiguration); + + // Verify timer thread was created + assertEquals(initialTimerThreads + 1, countAbfsTimerThreads(), + "Timer thread should be created"); + + // Close analyzer - should clean up timer thread + analyzer.close(); + + // Wait for cleanup to complete + sleep(1000); + + // Verify timer thread was cleaned up + assertEquals(initialTimerThreads, countAbfsTimerThreads(), + "Timer thread should be cleaned up after close"); + } + + /** + * Test that close() is idempotent and can be called multiple times. + */ + @Test + public void testAnalyzerCloseIdempotent() throws Exception { + AbfsClientThrottlingAnalyzer analyzer = + new AbfsClientThrottlingAnalyzer("test-idempotent", abfsConfiguration); + + int beforeClose = countAbfsTimerThreads(); + + // Close multiple times - should not throw exceptions + analyzer.close(); + analyzer.close(); + analyzer.close(); + + sleep(500); + + // Should only clean up once + assertTrue(countAbfsTimerThreads() < beforeClose, + "Multiple close() calls should be safe"); + } + + /** + * Test cleanup with multiple analyzers to ensure no interference. + */ + @Test + public void testMultipleAnalyzersCleanup() throws Exception { + int initialTimerThreads = countAbfsTimerThreads(); + + // Create multiple analyzers + AbfsClientThrottlingAnalyzer analyzer1 = + new AbfsClientThrottlingAnalyzer("test-multi-1", abfsConfiguration); + AbfsClientThrottlingAnalyzer analyzer2 = + new AbfsClientThrottlingAnalyzer("test-multi-2", abfsConfiguration); + AbfsClientThrottlingAnalyzer analyzer3 = + new AbfsClientThrottlingAnalyzer("test-multi-3", abfsConfiguration); + + // Should have created 3 timer threads + assertEquals(initialTimerThreads + 3, countAbfsTimerThreads(), + "Should create 3 timer threads"); + + // Close all analyzers + analyzer1.close(); + analyzer2.close(); + analyzer3.close(); + + sleep(1000); + + // All timer threads should be cleaned up + assertEquals(initialTimerThreads, countAbfsTimerThreads(), + "All timer threads should be cleaned up"); + } + + /** + * Helper method to count ABFS timer threads. + */ + private int countAbfsTimerThreads() { + java.lang.management.ThreadMXBean threadBean = + java.lang.management.ManagementFactory.getThreadMXBean(); + long[] threadIds = threadBean.getAllThreadIds(); + + int count = 0; + for (long id : threadIds) { + java.lang.management.ThreadInfo info = threadBean.getThreadInfo(id); + if (info != null && + info.getThreadName().contains("abfs-timer-client-throttling-analyzer")) { + count++; + } + } + return count; + } } \ No newline at end of file