@@ -274,4 +274,30 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with L
274274 assert(" executor1" === taskDescriptions3(0 ).executorId)
275275 }
276276
277+ test(" if an executor is lost then state for tasks running on that executor is cleaned up" ) {
278+ sc = new SparkContext (" local" , " TaskSchedulerImplSuite" )
279+ val taskScheduler = new TaskSchedulerImpl (sc)
280+ taskScheduler.initialize(new FakeSchedulerBackend )
281+ // Need to initialize a DAGScheduler for the taskScheduler to use for callbacks.
282+ new DAGScheduler (sc, taskScheduler) {
283+ override def taskStarted (task : Task [_], taskInfo : TaskInfo ) {}
284+ override def executorAdded (execId : String , host : String ) {}
285+ }
286+
287+ val e0Offers = Seq (new WorkerOffer (" executor0" , " host0" , 1 ))
288+ val attempt1 = FakeTask .createTaskSet(1 )
289+
290+ // submit attempt 1, offer resources, task gets scheduled
291+ taskScheduler.submitTasks(attempt1)
292+ val taskDescriptions = taskScheduler.resourceOffers(e0Offers).flatten
293+ assert(1 === taskDescriptions.length)
294+
295+ // mark executor0 as dead
296+ taskScheduler.executorLost(" executor0" , SlaveLost ())
297+
298+ // Check that state associated with the lost task attempt is cleaned up:
299+ assert(taskScheduler.taskIdToExecutorId.isEmpty)
300+ assert(taskScheduler.taskIdToTaskSetManager.isEmpty)
301+ assert(taskScheduler.runningTasksByExecutors().get(" executor0" ).isEmpty)
302+ }
277303}
0 commit comments