@@ -18,7 +18,10 @@
 package org.apache.hadoop.hbase.io.hfile.bucket;
 
 import static org.apache.hadoop.hbase.io.hfile.CacheConfig.BUCKETCACHE_PERSIST_INTERVAL_KEY;
+import static org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.ACCEPT_FACTOR_CONFIG_NAME;
 import static org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.DEFAULT_ERROR_TOLERATION_DURATION;
+import static org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.EXTRA_FREE_FACTOR_CONFIG_NAME;
+import static org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.MIN_FACTOR_CONFIG_NAME;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNull;
 
@@ -141,6 +144,64 @@ public void testBucketCacheEvictByHFileAfterRecovery() throws Exception { |
     TEST_UTIL.cleanupTestDir();
   }
 
+  @Test
+  public void testBucketCacheRecoveryWithAllocationInconsistencies() throws Exception {
+    HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+    Path testDir = TEST_UTIL.getDataTestDir();
+    TEST_UTIL.getTestFileSystem().mkdirs(testDir);
+    Configuration conf = HBaseConfiguration.create();
+    // Disables the persister thread by setting its interval to MAX_VALUE
+    conf.setLong(BUCKETCACHE_PERSIST_INTERVAL_KEY, Long.MAX_VALUE);
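+    // Let the cache run nearly full: blocks are accepted up to 100% occupancy (accept
+    // factor 1) and eviction only frees down to the 99% min factor, plus 1% extra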
+    conf.setDouble(MIN_FACTOR_CONFIG_NAME, 0.99);
+    conf.setDouble(ACCEPT_FACTOR_CONFIG_NAME, 1);
+    conf.setDouble(EXTRA_FREE_FACTOR_CONFIG_NAME, 0.01);
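+    // Single 9KB bucket size, big enough for one ~8KB test block each, so the 36KB
+    // cache holds the four blocks cached below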
+    int[] bucketSizes = new int[] { 8 * 1024 + 1024 };
+    BucketCache bucketCache = new BucketCache("file:" + testDir + "/bucket.cache", 36 * 1024, 8192,
+      bucketSizes, writeThreads, writerQLen, testDir + "/bucket.persistence",
+      DEFAULT_ERROR_TOLERATION_DURATION, conf);
+
+    CacheTestUtils.HFileBlockPair[] blocks = CacheTestUtils.generateHFileBlocks(8192, 5);
+
+    // Add four blocks
+    cacheAndWaitUntilFlushedToBucket(bucketCache, blocks[0].getBlockName(), blocks[0].getBlock());
+    cacheAndWaitUntilFlushedToBucket(bucketCache, blocks[1].getBlockName(), blocks[1].getBlock());
+    cacheAndWaitUntilFlushedToBucket(bucketCache, blocks[2].getBlockName(), blocks[2].getBlock());
+    cacheAndWaitUntilFlushedToBucket(bucketCache, blocks[3].getBlockName(), blocks[3].getBlock());
+
+    // Create an entry for a 5th block with the same cache offset as the 1st block. Add it
+    // straight to the backingMap, bypassing caching, in order to fabricate an inconsistency
+    BucketEntry bucketEntry =
+      new BucketEntry(bucketCache.backingMap.get(blocks[0].getBlockName()).offset(),
+        blocks[4].getBlock().getSerializedLength(), blocks[4].getBlock().getOnDiskSizeWithHeader(),
+        0, false, bucketCache::createRecycler, blocks[4].getBlock().getByteBuffAllocator());
+    bucketEntry.setDeserializerReference(blocks[4].getBlock().getDeserializer());
+    bucketCache.getBackingMap().put(blocks[4].getBlockName(), bucketEntry);
+
+    // Persist the current state of the cache: 5 blocks in the map, but only 4 were actually
+    // cached. The 5th block has the same cache offset as the 1st
+    bucketCache.persistToFile();
+
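+    // Restart: a new cache instance recovers its state from the persisted file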
+    BucketCache newBucketCache = new BucketCache("file:" + testDir + "/bucket.cache", 36 * 1024,
+      8192, bucketSizes, writeThreads, writerQLen, testDir + "/bucket.persistence",
+      DEFAULT_ERROR_TOLERATION_DURATION, conf);
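+    // Wait for the recovered backing map to be validated against the cache file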
+    while (!newBucketCache.getBackingMapValidated().get()) {
+      Thread.sleep(10);
+    }
+
+    assertNull(newBucketCache.getBlock(blocks[4].getBlockName(), false, false, false));
+    // The backing map entry with key blocks[0].getBlockName() may point to a valid entry or
+    // to null, depending on the ordering of the keys in the backing map. Hence, we skip the
+    // check for that key.
+    assertEquals(blocks[1].getBlock(),
+      newBucketCache.getBlock(blocks[1].getBlockName(), false, false, false));
+    assertEquals(blocks[2].getBlock(),
+      newBucketCache.getBlock(blocks[2].getBlockName(), false, false, false));
+    assertEquals(blocks[3].getBlock(),
+      newBucketCache.getBlock(blocks[3].getBlockName(), false, false, false));
+    assertEquals(4, newBucketCache.backingMap.size());
+    TEST_UTIL.cleanupTestDir();
+  }
+
   private void waitUntilFlushedToBucket(BucketCache cache, BlockCacheKey cacheKey)
     throws InterruptedException {
     while (!cache.backingMap.containsKey(cacheKey) || cache.ramCache.containsKey(cacheKey)) {