From 12a22e631fe2023f12dc1a89df70c3fec65c481f Mon Sep 17 00:00:00 2001
From: foxish
Date: Sun, 14 May 2017 00:15:49 -0700
Subject: [PATCH 1/6] dynamic allocation: shuffle service docker, yaml and test fixture

---
 conf/kubernetes-shuffle-service.yaml          | 53 ++++++++++++++++++
 .../main/docker/shuffle-service/Dockerfile    | 39 ++++++++++++++
 .../integrationtest/jobs/GroupByTest.scala    | 54 +++++++++++++++++++
 .../integrationtest/KubernetesSuite.scala     |  4 ++
 4 files changed, 150 insertions(+)
 create mode 100644 conf/kubernetes-shuffle-service.yaml
 create mode 100644 resource-managers/kubernetes/docker-minimal-bundle/src/main/docker/shuffle-service/Dockerfile
 create mode 100644 resource-managers/kubernetes/integration-tests-spark-jobs/src/main/scala/org/apache/spark/deploy/kubernetes/integrationtest/jobs/GroupByTest.scala

diff --git a/conf/kubernetes-shuffle-service.yaml b/conf/kubernetes-shuffle-service.yaml
new file mode 100644
index 0000000000000..3aeb1f54f301c
--- /dev/null
+++ b/conf/kubernetes-shuffle-service.yaml
@@ -0,0 +1,53 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+apiVersion: extensions/v1beta1
+kind: DaemonSet
+metadata:
+  labels:
+    app: spark-shuffle-service
+    spark-version: 2.1.0
+  name: shuffle
+spec:
+  template:
+    metadata:
+      labels:
+        app: spark-shuffle-service
+        spark-version: 2.1.0
+    spec:
+      volumes:
+        - name: temp-volume
+          hostPath:
+            path: '/var/tmp'  # change this path according to your cluster configuration.
+      containers:
+        - name: shuffle
+          # This is an official image that is built
+          # from the dockerfiles/shuffle directory
+          # in the spark distribution.
+          image: kubespark/spark-shuffle:v2.1.0-kubernetes-0.1.0-alpha.3
+          volumeMounts:
+            - mountPath: '/tmp'
+              name: temp-volume
+          # more volumes can be mounted here.
+          # The spark job must be configured to use these
+          # mounts using the configuration:
+          # spark.kubernetes.shuffle.dir=<dir1>,<dir2>,...
+          resources:
+            requests:
+              cpu: "1"
+            limits:
+              cpu: "1"
\ No newline at end of file
diff --git a/resource-managers/kubernetes/docker-minimal-bundle/src/main/docker/shuffle-service/Dockerfile b/resource-managers/kubernetes/docker-minimal-bundle/src/main/docker/shuffle-service/Dockerfile
new file mode 100644
index 0000000000000..630d3408519ac
--- /dev/null
+++ b/resource-managers/kubernetes/docker-minimal-bundle/src/main/docker/shuffle-service/Dockerfile
@@ -0,0 +1,39 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +FROM openjdk:8-alpine + +# If this docker file is being used in the context of building your images from a Spark distribution, the docker build +# command should be invoked from the top level directory of the Spark distribution. E.g.: +# docker build -t spark-shuffle:latest -f dockerfiles/shuffle/Dockerfile . + +RUN apk upgrade --update +RUN apk add --update bash +RUN mkdir -p /opt/spark +RUN touch /opt/spark/RELEASE + +ADD jars /opt/spark/jars +ADD examples /opt/spark/examples +ADD bin /opt/spark/bin +ADD sbin /opt/spark/sbin +ADD conf /opt/spark/conf + +ENV SPARK_HOME /opt/spark + +WORKDIR /opt/spark + +CMD ["/bin/sh","-c","/opt/spark/bin/spark-class org.apache.spark.deploy.ExternalShuffleService 1"] \ No newline at end of file diff --git a/resource-managers/kubernetes/integration-tests-spark-jobs/src/main/scala/org/apache/spark/deploy/kubernetes/integrationtest/jobs/GroupByTest.scala b/resource-managers/kubernetes/integration-tests-spark-jobs/src/main/scala/org/apache/spark/deploy/kubernetes/integrationtest/jobs/GroupByTest.scala new file mode 100644 index 0000000000000..ebae9aa6562c1 --- /dev/null +++ b/resource-managers/kubernetes/integration-tests-spark-jobs/src/main/scala/org/apache/spark/deploy/kubernetes/integrationtest/jobs/GroupByTest.scala @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.spark.deploy.kubernetes.integrationtest.jobs + +import java.util.Random + +import org.apache.spark.sql.SparkSession + +object GroupByTest { + def main(args: Array[String]) { + val spark = SparkSession + .builder + .appName("GroupBy Test") + .getOrCreate() + + val numMappers = if (args.length > 0) args(0).toInt else 5 + val numKVPairs = if (args.length > 1) args(1).toInt else 200000 + val valSize = if (args.length > 2) args(2).toInt else 2 + val numReducers = if (args.length > 3) args(3).toInt else numMappers + + val pairs1 = spark.sparkContext.parallelize(0 until numMappers, numMappers).flatMap { p => + val ranGen = new Random + val arr1 = new Array[(Int, Array[Byte])](numKVPairs) + for (i <- 0 until numKVPairs) { + val byteArr = new Array[Byte](valSize) + ranGen.nextBytes(byteArr) + arr1(i) = (ranGen.nextInt(Int.MaxValue), byteArr) + } + arr1 + }.cache() + // Enforce that everything has been calculated and in cache + pairs1.count() + + // scalastyle:off println + println(pairs1.groupByKey(numReducers).count()) + // scalastyle:on println + spark.stop() + } +} + diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/KubernetesSuite.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/KubernetesSuite.scala index bd5ff7a005d46..56fcf692b8ff7 100644 --- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/KubernetesSuite.scala +++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/KubernetesSuite.scala @@ -67,4 +67,8 @@ private[spark] object KubernetesSuite { ".integrationtest.jobs.SparkPiWithInfiniteWait" val FILE_EXISTENCE_MAIN_CLASS = "org.apache.spark.deploy.kubernetes" + ".integrationtest.jobs.FileExistenceTest" + val GROUP_BY_MAIN_CLASS = "org.apache.spark.deploy.kubernetes" + + ".integrationtest.jobs.GroupByTest" + + case class ShuffleNotReadyException() extends Exception } From 1d57d46c2c5761968591133141a6866348404225 Mon Sep 17 00:00:00 2001 From: foxish Date: Sun, 14 May 2017 00:21:44 -0700 Subject: [PATCH 2/6] dynamic allocation: changes to spark-core --- .../spark/executor/CoarseGrainedExecutorBackend.scala | 2 +- .../cluster/CoarseGrainedClusterMessage.scala | 2 +- .../cluster/CoarseGrainedSchedulerBackend.scala | 2 +- .../scala/org/apache/spark/storage/BlockManager.scala | 10 ++++++++-- 4 files changed, 11 insertions(+), 5 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala index 92a27902c6696..f0e13aa6bf109 100644 --- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala +++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala @@ -200,7 +200,7 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging { new SecurityManager(executorConf), clientMode = true) val driver = fetcher.setupEndpointRefByURI(driverUrl) - val cfg = driver.askWithRetry[SparkAppConfig](RetrieveSparkAppConfig) + val cfg = driver.askWithRetry[SparkAppConfig](RetrieveSparkAppConfig(executorId)) val props = cfg.sparkProperties ++ Seq[(String, String)](("spark.app.id", appId)) fetcher.shutdown() diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala 
b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala index 0a4f19d76073e..2406999f9ee92 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala @@ -28,7 +28,7 @@ private[spark] sealed trait CoarseGrainedClusterMessage extends Serializable private[spark] object CoarseGrainedClusterMessages { - case object RetrieveSparkAppConfig extends CoarseGrainedClusterMessage + case class RetrieveSparkAppConfig(executorId: String) extends CoarseGrainedClusterMessage case class SparkAppConfig( sparkProperties: Seq[(String, String)], diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala index 3452487e72e88..89e59353de845 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala @@ -206,7 +206,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp removeExecutor(executorId, reason) context.reply(true) - case RetrieveSparkAppConfig => + case RetrieveSparkAppConfig(executorId) => val reply = SparkAppConfig(sparkProperties, SparkEnv.get.securityManager.getIOEncryptionKey()) context.reply(reply) diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala index 04521c9159eac..18f7d135acdd2 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala @@ -184,8 +184,14 @@ private[spark] class BlockManager( blockManagerId = if (idFromMaster != null) idFromMaster else id shuffleServerId = if (externalShuffleServiceEnabled) { - logInfo(s"external shuffle service port = $externalShuffleServicePort") - BlockManagerId(executorId, blockTransferService.hostName, externalShuffleServicePort) + val shuffleServerHostName = if (blockManagerId.isDriver) { + blockTransferService.hostName + } else { + conf.get("spark.shuffle.service.host", blockTransferService.hostName) + } + logInfo(s"external shuffle service host = $shuffleServerHostName, " + + s"port = $externalShuffleServicePort") + BlockManagerId(executorId, shuffleServerHostName, externalShuffleServicePort) } else { blockManagerId } From ddbc6c911795dbe39c0e14cadfd9254f2cffbabc Mon Sep 17 00:00:00 2001 From: foxish Date: Sun, 14 May 2017 00:26:48 -0700 Subject: [PATCH 3/6] dynamic allocation: tests --- .../integrationtest/jobs/GroupByTest.scala | 2 +- .../integrationtest/KubernetesV2Suite.scala | 99 ++++++++++++++++++- .../docker/SparkDockerImageBuilder.scala | 2 + 3 files changed, 98 insertions(+), 5 deletions(-) diff --git a/resource-managers/kubernetes/integration-tests-spark-jobs/src/main/scala/org/apache/spark/deploy/kubernetes/integrationtest/jobs/GroupByTest.scala b/resource-managers/kubernetes/integration-tests-spark-jobs/src/main/scala/org/apache/spark/deploy/kubernetes/integrationtest/jobs/GroupByTest.scala index ebae9aa6562c1..fe47d42485b24 100644 --- a/resource-managers/kubernetes/integration-tests-spark-jobs/src/main/scala/org/apache/spark/deploy/kubernetes/integrationtest/jobs/GroupByTest.scala +++ 
b/resource-managers/kubernetes/integration-tests-spark-jobs/src/main/scala/org/apache/spark/deploy/kubernetes/integrationtest/jobs/GroupByTest.scala @@ -46,7 +46,7 @@ object GroupByTest { pairs1.count() // scalastyle:off println - println(pairs1.groupByKey(numReducers).count()) + println("The Result is", pairs1.groupByKey(numReducers).count()) // scalastyle:on println spark.stop() } diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/KubernetesV2Suite.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/KubernetesV2Suite.scala index 8fa7cbd52ee83..ae02de7937c6a 100644 --- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/KubernetesV2Suite.scala +++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/KubernetesV2Suite.scala @@ -18,6 +18,10 @@ package org.apache.spark.deploy.kubernetes.integrationtest import java.util.UUID +import scala.collection.JavaConverters._ + +import com.google.common.collect.ImmutableList +import io.fabric8.kubernetes.client.internal.readiness.Readiness import org.scalatest.{BeforeAndAfter, DoNotDiscover} import org.scalatest.concurrent.Eventually @@ -27,7 +31,10 @@ import org.apache.spark.deploy.kubernetes.config._ import org.apache.spark.deploy.kubernetes.integrationtest.backend.IntegrationTestBackend import org.apache.spark.deploy.kubernetes.integrationtest.backend.minikube.Minikube import org.apache.spark.deploy.kubernetes.integrationtest.constants.MINIKUBE_TEST_BACKEND +import org.apache.spark.deploy.kubernetes.integrationtest.restapis.SparkRestApiV1 +import org.apache.spark.deploy.kubernetes.submit.v1.Client import org.apache.spark.deploy.kubernetes.submit.v2.{MountedDependencyManagerProviderImpl, SubmissionKubernetesClientProviderImpl} +import org.apache.spark.status.api.v1.{ApplicationStatus, StageStatus} @DoNotDiscover private[spark] class KubernetesV2Suite(testBackend: IntegrationTestBackend) @@ -60,7 +67,7 @@ private[spark] class KubernetesV2Suite(testBackend: IntegrationTestBackend) assume(testBackend.name == MINIKUBE_TEST_BACKEND) launchStagingServer(SSLOptions()) - runSparkAppAndVerifyCompletion(KubernetesSuite.SUBMITTER_LOCAL_MAIN_APP_RESOURCE) + runSparkPiAndVerifyCompletion(KubernetesSuite.SUBMITTER_LOCAL_MAIN_APP_RESOURCE) } test("Enable SSL on the submission server") { @@ -84,7 +91,7 @@ private[spark] class KubernetesV2Suite(testBackend: IntegrationTestBackend) keyStorePassword = Some("keyStore"), keyPassword = Some("key"), trustStorePassword = Some("trustStore"))) - runSparkAppAndVerifyCompletion(KubernetesSuite.SUBMITTER_LOCAL_MAIN_APP_RESOURCE) + runSparkPiAndVerifyCompletion(KubernetesSuite.SUBMITTER_LOCAL_MAIN_APP_RESOURCE) } test("Use container-local resources without the resource staging server") { @@ -93,7 +100,22 @@ private[spark] class KubernetesV2Suite(testBackend: IntegrationTestBackend) sparkConf.setJars(Seq( KubernetesSuite.CONTAINER_LOCAL_MAIN_APP_RESOURCE, KubernetesSuite.CONTAINER_LOCAL_HELPER_JAR_PATH)) - runSparkAppAndVerifyCompletion(KubernetesSuite.CONTAINER_LOCAL_MAIN_APP_RESOURCE) + runSparkPiAndVerifyCompletion(KubernetesSuite.CONTAINER_LOCAL_MAIN_APP_RESOURCE) + } + + test("Dynamic executor scaling basic test") { + assume(testBackend.name == MINIKUBE_TEST_BACKEND) + + launchStagingServer(SSLOptions()) + createShuffleServiceDaemonSet() + + 
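+    // Executors locate their node-local shuffle pod through the namespace and
+    // label selector configured below, which must match the daemonset created
+    // by createShuffleServiceDaemonSet() above.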
sparkConf.setJars(Seq(KubernetesSuite.CONTAINER_LOCAL_HELPER_JAR_PATH)) + sparkConf.set("spark.dynamicAllocation.enabled", "true") + sparkConf.set("spark.shuffle.service.enabled", "true") + sparkConf.set("spark.kubernetes.shuffle.labels", "app=spark-shuffle-service") + sparkConf.set("spark.kubernetes.shuffle.namespace", kubernetesTestComponents.namespace) + sparkConf.set("spark.app.name", "group-by-test") + runSparkGroupByTestAndVerifyCompletion(KubernetesSuite.SUBMITTER_LOCAL_MAIN_APP_RESOURCE) } private def launchStagingServer(resourceStagingServerSslOptions: SSLOptions): Unit = { @@ -111,7 +133,7 @@ private[spark] class KubernetesV2Suite(testBackend: IntegrationTestBackend) s"${Minikube.getMinikubeIp}:$resourceStagingServerPort") } - private def runSparkAppAndVerifyCompletion(appResource: String): Unit = { + private def runSparkPiAndVerifyCompletion(appResource: String): Unit = { val client = new org.apache.spark.deploy.kubernetes.submit.v2.Client( sparkConf = sparkConf, mainClass = KubernetesSuite.SPARK_PI_MAIN_CLASS, @@ -136,4 +158,73 @@ private[spark] class KubernetesV2Suite(testBackend: IntegrationTestBackend) .contains("Pi is roughly 3"), "The application did not compute the value of pi.") } } + + private def runSparkGroupByTestAndVerifyCompletion(appResource: String): Unit = { + val client = new org.apache.spark.deploy.kubernetes.submit.v2.Client( + sparkConf = sparkConf, + mainClass = KubernetesSuite.GROUP_BY_MAIN_CLASS, + appArgs = Array.empty[String], + mainAppResource = appResource, + kubernetesClientProvider = + new SubmissionKubernetesClientProviderImpl(sparkConf), + mountedDependencyManagerProvider = + new MountedDependencyManagerProviderImpl(sparkConf)) + client.run() + val driverPod = kubernetesTestComponents.kubernetesClient + .pods() + .withLabel("spark-app-locator", APP_LOCATOR_LABEL) + .list() + .getItems + .get(0) + Eventually.eventually(KubernetesSuite.TIMEOUT, KubernetesSuite.INTERVAL) { + assert(kubernetesTestComponents.kubernetesClient + .pods() + .withName(driverPod.getMetadata.getName) + .getLog + .contains("The Result is"), "The application did not complete.") + } + } + + private def createShuffleServiceDaemonSet(): Unit = { + val ds = kubernetesTestComponents.kubernetesClient.extensions().daemonSets() + .createNew() + .withNewMetadata() + .withName("shuffle") + .endMetadata() + .withNewSpec() + .withNewTemplate() + .withNewMetadata() + .withLabels(Map("app" -> "spark-shuffle-service").asJava) + .endMetadata() + .withNewSpec() + .addNewVolume() + .withName("shuffle-dir") + .withNewHostPath() + .withPath("/tmp") + .endHostPath() + .endVolume() + .addNewContainer() + .withName("shuffle") + .withImage("spark-shuffle:latest") + .withImagePullPolicy("IfNotPresent") + .addNewVolumeMount() + .withName("shuffle-dir") + .withMountPath("/tmp") + .endVolumeMount() + .endContainer() + .endSpec() + .endTemplate() + .endSpec() + .done() + + // wait for daemonset to become available. 
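+    // Eventually retries its body until it completes without throwing, so the
+    // check below must keep throwing ShuffleNotReadyException while no shuffle
+    // pod has been scheduled on the node or the pod is not yet ready.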
+    Eventually.eventually(KubernetesSuite.TIMEOUT, KubernetesSuite.INTERVAL) {
+      val pods = kubernetesTestComponents.kubernetesClient.pods()
+        .withLabel("app", "spark-shuffle-service").list().getItems()
+
+      if (pods.size() == 0 || !Readiness.isReady(pods.get(0))) {
+        throw KubernetesSuite.ShuffleNotReadyException()
+      }
+    }
+  }
 }
diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/docker/SparkDockerImageBuilder.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/docker/SparkDockerImageBuilder.scala
index d807c4d81009b..52b8c7d7359a6 100644
--- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/docker/SparkDockerImageBuilder.scala
+++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/docker/SparkDockerImageBuilder.scala
@@ -31,6 +31,7 @@ private[spark] class SparkDockerImageBuilder(private val dockerEnv: Map[String,
   private val DRIVER_V1_DOCKER_FILE = "dockerfiles/driver/Dockerfile"
   private val DRIVER_V2_DOCKER_FILE = "dockerfiles/driver-v2/Dockerfile"
   private val EXECUTOR_DOCKER_FILE = "dockerfiles/executor/Dockerfile"
+  private val SHUFFLE_SERVICE_DOCKER_FILE = "dockerfiles/shuffle-service/Dockerfile"
   private val DRIVER_INIT_DOCKER_FILE = "dockerfiles/driver-init/Dockerfile"
   private val STAGING_SERVER_DOCKER_FILE = "dockerfiles/resource-staging-server/Dockerfile"
   private val TIMEOUT = PatienceConfiguration.Timeout(Span(2, Minutes))
@@ -60,6 +61,7 @@ private[spark] class SparkDockerImageBuilder(private val dockerEnv: Map[String,
     Eventually.eventually(TIMEOUT, INTERVAL) { dockerClient.ping() }
     buildImage("spark-driver", DRIVER_V1_DOCKER_FILE)
     buildImage("spark-executor", EXECUTOR_DOCKER_FILE)
+    buildImage("spark-shuffle", SHUFFLE_SERVICE_DOCKER_FILE)
     buildImage("spark-driver-v2", DRIVER_V2_DOCKER_FILE)
     buildImage("spark-resource-staging-server", STAGING_SERVER_DOCKER_FILE)
     buildImage("spark-driver-init", DRIVER_INIT_DOCKER_FILE)

From be1807bf9a40fe10be01f4590b8ba1e7fd69fce9 Mon Sep 17 00:00:00 2001
From: foxish
Date: Sun, 14 May 2017 00:27:28 -0700
Subject: [PATCH 4/6] dynamic allocation: docs

---
 docs/running-on-kubernetes.md          | 66 +++++++++++++++++++++++++-
 resource-managers/kubernetes/README.md |  6 +--
 2 files changed, 67 insertions(+), 5 deletions(-)

diff --git a/docs/running-on-kubernetes.md b/docs/running-on-kubernetes.md
index be410f18b5cfc..5b7bb6cc612c5 100644
--- a/docs/running-on-kubernetes.md
+++ b/docs/running-on-kubernetes.md
@@ -189,7 +189,7 @@ from the other deployment modes. See the [configuration page](configuration.html
     The namespace that will be used for running the driver and executor pods. When using
     spark-submit in cluster mode, this can also be passed to spark-submit via the
-    --kubernetes-namespace command line argument. The namespace must already exist.
+    --kubernetes-namespace command line argument.
@@ -208,6 +208,37 @@ from the other deployment modes. See the [configuration page](configuration.html
     Docker tag format.
+
+  spark.kubernetes.shuffle.namespace
+  default
+
+    Namespace in which the shuffle service pods are present. The shuffle service must be
+    created in the cluster prior to attempts to use it.
+
+
+  spark.kubernetes.shuffle.labels
+  (none)
+
+    Labels that will be used to look up shuffle service pods.
+    This should be a comma-separated list of label key-value pairs,
+    where each label is in the format key=value. The labels chosen must be such that
+    they match exactly one shuffle service pod on each node on which executors are launched.
+
+
+  spark.kubernetes.allocation.batch.size
+  5
+
+    Number of pods to launch at once in each round of executor pod allocation.
+
+
+  spark.kubernetes.allocation.batch.delay
+  1
+
+    Number of seconds to wait between each round of executor pod allocation.
+
   spark.kubernetes.authenticate.submission.caCertFile
   (none)
@@ -389,10 +420,41 @@ from the other deployment modes. See the [configuration page](configuration.html

+## Dynamic Executor Scaling
+
+Spark on Kubernetes supports dynamic allocation in cluster mode. This mode requires running
+an external shuffle service. This is typically a [daemonset](https://kubernetes.io/docs/concepts/workloads/controllers/daemonset/)
+with a provisioned [hostpath](https://kubernetes.io/docs/concepts/storage/volumes/#hostpath) volume.
+This shuffle service may be shared by executors belonging to different Spark jobs. Using Spark with dynamic allocation
+on Kubernetes assumes that a cluster administrator has set up one or more shuffle-service daemonsets in the cluster.
+
+A sample configuration file is provided in `conf/kubernetes-shuffle-service.yaml`, which can be customized as needed
+for a particular cluster. It is important that the `spec.template.metadata.labels` are set up appropriately for the shuffle
+service because there may be multiple shuffle service instances running in a cluster. The labels give us a way to target a particular
+shuffle service.
+
+For example, if the shuffle service we want to use is in the default namespace, and
+has pods with labels `app=spark-shuffle-service` and `spark-version=2.1.0`, we can
+use those labels to target that particular shuffle service at job launch time. In order to run a job with dynamic allocation enabled,
+the command may then look like the following:
+
+    bin/spark-submit \
+      --deploy-mode cluster \
+      --class org.apache.spark.examples.GroupByTest \
+      --master k8s://<k8s-apiserver-host>:<k8s-apiserver-port> \
+      --kubernetes-namespace default \
+      --conf spark.app.name=group-by-test \
+      --conf spark.kubernetes.driver.docker.image=kubespark/spark-driver:latest \
+      --conf spark.kubernetes.executor.docker.image=kubespark/spark-executor:latest \
+      --conf spark.dynamicAllocation.enabled=true \
+      --conf spark.shuffle.service.enabled=true \
+      --conf spark.kubernetes.shuffle.namespace=default \
+      --conf spark.kubernetes.shuffle.labels="app=spark-shuffle-service,spark-version=2.1.0" \
+      examples/jars/spark_examples_2.11-2.2.0.jar 10 400000 2
+
 ## Current Limitations

 Running Spark on Kubernetes is currently an experimental feature. Some restrictions on the current
 implementation that should be lifted in the future include:
-* Applications can only use a fixed number of executors. Dynamic allocation is not supported.
 * Applications can only run in cluster mode.
 * Only Scala and Java applications can be run.
diff --git a/resource-managers/kubernetes/README.md b/resource-managers/kubernetes/README.md
index 734c29947b6d9..fd1ad29eb795d 100644
--- a/resource-managers/kubernetes/README.md
+++ b/resource-managers/kubernetes/README.md
@@ -14,10 +14,10 @@ important matters to keep in mind when developing this feature.

 # Building Spark with Kubernetes Support

-To build Spark with Kubernetes support, use the `kubernetes` profile when invoking Maven.
+To build Spark with Kubernetes support, use the `kubernetes` profile when invoking Maven. For example, to simply compile +the Kubernetes core implementation module along with its dependencies: - git checkout branch-2.1-kubernetes - build/mvn package -Pkubernetes -DskipTests + build/mvn compile -Pkubernetes -pl resource-managers/kubernetes/core -am -DskipTests To build a distribution of Spark with Kubernetes support, use the `dev/make-distribution.sh` script, and add the `kubernetes` profile as part of the build arguments. Any other build arguments can be specified as one would expect when From 66e22b2af8ce81949a987a757f0d5c02b36c02d9 Mon Sep 17 00:00:00 2001 From: foxish Date: Sun, 14 May 2017 13:10:11 -0700 Subject: [PATCH 5/6] dynamic allocation: kubernetes allocator and executor accounting --- .../spark/deploy/kubernetes/config.scala | 45 ++++++++++++++ .../KubernetesClusterSchedulerBackend.scala | 60 ++++++++++++++----- 2 files changed, 91 insertions(+), 14 deletions(-) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/config.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/config.scala index e379b40e376fc..09b2d38cb8e38 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/config.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/config.scala @@ -157,6 +157,13 @@ package object config extends Logging { .stringConf .createOptional + private[spark] val SPARK_SHUFFLE_SERVICE_HOST = + ConfigBuilder("spark.shuffle.service.host") + .doc("Host for Spark Shuffle Service") + .internal() + .stringConf + .createOptional + // Note that while we set a default for this when we start up the // scheduler, the specific default value is dynamically determined // based on the executor memory. @@ -270,6 +277,44 @@ package object config extends Logging { .stringConf .createOptional + private[spark] val KUBERNETES_SHUFFLE_NAMESPACE = + ConfigBuilder("spark.kubernetes.shuffle.namespace") + .doc("Namespace of the shuffle service") + .stringConf + .createWithDefault("default") + + private[spark] val KUBERNETES_SHUFFLE_SVC_IP = + ConfigBuilder("spark.kubernetes.shuffle.ip") + .doc("This setting is for debugging only. Setting this " + + "allows overriding the IP that the executor thinks its colocated " + + "shuffle service is on") + .stringConf + .createOptional + + private[spark] val KUBERNETES_SHUFFLE_LABELS = + ConfigBuilder("spark.kubernetes.shuffle.labels") + .doc("Labels to identify the shuffle service") + .stringConf + .createOptional + + private[spark] val KUBERNETES_SHUFFLE_DIR = + ConfigBuilder("spark.kubernetes.shuffle.dir") + .doc("Path to the shared shuffle directories.") + .stringConf + .createOptional + + private[spark] val KUBERNETES_ALLOCATION_BATCH_SIZE = + ConfigBuilder("spark.kubernetes.allocation.batch.size") + .doc("Number of pods to launch at once in each round of dynamic allocation. ") + .intConf + .createWithDefault(5) + + private[spark] val KUBERNETES_ALLOCATION_BATCH_DELAY = + ConfigBuilder("spark.kubernetes.allocation.batch.delay") + .doc("Number of seconds to wait between each round of executor allocation. 
") + .longConf + .createWithDefault(1) + private[spark] val DRIVER_SERVICE_MANAGER_TYPE = ConfigBuilder("spark.kubernetes.driver.serviceManagerType") .doc("A tag indicating which class to use for creating the Kubernetes service and" + diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala index a2294a6766980..322aad199c57b 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala @@ -17,20 +17,21 @@ package org.apache.spark.scheduler.cluster.kubernetes import java.io.Closeable +import java.util.concurrent.TimeUnit import java.util.concurrent.atomic.{AtomicInteger, AtomicLong, AtomicReference} import scala.collection.JavaConverters._ import scala.collection.mutable import scala.concurrent.{ExecutionContext, Future} -import io.fabric8.kubernetes.api.model.{ContainerPortBuilder, EnvVarBuilder, - EnvVarSourceBuilder, Pod, QuantityBuilder} -import io.fabric8.kubernetes.client.{KubernetesClientException, Watcher} +import io.fabric8.kubernetes.api.model._ +import io.fabric8.kubernetes.client.{KubernetesClient, KubernetesClientException, Watcher} import io.fabric8.kubernetes.client.Watcher.Action import org.apache.spark.{SparkContext, SparkException} import org.apache.spark.deploy.kubernetes.config._ import org.apache.spark.deploy.kubernetes.constants._ +import org.apache.spark.internal.Logging import org.apache.spark.rpc.RpcEndpointAddress import org.apache.spark.scheduler.TaskSchedulerImpl import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend @@ -105,6 +106,38 @@ private[spark] class KubernetesClusterSchedulerBackend( private val initialExecutors = getInitialTargetExecutorNumber(1) + private val podAllocationInterval = conf.get(KUBERNETES_ALLOCATION_BATCH_DELAY) + require(podAllocationInterval > 0, s"Allocation batch delay " + + s"${KUBERNETES_ALLOCATION_BATCH_DELAY} " + + s"is ${podAllocationInterval}, should be a positive integer") + + private val podAllocationSize = conf.get(KUBERNETES_ALLOCATION_BATCH_SIZE) + require(podAllocationSize > 0, s"Allocation batch size " + + s"${KUBERNETES_ALLOCATION_BATCH_SIZE} " + + s"is ${podAllocationSize}, should be a positive integer") + + private val allocator = ThreadUtils + .newDaemonSingleThreadScheduledExecutor("kubernetes-pod-allocator") + + private val allocatorRunnable: Runnable = new Runnable { + override def run(): Unit = { + if (totalRegisteredExecutors.get() < runningExecutorPods.size) { + logDebug("Waiting for pending executors before scaling") + } else if (totalExpectedExecutors.get() <= runningExecutorPods.size) { + logDebug("Maximum allowed executor limit reached. 
Not scaling up further.") + } else { + RUNNING_EXECUTOR_PODS_LOCK.synchronized { + for (i <- 0 until math.min( + totalExpectedExecutors.get - runningExecutorPods.size, podAllocationSize)) { + runningExecutorPods += allocateNewExecutorPod() + logInfo( + s"Requesting a new executor, total executors is now ${runningExecutorPods.size}") + } + } + } + } + } + private def getInitialTargetExecutorNumber(defaultNumExecutors: Int = 1): Int = { if (Utils.isDynamicAllocationEnabled(conf)) { val minNumExecutors = conf.getInt("spark.dynamicAllocation.minExecutors", 0) @@ -118,6 +151,7 @@ private[spark] class KubernetesClusterSchedulerBackend( } else { conf.getInt("spark.executor.instances", defaultNumExecutors) } + } override def applicationId(): String = conf.get("spark.app.id", super.applicationId()) @@ -130,12 +164,20 @@ private[spark] class KubernetesClusterSchedulerBackend( super.start() executorWatchResource.set(kubernetesClient.pods().withLabel(SPARK_APP_ID_LABEL, applicationId()) .watch(new ExecutorPodsWatcher())) + + allocator.scheduleWithFixedDelay( + allocatorRunnable, 0, podAllocationInterval.get, TimeUnit.SECONDS) + if (!Utils.isDynamicAllocationEnabled(sc.conf)) { doRequestTotalExecutors(initialExecutors) } } override def stop(): Unit = { + // stop allocation of new resources. + allocator.shutdown() + + // send stop message to executors so they shut down cleanly super.stop() @@ -253,17 +295,7 @@ private[spark] class KubernetesClusterSchedulerBackend( } override def doRequestTotalExecutors(requestedTotal: Int): Future[Boolean] = Future[Boolean] { - RUNNING_EXECUTOR_PODS_LOCK.synchronized { - if (requestedTotal > totalExpectedExecutors.get) { - logInfo(s"Requesting ${requestedTotal - totalExpectedExecutors.get}" - + s" additional executors, expecting total $requestedTotal and currently" + - s" expected ${totalExpectedExecutors.get}") - for (i <- 0 until (requestedTotal - totalExpectedExecutors.get)) { - runningExecutorPods += allocateNewExecutorPod() - } - } - totalExpectedExecutors.set(requestedTotal) - } + totalExpectedExecutors.set(requestedTotal) true } From 4dd4715547ab0065eb7c3e0db086f54431578c00 Mon Sep 17 00:00:00 2001 From: foxish Date: Mon, 15 May 2017 10:13:42 -0700 Subject: [PATCH 6/6] dynamic allocation: shuffle service, node caching --- .../kubernetes/ConfigurationUtils.scala | 41 ++++ .../spark/deploy/kubernetes/constants.scala | 1 + .../KubernetesClusterSchedulerBackend.scala | 176 ++++++++++++++---- .../cluster/kubernetes/ShufflePodCache.scala | 91 +++++++++ 4 files changed, 272 insertions(+), 37 deletions(-) create mode 100644 resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/ConfigurationUtils.scala create mode 100644 resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/ShufflePodCache.scala diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/ConfigurationUtils.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/ConfigurationUtils.scala new file mode 100644 index 0000000000000..f3bd598556019 --- /dev/null +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/ConfigurationUtils.scala @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.kubernetes
+
+import org.apache.spark.SparkException
+
+object ConfigurationUtils {
+  def parseKeyValuePairs(
+      maybeKeyValues: Option[String],
+      configKey: String,
+      keyValueType: String): Map[String, String] = {
+
+    maybeKeyValues.map(keyValues => {
+      keyValues.split(",").map(_.trim).filterNot(_.isEmpty).map(keyValue => {
+        keyValue.split("=", 2).toSeq match {
+          case Seq(k, v) =>
+            (k, v)
+          case _ =>
+            throw new SparkException(s"Custom $keyValueType set by $configKey must be a" +
+              s" comma-separated list of key-value pairs, with format <key>=<value>." +
+              s" Got value: $keyValue. All values: $keyValues")
+        }
+      }).toMap
+    }).getOrElse(Map.empty[String, String])
+  }
+}
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/constants.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/constants.scala
index f82cb88b4c622..27e47eb61933f 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/constants.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/constants.scala
@@ -113,4 +113,5 @@ package object constants {
     s"$INIT_CONTAINER_PROPERTIES_FILE_MOUNT_PATH/$INIT_CONTAINER_PROPERTIES_FILE_NAME"
   private[spark] val DOWNLOAD_JARS_VOLUME_NAME = "download-jars"
   private[spark] val DOWNLOAD_FILES_VOLUME_NAME = "download-files"
+  private[spark] val DEFAULT_SHUFFLE_MOUNT_NAME = "shuffle"
 }
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala
index 322aad199c57b..669a073b1fab6 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala
@@ -25,15 +25,17 @@ import scala.collection.mutable
 import scala.concurrent.{ExecutionContext, Future}

 import io.fabric8.kubernetes.api.model._
-import io.fabric8.kubernetes.client.{KubernetesClient, KubernetesClientException, Watcher}
+import io.fabric8.kubernetes.client.{KubernetesClientException, Watcher}
 import io.fabric8.kubernetes.client.Watcher.Action
+import org.apache.commons.io.FilenameUtils

-import org.apache.spark.{SparkContext, SparkException}
+import org.apache.spark.{SparkContext, SparkEnv, SparkException}
+import org.apache.spark.deploy.kubernetes.ConfigurationUtils
 import org.apache.spark.deploy.kubernetes.config._
 import org.apache.spark.deploy.kubernetes.constants._
-import org.apache.spark.internal.Logging
-import org.apache.spark.rpc.RpcEndpointAddress
+import org.apache.spark.rpc.{RpcCallContext, RpcEndpointAddress, RpcEnv}
import org.apache.spark.scheduler.TaskSchedulerImpl +import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.{RetrieveSparkAppConfig, SparkAppConfig} import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend import org.apache.spark.util.{ThreadUtils, Utils} @@ -50,6 +52,7 @@ private[spark] class KubernetesClusterSchedulerBackend( private val EXECUTOR_PODS_BY_IPS_LOCK = new Object private val executorPodsByIPs = new mutable.HashMap[String, Pod] // Indexed by executor IP addrs. + private var shufflePodCache: Option[ShufflePodCache] = None private val executorDockerImage = conf.get(EXECUTOR_DOCKER_IMAGE) private val kubernetesNamespace = conf.get(KUBERNETES_NAMESPACE) private val executorPort = conf.getInt("spark.executor.port", DEFAULT_STATIC_PORT) @@ -89,6 +92,28 @@ private[spark] class KubernetesClusterSchedulerBackend( throw new SparkException(s"Executor cannot find driver pod", throwable) } + private val shuffleServiceConfig: Option[ShuffleServiceConfig] = + if (Utils.isDynamicAllocationEnabled(sc.conf)) { + val shuffleNamespace = conf.get(KUBERNETES_SHUFFLE_NAMESPACE) + val parsedShuffleLabels = ConfigurationUtils.parseKeyValuePairs( + conf.get(KUBERNETES_SHUFFLE_LABELS), KUBERNETES_SHUFFLE_LABELS.key, + "shuffle-labels") + if (parsedShuffleLabels.size == 0) { + throw new SparkException(s"Dynamic allocation enabled " + + s"but no ${KUBERNETES_SHUFFLE_LABELS.key} specified") + } + + val shuffleDirs = conf.get(KUBERNETES_SHUFFLE_DIR).map { + _.split(",") + }.getOrElse(Utils.getConfiguredLocalDirs(conf)) + Some( + ShuffleServiceConfig(shuffleNamespace, + parsedShuffleLabels, + shuffleDirs)) + } else { + None + } + override val minRegisteredRatio = if (conf.getOption("spark.scheduler.minRegisteredResourcesRatio").isEmpty) { 0.8 @@ -166,17 +191,22 @@ private[spark] class KubernetesClusterSchedulerBackend( .watch(new ExecutorPodsWatcher())) allocator.scheduleWithFixedDelay( - allocatorRunnable, 0, podAllocationInterval.get, TimeUnit.SECONDS) + allocatorRunnable, 0, podAllocationInterval, TimeUnit.SECONDS) if (!Utils.isDynamicAllocationEnabled(sc.conf)) { doRequestTotalExecutors(initialExecutors) + } else { + shufflePodCache = shuffleServiceConfig + .map { config => new ShufflePodCache( + kubernetesClient, config.shuffleNamespace, config.shuffleLabels) } + shufflePodCache.foreach(_.start()) } } override def stop(): Unit = { - // stop allocation of new resources. + // stop allocation of new resources and caches. 
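+    // The allocator is shut down before super.stop() so that no new executor
+    // pods are requested while the scheduler is stopping; the shuffle pod
+    // cache watch is closed at the same point.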
allocator.shutdown() - + shufflePodCache.foreach(_.stop()) // send stop message to executors so they shut down cleanly super.stop() @@ -256,37 +286,60 @@ private[spark] class KubernetesClusterSchedulerBackend( .withContainerPort(port._2) .build() }) + + val basePodBuilder = new PodBuilder() + .withNewMetadata() + .withName(name) + .withLabels(selectors) + .withOwnerReferences() + .addNewOwnerReference() + .withController(true) + .withApiVersion(driverPod.getApiVersion) + .withKind(driverPod.getKind) + .withName(driverPod.getMetadata.getName) + .withUid(driverPod.getMetadata.getUid) + .endOwnerReference() + .endMetadata() + .withNewSpec() + .withHostname(hostname) + .addNewContainer() + .withName(s"executor") + .withImage(executorDockerImage) + .withImagePullPolicy("IfNotPresent") + .withNewResources() + .addToRequests("memory", executorMemoryQuantity) + .addToLimits("memory", executorMemoryLimitQuantity) + .addToRequests("cpu", executorCpuQuantity) + .addToLimits("cpu", executorCpuQuantity) + .endResources() + .withEnv(requiredEnv.asJava) + .withPorts(requiredPorts.asJava) + .endContainer() + .endSpec() + + val resolvedPodBuilder = shuffleServiceConfig + .map { config => + config.shuffleDirs.foldLeft(basePodBuilder) { (builder, dir) => + builder + .editSpec() + .addNewVolume() + .withName(FilenameUtils.getBaseName(dir)) + .withNewHostPath() + .withPath(dir) + .endHostPath() + .endVolume() + .editFirstContainer() + .addNewVolumeMount() + .withName(FilenameUtils.getBaseName(dir)) + .withMountPath(dir) + .endVolumeMount() + .endContainer() + .endSpec() + } + }.getOrElse(basePodBuilder) + try { - (executorId, kubernetesClient.pods().createNew() - .withNewMetadata() - .withName(name) - .withLabels(selectors) - .withOwnerReferences() - .addNewOwnerReference() - .withController(true) - .withApiVersion(driverPod.getApiVersion) - .withKind(driverPod.getKind) - .withName(driverPod.getMetadata.getName) - .withUid(driverPod.getMetadata.getUid) - .endOwnerReference() - .endMetadata() - .withNewSpec() - .withHostname(hostname) - .addNewContainer() - .withName(s"executor") - .withImage(executorDockerImage) - .withImagePullPolicy("IfNotPresent") - .withNewResources() - .addToRequests("memory", executorMemoryQuantity) - .addToLimits("memory", executorMemoryLimitQuantity) - .addToRequests("cpu", executorCpuQuantity) - .addToLimits("cpu", executorCpuQuantity) - .endResources() - .withEnv(requiredEnv.asJava) - .withPorts(requiredPorts.asJava) - .endContainer() - .endSpec() - .done()) + (executorId, kubernetesClient.pods().create(resolvedPodBuilder.build())) } catch { case throwable: Throwable => logError("Failed to allocate executor pod.", throwable) @@ -294,6 +347,11 @@ private[spark] class KubernetesClusterSchedulerBackend( } } + override def createDriverEndpoint( + properties: Seq[(String, String)]): DriverEndpoint = { + new KubernetesDriverEndpoint(rpcEnv, properties) + } + override def doRequestTotalExecutors(requestedTotal: Int): Future[Boolean] = Future[Boolean] { totalExpectedExecutors.set(requestedTotal) true @@ -345,6 +403,50 @@ private[spark] class KubernetesClusterSchedulerBackend( logDebug("Executor pod watch closed.", cause) } } + + private class KubernetesDriverEndpoint( + rpcEnv: RpcEnv, + sparkProperties: Seq[(String, String)]) + extends DriverEndpoint(rpcEnv, sparkProperties) { + override def receiveAndReply( + context: RpcCallContext): PartialFunction[Any, Unit] = { + new PartialFunction[Any, Unit]() { + override def isDefinedAt(msg: Any): Boolean = { + msg match { + case 
RetrieveSparkAppConfig(executorId) => + Utils.isDynamicAllocationEnabled(sc.conf) + case _ => false + } + } + + override def apply(msg: Any): Unit = { + msg match { + case RetrieveSparkAppConfig(executorId) => + RUNNING_EXECUTOR_PODS_LOCK.synchronized { + var resolvedProperties = sparkProperties + val runningExecutorPod = kubernetesClient + .pods() + .withName(runningExecutorPods(executorId).getMetadata.getName) + .get() + val nodeName = runningExecutorPod.getSpec.getNodeName + val shufflePodIp = shufflePodCache.get.getShufflePodForExecutor(nodeName) + resolvedProperties = resolvedProperties ++ Seq( + (SPARK_SHUFFLE_SERVICE_HOST.key, shufflePodIp)) + + val reply = SparkAppConfig( + resolvedProperties, + SparkEnv.get.securityManager.getIOEncryptionKey()) + context.reply(reply) + } + } + } + }.orElse(super.receiveAndReply(context)) + } + } + + case class ShuffleServiceConfig(shuffleNamespace: String, + shuffleLabels: Map[String, String], + shuffleDirs: Seq[String]) } private object KubernetesClusterSchedulerBackend { diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/ShufflePodCache.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/ShufflePodCache.scala new file mode 100644 index 0000000000000..53b4e745ce7c7 --- /dev/null +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/ShufflePodCache.scala @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.scheduler.cluster.kubernetes + +import scala.collection.JavaConverters._ + +import io.fabric8.kubernetes.api.model.Pod +import io.fabric8.kubernetes.client.{KubernetesClient, KubernetesClientException, Watch, Watcher} +import io.fabric8.kubernetes.client.Watcher.Action +import io.fabric8.kubernetes.client.internal.readiness.Readiness + +import org.apache.spark.SparkException +import org.apache.spark.internal.Logging + +private[spark] class ShufflePodCache ( + client: KubernetesClient, + dsNamespace: String, + dsLabels: Map[String, String]) extends Logging { + + private var shufflePodCache = scala.collection.mutable.Map[String, String]() + private var watcher: Watch = _ + + def start(): Unit = { + // seed the initial cache. 
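+    // A watch only delivers events that occur after it is registered, so the
+    // cache is first seeded from a one-off list of the shuffle pods that are
+    // already ready; the watch created below keeps it in sync afterwards.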
+ val pods = client.pods().withLabels(dsLabels.asJava).list() + pods.getItems.asScala.foreach { + pod => + if (Readiness.isReady(pod)) { + addShufflePodToCache(pod) + } else { + logWarning(s"Found unready shuffle pod ${pod.getMetadata.getName} " + + s"on node ${pod.getSpec.getNodeName}") + } + } + + watcher = client + .pods() + .withLabels(dsLabels.asJava) + .watch(new Watcher[Pod] { + override def eventReceived(action: Watcher.Action, p: Pod): Unit = { + action match { + case Action.DELETED | Action.ERROR => + shufflePodCache.remove(p.getSpec.getNodeName) + case Action.ADDED | Action.MODIFIED if Readiness.isReady(p) => + addShufflePodToCache(p) + } + } + override def onClose(e: KubernetesClientException): Unit = {} + }) + } + + private def addShufflePodToCache(pod: Pod): Unit = { + if (shufflePodCache.contains(pod.getSpec.getNodeName)) { + val registeredPodName = shufflePodCache.get(pod.getSpec.getNodeName).get + logError(s"Ambiguous specification of shuffle service pod. " + + s"Found multiple matching pods: ${pod.getMetadata.getName}, " + + s"${registeredPodName} on ${pod.getSpec.getNodeName}") + + throw new SparkException(s"Ambiguous specification of shuffle service pod. " + + s"Found multiple matching pods: ${pod.getMetadata.getName}, " + + s"${registeredPodName} on ${pod.getSpec.getNodeName}") + } else { + shufflePodCache(pod.getSpec.getNodeName) = pod.getStatus.getPodIP + } + } + + def stop(): Unit = { + watcher.close() + } + + def getShufflePodForExecutor(executorNode: String): String = { + shufflePodCache.get(executorNode) + .getOrElse(throw new SparkException(s"Unable to find shuffle pod on node $executorNode")) + } +} +
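
A minimal usage sketch of the label parsing the scheduler backend performs on
spark.kubernetes.shuffle.labels, via the ConfigurationUtils.parseKeyValuePairs helper
added in PATCH 6/6. The ParseLabelsExample wrapper is illustrative scaffolding, not
part of the patch series:

    import org.apache.spark.deploy.kubernetes.ConfigurationUtils

    object ParseLabelsExample {
      def main(args: Array[String]): Unit = {
        // The same label string the docs pass via spark.kubernetes.shuffle.labels.
        val labels = ConfigurationUtils.parseKeyValuePairs(
          maybeKeyValues = Some("app=spark-shuffle-service,spark-version=2.1.0"),
          configKey = "spark.kubernetes.shuffle.labels",
          keyValueType = "shuffle-labels")
        // Yields Map("app" -> "spark-shuffle-service", "spark-version" -> "2.1.0").
        // An entry without '=' (e.g. just "app") raises SparkException.
        println(labels)
      }
    }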