Skip to content

Commit 269586c

Browse files
authored
HBASE-27785 Encapsulate and centralize totalBufferUsed in Replication… (#5168)
Signed-off-by: Duo Zhang <[email protected]>
1 parent f5ee958 commit 269586c

File tree

12 files changed

+136
-128
lines changed

12 files changed

+136
-128
lines changed

hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -137,8 +137,6 @@ public class ReplicationSource implements ReplicationSourceInterface {
137137
protected final ConcurrentHashMap<String, ReplicationSourceShipper> workerThreads =
138138
new ConcurrentHashMap<>();
139139

140-
private AtomicLong totalBufferUsed;
141-
142140
public static final String WAIT_ON_ENDPOINT_SECONDS =
143141
"hbase.replication.wait.on.endpoint.seconds";
144142
public static final int DEFAULT_WAIT_ON_ENDPOINT_SECONDS = 30;
@@ -220,7 +218,6 @@ public void init(Configuration conf, FileSystem fs, ReplicationSourceManager man
220218
defaultBandwidth = this.conf.getLong("replication.source.per.peer.node.bandwidth", 0);
221219
currentBandwidth = getCurrentBandwidth();
222220
this.throttler = new ReplicationThrottler((double) currentBandwidth / 10.0);
223-
this.totalBufferUsed = manager.getTotalBufferUsed();
224221
this.walFileLengthProvider = walFileLengthProvider;
225222

226223
this.abortOnError = this.conf.getBoolean("replication.source.regionserver.abort", true);
@@ -779,14 +776,12 @@ public MetricsSource getSourceMetrics() {
779776

780777
@Override
781778
// offsets totalBufferUsed by deducting shipped batchSize.
782-
public void postShipEdits(List<Entry> entries, int batchSize) {
779+
public void postShipEdits(List<Entry> entries, long batchSize) {
783780
if (throttler.isEnabled()) {
784781
throttler.addPushSize(batchSize);
785782
}
786783
totalReplicatedEdits.addAndGet(entries.size());
787-
long newBufferUsed = totalBufferUsed.addAndGet(-batchSize);
788-
// Record the new buffer usage
789-
this.manager.getGlobalMetrics().setWALReaderEditsBufferBytes(newBufferUsed);
784+
this.manager.releaseBufferQuota(batchSize);
790785
}
791786

792787
@Override

hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -164,7 +164,7 @@ default boolean isSyncReplication() {
164164
* @param entries pushed
165165
* @param batchSize entries size pushed
166166
*/
167-
void postShipEdits(List<Entry> entries, int batchSize);
167+
void postShipEdits(List<Entry> entries, long batchSize);
168168

169169
/**
170170
* The queue of WALs only belong to one region server. This will return the server name which all

hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java

Lines changed: 80 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,7 @@
6161
import org.apache.hadoop.hbase.util.Pair;
6262
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
6363
import org.apache.hadoop.hbase.wal.SyncReplicationWALProvider;
64+
import org.apache.hadoop.hbase.wal.WAL.Entry;
6465
import org.apache.hadoop.hbase.wal.WALFactory;
6566
import org.apache.yetus.audience.InterfaceAudience;
6667
import org.apache.zookeeper.KeeperException;
@@ -984,8 +985,8 @@ Set<Path> getLastestPath() {
984985
}
985986
}
986987

987-
public AtomicLong getTotalBufferUsed() {
988-
return totalBufferUsed;
988+
public long getTotalBufferUsed() {
989+
return totalBufferUsed.get();
989990
}
990991

991992
/**
@@ -1035,7 +1036,7 @@ public String getStats() {
10351036
StringBuilder stats = new StringBuilder();
10361037
// Print stats that apply across all Replication Sources
10371038
stats.append("Global stats: ");
1038-
stats.append("WAL Edits Buffer Used=").append(getTotalBufferUsed().get()).append("B, Limit=")
1039+
stats.append("WAL Edits Buffer Used=").append(getTotalBufferUsed()).append("B, Limit=")
10391040
.append(getTotalBufferLimit()).append("B\n");
10401041
for (ReplicationSourceInterface source : this.sources.values()) {
10411042
stats.append("Normal source for cluster " + source.getPeerId() + ": ");
@@ -1070,4 +1071,80 @@ MetricsReplicationGlobalSourceSource getGlobalMetrics() {
10701071
ReplicationQueueStorage getQueueStorage() {
10711072
return queueStorage;
10721073
}
1074+
1075+
/**
1076+
* Acquire the buffer quota for {@link Entry} which is added to {@link WALEntryBatch}.
1077+
* @param entry the wal entry which is added to {@link WALEntryBatch} and should acquire buffer
1078+
* quota.
1079+
* @return true if we should clear buffer and push all
1080+
*/
1081+
boolean acquireWALEntryBufferQuota(WALEntryBatch walEntryBatch, Entry entry) {
1082+
long entrySize = walEntryBatch.incrementUsedBufferSize(entry);
1083+
return this.acquireBufferQuota(entrySize);
1084+
}
1085+
1086+
/**
1087+
* To release the buffer quota of {@link WALEntryBatch} which was acquired by
1088+
* {@link ReplicationSourceManager#acquireWALEntryBufferQuota}.
1089+
* @return the released buffer quota size.
1090+
*/
1091+
long releaseWALEntryBatchBufferQuota(WALEntryBatch walEntryBatch) {
1092+
long usedBufferSize = walEntryBatch.getUsedBufferSize();
1093+
if (usedBufferSize > 0) {
1094+
this.releaseBufferQuota(usedBufferSize);
1095+
}
1096+
return usedBufferSize;
1097+
}
1098+
1099+
/**
1100+
* Add the size to {@link ReplicationSourceManager#totalBufferUsed} and check if it exceeds
1101+
* {@link ReplicationSourceManager#totalBufferLimit}.
1102+
* @return true if {@link ReplicationSourceManager#totalBufferUsed} exceeds
1103+
* {@link ReplicationSourceManager#totalBufferLimit}, so we should stop growing the buffer and
1104+
* ship all.
1105+
*/
1106+
boolean acquireBufferQuota(long size) {
1107+
if (size < 0) {
1108+
throw new IllegalArgumentException("size should not less than 0");
1109+
}
1110+
long newBufferUsed = addTotalBufferUsed(size);
1111+
return newBufferUsed >= totalBufferLimit;
1112+
}
1113+
1114+
/**
1115+
* To release the buffer quota which was acquired by
1116+
* {@link ReplicationSourceManager#acquireBufferQuota}.
1117+
*/
1118+
void releaseBufferQuota(long size) {
1119+
if (size < 0) {
1120+
throw new IllegalArgumentException("size should not less than 0");
1121+
}
1122+
addTotalBufferUsed(-size);
1123+
}
1124+
1125+
private long addTotalBufferUsed(long size) {
1126+
if (size == 0) {
1127+
return totalBufferUsed.get();
1128+
}
1129+
long newBufferUsed = totalBufferUsed.addAndGet(size);
1130+
// Record the new buffer usage
1131+
this.globalMetrics.setWALReaderEditsBufferBytes(newBufferUsed);
1132+
return newBufferUsed;
1133+
}
1134+
1135+
/**
1136+
* Check if {@link ReplicationSourceManager#totalBufferUsed} exceeds
1137+
* {@link ReplicationSourceManager#totalBufferLimit} for peer.
1138+
* @return true if {@link ReplicationSourceManager#totalBufferUsed} is not more than
1139+
* {@link ReplicationSourceManager#totalBufferLimit}.
1140+
*/
1141+
boolean checkBufferQuota(String peerId) {
1142+
// try not to go over total quota
1143+
if (totalBufferUsed.get() > totalBufferLimit) {
1144+
LOG.warn("peer={}, can't read more edits from WAL as buffer usage {}B exceeds limit {}B",
1145+
peerId, totalBufferUsed.get(), totalBufferLimit);
1146+
return false;
1147+
}
1148+
return true;
1149+
}
10731150
}

hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java

Lines changed: 10 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@
2222

2323
import java.io.IOException;
2424
import java.util.List;
25-
import java.util.concurrent.atomic.LongAccumulator;
2625
import org.apache.hadoop.conf.Configuration;
2726
import org.apache.hadoop.fs.Path;
2827
import org.apache.hadoop.hbase.Cell;
@@ -150,18 +149,6 @@ private void noMoreData() {
150149
protected void postFinish() {
151150
}
152151

153-
/**
154-
* get batchEntry size excludes bulk load file sizes. Uses ReplicationSourceWALReader's static
155-
* method.
156-
*/
157-
private int getBatchEntrySizeExcludeBulkLoad(WALEntryBatch entryBatch) {
158-
int totalSize = 0;
159-
for (Entry entry : entryBatch.getWalEntries()) {
160-
totalSize += ReplicationSourceWALReader.getEntrySizeExcludeBulkLoad(entry);
161-
}
162-
return totalSize;
163-
}
164-
165152
/**
166153
* Do the shipping logic
167154
*/
@@ -173,7 +160,6 @@ private void shipEdits(WALEntryBatch entryBatch) {
173160
return;
174161
}
175162
int currentSize = (int) entryBatch.getHeapSize();
176-
int sizeExcludeBulkLoad = getBatchEntrySizeExcludeBulkLoad(entryBatch);
177163
source.getSourceMetrics()
178164
.setTimeStampNextToReplicate(entries.get(entries.size() - 1).getKey().getWriteTime());
179165
while (isActive()) {
@@ -217,7 +203,7 @@ private void shipEdits(WALEntryBatch entryBatch) {
217203
// this sizeExcludeBulkLoad has to use the same calculation as when calling
218204
// acquireBufferQuota() in ReplicationSourceWALReader because they maintain
219205
// same variable: totalBufferUsed
220-
source.postShipEdits(entries, sizeExcludeBulkLoad);
206+
source.postShipEdits(entries, entryBatch.getUsedBufferSize());
221207
// FIXME check relationship between wal group and overall
222208
source.getSourceMetrics().shipBatch(entryBatch.getNbOperations(), currentSize,
223209
entryBatch.getNbHFiles());
@@ -366,20 +352,17 @@ void clearWALEntryBatch() {
366352
return;
367353
}
368354
}
369-
LongAccumulator totalToDecrement = new LongAccumulator((a, b) -> a + b, 0);
370-
entryReader.entryBatchQueue.forEach(w -> {
371-
entryReader.entryBatchQueue.remove(w);
372-
w.getWalEntries().forEach(e -> {
373-
long entrySizeExcludeBulkLoad = ReplicationSourceWALReader.getEntrySizeExcludeBulkLoad(e);
374-
totalToDecrement.accumulate(entrySizeExcludeBulkLoad);
375-
});
376-
});
355+
long totalReleasedBytes = 0;
356+
while (true) {
357+
WALEntryBatch batch = entryReader.entryBatchQueue.poll();
358+
if (batch == null) {
359+
break;
360+
}
361+
totalReleasedBytes += source.getSourceManager().releaseWALEntryBatchBufferQuota(batch);
362+
}
377363
if (LOG.isTraceEnabled()) {
378364
LOG.trace("Decrementing totalBufferUsed by {}B while stopping Replication WAL Readers.",
379-
totalToDecrement.longValue());
365+
totalReleasedBytes);
380366
}
381-
long newBufferUsed =
382-
source.getSourceManager().getTotalBufferUsed().addAndGet(-totalToDecrement.longValue());
383-
source.getSourceManager().getGlobalMetrics().setWALReaderEditsBufferBytes(newBufferUsed);
384367
}
385368
}

hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java

Lines changed: 10 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@
2323
import java.util.concurrent.BlockingQueue;
2424
import java.util.concurrent.LinkedBlockingQueue;
2525
import java.util.concurrent.TimeUnit;
26-
import java.util.concurrent.atomic.AtomicLong;
2726
import org.apache.hadoop.conf.Configuration;
2827
import org.apache.hadoop.fs.FileSystem;
2928
import org.apache.hadoop.fs.Path;
@@ -35,7 +34,6 @@
3534
import org.apache.hadoop.hbase.util.Threads;
3635
import org.apache.hadoop.hbase.wal.WAL.Entry;
3736
import org.apache.hadoop.hbase.wal.WALEdit;
38-
import org.apache.hadoop.hbase.wal.WALKey;
3937
import org.apache.yetus.audience.InterfaceAudience;
4038
import org.apache.yetus.audience.InterfaceStability;
4139
import org.slf4j.Logger;
@@ -75,9 +73,6 @@ class ReplicationSourceWALReader extends Thread {
7573

7674
// Indicates whether this particular worker is running
7775
private boolean isReaderRunning = true;
78-
79-
private AtomicLong totalBufferUsed;
80-
private long totalBufferQuota;
8176
private final String walGroupId;
8277

8378
/**
@@ -105,8 +100,6 @@ public ReplicationSourceWALReader(FileSystem fs, Configuration conf,
105100
// memory used will be batchSizeCapacity * (nb.batches + 1)
106101
// the +1 is for the current thread reading before placing onto the queue
107102
int batchCount = conf.getInt("replication.source.nb.batches", 1);
108-
this.totalBufferUsed = source.getSourceManager().getTotalBufferUsed();
109-
this.totalBufferQuota = source.getSourceManager().getTotalBufferLimit();
110103
// 1 second
111104
this.sleepForRetries = this.conf.getLong("replication.source.sleepforretries", 1000);
112105
// 5 minutes @ 1 sec per
@@ -147,7 +140,7 @@ public void run() {
147140
Threads.sleep(sleepForRetries);
148141
continue;
149142
}
150-
if (!checkQuota()) {
143+
if (!checkBufferQuota()) {
151144
continue;
152145
}
153146
Path currentPath = entryStream.getCurrentPath();
@@ -188,7 +181,7 @@ public void run() {
188181
// batch is not put to ReplicationSourceWALReader#entryBatchQueue, so we should
189182
// decrease ReplicationSourceWALReader.totalBufferUsed by the byte size which
190183
// acquired in ReplicationSourceWALReader.acquireBufferQuota.
191-
this.releaseBufferQuota(batch);
184+
this.getSourceManager().releaseWALEntryBatchBufferQuota(batch);
192185
}
193186
}
194187
}
@@ -218,10 +211,9 @@ protected final boolean addEntryToBatch(WALEntryBatch batch, Entry entry) {
218211
entry.getKey().getWriteTime(), entry.getKey(), this.source.getQueueId());
219212
updateReplicationMarkerEdit(entry, batch.getLastWalPosition());
220213
long entrySize = getEntrySizeIncludeBulkLoad(entry);
221-
long entrySizeExcludeBulkLoad = getEntrySizeExcludeBulkLoad(entry);
222214
batch.addEntry(entry, entrySize);
223215
updateBatchStats(batch, entry, entrySize);
224-
boolean totalBufferTooLarge = acquireBufferQuota(batch, entrySizeExcludeBulkLoad);
216+
boolean totalBufferTooLarge = this.getSourceManager().acquireWALEntryBufferQuota(batch, entry);
225217

226218
// Stop if too many entries or too big
227219
return totalBufferTooLarge || batch.getHeapSize() >= replicationBatchSizeCapacity
@@ -275,11 +267,9 @@ public Path getCurrentPath() {
275267
}
276268

277269
// returns false if we've already exceeded the global quota
278-
private boolean checkQuota() {
270+
private boolean checkBufferQuota() {
279271
// try not to go over total quota
280-
if (totalBufferUsed.get() > totalBufferQuota) {
281-
LOG.warn("peer={}, can't read more edits from WAL as buffer usage {}B exceeds limit {}B",
282-
this.source.getPeerId(), totalBufferUsed.get(), totalBufferQuota);
272+
if (!this.getSourceManager().checkBufferQuota(this.source.getPeerId())) {
283273
Threads.sleep(sleepForRetries);
284274
return false;
285275
}
@@ -319,13 +309,7 @@ public WALEntryBatch poll(long timeout) throws InterruptedException {
319309

320310
private long getEntrySizeIncludeBulkLoad(Entry entry) {
321311
WALEdit edit = entry.getEdit();
322-
return getEntrySizeExcludeBulkLoad(entry) + sizeOfStoreFilesIncludeBulkLoad(edit);
323-
}
324-
325-
public static long getEntrySizeExcludeBulkLoad(Entry entry) {
326-
WALEdit edit = entry.getEdit();
327-
WALKey key = entry.getKey();
328-
return edit.heapSize() + key.estimatedSerializedSizeOf();
312+
return WALEntryBatch.getEntrySizeExcludeBulkLoad(entry) + sizeOfStoreFilesIncludeBulkLoad(edit);
329313
}
330314

331315
private void updateBatchStats(WALEntryBatch batch, Entry entry, long entrySize) {
@@ -435,30 +419,6 @@ private void updateReplicationMarkerEdit(Entry entry, long offset) {
435419
edit.setCells(newCells);
436420
}
437421

438-
/**
439-
* @param size delta size for grown buffer
440-
* @return true if we should clear buffer and push all
441-
*/
442-
private boolean acquireBufferQuota(WALEntryBatch walEntryBatch, long size) {
443-
long newBufferUsed = totalBufferUsed.addAndGet(size);
444-
// Record the new buffer usage
445-
this.source.getSourceManager().getGlobalMetrics().setWALReaderEditsBufferBytes(newBufferUsed);
446-
walEntryBatch.incrementUsedBufferSize(size);
447-
return newBufferUsed >= totalBufferQuota;
448-
}
449-
450-
/**
451-
* To release the buffer quota of {@link WALEntryBatch} which acquired by
452-
* {@link ReplicationSourceWALReader#acquireBufferQuota}
453-
*/
454-
private void releaseBufferQuota(WALEntryBatch walEntryBatch) {
455-
long usedBufferSize = walEntryBatch.getUsedBufferSize();
456-
if (usedBufferSize > 0) {
457-
long newBufferUsed = totalBufferUsed.addAndGet(-usedBufferSize);
458-
this.source.getSourceManager().getGlobalMetrics().setWALReaderEditsBufferBytes(newBufferUsed);
459-
}
460-
}
461-
462422
/** Returns whether the reader thread is running */
463423
public boolean isReaderRunning() {
464424
return isReaderRunning && !isInterrupted();
@@ -470,4 +430,8 @@ public boolean isReaderRunning() {
470430
public void setReaderRunning(boolean readerRunning) {
471431
this.isReaderRunning = readerRunning;
472432
}
433+
434+
private ReplicationSourceManager getSourceManager() {
435+
return this.source.getSourceManager();
436+
}
473437
}

hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationThrottler.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -100,7 +100,7 @@ public long getNextSleepInterval(final int size) {
100100
* Add current size to the current cycle's total push size
101101
* @param size is the current size added to the current cycle's total push size
102102
*/
103-
public void addPushSize(final int size) {
103+
public void addPushSize(final long size) {
104104
if (this.enabled) {
105105
this.cyclePushSize += size;
106106
}

0 commit comments

Comments
 (0)