MergeTableRegionsProcedure.java
@@ -23,10 +23,15 @@
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.stream.Stream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.MetaMutationAnnotation;
import org.apache.hadoop.hbase.MetaTableAccessor;
import org.apache.hadoop.hbase.ServerName;
@@ -53,18 +58,23 @@
import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer;
import org.apache.hadoop.hbase.quotas.QuotaExceededException;
import org.apache.hadoop.hbase.regionserver.HRegionFileSystem;
import org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hadoop.hbase.regionserver.HStoreFile;
import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
import org.apache.hadoop.hbase.regionserver.StoreUtils;
import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTracker;
import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTrackerFactory;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.ThreadUtil;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.wal.WALSplitUtil;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;

import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionInfoResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos;
@@ -573,15 +583,35 @@ private void createMergedRegion(final MasterProcedureEnv env) throws IOException
final MasterFileSystem mfs = env.getMasterServices().getMasterFileSystem();
final Path tableDir = CommonFSUtils.getTableDir(mfs.getRootDir(), regionsToMerge[0].getTable());
final FileSystem fs = mfs.getFileSystem();
HRegionFileSystem mergeRegionFs = HRegionFileSystem
.createRegionOnFileSystem(env.getMasterConfiguration(), fs, tableDir, mergedRegion);

Configuration conf = env.getMasterConfiguration();
int numOfThreads = conf.getInt(HConstants.REGION_SPLIT_THREADS_MAX,
conf.getInt(HStore.BLOCKING_STOREFILES_KEY, HStore.DEFAULT_BLOCKING_STOREFILE_COUNT));
List<Path> mergedFiles = new ArrayList<Path>();
final ExecutorService threadPool = Executors.newFixedThreadPool(numOfThreads,
new ThreadFactoryBuilder().setNameFormat("StoreFileMerge-pool-%d").setDaemon(true)
.setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build());
final List<Future<Path>> futures = new ArrayList<Future<Path>>();
for (RegionInfo ri : this.regionsToMerge) {
HRegionFileSystem regionFs = HRegionFileSystem
.openRegionFromFileSystem(env.getMasterConfiguration(), fs, tableDir, ri, false);
mergeStoreFiles(env, regionFs, mergeRegionFs, mergedRegion, threadPool, futures);
}
// Shutdown the pool
threadPool.shutdown();

// Wait for all the tasks to finish.
// When splits ran on the RegionServer, how-long-to-wait-configuration was named
// hbase.regionserver.fileSplitTimeout. If set, use its value.
long fileSplitTimeout = conf.getLong("hbase.master.fileSplitTimeout",
conf.getLong("hbase.regionserver.fileSplitTimeout", 600000));
ThreadUtil.waitOnShutdown(threadPool, fileSplitTimeout,
"Took too long to merge the files and create the references, aborting merge");

List<Path> paths = ThreadUtil.getAllResults(futures);
mergedFiles.addAll(paths);
assert mergeRegionFs != null;
mergeRegionFs.commitMergedRegion(mergedFiles, env);

@@ -590,11 +620,11 @@ private void createMergedRegion(final MasterProcedureEnv env) throws IOException
.setState(State.MERGING_NEW);
}

private void mergeStoreFiles(MasterProcedureEnv env, HRegionFileSystem regionFs,
HRegionFileSystem mergeRegionFs, RegionInfo mergedRegion, ExecutorService threadPool,
List<Future<Path>> futures) throws IOException {
final TableDescriptor htd =
env.getMasterServices().getTableDescriptors().get(mergedRegion.getTable());
for (ColumnFamilyDescriptor hcd : htd.getColumnFamilies()) {
String family = hcd.getNameAsString();
StoreFileTracker tracker =
@@ -611,13 +641,17 @@ private List<Path> mergeStoreFiles(MasterProcedureEnv env, HRegionFileSystem reg
// is running in a regionserver's Store context, or we might not be able
// to read the hfiles.
storeFileInfo.setConf(storeConfiguration);
futures.add(threadPool.submit(new Callable<Path>() {
@Override
public Path call() throws Exception {
// Write the reference file for this store file under the merged region.
return mergeRegionFs.mergeStoreFile(regionFs.getRegionInfo(), family,
new HStoreFile(storeFileInfo, hcd.getBloomFilterType(), CacheConfig.DISABLED));
}
}));
}
}
}
}

/**
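Taken together, the merge-side change follows the same submit/shutdown/await/collect pattern the split path already used: one Callable per store file goes to a bounded pool (sized from HConstants.REGION_SPLIT_THREADS_MAX, falling back to the blocking-store-files count), the pool is shut down, the procedure waits with a timeout, and the reference-file paths are collected from the futures. A minimal, self-contained sketch of that pattern in plain JDK terms — the class name, task count, and timeout here are invented for illustration, not part of the patch:

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

public class SubmitAwaitCollectSketch {
  public static void main(String[] args) throws Exception {
    ExecutorService pool = Executors.newFixedThreadPool(4);
    List<Future<String>> futures = new ArrayList<>();
    for (int i = 0; i < 8; i++) {
      final int task = i;
      // One Callable per unit of work, as the patch submits one per store file.
      futures.add(pool.submit(new Callable<String>() {
        @Override
        public String call() {
          return "ref-" + task;
        }
      }));
    }
    pool.shutdown(); // stop accepting work; queued tasks still run
    if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
      pool.shutdownNow();
      throw new IOException("Took too long, aborting");
    }
    List<String> results = new ArrayList<>();
    for (Future<String> f : futures) {
      results.add(f.get()); // a failed task surfaces here as ExecutionException
    }
    System.out.println(results);
  }
}

Shutting the pool down before awaiting is what lets awaitTermination double as a completion barrier; without shutdown() the wait would always run out the timeout.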
SplitTableRegionProcedure.java
@@ -18,7 +18,6 @@
package org.apache.hadoop.hbase.master.assignment;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
@@ -27,11 +26,9 @@
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.stream.Stream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
@@ -72,6 +69,7 @@
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.ThreadUtil;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.wal.WALSplitUtil;
import org.apache.hadoop.util.ReflectionUtils;
@@ -740,37 +738,18 @@ private Pair<List<Path>, List<Path>> splitStoreFiles(final MasterProcedureEnv en
// hbase.regionserver.fileSplitTimeout. If set, use its value.
long fileSplitTimeout = conf.getLong("hbase.master.fileSplitTimeout",
conf.getLong("hbase.regionserver.fileSplitTimeout", 600000));

ThreadUtil.waitOnShutdown(threadPool, fileSplitTimeout,
"Took too long to split the files and create the references, aborting split");
List<Path> daughterA = new ArrayList<>();
List<Path> daughterB = new ArrayList<>();
// Look for any exception
List<Pair<Path, Path>> paths = ThreadUtil.getAllResults(futures);
for (Pair<Path, Path> p : paths) {
if (p.getFirst() != null) {
daughterA.add(p.getFirst());
}
if (p.getSecond() != null) {
daughterB.add(p.getSecond());
}
}

ThreadUtil.java
@@ -0,0 +1,82 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.util;

import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.apache.yetus.audience.InterfaceAudience;

@InterfaceAudience.Public
public class ThreadUtil {

/**
* Waits if necessary for each computation to complete, then retrieves the results of all the
* given future objects, skipping any null results.
* @param futures the future objects whose results will be retrieved
* @return the list of computed results
* @throws IOException if a task failed, or if the current thread was interrupted while waiting
*/
public static <T> List<T> getAllResults(List<Future<T>> futures) throws IOException {
List<T> results = new ArrayList<T>();
for (Future<T> future : futures) {
try {
T t = future.get();
if (t != null) {
results.add(t);
}
} catch (InterruptedException e) {
throw (InterruptedIOException) new InterruptedIOException().initCause(e);
} catch (ExecutionException e) {
throw new IOException(e);
}
}
return results;
}

/**
* Blocks until all tasks have completed execution after a shutdown request, or the timeout
* occurs, or the current thread is interrupted, whichever happens first. If the timeout
* elapses first, the pool is forcibly shut down and an IOException is thrown.
* @param threadPool the executor service to wait on; {@code shutdown()} should already have
* been requested by the caller
* @param timeoutInMillis the maximum time to wait, in milliseconds
* @param errMessage the message of the IOException thrown when the timeout elapses
* @throws IOException if the timeout elapsed before termination, or if interrupted while
* waiting
*/
public static void waitOnShutdown(ExecutorService threadPool, long timeoutInMillis,
String errMessage) throws IOException {
try {
boolean stillRunning = !threadPool.awaitTermination(timeoutInMillis, TimeUnit.MILLISECONDS);
if (stillRunning) {
threadPool.shutdownNow();
// wait for the thread to shutdown completely.
while (!threadPool.isTerminated()) {
Thread.sleep(50);
}
throw new IOException(errMessage);
}
} catch (InterruptedException e) {
throw (InterruptedIOException) new InterruptedIOException().initCause(e);
}
}

}
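The helpers intentionally wrap InterruptedException as InterruptedIOException and ExecutionException as IOException, so procedure code stays on its existing IOException paths. A hedged usage sketch — the class and values are invented, and it assumes only the behavior shown above (shutdown() is requested before waitOnShutdown, and getAllResults skips null results):

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import org.apache.hadoop.hbase.util.ThreadUtil;

public class ThreadUtilExample {
  public static void main(String[] args) throws IOException {
    ExecutorService pool = Executors.newFixedThreadPool(2);
    List<Future<Integer>> futures = new ArrayList<>();
    for (int i = 0; i < 5; i++) {
      final int n = i;
      futures.add(pool.submit(() -> n * n)); // Callable<Integer>
    }
    // Request shutdown first; waitOnShutdown only awaits termination.
    pool.shutdown();
    ThreadUtil.waitOnShutdown(pool, 10_000, "tasks took too long");
    // Task failures are rethrown here wrapped in IOException.
    List<Integer> squares = ThreadUtil.getAllResults(futures);
    System.out.println(squares); // [0, 1, 4, 9, 16]
  }
}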