@@ -17,10 +17,12 @@
 
 package org.apache.spark.network.shuffle;
 
+import java.io.File;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.LinkedList;
 import java.util.List;
@@ -31,6 +33,8 @@
 
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Sets;
+import org.apache.spark.network.buffer.FileSegmentManagedBuffer;
+import org.apache.spark.network.server.OneForOneStreamManager;
 import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
@@ -52,6 +56,12 @@ public class ExternalShuffleIntegrationSuite {
   private static final String APP_ID = "app-id";
   private static final String SORT_MANAGER = "org.apache.spark.shuffle.sort.SortShuffleManager";
 
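+  // Split indices of RDD_ID that select which fetch scenario a test exercises.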
+  private static final int RDD_ID = 1;
+  private static final int SPLIT_INDEX_VALID_BLOCK = 0;
+  private static final int SPLIT_INDEX_MISSING_FILE = 1;
+  private static final int SPLIT_INDEX_CORRUPT_LENGTH = 2;
+
   // Executor 0 is sort-based
   static TestShuffleDataContext dataContext0;
 
@@ -60,6 +69,9 @@ public class ExternalShuffleIntegrationSuite {
   static TransportConf conf;
   static TransportContext transportContext;
 
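+  // Payload of the cached RDD block registered for executor 0.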
+  static byte[] exec0RddBlock = new byte[123];
+
   static byte[][] exec0Blocks = new byte[][] {
     new byte[123],
     new byte[12345],
@@ -81,13 +92,39 @@ public static void beforeAll() throws IOException {
     for (byte[] block : exec1Blocks) {
       rand.nextBytes(block);
     }
+    rand.nextBytes(exec0RddBlock);
 
     dataContext0 = new TestShuffleDataContext(2, 5);
     dataContext0.create();
     dataContext0.insertSortShuffleData(0, 0, exec0Blocks);
-
-    conf = new TransportConf("shuffle", MapConfigProvider.EMPTY);
-    handler = new ExternalShuffleBlockHandler(conf, null);
+    dataContext0.insertCachedRddData(RDD_ID, SPLIT_INDEX_VALID_BLOCK, exec0RddBlock);
+
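+    // Disable fetch retries so failed fetches surface immediately in the assertions.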
+    HashMap<String, String> config = new HashMap<>();
+    config.put("spark.shuffle.io.maxRetries", "0");
+    conf = new TransportConf("shuffle", new MapConfigProvider(config));
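+    // Wrap the real resolver so the corrupt-length split resolves to a segment of a
+    // nonexistent file, simulating an RDD block whose data is corrupt on disk.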
+    handler = new ExternalShuffleBlockHandler(
+      new OneForOneStreamManager(),
+      new ExternalShuffleBlockResolver(conf, null) {
+        @Override
+        public ManagedBuffer getRddBlockData(String appId, String execId, int rddId, int splitIdx) {
+          ManagedBuffer res;
+          if (rddId == RDD_ID) {
+            switch (splitIdx) {
+              case SPLIT_INDEX_CORRUPT_LENGTH:
+                res = new FileSegmentManagedBuffer(conf, new File("missing.file"), 0, 12);
+                break;
+              default:
+                res = super.getRddBlockData(appId, execId, rddId, splitIdx);
+            }
+          } else {
+            res = super.getRddBlockData(appId, execId, rddId, splitIdx);
+          }
+          return res;
+        }
+      });
     transportContext = new TransportContext(conf, handler);
     server = transportContext.createServer();
   }
@@ -199,9 +233,41 @@ public void testRegisterInvalidExecutor() throws Exception {
   @Test
   public void testFetchWrongBlockId() throws Exception {
     registerExecutor("exec-1", dataContext0.createExecutorInfo(SORT_MANAGER));
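+    // RDD blocks are now a valid fetch type, so use a broadcast block as the bad id.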
-    FetchResult execFetch = fetchBlocks("exec-1", new String[] { "rdd_1_0_0" });
+    FetchResult execFetch = fetchBlocks("exec-1", new String[] { "broadcast_1" });
+    assertTrue(execFetch.successBlocks.isEmpty());
+    assertEquals(Sets.newHashSet("broadcast_1"), execFetch.failedBlocks);
+  }
+
+  @Test
+  public void testFetchValidRddBlock() throws Exception {
+    registerExecutor("exec-1", dataContext0.createExecutorInfo(SORT_MANAGER));
+    String validBlockId = "rdd_" + RDD_ID + "_" + SPLIT_INDEX_VALID_BLOCK;
+    FetchResult execFetch = fetchBlocks("exec-1", new String[] { validBlockId });
+    assertTrue(execFetch.failedBlocks.isEmpty());
+    assertEquals(Sets.newHashSet(validBlockId), execFetch.successBlocks);
+    assertBuffersEqual(new NioManagedBuffer(ByteBuffer.wrap(exec0RddBlock)),
+      execFetch.buffers.get(0));
+  }
+
+  @Test
+  public void testFetchDeletedRddBlock() throws Exception {
+    registerExecutor("exec-1", dataContext0.createExecutorInfo(SORT_MANAGER));
+    String missingBlockId = "rdd_" + RDD_ID + "_" + SPLIT_INDEX_MISSING_FILE;
+    FetchResult execFetch = fetchBlocks("exec-1", new String[] { missingBlockId });
+    assertTrue(execFetch.successBlocks.isEmpty());
+    assertEquals(Sets.newHashSet(missingBlockId), execFetch.failedBlocks);
+  }
+
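+  // The stubbed resolver returns a 12-byte segment of a file that does not exist,
+  // so the transfer fails and the block must land in failedBlocks.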
+  @Test
+  public void testFetchCorruptRddBlock() throws Exception {
+    registerExecutor("exec-1", dataContext0.createExecutorInfo(SORT_MANAGER));
+    String corruptBlockId = "rdd_" + RDD_ID + "_" + SPLIT_INDEX_CORRUPT_LENGTH;
+    FetchResult execFetch = fetchBlocks("exec-1", new String[] { corruptBlockId });
     assertTrue(execFetch.successBlocks.isEmpty());
-    assertEquals(Sets.newHashSet("rdd_1_0_0"), execFetch.failedBlocks);
+    assertEquals(Sets.newHashSet(corruptBlockId), execFetch.failedBlocks);
   }
 
   @Test