1818
1919package org .apache .hadoop .hbase .replication ;
2020
21+ import static org .apache .hadoop .hbase .HConstants .DEFAULT_HBASE_RPC_SHORTOPERATION_TIMEOUT ;
22+ import static org .apache .hadoop .hbase .HConstants .HBASE_RPC_SHORTOPERATION_TIMEOUT_KEY ;
23+
2124import java .io .IOException ;
2225import java .util .ArrayList ;
2326import java .util .Collections ;
2427import java .util .List ;
2528import java .util .UUID ;
2629
27- import org .apache .hadoop .hbase .zookeeper .ZKListener ;
28- import org .apache .yetus .audience .InterfaceAudience ;
30+ import org .apache .hadoop .conf .Configuration ;
2931import org .apache .hadoop .hbase .Abortable ;
32+ import org .apache .hadoop .hbase .ChoreService ;
33+ import org .apache .hadoop .hbase .ScheduledChore ;
34+ import org .apache .hadoop .hbase .Server ;
3035import org .apache .hadoop .hbase .ServerName ;
36+ import org .apache .hadoop .hbase .client .AsyncClusterConnection ;
37+ import org .apache .hadoop .hbase .client .ClusterConnectionFactory ;
38+ import org .apache .hadoop .hbase .security .User ;
39+ import org .apache .hadoop .hbase .security .UserProvider ;
40+ import org .apache .hadoop .hbase .util .FutureUtils ;
3141import org .apache .hadoop .hbase .zookeeper .ZKClusterId ;
42+ import org .apache .hadoop .hbase .zookeeper .ZKListener ;
3243import org .apache .hadoop .hbase .zookeeper .ZKUtil ;
3344import org .apache .hadoop .hbase .zookeeper .ZKWatcher ;
45+ import org .apache .yetus .audience .InterfaceAudience ;
3446import org .apache .zookeeper .KeeperException ;
3547import org .apache .zookeeper .KeeperException .AuthFailedException ;
3648import org .apache .zookeeper .KeeperException .ConnectionLossException ;
3749import org .apache .zookeeper .KeeperException .SessionExpiredException ;
3850import org .slf4j .Logger ;
3951import org .slf4j .LoggerFactory ;
4052
53+ import org .apache .hbase .thirdparty .com .google .protobuf .ServiceException ;
54+
55+ import org .apache .hadoop .hbase .shaded .protobuf .ProtobufUtil ;
56+ import org .apache .hadoop .hbase .shaded .protobuf .generated .MasterProtos .ListReplicationSinkServersRequest ;
57+ import org .apache .hadoop .hbase .shaded .protobuf .generated .MasterProtos .ListReplicationSinkServersResponse ;
58+ import org .apache .hadoop .hbase .shaded .protobuf .generated .MasterProtos .MasterService ;
59+
4160/**
4261 * A {@link BaseReplicationEndpoint} for replication endpoints whose
4362 * target cluster is an HBase cluster.
@@ -48,15 +67,39 @@ public abstract class HBaseReplicationEndpoint extends BaseReplicationEndpoint
4867
4968 private static final Logger LOG = LoggerFactory .getLogger (HBaseReplicationEndpoint .class );
5069
70+ public static final String FETCH_SERVERS_USE_ZK_CONF_KEY =
71+ "hbase.replication.fetch.servers.usezk" ;
72+
73+ public static final String FETCH_SERVERS_INTERVAL_CONF_KEY =
74+ "hbase.replication.fetch.servers.interval" ;
75+ public static final int DEFAULT_FETCH_SERVERS_INTERVAL = 10 * 60 * 1000 ; // 10 mins
76+
5177 private ZKWatcher zkw = null ;
5278
5379 private List <ServerName > regionServers = new ArrayList <>(0 );
5480 private long lastRegionServerUpdate ;
81+ private AsyncClusterConnection peerConnection ;
82+ private boolean fetchServersUseZk = false ;
83+ private FetchServersChore fetchServersChore ;
84+ private int shortOperationTimeout ;
5585
5686 protected synchronized void disconnect () {
5787 if (zkw != null ) {
5888 zkw .close ();
5989 }
90+ if (fetchServersChore != null ) {
91+ ChoreService choreService = ctx .getServer ().getChoreService ();
92+ if (null != choreService ) {
93+ choreService .cancelChore (fetchServersChore );
94+ }
95+ }
96+ if (peerConnection != null ) {
97+ try {
98+ peerConnection .close ();
99+ } catch (IOException e ) {
100+ LOG .warn ("Attempt to close peerConnection failed." , e );
101+ }
102+ }
60103 }
61104
62105 /**
@@ -87,8 +130,27 @@ public void stop() {
87130 }
88131
89132 @ Override
90- protected void doStart () {
133+ protected synchronized void doStart () {
134+ this .shortOperationTimeout = ctx .getLocalConfiguration ().getInt (
135+ HBASE_RPC_SHORTOPERATION_TIMEOUT_KEY , DEFAULT_HBASE_RPC_SHORTOPERATION_TIMEOUT );
91136 try {
137+ if (ctx .getLocalConfiguration ().getBoolean (FETCH_SERVERS_USE_ZK_CONF_KEY , false )) {
138+ fetchServersUseZk = true ;
139+ } else {
140+ try {
141+ if (ReplicationUtils .isPeerClusterSupportReplicationOffload (getPeerConnection ())) {
142+ fetchServersChore = new FetchServersChore (ctx .getServer (), this );
143+ ctx .getServer ().getChoreService ().scheduleChore (fetchServersChore );
144+ fetchServersUseZk = false ;
145+ } else {
146+ fetchServersUseZk = true ;
147+ }
148+ } catch (Throwable t ) {
149+ fetchServersUseZk = true ;
150+ LOG .warn ("Peer {} try to fetch servers by admin failed. Using zk impl." ,
151+ ctx .getPeerId (), t );
152+ }
153+ }
92154 reloadZkWatcher ();
93155 notifyStarted ();
94156 } catch (IOException e ) {
@@ -130,10 +192,14 @@ protected synchronized ZKWatcher getZkw() {
130192 * @throws IOException If anything goes wrong connecting
131193 */
132194 synchronized void reloadZkWatcher () throws IOException {
133- if (zkw != null ) zkw .close ();
195+ if (zkw != null ) {
196+ zkw .close ();
197+ }
134198 zkw = new ZKWatcher (ctx .getConfiguration (),
135199 "connection to cluster: " + ctx .getPeerId (), this );
136- getZkw ().registerListener (new PeerRegionServerListener (this ));
200+ if (fetchServersUseZk ) {
201+ getZkw ().registerListener (new PeerRegionServerListener (this ));
202+ }
137203 }
138204
139205 @ Override
@@ -148,15 +214,48 @@ public boolean isAborted() {
148214 return false ;
149215 }
150216
217+ /**
218+ * Get the connection to peer cluster
219+ * @return connection to peer cluster
220+ * @throws IOException If anything goes wrong connecting
221+ */
222+ protected synchronized AsyncClusterConnection getPeerConnection () throws IOException {
223+ if (peerConnection == null ) {
224+ Configuration conf = ctx .getConfiguration ();
225+ peerConnection = ClusterConnectionFactory .createAsyncClusterConnection (conf , null ,
226+ UserProvider .instantiate (conf ).getCurrent ());
227+ }
228+ return peerConnection ;
229+ }
230+
231+ /**
232+ * Get the list of all the servers that are responsible for replication sink
233+ * from the specified peer master
234+ * @return list of server addresses or an empty list if the slave is unavailable
235+ */
236+ protected List <ServerName > fetchSlavesAddresses () throws IOException {
237+ AsyncClusterConnection peerConn = getPeerConnection ();
238+ ServerName master = FutureUtils .get (peerConn .getAdmin ().getMaster ());
239+ MasterService .BlockingInterface masterStub = MasterService .newBlockingStub (
240+ peerConn .getRpcClient ()
241+ .createBlockingRpcChannel (master , User .getCurrent (), shortOperationTimeout ));
242+ try {
243+ ListReplicationSinkServersResponse resp = masterStub .listReplicationSinkServers (null ,
244+ ListReplicationSinkServersRequest .newBuilder ().build ());
245+ return ProtobufUtil .toServerNameList (resp .getServerNameList ());
246+ } catch (ServiceException se ) {
247+ throw ProtobufUtil .getRemoteException (se );
248+ }
249+ }
250+
151251 /**
152252 * Get the list of all the region servers from the specified peer
153- * @param zkw zk connection to use
154253 * @return list of region server addresses or an empty list if the slave is unavailable
155254 */
156- protected static List <ServerName > fetchSlavesAddresses ( ZKWatcher zkw )
157- throws KeeperException {
158- List <String > children = ZKUtil .listChildrenAndWatchForNewChildren (zkw ,
159- zkw .getZNodePaths ().rsZNode );
255+ protected List <ServerName > fetchSlavesAddressesByZK () throws KeeperException {
256+ ZKWatcher zk = getZkw ();
257+ List <String > children = ZKUtil .listChildrenAndWatchForNewChildren (zk ,
258+ zk .getZNodePaths ().rsZNode );
160259 if (children == null ) {
161260 return Collections .emptyList ();
162261 }
@@ -168,22 +267,31 @@ protected static List<ServerName> fetchSlavesAddresses(ZKWatcher zkw)
168267 }
169268
170269 /**
171- * Get a list of all the addresses of all the available region servers
172- * for this peer cluster, or an empty list if no region servers available at peer cluster.
270+ * Get a list of all the addresses of all the available servers that are responsible for
271+ * replication sink for this peer cluster, or an empty list if no servers available at peer
272+ * cluster.
173273 * @return list of addresses
174274 */
175275 // Synchronize peer cluster connection attempts to avoid races and rate
176276 // limit connections when multiple replication sources try to connect to
177277 // the peer cluster. If the peer cluster is down we can get out of control
178278 // over time.
179279 public synchronized List <ServerName > getRegionServers () {
180- try {
181- setRegionServers (fetchSlavesAddresses (this .getZkw ()));
182- } catch (KeeperException ke ) {
183- if (LOG .isDebugEnabled ()) {
184- LOG .debug ("Fetch slaves addresses failed" , ke );
280+ if (fetchServersUseZk ) {
281+ try {
282+ setRegionServers (fetchSlavesAddressesByZK ());
283+ } catch (KeeperException ke ) {
284+ if (LOG .isDebugEnabled ()) {
285+ LOG .debug ("Fetch slaves addresses failed" , ke );
286+ }
287+ reconnect (ke );
288+ }
289+ } else {
290+ try {
291+ setRegionServers (fetchSlavesAddresses ());
292+ } catch (IOException e ) {
293+ LOG .warn ("Fetch slaves addresses failed" , e );
185294 }
186- reconnect (ke );
187295 }
188296 return regionServers ;
189297 }
@@ -225,11 +333,35 @@ public synchronized void nodeChildrenChanged(String path) {
225333 if (path .equals (regionServerListNode )) {
226334 try {
227335 LOG .info ("Detected change to peer region servers, fetching updated list" );
228- replicationEndpoint .setRegionServers (fetchSlavesAddresses ( replicationEndpoint .getZkw () ));
336+ replicationEndpoint .setRegionServers (replicationEndpoint .fetchSlavesAddressesByZK ( ));
229337 } catch (KeeperException e ) {
230338 LOG .error ("Error reading slave addresses" , e );
231339 }
232340 }
233341 }
234342 }
343+
344+ /**
345+ * Chore that will fetch the list of servers from peer master.
346+ */
347+ public static class FetchServersChore extends ScheduledChore {
348+
349+ private HBaseReplicationEndpoint endpoint ;
350+
351+ public FetchServersChore (Server server , HBaseReplicationEndpoint endpoint ) {
352+ super ("Peer-" + endpoint .ctx .getPeerId () + "-FetchServersChore" , server ,
353+ server .getConfiguration ().getInt (FETCH_SERVERS_INTERVAL_CONF_KEY ,
354+ DEFAULT_FETCH_SERVERS_INTERVAL ));
355+ this .endpoint = endpoint ;
356+ }
357+
358+ @ Override
359+ protected void chore () {
360+ try {
361+ endpoint .setRegionServers (endpoint .fetchSlavesAddresses ());
362+ } catch (Throwable t ) {
363+ LOG .error ("Peer {} fetches servers failed" , endpoint .ctx .getPeerId (), t );
364+ }
365+ }
366+ }
235367}
0 commit comments