Skip to content

Commit c8e1785

Browse files
committed
HBASE-24871 Replication may lose data when refreshing recovered replication sources
1 parent 8646ac1 commit c8e1785

File tree

3 files changed

+166
-5
lines changed

3 files changed

+166
-5
lines changed

hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -513,12 +513,12 @@ public void refreshSources(String peerId) throws IOException {
513513
}
514514
}
515515
for (String queueId : previousQueueIds) {
516-
ReplicationSourceInterface replicationSource = createSource(queueId, peer);
517-
this.oldsources.add(replicationSource);
516+
ReplicationSourceInterface recoveredReplicationSource = createSource(queueId, peer);
517+
this.oldsources.add(recoveredReplicationSource);
518518
for (SortedSet<String> walsByGroup : walsByIdRecoveredQueues.get(queueId).values()) {
519-
walsByGroup.forEach(wal -> src.enqueueLog(new Path(wal)));
519+
walsByGroup.forEach(wal -> recoveredReplicationSource.enqueueLog(new Path(wal)));
520520
}
521-
toStartup.add(replicationSource);
521+
toStartup.add(recoveredReplicationSource);
522522
}
523523
}
524524
for (ReplicationSourceInterface replicationSource : toStartup) {

hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,7 @@ public class TestReplicationBase {
7878
protected static Configuration CONF1 = UTIL1.getConfiguration();
7979
protected static Configuration CONF2 = UTIL2.getConfiguration();
8080

81-
protected static final int NUM_SLAVES1 = 1;
81+
protected static int NUM_SLAVES1 = 1;
8282
protected static final int NUM_SLAVES2 = 1;
8383
protected static final int NB_ROWS_IN_BATCH = 100;
8484
protected static final int NB_ROWS_IN_BIG_BATCH =
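hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRefreshRecoveredReplication.java

Lines changed: 161 additions & 0 deletions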
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,161 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package org.apache.hadoop.hbase.replication.regionserver;
19+
20+
import java.io.IOException;
21+
import java.util.Optional;
22+
23+
import org.apache.hadoop.conf.Configuration;
24+
import org.apache.hadoop.hbase.HBaseClassTestRule;
25+
import org.apache.hadoop.hbase.HConstants;
26+
import org.apache.hadoop.hbase.TableName;
27+
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
28+
import org.apache.hadoop.hbase.client.Put;
29+
import org.apache.hadoop.hbase.client.Result;
30+
import org.apache.hadoop.hbase.client.ResultScanner;
31+
import org.apache.hadoop.hbase.client.Scan;
32+
import org.apache.hadoop.hbase.client.Table;
33+
import org.apache.hadoop.hbase.client.TableDescriptor;
34+
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
35+
import org.apache.hadoop.hbase.replication.TestReplicationBase;
36+
import org.apache.hadoop.hbase.testclassification.MediumTests;
37+
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
38+
import org.apache.hadoop.hbase.util.Bytes;
39+
import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
40+
import org.junit.After;
41+
import org.junit.AfterClass;
42+
import org.junit.Assert;
43+
import org.junit.Before;
44+
import org.junit.BeforeClass;
45+
import org.junit.ClassRule;
46+
import org.junit.Rule;
47+
import org.junit.Test;
48+
import org.junit.experimental.categories.Category;
49+
import org.junit.rules.TestName;
50+
import org.slf4j.Logger;
51+
import org.slf4j.LoggerFactory;
52+
53+
import org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUtils;
54+
55+
/**
56+
* Testcase for HBASE-24871.
57+
*/
58+
@Category({ ReplicationTests.class, MediumTests.class })
59+
public class TestRefreshRecoveredReplication extends TestReplicationBase {
60+
61+
@ClassRule
62+
public static final HBaseClassTestRule CLASS_RULE =
63+
HBaseClassTestRule.forClass(TestRefreshRecoveredReplication.class);
64+
65+
private static final Logger LOG = LoggerFactory.getLogger(TestRefreshRecoveredReplication.class);
66+
67+
private static final int BATCH = 50;
68+
69+
@Rule
70+
public TestName name = new TestName();
71+
72+
private TableName tablename;
73+
private Table table1;
74+
private Table table2;
75+
76+
@BeforeClass
77+
public static void setUpBeforeClass() throws Exception {
78+
NUM_SLAVES1 = 2;
79+
// replicate slowly
80+
Configuration conf1 = UTIL1.getConfiguration();
81+
conf1.setInt(HConstants.REPLICATION_SOURCE_TOTAL_BUFFER_KEY, 100);
82+
TestReplicationBase.setUpBeforeClass();
83+
}
84+
85+
@AfterClass
86+
public static void tearDownAfterClass() throws Exception {
87+
TestReplicationBase.tearDownAfterClass();
88+
}
89+
90+
@Before
91+
public void setup() throws Exception {
92+
setUpBase();
93+
94+
tablename = TableName.valueOf(name.getMethodName());
95+
TableDescriptor table = TableDescriptorBuilder.newBuilder(tablename)
96+
.setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(famName)
97+
.setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build())
98+
.build();
99+
100+
UTIL1.getAdmin().createTable(table);
101+
UTIL2.getAdmin().createTable(table);
102+
UTIL1.waitTableAvailable(tablename);
103+
UTIL2.waitTableAvailable(tablename);
104+
table1 = UTIL1.getConnection().getTable(tablename);
105+
table2 = UTIL2.getConnection().getTable(tablename);
106+
}
107+
108+
@After
109+
public void teardown() throws Exception {
110+
tearDownBase();
111+
112+
UTIL1.deleteTableIfAny(tablename);
113+
UTIL2.deleteTableIfAny(tablename);
114+
}
115+
116+
@Test
117+
public void testReplicationRefreshSource() throws Exception {
118+
// put some data
119+
for (int i = 0; i < BATCH; i++) {
120+
byte[] r = Bytes.toBytes(i);
121+
table1.put(new Put(r).addColumn(famName, famName, r));
122+
}
123+
124+
// kill rs holding table region
125+
Optional<RegionServerThread> server = UTIL1.getMiniHBaseCluster().getLiveRegionServerThreads()
126+
.stream()
127+
.filter(rst -> CollectionUtils.isNotEmpty(rst.getRegionServer().getRegions(tablename)))
128+
.findAny();
129+
Assert.assertTrue(server.isPresent());
130+
server.get().getRegionServer().abort("stopping for test");
131+
UTIL1.waitFor(60000, () ->
132+
UTIL1.getMiniHBaseCluster().getLiveRegionServerThreads().size() == NUM_SLAVES1 - 1);
133+
UTIL1.waitTableAvailable(tablename);
134+
135+
// waiting for recovered peer to start
136+
Replication replication = (Replication) UTIL1.getMiniHBaseCluster()
137+
.getLiveRegionServerThreads().get(0).getRegionServer().getReplicationSourceService();
138+
UTIL1.waitFor(60000, () ->
139+
!replication.getReplicationManager().getOldSources().isEmpty());
140+
141+
// disable peer to trigger refreshSources
142+
hbaseAdmin.disableReplicationPeer(PEER_ID2);
143+
LOG.info("has replicated {} rows before disable peer", checkReplicationData());
144+
hbaseAdmin.enableReplicationPeer(PEER_ID2);
145+
// waiting to replicate all data to slave
146+
UTIL2.waitFor(30000, () -> {
147+
int count = checkReplicationData();
148+
LOG.info("Waiting all logs pushed to slave. Expected {} , actual {}", BATCH, count);
149+
return count == BATCH;
150+
});
151+
}
152+
153+
private int checkReplicationData() throws IOException {
154+
int count = 0;
155+
ResultScanner results = table2.getScanner(new Scan().setCaching(BATCH));
156+
for (Result r : results) {
157+
count++;
158+
}
159+
return count;
160+
}
161+
}

0 commit comments

Comments
 (0)