
Commit 13c5634

schintap authored and srowen committed
[SPARK-25692][CORE] Remove static initialization of worker eventLoop handling chunk fetch requests within TransportContext. This fixes ChunkFetchIntegrationSuite as well
## What changes were proposed in this pull request?

How to reproduce:

`./build/mvn test -Dtest=org.apache.spark.network.RequestTimeoutIntegrationSuite,org.apache.spark.network.ChunkFetchIntegrationSuite -DwildcardSuites=None test`

The `furtherRequestsDelay` test within `RequestTimeoutIntegrationSuite` was holding on to buffer references within worker threads. The test does close the server context, but the threads are global, and the test sleeps for 60 seconds before fetching a specific chunk. A worker grabs that chunk and waits for the client to consume it. However, the test is checking for a request timeout and times out after 10 seconds, so, as far as I understand, the workers are left waiting for the client to consume the buffer.

This tends to happen if the system doesn't have enough IO threads available, and the order in which the tests run determines the flakiness: if `ChunkFetchIntegrationSuite` runs first, there is no issue. For example, on a Mac with 8 threads these tests run fine, but on my VM with 4 threads they fail. That thread count matches the number of fetch calls in `RequestTimeoutIntegrationSuite`.

So do we really need it to be static? I don't think this requires a global declaration: these threads are only required on the shuffle server side, and client-side `TransportContext` initialization doesn't create them. The shuffle server initializes one `TransportContext` object, so I think it is fine for this to be an instance variable, and I see no harm.

## How was this patch tested?

Integration tests, manual tests.

Closes #23700 from redsanket/SPARK-25692.

Authored-by: schintap <[email protected]>
Signed-off-by: Sean Owen <[email protected]>
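A minimal sketch of the difference the message describes, assuming a JDK `ExecutorService` as a stand-in for Netty's `EventLoopGroup`. The classes `StaticVsInstancePool`, `StaticPoolContext`, and `InstancePoolContext` and the `isServer` flag are hypothetical; they only mirror the shape of the old and new `TransportContext` code. A static field guarded by `synchronized` is created once and shared by every context in the JVM, while an instance field gives each server-side context its own pool.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical stand-ins for TransportContext. A JDK ExecutorService replaces
// Netty's EventLoopGroup so the sketch has no external dependencies.
public class StaticVsInstancePool {

  // Old pattern: one lazily created pool shared by every context in the JVM.
  static class StaticPoolContext {
    private static ExecutorService workers;

    StaticPoolContext(boolean isServer, int threads) {
      synchronized (StaticPoolContext.class) {
        if (workers == null && isServer) {
          workers = Executors.newFixedThreadPool(threads);
        }
      }
    }

    ExecutorService workers() {
      return workers;
    }
  }

  // New pattern: each server-side context owns its own pool.
  static class InstancePoolContext {
    private final ExecutorService workers;

    InstancePoolContext(boolean isServer, int threads) {
      workers = isServer ? Executors.newFixedThreadPool(threads) : null;
    }

    ExecutorService workers() {
      return workers;
    }

    // Hypothetical cleanup hook: releases only this context's threads.
    void close() {
      if (workers != null) {
        workers.shutdownNow();
      }
    }
  }

  public static void main(String[] args) {
    StaticPoolContext s1 = new StaticPoolContext(true, 4);
    StaticPoolContext s2 = new StaticPoolContext(true, 4);
    // Both contexts see the same pool object.
    System.out.println("static pools shared: " + (s1.workers() == s2.workers()));

    InstancePoolContext i1 = new InstancePoolContext(true, 4);
    InstancePoolContext i2 = new InstancePoolContext(true, 4);
    System.out.println("instance pools shared: " + (i1.workers() == i2.workers()));

    i1.close();
    i2.close();
    // The shared static pool has no owner, so it must be shut down explicitly.
    s1.workers().shutdownNow();
  }
}
```

With the static version, a pool whose threads are still blocked by one test is handed to whichever context is created next; with the instance version, each context can release exactly the pool it created.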
1 parent 1dd7419 commit 13c5634


1 file changed, +10 -11 lines changed


common/network-common/src/main/java/org/apache/spark/network/TransportContext.java

Lines changed: 10 additions & 11 deletions
@@ -88,7 +88,7 @@ public class TransportContext {
   // Separate thread pool for handling ChunkFetchRequest. This helps to enable throttling
   // max number of TransportServer worker threads that are blocked on writing response
   // of ChunkFetchRequest message back to the client via the underlying channel.
-  private static EventLoopGroup chunkFetchWorkers;
+  private final EventLoopGroup chunkFetchWorkers;
 
   public TransportContext(TransportConf conf, RpcHandler rpcHandler) {
     this(conf, rpcHandler, false, false);
@@ -122,16 +122,15 @@ public TransportContext(
     this.closeIdleConnections = closeIdleConnections;
     this.isClientOnly = isClientOnly;
 
-    synchronized(TransportContext.class) {
-      if (chunkFetchWorkers == null &&
-          conf.getModuleName() != null &&
-          conf.getModuleName().equalsIgnoreCase("shuffle") &&
-          !isClientOnly) {
-        chunkFetchWorkers = NettyUtils.createEventLoop(
-            IOMode.valueOf(conf.ioMode()),
-            conf.chunkFetchHandlerThreads(),
-            "shuffle-chunk-fetch-handler");
-      }
+    if (conf.getModuleName() != null &&
+        conf.getModuleName().equalsIgnoreCase("shuffle") &&
+        !isClientOnly) {
+      chunkFetchWorkers = NettyUtils.createEventLoop(
+          IOMode.valueOf(conf.ioMode()),
+          conf.chunkFetchHandlerThreads(),
+          "shuffle-chunk-fetch-handler");
+    } else {
+      chunkFetchWorkers = null;
     }
   }

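Because the diff also makes the field `final`, Java's definite-assignment rules require every constructor path to assign it, which is why the new code needs the `else { chunkFetchWorkers = null; }` branch. The sketch below reproduces that shape under stated assumptions: `ChunkFetchContextSketch` and its constructor parameters are hypothetical, `ExecutorService` stands in for `EventLoopGroup`, and the `close()` hook is an illustrative addition, not part of this diff.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical simplified context. moduleName and isClientOnly mirror the
// conditions in the diff; ExecutorService stands in for EventLoopGroup.
public class ChunkFetchContextSketch {
  private final ExecutorService chunkFetchWorkers;

  public ChunkFetchContextSketch(String moduleName, boolean isClientOnly, int threads) {
    if (moduleName != null && moduleName.equalsIgnoreCase("shuffle") && !isClientOnly) {
      // Only a shuffle server creates the dedicated chunk-fetch pool.
      chunkFetchWorkers = Executors.newFixedThreadPool(threads);
    } else {
      // Required: a final field must be definitely assigned on every path,
      // so omitting this branch would not compile.
      chunkFetchWorkers = null;
    }
  }

  public boolean hasChunkFetchWorkers() {
    return chunkFetchWorkers != null;
  }

  // Hypothetical cleanup hook: with an instance-owned pool, closing the
  // context can release exactly the threads this context created.
  public void close() {
    if (chunkFetchWorkers != null) {
      chunkFetchWorkers.shutdownNow();
    }
  }

  public static void main(String[] args) {
    ChunkFetchContextSketch server = new ChunkFetchContextSketch("shuffle", false, 4);
    ChunkFetchContextSketch client = new ChunkFetchContextSketch("shuffle", true, 4);
    ChunkFetchContextSketch rpc = new ChunkFetchContextSketch("rpc", false, 4);
    System.out.println(server.hasChunkFetchWorkers()); // true
    System.out.println(client.hasChunkFetchWorkers()); // false
    System.out.println(rpc.hasChunkFetchWorkers());    // false
    server.close();
  }
}
```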