Skip to content

Commit 30f5966

Browse files
committed
HBASE-28004 Persistent cache map can get corrupt if crash happens midway through the write (#5341)
Signed-off-by: Ankit Singhal <[email protected]> Reviewed-by: Rahul Agarkar <[email protected]>
1 parent 7a4f59a commit 30f5966

File tree

17 files changed

+601
-200
lines changed

17 files changed

+601
-200
lines changed

hbase-protocol-shaded/src/main/protobuf/server/io/BucketCacheEntry.proto

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ message BucketCacheEntry {
3232
map<int32, string> deserializers = 4;
3333
required BackingMap backing_map = 5;
3434
optional bytes checksum = 6;
35+
map<string, bool> prefetched_files = 7;
3536
}
3637

3738
message BackingMap {
@@ -71,6 +72,8 @@ message BucketEntry {
7172
required int64 access_counter = 3;
7273
required int32 deserialiser_index = 4;
7374
required BlockPriority priority = 5;
75+
required int64 cachedTime = 6;
76+
optional int32 disk_size_with_header = 7;
7477
}
7578

7679
enum BlockPriority {

hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheConfig.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -93,8 +93,6 @@ public class CacheConfig {
9393
public static final String DROP_BEHIND_CACHE_COMPACTION_KEY =
9494
"hbase.hfile.drop.behind.compaction";
9595

96-
public static final String PREFETCH_PERSISTENCE_PATH_KEY = "hbase.prefetch.file.list.path";
97-
9896
/**
9997
* Configuration key to set interval for persisting bucket cache to disk.
10098
*/

hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFilePreadReader.java

Lines changed: 39 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,13 @@
1818
package org.apache.hadoop.hbase.io.hfile;
1919

2020
import java.io.IOException;
21+
import java.util.Optional;
22+
import org.apache.commons.lang3.mutable.MutableBoolean;
2123
import org.apache.hadoop.conf.Configuration;
2224
import org.apache.hadoop.fs.Path;
2325
import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper;
26+
import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache;
27+
import org.apache.hadoop.hbase.io.hfile.bucket.BucketEntry;
2428
import org.apache.yetus.audience.InterfaceAudience;
2529
import org.slf4j.Logger;
2630
import org.slf4j.LoggerFactory;
@@ -35,8 +39,14 @@ public class HFilePreadReader extends HFileReaderImpl {
3539
public HFilePreadReader(ReaderContext context, HFileInfo fileInfo, CacheConfig cacheConf,
3640
Configuration conf) throws IOException {
3741
super(context, fileInfo, cacheConf, conf);
42+
final MutableBoolean fileAlreadyCached = new MutableBoolean(false);
43+
BucketCache.getBuckedCacheFromCacheConfig(cacheConf).ifPresent(bc -> fileAlreadyCached
44+
.setValue(bc.getFullyCachedFiles().get(path.getName()) == null ? false : true));
3845
// Prefetch file blocks upon open if requested
39-
if (cacheConf.shouldPrefetchOnOpen() && cacheIfCompactionsOff()) {
46+
if (
47+
cacheConf.shouldPrefetchOnOpen() && cacheIfCompactionsOff()
48+
&& !fileAlreadyCached.booleanValue()
49+
) {
4050
PrefetchExecutor.request(path, new Runnable() {
4151
@Override
4252
public void run() {
@@ -55,12 +65,36 @@ public void run() {
5565
if (LOG.isTraceEnabled()) {
5666
LOG.trace("Prefetch start " + getPathOffsetEndStr(path, offset, end));
5767
}
68+
Optional<BucketCache> bucketCacheOptional =
69+
BucketCache.getBuckedCacheFromCacheConfig(cacheConf);
5870
// Don't use BlockIterator here, because it's designed to read load-on-open section.
5971
long onDiskSizeOfNextBlock = -1;
6072
while (offset < end) {
6173
if (Thread.interrupted()) {
6274
break;
6375
}
76+
// BucketCache can be persistent and resilient to restarts, so we check first if the
77+
// block exists on its in-memory index, if so, we just update the offset and move on
78+
// to the next block without actually going read all the way to the cache.
79+
if (bucketCacheOptional.isPresent()) {
80+
BucketCache cache = bucketCacheOptional.get();
81+
BlockCacheKey cacheKey = new BlockCacheKey(name, offset);
82+
BucketEntry entry = cache.getBackingMap().get(cacheKey);
83+
if (entry != null) {
84+
cacheKey = new BlockCacheKey(name, offset);
85+
entry = cache.getBackingMap().get(cacheKey);
86+
if (entry == null) {
87+
LOG.debug("No cache key {}, we'll read and cache it", cacheKey);
88+
} else {
89+
offset += entry.getOnDiskSizeWithHeader();
90+
LOG.debug("Found cache key {}. Skipping prefetch, the block is already cached.",
91+
cacheKey);
92+
continue;
93+
}
94+
} else {
95+
LOG.debug("No entry in the backing map for cache key {}", cacheKey);
96+
}
97+
}
6498
// Perhaps we got our block from cache? Unlikely as this may be, if it happens, then
6599
// the internal-to-hfileblock thread local which holds the overread that gets the
66100
// next header, will not have happened...so, pass in the onDiskSize gotten from the
@@ -77,12 +111,15 @@ public void run() {
77111
block.release();
78112
}
79113
}
114+
BucketCache.getBuckedCacheFromCacheConfig(cacheConf)
115+
.ifPresent(bc -> bc.fileCacheCompleted(path.getName()));
116+
80117
} catch (IOException e) {
81118
// IOExceptions are probably due to region closes (relocation, etc.)
82119
if (LOG.isTraceEnabled()) {
83120
LOG.trace("Prefetch " + getPathOffsetEndStr(path, offset, end), e);
84121
}
85-
} catch (Exception e) {
122+
} catch (Throwable e) {
86123
// Other exceptions are interesting
87124
LOG.warn("Prefetch " + getPathOffsetEndStr(path, offset, end), e);
88125
} finally {

hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/PrefetchExecutor.java

Lines changed: 3 additions & 86 deletions
Original file line numberDiff line numberDiff line change
@@ -17,11 +17,6 @@
1717
*/
1818
package org.apache.hadoop.hbase.io.hfile;
1919

20-
import java.io.File;
21-
import java.io.FileInputStream;
22-
import java.io.FileOutputStream;
23-
import java.io.IOException;
24-
import java.util.HashMap;
2520
import java.util.Map;
2621
import java.util.concurrent.ConcurrentSkipListMap;
2722
import java.util.concurrent.Future;
@@ -42,25 +37,19 @@
4237
import org.slf4j.Logger;
4338
import org.slf4j.LoggerFactory;
4439

45-
import org.apache.hadoop.hbase.shaded.protobuf.generated.PersistentPrefetchProtos;
46-
4740
@InterfaceAudience.Private
4841
public final class PrefetchExecutor {
4942

5043
private static final Logger LOG = LoggerFactory.getLogger(PrefetchExecutor.class);
5144

5245
/** Futures for tracking block prefetch activity */
5346
private static final Map<Path, Future<?>> prefetchFutures = new ConcurrentSkipListMap<>();
54-
/** Set of files for which prefetch is completed */
55-
@edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "MS_SHOULD_BE_FINAL")
56-
private static HashMap<String, Boolean> prefetchCompleted = new HashMap<>();
5747
/** Executor pool shared among all HFiles for block prefetch */
5848
private static final ScheduledExecutorService prefetchExecutorPool;
5949
/** Delay before beginning prefetch */
6050
private static final int prefetchDelayMillis;
6151
/** Variation in prefetch delay times, to mitigate stampedes */
6252
private static final float prefetchDelayVariation;
63-
static String prefetchedFileListPath;
6453
static {
6554
// Consider doing this on demand with a configuration passed in rather
6655
// than in a static initializer.
@@ -90,13 +79,6 @@ public Thread newThread(Runnable r) {
9079
+ HConstants.HREGION_COMPACTIONDIR_NAME.replace(".", "\\.") + Path.SEPARATOR_CHAR + ")");
9180

9281
public static void request(Path path, Runnable runnable) {
93-
if (prefetchCompleted != null) {
94-
if (isFilePrefetched(path.getName())) {
95-
LOG.info(
96-
"File has already been prefetched before the restart, so skipping prefetch : " + path);
97-
return;
98-
}
99-
}
10082
if (!prefetchPathExclude.matcher(path.toString()).find()) {
10183
long delay;
10284
if (prefetchDelayMillis > 0) {
@@ -122,8 +104,9 @@ public static void request(Path path, Runnable runnable) {
122104

123105
public static void complete(Path path) {
124106
prefetchFutures.remove(path);
125-
prefetchCompleted.put(path.getName(), true);
126-
LOG.debug("Prefetch completed for {}", path.getName());
107+
if (LOG.isDebugEnabled()) {
108+
LOG.debug("Prefetch completed for {}", path.getName());
109+
}
127110
}
128111

129112
public static void cancel(Path path) {
@@ -134,8 +117,6 @@ public static void cancel(Path path) {
134117
prefetchFutures.remove(path);
135118
LOG.debug("Prefetch cancelled for {}", path);
136119
}
137-
LOG.debug("Removing filename from the prefetched persistence list: {}", path.getName());
138-
removePrefetchedFileWhileEvict(path.getName());
139120
}
140121

141122
public static boolean isCompleted(Path path) {
@@ -146,70 +127,6 @@ public static boolean isCompleted(Path path) {
146127
return true;
147128
}
148129

149-
@edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "OBL_UNSATISFIED_OBLIGATION",
150-
justification = "false positive, try-with-resources ensures close is called.")
151-
public static void persistToFile(String path) throws IOException {
152-
prefetchedFileListPath = path;
153-
if (prefetchedFileListPath == null) {
154-
LOG.info("Exception while persisting prefetch!");
155-
throw new IOException("Error persisting prefetched HFiles set!");
156-
}
157-
if (!prefetchCompleted.isEmpty()) {
158-
try (FileOutputStream fos = new FileOutputStream(prefetchedFileListPath, false)) {
159-
PrefetchProtoUtils.toPB(prefetchCompleted).writeDelimitedTo(fos);
160-
}
161-
}
162-
}
163-
164-
public static void retrieveFromFile(String path) throws IOException {
165-
prefetchedFileListPath = path;
166-
File prefetchPersistenceFile = new File(prefetchedFileListPath);
167-
if (!prefetchPersistenceFile.exists()) {
168-
LOG.warn("Prefetch persistence file does not exist!");
169-
return;
170-
}
171-
LOG.info("Retrieving from prefetch persistence file " + path);
172-
assert (prefetchedFileListPath != null);
173-
try (FileInputStream fis = deleteFileOnClose(prefetchPersistenceFile)) {
174-
PersistentPrefetchProtos.PrefetchedHfileName proto =
175-
PersistentPrefetchProtos.PrefetchedHfileName.parseDelimitedFrom(fis);
176-
Map<String, Boolean> protoPrefetchedFilesMap = proto.getPrefetchedFilesMap();
177-
prefetchCompleted.putAll(protoPrefetchedFilesMap);
178-
}
179-
}
180-
181-
private static FileInputStream deleteFileOnClose(final File file) throws IOException {
182-
return new FileInputStream(file) {
183-
private File myFile;
184-
185-
private FileInputStream init(File file) {
186-
myFile = file;
187-
return this;
188-
}
189-
190-
@Override
191-
public void close() throws IOException {
192-
if (myFile == null) {
193-
return;
194-
}
195-
196-
super.close();
197-
if (!myFile.delete()) {
198-
throw new IOException("Failed deleting persistence file " + myFile.getAbsolutePath());
199-
}
200-
myFile = null;
201-
}
202-
}.init(file);
203-
}
204-
205-
public static void removePrefetchedFileWhileEvict(String hfileName) {
206-
prefetchCompleted.remove(hfileName);
207-
}
208-
209-
public static boolean isFilePrefetched(String hfileName) {
210-
return prefetchCompleted.containsKey(hfileName);
211-
}
212-
213130
private PrefetchExecutor() {
214131
}
215132
}

0 commit comments

Comments (0)