
Commit 1390e56

Andrew Or authored and pwendell committed
[SPARK-5388] Provide a stable application submission gateway for standalone cluster mode
The goal is to provide a stable, REST-based application submission gateway that is not inherently based on Akka, which is unstable across versions. This PR targets standalone cluster mode, but is implemented in a general enough manner that it can potentially be extended to other modes in the future. Client mode is not included in the changes here because many more Akka messages are exchanged there.

As of the changes here, the Master advertises two ports: 7077 and 6066. We need to keep the old one (7077) around for client mode and older versions of Spark submit. However, all new versions of Spark submit will use the REST gateway (6066). This change also includes ~700 lines of tests and ~200 lines of license headers.

Author: Andrew Or <[email protected]>

Closes #4216 from andrewor14/rest and squashes the following commits:

8d7ce07 [Andrew Or] Merge branch 'master' of github.com:apache/spark into rest
6f0c597 [Andrew Or] Use nullable fields for integer and boolean values
dfe4bd7 [Andrew Or] Merge branch 'master' of github.com:apache/spark into rest
b9e2a08 [Andrew Or] Minor comments
02b5cea [Andrew Or] Fix tests
d2b1ef8 [Andrew Or] Comment changes + minor code refactoring across the board
9c82a36 [Andrew Or] Minor comment and wording updates
b4695e7 [Andrew Or] Merge branch 'master' of github.com:apache/spark into rest
c9a8ad7 [Andrew Or] Do not include appResource and mainClass as properties
6fc7670 [Andrew Or] Report REST server response back to the user
40e6095 [Andrew Or] Pass submit parameters through system properties
cbd670b [Andrew Or] Include unknown fields, if any, in server response
9fee16f [Andrew Or] Include server protocol version on mismatch
09f873a [Andrew Or] Fix style
8188e61 [Andrew Or] Upgrade Jackson from 2.3.0 to 2.4.4
37538e0 [Andrew Or] Merge branch 'master' of github.com:apache/spark into rest
9165ae8 [Andrew Or] Fall back to Akka if endpoint was not REST
252d53c [Andrew Or] Clean up server error handling behavior further
c643f64 [Andrew Or] Fix style
bbbd329 [Andrew Or] Merge branch 'master' of github.com:apache/spark into rest
792e112 [Andrew Or] Use specific HTTP response codes on error
f98660b [Andrew Or] Version the protocol and include it in REST URL
721819f [Andrew Or] Provide more REST-like interface for submit/kill/status
581f7bf [Andrew Or] Merge branch 'master' of github.com:apache/spark into rest
9e0d1af [Andrew Or] Move some classes around to reduce number of files (minor)
42e5de4 [Andrew Or] Merge branch 'master' of github.com:apache/spark into rest
1f1c03f [Andrew Or] Use Jackson's DefaultScalaModule to simplify messages
9229433 [Andrew Or] Reduce duplicate naming in REST field
ade28fd [Andrew Or] Clean up REST response output in Spark submit
b2fef8b [Andrew Or] Abstract the success field to the general response
6c57b4b [Andrew Or] Increase timeout in end-to-end tests
bf696ff [Andrew Or] Add checks for enabling REST when using kill/status
7ee6737 [Andrew Or] Merge branch 'master' of github.com:apache/spark into rest
e2f7f5f [Andrew Or] Provide more safeguard against missing fields
9581df7 [Andrew Or] Clean up uses of exceptions
914fdff [Andrew Or] Merge branch 'master' of github.com:apache/spark into rest
e2104e6 [Andrew Or] stable -> rest
3db7379 [Andrew Or] Fix comments and name fields for better error messages
8d43486 [Andrew Or] Replace SubmitRestProtocolAction with class name
df90e8b [Andrew Or] Use Jackson for JSON de/serialization
d7a1f9f [Andrew Or] Fix local cluster tests
efa5e18 [Andrew Or] Merge branch 'master' of github.com:apache/spark into rest
e42c131 [Andrew Or] Add end-to-end tests for standalone REST protocol
837475b [Andrew Or] Show the REST port on the Master UI
d8d3717 [Andrew Or] Use a daemon thread pool for REST server
6568ca5 [Andrew Or] Merge branch 'master' of github.com:apache/spark into rest
77774ba [Andrew Or] Minor fixes
206cae4 [Andrew Or] Refactor and add tests for the REST protocol
63c05b3 [Andrew Or] Remove MASTER as a field (minor)
9e21b72 [Andrew Or] Action -> SparkSubmitAction (minor)
51c5ca6 [Andrew Or] Distinguish client and server side Spark versions
b44e103 [Andrew Or] Implement status requests + fix validation behavior
120ab9d [Andrew Or] Support kill and request driver status through SparkSubmit
544de1d [Andrew Or] Major clean ups in code and comments
e958cae [Andrew Or] Supported nested values in messages
484bd21 [Andrew Or] Specify an ordering for fields in SubmitDriverRequestMessage
6ff088d [Andrew Or] Rename classes to generalize REST protocol
af9d9cb [Andrew Or] Integrate REST protocol in standalone mode
53e7c0e [Andrew Or] Initial client, server, and all the messages
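Editor's note: for orientation, here is a minimal sketch of what a submission to the new REST gateway looks like over plain HTTP. The versioned endpoint (/v1/submissions/create on port 6066) and the JSON field names follow the protocol messages added in this PR (e.g. CreateSubmissionRequest), but the host, jar path, and property values shown are illustrative placeholders, not part of the commit.

```scala
import java.io.OutputStreamWriter
import java.net.{HttpURLConnection, URL}

object RestSubmitSketch {
  def main(args: Array[String]): Unit = {
    // The Master now serves REST submissions on 6066 alongside Akka on 7077
    val url = new URL("http://master-host:6066/v1/submissions/create")
    val conn = url.openConnection().asInstanceOf[HttpURLConnection]
    conn.setRequestMethod("POST")
    conn.setRequestProperty("Content-Type", "application/json")
    conn.setDoOutput(true)
    // Field names approximate the PR's CreateSubmissionRequest message
    val request =
      """{
        |  "action" : "CreateSubmissionRequest",
        |  "clientSparkVersion" : "1.3.0",
        |  "appResource" : "hdfs://path/to/app.jar",
        |  "mainClass" : "com.example.Main",
        |  "appArgs" : [ ],
        |  "environmentVariables" : { },
        |  "sparkProperties" : { "spark.app.name" : "example" }
        |}""".stripMargin
    val out = new OutputStreamWriter(conn.getOutputStream)
    out.write(request)
    out.close()
    println(s"HTTP ${conn.getResponseCode}")
  }
}
```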
1 parent e772b4e commit 1390e56

23 files changed: +2,027 −94 lines

core/pom.xml

Lines changed: 8 additions & 0 deletions
@@ -243,6 +243,14 @@
       <groupId>io.dropwizard.metrics</groupId>
       <artifactId>metrics-graphite</artifactId>
     </dependency>
+    <dependency>
+      <groupId>com.fasterxml.jackson.core</groupId>
+      <artifactId>jackson-databind</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>com.fasterxml.jackson.module</groupId>
+      <artifactId>jackson-module-scala_2.10</artifactId>
+    </dependency>
     <dependency>
       <groupId>org.apache.derby</groupId>
       <artifactId>derby</artifactId>
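Editor's note: the two Jackson artifacts added above back the JSON de/serialization of the REST protocol messages (see the "Use Jackson for JSON de/serialization" and DefaultScalaModule commits in the log). A minimal, self-contained sketch of that pattern follows; the message class here is a hypothetical stand-in, not one of the PR's actual message types.

```scala
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.scala.DefaultScalaModule

// Hypothetical stand-in for a REST protocol message
case class ExampleMessage(action: String, success: Boolean)

object JacksonRoundTrip {
  def main(args: Array[String]): Unit = {
    // DefaultScalaModule teaches Jackson about Scala case classes, Options, etc.
    val mapper = new ObjectMapper().registerModule(DefaultScalaModule)
    val json = mapper.writeValueAsString(ExampleMessage("CreateSubmissionRequest", success = true))
    println(json) // {"action":"CreateSubmissionRequest","success":true}
    val parsed = mapper.readValue(json, classOf[ExampleMessage])
    assert(parsed.action == "CreateSubmissionRequest")
  }
}
```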

core/src/main/resources/org/apache/spark/ui/static/webui.css

Lines changed: 6 additions & 0 deletions
@@ -103,6 +103,12 @@ span.expand-details {
   float: right;
 }
 
+span.rest-uri {
+  font-size: 10pt;
+  font-style: italic;
+  color: gray;
+}
+
 pre {
   font-size: 0.8em;
 }

core/src/main/scala/org/apache/spark/SparkContext.scala

Lines changed: 1 addition & 1 deletion
@@ -2110,7 +2110,7 @@ object SparkContext extends Logging {
 
       val scheduler = new TaskSchedulerImpl(sc)
       val localCluster = new LocalSparkCluster(
-        numSlaves.toInt, coresPerSlave.toInt, memoryPerSlaveInt)
+        numSlaves.toInt, coresPerSlave.toInt, memoryPerSlaveInt, sc.conf)
       val masterUrls = localCluster.start()
       val backend = new SparkDeploySchedulerBackend(scheduler, sc, masterUrls)
       scheduler.initialize(backend)

core/src/main/scala/org/apache/spark/deploy/ClientArguments.scala

Lines changed: 12 additions & 8 deletions
@@ -29,8 +29,7 @@ import org.apache.spark.util.{IntParam, MemoryParam}
  * Command-line parser for the driver client.
  */
 private[spark] class ClientArguments(args: Array[String]) {
-  val defaultCores = 1
-  val defaultMemory = 512
+  import ClientArguments._
 
   var cmd: String = "" // 'launch' or 'kill'
   var logLevel = Level.WARN
@@ -39,9 +38,9 @@ private[spark] class ClientArguments(args: Array[String]) {
   var master: String = ""
   var jarUrl: String = ""
   var mainClass: String = ""
-  var supervise: Boolean = false
-  var memory: Int = defaultMemory
-  var cores: Int = defaultCores
+  var supervise: Boolean = DEFAULT_SUPERVISE
+  var memory: Int = DEFAULT_MEMORY
+  var cores: Int = DEFAULT_CORES
   private var _driverOptions = ListBuffer[String]()
   def driverOptions = _driverOptions.toSeq
 
@@ -50,7 +49,7 @@ private[spark] class ClientArguments(args: Array[String]) {
 
   parse(args.toList)
 
-  def parse(args: List[String]): Unit = args match {
+  private def parse(args: List[String]): Unit = args match {
     case ("--cores" | "-c") :: IntParam(value) :: tail =>
       cores = value
       parse(tail)
@@ -106,9 +105,10 @@ private[spark] class ClientArguments(args: Array[String]) {
       |Usage: DriverClient kill <active-master> <driver-id>
       |
       |Options:
-      |   -c CORES, --cores CORES        Number of cores to request (default: $defaultCores)
-      |   -m MEMORY, --memory MEMORY     Megabytes of memory to request (default: $defaultMemory)
+      |   -c CORES, --cores CORES        Number of cores to request (default: $DEFAULT_CORES)
+      |   -m MEMORY, --memory MEMORY     Megabytes of memory to request (default: $DEFAULT_MEMORY)
       |   -s, --supervise                Whether to restart the driver on failure
+      |                                  (default: $DEFAULT_SUPERVISE)
       |   -v, --verbose                  Print more debugging output
     """.stripMargin
     System.err.println(usage)
@@ -117,6 +117,10 @@ private[spark] class ClientArguments(args: Array[String]) {
 }
 
 object ClientArguments {
+  private[spark] val DEFAULT_CORES = 1
+  private[spark] val DEFAULT_MEMORY = 512 // MB
+  private[spark] val DEFAULT_SUPERVISE = false
+
   def isValidJarUrl(s: String): Boolean = {
     try {
       val uri = new URI(s)

core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala

Lines changed: 11 additions & 4 deletions
@@ -148,15 +148,22 @@ private[deploy] object DeployMessages {
 
   // Master to MasterWebUI
 
-  case class MasterStateResponse(host: String, port: Int, workers: Array[WorkerInfo],
-    activeApps: Array[ApplicationInfo], completedApps: Array[ApplicationInfo],
-    activeDrivers: Array[DriverInfo], completedDrivers: Array[DriverInfo],
-    status: MasterState) {
+  case class MasterStateResponse(
+      host: String,
+      port: Int,
+      restPort: Option[Int],
+      workers: Array[WorkerInfo],
+      activeApps: Array[ApplicationInfo],
+      completedApps: Array[ApplicationInfo],
+      activeDrivers: Array[DriverInfo],
+      completedDrivers: Array[DriverInfo],
+      status: MasterState) {
 
     Utils.checkHost(host, "Required hostname")
     assert (port > 0)
 
     def uri = "spark://" + host + ":" + port
+    def restUri: Option[String] = restPort.map { p => "spark://" + host + ":" + p }
   }
 
   // WorkerWebUI to Worker
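Editor's note: the new restUri helper lets the Master UI show the REST endpoint only when a REST port was actually bound. A tiny sketch of the Option-based pattern, outside the actual UI code; the host and port values are placeholders.

```scala
object RestUriSketch {
  def main(args: Array[String]): Unit = {
    // restPort is None when the REST server is disabled, so no URI is rendered
    val host = "master-host"                 // placeholder
    val restPort: Option[Int] = Some(6066)   // placeholder
    val restUri: Option[String] = restPort.map { p => "spark://" + host + ":" + p }
    restUri.foreach { uri => println(s"REST URL: $uri (cluster mode)") }
  }
}
```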

core/src/main/scala/org/apache/spark/deploy/LocalSparkCluster.scala

Lines changed: 9 additions & 3 deletions
@@ -33,7 +33,11 @@ import org.apache.spark.util.Utils
  * fault recovery without spinning up a lot of processes.
  */
 private[spark]
-class LocalSparkCluster(numWorkers: Int, coresPerWorker: Int, memoryPerWorker: Int)
+class LocalSparkCluster(
+    numWorkers: Int,
+    coresPerWorker: Int,
+    memoryPerWorker: Int,
+    conf: SparkConf)
   extends Logging {
 
   private val localHostname = Utils.localHostName()
@@ -43,9 +47,11 @@ class LocalSparkCluster(numWorkers: Int, coresPerWorker: Int, memoryPerWorker: I
   def start(): Array[String] = {
     logInfo("Starting a local Spark cluster with " + numWorkers + " workers.")
 
+    // Disable REST server on Master in this mode unless otherwise specified
+    val _conf = conf.clone().setIfMissing("spark.master.rest.enabled", "false")
+
     /* Start the Master */
-    val conf = new SparkConf(false)
-    val (masterSystem, masterPort, _) = Master.startSystemAndActor(localHostname, 0, 0, conf)
+    val (masterSystem, masterPort, _, _) = Master.startSystemAndActor(localHostname, 0, 0, _conf)
     masterActorSystems += masterSystem
     val masterUrl = "spark://" + localHostname + ":" + masterPort
     val masters = Array(masterUrl)
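Editor's note: the clone().setIfMissing pattern above lets LocalSparkCluster inherit the caller's SparkConf while turning the Master's REST server off by default, without clobbering an explicit user setting. A small sketch of the semantics, assuming spark-core on the classpath:

```scala
import org.apache.spark.SparkConf

object SetIfMissingSketch {
  def main(args: Array[String]): Unit = {
    // setIfMissing writes the key only if the caller has not already set it
    val userConf = new SparkConf(false).set("spark.master.rest.enabled", "true")
    val derived = userConf.clone().setIfMissing("spark.master.rest.enabled", "false")
    assert(derived.get("spark.master.rest.enabled") == "true")   // explicit setting wins

    val emptyConf = new SparkConf(false)
    val derived2 = emptyConf.clone().setIfMissing("spark.master.rest.enabled", "false")
    assert(derived2.get("spark.master.rest.enabled") == "false") // default applied
  }
}
```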

core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala

Lines changed: 110 additions & 32 deletions
@@ -18,25 +18,35 @@
 package org.apache.spark.deploy
 
 import java.io.{File, PrintStream}
-import java.lang.reflect.{Modifier, InvocationTargetException}
+import java.lang.reflect.{InvocationTargetException, Modifier}
 import java.net.URL
+
 import scala.collection.mutable.{ArrayBuffer, HashMap, Map}
 
 import org.apache.hadoop.fs.Path
 import org.apache.ivy.Ivy
 import org.apache.ivy.core.LogOptions
-import org.apache.ivy.core.module.descriptor.{DefaultExcludeRule, DefaultDependencyDescriptor, DefaultModuleDescriptor}
-import org.apache.ivy.core.module.id.{ModuleId, ArtifactId, ModuleRevisionId}
+import org.apache.ivy.core.module.descriptor._
+import org.apache.ivy.core.module.id.{ArtifactId, ModuleId, ModuleRevisionId}
 import org.apache.ivy.core.report.ResolveReport
-import org.apache.ivy.core.resolve.{IvyNode, ResolveOptions}
+import org.apache.ivy.core.resolve.ResolveOptions
 import org.apache.ivy.core.retrieve.RetrieveOptions
 import org.apache.ivy.core.settings.IvySettings
 import org.apache.ivy.plugins.matcher.GlobPatternMatcher
 import org.apache.ivy.plugins.resolver.{ChainResolver, IBiblioResolver}
-import org.apache.spark.executor.ExecutorURLClassLoader
+
+import org.apache.spark.deploy.rest._
+import org.apache.spark.executor._
 import org.apache.spark.util.Utils
-import org.apache.spark.executor.ChildExecutorURLClassLoader
-import org.apache.spark.executor.MutableURLClassLoader
+
+/**
+ * Whether to submit, kill, or request the status of an application.
+ * The latter two operations are currently supported only for standalone cluster mode.
+ */
+private[spark] object SparkSubmitAction extends Enumeration {
+  type SparkSubmitAction = Value
+  val SUBMIT, KILL, REQUEST_STATUS = Value
+}
 
 /**
  * Main gateway of launching a Spark application.
@@ -83,21 +93,74 @@ object SparkSubmit {
     if (appArgs.verbose) {
       printStream.println(appArgs)
     }
-    val (childArgs, classpath, sysProps, mainClass) = createLaunchEnv(appArgs)
-    launch(childArgs, classpath, sysProps, mainClass, appArgs.verbose)
+    appArgs.action match {
+      case SparkSubmitAction.SUBMIT => submit(appArgs)
+      case SparkSubmitAction.KILL => kill(appArgs)
+      case SparkSubmitAction.REQUEST_STATUS => requestStatus(appArgs)
+    }
+  }
+
+  /** Kill an existing submission using the REST protocol. Standalone cluster mode only. */
+  private def kill(args: SparkSubmitArguments): Unit = {
+    new StandaloneRestClient()
+      .killSubmission(args.master, args.submissionToKill)
   }
 
   /**
-   * @return a tuple containing
-   *     (1) the arguments for the child process,
-   *     (2) a list of classpath entries for the child,
-   *     (3) a list of system properties and env vars, and
-   *     (4) the main class for the child
+   * Request the status of an existing submission using the REST protocol.
+   * Standalone cluster mode only.
    */
-  private[spark] def createLaunchEnv(args: SparkSubmitArguments)
-      : (ArrayBuffer[String], ArrayBuffer[String], Map[String, String], String) = {
+  private def requestStatus(args: SparkSubmitArguments): Unit = {
+    new StandaloneRestClient()
+      .requestSubmissionStatus(args.master, args.submissionToRequestStatusFor)
+  }
 
-    // Values to return
+  /**
+   * Submit the application using the provided parameters.
+   *
+   * This runs in two steps. First, we prepare the launch environment by setting up
+   * the appropriate classpath, system properties, and application arguments for
+   * running the child main class based on the cluster manager and the deploy mode.
+   * Second, we use this launch environment to invoke the main method of the child
+   * main class.
+   */
+  private[spark] def submit(args: SparkSubmitArguments): Unit = {
+    val (childArgs, childClasspath, sysProps, childMainClass) = prepareSubmitEnvironment(args)
+    // In standalone cluster mode, there are two submission gateways:
+    //   (1) The traditional Akka gateway using o.a.s.deploy.Client as a wrapper
+    //   (2) The new REST-based gateway introduced in Spark 1.3
+    // The latter is the default behavior as of Spark 1.3, but Spark submit will fail over
+    // to use the legacy gateway if the master endpoint turns out to be not a REST server.
+    if (args.isStandaloneCluster && args.useRest) {
+      try {
+        printStream.println("Running Spark using the REST application submission protocol.")
+        runMain(childArgs, childClasspath, sysProps, childMainClass)
+      } catch {
+        // Fail over to use the legacy submission gateway
+        case e: SubmitRestConnectionException =>
+          printWarning(s"Master endpoint ${args.master} was not a REST server. " +
+            "Falling back to legacy submission gateway instead.")
+          args.useRest = false
+          submit(args)
+      }
+    // In all other modes, just run the main class as prepared
+    } else {
+      runMain(childArgs, childClasspath, sysProps, childMainClass)
+    }
+  }
+
+  /**
+   * Prepare the environment for submitting an application.
+   * This returns a 4-tuple:
+   *   (1) the arguments for the child process,
+   *   (2) a list of classpath entries for the child,
+   *   (3) a map of system properties, and
+   *   (4) the main class for the child
+   * Exposed for testing.
+   */
+  private[spark] def prepareSubmitEnvironment(args: SparkSubmitArguments)
+      : (Seq[String], Seq[String], Map[String, String], String) = {
+    // Return values
     val childArgs = new ArrayBuffer[String]()
     val childClasspath = new ArrayBuffer[String]()
     val sysProps = new HashMap[String, String]()
@@ -235,10 +298,13 @@ object SparkSubmit {
         sysProp = "spark.driver.extraLibraryPath"),
 
       // Standalone cluster only
+      // Do not set CL arguments here because there are multiple possibilities for the main class
       OptionAssigner(args.jars, STANDALONE, CLUSTER, sysProp = "spark.jars"),
       OptionAssigner(args.ivyRepoPath, STANDALONE, CLUSTER, sysProp = "spark.jars.ivy"),
-      OptionAssigner(args.driverMemory, STANDALONE, CLUSTER, clOption = "--memory"),
-      OptionAssigner(args.driverCores, STANDALONE, CLUSTER, clOption = "--cores"),
+      OptionAssigner(args.driverMemory, STANDALONE, CLUSTER, sysProp = "spark.driver.memory"),
+      OptionAssigner(args.driverCores, STANDALONE, CLUSTER, sysProp = "spark.driver.cores"),
+      OptionAssigner(args.supervise.toString, STANDALONE, CLUSTER,
+        sysProp = "spark.driver.supervise"),
 
       // Yarn client only
       OptionAssigner(args.queue, YARN, CLIENT, sysProp = "spark.yarn.queue"),
@@ -279,7 +345,6 @@ object SparkSubmit {
       if (args.childArgs != null) { childArgs ++= args.childArgs }
     }
 
-
     // Map all arguments to command-line options or system properties for our chosen mode
     for (opt <- options) {
       if (opt.value != null &&
@@ -301,14 +366,21 @@ object SparkSubmit {
       sysProps.put("spark.jars", jars.mkString(","))
     }
 
-    // In standalone-cluster mode, use Client as a wrapper around the user class
-    if (clusterManager == STANDALONE && deployMode == CLUSTER) {
-      childMainClass = "org.apache.spark.deploy.Client"
-      if (args.supervise) {
-        childArgs += "--supervise"
+    // In standalone cluster mode, use the REST client to submit the application (Spark 1.3+).
+    // All Spark parameters are expected to be passed to the client through system properties.
+    if (args.isStandaloneCluster) {
+      if (args.useRest) {
+        childMainClass = "org.apache.spark.deploy.rest.StandaloneRestClient"
+        childArgs += (args.primaryResource, args.mainClass)
+      } else {
+        // In legacy standalone cluster mode, use Client as a wrapper around the user class
+        childMainClass = "org.apache.spark.deploy.Client"
+        if (args.supervise) { childArgs += "--supervise" }
+        Option(args.driverMemory).foreach { m => childArgs += ("--memory", m) }
+        Option(args.driverCores).foreach { c => childArgs += ("--cores", c) }
+        childArgs += "launch"
+        childArgs += (args.master, args.primaryResource, args.mainClass)
       }
-      childArgs += "launch"
-      childArgs += (args.master, args.primaryResource, args.mainClass)
       if (args.childArgs != null) {
         childArgs ++= args.childArgs
       }
@@ -345,7 +417,7 @@ object SparkSubmit {
 
     // Ignore invalid spark.driver.host in cluster modes.
     if (deployMode == CLUSTER) {
-      sysProps -= ("spark.driver.host")
+      sysProps -= "spark.driver.host"
     }
 
     // Resolve paths in certain spark properties
@@ -374,9 +446,15 @@ object SparkSubmit {
     (childArgs, childClasspath, sysProps, childMainClass)
   }
 
-  private def launch(
-      childArgs: ArrayBuffer[String],
-      childClasspath: ArrayBuffer[String],
+  /**
+   * Run the main method of the child class using the provided launch environment.
+   *
+   * Note that this main class will not be the one provided by the user if we're
+   * running cluster deploy mode or python applications.
+   */
+  private def runMain(
+      childArgs: Seq[String],
+      childClasspath: Seq[String],
       sysProps: Map[String, String],
       childMainClass: String,
       verbose: Boolean = false) {
@@ -697,7 +775,7 @@ private[spark] object SparkSubmitUtils {
  * Provides an indirection layer for passing arguments as system properties or flags to
  * the user's driver program or to downstream launcher tools.
  */
-private[spark] case class OptionAssigner(
+private case class OptionAssigner(
    value: String,
    clusterManager: Int,
    deployMode: Int,
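Editor's note: the kill and status operations that SparkSubmit now routes through StandaloneRestClient map onto similarly shaped REST endpoints. A hedged sketch of those calls over plain HTTP; the paths follow the versioned URL scheme this PR introduces, while the host and submission id are placeholders.

```scala
import java.net.{HttpURLConnection, URL}

object KillAndStatusSketch {
  private def request(method: String, u: String): Int = {
    val conn = new URL(u).openConnection().asInstanceOf[HttpURLConnection]
    conn.setRequestMethod(method)
    conn.getResponseCode  // the server replies with a JSON body and a specific HTTP code
  }

  def main(args: Array[String]): Unit = {
    val master = "http://master-host:6066"    // placeholder host
    val id = "driver-20150101000000-0000"     // placeholder submission id
    println(request("POST", s"$master/v1/submissions/kill/$id"))
    println(request("GET", s"$master/v1/submissions/status/$id"))
  }
}
```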
