Skip to content

Commit cae0e1e

Browse files
committed
HBASE-26913 Replication Observability Framework
1 parent 5cf728d commit cae0e1e

File tree

40 files changed

+2036
-66
lines changed

40 files changed

+2036
-66
lines changed

hbase-client/src/main/java/org/apache/hadoop/hbase/slowlog/SlowLogTableAccessor.java

Lines changed: 2 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -21,12 +21,10 @@
2121
import java.util.ArrayList;
2222
import java.util.List;
2323
import java.util.concurrent.ThreadLocalRandom;
24-
import org.apache.hadoop.conf.Configuration;
2524
import org.apache.hadoop.hbase.HConstants;
2625
import org.apache.hadoop.hbase.NamespaceDescriptor;
2726
import org.apache.hadoop.hbase.TableName;
2827
import org.apache.hadoop.hbase.client.Connection;
29-
import org.apache.hadoop.hbase.client.ConnectionFactory;
3028
import org.apache.hadoop.hbase.client.Durability;
3129
import org.apache.hadoop.hbase.client.Put;
3230
import org.apache.hadoop.hbase.client.Table;
@@ -48,8 +46,6 @@ public class SlowLogTableAccessor {
4846

4947
private static final Logger LOG = LoggerFactory.getLogger(SlowLogTableAccessor.class);
5048

51-
private static Connection connection;
52-
5349
/**
5450
* hbase:slowlog table name - can be enabled with config -
5551
* hbase.regionserver.slowlog.systable.enabled
@@ -66,10 +62,10 @@ private static void doPut(final Connection connection, final List<Put> puts) thr
6662
/**
6763
* Add slow/large log records to hbase:slowlog table
6864
* @param slowLogPayloads List of SlowLogPayload to process
69-
* @param configuration Configuration to use for connection
65+
* @param connection connection
7066
*/
7167
public static void addSlowLogRecords(final List<TooSlowLog.SlowLogPayload> slowLogPayloads,
72-
final Configuration configuration) {
68+
Connection connection) {
7369
List<Put> puts = new ArrayList<>(slowLogPayloads.size());
7470
for (TooSlowLog.SlowLogPayload slowLogPayload : slowLogPayloads) {
7571
final byte[] rowKey = getRowKey(slowLogPayload);
@@ -102,26 +98,12 @@ public static void addSlowLogRecords(final List<TooSlowLog.SlowLogPayload> slowL
10298
puts.add(put);
10399
}
104100
try {
105-
if (connection == null) {
106-
createConnection(configuration);
107-
}
108101
doPut(connection, puts);
109102
} catch (Exception e) {
110103
LOG.warn("Failed to add slow/large log records to hbase:slowlog table.", e);
111104
}
112105
}
113106

114-
private static synchronized void createConnection(Configuration configuration)
115-
throws IOException {
116-
Configuration conf = new Configuration(configuration);
117-
// rpc timeout: 20s
118-
conf.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, 20000);
119-
// retry count: 5
120-
conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 5);
121-
conf.setInt(HConstants.HBASE_CLIENT_SERVERSIDE_RETRIES_MULTIPLIER, 1);
122-
connection = ConnectionFactory.createConnection(conf);
123-
}
124-
125107
/**
126108
* Create rowKey: currentTime APPEND slowLogPayload.hashcode Scan on slowlog table should keep
127109
* records with sorted order of time, however records added at the very same time could be in
@@ -140,5 +122,4 @@ private static byte[] getRowKey(final TooSlowLog.SlowLogPayload slowLogPayload)
140122
final long rowKeyLong = Long.parseLong(timeAndHashcode);
141123
return Bytes.toBytes(rowKeyLong);
142124
}
143-
144125
}

hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1552,6 +1552,14 @@ public enum OperationStatusCode {
15521552
"hbase.regionserver.slowlog.systable.enabled";
15531553
public static final boolean DEFAULT_SLOW_LOG_SYS_TABLE_ENABLED_KEY = false;
15541554

1555+
@Deprecated
1556+
// since <need to know the version number> and will be removed in <version number>
1557+
// Instead use hbase.regionserver.named.queue.chore.duration config property
1558+
public static final String SLOW_LOG_SYS_TABLE_CHORE_DURATION_KEY =
1559+
"hbase.slowlog.systable.chore.duration";
1560+
// Default 10 mins.
1561+
public static final int DEFAULT_SLOW_LOG_SYS_TABLE_CHORE_DURATION = 10 * 60 * 1000;
1562+
15551563
public static final String SHELL_TIMESTAMP_FORMAT_EPOCH_KEY =
15561564
"hbase.shell.timestamp.format.epoch";
15571565

@@ -1567,6 +1575,22 @@ public enum OperationStatusCode {
15671575
*/
15681576
public static final int BATCH_ROWS_THRESHOLD_DEFAULT = 5000;
15691577

1578+
public static final String WAL_EVENT_TRACKER_ENABLED_KEY =
1579+
"hbase.regionserver.wal.event.tracker.enabled";
1580+
public static final boolean WAL_EVENT_TRACKER_ENABLED_DEFAULT = false;
1581+
1582+
public static final String NAMED_QUEUE_CHORE_DURATION_KEY =
1583+
"hbase.regionserver.named.queue.chore.duration";
1584+
// 10 mins default.
1585+
public static final int NAMED_QUEUE_CHORE_DURATION_DEFAULT = 10 * 60 * 1000;
1586+
1587+
/** The walEventTracker info family as a string */
1588+
private static final String WAL_EVENT_TRACKER_INFO_FAMILY_STR = "info";
1589+
1590+
/** The walEventTracker info family in array of bytes */
1591+
public static final byte[] WAL_EVENT_TRACKER_INFO_FAMILY =
1592+
Bytes.toBytes(WAL_EVENT_TRACKER_INFO_FAMILY_STR);
1593+
15701594
private HConstants() {
15711595
// Can't be instantiated with this ctor.
15721596
}

hbase-common/src/main/resources/hbase-default.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2022,7 +2022,7 @@ possible configurations would overwhelm and obscure the important.
20222022
</property>
20232023
<property>
20242024
<name>hbase.namedqueue.provider.classes</name>
2025-
<value>org.apache.hadoop.hbase.namequeues.impl.SlowLogQueueService,org.apache.hadoop.hbase.namequeues.impl.BalancerDecisionQueueService,org.apache.hadoop.hbase.namequeues.impl.BalancerRejectionQueueService</value>
2025+
<value>org.apache.hadoop.hbase.namequeues.impl.SlowLogQueueService,org.apache.hadoop.hbase.namequeues.impl.BalancerDecisionQueueService,org.apache.hadoop.hbase.namequeues.impl.BalancerRejectionQueueService,org.apache.hadoop.hbase.namequeues.WALEventTrackerQueueService</value>
20262026
<description>
20272027
Default values for NamedQueueService implementors. This comma separated full class names
20282028
represent all implementors of NamedQueueService that we would like to be invoked by
Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package org.apache.hadoop.hbase.namequeues;
19+
20+
import org.apache.hadoop.hbase.metrics.BaseSource;
21+
import org.apache.yetus.audience.InterfaceAudience;
22+
23+
@InterfaceAudience.Private
24+
public interface MetricsWALEventTrackerSource extends BaseSource {
25+
/**
26+
* The name of the metrics
27+
*/
28+
String METRICS_NAME = "WALEventTracker";
29+
30+
/**
31+
* The name of the metrics context that metrics will be under.
32+
*/
33+
String METRICS_CONTEXT = "regionserver";
34+
35+
/**
36+
* Description
37+
*/
38+
String METRICS_DESCRIPTION = "Metrics about HBase RegionServer WALEventTracker";
39+
40+
/**
41+
* The name of the metrics context that metrics will be under in jmx
42+
*/
43+
String METRICS_JMX_CONTEXT = "RegionServer,sub=" + METRICS_NAME;
44+
45+
String NUM_FAILED_PUTS = "numFailedPuts";
46+
String NUM_FAILED_PUTS_DESC = "Number of put requests that failed";
47+
48+
String NUM_RECORDS_FAILED_PUTS = "numRecordsFailedPuts";
49+
String NUM_RECORDS_FAILED_PUTS_DESC = "number of records in failed puts";
50+
51+
/*
52+
* Increment 2 counters, numFailedPuts and numRecordsFailedPuts
53+
*/
54+
void incrFailedPuts(long numRecords);
55+
56+
/*
57+
* Get the failed puts counter.
58+
*/
59+
long getFailedPuts();
60+
61+
/*
62+
* Get the number of records in failed puts.
63+
*/
64+
long getNumRecordsFailedPuts();
65+
}
Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package org.apache.hadoop.hbase.namequeues;
19+
20+
import org.apache.hadoop.hbase.metrics.BaseSourceImpl;
21+
import org.apache.hadoop.metrics2.lib.MutableFastCounter;
22+
import org.apache.yetus.audience.InterfaceAudience;
23+
24+
@InterfaceAudience.Private
25+
public class MetricsWALEventTrackerSourceImpl extends BaseSourceImpl
26+
implements MetricsWALEventTrackerSource {
27+
28+
private final MutableFastCounter numFailedPutsCount;
29+
private final MutableFastCounter numRecordsFailedPutsCount;
30+
31+
public MetricsWALEventTrackerSourceImpl() {
32+
this(METRICS_NAME, METRICS_DESCRIPTION, METRICS_CONTEXT, METRICS_JMX_CONTEXT);
33+
}
34+
35+
public MetricsWALEventTrackerSourceImpl(String metricsName, String metricsDescription,
36+
String metricsContext, String metricsJmxContext) {
37+
super(metricsName, metricsDescription, metricsContext, metricsJmxContext);
38+
numFailedPutsCount =
39+
this.getMetricsRegistry().newCounter(NUM_FAILED_PUTS, NUM_FAILED_PUTS_DESC, 0L);
40+
numRecordsFailedPutsCount = this.getMetricsRegistry().newCounter(NUM_RECORDS_FAILED_PUTS,
41+
NUM_RECORDS_FAILED_PUTS_DESC, 0L);
42+
}
43+
44+
@Override
45+
public void incrFailedPuts(long numRecords) {
46+
numFailedPutsCount.incr();
47+
numRecordsFailedPutsCount.incr(numRecords);
48+
}
49+
50+
@Override
51+
public long getFailedPuts() {
52+
return numFailedPutsCount.value();
53+
}
54+
55+
@Override
56+
public long getNumRecordsFailedPuts() {
57+
return numRecordsFailedPutsCount.value();
58+
}
59+
}
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
# Licensed to the Apache Software Foundation (ASF) under one
2+
# or more contributor license agreements. See the NOTICE file
3+
# distributed with this work for additional information
4+
# regarding copyright ownership. The ASF licenses this file
5+
# to you under the Apache License, Version 2.0 (the
6+
# "License"); you may not use this file except in compliance
7+
# with the License. You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing,
12+
# software distributed under the License is distributed on an
13+
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
# KIND, either express or implied. See the License for the
15+
# specific language governing permissions and limitations
16+
# under the License.
17+
#
18+
org.apache.hadoop.hbase.namequeues.MetricsWALEventTrackerSourceImpl

hbase-protocol-shaded/src/main/protobuf/server/region/WAL.proto

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -182,3 +182,12 @@ message RegionEventDescriptor {
182182
*/
183183
message WALTrailer {
184184
}
185+
186+
/**
187+
* Special WAL entry for replication marker event.
188+
*/
189+
message ReplicationMarkerDescriptor {
190+
required string region_server_name = 1;
191+
required string wal_name = 2;
192+
required uint64 offset = 3;
193+
}

hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -175,6 +175,7 @@
175175
import org.apache.hadoop.hbase.master.replication.UpdatePeerConfigProcedure;
176176
import org.apache.hadoop.hbase.master.slowlog.SlowLogMasterService;
177177
import org.apache.hadoop.hbase.master.snapshot.SnapshotManager;
178+
import org.apache.hadoop.hbase.master.waleventtracker.WALEventTrackerTableCreator;
178179
import org.apache.hadoop.hbase.master.zksyncer.MasterAddressSyncer;
179180
import org.apache.hadoop.hbase.master.zksyncer.MetaLocationSyncer;
180181
import org.apache.hadoop.hbase.mob.MobFileCleanerChore;
@@ -217,6 +218,7 @@
217218
import org.apache.hadoop.hbase.replication.SyncReplicationState;
218219
import org.apache.hadoop.hbase.replication.master.ReplicationHFileCleaner;
219220
import org.apache.hadoop.hbase.replication.master.ReplicationLogCleaner;
221+
import org.apache.hadoop.hbase.replication.master.ReplicationSinkTrackerTableCreator;
220222
import org.apache.hadoop.hbase.rsgroup.RSGroupAdminEndpoint;
221223
import org.apache.hadoop.hbase.rsgroup.RSGroupBasedLoadBalancer;
222224
import org.apache.hadoop.hbase.rsgroup.RSGroupInfoManager;
@@ -1245,6 +1247,10 @@ private void finishActiveMasterInitialization(MonitoredTask status)
12451247
final SlowLogMasterService slowLogMasterService = new SlowLogMasterService(conf, this);
12461248
slowLogMasterService.init();
12471249

1250+
WALEventTrackerTableCreator.createIfNeededAndNotExists(conf, this);
1251+
// Create REPLICATION.SINK_TRACKER table if needed.
1252+
ReplicationSinkTrackerTableCreator.createIfNeededAndNotExists(conf, this);
1253+
12481254
// clear the dead servers with same host name and port of online server because we are not
12491255
// removing dead server with same hostname and port of rs which is trying to check in before
12501256
// master initialization. See HBASE-5916.
Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package org.apache.hadoop.hbase.master.waleventtracker;
19+
20+
import static org.apache.hadoop.hbase.HConstants.NO_NONCE;
21+
import static org.apache.hadoop.hbase.namequeues.WALEventTrackerTableAccessor.WAL_EVENT_TRACKER_TABLE_NAME_STR;
22+
23+
import java.io.IOException;
24+
import java.util.concurrent.TimeUnit;
25+
import org.apache.hadoop.conf.Configuration;
26+
import org.apache.hadoop.hbase.HConstants;
27+
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
28+
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
29+
import org.apache.hadoop.hbase.master.MasterServices;
30+
import org.apache.hadoop.hbase.namequeues.WALEventTrackerTableAccessor;
31+
import org.apache.yetus.audience.InterfaceAudience;
32+
import org.slf4j.Logger;
33+
import org.slf4j.LoggerFactory;
34+
35+
/**
36+
* WALEventTracker Table creation to be used by HMaster
37+
*/
38+
@InterfaceAudience.Private
39+
public final class WALEventTrackerTableCreator {
40+
private static final Logger LOG = LoggerFactory.getLogger(WALEventTrackerTableCreator.class);
41+
private static final Long TTL = TimeUnit.DAYS.toSeconds(365); // 1 year in seconds
42+
43+
private static final TableDescriptorBuilder TABLE_DESCRIPTOR_BUILDER =
44+
TableDescriptorBuilder.newBuilder(WALEventTrackerTableAccessor.WAL_EVENT_TRACKER_TABLE_NAME)
45+
.setRegionReplication(1).setColumnFamily(
46+
ColumnFamilyDescriptorBuilder.newBuilder(HConstants.WAL_EVENT_TRACKER_INFO_FAMILY)
47+
.setScope(HConstants.REPLICATION_SCOPE_LOCAL).setBlockCacheEnabled(false)
48+
.setMaxVersions(1).setTimeToLive(TTL.intValue()).build());
49+
50+
/* Private default constructor */
51+
private WALEventTrackerTableCreator() {
52+
}
53+
54+
/*
55+
* We will create this table only if hbase.regionserver.wal.event.tracker.enabled is enabled and
56+
* table doesn't exists already.
57+
*/
58+
public static void createIfNeededAndNotExists(Configuration conf, MasterServices masterServices)
59+
throws IOException {
60+
boolean walEventTrackerEnabled = conf.getBoolean(HConstants.WAL_EVENT_TRACKER_ENABLED_KEY,
61+
HConstants.WAL_EVENT_TRACKER_ENABLED_DEFAULT);
62+
if (!walEventTrackerEnabled) {
63+
LOG.info("wal event tracker requests logging to table " + WAL_EVENT_TRACKER_TABLE_NAME_STR
64+
+ " is disabled. Quitting.");
65+
return;
66+
}
67+
if (
68+
!masterServices.getTableDescriptors()
69+
.exists(WALEventTrackerTableAccessor.WAL_EVENT_TRACKER_TABLE_NAME)
70+
) {
71+
LOG.info(WAL_EVENT_TRACKER_TABLE_NAME_STR + " table not found. Creating.");
72+
masterServices.createTable(TABLE_DESCRIPTOR_BUILDER.build(), null, 0L, NO_NONCE);
73+
}
74+
}
75+
}

0 commit comments

Comments
 (0)