@@ -1325,16 +1325,17 @@ class DAGSchedulerSuite
13251325 assertDataStructuresEmpty()
13261326 }
13271327
1328- def checkJobProperties (taskSet : TaskSet , expected : String ): Unit = {
1328+ def checkJobPropertiesAndPriority (taskSet : TaskSet , expected : String , priority : Int ): Unit = {
13291329 assert(taskSet.properties != null )
13301330 assert(taskSet.properties.getProperty(" testProperty" ) === expected)
1331+ assert(taskSet.priority === priority)
13311332 }
13321333
13331334 def launchJobsThatShareStageAndCancelFirst (): ShuffleDependency [Int , Int , Nothing ] = {
13341335 val baseRdd = new MyRDD (sc, 1 , Nil )
1335- val shuffleDep1 = new ShuffleDependency (baseRdd, null )
1336+ val shuffleDep1 = new ShuffleDependency (baseRdd, new HashPartitioner ( 1 ) )
13361337 val intermediateRdd = new MyRDD (sc, 1 , List (shuffleDep1))
1337- val shuffleDep2 = new ShuffleDependency (intermediateRdd, null )
1338+ val shuffleDep2 = new ShuffleDependency (intermediateRdd, new HashPartitioner ( 1 ) )
13381339 val finalRdd1 = new MyRDD (sc, 1 , List (shuffleDep2))
13391340 val finalRdd2 = new MyRDD (sc, 1 , List (shuffleDep2))
13401341 val job1Properties = new Properties ()
@@ -1353,7 +1354,6 @@ class DAGSchedulerSuite
13531354
13541355 // remove job1 as an ActiveJob
13551356 cancel(jobId1)
1356- sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS )
13571357
13581358 // job2 should still be running
13591359 assert(scheduler.activeJobs.nonEmpty)
@@ -1364,7 +1364,8 @@ class DAGSchedulerSuite
13641364 // even though we have cancelled that job and are now running it because of job2, we haven't
13651365 // updated the TaskSet's properties. Changing the properties to "job2" is likely the more
13661366 // correct behavior.
1367- checkJobProperties(taskSets(0 ), " job1" )
1367+ val job1Id = 0 // TaskSet priority for Stages run with "job1" as the ActiveJob
1368+ checkJobPropertiesAndPriority(taskSets(0 ), " job1" , job1Id)
13681369 complete(taskSets(0 ), Seq ((Success , makeMapStatus(" hostA" , 1 ))))
13691370
13701371 shuffleDep1
@@ -1380,7 +1381,7 @@ class DAGSchedulerSuite
13801381 // The next check is the key for SPARK-6880. For the stage which was shared by both job1 and
13811382 // job2 but never had any tasks submitted for job1, the properties of job2 are now used to run
13821383 // the stage.
1383- checkJobProperties (taskSets(1 ), " job2" )
1384+ checkJobPropertiesAndPriority (taskSets(1 ), " job2" , 1 )
13841385
13851386 complete(taskSets(1 ), Seq ((Success , makeMapStatus(" hostA" , 1 ))))
13861387 assert(taskSets(2 ).properties != null )
@@ -1399,6 +1400,7 @@ class DAGSchedulerSuite
13991400 test(" stage used by two jobs, some fetch failures, and the first job no longer active " +
14001401 " (SPARK-6880)" ) {
14011402 val shuffleDep1 = launchJobsThatShareStageAndCancelFirst()
1403+ val job2Id = 1 // TaskSet priority for Stages run with "job2" as the ActiveJob
14021404
14031405 // lets say there is a fetch failure in this task set, which makes us go back and
14041406 // run stage 0, attempt 1
@@ -1409,13 +1411,13 @@ class DAGSchedulerSuite
14091411 // stage 0, attempt 1 should have the properties of job2
14101412 assert(taskSets(2 ).stageId === 0 )
14111413 assert(taskSets(2 ).stageAttemptId === 1 )
1412- checkJobProperties (taskSets(2 ), " job2" )
1414+ checkJobPropertiesAndPriority (taskSets(2 ), " job2" , job2Id )
14131415
14141416 // run the rest of the stages normally, checking that they have the correct properties
14151417 complete(taskSets(2 ), Seq ((Success , makeMapStatus(" hostA" , 1 ))))
1416- checkJobProperties (taskSets(3 ), " job2" )
1418+ checkJobPropertiesAndPriority (taskSets(3 ), " job2" , job2Id )
14171419 complete(taskSets(3 ), Seq ((Success , makeMapStatus(" hostA" , 1 ))))
1418- checkJobProperties (taskSets(4 ), " job2" )
1420+ checkJobPropertiesAndPriority (taskSets(4 ), " job2" , job2Id )
14191421 complete(taskSets(4 ), Seq ((Success , 42 )))
14201422 assert(results === Map (0 -> 42 ))
14211423 assert(scheduler.activeJobs.isEmpty)
0 commit comments