Skip to content
This repository was archived by the owner on Oct 23, 2024. It is now read-only.

Commit a9d7bba

Browse files
akirillov (Anton Kirillov)
authored and committed
Spark Dispatcher support for launching applications in the same virtual network by default (#45)
1 parent cec7c65 commit a9d7bba

File tree

3 files changed

+68
-8
lines changed

3 files changed

+68
-8
lines changed

resource-managers/mesos/src/main/scala/org/apache/spark/deploy/rest/mesos/MesosRestServer.scala

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,9 @@ private[mesos] class MesosSubmitRequestServlet(
7474
* This does not currently consider fields used by python applications since python
7575
* is not supported in mesos cluster mode yet.
7676
*/
77-
private def buildDriverDescription(request: CreateSubmissionRequest): MesosDriverDescription = {
77+
// Visible for testing.
78+
private[rest] def buildDriverDescription(
79+
request: CreateSubmissionRequest): MesosDriverDescription = {
7880
// Required fields, including the main class because python is not yet supported
7981
val appResource = Option(request.appResource).getOrElse {
8082
throw new SubmitRestMissingFieldException("Application jar 'appResource' is missing.")
@@ -101,11 +103,15 @@ private[mesos] class MesosSubmitRequestServlet(
101103
val name = request.sparkProperties.getOrElse("spark.app.name", mainClass)
102104

103105
// Construct driver description
104-
val conf = new SparkConf(false).setAll(sparkProperties)
106+
val defaultConf = this.conf.getAllWithPrefix("spark.mesos.dispatcher.driverDefault.").toMap
107+
val driverConf = new SparkConf(false)
108+
.setAll(defaultConf)
109+
.setAll(sparkProperties)
110+
105111
val extraClassPath = driverExtraClassPath.toSeq.flatMap(_.split(File.pathSeparator))
106112
val extraLibraryPath = driverExtraLibraryPath.toSeq.flatMap(_.split(File.pathSeparator))
107113
val extraJavaOpts = driverExtraJavaOptions.map(Utils.splitCommandString).getOrElse(Seq.empty)
108-
val sparkJavaOpts = Utils.sparkJavaOpts(conf)
114+
val sparkJavaOpts = Utils.sparkJavaOpts(driverConf)
109115
val javaOpts = sparkJavaOpts ++ extraJavaOpts
110116
val command = new Command(
111117
mainClass, appArgs, environmentVariables, extraClassPath, extraLibraryPath, javaOpts)
@@ -117,7 +123,7 @@ private[mesos] class MesosSubmitRequestServlet(
117123

118124
new MesosDriverDescription(
119125
name, appResource, actualDriverMemory, actualDriverCores, actualSuperviseDriver,
120-
command, request.sparkProperties, submissionId, submitDate)
126+
command, driverConf.getAll.toMap, submissionId, submitDate)
121127
}
122128

123129
protected override def handleSubmit(

resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -557,12 +557,13 @@ private[spark] class MesosClusterScheduler(
557557
"spark.submit.deployMode", // this would be set to `cluster`, but we need client
558558
"spark.master" // this contains the address of the dispatcher, not master
559559
)
560-
val defaultConf = conf.getAllWithPrefix("spark.mesos.dispatcher.driverDefault.").toMap
561-
val driverConf = desc.conf.getAll
560+
561+
desc.conf.getAll
562562
.filter { case (key, _) => !replicatedOptionsBlacklist.contains(key) }
563563
.toMap
564-
(defaultConf ++ driverConf).foreach { case (key, value) =>
565-
options ++= Seq("--conf", s"$key=${shellEscape(value)}") }
564+
.foreach { case (key, value) =>
565+
options ++= Seq("--conf", s"$key=${shellEscape(value)}")
566+
}
566567

567568
options
568569
}
Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.spark.deploy.rest.mesos
19+
20+
import org.mockito.Mockito.mock
21+
22+
import org.apache.spark.{SparkConf, SparkFunSuite}
23+
import org.apache.spark.deploy.TestPrematureExit
24+
import org.apache.spark.deploy.rest.CreateSubmissionRequest
25+
import org.apache.spark.scheduler.cluster.mesos.MesosClusterScheduler
26+
27+
/**
 * Verifies that the Mesos dispatcher forwards its
 * `spark.mesos.dispatcher.driverDefault.*` settings into the driver
 * description built for a submitted application.
 */
class MesosSubmitRequestServletSuite extends SparkFunSuite
  with TestPrematureExit {

  test("test buildDriverDescription applies default settings from dispatcher conf to Driver") {
    // Dispatcher-side configuration carrying two driver defaults; the
    // `driverDefault.` prefix is stripped when they reach the driver conf.
    val dispatcherConf = new SparkConf(loadDefaults = false)
      .set("spark.mesos.dispatcher.driverDefault.spark.mesos.network.name", "test_network")
      .set("spark.mesos.dispatcher.driverDefault.spark.mesos.network.labels", "k0:v0,k1:v1")

    // The scheduler is irrelevant to conf resolution, so a bare mock suffices.
    val servlet = new MesosSubmitRequestServlet(
      mock(classOf[MesosClusterScheduler]),
      dispatcherConf
    )

    // Minimal valid submission: only the fields buildDriverDescription requires.
    val submission = new CreateSubmissionRequest
    submission.appResource = "hdfs://test.jar"
    submission.mainClass = "foo.Bar"
    submission.appArgs = Array.empty[String]
    submission.sparkProperties = Map.empty[String, String]
    submission.environmentVariables = Map.empty[String, String]

    val resolvedConf = servlet.buildDriverDescription(submission).conf

    // Both defaults must surface in the driver conf without the prefix.
    assert(resolvedConf.get("spark.mesos.network.name") == "test_network")
    assert(resolvedConf.get("spark.mesos.network.labels") == "k0:v0,k1:v1")
  }
}

0 commit comments

Comments (0)