@@ -2606,6 +2606,11 @@ protected PrepareFlushResult internalPrepareFlushCache(WAL wal, long myseqid,
byte[] encodedRegionName = getRegionInfo().getEncodedNameAsBytes();
try {
if (wal != null) {
// Get the next sequenceid. The method will not return until mvcc has caught up to the
// returned flushOpSeqId. This means all edits have been committed up to this point.
// This must be done before we call wal#startCacheFlush else edits may complete AFTER
// the call to startCacheFlush.
flushOpSeqId = getNextSequenceId(wal);
Long earliestUnflushedSequenceIdForTheRegion =
wal.startCacheFlush(encodedRegionName, flushedFamilyNamesToSeq);
if (earliestUnflushedSequenceIdForTheRegion == null) {
@@ -2616,7 +2621,6 @@ protected PrepareFlushResult internalPrepareFlushCache(WAL wal, long myseqid,
new FlushResultImpl(FlushResult.Result.CANNOT_FLUSH, msg, false),
myseqid);
}
flushOpSeqId = getNextSequenceId(wal);
// Back up 1, minus 1 from oldest sequence id in memstore to get last 'flushed' edit
flushedSeqId =
earliestUnflushedSequenceIdForTheRegion.longValue() == HConstants.NO_SEQNUM?
@@ -2686,7 +2690,8 @@ private void logFatLineOnFlush(Collection<HStore> storesToFlush, long sequenceId
}
}
MemStoreSize mss = this.memStoreSizing.getMemStoreSize();
LOG.info("Flushing " + storesToFlush.size() + "/" + stores.size() + " column families," +
LOG.info("Flushing " + this.getRegionInfo().getEncodedName() + " " +
storesToFlush.size() + "/" + stores.size() + " column families," +
" dataSize=" + StringUtils.byteDesc(mss.getDataSize()) +
" heapSize=" + StringUtils.byteDesc(mss.getHeapSize()) +
((perCfExtras != null && perCfExtras.length() > 0)? perCfExtras.toString(): "") +
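The reordering above relies on getNextSequenceId(wal) not returning until MVCC has caught up to the sequence id it hands back, which is why it must now run before wal#startCacheFlush. A minimal, self-contained sketch of that contract follows; the MVCC calls reflect one plausible shape and are not the actual HRegion implementation.

import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;

// Sketch only: obtain a flush sequence id such that every edit with a lower
// sequence id is already committed before cache-flush accounting starts.
final class FlushSeqIdSketch {
  private final MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl();

  long nextFlushSequenceId() {
    MultiVersionConcurrencyControl.WriteEntry we = mvcc.begin();
    // Blocks until all earlier write numbers have completed; after this point
    // nothing can commit behind the returned flushOpSeqId.
    mvcc.completeAndWait(we);
    return we.getWriteNumber();
  }
}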
@@ -1917,7 +1917,7 @@ private void startServices() throws IOException {
healthCheckChore = new HealthCheckChore(sleepTime, this, getConfiguration());
}

this.walRoller = new LogRoller(this, this);
this.walRoller = new LogRoller(this);
this.flushThroughputController = FlushThroughputControllerFactory.create(this, conf);
this.procedureResultReporter = new RemoteProcedureResultReporter(this);

@@ -1,4 +1,4 @@
/**
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
@@ -27,8 +27,8 @@
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.apache.hadoop.hbase.HBaseIOException;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL;
import org.apache.hadoop.hbase.regionserver.wal.FailedLogCloseException;
import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
@@ -56,7 +56,6 @@
public class LogRoller extends HasThread implements Closeable {
private static final Logger LOG = LoggerFactory.getLogger(LogRoller.class);
private final ConcurrentMap<WAL, Boolean> walNeedsRoll = new ConcurrentHashMap<>();
private final Server server;
protected final RegionServerServices services;
private volatile long lastRollTime = System.currentTimeMillis();
// Period to roll log.
@@ -99,16 +98,14 @@ public void requestRollAll() {
}
}

/** @param server */
public LogRoller(final Server server, final RegionServerServices services) {
public LogRoller(RegionServerServices services) {
super("LogRoller");
this.server = server;
this.services = services;
this.rollPeriod = this.server.getConfiguration().
this.rollPeriod = this.services.getConfiguration().
getLong("hbase.regionserver.logroll.period", 3600000);
this.threadWakeFrequency = this.server.getConfiguration().
this.threadWakeFrequency = this.services.getConfiguration().
getInt(HConstants.THREAD_WAKE_FREQUENCY, 10 * 1000);
this.checkLowReplicationInterval = this.server.getConfiguration().getLong(
this.checkLowReplicationInterval = this.services.getConfiguration().getLong(
"hbase.regionserver.hlog.check.lowreplication.interval", 30 * 1000);
}

@@ -144,7 +141,7 @@ private void abort(String reason, Throwable cause) {
LOG.warn("Failed to shutdown wal", e);
}
}
server.abort(reason, cause);
this.services.abort(reason, cause);
}

@Override
@@ -183,12 +180,22 @@ public void run() {
WAL wal = entry.getKey();
// reset the flag in front to avoid missing roll request before we return from rollWriter.
walNeedsRoll.put(wal, Boolean.FALSE);
// Force the roll if the logroll.period is elapsed or if a roll was requested.
// The returned value is an array of actual region names.
byte[][] regionsToFlush = wal.rollWriter(periodic || entry.getValue().booleanValue());
if (regionsToFlush != null) {
for (byte[] r : regionsToFlush) {
scheduleFlush(Bytes.toString(r));
try {
scheduleFlush(Bytes.toString(r));
} catch (NotOnlineException e) {
if (wal instanceof AbstractFSWAL) {
LOG.warn(e.toString() + " ... running a purge of sequenceidaccounting");
AbstractFSWAL awal = (AbstractFSWAL)wal;
awal.purge(r);
} else {
LOG.warn(e.toString());
}
}
}
}
}
@@ -206,19 +213,27 @@ public void run() {
LOG.info("LogRoller exiting.");
}

/**
* Used internally. Thrown if we failed to schedule a flush because Region was
* not online.
*/
private class NotOnlineException extends HBaseIOException {
NotOnlineException(String message) {
super(message);
}
}

/**
* @param encodedRegionName Encoded name of region to flush.
*/
private void scheduleFlush(String encodedRegionName) {
HRegion r = (HRegion) this.services.getRegion(encodedRegionName);
private void scheduleFlush(String encodedRegionName) throws NotOnlineException {
HRegion r = (HRegion)this.services.getRegion(encodedRegionName);
if (r == null) {
LOG.warn("Failed to schedule flush of {}, because it is not online on us", encodedRegionName);
return;
throw new NotOnlineException(encodedRegionName);
}
FlushRequester requester = this.services.getFlushRequester();
if (requester == null) {
LOG.warn("Failed to schedule flush of {}, region={}, because FlushRequester is null",
encodedRegionName, r);
LOG.warn("Failed to schedule flush of {} because FlushRequester is null", encodedRegionName);
return;
}
// force flushing all stores to clean old logs
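For reference, a minimal wiring sketch of the simplified constructor, mirroring the HRegionServer change earlier in this diff; the wrapper class and method below are invented for illustration.

import org.apache.hadoop.hbase.regionserver.LogRoller;
import org.apache.hadoop.hbase.regionserver.RegionServerServices;
import org.apache.hadoop.hbase.wal.WAL;

final class LogRollerWiringSketch {
  // RegionServerServices extends Server, so getConfiguration(), abort() and
  // isStopped() are all still reachable through the single remaining handle.
  static LogRoller startRoller(RegionServerServices services, WAL wal) {
    LogRoller roller = new LogRoller(services);
    roller.addWAL(wal);   // register the WAL(s) to watch for roll requests
    roller.start();       // HasThread#start kicks off the roll loop
    return roller;
  }
}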
@@ -261,8 +261,8 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
private static final class WalProps {

/**
* Map the encoded region name to the highest sequence id. Contain all the regions it has
* entries of
* Map the encoded region name to the highest sequence id.
* Contains all the regions it has an entry for.
*/
public final Map<byte[], Long> encodedName2HighestSequenceId;

@@ -498,6 +498,15 @@ public void abortCacheFlush(byte[] encodedRegionName) {
this.sequenceIdAccounting.abortCacheFlush(encodedRegionName);
}

/**
* Temporary. To be removed. Used while debugging an accounting problem in
* this.sequenceIdAccounting. Called when we are asked to force flush
* a region that is already closed.
*/
public void purge(byte [] encodedRegionName) {
this.sequenceIdAccounting.purge(encodedRegionName);
}

@Override
public long getEarliestMemStoreSeqNum(byte[] encodedRegionName) {
// Used by tests. Deprecated as too subtle for general usage.
@@ -610,9 +619,9 @@ public int getNumLogFiles() {
}

/**
* If the number of un-archived WAL files is greater than maximum allowed, check the first
* (oldest) WAL file, and returns those regions which should be flushed so that it can be
* archived.
* If the number of un-archived WAL files ('live' WALs) is greater than maximum allowed,
* check the first (oldest) WAL, and return those regions which should be flushed so that
* it can be let-go/'archived'.
* @return regions (encodedRegionNames) to flush in order to archive oldest WAL file.
*/
byte[][] findRegionsToForceFlush() throws IOException {
@@ -888,10 +897,6 @@ public void close() throws IOException {
/**
* updates the sequence number of a specific store. depending on the flag: replaces current seq
* number if the given seq id is bigger, or even if it is lower than existing one
* @param encodedRegionName
* @param familyName
* @param sequenceid
* @param onlyIfGreater
*/
@Override
public void updateStore(byte[] encodedRegionName, byte[] familyName, Long sequenceid,
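A toy illustration of the policy described in the reworked findRegionsToForceFlush javadoc: a region pins the oldest live WAL while its lowest unflushed sequence id still falls inside that WAL, so those are the regions to flush before the file can be archived. Everything below is invented for the sketch and is not the AbstractFSWAL internals.

import java.util.HashMap;
import java.util.Map;

final class OldestWalPinSketch {
  // Returns the regions whose lowest unflushed sequence id is not beyond the
  // highest sequence id written to the oldest live WAL; flushing them lets
  // that WAL be archived.
  static Map<String, Long> regionsPinningOldestWal(Map<String, Long> lowestUnflushedByRegion,
      long oldestWalHighestSeqId) {
    Map<String, Long> toFlush = new HashMap<>();
    for (Map.Entry<String, Long> e : lowestUnflushedByRegion.entrySet()) {
      if (e.getValue() <= oldestWalHighestSeqId) {
        toFlush.put(e.getKey(), e.getValue());
      }
    }
    return toFlush;
  }
}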
@@ -37,13 +37,11 @@
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;

/**
* <p>
* Accounting of sequence ids per region and then by column family. So we can our accounting
* Accounting of sequence ids per region and then by column family. So we can keep our accounting
* current, call startCacheFlush and then finishedCacheFlush or abortCacheFlush so this instance can
* keep abreast of the state of sequence id persistence. Also call update per append.
* </p>
* <p>
* For the implementation, we assume that all the {@code encodedRegionName} passed in is gotten by
* For the implementation, we assume that all the {@code encodedRegionName} passed in are gotten by
* {@link org.apache.hadoop.hbase.client.RegionInfo#getEncodedNameAsBytes()}. So it is safe to use
* it as a hash key. And for family name, we use {@link ImmutableByteArray} as key. This is because
* hash based map is much faster than RBTree or CSLM and here we are on the critical write path. See
@@ -52,8 +50,8 @@
*/
@InterfaceAudience.Private
class SequenceIdAccounting {

private static final Logger LOG = LoggerFactory.getLogger(SequenceIdAccounting.class);

/**
* This lock ties all operations on {@link SequenceIdAccounting#flushingSequenceIds} and
* {@link #lowestUnflushedSequenceIds} Maps. {@link #lowestUnflushedSequenceIds} has the
@@ -109,7 +107,6 @@ class SequenceIdAccounting {

/**
* Returns the lowest unflushed sequence id for the region.
* @param encodedRegionName
* @return Lowest outstanding unflushed sequenceid for <code>encodedRegionName</code>. Will
* return {@link HConstants#NO_SEQNUM} when none.
*/
@@ -124,8 +121,6 @@ long getLowestSequenceId(final byte[] encodedRegionName) {
}

/**
* @param encodedRegionName
* @param familyName
* @return Lowest outstanding unflushed sequenceid for <code>encodedRegionname</code> and
* <code>familyName</code>. Returned sequenceid may be for an edit currently being
* flushed.
@@ -336,6 +331,37 @@ void completeCacheFlush(final byte[] encodedRegionName) {
}
}

/**
* Workaround while there is a bug in accounting.
* TO BE REMOVED. Called when we try to flush a Region that has been closed.
*/
void purge(byte [] encodeRegionName) {
Map<ImmutableByteArray, Long> flushingSequenceId = null;
Map<ImmutableByteArray, Long> lowestUnflushedSequenceId = null;
Long highestSequenceId = null;
synchronized (tieLock) {
flushingSequenceId = this.flushingSequenceIds.remove(encodeRegionName);
lowestUnflushedSequenceId = this.lowestUnflushedSequenceIds.remove(encodeRegionName);
highestSequenceId = this.highestSequenceIds.remove(encodeRegionName);
}
String name = Bytes.toString(encodeRegionName);
printByteLongMapWARN(flushingSequenceId, "FlushingSequenceIds", name);
printByteLongMapWARN(lowestUnflushedSequenceId, "LowestUnflushedSequenceId", name);
if (highestSequenceId != null) {
LOG.warn("PURGE {} HighestSequenceId leftover {}", name, highestSequenceId);
}
}

private static void printByteLongMapWARN(Map<ImmutableByteArray, Long> m,
String label, String encodedRegionNameStr) {
if (m != null && !m.isEmpty()) {
for (Map.Entry<ImmutableByteArray, Long> e : m.entrySet()) {
LOG.warn("PURGE {} {} leftover {} {} {}", encodedRegionNameStr,
label, e.getKey().toStringUtf8(), e.getValue());
}
}
}

void abortCacheFlush(final byte[] encodedRegionName) {
// Method is called when we are crashing down because failed write flush AND it is called
// if we fail prepare. The below is for the fail prepare case; we restore the old sequence ids.
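To make purge() concrete, here is a simplified model of the three per-region structures it clears, using String keys instead of the byte[]/ImmutableByteArray keys the class javadoc describes; names and shapes are assumptions for illustration only.

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

final class AccountingPurgeSketch {
  // region -> family -> sequence id snapshot taken when a flush started
  private final ConcurrentMap<String, Map<String, Long>> flushingSequenceIds = new ConcurrentHashMap<>();
  // region -> family -> lowest sequence id not yet flushed
  private final ConcurrentMap<String, Map<String, Long>> lowestUnflushedSequenceIds = new ConcurrentHashMap<>();
  // region -> highest sequence id seen for the region
  private final ConcurrentMap<String, Long> highestSequenceIds = new ConcurrentHashMap<>();
  private final Object tieLock = new Object();   // stand-in for SequenceIdAccounting's tieLock

  // Drop every trace of a region; anything left over points at the accounting bug being chased.
  void purge(String encodedRegionName) {
    Map<String, Long> flushing;
    Map<String, Long> lowest;
    Long highest;
    synchronized (tieLock) {
      flushing = flushingSequenceIds.remove(encodedRegionName);
      lowest = lowestUnflushedSequenceIds.remove(encodedRegionName);
      highest = highestSequenceIds.remove(encodedRegionName);
    }
    if (flushing != null || lowest != null || highest != null) {
      System.out.println("PURGE leftover accounting for " + encodedRegionName);
    }
  }
}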
@@ -1,4 +1,4 @@
/**
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -31,7 +31,6 @@
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Put;
@@ -162,18 +161,17 @@ public long getLength() {
}

// Make up mocked server and services.
Server server = mock(Server.class);
when(server.getConfiguration()).thenReturn(CONF);
when(server.isStopped()).thenReturn(false);
when(server.isAborted()).thenReturn(false);
RegionServerServices services = mock(RegionServerServices.class);
when(services.getConfiguration()).thenReturn(CONF);
when(services.isStopped()).thenReturn(false);
when(services.isAborted()).thenReturn(false);
// OK. Now I have my mocked up Server and RegionServerServices and my dodgy WAL, go ahead with
// the test.
FileSystem fs = FileSystem.get(CONF);
Path rootDir = new Path(dir + getName());
DodgyFSLog dodgyWAL = new DodgyFSLog(fs, rootDir, getName(), CONF);
dodgyWAL.init();
LogRoller logRoller = new LogRoller(server, services);
LogRoller logRoller = new LogRoller(services);
logRoller.addWAL(dodgyWAL);
logRoller.start();

@@ -224,7 +222,7 @@ public long getLength() {
// to just continue.

// So, should be no abort at this stage. Verify.
Mockito.verify(server, Mockito.atLeast(0)).
Mockito.verify(services, Mockito.atLeast(0)).
abort(Mockito.anyString(), (Throwable)Mockito.anyObject());
try {
dodgyWAL.throwAppendException = false;
@@ -240,7 +238,7 @@ public long getLength() {
// happens. If it don't we'll timeout the whole test. That is fine.
while (true) {
try {
Mockito.verify(server, Mockito.atLeast(1)).
Mockito.verify(services, Mockito.atLeast(1)).
abort(Mockito.anyString(), (Throwable)Mockito.anyObject());
break;
} catch (WantedButNotInvoked t) {
@@ -249,7 +247,7 @@
}
} finally {
// To stop logRoller, its server has to say it is stopped.
Mockito.when(server.isStopped()).thenReturn(true);
Mockito.when(services.isStopped()).thenReturn(true);
if (logRoller != null) logRoller.close();
if (region != null) {
try {