
Commit cec7c65

Anton Kirillov (akirillov) authored and committed
CNI Support for Docker containerizer, binding to SPARK_LOCAL_IP instead of 0.0.0.0 to properly advertise executors during shuffle (#44)
1 parent 7bd59c2 commit cec7c65

File tree

5 files changed: +93 −30 lines changed
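For orientation before the diffs: the feature is driven by the existing `spark.mesos.network.name` setting, which attaches executor containers to a named virtual (CNI) network. A minimal sketch of enabling it from application code — the master URL, image name, and network name below are placeholders, not values from this commit:

import org.apache.spark.{SparkConf, SparkContext}

// Sketch only: "leader.mesos:5050", "spark", and "mynet" are placeholders.
val conf = new SparkConf()
  .setAppName("cni-example")
  .setMaster("mesos://leader.mesos:5050")
  .set("spark.mesos.network.name", "mynet")           // join this virtual network
  .set("spark.mesos.executor.docker.image", "spark")  // run executors in Docker
val sc = new SparkContext(conf)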

core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala

Lines changed: 6 additions & 1 deletion

@@ -286,7 +286,12 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {
       }
     }
 
-    if (driverUrl == null || executorId == null || hostname == null || cores <= 0 ||
+    if (hostname == null) {
+      hostname = Utils.localHostName()
+      log.info(s"Executor hostname is not provided, will use '$hostname' to advertise itself")
+    }
+
+    if (driverUrl == null || executorId == null || cores <= 0 ||
       appId == null) {
       printUsageAndExit()
     }
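The effect of the new branch: when `--hostname` is omitted, the executor falls back to `Utils.localHostName()`, which honors `SPARK_LOCAL_IP` when it is set — the "binding to SPARK_LOCAL_IP instead of 0.0.0.0" behavior from the commit title. A simplified, self-contained sketch of that fallback, where `resolveHost` is a stand-in for Spark's actual resolution logic:

// Simplified stand-in for Utils.localHostName(): prefer SPARK_LOCAL_IP,
// otherwise fall back to the machine's own address.
def resolveHost(): String =
  sys.env.getOrElse("SPARK_LOCAL_IP",
    java.net.InetAddress.getLocalHost.getHostAddress)

var hostname: String = null  // value parsed from --hostname, possibly absent
if (hostname == null) {
  hostname = resolveHost()
  println(s"Executor hostname is not provided, will use '$hostname' to advertise itself")
}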

resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala

Lines changed: 31 additions & 26 deletions

@@ -281,27 +281,19 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
         .getOrElse {
           throw new SparkException("Executor Spark home `spark.mesos.executor.home` is not set!")
         }
-      val runScript = new File(executorSparkHome, "./bin/spark-class").getPath
-      command.setValue(
-        "%s \"%s\" org.apache.spark.executor.CoarseGrainedExecutorBackend"
-          .format(prefixEnv, runScript) +
-        s" --driver-url $driverURL" +
-        s" --executor-id $taskId" +
-        s" --hostname ${executorHostname(offer)}" +
-        s" --cores $numCores" +
-        s" --app-id $appId")
+      val executable = new File(executorSparkHome, "./bin/spark-class").getPath
+      val runScript = s"$prefixEnv $executable " +
+        s"org.apache.spark.executor.CoarseGrainedExecutorBackend"
+
+      command.setValue(buildExecutorCommand(runScript, taskId, numCores, offer))
     } else {
       // Grab everything to the first '.'. We'll use that and '*' to
       // glob the directory "correctly".
       val basename = uri.get.split('/').last.split('.').head
-      command.setValue(
-        s"cd $basename*; $prefixEnv " +
-        "./bin/spark-class org.apache.spark.executor.CoarseGrainedExecutorBackend" +
-        s" --driver-url $driverURL" +
-        s" --executor-id $taskId" +
-        s" --hostname ${executorHostname(offer)}" +
-        s" --cores $numCores" +
-        s" --app-id $appId")
+      val runScript = s"cd $basename*; $prefixEnv " +
+        "./bin/spark-class org.apache.spark.executor.CoarseGrainedExecutorBackend"
+
+      command.setValue(buildExecutorCommand(runScript, taskId, numCores, offer))
       command.addUris(CommandInfo.URI.newBuilder().setValue(uri.get).setCache(useFetcherCache))
     }
 
@@ -310,6 +302,28 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
     command.build()
   }
 
+  private def buildExecutorCommand(
+      runScript: String, taskId: String, numCores: Int, offer: Offer): String = {
+
+    val sb = new StringBuilder()
+      .append(runScript)
+      .append(" --driver-url ")
+      .append(driverURL)
+      .append(" --executor-id ")
+      .append(taskId)
+      .append(" --cores ")
+      .append(numCores)
+      .append(" --app-id ")
+      .append(appId)
+
+    if (sc.conf.get(NETWORK_NAME).isEmpty) {
+      sb.append(" --hostname ")
+      sb.append(offer.getHostname)
+    }
+
+    sb.toString()
+  }
+
   protected def driverURL: String = {
     if (conf.contains("spark.testing")) {
       "driverURL"
@@ -811,15 +825,6 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
     slaves.values.map(_.taskIDs.size).sum
   }
 
-  private def executorHostname(offer: Offer): String = {
-    if (sc.conf.get(NETWORK_NAME).isDefined) {
-      // The agent's IP is not visible in a CNI container, so we bind to 0.0.0.0
-      "0.0.0.0"
-    } else {
-      offer.getHostname
-    }
-  }
-
   override def fetchHadoopDelegationTokens(): Option[Array[Byte]] = {
     if (UserGroupInformation.isSecurityEnabled) {
       Some(hadoopDelegationTokenManager.getTokens())
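Illustratively, `buildExecutorCommand` assembles a flat launch string, and `--hostname` is appended only when no virtual network is configured. A made-up example of the resulting value — every path and ID below is a placeholder, shown only to convey the shape of the string:

// Placeholder values; shows the shape of the assembled command only.
val cmdWithoutNetwork =
  "/opt/spark/bin/spark-class org.apache.spark.executor.CoarseGrainedExecutorBackend" +
    " --driver-url spark://CoarseGrainedScheduler@10.0.0.1:7077" +
    " --executor-id 3" +
    " --cores 4" +
    " --app-id app-20170601-0000" +
    " --hostname agent-1.example.com"  // dropped when spark.mesos.network.name is set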

resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendUtil.scala

Lines changed: 15 additions & 2 deletions

@@ -153,7 +153,9 @@ private[mesos] object MesosSchedulerBackendUtil extends Logging {
       .getOrElse(List.empty)
 
     if (containerType == ContainerInfo.Type.DOCKER) {
-      containerInfo.setDocker(dockerInfo(image, forcePullImage, portMaps, params))
+      containerInfo.setDocker(
+        dockerInfo(image, forcePullImage, portMaps, params, conf.get(NETWORK_NAME))
+      )
     } else {
       containerInfo.setMesos(mesosInfo(image, forcePullImage))
     }
@@ -263,13 +265,24 @@ private[mesos] object MesosSchedulerBackendUtil extends Logging {
       image: String,
       forcePullImage: Boolean,
       portMaps: List[ContainerInfo.DockerInfo.PortMapping],
-      params: List[Parameter]): DockerInfo = {
+      params: List[Parameter],
+      networkName: Option[String]): DockerInfo = {
     val dockerBuilder = ContainerInfo.DockerInfo.newBuilder()
       .setImage(image)
       .setForcePullImage(forcePullImage)
     portMaps.foreach(dockerBuilder.addPortMappings(_))
     params.foreach(dockerBuilder.addParameters(_))
 
+    networkName.foreach { net =>
+      val network = Parameter.newBuilder()
+        .setKey("net")
+        .setValue(net)
+        .build()
+
+      dockerBuilder.setNetwork(DockerInfo.Network.USER)
+      dockerBuilder.addParameters(network)
+    }
+
     dockerBuilder.build
   }
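In Docker terms, setting `Network.USER` plus a `net=<name>` parameter roughly corresponds to `docker run --net=<name>`. A standalone sketch of building the same protobuf, assuming the Mesos `Protos` classes are on the classpath; the image and network names are placeholders:

import org.apache.mesos.Protos.ContainerInfo.DockerInfo
import org.apache.mesos.Protos.Parameter

// Attach the container to a user-defined network, mirroring the patched
// dockerInfo(); "spark" and "mynet" are placeholder values.
val docker = DockerInfo.newBuilder()
  .setImage("spark")
  .setNetwork(DockerInfo.Network.USER)
  .addParameters(Parameter.newBuilder().setKey("net").setValue("mynet").build())
  .build()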

resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala

Lines changed: 24 additions & 0 deletions

@@ -657,6 +657,30 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
     assert(networkInfos.get(0).getLabels.getLabels(1).getValue == "val2")
   }
 
+  test("scheduler backend skips '--hostname' for executor when virtual network is enabled") {
+    setBackend()
+    val (mem, cpu) = (backend.executorMemory(sc), 4)
+    val offer = createOffer("o1", "s1", mem, cpu)
+
+    assert(backend.createCommand(offer, cpu, "test").getValue.contains("--hostname"))
+    sc.stop()
+
+    setBackend(Map("spark.executor.uri" -> "hdfs://test/executor.jar"))
+    assert(backend.createCommand(offer, cpu, "test").getValue.contains("--hostname"))
+    sc.stop()
+
+    setBackend(Map("spark.mesos.network.name" -> "test"))
+    assert(!backend.createCommand(offer, cpu, "test").getValue.contains("--hostname"))
+    sc.stop()
+
+    setBackend(Map(
+      "spark.mesos.network.name" -> "test",
+      "spark.executor.uri" -> "hdfs://test/executor.jar"
+    ))
+    assert(!backend.createCommand(offer, cpu, "test").getValue.contains("--hostname"))
+    sc.stop()
+  }
+
   test("supports spark.scheduler.minRegisteredResourcesRatio") {
     val expectedCores = 1
     setBackend(Map(

resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendUtilSuite.scala

Lines changed: 17 additions & 1 deletion

@@ -17,8 +17,9 @@
 
 package org.apache.spark.scheduler.cluster.mesos
 
+import org.apache.mesos.Protos.ContainerInfo.DockerInfo
+
 import org.apache.spark.{SparkConf, SparkFunSuite}
-import org.apache.spark.deploy.mesos.config
 
 class MesosSchedulerBackendUtilSuite extends SparkFunSuite {
 
@@ -50,4 +51,19 @@ class MesosSchedulerBackendUtilSuite extends SparkFunSuite {
     assert(params.get(2).getKey == "c")
     assert(params.get(2).getValue == "3")
   }
+
+  test("ContainerInfo respects Docker network configuration") {
+    val networkName = "test"
+    val conf = new SparkConf()
+    conf.set("spark.mesos.network.name", networkName)
+    conf.set("spark.mesos.executor.docker.image", "test")
+
+    val containerInfo = MesosSchedulerBackendUtil.buildContainerInfo(conf)
+
+    assert(containerInfo.getDocker.getNetwork == DockerInfo.Network.USER)
+    val params = containerInfo.getDocker.getParametersList
+    assert(params.size() == 1)
+    assert(params.get(0).getKey == "net")
+    assert(params.get(0).getValue == networkName)
+  }
 }
