
Commit 864b9c7

HBASE-29003 Proper bulk load tracking
The HBase backup mechanism keeps track of which HFiles were bulk loaded so that they can be included in incremental backups. Before this ticket, these bulk load records were only deleted when an incremental backup was created. This commit adds two more deletion points:

1) After a full backup. A full backup already captures all data, so leaving the records in place meant that unnecessary HFiles were included in the next incremental backup.

2) After a table delete or truncate. Previously, if an HFile was bulk loaded before a table was cleared, the next incremental backup would effectively still include that HFile. This led to incorrect data being restored.

This commit also completely refactors and simplifies the test for this functionality.
1 parent 1f6ad4d commit 864b9c7
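
For reviewers, a minimal sketch (not part of the commit) of how the new behaviour can be checked against the bulk load tracking table, using only API that appears in the diff below. The Configuration conf, the TableName tableName, and the surrounding test harness are assumptions:

import java.util.List;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.backup.impl.BackupSystemTable;
import org.apache.hadoop.hbase.backup.impl.BulkLoad;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;

// Assumed: conf points at a backup-enabled cluster, and tableName received bulk
// loads before a full backup (or a table delete/truncate) ran.
try (Connection connection = ConnectionFactory.createConnection(conf);
  BackupSystemTable systemTable = new BackupSystemTable(connection)) {
  // Read the remaining bulk load tracking entries for the table.
  List<BulkLoad> bulkLoads = systemTable.readBulkloadRows(List.of(tableName));
  // With this commit, the entries are expected to be gone, so the HFiles are not
  // pulled into the next incremental backup a second time.
  assert bulkLoads.isEmpty();
}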

File tree: 8 files changed (+226, -123 lines)


hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/BackupObserver.java

Lines changed: 76 additions & 1 deletion
@@ -19,20 +19,29 @@
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
+import java.util.function.Predicate;
+import java.util.stream.Collectors;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseInterfaceAudience;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.backup.impl.BackupManager;
 import org.apache.hadoop.hbase.backup.impl.BackupSystemTable;
+import org.apache.hadoop.hbase.backup.impl.BulkLoad;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
 import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.client.ConnectionFactory;
 import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.client.TableDescriptor;
+import org.apache.hadoop.hbase.coprocessor.MasterCoprocessor;
+import org.apache.hadoop.hbase.coprocessor.MasterCoprocessorEnvironment;
+import org.apache.hadoop.hbase.coprocessor.MasterObserver;
 import org.apache.hadoop.hbase.coprocessor.ObserverContext;
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
@@ -42,18 +51,26 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.hbase.thirdparty.com.google.common.collect.Sets;
+
 /**
  * An Observer to facilitate backup operations
  */
 @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
-public class BackupObserver implements RegionCoprocessor, RegionObserver {
+public class BackupObserver
+  implements RegionCoprocessor, RegionObserver, MasterCoprocessor, MasterObserver {
   private static final Logger LOG = LoggerFactory.getLogger(BackupObserver.class);
 
   @Override
   public Optional<RegionObserver> getRegionObserver() {
     return Optional.of(this);
   }
 
+  @Override
+  public Optional<MasterObserver> getMasterObserver() {
+    return Optional.of(this);
+  }
+
   @Override
   public void postBulkLoadHFile(ObserverContext<? extends RegionCoprocessorEnvironment> ctx,
     List<Pair<byte[], String>> stagingFamilyPaths, Map<byte[], List<Path>> finalPaths)
@@ -106,4 +123,62 @@ private void registerBulkLoad(ObserverContext<? extends RegionCoprocessorEnviron
       }
     }
   }
+
+  @Override
+  public void postDeleteTable(ObserverContext<MasterCoprocessorEnvironment> ctx,
+    TableName tableName) throws IOException {
+    Configuration cfg = ctx.getEnvironment().getConfiguration();
+    if (!BackupManager.isBackupEnabled(cfg)) {
+      LOG.debug("Skipping postDeleteTable hook since backup is disabled");
+      return;
+    }
+    deleteBulkLoads(cfg, tableName, (ignored) -> true);
+  }
+
+  @Override
+  public void postTruncateTable(ObserverContext<MasterCoprocessorEnvironment> ctx,
+    TableName tableName) throws IOException {
+    Configuration cfg = ctx.getEnvironment().getConfiguration();
+    if (!BackupManager.isBackupEnabled(cfg)) {
+      LOG.debug("Skipping postTruncateTable hook since backup is disabled");
+      return;
+    }
+    deleteBulkLoads(cfg, tableName, (ignored) -> true);
+  }
+
+  @Override
+  public void postModifyTable(final ObserverContext<MasterCoprocessorEnvironment> ctx,
+    final TableName tableName, TableDescriptor oldDescriptor, TableDescriptor currentDescriptor)
+    throws IOException {
+    Configuration cfg = ctx.getEnvironment().getConfiguration();
+    if (!BackupManager.isBackupEnabled(cfg)) {
+      LOG.debug("Skipping postModifyTable hook since backup is disabled");
+      return;
+    }
+
+    Set<String> oldFamilies = Arrays.stream(oldDescriptor.getColumnFamilies())
+      .map(ColumnFamilyDescriptor::getNameAsString).collect(Collectors.toSet());
+    Set<String> newFamilies = Arrays.stream(currentDescriptor.getColumnFamilies())
+      .map(ColumnFamilyDescriptor::getNameAsString).collect(Collectors.toSet());
+
+    Set<String> removedFamilies = Sets.difference(oldFamilies, newFamilies);
+    if (!removedFamilies.isEmpty()) {
+      Predicate<BulkLoad> filter = bulkload -> removedFamilies.contains(bulkload.getColumnFamily());
+      deleteBulkLoads(cfg, tableName, filter);
+    }
+  }
+
+  /**
+   * Deletes all bulk load entries for the given table, matching the provided predicate.
+   */
+  private void deleteBulkLoads(Configuration config, TableName tableName,
+    Predicate<BulkLoad> filter) throws IOException {
+    try (Connection connection = ConnectionFactory.createConnection(config);
+      BackupSystemTable tbl = new BackupSystemTable(connection)) {
+      List<BulkLoad> bulkLoads = tbl.readBulkloadRows(List.of(tableName));
+      List<byte[]> rowsToDelete =
+        bulkLoads.stream().filter(filter).map(BulkLoad::getRowKey).toList();
+      tbl.deleteBulkLoadedRows(rowsToDelete);
+    }
+  }
 }

hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/BackupRestoreConstants.java

Lines changed: 4 additions & 2 deletions
@@ -18,6 +18,7 @@
 package org.apache.hadoop.hbase.backup;
 
 import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
 import org.apache.yetus.audience.InterfaceAudience;
 
 /**
@@ -105,8 +106,9 @@ public interface BackupRestoreConstants {
     + "org.apache.hadoop.hbase.backup.master.LogRollMasterProcedureManager\n"
     + "hbase.procedure.regionserver.classes=YOUR_CLASSES,"
     + "org.apache.hadoop.hbase.backup.regionserver.LogRollRegionServerProcedureManager\n"
-    + "hbase.coprocessor.region.classes=YOUR_CLASSES,"
-    + "org.apache.hadoop.hbase.backup.BackupObserver\n" + "and restart the cluster\n"
+    + CoprocessorHost.REGION_COPROCESSOR_CONF_KEY + "=YOUR_CLASSES,"
+    + BackupObserver.class.getSimpleName() + "\n" + CoprocessorHost.MASTER_COPROCESSOR_CONF_KEY
+    + "=YOUR_CLASSES," + BackupObserver.class.getSimpleName() + "\nand restart the cluster\n"
     + "For more information please see http://hbase.apache.org/book.html#backuprestore\n";
   String ENABLE_BACKUP = "Backup is not enabled. To enable backup, " + "in hbase-site.xml, set:\n "
     + BACKUP_CONFIG_STRING;
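
For orientation, the guidance this constant produces points operators at roughly the following hbase-site.xml settings (a sketch of the intent, not the literal rendered string):

hbase.coprocessor.region.classes=YOUR_CLASSES,org.apache.hadoop.hbase.backup.BackupObserver
hbase.coprocessor.master.classes=YOUR_CLASSES,org.apache.hadoop.hbase.backup.BackupObserver

The master entry is the new part: the observer now also has to run on the master so that the postDeleteTable/postTruncateTable/postModifyTable hooks fire.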

hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupManager.java

Lines changed: 10 additions & 6 deletions
@@ -146,13 +146,17 @@ public static void decorateRegionServerConfiguration(Configuration conf) {
         classes + "," + regionProcedureClass);
     }
     String coproc = conf.get(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY);
-    String regionObserverClass = BackupObserver.class.getName();
+    String observerClass = BackupObserver.class.getName();
     conf.set(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY,
-      (coproc == null ? "" : coproc + ",") + regionObserverClass);
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Added region procedure manager: {}. Added region observer: {}",
-        regionProcedureClass, regionObserverClass);
-    }
+      (coproc == null ? "" : coproc + ",") + observerClass);
+
+    String masterCoProc = conf.get(CoprocessorHost.MASTER_COPROCESSOR_CONF_KEY);
+    conf.set(CoprocessorHost.MASTER_COPROCESSOR_CONF_KEY,
+      (masterCoProc == null ? "" : masterCoProc + ",") + observerClass);
+
+    LOG.debug(
+      "Added region procedure manager: {}. Added region observer: {}. Added master observer: {}",
+      regionProcedureClass, observerClass, observerClass);
   }
 
   public static boolean isBackupEnabled(Configuration conf) {

hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupSystemTable.java

Lines changed: 9 additions & 20 deletions
@@ -411,25 +411,24 @@ public void registerBulkLoad(TableName tableName, byte[] region,
     try (BufferedMutator bufferedMutator = connection.getBufferedMutator(bulkLoadTableName)) {
       List<Put> puts = BackupSystemTable.createPutForBulkLoad(tableName, region, cfToHfilePath);
       bufferedMutator.mutate(puts);
-      LOG.debug("Written {} rows for bulk load of {}", puts.size(), tableName);
+      LOG.debug("Written {} rows for bulk load of table {}", puts.size(), tableName);
     }
   }
 
-  /*
-   * Removes rows recording bulk loaded hfiles from backup table
-   * @param lst list of table names
-   * @param rows the rows to be deleted
+  /**
+   * Removes entries from the table that tracks all bulk loaded hfiles.
+   * @param rows the row keys of the entries to be deleted
    */
   public void deleteBulkLoadedRows(List<byte[]> rows) throws IOException {
     try (BufferedMutator bufferedMutator = connection.getBufferedMutator(bulkLoadTableName)) {
-      List<Delete> lstDels = new ArrayList<>();
+      List<Delete> deletes = new ArrayList<>();
      for (byte[] row : rows) {
        Delete del = new Delete(row);
-        lstDels.add(del);
-        LOG.debug("orig deleting the row: " + Bytes.toString(row));
+        deletes.add(del);
+        LOG.debug("Deleting bulk load entry with key: {}", Bytes.toString(row));
      }
-      bufferedMutator.mutate(lstDels);
-      LOG.debug("deleted " + rows.size() + " original bulkload rows");
+      bufferedMutator.mutate(deletes);
+      LOG.debug("Deleted {} bulk load entries.", rows.size());
     }
   }
 
@@ -1522,16 +1521,6 @@ public static void deleteSnapshot(Connection conn) throws IOException {
     }
   }
 
-  public static List<Delete> createDeleteForOrigBulkLoad(List<TableName> lst) {
-    List<Delete> lstDels = new ArrayList<>(lst.size());
-    for (TableName table : lst) {
-      Delete del = new Delete(rowkey(BULK_LOAD_PREFIX, table.toString(), BLK_LD_DELIM));
-      del.addFamily(BackupSystemTable.META_FAMILY);
-      lstDels.add(del);
-    }
-    return lstDels;
-  }
-
   private Put createPutForDeleteOperation(String[] backupIdList) {
     byte[] value = Bytes.toBytes(StringUtils.join(backupIdList, ","));
     Put put = new Put(DELETE_OP_ROW);

hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/FullTableBackupClient.java

Lines changed: 9 additions & 0 deletions
@@ -26,6 +26,7 @@
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.backup.BackupCopyJob;
@@ -152,6 +153,11 @@ public void execute() throws IOException {
       // the snapshot.
       LOG.info("Execute roll log procedure for full backup ...");
 
+      // Gather the bulk loads being tracked by the system, which can be deleted (since their data
+      // will be part of the snapshot being taken). We gather this list before taking the actual
+      // snapshots for the same reason as the log rolls.
+      List<BulkLoad> bulkLoadsToDelete = backupManager.readBulkloadRows(tableList);
+
       Map<String, String> props = new HashMap<>();
       props.put("backupRoot", backupInfo.getBackupRootDir());
       admin.execProcedure(LogRollMasterProcedureManager.ROLLLOG_PROCEDURE_SIGNATURE,
@@ -192,6 +198,9 @@ public void execute() throws IOException {
         BackupUtils.getMinValue(BackupUtils.getRSLogTimestampMins(newTableSetTimestampMap));
       backupManager.writeBackupStartCode(newStartCode);
 
+      backupManager
+        .deleteBulkLoadedRows(bulkLoadsToDelete.stream().map(BulkLoad::getRowKey).toList());
+
       // backup complete
       completeBackup(conn, backupInfo, BackupType.FULL, conf);
     } catch (Exception e) {
