Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -375,6 +375,9 @@ public void close() throws IOException {
if (abfsApacheHttpClient != null) {
abfsApacheHttpClient.close();
}
if (intercept != null) {
IOUtils.cleanupWithLogger(LOG, intercept);
}
if (tokenProvider instanceof Closeable) {
IOUtils.cleanupWithLogger(LOG,
(Closeable) tokenProvider);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@

package org.apache.hadoop.fs.azurebfs.services;

import java.io.Closeable;
import java.io.IOException;

import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.atomic.AtomicBoolean;
Expand All @@ -34,7 +37,7 @@

import static org.apache.hadoop.util.Time.now;

class AbfsClientThrottlingAnalyzer {
class AbfsClientThrottlingAnalyzer implements Closeable {
private static final Logger LOG = LoggerFactory.getLogger(
AbfsClientThrottlingAnalyzer.class);
private static final int MIN_ANALYSIS_PERIOD_MS = 1000;
Expand Down Expand Up @@ -172,6 +175,21 @@ public boolean suspendIfNecessary() {
return false;
}

/**
* Closes the throttling analyzer and releases associated resources.
* This method cancels the internal timer and cleans up any pending timer tasks.
* It is safe to call this method multiple times.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fix javadoc here a well.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated!

* @throws IOException if an I/O error occurs during cleanup
*/
@Override
public void close() throws IOException {
if (timer != null) {
timer.cancel();
timer.purge();
timer = null;
}
}

@VisibleForTesting
int getSleepDuration() {
return sleepDuration;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.hadoop.fs.azurebfs.services;

import java.io.IOException;
import java.net.HttpURLConnection;
import java.util.concurrent.locks.ReentrantLock;

Expand Down Expand Up @@ -223,4 +224,20 @@ private static long getContentLengthIfKnown(String range) {
}
return contentLength;
}

/**
* Closes the throttling intercept and releases associated resources.
* This method closes both the read and write throttling analyzers.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: Javadoc to include @ throws

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated!

*
* @throws IOException if an I/O error occurs during cleanup
*/
@Override
public void close() throws IOException {
if (readThrottler != null) {
readThrottler.close();
}
if (writeThrottler != null) {
writeThrottler.close();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@

package org.apache.hadoop.fs.azurebfs.services;

import java.io.IOException;

/**
* Implementation of {@link AbfsThrottlingIntercept} that does not throttle
* the ABFS process.
Expand All @@ -40,4 +42,14 @@ public void updateMetrics(final AbfsRestOperationType operationType,
public void sendingRequest(final AbfsRestOperationType operationType,
final AbfsCounters abfsCounters) {
}

/**
* No-op implementation of close method.
*
* @throws IOException if an I/O error occurs during cleanup
*/
@Override
public void close() throws IOException {
// No resources to clean up in no-op implementation
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@

package org.apache.hadoop.fs.azurebfs.services;

import java.io.Closeable;
import java.io.IOException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;

Expand All @@ -26,7 +28,7 @@
*/
@InterfaceAudience.Private
@InterfaceStability.Unstable
public interface AbfsThrottlingIntercept {
public interface AbfsThrottlingIntercept extends Closeable {

/**
* Updates the metrics for successful and failed read and write operations.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,10 @@

import java.io.IOException;

import java.lang.management.ManagementFactory;
import java.lang.management.ThreadInfo;
import java.lang.management.ThreadMXBean;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.azurebfs.AbfsConfiguration;
import org.apache.hadoop.fs.contract.ContractTestUtils;
Expand Down Expand Up @@ -180,4 +184,103 @@ public void testManySuccessAndErrorsAndWaiting() {
sleep(10 * ANALYSIS_PERIOD);
validate(0, analyzer.getSleepDuration());
}

/**
* Test that timer thread is properly cleaned up when analyzer is closed.
* This validates the fix for HADOOP-19624.
*/
@Test
public void testAnalyzerTimerCleanup() throws Exception {
int initialTimerThreads = countAbfsTimerThreads();

// Create analyzer - should create one timer thread
AbfsClientThrottlingAnalyzer analyzer =
new AbfsClientThrottlingAnalyzer("test-cleanup", abfsConfiguration);

// Verify timer thread was created
assertEquals(initialTimerThreads + 1, countAbfsTimerThreads(),
"Timer thread should be created");

// Close analyzer - should clean up timer thread
analyzer.close();

// Wait for cleanup to complete
sleep(1000);

// Verify timer thread was cleaned up
assertEquals(initialTimerThreads, countAbfsTimerThreads(),
"Timer thread should be cleaned up after close");
}

/**
* Test that close() is idempotent and can be called multiple times.
*/
@Test
public void testAnalyzerCloseIdempotent() throws Exception {
AbfsClientThrottlingAnalyzer analyzer =
new AbfsClientThrottlingAnalyzer("test-idempotent", abfsConfiguration);

int beforeClose = countAbfsTimerThreads();

// Close multiple times - should not throw exceptions
analyzer.close();
analyzer.close();
analyzer.close();

sleep(500);

// Should only clean up once
assertTrue(countAbfsTimerThreads() < beforeClose,
"Multiple close() calls should be safe");
}

/**
* Test cleanup with multiple analyzers to ensure no interference.
*/
@Test
public void testMultipleAnalyzersCleanup() throws Exception {
int initialTimerThreads = countAbfsTimerThreads();

// Create multiple analyzers
AbfsClientThrottlingAnalyzer analyzer1 =
new AbfsClientThrottlingAnalyzer("test-multi-1", abfsConfiguration);
AbfsClientThrottlingAnalyzer analyzer2 =
new AbfsClientThrottlingAnalyzer("test-multi-2", abfsConfiguration);
AbfsClientThrottlingAnalyzer analyzer3 =
new AbfsClientThrottlingAnalyzer("test-multi-3", abfsConfiguration);

// Should have created 3 timer threads
assertEquals(initialTimerThreads + 3, countAbfsTimerThreads(),
"Should create 3 timer threads");

// Close all analyzers
analyzer1.close();
analyzer2.close();
analyzer3.close();

sleep(1000);

// All timer threads should be cleaned up
assertEquals(initialTimerThreads, countAbfsTimerThreads(),
"All timer threads should be cleaned up");
}

/**
* Helper method to count ABFS timer threads.
*/
private int countAbfsTimerThreads() {
java.lang.management.ThreadMXBean threadBean =
java.lang.management.ManagementFactory.getThreadMXBean();
long[] threadIds = threadBean.getAllThreadIds();

int count = 0;
for (long id : threadIds) {
java.lang.management.ThreadInfo info = threadBean.getThreadInfo(id);
if (info != null &&
info.getThreadName().contains("abfs-timer-client-throttling-analyzer")) {
count++;
}
}
return count;
}
}