Merged
@@ -463,7 +463,7 @@ public static String createFromHFileLink(final Configuration conf, final FileSys
* Create the back reference name
*/
// package-private for testing
static String createBackReferenceName(final String tableNameStr, final String regionName) {
public static String createBackReferenceName(final String tableNameStr, final String regionName) {

return regionName + "." + tableNameStr.replace(TableName.NAMESPACE_DELIM, '=');
}

@@ -195,7 +195,7 @@ public static Reference convert(final FSProtos.Reference r) {
* delimiter, pb reads to EOF which may not be what you want).
* @return This instance serialized as a delimited protobuf w/ a magic pb prefix.
*/
byte[] toByteArray() throws IOException {
public byte[] toByteArray() throws IOException {
return ProtobufUtil.prependPBMagic(convert().toByteArray());
}


@@ -627,7 +627,7 @@ private List<Path> mergeStoreFiles(MasterProcedureEnv env, HRegionFileSystem reg
// to read the hfiles.
storeFileInfo.setConf(storeConfiguration);
Path refFile = mergeRegionFs.mergeStoreFile(regionFs.getRegionInfo(), family,
new HStoreFile(storeFileInfo, hcd.getBloomFilterType(), CacheConfig.DISABLED));
new HStoreFile(storeFileInfo, hcd.getBloomFilterType(), CacheConfig.DISABLED), tracker);
Contributor:

I guess in the future we would be moving mergeStoreFile to the SFT interface itself?

Contributor Author:

Yes, in the future, if we have split/merge in the SFT layer as well, then based on the impl (FILE, DEFAULT) we can commit all the ref/link files at once for FSFT-based implementations.

mergedFiles.add(refFile);
}
}
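To make the direction sketched in this thread concrete, a hypothetical SFT-level hook could look roughly like the sketch below; every name in it is an assumption, and nothing here is part of this PR.

import java.io.IOException;
import java.util.List;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.client.RegionInfo;

// Hypothetical sketch only: a tracker-level hook that would let a file-based (FSFT)
// implementation record every ref/link file produced by a split or merge in one commit.
public interface MergeSplitFileCommitter {
  void commitMergedOrSplitFiles(RegionInfo target, String family, List<Path> refOrLinkFiles)
    throws IOException;
}

A file-based tracker could then persist the whole list in a single meta-file update instead of handling one ref/link file at a time.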

@@ -681,8 +681,9 @@ private Pair<List<Path>, List<Path>> splitStoreFiles(final MasterProcedureEnv en
// table dir. In case of failure, the proc would go through this again, already existing
// region dirs and split files would just be ignored, new split files should get created.
int nbFiles = 0;
final Map<String, Collection<StoreFileInfo>> files =
new HashMap<String, Collection<StoreFileInfo>>(htd.getColumnFamilyCount());
final Map<String, Pair<Collection<StoreFileInfo>, StoreFileTracker>> files =
new HashMap<String, Pair<Collection<StoreFileInfo>, StoreFileTracker>>(
htd.getColumnFamilyCount());
for (ColumnFamilyDescriptor cfd : htd.getColumnFamilies()) {
String family = cfd.getNameAsString();
StoreFileTracker tracker =
@@ -705,7 +706,7 @@ private Pair<List<Path>, List<Path>> splitStoreFiles(final MasterProcedureEnv en
}
if (filteredSfis == null) {
filteredSfis = new ArrayList<StoreFileInfo>(sfis.size());
files.put(family, filteredSfis);
files.put(family, new Pair(filteredSfis, tracker));
}
filteredSfis.add(sfi);
nbFiles++;
@@ -728,10 +729,12 @@ private Pair<List<Path>, List<Path>> splitStoreFiles(final MasterProcedureEnv en
final List<Future<Pair<Path, Path>>> futures = new ArrayList<Future<Pair<Path, Path>>>(nbFiles);

// Split each store file.
for (Map.Entry<String, Collection<StoreFileInfo>> e : files.entrySet()) {
for (Map.Entry<String, Pair<Collection<StoreFileInfo>, StoreFileTracker>> e : files
.entrySet()) {
byte[] familyName = Bytes.toBytes(e.getKey());
final ColumnFamilyDescriptor hcd = htd.getColumnFamily(familyName);
final Collection<StoreFileInfo> storeFiles = e.getValue();
Pair<Collection<StoreFileInfo>, StoreFileTracker> storeFilesAndTracker = e.getValue();
final Collection<StoreFileInfo> storeFiles = storeFilesAndTracker.getFirst();
if (storeFiles != null && storeFiles.size() > 0) {
final Configuration storeConfiguration =
StoreUtils.createStoreConfiguration(env.getMasterConfiguration(), htd, hcd);
Expand All @@ -742,8 +745,9 @@ private Pair<List<Path>, List<Path>> splitStoreFiles(final MasterProcedureEnv en
// is running in a regionserver's Store context, or we might not be able
// to read the hfiles.
storeFileInfo.setConf(storeConfiguration);
StoreFileSplitter sfs = new StoreFileSplitter(regionFs, familyName,
new HStoreFile(storeFileInfo, hcd.getBloomFilterType(), CacheConfig.DISABLED));
StoreFileSplitter sfs =
Contributor:

Nit: Assign the value of .getSecond() to a local variable of type StoreFileTracker and use that variable instead so it is easier to follow what is going on here.

new StoreFileSplitter(regionFs, storeFilesAndTracker.getSecond(), familyName,
new HStoreFile(storeFileInfo, hcd.getBloomFilterType(), CacheConfig.DISABLED));
futures.add(threadPool.submit(sfs));
}
}
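The nit above would translate to roughly the following sketch (illustrative only, not the code that was merged):

              // Suggested readability tweak: name the tracker instead of calling getSecond() inline.
              StoreFileTracker tracker = storeFilesAndTracker.getSecond();
              StoreFileSplitter sfs = new StoreFileSplitter(regionFs, tracker, familyName,
                new HStoreFile(storeFileInfo, hcd.getBloomFilterType(), CacheConfig.DISABLED));
              futures.add(threadPool.submit(sfs));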
@@ -809,19 +813,19 @@ private void assertSplitResultFilesCount(final FileSystem fs,
}
}

private Pair<Path, Path> splitStoreFile(HRegionFileSystem regionFs, byte[] family, HStoreFile sf)
throws IOException {
private Pair<Path, Path> splitStoreFile(HRegionFileSystem regionFs, StoreFileTracker tracker,
byte[] family, HStoreFile sf) throws IOException {
if (LOG.isDebugEnabled()) {
LOG.debug("pid=" + getProcId() + " splitting started for store file: " + sf.getPath()
+ " for region: " + getParentRegion().getShortNameToLog());
}

final byte[] splitRow = getSplitRow();
final String familyName = Bytes.toString(family);
final Path path_first =
regionFs.splitStoreFile(this.daughterOneRI, familyName, sf, splitRow, false, splitPolicy);
final Path path_second =
regionFs.splitStoreFile(this.daughterTwoRI, familyName, sf, splitRow, true, splitPolicy);
final Path path_first = regionFs.splitStoreFile(this.daughterOneRI, familyName, sf, splitRow,
false, splitPolicy, tracker);
final Path path_second = regionFs.splitStoreFile(this.daughterTwoRI, familyName, sf, splitRow,
true, splitPolicy, tracker);
if (LOG.isDebugEnabled()) {
LOG.debug("pid=" + getProcId() + " splitting complete for store file: " + sf.getPath()
+ " for region: " + getParentRegion().getShortNameToLog());
Expand All @@ -837,22 +841,25 @@ private class StoreFileSplitter implements Callable<Pair<Path, Path>> {
private final HRegionFileSystem regionFs;
private final byte[] family;
private final HStoreFile sf;
private final StoreFileTracker tracker;

/**
* Constructor that takes what it needs to split
* @param regionFs the file system
* @param family Family that contains the store file
* @param sf which file
*/
public StoreFileSplitter(HRegionFileSystem regionFs, byte[] family, HStoreFile sf) {
public StoreFileSplitter(HRegionFileSystem regionFs, StoreFileTracker tracker, byte[] family,
HStoreFile sf) {
this.regionFs = regionFs;
this.sf = sf;
this.family = family;
this.tracker = tracker;
}

@Override
public Pair<Path, Path> call() throws IOException {
return splitStoreFile(regionFs, family, sf);
return splitStoreFile(regionFs, tracker, family, sf);
}
}


@@ -35,6 +35,8 @@
import org.apache.hadoop.hbase.MetaTableAccessor;
import org.apache.hadoop.hbase.ScheduledChore;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Get;
@@ -50,6 +52,8 @@
import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
import org.apache.hadoop.hbase.regionserver.HRegionFileSystem;
import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTracker;
import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTrackerFactory;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.Pair;
@@ -422,7 +426,16 @@ private static Pair<Boolean, Boolean> checkRegionReferences(MasterServices servi
try {
HRegionFileSystem regionFs = HRegionFileSystem
.openRegionFromFileSystem(services.getConfiguration(), fs, tabledir, region, true);
boolean references = regionFs.hasReferences(tableDescriptor);
ColumnFamilyDescriptor[] families = tableDescriptor.getColumnFamilies();
boolean references = false;
for (ColumnFamilyDescriptor cfd : families) {
StoreFileTracker sft = StoreFileTrackerFactory.create(services.getConfiguration(),
tableDescriptor, ColumnFamilyDescriptorBuilder.of(cfd.getNameAsString()), regionFs);
references = references || sft.hasReferences();
if (references) {
break;
}
}
Comment on lines +429 to +438
Contributor:

Should also be moved to HRegionFileSystem.hasReferences(), like other methods did?

Additional question: Is it guaranteed that we don't have more than one SFT instance for the same store at any point in time? Because the file based SFT impl relies on the fact it's the single instance manipulating its meta files. If we have multiple instances of SFT for the same store, it could lead to inconsistencies, where one of the instances updates the meta file, then the others would be looking at an outdated state of the meta files.
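A minimal sketch of that first suggestion, assuming the helper on HRegionFileSystem simply wraps the per-family loop added in this hunk (the signature and placement are assumptions, not part of this PR):

  // Hypothetical helper on HRegionFileSystem; the loop body mirrors the code added above.
  public boolean hasReferences(Configuration conf, TableDescriptor htd) throws IOException {
    for (ColumnFamilyDescriptor cfd : htd.getColumnFamilies()) {
      StoreFileTracker sft = StoreFileTrackerFactory.create(conf, htd,
        ColumnFamilyDescriptorBuilder.of(cfd.getNameAsString()), this);
      if (sft.hasReferences()) {
        return true;
      }
    }
    return false;
  }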

Contributor Author:

Should also be moved to HRegionFileSystem.hasReferences(), like other methods did?

When we introduce virtual links to Link/Ref files using SFT, the hasReferences() abstraction should also move to the SFT layer; there is only one reference left to HRegionFileSystem.hasReferences(), which will eventually be removed in HBASE-28861

Is it guaranteed that we don't have more than one SFT instance for the same store at any point in time? Because the file based SFT impl relies on the fact it's the single instance manipulating its meta files

This should only be true for the WRITE mode of an SFT instance, right? There could be multiple instances of READ-only mode SFT for the same store. I have created HBASE-28863 for introducing a cache, primarily for READ-only mode

If we have multiple instances of SFT for the same store, it could lead to inconsistencies, where one of the instances updates the meta file, then the others would be looking at an outdated state of the meta files.

Is this part not already handled based on timestamp? Or are Flush and Compaction both using the SFT instance from StoreEngine for that same reason?
Also, it looks like at a couple of places I am using non-READ-only mode where I only needed READ mode; let me quickly revisit that, but I guess it should be good to have that check at the StoreFileTrackerFactory layer as well. Thanks for pointing that out @wchevreuil

Contributor:

This should only be true for the WRITE mode of an SFT instance, right? There could be multiple instances of READ-only mode SFT for the same store.

How would the "READ_ONLY" SFTs know that the "WRITER" had rotated the file? Specially if its on a different process, like here? Other master based processes such as split take it for granted since the region would be closed, but I don't think it's the case here.

Contributor (@wchevreuil, Sep 25, 2024):

I was looking at the FileBasedStoreFileTracker code. Whenever compaction or flush happens, we rotate the meta file as we call FileBasedStoreFileTracker.update, and then we also update the map of store files within this FileBasedStoreFileTracker instance. At this point, any other existing FileBasedStoreFileTracker instance for this same store should reload the store files, otherwise the store files map on those instances would be outdated. For SFT instances on read replicas, that's not a problem because we signal these events in the wal, so the read replicas would know they need to do a reload in their SFT instances.

Contributor Author (@gvprathyusha6, Sep 25, 2024):

At this point, any other existing FileBasedStoreFileTracker instance for this same store should reload the store files, otherwise the store files map on those instances would be outdated.

Load of the SFT every time does a backing file load() again, right? At that time it should know the latest manifest and load that one again; it does not return the store files it cached.
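A minimal sketch of the read path this reply describes, using only calls that appear elsewhere in this PR (the meaning of the boolean passed to the factory is an assumption here):

  // As described above, each load() is expected to re-read the latest tracker state
  // (e.g. the current manifest for a file-based tracker) rather than a cached list.
  StoreFileTracker sft = StoreFileTrackerFactory.create(conf, false, storeContext);
  List<StoreFileInfo> current = sft.load();
  for (StoreFileInfo sfi : current) {
    LOG.debug("Tracked store file: {}", sfi.getPath());
  }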

return new Pair<>(Boolean.TRUE, references);
} catch (IOException e) {
LOG.error("Error trying to determine if region {} has references, assuming it does",

@@ -25,6 +25,7 @@
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.regionserver.BloomType;
import org.apache.hadoop.hbase.regionserver.HStoreFile;
import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTracker;
import org.apache.yetus.audience.InterfaceAudience;

/**
@@ -41,10 +42,10 @@ public CachedMobFile(HStoreFile sf) {
}

public static CachedMobFile create(FileSystem fs, Path path, Configuration conf,
CacheConfig cacheConf) throws IOException {
CacheConfig cacheConf, StoreFileTracker sft) throws IOException {
// XXX: primaryReplica is only used for constructing the key of block cache so it is not a
// critical problem if we pass the wrong value, so here we always pass true. Need to fix later.
HStoreFile sf = new HStoreFile(fs, path, conf, cacheConf, BloomType.NONE, true);
HStoreFile sf = new HStoreFile(fs, path, conf, cacheConf, BloomType.NONE, true, sft);
return new CachedMobFile(sf);
}


@@ -56,17 +56,17 @@ public class ExpiredMobFileCleaner extends Configured implements Tool {
* @param tableName The current table name.
* @param family The current family.
*/
public void cleanExpiredMobFiles(String tableName, ColumnFamilyDescriptor family)
public void cleanExpiredMobFiles(TableDescriptor htd, ColumnFamilyDescriptor family)
throws IOException {
Configuration conf = getConf();
TableName tn = TableName.valueOf(tableName);
String tableName = htd.getTableName().getNameAsString();
FileSystem fs = FileSystem.get(conf);
LOG.info("Cleaning the expired MOB files of " + family.getNameAsString() + " in " + tableName);
// disable the block cache.
Configuration copyOfConf = new Configuration(conf);
copyOfConf.setFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, 0f);
CacheConfig cacheConfig = new CacheConfig(copyOfConf);
MobUtils.cleanExpiredMobFiles(fs, conf, tn, family, cacheConfig,
MobUtils.cleanExpiredMobFiles(fs, conf, htd, family, cacheConfig,
EnvironmentEdgeManager.currentTime());
}

@@ -105,7 +105,7 @@ public int run(String[] args) throws Exception {
throw new IOException(
"The minVersions of the column family is not 0, could not be handled by this cleaner");
}
cleanExpiredMobFiles(tableName, family);
cleanExpiredMobFiles(htd, family);
return 0;
} finally {
admin.close();

@@ -29,6 +29,7 @@
import org.apache.hadoop.hbase.regionserver.BloomType;
import org.apache.hadoop.hbase.regionserver.HStoreFile;
import org.apache.hadoop.hbase.regionserver.StoreFileScanner;
import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTracker;
import org.apache.yetus.audience.InterfaceAudience;

/**
@@ -134,11 +135,11 @@ public void close() throws IOException {
* @param cacheConf The CacheConfig.
* @return An instance of the MobFile.
*/
public static MobFile create(FileSystem fs, Path path, Configuration conf, CacheConfig cacheConf)
throws IOException {
public static MobFile create(FileSystem fs, Path path, Configuration conf, CacheConfig cacheConf,
StoreFileTracker sft) throws IOException {
// XXX: primaryReplica is only used for constructing the key of block cache so it is not a
// critical problem if we pass the wrong value, so here we always pass true. Need to fix later.
HStoreFile sf = new HStoreFile(fs, path, conf, cacheConf, BloomType.NONE, true);
HStoreFile sf = new HStoreFile(fs, path, conf, cacheConf, BloomType.NONE, true, sft);
return new MobFile(sf);
}
}

@@ -33,6 +33,9 @@
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.regionserver.StoreContext;
import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTracker;
import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTrackerFactory;
import org.apache.hadoop.hbase.util.IdLock;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
@@ -198,9 +201,11 @@ public void evictFile(String fileName) {
* @param cacheConf The current MobCacheConfig
* @return A opened mob file.
*/
public MobFile openFile(FileSystem fs, Path path, CacheConfig cacheConf) throws IOException {
public MobFile openFile(FileSystem fs, Path path, CacheConfig cacheConf,
StoreContext storeContext) throws IOException {
StoreFileTracker sft = StoreFileTrackerFactory.create(conf, false, storeContext);
if (!isCacheEnabled) {
MobFile mobFile = MobFile.create(fs, path, conf, cacheConf);
MobFile mobFile = MobFile.create(fs, path, conf, cacheConf, sft);
mobFile.open();
return mobFile;
} else {
Expand All @@ -214,7 +219,7 @@ public MobFile openFile(FileSystem fs, Path path, CacheConfig cacheConf) throws
if (map.size() > mobFileMaxCacheSize) {
evict();
}
cached = CachedMobFile.create(fs, path, conf, cacheConf);
cached = CachedMobFile.create(fs, path, conf, cacheConf, sft);
cached.open();
map.put(fileName, cached);
miss.increment();

@@ -87,7 +87,7 @@ protected void chore() {
for (ColumnFamilyDescriptor hcd : htd.getColumnFamilies()) {
if (hcd.isMobEnabled() && hcd.getMinVersions() == 0) {
try {
cleaner.cleanExpiredMobFiles(htd.getTableName().getNameAsString(), hcd);
cleaner.cleanExpiredMobFiles(htd, hcd);
} catch (IOException e) {
LOG.error("Failed to clean the expired mob files table={} family={}",
htd.getTableName().getNameAsString(), hcd.getNameAsString(), e);

@@ -35,7 +35,11 @@
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.regionserver.BloomType;
import org.apache.hadoop.hbase.regionserver.HRegionFileSystem;
import org.apache.hadoop.hbase.regionserver.HStoreFile;
import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTracker;
import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTrackerFactory;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
@@ -90,7 +94,10 @@ public static void cleanupObsoleteMobFiles(Configuration conf, TableName table,
Set<String> allActiveMobFileName = new HashSet<String>();
for (Path regionPath : regionDirs) {
regionNames.add(regionPath.getName());
HRegionFileSystem regionFS =
HRegionFileSystem.create(conf, fs, tableDir, MobUtils.getMobRegionInfo(table));
for (ColumnFamilyDescriptor hcd : list) {
StoreFileTracker sft = StoreFileTrackerFactory.create(conf, htd, hcd, regionFS, false);
String family = hcd.getNameAsString();
Path storePath = new Path(regionPath, family);
boolean succeed = false;
@@ -102,26 +109,19 @@
+ " execution, aborting MOB file cleaner chore.", storePath);
throw new IOException(errMsg);
}
RemoteIterator<LocatedFileStatus> rit = fs.listLocatedStatus(storePath);
List<Path> storeFiles = new ArrayList<Path>();
// Load list of store files first
while (rit.hasNext()) {
Path p = rit.next().getPath();
if (fs.isFile(p)) {
storeFiles.add(p);
}
}
LOG.info("Found {} store files in: {}", storeFiles.size(), storePath);
List<StoreFileInfo> storeFileInfos = sft.load();
LOG.info("Found {} store files in: {}", storeFileInfos.size(), storePath);
Path currentPath = null;
try {
for (Path pp : storeFiles) {
for (StoreFileInfo storeFileInfo : storeFileInfos) {
Path pp = storeFileInfo.getPath();
currentPath = pp;
LOG.trace("Store file: {}", pp);
HStoreFile sf = null;
byte[] mobRefData = null;
byte[] bulkloadMarkerData = null;
try {
sf = new HStoreFile(fs, pp, conf, CacheConfig.DISABLED, BloomType.NONE, true);
sf = new HStoreFile(storeFileInfo, BloomType.NONE, CacheConfig.DISABLED);
sf.initReader();
mobRefData = sf.getMetadataValue(HStoreFile.MOB_FILE_REFS);
bulkloadMarkerData = sf.getMetadataValue(HStoreFile.BULKLOAD_TASK_KEY);