Skip to content

Commit ec2cfeb

Browse files
artembilanspring-builds
authored andcommitted
Fix tests for executorService.shutdown()
Even if `Executors.newSingleThreadExecutor()` returns a `FinalizableDelegatedExecutorService`, an instance is kept in the memory until JVM exists. That may lead to memory leak since we have a lot of threads in memory. (cherry picked from commit fdac8f1)
1 parent 6a7581e commit ec2cfeb

File tree

13 files changed

+132
-78
lines changed

13 files changed

+132
-78
lines changed

spring-integration-core/src/test/java/org/springframework/integration/aggregator/FluxAggregatorMessageHandlerTests.java

Lines changed: 15 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -19,13 +19,13 @@
1919
import java.time.Duration;
2020
import java.util.List;
2121
import java.util.Objects;
22+
import java.util.concurrent.ExecutorService;
2223
import java.util.concurrent.Executors;
2324
import java.util.stream.Collectors;
2425
import java.util.stream.IntStream;
2526

2627
import org.assertj.core.api.InstanceOfAssertFactories;
2728
import org.junit.jupiter.api.Test;
28-
import org.junit.jupiter.api.condition.DisabledIfEnvironmentVariable;
2929
import reactor.core.publisher.Flux;
3030
import reactor.test.StepVerifier;
3131

@@ -156,27 +156,25 @@ void testCustomCombineFunction() {
156156
}
157157

158158
@Test
159-
@DisabledIfEnvironmentVariable(named = "bamboo_buildKey", matches = ".*?",
160-
disabledReason = "Timing is too short for CI")
161159
void testWindowTimespan() {
162160
QueueChannel resultChannel = new QueueChannel();
163161
FluxAggregatorMessageHandler fluxAggregatorMessageHandler = new FluxAggregatorMessageHandler();
164162
fluxAggregatorMessageHandler.setOutputChannel(resultChannel);
165163
fluxAggregatorMessageHandler.setWindowTimespan(Duration.ofMillis(100));
166164
fluxAggregatorMessageHandler.start();
167165

168-
Executors.newSingleThreadExecutor()
169-
.submit(() -> {
170-
for (int i = 0; i < 10; i++) {
171-
Message<?> messageToAggregate =
172-
MessageBuilder.withPayload(i)
173-
.setCorrelationId("1")
174-
.build();
175-
fluxAggregatorMessageHandler.handleMessage(messageToAggregate);
176-
Thread.sleep(20);
177-
}
178-
return null;
179-
});
166+
ExecutorService executorService = Executors.newSingleThreadExecutor();
167+
executorService.submit(() -> {
168+
for (int i = 0; i < 10; i++) {
169+
Message<?> messageToAggregate =
170+
MessageBuilder.withPayload(i)
171+
.setCorrelationId("1")
172+
.build();
173+
fluxAggregatorMessageHandler.handleMessage(messageToAggregate);
174+
Thread.sleep(20);
175+
}
176+
return null;
177+
});
180178

181179
Message<?> result = resultChannel.receive(10_000);
182180
assertThat(result).isNotNull();
@@ -211,6 +209,8 @@ void testWindowTimespan() {
211209
.doesNotContain(0, 1);
212210

213211
fluxAggregatorMessageHandler.stop();
212+
213+
executorService.shutdown();
214214
}
215215

216216
@Test

spring-integration-core/src/test/java/org/springframework/integration/channel/config/ThreadLocalChannelParserTests.java

Lines changed: 20 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2022 the original author or authors.
2+
* Copyright 2002-2024 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -19,21 +19,19 @@
1919
import java.util.ArrayList;
2020
import java.util.List;
2121
import java.util.concurrent.CountDownLatch;
22-
import java.util.concurrent.Executor;
22+
import java.util.concurrent.ExecutorService;
2323
import java.util.concurrent.Executors;
2424
import java.util.concurrent.TimeUnit;
2525

26-
import org.junit.Test;
27-
import org.junit.runner.RunWith;
26+
import org.junit.jupiter.api.Test;
2827

2928
import org.springframework.beans.factory.annotation.Autowired;
3029
import org.springframework.beans.factory.annotation.Qualifier;
3130
import org.springframework.integration.config.TestChannelInterceptor;
3231
import org.springframework.messaging.PollableChannel;
3332
import org.springframework.messaging.support.GenericMessage;
3433
import org.springframework.test.annotation.DirtiesContext;
35-
import org.springframework.test.context.ContextConfiguration;
36-
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
34+
import org.springframework.test.context.junit.jupiter.SpringJUnitConfig;
3735

3836
import static org.assertj.core.api.Assertions.assertThat;
3937

@@ -42,8 +40,7 @@
4240
* @author Dave Syer
4341
* @author Artem Bilan
4442
*/
45-
@RunWith(SpringJUnit4ClassRunner.class)
46-
@ContextConfiguration
43+
@SpringJUnitConfig
4744
@DirtiesContext
4845
public class ThreadLocalChannelParserTests {
4946

@@ -58,28 +55,29 @@ public class ThreadLocalChannelParserTests {
5855

5956
@Test
6057
public void testSendInAnotherThread() throws Exception {
61-
simpleChannel.send(new GenericMessage<String>("test"));
62-
Executor otherThreadExecutor = Executors.newSingleThreadExecutor();
58+
simpleChannel.send(new GenericMessage<>("test"));
59+
ExecutorService otherThreadExecutor = Executors.newSingleThreadExecutor();
6360
final CountDownLatch latch = new CountDownLatch(1);
6461
otherThreadExecutor.execute(() -> {
65-
simpleChannel.send(new GenericMessage<String>("crap"));
62+
simpleChannel.send(new GenericMessage<>("crap"));
6663
latch.countDown();
6764
});
6865
assertThat(latch.await(10, TimeUnit.SECONDS)).isTrue();
6966
assertThat(simpleChannel.receive(10).getPayload()).isEqualTo("test");
7067
// Message sent on another thread is not collected here
71-
assertThat(simpleChannel.receive(10)).isEqualTo(null);
68+
assertThat(simpleChannel.receive(1)).isEqualTo(null);
69+
otherThreadExecutor.shutdown();
7270
}
7371

7472
@Test
7573
public void testReceiveInAnotherThread() throws Exception {
76-
simpleChannel.send(new GenericMessage<String>("test-1.1"));
77-
simpleChannel.send(new GenericMessage<String>("test-1.2"));
78-
simpleChannel.send(new GenericMessage<String>("test-1.3"));
79-
channelWithInterceptor.send(new GenericMessage<String>("test-2.1"));
80-
channelWithInterceptor.send(new GenericMessage<String>("test-2.2"));
81-
Executor otherThreadExecutor = Executors.newSingleThreadExecutor();
82-
final List<Object> otherThreadResults = new ArrayList<Object>();
74+
simpleChannel.send(new GenericMessage<>("test-1.1"));
75+
simpleChannel.send(new GenericMessage<>("test-1.2"));
76+
simpleChannel.send(new GenericMessage<>("test-1.3"));
77+
channelWithInterceptor.send(new GenericMessage<>("test-2.1"));
78+
channelWithInterceptor.send(new GenericMessage<>("test-2.2"));
79+
ExecutorService otherThreadExecutor = Executors.newSingleThreadExecutor();
80+
final List<Object> otherThreadResults = new ArrayList<>();
8381
final CountDownLatch latch = new CountDownLatch(2);
8482
otherThreadExecutor.execute(() -> {
8583
otherThreadResults.add(simpleChannel.receive(0));
@@ -100,12 +98,14 @@ public void testReceiveInAnotherThread() throws Exception {
10098
assertThat(channelWithInterceptor.receive(0).getPayload()).isEqualTo("test-2.1");
10199
assertThat(channelWithInterceptor.receive(0).getPayload()).isEqualTo("test-2.2");
102100
assertThat(channelWithInterceptor.receive(0)).isNull();
101+
102+
otherThreadExecutor.shutdown();
103103
}
104104

105105
@Test
106106
public void testInterceptor() {
107107
int before = interceptor.getSendCount();
108-
channelWithInterceptor.send(new GenericMessage<String>("test"));
108+
channelWithInterceptor.send(new GenericMessage<>("test"));
109109
assertThat(interceptor.getSendCount()).isEqualTo(before + 1);
110110
}
111111

spring-integration-core/src/test/java/org/springframework/integration/config/xml/GatewayParserTests.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import java.util.Map;
2121
import java.util.concurrent.Callable;
2222
import java.util.concurrent.CompletableFuture;
23+
import java.util.concurrent.ExecutorService;
2324
import java.util.concurrent.Executors;
2425
import java.util.concurrent.Future;
2526
import java.util.concurrent.TimeUnit;
@@ -395,7 +396,8 @@ public Message<?> preSend(Message<?> message, MessageChannel channel) {
395396
}
396397

397398
private void startResponder(final PollableChannel requestChannel, final MessageChannel replyChannel) {
398-
Executors.newSingleThreadExecutor().execute(() -> {
399+
ExecutorService executorService = Executors.newSingleThreadExecutor();
400+
executorService.execute(() -> {
399401
Message<?> request = requestChannel.receive(60000);
400402
assertThat(request).as("Request not received").isNotNull();
401403
Message<?> reply = MessageBuilder.fromMessage(request)
@@ -405,7 +407,7 @@ private void startResponder(final PollableChannel requestChannel, final MessageC
405407
payload = CompletableFuture.completedFuture(reply);
406408
}
407409
else if (request.getPayload().equals("flowCompletable")) {
408-
payload = CompletableFuture.<String>completedFuture("SYNC_COMPLETABLE");
410+
payload = CompletableFuture.completedFuture("SYNC_COMPLETABLE");
409411
}
410412
else if (request.getPayload().equals("flowCustomCompletable")) {
411413
MyCompletableFuture myCompletableFuture1 = new MyCompletableFuture();
@@ -427,6 +429,7 @@ else if (request.getPayload().equals("flowCustomCompletableM")) {
427429
}
428430
replyChannel.send(reply);
429431
});
432+
executorService.shutdown();
430433
}
431434

432435
@SuppressWarnings("unused")

spring-integration-core/src/test/java/org/springframework/integration/core/AsyncMessagingTemplateTests.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
import java.util.concurrent.CancellationException;
2020
import java.util.concurrent.ExecutionException;
21+
import java.util.concurrent.ExecutorService;
2122
import java.util.concurrent.Executors;
2223
import java.util.concurrent.Future;
2324
import java.util.concurrent.TimeUnit;
@@ -432,8 +433,8 @@ public void cancellationException() throws Throwable {
432433
}
433434

434435
private static void sendMessageAfterDelay(MessageChannel channel, GenericMessage<String> message, int delay) {
435-
Executors.newSingleThreadExecutor()
436-
.execute(() -> {
436+
ExecutorService executorService = Executors.newSingleThreadExecutor();
437+
executorService.execute(() -> {
437438
try {
438439
Thread.sleep(delay);
439440
}
@@ -443,6 +444,7 @@ private static void sendMessageAfterDelay(MessageChannel channel, GenericMessage
443444
}
444445
channel.send(message);
445446
});
447+
executorService.shutdown();
446448
}
447449

448450
private static class EchoHandler extends AbstractReplyProducingMessageHandler {

spring-integration-file/src/test/java/org/springframework/integration/file/filters/PersistentAcceptOnceFileListFilterExternalStoreTests.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2014-2022 the original author or authors.
2+
* Copyright 2014-2024 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -20,6 +20,7 @@
2020
import java.util.List;
2121
import java.util.Map;
2222
import java.util.concurrent.CountDownLatch;
23+
import java.util.concurrent.ExecutorService;
2324
import java.util.concurrent.Executors;
2425
import java.util.concurrent.Future;
2526
import java.util.concurrent.TimeUnit;
@@ -48,6 +49,7 @@
4849
* @author Artem Bilan
4950
* @author Bojan Vukasovic
5051
* @author Artem Vozhdayenko
52+
*
5153
* @since 4.0
5254
*/
5355
public class PersistentAcceptOnceFileListFilterExternalStoreTests implements RedisContainerTest {
@@ -139,7 +141,8 @@ private void testFileSystem(ConcurrentMetadataStore store) throws Exception {
139141
suspend.set(true);
140142
assertThat(file.setLastModified(file.lastModified() + 5000L)).isTrue();
141143

142-
Future<Integer> result = Executors.newSingleThreadExecutor()
144+
ExecutorService executorService = Executors.newSingleThreadExecutor();
145+
Future<Integer> result = executorService
143146
.submit(() -> filter.filterFiles(new File[] {file}).size());
144147
assertThat(latch2.await(10, TimeUnit.SECONDS)).isTrue();
145148
store.put("foo:" + file.getAbsolutePath(), "43");
@@ -149,6 +152,7 @@ private void testFileSystem(ConcurrentMetadataStore store) throws Exception {
149152

150153
assertThat(file.delete()).isTrue();
151154
filter.close();
155+
executorService.shutdown();
152156
}
153157

154158
}

spring-integration-file/src/test/java/org/springframework/integration/file/filters/PersistentAcceptOnceFileListFilterTests.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import java.util.Arrays;
2323
import java.util.List;
2424
import java.util.concurrent.CountDownLatch;
25+
import java.util.concurrent.ExecutorService;
2526
import java.util.concurrent.Executors;
2627
import java.util.concurrent.Future;
2728
import java.util.concurrent.TimeUnit;
@@ -82,8 +83,8 @@ public boolean replace(String key, String oldValue, String newValue) {
8283
suspend.set(true);
8384
file.setLastModified(file.lastModified() + 5000L);
8485

85-
Future<Integer> result = Executors.newSingleThreadExecutor()
86-
.submit(() -> filter.filterFiles(new File[] {file}).size());
86+
ExecutorService executorService = Executors.newSingleThreadExecutor();
87+
Future<Integer> result = executorService.submit(() -> filter.filterFiles(new File[] {file}).size());
8788
assertThat(latch2.await(10, TimeUnit.SECONDS)).isTrue();
8889
store.put("foo:" + file.getAbsolutePath(), "43");
8990
latch1.countDown();
@@ -92,6 +93,7 @@ public boolean replace(String key, String oldValue, String newValue) {
9293

9394
file.delete();
9495
filter.close();
96+
executorService.shutdown();
9597
}
9698

9799
@Override

spring-integration-hazelcast/src/test/java/org/springframework/integration/hazelcast/lock/HazelcastLockRegistryTests.java

Lines changed: 17 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2017-2022 the original author or authors.
2+
* Copyright 2017-2024 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -17,6 +17,7 @@
1717
package org.springframework.integration.hazelcast.lock;
1818

1919
import java.util.concurrent.CountDownLatch;
20+
import java.util.concurrent.ExecutorService;
2021
import java.util.concurrent.Executors;
2122
import java.util.concurrent.Future;
2223
import java.util.concurrent.TimeUnit;
@@ -28,8 +29,8 @@
2829
import com.hazelcast.core.HazelcastInstance;
2930
import com.hazelcast.cp.lock.FencedLock;
3031
import com.hazelcast.instance.impl.HazelcastInstanceFactory;
31-
import org.junit.AfterClass;
32-
import org.junit.Test;
32+
import org.junit.jupiter.api.AfterAll;
33+
import org.junit.jupiter.api.Test;
3334

3435
import static org.assertj.core.api.Assertions.assertThat;
3536

@@ -46,7 +47,7 @@ public class HazelcastLockRegistryTests {
4647

4748
private static final HazelcastInstance instance = Hazelcast.newHazelcastInstance(CONFIG);
4849

49-
@AfterClass
50+
@AfterAll
5051
public static void destroy() {
5152
HazelcastInstanceFactory.terminateAll();
5253
}
@@ -144,7 +145,8 @@ public void testTwoThreadsSecondFailsToGetLock() throws Exception {
144145
lock1.lockInterruptibly();
145146
AtomicBoolean locked = new AtomicBoolean();
146147
CountDownLatch latch = new CountDownLatch(1);
147-
Future<Object> result = Executors.newSingleThreadExecutor().submit(() -> {
148+
ExecutorService executorService = Executors.newSingleThreadExecutor();
149+
Future<Object> result = executorService.submit(() -> {
148150
Lock lock2 = registry.obtain("foo");
149151
locked.set(lock2.tryLock(200, TimeUnit.MILLISECONDS));
150152
latch.countDown();
@@ -162,6 +164,7 @@ public void testTwoThreadsSecondFailsToGetLock() throws Exception {
162164
Object ise = result.get(10, TimeUnit.SECONDS);
163165
assertThat(ise).isInstanceOf(IllegalMonitorStateException.class);
164166
assertThat(((Exception) ise).getMessage()).contains("Current thread is not owner of the lock!");
167+
executorService.shutdown();
165168
}
166169

167170
@Test
@@ -173,7 +176,8 @@ public void testTwoThreads() throws Exception {
173176
CountDownLatch latch2 = new CountDownLatch(1);
174177
CountDownLatch latch3 = new CountDownLatch(1);
175178
lock1.lockInterruptibly();
176-
Executors.newSingleThreadExecutor().execute(() -> {
179+
ExecutorService executorService = Executors.newSingleThreadExecutor();
180+
executorService.execute(() -> {
177181
Lock lock2 = registry.obtain("foo");
178182
try {
179183
latch1.countDown();
@@ -195,6 +199,7 @@ public void testTwoThreads() throws Exception {
195199
latch2.countDown();
196200
assertThat(latch3.await(10, TimeUnit.SECONDS)).isTrue();
197201
assertThat(locked.get()).isTrue();
202+
executorService.shutdown();
198203
}
199204

200205
@Test
@@ -207,7 +212,8 @@ public void testTwoThreadsDifferentRegistries() throws Exception {
207212
CountDownLatch latch2 = new CountDownLatch(1);
208213
CountDownLatch latch3 = new CountDownLatch(1);
209214
lock1.lockInterruptibly();
210-
Executors.newSingleThreadExecutor().execute(() -> {
215+
ExecutorService executorService = Executors.newSingleThreadExecutor();
216+
executorService.execute(() -> {
211217
Lock lock2 = registry2.obtain("foo");
212218
try {
213219
latch1.countDown();
@@ -229,6 +235,7 @@ public void testTwoThreadsDifferentRegistries() throws Exception {
229235
latch2.countDown();
230236
assertThat(latch3.await(10, TimeUnit.SECONDS)).isTrue();
231237
assertThat(locked.get()).isTrue();
238+
executorService.shutdown();
232239
}
233240

234241
@Test
@@ -238,7 +245,8 @@ public void testTwoThreadsWrongOneUnlocks() throws Exception {
238245
lock.lockInterruptibly();
239246
final AtomicBoolean locked = new AtomicBoolean();
240247
final CountDownLatch latch = new CountDownLatch(1);
241-
Future<Object> result = Executors.newSingleThreadExecutor().submit(() -> {
248+
ExecutorService executorService = Executors.newSingleThreadExecutor();
249+
Future<Object> result = executorService.submit(() -> {
242250
try {
243251
lock.unlock();
244252
}
@@ -254,6 +262,7 @@ public void testTwoThreadsWrongOneUnlocks() throws Exception {
254262
Object imse = result.get(10, TimeUnit.SECONDS);
255263
assertThat(imse).isInstanceOf(IllegalMonitorStateException.class);
256264
assertThat(((Exception) imse).getMessage()).contains("Current thread is not owner of the lock!");
265+
executorService.shutdown();
257266
}
258267

259268
@Test

0 commit comments

Comments
 (0)