1717 */
1818package org .apache .hadoop .hbase .mapreduce ;
1919
20+ import com .google .errorprone .annotations .RestrictedApi ;
2021import java .io .IOException ;
2122import java .util .ArrayList ;
2223import java .util .List ;
2324import org .apache .commons .lang3 .StringUtils ;
2425import org .apache .hadoop .conf .Configuration ;
26+ import org .apache .hadoop .hbase .Cell ;
2527import org .apache .hadoop .hbase .HConstants ;
2628import org .apache .hadoop .hbase .client .Result ;
2729import org .apache .hadoop .hbase .client .Scan ;
3335import org .apache .hadoop .hbase .util .Bytes ;
3436import org .apache .hadoop .mapreduce .Counter ;
3537import org .apache .hadoop .mapreduce .Job ;
38+ import org .apache .hadoop .mapreduce .Mapper ;
3639import org .apache .hadoop .mapreduce .lib .output .NullOutputFormat ;
3740import org .apache .yetus .audience .InterfaceAudience ;
3841import org .slf4j .Logger ;
@@ -65,22 +68,41 @@ public class RowCounter extends AbstractHBaseTool {
6568 private final static String OPT_END_TIME = "endtime" ;
6669 private final static String OPT_RANGE = "range" ;
6770 private final static String OPT_EXPECTED_COUNT = "expectedCount" ;
71+ private final static String OPT_COUNT_DELETE_MARKERS = "countDeleteMarkers" ;
6872
6973 private String tableName ;
7074 private List <MultiRowRangeFilter .RowRange > rowRangeList ;
7175 private long startTime ;
7276 private long endTime ;
7377 private long expectedCount ;
78+ private boolean countDeleteMarkers ;
7479 private List <String > columns = new ArrayList <>();
7580
81+ private Job job ;
82+
7683 /**
7784 * Mapper that runs the count.
7885 */
7986 static class RowCounterMapper extends TableMapper <ImmutableBytesWritable , Result > {
8087
81- /** Counter enumeration to count the actual rows. */
88+ /** Counter enumeration to count the actual rows, cells and delete markers . */
8289 public static enum Counters {
83- ROWS
90+ ROWS ,
91+ DELETE ,
92+ DELETE_COLUMN ,
93+ DELETE_FAMILY ,
94+ DELETE_FAMILY_VERSION ,
95+ ROWS_WITH_DELETE_MARKER
96+ }
97+
98+ private boolean countDeleteMarkers ;
99+
100+ @ Override
101+ protected void
102+ setup (Mapper <ImmutableBytesWritable , Result , ImmutableBytesWritable , Result >.Context context )
103+ throws IOException , InterruptedException {
104+ Configuration conf = context .getConfiguration ();
105+ countDeleteMarkers = conf .getBoolean (OPT_COUNT_DELETE_MARKERS , false );
84106 }
85107
86108 /**
@@ -95,6 +117,37 @@ public static enum Counters {
95117 public void map (ImmutableBytesWritable row , Result values , Context context ) throws IOException {
96118 // Count every row containing data, whether it's in qualifiers or values
97119 context .getCounter (Counters .ROWS ).increment (1 );
120+
121+ if (countDeleteMarkers ) {
122+ boolean rowContainsDeleteMarker = false ;
123+ for (Cell cell : values .rawCells ()) {
124+ Cell .Type type = cell .getType ();
125+ switch (type ) {
126+ case Delete :
127+ rowContainsDeleteMarker = true ;
128+ context .getCounter (Counters .DELETE ).increment (1 );
129+ break ;
130+ case DeleteColumn :
131+ rowContainsDeleteMarker = true ;
132+ context .getCounter (Counters .DELETE_COLUMN ).increment (1 );
133+ break ;
134+ case DeleteFamily :
135+ rowContainsDeleteMarker = true ;
136+ context .getCounter (Counters .DELETE_FAMILY ).increment (1 );
137+ break ;
138+ case DeleteFamilyVersion :
139+ rowContainsDeleteMarker = true ;
140+ context .getCounter (Counters .DELETE_FAMILY_VERSION ).increment (1 );
141+ break ;
142+ default :
143+ break ;
144+ }
145+ }
146+
147+ if (rowContainsDeleteMarker ) {
148+ context .getCounter (Counters .ROWS_WITH_DELETE_MARKER ).increment (1 );
149+ }
150+ }
98151 }
99152 }
100153
@@ -105,11 +158,14 @@ public void map(ImmutableBytesWritable row, Result values, Context context) thro
105158 * @throws IOException When setting up the job fails.
106159 */
107160 public Job createSubmittableJob (Configuration conf ) throws IOException {
161+ conf .setBoolean (OPT_COUNT_DELETE_MARKERS , this .countDeleteMarkers );
108162 Job job = Job .getInstance (conf , conf .get (JOB_NAME_CONF_KEY , NAME + "_" + tableName ));
109163 job .setJarByClass (RowCounter .class );
110164 Scan scan = new Scan ();
165+ // raw scan will be needed to account for delete markers when --countDeleteMarkers flag is set
166+ scan .setRaw (this .countDeleteMarkers );
111167 scan .setCacheBlocks (false );
112- setScanFilter (scan , rowRangeList );
168+ setScanFilter (scan , rowRangeList , this . countDeleteMarkers );
113169
114170 for (String columnName : this .columns ) {
115171 String family = StringUtils .substringBefore (columnName , ":" );
@@ -147,13 +203,15 @@ public static Job createSubmittableJob(Configuration conf, String[] args) throws
147203 List <MultiRowRangeFilter .RowRange > rowRangeList = null ;
148204 long startTime = 0 ;
149205 long endTime = 0 ;
206+ boolean countDeleteMarkers = false ;
150207
151208 StringBuilder sb = new StringBuilder ();
152209
153210 final String rangeSwitch = "--range=" ;
154211 final String startTimeArgKey = "--starttime=" ;
155212 final String endTimeArgKey = "--endtime=" ;
156213 final String expectedCountArg = "--expected-count=" ;
214+ final String countDeleteMarkersArg = "--countDeleteMarkers" ;
157215
158216 // First argument is table name, starting from second
159217 for (int i = 1 ; i < args .length ; i ++) {
@@ -179,10 +237,15 @@ public static Job createSubmittableJob(Configuration conf, String[] args) throws
179237 Long .parseLong (args [i ].substring (expectedCountArg .length ())));
180238 continue ;
181239 }
240+ if (args [i ].startsWith (countDeleteMarkersArg )) {
241+ countDeleteMarkers = true ;
242+ continue ;
243+ }
182244 // if no switch, assume column names
183245 sb .append (args [i ]);
184246 sb .append (" " );
185247 }
248+ conf .setBoolean (OPT_COUNT_DELETE_MARKERS , countDeleteMarkers );
186249 if (endTime < startTime ) {
187250 printUsage ("--endtime=" + endTime + " needs to be greater than --starttime=" + startTime );
188251 return null ;
@@ -192,7 +255,9 @@ public static Job createSubmittableJob(Configuration conf, String[] args) throws
192255 job .setJarByClass (RowCounter .class );
193256 Scan scan = new Scan ();
194257 scan .setCacheBlocks (false );
195- setScanFilter (scan , rowRangeList );
258+ // raw scan will be needed to account for delete markers when --countDeleteMarkers flag is set
259+ scan .setRaw (countDeleteMarkers );
260+ setScanFilter (scan , rowRangeList , countDeleteMarkers );
196261 if (sb .length () > 0 ) {
197262 for (String columnName : sb .toString ().trim ().split (" " )) {
198263 String family = StringUtils .substringBefore (columnName , ":" );
@@ -250,9 +315,11 @@ private static List<MultiRowRangeFilter.RowRange> parseRowRangeParameter(String
250315 * Otherwise, method sets filter which is instance of {@link FirstKeyOnlyFilter}. If rowRangeList
251316 * contains exactly one element, startRow and stopRow are set to the scan.
252317 */
253- private static void setScanFilter (Scan scan , List <MultiRowRangeFilter .RowRange > rowRangeList ) {
318+ private static void setScanFilter (Scan scan , List <MultiRowRangeFilter .RowRange > rowRangeList ,
319+ boolean countDeleteMarkers ) {
254320 final int size = rowRangeList == null ? 0 : rowRangeList .size ();
255- if (size <= 1 ) {
321+ // all cells will be needed if --countDeleteMarkers flag is set, hence, skipping filter
322+ if (size <= 1 && !countDeleteMarkers ) {
256323 scan .setFilter (new FirstKeyOnlyFilter ());
257324 }
258325 if (size == 1 ) {
@@ -295,10 +362,15 @@ protected void addOptions() {
295362 .desc ("[startKey],[endKey][;[startKey],[endKey]...]]" ).longOpt (OPT_RANGE ).build ();
296363 Option expectedOption = Option .builder (null ).valueSeparator ('=' ).hasArg (true )
297364 .desc ("expected number of rows to be count." ).longOpt (OPT_EXPECTED_COUNT ).build ();
365+ Option countDeleteMarkersOption = Option .builder (null ).hasArg (false )
366+ .desc ("counts the number of Delete Markers of all types, i.e. "
367+ + "(DELETE, DELETE_COLUMN, DELETE_FAMILY, DELETE_FAMILY_VERSION)" )
368+ .longOpt (OPT_COUNT_DELETE_MARKERS ).build ();
298369 addOption (startTimeOption );
299370 addOption (endTimeOption );
300371 addOption (rangeOption );
301372 addOption (expectedOption );
373+ addOption (countDeleteMarkersOption );
302374 }
303375
304376 @ Override
@@ -316,6 +388,7 @@ protected void processOptions(CommandLine cmd) throws IllegalArgumentException {
316388 this .startTime = cmd .getOptionValue (OPT_START_TIME ) == null
317389 ? 0
318390 : Long .parseLong (cmd .getOptionValue (OPT_START_TIME ));
391+ this .countDeleteMarkers = cmd .hasOption (OPT_COUNT_DELETE_MARKERS );
319392
320393 for (int i = 1 ; i < cmd .getArgList ().size (); i ++) {
321394 String argument = cmd .getArgList ().get (i );
@@ -347,7 +420,7 @@ protected void processOldArgs(List<String> args) {
347420
348421 @ Override
349422 protected int doWork () throws Exception {
350- Job job = createSubmittableJob (getConf ());
423+ job = createSubmittableJob (getConf ());
351424 if (job == null ) {
352425 return -1 ;
353426 }
@@ -388,4 +461,10 @@ protected CommandLineParser newParser() {
388461 return new RowCounterCommandLineParser ();
389462 }
390463
464+ @ RestrictedApi (explanation = "Only visible for testing" , link = "" ,
465+ allowedOnPath = ".*/src/test/.*" )
466+ Job getMapReduceJob () {
467+ return job ;
468+ }
469+
391470}
0 commit comments