HStore.java
@@ -1,5 +1,4 @@
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -53,7 +52,6 @@
import java.util.function.ToLongFunction;
import java.util.stream.Collectors;
import java.util.stream.LongStream;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -89,7 +87,6 @@
import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequestImpl;
import org.apache.hadoop.hbase.regionserver.compactions.DefaultCompactor;
import org.apache.hadoop.hbase.regionserver.compactions.OffPeakHours;
import org.apache.hadoop.hbase.regionserver.querymatcher.ScanQueryMatcher;
import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
@@ -1747,64 +1744,6 @@ public void replayCompactionMarker(CompactionDescriptor compaction, boolean pick
}
}

/**
* This method tries to compact N recent files for testing.
* Note that because compacting "recent" files only makes sense for some policies,
* e.g. the default one, it assumes default policy is used. It doesn't use policy,
* but instead makes a compaction candidate list by itself.
* @param N Number of files.
*/
public void compactRecentForTestingAssumingDefaultPolicy(int N) throws IOException {
List<HStoreFile> filesToCompact;
boolean isMajor;

this.lock.readLock().lock();
try {
synchronized (filesCompacting) {
filesToCompact = Lists.newArrayList(storeEngine.getStoreFileManager().getStorefiles());
if (!filesCompacting.isEmpty()) {
// exclude all files older than the newest file we're currently
// compacting. this allows us to preserve contiguity (HBASE-2856)
HStoreFile last = filesCompacting.get(filesCompacting.size() - 1);
int idx = filesToCompact.indexOf(last);
Preconditions.checkArgument(idx != -1);
filesToCompact.subList(0, idx + 1).clear();
}
int count = filesToCompact.size();
if (N > count) {
throw new RuntimeException("Not enough files");
}

filesToCompact = filesToCompact.subList(count - N, count);
isMajor = (filesToCompact.size() == storeEngine.getStoreFileManager().getStorefileCount());
filesCompacting.addAll(filesToCompact);
Collections.sort(filesCompacting, storeEngine.getStoreFileManager()
.getStoreFileComparator());
}
} finally {
this.lock.readLock().unlock();
}

try {
// Ready to go. Have list of files to compact.
List<Path> newFiles = ((DefaultCompactor)this.storeEngine.getCompactor())
.compactForTesting(filesToCompact, isMajor);
for (Path newFile: newFiles) {
// Move the compaction into place.
HStoreFile sf = moveFileIntoPlace(newFile);
if (this.getCoprocessorHost() != null) {
this.getCoprocessorHost().postCompact(this, sf, null, null, null);
}
replaceStoreFiles(filesToCompact, Collections.singletonList(sf));
refreshStoreSizeAndTotalBytes();
}
} finally {
synchronized (filesCompacting) {
filesCompacting.removeAll(filesToCompact);
}
}
}

@Override
public boolean hasReferences() {
// Grab the read lock here, because we need to ensure that: only when the atomic
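Note (reviewer sketch, not part of the diff): with compactRecentForTestingAssumingDefaultPolicy removed, a test that still wants to compact only the N most recent files has to build the request itself. The fragment below is a minimal sketch assuming it runs in a test class in the org.apache.hadoop.hbase.regionserver package (so the package-private storeEngine field is visible) with an HStore named store and an int N in scope; it deliberately omits the locking, filesCompacting bookkeeping, and file-replacement steps the removed method performed.

// Illustrative only: pick the N newest store files and hand them straight to the
// default compactor, roughly what the removed helper did for tests. Assumes the
// usual imports (Lists, CompactionRequestImpl, DefaultCompactor,
// NoLimitThroughputController) shown elsewhere in this diff.
List<HStoreFile> all =
    Lists.newArrayList(store.storeEngine.getStoreFileManager().getStorefiles());
List<HStoreFile> recent = all.subList(Math.max(0, all.size() - N), all.size());
boolean isMajor = recent.size() == all.size();
CompactionRequestImpl request = new CompactionRequestImpl(recent);
request.setIsMajor(isMajor, isMajor);
List<Path> newFiles = ((DefaultCompactor) store.storeEngine.getCompactor())
    .compact(request, NoLimitThroughputController.INSTANCE, null);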
DefaultCompactor.java
@@ -18,21 +18,18 @@
package org.apache.hadoop.hbase.regionserver.compactions;

import java.io.IOException;
import java.util.Collection;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hadoop.hbase.regionserver.HStoreFile;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.regionserver.StoreFileWriter;
import org.apache.hadoop.hbase.regionserver.throttle.NoLimitThroughputController;
import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
import org.apache.hadoop.hbase.security.User;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.collect.Lists;

/**
@@ -65,23 +62,6 @@ public List<Path> compact(final CompactionRequestImpl request,
return compact(request, defaultScannerFactory, writerFactory, throughputController, user);
}

/**
* Compact a list of files for testing. Creates a fake {@link CompactionRequestImpl} to pass to
* {@link #compact(CompactionRequestImpl, ThroughputController, User)};
* @param filesToCompact the files to compact. These are used as the compactionSelection for the
* generated {@link CompactionRequestImpl}.
* @param isMajor true to major compact (prune all deletes, max versions, etc)
* @return Product of compaction or an empty list if all cells expired or deleted and nothing \
* made it through the compaction.
* @throws IOException
*/
public List<Path> compactForTesting(Collection<HStoreFile> filesToCompact, boolean isMajor)
throws IOException {
CompactionRequestImpl cr = new CompactionRequestImpl(filesToCompact);
cr.setIsMajor(isMajor, isMajor);
return compact(cr, NoLimitThroughputController.INSTANCE, null);
}

@Override
protected List<Path> commitWriter(StoreFileWriter writer, FileDetails fd,
CompactionRequestImpl request) throws IOException {
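Note (reviewer sketch, not part of the diff): the removed compactForTesting was only a thin wrapper that built a fake CompactionRequestImpl and called the regular compact method with NoLimitThroughputController and no user. Callers now inline those two steps, as the TestCompaction change further down does; a minimal sketch, assuming a DefaultCompactor named compactor plus filesToCompact and isMajor already in scope:

// Build the request the removed wrapper used to create, then compact directly.
CompactionRequestImpl cr = new CompactionRequestImpl(filesToCompact);
cr.setIsMajor(isMajor, isMajor);
List<Path> newFiles = compactor.compact(cr, NoLimitThroughputController.INSTANCE, null);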
TestScannerSelectionUsingTTL.java
@@ -37,7 +37,6 @@
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.testclassification.IOTests;
import org.apache.hadoop.hbase.testclassification.LargeTests;
@@ -82,25 +81,18 @@ public class TestScannerSelectionUsingTTL {

public final int numFreshFiles, totalNumFiles;

/** Whether we are specifying the exact files to compact */
private final boolean explicitCompaction;

@Parameters
public static Collection<Object[]> parameters() {
List<Object[]> params = new ArrayList<>();
for (int numFreshFiles = 1; numFreshFiles <= 3; ++numFreshFiles) {
for (boolean explicitCompaction : new boolean[] { false, true }) {
params.add(new Object[] { numFreshFiles, explicitCompaction });
}
params.add(new Object[] { numFreshFiles });
}
return params;
}

public TestScannerSelectionUsingTTL(int numFreshFiles,
boolean explicitCompaction) {
public TestScannerSelectionUsingTTL(int numFreshFiles) {
this.numFreshFiles = numFreshFiles;
this.totalNumFiles = numFreshFiles + NUM_EXPIRED_FILES;
this.explicitCompaction = explicitCompaction;
}

@Test
@@ -152,13 +144,7 @@ public void testScannerSelection() throws IOException {
Set<String> accessedFiles = cache.getCachedFileNamesForTest();
LOG.debug("Files accessed during scan: " + accessedFiles);

// Exercise both compaction codepaths.
if (explicitCompaction) {
HStore store = region.getStore(FAMILY_BYTES);
store.compactRecentForTestingAssumingDefaultPolicy(totalNumFiles);
} else {
region.compact(false);
}
region.compact(false);

HBaseTestingUtility.closeRegionAndWAL(region);
}
TestCompaction.java
@@ -85,6 +85,8 @@
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Test compaction framework and common functions
@@ -94,9 +96,12 @@ public class TestCompaction {

@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestCompaction.class);
HBaseClassTestRule.forClass(TestCompaction.class);

@Rule public TestName name = new TestName();
private static final Logger LOG = LoggerFactory.getLogger(TestCompaction.class);

@Rule
public TestName name = new TestName();
private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
protected Configuration conf = UTIL.getConfiguration();

@@ -154,7 +159,6 @@ public void tearDown() throws Exception {
/**
* Verify that you can stop a long-running compaction
* (used during RS shutdown)
* @throws Exception
*/
@Test
public void testInterruptCompactionBySize() throws Exception {
@@ -180,7 +184,7 @@ public void testInterruptCompactionBySize() throws Exception {
}

HRegion spyR = spy(r);
doAnswer(new Answer() {
doAnswer(new Answer<Object>() {
@Override
public Object answer(InvocationOnMock invocation) throws Throwable {
r.writestate.writesEnabled = false;
@@ -256,7 +260,7 @@ public void testInterruptCompactionByTime() throws Exception {
}

HRegion spyR = spy(r);
doAnswer(new Answer() {
doAnswer(new Answer<Object>() {
@Override
public Object answer(InvocationOnMock invocation) throws Throwable {
r.writestate.writesEnabled = false;
@@ -311,15 +315,14 @@ public Object answer(InvocationOnMock invocation) throws Throwable {

private int count() throws IOException {
int count = 0;
for (HStoreFile f: this.r.stores.
get(COLUMN_FAMILY_TEXT).getStorefiles()) {
for (HStoreFile f : this.r.stores.get(COLUMN_FAMILY_TEXT).getStorefiles()) {
HFileScanner scanner = f.getReader().getScanner(false, false);
if (!scanner.seekTo()) {
continue;
}
do {
count++;
} while(scanner.next());
} while (scanner.next());
}
return count;
}
@@ -344,7 +347,8 @@ public void testCompactionWithCorruptResult() throws Exception {

Collection<HStoreFile> storeFiles = store.getStorefiles();
DefaultCompactor tool = (DefaultCompactor)store.storeEngine.getCompactor();
tool.compactForTesting(storeFiles, false);
CompactionRequestImpl request = new CompactionRequestImpl(storeFiles);
tool.compact(request, NoLimitThroughputController.INSTANCE, null);

// Now lets corrupt the compacted file.
FileSystem fs = store.getFileSystem();
@@ -363,7 +367,7 @@
// in the 'tmp' directory;
assertTrue(fs.exists(origPath));
assertFalse(fs.exists(dstPath));
System.out.println("testCompactionWithCorruptResult Passed");
LOG.info("testCompactionWithCorruptResult Passed");
return;
}
fail("testCompactionWithCorruptResult failed since no exception was" +
@@ -418,28 +422,27 @@ public void testCompactionFailure() throws Exception {
Mockito.when(mockRegion.checkSplit()).
thenThrow(new RuntimeException("Thrown intentionally by test!"));

MetricsRegionWrapper metricsWrapper = new MetricsRegionWrapperImpl(r);
try (MetricsRegionWrapperImpl metricsWrapper = new MetricsRegionWrapperImpl(r)) {

long preCompletedCount = metricsWrapper.getNumCompactionsCompleted();
long preFailedCount = metricsWrapper.getNumCompactionsFailed();
long preCompletedCount = metricsWrapper.getNumCompactionsCompleted();
long preFailedCount = metricsWrapper.getNumCompactionsFailed();

CountDownLatch latch = new CountDownLatch(1);
Tracker tracker = new Tracker(latch);
thread.requestCompaction(mockRegion, store, "test custom comapction", PRIORITY_USER,
tracker, null);
// wait for the latch to complete.
latch.await(120, TimeUnit.SECONDS);

// compaction should have completed and been marked as failed due to error in split request
long postCompletedCount = metricsWrapper.getNumCompactionsCompleted();
long postFailedCount = metricsWrapper.getNumCompactionsFailed();

assertTrue("Completed count should have increased (pre=" + preCompletedCount +
", post="+postCompletedCount+")",
postCompletedCount > preCompletedCount);
assertTrue("Failed count should have increased (pre=" + preFailedCount +
", post=" + postFailedCount + ")",
postFailedCount > preFailedCount);
CountDownLatch latch = new CountDownLatch(1);
Tracker tracker = new Tracker(latch);
thread.requestCompaction(mockRegion, store, "test custom comapction", PRIORITY_USER, tracker,
null);
// wait for the latch to complete.
latch.await(120, TimeUnit.SECONDS);

// compaction should have completed and been marked as failed due to error in split request
long postCompletedCount = metricsWrapper.getNumCompactionsCompleted();
long postFailedCount = metricsWrapper.getNumCompactionsFailed();

assertTrue("Completed count should have increased (pre=" + preCompletedCount + ", post=" +
postCompletedCount + ")", postCompletedCount > preCompletedCount);
assertTrue("Failed count should have increased (pre=" + preFailedCount + ", post=" +
postFailedCount + ")", postFailedCount > preFailedCount);
}
}

/**