Skip to content

Commit 9d73df6

Browse files
raghu55n02mokai87
authored andcommitted
HBASE-28268 Provide option to skip wal while using TableOutputFormat (apache#6472)
Signed-off-by: Viraj Jasani <[email protected]>
1 parent a0bd41a commit 9d73df6

File tree

2 files changed

+145
-0
lines changed

2 files changed

+145
-0
lines changed

hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TableOutputFormat.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import org.apache.hadoop.hbase.client.Connection;
3131
import org.apache.hadoop.hbase.client.ConnectionFactory;
3232
import org.apache.hadoop.hbase.client.Delete;
33+
import org.apache.hadoop.hbase.client.Durability;
3334
import org.apache.hadoop.hbase.client.Mutation;
3435
import org.apache.hadoop.hbase.client.Put;
3536
import org.apache.hadoop.mapreduce.JobContext;
@@ -53,6 +54,15 @@ public class TableOutputFormat<KEY> extends OutputFormat<KEY, Mutation> implemen
5354
/** Job parameter that specifies the output table. */
5455
public static final String OUTPUT_TABLE = "hbase.mapred.outputtable";
5556

57+
/** Property value to use write-ahead logging */
58+
public static final boolean WAL_ON = true;
59+
60+
/** Property value to disable write-ahead logging */
61+
public static final boolean WAL_OFF = false;
62+
63+
/** Set this to {@link #WAL_OFF} to turn off write-ahead logging (WAL) */
64+
public static final String WAL_PROPERTY = "hbase.mapreduce.tableoutputformat.write.wal";
65+
5666
/**
5767
* Prefix for configuration property overrides to apply in {@link #setConf(Configuration)}. For
5868
* keys matching this prefix, the prefix is stripped, and the value is set in the configuration
@@ -98,6 +108,7 @@ protected class TableRecordWriter extends RecordWriter<KEY, Mutation> {
98108

99109
private Connection connection;
100110
private BufferedMutator mutator;
111+
boolean useWriteAheadLogging;
101112

102113
/**
103114
*
@@ -108,6 +119,7 @@ public TableRecordWriter() throws IOException {
108119
this.connection = ConnectionFactory.createConnection(conf);
109120
this.mutator = connection.getBufferedMutator(TableName.valueOf(tableName));
110121
LOG.info("Created table instance for " + tableName);
122+
this.useWriteAheadLogging = conf.getBoolean(WAL_PROPERTY, WAL_ON);
111123
}
112124

113125
/**
@@ -141,6 +153,9 @@ public void write(KEY key, Mutation value) throws IOException {
141153
if (!(value instanceof Put) && !(value instanceof Delete)) {
142154
throw new IOException("Pass a Delete or a Put");
143155
}
156+
if (!useWriteAheadLogging) {
157+
value.setDurability(Durability.SKIP_WAL);
158+
}
144159
mutator.mutate(value);
145160
}
146161
}
Lines changed: 130 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,130 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package org.apache.hadoop.hbase.mapreduce;
19+
20+
import java.io.IOException;
21+
import javax.validation.constraints.Null;
22+
import org.apache.hadoop.conf.Configuration;
23+
import org.apache.hadoop.hbase.HBaseClassTestRule;
24+
import org.apache.hadoop.hbase.HBaseTestingUtility;
25+
import org.apache.hadoop.hbase.TableName;
26+
import org.apache.hadoop.hbase.client.Delete;
27+
import org.apache.hadoop.hbase.client.Durability;
28+
import org.apache.hadoop.hbase.client.Mutation;
29+
import org.apache.hadoop.hbase.client.Put;
30+
import org.apache.hadoop.hbase.testclassification.MediumTests;
31+
import org.apache.hadoop.hbase.util.Bytes;
32+
import org.apache.hadoop.mapreduce.RecordWriter;
33+
import org.apache.hadoop.mapreduce.TaskAttemptContext;
34+
import org.junit.After;
35+
import org.junit.AfterClass;
36+
import org.junit.Assert;
37+
import org.junit.BeforeClass;
38+
import org.junit.ClassRule;
39+
import org.junit.Test;
40+
import org.junit.experimental.categories.Category;
41+
import org.mockito.Mockito;
42+
43+
/**
44+
* Simple Tests to check whether the durability of the Mutation is changed or not, for
45+
* {@link TableOutputFormat} if {@link TableOutputFormat#WAL_PROPERTY} is set to false.
46+
*/
47+
@Category(MediumTests.class)
48+
public class TestTableOutputFormat {
49+
@ClassRule
50+
public static final HBaseClassTestRule CLASS_RULE =
51+
HBaseClassTestRule.forClass(TestTableOutputFormat.class);
52+
53+
private static final HBaseTestingUtility util = new HBaseTestingUtility();
54+
private static final TableName TABLE_NAME = TableName.valueOf("TEST_TABLE");
55+
private static final byte[] columnFamily = Bytes.toBytes("f");
56+
private static Configuration conf;
57+
private static RecordWriter<Null, Mutation> writer;
58+
private static TaskAttemptContext context;
59+
private static TableOutputFormat<Null> tableOutputFormat;
60+
61+
@BeforeClass
62+
public static void setUp() throws Exception {
63+
util.startMiniCluster();
64+
util.createTable(TABLE_NAME, columnFamily);
65+
66+
conf = new Configuration(util.getConfiguration());
67+
context = Mockito.mock(TaskAttemptContext.class);
68+
tableOutputFormat = new TableOutputFormat<>();
69+
conf.set(TableOutputFormat.OUTPUT_TABLE, "TEST_TABLE");
70+
}
71+
72+
@AfterClass
73+
public static void tearDown() throws Exception {
74+
util.shutdownMiniCluster();
75+
}
76+
77+
@After
78+
public void close() throws IOException, InterruptedException {
79+
if (writer != null && context != null) {
80+
writer.close(context);
81+
}
82+
}
83+
84+
@Test
85+
public void testTableOutputFormatWhenWalIsOFFForPut() throws IOException, InterruptedException {
86+
// setting up the configuration for the TableOutputFormat, with writing to the WAL off.
87+
conf.setBoolean(TableOutputFormat.WAL_PROPERTY, TableOutputFormat.WAL_OFF);
88+
tableOutputFormat.setConf(conf);
89+
90+
writer = tableOutputFormat.getRecordWriter(context);
91+
92+
// creating mutation of the type put
93+
Put put = new Put("row1".getBytes());
94+
put.addColumn(columnFamily, Bytes.toBytes("aa"), Bytes.toBytes("value"));
95+
96+
// verifying whether durability of mutation is USE_DEFAULT or not, before commiting write.
97+
Assert.assertEquals("Durability of the mutation should be USE_DEFAULT", Durability.USE_DEFAULT,
98+
put.getDurability());
99+
100+
writer.write(null, put);
101+
102+
// verifying whether durability of mutation got changed to the SKIP_WAL or not.
103+
Assert.assertEquals("Durability of the mutation should be SKIP_WAL", Durability.SKIP_WAL,
104+
put.getDurability());
105+
}
106+
107+
@Test
108+
public void testTableOutputFormatWhenWalIsOFFForDelete()
109+
throws IOException, InterruptedException {
110+
// setting up the configuration for the TableOutputFormat, with writing to the WAL off.
111+
conf.setBoolean(TableOutputFormat.WAL_PROPERTY, TableOutputFormat.WAL_OFF);
112+
tableOutputFormat.setConf(conf);
113+
114+
writer = tableOutputFormat.getRecordWriter(context);
115+
116+
// creating mutation of the type delete
117+
Delete delete = new Delete("row2".getBytes());
118+
delete.addColumn(columnFamily, Bytes.toBytes("aa"));
119+
120+
// verifying whether durability of mutation is USE_DEFAULT or not, before commiting write.
121+
Assert.assertEquals("Durability of the mutation should be USE_DEFAULT", Durability.USE_DEFAULT,
122+
delete.getDurability());
123+
124+
writer.write(null, delete);
125+
126+
// verifying whether durability of mutation got changed from USE_DEFAULT to the SKIP_WAL or not.
127+
Assert.assertEquals("Durability of the mutation should be SKIP_WAL", Durability.SKIP_WAL,
128+
delete.getDurability());
129+
}
130+
}

0 commit comments

Comments
 (0)