
Commit c86f6cc

further rename

1 parent 69d59a1 commit c86f6cc

13 files changed: +88 −91 lines changed

common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalBlockHandler.java

Lines changed: 7 additions & 7 deletions

@@ -321,30 +321,30 @@ private class ShuffleManagedBufferIterator implements Iterator<ManagedBuffer> {
     private final String appId;
     private final String execId;
     private final int shuffleId;
-    private final long[] mapTaskIds;
+    private final long[] mapIds;
     private final int[][] reduceIds;

     ShuffleManagedBufferIterator(FetchShuffleBlocks msg) {
       appId = msg.appId;
       execId = msg.execId;
       shuffleId = msg.shuffleId;
-      mapTaskIds = msg.mapTaskIds;
+      mapIds = msg.mapIds;
       reduceIds = msg.reduceIds;
     }

     @Override
     public boolean hasNext() {
-      // mapTaskIds.length must equal to reduceIds.length, and the passed in FetchShuffleBlocks
-      // must have non-empty mapTaskIds and reduceIds, see the checking logic in
+      // mapIds.length must equal to reduceIds.length, and the passed in FetchShuffleBlocks
+      // must have non-empty mapIds and reduceIds, see the checking logic in
       // OneForOneBlockFetcher.
-      assert(mapTaskIds.length != 0 && mapTaskIds.length == reduceIds.length);
-      return mapIdx < mapTaskIds.length && reduceIdx < reduceIds[mapIdx].length;
+      assert(mapIds.length != 0 && mapIds.length == reduceIds.length);
+      return mapIdx < mapIds.length && reduceIdx < reduceIds[mapIdx].length;
     }

     @Override
     public ManagedBuffer next() {
       final ManagedBuffer block = blockManager.getBlockData(
-        appId, execId, shuffleId, mapTaskIds[mapIdx], reduceIds[mapIdx][reduceIdx]);
+        appId, execId, shuffleId, mapIds[mapIdx], reduceIds[mapIdx][reduceIdx]);
       if (reduceIdx < reduceIds[mapIdx].length - 1) {
         reduceIdx += 1;
       } else {
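The ShuffleManagedBufferIterator above walks two cursors over the parallel mapIds/reduceIds arrays. A minimal standalone sketch of that walk, with invented ids and a println standing in for the actual blockManager.getBlockData call:

```java
// Standalone sketch of the nested mapIdx/reduceIdx walk used by
// ShuffleManagedBufferIterator: mapIds[i] pairs with reduceIds[i], so the
// iterator visits (mapIds[i], reduceIds[i][j]) in order.
public class ParallelIdWalk {
  public static void main(String[] args) {
    long[] mapIds = {1000L, 1001L};          // hypothetical map task ids
    int[][] reduceIds = {{0, 1, 2}, {1, 3}}; // reduce ids requested per map id
    assert mapIds.length != 0 && mapIds.length == reduceIds.length;

    int mapIdx = 0;
    int reduceIdx = 0;
    while (mapIdx < mapIds.length && reduceIdx < reduceIds[mapIdx].length) {
      System.out.println("fetch block for mapId=" + mapIds[mapIdx]
          + ", reduceId=" + reduceIds[mapIdx][reduceIdx]);
      if (reduceIdx < reduceIds[mapIdx].length - 1) {
        reduceIdx += 1;  // next reduce partition of the same map output
      } else {
        reduceIdx = 0;   // move on to the next map output
        mapIdx += 1;
      }
    }
  }
}
```

Each mapIds[i] pairs with reduceIds[i], so the iterator yields exactly as many buffers as the sum of the inner array lengths.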

common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java

Lines changed: 9 additions & 10 deletions

@@ -165,21 +165,21 @@ public void registerExecutor(
   }

   /**
-   * Obtains a FileSegmentManagedBuffer from (shuffleId, mapTaskId, reduceId). We make assumptions
+   * Obtains a FileSegmentManagedBuffer from (shuffleId, mapId, reduceId). We make assumptions
    * about how the hash and sort based shuffles store their data.
    */
   public ManagedBuffer getBlockData(
       String appId,
       String execId,
       int shuffleId,
-      long mapTaskId,
+      long mapId,
       int reduceId) {
     ExecutorShuffleInfo executor = executors.get(new AppExecId(appId, execId));
     if (executor == null) {
       throw new RuntimeException(
         String.format("Executor is not registered (appId=%s, execId=%s)", appId, execId));
     }
-    return getSortBasedShuffleBlockData(executor, shuffleId, mapTaskId, reduceId);
+    return getSortBasedShuffleBlockData(executor, shuffleId, mapId, reduceId);
   }

   public ManagedBuffer getRddBlockData(

@@ -291,23 +291,22 @@ private void deleteNonShuffleServiceServedFiles(String[] dirs) {
   }

   /**
-   * Sort-based shuffle data uses an index called "shuffle_ShuffleId_MapTaskId_0.index" into a data
-   * file called "shuffle_ShuffleId_MapTaskId_0.data".
-   * This logic is from IndexShuffleBlockResolver, and the block id format is from
-   * ShuffleDataBlockId and ShuffleIndexBlockId.
+   * Sort-based shuffle data uses an index called "shuffle_ShuffleId_MapId_0.index" into a data file
+   * called "shuffle_ShuffleId_MapId_0.data". This logic is from IndexShuffleBlockResolver,
+   * and the block id format is from ShuffleDataBlockId and ShuffleIndexBlockId.
    */
   private ManagedBuffer getSortBasedShuffleBlockData(
-      ExecutorShuffleInfo executor, int shuffleId, long mapTaskId, int reduceId) {
+      ExecutorShuffleInfo executor, int shuffleId, long mapId, int reduceId) {
     File indexFile = ExecutorDiskUtils.getFile(executor.localDirs, executor.subDirsPerLocalDir,
-      "shuffle_" + shuffleId + "_" + mapTaskId + "_0.index");
+      "shuffle_" + shuffleId + "_" + mapId + "_0.index");

     try {
       ShuffleIndexInformation shuffleIndexInformation = shuffleIndexCache.get(indexFile);
       ShuffleIndexRecord shuffleIndexRecord = shuffleIndexInformation.getIndex(reduceId);
       return new FileSegmentManagedBuffer(
         conf,
         ExecutorDiskUtils.getFile(executor.localDirs, executor.subDirsPerLocalDir,
-          "shuffle_" + shuffleId + "_" + mapTaskId + "_0.data"),
+          "shuffle_" + shuffleId + "_" + mapId + "_0.data"),
         shuffleIndexRecord.getOffset(),
         shuffleIndexRecord.getLength());
     } catch (ExecutionException e) {
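The javadoc above pins down the on-disk naming scheme the resolver assumes. A small sketch of those names (the ids are invented; the real lookup goes through ExecutorDiskUtils.getFile and a cached, parsed index file):

```java
// Sketch of the file names looked up for a sort-based shuffle block
// (shuffleId, mapId, reduceId). The reduce id is not part of the file name;
// it selects an (offset, length) record inside the index file.
public class ShuffleFileNames {
  static String indexFileName(int shuffleId, long mapId) {
    return "shuffle_" + shuffleId + "_" + mapId + "_0.index";
  }

  static String dataFileName(int shuffleId, long mapId) {
    return "shuffle_" + shuffleId + "_" + mapId + "_0.data";
  }

  public static void main(String[] args) {
    // Hypothetical ids: shuffle 3, map task 1024.
    System.out.println(indexFileName(3, 1024L)); // shuffle_3_1024_0.index
    System.out.println(dataFileName(3, 1024L));  // shuffle_3_1024_0.data
  }
}
```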

common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/OneForOneBlockFetcher.java

Lines changed: 11 additions & 11 deletions

@@ -108,7 +108,7 @@ private boolean isShuffleBlocks(String[] blockIds) {

   /**
    * Analyze the pass in blockIds and create FetchShuffleBlocks message.
-   * The blockIds has been sorted by mapTaskId and reduceId. It's produced in
+   * The blockIds has been sorted by mapId and reduceId. It's produced in
    * org.apache.spark.MapOutputTracker.convertMapStatuses.
    */
   private FetchShuffleBlocks createFetchShuffleBlocksMsg(

@@ -121,21 +121,21 @@ private FetchShuffleBlocks createFetchShuffleBlocksMsg(
         throw new IllegalArgumentException("Expected shuffleId=" + shuffleId +
           ", got:" + blockId);
       }
-      long mapTaskId = blockIdParts.middle;
-      if (!mapIdToReduceIds.containsKey(mapTaskId)) {
-        mapIdToReduceIds.put(mapTaskId, new ArrayList<>());
+      long mapId = blockIdParts.middle;
+      if (!mapIdToReduceIds.containsKey(mapId)) {
+        mapIdToReduceIds.put(mapId, new ArrayList<>());
       }
-      mapIdToReduceIds.get(mapTaskId).add(blockIdParts.right);
+      mapIdToReduceIds.get(mapId).add(blockIdParts.right);
     }
-    long[] mapTaskIds = Longs.toArray(mapIdToReduceIds.keySet());
-    int[][] reduceIdArr = new int[mapTaskIds.length][];
-    for (int i = 0; i < mapTaskIds.length; i++) {
-      reduceIdArr[i] = Ints.toArray(mapIdToReduceIds.get(mapTaskIds[i]));
+    long[] mapIds = Longs.toArray(mapIdToReduceIds.keySet());
+    int[][] reduceIdArr = new int[mapIds.length][];
+    for (int i = 0; i < mapIds.length; i++) {
+      reduceIdArr[i] = Ints.toArray(mapIdToReduceIds.get(mapIds[i]));
     }
-    return new FetchShuffleBlocks(appId, execId, shuffleId, mapTaskIds, reduceIdArr);
+    return new FetchShuffleBlocks(appId, execId, shuffleId, mapIds, reduceIdArr);
   }

-  /** Split the shuffleBlockId and return shuffleId, mapTaskId and reduceId. */
+  /** Split the shuffleBlockId and return shuffleId, mapId and reduceId. */
   private ImmutableTriple<Integer, Long, Integer> splitBlockId(String blockId) {
     String[] blockIdParts = blockId.split("_");
     if (blockIdParts.length != 4 || !blockIdParts[0].equals("shuffle")) {
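To make the grouping in createFetchShuffleBlocksMsg concrete, here is a hedged standalone sketch with invented block ids; plain JDK collections stand in for the Guava Longs/Ints helpers used by the real code:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Group sorted shuffle block ids of the form shuffle_<shuffleId>_<mapId>_<reduceId>
// into the parallel mapIds / reduceIds arrays carried by FetchShuffleBlocks.
public class GroupBlockIds {
  public static void main(String[] args) {
    String[] blockIds = {
      "shuffle_3_1024_0", "shuffle_3_1024_5", "shuffle_3_2048_5"
    };
    // LinkedHashMap keeps the mapId order of the (already sorted) input.
    Map<Long, List<Integer>> mapIdToReduceIds = new LinkedHashMap<>();
    for (String blockId : blockIds) {
      String[] parts = blockId.split("_"); // ["shuffle", shuffleId, mapId, reduceId]
      long mapId = Long.parseLong(parts[2]);
      int reduceId = Integer.parseInt(parts[3]);
      mapIdToReduceIds.computeIfAbsent(mapId, k -> new ArrayList<>()).add(reduceId);
    }
    long[] mapIds = mapIdToReduceIds.keySet().stream()
        .mapToLong(Long::longValue).toArray();
    int[][] reduceIds = mapIdToReduceIds.values().stream()
        .map(list -> list.stream().mapToInt(Integer::intValue).toArray())
        .toArray(int[][]::new);
    System.out.println(Arrays.toString(mapIds));        // [1024, 2048]
    System.out.println(Arrays.deepToString(reduceIds)); // [[0, 5], [5]]
  }
}
```

Because the incoming blockIds are already sorted by mapId and reduceId, a LinkedHashMap preserves that order in the resulting parallel arrays.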

common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/FetchShuffleBlocks.java

Lines changed: 13 additions & 13 deletions

@@ -32,23 +32,23 @@ public class FetchShuffleBlocks extends BlockTransferMessage {
   public final String appId;
   public final String execId;
   public final int shuffleId;
-  // The length of mapTaskIds must equal to reduceIds.size(), for the i-th mapTaskId in mapTaskIds,
-  // it corresponds to the i-th int[] in reduceIds, which contains all reduce id for this map task.
-  public final long[] mapTaskIds;
+  // The length of mapIds must equal to reduceIds.size(), for the i-th mapId in mapIds,
+  // it corresponds to the i-th int[] in reduceIds, which contains all reduce id for this map id.
+  public final long[] mapIds;
   public final int[][] reduceIds;

   public FetchShuffleBlocks(
       String appId,
       String execId,
       int shuffleId,
-      long[] mapTaskIds,
+      long[] mapIds,
       int[][] reduceIds) {
     this.appId = appId;
     this.execId = execId;
     this.shuffleId = shuffleId;
-    this.mapTaskIds = mapTaskIds;
+    this.mapIds = mapIds;
     this.reduceIds = reduceIds;
-    assert(mapTaskIds.length == reduceIds.length);
+    assert(mapIds.length == reduceIds.length);
   }

   @Override

@@ -60,7 +60,7 @@ public String toString() {
       .add("appId", appId)
       .add("execId", execId)
       .add("shuffleId", shuffleId)
-      .add("mapTaskIds", Arrays.toString(mapTaskIds))
+      .add("mapIds", Arrays.toString(mapIds))
       .add("reduceIds", Arrays.deepToString(reduceIds))
       .toString();
   }

@@ -75,7 +75,7 @@ public boolean equals(Object o) {
     if (shuffleId != that.shuffleId) return false;
     if (!appId.equals(that.appId)) return false;
     if (!execId.equals(that.execId)) return false;
-    if (!Arrays.equals(mapTaskIds, that.mapTaskIds)) return false;
+    if (!Arrays.equals(mapIds, that.mapIds)) return false;
     return Arrays.deepEquals(reduceIds, that.reduceIds);
   }

@@ -84,7 +84,7 @@ public int hashCode() {
     int result = appId.hashCode();
     result = 31 * result + execId.hashCode();
     result = 31 * result + shuffleId;
-    result = 31 * result + Arrays.hashCode(mapTaskIds);
+    result = 31 * result + Arrays.hashCode(mapIds);
     result = 31 * result + Arrays.deepHashCode(reduceIds);
     return result;
   }

@@ -98,7 +98,7 @@ public int encodedLength() {
     return Encoders.Strings.encodedLength(appId)
       + Encoders.Strings.encodedLength(execId)
       + 4 /* encoded length of shuffleId */
-      + Encoders.LongArrays.encodedLength(mapTaskIds)
+      + Encoders.LongArrays.encodedLength(mapIds)
       + 4 /* encoded length of reduceIds.size() */
       + encodedLengthOfReduceIds;
   }

@@ -108,7 +108,7 @@ public void encode(ByteBuf buf) {
     Encoders.Strings.encode(buf, appId);
     Encoders.Strings.encode(buf, execId);
     buf.writeInt(shuffleId);
-    Encoders.LongArrays.encode(buf, mapTaskIds);
+    Encoders.LongArrays.encode(buf, mapIds);
     buf.writeInt(reduceIds.length);
     for (int[] ids: reduceIds) {
       Encoders.IntArrays.encode(buf, ids);

@@ -119,12 +119,12 @@ public static FetchShuffleBlocks decode(ByteBuf buf) {
     String appId = Encoders.Strings.decode(buf);
     String execId = Encoders.Strings.decode(buf);
     int shuffleId = buf.readInt();
-    long[] mapTaskIds = Encoders.LongArrays.decode(buf);
+    long[] mapIds = Encoders.LongArrays.decode(buf);
     int reduceIdsSize = buf.readInt();
     int[][] reduceIds = new int[reduceIdsSize][];
     for (int i = 0; i < reduceIdsSize; i++) {
       reduceIds[i] = Encoders.IntArrays.decode(buf);
     }
-    return new FetchShuffleBlocks(appId, execId, shuffleId, mapTaskIds, reduceIds);
+    return new FetchShuffleBlocks(appId, execId, shuffleId, mapIds, reduceIds);
   }
 }
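One detail behind the equals/hashCode/toString code above: the flat mapIds long[] can use the shallow java.util.Arrays helpers, but the nested reduceIds int[][] needs the deep variants, because the shallow ones compare the inner arrays by reference. A JDK-only illustration:

```java
import java.util.Arrays;

// Shallow vs. deep array comparison: the reason FetchShuffleBlocks uses
// Arrays.equals/hashCode for mapIds but Arrays.deepEquals/deepHashCode/
// deepToString for the nested reduceIds.
public class ArrayEqualityDemo {
  public static void main(String[] args) {
    long[] mapIdsA = {1024L, 2048L};
    long[] mapIdsB = {1024L, 2048L};
    System.out.println(Arrays.equals(mapIdsA, mapIdsB));           // true

    int[][] reduceIdsA = {{0, 5}, {5}};
    int[][] reduceIdsB = {{0, 5}, {5}};
    System.out.println(Arrays.equals(reduceIdsA, reduceIdsB));      // false: inner arrays compared by reference
    System.out.println(Arrays.deepEquals(reduceIdsA, reduceIdsB));  // true: compared element by element
  }
}
```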

core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java

Lines changed: 6 additions & 6 deletions

@@ -85,7 +85,7 @@ final class BypassMergeSortShuffleWriter<K, V> extends ShuffleWriter<K, V> {
   private final Partitioner partitioner;
   private final ShuffleWriteMetricsReporter writeMetrics;
   private final int shuffleId;
-  private final long mapTaskId;
+  private final long mapId;
   private final Serializer serializer;
   private final ShuffleExecutorComponents shuffleExecutorComponents;

@@ -105,7 +105,7 @@ final class BypassMergeSortShuffleWriter<K, V> extends ShuffleWriter<K, V> {
   BypassMergeSortShuffleWriter(
       BlockManager blockManager,
       BypassMergeSortShuffleHandle<K, V> handle,
-      long mapTaskId,
+      long mapId,
       SparkConf conf,
       ShuffleWriteMetricsReporter writeMetrics,
       ShuffleExecutorComponents shuffleExecutorComponents) {

@@ -114,7 +114,7 @@ final class BypassMergeSortShuffleWriter<K, V> extends ShuffleWriter<K, V> {
     this.transferToEnabled = conf.getBoolean("spark.file.transferTo", true);
     this.blockManager = blockManager;
     final ShuffleDependency<K, V, V> dep = handle.dependency();
-    this.mapTaskId = mapTaskId;
+    this.mapId = mapId;
     this.shuffleId = dep.shuffleId();
     this.partitioner = dep.partitioner();
     this.numPartitions = partitioner.numPartitions();

@@ -127,12 +127,12 @@ final class BypassMergeSortShuffleWriter<K, V> extends ShuffleWriter<K, V> {
   public void write(Iterator<Product2<K, V>> records) throws IOException {
     assert (partitionWriters == null);
     ShuffleMapOutputWriter mapOutputWriter = shuffleExecutorComponents
-      .createMapOutputWriter(shuffleId, mapTaskId, numPartitions);
+      .createMapOutputWriter(shuffleId, mapId, numPartitions);
     try {
       if (!records.hasNext()) {
         partitionLengths = mapOutputWriter.commitAllPartitions();
         mapStatus = MapStatus$.MODULE$.apply(
-          blockManager.shuffleServerId(), partitionLengths, mapTaskId);
+          blockManager.shuffleServerId(), partitionLengths, mapId);
         return;
       }
       final SerializerInstance serInstance = serializer.newInstance();

@@ -166,7 +166,7 @@ public void write(Iterator<Product2<K, V>> records) throws IOException {

       partitionLengths = writePartitionedData(mapOutputWriter);
       mapStatus = MapStatus$.MODULE$.apply(
-        blockManager.shuffleServerId(), partitionLengths, mapTaskId);
+        blockManager.shuffleServerId(), partitionLengths, mapId);
     } catch (Exception e) {
       try {
         mapOutputWriter.abort(e);

core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java

Lines changed: 6 additions & 6 deletions

@@ -78,7 +78,7 @@ public class UnsafeShuffleWriter<K, V> extends ShuffleWriter<K, V> {
   private final ShuffleWriteMetricsReporter writeMetrics;
   private final ShuffleExecutorComponents shuffleExecutorComponents;
   private final int shuffleId;
-  private final long mapTaskId;
+  private final long mapId;
   private final TaskContext taskContext;
   private final SparkConf sparkConf;
   private final boolean transferToEnabled;

@@ -122,7 +122,7 @@ public UnsafeShuffleWriter(
     }
     this.blockManager = blockManager;
     this.memoryManager = memoryManager;
-    this.mapTaskId = taskContext.taskAttemptId();
+    this.mapId = taskContext.taskAttemptId();
     final ShuffleDependency<K, V, V> dep = handle.dependency();
     this.shuffleId = dep.shuffleId();
     this.serializer = dep.serializer().newInstance();

@@ -228,7 +228,7 @@ void closeAndWriteOutput() throws IOException {
       }
     }
     mapStatus = MapStatus$.MODULE$.apply(
-      blockManager.shuffleServerId(), partitionLengths, mapTaskId);
+      blockManager.shuffleServerId(), partitionLengths, mapId);
   }

   @VisibleForTesting

@@ -264,11 +264,11 @@ private long[] mergeSpills(SpillInfo[] spills) throws IOException {
     long[] partitionLengths;
     if (spills.length == 0) {
       final ShuffleMapOutputWriter mapWriter = shuffleExecutorComponents
-        .createMapOutputWriter(shuffleId, mapTaskId, partitioner.numPartitions());
+        .createMapOutputWriter(shuffleId, mapId, partitioner.numPartitions());
       return mapWriter.commitAllPartitions();
     } else if (spills.length == 1) {
       Optional<SingleSpillShuffleMapOutputWriter> maybeSingleFileWriter =
-        shuffleExecutorComponents.createSingleFileMapOutputWriter(shuffleId, mapTaskId);
+        shuffleExecutorComponents.createSingleFileMapOutputWriter(shuffleId, mapId);
       if (maybeSingleFileWriter.isPresent()) {
         // Here, we don't need to perform any metrics updates because the bytes written to this
         // output file would have already been counted as shuffle bytes written.

@@ -293,7 +293,7 @@ private long[] mergeSpillsUsingStandardWriter(SpillInfo[] spills) throws IOExcep
     CompressionCodec$.MODULE$.supportsConcatenationOfSerializedStreams(compressionCodec);
     final boolean encryptionEnabled = blockManager.serializerManager().encryptionEnabled();
     final ShuffleMapOutputWriter mapWriter = shuffleExecutorComponents
-      .createMapOutputWriter(shuffleId, mapTaskId, partitioner.numPartitions());
+      .createMapOutputWriter(shuffleId, mapId, partitioner.numPartitions());
     try {
       // There are multiple spills to merge, so none of these spill files' lengths were counted
       // towards our shuffle write count or shuffle write time. If we use the slow merge path,

core/src/main/scala/org/apache/spark/network/netty/NettyBlockRpcServer.scala

Lines changed: 5 additions & 6 deletions

@@ -64,12 +64,11 @@ class NettyBlockRpcServer(
         responseContext.onSuccess(new StreamHandle(streamId, blocksNum).toByteBuffer)

       case fetchShuffleBlocks: FetchShuffleBlocks =>
-        val blocks = fetchShuffleBlocks.mapTaskIds.zipWithIndex.flatMap {
-          case (mapTaskId, index) =>
-            fetchShuffleBlocks.reduceIds.apply(index).map { reduceId =>
-              blockManager.getBlockData(
-                ShuffleBlockId(fetchShuffleBlocks.shuffleId, mapTaskId, reduceId))
-            }
+        val blocks = fetchShuffleBlocks.mapIds.zipWithIndex.flatMap { case (mapId, index) =>
+          fetchShuffleBlocks.reduceIds.apply(index).map { reduceId =>
+            blockManager.getBlockData(
+              ShuffleBlockId(fetchShuffleBlocks.shuffleId, mapId, reduceId))
+          }
         }
         val numBlockIds = fetchShuffleBlocks.reduceIds.map(_.length).sum
         val streamId = streamManager.registerStream(appId, blocks.iterator.asJava,

core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala

Lines changed: 10 additions & 11 deletions

@@ -51,27 +51,26 @@ private[spark] class IndexShuffleBlockResolver(

   private val transportConf = SparkTransportConf.fromSparkConf(conf, "shuffle")

-  def getDataFile(shuffleId: Int, mapTaskId: Long): File = {
-    blockManager.diskBlockManager.getFile(ShuffleDataBlockId(shuffleId, mapTaskId, NOOP_REDUCE_ID))
+  def getDataFile(shuffleId: Int, mapId: Long): File = {
+    blockManager.diskBlockManager.getFile(ShuffleDataBlockId(shuffleId, mapId, NOOP_REDUCE_ID))
   }

-  private def getIndexFile(shuffleId: Int, mapTaskId: Long): File = {
-    blockManager.diskBlockManager.getFile(
-      ShuffleIndexBlockId(shuffleId, mapTaskId, NOOP_REDUCE_ID))
+  private def getIndexFile(shuffleId: Int, mapId: Long): File = {
+    blockManager.diskBlockManager.getFile(ShuffleIndexBlockId(shuffleId, mapId, NOOP_REDUCE_ID))
   }

   /**
    * Remove data file and index file that contain the output data from one map.
    */
-  def removeDataByMap(shuffleId: Int, mapTaskId: Long): Unit = {
-    var file = getDataFile(shuffleId, mapTaskId)
+  def removeDataByMap(shuffleId: Int, mapId: Long): Unit = {
+    var file = getDataFile(shuffleId, mapId)
     if (file.exists()) {
       if (!file.delete()) {
         logWarning(s"Error deleting data ${file.getPath()}")
       }
     }

-    file = getIndexFile(shuffleId, mapTaskId)
+    file = getIndexFile(shuffleId, mapId)
     if (file.exists()) {
       if (!file.delete()) {
         logWarning(s"Error deleting index ${file.getPath()}")

@@ -136,13 +135,13 @@ private[spark] class IndexShuffleBlockResolver(
    */
   def writeIndexFileAndCommit(
       shuffleId: Int,
-      mapTaskId: Long,
+      mapId: Long,
       lengths: Array[Long],
       dataTmp: File): Unit = {
-    val indexFile = getIndexFile(shuffleId, mapTaskId)
+    val indexFile = getIndexFile(shuffleId, mapId)
     val indexTmp = Utils.tempFileWith(indexFile)
     try {
-      val dataFile = getDataFile(shuffleId, mapTaskId)
+      val dataFile = getDataFile(shuffleId, mapId)
       // There is only one IndexShuffleBlockResolver per executor, this synchronization make sure
       // the following check and rename are atomic.
       synchronized {
