From b376114a333f4a1d3b2153f75b14856c9ba72875 Mon Sep 17 00:00:00 2001
From: Josh Rosen
Date: Tue, 31 Mar 2015 01:00:54 -0700
Subject: [PATCH 1/4] Fix bug that prevented jobs with inherited job group
 properties from being cancelled.

SparkContext hands local properties to child threads through an
InheritableThreadLocal whose childValue() is new Properties(parent), so
inherited entries live only in the defaults of the child's Properties.
Hashtable.get() never consults those defaults, which is why
handleJobGroupCancelled() failed to match jobs whose group id was inherited
from a parent thread; Properties.getProperty() walks the defaults chain.
---
 .../apache/spark/scheduler/DAGScheduler.scala |  2 +-
 .../apache/spark/JobCancellationSuite.scala   | 35 +++++++++++++++++++
 2 files changed, 36 insertions(+), 1 deletion(-)

diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index b405bd3338e7c..356790c4f0ad7 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -675,7 +675,7 @@ class DAGScheduler(
     // Cancel all jobs belonging to this job group.
     // First finds all active jobs with this group id, and then kill stages for them.
     val activeInGroup = activeJobs.filter(activeJob =>
-      groupId == activeJob.properties.get(SparkContext.SPARK_JOB_GROUP_ID))
+      groupId == activeJob.properties.getProperty(SparkContext.SPARK_JOB_GROUP_ID))
     val jobIds = activeInGroup.map(_.jobId)
     jobIds.foreach(handleJobCancellation(_, "part of cancelled job group %s".format(groupId)))
     submitWaitingStages()
diff --git a/core/src/test/scala/org/apache/spark/JobCancellationSuite.scala b/core/src/test/scala/org/apache/spark/JobCancellationSuite.scala
index 21487bc24d58a..c93e76377648b 100644
--- a/core/src/test/scala/org/apache/spark/JobCancellationSuite.scala
+++ b/core/src/test/scala/org/apache/spark/JobCancellationSuite.scala
@@ -141,6 +141,41 @@ class JobCancellationSuite extends FunSuite with Matchers with BeforeAndAfter
     assert(jobB.get() === 100)
   }
 
+  test("inherited job group") {
+    sc = new SparkContext("local[2]", "test")
+
+    // Add a listener to release the semaphore once any tasks are launched.
+    val sem = new Semaphore(0)
+    sc.addSparkListener(new SparkListener {
+      override def onTaskStart(taskStart: SparkListenerTaskStart) {
+        sem.release()
+      }
+    })
+
+    sc.setJobGroup("jobA", "this is a job to be cancelled")
+    @volatile var exception: Exception = null
+    val jobA = new Thread() {
+      // The job group should be inherited by this thread
+      override def run(): Unit = {
+        exception = intercept[SparkException] {
+          sc.parallelize(1 to 10000, 2).map { i => Thread.sleep(10); i }.count()
+        }
+      }
+    }
+    jobA.start()
+
+    // Block until both tasks of job A have started and cancel job A.
+    sem.acquire(2)
+    sc.cancelJobGroup("jobA")
+    jobA.join(10000)
+    assert(!jobA.isAlive)
+    assert(exception.getMessage contains "cancel")
+
+    // Once A is cancelled, job B should finish fairly quickly.
+    val jobB = sc.parallelize(1 to 100, 2).countAsync()
+    assert(jobB.get() === 100)
+  }
+
   test("job group with interruption") {
     sc = new SparkContext("local[2]", "test")
 

From 707e4170c97aa38e529c457a6760b15e2e75e02b Mon Sep 17 00:00:00 2001
From: Josh Rosen
Date: Tue, 31 Mar 2015 01:10:58 -0700
Subject: [PATCH 2/4] Clone local properties to prevent mutations from
 breaking job cancellation.
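
Without a defensive copy, the Properties object attached to a submitted job
is the same mutable instance owned by the submitting thread, so a later
setJobGroup() call retroactively changes the group of jobs that are already
running, and cancelJobGroup() can no longer find them. A minimal sketch of
the race this commit guards against (setJobGroup, cancelJobGroup, and
countAsync are the existing SparkContext/RDD APIs; the group names and
timing are illustrative only):

    sc.setJobGroup("original", "description")
    // The submitted job captures a reference to this thread's Properties:
    val job = sc.parallelize(1 to 100).map { i => Thread.sleep(100); i }.countAsync()
    sc.setJobGroup("modified", "description") // mutates that same Properties instance
    sc.cancelJobGroup("original")             // before this fix: matches nothing; job runs on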
---
 .../scala/org/apache/spark/SparkContext.scala |  8 ++--
 .../org/apache/spark/ThreadingSuite.scala     | 46 ++++++++++++++++++-
 2 files changed, 50 insertions(+), 4 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index a70be16f77eeb..9901c08ad343d 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -34,6 +34,8 @@ import scala.reflect.{ClassTag, classTag}
 
 import akka.actor.Props
 
+import org.apache.commons.lang3.SerializationUtils
+
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
 import org.apache.hadoop.io.{ArrayWritable, BooleanWritable, BytesWritable, DoubleWritable,
@@ -1489,7 +1491,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
       logInfo("RDD's recursive dependencies:\n" + rdd.toDebugString)
     }
     dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, allowLocal,
-      resultHandler, localProperties.get)
+      resultHandler, SerializationUtils.clone(localProperties.get))
     progressBar.foreach(_.finishAll())
     rdd.doCheckpoint()
   }
@@ -1575,7 +1577,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
     logInfo("Starting job: " + callSite.shortForm)
     val start = System.nanoTime
     val result = dagScheduler.runApproximateJob(rdd, func, evaluator, callSite, timeout,
-      localProperties.get)
+      SerializationUtils.clone(localProperties.get))
     logInfo(
       "Job finished: " + callSite.shortForm + ", took " + (System.nanoTime - start) / 1e9 + " s")
     result
@@ -1603,7 +1605,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
       callSite,
       allowLocal = false,
       resultHandler,
-      localProperties.get)
+      SerializationUtils.clone(localProperties.get))
     new SimpleFutureAction(waiter, resultFunc)
   }
 
diff --git a/core/src/test/scala/org/apache/spark/ThreadingSuite.scala b/core/src/test/scala/org/apache/spark/ThreadingSuite.scala
index b5383d553add1..430eabb52d068 100644
--- a/core/src/test/scala/org/apache/spark/ThreadingSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ThreadingSuite.scala
@@ -17,10 +17,11 @@
 
 package org.apache.spark
 
-import java.util.concurrent.Semaphore
+import java.util.concurrent.{TimeUnit, Semaphore}
 import java.util.concurrent.atomic.AtomicBoolean
 import java.util.concurrent.atomic.AtomicInteger
 
+import org.apache.spark.scheduler._
 import org.scalatest.FunSuite
 
 /**
@@ -189,4 +190,47 @@ class ThreadingSuite extends FunSuite with LocalSparkContext {
     assert(sc.getLocalProperty("test") === "parent")
     assert(sc.getLocalProperty("Foo") === null)
   }
+
+  test("mutations to local properties should not affect submitted jobs") {
+    val jobStarted = new Semaphore(0)
+    val jobEnded = new Semaphore(0)
+    @volatile var jobResult: JobResult = null
+
+    sc = new SparkContext("local", "test")
+    sc.setJobGroup("originalJobGroupId", "description")
+    sc.addSparkListener(new SparkListener {
+      override def onJobStart(jobStart: SparkListenerJobStart): Unit = {
+        jobStarted.release()
+      }
+      override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit = {
+        jobResult = jobEnd.jobResult
+        jobEnded.release()
+      }
+    })
+
+    // Create a new thread which will inherit the current thread's properties
+    val thread = new Thread() {
+      override def run(): Unit = {
+        assert(sc.getLocalProperty(SparkContext.SPARK_JOB_GROUP_ID) === "originalJobGroupId")
+        // Sleeps for a total of 10 seconds, but allows cancellation to interrupt the task
+        try {
+          sc.parallelize(1 to 100).foreach { x =>
+            Thread.sleep(100)
+          }
+        } catch {
+          case s: SparkException => // ignored so that we don't print noise in test logs
+        }
+      }
+    }
+    thread.start()
+    // Wait for the job to start, then mutate the original properties, which should have been
+    // inherited by the running job but hopefully defensively copied or snapshotted:
+    jobStarted.tryAcquire(10, TimeUnit.SECONDS)
+    sc.setJobGroup("modifiedJobGroupId", "description")
+    // Canceling the original job group should cancel the running job. In other words, the
+    // modification of the properties object should not affect the properties of running jobs
+    sc.cancelJobGroup("originalJobGroupId")
+    jobEnded.tryAcquire(10, TimeUnit.SECONDS)
+    assert(jobResult.isInstanceOf[JobFailed])
+  }
 }

From 3f7b9e83dc199d8ed772a7822ae4b012c888ca2a Mon Sep 17 00:00:00 2001
From: Josh Rosen
Date: Tue, 31 Mar 2015 01:19:02 -0700
Subject: [PATCH 3/4] Add JIRA reference; move clone into DAGScheduler

---
 core/src/main/scala/org/apache/spark/SparkContext.scala   | 8 +++-----
 .../scala/org/apache/spark/scheduler/DAGScheduler.scala   | 8 ++++++--
 .../scala/org/apache/spark/JobCancellationSuite.scala     | 2 +-
 core/src/test/scala/org/apache/spark/ThreadingSuite.scala | 2 +-
 4 files changed, 11 insertions(+), 9 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 9901c08ad343d..a70be16f77eeb 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -34,8 +34,6 @@ import scala.reflect.{ClassTag, classTag}
 
 import akka.actor.Props
 
-import org.apache.commons.lang3.SerializationUtils
-
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
 import org.apache.hadoop.io.{ArrayWritable, BooleanWritable, BytesWritable, DoubleWritable,
@@ -1491,7 +1489,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
       logInfo("RDD's recursive dependencies:\n" + rdd.toDebugString)
     }
     dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, allowLocal,
-      resultHandler, SerializationUtils.clone(localProperties.get))
+      resultHandler, localProperties.get)
     progressBar.foreach(_.finishAll())
     rdd.doCheckpoint()
   }
@@ -1577,7 +1575,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
     logInfo("Starting job: " + callSite.shortForm)
     val start = System.nanoTime
     val result = dagScheduler.runApproximateJob(rdd, func, evaluator, callSite, timeout,
-      SerializationUtils.clone(localProperties.get))
+      localProperties.get)
     logInfo(
       "Job finished: " + callSite.shortForm + ", took " + (System.nanoTime - start) / 1e9 + " s")
     result
@@ -1605,7 +1603,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
       callSite,
       allowLocal = false,
       resultHandler,
-      SerializationUtils.clone(localProperties.get))
+      localProperties.get)
     new SimpleFutureAction(waiter, resultFunc)
   }
 
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index 356790c4f0ad7..15c96065553b7 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -31,6 +31,8 @@ import scala.util.control.NonFatal
 import akka.pattern.ask
 import akka.util.Timeout
 
+import org.apache.commons.lang3.SerializationUtils
+
 import org.apache.spark._
 import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.executor.TaskMetrics
@@ -493,7 +495,8 @@ class DAGScheduler(
     val func2 = func.asInstanceOf[(TaskContext, Iterator[_]) => _]
     val waiter = new JobWaiter(this, jobId, partitions.size, resultHandler)
     eventProcessLoop.post(JobSubmitted(
-      jobId, rdd, func2, partitions.toArray, allowLocal, callSite, waiter, properties))
+      jobId, rdd, func2, partitions.toArray, allowLocal, callSite, waiter,
+      SerializationUtils.clone(properties)))
     waiter
   }
 
@@ -534,7 +537,8 @@ class DAGScheduler(
     val partitions = (0 until rdd.partitions.size).toArray
     val jobId = nextJobId.getAndIncrement()
     eventProcessLoop.post(JobSubmitted(
-      jobId, rdd, func2, partitions, allowLocal = false, callSite, listener, properties))
+      jobId, rdd, func2, partitions, allowLocal = false, callSite, listener,
+      SerializationUtils.clone(properties)))
     listener.awaitResult()    // Will throw an exception if the job fails
   }
 
diff --git a/core/src/test/scala/org/apache/spark/JobCancellationSuite.scala b/core/src/test/scala/org/apache/spark/JobCancellationSuite.scala
index c93e76377648b..feb9c15656489 100644
--- a/core/src/test/scala/org/apache/spark/JobCancellationSuite.scala
+++ b/core/src/test/scala/org/apache/spark/JobCancellationSuite.scala
@@ -141,7 +141,7 @@ class JobCancellationSuite extends FunSuite with Matchers with BeforeAndAfter
     assert(jobB.get() === 100)
   }
 
-  test("inherited job group") {
+  test("inherited job group (SPARK-6629)") {
     sc = new SparkContext("local[2]", "test")
 
     // Add a listener to release the semaphore once any tasks are launched.
diff --git a/core/src/test/scala/org/apache/spark/ThreadingSuite.scala b/core/src/test/scala/org/apache/spark/ThreadingSuite.scala
index 430eabb52d068..10917c866cc7d 100644
--- a/core/src/test/scala/org/apache/spark/ThreadingSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ThreadingSuite.scala
@@ -191,7 +191,7 @@ class ThreadingSuite extends FunSuite with LocalSparkContext {
     assert(sc.getLocalProperty("Foo") === null)
   }
 
-  test("mutations to local properties should not affect submitted jobs") {
+  test("mutations to local properties should not affect submitted jobs (SPARK-6629)") {
     val jobStarted = new Semaphore(0)
     val jobEnded = new Semaphore(0)
     @volatile var jobResult: JobResult = null

From 9e29654e066b4325ff7897725ea882376dc2603a Mon Sep 17 00:00:00 2001
From: Josh Rosen
Date: Sat, 18 Apr 2015 17:23:02 -0700
Subject: [PATCH 4/4] Fix style issue

---
 .../scala/org/apache/spark/scheduler/DAGScheduler.scala | 7 +++++--
 1 file changed, 5 insertions(+), 2 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index 0718cedd23895..7fe980968f77d 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -690,8 +690,11 @@
   private[scheduler] def handleJobGroupCancelled(groupId: String) {
     // Cancel all jobs belonging to this job group.
     // First finds all active jobs with this group id, and then kill stages for them.
-    val activeInGroup = activeJobs.filter(activeJob =>
-      Option(activeJob.properties).exists(_.getProperty(SparkContext.SPARK_JOB_GROUP_ID) == groupId))
+    val activeInGroup = activeJobs.filter { activeJob =>
+      Option(activeJob.properties).exists {
+        _.getProperty(SparkContext.SPARK_JOB_GROUP_ID) == groupId
+      }
+    }
     val jobIds = activeInGroup.map(_.jobId)
     jobIds.foreach(handleJobCancellation(_, "part of cancelled job group %s".format(groupId)))
     submitWaitingStages()
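
The two behaviors fixed in this series can be reproduced with plain
java.util.Properties, outside of Spark. A minimal, self-contained sketch
(assumptions: SPARK_JOB_GROUP_ID is the literal "spark.jobGroup.id", and
SerializationUtils.clone is the commons-lang3 helper used above):

    import java.util.Properties
    import org.apache.commons.lang3.SerializationUtils

    val parent = new Properties()
    parent.setProperty("spark.jobGroup.id", "originalJobGroupId")
    // SparkContext's InheritableThreadLocal hands child threads
    // new Properties(parent), so inherited entries live only in the defaults:
    val child = new Properties(parent)

    // Patch 1: Hashtable.get skips the defaults; Properties.getProperty walks them.
    assert(child.get("spark.jobGroup.id") == null)
    assert(child.getProperty("spark.jobGroup.id") == "originalJobGroupId")

    // Patches 2-4: cloning at submission time snapshots the properties
    // (serialization deep-copies the defaults chain too), so later mutations
    // by the submitting thread cannot re-group an already-running job:
    val snapshot = SerializationUtils.clone(child)
    parent.setProperty("spark.jobGroup.id", "modifiedJobGroupId")
    assert(snapshot.getProperty("spark.jobGroup.id") == "originalJobGroupId")
    assert(child.getProperty("spark.jobGroup.id") == "modifiedJobGroupId")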