
Commit fe24099

HBASE-22539 WAL corruption due to early DBBs re-use when Durability.ASYNC_WAL is used (#437)
Signed-off-by: Zheng Hu <[email protected]>
1 parent 90f27fe commit fe24099

11 files changed: +379 -33 lines changed

hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ServerCall.java

Lines changed: 42 additions & 7 deletions
@@ -23,7 +23,7 @@
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Optional;
-
+import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.hadoop.hbase.CellScanner;
 import org.apache.hadoop.hbase.DoNotRetryIOException;
 import org.apache.yetus.audience.InterfaceAudience;
@@ -51,7 +51,7 @@
  * the result.
  */
 @InterfaceAudience.Private
-abstract class ServerCall<T extends ServerRpcConnection> implements RpcCall, RpcResponse {
+public abstract class ServerCall<T extends ServerRpcConnection> implements RpcCall, RpcResponse {
 
   protected final int id; // the client's call id
   protected final BlockingService service;
@@ -91,8 +91,14 @@ abstract class ServerCall<T extends ServerRpcConnection> implements RpcCall, Rpc
   private long exceptionSize = 0;
   private final boolean retryImmediatelySupported;
 
-  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="NP_NULL_ON_SOME_PATH",
-    justification="Can't figure why this complaint is happening... see below")
+  // This is a dirty hack to address HBASE-22539. The lowest bit is for normal rpc cleanup, and the
+  // second bit is for WAL reference. We can only call release if both of them are zero. The reason
+  // why we can not use a general reference counting is that, we may call cleanup multiple times in
+  // the current implementation. We should fix this in the future.
+  private final AtomicInteger reference = new AtomicInteger(0b01);
+
+  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "NP_NULL_ON_SOME_PATH",
+    justification = "Can't figure why this complaint is happening... see below")
   ServerCall(int id, BlockingService service, MethodDescriptor md, RequestHeader header,
       Message param, CellScanner cellScanner, T connection, long size,
       InetAddress remoteAddress, long receiveTime, int timeout, ByteBufferPool reservoir,
@@ -141,14 +147,43 @@ public void done() {
     cleanup();
   }
 
+  private void release(int mask) {
+    for (;;) {
+      int ref = reference.get();
+      if ((ref & mask) == 0) {
+        return;
+      }
+      int nextRef = ref & (~mask);
+      if (reference.compareAndSet(ref, nextRef)) {
+        if (nextRef == 0) {
+          if (this.reqCleanup != null) {
+            this.reqCleanup.run();
+          }
+        }
+        return;
+      }
+    }
+  }
+
   @Override
   public void cleanup() {
-    if (this.reqCleanup != null) {
-      this.reqCleanup.run();
-      this.reqCleanup = null;
+    release(0b01);
+  }
+
+  public void retainByWAL() {
+    for (;;) {
+      int ref = reference.get();
+      int nextRef = ref | 0b10;
+      if (reference.compareAndSet(ref, nextRef)) {
+        return;
+      }
    }
   }
 
+  public void releaseByWAL() {
+    release(0b10);
+  }
+
   @Override
   public String toString() {
     return toShortString() + " param: " +
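
Note: the heart of the fix is the two-bit reference word added above. Bit 0 is owned by the normal RPC cleanup path and bit 1 by the WAL; the pooled request buffers are returned only once both bits are cleared. The following is a minimal, standalone sketch of that pattern (TwoBitRefDemo, RPC_BIT and WAL_BIT are illustrative names, not HBase code); it also shows why the patch avoids a plain reference count: cleanup() may be invoked more than once, and a duplicate release must stay a no-op.

// Standalone sketch of the two-bit retain/release scheme, assuming hypothetical names.
import java.util.concurrent.atomic.AtomicInteger;

public class TwoBitRefDemo {
  private static final int RPC_BIT = 0b01; // held by the normal RPC cleanup path
  private static final int WAL_BIT = 0b10; // held by the WAL while it may still read the buffers

  private final AtomicInteger reference = new AtomicInteger(RPC_BIT);
  private final Runnable cleanupAction;

  TwoBitRefDemo(Runnable cleanupAction) {
    this.cleanupAction = cleanupAction;
  }

  void retainByWAL() {
    reference.getAndUpdate(ref -> ref | WAL_BIT);
  }

  void release(int mask) {
    for (;;) {
      int ref = reference.get();
      if ((ref & mask) == 0) {
        return; // this bit was already released; releasing again is a no-op
      }
      int nextRef = ref & ~mask;
      if (reference.compareAndSet(ref, nextRef)) {
        if (nextRef == 0) {
          cleanupAction.run(); // both owners are done; safe to recycle the buffers
        }
        return;
      }
    }
  }

  public static void main(String[] args) {
    TwoBitRefDemo call = new TwoBitRefDemo(() -> System.out.println("buffers returned to pool"));
    call.retainByWAL();    // the WAL references cells backed by the request buffers
    call.release(RPC_BIT); // RPC finishes; nothing is freed yet
    call.release(RPC_BIT); // a duplicate cleanup is harmless
    call.release(WAL_BIT); // WAL sync completes; cleanup runs exactly once
  }
}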

hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java

Lines changed: 6 additions & 2 deletions
@@ -56,6 +56,8 @@
 import org.apache.hadoop.hbase.client.RegionInfo;
 import org.apache.hadoop.hbase.exceptions.TimeoutIOException;
 import org.apache.hadoop.hbase.io.util.MemorySizeUtil;
+import org.apache.hadoop.hbase.ipc.RpcServer;
+import org.apache.hadoop.hbase.ipc.ServerCall;
 import org.apache.hadoop.hbase.log.HBaseMarkers;
 import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
 import org.apache.hadoop.hbase.trace.TraceUtil;
@@ -901,7 +903,7 @@ boolean isUnflushedEntries() {
    * Exposed for testing only. Use to tricks like halt the ring buffer appending.
    */
   @VisibleForTesting
-  void atHeadOfRingBufferEventHandlerAppend() {
+  protected void atHeadOfRingBufferEventHandlerAppend() {
     // Noop
   }
 
@@ -977,8 +979,10 @@ protected final long stampSequenceIdAndPublishToRingBuffer(RegionInfo hri, WALKe
       txidHolder.setValue(ringBuffer.next());
     });
     long txid = txidHolder.longValue();
+    ServerCall<?> rpcCall = RpcServer.getCurrentCall().filter(c -> c instanceof ServerCall)
+      .filter(c -> c.getCellScanner() != null).map(c -> (ServerCall) c).orElse(null);
     try (TraceScope scope = TraceUtil.createTrace(implClassName + ".append")) {
-      FSWALEntry entry = new FSWALEntry(txid, key, edits, hri, inMemstore);
+      FSWALEntry entry = new FSWALEntry(txid, key, edits, hri, inMemstore, rpcCall);
      entry.stampRegionSequenceId(we);
       ringBuffer.get(txid).load(entry);
     } finally {

hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java

Lines changed: 3 additions & 1 deletion
@@ -321,7 +321,9 @@ private void syncFailed(long epochWhenSync, Throwable error) {
   private void syncCompleted(AsyncWriter writer, long processedTxid, long startTimeNs) {
     highestSyncedTxid.set(processedTxid);
     for (Iterator<FSWALEntry> iter = unackedAppends.iterator(); iter.hasNext();) {
-      if (iter.next().getTxid() <= processedTxid) {
+      FSWALEntry entry = iter.next();
+      if (entry.getTxid() <= processedTxid) {
+        entry.release();
         iter.remove();
       } else {
         break;

hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java

Lines changed: 3 additions & 3 deletions
@@ -24,7 +24,6 @@
 import com.lmax.disruptor.TimeoutException;
 import com.lmax.disruptor.dsl.Disruptor;
 import com.lmax.disruptor.dsl.ProducerType;
-
 import java.io.IOException;
 import java.io.OutputStream;
 import java.util.Arrays;
@@ -34,7 +33,6 @@
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
-
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
@@ -59,6 +57,7 @@
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+
 import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
 
 /**
@@ -963,7 +962,6 @@ public void onEvent(final RingBufferTruck truck, final long sequence, boolean en
         //TODO handle htrace API change, see HBASE-18895
         //TraceScope scope = Trace.continueSpan(entry.detachSpan());
         try {
-
           if (this.exception != null) {
             // Return to keep processing events coming off the ringbuffer
             return;
@@ -980,6 +978,8 @@ public void onEvent(final RingBufferTruck truck, final long sequence, boolean en
               : new DamagedWALException("On sync", this.exception));
             // Return to keep processing events coming off the ringbuffer
             return;
+          } finally {
+            entry.release();
           }
         } else {
           // What is this if not an append or sync. Fail all up to this!!!

hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSWALEntry.java

Lines changed: 23 additions & 15 deletions
@@ -17,19 +17,17 @@
  */
 package org.apache.hadoop.hbase.regionserver.wal;
 
-import static java.util.stream.Collectors.toCollection;
-
 import java.io.IOException;
 import java.util.Collections;
 import java.util.List;
+import java.util.Optional;
 import java.util.Set;
 import java.util.TreeSet;
-
 import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.CellComparator;
 import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.PrivateCellUtil;
 import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.ipc.ServerCall;
 import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.wal.WAL.Entry;
@@ -56,19 +54,24 @@ class FSWALEntry extends Entry {
   private final transient boolean inMemstore;
   private final transient RegionInfo regionInfo;
   private final transient Set<byte[]> familyNames;
+  private final transient Optional<ServerCall<?>> rpcCall;
 
-  FSWALEntry(final long txid, final WALKeyImpl key, final WALEdit edit,
-      final RegionInfo regionInfo, final boolean inMemstore) {
+  FSWALEntry(final long txid, final WALKeyImpl key, final WALEdit edit, final RegionInfo regionInfo,
+      final boolean inMemstore, ServerCall<?> rpcCall) {
     super(key, edit);
     this.inMemstore = inMemstore;
     this.regionInfo = regionInfo;
     this.txid = txid;
     if (inMemstore) {
       // construct familyNames here to reduce the work of log sinker.
-      Set<byte []> families = edit.getFamilies();
-      this.familyNames = families != null? families: collectFamilies(edit.getCells());
+      Set<byte[]> families = edit.getFamilies();
+      this.familyNames = families != null ? families : collectFamilies(edit.getCells());
     } else {
-      this.familyNames = Collections.<byte[]>emptySet();
+      this.familyNames = Collections.<byte[]> emptySet();
+    }
+    this.rpcCall = Optional.ofNullable(rpcCall);
+    if (rpcCall != null) {
+      rpcCall.retainByWAL();
     }
   }
 
@@ -77,12 +80,13 @@ static Set<byte[]> collectFamilies(List<Cell> cells) {
     if (CollectionUtils.isEmpty(cells)) {
       return Collections.emptySet();
     } else {
-      return cells.stream()
-        .filter(v -> !CellUtil.matchingFamily(v, WALEdit.METAFAMILY))
-        .collect(toCollection(() -> new TreeSet<>(CellComparator.getInstance()::compareFamilies)))
-        .stream()
-        .map(CellUtil::cloneFamily)
-        .collect(toCollection(() -> new TreeSet<>(Bytes.BYTES_COMPARATOR)));
+      Set<byte[]> set = new TreeSet<>(Bytes.BYTES_COMPARATOR);
+      for (Cell cell: cells) {
+        if (!CellUtil.matchingFamily(cell, WALEdit.METAFAMILY)) {
+          set.add(CellUtil.cloneFamily(cell));
+        }
+      }
+      return set;
     }
   }
 
@@ -129,4 +133,8 @@ long stampRegionSequenceId(MultiVersionConcurrencyControl.WriteEntry we) throws
   Set<byte[]> getFamilyNames() {
     return familyNames;
   }
+
+  void release() {
+    rpcCall.ifPresent(ServerCall::releaseByWAL);
+  }
 }
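
Note: FSWALEntry is the piece that ties the retain to the append and the release to the sync: the constructor calls retainByWAL() on the captured ServerCall, and the WAL implementations call entry.release() once the entry is acknowledged as synced (AsyncFSWAL.syncCompleted, or the finally block in FSHLog's ring buffer handler). The following is an illustrative walk-through of that hand-off, not HBase code; Retainable and WalAppend are hypothetical stand-ins for ServerCall and FSWALEntry.

// Condensed sketch of the ownership hand-off, assuming hypothetical names.
import java.util.Optional;

public class WalRetainReleaseDemo {

  /** Stands in for ServerCall: something whose buffers must not be recycled too early. */
  interface Retainable {
    void retainByWAL();
    void releaseByWAL();
  }

  /** Stands in for FSWALEntry: retains the call on construction, releases it once synced. */
  static final class WalAppend {
    private final Optional<Retainable> rpcCall;

    WalAppend(Retainable rpcCall) {
      this.rpcCall = Optional.ofNullable(rpcCall);
      if (rpcCall != null) {
        rpcCall.retainByWAL(); // taken while the WAL may still read cells backed by the request
      }
    }

    void release() {
      // in the real patch this runs from syncCompleted / the ring buffer handler
      rpcCall.ifPresent(Retainable::releaseByWAL);
    }
  }

  public static void main(String[] args) {
    Retainable call = new Retainable() {
      @Override public void retainByWAL() { System.out.println("retained by WAL"); }
      @Override public void releaseByWAL() { System.out.println("released; buffers may be recycled"); }
    };
    WalAppend entry = new WalAppend(call); // at append/publish time
    entry.release();                       // once the entry has been synced
  }
}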

hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestWALReplay.java

Lines changed: 2 additions & 3 deletions
@@ -1158,9 +1158,8 @@ private WALEdit createWALEdit(final byte[] rowName, final byte[] family, Environ
   private FSWALEntry createFSWALEntry(HTableDescriptor htd, HRegionInfo hri, long sequence,
       byte[] rowName, byte[] family, EnvironmentEdge ee, MultiVersionConcurrencyControl mvcc,
       int index, NavigableMap<byte[], Integer> scopes) throws IOException {
-    FSWALEntry entry =
-        new FSWALEntry(sequence, createWALKey(htd.getTableName(), hri, mvcc, scopes), createWALEdit(
-            rowName, family, ee, index), hri, true);
+    FSWALEntry entry = new FSWALEntry(sequence, createWALKey(htd.getTableName(), hri, mvcc, scopes),
+      createWALEdit(rowName, family, ee, index), hri, true, null);
     entry.stampRegionSequenceId(mvcc.begin());
     return entry;
   }

hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncFSWAL.java

Lines changed: 1 addition & 1 deletion
@@ -112,7 +112,7 @@ protected AbstractFSWAL<?> newSlowWAL(FileSystem fs, Path rootDir, String logDir
     AsyncFSWAL asyncFSWAL = new AsyncFSWAL(fs, rootDir, logDir, archiveDir, conf, listeners,
         failIfWALExists, prefix, suffix, GROUP, CHANNEL_CLASS) {
       @Override
-      void atHeadOfRingBufferEventHandlerAppend() {
+      protected void atHeadOfRingBufferEventHandlerAppend() {
         action.run();
         super.atHeadOfRingBufferEventHandlerAppend();
       }

hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestFSHLog.java

Lines changed: 1 addition & 1 deletion
@@ -89,7 +89,7 @@ protected AbstractFSWAL<?> newSlowWAL(FileSystem fs, Path rootDir, String walDir
         conf, listeners, failIfWALExists, prefix, suffix) {
 
       @Override
-      void atHeadOfRingBufferEventHandlerAppend() {
+      protected void atHeadOfRingBufferEventHandlerAppend() {
         action.run();
         super.atHeadOfRingBufferEventHandlerAppend();
       }
hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestAsyncFSWALCorruptionDueToDanglingByteBuffer.java

Lines changed: 111 additions & 0 deletions
@@ -0,0 +1,111 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.wal;
+
+import java.io.IOException;
+import java.util.List;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.regionserver.wal.AsyncFSWAL;
+import org.apache.hadoop.hbase.regionserver.wal.FailedLogCloseException;
+import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.testclassification.RegionServerTests;
+import org.apache.hadoop.hbase.util.CommonFSUtils;
+import org.apache.hadoop.hbase.util.Pair;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.experimental.categories.Category;
+
+import org.apache.hbase.thirdparty.io.netty.channel.Channel;
+import org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup;
+
+/**
+ * Testcase for HBASE-22539
+ */
+@Category({ RegionServerTests.class, MediumTests.class })
+public class TestAsyncFSWALCorruptionDueToDanglingByteBuffer
+    extends WALCorruptionDueToDanglingByteBufferTestBase {
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+    HBaseClassTestRule.forClass(TestAsyncFSWALCorruptionDueToDanglingByteBuffer.class);
+
+  public static final class PauseWAL extends AsyncFSWAL {
+
+    public PauseWAL(FileSystem fs, Path rootDir, String logDir, String archiveDir,
+        Configuration conf, List<WALActionsListener> listeners, boolean failIfWALExists,
+        String prefix, String suffix, EventLoopGroup eventLoopGroup,
+        Class<? extends Channel> channelClass) throws FailedLogCloseException, IOException {
+      super(fs, rootDir, logDir, archiveDir, conf, listeners, failIfWALExists, prefix, suffix,
+        eventLoopGroup, channelClass);
+    }
+
+    @Override
+    protected void atHeadOfRingBufferEventHandlerAppend() {
+      if (ARRIVE != null) {
+        ARRIVE.countDown();
+        try {
+          RESUME.await();
+        } catch (InterruptedException e) {
+        }
+      }
+    }
+  }
+
+  public static final class PauseWALProvider extends AbstractFSWALProvider<PauseWAL> {
+
+    private EventLoopGroup eventLoopGroup;
+
+    private Class<? extends Channel> channelClass;
+
+    @Override
+    protected PauseWAL createWAL() throws IOException {
+      return new PauseWAL(CommonFSUtils.getWALFileSystem(conf), CommonFSUtils.getWALRootDir(conf),
+        getWALDirectoryName(factory.factoryId), getWALArchiveDirectoryName(conf, factory.factoryId),
+        conf, listeners, true, logPrefix,
+        META_WAL_PROVIDER_ID.equals(providerId) ? META_WAL_PROVIDER_ID : null, eventLoopGroup,
+        channelClass);
+    }
+
+    @Override
+    protected void doInit(Configuration conf) throws IOException {
+      Pair<EventLoopGroup, Class<? extends Channel>> eventLoopGroupAndChannelClass =
+        NettyAsyncFSWALConfigHelper.getEventLoopConfig(conf);
+      eventLoopGroup = eventLoopGroupAndChannelClass.getFirst();
+      channelClass = eventLoopGroupAndChannelClass.getSecond();
+    }
+  }
+
+  @BeforeClass
+  public static void setUp() throws Exception {
+    UTIL.getConfiguration().setClass(WALFactory.WAL_PROVIDER, PauseWALProvider.class,
+      WALProvider.class);
+    UTIL.startMiniCluster(1);
+    UTIL.createTable(TABLE_NAME, CF);
+    UTIL.waitTableAvailable(TABLE_NAME);
+  }
+
+  @AfterClass
+  public static void tearDown() throws Exception {
+    UTIL.shutdownMiniCluster();
+  }
+}
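
Note: the new test reproduces the race by stalling the WAL consumer. PauseWAL overrides atHeadOfRingBufferEventHandlerAppend() and parks on two latches, ARRIVE and RESUME, which live in WALCorruptionDueToDanglingByteBufferTestBase (that base class is not among the files shown here). Below is a self-contained sketch of the same hand-shake with hypothetical names, showing how the test thread knows the append is parked before it starts poking at shared state.

// Standalone sketch of the ARRIVE/RESUME pause pattern, assuming hypothetical names.
import java.util.concurrent.CountDownLatch;

public class PauseHandshakeDemo {
  static volatile CountDownLatch ARRIVE;           // set by the test before triggering the action
  static final CountDownLatch RESUME = new CountDownLatch(1);

  // Stands in for atHeadOfRingBufferEventHandlerAppend(): pause only when the test asks for it.
  static void atPausePoint() {
    if (ARRIVE != null) {
      ARRIVE.countDown();                          // tell the test thread we are parked here
      try {
        RESUME.await();                            // wait until the test has done its checks
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    }
  }

  public static void main(String[] args) throws InterruptedException {
    ARRIVE = new CountDownLatch(1);
    Thread worker = new Thread(() -> {
      atPausePoint();
      System.out.println("append continues after the test released RESUME");
    });
    worker.start();
    ARRIVE.await();                                // the worker is now parked at the pause point
    System.out.println("worker is paused; the test can now inspect shared state");
    RESUME.countDown();                            // let the worker finish
    worker.join();
  }
}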
