2626import java .util .Map ;
2727import java .util .Optional ;
2828import java .util .concurrent .ExecutionException ;
29- import java .util .concurrent .ForkJoinPool ;
30- import java .util .concurrent .ForkJoinTask ;
3129import java .util .concurrent .RecursiveTask ;
3230import java .util .concurrent .atomic .AtomicBoolean ;
33-
3431import org .apache .hadoop .conf .Configuration ;
3532import org .apache .hadoop .fs .FileStatus ;
3633import org .apache .hadoop .fs .FileSystem ;
3734import org .apache .hadoop .fs .Path ;
3835import org .apache .hadoop .fs .PathIsNotEmptyDirectoryException ;
3936import org .apache .hadoop .hbase .ScheduledChore ;
4037import org .apache .hadoop .hbase .Stoppable ;
41- import org .apache .hadoop .hbase .conf .ConfigurationObserver ;
4238import org .apache .hadoop .hbase .util .FSUtils ;
4339import org .apache .hadoop .ipc .RemoteException ;
4440import org .apache .yetus .audience .InterfaceAudience ;
5652 * Abstract Cleaner that uses a chain of delegates to clean a directory of files
5753 * @param <T> Cleaner delegate class that is dynamically loaded from configuration
5854 */
59- @ edu .umd .cs .findbugs .annotations .SuppressWarnings (value ="ST_WRITE_TO_STATIC_FROM_INSTANCE_METHOD" ,
60- justification ="Static pool will be only updated once." )
6155@ InterfaceAudience .Private
62- public abstract class CleanerChore <T extends FileCleanerDelegate > extends ScheduledChore
63- implements ConfigurationObserver {
56+ public abstract class CleanerChore <T extends FileCleanerDelegate > extends ScheduledChore {
6457
6558 private static final Logger LOG = LoggerFactory .getLogger (CleanerChore .class );
6659 private static final int AVAIL_PROCESSORS = Runtime .getRuntime ().availableProcessors ();
@@ -72,84 +65,9 @@ public abstract class CleanerChore<T extends FileCleanerDelegate> extends Schedu
7265 * while latter will use only 1 thread for chore to scan dir.
7366 */
7467 public static final String CHORE_POOL_SIZE = "hbase.cleaner.scan.dir.concurrent.size" ;
75- private static final String DEFAULT_CHORE_POOL_SIZE = "0.25" ;
76-
77- private static class DirScanPool {
78- int size ;
79- ForkJoinPool pool ;
80- int cleanerLatch ;
81- AtomicBoolean reconfigNotification ;
82-
83- DirScanPool (Configuration conf ) {
84- String poolSize = conf .get (CHORE_POOL_SIZE , DEFAULT_CHORE_POOL_SIZE );
85- size = calculatePoolSize (poolSize );
86- // poolSize may be 0 or 0.0 from a careless configuration,
87- // double check to make sure.
88- size = size == 0 ? calculatePoolSize (DEFAULT_CHORE_POOL_SIZE ) : size ;
89- pool = new ForkJoinPool (size );
90- LOG .info ("Cleaner pool size is {}" , size );
91- reconfigNotification = new AtomicBoolean (false );
92- cleanerLatch = 0 ;
93- }
94-
95- /**
96- * Checks if pool can be updated. If so, mark for update later.
97- * @param conf configuration
98- */
99- synchronized void markUpdate (Configuration conf ) {
100- int newSize = calculatePoolSize (conf .get (CHORE_POOL_SIZE , DEFAULT_CHORE_POOL_SIZE ));
101- if (newSize == size ) {
102- LOG .trace ("Size from configuration is same as previous={}, no need to update." , newSize );
103- return ;
104- }
105- size = newSize ;
106- // Chore is working, update it later.
107- reconfigNotification .set (true );
108- }
109-
110- /**
111- * Update pool with new size.
112- */
113- synchronized void updatePool (long timeout ) {
114- long stopTime = System .currentTimeMillis () + timeout ;
115- while (cleanerLatch != 0 && timeout > 0 ) {
116- try {
117- wait (timeout );
118- timeout = stopTime - System .currentTimeMillis ();
119- } catch (InterruptedException ie ) {
120- Thread .currentThread ().interrupt ();
121- break ;
122- }
123- }
124- shutDownNow ();
125- LOG .info ("Update chore's pool size from {} to {}" , pool .getParallelism (), size );
126- pool = new ForkJoinPool (size );
127- }
68+ static final String DEFAULT_CHORE_POOL_SIZE = "0.25" ;
12869
129- synchronized void latchCountUp () {
130- cleanerLatch ++;
131- }
132-
133- synchronized void latchCountDown () {
134- cleanerLatch --;
135- notifyAll ();
136- }
137-
138- @ SuppressWarnings ("FutureReturnValueIgnored" )
139- synchronized void submit (ForkJoinTask task ) {
140- pool .submit (task );
141- }
142-
143- synchronized void shutDownNow () {
144- if (pool == null || pool .isShutdown ()) {
145- return ;
146- }
147- pool .shutdownNow ();
148- }
149- }
150- // It may be waste resources for each cleaner chore own its pool,
151- // so let's make pool for all cleaner chores.
152- private static volatile DirScanPool POOL ;
70+ private final DirScanPool pool ;
15371
15472 protected final FileSystem fs ;
15573 private final Path oldFileDir ;
@@ -158,22 +76,9 @@ synchronized void shutDownNow() {
15876 private final AtomicBoolean enabled = new AtomicBoolean (true );
15977 protected List <T > cleanersChain ;
16078
161- public static void initChorePool (Configuration conf ) {
162- if (POOL == null ) {
163- POOL = new DirScanPool (conf );
164- }
165- }
166-
167- public static void shutDownChorePool () {
168- if (POOL != null ) {
169- POOL .shutDownNow ();
170- POOL = null ;
171- }
172- }
173-
17479 public CleanerChore (String name , final int sleepPeriod , final Stoppable s , Configuration conf ,
175- FileSystem fs , Path oldFileDir , String confKey ) {
176- this (name , sleepPeriod , s , conf , fs , oldFileDir , confKey , null );
80+ FileSystem fs , Path oldFileDir , String confKey , DirScanPool pool ) {
81+ this (name , sleepPeriod , s , conf , fs , oldFileDir , confKey , pool , null );
17782 }
17883
17984 /**
@@ -184,14 +89,15 @@ public CleanerChore(String name, final int sleepPeriod, final Stoppable s, Confi
18489 * @param fs handle to the FS
18590 * @param oldFileDir the path to the archived files
18691 * @param confKey configuration key for the classes to instantiate
92+ * @param pool the thread pool used to scan directories
18793 * @param params members could be used in cleaner
18894 */
18995 public CleanerChore (String name , final int sleepPeriod , final Stoppable s , Configuration conf ,
190- FileSystem fs , Path oldFileDir , String confKey , Map <String , Object > params ) {
96+ FileSystem fs , Path oldFileDir , String confKey , DirScanPool pool , Map <String , Object > params ) {
19197 super (name , s , sleepPeriod );
19298
193- Preconditions .checkNotNull (POOL , "Chore's pool isn't initialized, please call"
194- + "CleanerChore.initChorePool(Configuration) before new a cleaner chore." ) ;
99+ Preconditions .checkNotNull (pool , "Chore's pool can not be null" );
100+ this . pool = pool ;
195101 this .fs = fs ;
196102 this .oldFileDir = oldFileDir ;
197103 this .conf = conf ;
@@ -255,11 +161,6 @@ private void initCleanerChain(String confKey) {
255161 }
256162 }
257163
258- @ Override
259- public void onConfigurationChange (Configuration conf ) {
260- POOL .markUpdate (conf );
261- }
262-
263164 /**
264165 * A utility method to create new instances of LogCleanerDelegate based on the class name of the
265166 * LogCleanerDelegate.
@@ -287,22 +188,20 @@ private T newFileCleaner(String className, Configuration conf) {
287188 protected void chore () {
288189 if (getEnabled ()) {
289190 try {
290- POOL .latchCountUp ();
191+ pool .latchCountUp ();
291192 if (runCleaner ()) {
292193 LOG .trace ("Cleaned all WALs under {}" , oldFileDir );
293194 } else {
294195 LOG .trace ("WALs outstanding under {}" , oldFileDir );
295196 }
296197 } finally {
297- POOL .latchCountDown ();
198+ pool .latchCountDown ();
298199 }
299200 // After each cleaner chore, checks if received reconfigure notification while cleaning.
300201 // First in cleaner turns off notification, to avoid another cleaner updating pool again.
301- if (POOL .reconfigNotification .compareAndSet (true , false )) {
302- // This cleaner is waiting for other cleaners finishing their jobs.
303- // To avoid missing next chore, only wait 0.8 * period, then shutdown.
304- POOL .updatePool ((long ) (0.8 * getTimeUnit ().toMillis (getPeriod ())));
305- }
202+ // This cleaner is waiting for other cleaners finishing their jobs.
203+ // To avoid missing next chore, only wait 0.8 * period, then shutdown.
204+ pool .tryUpdatePoolSize ((long ) (0.8 * getTimeUnit ().toMillis (getPeriod ())));
306205 } else {
307206 LOG .trace ("Cleaner chore disabled! Not cleaning." );
308207 }
@@ -315,7 +214,7 @@ private void preRunCleaner() {
315214 public Boolean runCleaner () {
316215 preRunCleaner ();
317216 CleanerTask task = new CleanerTask (this .oldFileDir , true );
318- POOL . submit (task );
217+ pool . execute (task );
319218 return task .join ();
320219 }
321220
@@ -447,7 +346,7 @@ public synchronized void cleanup() {
447346
448347 @ VisibleForTesting
449348 int getChorePoolSize () {
450- return POOL . size ;
349+ return pool . getSize () ;
451350 }
452351
453352 /**
@@ -465,10 +364,13 @@ private interface Action<T> {
465364 }
466365
467366 /**
468- * Attemps to clean up a directory, its subdirectories, and files.
469- * Return value is true if everything was deleted. false on partial / total failures.
367+ * Attemps to clean up a directory, its subdirectories, and files. Return value is true if
368+ * everything was deleted. false on partial / total failures.
470369 */
471- private class CleanerTask extends RecursiveTask <Boolean > {
370+ private final class CleanerTask extends RecursiveTask <Boolean > {
371+
372+ private static final long serialVersionUID = -5444212174088754172L ;
373+
472374 private final Path dir ;
473375 private final boolean root ;
474376
0 commit comments