Skip to content

Commit fd69753

Browse files
committed
HBASE-27686: Recovery of BucketCache and Prefetched data after RS Crash
1 parent c2b64e7 commit fd69753

File tree

6 files changed

+282
-19
lines changed

6 files changed

+282
-19
lines changed

hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheConfig.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,11 @@ public class CacheConfig {
9595

9696
public static final String PREFETCH_PERSISTENCE_PATH_KEY = "hbase.prefetch.file.list.path";
9797

98+
/**
99+
* Configuration key to set interval for persisting bucket cache to disk.
100+
*/
101+
public static final String BUCKETCACHE_PERSIST_INTERVAL_KEY = "hbase.bucketcache.persist.intervalinmillis";
102+
98103
// Defaults
99104
public static final boolean DEFAULT_CACHE_DATA_ON_READ = true;
100105
public static final boolean DEFAULT_CACHE_DATA_ON_WRITE = false;

hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/PrefetchExecutor.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -154,7 +154,7 @@ public static void persistToFile(String path) throws IOException {
154154
throw new IOException("Error persisting prefetched HFiles set!");
155155
}
156156
if (!prefetchCompleted.isEmpty()) {
157-
try (FileOutputStream fos = new FileOutputStream(prefetchedFileListPath, true)) {
157+
try (FileOutputStream fos = new FileOutputStream(prefetchedFileListPath, false)) {
158158
PrefetchProtoUtils.toPB(prefetchCompleted).writeDelimitedTo(fos);
159159
}
160160
}

hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java

Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
package org.apache.hadoop.hbase.io.hfile.bucket;
1919

2020
import static org.apache.hadoop.hbase.io.hfile.CacheConfig.PREFETCH_PERSISTENCE_PATH_KEY;
21+
import static org.apache.hadoop.hbase.io.hfile.CacheConfig.BUCKETCACHE_PERSIST_INTERVAL_KEY;
2122

2223
import java.io.File;
2324
import java.io.FileInputStream;
@@ -178,6 +179,7 @@ public class BucketCache implements BlockCache, HeapSize {
178179
private final BucketCacheStats cacheStats = new BucketCacheStats();
179180

180181
private final String persistencePath;
182+
static AtomicBoolean isCacheInconsistent = new AtomicBoolean(false);
181183
private final long cacheCapacity;
182184
/** Approximate block size */
183185
private final long blockSize;
@@ -237,6 +239,8 @@ public class BucketCache implements BlockCache, HeapSize {
237239

238240
private String prefetchedFileListPath;
239241

242+
private long bucketcachePersistInterval;
243+
240244
private static final String FILE_VERIFY_ALGORITHM =
241245
"hbase.bucketcache.persistent.file.integrity.check.algorithm";
242246
private static final String DEFAULT_FILE_VERIFY_ALGORITHM = "MD5";
@@ -288,6 +292,7 @@ public BucketCache(String ioEngineName, long capacity, int blockSize, int[] buck
288292
this.queueAdditionWaitTime =
289293
conf.getLong(QUEUE_ADDITION_WAIT_TIME, DEFAULT_QUEUE_ADDITION_WAIT_TIME);
290294
this.prefetchedFileListPath = conf.get(PREFETCH_PERSISTENCE_PATH_KEY);
295+
this.bucketcachePersistInterval = conf.getLong(BUCKETCACHE_PERSIST_INTERVAL_KEY, 1000);
291296

292297
sanityCheckConfigs();
293298

@@ -314,6 +319,8 @@ public BucketCache(String ioEngineName, long capacity, int blockSize, int[] buck
314319
this.backingMap = new ConcurrentHashMap<>((int) blockNumCapacity);
315320

316321
if (ioEngine.isPersistent() && persistencePath != null) {
322+
BucketCachePersister cachePersister = new BucketCachePersister(this, bucketcachePersistInterval);
323+
cachePersister.start();
317324
try {
318325
retrieveFromFile(bucketSizes);
319326
} catch (IOException ioex) {
@@ -721,6 +728,14 @@ protected boolean removeFromRamCache(BlockCacheKey cacheKey) {
721728
});
722729
}
723730

731+
public boolean isCacheInconsistent() {
732+
return isCacheInconsistent.get();
733+
}
734+
735+
public void setCacheInconsistent (boolean setCacheInconsistent) {
736+
isCacheInconsistent.set(setCacheInconsistent);
737+
}
738+
724739
/*
725740
* Statistics thread. Periodically output cache statistics to the log.
726741
*/
@@ -1167,6 +1182,9 @@ void doDrain(final List<RAMQueueEntry> entries, ByteBuffer metaBuff) throws Inte
11671182
// Only add if non-null entry.
11681183
if (bucketEntries[i] != null) {
11691184
putIntoBackingMap(key, bucketEntries[i]);
1185+
if (ioEngine.isPersistent()) {
1186+
setCacheInconsistent(true);
1187+
}
11701188
}
11711189
// Always remove from ramCache even if we failed adding it to the block cache above.
11721190
boolean existed = ramCache.remove(key, re -> {
@@ -1216,8 +1234,7 @@ static List<RAMQueueEntry> getRAMQueueEntries(BlockingQueue<RAMQueueEntry> q,
12161234
*/
12171235
@edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "OBL_UNSATISFIED_OBLIGATION",
12181236
justification = "false positive, try-with-resources ensures close is called.")
1219-
private void persistToFile() throws IOException {
1220-
assert !cacheEnabled;
1237+
void persistToFile() throws IOException {
12211238
if (!ioEngine.isPersistent()) {
12221239
throw new IOException("Attempt to persist non-persistent cache mappings!");
12231240
}
Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.hadoop.hbase.io.hfile.bucket;
20+
21+
import org.apache.yetus.audience.InterfaceAudience;
22+
import org.slf4j.Logger;
23+
import org.slf4j.LoggerFactory;
24+
import java.io.IOException;
25+
26+
@InterfaceAudience.Private
27+
public class BucketCachePersister extends Thread {
28+
private final BucketCache cache;
29+
private final long intervalMillis;
30+
private static final Logger LOG = LoggerFactory.getLogger(BucketCachePersister.class);
31+
32+
public BucketCachePersister(BucketCache cache, long intervalMillis) {
33+
this.cache = cache;
34+
this.intervalMillis = intervalMillis;
35+
LOG.info("BucketCachePersister started with interval: " + intervalMillis);
36+
}
37+
38+
public void run() {
39+
while(true) {
40+
try {
41+
Thread.sleep(intervalMillis);
42+
if(cache.isCacheInconsistent()){
43+
LOG.debug("Cache is inconsistent, persisting to disk");
44+
cache.persistToFile();
45+
cache.setCacheInconsistent(false);
46+
}
47+
} catch (IOException | InterruptedException e) {
48+
LOG.info("Exception in BucketCachePersister" + e.getMessage());
49+
}
50+
}
51+
}
52+
}

hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestPrefetchRSClose.java

Lines changed: 9 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,8 @@ public void setup() throws Exception {
8383
}
8484

8585
@Test
86-
public void testRegionClosePrefetchPersistence() throws Exception {
86+
public void testPrefetchPersistence() throws Exception {
87+
8788
// Write to table and flush
8889
TableName tableName = TableName.valueOf("table1");
8990
byte[] row0 = Bytes.toBytes("row1");
@@ -107,8 +108,13 @@ public void testRegionClosePrefetchPersistence() throws Exception {
107108
table.put(put1);
108109
TEST_UTIL.flush(tableName);
109110
} finally {
110-
Thread.sleep(1000);
111+
Thread.sleep(1500);
111112
}
113+
114+
// The default cache-persistence interval is 1000 ms; after the 1500 ms wait above, both persistence files should exist.
115+
assertTrue(new File(testDir + "/bucket.persistence").exists());
116+
assertTrue(new File(testDir + "/prefetch.persistence").exists());
117+
112118
// Stop the RS
113119
cluster.stopRegionServer(0);
114120
LOG.info("Stopped Region Server 0.");
@@ -118,27 +124,14 @@ public void testRegionClosePrefetchPersistence() throws Exception {
118124

119125
// Start the RS and validate
120126
cluster.startRegionServer();
121-
Thread.sleep(1000);
122-
assertFalse(new File(testDir + "/prefetch.persistence").exists());
123-
assertFalse(new File(testDir + "/bucket.persistence").exists());
124-
}
125-
126-
@Test
127-
public void testPrefetchPersistenceNegative() throws Exception {
128-
cluster.stopRegionServer(0);
129-
LOG.info("Stopped Region Server 0.");
130-
Thread.sleep(1000);
131-
assertFalse(new File(testDir + "/prefetch.persistence").exists());
132-
assertTrue(new File(testDir + "/bucket.persistence").exists());
133-
cluster.startRegionServer();
134-
Thread.sleep(1000);
135127
assertFalse(new File(testDir + "/prefetch.persistence").exists());
136128
assertFalse(new File(testDir + "/bucket.persistence").exists());
137129
}
138130

139131
@After
140132
public void tearDown() throws Exception {
141133
TEST_UTIL.shutdownMiniCluster();
134+
TEST_UTIL.cleanupDataTestDirOnTestFS(String.valueOf(testDir));
142135
if (zkCluster != null) {
143136
zkCluster.shutdown();
144137
}

0 commit comments

Comments
 (0)