
Commit 4139a4e

markhamstra authored and squito committed
[SPARK-10666][SPARK-6880][CORE] Use properties from ActiveJob associated with a Stage
This issue was addressed in #5494, but the fix in that PR, while safe in the sense that it will prevent the SparkContext from shutting down, misses the actual bug. The intent of `submitMissingTasks` should be understood as "submit the Tasks that are missing for the Stage, and run them as part of the ActiveJob identified by jobId". Because of a long-standing bug, the `jobId` parameter was never being used. Instead, we were trying to use the jobId with which the Stage was created -- which may no longer exist as an ActiveJob, hence the crash reported in SPARK-6880.

The correct fix is to use the ActiveJob specified by the supplied jobId parameter, which is guaranteed to exist at the call sites of submitMissingTasks.

This fix should be applied to all maintenance branches, since it has existed since 1.0.

kayousterhout pankajarora12

Author: Mark Hamstra <[email protected]>
Author: Imran Rashid <[email protected]>

Closes #6291 from markhamstra/SPARK-6880.

(cherry picked from commit 0a5aef7)
Signed-off-by: Imran Rashid <[email protected]>
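To make the lookup change concrete, the following is a minimal, self-contained sketch, not the actual DAGScheduler code (`FakeActiveJob` and the map are illustrative stand-ins), of why keying the lookup by `stage.firstJobId` can yield null properties once that job has been cancelled, while keying by the supplied `jobId` cannot:

import java.util.Properties
import scala.collection.mutable

// Hypothetical stand-in for the scheduler's ActiveJob bookkeeping.
case class FakeActiveJob(jobId: Int, properties: Properties)

object Spark6880Sketch {
  def main(args: Array[String]): Unit = {
    val jobIdToActiveJob = mutable.HashMap[Int, FakeActiveJob]()

    val stageFirstJobId = 0 // the job that originally created the shared stage (since cancelled)
    val jobId = 1           // the ActiveJob on whose behalf submitMissingTasks is now called

    jobIdToActiveJob(jobId) = FakeActiveJob(jobId, new Properties())

    // Old lookup: keyed by the stage's first job, which may no longer be active,
    // so the scheduling properties silently become null.
    val oldProperties =
      jobIdToActiveJob.get(stageFirstJobId).map(_.properties).orNull
    assert(oldProperties == null)

    // Fixed lookup: keyed by the supplied jobId, which the call sites of
    // submitMissingTasks guarantee refers to an existing ActiveJob.
    val newProperties = jobIdToActiveJob(jobId).properties
    assert(newProperties != null)
  }
}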
1 parent 27b5f31 commit 4139a4e

2 files changed: +109 -4 lines changed


core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala

Lines changed: 4 additions & 2 deletions

@@ -810,7 +810,9 @@ class DAGScheduler(
       stage.resetInternalAccumulators()
     }
 
-    val properties = jobIdToActiveJob.get(stage.firstJobId).map(_.properties).orNull
+    // Use the scheduling pool, job group, description, etc. from an ActiveJob associated
+    // with this Stage
+    val properties = jobIdToActiveJob(jobId).properties
 
     runningStages += stage
     // SparkListenerStageSubmitted should be posted before testing whether tasks are
@@ -905,7 +907,7 @@ class DAGScheduler(
       stage.pendingTasks ++= tasks
       logDebug("New pending tasks: " + stage.pendingTasks)
       taskScheduler.submitTasks(new TaskSet(
-        tasks.toArray, stage.id, stage.latestInfo.attemptId, stage.firstJobId, properties))
+        tasks.toArray, stage.id, stage.latestInfo.attemptId, jobId, properties))
       stage.latestInfo.submissionTime = Some(clock.getTimeMillis())
     } else {
       // Because we posted SparkListenerStageSubmitted earlier, we should mark
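The properties handed to the TaskSet are what the scheduler later consults for things like the fair-scheduler pool (the standard `spark.scheduler.pool` property), so a null fallback quietly drops the job's pool, group, and description. A rough sketch of that downstream effect, independent of the scheduler internals and with an illustrative helper rather than Spark's actual code:

import java.util.Properties

object PoolNameSketch {
  // Illustrative helper (not Spark's actual code): resolve a pool name from
  // TaskSet properties, falling back to "default" when properties are null.
  def poolNameFor(properties: Properties): String =
    Option(properties)
      .flatMap(p => Option(p.getProperty("spark.scheduler.pool")))
      .getOrElse("default")

  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.setProperty("spark.scheduler.pool", "production")

    assert(poolNameFor(props) == "production")
    // With the old lookup, a stage whose first job was gone got null properties,
    // which is equivalent to silently landing in the default pool.
    assert(poolNameFor(null) == "default")
  }
}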

core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala

Lines changed: 105 additions & 2 deletions

@@ -17,6 +17,8 @@
 
 package org.apache.spark.scheduler
 
+import java.util.Properties
+
 import scala.collection.mutable.{ArrayBuffer, HashSet, HashMap, Map}
 import scala.language.reflectiveCalls
 import scala.util.control.NonFatal
@@ -234,9 +236,10 @@ class DAGSchedulerSuite
       rdd: RDD[_],
       partitions: Array[Int],
       func: (TaskContext, Iterator[_]) => _ = jobComputeFunc,
-      listener: JobListener = jobListener): Int = {
+      listener: JobListener = jobListener,
+      properties: Properties = null): Int = {
     val jobId = scheduler.nextJobId.getAndIncrement()
-    runEvent(JobSubmitted(jobId, rdd, func, partitions, CallSite("", ""), listener))
+    runEvent(JobSubmitted(jobId, rdd, func, partitions, CallSite("", ""), listener, properties))
     jobId
   }
 
@@ -750,6 +753,106 @@ class DAGSchedulerSuite
     assertDataStructuresEmpty()
   }
 
+  def checkJobPropertiesAndPriority(taskSet: TaskSet, expected: String, priority: Int): Unit = {
+    assert(taskSet.properties != null)
+    assert(taskSet.properties.getProperty("testProperty") === expected)
+    assert(taskSet.priority === priority)
+  }
+
+  def launchJobsThatShareStageAndCancelFirst(): ShuffleDependency[Int, Int, Nothing] = {
+    val baseRdd = new MyRDD(sc, 1, Nil)
+    val shuffleDep1 = new ShuffleDependency(baseRdd, new HashPartitioner(1))
+    val intermediateRdd = new MyRDD(sc, 1, List(shuffleDep1))
+    val shuffleDep2 = new ShuffleDependency(intermediateRdd, new HashPartitioner(1))
+    val finalRdd1 = new MyRDD(sc, 1, List(shuffleDep2))
+    val finalRdd2 = new MyRDD(sc, 1, List(shuffleDep2))
+    val job1Properties = new Properties()
+    val job2Properties = new Properties()
+    job1Properties.setProperty("testProperty", "job1")
+    job2Properties.setProperty("testProperty", "job2")
+
+    // Run jobs 1 & 2, both referencing the same stage, then cancel job1.
+    // Note that we have to submit job2 before we cancel job1 to have them actually share
+    // *Stages*, and not just shuffle dependencies, due to skipped stages (at least until
+    // we address SPARK-10193.)
+    val jobId1 = submit(finalRdd1, Array(0), properties = job1Properties)
+    val jobId2 = submit(finalRdd2, Array(0), properties = job2Properties)
+    assert(scheduler.activeJobs.nonEmpty)
+    val testProperty1 = scheduler.jobIdToActiveJob(jobId1).properties.getProperty("testProperty")
+
+    // remove job1 as an ActiveJob
+    cancel(jobId1)
+
+    // job2 should still be running
+    assert(scheduler.activeJobs.nonEmpty)
+    val testProperty2 = scheduler.jobIdToActiveJob(jobId2).properties.getProperty("testProperty")
+    assert(testProperty1 != testProperty2)
+    // NB: This next assert isn't necessarily the "desired" behavior; it's just to document
+    // the current behavior. We've already submitted the TaskSet for stage 0 based on job1, but
+    // even though we have cancelled that job and are now running it because of job2, we haven't
+    // updated the TaskSet's properties. Changing the properties to "job2" is likely the more
+    // correct behavior.
+    val job1Id = 0 // TaskSet priority for Stages run with "job1" as the ActiveJob
+    checkJobPropertiesAndPriority(taskSets(0), "job1", job1Id)
+    complete(taskSets(0), Seq((Success, makeMapStatus("hostA", 1))))
+
+    shuffleDep1
+  }
+
+  /**
+   * Makes sure that tasks for a stage used by multiple jobs are submitted with the properties of a
+   * later, active job if they were previously run under a job that is no longer active
+   */
+  test("stage used by two jobs, the first no longer active (SPARK-6880)") {
+    launchJobsThatShareStageAndCancelFirst()
+
+    // The next check is the key for SPARK-6880. For the stage which was shared by both job1 and
+    // job2 but never had any tasks submitted for job1, the properties of job2 are now used to run
+    // the stage.
+    checkJobPropertiesAndPriority(taskSets(1), "job2", 1)
+
+    complete(taskSets(1), Seq((Success, makeMapStatus("hostA", 1))))
+    assert(taskSets(2).properties != null)
+    complete(taskSets(2), Seq((Success, 42)))
+    assert(results === Map(0 -> 42))
+    assert(scheduler.activeJobs.isEmpty)
+
+    assertDataStructuresEmpty()
+  }
+
+  /**
+   * Makes sure that tasks for a stage used by multiple jobs are submitted with the properties of a
+   * later, active job if they were previously run under a job that is no longer active, even when
+   * there are fetch failures
+   */
+  test("stage used by two jobs, some fetch failures, and the first job no longer active " +
+    "(SPARK-6880)") {
+    val shuffleDep1 = launchJobsThatShareStageAndCancelFirst()
+    val job2Id = 1 // TaskSet priority for Stages run with "job2" as the ActiveJob
+
+    // lets say there is a fetch failure in this task set, which makes us go back and
+    // run stage 0, attempt 1
+    complete(taskSets(1), Seq(
+      (FetchFailed(makeBlockManagerId("hostA"), shuffleDep1.shuffleId, 0, 0, "ignored"), null)))
+    scheduler.resubmitFailedStages()
+
+    // stage 0, attempt 1 should have the properties of job2
+    assert(taskSets(2).stageId === 0)
+    assert(taskSets(2).stageAttemptId === 1)
+    checkJobPropertiesAndPriority(taskSets(2), "job2", job2Id)
+
+    // run the rest of the stages normally, checking that they have the correct properties
+    complete(taskSets(2), Seq((Success, makeMapStatus("hostA", 1))))
+    checkJobPropertiesAndPriority(taskSets(3), "job2", job2Id)
+    complete(taskSets(3), Seq((Success, makeMapStatus("hostA", 1))))
+    checkJobPropertiesAndPriority(taskSets(4), "job2", job2Id)
+    complete(taskSets(4), Seq((Success, 42)))
+    assert(results === Map(0 -> 42))
+    assert(scheduler.activeJobs.isEmpty)
+
+    assertDataStructuresEmpty()
+  }
+
   test("run trivial shuffle with out-of-band failure and retry") {
     val shuffleMapRdd = new MyRDD(sc, 2, Nil)
     val shuffleDep = new ShuffleDependency(shuffleMapRdd, null)
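The tests above set "testProperty" directly on the submitted jobs; in user code, the properties carried by an ActiveJob typically come from thread-local settings on the SparkContext. A hedged, user-level sketch (the object name and pool names are made up; `setLocalProperty` and `setJobGroup` are the standard SparkContext calls) of two jobs that share a shuffle stage but carry different properties:

import org.apache.spark.{SparkConf, SparkContext}

object JobPropertiesExample {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setMaster("local[2]").setAppName("job-properties-example"))

    // Shared lineage: both jobs below reuse the same shuffle stage.
    val shared = sc.parallelize(1 to 100).map(i => (i % 4, i)).reduceByKey(_ + _)

    // Properties captured for the first job.
    sc.setLocalProperty("spark.scheduler.pool", "pool1")
    sc.setJobGroup("job1", "first job over the shared shuffle stage")
    shared.count()

    // Different properties for a second job over the same lineage; with the fix,
    // any stage re-run on its behalf uses this job's properties, not job1's.
    sc.setLocalProperty("spark.scheduler.pool", "pool2")
    sc.setJobGroup("job2", "second job over the shared shuffle stage")
    shared.collect()

    sc.stop()
  }
}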
