@@ -112,4 +112,27 @@ public static int[] decode(ByteBuf buf) {
return ints;
}
}

/** Long integer arrays are encoded with their length followed by long integers. */
public static class LongArrays {
public static int encodedLength(long[] longs) {
return 4 + 8 * longs.length;
}

public static void encode(ByteBuf buf, long[] longs) {
buf.writeInt(longs.length);
for (long i : longs) {
buf.writeLong(i);
}
}

public static long[] decode(ByteBuf buf) {
int numLongs = buf.readInt();
long[] longs = new long[numLongs];
for (int i = 0; i < longs.length; i++) {
longs[i] = buf.readLong();
}
return longs;
}
}
}
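
As a quick sanity check of the new encoder, here is a minimal round-trip sketch (illustrative only, not part of the patch; it assumes Netty's Unpooled heap buffers and that Encoders is on the classpath):

  import io.netty.buffer.ByteBuf;
  import io.netty.buffer.Unpooled;
  import java.util.Arrays;

  public class LongArraysRoundTrip {
    public static void main(String[] args) {
      // Values above Integer.MAX_VALUE now fit, which is the point of the long[] change.
      long[] mapIds = {0L, 7L, 5_000_000_000L};
      // encodedLength = 4 (length prefix) + 8 * 3 (payload) = 28 bytes.
      ByteBuf buf = Unpooled.buffer(Encoders.LongArrays.encodedLength(mapIds));
      Encoders.LongArrays.encode(buf, mapIds);
      long[] decoded = Encoders.LongArrays.decode(buf);
      assert Arrays.equals(mapIds, decoded);
      buf.release();
    }
  }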
@@ -106,7 +106,7 @@ protected void handleMessage(
numBlockIds += ids.length;
}
streamId = streamManager.registerStream(client.getClientId(),
new ManagedBufferIterator(msg, numBlockIds), client.getChannel());
new ShuffleManagedBufferIterator(msg), client.getChannel());
Contributor:
we can also remove

numBlockIds = 0;
          for (int[] ids: msg.reduceIds) {
            numBlockIds += ids.length;
          }

Member Author:
numBlockIds is still used in the callback:

callback.onSuccess(new StreamHandle(streamId, numBlockIds).toByteBuffer());

} else {
// For compatibility with old versions, still keep support for OpenBlocks.
OpenBlocks msg = (OpenBlocks) msgObj;
@@ -299,21 +299,6 @@ private int[] shuffleMapIdAndReduceIds(String[] blockIds, int shuffleId) {
return mapIdAndReduceIds;
}

ManagedBufferIterator(FetchShuffleBlocks msg, int numBlockIds) {
final int[] mapIdAndReduceIds = new int[2 * numBlockIds];
int idx = 0;
for (int i = 0; i < msg.mapIds.length; i++) {
for (int reduceId : msg.reduceIds[i]) {
mapIdAndReduceIds[idx++] = msg.mapIds[i];
mapIdAndReduceIds[idx++] = reduceId;
}
}
assert(idx == 2 * numBlockIds);
size = mapIdAndReduceIds.length;
blockDataForIndexFn = index -> blockManager.getBlockData(msg.appId, msg.execId,
msg.shuffleId, mapIdAndReduceIds[index], mapIdAndReduceIds[index + 1]);
}

@Override
public boolean hasNext() {
return index < size;
@@ -328,6 +313,49 @@ public ManagedBuffer next() {
}
}

private class ShuffleManagedBufferIterator implements Iterator<ManagedBuffer> {

private int mapIdx = 0;
private int reduceIdx = 0;

private final String appId;
private final String execId;
private final int shuffleId;
private final long[] mapIds;
private final int[][] reduceIds;

ShuffleManagedBufferIterator(FetchShuffleBlocks msg) {
appId = msg.appId;
execId = msg.execId;
shuffleId = msg.shuffleId;
mapIds = msg.mapIds;
reduceIds = msg.reduceIds;
}

@Override
public boolean hasNext() {
// mapIds.length must equal reduceIds.length, and the passed-in FetchShuffleBlocks
Contributor:
Shall we add check logic here to be safe?

Member Author:
Does Xingbo mean a double check here? Basically there's already existing checking for both the length and non-emptiness:

if (blockIds.length == 0) {
throw new IllegalArgumentException("Zero-sized blockIds array");
}

Contributor:
Yeah, if this place is not super performance-critical I'd prefer a double check here.

Member Author:
Sure, done the double-check in 00e78b2.

// must have non-empty mapIds and reduceIds, see the checking logic in
// OneForOneBlockFetcher.
assert(mapIds.length != 0 && mapIds.length == reduceIds.length);
return mapIdx < mapIds.length && reduceIdx < reduceIds[mapIdx].length;
}

@Override
public ManagedBuffer next() {
final ManagedBuffer block = blockManager.getBlockData(
appId, execId, shuffleId, mapIds[mapIdx], reduceIds[mapIdx][reduceIdx]);
if (reduceIdx < reduceIds[mapIdx].length - 1) {
reduceIdx += 1;
} else {
reduceIdx = 0;
mapIdx += 1;
}
metrics.blockTransferRateBytes.mark(block != null ? block.size() : 0);
return block;
}
}
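
To see the traversal order the iterator above produces, a small standalone sketch (hypothetical inputs; it mirrors the mapIdx/reduceIdx bookkeeping rather than calling the real blockManager):

  // For mapIds = {10L, 11L} and reduceIds = {{0, 1}, {2}}, blocks come out as
  // (10, 0), (10, 1), (11, 2): all reduce ids of one map task, then the next.
  long[] mapIds = {10L, 11L};
  int[][] reduceIds = {{0, 1}, {2}};
  int mapIdx = 0;
  int reduceIdx = 0;
  while (mapIdx < mapIds.length && reduceIdx < reduceIds[mapIdx].length) {
    System.out.println("(" + mapIds[mapIdx] + ", " + reduceIds[mapIdx][reduceIdx] + ")");
    if (reduceIdx < reduceIds[mapIdx].length - 1) {
      reduceIdx += 1;
    } else {
      reduceIdx = 0;  // reset before advancing to the next map task
      mapIdx += 1;
    }
  }

Note that hasNext indexes reduceIds[mapIdx] directly, which is only safe when every reduceIds[i] is non-empty; that is exactly what the assert above double-checks.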

@Override
public void channelActive(TransportClient client) {
metrics.activeConnections.inc();
@@ -172,7 +172,7 @@ public ManagedBuffer getBlockData(
String appId,
String execId,
int shuffleId,
int mapId,
long mapId,
Member:
@xuanyuanking why change this from int to long? Is it possible that a mapId can be greater than 2^31?

Contributor:
Previously the map id was the index of the mapper, and could conflict when we re-run the task. Now the map id is the task id, which is unique. The task id needs to be a long.

Member Author:
Yes, after this patch we set mapId using the taskAttemptId of the map task, which is a unique ID within the same SparkContext. See the comment at #25620 (comment)

int reduceId) {
ExecutorShuffleInfo executor = executors.get(new AppExecId(appId, execId));
if (executor == null) {
@@ -296,7 +296,7 @@ private void deleteNonShuffleServiceServedFiles(String[] dirs) {
* and the block id format is from ShuffleDataBlockId and ShuffleIndexBlockId.
*/
private ManagedBuffer getSortBasedShuffleBlockData(
ExecutorShuffleInfo executor, int shuffleId, int mapId, int reduceId) {
ExecutorShuffleInfo executor, int shuffleId, long mapId, int reduceId) {
File indexFile = ExecutorDiskUtils.getFile(executor.localDirs, executor.subDirsPerLocalDir,
"shuffle_" + shuffleId + "_" + mapId + "_0.index");

@@ -24,6 +24,8 @@
import java.util.HashMap;

import com.google.common.primitives.Ints;
import com.google.common.primitives.Longs;
import org.apache.commons.lang3.tuple.ImmutableTriple;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -111,21 +113,21 @@ private boolean isShuffleBlocks(String[] blockIds) {
*/
private FetchShuffleBlocks createFetchShuffleBlocksMsg(
String appId, String execId, String[] blockIds) {
int shuffleId = splitBlockId(blockIds[0])[0];
HashMap<Integer, ArrayList<Integer>> mapIdToReduceIds = new HashMap<>();
int shuffleId = splitBlockId(blockIds[0]).left;
HashMap<Long, ArrayList<Integer>> mapIdToReduceIds = new HashMap<>();
for (String blockId : blockIds) {
int[] blockIdParts = splitBlockId(blockId);
if (blockIdParts[0] != shuffleId) {
ImmutableTriple<Integer, Long, Integer> blockIdParts = splitBlockId(blockId);
if (blockIdParts.left != shuffleId) {
throw new IllegalArgumentException("Expected shuffleId=" + shuffleId +
", got:" + blockId);
}
int mapId = blockIdParts[1];
long mapId = blockIdParts.middle;
if (!mapIdToReduceIds.containsKey(mapId)) {
mapIdToReduceIds.put(mapId, new ArrayList<>());
}
mapIdToReduceIds.get(mapId).add(blockIdParts[2]);
mapIdToReduceIds.get(mapId).add(blockIdParts.right);
}
int[] mapIds = Ints.toArray(mapIdToReduceIds.keySet());
long[] mapIds = Longs.toArray(mapIdToReduceIds.keySet());
int[][] reduceIdArr = new int[mapIds.length][];
for (int i = 0; i < mapIds.length; i++) {
reduceIdArr[i] = Ints.toArray(mapIdToReduceIds.get(mapIds[i]));
@@ -134,17 +136,16 @@ private FetchShuffleBlocks createFetchShuffleBlocksMsg(
}

/** Split the shuffleBlockId and return shuffleId, mapId and reduceId. */
private int[] splitBlockId(String blockId) {
private ImmutableTriple<Integer, Long, Integer> splitBlockId(String blockId) {
String[] blockIdParts = blockId.split("_");
if (blockIdParts.length != 4 || !blockIdParts[0].equals("shuffle")) {
throw new IllegalArgumentException(
"Unexpected shuffle block id format: " + blockId);
}
return new int[] {
Integer.parseInt(blockIdParts[1]),
Integer.parseInt(blockIdParts[2]),
Integer.parseInt(blockIdParts[3])
};
return new ImmutableTriple<>(
Integer.parseInt(blockIdParts[1]),
Long.parseLong(blockIdParts[2]),
Integer.parseInt(blockIdParts[3]));
}
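
For illustration, the block id format parsed here is shuffle_<shuffleId>_<mapId>_<reduceId>; a hypothetical call (values made up):

  // splitBlockId("shuffle_0_5000000000_2") yields an ImmutableTriple where
  //   left   == 0             (shuffleId, Integer)
  //   middle == 5000000000L   (mapId, Long: may now exceed Integer.MAX_VALUE)
  //   right  == 2             (reduceId, Integer)
  ImmutableTriple<Integer, Long, Integer> parts = splitBlockId("shuffle_0_5000000000_2");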

/** Callback invoked on receipt of each chunk. We equate a single chunk to a single block. */
@@ -34,14 +34,14 @@ public class FetchShuffleBlocks extends BlockTransferMessage {
public final int shuffleId;
// The length of mapIds must equal reduceIds.length; the i-th mapId in mapIds
// corresponds to the i-th int[] in reduceIds, which contains all reduce ids for this map id.
public final int[] mapIds;
public final long[] mapIds;
public final int[][] reduceIds;

public FetchShuffleBlocks(
String appId,
String execId,
int shuffleId,
int[] mapIds,
long[] mapIds,
int[][] reduceIds) {
this.appId = appId;
this.execId = execId;
@@ -98,7 +98,7 @@ public int encodedLength() {
return Encoders.Strings.encodedLength(appId)
+ Encoders.Strings.encodedLength(execId)
+ 4 /* encoded length of shuffleId */
+ Encoders.IntArrays.encodedLength(mapIds)
+ Encoders.LongArrays.encodedLength(mapIds)
+ 4 /* encoded length of reduceIds.size() */
+ encodedLengthOfReduceIds;
}
@@ -108,7 +108,7 @@ public void encode(ByteBuf buf) {
Encoders.Strings.encode(buf, appId);
Encoders.Strings.encode(buf, execId);
buf.writeInt(shuffleId);
Encoders.IntArrays.encode(buf, mapIds);
Encoders.LongArrays.encode(buf, mapIds);
buf.writeInt(reduceIds.length);
for (int[] ids: reduceIds) {
Encoders.IntArrays.encode(buf, ids);
@@ -119,7 +119,7 @@ public static FetchShuffleBlocks decode(ByteBuf buf) {
String appId = Encoders.Strings.decode(buf);
String execId = Encoders.Strings.decode(buf);
int shuffleId = buf.readInt();
int[] mapIds = Encoders.IntArrays.decode(buf);
long[] mapIds = Encoders.LongArrays.decode(buf);
int reduceIdsSize = buf.readInt();
int[][] reduceIds = new int[reduceIdsSize][];
for (int i = 0; i < reduceIdsSize; i++) {
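To make the wire-format change concrete, a worked size calculation for the message used in the serialization test below (a sketch; it assumes Encoders.Strings writes a 4-byte length followed by the UTF-8 bytes, consistent with the rest of this protocol):

  // new FetchShuffleBlocks("app-1", "exec-2", 0,
  //     new long[] {0, 1}, new int[][] {{0, 1}, {0, 1, 2}})
  //
  // appId "app-1"      -> 4 + 5   =  9 bytes
  // execId "exec-2"    -> 4 + 6   = 10 bytes
  // shuffleId          ->            4 bytes
  // mapIds (2 longs)   -> 4 + 8*2 = 20 bytes  (was 4 + 4*2 = 12 as int[])
  // reduceIds.length   ->            4 bytes
  // reduceIds {0, 1}   -> 4 + 4*2 = 12 bytes
  // reduceIds {0,1,2}  -> 4 + 4*3 = 16 bytes
  // encodedLength()     = 9 + 10 + 4 + 20 + 4 + 12 + 16 = 75 bytes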
@@ -29,7 +29,7 @@ public class BlockTransferMessagesSuite {
public void serializeOpenShuffleBlocks() {
checkSerializeDeserialize(new OpenBlocks("app-1", "exec-2", new String[] { "b1", "b2" }));
checkSerializeDeserialize(new FetchShuffleBlocks(
"app-1", "exec-2", 0, new int[] {0, 1},
"app-1", "exec-2", 0, new long[] {0, 1},
new int[][] {{ 0, 1 }, { 0, 1, 2 }}));
checkSerializeDeserialize(new RegisterExecutor("app-1", "exec-2", new ExecutorShuffleInfo(
new String[] { "/local1", "/local2" }, 32, "MyShuffleManager")));
@@ -101,7 +101,7 @@ public void testFetchShuffleBlocks() {
when(blockResolver.getBlockData("app0", "exec1", 0, 0, 1)).thenReturn(blockMarkers[1]);

FetchShuffleBlocks fetchShuffleBlocks = new FetchShuffleBlocks(
"app0", "exec1", 0, new int[] { 0 }, new int[][] {{ 0, 1 }});
"app0", "exec1", 0, new long[] { 0 }, new int[][] {{ 0, 1 }});
checkOpenBlocksReceive(fetchShuffleBlocks, blockMarkers);

verify(blockResolver, times(1)).getBlockData("app0", "exec1", 0, 0, 0);
@@ -64,7 +64,7 @@ public void testFetchOne() {
BlockFetchingListener listener = fetchBlocks(
blocks,
blockIds,
new FetchShuffleBlocks("app-id", "exec-id", 0, new int[] { 0 }, new int[][] {{ 0 }}),
new FetchShuffleBlocks("app-id", "exec-id", 0, new long[] { 0 }, new int[][] {{ 0 }}),
conf);

verify(listener).onBlockFetchSuccess("shuffle_0_0_0", blocks.get("shuffle_0_0_0"));
@@ -100,7 +100,7 @@ public void testFetchThreeShuffleBlocks() {
BlockFetchingListener listener = fetchBlocks(
blocks,
blockIds,
new FetchShuffleBlocks("app-id", "exec-id", 0, new int[] { 0 }, new int[][] {{ 0, 1, 2 }}),
new FetchShuffleBlocks("app-id", "exec-id", 0, new long[] { 0 }, new int[][] {{ 0, 1, 2 }}),
conf);

for (int i = 0; i < 3; i++) {
@@ -42,17 +42,13 @@ public interface ShuffleExecutorComponents {
* partitioned bytes written by that map task.
*
* @param shuffleId Unique identifier for the shuffle the map task is a part of
* @param mapId Within the shuffle, the identifier of the map task
* @param mapTaskAttemptId Identifier of the task attempt. Multiple attempts of the same map task
* with the same (shuffleId, mapId) pair can be distinguished by the
* different values of mapTaskAttemptId.
* @param mapId The ID of the map task, which is unique within this Spark application.
* @param numPartitions The number of partitions that will be written by the map task. Some of
* these partitions may be empty.
*/
ShuffleMapOutputWriter createMapOutputWriter(
int shuffleId,
int mapId,
long mapTaskAttemptId,
long mapId,
int numPartitions) throws IOException;

/**
@@ -64,15 +60,11 @@ ShuffleMapOutputWriter createMapOutputWriter(
* preserving an optimization in the local disk shuffle storage implementation.
*
* @param shuffleId Unique identifier for the shuffle the map task is a part of
* @param mapId Within the shuffle, the identifier of the map task
* @param mapTaskAttemptId Identifier of the task attempt. Multiple attempts of the same map task
* with the same (shuffleId, mapId) pair can be distinguished by the
* different values of mapTaskAttemptId.
* @param mapId The ID of the map task, which is unique within this Spark application.
*/
default Optional<SingleSpillShuffleMapOutputWriter> createSingleFileMapOutputWriter(
int shuffleId,
int mapId,
long mapTaskAttemptId) throws IOException {
long mapId) throws IOException {
return Optional.empty();
}
}
@@ -39,7 +39,7 @@ public interface ShuffleMapOutputWriter {
* for the same partition within any given map task. The partition identifier will be in the
* range of precisely 0 (inclusive) to numPartitions (exclusive), where numPartitions was
* provided upon the creation of this map output writer via
* {@link ShuffleExecutorComponents#createMapOutputWriter(int, int, long, int)}.
* {@link ShuffleExecutorComponents#createMapOutputWriter(int, long, int)}.
* <p>
* Calls to this method will be invoked with monotonically increasing reducePartitionIds; each
* call to this method will be called with a reducePartitionId that is strictly greater than
@@ -85,8 +85,7 @@ final class BypassMergeSortShuffleWriter<K, V> extends ShuffleWriter<K, V> {
private final Partitioner partitioner;
private final ShuffleWriteMetricsReporter writeMetrics;
private final int shuffleId;
private final int mapId;
private final long mapTaskAttemptId;
private final long mapId;
private final Serializer serializer;
private final ShuffleExecutorComponents shuffleExecutorComponents;

@@ -106,8 +105,7 @@ final class BypassMergeSortShuffleWriter<K, V> extends ShuffleWriter<K, V> {
BypassMergeSortShuffleWriter(
BlockManager blockManager,
BypassMergeSortShuffleHandle<K, V> handle,
int mapId,
long mapTaskAttemptId,
long mapId,
SparkConf conf,
ShuffleWriteMetricsReporter writeMetrics,
ShuffleExecutorComponents shuffleExecutorComponents) {
@@ -117,7 +115,6 @@ final class BypassMergeSortShuffleWriter<K, V> extends ShuffleWriter<K, V> {
this.blockManager = blockManager;
final ShuffleDependency<K, V, V> dep = handle.dependency();
this.mapId = mapId;
this.mapTaskAttemptId = mapTaskAttemptId;
this.shuffleId = dep.shuffleId();
this.partitioner = dep.partitioner();
this.numPartitions = partitioner.numPartitions();
@@ -130,11 +127,12 @@
public void write(Iterator<Product2<K, V>> records) throws IOException {
assert (partitionWriters == null);
ShuffleMapOutputWriter mapOutputWriter = shuffleExecutorComponents
.createMapOutputWriter(shuffleId, mapId, mapTaskAttemptId, numPartitions);
.createMapOutputWriter(shuffleId, mapId, numPartitions);
try {
if (!records.hasNext()) {
partitionLengths = mapOutputWriter.commitAllPartitions();
mapStatus = MapStatus$.MODULE$.apply(blockManager.shuffleServerId(), partitionLengths);
mapStatus = MapStatus$.MODULE$.apply(
blockManager.shuffleServerId(), partitionLengths, mapId);
return;
}
final SerializerInstance serInstance = serializer.newInstance();
@@ -167,7 +165,8 @@ public void write(Iterator<Product2<K, V>> records) throws IOException {
}

partitionLengths = writePartitionedData(mapOutputWriter);
mapStatus = MapStatus$.MODULE$.apply(blockManager.shuffleServerId(), partitionLengths);
mapStatus = MapStatus$.MODULE$.apply(
blockManager.shuffleServerId(), partitionLengths, mapId);
} catch (Exception e) {
try {
mapOutputWriter.abort(e);