Skip to content

Commit 14b3899

Browse files
authored
HBASE-27365 Minimise block addition failures due to no space in bucket cache writers queue by introducing wait time (#4805)
Co-authored-by: Rajeshbabu Chintaguntla <[email protected]>
1 parent 23a5633 commit 14b3899

File tree

9 files changed

+118
-26
lines changed

9 files changed

+118
-26
lines changed

hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCache.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,19 @@ public interface BlockCache extends Iterable<CachedBlock> {
3434
*/
3535
void cacheBlock(BlockCacheKey cacheKey, Cacheable buf, boolean inMemory);
3636

37+
/**
38+
* Add block to cache.
39+
* @param cacheKey The block's cache key.
40+
* @param buf The block contents wrapped in a ByteBuffer.
41+
* @param inMemory Whether block should be treated as in-memory
42+
* @param waitWhenCache Whether to wait for the block to be accepted by the cache instead of
43+
* skipping it when the cache is busy; mainly relevant when BucketCache is configured.
44+
*/
45+
default void cacheBlock(BlockCacheKey cacheKey, Cacheable buf, boolean inMemory,
46+
boolean waitWhenCache) {
47+
cacheBlock(cacheKey, buf, inMemory);
48+
}
49+
3750
/**
3851
* Add block to cache (defaults to not in-memory).
3952
* @param cacheKey The block's cache key.

hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CombinedBlockCache.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,11 +53,17 @@ public long heapSize() {
5353

5454
@Override
5555
public void cacheBlock(BlockCacheKey cacheKey, Cacheable buf, boolean inMemory) {
56+
cacheBlock(cacheKey, buf, inMemory, false);
57+
}
58+
59+
@Override
60+
public void cacheBlock(BlockCacheKey cacheKey, Cacheable buf, boolean inMemory,
61+
boolean waitWhenCache) {
5662
boolean metaBlock = isMetaBlock(buf.getBlockType());
5763
if (metaBlock) {
5864
l1Cache.cacheBlock(cacheKey, buf, inMemory);
5965
} else {
60-
l2Cache.cacheBlock(cacheKey, buf, inMemory);
66+
l2Cache.cacheBlock(cacheKey, buf, inMemory, waitWhenCache);
6167
}
6268
}
6369

hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1328,7 +1328,7 @@ public HFileBlock readBlock(long dataBlockOffset, long onDiskBlockSize, final bo
13281328
// Cache the block if necessary
13291329
cacheConf.getBlockCache().ifPresent(cache -> {
13301330
if (cacheBlock && cacheConf.shouldCacheBlockOnRead(category)) {
1331-
cache.cacheBlock(cacheKey, hfileBlock, cacheConf.isInMemory());
1331+
cache.cacheBlock(cacheKey, hfileBlock, cacheConf.isInMemory(), cacheOnly);
13321332
}
13331333
});
13341334

@@ -1341,8 +1341,8 @@ public HFileBlock readBlock(long dataBlockOffset, long onDiskBlockSize, final bo
13411341
// Cache the block if necessary
13421342
cacheConf.getBlockCache().ifPresent(cache -> {
13431343
if (cacheBlock && cacheConf.shouldCacheBlockOnRead(category)) {
1344-
cache.cacheBlock(cacheKey, cacheCompressed ? hfileBlock : unpacked,
1345-
cacheConf.isInMemory());
1344+
// Pass cacheOnly as waitWhenCache so block additions wait during compaction and prefetching.
1345+
cache.cacheBlock(cacheKey, cacheCompressed ? hfileBlock : unpacked, cacheConf.isInMemory(), cacheOnly);
13461346
}
13471347
});
13481348
if (unpacked != hfileBlock) {

hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterImpl.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -550,7 +550,7 @@ private void doCacheOnWrite(long offset) {
550550
HFileBlock cacheFormatBlock = blockWriter.getBlockForCaching(cacheConf);
551551
try {
552552
cache.cacheBlock(new BlockCacheKey(name, offset, true, cacheFormatBlock.getBlockType()),
553-
cacheFormatBlock);
553+
cacheFormatBlock, cacheConf.isInMemory(), true);
554554
} finally {
555555
// refCnt will auto increase when block add to Cache, see RAMCache#putIfAbsent
556556
cacheFormatBlock.release();

hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java

Lines changed: 20 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -175,13 +175,6 @@ public class BucketCache implements BlockCache, HeapSize {
175175

176176
private static final int DEFAULT_CACHE_WAIT_TIME = 50;
177177

178-
/**
179-
* Used in tests. If this flag is false and the cache speed is very fast, bucket cache will skip
180-
* some blocks when caching. If the flag is true, we will wait until blocks are flushed to
181-
* IOEngine.
182-
*/
183-
boolean wait_when_cache = false;
184-
185178
private final BucketCacheStats cacheStats = new BucketCacheStats();
186179

187180
private final String persistencePath;
@@ -248,6 +241,10 @@ public class BucketCache implements BlockCache, HeapSize {
248241
"hbase.bucketcache.persistent.file.integrity.check.algorithm";
249242
private static final String DEFAULT_FILE_VERIFY_ALGORITHM = "MD5";
250243

244+
private static final String QUEUE_ADDITION_WAIT_TIME =
245+
"hbase.bucketcache.queue.addition.waittime";
246+
private static final long DEFAULT_QUEUE_ADDITION_WAIT_TIME = 0;
247+
private long queueAdditionWaitTime;
251248
/**
252249
* Use {@link java.security.MessageDigest} class's encryption algorithms to check persistent file
253250
* integrity, default algorithm is MD5
@@ -288,6 +285,8 @@ public BucketCache(String ioEngineName, long capacity, int blockSize, int[] buck
288285
this.singleFactor = conf.getFloat(SINGLE_FACTOR_CONFIG_NAME, DEFAULT_SINGLE_FACTOR);
289286
this.multiFactor = conf.getFloat(MULTI_FACTOR_CONFIG_NAME, DEFAULT_MULTI_FACTOR);
290287
this.memoryFactor = conf.getFloat(MEMORY_FACTOR_CONFIG_NAME, DEFAULT_MEMORY_FACTOR);
288+
this.queueAdditionWaitTime =
289+
conf.getLong(QUEUE_ADDITION_WAIT_TIME, DEFAULT_QUEUE_ADDITION_WAIT_TIME);
291290
this.prefetchedFileListPath = conf.get(PREFETCH_PERSISTENCE_PATH_KEY);
292291

293292
sanityCheckConfigs();
@@ -432,7 +431,19 @@ public void cacheBlock(BlockCacheKey cacheKey, Cacheable buf) {
432431
*/
433432
@Override
434433
public void cacheBlock(BlockCacheKey cacheKey, Cacheable cachedItem, boolean inMemory) {
435-
cacheBlockWithWait(cacheKey, cachedItem, inMemory, wait_when_cache);
434+
cacheBlockWithWait(cacheKey, cachedItem, inMemory, false);
435+
}
436+
437+
/**
438+
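* Cache the block with the specified name and buffer. Waits for writer queue space only when waitWhenCache is true and the configured queue addition wait time is positive.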
439+
* @param cacheKey block's cache key
440+
* @param cachedItem block buffer
441+
* @param inMemory if block is in-memory
442+
*/
443+
@Override
444+
public void cacheBlock(BlockCacheKey cacheKey, Cacheable cachedItem, boolean inMemory,
445+
boolean waitWhenCache) {
446+
cacheBlockWithWait(cacheKey, cachedItem, inMemory, waitWhenCache && queueAdditionWaitTime > 0);
436447
}
437448

438449
/**
@@ -491,7 +502,7 @@ protected void cacheBlockWithWaitInternal(BlockCacheKey cacheKey, Cacheable cach
491502
boolean successfulAddition = false;
492503
if (wait) {
493504
try {
494-
successfulAddition = bq.offer(re, DEFAULT_CACHE_WAIT_TIME, TimeUnit.MILLISECONDS);
505+
successfulAddition = bq.offer(re, queueAdditionWaitTime, TimeUnit.MILLISECONDS);
495506
} catch (InterruptedException e) {
496507
Thread.currentThread().interrupt();
497508
}

hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCache.java

Lines changed: 69 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@
5858
import org.apache.hadoop.hbase.testclassification.IOTests;
5959
import org.apache.hadoop.hbase.testclassification.LargeTests;
6060
import org.apache.hadoop.hbase.util.Pair;
61+
import org.apache.hadoop.hbase.util.Threads;
6162
import org.junit.After;
6263
import org.junit.Assert;
6364
import org.junit.Before;
@@ -119,7 +120,6 @@ public MockedBucketCache(String ioEngineName, long capacity, int blockSize, int[
119120
int writerThreads, int writerQLen, String persistencePath) throws IOException {
120121
super(ioEngineName, capacity, blockSize, bucketSizes, writerThreads, writerQLen,
121122
persistencePath);
122-
super.wait_when_cache = true;
123123
}
124124

125125
@Override
@@ -241,16 +241,16 @@ public static void waitUntilAllFlushedToBucket(BucketCache cache) throws Interru
241241
// BucketCache.cacheBlock is async, it first adds block to ramCache and writeQueue, then writer
242242
// threads will flush it to the bucket and put reference entry in backingMap.
243243
private void cacheAndWaitUntilFlushedToBucket(BucketCache cache, BlockCacheKey cacheKey,
244-
Cacheable block) throws InterruptedException {
245-
cache.cacheBlock(cacheKey, block);
244+
Cacheable block, boolean waitWhenCache) throws InterruptedException {
245+
cache.cacheBlock(cacheKey, block, false, waitWhenCache);
246246
waitUntilFlushedToBucket(cache, cacheKey);
247247
}
248248

249249
@Test
250250
public void testMemoryLeak() throws Exception {
251251
final BlockCacheKey cacheKey = new BlockCacheKey("dummy", 1L);
252252
cacheAndWaitUntilFlushedToBucket(cache, cacheKey,
253-
new CacheTestUtils.ByteArrayCacheable(new byte[10]));
253+
new CacheTestUtils.ByteArrayCacheable(new byte[10]), true);
254254
long lockId = cache.backingMap.get(cacheKey).offset();
255255
ReentrantReadWriteLock lock = cache.offsetLock.getLock(lockId);
256256
lock.writeLock().lock();
@@ -265,7 +265,7 @@ public void run() {
265265
cache.blockEvicted(cacheKey, cache.backingMap.remove(cacheKey), true, true);
266266
assertEquals(0, cache.getBlockCount());
267267
cacheAndWaitUntilFlushedToBucket(cache, cacheKey,
268-
new CacheTestUtils.ByteArrayCacheable(new byte[10]));
268+
new CacheTestUtils.ByteArrayCacheable(new byte[10]), true);
269269
assertEquals(1, cache.getBlockCount());
270270
lock.writeLock().unlock();
271271
evictThread.join();
@@ -341,7 +341,8 @@ private void testRetrievalUtils(Path testDir, String ioEngineName)
341341
bucketCache.cacheBlock(block.getBlockName(), block.getBlock());
342342
}
343343
for (HFileBlockPair block : blocks) {
344-
cacheAndWaitUntilFlushedToBucket(bucketCache, block.getBlockName(), block.getBlock());
344+
cacheAndWaitUntilFlushedToBucket(bucketCache, block.getBlockName(), block.getBlock(),
345+
false);
345346
}
346347
usedSize = bucketCache.getAllocator().getUsedSize();
347348
assertNotEquals(0, usedSize);
@@ -403,7 +404,8 @@ public void testRetrieveFromFileWithoutPersistence() throws Exception {
403404
bucketCache.cacheBlock(block.getBlockName(), block.getBlock());
404405
}
405406
for (HFileBlockPair block : blocks) {
406-
cacheAndWaitUntilFlushedToBucket(bucketCache, block.getBlockName(), block.getBlock());
407+
cacheAndWaitUntilFlushedToBucket(bucketCache, block.getBlockName(), block.getBlock(),
408+
false);
407409
}
408410
usedSize = bucketCache.getAllocator().getUsedSize();
409411
assertNotEquals(0, usedSize);
@@ -786,7 +788,7 @@ public void testFreeBucketEntryRestoredFromFile() throws Exception {
786788

787789
for (HFileBlockPair hfileBlockPair : hfileBlockPairs) {
788790
cacheAndWaitUntilFlushedToBucket(bucketCache, hfileBlockPair.getBlockName(),
789-
hfileBlockPair.getBlock());
791+
hfileBlockPair.getBlock(), false);
790792
}
791793
usedByteSize = bucketCache.getAllocator().getUsedSize();
792794
assertNotEquals(0, usedByteSize);
@@ -811,4 +813,63 @@ public void testFreeBucketEntryRestoredFromFile() throws Exception {
811813
}
812814
}
813815

816+
/**
817+
* This test is for HBASE-27365: with a single writer thread and a writer queue of length 1,
818+
* blocks cached with {@code waitWhenCache} set to true must all end up in the
819+
* {@link BucketCache} backing map, and must still be freeable after a restore from file.
820+
*/
821+
@Test
822+
public void testBlockAdditionWaitWhenCache() throws Exception {
823+
try {
824+
final Path dataTestDir = createAndGetTestDir();
825+
826+
String ioEngineName = "file:" + dataTestDir + "/bucketNoRecycler.cache";
827+
String persistencePath = dataTestDir + "/bucketNoRecycler.persistence";
828+
829+
BucketCache bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize,
830+
constructedBlockSizes, 1, 1, persistencePath);
831+
long usedByteSize = bucketCache.getAllocator().getUsedSize();
832+
assertEquals(0, usedByteSize);
833+
834+
HFileBlockPair[] hfileBlockPairs =
835+
CacheTestUtils.generateHFileBlocks(constructedBlockSize, 10);
836+
// Add blocks
837+
for (HFileBlockPair hfileBlockPair : hfileBlockPairs) {
838+
bucketCache.cacheBlock(hfileBlockPair.getBlockName(), hfileBlockPair.getBlock(), false,
839+
true);
840+
}
841+
842+
// Max wait for 10 seconds.
843+
long timeout = 10000;
844+
// Wait for blocks size to match the number of blocks.
845+
while (bucketCache.backingMap.size() != 10) {
846+
if (timeout <= 0) break;
847+
Threads.sleep(100);
848+
timeout -= 100;
849+
}
850+
for (HFileBlockPair hfileBlockPair : hfileBlockPairs) {
851+
assertTrue(bucketCache.backingMap.containsKey(hfileBlockPair.getBlockName()));
852+
}
853+
usedByteSize = bucketCache.getAllocator().getUsedSize();
854+
assertNotEquals(0, usedByteSize);
855+
// persist cache to file
856+
bucketCache.shutdown();
857+
assertTrue(new File(persistencePath).exists());
858+
859+
// restore cache from file
860+
bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize,
861+
constructedBlockSizes, writeThreads, writerQLen, persistencePath);
862+
assertFalse(new File(persistencePath).exists());
863+
assertEquals(usedByteSize, bucketCache.getAllocator().getUsedSize());
864+
865+
for (HFileBlockPair hfileBlockPair : hfileBlockPairs) {
866+
BlockCacheKey blockCacheKey = hfileBlockPair.getBlockName();
867+
bucketCache.evictBlock(blockCacheKey);
868+
}
869+
assertEquals(0, bucketCache.getAllocator().getUsedSize());
870+
assertEquals(0, bucketCache.backingMap.size());
871+
} finally {
872+
HBASE_TESTING_UTILITY.cleanupTestDir();
873+
}
874+
}
814875
}

hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCacheRefCnt.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -110,8 +110,6 @@ private void disableWriter() {
110110
// Flakey TestBucketCacheRefCnt.testBlockInRAMCache:121 expected:<3> but was:<2>
111111
public void testBlockInRAMCache() throws IOException {
112112
cache = create(1, 1000);
113-
// Set this to true;
114-
cache.wait_when_cache = true;
115113
disableWriter();
116114
final String prefix = "testBlockInRamCache";
117115
try {

hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestPrefetchPersistence.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -117,7 +117,6 @@ public void testPrefetchPersistence() throws Exception {
117117
bucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize,
118118
constructedBlockSize, constructedBlockSizes, writeThreads, writerQLen,
119119
testDir + "/bucket.persistence", 60 * 1000, conf);
120-
bucketCache.wait_when_cache = true;
121120
cacheConf = new CacheConfig(conf, bucketCache);
122121

123122
long usedSize = bucketCache.getAllocator().getUsedSize();
@@ -137,7 +136,6 @@ public void testPrefetchPersistence() throws Exception {
137136
bucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize,
138137
constructedBlockSize, constructedBlockSizes, writeThreads, writerQLen,
139138
testDir + "/bucket.persistence", 60 * 1000, conf);
140-
bucketCache.wait_when_cache = true;
141139
assertFalse(new File(testDir + "/bucket.persistence").exists());
142140
assertFalse(new File(testDir + "/prefetch.persistence").exists());
143141
assertTrue(usedSize != 0);

hbase-server/src/test/resources/hbase-site.xml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -277,4 +277,9 @@
277277
<value>3</value>
278278
<description>Default is unbounded</description>
279279
</property>
280+
<property>
281+
<name>hbase.bucketcache.queue.addition.waittime</name>
282+
<value>100</value>
283+
<description>Default is 0</description>
284+
</property>
280285
</configuration>

0 commit comments

Comments
 (0)