DefaultStoreFileManager.java
@@ -88,9 +88,9 @@ public Collection<HStoreFile> getCompactedfiles() {
}

@Override
- public void insertNewFiles(Collection<HStoreFile> sfs) throws IOException {
+ public void insertNewFiles(Collection<HStoreFile> sfs) {
this.storefiles =
ImmutableList.sortedCopyOf(storeFileComparator, Iterables.concat(this.storefiles, sfs));
}

@Override
@@ -132,11 +132,10 @@ public void addCompactionResults(Collection<HStoreFile> newCompactedfiles,
}

@Override
- public void removeCompactedFiles(Collection<HStoreFile> removedCompactedfiles)
- throws IOException {
+ public void removeCompactedFiles(Collection<HStoreFile> removedCompactedfiles) {
this.compactedfiles =
this.compactedfiles.stream().filter(sf -> !removedCompactedfiles.contains(sf))
.sorted(storeFileComparator).collect(ImmutableList.toImmutableList());
}

@Override
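This change drops throws IOException from methods that only republish an immutable in-memory list. Below is a minimal standalone sketch of that copy-on-write pattern, written against plain Guava (HBase itself uses its relocated org.apache.hbase.thirdparty Guava); the class and method names here are hypothetical, not HBase API:

import java.util.Collection;
import java.util.Comparator;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;

// Hypothetical sketch of the copy-on-write bookkeeping above: every mutation
// builds a fresh sorted ImmutableList and publishes it with a single
// reference write. No I/O happens, so no checked exception is needed.
final class CopyOnWriteFileList<T> {
  private final Comparator<T> comparator;
  private volatile ImmutableList<T> files = ImmutableList.of();

  CopyOnWriteFileList(Comparator<T> comparator) {
    this.comparator = comparator;
  }

  // Mirrors insertNewFiles: concatenate and re-sort into a new list.
  void insert(Collection<? extends T> newFiles) {
    this.files = ImmutableList.sortedCopyOf(comparator, Iterables.concat(this.files, newFiles));
  }

  // Mirrors removeCompactedFiles: filter into a new sorted list.
  void removeAll(Collection<? extends T> toRemove) {
    this.files = this.files.stream().filter(f -> !toRemove.contains(f))
      .sorted(comparator).collect(ImmutableList.toImmutableList());
  }

  // Readers always see a complete, consistent snapshot.
  ImmutableList<T> snapshot() {
    return files;
  }
}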
StoreFileManager.java
@@ -32,12 +32,15 @@
import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableCollection;

/**
- * Manages the store files and basic metadata about that that determines the logical structure
- * (e.g. what files to return for scan, how to determine split point, and such).
- * Does NOT affect the physical structure of files in HDFS.
- * Example alternative structures - the default list of files by seqNum; levelDB one sorted
- * by level and seqNum.
- *
+ * Manages the store files and basic metadata about them that determines the logical structure
+ * (e.g. what files to return for scan, how to determine split point, and such). Does NOT affect
+ * the physical structure of files in HDFS. Example alternative structures - the default list of
+ * files by seqNum; levelDB one sorted by level and seqNum.
+ * <p/>
+ * Notice that all the states are only in memory; we do not persist anything here. The only place
+ * where we throw an {@link IOException} is the {@link #getSplitPoint()} method, where we need to
+ * read startKey, endKey, etc., which may lead to an {@link IOException}.
+ * <p/>
* Implementations are assumed to be not thread safe.
*/
@InterfaceAudience.Private
@@ -52,22 +55,20 @@ public interface StoreFileManager {
* Adds new files, either from MemStore flush or bulk insert, into the structure.
* @param sfs New store files.
*/
- void insertNewFiles(Collection<HStoreFile> sfs) throws IOException;
+ void insertNewFiles(Collection<HStoreFile> sfs);

/**
* Adds only the new compaction results into the structure.
* @param compactedFiles The input files for the compaction.
* @param results The resulting files for the compaction.
*/
- void addCompactionResults(
- Collection<HStoreFile> compactedFiles, Collection<HStoreFile> results) throws IOException;
+ void addCompactionResults(Collection<HStoreFile> compactedFiles, Collection<HStoreFile> results);

/**
* Remove the compacted files
* @param compactedFiles the list of compacted files
- * @throws IOException
*/
- void removeCompactedFiles(Collection<HStoreFile> compactedFiles) throws IOException;
+ void removeCompactedFiles(Collection<HStoreFile> compactedFiles);

/**
* Clears all the files currently in use and returns them.
@@ -145,7 +146,6 @@ Iterator<HStoreFile> updateCandidateFilesForRowKeyBefore(Iterator<HStoreFile> ca
/**
* Gets the split point for the split of this set of store files (approx. middle).
* @return The mid-point if possible.
- * @throws IOException
*/
Optional<byte[]> getSplitPoint() throws IOException;

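With the interface contract narrowed above, the in-memory mutators no longer declare IOException, and only getSplitPoint(), which may read store file metadata, still does. A hypothetical caller sketch (the class and method names below are illustrative, not HBase API; imports for the HBase types StoreFileManager and HStoreFile are assumed):

import java.io.IOException;
import java.util.Collection;
import java.util.Optional;

// Hypothetical caller illustrating the narrowed contract.
final class StoreFileManagerClient {
  private final StoreFileManager manager;

  StoreFileManagerClient(StoreFileManager manager) {
    this.manager = manager;
  }

  // Pure bookkeeping: no try/catch needed any more.
  void onFlushCompleted(Collection<HStoreFile> flushedFiles) {
    manager.insertNewFiles(flushedFiles);
  }

  // The one remaining checked-exception path.
  Optional<byte[]> trySplitPoint() {
    try {
      return manager.getSplitPoint();
    } catch (IOException e) {
      // Could not read startKey/endKey from the files; treat as "no split point".
      return Optional.empty();
    }
  }
}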
StripeStoreFileManager.java
@@ -152,10 +152,9 @@ public int getCompactedFilesCount() {
}

@Override
- public void insertNewFiles(Collection<HStoreFile> sfs) throws IOException {
+ public void insertNewFiles(Collection<HStoreFile> sfs) {
CompactionOrFlushMergeCopy cmc = new CompactionOrFlushMergeCopy(true);
- // Passing null does not cause NPE??
- cmc.mergeResults(null, sfs);
+ cmc.mergeResults(Collections.emptyList(), sfs);
debugDumpState("Added new files");
}

@@ -321,11 +320,11 @@ public Collection<HStoreFile> getFilesForScan(byte[] startRow, boolean includeSt
}

@Override
- public void addCompactionResults(
- Collection<HStoreFile> compactedFiles, Collection<HStoreFile> results) throws IOException {
+ public void addCompactionResults(Collection<HStoreFile> compactedFiles,
+ Collection<HStoreFile> results) {
// See class comment for the assumptions we make here.
LOG.debug("Attempting to merge compaction results: " + compactedFiles.size()
+ " files replaced by " + results.size());
LOG.debug("Attempting to merge compaction results: " + compactedFiles.size() +
" files replaced by " + results.size());
// In order to be able to fail in the middle of the operation, we'll operate on lazy
// copies and apply the result at the end.
CompactionOrFlushMergeCopy cmc = new CompactionOrFlushMergeCopy(false);
@@ -345,7 +344,7 @@ private void markCompactedAway(Collection<HStoreFile> compactedFiles) {
}

@Override
- public void removeCompactedFiles(Collection<HStoreFile> compactedFiles) throws IOException {
+ public void removeCompactedFiles(Collection<HStoreFile> compactedFiles) {
// See class comment for the assumptions we make here.
LOG.debug("Attempting to delete compaction results: " + compactedFiles.size());
// In order to be able to fail in the middle of the operation, we'll operate on lazy
@@ -728,13 +727,15 @@ public CompactionOrFlushMergeCopy(boolean isFlush) {
this.isFlush = isFlush;
}

- private void mergeResults(Collection<HStoreFile> compactedFiles, Collection<HStoreFile> results)
- throws IOException {
+ private void mergeResults(Collection<HStoreFile> compactedFiles,
+ Collection<HStoreFile> results) {
assert this.compactedFiles == null && this.results == null;
this.compactedFiles = compactedFiles;
this.results = results;
// Do logical processing.
- if (!isFlush) removeCompactedFiles();
+ if (!isFlush) {
+ removeCompactedFiles();
+ }
TreeMap<byte[], HStoreFile> newStripes = processResults();
if (newStripes != null) {
processNewCandidateStripes(newStripes);
@@ -745,7 +746,7 @@ private void mergeResults(Collection<HStoreFile> compactedFiles, Collection<HSto
updateMetadataMaps();
}

- private void deleteResults(Collection<HStoreFile> compactedFiles) throws IOException {
+ private void deleteResults(Collection<HStoreFile> compactedFiles) {
this.compactedFiles = compactedFiles;
// Create new state and update parent.
State state = createNewState(true);
@@ -828,11 +829,11 @@ private final ArrayList<HStoreFile> getLevel0Copy() {
}

/**
- * Process new files, and add them either to the structure of existing stripes,
- * or to the list of new candidate stripes.
+ * Process new files, and add them either to the structure of existing stripes, or to the list
+ * of new candidate stripes.
* @return New candidate stripes.
*/
- private TreeMap<byte[], HStoreFile> processResults() throws IOException {
+ private TreeMap<byte[], HStoreFile> processResults() {
TreeMap<byte[], HStoreFile> newStripes = null;
for (HStoreFile sf : this.results) {
byte[] startRow = startOf(sf), endRow = endOf(sf);
@@ -859,8 +860,9 @@ private TreeMap<byte[], HStoreFile> processResults() throws IOException {
}
HStoreFile oldSf = newStripes.put(endRow, sf);
if (oldSf != null) {
throw new IOException("Compactor has produced multiple files for the stripe ending in ["
+ Bytes.toString(endRow) + "], found " + sf.getPath() + " and " + oldSf.getPath());
throw new IllegalStateException(
"Compactor has produced multiple files for the stripe ending in [" +
Bytes.toString(endRow) + "], found " + sf.getPath() + " and " + oldSf.getPath());
}
}
return newStripes;
@@ -869,7 +871,7 @@ private TreeMap<byte[], HStoreFile> processResults() throws IOException {
/**
* Remove compacted files.
*/
- private void removeCompactedFiles() throws IOException {
+ private void removeCompactedFiles() {
for (HStoreFile oldFile : this.compactedFiles) {
byte[] oldEndRow = endOf(oldFile);
List<HStoreFile> source = null;
@@ -878,13 +880,14 @@ private void removeCompactedFiles() throws IOException {
} else {
int stripeIndex = findStripeIndexByEndRow(oldEndRow);
if (stripeIndex < 0) {
throw new IOException("An allegedly compacted file [" + oldFile + "] does not belong"
+ " to a known stripe (end row - [" + Bytes.toString(oldEndRow) + "])");
throw new IllegalStateException(
"An allegedly compacted file [" + oldFile + "] does not belong" +
" to a known stripe (end row - [" + Bytes.toString(oldEndRow) + "])");
}
source = getStripeCopy(stripeIndex);
}
if (!source.remove(oldFile)) {
throw new IOException("An allegedly compacted file [" + oldFile + "] was not found");
LOG.warn("An allegedly compacted file [{}] was not found", oldFile);
}
}
}
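Note the behavioral change in this hunk: a compacted file that is already gone from its stripe no longer aborts the whole operation with an IOException; it is only logged. A tiny sketch of that tolerant removal, using SLF4J's parameterized logging as the patch does (the class and helper names are hypothetical):

import java.util.List;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// Hypothetical helper sketching the tolerant path: a missing file is
// reported with SLF4J's {} placeholder syntax instead of failing the merge.
final class TolerantRemoval {
  private static final Logger LOG = LoggerFactory.getLogger(TolerantRemoval.class);

  private TolerantRemoval() {
  }

  static <T> void removeOrWarn(List<T> source, T oldFile) {
    if (!source.remove(oldFile)) {
      LOG.warn("An allegedly compacted file [{}] was not found", oldFile);
    }
  }
}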
@@ -894,16 +897,15 @@ private void removeCompactedFiles() throws IOException {
* new candidate stripes/removes old stripes; produces new set of stripe end rows.
* @param newStripes New stripes - files by end row.
*/
- private void processNewCandidateStripes(
- TreeMap<byte[], HStoreFile> newStripes) throws IOException {
+ private void processNewCandidateStripes(TreeMap<byte[], HStoreFile> newStripes) {
// Validate that the removed and added aggregate ranges still make for a full key space.
boolean hasStripes = !this.stripeFiles.isEmpty();
this.stripeEndRows = new ArrayList<>(Arrays.asList(StripeStoreFileManager.this.state.stripeEndRows));
int removeFrom = 0;
byte[] firstStartRow = startOf(newStripes.firstEntry().getValue());
byte[] lastEndRow = newStripes.lastKey();
if (!hasStripes && (!isOpen(firstStartRow) || !isOpen(lastEndRow))) {
throw new IOException("Newly created stripes do not cover the entire key space.");
throw new IllegalStateException("Newly created stripes do not cover the entire key space.");
}

boolean canAddNewStripes = true;
@@ -915,11 +917,15 @@ private void processNewCandidateStripes(
removeFrom = 0;
} else {
removeFrom = findStripeIndexByEndRow(firstStartRow);
- if (removeFrom < 0) throw new IOException("Compaction is trying to add a bad range.");
+ if (removeFrom < 0) {
+ throw new IllegalStateException("Compaction is trying to add a bad range.");
+ }
++removeFrom;
}
int removeTo = findStripeIndexByEndRow(lastEndRow);
- if (removeTo < 0) throw new IOException("Compaction is trying to add a bad range.");
+ if (removeTo < 0) {
+ throw new IllegalStateException("Compaction is trying to add a bad range.");
+ }
// See if there are files in the stripes we are trying to replace.
ArrayList<HStoreFile> conflictingFiles = new ArrayList<>();
for (int removeIndex = removeTo; removeIndex >= removeFrom; --removeIndex) {
@@ -961,7 +967,9 @@ private void processNewCandidateStripes(
}
}

- if (!canAddNewStripes) return; // Files were already put into L0.
+ if (!canAddNewStripes) {
+ return; // Files were already put into L0.
+ }

// Now, insert new stripes. The total ranges match, so we can insert where we removed.
byte[] previousEndRow = null;
@@ -972,8 +980,8 @@ private void processNewCandidateStripes(
assert !isOpen(previousEndRow);
byte[] startRow = startOf(newStripe.getValue());
if (!rowEquals(previousEndRow, startRow)) {
throw new IOException("The new stripes produced by "
+ (isFlush ? "flush" : "compaction") + " are not contiguous");
throw new IllegalStateException("The new stripes produced by " +
(isFlush ? "flush" : "compaction") + " are not contiguous");
}
}
// Add the new stripe.
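The pattern across this file: violated stripe invariants (bad ranges, non-contiguous stripes, duplicate files per stripe) indicate a compactor bug rather than an environmental failure, so they now surface as unchecked IllegalStateException while genuine I/O keeps IOException. For illustration only, the same range checks could be phrased with Guava's Preconditions.checkState, which also throws IllegalStateException; this is a sketch of the design choice, not what the patch uses:

import com.google.common.base.Preconditions;

// Hypothetical restatement of the range validation above using
// Preconditions.checkState, which throws IllegalStateException on failure.
final class StripeRangeChecks {
  private StripeRangeChecks() {
  }

  static void checkRemovedRange(int removeFrom, int removeTo) {
    Preconditions.checkState(removeFrom >= 0, "Compaction is trying to add a bad range.");
    Preconditions.checkState(removeTo >= 0, "Compaction is trying to add a bad range.");
  }
}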
TestStripeStoreFileManager.java
@@ -21,8 +21,9 @@
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
+ import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertTrue;
- import static org.junit.Assert.fail;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
@@ -542,14 +543,10 @@ private void testPriorityScenario(int expectedPriority,
}

private void verifyInvalidCompactionScenario(StripeStoreFileManager manager,
ArrayList<HStoreFile> filesToCompact, ArrayList<HStoreFile> filesToInsert) throws Exception {
Collection<HStoreFile> allFiles = manager.getStorefiles();
- try {
- manager.addCompactionResults(filesToCompact, filesToInsert);
- fail("Should have thrown");
- } catch (IOException ex) {
- // Ignore it.
- }
+ assertThrows(IllegalStateException.class,
+ () -> manager.addCompactionResults(filesToCompact, filesToInsert));
verifyAllFiles(manager, allFiles); // must have the same files.
}

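The test replaces the try/fail/catch idiom with assertThrows, available since JUnit 4.13. A standalone sketch (hypothetical test class) showing that assertThrows also returns the caught exception, so its message can be verified:

import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertTrue;

import org.junit.Test;

// Hypothetical test demonstrating the assertThrows idiom adopted above.
public class AssertThrowsIdiomTest {
  @Test
  public void failsOnBadRange() {
    // assertThrows runs the lambda, asserts the exception type, and returns
    // the thrown exception for further inspection.
    IllegalStateException e = assertThrows(IllegalStateException.class, () -> {
      throw new IllegalStateException("Compaction is trying to add a bad range.");
    });
    assertTrue(e.getMessage().contains("bad range"));
  }
}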