diff --git a/conf/kubernetes-shuffle-service.yaml b/conf/kubernetes-shuffle-service.yaml new file mode 100644 index 0000000000000..eb34931b64242 --- /dev/null +++ b/conf/kubernetes-shuffle-service.yaml @@ -0,0 +1,36 @@ +apiVersion: extensions/v1beta1 +kind: DaemonSet +metadata: + labels: + app: spark-shuffle-service + spark-version: 2.1.0 + name: shuffle +spec: + template: + metadata: + labels: + app: spark-shuffle-service + spark-version: 2.1.0 + spec: + volumes: + - name: temp-volume + hostPath: + path: '/var/tmp' # change this path according to your cluster configuration. + containers: + - name: shuffle + # This is an official image that is built + # from the dockerfiles/shuffle directory + # in the spark distribution. + image: kubespark/spark-shuffle:v2.1.0-kubernetes-0.1.0-alpha.3 + volumeMounts: + - mountPath: '/tmp' + name: temp-volume + # more volumes can be mounted here. + # The spark job must be configured to use these + # mounts using the configuration: + # spark.kubernetes.shuffle.dir=<dir-1>,<dir-2>,... + resources: + requests: + cpu: "1" + limits: + cpu: "1" \ No newline at end of file diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala index 92a27902c6696..f0e13aa6bf109 100644 --- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala +++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala @@ -200,7 +200,7 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging { new SecurityManager(executorConf), clientMode = true) val driver = fetcher.setupEndpointRefByURI(driverUrl) - val cfg = driver.askWithRetry[SparkAppConfig](RetrieveSparkAppConfig) + val cfg = driver.askWithRetry[SparkAppConfig](RetrieveSparkAppConfig(executorId)) val props = cfg.sparkProperties ++ Seq[(String, String)](("spark.app.id", appId)) fetcher.shutdown() diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala index 0a4f19d76073e..2406999f9ee92 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala @@ -28,7 +28,7 @@ private[spark] sealed trait CoarseGrainedClusterMessage extends Serializable private[spark] object CoarseGrainedClusterMessages { - case object RetrieveSparkAppConfig extends CoarseGrainedClusterMessage + case class RetrieveSparkAppConfig(executorId: String) extends CoarseGrainedClusterMessage case class SparkAppConfig( sparkProperties: Seq[(String, String)], diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala index 3452487e72e88..89e59353de845 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala @@ -206,7 +206,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp removeExecutor(executorId, reason) context.reply(true) - case RetrieveSparkAppConfig => + case RetrieveSparkAppConfig(executorId) => val reply = SparkAppConfig(sparkProperties, SparkEnv.get.securityManager.getIOEncryptionKey()) context.reply(reply) diff --git
a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala index 04521c9159eac..18f7d135acdd2 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala @@ -184,8 +184,14 @@ private[spark] class BlockManager( blockManagerId = if (idFromMaster != null) idFromMaster else id shuffleServerId = if (externalShuffleServiceEnabled) { - logInfo(s"external shuffle service port = $externalShuffleServicePort") - BlockManagerId(executorId, blockTransferService.hostName, externalShuffleServicePort) + val shuffleServerHostName = if (blockManagerId.isDriver) { + blockTransferService.hostName + } else { + conf.get("spark.shuffle.service.host", blockTransferService.hostName) + } + logInfo(s"external shuffle service host = $shuffleServerHostName, " + + s"port = $externalShuffleServicePort") + BlockManagerId(executorId, shuffleServerHostName, externalShuffleServicePort) } else { blockManagerId } diff --git a/docs/running-on-kubernetes.md b/docs/running-on-kubernetes.md index 02933c28bbc66..e9fbeb1221bf7 100644 --- a/docs/running-on-kubernetes.md +++ b/docs/running-on-kubernetes.md @@ -189,7 +189,7 @@ from the other deployment modes. See the [configuration page](configuration.html The namespace that will be used for running the driver and executor pods. When using spark-submit in cluster mode, this can also be passed to spark-submit via the - --kubernetes-namespace command line argument. The namespace must already exist. + --kubernetes-namespace command line argument. @@ -208,6 +208,30 @@ from the other deployment modes. See the [configuration page](configuration.html Docker tag format. + + spark.kubernetes.shuffle.namespace + default + + Namespace in which the shuffle service pods run. The shuffle service must be + created in the cluster before it can be used. + + + + spark.kubernetes.shuffle.labels + (none) + + Labels used to look up shuffle service pods. This should be a comma-separated list of label key-value pairs, + where each label is in the format key=value. The chosen labels must match exactly one shuffle service pod + on each node where executors are launched. + + + + spark.kubernetes.dynamic.allocation.size + 5 + + Number of executor pods to launch at once in each round of dynamic allocation. + + spark.kubernetes.authenticate.submission.caCertFile (none) @@ -382,10 +406,41 @@ from the other deployment modes. See the [configuration page](configuration.html +## Dynamic Executor Scaling + +Spark on Kubernetes supports dynamic allocation in cluster mode. This mode requires running +an external shuffle service. This is typically a [daemonset](https://kubernetes.io/docs/concepts/workloads/controllers/daemonset/) +with a provisioned [hostpath](https://kubernetes.io/docs/concepts/storage/volumes/#hostpath) volume. +This shuffle service may be shared by executors belonging to different Spark jobs. Using Spark with dynamic allocation +on Kubernetes assumes that a cluster administrator has set up one or more shuffle-service daemonsets in the cluster. + +A sample configuration file is provided in `conf/kubernetes-shuffle-service.yaml`, which can be customized as needed +for a particular cluster. It is important that `spec.template.metadata.labels` be set up appropriately for the shuffle +service, because there may be multiple shuffle service instances running in a cluster.
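As an aside for readers of this patch, the following is a minimal sketch, not part of the change itself, of how such a label selector resolves to at most one shuffle pod per node. The object name, the hard-coded label string and namespace, and the use of fabric8's `DefaultKubernetesClient` against the ambient kube context are assumptions made purely for the example:

    import scala.collection.JavaConverters._

    import io.fabric8.kubernetes.client.DefaultKubernetesClient

    object ShuffleLabelCheck {
      def main(args: Array[String]): Unit = {
        // Same format as spark.kubernetes.shuffle.labels; this value is assumed for the example.
        val labelSpec = "app=spark-shuffle-service,spark-version=2.1.0"
        val labels = labelSpec.split(",").map(_.trim).filterNot(_.isEmpty).map { kv =>
          val Array(k, v) = kv.split("=", 2) // malformed pairs would fail here; fine for a sketch
          k -> v
        }.toMap

        // Talks to whatever cluster the current kube context points at.
        val client = new DefaultKubernetesClient()
        try {
          val pods = client.pods()
            .inNamespace("default") // same namespace as spark.kubernetes.shuffle.namespace
            .withLabels(labels.asJava)
            .list()
            .getItems
            .asScala
          // Exactly one shuffle pod is expected per node; more than one is ambiguous.
          pods.groupBy(_.getSpec.getNodeName).foreach { case (node, onNode) =>
            println(s"$node -> ${onNode.map(_.getStatus.getPodIP).mkString(", ")}")
          }
        } finally {
          client.close()
        }
      }
    }

A node that maps to more than one pod IP in this listing is exactly the ambiguity the scheduler backend rejects when it builds its shuffle-pod cache at runtime.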
The labels give us a way to target a particular +shuffle service. + +For example, if the shuffle service we want to use is in the default namespace, and +has pods with labels `app=spark-shuffle-service` and `spark-version=2.1.0`, we can +use those labels to target that particular shuffle service at job launch time. To run a job with dynamic allocation enabled, +the command may look like the following: + + bin/spark-submit \ + --deploy-mode cluster \ + --class org.apache.spark.examples.GroupByTest \ + --master k8s://<apiserver-host>:<apiserver-port> \ + --kubernetes-namespace default \ + --conf spark.app.name=group-by-test \ + --conf spark.kubernetes.driver.docker.image=kubespark/spark-driver:latest \ + --conf spark.kubernetes.executor.docker.image=kubespark/spark-executor:latest \ + --conf spark.dynamicAllocation.enabled=true \ + --conf spark.shuffle.service.enabled=true \ + --conf spark.kubernetes.shuffle.namespace=default \ + --conf spark.kubernetes.shuffle.labels="app=spark-shuffle-service,spark-version=2.1.0" \ + examples/jars/spark_examples_2.11-2.2.0.jar 10 400000 2 + ## Current Limitations Running Spark on Kubernetes is currently an experimental feature. Some restrictions on the current implementation that should be lifted in the future include: -* Applications can only use a fixed number of executors. Dynamic allocation is not supported. * Applications can only run in cluster mode. * Only Scala and Java applications can be run. diff --git a/resource-managers/kubernetes/README.md b/resource-managers/kubernetes/README.md index 734c29947b6d9..fd1ad29eb795d 100644 --- a/resource-managers/kubernetes/README.md +++ b/resource-managers/kubernetes/README.md @@ -14,10 +14,10 @@ important matters to keep in mind when developing this feature. # Building Spark with Kubernetes Support -To build Spark with Kubernetes support, use the `kubernetes` profile when invoking Maven. +To build Spark with Kubernetes support, use the `kubernetes` profile when invoking Maven. For example, to compile just +the Kubernetes core implementation module along with its dependencies: - git checkout branch-2.1-kubernetes - build/mvn package -Pkubernetes -DskipTests + build/mvn compile -Pkubernetes -pl resource-managers/kubernetes/core -am -DskipTests To build a distribution of Spark with Kubernetes support, use the `dev/make-distribution.sh` script, and add the `kubernetes` profile as part of the build arguments. Any other build arguments can be specified as one would expect when diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/Util.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/Util.scala new file mode 100644 index 0000000000000..495c988356467 --- /dev/null +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/Util.scala @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.deploy.kubernetes + +import org.apache.spark.SparkException + +object Util { + def parseKeyValuePairs( + maybeKeyValues: Option[String], + configKey: String, + keyValueType: String): Map[String, String] = { + + maybeKeyValues.map(keyValues => { + keyValues.split(",").map(_.trim).filterNot(_.isEmpty).map(keyValue => { + keyValue.split("=", 2).toSeq match { + case Seq(k, v) => + (k, v) + case _ => + throw new SparkException(s"Custom $keyValueType set by $configKey must be a" + + s" comma-separated list of key-value pairs, with format <key>=<value>." + + s" Got value: $keyValue. All values: $keyValues") + } + }).toMap + }).getOrElse(Map.empty[String, String]) + } +} diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/config.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/config.scala index 1c8b6798bbdd5..cab793cbc1998 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/config.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/config.scala @@ -157,6 +157,13 @@ package object config extends Logging { .stringConf .createOptional + private[spark] val SPARK_SHUFFLE_SERVICE_HOST = + ConfigBuilder("spark.shuffle.service.host") + .doc("Host for Spark Shuffle Service") + .internal() + .stringConf + .createOptional + // Note that while we set a default for this when we start up the // scheduler, the specific default value is dynamically determined // based on the executor memory. @@ -271,6 +278,38 @@ package object config extends Logging { .stringConf .createOptional + private[spark] val KUBERNETES_SHUFFLE_NAMESPACE = + ConfigBuilder("spark.kubernetes.shuffle.namespace") + .doc("Namespace of the shuffle service") + .stringConf + .createWithDefault("default") + + private[spark] val KUBERNETES_SHUFFLE_SVC_IP = + ConfigBuilder("spark.kubernetes.shuffle.ip") + .doc("This setting is for debugging only.
Setting this " + + "allows overriding the IP that the executor thinks its colocated " + + "shuffle service is on") + .stringConf + .createOptional + + private[spark] val KUBERNETES_SHUFFLE_LABELS = + ConfigBuilder("spark.kubernetes.shuffle.labels") + .doc("Labels to identify the shuffle service") + .stringConf + .createOptional + + private[spark] val KUBERNETES_SHUFFLE_DIR = + ConfigBuilder("spark.kubernetes.shuffle.dir") + .doc("Path to the shared shuffle directories.") + .stringConf + .createOptional + + private[spark] val KUBERNETES_DYNAMIC_ALLOCATION_SIZE = + ConfigBuilder("spark.kubernetes.dynamic.allocation.size") + .doc("Number of pods to launch at once in each round of dynamic allocation.") + .intConf + .createWithDefault(5) + private[spark] val DRIVER_SERVICE_MANAGER_TYPE = ConfigBuilder("spark.kubernetes.driver.serviceManagerType") .doc("A tag indicating which class to use for creating the Kubernetes service and" + diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/constants.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/constants.scala index f82cb88b4c622..27e47eb61933f 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/constants.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/constants.scala @@ -113,4 +113,5 @@ package object constants { s"$INIT_CONTAINER_PROPERTIES_FILE_MOUNT_PATH/$INIT_CONTAINER_PROPERTIES_FILE_NAME" private[spark] val DOWNLOAD_JARS_VOLUME_NAME = "download-jars" private[spark] val DOWNLOAD_FILES_VOLUME_NAME = "download-files" + private[spark] val DEFAULT_SHUFFLE_MOUNT_NAME = "shuffle" } diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala index a2294a6766980..a4df3342d1d8f 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala @@ -16,111 +16,140 @@ */ package org.apache.spark.scheduler.cluster.kubernetes -import java.io.Closeable -import java.util.concurrent.atomic.{AtomicInteger, AtomicLong, AtomicReference} +import java.util.concurrent.{ScheduledExecutorService, TimeUnit} +import java.util.concurrent.atomic.{AtomicInteger, AtomicLong} import scala.collection.JavaConverters._ -import scala.collection.mutable import scala.concurrent.{ExecutionContext, Future} -import io.fabric8.kubernetes.api.model.{ContainerPortBuilder, EnvVarBuilder, - EnvVarSourceBuilder, Pod, QuantityBuilder} -import io.fabric8.kubernetes.client.{KubernetesClientException, Watcher} +import io.fabric8.kubernetes.api.model._ +import io.fabric8.kubernetes.client.{KubernetesClient, KubernetesClientException, Watch, Watcher} import io.fabric8.kubernetes.client.Watcher.Action +import io.fabric8.kubernetes.client.dsl.Watchable +import io.fabric8.kubernetes.client.internal.readiness.Readiness +import org.apache.commons.io.FilenameUtils -import org.apache.spark.{SparkContext, SparkException} +import org.apache.spark.{SparkContext, SparkEnv, SparkException} +import org.apache.spark.deploy.kubernetes.Util import org.apache.spark.deploy.kubernetes.config._ 
import org.apache.spark.deploy.kubernetes.constants._ -import org.apache.spark.rpc.RpcEndpointAddress +import org.apache.spark.internal.Logging +import org.apache.spark.rpc._ import org.apache.spark.scheduler.TaskSchedulerImpl +import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.{RetrieveSparkAppConfig, SparkAppConfig} import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend import org.apache.spark.util.{ThreadUtils, Utils} private[spark] class KubernetesClusterSchedulerBackend( - scheduler: TaskSchedulerImpl, - val sc: SparkContext) + scheduler: TaskSchedulerImpl, + val sc: SparkContext) extends CoarseGrainedSchedulerBackend(scheduler, sc.env.rpcEnv) { import KubernetesClusterSchedulerBackend._ - - private val RUNNING_EXECUTOR_PODS_LOCK = new Object - private val runningExecutorPods = new mutable.HashMap[String, Pod] // Indexed by executor IDs. - - private val EXECUTOR_PODS_BY_IPS_LOCK = new Object - private val executorPodsByIPs = new mutable.HashMap[String, Pod] // Indexed by executor IP addrs. - + override val minRegisteredRatio = + if (conf + .getOption("spark.scheduler.minRegisteredResourcesRatio") + .isEmpty) { + 0.8 + } else { + super.minRegisteredRatio + } + private val EXECUTOR_MODIFICATION_LOCK = new Object + private val runningExecutorPods = + new scala.collection.mutable.HashMap[String, Pod] private val executorDockerImage = conf.get(EXECUTOR_DOCKER_IMAGE) private val kubernetesNamespace = conf.get(KUBERNETES_NAMESPACE) - private val executorPort = conf.getInt("spark.executor.port", DEFAULT_STATIC_PORT) + private val executorPort = + conf.getInt("spark.executor.port", DEFAULT_STATIC_PORT) private val blockmanagerPort = conf .getInt("spark.blockmanager.port", DEFAULT_BLOCKMANAGER_PORT) - private val kubernetesDriverPodName = conf .get(KUBERNETES_DRIVER_POD_NAME) - .getOrElse( - throw new SparkException("Must specify the driver pod name")) - - private val executorMemoryMb = conf.get(org.apache.spark.internal.config.EXECUTOR_MEMORY) + .getOrElse(throw new SparkException("Must specify the driver pod name")) + private val shuffleServiceConfig = readShuffleServiceConfig() + private val executorMemoryMb = + conf.get(org.apache.spark.internal.config.EXECUTOR_MEMORY) private val executorMemoryString = conf.get( org.apache.spark.internal.config.EXECUTOR_MEMORY.key, org.apache.spark.internal.config.EXECUTOR_MEMORY.defaultValueString) - private val memoryOverheadMb = conf .get(KUBERNETES_EXECUTOR_MEMORY_OVERHEAD) - .getOrElse(math.max((MEMORY_OVERHEAD_FACTOR * executorMemoryMb).toInt, - MEMORY_OVERHEAD_MIN)) + .getOrElse( + math.max((MEMORY_OVERHEAD_FACTOR * executorMemoryMb).toInt, + MEMORY_OVERHEAD_MIN)) private val executorMemoryWithOverhead = executorMemoryMb + memoryOverheadMb - - private val executorCores = conf.getOption("spark.executor.cores").getOrElse("1") - - private implicit val requestExecutorContext = ExecutionContext.fromExecutorService( - ThreadUtils.newDaemonCachedThreadPool("kubernetes-executor-requests")) - - private val kubernetesClient = new DriverPodKubernetesClientProvider(conf, kubernetesNamespace) - .get - + private val executorCores = + conf.getOption("spark.executor.cores").getOrElse("1") + private val kubernetesClient = + new DriverPodKubernetesClientProvider(conf, kubernetesNamespace).get private val driverPod = try { - kubernetesClient.pods().inNamespace(kubernetesNamespace). 
- withName(kubernetesDriverPodName).get() + kubernetesClient + .pods() + .inNamespace(kubernetesNamespace) + .withName(kubernetesDriverPodName) + .get() } catch { case throwable: Throwable => logError(s"Executor cannot find driver pod.", throwable) throw new SparkException(s"Executor cannot find driver pod", throwable) } - override val minRegisteredRatio = - if (conf.getOption("spark.scheduler.minRegisteredResourcesRatio").isEmpty) { - 0.8 - } else { - super.minRegisteredRatio + private implicit val requestExecutorContext = + ExecutionContext.fromExecutorService( + ThreadUtils.newDaemonCachedThreadPool("kubernetes-executor-requests")) + private val nodeCache = shuffleServiceConfig + .map { config => + new NodeCache(kubernetesClient, + config.shuffleNamespace, + config.shuffleLabels) } - - private val executorWatchResource = new AtomicReference[Closeable] - protected var totalExpectedExecutors = new AtomicInteger(0) - + .getOrElse(null) private val driverUrl = RpcEndpointAddress( sc.getConf.get("spark.driver.host"), sc.getConf.getInt("spark.driver.port", DEFAULT_DRIVER_PORT), CoarseGrainedSchedulerBackend.ENDPOINT_NAME).toString - private val initialExecutors = getInitialTargetExecutorNumber(1) - - private def getInitialTargetExecutorNumber(defaultNumExecutors: Int = 1): Int = { + private val allocator = new KubernetesAllocator(kubernetesClient, 1) + protected var totalExpectedExecutors = new AtomicInteger(0) + /** + * The total number of executors we aim to have. + * Undefined when not using dynamic allocation. + * Initially set to 0 when using dynamic allocation + */ + private var executorLimitOption: Option[Int] = { if (Utils.isDynamicAllocationEnabled(conf)) { - val minNumExecutors = conf.getInt("spark.dynamicAllocation.minExecutors", 0) - val initialNumExecutors = Utils.getDynamicAllocationInitialExecutors(conf) - val maxNumExecutors = conf.getInt("spark.dynamicAllocation.maxExecutors", 1) - require(initialNumExecutors >= minNumExecutors && initialNumExecutors <= maxNumExecutors, - s"initial executor number $initialNumExecutors must between min executor number " + - s"$minNumExecutors and max executor number $maxNumExecutors") - - initialNumExecutors + Some(0) } else { - conf.getInt("spark.executor.instances", defaultNumExecutors) + None } } - override def applicationId(): String = conf.get("spark.app.id", super.applicationId()) + def readShuffleServiceConfig(): Option[ShuffleServiceConfig] = { + Utils.isDynamicAllocationEnabled(sc.conf) match { + case true => + val shuffleNamespace = conf.get(KUBERNETES_SHUFFLE_NAMESPACE) + val parsedShuffleLabels = Util.parseKeyValuePairs( + conf + .get(KUBERNETES_SHUFFLE_LABELS), + KUBERNETES_SHUFFLE_LABELS.key, + "shuffle-labels") + + val shuffleDirs = conf.getOption(KUBERNETES_SHUFFLE_DIR.key) match { + case Some(s) => + s.split(",") + + case _ => + Utils.getConfiguredLocalDirs(conf) + } + Some( + ShuffleServiceConfig(shuffleNamespace, + parsedShuffleLabels, + shuffleDirs)) + + case _ => + None + } + } override def sufficientResourcesRegistered(): Boolean = { totalRegisteredExecutors.get() >= initialExecutors * minRegisteredRatio @@ -128,41 +157,26 @@ private[spark] class KubernetesClusterSchedulerBackend( override def start(): Unit = { super.start() - executorWatchResource.set(kubernetesClient.pods().withLabel(SPARK_APP_ID_LABEL, applicationId()) - .watch(new ExecutorPodsWatcher())) if (!Utils.isDynamicAllocationEnabled(sc.conf)) { - doRequestTotalExecutors(initialExecutors) + allocateInitialExecutors(initialExecutors) + } else { + 
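// Dynamic allocation path: no executors are created up front. The allocator thread below
// creates executor pods in batches, up to the advisory limit set by doRequestTotalExecutors,
// while the node cache tracks the ready shuffle-service pod on each node so the driver can
// hand every registering executor the address of its co-located shuffle service.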
allocator.start() + nodeCache.start() } } - override def stop(): Unit = { - // send stop message to executors so they shut down cleanly - super.stop() - - // then delete the executor pods - // TODO investigate why Utils.tryLogNonFatalError() doesn't work in this context. - // When using Utils.tryLogNonFatalError some of the code fails but without any logs or - // indication as to why. - try { - RUNNING_EXECUTOR_PODS_LOCK.synchronized { - runningExecutorPods.values.foreach(kubernetesClient.pods().delete(_)) - runningExecutorPods.clear() - } - EXECUTOR_PODS_BY_IPS_LOCK.synchronized { - executorPodsByIPs.clear() - } - val resource = executorWatchResource.getAndSet(null) - if (resource != null) { - resource.close() + private def allocateInitialExecutors(requestedTotal: Integer): Unit = { + EXECUTOR_MODIFICATION_LOCK.synchronized { + if (requestedTotal > totalExpectedExecutors.get) { + logInfo( + s"Requesting ${requestedTotal - totalExpectedExecutors.get}" + + s" additional executors, expecting total $requestedTotal and currently" + + s" expected ${totalExpectedExecutors.get}") + for (i <- 0 until (requestedTotal - totalExpectedExecutors.get)) { + runningExecutorPods += allocateNewExecutorPod() + } } - } catch { - case e: Throwable => logError("Uncaught exception while shutting down controllers.", e) - } - try { - logInfo("Closing kubernetes client") - kubernetesClient.close() - } catch { - case e: Throwable => logError("Uncaught exception closing Kubernetes client.", e) + totalExpectedExecutors.set(requestedTotal) } } @@ -192,21 +206,22 @@ private[spark] class KubernetesClusterSchedulerBackend( (ENV_EXECUTOR_CORES, executorCores), (ENV_EXECUTOR_MEMORY, executorMemoryString), (ENV_APPLICATION_ID, applicationId()), - (ENV_EXECUTOR_ID, executorId)) - .map(env => new EnvVarBuilder() - .withName(env._1) - .withValue(env._2) - .build() - ) ++ Seq( + (ENV_EXECUTOR_ID, executorId) + ).map( + env => + new EnvVarBuilder() + .withName(env._1) + .withValue(env._2) + .build()) ++ Seq( new EnvVarBuilder() .withName(ENV_EXECUTOR_POD_IP) - .withValueFrom(new EnvVarSourceBuilder() - .withNewFieldRef("v1", "status.podIP") - .build()) + .withValueFrom( + new EnvVarSourceBuilder() + .withNewFieldRef("v1", "status.podIP") + .build()) .build() - ) - val requiredPorts = Seq( - (EXECUTOR_PORT_NAME, executorPort), + ) + val requiredPorts = Seq((EXECUTOR_PORT_NAME, executorPort), (BLOCK_MANAGER_PORT_NAME, blockmanagerPort)) .map(port => { new ContainerPortBuilder() @@ -214,37 +229,61 @@ private[spark] class KubernetesClusterSchedulerBackend( .withContainerPort(port._2) .build() }) - try { - (executorId, kubernetesClient.pods().createNew() - .withNewMetadata() - .withName(name) - .withLabels(selectors) - .withOwnerReferences() - .addNewOwnerReference() - .withController(true) - .withApiVersion(driverPod.getApiVersion) - .withKind(driverPod.getKind) - .withName(driverPod.getMetadata.getName) - .withUid(driverPod.getMetadata.getUid) - .endOwnerReference() - .endMetadata() - .withNewSpec() - .withHostname(hostname) - .addNewContainer() - .withName(s"executor") - .withImage(executorDockerImage) - .withImagePullPolicy("IfNotPresent") - .withNewResources() - .addToRequests("memory", executorMemoryQuantity) - .addToLimits("memory", executorMemoryLimitQuantity) - .addToRequests("cpu", executorCpuQuantity) - .addToLimits("cpu", executorCpuQuantity) - .endResources() - .withEnv(requiredEnv.asJava) - .withPorts(requiredPorts.asJava) + + val basePodBuilder = new PodBuilder() + .withNewMetadata() + .withName(name) + 
.withLabels(selectors) + .withOwnerReferences() + .addNewOwnerReference() + .withController(true) + .withApiVersion(driverPod.getApiVersion) + .withKind(driverPod.getKind) + .withName(driverPod.getMetadata.getName) + .withUid(driverPod.getMetadata.getUid) + .endOwnerReference() + .endMetadata() + .withNewSpec() + .withHostname(hostname) + .addNewContainer() + .withName(s"executor") + .withImage(executorDockerImage) + .withImagePullPolicy("IfNotPresent") + .withNewResources() + .addToRequests("memory", executorMemoryQuantity) + .addToLimits("memory", executorMemoryLimitQuantity) + .addToRequests("cpu", executorCpuQuantity) + .addToLimits("cpu", executorCpuQuantity) + .endResources() + .withEnv(requiredEnv.asJava) + .withPorts(requiredPorts.asJava) + .endContainer() + .endSpec() + + var resolvedPodBuilder = shuffleServiceConfig + .map { config => + config.shuffleDirs.foldLeft(basePodBuilder) { (builder, dir) => + builder + .editSpec() + .addNewVolume() + .withName(FilenameUtils.getBaseName(dir)) + .withNewHostPath() + .withPath(dir) + .endHostPath() + .endVolume() + .editFirstContainer() + .addNewVolumeMount() + .withName(FilenameUtils.getBaseName(dir)) + .withMountPath(dir) + .endVolumeMount() .endContainer() - .endSpec() - .done()) + .endSpec() + } + } + .getOrElse(basePodBuilder) + + try { + (executorId, kubernetesClient.pods().create(resolvedPodBuilder.build())) } catch { case throwable: Throwable => logError("Failed to allocate executor pod.", throwable) @@ -252,65 +291,208 @@ private[spark] class KubernetesClusterSchedulerBackend( } } - override def doRequestTotalExecutors(requestedTotal: Int): Future[Boolean] = Future[Boolean] { - RUNNING_EXECUTOR_PODS_LOCK.synchronized { - if (requestedTotal > totalExpectedExecutors.get) { - logInfo(s"Requesting ${requestedTotal - totalExpectedExecutors.get}" - + s" additional executors, expecting total $requestedTotal and currently" + - s" expected ${totalExpectedExecutors.get}") - for (i <- 0 until (requestedTotal - totalExpectedExecutors.get)) { - runningExecutorPods += allocateNewExecutorPod() - } - } - totalExpectedExecutors.set(requestedTotal) + override def applicationId(): String = + conf.get("spark.app.id", super.applicationId()) + + override def stop(): Unit = { + if (Utils.isDynamicAllocationEnabled(conf)) { + // Stop trying to allocate more executors & updating caches + allocator.stop() + nodeCache.stop() + } + + // send stop message to executors so they shut down cleanly + super.stop() + + // then delete the executor pods + // TODO investigate why Utils.tryLogNonFatalError() doesn't work in this context. + // When using Utils.tryLogNonFatalError some of the code fails but without any logs or + // indication as to why. 
+ try { + runningExecutorPods.values.foreach(kubernetesClient.pods().delete(_)) + } catch { + case e: Throwable => + logError("Uncaught exception while shutting down controllers.", e) + } + try { + kubernetesClient.close() + } catch { + case e: Throwable => + logError("Uncaught exception closing Kubernetes client.", e) } - true } - override def doKillExecutors(executorIds: Seq[String]): Future[Boolean] = Future[Boolean] { - RUNNING_EXECUTOR_PODS_LOCK.synchronized { - for (executor <- executorIds) { - runningExecutorPods.remove(executor) match { - case Some(pod) => kubernetesClient.pods().delete(pod) - case None => logWarning(s"Unable to remove pod for unknown executor $executor") + override def doRequestTotalExecutors(requestedTotal: Int): Future[Boolean] = + Future.successful { + // We simply set the limit as an advisory maximum value. We do not know if we can + // actually allocate this number of executors. + logInfo("Capping the total amount of executors to " + requestedTotal) + executorLimitOption = Some(requestedTotal) + true + } + + override def doKillExecutors(executorIds: Seq[String]): Future[Boolean] = + Future[Boolean] { + EXECUTOR_MODIFICATION_LOCK.synchronized { + for (executor <- executorIds) { + runningExecutorPods.remove(executor) match { + case Some(pod) => kubernetesClient.pods().delete(pod) + case None => + logWarning( + s"Unable to remove pod for unknown executor $executor") + } } } + true } - true + + override def createDriverEndpoint( + properties: Seq[(String, String)]): DriverEndpoint = { + new KubernetesDriverEndpoint(rpcEnv, properties) } - def getExecutorPodByIP(podIP: String): Option[Pod] = { - EXECUTOR_PODS_BY_IPS_LOCK.synchronized { - executorPodsByIPs.get(podIP) + private def getInitialTargetExecutorNumber( + defaultNumExecutors: Int = 1): Int = { + if (Utils.isDynamicAllocationEnabled(conf)) { + val minNumExecutors = + conf.getInt("spark.dynamicAllocation.minExecutors", 0) + val initialNumExecutors = + Utils.getDynamicAllocationInitialExecutors(conf) + val maxNumExecutors = + conf.getInt("spark.dynamicAllocation.maxExecutors", 1) + require( + initialNumExecutors >= minNumExecutors && initialNumExecutors <= maxNumExecutors, + s"initial executor number $initialNumExecutors must between min executor number " + + s"$minNumExecutors and max executor number $maxNumExecutors" + ) + + initialNumExecutors + } else { + conf.getInt("spark.executor.instances", defaultNumExecutors) } } - private class ExecutorPodsWatcher extends Watcher[Pod] { - - override def eventReceived(action: Action, pod: Pod): Unit = { - if (action == Action.MODIFIED && pod.getStatus.getPhase == "Running" - && pod.getMetadata.getDeletionTimestamp == null) { - val podIP = pod.getStatus.getPodIP - val clusterNodeName = pod.getSpec.getNodeName - logDebug(s"Executor pod $pod ready, launched at $clusterNodeName as IP $podIP.") - EXECUTOR_PODS_BY_IPS_LOCK.synchronized { - executorPodsByIPs += ((podIP, pod)) + case class ShuffleServiceConfig(shuffleNamespace: String, + shuffleLabels: Map[String, String], + shuffleDirs: Seq[String]) + + private class KubernetesDriverEndpoint( + rpcEnv: RpcEnv, + sparkProperties: Seq[(String, String)]) + extends DriverEndpoint(rpcEnv, sparkProperties) { + override def receiveAndReply( + context: RpcCallContext): PartialFunction[Any, Unit] = { + new PartialFunction[Any, Unit]() { + override def isDefinedAt(msg: Any): Boolean = { + msg match { + case RetrieveSparkAppConfig(executorId) => + Utils.isDynamicAllocationEnabled(sc.conf) + case _ => false + } } - } else if ((action 
== Action.MODIFIED && pod.getMetadata.getDeletionTimestamp != null) || - action == Action.DELETED || action == Action.ERROR) { - val podName = pod.getMetadata.getName - val podIP = pod.getStatus.getPodIP - logDebug(s"Executor pod $podName at IP $podIP was at $action.") - if (podIP != null) { - EXECUTOR_PODS_BY_IPS_LOCK.synchronized { - executorPodsByIPs -= podIP + + override def apply(msg: Any): Unit = { + msg match { + case RetrieveSparkAppConfig(executorId) => + EXECUTOR_MODIFICATION_LOCK.synchronized { + var resolvedProperties = sparkProperties + val runningExecutorPod = kubernetesClient + .pods() + .withName( + runningExecutorPods(executorId).getMetadata.getName) + .get() + val nodeName = runningExecutorPod.getSpec.getNodeName + val shufflePodIp = nodeCache.getShufflePodForExecutor(nodeName) + resolvedProperties = resolvedProperties ++ Seq( + (SPARK_SHUFFLE_SERVICE_HOST.key, shufflePodIp)) + + val reply = SparkAppConfig( + resolvedProperties, + SparkEnv.get.securityManager.getIOEncryptionKey()) + context.reply(reply) + } } } + }.orElse(super.receiveAndReply(context)) + } + } + + /* + * KubernetesAllocator class watches all the executor pods associated with + * this SparkJob and creates new executors when it is appropriate. + */ + private[spark] class KubernetesAllocator(client: KubernetesClient, + interval: Int) + extends Logging { + + private var scheduler: ScheduledExecutorService = _ + private var watcher: Watch = _ + private var readyExecutorPods = Set[String]() + + def start(): Unit = { + scheduler = ThreadUtils + .newDaemonSingleThreadScheduledExecutor("kubernetes-pod-allocator") + + watcher = client + .pods() + .withLabels(Map(SPARK_APP_ID_LABEL -> applicationId()).asJava) + .watch(new Watcher[Pod] { + override def eventReceived(action: Watcher.Action, p: Pod): Unit = { + action match { + case Action.DELETED => + readyExecutorPods -= p.getMetadata.getName + + case _ => + if (Readiness.isReady(p)) { + readyExecutorPods += p.getMetadata.getName + } + } + } + override def onClose(e: KubernetesClientException): Unit = {} + }) + + if (interval > 0) { + scheduler.scheduleWithFixedDelay(allocatorRunnable, + 0, + interval, + TimeUnit.SECONDS) } } - override def onClose(cause: KubernetesClientException): Unit = { - logDebug("Executor pod watch closed.", cause) + def stop(): Unit = { + watcher.close() + scheduler.shutdown() + } + + private def getAllocationSize(): Int = { + val allocationChunkSize = conf.get(KUBERNETES_DYNAMIC_ALLOCATION_SIZE) + math.min(executorLimitOption.get - runningExecutorPods.size, allocationChunkSize) + } + + private val allocatorRunnable: Runnable = new Runnable { + override def run(): Unit = { + if (readyExecutorPods.size < totalExpectedExecutors.get * minRegisteredRatio) { + logDebug("Waiting for pending executors before scaling") + return + } + + if (runningExecutorPods.size >= executorLimitOption.get) { + logDebug( + "Maximum allowed executor limit reached. 
Not scaling up further.") + return + } + + EXECUTOR_MODIFICATION_LOCK.synchronized { + for (i <- 1 to math.min( + executorLimitOption.get - runningExecutorPods.size, + 5)) { + runningExecutorPods += allocateNewExecutorPod() + totalExpectedExecutors.set(runningExecutorPods.size) + logInfo( + s"Requesting a new executor, total executors is now ${totalExpectedExecutors}") + } + } + } } } } diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/NodeCache.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/NodeCache.scala new file mode 100644 index 0000000000000..454e3ec7852a4 --- /dev/null +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/NodeCache.scala @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.scheduler.cluster.kubernetes + +import java.util.concurrent.ExecutorService + +import scala.collection.JavaConverters._ + +import io.fabric8.kubernetes.api.model.Pod +import io.fabric8.kubernetes.client.{KubernetesClient, KubernetesClientException, Watch, Watcher} +import io.fabric8.kubernetes.client.Watcher.Action +import io.fabric8.kubernetes.client.internal.readiness.Readiness + +import org.apache.spark.SparkException +import org.apache.spark.internal.Logging +import org.apache.spark.util.ThreadUtils + +private[spark] class NodeCache( + val client: KubernetesClient, + val dsNamespace: String, + val dsLabels: Map[String, String]) extends Logging { + + // TODO: update the cache as needed using a watch. + // TODO: check for multiple shuffle services on the same pod. + private var shufflePodCache = scala.collection.mutable.Map[String, String]() + private var scheduler: ExecutorService = _ + private var watcher: Watch = _ + + def start(): Unit = { + scheduler = ThreadUtils.newDaemonSingleThreadExecutor("kubernetes-node-cache") + + // seed the initial cache. 
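// Seeding lists every pod matching the daemonset labels once, so lookups can be served
// immediately; the watch registered below keeps the node-name -> shuffle-pod-IP map current
// as shuffle pods become ready or are deleted. getShufflePodForExecutor() reads this map when
// the driver answers an executor's RetrieveSparkAppConfig message with spark.shuffle.service.host.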
+ val pods = client.pods().withLabels(dsLabels.asJava).list() + for (pod <- pods.getItems.asScala) { + if (Readiness.isReady(pod)) { + addShufflePodToCache(pod) + } else { + logWarning(s"Found unready shuffle pod ${pod.getMetadata.getName} " + + s"on node ${pod.getSpec.getNodeName}") + } + } + + watcher = client + .pods() + .withLabels(dsLabels.asJava) + .watch(new Watcher[Pod] { + override def eventReceived(action: Watcher.Action, p: Pod): Unit = { + action match { + case Action.DELETED => + shufflePodCache.remove(p.getSpec.getNodeName) + + case _ => + if (Readiness.isReady(p)) { + addShufflePodToCache(p) + } + } + } + override def onClose(e: KubernetesClientException): Unit = {} + }) + } + + private def addShufflePodToCache(pod: Pod): Unit = { + shufflePodCache.get(pod.getSpec.getNodeName) match { + case Some(registeredPodName) => + logError(s"Ambiguous specification of shuffle service pod. " + + s"Found multiple matching pods: ${pod.getMetadata.getName}, " + + s"${registeredPodName} on ${pod.getSpec.getNodeName}") + + throw new SparkException(s"Ambiguous specification of shuffle service pod. " + + s"Found multiple matching pods: ${pod.getMetadata.getName}, " + + s"${registeredPodName} on ${pod.getSpec.getNodeName}") + + case nil => + shufflePodCache(pod.getSpec.getNodeName) = pod.getStatus.getPodIP + } + } + + def stop(): Unit = { + watcher.close() + scheduler.shutdown() + } + + def getShufflePodForExecutor(executorNode: String): String = { + shufflePodCache.get(executorNode) match { + case Some(pod) => + pod + + case _ => + throw new SparkException(s"Unable to find shuffle pod on node $executorNode") + } + } +} + diff --git a/resource-managers/kubernetes/docker-minimal-bundle/src/main/docker/shuffle-service/Dockerfile b/resource-managers/kubernetes/docker-minimal-bundle/src/main/docker/shuffle-service/Dockerfile new file mode 100644 index 0000000000000..630d3408519ac --- /dev/null +++ b/resource-managers/kubernetes/docker-minimal-bundle/src/main/docker/shuffle-service/Dockerfile @@ -0,0 +1,39 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +FROM openjdk:8-alpine + +# If this docker file is being used in the context of building your images from a Spark distribution, the docker build +# command should be invoked from the top level directory of the Spark distribution. E.g.: +# docker build -t spark-shuffle:latest -f dockerfiles/shuffle/Dockerfile . 
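# The image packages the Spark jars and launcher scripts and exists solely to run
# org.apache.spark.deploy.ExternalShuffleService via spark-class (see the CMD at the end of this file).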
+ +RUN apk upgrade --update +RUN apk add --update bash +RUN mkdir -p /opt/spark +RUN touch /opt/spark/RELEASE + +ADD jars /opt/spark/jars +ADD examples /opt/spark/examples +ADD bin /opt/spark/bin +ADD sbin /opt/spark/sbin +ADD conf /opt/spark/conf + +ENV SPARK_HOME /opt/spark + +WORKDIR /opt/spark + +CMD ["/bin/sh","-c","/opt/spark/bin/spark-class org.apache.spark.deploy.ExternalShuffleService 1"] \ No newline at end of file diff --git a/resource-managers/kubernetes/integration-tests-spark-jobs/src/main/scala/org/apache/spark/deploy/kubernetes/integrationtest/jobs/GroupByTest.scala b/resource-managers/kubernetes/integration-tests-spark-jobs/src/main/scala/org/apache/spark/deploy/kubernetes/integrationtest/jobs/GroupByTest.scala new file mode 100644 index 0000000000000..ebae9aa6562c1 --- /dev/null +++ b/resource-managers/kubernetes/integration-tests-spark-jobs/src/main/scala/org/apache/spark/deploy/kubernetes/integrationtest/jobs/GroupByTest.scala @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.spark.deploy.kubernetes.integrationtest.jobs + +import java.util.Random + +import org.apache.spark.sql.SparkSession + +object GroupByTest { + def main(args: Array[String]) { + val spark = SparkSession + .builder + .appName("GroupBy Test") + .getOrCreate() + + val numMappers = if (args.length > 0) args(0).toInt else 5 + val numKVPairs = if (args.length > 1) args(1).toInt else 200000 + val valSize = if (args.length > 2) args(2).toInt else 2 + val numReducers = if (args.length > 3) args(3).toInt else numMappers + + val pairs1 = spark.sparkContext.parallelize(0 until numMappers, numMappers).flatMap { p => + val ranGen = new Random + val arr1 = new Array[(Int, Array[Byte])](numKVPairs) + for (i <- 0 until numKVPairs) { + val byteArr = new Array[Byte](valSize) + ranGen.nextBytes(byteArr) + arr1(i) = (ranGen.nextInt(Int.MaxValue), byteArr) + } + arr1 + }.cache() + // Enforce that everything has been calculated and in cache + pairs1.count() + + // scalastyle:off println + println(pairs1.groupByKey(numReducers).count()) + // scalastyle:on println + spark.stop() + } +} + diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/KubernetesSuite.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/KubernetesSuite.scala index bd5ff7a005d46..56fcf692b8ff7 100644 --- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/KubernetesSuite.scala +++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/KubernetesSuite.scala @@ -67,4 +67,8 @@ private[spark] object KubernetesSuite { ".integrationtest.jobs.SparkPiWithInfiniteWait" val FILE_EXISTENCE_MAIN_CLASS = "org.apache.spark.deploy.kubernetes" + ".integrationtest.jobs.FileExistenceTest" + val GROUP_BY_MAIN_CLASS = "org.apache.spark.deploy.kubernetes" + + ".integrationtest.jobs.GroupByTest" + + case class ShuffleNotReadyException() extends Exception } diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/KubernetesV1Suite.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/KubernetesV1Suite.scala index 4cbd074547915..e54104cea6737 100644 --- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/KubernetesV1Suite.scala +++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/KubernetesV1Suite.scala @@ -16,6 +16,8 @@ */ package org.apache.spark.deploy.kubernetes.integrationtest +import java.io.FileInputStream +import java.nio.file.Paths import java.util.concurrent.TimeUnit import scala.collection.JavaConverters._ @@ -23,8 +25,10 @@ import scala.collection.JavaConverters._ import com.google.common.collect.ImmutableList import com.google.common.util.concurrent.SettableFuture import io.fabric8.kubernetes.api.model.Pod +import io.fabric8.kubernetes.api.model.extensions.DaemonSet import io.fabric8.kubernetes.client.{KubernetesClientException, Watcher} import io.fabric8.kubernetes.client.Watcher.Action +import io.fabric8.kubernetes.client.internal.readiness.Readiness import org.scalatest.{BeforeAndAfter, DoNotDiscover} import org.scalatest.concurrent.Eventually @@ -36,6 +40,7 @@ import 
org.apache.spark.deploy.kubernetes.integrationtest.backend.IntegrationTes import org.apache.spark.deploy.kubernetes.integrationtest.backend.minikube.Minikube import org.apache.spark.deploy.kubernetes.integrationtest.constants.{GCE_TEST_BACKEND, MINIKUBE_TEST_BACKEND} import org.apache.spark.deploy.kubernetes.integrationtest.restapis.SparkRestApiV1 + import org.apache.spark.deploy.kubernetes.submit.v1.{Client, ExternalSuppliedUrisDriverServiceManager} import org.apache.spark.status.api.v1.{ApplicationStatus, StageStatus} import org.apache.spark.util.Utils @@ -81,6 +86,49 @@ private[spark] class KubernetesV1Suite(testBackend: IntegrationTestBackend) }) } + private def createShuffleServiceDaemonSet(): Unit = { + val ds = kubernetesTestComponents.kubernetesClient.extensions().daemonSets() + .createNew() + .withNewMetadata() + .withName("shuffle") + .endMetadata() + .withNewSpec() + .withNewTemplate() + .withNewMetadata() + .withLabels(Map("app" -> "spark-shuffle-service").asJava) + .endMetadata() + .withNewSpec() + .addNewVolume() + .withName("shuffle-dir") + .withNewHostPath() + .withPath("/tmp") + .endHostPath() + .endVolume() + .addNewContainer() + .withName("shuffle") + .withImage("spark-shuffle:latest") + .withImagePullPolicy("IfNotPresent") + .addNewVolumeMount() + .withName("shuffle-dir") + .withMountPath("/tmp") + .endVolumeMount() + .endContainer() + .endSpec() + .endTemplate() + .endSpec() + .done() + + // wait for daemonset to become available. + Eventually.eventually(KubernetesSuite.TIMEOUT, KubernetesSuite.INTERVAL) { + val pods = kubernetesTestComponents.kubernetesClient.pods() + .withLabel("app", "spark-shuffle-service").list().getItems() + + if (pods.size() == 0 || Readiness.isReady(pods.get(0))) { + throw KubernetesSuite.ShuffleNotReadyException() + } + } + } + private def getSparkMetricsService(sparkBaseAppName: String): SparkRestApiV1 = { val serviceName = kubernetesTestComponents.kubernetesClient.services() .withLabel("spark-app-name", sparkBaseAppName) @@ -116,165 +164,216 @@ private[spark] class KubernetesV1Suite(testBackend: IntegrationTestBackend) } } - test("Run a simple example") { - new Client( - sparkConf = sparkConf, - mainClass = KubernetesSuite.SPARK_PI_MAIN_CLASS, - mainAppResource = KubernetesSuite.SUBMITTER_LOCAL_MAIN_APP_RESOURCE, - appArgs = Array.empty[String]).run() - val sparkMetricsService = getSparkMetricsService("spark-pi") - expectationsForStaticAllocation(sparkMetricsService) - } + private def expectationsForDynamicAllocation(sparkMetricsService: SparkRestApiV1): Unit = { + val apps = Eventually.eventually(KubernetesSuite.TIMEOUT, KubernetesSuite.INTERVAL) { + val result = sparkMetricsService + .getApplications(ImmutableList.of(ApplicationStatus.RUNNING, ApplicationStatus.COMPLETED)) + assert(result.size == 1 + && !result.head.id.equalsIgnoreCase("appid") + && !result.head.id.equalsIgnoreCase("{appId}")) + result + } - test("Run with the examples jar on the docker image") { - sparkConf.setJars(Seq(KubernetesSuite.CONTAINER_LOCAL_HELPER_JAR_PATH)) - new Client( - sparkConf = sparkConf, - mainClass = KubernetesSuite.SPARK_PI_MAIN_CLASS, - mainAppResource = KubernetesSuite.CONTAINER_LOCAL_MAIN_APP_RESOURCE, - appArgs = Array.empty[String]).run() - val sparkMetricsService = getSparkMetricsService("spark-pi") - expectationsForStaticAllocation(sparkMetricsService) + Eventually.eventually(KubernetesSuite.TIMEOUT, KubernetesSuite.INTERVAL) { + val result = sparkMetricsService.getStages( + apps.head.id, Seq(StageStatus.COMPLETE).asJava) + assert(result.size 
== 2) + result + } } - test("Run with custom labels and annotations") { - sparkConf.set(KUBERNETES_DRIVER_LABELS, "label1=label1value,label2=label2value") - sparkConf.set(KUBERNETES_DRIVER_ANNOTATIONS, "annotation1=annotation1value," + - "annotation2=annotation2value") - new Client( - sparkConf = sparkConf, - mainClass = KubernetesSuite.SPARK_PI_MAIN_CLASS, - mainAppResource = KubernetesSuite.SUBMITTER_LOCAL_MAIN_APP_RESOURCE, - appArgs = Array.empty[String]).run() - val driverPodMetadata = kubernetesTestComponents.kubernetesClient - .pods - .withLabel("spark-app-name", "spark-pi") - .list() - .getItems - .get(0) - .getMetadata - val driverPodLabels = driverPodMetadata.getLabels - // We can't match all of the selectors directly since one of the selectors is based on the - // launch time. - assert(driverPodLabels.size === 5, "Unexpected number of pod labels.") - assert(driverPodLabels.get("spark-app-name") === "spark-pi", "Unexpected value for" + - " spark-app-name label.") - assert(driverPodLabels.get("spark-app-id").startsWith("spark-pi"), "Unexpected value for" + - " spark-app-id label (should be prefixed with the app name).") - assert(driverPodLabels.get("label1") === "label1value", "Unexpected value for label1") - assert(driverPodLabels.get("label2") === "label2value", "Unexpected value for label2") - val driverPodAnnotations = driverPodMetadata.getAnnotations - assert(driverPodAnnotations.size === 2, "Unexpected number of pod annotations.") - assert(driverPodAnnotations.get("annotation1") === "annotation1value", - "Unexpected value for annotation1") - assert(driverPodAnnotations.get("annotation2") === "annotation2value", - "Unexpected value for annotation2") - } + test("Run a simple example") { + new Client( + sparkConf = sparkConf, + mainClass = KubernetesSuite.SPARK_PI_MAIN_CLASS, + mainAppResource = KubernetesSuite.SUBMITTER_LOCAL_MAIN_APP_RESOURCE, + appArgs = Array.empty[String]).run() + val sparkMetricsService = getSparkMetricsService("spark-pi") + expectationsForStaticAllocation(sparkMetricsService) + } - test("Enable SSL on the driver submit server") { - assume(testBackend.name == MINIKUBE_TEST_BACKEND) - - val (keyStoreFile, trustStoreFile) = SSLUtils.generateKeyStoreTrustStorePair( - Minikube.getMinikubeIp, - "changeit", - "changeit", - "changeit") - sparkConf.set(KUBERNETES_DRIVER_SUBMIT_SSL_KEYSTORE, s"file://${keyStoreFile.getAbsolutePath}") - sparkConf.set("spark.ssl.kubernetes.driversubmitserver.keyStorePassword", "changeit") - sparkConf.set("spark.ssl.kubernetes.driversubmitserver.keyPassword", "changeit") - sparkConf.set(KUBERNETES_DRIVER_SUBMIT_SSL_TRUSTSTORE, - s"file://${trustStoreFile.getAbsolutePath}") - sparkConf.set("spark.ssl.kubernetes.driversubmitserver.trustStorePassword", "changeit") - sparkConf.set(DRIVER_SUBMIT_SSL_ENABLED, true) - new Client( - sparkConf = sparkConf, - mainClass = KubernetesSuite.SPARK_PI_MAIN_CLASS, - mainAppResource = KubernetesSuite.SUBMITTER_LOCAL_MAIN_APP_RESOURCE, - appArgs = Array.empty[String]).run() - } + test("Run with the examples jar on the docker image") { + sparkConf.setJars(Seq(KubernetesSuite.CONTAINER_LOCAL_HELPER_JAR_PATH)) + new Client( + sparkConf = sparkConf, + mainClass = KubernetesSuite.SPARK_PI_MAIN_CLASS, + mainAppResource = KubernetesSuite.CONTAINER_LOCAL_MAIN_APP_RESOURCE, + appArgs = Array.empty[String]).run() + val sparkMetricsService = getSparkMetricsService("spark-pi") + expectationsForStaticAllocation(sparkMetricsService) + } - test("Enable SSL on the driver submit server using PEM files") { - 
    assume(testBackend.name == MINIKUBE_TEST_BACKEND)
+  test("Run with custom labels and annotations") {
+    sparkConf.set(KUBERNETES_DRIVER_LABELS, "label1=label1value,label2=label2value")
+    sparkConf.set(KUBERNETES_DRIVER_ANNOTATIONS, "annotation1=annotation1value," +
+      "annotation2=annotation2value")
+    new Client(
+      sparkConf = sparkConf,
+      mainClass = KubernetesSuite.SPARK_PI_MAIN_CLASS,
+      mainAppResource = KubernetesSuite.SUBMITTER_LOCAL_MAIN_APP_RESOURCE,
+      appArgs = Array.empty[String]).run()
+    val driverPodMetadata = kubernetesTestComponents.kubernetesClient
+      .pods
+      .withLabel("spark-app-name", "spark-pi")
+      .list()
+      .getItems
+      .get(0)
+      .getMetadata
+    val driverPodLabels = driverPodMetadata.getLabels
+    // We can't match all of the selectors directly since one of the selectors is based on the
+    // launch time.
+    assert(driverPodLabels.size === 5, "Unexpected number of pod labels.")
+    assert(driverPodLabels.get("spark-app-name") === "spark-pi", "Unexpected value for" +
+      " spark-app-name label.")
+    assert(driverPodLabels.get("spark-app-id").startsWith("spark-pi"), "Unexpected value for" +
+      " spark-app-id label (should be prefixed with the app name).")
+    assert(driverPodLabels.get("label1") === "label1value", "Unexpected value for label1")
+    assert(driverPodLabels.get("label2") === "label2value", "Unexpected value for label2")
+    val driverPodAnnotations = driverPodMetadata.getAnnotations
+    assert(driverPodAnnotations.size === 2, "Unexpected number of pod annotations.")
+    assert(driverPodAnnotations.get("annotation1") === "annotation1value",
+      "Unexpected value for annotation1")
+    assert(driverPodAnnotations.get("annotation2") === "annotation2value",
+      "Unexpected value for annotation2")
+  }
-    val (keyPem, certPem) = SSLUtils.generateKeyCertPemPair(Minikube.getMinikubeIp)
-    sparkConf.set(DRIVER_SUBMIT_SSL_KEY_PEM, s"file://${keyPem.getAbsolutePath}")
-    sparkConf.set(DRIVER_SUBMIT_SSL_CLIENT_CERT_PEM, s"file://${certPem.getAbsolutePath}")
-    sparkConf.set(DRIVER_SUBMIT_SSL_SERVER_CERT_PEM, s"file://${certPem.getAbsolutePath}")
-    sparkConf.set(DRIVER_SUBMIT_SSL_ENABLED, true)
-    new Client(
-      sparkConf = sparkConf,
-      mainClass = KubernetesSuite.SPARK_PI_MAIN_CLASS,
-      mainAppResource = KubernetesSuite.SUBMITTER_LOCAL_MAIN_APP_RESOURCE,
-      appArgs = Array.empty[String]).run()
-  }
+  test("Enable SSL on the driver submit server") {
+    assume(testBackend.name == MINIKUBE_TEST_BACKEND)
+
+    val (keyStoreFile, trustStoreFile) = SSLUtils.generateKeyStoreTrustStorePair(
+      Minikube.getMinikubeIp,
+      "changeit",
+      "changeit",
+      "changeit")
+    sparkConf.set(KUBERNETES_DRIVER_SUBMIT_SSL_KEYSTORE, s"file://${keyStoreFile.getAbsolutePath}")
+    sparkConf.set("spark.ssl.kubernetes.driversubmitserver.keyStorePassword", "changeit")
+    sparkConf.set("spark.ssl.kubernetes.driversubmitserver.keyPassword", "changeit")
+    sparkConf.set(KUBERNETES_DRIVER_SUBMIT_SSL_TRUSTSTORE,
+      s"file://${trustStoreFile.getAbsolutePath}")
+    sparkConf.set("spark.ssl.kubernetes.driversubmitserver.trustStorePassword", "changeit")
+    sparkConf.set(DRIVER_SUBMIT_SSL_ENABLED, true)
+    new Client(
+      sparkConf = sparkConf,
+      mainClass = KubernetesSuite.SPARK_PI_MAIN_CLASS,
+      mainAppResource = KubernetesSuite.SUBMITTER_LOCAL_MAIN_APP_RESOURCE,
+      appArgs = Array.empty[String]).run()
+  }
-  test("Added files should exist on the driver.") {
-    assume(testBackend.name == MINIKUBE_TEST_BACKEND)
-
-    sparkConf.set("spark.files", KubernetesSuite.TEST_EXISTENCE_FILE.getAbsolutePath)
-    sparkConf.setAppName("spark-file-existence-test")
-    val podCompletedFuture = SettableFuture.create[Boolean]
-    val watch = new Watcher[Pod] {
-      override def eventReceived(action: Action, pod: Pod): Unit = {
-        val containerStatuses = pod.getStatus.getContainerStatuses.asScala
-        val allSuccessful = containerStatuses.nonEmpty && containerStatuses
-          .forall(status => {
-            status.getState.getTerminated != null && status.getState.getTerminated.getExitCode == 0
-          })
-        if (allSuccessful) {
-          podCompletedFuture.set(true)
-        } else {
-          val failedContainers = containerStatuses.filter(container => {
-            container.getState.getTerminated != null &&
-              container.getState.getTerminated.getExitCode != 0
+  test("Enable SSL on the driver submit server using PEM files") {
+    assume(testBackend.name == MINIKUBE_TEST_BACKEND)
+
+    val (keyPem, certPem) = SSLUtils.generateKeyCertPemPair(Minikube.getMinikubeIp)
+    sparkConf.set(DRIVER_SUBMIT_SSL_KEY_PEM, s"file://${keyPem.getAbsolutePath}")
+    sparkConf.set(DRIVER_SUBMIT_SSL_CLIENT_CERT_PEM, s"file://${certPem.getAbsolutePath}")
+    sparkConf.set(DRIVER_SUBMIT_SSL_SERVER_CERT_PEM, s"file://${certPem.getAbsolutePath}")
+    sparkConf.set(DRIVER_SUBMIT_SSL_ENABLED, true)
+    new Client(
+      sparkConf = sparkConf,
+      mainClass = KubernetesSuite.SPARK_PI_MAIN_CLASS,
+      mainAppResource = KubernetesSuite.SUBMITTER_LOCAL_MAIN_APP_RESOURCE,
+      appArgs = Array.empty[String]).run()
+  }
+
+  test("Added files should exist on the driver.") {
+    assume(testBackend.name == MINIKUBE_TEST_BACKEND)
+
+    sparkConf.set("spark.files", KubernetesSuite.TEST_EXISTENCE_FILE.getAbsolutePath)
+    sparkConf.setAppName("spark-file-existence-test")
+    val podCompletedFuture = SettableFuture.create[Boolean]
+    val watch = new Watcher[Pod] {
+      override def eventReceived(action: Action, pod: Pod): Unit = {
+        val containerStatuses = pod.getStatus.getContainerStatuses.asScala
+        val allSuccessful = containerStatuses.nonEmpty && containerStatuses
+          .forall(status => {
+            status.getState.getTerminated != null && status.getState.getTerminated.getExitCode == 0
          })
-          if (failedContainers.nonEmpty) {
-            podCompletedFuture.setException(new SparkException(
-              "One or more containers in the driver failed with a nonzero exit code."))
+        if (allSuccessful) {
+          podCompletedFuture.set(true)
+        } else {
+          val failedContainers = containerStatuses.filter(container => {
+            container.getState.getTerminated != null &&
+              container.getState.getTerminated.getExitCode != 0
+          })
+          if (failedContainers.nonEmpty) {
+            podCompletedFuture.setException(new SparkException(
+              "One or more containers in the driver failed with a nonzero exit code."))
+          }
        }
      }
-      }
-      override def onClose(e: KubernetesClientException): Unit = {
-        logWarning("Watch closed", e)
+      override def onClose(e: KubernetesClientException): Unit = {
+        logWarning("Watch closed", e)
+      }
+    }
+    Utils.tryWithResource(kubernetesTestComponents.kubernetesClient
+      .pods
+      .withLabel("spark-app-name", "spark-file-existence-test")
+      .watch(watch)) { _ =>
+      new Client(
+        sparkConf = sparkConf,
+        mainClass = KubernetesSuite.FILE_EXISTENCE_MAIN_CLASS,
+        mainAppResource = KubernetesSuite.CONTAINER_LOCAL_MAIN_APP_RESOURCE,
+        appArgs = Array(KubernetesSuite.TEST_EXISTENCE_FILE.getName,
+          KubernetesSuite.TEST_EXISTENCE_FILE_CONTENTS)).run()
+      assert(podCompletedFuture.get(60, TimeUnit.SECONDS), "Failed to run driver pod")
+      val driverPod = kubernetesTestComponents.kubernetesClient
+        .pods
+        .withLabel("spark-app-name", "spark-file-existence-test")
+        .list()
+        .getItems
+        .get(0)
+      val podLog = kubernetesTestComponents.kubernetesClient
+        .pods
+        .withName(driverPod.getMetadata.getName)
+        .getLog
+      assert(podLog.contains(s"File found at" +
+        s" /opt/spark/${KubernetesSuite.TEST_EXISTENCE_FILE.getName} with correct contents."),
+        "Job did not find the file as expected.")
    }
  }
-    Utils.tryWithResource(kubernetesTestComponents.kubernetesClient
-      .pods
-      .withLabel("spark-app-name", "spark-file-existence-test")
-      .watch(watch)) { _ =>
-      new Client(
-        sparkConf = sparkConf,
-        mainClass = KubernetesSuite.FILE_EXISTENCE_MAIN_CLASS,
-        mainAppResource = KubernetesSuite.CONTAINER_LOCAL_MAIN_APP_RESOURCE,
-        appArgs = Array(KubernetesSuite.TEST_EXISTENCE_FILE.getName,
-          KubernetesSuite.TEST_EXISTENCE_FILE_CONTENTS)).run()
-      assert(podCompletedFuture.get(60, TimeUnit.SECONDS), "Failed to run driver pod")
-      val driverPod = kubernetesTestComponents.kubernetesClient
-        .pods
-        .withLabel("spark-app-name", "spark-file-existence-test")
-        .list()
-        .getItems
-        .get(0)
-      val podLog = kubernetesTestComponents.kubernetesClient
-        .pods
-        .withName(driverPod.getMetadata.getName)
-        .getLog
-      assert(podLog.contains(s"File found at" +
-        s" /opt/spark/${KubernetesSuite.TEST_EXISTENCE_FILE.getName} with correct contents."),
-        "Job did not find the file as expected.")
+
+  test("Use external URI provider") {
+    assume(testBackend.name == MINIKUBE_TEST_BACKEND)
+
+    val externalUriProviderWatch =
+      new ExternalUriProviderWatch(kubernetesTestComponents.kubernetesClient)
+    Utils.tryWithResource(kubernetesTestComponents.kubernetesClient.services()
+      .withLabel("spark-app-name", "spark-pi")
+      .watch(externalUriProviderWatch)) { _ =>
+      sparkConf.set(DRIVER_SERVICE_MANAGER_TYPE, ExternalSuppliedUrisDriverServiceManager.TYPE)
+      new Client(
+        sparkConf = sparkConf,
+        mainClass = KubernetesSuite.SPARK_PI_MAIN_CLASS,
+        mainAppResource = KubernetesSuite.SUBMITTER_LOCAL_MAIN_APP_RESOURCE,
+        appArgs = Array.empty[String]).run()
+      val sparkMetricsService = getSparkMetricsService("spark-pi")
+      expectationsForStaticAllocation(sparkMetricsService)
+      assert(externalUriProviderWatch.annotationSet.get)
+      val driverService = kubernetesTestComponents.kubernetesClient
+        .services()
+        .withLabel("spark-app-name", "spark-pi")
+        .list()
+        .getItems
+        .asScala(0)
+      assert(driverService.getMetadata.getAnnotations.containsKey(ANNOTATION_PROVIDE_EXTERNAL_URI),
+        "External URI request annotation was not set on the driver service.")
+      // Unfortunately we can't check the correctness of the actual value of the URI, as it depends
+      // on the driver submission port set on the driver service but we remove that port from the
+      // service once the submission is complete.
+      assert(driverService.getMetadata.getAnnotations.containsKey(ANNOTATION_RESOLVED_EXTERNAL_URI),
+        "Resolved URI annotation not set on driver service.")
+    }
  }
-  }
-  test("Use external URI provider") {
-    assume(testBackend.name == MINIKUBE_TEST_BACKEND)
+  test("Mount the Kubernetes credentials onto the driver pod") {
+    assume(testBackend.name == MINIKUBE_TEST_BACKEND)
-    val externalUriProviderWatch =
-      new ExternalUriProviderWatch(kubernetesTestComponents.kubernetesClient)
-    Utils.tryWithResource(kubernetesTestComponents.kubernetesClient.services()
-      .withLabel("spark-app-name", "spark-pi")
-      .watch(externalUriProviderWatch)) { _ =>
-      sparkConf.set(DRIVER_SERVICE_MANAGER_TYPE, ExternalSuppliedUrisDriverServiceManager.TYPE)
+    sparkConf.set(KUBERNETES_DRIVER_CA_CERT_FILE,
+      kubernetesTestComponents.clientConfig.getCaCertFile)
+    sparkConf.set(KUBERNETES_DRIVER_CLIENT_KEY_FILE,
+      kubernetesTestComponents.clientConfig.getClientKeyFile)
+    sparkConf.set(KUBERNETES_DRIVER_CLIENT_CERT_FILE,
+      kubernetesTestComponents.clientConfig.getClientCertFile)
    new Client(
      sparkConf = sparkConf,
      mainClass = KubernetesSuite.SPARK_PI_MAIN_CLASS,
@@ -282,39 +381,24 @@ private[spark] class KubernetesV1Suite(testBackend: IntegrationTestBackend)
      appArgs = Array.empty[String]).run()
    val sparkMetricsService = getSparkMetricsService("spark-pi")
    expectationsForStaticAllocation(sparkMetricsService)
-      assert(externalUriProviderWatch.annotationSet.get)
-      val driverService = kubernetesTestComponents.kubernetesClient
-        .services()
-        .withLabel("spark-app-name", "spark-pi")
-        .list()
-        .getItems
-        .asScala(0)
-      assert(driverService.getMetadata.getAnnotations.containsKey(ANNOTATION_PROVIDE_EXTERNAL_URI),
-        "External URI request annotation was not set on the driver service.")
-      // Unfortunately we can't check the correctness of the actual value of the URI, as it depends
-      // on the driver submission port set on the driver service but we remove that port from the
-      // service once the submission is complete.
-      assert(driverService.getMetadata.getAnnotations.containsKey(ANNOTATION_RESOLVED_EXTERNAL_URI),
-        "Resolved URI annotation not set on driver service.")
    }
-  }
+  test("Dynamic executor scaling basic test") {
+    createShuffleServiceDaemonSet()
-  test("Mount the Kubernetes credentials onto the driver pod") {
-    assume(testBackend.name == MINIKUBE_TEST_BACKEND)
+    sparkConf.setJars(Seq(KubernetesSuite.CONTAINER_LOCAL_HELPER_JAR_PATH))
+    sparkConf.set("spark.dynamicAllocation.enabled", "true")
+    sparkConf.set("spark.shuffle.service.enabled", "true")
+    sparkConf.set("spark.kubernetes.shuffle.labels", "app=spark-shuffle-service")
+    sparkConf.set("spark.kubernetes.shuffle.namespace", kubernetesTestComponents.namespace)
+    sparkConf.set("spark.app.name", "group-by-test")
-    sparkConf.set(KUBERNETES_DRIVER_CA_CERT_FILE,
-      kubernetesTestComponents.clientConfig.getCaCertFile)
-    sparkConf.set(KUBERNETES_DRIVER_CLIENT_KEY_FILE,
-      kubernetesTestComponents.clientConfig.getClientKeyFile)
-    sparkConf.set(KUBERNETES_DRIVER_CLIENT_CERT_FILE,
-      kubernetesTestComponents.clientConfig.getClientCertFile)
    new Client(
      sparkConf = sparkConf,
-      mainClass = KubernetesSuite.SPARK_PI_MAIN_CLASS,
+      mainClass = KubernetesSuite.GROUP_BY_MAIN_CLASS,
      mainAppResource = KubernetesSuite.SUBMITTER_LOCAL_MAIN_APP_RESOURCE,
      appArgs = Array.empty[String]).run()
-    val sparkMetricsService = getSparkMetricsService("spark-pi")
-    expectationsForStaticAllocation(sparkMetricsService)
+    val sparkMetricsService = getSparkMetricsService("group-by-test")
+    expectationsForDynamicAllocation(sparkMetricsService)
  }
}
diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/docker/SparkDockerImageBuilder.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/docker/SparkDockerImageBuilder.scala
index d807c4d81009b..52b8c7d7359a6 100644
--- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/docker/SparkDockerImageBuilder.scala
+++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/docker/SparkDockerImageBuilder.scala
@@ -31,6 +31,7 @@ private[spark] class SparkDockerImageBuilder(private val dockerEnv: Map[String,
   private val DRIVER_V1_DOCKER_FILE = "dockerfiles/driver/Dockerfile"
   private val DRIVER_V2_DOCKER_FILE = "dockerfiles/driver-v2/Dockerfile"
   private val EXECUTOR_DOCKER_FILE = "dockerfiles/executor/Dockerfile"
+  private val SHUFFLE_SERVICE_DOCKER_FILE = "dockerfiles/shuffle-service/Dockerfile"
   private val DRIVER_INIT_DOCKER_FILE = "dockerfiles/driver-init/Dockerfile"
   private val STAGING_SERVER_DOCKER_FILE = "dockerfiles/resource-staging-server/Dockerfile"
   private val TIMEOUT = PatienceConfiguration.Timeout(Span(2, Minutes))
@@ -60,6 +61,7 @@ private[spark] class SparkDockerImageBuilder(private val dockerEnv: Map[String,
     Eventually.eventually(TIMEOUT, INTERVAL) { dockerClient.ping() }
     buildImage("spark-driver", DRIVER_V1_DOCKER_FILE)
     buildImage("spark-executor", EXECUTOR_DOCKER_FILE)
+    buildImage("spark-shuffle", SHUFFLE_SERVICE_DOCKER_FILE)
     buildImage("spark-driver-v2", DRIVER_V2_DOCKER_FILE)
     buildImage("spark-resource-staging-server", STAGING_SERVER_DOCKER_FILE)
     buildImage("spark-driver-init", DRIVER_INIT_DOCKER_FILE)
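For reference (not part of the patch itself), the "Dynamic executor scaling basic test" above sets the same properties an application would use against a deployed shuffle-service DaemonSet. A minimal application-side sketch, assuming a shuffle service labelled app=spark-shuffle-service running in the default namespace (both values are placeholders that must match the actual deployment):

    import org.apache.spark.SparkConf

    // Hypothetical application configuration mirroring the integration test above.
    // The label and namespace values are illustrative and must match the
    // shuffle-service DaemonSet deployed in the target cluster.
    val conf = new SparkConf()
      .setAppName("group-by-test")
      .set("spark.dynamicAllocation.enabled", "true")
      .set("spark.shuffle.service.enabled", "true")
      .set("spark.kubernetes.shuffle.labels", "app=spark-shuffle-service")
      .set("spark.kubernetes.shuffle.namespace", "default")

As in the test, both spark.dynamicAllocation.enabled and spark.shuffle.service.enabled must be set together, and the shuffle label/namespace values have to resolve to the running shuffle-service pods for executors to register with them.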