From 90caf6d689405dc0b9dde3195e00091b7a1a1f61 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E2=80=9Cattilapiros=E2=80=9D?= Date: Fri, 15 Feb 2019 16:18:19 +0100 Subject: [PATCH 1/3] initial commit --- .../cluster/YarnSchedulerBackendSuite.scala | 14 +++++++++++--- 1 file changed, 11 insertions(+), 3 deletions(-) diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackendSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackendSuite.scala index 5d285f89f22f5..bf874cf7287eb 100644 --- a/resource-managers/yarn/src/test/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackendSuite.scala +++ b/resource-managers/yarn/src/test/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackendSuite.scala @@ -17,6 +17,7 @@ package org.apache.spark.scheduler.cluster import java.net.URL +import java.util.concurrent.atomic.AtomicReference import javax.servlet.http.{HttpServlet, HttpServletRequest, HttpServletResponse} import scala.language.reflectiveCalls @@ -34,8 +35,15 @@ class YarnSchedulerBackendSuite extends SparkFunSuite with MockitoSugar with Loc test("RequestExecutors reflects node blacklist and is serializable") { sc = new SparkContext("local", "YarnSchedulerBackendSuite") - val sched = mock[TaskSchedulerImpl] - when(sched.sc).thenReturn(sc) + // Subclassing the TaskSchedulerImpl here instead of using Mockito. For details see SPARK-26891. + val sched = new TaskSchedulerImpl(sc) { + val blacklistedNodes = new AtomicReference[Set[String]]() + + def setNodeBlacklist(nodeBlacklist: Set[String]): Unit = blacklistedNodes.set(nodeBlacklist) + + override def nodeBlacklist(): Set[String] = blacklistedNodes.get() + } + val yarnSchedulerBackend = new YarnSchedulerBackend(sched, sc) { def setHostToLocalTaskCount(hostToLocalTaskCount: Map[String, Int]): Unit = { this.hostToLocalTaskCount = hostToLocalTaskCount @@ -51,7 +59,7 @@ class YarnSchedulerBackendSuite extends SparkFunSuite with MockitoSugar with Loc ) } { yarnSchedulerBackend.setHostToLocalTaskCount(hostToLocalCount) - when(sched.nodeBlacklist()).thenReturn(blacklist) + sched.setNodeBlacklist(blacklist) val req = yarnSchedulerBackend.prepareRequestExecutors(numRequested) assert(req.requestedTotal === numRequested) assert(req.nodeBlacklist === blacklist) From 765f7e896e0bd0bd359c7b810b9aaed59b341b5f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E2=80=9Cattilapiros=E2=80=9D?= Date: Sat, 16 Feb 2019 10:41:39 +0100 Subject: [PATCH 2/3] Fixing resource leak --- .../cluster/YarnSchedulerBackendSuite.scala | 21 ++++++++++++++----- 1 file changed, 16 insertions(+), 5 deletions(-) diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackendSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackendSuite.scala index bf874cf7287eb..d0a31ffbc4d0f 100644 --- a/resource-managers/yarn/src/test/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackendSuite.scala +++ b/resource-managers/yarn/src/test/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackendSuite.scala @@ -33,6 +33,16 @@ import org.apache.spark.ui.TestFilter class YarnSchedulerBackendSuite extends SparkFunSuite with MockitoSugar with LocalSparkContext { + private var yarnSchedulerBackend: YarnSchedulerBackend = _ + + override def afterEach() { + try { + yarnSchedulerBackend.stop() + } finally { + super.afterEach() + } + } + test("RequestExecutors reflects node blacklist and is serializable") { sc = new SparkContext("local", "YarnSchedulerBackendSuite") // Subclassing the TaskSchedulerImpl here instead of using Mockito. For details see SPARK-26891. @@ -44,11 +54,12 @@ class YarnSchedulerBackendSuite extends SparkFunSuite with MockitoSugar with Loc override def nodeBlacklist(): Set[String] = blacklistedNodes.get() } - val yarnSchedulerBackend = new YarnSchedulerBackend(sched, sc) { + val yarnSchedulerBackendExtended = new YarnSchedulerBackend(sched, sc) { def setHostToLocalTaskCount(hostToLocalTaskCount: Map[String, Int]): Unit = { this.hostToLocalTaskCount = hostToLocalTaskCount } } + yarnSchedulerBackend = yarnSchedulerBackendExtended val ser = new JavaSerializer(sc.conf).newInstance() for { blacklist <- IndexedSeq(Set[String](), Set("a", "b", "c")) @@ -58,9 +69,9 @@ class YarnSchedulerBackendSuite extends SparkFunSuite with MockitoSugar with Loc Map("a" -> 1, "b" -> 2) ) } { - yarnSchedulerBackend.setHostToLocalTaskCount(hostToLocalCount) + yarnSchedulerBackendExtended.setHostToLocalTaskCount(hostToLocalCount) sched.setNodeBlacklist(blacklist) - val req = yarnSchedulerBackend.prepareRequestExecutors(numRequested) + val req = yarnSchedulerBackendExtended.prepareRequestExecutors(numRequested) assert(req.requestedTotal === numRequested) assert(req.nodeBlacklist === blacklist) assert(req.hostToLocalTaskCount.keySet.intersect(blacklist).isEmpty) @@ -83,9 +94,9 @@ class YarnSchedulerBackendSuite extends SparkFunSuite with MockitoSugar with Loc // Before adding the "YARN" filter, should get the code from the filter in SparkConf. assert(TestUtils.httpResponseCode(url) === HttpServletResponse.SC_BAD_GATEWAY) - val backend = new YarnSchedulerBackend(sched, sc) { } + yarnSchedulerBackend = new YarnSchedulerBackend(sched, sc) { } - backend.addWebUIFilter(classOf[TestFilter2].getName(), + yarnSchedulerBackend.addWebUIFilter(classOf[TestFilter2].getName(), Map("responseCode" -> HttpServletResponse.SC_NOT_ACCEPTABLE.toString), "") sc.ui.get.getHandlers.foreach { h => From 1ff92bf4e895122855754834b1faab94813046ab Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E2=80=9Cattilapiros=E2=80=9D?= Date: Sun, 17 Feb 2019 19:57:26 +0100 Subject: [PATCH 3/3] adding missing if --- .../spark/scheduler/cluster/YarnSchedulerBackendSuite.scala | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackendSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackendSuite.scala index d0a31ffbc4d0f..e49cd62c08878 100644 --- a/resource-managers/yarn/src/test/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackendSuite.scala +++ b/resource-managers/yarn/src/test/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackendSuite.scala @@ -37,7 +37,9 @@ class YarnSchedulerBackendSuite extends SparkFunSuite with MockitoSugar with Loc override def afterEach() { try { - yarnSchedulerBackend.stop() + if (yarnSchedulerBackend != null) { + yarnSchedulerBackend.stop() + } } finally { super.afterEach() }