Skip to content

Commit a0bc4f6

Browse files
committed
fix tests
1 parent 3152e9b commit a0bc4f6

File tree

2 files changed

+14
-21
lines changed

2 files changed

+14
-21
lines changed

core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -849,8 +849,8 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi
849849

850850
private val shuffleFileLossTests = Seq(
851851
("executor process lost with shuffle service", ExecutorProcessLost("", None), true, false),
852-
("worker lost with shuffle service", ExecutorProcessLost("", None), true, true),
853-
("worker lost without shuffle service", ExecutorProcessLost("", None), false, true),
852+
("worker lost with shuffle service", ExecutorProcessLost("", Some("hostA")), true, true),
853+
("worker lost without shuffle service", ExecutorProcessLost("", Some("hostA")), false, true),
854854
("executor failure with shuffle service", ExecutorKilled, true, false),
855855
("executor failure without shuffle service", ExecutorKilled, false, true))
856856

@@ -874,10 +874,18 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi
874874
val reduceRdd = new MyRDD(sc, 1, List(shuffleDep), tracker = mapOutputTracker)
875875
submit(reduceRdd, Array(0))
876876
completeShuffleMapStageSuccessfully(0, 0, 1)
877+
val expectHostFileLoss = event match {
878+
case ExecutorProcessLost(_, hostOpt, _) => hostOpt.isDefined
879+
case _ => false
880+
}
877881
runEvent(ExecutorLost("hostA-exec", event))
878882
verify(blockManagerMaster, times(1)).removeExecutor("hostA-exec")
879883
if (expectFileLoss) {
880-
verify(mapOutputTracker, times(1)).removeOutputsOnExecutor("hostA-exec")
884+
if (expectHostFileLoss) {
885+
verify(mapOutputTracker, times(1)).removeOutputsOnHost("hostA")
886+
} else {
887+
verify(mapOutputTracker, times(1)).removeOutputsOnExecutor("hostA-exec")
888+
}
881889
intercept[MetadataFetchFailedException] {
882890
mapOutputTracker.getMapSizesByExecutorId(shuffleId, 0)
883891
}

core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala

Lines changed: 3 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1864,18 +1864,14 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
18641864
test("scheduler should keep the decommission state where host was decommissioned") {
18651865
val clock = new ManualClock(10000L)
18661866
val scheduler = setupSchedulerForDecommissionTests(clock, 2)
1867-
val oldTime = clock.getTimeMillis()
1867+
val decomTime = clock.getTimeMillis()
18681868
scheduler.executorDecommission("executor0", ExecutorDecommissionInfo("0", None))
18691869
scheduler.executorDecommission("executor1", ExecutorDecommissionInfo("1", Some("host1")))
18701870

1871-
clock.advance(3000L)
1872-
scheduler.executorDecommission("executor0", ExecutorDecommissionInfo("0 new", None))
1873-
scheduler.executorDecommission("executor1", ExecutorDecommissionInfo("1 new", None))
1874-
18751871
assert(scheduler.getExecutorDecommissionState("executor0")
1876-
=== Some(ExecutorDecommissionState(oldTime, None)))
1872+
=== Some(ExecutorDecommissionState(decomTime, None)))
18771873
assert(scheduler.getExecutorDecommissionState("executor1")
1878-
=== Some(ExecutorDecommissionState(oldTime, Some("host1"))))
1874+
=== Some(ExecutorDecommissionState(decomTime, Some("host1"))))
18791875
assert(scheduler.getExecutorDecommissionState("executor2").isEmpty)
18801876
}
18811877

@@ -1914,17 +1910,6 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
19141910
assert(manager.copiesRunning.take(2) === Array(0, 0))
19151911
clock.advance(2000)
19161912

1917-
// Decommission state should hang around a bit after removal ...
1918-
assert(scheduler.getExecutorDecommissionState("executor1").isDefined)
1919-
scheduler.executorDecommission("executor1", ExecutorDecommissionInfo("", None))
1920-
clock.advance(2000)
1921-
assert(scheduler.getExecutorDecommissionState("executor1").isDefined)
1922-
1923-
// The default timeout for expiry is 300k milliseconds (5 minutes) which completes now,
1924-
// and the executor1's decommission state should finally be purged.
1925-
clock.advance(300000)
1926-
assert(scheduler.getExecutorDecommissionState("executor1").isEmpty)
1927-
19281913
// Now give it some resources and both tasks should be rerun
19291914
val taskDescriptions = taskScheduler.resourceOffers(IndexedSeq(
19301915
WorkerOffer("executor2", "host2", 1), WorkerOffer("executor3", "host3", 1))).flatten

0 commit comments

Comments
 (0)