Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
48 commits
Select commit Hold shift + click to select a range
7b40e2e
init
Ngone51 Jul 20, 2021
3cdd858
update
Ngone51 Jul 21, 2021
4c01069
move comment
Ngone51 Jul 21, 2021
7bd3788
fix ExternalBlockStoreClient
Ngone51 Jul 21, 2021
8eb667f
mark Cause as private
Ngone51 Jul 26, 2021
8015e86
remove unused Cause import
Ngone51 Jul 26, 2021
b25914d
fix indents of BlockStoreClient.diagnoseCorruption
Ngone51 Jul 26, 2021
03b26fe
verify -> verifying
Ngone51 Jul 26, 2021
10d60f8
fix comment
Ngone51 Jul 26, 2021
b0b17ab
include checksumfile path
Ngone51 Jul 26, 2021
c3dcef0
remove throws
Ngone51 Jul 26, 2021
a8f15bd
use ByteStreams.skipFully
Ngone51 Jul 26, 2021
815fdf6
check cheaper fields first
Ngone51 Jul 26, 2021
f193368
use this.transportConf
Ngone51 Jul 26, 2021
dfb594d
add todo for pushbased shuffle
Ngone51 Jul 26, 2021
30ed389
resolve magic number
Ngone51 Jul 26, 2021
c70fa09
use fileName strip the suffix
Ngone51 Jul 26, 2021
94576d1
use Files.newDirectoryStream
Ngone51 Jul 26, 2021
e606ca6
remove checksum file existence check
Ngone51 Jul 26, 2021
acf20cf
combine ShuffleChecksumHelper&ShuffleCorruptionDiagnosisHelper
Ngone51 Jul 26, 2021
895aad5
fix tests
Ngone51 Jul 26, 2021
91610b0
diagnose corruption when the block corrupted twice
Ngone51 Jul 26, 2021
6e5d2c0
send checksum algorithm together
Ngone51 Jul 26, 2021
993ea3d
fix tests
Ngone51 Jul 26, 2021
e2e5fa0
fix rat of ShuffleChecksumSupport
Ngone51 Jul 27, 2021
c407962
add since for Cause
Ngone51 Jul 27, 2021
1b612a2
add comment for fileName
Ngone51 Jul 27, 2021
3a41a92
fix ExternalBlockStoreClient
Ngone51 Jul 27, 2021
cab36a2
use the explicit checksum algorithm
Ngone51 Jul 27, 2021
1292390
update comment of diagnoseCorruption
Ngone51 Jul 27, 2021
8610333
improve error msg for different causes
Ngone51 Jul 27, 2021
47764e3
inline diagnosisResponse into diagnoseCorruption
Ngone51 Jul 27, 2021
c041f01
update warning msg
Ngone51 Jul 27, 2021
4cd8350
fix ShuffleBlockFetcherIteratorSuite
Ngone51 Jul 27, 2021
2da1b91
fix java lint
Ngone51 Jul 27, 2021
95ef9db
fix tests
Ngone51 Jul 27, 2021
925491c
swallow exception from diagnoseCorruption
Ngone51 Jul 28, 2021
73b7b70
ensure test stability
Ngone51 Jul 28, 2021
be116fb
update version to 3.2.0
Ngone51 Jul 28, 2021
64071ee
fix import style
Ngone51 Jul 28, 2021
a994c0d
add tests
Ngone51 Jul 28, 2021
261f5ec
refactor bufIn
Ngone51 Jul 29, 2021
8d9db93
use nano time
Ngone51 Jul 29, 2021
6f72ab1
Use Option
Ngone51 Jul 29, 2021
e5e58d5
move Cause
Ngone51 Jul 29, 2021
7e4c91d
fix tests
Ngone51 Jul 29, 2021
67262c9
fix
Ngone51 Jul 30, 2021
ca1b058
address comment
Ngone51 Aug 2, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,9 @@
import org.apache.spark.network.client.RpcResponseCallback;
import org.apache.spark.network.client.TransportClient;
import org.apache.spark.network.client.TransportClientFactory;
import org.apache.spark.network.shuffle.protocol.BlockTransferMessage;
import org.apache.spark.network.shuffle.protocol.GetLocalDirsForExecutors;
import org.apache.spark.network.shuffle.protocol.LocalDirsForExecutors;
import org.apache.spark.network.shuffle.checksum.Cause;
import org.apache.spark.network.shuffle.protocol.*;
import org.apache.spark.network.util.TransportConf;

/**
* Provides an interface for reading both shuffle files and RDD blocks, either from an Executor
Expand All @@ -46,6 +46,45 @@ public abstract class BlockStoreClient implements Closeable {

protected volatile TransportClientFactory clientFactory;
protected String appId;
protected TransportConf transportConf;

/**
* Send the diagnosis request for the corrupted shuffle block to the server.
*
* @param host the host of the remote node.
* @param port the port of the remote node.
* @param execId the executor id.
* @param shuffleId the shuffleId of the corrupted shuffle block
* @param mapId the mapId of the corrupted shuffle block
* @param reduceId the reduceId of the corrupted shuffle block
* @param checksum the shuffle checksum which calculated at client side for the corrupted
* shuffle block
* @return The cause of the shuffle block corruption
*/
public Cause diagnoseCorruption(
String host,
int port,
String execId,
int shuffleId,
long mapId,
int reduceId,
long checksum,
String algorithm) {
try {
TransportClient client = clientFactory.createClient(host, port);
ByteBuffer response = client.sendRpcSync(
new DiagnoseCorruption(appId, execId, shuffleId, mapId, reduceId, checksum, algorithm)
.toByteBuffer(),
transportConf.connectionTimeoutMs()
);
CorruptionCause cause =
(CorruptionCause) BlockTransferMessage.Decoder.fromByteBuffer(response);
return cause.cause;
} catch (Exception e) {
logger.warn("Failed to get the corruption cause.");
return Cause.UNKNOWN_ISSUE;
}
}

/**
* Fetch a sequence of blocks from a remote node asynchronously,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@

package org.apache.spark.network.shuffle;

import com.google.common.base.Preconditions;
import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
Expand All @@ -35,8 +34,9 @@
import com.codahale.metrics.RatioGauge;
import com.codahale.metrics.Timer;
import com.codahale.metrics.Counter;
import com.google.common.collect.Sets;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.Sets;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -49,6 +49,7 @@
import org.apache.spark.network.server.OneForOneStreamManager;
import org.apache.spark.network.server.RpcHandler;
import org.apache.spark.network.server.StreamManager;
import org.apache.spark.network.shuffle.checksum.Cause;
import org.apache.spark.network.shuffle.protocol.*;
import org.apache.spark.network.util.TimerWithCustomTimeUnit;
import static org.apache.spark.network.util.NettyUtils.getRemoteAddress;
Expand Down Expand Up @@ -223,6 +224,14 @@ protected void handleMessage(
} finally {
responseDelayContext.stop();
}
} else if (msgObj instanceof DiagnoseCorruption) {
DiagnoseCorruption msg = (DiagnoseCorruption) msgObj;
checkAuth(client, msg.appId);
Cause cause = blockManager.diagnoseShuffleBlockCorruption(
msg.appId, msg.execId, msg.shuffleId, msg.mapId, msg.reduceId, msg.checksum, msg.algorithm);
// In any cases of the error, diagnoseShuffleBlockCorruption should return UNKNOWN_ISSUE,
// so it should always reply as success.
callback.onSuccess(new CorruptionCause(cause).toByteBuffer());
} else {
throw new UnsupportedOperationException("Unexpected message: " + msgObj);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,6 @@
public class ExternalBlockStoreClient extends BlockStoreClient {
private static final ErrorHandler PUSH_ERROR_HANDLER = new ErrorHandler.BlockPushErrorHandler();

private final TransportConf conf;
private final boolean authEnabled;
private final SecretKeyHolder secretKeyHolder;
private final long registrationTimeoutMs;
Expand All @@ -63,7 +62,7 @@ public ExternalBlockStoreClient(
SecretKeyHolder secretKeyHolder,
boolean authEnabled,
long registrationTimeoutMs) {
this.conf = conf;
this.transportConf = conf;
this.secretKeyHolder = secretKeyHolder;
this.authEnabled = authEnabled;
this.registrationTimeoutMs = registrationTimeoutMs;
Expand All @@ -75,10 +74,11 @@ public ExternalBlockStoreClient(
*/
public void init(String appId) {
this.appId = appId;
TransportContext context = new TransportContext(conf, new NoOpRpcHandler(), true, true);
TransportContext context = new TransportContext(
transportConf, new NoOpRpcHandler(), true, true);
List<TransportClientBootstrap> bootstraps = Lists.newArrayList();
if (authEnabled) {
bootstraps.add(new AuthClientBootstrap(conf, appId, secretKeyHolder));
bootstraps.add(new AuthClientBootstrap(transportConf, appId, secretKeyHolder));
}
clientFactory = context.createClientFactory(bootstraps);
}
Expand All @@ -94,7 +94,7 @@ public void fetchBlocks(
checkInit();
logger.debug("External shuffle fetch from {}:{} (executor id {})", host, port, execId);
try {
int maxRetries = conf.maxIORetries();
int maxRetries = transportConf.maxIORetries();
RetryingBlockTransferor.BlockTransferStarter blockFetchStarter =
(inputBlockId, inputListener) -> {
// Unless this client is closed.
Expand All @@ -103,7 +103,7 @@ public void fetchBlocks(
"Expecting a BlockFetchingListener, but got " + inputListener.getClass();
TransportClient client = clientFactory.createClient(host, port, maxRetries > 0);
new OneForOneBlockFetcher(client, appId, execId, inputBlockId,
(BlockFetchingListener) inputListener, conf, downloadFileManager).start();
(BlockFetchingListener) inputListener, transportConf, downloadFileManager).start();
} else {
logger.info("This clientFactory was closed. Skipping further block fetch retries.");
}
Expand All @@ -112,7 +112,7 @@ public void fetchBlocks(
if (maxRetries > 0) {
// Note this Fetcher will correctly handle maxRetries == 0; we avoid it just in case there's
// a bug in this code. We should remove the if statement once we're sure of the stability.
new RetryingBlockTransferor(conf, blockFetchStarter, blockIds, listener).start();
new RetryingBlockTransferor(transportConf, blockFetchStarter, blockIds, listener).start();
} else {
blockFetchStarter.createAndStart(blockIds, listener);
}
Expand Down Expand Up @@ -146,16 +146,16 @@ public void pushBlocks(
assert inputListener instanceof BlockPushingListener :
"Expecting a BlockPushingListener, but got " + inputListener.getClass();
TransportClient client = clientFactory.createClient(host, port);
new OneForOneBlockPusher(client, appId, conf.appAttemptId(), inputBlockId,
new OneForOneBlockPusher(client, appId, transportConf.appAttemptId(), inputBlockId,
(BlockPushingListener) inputListener, buffersWithId).start();
} else {
logger.info("This clientFactory was closed. Skipping further block push retries.");
}
};
int maxRetries = conf.maxIORetries();
int maxRetries = transportConf.maxIORetries();
if (maxRetries > 0) {
new RetryingBlockTransferor(
conf, blockPushStarter, blockIds, listener, PUSH_ERROR_HANDLER).start();
transportConf, blockPushStarter, blockIds, listener, PUSH_ERROR_HANDLER).start();
} else {
blockPushStarter.createAndStart(blockIds, listener);
}
Expand All @@ -178,7 +178,7 @@ public void finalizeShuffleMerge(
try {
TransportClient client = clientFactory.createClient(host, port);
ByteBuffer finalizeShuffleMerge =
new FinalizeShuffleMerge(appId, conf.appAttemptId(), shuffleId,
new FinalizeShuffleMerge(appId, transportConf.appAttemptId(), shuffleId,
shuffleMergeId).toByteBuffer();
client.sendRpc(finalizeShuffleMerge, new RpcResponseCallback() {
@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@

import org.apache.spark.network.buffer.FileSegmentManagedBuffer;
import org.apache.spark.network.buffer.ManagedBuffer;
import org.apache.spark.network.shuffle.checksum.Cause;
import org.apache.spark.network.shuffle.checksum.ShuffleChecksumHelper;
import org.apache.spark.network.shuffle.protocol.ExecutorShuffleInfo;
import org.apache.spark.network.util.LevelDBProvider;
import org.apache.spark.network.util.LevelDBProvider.StoreVersion;
Expand Down Expand Up @@ -374,6 +376,29 @@ public Map<String, String[]> getLocalDirs(String appId, Set<String> execIds) {
.collect(Collectors.toMap(Pair::getKey, Pair::getValue));
}

/**
* Diagnose the possible cause of the shuffle data corruption by verifying the shuffle checksums
*/
public Cause diagnoseShuffleBlockCorruption(
String appId,
String execId,
int shuffleId,
long mapId,
int reduceId,
long checksumByReader,
String algorithm) {
ExecutorShuffleInfo executor = executors.get(new AppExecId(appId, execId));
// This should be in sync with IndexShuffleBlockResolver.getChecksumFile
String fileName = "shuffle_" + shuffleId + "_" + mapId + "_0.checksum." + algorithm;
File checksumFile = ExecutorDiskUtils.getFile(
executor.localDirs,
executor.subDirsPerLocalDir,
fileName);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Btw, while looking at this, an unrelated potential issue - we use intern in ExecutorDiskUtils.
Probably should move to using guava interner (Utils.weakIntern does this) ... thoughts @Ngone51 ?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unfortunately, Utils can't be referenced in network-shuffle module.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I meant something similar ... we dont need to do this for this PR btw; just thinking out.

ManagedBuffer data = getBlockData(appId, execId, shuffleId, mapId, reduceId);
return ShuffleChecksumHelper.diagnoseCorruption(
algorithm, checksumFile, reduceId, data, checksumByReader);
}

/** Simply encodes an executor's full ID, which is appId + execId. */
public static class AppExecId {
public final String appId;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.network.shuffle.checksum;

/**
* The cause of shuffle data corruption.
*/
public enum Cause {
DISK_ISSUE, NETWORK_ISSUE, UNKNOWN_ISSUE, CHECKSUM_VERIFY_PASS, UNSUPPORTED_CHECKSUM_ALGORITHM
}
Loading