Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,32 +18,32 @@
package org.apache.hadoop.hbase.master.cleaner;

import java.io.IOException;
import java.util.Collections;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.RecursiveTask;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathIsNotEmptyDirectoryException;
import org.apache.hadoop.hbase.ScheduledChore;
import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.FutureUtils;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
import org.apache.hbase.thirdparty.com.google.common.base.Predicate;
import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableSet;
import org.apache.hbase.thirdparty.com.google.common.collect.Iterables;
import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
Expand Down Expand Up @@ -211,11 +211,16 @@ private void preRunCleaner() {
cleanersChain.forEach(FileCleanerDelegate::preClean);
}

public Boolean runCleaner() {
public boolean runCleaner() {
preRunCleaner();
CleanerTask task = new CleanerTask(this.oldFileDir, true);
pool.execute(task);
return task.join();
try {
CompletableFuture<Boolean> future = new CompletableFuture<>();
pool.execute(() -> traverseAndDelete(oldFileDir, true, future));
return future.get();
} catch (Exception e) {
LOG.info("Failed to traverse and delete the dir: {}", oldFileDir, e);
return false;
}
}

/**
Expand Down Expand Up @@ -380,126 +385,97 @@ public boolean setEnabled(final boolean enabled) {
}

private interface Action<T> {
T act() throws IOException;
T act() throws Exception;
}

/**
* Attemps to clean up a directory, its subdirectories, and files. Return value is true if
* everything was deleted. false on partial / total failures.
* Attempts to clean up a directory(its subdirectories, and files) in a
* {@link java.util.concurrent.ThreadPoolExecutor} concurrently. We can get the final result by
* calling result.get().
*/
private final class CleanerTask extends RecursiveTask<Boolean> {

private static final long serialVersionUID = -5444212174088754172L;

private final Path dir;
private final boolean root;

CleanerTask(final FileStatus dir, final boolean root) {
this(dir.getPath(), root);
}

CleanerTask(final Path dir, final boolean root) {
this.dir = dir;
this.root = root;
}

@Override
protected Boolean compute() {
LOG.trace("Cleaning under {}", dir);
List<FileStatus> subDirs;
List<FileStatus> files;
try {
// if dir doesn't exist, we'll get null back for both of these
// which will fall through to succeeding.
subDirs = getFilteredStatus(FileStatus::isDirectory);
files = getFilteredStatus(FileStatus::isFile);
} catch (IOException ioe) {
LOG.warn("failed to get FileStatus for contents of '{}'", dir, ioe);
return false;
}

boolean allFilesDeleted = true;
if (!files.isEmpty()) {
allFilesDeleted = deleteAction(() -> checkAndDeleteFiles(files), "files");
}

boolean allSubdirsDeleted = true;
private void traverseAndDelete(Path dir, boolean root, CompletableFuture<Boolean> result) {
try {
// Step.1: List all files under the given directory.
List<FileStatus> allPaths = Arrays.asList(fs.listStatus(dir));
List<FileStatus> subDirs =
allPaths.stream().filter(FileStatus::isDirectory).collect(Collectors.toList());
List<FileStatus> files =
allPaths.stream().filter(FileStatus::isFile).collect(Collectors.toList());

// Step.2: Try to delete all the deletable files.
boolean allFilesDeleted =
files.isEmpty() || deleteAction(() -> checkAndDeleteFiles(files), "files", dir);

// Step.3: Start to traverse and delete the sub-directories.
List<CompletableFuture<Boolean>> futures = new ArrayList<>();
if (!subDirs.isEmpty()) {
List<CleanerTask> tasks = Lists.newArrayListWithCapacity(subDirs.size());
sortByConsumedSpace(subDirs);
for (FileStatus subdir : subDirs) {
CleanerTask task = new CleanerTask(subdir, false);
tasks.add(task);
task.fork();
}
allSubdirsDeleted = deleteAction(() -> getCleanResult(tasks), "subdirs");
// Submit the request of sub-directory deletion.
subDirs.forEach(subDir -> {
CompletableFuture<Boolean> subFuture = new CompletableFuture<>();
pool.execute(() -> traverseAndDelete(subDir.getPath(), false, subFuture));
futures.add(subFuture);
});
}

boolean result = allFilesDeleted && allSubdirsDeleted && isEmptyDirDeletable(dir);
// if and only if files and subdirs under current dir are deleted successfully, and the empty
// directory can be deleted, and it is not the root dir then task will try to delete it.
if (result && !root) {
result &= deleteAction(() -> fs.delete(dir, false), "dir");
}
return result;
}

/**
* Get FileStatus with filter.
* @param function a filter function
* @return filtered FileStatus or empty list if dir doesn't exist
* @throws IOException if there's an error other than dir not existing
*/
private List<FileStatus> getFilteredStatus(Predicate<FileStatus> function) throws IOException {
return Optional.ofNullable(FSUtils.listStatusWithStatusFilter(fs, dir,
status -> function.test(status))).orElseGet(Collections::emptyList);
}

/**
* Perform a delete on a specified type.
* @param deletion a delete
* @param type possible values are 'files', 'subdirs', 'dirs'
* @return true if it deleted successfully, false otherwise
*/
private boolean deleteAction(Action<Boolean> deletion, String type) {
boolean deleted;
try {
LOG.trace("Start deleting {} under {}", type, dir);
deleted = deletion.act();
} catch (PathIsNotEmptyDirectoryException exception) {
// N.B. HDFS throws this exception when we try to delete a non-empty directory, but
// LocalFileSystem throws a bare IOException. So some test code will get the verbose
// message below.
LOG.debug("Couldn't delete '{}' yet because it isn't empty. Probably transient. " +
"exception details at TRACE.", dir);
LOG.trace("Couldn't delete '{}' yet because it isn't empty w/exception.", dir, exception);
deleted = false;
} catch (IOException ioe) {
LOG.info("Could not delete {} under {}. might be transient; we'll retry. if it keeps " +
"happening, use following exception when asking on mailing list.",
type, dir, ioe);
deleted = false;
}
LOG.trace("Finish deleting {} under {}, deleted=", type, dir, deleted);
return deleted;
// Step.4: Once all sub-files & sub-directories are deleted, then can try to delete the
// current directory asynchronously.
FutureUtils.addListener(
CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()])),
(voidObj, e) -> {
if (e != null) {
result.completeExceptionally(e);
return;
}
try {
boolean allSubDirsDeleted = futures.stream().allMatch(CompletableFuture::join);
boolean deleted = allFilesDeleted && allSubDirsDeleted && isEmptyDirDeletable(dir);
if (deleted && !root) {
// If and only if files and sub-dirs under current dir are deleted successfully, and
// the empty directory can be deleted, and it is not the root dir then task will
// try to delete it.
deleted = deleteAction(() -> fs.delete(dir, false), "dir", dir);
}
result.complete(deleted);
} catch (Exception ie) {
// Must handle the inner exception here, otherwise the result may get stuck if one
// sub-directory get some failure.
result.completeExceptionally(ie);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think in the past we will return false instead of throwing the exception out?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Em, let me check whether it will impact the behavior ..

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've checked the logic, should have no difference here. Both the exeption from sub-directory & boolean flag will skip the deletion of current directory, should be OK.

}
});
} catch (Exception e) {
LOG.debug("Failed to traverse and delete the path: {}", dir, e);
result.completeExceptionally(e);
}
}

/**
* Get cleaner results of subdirs.
* @param tasks subdirs cleaner tasks
* @return true if all subdirs deleted successfully, false for patial/all failures
* @throws IOException something happen during computation
*/
private boolean getCleanResult(List<CleanerTask> tasks) throws IOException {
boolean cleaned = true;
try {
for (CleanerTask task : tasks) {
cleaned &= task.get();
}
} catch (InterruptedException | ExecutionException e) {
throw new IOException(e);
}
return cleaned;
/**
* Perform a delete on a specified type.
* @param deletion a delete
* @param type possible values are 'files', 'subdirs', 'dirs'
* @return true if it deleted successfully, false otherwise
*/
private boolean deleteAction(Action<Boolean> deletion, String type, Path dir) {
boolean deleted;
try {
LOG.trace("Start deleting {} under {}", type, dir);
deleted = deletion.act();
} catch (PathIsNotEmptyDirectoryException exception) {
// N.B. HDFS throws this exception when we try to delete a non-empty directory, but
// LocalFileSystem throws a bare IOException. So some test code will get the verbose
// message below.
LOG.debug("Couldn't delete '{}' yet because it isn't empty w/exception.", dir, exception);
deleted = false;
} catch (IOException ioe) {
LOG.info("Could not delete {} under {}. might be transient; we'll retry. if it keeps "
+ "happening, use following exception when asking on mailing list.",
type, dir, ioe);
deleted = false;
} catch (Exception e) {
LOG.info("unexpected exception: ", e);
deleted = false;
}
LOG.trace("Finish deleting {} under {}, deleted=", type, dir, deleted);
return deleted;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,12 @@
*/
package org.apache.hadoop.hbase.master.cleaner;

import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.DaemonThreadFactory;
import org.apache.hadoop.hbase.conf.ConfigurationObserver;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
Expand All @@ -32,7 +35,7 @@
public class DirScanPool implements ConfigurationObserver {
private static final Logger LOG = LoggerFactory.getLogger(DirScanPool.class);
private volatile int size;
private ForkJoinPool pool;
private final ThreadPoolExecutor pool;
private int cleanerLatch;
private boolean reconfigNotification;

Expand All @@ -42,11 +45,18 @@ public DirScanPool(Configuration conf) {
// poolSize may be 0 or 0.0 from a careless configuration,
// double check to make sure.
size = size == 0 ? CleanerChore.calculatePoolSize(CleanerChore.DEFAULT_CHORE_POOL_SIZE) : size;
pool = new ForkJoinPool(size);
pool = initializePool(size);
LOG.info("Cleaner pool size is {}", size);
cleanerLatch = 0;
}

private static ThreadPoolExecutor initializePool(int size) {
ThreadPoolExecutor executor = new ThreadPoolExecutor(size, size, 1, TimeUnit.MINUTES,
new LinkedBlockingQueue<>(), new DaemonThreadFactory("dir-scan-pool"));
executor.allowCoreThreadTimeOut(true);
return executor;
}

/**
* Checks if pool can be updated. If so, mark for update later.
* @param conf configuration
Expand All @@ -73,8 +83,8 @@ synchronized void latchCountDown() {
notifyAll();
}

synchronized void execute(ForkJoinTask<?> task) {
pool.execute(task);
synchronized void execute(Runnable runnable) {
pool.execute(runnable);
}

public synchronized void shutdownNow() {
Expand All @@ -99,9 +109,8 @@ synchronized void tryUpdatePoolSize(long timeout) {
break;
}
}
shutdownNow();
LOG.info("Update chore's pool size from {} to {}", pool.getParallelism(), size);
pool = new ForkJoinPool(size);
LOG.info("Update chore's pool size from {} to {}", pool.getPoolSize(), size);
pool.setCorePoolSize(size);
}

public int getSize() {
Expand Down