Skip to content

Commit f0e141d

Browse files
committed
applying review comments
1 parent 612c4f3 commit f0e141d

File tree

4 files changed

+25
-28
lines changed

4 files changed

+25
-28
lines changed

common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandler.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -228,7 +228,7 @@ private class ManagedBufferIterator implements Iterator<ManagedBuffer> {
228228
} else if (blockId0Parts.length == 3 && blockId0Parts[0].equals("rdd")) {
229229
final int[] rddAndSplitIds = rddAndSplitIds(blockIds);
230230
size = rddAndSplitIds.length;
231-
blockDataForIndexFn = index -> blockManager.getBlockData(appId, execId,
231+
blockDataForIndexFn = index -> blockManager.getRddBlockData(appId, execId,
232232
rddAndSplitIds[index], rddAndSplitIds[index + 1]);
233233
} else {
234234
throw new IllegalArgumentException("Unexpected block id format: " + blockIds[0]);

common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -179,7 +179,7 @@ public ManagedBuffer getBlockData(
179179
return getSortBasedShuffleBlockData(executor, shuffleId, mapId, reduceId);
180180
}
181181

182-
public ManagedBuffer getBlockData(
182+
public ManagedBuffer getRddBlockData(
183183
String appId,
184184
String execId,
185185
int rddId,
@@ -313,10 +313,9 @@ public ManagedBuffer getDiskPersistedRddBlockData(
313313
ExecutorShuffleInfo executor, int rddId, int splitIndex) {
314314
File file = getFile(executor.localDirs, executor.subDirsPerLocalDir,
315315
"rdd_" + rddId + "_" + splitIndex);
316-
long length = file.length();
317316
ManagedBuffer res = null;
318317
if (file.exists()) {
319-
res = new FileSegmentManagedBuffer(conf, file, 0, length);
318+
res = new FileSegmentManagedBuffer(conf, file, 0, file.length());
320319
}
321320
return res;
322321
}

common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandlerSuite.java

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -94,13 +94,13 @@ public void testOpenShuffleBlocks() {
9494

9595
@Test
9696
public void testOpenDiskPersistedRDDBlocks() {
97-
when(blockResolver.getBlockData("app0", "exec1", 0, 0)).thenReturn(blockMarkers[0]);
98-
when(blockResolver.getBlockData("app0", "exec1", 0, 1)).thenReturn(blockMarkers[1]);
97+
when(blockResolver.getRddBlockData("app0", "exec1", 0, 0)).thenReturn(blockMarkers[0]);
98+
when(blockResolver.getRddBlockData("app0", "exec1", 0, 1)).thenReturn(blockMarkers[1]);
9999

100100
checkOpenBlocksReceive(new String[] { "rdd_0_0", "rdd_0_1" }, blockMarkers);
101101

102-
verify(blockResolver, times(1)).getBlockData("app0", "exec1", 0, 0);
103-
verify(blockResolver, times(1)).getBlockData("app0", "exec1", 0, 1);
102+
verify(blockResolver, times(1)).getRddBlockData("app0", "exec1", 0, 0);
103+
verify(blockResolver, times(1)).getRddBlockData("app0", "exec1", 0, 1);
104104
verifyOpenBlockLatencyMetrics();
105105
}
106106

@@ -110,15 +110,15 @@ public void testOpenDiskPersistedRDDBlocksWithMissingBlock() {
110110
new NioManagedBuffer(ByteBuffer.wrap(new byte[3])),
111111
null
112112
};
113-
when(blockResolver.getBlockData("app0", "exec1", 0, 0))
113+
when(blockResolver.getRddBlockData("app0", "exec1", 0, 0))
114114
.thenReturn(blockMarkersWithMissingBlock[0]);
115-
when(blockResolver.getBlockData("app0", "exec1", 0, 1))
115+
when(blockResolver.getRddBlockData("app0", "exec1", 0, 1))
116116
.thenReturn(null);
117117

118118
checkOpenBlocksReceive(new String[] { "rdd_0_0", "rdd_0_1" }, blockMarkersWithMissingBlock);
119119

120-
verify(blockResolver, times(1)).getBlockData("app0", "exec1", 0, 0);
121-
verify(blockResolver, times(1)).getBlockData("app0", "exec1", 0, 1);
120+
verify(blockResolver, times(1)).getRddBlockData("app0", "exec1", 0, 0);
121+
verify(blockResolver, times(1)).getRddBlockData("app0", "exec1", 0, 1);
122122
}
123123

124124
private void checkOpenBlocksReceive(String[] blockIds, ManagedBuffer[] blockMarkers) {

core/src/test/scala/org/apache/spark/storage/BlockManagerInfoSuite.scala

Lines changed: 14 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ class BlockManagerInfoSuite extends SparkFunSuite {
4848
assert(bmInfo.blocks.asScala ===
4949
Map(broadcastId -> BlockStatus(StorageLevel.MEMORY_AND_DISK, 0, 100)))
5050
assert(bmInfo.exclusiveCachedBlocks.isEmpty)
51-
assert(bmInfo.remainingMem == 29800)
51+
assert(bmInfo.remainingMem === 29800)
5252
}
5353

5454
testWithShuffleServiceOnOff("RDD block with MEMORY_ONLY") { (svcEnabled, bmInfo) =>
@@ -57,7 +57,7 @@ class BlockManagerInfoSuite extends SparkFunSuite {
5757
assert(bmInfo.blocks.asScala ===
5858
Map(rddId -> BlockStatus(StorageLevel.MEMORY_ONLY, 200, 0)))
5959
assert(bmInfo.exclusiveCachedBlocks === Set(rddId))
60-
assert(bmInfo.remainingMem == 29800)
60+
assert(bmInfo.remainingMem === 29800)
6161
if (svcEnabled) {
6262
assert(bmInfo.externalShuffleServiceBlockStatus.get.isEmpty)
6363
}
@@ -71,7 +71,7 @@ class BlockManagerInfoSuite extends SparkFunSuite {
7171
Map(rddId -> BlockStatus(StorageLevel.MEMORY_AND_DISK, 0, 400)))
7272
val exclusiveCachedBlocksForOneMemoryOnly = if (svcEnabled) Set() else Set(rddId)
7373
assert(bmInfo.exclusiveCachedBlocks === exclusiveCachedBlocksForOneMemoryOnly)
74-
assert(bmInfo.remainingMem == 29800)
74+
assert(bmInfo.remainingMem === 29800)
7575
if (svcEnabled) {
7676
assert(bmInfo.externalShuffleServiceBlockStatus.get.asScala ===
7777
Map(rddId -> BlockStatus(StorageLevel.MEMORY_AND_DISK, 0, 400)))
@@ -85,7 +85,7 @@ class BlockManagerInfoSuite extends SparkFunSuite {
8585
Map(rddId -> BlockStatus(StorageLevel.DISK_ONLY, 0, 200)))
8686
val exclusiveCachedBlocksForOneMemoryOnly = if (svcEnabled) Set() else Set(rddId)
8787
assert(bmInfo.exclusiveCachedBlocks === exclusiveCachedBlocksForOneMemoryOnly)
88-
assert(bmInfo.remainingMem == 30000)
88+
assert(bmInfo.remainingMem === 30000)
8989
if (svcEnabled) {
9090
assert(bmInfo.externalShuffleServiceBlockStatus.get.asScala ===
9191
Map(rddId -> BlockStatus(StorageLevel.DISK_ONLY, 0, 200)))
@@ -97,16 +97,16 @@ class BlockManagerInfoSuite extends SparkFunSuite {
9797
bmInfo.updateBlockInfo(rddId, StorageLevel.MEMORY_ONLY, memSize = 200, 0)
9898
assert(bmInfo.blocks.asScala === Map(rddId -> BlockStatus(StorageLevel.MEMORY_ONLY, 200, 0)))
9999
assert(bmInfo.exclusiveCachedBlocks === Set(rddId))
100-
assert(bmInfo.remainingMem == 29800)
100+
assert(bmInfo.remainingMem === 29800)
101101
if (svcEnabled) {
102-
assert(bmInfo.externalShuffleServiceBlockStatus.get.asScala == Map())
102+
assert(bmInfo.externalShuffleServiceBlockStatus.get.isEmpty)
103103
}
104104

105105
bmInfo.updateBlockInfo(rddId, StorageLevel.DISK_ONLY, memSize = 0, diskSize = 200)
106106
assert(bmInfo.blocks.asScala === Map(rddId -> BlockStatus(StorageLevel.DISK_ONLY, 0, 200)))
107107
val exclusiveCachedBlocksForNoMemoryOnly = if (svcEnabled) Set() else Set(rddId)
108108
assert(bmInfo.exclusiveCachedBlocks === exclusiveCachedBlocksForNoMemoryOnly)
109-
assert(bmInfo.remainingMem == 30000)
109+
assert(bmInfo.remainingMem === 30000)
110110
if (svcEnabled) {
111111
assert(bmInfo.externalShuffleServiceBlockStatus.get.asScala ===
112112
Map(rddId -> BlockStatus(StorageLevel.DISK_ONLY, 0, 200)))
@@ -116,11 +116,10 @@ class BlockManagerInfoSuite extends SparkFunSuite {
116116
testWithShuffleServiceOnOff("using invalid StorageLevel") { (svcEnabled, bmInfo) =>
117117
val rddId: BlockId = RDDBlockId(0, 0)
118118
bmInfo.updateBlockInfo(rddId, StorageLevel.DISK_ONLY, memSize = 0, diskSize = 200)
119-
assert(bmInfo.blocks.asScala
120-
=== Map(rddId -> BlockStatus(StorageLevel.DISK_ONLY, 0, 200)))
119+
assert(bmInfo.blocks.asScala === Map(rddId -> BlockStatus(StorageLevel.DISK_ONLY, 0, 200)))
121120
val exclusiveCachedBlocksForOneMemoryOnly = if (svcEnabled) Set() else Set(rddId)
122121
assert(bmInfo.exclusiveCachedBlocks === exclusiveCachedBlocksForOneMemoryOnly)
123-
assert(bmInfo.remainingMem == 30000)
122+
assert(bmInfo.remainingMem === 30000)
124123
if (svcEnabled) {
125124
assert(bmInfo.externalShuffleServiceBlockStatus.get.asScala ===
126125
Map(rddId -> BlockStatus(StorageLevel.DISK_ONLY, 0, 200)))
@@ -129,20 +128,19 @@ class BlockManagerInfoSuite extends SparkFunSuite {
129128
bmInfo.updateBlockInfo(rddId, StorageLevel.NONE, memSize = 0, diskSize = 200)
130129
assert(bmInfo.blocks.isEmpty)
131130
assert(bmInfo.exclusiveCachedBlocks.isEmpty)
132-
assert(bmInfo.remainingMem == 30000)
131+
assert(bmInfo.remainingMem === 30000)
133132
if (svcEnabled) {
134-
assert(bmInfo.externalShuffleServiceBlockStatus.get.asScala === Map())
133+
assert(bmInfo.externalShuffleServiceBlockStatus.get.isEmpty)
135134
}
136135
}
137136

138137
testWithShuffleServiceOnOff("remove block") { (svcEnabled, bmInfo) =>
139138
val rddId: BlockId = RDDBlockId(0, 0)
140139
bmInfo.updateBlockInfo(rddId, StorageLevel.DISK_ONLY, memSize = 0, diskSize = 200)
141-
assert(bmInfo.blocks.asScala
142-
=== Map(rddId -> BlockStatus(StorageLevel.DISK_ONLY, 0, 200)))
140+
assert(bmInfo.blocks.asScala === Map(rddId -> BlockStatus(StorageLevel.DISK_ONLY, 0, 200)))
143141
val exclusiveCachedBlocksForOneMemoryOnly = if (svcEnabled) Set() else Set(rddId)
144142
assert(bmInfo.exclusiveCachedBlocks === exclusiveCachedBlocksForOneMemoryOnly)
145-
assert(bmInfo.remainingMem == 30000)
143+
assert(bmInfo.remainingMem === 30000)
146144
if (svcEnabled) {
147145
assert(bmInfo.externalShuffleServiceBlockStatus.get.asScala ===
148146
Map(rddId -> BlockStatus(StorageLevel.DISK_ONLY, 0, 200)))
@@ -151,7 +149,7 @@ class BlockManagerInfoSuite extends SparkFunSuite {
151149
bmInfo.removeBlock(rddId)
152150
assert(bmInfo.blocks.asScala.isEmpty)
153151
assert(bmInfo.exclusiveCachedBlocks.isEmpty)
154-
assert(bmInfo.remainingMem == 30000)
152+
assert(bmInfo.remainingMem === 30000)
155153
if (svcEnabled) {
156154
assert(bmInfo.externalShuffleServiceBlockStatus.get.isEmpty)
157155
}

0 commit comments

Comments
 (0)