Skip to content

Commit a50ef3d

Browse files
lianchengJoshRosen
authored andcommitted
[SPARK-19529][BRANCH-1.6] Backport PR #16866 to branch-1.6
## What changes were proposed in this pull request? This PR backports PR #16866 to branch-1.6 ## How was this patch tested? Existing tests. Author: Cheng Lian <[email protected]> Closes #16917 from liancheng/spark-19529-1.6-backport.
1 parent e78138a commit a50ef3d

File tree

9 files changed

+31
-24
lines changed

9 files changed

+31
-24
lines changed

network/common/src/main/java/org/apache/spark/network/client/TransportClientFactory.java

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -120,7 +120,8 @@ public TransportClientFactory(
120120
*
121121
* Concurrency: This method is safe to call from multiple threads.
122122
*/
123-
public TransportClient createClient(String remoteHost, int remotePort) throws IOException {
123+
public TransportClient createClient(String remoteHost, int remotePort)
124+
throws IOException, InterruptedException {
124125
// Get connection from the connection pool first.
125126
// If it is not found or not active, create a new one.
126127
final InetSocketAddress address = new InetSocketAddress(remoteHost, remotePort);
@@ -176,13 +177,14 @@ public TransportClient createClient(String remoteHost, int remotePort) throws IO
176177
* As with {@link #createClient(String, int)}, this method is blocking.
177178
*/
178179
public TransportClient createUnmanagedClient(String remoteHost, int remotePort)
179-
throws IOException {
180+
throws IOException, InterruptedException {
180181
final InetSocketAddress address = new InetSocketAddress(remoteHost, remotePort);
181182
return createClient(address);
182183
}
183184

184185
/** Create a completely new {@link TransportClient} to the remote address. */
185-
private TransportClient createClient(InetSocketAddress address) throws IOException {
186+
private TransportClient createClient(InetSocketAddress address)
187+
throws IOException, InterruptedException {
186188
logger.debug("Creating new connection to " + address);
187189

188190
Bootstrap bootstrap = new Bootstrap();
@@ -209,7 +211,7 @@ public void initChannel(SocketChannel ch) {
209211
// Connect to the remote server
210212
long preConnect = System.nanoTime();
211213
ChannelFuture cf = bootstrap.connect(address);
212-
if (!cf.awaitUninterruptibly(conf.connectionTimeoutMs())) {
214+
if (!cf.await(conf.connectionTimeoutMs())) {
213215
throw new IOException(
214216
String.format("Connecting to %s timed out (%s ms)", address, conf.connectionTimeoutMs()));
215217
} else if (cf.cause() != null) {

network/common/src/test/java/org/apache/spark/network/TransportClientFactorySuite.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,8 @@ public void run() {
9999
clients.add(client);
100100
} catch (IOException e) {
101101
failed.incrementAndGet();
102+
} catch (InterruptedException e) {
103+
throw new RuntimeException(e);
102104
}
103105
}
104106
};
@@ -140,7 +142,7 @@ public void reuseClientsUpToConfigVariableConcurrent() throws Exception {
140142
}
141143

142144
@Test
143-
public void returnDifferentClientsForDifferentServers() throws IOException {
145+
public void returnDifferentClientsForDifferentServers() throws IOException, InterruptedException {
144146
TransportClientFactory factory = context.createClientFactory();
145147
TransportClient c1 = factory.createClient(TestUtils.getLocalHost(), server1.getPort());
146148
TransportClient c2 = factory.createClient(TestUtils.getLocalHost(), server2.getPort());
@@ -169,7 +171,7 @@ public void neverReturnInactiveClients() throws IOException, InterruptedExceptio
169171
}
170172

171173
@Test
172-
public void closeBlockClientsWithFactory() throws IOException {
174+
public void closeBlockClientsWithFactory() throws IOException, InterruptedException {
173175
TransportClientFactory factory = context.createClientFactory();
174176
TransportClient c1 = factory.createClient(TestUtils.getLocalHost(), server1.getPort());
175177
TransportClient c2 = factory.createClient(TestUtils.getLocalHost(), server2.getPort());

network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleClient.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -101,7 +101,7 @@ public void fetchBlocks(
101101
new RetryingBlockFetcher.BlockFetchStarter() {
102102
@Override
103103
public void createAndStart(String[] blockIds, BlockFetchingListener listener)
104-
throws IOException {
104+
throws IOException, InterruptedException {
105105
TransportClient client = clientFactory.createClient(host, port);
106106
new OneForOneBlockFetcher(client, appId, execId, blockIds, listener).start();
107107
}
@@ -136,7 +136,7 @@ public void registerWithShuffleServer(
136136
String host,
137137
int port,
138138
String execId,
139-
ExecutorShuffleInfo executorInfo) throws IOException {
139+
ExecutorShuffleInfo executorInfo) throws IOException, InterruptedException {
140140
checkInit();
141141
TransportClient client = clientFactory.createUnmanagedClient(host, port);
142142
try {

network/shuffle/src/main/java/org/apache/spark/network/shuffle/RetryingBlockFetcher.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,8 @@ public static interface BlockFetchStarter {
5757
* {@link org.apache.spark.network.client.TransportClientFactory} in order to fix connection
5858
* issues.
5959
*/
60-
void createAndStart(String[] blockIds, BlockFetchingListener listener) throws IOException;
60+
void createAndStart(String[] blockIds, BlockFetchingListener listener)
61+
throws IOException, InterruptedException;
6162
}
6263

6364
/** Shared executor service used for waiting and retrying. */

network/shuffle/src/main/java/org/apache/spark/network/shuffle/mesos/MesosExternalShuffleClient.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,8 @@ public MesosExternalShuffleClient(
5353
super(conf, secretKeyHolder, saslEnabled, saslEncryptionEnabled);
5454
}
5555

56-
public void registerDriverWithShuffleService(String host, int port) throws IOException {
56+
public void registerDriverWithShuffleService(String host, int port)
57+
throws IOException, InterruptedException {
5758
checkInit();
5859
ByteBuffer registerDriver = new RegisterDriver(appId).toByteBuffer();
5960
TransportClient client = clientFactory.createClient(host, port);

network/shuffle/src/test/java/org/apache/spark/network/sasl/SaslIntegrationSuite.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -102,7 +102,7 @@ public void afterEach() {
102102
}
103103

104104
@Test
105-
public void testGoodClient() throws IOException {
105+
public void testGoodClient() throws IOException, InterruptedException {
106106
clientFactory = context.createClientFactory(
107107
Lists.<TransportClientBootstrap>newArrayList(
108108
new SaslClientBootstrap(conf, "app-1", secretKeyHolder)));
@@ -132,7 +132,7 @@ public void testBadClient() {
132132
}
133133

134134
@Test
135-
public void testNoSaslClient() throws IOException {
135+
public void testNoSaslClient() throws IOException, InterruptedException {
136136
clientFactory = context.createClientFactory(
137137
Lists.<TransportClientBootstrap>newArrayList());
138138

network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleIntegrationSuite.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -273,7 +273,7 @@ public void testFetchNoServer() throws Exception {
273273
}
274274

275275
private void registerExecutor(String executorId, ExecutorShuffleInfo executorInfo)
276-
throws IOException {
276+
throws IOException, InterruptedException {
277277
ExternalShuffleClient client = new ExternalShuffleClient(conf, null, false, false);
278278
client.init(APP_ID);
279279
client.registerWithShuffleServer(TestUtils.getLocalHost(), server.getPort(),

network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleSecuritySuite.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ public void afterEach() {
6060
}
6161

6262
@Test
63-
public void testValid() throws IOException {
63+
public void testValid() throws IOException, InterruptedException {
6464
validate("my-app-id", "secret", false);
6565
}
6666

@@ -83,12 +83,13 @@ public void testBadSecret() {
8383
}
8484

8585
@Test
86-
public void testEncryption() throws IOException {
86+
public void testEncryption() throws IOException, InterruptedException {
8787
validate("my-app-id", "secret", true);
8888
}
8989

9090
/** Creates an ExternalShuffleClient and attempts to register with the server. */
91-
private void validate(String appId, String secretKey, boolean encrypt) throws IOException {
91+
private void validate(String appId, String secretKey, boolean encrypt)
92+
throws IOException, InterruptedException {
9293
ExternalShuffleClient client =
9394
new ExternalShuffleClient(conf, new TestSecretKeyHolder(appId, secretKey), true, encrypt);
9495
client.init(appId);

network/shuffle/src/test/java/org/apache/spark/network/shuffle/RetryingBlockFetcherSuite.java

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ public void afterEach() {
6666
}
6767

6868
@Test
69-
public void testNoFailures() throws IOException {
69+
public void testNoFailures() throws IOException, InterruptedException {
7070
BlockFetchingListener listener = mock(BlockFetchingListener.class);
7171

7272
List<? extends Map<String, Object>> interactions = Arrays.asList(
@@ -85,7 +85,7 @@ public void testNoFailures() throws IOException {
8585
}
8686

8787
@Test
88-
public void testUnrecoverableFailure() throws IOException {
88+
public void testUnrecoverableFailure() throws IOException, InterruptedException {
8989
BlockFetchingListener listener = mock(BlockFetchingListener.class);
9090

9191
List<? extends Map<String, Object>> interactions = Arrays.asList(
@@ -104,7 +104,7 @@ public void testUnrecoverableFailure() throws IOException {
104104
}
105105

106106
@Test
107-
public void testSingleIOExceptionOnFirst() throws IOException {
107+
public void testSingleIOExceptionOnFirst() throws IOException, InterruptedException {
108108
BlockFetchingListener listener = mock(BlockFetchingListener.class);
109109

110110
List<? extends Map<String, Object>> interactions = Arrays.asList(
@@ -127,7 +127,7 @@ public void testSingleIOExceptionOnFirst() throws IOException {
127127
}
128128

129129
@Test
130-
public void testSingleIOExceptionOnSecond() throws IOException {
130+
public void testSingleIOExceptionOnSecond() throws IOException, InterruptedException {
131131
BlockFetchingListener listener = mock(BlockFetchingListener.class);
132132

133133
List<? extends Map<String, Object>> interactions = Arrays.asList(
@@ -149,7 +149,7 @@ public void testSingleIOExceptionOnSecond() throws IOException {
149149
}
150150

151151
@Test
152-
public void testTwoIOExceptions() throws IOException {
152+
public void testTwoIOExceptions() throws IOException, InterruptedException {
153153
BlockFetchingListener listener = mock(BlockFetchingListener.class);
154154

155155
List<? extends Map<String, Object>> interactions = Arrays.asList(
@@ -177,7 +177,7 @@ public void testTwoIOExceptions() throws IOException {
177177
}
178178

179179
@Test
180-
public void testThreeIOExceptions() throws IOException {
180+
public void testThreeIOExceptions() throws IOException, InterruptedException {
181181
BlockFetchingListener listener = mock(BlockFetchingListener.class);
182182

183183
List<? extends Map<String, Object>> interactions = Arrays.asList(
@@ -209,7 +209,7 @@ public void testThreeIOExceptions() throws IOException {
209209
}
210210

211211
@Test
212-
public void testRetryAndUnrecoverable() throws IOException {
212+
public void testRetryAndUnrecoverable() throws IOException, InterruptedException {
213213
BlockFetchingListener listener = mock(BlockFetchingListener.class);
214214

215215
List<? extends Map<String, Object>> interactions = Arrays.asList(
@@ -252,7 +252,7 @@ public void testRetryAndUnrecoverable() throws IOException {
252252
@SuppressWarnings("unchecked")
253253
private static void performInteractions(List<? extends Map<String, Object>> interactions,
254254
BlockFetchingListener listener)
255-
throws IOException {
255+
throws IOException, InterruptedException {
256256

257257
TransportConf conf = new TransportConf("shuffle", new SystemPropertyConfigProvider());
258258
BlockFetchStarter fetchStarter = mock(BlockFetchStarter.class);

0 commit comments

Comments
 (0)