This repository was archived by the owner on Jan 9, 2020. It is now read-only.
36 changes: 36 additions & 0 deletions conf/kubernetes-shuffle-service.yaml
@@ -0,0 +1,36 @@
apiVersion: extensions/v1beta1
kind: DaemonSet
metadata:
  labels:
    app: spark-shuffle-service
    spark-version: 2.1.0
  name: shuffle
spec:
  template:
    metadata:
      labels:
        app: spark-shuffle-service
        spark-version: 2.1.0
    spec:
      volumes:
        - name: temp-volume
          hostPath:
            path: '/var/tmp' # change this path according to your cluster configuration.
      containers:
        - name: shuffle
          # This is an official image that is built
          # from the dockerfiles/shuffle directory
          # in the spark distribution.
          image: kubespark/spark-shuffle:v2.1.0-kubernetes-0.1.0-alpha.3
          volumeMounts:
            - mountPath: '/tmp'
              name: temp-volume
          # more volumes can be mounted here.
          # The spark job must be configured to use these
          # mounts using the configuration:
          # spark.kubernetes.shuffle.dir=<mount-1>,<mount-2>,...
          resources:
            requests:
              cpu: "1"
            limits:
              cpu: "1"
@@ -200,7 +200,7 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {
        new SecurityManager(executorConf),
        clientMode = true)
      val driver = fetcher.setupEndpointRefByURI(driverUrl)
      val cfg = driver.askWithRetry[SparkAppConfig](RetrieveSparkAppConfig)
      val cfg = driver.askWithRetry[SparkAppConfig](RetrieveSparkAppConfig(executorId))
      val props = cfg.sparkProperties ++ Seq[(String, String)](("spark.app.id", appId))
      fetcher.shutdown()

@@ -28,7 +28,7 @@ private[spark] sealed trait CoarseGrainedClusterMessage extends Serializable

private[spark] object CoarseGrainedClusterMessages {

  case object RetrieveSparkAppConfig extends CoarseGrainedClusterMessage
  case class RetrieveSparkAppConfig(executorId: String) extends CoarseGrainedClusterMessage

  case class SparkAppConfig(
      sparkProperties: Seq[(String, String)],
@@ -206,7 +206,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
        removeExecutor(executorId, reason)
        context.reply(true)

      case RetrieveSparkAppConfig =>
      case RetrieveSparkAppConfig(executorId) =>
        val reply = SparkAppConfig(sparkProperties,
          SparkEnv.get.securityManager.getIOEncryptionKey())
        context.reply(reply)
10 changes: 8 additions & 2 deletions core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -184,8 +184,14 @@ private[spark] class BlockManager(
    blockManagerId = if (idFromMaster != null) idFromMaster else id

    shuffleServerId = if (externalShuffleServiceEnabled) {
      logInfo(s"external shuffle service port = $externalShuffleServicePort")
      BlockManagerId(executorId, blockTransferService.hostName, externalShuffleServicePort)
      val shuffleServerHostName = if (blockManagerId.isDriver) {
        blockTransferService.hostName
      } else {
        conf.get("spark.shuffle.service.host", blockTransferService.hostName)
Inline review comments on this line:

Reviewer: Is this a new configuration you are introducing? We will need documentation for this for sure to know what this is for.

Reviewer: Also please use the ConfigBuilder from config.scala and its .internal() marker if this is not meant to be set by users and is only used internally.

Member Author: @ash211 do you think we should add it into spark-core alongside spark.shuffle.service.port that already exists there?

Member Author: I used the ConfigBuilder for references to spark.shuffle.service.host within the Kubernetes package, and left it as it is here, conforming to the surrounding code in spark-core.

      }
      logInfo(s"external shuffle service host = $shuffleServerHostName, " +
        s"port = $externalShuffleServicePort")
      BlockManagerId(executorId, shuffleServerHostName, externalShuffleServicePort)
    } else {
      blockManagerId
    }
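
Taken together, this change means the driver keeps advertising its own hostname for shuffle traffic, while executors advertise the host of their co-located shuffle service, preferring spark.shuffle.service.host when it is set. A standalone sketch of that decision (a hypothetical helper, not part of the patch):

    // Hypothetical helper mirroring the host resolution added to BlockManager.initialize.
    // `configuredHost` stands in for conf.get("spark.shuffle.service.host", ...).
    def resolveShuffleServiceHost(
        isDriver: Boolean,
        localHostName: String,
        configuredHost: Option[String]): String = {
      if (isDriver) {
        localHostName
      } else {
        configuredHost.getOrElse(localHostName)
      }
    }
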
59 changes: 57 additions & 2 deletions docs/running-on-kubernetes.md
@@ -189,7 +189,7 @@ from the other deployment modes. See the [configuration page](configuration.html
<td>
The namespace that will be used for running the driver and executor pods. When using
<code>spark-submit</code> in cluster mode, this can also be passed to <code>spark-submit</code> via the
<code>--kubernetes-namespace</code> command line argument. The namespace must already exist.
<code>--kubernetes-namespace</code> command line argument.
</td>
</tr>
<tr>
@@ -208,6 +208,30 @@ from the other deployment modes. See the [configuration page](configuration.html
<a href="https://docs.docker.com/engine/reference/commandline/tag/">Docker tag</a> format.
</td>
</tr>
<tr>
<td><code>spark.kubernetes.shuffle.namespace</code></td>
<td><code>default</code></td>
<td>
Namespace in which the shuffle service pods are present. The shuffle service must be
created in the cluster before it can be used.
</td>
</tr>
<tr>
<td><code>spark.kubernetes.shuffle.labels</code></td>
<td><code>(none)</code></td>
<td>
Labels that will be used to look up shuffle service pods. This should be a comma-separated list of label key-value pairs,
where each label is in the format <code>key=value</code>. The labels chosen must match
exactly one shuffle service pod on each node where executors are launched.
</td>
</tr>
<tr>
<td><code>spark.kubernetes.dynamic.allocation.size</code></td>
<td><code>5</code></td>
<td>
Number of executor pods to launch at once in each round of dynamic allocation.
</td>
</tr>
<tr>
<td><code>spark.kubernetes.authenticate.submission.caCertFile</code></td>
<td>(none)</td>
@@ -382,10 +406,41 @@ from the other deployment modes. See the [configuration page](configuration.html
</tr>
</table>

## Dynamic Executor Scaling

Spark on Kubernetes supports dynamic allocation in cluster mode. This mode requires running
an external shuffle service, typically a [daemonset](https://kubernetes.io/docs/concepts/workloads/controllers/daemonset/)
with a provisioned [hostpath](https://kubernetes.io/docs/concepts/storage/volumes/#hostpath) volume.
The shuffle service may be shared by executors belonging to different Spark jobs. Using Spark with dynamic allocation
on Kubernetes assumes that a cluster administrator has set up one or more shuffle-service daemonsets in the cluster.

A sample configuration file is provided in `conf/kubernetes-shuffle-service.yaml`, which can be customized as needed
for a particular cluster. It is important that `spec.template.metadata.labels` are set up appropriately for the shuffle
service because there may be multiple shuffle service instances running in a cluster. The labels give us a way to target
a particular shuffle service.
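
Assuming `kubectl` access to the cluster, the sample DaemonSet can be deployed and checked with commands along the
following lines (a sketch; adjust the manifest path, namespace, and labels to your setup):

    # Create the shuffle service DaemonSet from the sample manifest.
    kubectl create -f conf/kubernetes-shuffle-service.yaml

    # Confirm that a shuffle pod with the expected labels is running on each node.
    kubectl get pods -o wide -l app=spark-shuffle-service,spark-version=2.1.0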

For example, if the shuffle service we want to use is in the default namespace, and
has pods with labels `app=spark-shuffle-service` and `spark-version=2.1.0`, we can
use those labels to target that particular shuffle service at job launch time. To run a job with dynamic allocation enabled,
the command may then look like the following:

    bin/spark-submit \
      --deploy-mode cluster \
      --class org.apache.spark.examples.GroupByTest \
      --master k8s://<k8s-master>:<port> \
      --kubernetes-namespace default \
      --conf spark.app.name=group-by-test \
      --conf spark.kubernetes.driver.docker.image=kubespark/spark-driver:latest \
      --conf spark.kubernetes.executor.docker.image=kubespark/spark-executor:latest \
      --conf spark.dynamicAllocation.enabled=true \
      --conf spark.shuffle.service.enabled=true \
      --conf spark.kubernetes.shuffle.namespace=default \
      --conf spark.kubernetes.shuffle.labels="app=spark-shuffle-service,spark-version=2.1.0" \
      examples/jars/spark-examples_2.11-2.2.0.jar 10 400000 2

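The usual dynamic allocation settings apply as well; for instance, the following flags (hypothetical values) could be
added alongside the other `--conf` options above to bound the executor count and to control how many executor pods are
requested per allocation round:

    --conf spark.dynamicAllocation.minExecutors=1 \
    --conf spark.dynamicAllocation.maxExecutors=10 \
    --conf spark.kubernetes.dynamic.allocation.size=5 \
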
## Current Limitations

Running Spark on Kubernetes is currently an experimental feature. Some restrictions on the current implementation that
should be lifted in the future include:
* Applications can only use a fixed number of executors. Dynamic allocation is not supported.
* Applications can only run in cluster mode.
* Only Scala and Java applications can be run.
6 changes: 3 additions & 3 deletions resource-managers/kubernetes/README.md
@@ -14,10 +14,10 @@ important matters to keep in mind when developing this feature.

# Building Spark with Kubernetes Support

To build Spark with Kubernetes support, use the `kubernetes` profile when invoking Maven.
To build Spark with Kubernetes support, use the `kubernetes` profile when invoking Maven. For example, to simply compile
the Kubernetes core implementation module along with its dependencies:

    git checkout branch-2.1-kubernetes
    build/mvn package -Pkubernetes -DskipTests
    build/mvn compile -Pkubernetes -pl resource-managers/kubernetes/core -am -DskipTests

To build a distribution of Spark with Kubernetes support, use the `dev/make-distribution.sh` script, and add the
`kubernetes` profile as part of the build arguments. Any other build arguments can be specified as one would expect when
@@ -0,0 +1,41 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.deploy.kubernetes

import org.apache.spark.SparkException

object Util {
  def parseKeyValuePairs(
      maybeKeyValues: Option[String],
      configKey: String,
      keyValueType: String): Map[String, String] = {

    maybeKeyValues.map(keyValues => {
      keyValues.split(",").map(_.trim).filterNot(_.isEmpty).map(keyValue => {
        keyValue.split("=", 2).toSeq match {
          case Seq(k, v) =>
            (k, v)
          case _ =>
            throw new SparkException(s"Custom $keyValueType set by $configKey must be a" +
              s" comma-separated list of key-value pairs, with format <key>=<value>." +
              s" Got value: $keyValue. All values: $keyValues")
        }
      }).toMap
    }).getOrElse(Map.empty[String, String])
  }
}
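
A quick illustration of how this helper behaves (hypothetical usage, not part of the diff), using the label string format accepted by spark.kubernetes.shuffle.labels:

    import org.apache.spark.deploy.kubernetes.Util

    // Comma-separated "key=value" pairs are parsed into a Map.
    val labels = Util.parseKeyValuePairs(
      Some("app=spark-shuffle-service,spark-version=2.1.0"),
      "spark.kubernetes.shuffle.labels",
      "shuffle service labels")
    // labels == Map("app" -> "spark-shuffle-service", "spark-version" -> "2.1.0")

    // An entry without '=' (e.g. Some("badlabel")) raises a SparkException
    // that names the offending value and the config key.
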
@@ -157,6 +157,13 @@ package object config extends Logging {
    .stringConf
    .createOptional

  private[spark] val SPARK_SHUFFLE_SERVICE_HOST =
    ConfigBuilder("spark.shuffle.service.host")
      .doc("Host for Spark Shuffle Service")
      .internal()
      .stringConf
      .createOptional

  // Note that while we set a default for this when we start up the
  // scheduler, the specific default value is dynamically determined
  // based on the executor memory.
@@ -271,6 +278,38 @@
    .stringConf
    .createOptional

  private[spark] val KUBERNETES_SHUFFLE_NAMESPACE =
    ConfigBuilder("spark.kubernetes.shuffle.namespace")
      .doc("Namespace of the shuffle service")
      .stringConf
      .createWithDefault("default")

  private[spark] val KUBERNETES_SHUFFLE_SVC_IP =
    ConfigBuilder("spark.kubernetes.shuffle.ip")

Inline review comments on this line:

Reviewer: Debug only?

Reviewer: Looks like setting it lets you override the IP that the executor thinks its colocated shuffle service is on -- should put that in the doc.

      .doc("This setting is for debugging only. Setting this " +
        "allows overriding the IP that the executor thinks its colocated " +
        "shuffle service is on")
      .stringConf
      .createOptional

  private[spark] val KUBERNETES_SHUFFLE_LABELS =
    ConfigBuilder("spark.kubernetes.shuffle.labels")
      .doc("Labels to identify the shuffle service")
      .stringConf
      .createOptional

  private[spark] val KUBERNETES_SHUFFLE_DIR =
    ConfigBuilder("spark.kubernetes.shuffle.dir")
      .doc("Path to the shared shuffle directories.")
      .stringConf
      .createOptional

  private[spark] val KUBERNETES_DYNAMIC_ALLOCATION_SIZE =
    ConfigBuilder("spark.kubernetes.dynamic.allocation.size")
      .doc("Number of pods to launch at once in each round of dynamic allocation.")
      .intConf
      .createWithDefault(5)

  private[spark] val DRIVER_SERVICE_MANAGER_TYPE =
    ConfigBuilder("spark.kubernetes.driver.serviceManagerType")
      .doc("A tag indicating which class to use for creating the Kubernetes service and" +
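
For reference, entries defined with ConfigBuilder are read back through SparkConf. A minimal sketch (the call sites are assumptions for illustration, not the PR's actual code) of how Kubernetes scheduler code inside Spark might consume the new entries:

    // Given a SparkConf `conf` (the entries are private[spark]):
    val shuffleNamespace: String = conf.get(KUBERNETES_SHUFFLE_NAMESPACE)        // "default" unless overridden
    val shuffleLabels: Map[String, String] = Util.parseKeyValuePairs(
      conf.get(KUBERNETES_SHUFFLE_LABELS),
      KUBERNETES_SHUFFLE_LABELS.key,
      "shuffle service pod labels")
    val allocationBatchSize: Int = conf.get(KUBERNETES_DYNAMIC_ALLOCATION_SIZE)  // defaults to 5
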
@@ -113,4 +113,5 @@ package object constants {
s"$INIT_CONTAINER_PROPERTIES_FILE_MOUNT_PATH/$INIT_CONTAINER_PROPERTIES_FILE_NAME"
private[spark] val DOWNLOAD_JARS_VOLUME_NAME = "download-jars"
private[spark] val DOWNLOAD_FILES_VOLUME_NAME = "download-files"
private[spark] val DEFAULT_SHUFFLE_MOUNT_NAME = "shuffle"
}