From 25ef3826060816b8a18887da58642fdec2553d7f Mon Sep 17 00:00:00 2001 From: Bharat Viswanadham Date: Tue, 9 Apr 2019 12:46:47 -0700 Subject: [PATCH 01/12] HDDS-1406. Avoid using of commonPool in RatisPipelineUtils. --- .../hdds/scm/pipeline/RatisPipelineUtils.java | 37 +++++++++++++------ 1 file changed, 25 insertions(+), 12 deletions(-) diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineUtils.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineUtils.java index 0af34fb8563ca..e4b949ae85c3b 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineUtils.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineUtils.java @@ -41,6 +41,7 @@ import java.util.ArrayList; import java.util.Collections; import java.util.List; +import java.util.concurrent.ForkJoinPool; /** * Utility class for Ratis pipelines. Contains methods to create and destroy @@ -51,6 +52,15 @@ final class RatisPipelineUtils { private static final Logger LOG = LoggerFactory.getLogger(RatisPipelineUtils.class); + // Set parallelism at 3, as now in Ratis we create 1 and 3 node pipelines. + private static int parallelismForPool = + (Runtime.getRuntime().availableProcessors() > 3) ? 3 : + Runtime.getRuntime().availableProcessors(); + + private static ForkJoinPool pool = new ForkJoinPool(parallelismForPool); + + + private RatisPipelineUtils() { } @@ -146,18 +156,21 @@ private static void callRatisRpc(List datanodes, SecurityConfig(ozoneConf)); final TimeDuration requestTimeout = RatisHelper.getClientRequestTimeout(ozoneConf); - datanodes.parallelStream().forEach(d -> { - final RaftPeer p = RatisHelper.toRaftPeer(d); - try (RaftClient client = RatisHelper - .newRaftClient(SupportedRpcType.valueOfIgnoreCase(rpcType), p, - retryPolicy, maxOutstandingRequests, tlsConfig, requestTimeout)) { - rpc.accept(client, p); - } catch (IOException ioe) { - String errMsg = - "Failed invoke Ratis rpc " + rpc + " for " + d.getUuid(); - LOG.error(errMsg, ioe); - exceptions.add(new IOException(errMsg, ioe)); - } + pool.submit(() -> { + datanodes.parallelStream().forEach(d -> { + final RaftPeer p = RatisHelper.toRaftPeer(d); + try (RaftClient client = RatisHelper + .newRaftClient(SupportedRpcType.valueOfIgnoreCase(rpcType), p, + retryPolicy, maxOutstandingRequests, tlsConfig, + requestTimeout)) { + rpc.accept(client, p); + } catch (IOException ioe) { + String errMsg = + "Failed invoke Ratis rpc " + rpc + " for " + d.getUuid(); + LOG.error(errMsg, ioe); + exceptions.add(new IOException(errMsg, ioe)); + } + }); }); if (!exceptions.isEmpty()) { throw MultipleIOException.createIOException(exceptions); From 0a72fc6604a440952a081ecf6514ee6b5bcf38c6 Mon Sep 17 00:00:00 2001 From: Bharat Viswanadham Date: Thu, 11 Apr 2019 10:10:30 -0700 Subject: [PATCH 02/12] fix review comments --- .../hdds/scm/pipeline/RatisPipelineUtils.java | 53 ++++++++++++------- 1 file changed, 35 insertions(+), 18 deletions(-) diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineUtils.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineUtils.java index e4b949ae85c3b..08ba262ffeee0 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineUtils.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineUtils.java @@ -41,7 +41,9 @@ import java.util.ArrayList; import java.util.Collections; import java.util.List; +import java.util.concurrent.ExecutionException; import java.util.concurrent.ForkJoinPool; +import java.util.concurrent.ForkJoinWorkerThread; /** * Utility class for Ratis pipelines. Contains methods to create and destroy @@ -53,12 +55,21 @@ final class RatisPipelineUtils { LoggerFactory.getLogger(RatisPipelineUtils.class); // Set parallelism at 3, as now in Ratis we create 1 and 3 node pipelines. - private static int parallelismForPool = + private final static int PARALLELISIM_FOR_POOL = (Runtime.getRuntime().availableProcessors() > 3) ? 3 : Runtime.getRuntime().availableProcessors(); - private static ForkJoinPool pool = new ForkJoinPool(parallelismForPool); + private final static ForkJoinPool.ForkJoinWorkerThreadFactory FACTORY = + (forkJoinPool -> { + final ForkJoinWorkerThread worker = ForkJoinPool. + defaultForkJoinWorkerThreadFactory.newThread(forkJoinPool); + worker.setName("ratisCreatePipeline" + worker.getPoolIndex()); + return worker; + }); + + private final static ForkJoinPool POOL = new ForkJoinPool( + PARALLELISIM_FOR_POOL, FACTORY, null, false); private RatisPipelineUtils() { @@ -156,22 +167,28 @@ private static void callRatisRpc(List datanodes, SecurityConfig(ozoneConf)); final TimeDuration requestTimeout = RatisHelper.getClientRequestTimeout(ozoneConf); - pool.submit(() -> { - datanodes.parallelStream().forEach(d -> { - final RaftPeer p = RatisHelper.toRaftPeer(d); - try (RaftClient client = RatisHelper - .newRaftClient(SupportedRpcType.valueOfIgnoreCase(rpcType), p, - retryPolicy, maxOutstandingRequests, tlsConfig, - requestTimeout)) { - rpc.accept(client, p); - } catch (IOException ioe) { - String errMsg = - "Failed invoke Ratis rpc " + rpc + " for " + d.getUuid(); - LOG.error(errMsg, ioe); - exceptions.add(new IOException(errMsg, ioe)); - } - }); - }); + try { + POOL.submit(() -> { + datanodes.parallelStream().forEach(d -> { + final RaftPeer p = RatisHelper.toRaftPeer(d); + try (RaftClient client = RatisHelper + .newRaftClient(SupportedRpcType.valueOfIgnoreCase(rpcType), p, + retryPolicy, maxOutstandingRequests, tlsConfig, + requestTimeout)) { + rpc.accept(client, p); + } catch (IOException ioe) { + String errMsg = + "Failed invoke Ratis rpc " + rpc + " for " + d.getUuid(); + LOG.error(errMsg, ioe); + exceptions.add(new IOException(errMsg, ioe)); + } + }); + }).get(); + } catch (ExecutionException ex) { + LOG.error("Execution exception occurred during createPipeline", ex); + } catch (InterruptedException ex) { + Thread.currentThread().interrupt(); + } if (!exceptions.isEmpty()) { throw MultipleIOException.createIOException(exceptions); } From 457190b5d667a4e48e71b897a790bd99cf063045 Mon Sep 17 00:00:00 2001 From: Bharat Viswanadham Date: Fri, 12 Apr 2019 10:38:14 -0700 Subject: [PATCH 03/12] fix review comments --- .../hadoop/hdds/scm/pipeline/RatisPipelineUtils.java | 11 ++++------- .../hdds/scm/server/StorageContainerManager.java | 4 ++++ 2 files changed, 8 insertions(+), 7 deletions(-) diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineUtils.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineUtils.java index 08ba262ffeee0..e0fba6cc8ed98 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineUtils.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineUtils.java @@ -49,18 +49,15 @@ * Utility class for Ratis pipelines. Contains methods to create and destroy * ratis pipelines. */ -final class RatisPipelineUtils { +public final class RatisPipelineUtils { private static final Logger LOG = LoggerFactory.getLogger(RatisPipelineUtils.class); // Set parallelism at 3, as now in Ratis we create 1 and 3 node pipelines. - private final static int PARALLELISIM_FOR_POOL = - (Runtime.getRuntime().availableProcessors() > 3) ? 3 : - Runtime.getRuntime().availableProcessors(); + private static final int PARALLELISIM_FOR_POOL = 3; - - private final static ForkJoinPool.ForkJoinWorkerThreadFactory FACTORY = + private static final ForkJoinPool.ForkJoinWorkerThreadFactory FACTORY = (forkJoinPool -> { final ForkJoinWorkerThread worker = ForkJoinPool. defaultForkJoinWorkerThreadFactory.newThread(forkJoinPool); @@ -68,7 +65,7 @@ final class RatisPipelineUtils { return worker; }); - private final static ForkJoinPool POOL = new ForkJoinPool( + public static final ForkJoinPool POOL = new ForkJoinPool( PARALLELISIM_FOR_POOL, FACTORY, null, false); diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java index 8c4a514649cc0..e7b60f929ece2 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java @@ -72,6 +72,7 @@ import org.apache.hadoop.hdds.scm.pipeline.PipelineActionHandler; import org.apache.hadoop.hdds.scm.pipeline.PipelineManager; import org.apache.hadoop.hdds.scm.pipeline.PipelineReportHandler; +import org.apache.hadoop.hdds.scm.pipeline.RatisPipelineUtils; import org.apache.hadoop.hdds.scm.pipeline.SCMPipelineManager; import org.apache.hadoop.hdds.security.x509.SecurityConfig; import org.apache.hadoop.hdds.security.x509.certificate.authority.CertificateServer; @@ -1017,6 +1018,9 @@ public void stop() { } catch (Exception ex) { LOG.error("SCM Metadata store stop failed", ex); } + + // shutdown RatisPipelineUtils pool. + RatisPipelineUtils.POOL.shutdown(); } /** From e03e4891e2c8d92a897bcdb713ad1bd59fcee91c Mon Sep 17 00:00:00 2001 From: Bharat Viswanadham Date: Thu, 25 Apr 2019 15:39:19 -0700 Subject: [PATCH 04/12] fix review comments --- .../hadoop/hdds/scm/pipeline/RatisPipelineUtils.java | 12 +++++++++++- .../hdds/scm/server/StorageContainerManager.java | 2 +- 2 files changed, 12 insertions(+), 2 deletions(-) diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineUtils.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineUtils.java index e0fba6cc8ed98..e6dc7c6c66e5a 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineUtils.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineUtils.java @@ -44,6 +44,7 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.ForkJoinPool; import java.util.concurrent.ForkJoinWorkerThread; +import java.util.concurrent.RejectedExecutionException; /** * Utility class for Ratis pipelines. Contains methods to create and destroy @@ -183,8 +184,17 @@ private static void callRatisRpc(List datanodes, }).get(); } catch (ExecutionException ex) { LOG.error("Execution exception occurred during createPipeline", ex); - } catch (InterruptedException ex) { + throw new IOException("Execution exception occurred during " + + "createPipeline", ex); + } catch (RejectedExecutionException ex) { + LOG.error("RejectedExecutionException, occurred during " + + "createPipeline", ex); + throw new IOException("RejectedExecutionException occurred during " + + "createPipeline", ex); + } catch(InterruptedException ex) { Thread.currentThread().interrupt(); + throw new IOException("Interrupt exception occurred during " + + "createPipeline", ex); } if (!exceptions.isEmpty()) { throw MultipleIOException.createIOException(exceptions); diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java index e7b60f929ece2..2e0dc67ccea6d 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java @@ -1020,7 +1020,7 @@ public void stop() { } // shutdown RatisPipelineUtils pool. - RatisPipelineUtils.POOL.shutdown(); + RatisPipelineUtils.POOL.shutdownNow(); } /** From 3eaca7ef694c92c0e82ddfe551cd4b5fceef80af Mon Sep 17 00:00:00 2001 From: Bharat Viswanadham Date: Thu, 25 Apr 2019 16:29:47 -0700 Subject: [PATCH 05/12] fix review comment --- .../hadoop/hdds/scm/pipeline/RatisPipelineUtils.java | 12 ++++-------- 1 file changed, 4 insertions(+), 8 deletions(-) diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineUtils.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineUtils.java index e6dc7c6c66e5a..6365b638679b2 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineUtils.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineUtils.java @@ -182,15 +182,11 @@ private static void callRatisRpc(List datanodes, } }); }).get(); - } catch (ExecutionException ex) { - LOG.error("Execution exception occurred during createPipeline", ex); - throw new IOException("Execution exception occurred during " + - "createPipeline", ex); - } catch (RejectedExecutionException ex) { - LOG.error("RejectedExecutionException, occurred during " + - "createPipeline", ex); - throw new IOException("RejectedExecutionException occurred during " + + } catch (ExecutionException | RejectedExecutionException ex) { + LOG.error(ex.getClass().getName() + " exception occurred during " + "createPipeline", ex); + throw new IOException(ex.getClass().getName() + " exception occurred " + + "during createPipeline", ex); } catch(InterruptedException ex) { Thread.currentThread().interrupt(); throw new IOException("Interrupt exception occurred during " + From 9c0fa8a35b5b6cbcef9e996925f0355273bb8285 Mon Sep 17 00:00:00 2001 From: Bharat Viswanadham Date: Thu, 9 May 2019 17:11:09 -0700 Subject: [PATCH 06/12] fix review comments --- .../hdds/scm/pipeline/PipelineFactory.java | 4 ++-- .../scm/pipeline/RatisPipelineProvider.java | 14 ++++++++++-- .../hdds/scm/pipeline/RatisPipelineUtils.java | 22 +++++++++---------- .../hdds/scm/pipeline/SCMPipelineManager.java | 12 ++++++++-- .../scm/server/StorageContainerManager.java | 8 +++++-- .../scm/pipeline/TestRatisPipelineUtils.java | 5 ++++- 6 files changed, 45 insertions(+), 20 deletions(-) diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineFactory.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineFactory.java index 89349761bfcad..af9fe97e16f19 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineFactory.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineFactory.java @@ -38,12 +38,12 @@ public final class PipelineFactory { private Map providers; PipelineFactory(NodeManager nodeManager, PipelineStateManager stateManager, - Configuration conf) { + Configuration conf, RatisPipelineUtils ratisPipelineUtils) { providers = new HashMap<>(); providers.put(ReplicationType.STAND_ALONE, new SimplePipelineProvider(nodeManager)); providers.put(ReplicationType.RATIS, - new RatisPipelineProvider(nodeManager, stateManager, conf)); + new RatisPipelineProvider(nodeManager, stateManager, conf, ratisPipelineUtils)); } @VisibleForTesting diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineProvider.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineProvider.java index df21420be1d67..190f8ea245a9d 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineProvider.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineProvider.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hdds.scm.pipeline; +import com.google.common.annotations.VisibleForTesting; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType; @@ -45,12 +46,21 @@ public class RatisPipelineProvider implements PipelineProvider { private final NodeManager nodeManager; private final PipelineStateManager stateManager; private final Configuration conf; + private final RatisPipelineUtils ratisPipelineUtils; RatisPipelineProvider(NodeManager nodeManager, - PipelineStateManager stateManager, Configuration conf) { + PipelineStateManager stateManager, Configuration conf, + RatisPipelineUtils ratisPipelineUtils) { this.nodeManager = nodeManager; this.stateManager = stateManager; this.conf = conf; + this.ratisPipelineUtils = ratisPipelineUtils; + } + + @VisibleForTesting + RatisPipelineProvider(NodeManager nodeManager, + PipelineStateManager stateManager, Configuration conf) { + this(nodeManager, stateManager, conf, null); } /** @@ -134,6 +144,6 @@ public Pipeline create(ReplicationFactor factor, } protected void initializePipeline(Pipeline pipeline) throws IOException { - RatisPipelineUtils.createPipeline(pipeline, conf); + ratisPipelineUtils.createPipeline(pipeline, conf); } } diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineUtils.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineUtils.java index 6365b638679b2..115c63c2fd155 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineUtils.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineUtils.java @@ -56,9 +56,9 @@ public final class RatisPipelineUtils { LoggerFactory.getLogger(RatisPipelineUtils.class); // Set parallelism at 3, as now in Ratis we create 1 and 3 node pipelines. - private static final int PARALLELISIM_FOR_POOL = 3; + private final int parallelisimForPool = 3; - private static final ForkJoinPool.ForkJoinWorkerThreadFactory FACTORY = + private final ForkJoinPool.ForkJoinWorkerThreadFactory factory = (forkJoinPool -> { final ForkJoinWorkerThread worker = ForkJoinPool. defaultForkJoinWorkerThreadFactory.newThread(forkJoinPool); @@ -66,12 +66,8 @@ public final class RatisPipelineUtils { return worker; }); - public static final ForkJoinPool POOL = new ForkJoinPool( - PARALLELISIM_FOR_POOL, FACTORY, null, false); - - - private RatisPipelineUtils() { - } + public final ForkJoinPool forkJoinPool = new ForkJoinPool( + parallelisimForPool, factory, null, false); /** * Sends ratis command to create pipeline on all the datanodes. @@ -80,7 +76,7 @@ private RatisPipelineUtils() { * @param ozoneConf - Ozone Confinuration * @throws IOException if creation fails */ - public static void createPipeline(Pipeline pipeline, Configuration ozoneConf) + public void createPipeline(Pipeline pipeline, Configuration ozoneConf) throws IOException { final RaftGroup group = RatisHelper.newRaftGroup(pipeline); LOG.debug("creating pipeline:{} with {}", pipeline.getId(), group); @@ -145,7 +141,7 @@ static void destroyPipeline(DatanodeDetails dn, PipelineID pipelineID, .groupRemove(RaftGroupId.valueOf(pipelineID.getId()), true, p.getId()); } - private static void callRatisRpc(List datanodes, + private void callRatisRpc(List datanodes, Configuration ozoneConf, CheckedBiConsumer rpc) throws IOException { @@ -166,7 +162,7 @@ private static void callRatisRpc(List datanodes, final TimeDuration requestTimeout = RatisHelper.getClientRequestTimeout(ozoneConf); try { - POOL.submit(() -> { + forkJoinPool.submit(() -> { datanodes.parallelStream().forEach(d -> { final RaftPeer p = RatisHelper.toRaftPeer(d); try (RaftClient client = RatisHelper @@ -196,4 +192,8 @@ private static void callRatisRpc(List datanodes, throw MultipleIOException.createIOException(exceptions); } } + + public void shutdown() { + forkJoinPool.shutdownNow(); + } } diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java index c72a52886c825..b6dec0517e17a 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hdds.scm.pipeline; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hdds.protocol.DatanodeDetails; @@ -83,11 +84,12 @@ public class SCMPipelineManager implements PipelineManager { private ObjectName pmInfoBean; public SCMPipelineManager(Configuration conf, NodeManager nodeManager, - EventPublisher eventPublisher) throws IOException { + EventPublisher eventPublisher, RatisPipelineUtils ratisPipelineUtils) throws IOException { this.lock = new ReentrantReadWriteLock(); this.conf = conf; this.stateManager = new PipelineStateManager(conf); - this.pipelineFactory = new PipelineFactory(nodeManager, stateManager, conf); + this.pipelineFactory = new PipelineFactory(nodeManager, stateManager, + conf, ratisPipelineUtils); // TODO: See if thread priority needs to be set for these threads scheduler = new Scheduler("RatisPipelineUtilsThread", false, 1); this.backgroundPipelineCreator = @@ -111,6 +113,12 @@ public SCMPipelineManager(Configuration conf, NodeManager nodeManager, initializePipelineState(); } + @VisibleForTesting + public SCMPipelineManager(Configuration conf, NodeManager nodeManager, + EventPublisher eventPublisher) throws IOException { + this(conf, nodeManager, eventPublisher, null); + } + public PipelineStateManager getStateManager() { return stateManager; } diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java index 2e0dc67ccea6d..32cce78624bfc 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java @@ -207,6 +207,8 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl private final SafeModeHandler safeModeHandler; private SCMContainerMetrics scmContainerMetrics; + private RatisPipelineUtils ratisPipelineUtils; + /** * Creates a new StorageContainerManager. Configuration will be * updated with information on the actual listening addresses used @@ -399,8 +401,10 @@ private void initalizeSystemManagers(OzoneConfiguration conf, if (configurator.getPipelineManager() != null) { pipelineManager = configurator.getPipelineManager(); } else { + ratisPipelineUtils = new RatisPipelineUtils(); pipelineManager = - new SCMPipelineManager(conf, scmNodeManager, eventQueue); + new SCMPipelineManager(conf, scmNodeManager, eventQueue, + ratisPipelineUtils); } if (configurator.getContainerManager() != null) { @@ -1020,7 +1024,7 @@ public void stop() { } // shutdown RatisPipelineUtils pool. - RatisPipelineUtils.POOL.shutdownNow(); + ratisPipelineUtils.shutdown(); } /** diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestRatisPipelineUtils.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestRatisPipelineUtils.java index b653e7a2b9284..7cfa9417a72e9 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestRatisPipelineUtils.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestRatisPipelineUtils.java @@ -97,12 +97,15 @@ public void testPipelineCreationOnNodeRestart() throws Exception { } // try creating another pipeline now + RatisPipelineUtils ratisPipelineUtils = new RatisPipelineUtils(); try { - RatisPipelineUtils.createPipeline(pipelines.get(0), conf); + ratisPipelineUtils.createPipeline(pipelines.get(0), conf); Assert.fail("pipeline creation should fail after shutting down pipeline"); } catch (IOException ioe) { // in case the pipeline creation fails, MultipleIOException is thrown Assert.assertTrue(ioe instanceof MultipleIOException); + } finally { + ratisPipelineUtils.shutdown(); } // make sure pipelines is destroyed From cd751ad82f3f9767fc3bf8b9e15420c78bb622d2 Mon Sep 17 00:00:00 2001 From: Bharat Viswanadham Date: Mon, 13 May 2019 10:36:21 -0700 Subject: [PATCH 07/12] fix review comments. --- .../hdds/scm/pipeline/PipelineFactory.java | 13 +- .../hdds/scm/pipeline/PipelineManager.java | 2 + .../hdds/scm/pipeline/PipelineProvider.java | 1 + .../scm/pipeline/RatisPipelineProvider.java | 134 ++++++++++++++++-- .../hdds/scm/pipeline/RatisPipelineUtils.java | 95 +------------ .../hdds/scm/pipeline/SCMPipelineManager.java | 15 +- .../scm/pipeline/SimplePipelineProvider.java | 5 + .../scm/server/StorageContainerManager.java | 8 +- .../pipeline/MockRatisPipelineProvider.java | 5 + ...=> TestRatisPipelineCreateAndDestory.java} | 10 +- 10 files changed, 162 insertions(+), 126 deletions(-) rename hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/{TestRatisPipelineUtils.java => TestRatisPipelineCreateAndDestory.java} (94%) diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineFactory.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineFactory.java index af9fe97e16f19..866ebe190c926 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineFactory.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineFactory.java @@ -38,12 +38,12 @@ public final class PipelineFactory { private Map providers; PipelineFactory(NodeManager nodeManager, PipelineStateManager stateManager, - Configuration conf, RatisPipelineUtils ratisPipelineUtils) { + Configuration conf) { providers = new HashMap<>(); providers.put(ReplicationType.STAND_ALONE, new SimplePipelineProvider(nodeManager)); providers.put(ReplicationType.RATIS, - new RatisPipelineProvider(nodeManager, stateManager, conf, ratisPipelineUtils)); + new RatisPipelineProvider(nodeManager, stateManager, conf)); } @VisibleForTesting @@ -61,4 +61,13 @@ public Pipeline create(ReplicationType type, ReplicationFactor factor, List nodes) { return providers.get(type).create(factor, nodes); } + + @VisibleForTesting + public PipelineProvider getProvider(ReplicationType type) { + return providers.get(type); + } + + public void shutdown() { + providers.values().forEach(provider -> provider.shutdown()); + } } diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManager.java index 2793647b7f415..5b865e2f19b58 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManager.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManager.java @@ -75,4 +75,6 @@ void finalizeAndDestroyPipeline(Pipeline pipeline, boolean onTimeout) void startPipelineCreator(); void triggerPipelineCreation(); + + PipelineFactory getPipelineFactory(); } diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineProvider.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineProvider.java index bb16533751177..a0ce216267237 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineProvider.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineProvider.java @@ -33,4 +33,5 @@ public interface PipelineProvider { Pipeline create(ReplicationFactor factor, List nodes); + void shutdown(); } diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineProvider.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineProvider.java index 190f8ea245a9d..d846e9e01dc17 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineProvider.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineProvider.java @@ -18,24 +18,45 @@ package org.apache.hadoop.hdds.scm.pipeline; -import com.google.common.annotations.VisibleForTesting; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hdds.protocol.DatanodeDetails; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos; import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType; import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor; import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState; import org.apache.hadoop.hdds.scm.ScmConfigKeys; +import org.apache.hadoop.hdds.scm.client.HddsClientUtils; import org.apache.hadoop.hdds.scm.container.placement.algorithms.ContainerPlacementPolicy; import org.apache.hadoop.hdds.scm.container.placement.algorithms.SCMContainerPlacementRandom; import org.apache.hadoop.hdds.scm.node.NodeManager; import org.apache.hadoop.hdds.scm.pipeline.Pipeline.PipelineState; +import org.apache.hadoop.hdds.security.x509.SecurityConfig; +import org.apache.hadoop.io.MultipleIOException; +import org.apache.ratis.RatisHelper; +import org.apache.ratis.client.RaftClient; +import org.apache.ratis.grpc.GrpcTlsConfig; +import org.apache.ratis.protocol.RaftClientReply; +import org.apache.ratis.protocol.RaftGroup; +import org.apache.ratis.protocol.RaftPeer; +import org.apache.ratis.retry.RetryPolicy; +import org.apache.ratis.rpc.SupportedRpcType; +import org.apache.ratis.util.TimeDuration; +import org.apache.ratis.util.function.CheckedBiConsumer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.io.IOException; import java.lang.reflect.Constructor; import java.lang.reflect.InvocationTargetException; +import java.util.ArrayList; +import java.util.Collections; import java.util.HashSet; import java.util.List; import java.util.Set; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ForkJoinPool; +import java.util.concurrent.ForkJoinWorkerThread; +import java.util.concurrent.RejectedExecutionException; import java.util.stream.Collectors; /** @@ -43,25 +64,35 @@ */ public class RatisPipelineProvider implements PipelineProvider { + private static final Logger LOG = + LoggerFactory.getLogger(RatisPipelineProvider.class); + private final NodeManager nodeManager; private final PipelineStateManager stateManager; private final Configuration conf; - private final RatisPipelineUtils ratisPipelineUtils; + + // Set parallelism at 3, as now in Ratis we create 1 and 3 node pipelines. + private final int parallelisimForPool = 3; + + private final ForkJoinPool.ForkJoinWorkerThreadFactory factory = + (forkJoinPool -> { + final ForkJoinWorkerThread worker = ForkJoinPool. + defaultForkJoinWorkerThreadFactory.newThread(forkJoinPool); + worker.setName("ratisCreatePipeline" + worker.getPoolIndex()); + return worker; + }); + + public final ForkJoinPool forkJoinPool = new ForkJoinPool( + parallelisimForPool, factory, null, false); + RatisPipelineProvider(NodeManager nodeManager, - PipelineStateManager stateManager, Configuration conf, - RatisPipelineUtils ratisPipelineUtils) { + PipelineStateManager stateManager, Configuration conf) { this.nodeManager = nodeManager; this.stateManager = stateManager; this.conf = conf; - this.ratisPipelineUtils = ratisPipelineUtils; } - @VisibleForTesting - RatisPipelineProvider(NodeManager nodeManager, - PipelineStateManager stateManager, Configuration conf) { - this(nodeManager, stateManager, conf, null); - } /** * Create pluggable container placement policy implementation instance. @@ -143,7 +174,88 @@ public Pipeline create(ReplicationFactor factor, .build(); } + + @Override + public void shutdown() { + forkJoinPool.shutdownNow(); + } + protected void initializePipeline(Pipeline pipeline) throws IOException { - ratisPipelineUtils.createPipeline(pipeline, conf); + createPipeline(pipeline); + } + + /** + * Sends ratis command to create pipeline on all the datanodes. + * + * @param pipeline - Pipeline to be created + * @throws IOException if creation fails + */ + public void createPipeline(Pipeline pipeline) + throws IOException { + if (pipeline.getFactor() == HddsProtos.ReplicationFactor.ONE) + return; + final RaftGroup group = RatisHelper.newRaftGroup(pipeline); + LOG.debug("creating pipeline:{} with {}", pipeline.getId(), group); + callRatisRpc(pipeline.getNodes(), + (raftClient, peer) -> { + RaftClientReply reply = raftClient.groupAdd(group, peer.getId()); + if (reply == null || !reply.isSuccess()) { + String msg = "Pipeline initialization failed for pipeline:" + + pipeline.getId() + " node:" + peer.getId(); + LOG.error(msg); + throw new IOException(msg); + } + }); + } + + private void callRatisRpc(List datanodes, + CheckedBiConsumer< RaftClient, RaftPeer, IOException> rpc) + throws IOException { + if (datanodes.isEmpty()) { + return; + } + + final String rpcType = conf + .get(ScmConfigKeys.DFS_CONTAINER_RATIS_RPC_TYPE_KEY, + ScmConfigKeys.DFS_CONTAINER_RATIS_RPC_TYPE_DEFAULT); + final RetryPolicy retryPolicy = RatisHelper.createRetryPolicy(conf); + final List< IOException > exceptions = + Collections.synchronizedList(new ArrayList<>()); + final int maxOutstandingRequests = + HddsClientUtils.getMaxOutstandingRequests(conf); + final GrpcTlsConfig tlsConfig = RatisHelper.createTlsClientConfig(new + SecurityConfig(conf)); + final TimeDuration requestTimeout = + RatisHelper.getClientRequestTimeout(conf); + try { + forkJoinPool.submit(() -> { + datanodes.parallelStream().forEach(d -> { + final RaftPeer p = RatisHelper.toRaftPeer(d); + try (RaftClient client = RatisHelper + .newRaftClient(SupportedRpcType.valueOfIgnoreCase(rpcType), p, + retryPolicy, maxOutstandingRequests, tlsConfig, + requestTimeout)) { + rpc.accept(client, p); + } catch (IOException ioe) { + String errMsg = + "Failed invoke Ratis rpc " + rpc + " for " + d.getUuid(); + LOG.error(errMsg, ioe); + exceptions.add(new IOException(errMsg, ioe)); + } + }); + }).get(); + } catch (ExecutionException | RejectedExecutionException ex) { + LOG.error(ex.getClass().getName() + " exception occurred during " + + "createPipeline", ex); + throw new IOException(ex.getClass().getName() + " exception occurred " + + "during createPipeline", ex); + } catch (InterruptedException ex) { + Thread.currentThread().interrupt(); + throw new IOException("Interrupt exception occurred during " + + "createPipeline", ex); + } + if (!exceptions.isEmpty()) { + throw MultipleIOException.createIOException(exceptions); + } } } diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineUtils.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineUtils.java index 115c63c2fd155..59c6a4d6f1437 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineUtils.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineUtils.java @@ -19,6 +19,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hdds.protocol.DatanodeDetails; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos; import org.apache.hadoop.hdds.scm.ScmConfigKeys; import org.apache.hadoop.hdds.scm.client.HddsClientUtils; import org.apache.hadoop.hdds.security.x509.SecurityConfig; @@ -54,44 +55,6 @@ public final class RatisPipelineUtils { private static final Logger LOG = LoggerFactory.getLogger(RatisPipelineUtils.class); - - // Set parallelism at 3, as now in Ratis we create 1 and 3 node pipelines. - private final int parallelisimForPool = 3; - - private final ForkJoinPool.ForkJoinWorkerThreadFactory factory = - (forkJoinPool -> { - final ForkJoinWorkerThread worker = ForkJoinPool. - defaultForkJoinWorkerThreadFactory.newThread(forkJoinPool); - worker.setName("ratisCreatePipeline" + worker.getPoolIndex()); - return worker; - }); - - public final ForkJoinPool forkJoinPool = new ForkJoinPool( - parallelisimForPool, factory, null, false); - - /** - * Sends ratis command to create pipeline on all the datanodes. - * - * @param pipeline - Pipeline to be created - * @param ozoneConf - Ozone Confinuration - * @throws IOException if creation fails - */ - public void createPipeline(Pipeline pipeline, Configuration ozoneConf) - throws IOException { - final RaftGroup group = RatisHelper.newRaftGroup(pipeline); - LOG.debug("creating pipeline:{} with {}", pipeline.getId(), group); - callRatisRpc(pipeline.getNodes(), ozoneConf, - (raftClient, peer) -> { - RaftClientReply reply = raftClient.groupAdd(group, peer.getId()); - if (reply == null || !reply.isSuccess()) { - String msg = "Pipeline initialization failed for pipeline:" - + pipeline.getId() + " node:" + peer.getId(); - LOG.error(msg); - throw new IOException(msg); - } - }); - } - /** * Removes pipeline from SCM. Sends ratis command to destroy pipeline on all * the datanodes. @@ -140,60 +103,4 @@ static void destroyPipeline(DatanodeDetails dn, PipelineID pipelineID, client .groupRemove(RaftGroupId.valueOf(pipelineID.getId()), true, p.getId()); } - - private void callRatisRpc(List datanodes, - Configuration ozoneConf, - CheckedBiConsumer rpc) - throws IOException { - if (datanodes.isEmpty()) { - return; - } - - final String rpcType = ozoneConf - .get(ScmConfigKeys.DFS_CONTAINER_RATIS_RPC_TYPE_KEY, - ScmConfigKeys.DFS_CONTAINER_RATIS_RPC_TYPE_DEFAULT); - final RetryPolicy retryPolicy = RatisHelper.createRetryPolicy(ozoneConf); - final List exceptions = - Collections.synchronizedList(new ArrayList<>()); - final int maxOutstandingRequests = - HddsClientUtils.getMaxOutstandingRequests(ozoneConf); - final GrpcTlsConfig tlsConfig = RatisHelper.createTlsClientConfig(new - SecurityConfig(ozoneConf)); - final TimeDuration requestTimeout = - RatisHelper.getClientRequestTimeout(ozoneConf); - try { - forkJoinPool.submit(() -> { - datanodes.parallelStream().forEach(d -> { - final RaftPeer p = RatisHelper.toRaftPeer(d); - try (RaftClient client = RatisHelper - .newRaftClient(SupportedRpcType.valueOfIgnoreCase(rpcType), p, - retryPolicy, maxOutstandingRequests, tlsConfig, - requestTimeout)) { - rpc.accept(client, p); - } catch (IOException ioe) { - String errMsg = - "Failed invoke Ratis rpc " + rpc + " for " + d.getUuid(); - LOG.error(errMsg, ioe); - exceptions.add(new IOException(errMsg, ioe)); - } - }); - }).get(); - } catch (ExecutionException | RejectedExecutionException ex) { - LOG.error(ex.getClass().getName() + " exception occurred during " + - "createPipeline", ex); - throw new IOException(ex.getClass().getName() + " exception occurred " + - "during createPipeline", ex); - } catch(InterruptedException ex) { - Thread.currentThread().interrupt(); - throw new IOException("Interrupt exception occurred during " + - "createPipeline", ex); - } - if (!exceptions.isEmpty()) { - throw MultipleIOException.createIOException(exceptions); - } - } - - public void shutdown() { - forkJoinPool.shutdownNow(); - } } diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java index b6dec0517e17a..00a4fdb33895d 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java @@ -84,12 +84,12 @@ public class SCMPipelineManager implements PipelineManager { private ObjectName pmInfoBean; public SCMPipelineManager(Configuration conf, NodeManager nodeManager, - EventPublisher eventPublisher, RatisPipelineUtils ratisPipelineUtils) throws IOException { + EventPublisher eventPublisher) throws IOException { this.lock = new ReentrantReadWriteLock(); this.conf = conf; this.stateManager = new PipelineStateManager(conf); this.pipelineFactory = new PipelineFactory(nodeManager, stateManager, - conf, ratisPipelineUtils); + conf); // TODO: See if thread priority needs to be set for these threads scheduler = new Scheduler("RatisPipelineUtilsThread", false, 1); this.backgroundPipelineCreator = @@ -113,12 +113,6 @@ public SCMPipelineManager(Configuration conf, NodeManager nodeManager, initializePipelineState(); } - @VisibleForTesting - public SCMPipelineManager(Configuration conf, NodeManager nodeManager, - EventPublisher eventPublisher) throws IOException { - this(conf, nodeManager, eventPublisher, null); - } - public PipelineStateManager getStateManager() { return stateManager; } @@ -354,6 +348,11 @@ public void triggerPipelineCreation() { backgroundPipelineCreator.triggerPipelineCreation(); } + @Override + public PipelineFactory getPipelineFactory() { + return pipelineFactory; + } + /** * Moves the pipeline to CLOSED state and sends close container command for * all the containers in the pipeline. diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SimplePipelineProvider.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SimplePipelineProvider.java index 3e42df332682c..0ad27391cedd6 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SimplePipelineProvider.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SimplePipelineProvider.java @@ -72,4 +72,9 @@ public Pipeline create(ReplicationFactor factor, .setNodes(nodes) .build(); } + + @Override + public void shutdown() { + + } } diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java index 32cce78624bfc..1f9cced1239fd 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java @@ -207,8 +207,6 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl private final SafeModeHandler safeModeHandler; private SCMContainerMetrics scmContainerMetrics; - private RatisPipelineUtils ratisPipelineUtils; - /** * Creates a new StorageContainerManager. Configuration will be * updated with information on the actual listening addresses used @@ -401,10 +399,8 @@ private void initalizeSystemManagers(OzoneConfiguration conf, if (configurator.getPipelineManager() != null) { pipelineManager = configurator.getPipelineManager(); } else { - ratisPipelineUtils = new RatisPipelineUtils(); pipelineManager = - new SCMPipelineManager(conf, scmNodeManager, eventQueue, - ratisPipelineUtils); + new SCMPipelineManager(conf, scmNodeManager, eventQueue); } if (configurator.getContainerManager() != null) { @@ -1024,7 +1020,7 @@ public void stop() { } // shutdown RatisPipelineUtils pool. - ratisPipelineUtils.shutdown(); + pipelineManager.getPipelineFactory().shutdown(); } /** diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/MockRatisPipelineProvider.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/MockRatisPipelineProvider.java index 22828046910e5..b4b472686d2bc 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/MockRatisPipelineProvider.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/MockRatisPipelineProvider.java @@ -37,4 +37,9 @@ public MockRatisPipelineProvider(NodeManager nodeManager, protected void initializePipeline(Pipeline pipeline) throws IOException { // do nothing as the datanodes do not exists } + + @Override + public void shutdown() { + + } } diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestRatisPipelineUtils.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestRatisPipelineCreateAndDestory.java similarity index 94% rename from hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestRatisPipelineUtils.java rename to hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestRatisPipelineCreateAndDestory.java index 7cfa9417a72e9..b4ca103e43a87 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestRatisPipelineUtils.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestRatisPipelineCreateAndDestory.java @@ -40,7 +40,7 @@ /** * Tests for RatisPipelineUtils. */ -public class TestRatisPipelineUtils { +public class TestRatisPipelineCreateAndDestory { private static MiniOzoneCluster cluster; private OzoneConfiguration conf = new OzoneConfiguration(); @@ -97,15 +97,15 @@ public void testPipelineCreationOnNodeRestart() throws Exception { } // try creating another pipeline now - RatisPipelineUtils ratisPipelineUtils = new RatisPipelineUtils(); + RatisPipelineProvider ratisPipelineProvider = (RatisPipelineProvider) + pipelineManager.getPipelineFactory().getProvider( + HddsProtos.ReplicationType.RATIS); try { - ratisPipelineUtils.createPipeline(pipelines.get(0), conf); + ratisPipelineProvider.createPipeline(pipelines.get(0)); Assert.fail("pipeline creation should fail after shutting down pipeline"); } catch (IOException ioe) { // in case the pipeline creation fails, MultipleIOException is thrown Assert.assertTrue(ioe instanceof MultipleIOException); - } finally { - ratisPipelineUtils.shutdown(); } // make sure pipelines is destroyed From 2ddb4cf4f20c2bc1f9f40e7cf5c5c14ab4503c7b Mon Sep 17 00:00:00 2001 From: Bharat Viswanadham Date: Mon, 13 May 2019 10:43:17 -0700 Subject: [PATCH 08/12] fix minor comment --- .../apache/hadoop/hdds/scm/server/StorageContainerManager.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java index 1f9cced1239fd..ba7662cfd414b 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java @@ -1019,7 +1019,7 @@ public void stop() { LOG.error("SCM Metadata store stop failed", ex); } - // shutdown RatisPipelineUtils pool. + // shutdown pipeline provider. pipelineManager.getPipelineFactory().shutdown(); } From 8505fcf30643c1fa61977b591ce92d18da14f838 Mon Sep 17 00:00:00 2001 From: Bharat Viswanadham Date: Mon, 13 May 2019 12:20:54 -0700 Subject: [PATCH 09/12] fix jenkins reported issues. --- .../scm/pipeline/RatisPipelineProvider.java | 9 +++------ .../hdds/scm/pipeline/RatisPipelineUtils.java | 17 +++++------------ .../hdds/scm/pipeline/SCMPipelineManager.java | 1 - .../scm/server/StorageContainerManager.java | 1 - 4 files changed, 8 insertions(+), 20 deletions(-) diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineProvider.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineProvider.java index d846e9e01dc17..be7356b96f270 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineProvider.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineProvider.java @@ -20,7 +20,6 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hdds.protocol.DatanodeDetails; -import org.apache.hadoop.hdds.protocol.proto.HddsProtos; import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType; import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor; import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState; @@ -75,14 +74,14 @@ public class RatisPipelineProvider implements PipelineProvider { private final int parallelisimForPool = 3; private final ForkJoinPool.ForkJoinWorkerThreadFactory factory = - (forkJoinPool -> { + (pool -> { final ForkJoinWorkerThread worker = ForkJoinPool. - defaultForkJoinWorkerThreadFactory.newThread(forkJoinPool); + defaultForkJoinWorkerThreadFactory.newThread(pool); worker.setName("ratisCreatePipeline" + worker.getPoolIndex()); return worker; }); - public final ForkJoinPool forkJoinPool = new ForkJoinPool( + private final ForkJoinPool forkJoinPool = new ForkJoinPool( parallelisimForPool, factory, null, false); @@ -192,8 +191,6 @@ protected void initializePipeline(Pipeline pipeline) throws IOException { */ public void createPipeline(Pipeline pipeline) throws IOException { - if (pipeline.getFactor() == HddsProtos.ReplicationFactor.ONE) - return; final RaftGroup group = RatisHelper.newRaftGroup(pipeline); LOG.debug("creating pipeline:{} with {}", pipeline.getId(), group); callRatisRpc(pipeline.getNodes(), diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineUtils.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineUtils.java index 59c6a4d6f1437..6d2f08b9ca50a 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineUtils.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineUtils.java @@ -17,35 +17,25 @@ */ package org.apache.hadoop.hdds.scm.pipeline; +import java.io.IOException; + import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hdds.protocol.DatanodeDetails; -import org.apache.hadoop.hdds.protocol.proto.HddsProtos; import org.apache.hadoop.hdds.scm.ScmConfigKeys; import org.apache.hadoop.hdds.scm.client.HddsClientUtils; import org.apache.hadoop.hdds.security.x509.SecurityConfig; -import org.apache.hadoop.io.MultipleIOException; import org.apache.ratis.RatisHelper; import org.apache.ratis.client.RaftClient; import org.apache.ratis.grpc.GrpcTlsConfig; -import org.apache.ratis.protocol.RaftClientReply; import org.apache.ratis.protocol.RaftGroup; import org.apache.ratis.protocol.RaftGroupId; import org.apache.ratis.protocol.RaftPeer; import org.apache.ratis.retry.RetryPolicy; import org.apache.ratis.rpc.SupportedRpcType; import org.apache.ratis.util.TimeDuration; -import org.apache.ratis.util.function.CheckedBiConsumer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.IOException; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.ForkJoinPool; -import java.util.concurrent.ForkJoinWorkerThread; -import java.util.concurrent.RejectedExecutionException; /** * Utility class for Ratis pipelines. Contains methods to create and destroy @@ -55,6 +45,9 @@ public final class RatisPipelineUtils { private static final Logger LOG = LoggerFactory.getLogger(RatisPipelineUtils.class); + + private RatisPipelineUtils() { + } /** * Removes pipeline from SCM. Sends ratis command to destroy pipeline on all * the datanodes. diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java index 00a4fdb33895d..ed63c5b39a4ea 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java @@ -18,7 +18,6 @@ package org.apache.hadoop.hdds.scm.pipeline; -import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hdds.protocol.DatanodeDetails; diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java index ba7662cfd414b..133c2b8ba28ce 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java @@ -72,7 +72,6 @@ import org.apache.hadoop.hdds.scm.pipeline.PipelineActionHandler; import org.apache.hadoop.hdds.scm.pipeline.PipelineManager; import org.apache.hadoop.hdds.scm.pipeline.PipelineReportHandler; -import org.apache.hadoop.hdds.scm.pipeline.RatisPipelineUtils; import org.apache.hadoop.hdds.scm.pipeline.SCMPipelineManager; import org.apache.hadoop.hdds.security.x509.SecurityConfig; import org.apache.hadoop.hdds.security.x509.certificate.authority.CertificateServer; From b2100fae96ad7d10ec10afa95f26c1896b4dc345 Mon Sep 17 00:00:00 2001 From: Bharat Viswanadham Date: Thu, 16 May 2019 11:42:12 -0700 Subject: [PATCH 10/12] fix review comments --- .../hadoop/hdds/scm/pipeline/PipelineManager.java | 2 -- .../hdds/scm/pipeline/RatisPipelineProvider.java | 13 +------------ .../hdds/scm/pipeline/SCMPipelineManager.java | 7 ++----- .../hdds/scm/pipeline/SimplePipelineProvider.java | 2 +- .../hdds/scm/server/StorageContainerManager.java | 3 --- .../scm/pipeline/MockRatisPipelineProvider.java | 2 +- .../pipeline/TestRatisPipelineCreateAndDestory.java | 12 +++++------- 7 files changed, 10 insertions(+), 31 deletions(-) diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManager.java index 5b865e2f19b58..2793647b7f415 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManager.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManager.java @@ -75,6 +75,4 @@ void finalizeAndDestroyPipeline(Pipeline pipeline, boolean onTimeout) void startPipelineCreator(); void triggerPipelineCreation(); - - PipelineFactory getPipelineFactory(); } diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineProvider.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineProvider.java index be7356b96f270..73005a2c46266 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineProvider.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineProvider.java @@ -77,7 +77,7 @@ public class RatisPipelineProvider implements PipelineProvider { (pool -> { final ForkJoinWorkerThread worker = ForkJoinPool. defaultForkJoinWorkerThreadFactory.newThread(pool); - worker.setName("ratisCreatePipeline" + worker.getPoolIndex()); + worker.setName("RATISCREATEPIPELINE" + worker.getPoolIndex()); return worker; }); @@ -180,17 +180,6 @@ public void shutdown() { } protected void initializePipeline(Pipeline pipeline) throws IOException { - createPipeline(pipeline); - } - - /** - * Sends ratis command to create pipeline on all the datanodes. - * - * @param pipeline - Pipeline to be created - * @throws IOException if creation fails - */ - public void createPipeline(Pipeline pipeline) - throws IOException { final RaftGroup group = RatisHelper.newRaftGroup(pipeline); LOG.debug("creating pipeline:{} with {}", pipeline.getId(), group); callRatisRpc(pipeline.getNodes(), diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java index ed63c5b39a4ea..bce396b6a56b9 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java @@ -347,11 +347,6 @@ public void triggerPipelineCreation() { backgroundPipelineCreator.triggerPipelineCreation(); } - @Override - public PipelineFactory getPipelineFactory() { - return pipelineFactory; - } - /** * Moves the pipeline to CLOSED state and sends close container command for * all the containers in the pipeline. @@ -425,5 +420,7 @@ public void close() throws IOException { if(metrics != null) { metrics.unRegister(); } + // shutdown pipeline provider. + pipelineFactory.shutdown(); } } diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SimplePipelineProvider.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SimplePipelineProvider.java index 0ad27391cedd6..ab98dfa3ed7b5 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SimplePipelineProvider.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SimplePipelineProvider.java @@ -75,6 +75,6 @@ public Pipeline create(ReplicationFactor factor, @Override public void shutdown() { - + // Do nothing. } } diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java index 133c2b8ba28ce..8c4a514649cc0 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java @@ -1017,9 +1017,6 @@ public void stop() { } catch (Exception ex) { LOG.error("SCM Metadata store stop failed", ex); } - - // shutdown pipeline provider. - pipelineManager.getPipelineFactory().shutdown(); } /** diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/MockRatisPipelineProvider.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/MockRatisPipelineProvider.java index b4b472686d2bc..32784a31deac3 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/MockRatisPipelineProvider.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/MockRatisPipelineProvider.java @@ -40,6 +40,6 @@ protected void initializePipeline(Pipeline pipeline) throws IOException { @Override public void shutdown() { - + // Do nothing. } } diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestRatisPipelineCreateAndDestory.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestRatisPipelineCreateAndDestory.java index b4ca103e43a87..9fd8aae0f0f13 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestRatisPipelineCreateAndDestory.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestRatisPipelineCreateAndDestory.java @@ -21,7 +21,6 @@ import org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.hdds.protocol.proto.HddsProtos; import org.apache.hadoop.hdds.scm.server.StorageContainerManager; -import org.apache.hadoop.io.MultipleIOException; import org.apache.hadoop.ozone.HddsDatanodeService; import org.apache.hadoop.ozone.MiniOzoneCluster; import org.apache.hadoop.test.GenericTestUtils; @@ -97,15 +96,14 @@ public void testPipelineCreationOnNodeRestart() throws Exception { } // try creating another pipeline now - RatisPipelineProvider ratisPipelineProvider = (RatisPipelineProvider) - pipelineManager.getPipelineFactory().getProvider( - HddsProtos.ReplicationType.RATIS); try { - ratisPipelineProvider.createPipeline(pipelines.get(0)); + pipelineManager.createPipeline(HddsProtos.ReplicationType.RATIS, + HddsProtos.ReplicationFactor.THREE); Assert.fail("pipeline creation should fail after shutting down pipeline"); } catch (IOException ioe) { - // in case the pipeline creation fails, MultipleIOException is thrown - Assert.assertTrue(ioe instanceof MultipleIOException); + // As now all datanodes are shutdown, they move to stale state, there + // will be no sufficient datanodes to create the pipeline. + Assert.assertTrue(ioe instanceof InsufficientDatanodesException); } // make sure pipelines is destroyed From e5bb220d6b2516a45dc801612cf85e28132380f5 Mon Sep 17 00:00:00 2001 From: Bharat Viswanadham Date: Thu, 16 May 2019 11:43:48 -0700 Subject: [PATCH 11/12] fix review comments --- .../org/apache/hadoop/hdds/scm/pipeline/PipelineFactory.java | 5 ----- 1 file changed, 5 deletions(-) diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineFactory.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineFactory.java index 866ebe190c926..cec688c1a8e36 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineFactory.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineFactory.java @@ -62,11 +62,6 @@ public Pipeline create(ReplicationType type, ReplicationFactor factor, return providers.get(type).create(factor, nodes); } - @VisibleForTesting - public PipelineProvider getProvider(ReplicationType type) { - return providers.get(type); - } - public void shutdown() { providers.values().forEach(provider -> provider.shutdown()); } From c3d6890af6f39133fb413523cb198586c0cbe327 Mon Sep 17 00:00:00 2001 From: Bharat Viswanadham Date: Mon, 20 May 2019 20:15:32 -0700 Subject: [PATCH 12/12] fix review comments --- .../hdds/scm/pipeline/RatisPipelineProvider.java | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineProvider.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineProvider.java index 73005a2c46266..d3b02e6253aaf 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineProvider.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineProvider.java @@ -56,6 +56,7 @@ import java.util.concurrent.ForkJoinPool; import java.util.concurrent.ForkJoinWorkerThread; import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; /** @@ -71,7 +72,7 @@ public class RatisPipelineProvider implements PipelineProvider { private final Configuration conf; // Set parallelism at 3, as now in Ratis we create 1 and 3 node pipelines. - private final int parallelisimForPool = 3; + private final int parallelismForPool = 3; private final ForkJoinPool.ForkJoinWorkerThreadFactory factory = (pool -> { @@ -82,7 +83,7 @@ public class RatisPipelineProvider implements PipelineProvider { }); private final ForkJoinPool forkJoinPool = new ForkJoinPool( - parallelisimForPool, factory, null, false); + parallelismForPool, factory, null, false); RatisPipelineProvider(NodeManager nodeManager, @@ -177,6 +178,12 @@ public Pipeline create(ReplicationFactor factor, @Override public void shutdown() { forkJoinPool.shutdownNow(); + try { + forkJoinPool.awaitTermination(60, TimeUnit.SECONDS); + } catch (Exception e) { + LOG.error("Unexpected exception occurred during shutdown of " + + "RatisPipelineProvider", e); + } } protected void initializePipeline(Pipeline pipeline) throws IOException {