
Commit c78c378

Ngone51 authored and venkata91 committed
[SPARK-36206][CORE] Support shuffle data corruption diagnosis via shuffle checksum
### What changes were proposed in this pull request?

This PR adds support for diagnosing shuffle data corruption. The diagnosis mechanism works like this: the shuffle reader calculates the checksum (c1) for the corrupted shuffle block and sends it to the server where the block is stored. The server reads back the checksum (c2) stored in the checksum file and recalculates the checksum (c3) for the corresponding shuffle block. Then, if c2 != c3, we suspect the corruption is caused by a disk issue. Otherwise, if c1 != c3, we suspect the corruption is caused by a network issue. Otherwise, the checksum verification passes. If any error occurs during the diagnosis itself, the cause is reported as unknown.

After the shuffle reader receives the diagnosis response, it takes action based on the type of cause. Only in the case of a network issue do we retry the fetch; otherwise, we throw the fetch failure directly. Also note that if the corruption happens inside BufferReleasingInputStream, the reducer throws the fetch failure immediately regardless of the cause, since the data has already been partially consumed by downstream RDDs. If corruption happens again after the retry, the reducer throws the fetch failure directly, this time without diagnosis.

Please check out apache#32385 to see the complete proposal of the shuffle checksum project.

### Why are the changes needed?

Shuffle data corruption is a long-standing issue in Spark. For example, in SPARK-18105, people have continually reported corruption issues. However, data corruption is difficult to reproduce in most cases, and it is even harder to tell the root cause: we don't know whether it is a Spark issue or not. With diagnosis support for shuffle corruption, Spark itself can at least distinguish between disk and network causes, which is very important for users.

### Does this PR introduce _any_ user-facing change?

Yes, users may learn the cause of shuffle corruption after this change.

### How was this patch tested?

Added tests.

Closes apache#33451 from Ngone51/SPARK-36206.

Authored-by: yi.wu <[email protected]>
Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com>
(cherry picked from commit a98d919)
Signed-off-by: Mridul Muralidharan <mridulatgmail.com>
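
The decision rule above is mechanical enough to state as code. Below is a minimal sketch in Java (the method and variable names are illustrative, not the patch's actual code; the real comparison lives in ShuffleChecksumHelper.diagnoseCorruption):

  // c1: checksum computed by the shuffle reader over the fetched bytes
  // c2: checksum stored in the checksum file at shuffle write time
  // c3: checksum recomputed at the server from the block on disk
  static Cause diagnose(long c1, long c2, long c3) {
    if (c2 != c3) {
      return Cause.DISK_ISSUE;           // on-disk data no longer matches what was written
    } else if (c1 != c3) {
      return Cause.NETWORK_ISSUE;        // disk is fine, so the bytes changed in transit
    } else {
      return Cause.CHECKSUM_VERIFY_PASS; // all three agree
    }
  }

Any exception during the check maps to Cause.UNKNOWN_ISSUE, and the Cause enum also carries UNSUPPORTED_CHECKSUM_ALGORITHM for an algorithm name the server does not recognize.
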
1 parent ffdcb25 commit c78c378

32 files changed: +973 −190 lines

common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/BlockStoreClient.java

Lines changed: 42 additions & 3 deletions

@@ -33,9 +33,9 @@
 import org.apache.spark.network.client.RpcResponseCallback;
 import org.apache.spark.network.client.TransportClient;
 import org.apache.spark.network.client.TransportClientFactory;
-import org.apache.spark.network.shuffle.protocol.BlockTransferMessage;
-import org.apache.spark.network.shuffle.protocol.GetLocalDirsForExecutors;
-import org.apache.spark.network.shuffle.protocol.LocalDirsForExecutors;
+import org.apache.spark.network.shuffle.checksum.Cause;
+import org.apache.spark.network.shuffle.protocol.*;
+import org.apache.spark.network.util.TransportConf;

 /**
  * Provides an interface for reading both shuffle files and RDD blocks, either from an Executor
@@ -46,6 +46,45 @@ public abstract class BlockStoreClient implements Closeable {

   protected volatile TransportClientFactory clientFactory;
   protected String appId;
+  protected TransportConf transportConf;
+
+  /**
+   * Send the diagnosis request for the corrupted shuffle block to the server.
+   *
+   * @param host the host of the remote node.
+   * @param port the port of the remote node.
+   * @param execId the executor id.
+   * @param shuffleId the shuffleId of the corrupted shuffle block
+   * @param mapId the mapId of the corrupted shuffle block
+   * @param reduceId the reduceId of the corrupted shuffle block
+   * @param checksum the shuffle checksum which calculated at client side for the corrupted
+   *                 shuffle block
+   * @return The cause of the shuffle block corruption
+   */
+  public Cause diagnoseCorruption(
+      String host,
+      int port,
+      String execId,
+      int shuffleId,
+      long mapId,
+      int reduceId,
+      long checksum,
+      String algorithm) {
+    try {
+      TransportClient client = clientFactory.createClient(host, port);
+      ByteBuffer response = client.sendRpcSync(
+        new DiagnoseCorruption(appId, execId, shuffleId, mapId, reduceId, checksum, algorithm)
+          .toByteBuffer(),
+        transportConf.connectionTimeoutMs()
+      );
+      CorruptionCause cause =
+        (CorruptionCause) BlockTransferMessage.Decoder.fromByteBuffer(response);
+      return cause.cause;
+    } catch (Exception e) {
+      logger.warn("Failed to get the corruption cause.");
+      return Cause.UNKNOWN_ISSUE;
+    }
+  }

   /**
    * Fetch a sequence of blocks from a remote node asynchronously,
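
For orientation, a reader-side caller might consume this API as follows. This is a hypothetical sketch, not code from the patch (retryFetch and reportFetchFailure are made-up helpers, and "ADLER32" is assumed as the algorithm name); it applies the policy from the commit message of retrying only when the diagnosis points at the network:

  // Hypothetical sketch: act on the diagnosis result per the retry policy above.
  Cause cause = blockStoreClient.diagnoseCorruption(
    host, port, execId, shuffleId, mapId, reduceId, checksumByReader, "ADLER32");
  if (cause == Cause.NETWORK_ISSUE) {
    retryFetch(blockId);                 // corrupted in transit; re-fetching may succeed
  } else {
    reportFetchFailure(blockId, cause);  // disk/unknown: a retry would fetch the same bad bytes
  }

Note that on any RPC failure diagnoseCorruption itself degrades to Cause.UNKNOWN_ISSUE rather than throwing, so callers never have to handle a diagnosis error separately.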

common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalBlockHandler.java

Lines changed: 11 additions & 2 deletions

@@ -17,7 +17,6 @@

 package org.apache.spark.network.shuffle;

-import com.google.common.base.Preconditions;
 import java.io.File;
 import java.io.IOException;
 import java.nio.ByteBuffer;
@@ -34,8 +33,9 @@
 import com.codahale.metrics.RatioGauge;
 import com.codahale.metrics.Timer;
 import com.codahale.metrics.Counter;
-import com.google.common.collect.Sets;
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Sets;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

@@ -48,6 +48,7 @@
 import org.apache.spark.network.server.OneForOneStreamManager;
 import org.apache.spark.network.server.RpcHandler;
 import org.apache.spark.network.server.StreamManager;
+import org.apache.spark.network.shuffle.checksum.Cause;
 import org.apache.spark.network.shuffle.protocol.*;
 import static org.apache.spark.network.util.NettyUtils.getRemoteAddress;
 import org.apache.spark.network.util.TransportConf;
@@ -221,6 +222,14 @@ protected void handleMessage(
       } finally {
         responseDelayContext.stop();
       }
+    } else if (msgObj instanceof DiagnoseCorruption) {
+      DiagnoseCorruption msg = (DiagnoseCorruption) msgObj;
+      checkAuth(client, msg.appId);
+      Cause cause = blockManager.diagnoseShuffleBlockCorruption(
+        msg.appId, msg.execId, msg.shuffleId, msg.mapId, msg.reduceId, msg.checksum, msg.algorithm);
+      // In any cases of the error, diagnoseShuffleBlockCorruption should return UNKNOWN_ISSUE,
+      // so it should always reply as success.
+      callback.onSuccess(new CorruptionCause(cause).toByteBuffer());
     } else {
       throw new UnsupportedOperationException("Unexpected message: " + msgObj);
     }
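
Because DiagnoseCorruption and CorruptionCause travel through the ordinary BlockTransferMessage encoder/decoder, the request/response pair round-trips like any other shuffle RPC. A small sketch with made-up field values (the constructor and decoder usage match the diffs above):

  // Encode a request the way the client does, then decode it the way the server does.
  DiagnoseCorruption req = new DiagnoseCorruption(
    "app-20210721", "exec-1", /* shuffleId */ 0, /* mapId */ 5L,
    /* reduceId */ 3, /* checksum */ 0xCAFEBABEL, "ADLER32");
  ByteBuffer wire = req.toByteBuffer();
  DiagnoseCorruption decoded =
    (DiagnoseCorruption) BlockTransferMessage.Decoder.fromByteBuffer(wire);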

common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalBlockStoreClient.java

Lines changed: 11 additions & 11 deletions

@@ -49,7 +49,6 @@
 public class ExternalBlockStoreClient extends BlockStoreClient {
   private static final ErrorHandler PUSH_ERROR_HANDLER = new ErrorHandler.BlockPushErrorHandler();

-  private final TransportConf conf;
   private final boolean authEnabled;
   private final SecretKeyHolder secretKeyHolder;
   private final long registrationTimeoutMs;
@@ -63,7 +62,7 @@ public ExternalBlockStoreClient(
       SecretKeyHolder secretKeyHolder,
       boolean authEnabled,
       long registrationTimeoutMs) {
-    this.conf = conf;
+    this.transportConf = conf;
     this.secretKeyHolder = secretKeyHolder;
     this.authEnabled = authEnabled;
     this.registrationTimeoutMs = registrationTimeoutMs;
@@ -75,10 +74,11 @@ public ExternalBlockStoreClient(
    */
   public void init(String appId) {
     this.appId = appId;
-    TransportContext context = new TransportContext(conf, new NoOpRpcHandler(), true, true);
+    TransportContext context = new TransportContext(
+      transportConf, new NoOpRpcHandler(), true, true);
     List<TransportClientBootstrap> bootstraps = Lists.newArrayList();
     if (authEnabled) {
-      bootstraps.add(new AuthClientBootstrap(conf, appId, secretKeyHolder));
+      bootstraps.add(new AuthClientBootstrap(transportConf, appId, secretKeyHolder));
     }
     clientFactory = context.createClientFactory(bootstraps);
   }
@@ -94,7 +94,7 @@ public void fetchBlocks(
     checkInit();
     logger.debug("External shuffle fetch from {}:{} (executor id {})", host, port, execId);
     try {
-      int maxRetries = conf.maxIORetries();
+      int maxRetries = transportConf.maxIORetries();
       RetryingBlockTransferor.BlockTransferStarter blockFetchStarter =
         (inputBlockId, inputListener) -> {
           // Unless this client is closed.
@@ -103,7 +103,7 @@ public void fetchBlocks(
             "Expecting a BlockFetchingListener, but got " + inputListener.getClass();
           TransportClient client = clientFactory.createClient(host, port, maxRetries > 0);
           new OneForOneBlockFetcher(client, appId, execId, inputBlockId,
-            (BlockFetchingListener) inputListener, conf, downloadFileManager).start();
+            (BlockFetchingListener) inputListener, transportConf, downloadFileManager).start();
         } else {
           logger.info("This clientFactory was closed. Skipping further block fetch retries.");
         }
@@ -112,7 +112,7 @@ public void fetchBlocks(
       if (maxRetries > 0) {
         // Note this Fetcher will correctly handle maxRetries == 0; we avoid it just in case there's
         // a bug in this code. We should remove the if statement once we're sure of the stability.
-        new RetryingBlockTransferor(conf, blockFetchStarter, blockIds, listener).start();
+        new RetryingBlockTransferor(transportConf, blockFetchStarter, blockIds, listener).start();
       } else {
         blockFetchStarter.createAndStart(blockIds, listener);
       }
@@ -146,16 +146,16 @@ public void pushBlocks(
           assert inputListener instanceof BlockPushingListener :
             "Expecting a BlockPushingListener, but got " + inputListener.getClass();
           TransportClient client = clientFactory.createClient(host, port);
-          new OneForOneBlockPusher(client, appId, conf.appAttemptId(), inputBlockId,
+          new OneForOneBlockPusher(client, appId, transportConf.appAttemptId(), inputBlockId,
             (BlockPushingListener) inputListener, buffersWithId).start();
         } else {
           logger.info("This clientFactory was closed. Skipping further block push retries.");
         }
       };
-      int maxRetries = conf.maxIORetries();
+      int maxRetries = transportConf.maxIORetries();
       if (maxRetries > 0) {
         new RetryingBlockTransferor(
-          conf, blockPushStarter, blockIds, listener, PUSH_ERROR_HANDLER).start();
+          transportConf, blockPushStarter, blockIds, listener, PUSH_ERROR_HANDLER).start();
       } else {
         blockPushStarter.createAndStart(blockIds, listener);
       }
@@ -178,7 +178,7 @@ public void finalizeShuffleMerge(
     try {
       TransportClient client = clientFactory.createClient(host, port);
       ByteBuffer finalizeShuffleMerge =
-        new FinalizeShuffleMerge(appId, conf.appAttemptId(), shuffleId,
+        new FinalizeShuffleMerge(appId, transportConf.appAttemptId(), shuffleId,
           shuffleMergeId).toByteBuffer();
       client.sendRpc(finalizeShuffleMerge, new RpcResponseCallback() {
         @Override
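
(The net effect of this file's changes is to retire the private conf field in favor of the transportConf field now declared on the BlockStoreClient base class, so the shared diagnoseCorruption implementation above can read transportConf.connectionTimeoutMs().)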

common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java

Lines changed: 25 additions & 0 deletions

@@ -45,6 +45,8 @@

 import org.apache.spark.network.buffer.FileSegmentManagedBuffer;
 import org.apache.spark.network.buffer.ManagedBuffer;
+import org.apache.spark.network.shuffle.checksum.Cause;
+import org.apache.spark.network.shuffle.checksum.ShuffleChecksumHelper;
 import org.apache.spark.network.shuffle.protocol.ExecutorShuffleInfo;
 import org.apache.spark.network.util.LevelDBProvider;
 import org.apache.spark.network.util.LevelDBProvider.StoreVersion;
@@ -374,6 +376,29 @@ public Map<String, String[]> getLocalDirs(String appId, Set<String> execIds) {
       .collect(Collectors.toMap(Pair::getKey, Pair::getValue));
   }

+  /**
+   * Diagnose the possible cause of the shuffle data corruption by verifying the shuffle checksums
+   */
+  public Cause diagnoseShuffleBlockCorruption(
+      String appId,
+      String execId,
+      int shuffleId,
+      long mapId,
+      int reduceId,
+      long checksumByReader,
+      String algorithm) {
+    ExecutorShuffleInfo executor = executors.get(new AppExecId(appId, execId));
+    // This should be in sync with IndexShuffleBlockResolver.getChecksumFile
+    String fileName = "shuffle_" + shuffleId + "_" + mapId + "_0.checksum." + algorithm;
+    File checksumFile = ExecutorDiskUtils.getFile(
+      executor.localDirs,
+      executor.subDirsPerLocalDir,
+      fileName);
+    ManagedBuffer data = getBlockData(appId, execId, shuffleId, mapId, reduceId);
+    return ShuffleChecksumHelper.diagnoseCorruption(
+      algorithm, checksumFile, reduceId, data, checksumByReader);
+  }
+
   /** Simply encodes an executor's full ID, which is appId + execId. */
   public static class AppExecId {
     public final String appId;
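
As the inline comment notes, the fileName built here must stay byte-for-byte in sync with the writer-side naming in IndexShuffleBlockResolver.getChecksumFile. Spelled out as a hypothetical helper (the trailing 0 mirrors the fixed reduce-id placeholder used in per-map shuffle file names):

  // Hypothetical helper restating the convention used above:
  // shuffle_<shuffleId>_<mapId>_0.checksum.<algorithm>
  // e.g. shuffle_0_5_0.checksum.ADLER32
  static String checksumFileName(int shuffleId, long mapId, String algorithm) {
    return "shuffle_" + shuffleId + "_" + mapId + "_0.checksum." + algorithm;
  }
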
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/checksum/Cause.java

Lines changed: 25 additions & 0 deletions

@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.shuffle.checksum;
+
+/**
+ * The cause of shuffle data corruption.
+ */
+public enum Cause {
+  DISK_ISSUE, NETWORK_ISSUE, UNKNOWN_ISSUE, CHECKSUM_VERIFY_PASS, UNSUPPORTED_CHECKSUM_ALGORITHM
+}
