Skip to content

Commit e48c448

Browse files
authored
HBASE-27632 Refactor WAL.Reader implementation so we can better support WAL splitting and replication (#5055)
Signed-off-by: GeorryHuang <[email protected]>
1 parent c013c7c commit e48c448

File tree

63 files changed

+2568
-2096
lines changed

Some content is hidden

Large commits have some content hidden by default. Use the search box below to find content that may be hidden.

63 files changed

+2568
-2096
lines changed

hbase-it/src/test/java/org/apache/hadoop/hbase/IntegrationTestIngestWithEncryption.java

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -26,11 +26,9 @@
2626
import org.apache.hadoop.hbase.client.TableDescriptor;
2727
import org.apache.hadoop.hbase.io.crypto.KeyProviderForTesting;
2828
import org.apache.hadoop.hbase.io.hfile.HFile;
29-
import org.apache.hadoop.hbase.regionserver.wal.SecureProtobufLogReader;
3029
import org.apache.hadoop.hbase.regionserver.wal.SecureProtobufLogWriter;
3130
import org.apache.hadoop.hbase.testclassification.IntegrationTests;
3231
import org.apache.hadoop.hbase.util.EncryptionTest;
33-
import org.apache.hadoop.hbase.wal.WAL.Reader;
3432
import org.apache.hadoop.hbase.wal.WALProvider.Writer;
3533
import org.apache.hadoop.util.ToolRunner;
3634
import org.junit.Before;
@@ -54,8 +52,6 @@ public void setUpCluster() throws Exception {
5452
conf.setInt(HFile.FORMAT_VERSION_KEY, 3);
5553
conf.set(HConstants.CRYPTO_KEYPROVIDER_CONF_KEY, KeyProviderForTesting.class.getName());
5654
conf.set(HConstants.CRYPTO_MASTERKEY_NAME_CONF_KEY, "hbase");
57-
conf.setClass("hbase.regionserver.hlog.reader.impl", SecureProtobufLogReader.class,
58-
Reader.class);
5955
conf.setClass("hbase.regionserver.hlog.writer.impl", SecureProtobufLogWriter.class,
6056
Writer.class);
6157
conf.setBoolean(HConstants.ENABLE_WAL_ENCRYPTION, true);

hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/WALInputFormat.java

Lines changed: 46 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -32,12 +32,14 @@
3232
import org.apache.hadoop.fs.LocatedFileStatus;
3333
import org.apache.hadoop.fs.Path;
3434
import org.apache.hadoop.fs.RemoteIterator;
35+
import org.apache.hadoop.hbase.util.LeaseNotRecoveredException;
3536
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
3637
import org.apache.hadoop.hbase.wal.WAL;
3738
import org.apache.hadoop.hbase.wal.WAL.Entry;
38-
import org.apache.hadoop.hbase.wal.WAL.Reader;
3939
import org.apache.hadoop.hbase.wal.WALEdit;
40+
import org.apache.hadoop.hbase.wal.WALFactory;
4041
import org.apache.hadoop.hbase.wal.WALKey;
42+
import org.apache.hadoop.hbase.wal.WALStreamReader;
4143
import org.apache.hadoop.io.Writable;
4244
import org.apache.hadoop.mapreduce.InputFormat;
4345
import org.apache.hadoop.mapreduce.InputSplit;
@@ -135,7 +137,7 @@ public String toString() {
135137
* HLogInputFormat.
136138
*/
137139
static abstract class WALRecordReader<K extends WALKey> extends RecordReader<K, WALEdit> {
138-
private Reader reader = null;
140+
private WALStreamReader reader = null;
139141
// visible until we can remove the deprecated HLogInputFormat
140142
Entry currentEntry = new Entry();
141143
private long startTime;
@@ -144,6 +146,47 @@ static abstract class WALRecordReader<K extends WALKey> extends RecordReader<K,
144146
private Path logFile;
145147
private long currentPos;
146148

149+
@edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "DCN_NULLPOINTER_EXCEPTION",
150+
justification = "HDFS-4380")
151+
private WALStreamReader openReader(Path path, long startPosition) throws IOException {
152+
long retryInterval = 2000; // 2 sec
153+
int maxAttempts = 30;
154+
int attempt = 0;
155+
Exception ee = null;
156+
WALStreamReader reader = null;
157+
while (reader == null && attempt++ < maxAttempts) {
158+
try {
159+
// Detect if this is a new file, if so get a new reader else
160+
// reset the current reader so that we see the new data
161+
reader =
162+
WALFactory.createStreamReader(path.getFileSystem(conf), path, conf, startPosition);
163+
return reader;
164+
} catch (LeaseNotRecoveredException lnre) {
165+
// HBASE-15019 the WAL was not closed due to some hiccup.
166+
LOG.warn("Try to recover the WAL lease " + path, lnre);
167+
AbstractFSWALProvider.recoverLease(conf, path);
168+
reader = null;
169+
ee = lnre;
170+
} catch (NullPointerException npe) {
171+
// Workaround for race condition in HDFS-4380
172+
// which throws a NPE if we open a file before any data node has the most recent block
173+
// Just sleep and retry. Will require re-reading compressed WALs for compressionContext.
174+
LOG.warn("Got NPE opening reader, will retry.");
175+
reader = null;
176+
ee = npe;
177+
}
178+
if (reader == null) {
179+
// sleep before next attempt
180+
try {
181+
Thread.sleep(retryInterval);
182+
} catch (InterruptedException e) {
183+
Thread.currentThread().interrupt();
184+
}
185+
}
186+
}
187+
throw new IOException("Could not open reader", ee);
188+
}
189+
147190
@Override
148191
public void initialize(InputSplit split, TaskAttemptContext context)
149192
throws IOException, InterruptedException {
@@ -158,8 +201,7 @@ public void initialize(InputSplit split, TaskAttemptContext context)
158201

159202
private void openReader(Path path) throws IOException {
160203
closeReader();
161-
reader = AbstractFSWALProvider.openReader(path, conf);
162-
seek();
204+
reader = openReader(path, currentPos > 0 ? currentPos : -1);
163205
setCurrentPath(path);
164206
}
165207

@@ -174,12 +216,6 @@ private void closeReader() throws IOException {
174216
}
175217
}
176218

177-
private void seek() throws IOException {
178-
if (currentPos != 0) {
179-
reader.seek(currentPos);
180-
}
181-
}
182-
183219
@Override
184220
public boolean nextKeyValue() throws IOException, InterruptedException {
185221
if (reader == null) {

hbase-server/src/main/java/org/apache/hadoop/hbase/master/region/MasterRegion.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -271,6 +271,9 @@ private static HRegion open(Configuration conf, TableDescriptor td, RegionInfo r
271271
WAL wal = createWAL(walFactory, walRoller, serverName, walFs, walRootDir, regionInfo);
272272
conf.set(HRegion.SPECIAL_RECOVERED_EDITS_DIR,
273273
replayEditsDir.makeQualified(walFs.getUri(), walFs.getWorkingDirectory()).toString());
274+
// we do not do WAL splitting here so it is possible to have uncleanly closed WAL files, so we
275+
// need to ignore EOFException.
276+
conf.setBoolean(HRegion.RECOVERED_EDITS_IGNORE_EOF, true);
274277
return HRegion.openHRegionFromTableDir(conf, fs, tableDir, regionInfo, td, wal, null, null);
275278
}
276279

hbase-server/src/main/java/org/apache/hadoop/hbase/procedure2/store/region/WALProcedurePrettyPrinter.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
import org.apache.hadoop.hbase.wal.WALFactory;
3838
import org.apache.hadoop.hbase.wal.WALKey;
3939
import org.apache.hadoop.hbase.wal.WALPrettyPrinter;
40+
import org.apache.hadoop.hbase.wal.WALStreamReader;
4041
import org.apache.yetus.audience.InterfaceAudience;
4142
import org.apache.yetus.audience.InterfaceStability;
4243

@@ -88,7 +89,7 @@ protected void processOptions(CommandLine cmd) {
8889
protected int doWork() throws Exception {
8990
Path path = new Path(file);
9091
FileSystem fs = path.getFileSystem(conf);
91-
try (WAL.Reader reader = WALFactory.createReader(fs, path, conf)) {
92+
try (WALStreamReader reader = WALFactory.createStreamReader(fs, path, conf)) {
9293
for (;;) {
9394
WAL.Entry entry = reader.next();
9495
if (entry == null) {

hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java

Lines changed: 21 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -182,6 +182,7 @@
182182
import org.apache.hadoop.hbase.wal.WALKeyImpl;
183183
import org.apache.hadoop.hbase.wal.WALSplitUtil;
184184
import org.apache.hadoop.hbase.wal.WALSplitUtil.MutationReplay;
185+
import org.apache.hadoop.hbase.wal.WALStreamReader;
185186
import org.apache.hadoop.util.StringUtils;
186187
import org.apache.yetus.audience.InterfaceAudience;
187188
import org.slf4j.Logger;
@@ -268,6 +269,14 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
268269
public static final String SPECIAL_RECOVERED_EDITS_DIR =
269270
"hbase.hregion.special.recovered.edits.dir";
270271

272+
/**
273+
* Mainly used for master local region, where we will replay the WAL file directly without
274+
* splitting, so it is possible to have WAL files which are not closed cleanly, in this way,
275+
* hitting EOF is expected so should not consider it as a critical problem.
276+
*/
277+
public static final String RECOVERED_EDITS_IGNORE_EOF =
278+
"hbase.hregion.recovered.edits.ignore.eof";
279+
271280
/**
272281
* Whether to use {@link MetaCellComparator} even if we are not meta region. Used when creating
273282
* master local region.
@@ -5533,9 +5542,7 @@ private long replayRecoveredEdits(final Path edits, Map<byte[], Long> maxSeqIdIn
55335542
MonitoredTask status = TaskMonitor.get().createStatus(msg);
55345543

55355544
status.setStatus("Opening recovered edits");
5536-
WAL.Reader reader = null;
5537-
try {
5538-
reader = WALFactory.createReader(fs, edits, conf);
5545+
try (WALStreamReader reader = WALFactory.createStreamReader(fs, edits, conf)) {
55395546
long currentEditSeqId = -1;
55405547
long currentReplaySeqId = -1;
55415548
long firstSeqIdInLog = -1;
@@ -5689,12 +5696,17 @@ private long replayRecoveredEdits(final Path edits, Map<byte[], Long> maxSeqIdIn
56895696
coprocessorHost.postReplayWALs(this.getRegionInfo(), edits);
56905697
}
56915698
} catch (EOFException eof) {
5692-
Path p = WALSplitUtil.moveAsideBadEditsFile(walFS, edits);
5693-
msg = "EnLongAddered EOF. Most likely due to Master failure during "
5694-
+ "wal splitting, so we have this data in another edit. Continuing, but renaming " + edits
5695-
+ " as " + p + " for region " + this;
5696-
LOG.warn(msg, eof);
5697-
status.abort(msg);
5699+
if (!conf.getBoolean(RECOVERED_EDITS_IGNORE_EOF, false)) {
5700+
Path p = WALSplitUtil.moveAsideBadEditsFile(walFS, edits);
5701+
msg = "EnLongAddered EOF. Most likely due to Master failure during "
5702+
+ "wal splitting, so we have this data in another edit. Continuing, but renaming "
5703+
+ edits + " as " + p + " for region " + this;
5704+
LOG.warn(msg, eof);
5705+
status.abort(msg);
5706+
} else {
5707+
LOG.warn("EOF while replaying recover edits and config '{}' is true so "
5708+
+ "we will ignore it and continue", RECOVERED_EDITS_IGNORE_EOF, eof);
5709+
}
56985710
} catch (IOException ioe) {
56995711
// If the IOE resulted from bad file format,
57005712
// then this problem is idempotent and retrying won't help
@@ -5721,9 +5733,6 @@ private long replayRecoveredEdits(final Path edits, Map<byte[], Long> maxSeqIdIn
57215733
return currentEditSeqId;
57225734
} finally {
57235735
status.cleanup();
5724-
if (reader != null) {
5725-
reader.close();
5726-
}
57275736
}
57285737
}
57295738

hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -110,8 +110,9 @@
110110
* <code>F</code> when all of the edits in <code>F</code> have a log-sequence-id that's older
111111
* (smaller) than the most-recent flush.
112112
* <p>
113-
* To read an WAL, call
114-
* {@link WALFactory#createReader(org.apache.hadoop.fs.FileSystem, org.apache.hadoop.fs.Path)}. *
113+
* To read an WAL, call {@link WALFactory#createStreamReader(FileSystem, Path)} for one way read,
114+
* call {@link WALFactory#createTailingReader(FileSystem, Path, Configuration, long)} for
115+
* replication where we may want to tail the active WAL file.
115116
* <h2>Failure Semantic</h2> If an exception on append or sync, roll the WAL because the current WAL
116117
* is now a lame duck; any more appends or syncs will fail also with the same original exception. If
117118
* we have made successful appends to the WAL and we then are unable to sync them, our current

hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractProtobufLogWriter.java

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,10 @@
1717
*/
1818
package org.apache.hadoop.hbase.regionserver.wal;
1919

20-
import static org.apache.hadoop.hbase.regionserver.wal.ProtobufLogReader.DEFAULT_WAL_TRAILER_WARN_SIZE;
21-
import static org.apache.hadoop.hbase.regionserver.wal.ProtobufLogReader.WAL_TRAILER_WARN_SIZE;
20+
import static org.apache.hadoop.hbase.regionserver.wal.AbstractProtobufWALReader.DEFAULT_WAL_TRAILER_WARN_SIZE;
21+
import static org.apache.hadoop.hbase.regionserver.wal.AbstractProtobufWALReader.PB_WAL_COMPLETE_MAGIC;
22+
import static org.apache.hadoop.hbase.regionserver.wal.AbstractProtobufWALReader.PB_WAL_MAGIC;
23+
import static org.apache.hadoop.hbase.regionserver.wal.AbstractProtobufWALReader.WAL_TRAILER_WARN_SIZE;
2224

2325
import java.io.IOException;
2426
import java.io.OutputStream;
@@ -185,8 +187,7 @@ public void init(FileSystem fs, Path path, Configuration conf, boolean overwrita
185187
headerBuilder.setValueCompressionAlgorithm(
186188
CompressionContext.getValueCompressionAlgorithm(conf).ordinal());
187189
}
188-
length.set(writeMagicAndWALHeader(ProtobufLogReader.PB_WAL_MAGIC,
189-
buildWALHeader(conf, headerBuilder)));
190+
length.set(writeMagicAndWALHeader(PB_WAL_MAGIC, buildWALHeader(conf, headerBuilder)));
190191

191192
initAfterHeader(doCompress);
192193

@@ -257,7 +258,7 @@ protected void writeWALTrailer() {
257258
LOG.warn("Please investigate WALTrailer usage. Trailer size > maximum size : " + trailerSize
258259
+ " > " + this.trailerWarnSize);
259260
}
260-
length.set(writeWALTrailerAndMagic(trailer, ProtobufLogReader.PB_WAL_COMPLETE_MAGIC));
261+
length.set(writeWALTrailerAndMagic(trailer, PB_WAL_COMPLETE_MAGIC));
261262
this.trailerWritten = true;
262263
} catch (IOException ioe) {
263264
LOG.warn("Failed to write trailer, non-fatal, continuing...", ioe);

0 commit comments

Comments (0)