Skip to content
Closed
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -367,7 +367,7 @@ private[deploy] class Master(
drivers.find(_.id == driverId).foreach { driver =>
driver.worker = Some(worker)
driver.state = DriverState.RUNNING
worker.drivers(driverId) = driver
worker.addDriver(driver)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

One major question(though I haven't tested this) -- Won't we call schedule() after we completed recovery? I think we will handle the resource change correctly there.

Copy link
Contributor Author

@jerryshao jerryshao Jun 9, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

From my understanding, schedule() will only handle waiting drivers, but here is trying to calculate the exiting drivers, so I don't think schedule() will save the issue here. Let me try to test on latest master and back to you the result.

}
}
case None =>
Expand Down Expand Up @@ -547,6 +547,9 @@ private[deploy] class Master(
workers.filter(_.state == WorkerState.UNKNOWN).foreach(removeWorker)
apps.filter(_.state == ApplicationState.UNKNOWN).foreach(finishApplication)

// Update the state of recovered apps to RUNNING
apps.filter(_.state == ApplicationState.WAITING).foreach(_.state = ApplicationState.RUNNING)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should also been done later in schedule().


// Reschedule drivers which were not claimed by any workers
drivers.filter(_.worker.isEmpty).foreach { d =>
logWarning(s"Driver ${d.id} was not found after master recovery")
Expand Down
120 changes: 117 additions & 3 deletions core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -21,20 +21,24 @@ import java.util.Date
import java.util.concurrent.ConcurrentLinkedQueue

import scala.collection.JavaConverters._
import scala.collection.mutable.{HashMap, HashSet}
import scala.concurrent.duration._
import scala.io.Source
import scala.language.postfixOps
import scala.reflect.ClassTag

import org.json4s._
import org.json4s.jackson.JsonMethods._
import org.mockito.Mockito.{mock, when}
import org.scalatest.{BeforeAndAfter, Matchers, PrivateMethodTester}
import org.scalatest.concurrent.Eventually
import other.supplier.{CustomPersistenceEngine, CustomRecoveryModeFactory}

import org.apache.spark.{SecurityManager, SparkConf, SparkFunSuite}
import org.apache.spark.deploy._
import org.apache.spark.deploy.DeployMessages._
import org.apache.spark.rpc.{RpcAddress, RpcEndpoint, RpcEnv}
import org.apache.spark.rpc.{RpcAddress, RpcEndpoint, RpcEndpointRef, RpcEnv}
import org.apache.spark.serializer

class MasterSuite extends SparkFunSuite
with Matchers with Eventually with PrivateMethodTester with BeforeAndAfter {
Expand Down Expand Up @@ -134,6 +138,71 @@ class MasterSuite extends SparkFunSuite
CustomRecoveryModeFactory.instantiationAttempts should be > instantiationAttempts
}

test("master correctly recover the application") {
val conf = new SparkConf(loadDefaults = false)
conf.set("spark.deploy.recoveryMode", "CUSTOM")
conf.set("spark.deploy.recoveryMode.factory",
classOf[FakeRecoveryModeFactory].getCanonicalName)
conf.set("spark.master.rest.enabled", "false")

val fakeAppInfo = makeAppInfo(1024)
val fakeWorkerInfo = makeWorkerInfo(8192, 16)
val fakeDriverInfo = new DriverInfo(
startTime = 0,
id = "test_driver",
desc = new DriverDescription(
jarUrl = "",
mem = 1024,
cores = 1,
supervise = false,
command = new Command("", Nil, Map.empty, Nil, Nil, Nil)),
submitDate = new Date())

// Build the fake recovery data
FakeRecoveryModeFactory.persistentData.put(s"app_${fakeAppInfo.id}", fakeAppInfo)
FakeRecoveryModeFactory.persistentData.put(s"driver_${fakeDriverInfo.id}", fakeDriverInfo)
FakeRecoveryModeFactory.persistentData.put(s"worker_${fakeWorkerInfo.id}", fakeWorkerInfo)

var master: Master = null
try {
master = makeMaster(conf)
master.rpcEnv.setupEndpoint(Master.ENDPOINT_NAME, master)
// Wait until Master recover from checkpoint data.
eventually(timeout(5 seconds), interval(100 milliseconds)) {
master.idToApp.size should be(1)
}

master.idToApp.keySet should be(Set(fakeAppInfo.id))
getDrivers(master) should be(Set(fakeDriverInfo))
master.workers should be(Set(fakeWorkerInfo))

// Notify Master about the executor and driver info to make it correctly recovered.
val fakeExecutors = List(
new ExecutorDescription(fakeAppInfo.id, 0, 8, ExecutorState.RUNNING),
new ExecutorDescription(fakeAppInfo.id, 0, 7, ExecutorState.RUNNING))
master.self.send(MasterChangeAcknowledged(fakeAppInfo.id))
master.self.send(
WorkerSchedulerStateResponse(fakeWorkerInfo.id, fakeExecutors, Seq(fakeDriverInfo.id)))

eventually(timeout(5 seconds), interval(100 microseconds)) {
getState(master) should be(RecoveryState.ALIVE)
}

// If driver's resource is also counted, free cores should 0
fakeWorkerInfo.coresFree should be(0)
fakeWorkerInfo.coresUsed should be(16)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we can also test these 2 before recovering

// State of application should be RUNNING
fakeAppInfo.state should be(ApplicationState.RUNNING)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

shall we also test these before the recovery? To show that we do change something when recovering

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done, thanks for review.

} finally {
if (master != null) {
master.rpcEnv.shutdown()
master.rpcEnv.awaitTermination()
master = null
FakeRecoveryModeFactory.persistentData.clear()
}
}
}

test("master/worker web ui available") {
implicit val formats = org.json4s.DefaultFormats
val conf = new SparkConf()
Expand Down Expand Up @@ -394,6 +463,9 @@ class MasterSuite extends SparkFunSuite
// ==========================================

private val _scheduleExecutorsOnWorkers = PrivateMethod[Array[Int]]('scheduleExecutorsOnWorkers)
private val _drivers = PrivateMethod[HashSet[DriverInfo]]('drivers)
private val _state = PrivateMethod[RecoveryState.Value]('state)

private val workerInfo = makeWorkerInfo(4096, 10)
private val workerInfos = Array(workerInfo, workerInfo, workerInfo)

Expand All @@ -412,12 +484,18 @@ class MasterSuite extends SparkFunSuite
val desc = new ApplicationDescription(
"test", maxCores, memoryPerExecutorMb, null, "", None, None, coresPerExecutor)
val appId = System.currentTimeMillis.toString
new ApplicationInfo(0, appId, desc, new Date, null, Int.MaxValue)
val endpointRef = mock(classOf[RpcEndpointRef])
val mockAddress = mock(classOf[RpcAddress])
when(endpointRef.address).thenReturn(mockAddress)
new ApplicationInfo(0, appId, desc, new Date, endpointRef, Int.MaxValue)
}

private def makeWorkerInfo(memoryMb: Int, cores: Int): WorkerInfo = {
val workerId = System.currentTimeMillis.toString
new WorkerInfo(workerId, "host", 100, cores, memoryMb, null, "http://localhost:80")
val endpointRef = mock(classOf[RpcEndpointRef])
val mockAddress = mock(classOf[RpcAddress])
when(endpointRef.address).thenReturn(mockAddress)
new WorkerInfo(workerId, "host", 100, cores, memoryMb, endpointRef, "http://localhost:80")
}

private def scheduleExecutorsOnWorkers(
Expand Down Expand Up @@ -499,4 +577,40 @@ class MasterSuite extends SparkFunSuite
assert(receivedMasterAddress === RpcAddress("localhost2", 10000))
}
}

private def getDrivers(master: Master): HashSet[DriverInfo] = {
master.invokePrivate(_drivers())
}

private def getState(master: Master): RecoveryState.Value = {
master.invokePrivate(_state())
}
}

private class FakeRecoveryModeFactory(conf: SparkConf, ser: serializer.Serializer)
extends StandaloneRecoveryModeFactory(conf, ser) {
import FakeRecoveryModeFactory.persistentData

override def createPersistenceEngine(): PersistenceEngine = new PersistenceEngine {

override def unpersist(name: String): Unit = {
persistentData.remove(name)
}

override def persist(name: String, obj: Object): Unit = {
persistentData(name) = obj
}

override def read[T: ClassTag](prefix: String): Seq[T] = {
persistentData.filter(_._1.startsWith(prefix)).map(_._2.asInstanceOf[T]).toSeq
}
}

override def createLeaderElectionAgent(master: LeaderElectable): LeaderElectionAgent = {
new MonarchyLeaderAgent(master)
}
}

private object FakeRecoveryModeFactory {
val persistentData = new HashMap[String, Object]()
}