From f7d7c54a6b0316d66dd03b3d712df7d33f7cf5b9 Mon Sep 17 00:00:00 2001 From: Cheng Lian Date: Mon, 13 Feb 2017 14:12:42 -0800 Subject: [PATCH] Backport PR #16866 to branch-1.6 --- .../network/client/TransportClientFactory.java | 10 ++++++---- .../network/TransportClientFactorySuite.java | 6 ++++-- .../network/shuffle/ExternalShuffleClient.java | 4 ++-- .../network/shuffle/RetryingBlockFetcher.java | 3 ++- .../mesos/MesosExternalShuffleClient.java | 3 ++- .../spark/network/sasl/SaslIntegrationSuite.java | 4 ++-- .../shuffle/ExternalShuffleIntegrationSuite.java | 2 +- .../shuffle/ExternalShuffleSecuritySuite.java | 7 ++++--- .../shuffle/RetryingBlockFetcherSuite.java | 16 ++++++++-------- 9 files changed, 31 insertions(+), 24 deletions(-) diff --git a/network/common/src/main/java/org/apache/spark/network/client/TransportClientFactory.java b/network/common/src/main/java/org/apache/spark/network/client/TransportClientFactory.java index 61bafc8380049..5b438b78814f9 100644 --- a/network/common/src/main/java/org/apache/spark/network/client/TransportClientFactory.java +++ b/network/common/src/main/java/org/apache/spark/network/client/TransportClientFactory.java @@ -120,7 +120,8 @@ public TransportClientFactory( * * Concurrency: This method is safe to call from multiple threads. */ - public TransportClient createClient(String remoteHost, int remotePort) throws IOException { + public TransportClient createClient(String remoteHost, int remotePort) + throws IOException, InterruptedException { // Get connection from the connection pool first. // If it is not found or not active, create a new one. final InetSocketAddress address = new InetSocketAddress(remoteHost, remotePort); @@ -176,13 +177,14 @@ public TransportClient createClient(String remoteHost, int remotePort) throws IO * As with {@link #createClient(String, int)}, this method is blocking. */ public TransportClient createUnmanagedClient(String remoteHost, int remotePort) - throws IOException { + throws IOException, InterruptedException { final InetSocketAddress address = new InetSocketAddress(remoteHost, remotePort); return createClient(address); } /** Create a completely new {@link TransportClient} to the remote address. */ - private TransportClient createClient(InetSocketAddress address) throws IOException { + private TransportClient createClient(InetSocketAddress address) + throws IOException, InterruptedException { logger.debug("Creating new connection to " + address); Bootstrap bootstrap = new Bootstrap(); @@ -209,7 +211,7 @@ public void initChannel(SocketChannel ch) { // Connect to the remote server long preConnect = System.nanoTime(); ChannelFuture cf = bootstrap.connect(address); - if (!cf.awaitUninterruptibly(conf.connectionTimeoutMs())) { + if (!cf.await(conf.connectionTimeoutMs())) { throw new IOException( String.format("Connecting to %s timed out (%s ms)", address, conf.connectionTimeoutMs())); } else if (cf.cause() != null) { diff --git a/network/common/src/test/java/org/apache/spark/network/TransportClientFactorySuite.java b/network/common/src/test/java/org/apache/spark/network/TransportClientFactorySuite.java index dac7d4a5b0a09..6b77e34ed3d4a 100644 --- a/network/common/src/test/java/org/apache/spark/network/TransportClientFactorySuite.java +++ b/network/common/src/test/java/org/apache/spark/network/TransportClientFactorySuite.java @@ -99,6 +99,8 @@ public void run() { clients.add(client); } catch (IOException e) { failed.incrementAndGet(); + } catch (InterruptedException e) { + throw new RuntimeException(e); } } }; @@ -140,7 +142,7 @@ public void reuseClientsUpToConfigVariableConcurrent() throws Exception { } @Test - public void returnDifferentClientsForDifferentServers() throws IOException { + public void returnDifferentClientsForDifferentServers() throws IOException, InterruptedException { TransportClientFactory factory = context.createClientFactory(); TransportClient c1 = factory.createClient(TestUtils.getLocalHost(), server1.getPort()); TransportClient c2 = factory.createClient(TestUtils.getLocalHost(), server2.getPort()); @@ -169,7 +171,7 @@ public void neverReturnInactiveClients() throws IOException, InterruptedExceptio } @Test - public void closeBlockClientsWithFactory() throws IOException { + public void closeBlockClientsWithFactory() throws IOException, InterruptedException { TransportClientFactory factory = context.createClientFactory(); TransportClient c1 = factory.createClient(TestUtils.getLocalHost(), server1.getPort()); TransportClient c2 = factory.createClient(TestUtils.getLocalHost(), server2.getPort()); diff --git a/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleClient.java b/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleClient.java index 58ca87d9d3b13..fa6a06e1dc318 100644 --- a/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleClient.java +++ b/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleClient.java @@ -101,7 +101,7 @@ public void fetchBlocks( new RetryingBlockFetcher.BlockFetchStarter() { @Override public void createAndStart(String[] blockIds, BlockFetchingListener listener) - throws IOException { + throws IOException, InterruptedException { TransportClient client = clientFactory.createClient(host, port); new OneForOneBlockFetcher(client, appId, execId, blockIds, listener).start(); } @@ -136,7 +136,7 @@ public void registerWithShuffleServer( String host, int port, String execId, - ExecutorShuffleInfo executorInfo) throws IOException { + ExecutorShuffleInfo executorInfo) throws IOException, InterruptedException { checkInit(); TransportClient client = clientFactory.createUnmanagedClient(host, port); try { diff --git a/network/shuffle/src/main/java/org/apache/spark/network/shuffle/RetryingBlockFetcher.java b/network/shuffle/src/main/java/org/apache/spark/network/shuffle/RetryingBlockFetcher.java index 4bb0498e5d5aa..44a515b7de736 100644 --- a/network/shuffle/src/main/java/org/apache/spark/network/shuffle/RetryingBlockFetcher.java +++ b/network/shuffle/src/main/java/org/apache/spark/network/shuffle/RetryingBlockFetcher.java @@ -57,7 +57,8 @@ public static interface BlockFetchStarter { * {@link org.apache.spark.network.client.TransportClientFactory} in order to fix connection * issues. */ - void createAndStart(String[] blockIds, BlockFetchingListener listener) throws IOException; + void createAndStart(String[] blockIds, BlockFetchingListener listener) + throws IOException, InterruptedException; } /** Shared executor service used for waiting and retrying. */ diff --git a/network/shuffle/src/main/java/org/apache/spark/network/shuffle/mesos/MesosExternalShuffleClient.java b/network/shuffle/src/main/java/org/apache/spark/network/shuffle/mesos/MesosExternalShuffleClient.java index 675820308bd4c..1eac2277cd420 100644 --- a/network/shuffle/src/main/java/org/apache/spark/network/shuffle/mesos/MesosExternalShuffleClient.java +++ b/network/shuffle/src/main/java/org/apache/spark/network/shuffle/mesos/MesosExternalShuffleClient.java @@ -53,7 +53,8 @@ public MesosExternalShuffleClient( super(conf, secretKeyHolder, saslEnabled, saslEncryptionEnabled); } - public void registerDriverWithShuffleService(String host, int port) throws IOException { + public void registerDriverWithShuffleService(String host, int port) + throws IOException, InterruptedException { checkInit(); ByteBuffer registerDriver = new RegisterDriver(appId).toByteBuffer(); TransportClient client = clientFactory.createClient(host, port); diff --git a/network/shuffle/src/test/java/org/apache/spark/network/sasl/SaslIntegrationSuite.java b/network/shuffle/src/test/java/org/apache/spark/network/sasl/SaslIntegrationSuite.java index 19c870aebb023..a94fd18361a1b 100644 --- a/network/shuffle/src/test/java/org/apache/spark/network/sasl/SaslIntegrationSuite.java +++ b/network/shuffle/src/test/java/org/apache/spark/network/sasl/SaslIntegrationSuite.java @@ -102,7 +102,7 @@ public void afterEach() { } @Test - public void testGoodClient() throws IOException { + public void testGoodClient() throws IOException, InterruptedException { clientFactory = context.createClientFactory( Lists.newArrayList( new SaslClientBootstrap(conf, "app-1", secretKeyHolder))); @@ -132,7 +132,7 @@ public void testBadClient() { } @Test - public void testNoSaslClient() throws IOException { + public void testNoSaslClient() throws IOException, InterruptedException { clientFactory = context.createClientFactory( Lists.newArrayList()); diff --git a/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleIntegrationSuite.java b/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleIntegrationSuite.java index 2095f41d79c16..3551a96b49571 100644 --- a/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleIntegrationSuite.java +++ b/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleIntegrationSuite.java @@ -273,7 +273,7 @@ public void testFetchNoServer() throws Exception { } private void registerExecutor(String executorId, ExecutorShuffleInfo executorInfo) - throws IOException { + throws IOException, InterruptedException { ExternalShuffleClient client = new ExternalShuffleClient(conf, null, false, false); client.init(APP_ID); client.registerWithShuffleServer(TestUtils.getLocalHost(), server.getPort(), diff --git a/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleSecuritySuite.java b/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleSecuritySuite.java index 08ddb3755bd08..50a5c8e0b141f 100644 --- a/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleSecuritySuite.java +++ b/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleSecuritySuite.java @@ -60,7 +60,7 @@ public void afterEach() { } @Test - public void testValid() throws IOException { + public void testValid() throws IOException, InterruptedException { validate("my-app-id", "secret", false); } @@ -83,12 +83,13 @@ public void testBadSecret() { } @Test - public void testEncryption() throws IOException { + public void testEncryption() throws IOException, InterruptedException { validate("my-app-id", "secret", true); } /** Creates an ExternalShuffleClient and attempts to register with the server. */ - private void validate(String appId, String secretKey, boolean encrypt) throws IOException { + private void validate(String appId, String secretKey, boolean encrypt) + throws IOException, InterruptedException { ExternalShuffleClient client = new ExternalShuffleClient(conf, new TestSecretKeyHolder(appId, secretKey), true, encrypt); client.init(appId); diff --git a/network/shuffle/src/test/java/org/apache/spark/network/shuffle/RetryingBlockFetcherSuite.java b/network/shuffle/src/test/java/org/apache/spark/network/shuffle/RetryingBlockFetcherSuite.java index 3a6ef0d3f8476..3e2c532e02a7e 100644 --- a/network/shuffle/src/test/java/org/apache/spark/network/shuffle/RetryingBlockFetcherSuite.java +++ b/network/shuffle/src/test/java/org/apache/spark/network/shuffle/RetryingBlockFetcherSuite.java @@ -66,7 +66,7 @@ public void afterEach() { } @Test - public void testNoFailures() throws IOException { + public void testNoFailures() throws IOException, InterruptedException { BlockFetchingListener listener = mock(BlockFetchingListener.class); List> interactions = Arrays.asList( @@ -85,7 +85,7 @@ public void testNoFailures() throws IOException { } @Test - public void testUnrecoverableFailure() throws IOException { + public void testUnrecoverableFailure() throws IOException, InterruptedException { BlockFetchingListener listener = mock(BlockFetchingListener.class); List> interactions = Arrays.asList( @@ -104,7 +104,7 @@ public void testUnrecoverableFailure() throws IOException { } @Test - public void testSingleIOExceptionOnFirst() throws IOException { + public void testSingleIOExceptionOnFirst() throws IOException, InterruptedException { BlockFetchingListener listener = mock(BlockFetchingListener.class); List> interactions = Arrays.asList( @@ -127,7 +127,7 @@ public void testSingleIOExceptionOnFirst() throws IOException { } @Test - public void testSingleIOExceptionOnSecond() throws IOException { + public void testSingleIOExceptionOnSecond() throws IOException, InterruptedException { BlockFetchingListener listener = mock(BlockFetchingListener.class); List> interactions = Arrays.asList( @@ -149,7 +149,7 @@ public void testSingleIOExceptionOnSecond() throws IOException { } @Test - public void testTwoIOExceptions() throws IOException { + public void testTwoIOExceptions() throws IOException, InterruptedException { BlockFetchingListener listener = mock(BlockFetchingListener.class); List> interactions = Arrays.asList( @@ -177,7 +177,7 @@ public void testTwoIOExceptions() throws IOException { } @Test - public void testThreeIOExceptions() throws IOException { + public void testThreeIOExceptions() throws IOException, InterruptedException { BlockFetchingListener listener = mock(BlockFetchingListener.class); List> interactions = Arrays.asList( @@ -209,7 +209,7 @@ public void testThreeIOExceptions() throws IOException { } @Test - public void testRetryAndUnrecoverable() throws IOException { + public void testRetryAndUnrecoverable() throws IOException, InterruptedException { BlockFetchingListener listener = mock(BlockFetchingListener.class); List> interactions = Arrays.asList( @@ -252,7 +252,7 @@ public void testRetryAndUnrecoverable() throws IOException { @SuppressWarnings("unchecked") private static void performInteractions(List> interactions, BlockFetchingListener listener) - throws IOException { + throws IOException, InterruptedException { TransportConf conf = new TransportConf("shuffle", new SystemPropertyConfigProvider()); BlockFetchStarter fetchStarter = mock(BlockFetchStarter.class);