Skip to content

Commit aaa3a7d

Browse files
committed
Added test "stage used by two jobs, the first no longer active"
1 parent ff54dc9 commit aaa3a7d

File tree

1 file changed

+42
-2
lines changed

1 file changed

+42
-2
lines changed

core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala

Lines changed: 42 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717

1818
package org.apache.spark.scheduler
1919

20+
import java.util.Properties
21+
2022
import scala.collection.mutable.{ArrayBuffer, HashSet, HashMap, Map}
2123
import scala.language.reflectiveCalls
2224
import scala.util.control.NonFatal
@@ -234,9 +236,10 @@ class DAGSchedulerSuite
234236
rdd: RDD[_],
235237
partitions: Array[Int],
236238
func: (TaskContext, Iterator[_]) => _ = jobComputeFunc,
237-
listener: JobListener = jobListener): Int = {
239+
listener: JobListener = jobListener,
240+
properties: Properties = null): Int = {
238241
val jobId = scheduler.nextJobId.getAndIncrement()
239-
runEvent(JobSubmitted(jobId, rdd, func, partitions, CallSite("", ""), listener))
242+
runEvent(JobSubmitted(jobId, rdd, func, partitions, CallSite("", ""), listener, properties))
240243
jobId
241244
}
242245

@@ -749,6 +752,43 @@ class DAGSchedulerSuite
749752
assertDataStructuresEmpty()
750753
}
751754

755+
/**
756+
* Makes sure that tasks for a stage used by multiple jobs are submitted with the properties of a
757+
* later, active job if they were previously run under a job that is no longer active
758+
*/
759+
test("stage used by two jobs, the first no longer active") {
760+
val baseRdd = new MyRDD(sc, 1, Nil)
761+
val finalRdd1 = new MyRDD(sc, 1, List(new OneToOneDependency(baseRdd)))
762+
val finalRdd2 = new MyRDD(sc, 1, List(new OneToOneDependency(baseRdd)))
763+
val job1Properties = new Properties()
764+
val job2Properties = new Properties()
765+
job1Properties.setProperty("testProperty", "job1")
766+
job2Properties.setProperty("testProperty", "job2")
767+
768+
// run job1
769+
val jobId1 = submit(finalRdd1, Array(0), properties = job1Properties)
770+
assert(scheduler.activeJobs.nonEmpty)
771+
val testProperty1 = scheduler.jobIdToActiveJob(jobId1).properties.getProperty("testProperty")
772+
773+
// remove job1 as an ActiveJob
774+
cancel(jobId1)
775+
sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
776+
assert(sparkListener.failedStages.contains(0))
777+
assert(sparkListener.failedStages.size === 1)
778+
assert(scheduler.activeJobs.isEmpty)
779+
780+
// run job2
781+
val jobId2 = submit(finalRdd2, Array(0), properties = job2Properties)
782+
assert(scheduler.activeJobs.nonEmpty)
783+
val testProperty2 = scheduler.jobIdToActiveJob(jobId2).properties.getProperty("testProperty")
784+
assert(testProperty1 != testProperty2)
785+
complete(taskSets(1), Seq((Success, 42)))
786+
assert(results === Map(0 -> 42))
787+
assert(scheduler.activeJobs.isEmpty)
788+
789+
assertDataStructuresEmpty()
790+
}
791+
752792
test("run trivial shuffle with out-of-band failure and retry") {
753793
val shuffleMapRdd = new MyRDD(sc, 2, Nil)
754794
val shuffleDep = new ShuffleDependency(shuffleMapRdd, null)

0 commit comments

Comments
 (0)