2323import java .util .Collections ;
2424import java .util .HashMap ;
2525import java .util .HashSet ;
26+ import java .util .Iterator ;
2627import java .util .List ;
2728import java .util .Map ;
2829import java .util .OptionalLong ;
3233import org .apache .hadoop .conf .Configuration ;
3334import org .apache .hadoop .hbase .Coprocessor ;
3435import org .apache .hadoop .hbase .DoNotRetryIOException ;
36+ import org .apache .hadoop .hbase .HConstants ;
3537import org .apache .hadoop .hbase .NamespaceDescriptor ;
3638import org .apache .hadoop .hbase .ServerName ;
3739import org .apache .hadoop .hbase .TableName ;
@@ -164,7 +166,7 @@ private RSGroupInfoManagerImpl(MasterServices masterServices) throws IOException
164166
165167
166168 private synchronized void init () throws IOException {
167- refresh ();
169+ refresh (false );
168170 serverEventsListenerThread .start ();
169171 masterServices .getServerManager ().registerListener (serverEventsListenerThread );
170172 failedOpenUpdaterThread = new FailedOpenUpdaterThread (masterServices .getConfiguration ());
@@ -356,9 +358,112 @@ private List<RSGroupInfo> retrieveGroupListFromZookeeper() throws IOException {
356358 return RSGroupInfoList ;
357359 }
358360
359- @ Override
360- public void refresh () throws IOException {
361- refresh (false );
361+ private void waitUntilSomeProcsDone (Set <Long > pendingProcIds ) {
362+ int size = pendingProcIds .size ();
363+ while (!masterServices .isStopped ()) {
364+ for (Iterator <Long > iter = pendingProcIds .iterator (); iter .hasNext ();) {
365+ long procId = iter .next ();
366+ if (masterServices .getMasterProcedureExecutor ().isFinished (procId )) {
367+ iter .remove ();
368+ }
369+ }
370+ if (pendingProcIds .size () < size ) {
371+ return ;
372+ }
373+ try {
374+ Thread .sleep (1000 );
375+ } catch (InterruptedException e ) {
376+ Thread .currentThread ().interrupt ();
377+ }
378+ }
379+ }
380+
381+ private void waitUntilMasterStarted () {
382+ while (!masterServices .isInitialized () && !masterServices .isStopped ()) {
383+ try {
384+ Thread .sleep (1000 );
385+ } catch (InterruptedException e ) {
386+ Thread .currentThread ().interrupt ();
387+ }
388+ }
389+ }
390+
391+ private void migrate (List <RSGroupInfo > groupList , int maxConcurrency ) {
392+ LOG .info ("Start migrating table rs group config" );
393+ waitUntilMasterStarted ();
394+ Set <Long > pendingProcIds = new HashSet <>();
395+ for (RSGroupInfo groupInfo : groupList ) {
396+ if (groupInfo .getName ().equals (RSGroupInfo .DEFAULT_GROUP )) {
397+ continue ;
398+ }
399+ SortedSet <TableName > failedTables = new TreeSet <>();
400+ for (TableName tableName : groupInfo .getTables ()) {
401+ LOG .info ("Migrating {} in group {}" , tableName , groupInfo .getName ());
402+ TableDescriptor oldTd ;
403+ try {
404+ oldTd = masterServices .getTableDescriptors ().get (tableName );
405+ } catch (IOException e ) {
406+ LOG .warn ("Failed to migrate {} in group {}" , tableName , groupInfo .getName (), e );
407+ failedTables .add (tableName );
408+ continue ;
409+ }
410+ if (oldTd == null ) {
411+ continue ;
412+ }
413+ if (oldTd .getRegionServerGroup ().isPresent ()) {
414+ // either we have already migrated it or that user has set the rs group with the new
415+ // code, skip.
416+ LOG .debug ("Skip migrating {} since it is already in group {}" , tableName ,
417+ oldTd .getRegionServerGroup ().get ());
418+ continue ;
419+ }
420+ TableDescriptor newTd = TableDescriptorBuilder .newBuilder (oldTd )
421+ .setRegionServerGroup (groupInfo .getName ()).build ();
422+ try {
423+ pendingProcIds .add (
424+ masterServices .modifyTable (tableName , newTd , HConstants .NO_NONCE , HConstants .NO_NONCE ));
425+ } catch (IOException e ) {
426+ LOG .warn ("Failed to migrate {} in group {}" , tableName , groupInfo .getName (), e );
427+ failedTables .add (tableName );
428+ continue ;
429+ }
430+ if (pendingProcIds .size () >= maxConcurrency ) {
431+ waitUntilSomeProcsDone (pendingProcIds );
432+ }
433+ }
434+ LOG .info ("Done migrating {}, failed tables {}" , groupInfo .getName (), failedTables );
435+ synchronized (RSGroupInfoManagerImpl .this ) {
436+ RSGroupInfo currentInfo = rsGroupMap .get (groupInfo .getName ());
437+ if (currentInfo != null ) {
438+ RSGroupInfo newInfo =
439+ new RSGroupInfo (currentInfo .getName (), currentInfo .getServers (), failedTables );
440+ Map <String , RSGroupInfo > newGroupMap = new HashMap <>(rsGroupMap );
441+ newGroupMap .put (groupInfo .getName (), newInfo );
442+ try {
443+ flushConfig (newGroupMap );
444+ } catch (IOException e ) {
445+ LOG .warn ("Failed to persist rs group" , e );
446+ }
447+ }
448+ }
449+ }
450+ LOG .info ("Done migrating table rs group info" );
451+ }
452+
453+ // Migrate the table rs group info from RSGroupInfo into the table descriptor
454+ // Notice that we do not want to block the initialize so this will be done in background, and
455+ // during the migrating, the rs group info maybe incomplete and cause region to be misplaced.
456+ private void migrate (List <RSGroupInfo > groupList ) {
457+ final int maxConcurrency = 8 ;
458+ Thread migrateThread = new Thread ("Migrate-RSGroup" ) {
459+
460+ @ Override
461+ public void run () {
462+ migrate (groupList , maxConcurrency );
463+ }
464+ };
465+ migrateThread .setDaemon (true );
466+ migrateThread .start ();
362467 }
363468
364469 /**
@@ -389,6 +494,7 @@ private synchronized void refresh(boolean forceOnline) throws IOException {
389494 }
390495 resetRSGroupMap (newGroupMap );
391496 updateCacheOfRSGroups (rsGroupMap .keySet ());
497+ migrate (groupList );
392498 }
393499
394500 private void flushConfigTable (Map <String , RSGroupInfo > groupMap ) throws IOException {
@@ -403,9 +509,9 @@ private void flushConfigTable(Map<String, RSGroupInfo> groupMap) throws IOExcept
403509 }
404510
405511 // populate puts
406- for (RSGroupInfo RSGroupInfo : groupMap .values ()) {
407- RSGroupProtos .RSGroupInfo proto = ProtobufUtil .toProtoGroupInfo (RSGroupInfo );
408- Put p = new Put (Bytes .toBytes (RSGroupInfo .getName ()));
512+ for (RSGroupInfo rsGroupInfo : groupMap .values ()) {
513+ RSGroupProtos .RSGroupInfo proto = ProtobufUtil .toProtoGroupInfo (rsGroupInfo );
514+ Put p = new Put (Bytes .toBytes (rsGroupInfo .getName ()));
409515 p .addColumn (META_FAMILY_BYTES , META_QUALIFIER_BYTES , proto .toByteArray ());
410516 mutations .add (p );
411517 }
0 commit comments