2020import java .io .IOException ;
2121import java .time .Duration ;
2222import java .util .ArrayList ;
23+ import java .util .Calendar ;
2324import java .util .Collections ;
2425import java .util .List ;
2526import java .util .concurrent .atomic .AtomicLong ;
3334import org .apache .hadoop .hbase .conf .ConfigurationObserver ;
3435import org .apache .hadoop .hbase .conf .PropagatingConfigurationObserver ;
3536import org .apache .hadoop .hbase .master .MasterServices ;
37+ import org .apache .hadoop .hbase .regionserver .compactions .OffPeakHours ;
3638import org .apache .yetus .audience .InterfaceAudience ;
3739import org .slf4j .Logger ;
3840import org .slf4j .LoggerFactory ;
@@ -68,6 +70,7 @@ class RegionNormalizerWorker implements PropagatingConfigurationObserver, Runnab
6870 private long splitPlanCount ;
6971 private long mergePlanCount ;
7072 private final AtomicLong cumulativePlansSizeLimitMb ;
73+ private final OffPeakHours offPeakHours ;
7174
7275 RegionNormalizerWorker (final Configuration configuration , final MasterServices masterServices ,
7376 final RegionNormalizer regionNormalizer , final RegionNormalizerWorkQueue <TableName > workQueue ) {
@@ -81,6 +84,7 @@ class RegionNormalizerWorker implements PropagatingConfigurationObserver, Runnab
8184 this .defaultNormalizerTableLevel = extractDefaultNormalizerValue (configuration );
8285 this .cumulativePlansSizeLimitMb = new AtomicLong (
8386 configuration .getLong (CUMULATIVE_SIZE_LIMIT_MB_KEY , DEFAULT_CUMULATIVE_SIZE_LIMIT_MB ));
87+ this .offPeakHours = OffPeakHours .getInstance (configuration );
8488 }
8589
8690 private boolean extractDefaultNormalizerValue (final Configuration configuration ) {
@@ -186,6 +190,10 @@ public void run() {
186190 LOG .debug ("interrupt detected. terminating." );
187191 break ;
188192 }
193+ if (!offPeakHours .equals (OffPeakHours .DISABLED ) && !offPeakHours .isOffPeakHour ()) {
194+ sleepToNextHour ();
195+ continue ;
196+ }
189197 final TableName tableName ;
190198 try {
191199 tableName = workQueue .take ();
@@ -199,6 +207,22 @@ public void run() {
199207 }
200208 }
201209
210+ private void sleepToNextHour () {
211+ LOG .info ("offpeak hours is configured and we'll wait for offpeak hours to continue normalising." );
212+ Calendar now = Calendar .getInstance ();
213+ Calendar nextHour = (Calendar ) now .clone ();
214+ nextHour .add (Calendar .HOUR_OF_DAY , 1 );
215+ nextHour .set (Calendar .MINUTE , 0 );
216+ nextHour .set (Calendar .SECOND , 0 );
217+ nextHour .set (Calendar .MILLISECOND , 0 );
218+ try {
219+ Thread .sleep (nextHour .getTimeInMillis () - now .getTimeInMillis ());
220+ } catch (InterruptedException e ) {
221+ LOG .warn ("Interrupted while waiting to next hour." );
222+ e .printStackTrace ();
223+ }
224+ }
225+
202226 private List <NormalizationPlan > calculatePlans (final TableName tableName ) {
203227 if (masterServices .skipRegionManagementAction ("region normalizer" )) {
204228 return Collections .emptyList ();
0 commit comments