Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 11 additions & 4 deletions core/src/test/scala/org/apache/spark/SparkContextSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ package org.apache.spark
import java.io.File
import java.net.{MalformedURLException, URI}
import java.nio.charset.StandardCharsets
import java.util.concurrent.TimeUnit
import java.util.concurrent.{Semaphore, TimeUnit}

import scala.concurrent.duration._

Expand Down Expand Up @@ -499,6 +499,7 @@ class SparkContextSuite extends SparkFunSuite with LocalSparkContext with Eventu
test("Cancelling stages/jobs with custom reasons.") {
sc = new SparkContext(new SparkConf().setAppName("test").setMaster("local"))
val REASON = "You shall not pass"
val slices = 10

val listener = new SparkListener {
override def onTaskStart(taskStart: SparkListenerTaskStart): Unit = {
Expand All @@ -508,6 +509,7 @@ class SparkContextSuite extends SparkFunSuite with LocalSparkContext with Eventu
}
sc.cancelStage(taskStart.stageId, REASON)
SparkContextSuite.cancelStage = false
SparkContextSuite.semaphore.release(slices)
}
}

Expand All @@ -518,21 +520,25 @@ class SparkContextSuite extends SparkFunSuite with LocalSparkContext with Eventu
}
sc.cancelJob(jobStart.jobId, REASON)
SparkContextSuite.cancelJob = false
SparkContextSuite.semaphore.release(slices)
}
}
}
sc.addSparkListener(listener)

for (cancelWhat <- Seq("stage", "job")) {
SparkContextSuite.semaphore.drainPermits()
SparkContextSuite.isTaskStarted = false
SparkContextSuite.cancelStage = (cancelWhat == "stage")
SparkContextSuite.cancelJob = (cancelWhat == "job")

val ex = intercept[SparkException] {
sc.range(0, 10000L).mapPartitions { x =>
org.apache.spark.SparkContextSuite.isTaskStarted = true
sc.range(0, 10000L, numSlices = slices).mapPartitions { x =>
SparkContextSuite.isTaskStarted = true
// Block waiting for the listener to cancel the stage or job.
SparkContextSuite.semaphore.acquire()
x
}.cartesian(sc.range(0, 10L))count()
}.count()
}

ex.getCause() match {
Expand Down Expand Up @@ -636,4 +642,5 @@ object SparkContextSuite {
@volatile var isTaskStarted = false
@volatile var taskKilled = false
@volatile var taskSucceeded = false
val semaphore = new Semaphore(0)
}