From 0e0ddad1af4124af4782a1e643be7981502eec79 Mon Sep 17 00:00:00 2001 From: mcheah Date: Thu, 7 Sep 2017 18:58:44 -0700 Subject: [PATCH 1/6] Use emptyDir volume mounts for executor local directories. --- .../spark/deploy/kubernetes/constants.scala | 2 + .../kubernetes/ExecutorPodFactory.scala | 55 +++++++++++++++++-- .../kubernetes/KubernetesClusterManager.scala | 15 ++--- 3 files changed, 61 insertions(+), 11 deletions(-) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/constants.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/constants.scala index 2c2ccf31b9dd9..9db5ef8c0369d 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/constants.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/constants.scala @@ -71,6 +71,7 @@ package object constants { private[spark] val ENV_PYSPARK_PRIMARY = "PYSPARK_PRIMARY" private[spark] val ENV_JAVA_OPT_PREFIX = "SPARK_JAVA_OPT_" private[spark] val ENV_MOUNTED_FILES_FROM_SECRET_DIR = "SPARK_MOUNTED_FILES_FROM_SECRET_DIR" + private[spark] val ENV_SPARK_LOCAL_DIRS = "SPARK_LOCAL_DIRS" // Bootstrapping dependencies with the init-container private[spark] val INIT_CONTAINER_ANNOTATION = "pod.beta.kubernetes.io/init-containers" @@ -102,4 +103,5 @@ package object constants { private[spark] val KUBERNETES_MASTER_INTERNAL_URL = "https://kubernetes.default.svc" private[spark] val MEMORY_OVERHEAD_FACTOR = 0.10 private[spark] val MEMORY_OVERHEAD_MIN_MIB = 384L + private[spark] val GENERATED_LOCAL_DIR_MOUNT_ROOT = "/mnt/tmp/spark-local" } diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/ExecutorPodFactory.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/ExecutorPodFactory.scala index 2339612658537..c0a5d575ee668 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/ExecutorPodFactory.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/ExecutorPodFactory.scala @@ -16,9 +16,12 @@ */ package org.apache.spark.scheduler.cluster.kubernetes -import scala.collection.JavaConverters._ +import java.io.File +import java.nio.file.Paths +import java.util.UUID -import io.fabric8.kubernetes.api.model.{ContainerBuilder, ContainerPortBuilder, EnvVar, EnvVarBuilder, EnvVarSourceBuilder, Pod, PodBuilder, QuantityBuilder} +import scala.collection.JavaConverters._ +import io.fabric8.kubernetes.api.model.{ContainerBuilder, ContainerPortBuilder, EnvVar, EnvVarBuilder, EnvVarSourceBuilder, Pod, PodBuilder, QuantityBuilder, VolumeBuilder, VolumeMountBuilder} import org.apache.commons.io.FilenameUtils import org.apache.spark.{SparkConf, SparkException} @@ -265,9 +268,53 @@ private[spark] class ExecutorPodFactoryImpl( val executorPodWithNodeAffinity = nodeAffinityExecutorPodModifier.addNodeAffinityAnnotationIfUseful( executorPodWithInitContainer, nodeToLocalTaskCount) - new PodBuilder(executorPodWithNodeAffinity) + + val (executorPodWithTempLocalDirs, executorContainerWithTempLocalDirs) = + if (shuffleManager.isEmpty) { + // If we're not using the external shuffle manager, we should use emptyDir volumes for + // shuffle directories since it's important for disk I/O for these directories to be + // performant. 
If the user has not provided a local directory, instead of using the + // Java temporary directory, we create one instead. This is because we want to avoid + // as much as possible mounting an emptyDir which overlaps with an existing path in + // the Docker image, which is very likely what would happen if we tried to mount the + // volume at Java's temporary directory path, which is /tmp in many JDKs. + val resolvedLocalDirs = sparkConf.get( + "spark.local.dir", s"$GENERATED_LOCAL_DIR_MOUNT_ROOT/${UUID.randomUUID()}") + .split(",") + val localDirVolumes = resolvedLocalDirs.zipWithIndex.map { case (dir, index) => + new VolumeBuilder() + .withName(s"spark-local-dir-$index-${Paths.get(dir).getFileName.toString}") + .withNewEmptyDir().endEmptyDir() + .build() + } + val localDirVolumeMounts = localDirVolumes.zip(resolvedLocalDirs).map { + case (volume, path) => + new VolumeMountBuilder() + .withName(volume.getName) + .withMountPath(path) + .build() + } + // Setting the SPARK_LOCAL_DIRS environment variable will force the executor to use the + // generated directory if the user did not provide one, as opposed to using the Java + // temporary directory. This also overrides the value of spark.local.dir in SparkConf, + // which is intended. See Utils#getConfiguredLocalDirs(). + (new PodBuilder(executorPodWithNodeAffinity) + .editSpec() + .addToVolumes(localDirVolumes: _*) + .endSpec() + .build(), + new ContainerBuilder(initBootstrappedExecutorContainer) + .addToVolumeMounts(localDirVolumeMounts: _*) + .addNewEnv() + .withName(ENV_SPARK_LOCAL_DIRS) + .withValue(resolvedLocalDirs.mkString(",")) + .endEnv() + .build()) + } else (executorPodWithNodeAffinity, initBootstrappedExecutorContainer) + + new PodBuilder(executorPodWithTempLocalDirs) .editSpec() - .addToContainers(initBootstrappedExecutorContainer) + .addToContainers(executorContainerWithTempLocalDirs) .endSpec() .build() } diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterManager.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterManager.scala index 165dd04ad7e4f..4ad5738525635 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterManager.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterManager.scala @@ -113,15 +113,16 @@ private[spark] class KubernetesClusterManager extends ExternalClusterManager wit Some(new File(Config.KUBERNETES_SERVICE_ACCOUNT_TOKEN_PATH)), Some(new File(Config.KUBERNETES_SERVICE_ACCOUNT_CA_CRT_PATH))) - val kubernetesShuffleManager = if (Utils.isDynamicAllocationEnabled(sparkConf)) { + val kubernetesShuffleManager = if (sparkConf.get( + org.apache.spark.internal.config.SHUFFLE_SERVICE_ENABLED)) { val kubernetesExternalShuffleClient = new KubernetesExternalShuffleClientImpl( - SparkTransportConf.fromSparkConf(sparkConf, "shuffle"), - sc.env.securityManager, - sc.env.securityManager.isAuthenticationEnabled()) + SparkTransportConf.fromSparkConf(sparkConf, "shuffle"), + sc.env.securityManager, + sc.env.securityManager.isAuthenticationEnabled()) Some(new KubernetesExternalShuffleManagerImpl( - sparkConf, - kubernetesClient, - kubernetesExternalShuffleClient)) + sparkConf, + kubernetesClient, + kubernetesExternalShuffleClient)) } else None val executorPodFactory = new ExecutorPodFactoryImpl( From 7af34c2e0d839af21c91020d29b0e4815bd9b66a Mon 
Sep 17 00:00:00 2001 From: mcheah Date: Fri, 8 Sep 2017 18:37:37 -0700 Subject: [PATCH 2/6] Mount local dirs in the driver. Remove shuffle dir configuration. --- .../spark/deploy/kubernetes/config.scala | 6 -- ...DriverConfigurationStepsOrchestrator.scala | 6 +- ...LocalDirectoryMountConfigurationStep.scala | 99 +++++++++++++++++++ .../kubernetes/ExecutorPodFactory.scala | 8 +- .../KubernetesExternalShuffleManager.scala | 4 +- ...rConfigurationStepsOrchestratorSuite.scala | 7 +- 6 files changed, 112 insertions(+), 18 deletions(-) create mode 100644 resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/LocalDirectoryMountConfigurationStep.scala diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/config.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/config.scala index 9dfd13e1817f8..38a51f4137f9b 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/config.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/config.scala @@ -188,12 +188,6 @@ package object config extends Logging { .stringConf .createOptional - private[spark] val KUBERNETES_SHUFFLE_DIR = - ConfigBuilder("spark.kubernetes.shuffle.dir") - .doc("Path to the shared shuffle directories.") - .stringConf - .createOptional - private[spark] val KUBERNETES_SHUFFLE_APISERVER_URI = ConfigBuilder("spark.kubernetes.shuffle.apiServer.url") .doc("URL to the Kubernetes API server that the shuffle service will monitor for Spark pods.") diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/DriverConfigurationStepsOrchestrator.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/DriverConfigurationStepsOrchestrator.scala index 1bb336fa616d0..344777dad649b 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/DriverConfigurationStepsOrchestrator.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/DriverConfigurationStepsOrchestrator.scala @@ -101,6 +101,9 @@ private[spark] class DriverConfigurationStepsOrchestrator( val kubernetesCredentialsStep = new DriverKubernetesCredentialsStep( submissionSparkConf, kubernetesResourceNamePrefix) + val localDirectoryMountConfigurationStep = new LocalDirectoryMountConfigurationStep( + submissionSparkConf) + val pythonStep = mainAppResource match { case PythonMainAppResource(mainPyResource) => Option(new PythonStep(mainPyResource, additionalPythonFiles, filesDownloadPath)) @@ -177,7 +180,8 @@ private[spark] class DriverConfigurationStepsOrchestrator( Seq( initialSubmissionStep, kubernetesCredentialsStep, - dependencyResolutionStep) ++ + dependencyResolutionStep, + localDirectoryMountConfigurationStep) ++ submittedDependenciesBootstrapSteps ++ pythonStep.toSeq ++ mountSecretsStep.toSeq diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/LocalDirectoryMountConfigurationStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/LocalDirectoryMountConfigurationStep.scala new file mode 100644 index 0000000000000..bfad4e3ec7c63 --- /dev/null +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/LocalDirectoryMountConfigurationStep.scala @@ -0,0 +1,99 @@ + +/* + * Licensed to 
the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.deploy.kubernetes.submit + +import java.nio.file.Paths +import java.util.UUID + +import io.fabric8.kubernetes.api.model.{ContainerBuilder, PodBuilder, VolumeBuilder, VolumeMountBuilder} + +import org.apache.spark.SparkConf +import org.apache.spark.deploy.kubernetes.config._ +import org.apache.spark.deploy.kubernetes.constants._ +import org.apache.spark.deploy.kubernetes.submit.submitsteps.{DriverConfigurationStep, KubernetesDriverSpec} + +/** + * Configures local directories that the driver and executors should use for temporary storage. + * + * Note that we have different semantics for scratch space in Kubernetes versus the other cluster + * managers. In Kubernetes, we cannot allow the local directories to resolve to the Java temporary + * directory. This is because we will mount either emptyDir volumes for both the driver and + * executors, or hostPath volumes for the executors and an emptyDir for the driver. In either + * case, the mount paths need to be directories that do not exist in the base container images. + * But the Java temporary directory is typically a directory like /tmp which exists in most + * container images. + * + * The solution is twofold: + * - When not using an external shuffle service, a reasonable default is to create a new directory with + * a random name and set that to be the value of `spark.local.dir`. + * - When using the external shuffle service, it is risky to assume that the user intends to mount + * the JVM temporary directory into the pod as a hostPath volume. We therefore enforce that + * spark.local.dir must be set in dynamic allocation mode so that the user explicitly sets the paths + * that have to be mounted. + */ +private[spark] class LocalDirectoryMountConfigurationStep( + submissionSparkConf: SparkConf) extends DriverConfigurationStep { + + override def configureDriver(driverSpec: KubernetesDriverSpec): KubernetesDriverSpec = { + val configuredLocalDirs = submissionSparkConf.getOption("spark.local.dir") + val isUsingExternalShuffle = submissionSparkConf.get( + org.apache.spark.internal.config.SHUFFLE_SERVICE_ENABLED) + val resolvedLocalDirsSingleString = if (isUsingExternalShuffle) { + require(configuredLocalDirs.isDefined, "spark.local.dir must be provided explicitly when" + + " using the external shuffle service in Kubernetes. These directories should map to" + + " the paths that are mounted into the external shuffle service pods.") + configuredLocalDirs.get + } else { + // If we don't use the external shuffle service, local directories should be randomized if + // not provided. 
+      configuredLocalDirs.getOrElse(s"$GENERATED_LOCAL_DIR_MOUNT_ROOT/spark-${UUID.randomUUID}")
+    }
+    val resolvedLocalDirs = resolvedLocalDirsSingleString.split(",")
+    // It's worth noting that we always use an emptyDir volume for the directories on the driver,
+    // because the driver does not need a hostPath to share its scratch space with any other pod.
+    // The driver itself will decide whether to use a hostPath volume or an emptyDir volume for
+    // these directories on the executors. (see ExecutorPodFactory and
+    // KubernetesClusterManager)
+    val localDirVolumes = resolvedLocalDirs.zipWithIndex.map { case (dir, index) =>
+      new VolumeBuilder()
+        .withName(s"spark-local-dir-$index-${Paths.get(dir).getFileName.toString}")
+        .withNewEmptyDir().endEmptyDir()
+        .build()
+    }
+    val localDirVolumeMounts = localDirVolumes.zip(resolvedLocalDirs).map {
+      case (volume, path) =>
+        new VolumeMountBuilder()
+          .withName(volume.getName)
+          .withMountPath(path)
+          .build()
+    }
+    val resolvedDriverSparkConf = driverSpec.driverSparkConf.clone().set(
+      "spark.local.dir", resolvedLocalDirsSingleString)
+    driverSpec.copy(
+      driverPod = new PodBuilder(driverSpec.driverPod)
+        .editSpec()
+        .addToVolumes(localDirVolumes: _*)
+        .endSpec()
+        .build(),
+      driverContainer = new ContainerBuilder(driverSpec.driverContainer)
+        .addToVolumeMounts(localDirVolumeMounts: _*)
+        .build(),
+      driverSparkConf = resolvedDriverSparkConf
+    )
+  }
+}
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/ExecutorPodFactory.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/ExecutorPodFactory.scala
index c0a5d575ee668..d1b73871069cb 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/ExecutorPodFactory.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/ExecutorPodFactory.scala
@@ -278,9 +278,7 @@ private[spark] class ExecutorPodFactoryImpl(
       // as much as possible mounting an emptyDir which overlaps with an existing path in
       // the Docker image, which is very likely what would happen if we tried to mount the
       // volume at Java's temporary directory path, which is /tmp in many JDKs.
- val resolvedLocalDirs = sparkConf.get( - "spark.local.dir", s"$GENERATED_LOCAL_DIR_MOUNT_ROOT/${UUID.randomUUID()}") - .split(",") + val resolvedLocalDirs = Utils.getConfiguredLocalDirs(sparkConf) val localDirVolumes = resolvedLocalDirs.zipWithIndex.map { case (dir, index) => new VolumeBuilder() .withName(s"spark-local-dir-$index-${Paths.get(dir).getFileName.toString}") @@ -305,10 +303,6 @@ private[spark] class ExecutorPodFactoryImpl( .build(), new ContainerBuilder(initBootstrappedExecutorContainer) .addToVolumeMounts(localDirVolumeMounts: _*) - .addNewEnv() - .withName(ENV_SPARK_LOCAL_DIRS) - .withValue(resolvedLocalDirs.mkString(",")) - .endEnv() .build()) } else (executorPodWithNodeAffinity, initBootstrappedExecutorContainer) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesExternalShuffleManager.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesExternalShuffleManager.scala index d00783b84a948..48ff850d6c734 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesExternalShuffleManager.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesExternalShuffleManager.scala @@ -67,9 +67,7 @@ private[spark] class KubernetesExternalShuffleManagerImpl( s"but no ${KUBERNETES_SHUFFLE_LABELS.key} specified") } private val externalShufflePort = sparkConf.getInt("spark.shuffle.service.port", 7337) - private val shuffleDirs = sparkConf.get(KUBERNETES_SHUFFLE_DIR).map { - _.split(",") - }.getOrElse(Utils.getConfiguredLocalDirs(sparkConf)) + private val shuffleDirs = Utils.getConfiguredLocalDirs(sparkConf) private var shufflePodCache = scala.collection.mutable.Map[String, String]() private var watcher: Watch = _ diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/kubernetes/submit/DriverConfigurationStepsOrchestratorSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/kubernetes/submit/DriverConfigurationStepsOrchestratorSuite.scala index 6bad594629f76..fb7990e2411cd 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/kubernetes/submit/DriverConfigurationStepsOrchestratorSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/kubernetes/submit/DriverConfigurationStepsOrchestratorSuite.scala @@ -51,7 +51,8 @@ private[spark] class DriverConfigurationStepsOrchestratorSuite extends SparkFunS orchestrator, classOf[BaseDriverConfigurationStep], classOf[DriverKubernetesCredentialsStep], - classOf[DependencyResolutionStep]) + classOf[DependencyResolutionStep], + classOf[LocalDirectoryMountConfigurationStep]) } test("Submission steps with an init-container.") { @@ -74,6 +75,7 @@ private[spark] class DriverConfigurationStepsOrchestratorSuite extends SparkFunS classOf[BaseDriverConfigurationStep], classOf[DriverKubernetesCredentialsStep], classOf[DependencyResolutionStep], + classOf[LocalDirectoryMountConfigurationStep], classOf[InitContainerBootstrapStep]) } @@ -95,6 +97,7 @@ private[spark] class DriverConfigurationStepsOrchestratorSuite extends SparkFunS classOf[BaseDriverConfigurationStep], classOf[DriverKubernetesCredentialsStep], classOf[DependencyResolutionStep], + classOf[LocalDirectoryMountConfigurationStep], classOf[PythonStep]) } @@ -116,6 +119,7 @@ private[spark] class DriverConfigurationStepsOrchestratorSuite 
extends SparkFunS classOf[BaseDriverConfigurationStep], classOf[DriverKubernetesCredentialsStep], classOf[DependencyResolutionStep], + classOf[LocalDirectoryMountConfigurationStep], classOf[MountSmallLocalFilesStep]) } @@ -139,6 +143,7 @@ private[spark] class DriverConfigurationStepsOrchestratorSuite extends SparkFunS classOf[BaseDriverConfigurationStep], classOf[DriverKubernetesCredentialsStep], classOf[DependencyResolutionStep], + classOf[LocalDirectoryMountConfigurationStep], classOf[MountSecretsStep]) } From 9bc526121e5fd6e540af637c71677a50155dcf0f Mon Sep 17 00:00:00 2001 From: mcheah Date: Fri, 8 Sep 2017 18:47:53 -0700 Subject: [PATCH 3/6] Arrange imports --- .../spark/scheduler/cluster/kubernetes/ExecutorPodFactory.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/ExecutorPodFactory.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/ExecutorPodFactory.scala index d1b73871069cb..9726ee2fbbdab 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/ExecutorPodFactory.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/ExecutorPodFactory.scala @@ -20,9 +20,9 @@ import java.io.File import java.nio.file.Paths import java.util.UUID -import scala.collection.JavaConverters._ import io.fabric8.kubernetes.api.model.{ContainerBuilder, ContainerPortBuilder, EnvVar, EnvVarBuilder, EnvVarSourceBuilder, Pod, PodBuilder, QuantityBuilder, VolumeBuilder, VolumeMountBuilder} import org.apache.commons.io.FilenameUtils +import scala.collection.JavaConverters._ import org.apache.spark.{SparkConf, SparkException} import org.apache.spark.deploy.kubernetes.{ConfigurationUtils, InitContainerResourceStagingServerSecretPlugin, PodWithDetachedInitContainer, SparkPodInitContainerBootstrap} From cff05ec06fb17896e20c82c12319ba7823045da0 Mon Sep 17 00:00:00 2001 From: mcheah Date: Fri, 8 Sep 2017 20:31:37 -0700 Subject: [PATCH 4/6] Fix style and integration tests. --- .../submit/LocalDirectoryMountConfigurationStep.scala | 9 ++++----- .../kubernetes/integrationtest/KubernetesSuite.scala | 1 + 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/LocalDirectoryMountConfigurationStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/LocalDirectoryMountConfigurationStep.scala index bfad4e3ec7c63..c0b4c5819ba6d 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/LocalDirectoryMountConfigurationStep.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/LocalDirectoryMountConfigurationStep.scala @@ -1,4 +1,3 @@ - /* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with @@ -39,12 +38,12 @@ import org.apache.spark.deploy.kubernetes.submit.submitsteps.{DriverConfiguratio * container images. * * The solution is twofold: - * - When not using an external shuffle service, a reasonable default is to create a new directory with - * a random name and set that to be the value of `spark.local.dir`. 
+ * - When not using an external shuffle service, a reasonable default is to create a new directory
+ *   with a UUID name and set that to be the value of `spark.local.dir`.
  * - When using the external shuffle service, it is risky to assume that the user intends to mount
  *   the JVM temporary directory into the pod as a hostPath volume. We therefore enforce that
- *   spark.local.dir must be set in dynamic allocation mode so that the user explicitly sets the paths
- *   that have to be mounted.
+ *   spark.local.dir must be set when the external shuffle service is used, so the user sets the
+ *   paths that have to be mounted.
  */
 private[spark] class LocalDirectoryMountConfigurationStep(
     submissionSparkConf: SparkConf) extends DriverConfigurationStep {
diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/KubernetesSuite.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/KubernetesSuite.scala
index e204d0173aff8..169776d12ff7f 100644
--- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/KubernetesSuite.scala
+++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/KubernetesSuite.scala
@@ -150,6 +150,7 @@ private[spark] class KubernetesSuite extends SparkFunSuite with BeforeAndAfter {
     sparkConf.setJars(Seq(CONTAINER_LOCAL_HELPER_JAR_PATH))
     sparkConf.set("spark.dynamicAllocation.enabled", "true")
+    sparkConf.set("spark.local.dir", "/tmp")
     sparkConf.set("spark.shuffle.service.enabled", "true")
     sparkConf.set("spark.kubernetes.shuffle.labels", "app=spark-shuffle-service")
     sparkConf.set("spark.kubernetes.shuffle.namespace", kubernetesTestComponents.namespace)

From fb74418be3bcc4d4ab0e0a8f96a55cf093517bde Mon Sep 17 00:00:00 2001
From: mcheah
Date: Fri, 8 Sep 2017 20:34:54 -0700
Subject: [PATCH 5/6] Add TODO note for volume types to change.

---
 .../kubernetes/KubernetesExternalShuffleManager.scala | 6 ++++++
 1 file changed, 6 insertions(+)

diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesExternalShuffleManager.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesExternalShuffleManager.scala
index 48ff850d6c734..08593d5c10390 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesExternalShuffleManager.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesExternalShuffleManager.scala
@@ -138,6 +138,12 @@ private[spark] class KubernetesExternalShuffleManagerImpl(
   }
 
   override def getExecutorShuffleDirVolumesWithMounts(): Seq[(Volume, VolumeMount)] = {
+    // TODO: Using hostPath for the local directory will also make it such that the
+    // other uses of the local directory - broadcasting and caching - will also write
+    // to the directory that the shuffle service is aware of. It would be better for
+    // these directories to be separate so that the lifetime of the non-shuffle scratch
+    // space is tied to an emptyDir instead of the hostPath. This requires a change in
+    // core Spark as well.
     shuffleDirs.zipWithIndex.map {
       case (shuffleDir, shuffleDirIndex) =>
         val volumeName = s"$shuffleDirIndex-${FilenameUtils.getBaseName(shuffleDir)}"

From c01bfd1f512a0a0c4919eae1d766f585c529957d Mon Sep 17 00:00:00 2001
From: mcheah
Date: Mon, 11 Sep 2017 15:10:07 -0700
Subject: [PATCH 6/6] Add unit test and extra documentation.

---
 conf/kubernetes-shuffle-service.yaml          |  4 +-
 docs/running-on-kubernetes.md                 | 11 ++-
 ...LocalDirectoryMountConfigurationStep.scala | 10 +--
 .../kubernetes/ExecutorPodFactory.scala       |  8 +-
 ...DirectoryMountConfigurationStepSuite.scala | 90 +++++++++++++++++++
 5 files changed, 111 insertions(+), 12 deletions(-)
 rename resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/{ => submitsteps}/LocalDirectoryMountConfigurationStep.scala (93%)
 create mode 100644 resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/kubernetes/submit/submitsteps/LocalDirectoryMountConfigurationStepSuite.scala

diff --git a/conf/kubernetes-shuffle-service.yaml b/conf/kubernetes-shuffle-service.yaml
index 8ab0b362ea32e..8f7ddb3aafc9f 100644
--- a/conf/kubernetes-shuffle-service.yaml
+++ b/conf/kubernetes-shuffle-service.yaml
@@ -32,7 +32,7 @@ spec:
       volumes:
         - name: temp-volume
           hostPath:
-            path: '/var/tmp' # change this path according to your cluster configuration.
+            path: '/tmp/spark-local' # change this path according to your cluster configuration.
       containers:
         - name: shuffle
           # This is an official image that is built
@@ -41,7 +41,7 @@ spec:
           image: kubespark/spark-shuffle:v2.2.0-kubernetes-0.3.0
           imagePullPolicy: IfNotPresent
           volumeMounts:
-            - mountPath: '/tmp'
+            - mountPath: '/tmp/spark-local'
              name: temp-volume
          # more volumes can be mounted here.
          # The spark job must be configured to use these
diff --git a/docs/running-on-kubernetes.md b/docs/running-on-kubernetes.md
index 604f2941e107c..d00c9aed660e2 100644
--- a/docs/running-on-kubernetes.md
+++ b/docs/running-on-kubernetes.md
@@ -220,7 +220,7 @@ Below is an example submission:
     local:///opt/spark/examples/src/main/python/pi.py 100
 ```
 
-## Dynamic Executor Scaling
+## Dynamic Allocation in Kubernetes
 
 Spark on Kubernetes supports Dynamic Allocation with cluster mode. This mode requires running
 an external shuffle service. This is typically a [daemonset](https://kubernetes.io/docs/concepts/workloads/controllers/daemonset/)
@@ -243,6 +243,7 @@ the command may then look like the following:
     --class org.apache.spark.examples.GroupByTest \
     --master k8s://<k8s-apiserver-host>:<k8s-apiserver-port> \
     --kubernetes-namespace default \
+    --conf spark.local.dir=/tmp/spark-local \
     --conf spark.app.name=group-by-test \
     --conf spark.kubernetes.driver.docker.image=kubespark/spark-driver:latest \
     --conf spark.kubernetes.executor.docker.image=kubespark/spark-executor:latest \
@@ -252,6 +253,14 @@ the command may then look like the following:
     --conf spark.kubernetes.shuffle.labels="app=spark-shuffle-service,spark-version=2.2.0" \
     local:///opt/spark/examples/jars/spark-examples_2.11-2.2.0-k8s-0.3.0.jar 10 400000 2
 
+The external shuffle service has to mount directories that can be shared with the executor pods. The provided example
+YAML spec mounts a hostPath volume to the external shuffle service pods, but these hostPath volumes must also be mounted
+into the executors. When using the external shuffle service, the directories specified in the `spark.local.dir`
+configuration are mounted as hostPath volumes into all of the executor containers.
+To ensure that one does not accidentally mount the incorrect hostPath volumes, the value of `spark.local.dir` must
+be set explicitly when using the external shuffle service on Kubernetes, even though it defaults to the JVM's
+temporary directory when using other cluster managers.
+
 ## Advanced
 
 ### Securing the Resource Staging Server with TLS
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/LocalDirectoryMountConfigurationStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/submitsteps/LocalDirectoryMountConfigurationStep.scala
similarity index 93%
rename from resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/LocalDirectoryMountConfigurationStep.scala
rename to resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/submitsteps/LocalDirectoryMountConfigurationStep.scala
index c0b4c5819ba6d..d99edaa225000 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/LocalDirectoryMountConfigurationStep.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/submitsteps/LocalDirectoryMountConfigurationStep.scala
@@ -14,7 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.spark.deploy.kubernetes.submit
+package org.apache.spark.deploy.kubernetes.submit.submitsteps
 
 import java.nio.file.Paths
 import java.util.UUID
@@ -22,9 +22,7 @@ import java.util.UUID
 import io.fabric8.kubernetes.api.model.{ContainerBuilder, PodBuilder, VolumeBuilder, VolumeMountBuilder}
 
 import org.apache.spark.SparkConf
-import org.apache.spark.deploy.kubernetes.config._
 import org.apache.spark.deploy.kubernetes.constants._
-import org.apache.spark.deploy.kubernetes.submit.submitsteps.{DriverConfigurationStep, KubernetesDriverSpec}
 
 /**
  * Configures local directories that the driver and executors should use for temporary storage.
@@ -46,7 +44,9 @@ import org.apache.spark.deploy.kubernetes.submit.submitsteps.{DriverConfiguratio
  *   paths that have to be mounted.
  */
 private[spark] class LocalDirectoryMountConfigurationStep(
-    submissionSparkConf: SparkConf) extends DriverConfigurationStep {
+    submissionSparkConf: SparkConf,
+    randomDirProvider: () => String = () => s"spark-${UUID.randomUUID()}")
+    extends DriverConfigurationStep {
 
   override def configureDriver(driverSpec: KubernetesDriverSpec): KubernetesDriverSpec = {
     val configuredLocalDirs = submissionSparkConf.getOption("spark.local.dir")
@@ -60,7 +60,7 @@ private[spark] class LocalDirectoryMountConfigurationStep(
     } else {
       // If we don't use the external shuffle service, local directories should be randomized if
       // not provided.
- configuredLocalDirs.getOrElse(s"$GENERATED_LOCAL_DIR_MOUNT_ROOT/spark-${UUID.randomUUID}") + configuredLocalDirs.getOrElse(s"$GENERATED_LOCAL_DIR_MOUNT_ROOT/${randomDirProvider()}") } val resolvedLocalDirs = resolvedLocalDirsSingleString.split(",") // It's worth noting that we always use an emptyDir volume for the directories on the driver, diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/ExecutorPodFactory.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/ExecutorPodFactory.scala index 9726ee2fbbdab..1e50ca585ad12 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/ExecutorPodFactory.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/ExecutorPodFactory.scala @@ -274,10 +274,10 @@ private[spark] class ExecutorPodFactoryImpl( // If we're not using the external shuffle manager, we should use emptyDir volumes for // shuffle directories since it's important for disk I/O for these directories to be // performant. If the user has not provided a local directory, instead of using the - // Java temporary directory, we create one instead. This is because we want to avoid - // as much as possible mounting an emptyDir which overlaps with an existing path in - // the Docker image, which is very likely what would happen if we tried to mount the - // volume at Java's temporary directory path, which is /tmp in many JDKs. + // Java temporary directory, we create one instead, because we want to avoid + // mounting an emptyDir which overlaps with an existing path in the Docker image. + // Java's temporary directory path is typically /tmp or a similar path, which is + // likely to exist in most images. val resolvedLocalDirs = Utils.getConfiguredLocalDirs(sparkConf) val localDirVolumes = resolvedLocalDirs.zipWithIndex.map { case (dir, index) => new VolumeBuilder() diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/kubernetes/submit/submitsteps/LocalDirectoryMountConfigurationStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/kubernetes/submit/submitsteps/LocalDirectoryMountConfigurationStepSuite.scala new file mode 100644 index 0000000000000..259875c1f343b --- /dev/null +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/kubernetes/submit/submitsteps/LocalDirectoryMountConfigurationStepSuite.scala @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.spark.deploy.kubernetes.submit.submitsteps + +import java.nio.file.Paths + +import scala.collection.JavaConverters._ + +import org.apache.spark.{SparkConf, SparkFunSuite} +import org.apache.spark.deploy.kubernetes.constants._ + +private[spark] class LocalDirectoryMountConfigurationStepSuite extends SparkFunSuite { + + test("When using the external shuffle service, the local directories must be provided.") { + val sparkConf = new SparkConf(false) + .set(org.apache.spark.internal.config.SHUFFLE_SERVICE_ENABLED, true) + val configurationStep = new LocalDirectoryMountConfigurationStep(sparkConf) + try { + configurationStep.configureDriver(KubernetesDriverSpec.initialSpec(sparkConf)) + fail("The configuration step should have failed without local dirs.") + } catch { + case e: Throwable => + assert(e.getMessage === "requirement failed: spark.local.dir must be provided explicitly" + + " when using the external shuffle service in Kubernetes. These directories should map" + + " to the paths that are mounted into the external shuffle service pods.") + } + } + + test("When not using the external shuffle service, a random directory should be set" + + " for local dirs if one is not provided.") { + val sparkConf = new SparkConf(false) + .set(org.apache.spark.internal.config.SHUFFLE_SERVICE_ENABLED, false) + val configurationStep = new LocalDirectoryMountConfigurationStep( + sparkConf, () => "local-dir") + val resolvedDriverSpec = configurationStep.configureDriver( + KubernetesDriverSpec.initialSpec(sparkConf)) + testLocalDirsMatch(resolvedDriverSpec, Seq(s"$GENERATED_LOCAL_DIR_MOUNT_ROOT/local-dir")) + } + + test("When not using the external shuffle service, provided local dirs should be mounted as" + + " emptyDirs.") { + val sparkConf = new SparkConf(false) + .set(org.apache.spark.internal.config.SHUFFLE_SERVICE_ENABLED, false) + .set("spark.local.dir", "/mnt/tmp/spark-local,/var/tmp/spark-local") + val configurationStep = new LocalDirectoryMountConfigurationStep( + sparkConf) + val resolvedDriverSpec = configurationStep.configureDriver( + KubernetesDriverSpec.initialSpec(sparkConf)) + testLocalDirsMatch(resolvedDriverSpec, Seq("/mnt/tmp/spark-local", "/var/tmp/spark-local")) + } + + private def testLocalDirsMatch( + resolvedDriverSpec: KubernetesDriverSpec, expectedLocalDirs: Seq[String]): Unit = { + assert(resolvedDriverSpec.driverSparkConf.get("spark.local.dir").split(",") === + expectedLocalDirs) + expectedLocalDirs + .zip(resolvedDriverSpec.driverPod.getSpec.getVolumes.asScala) + .zipWithIndex + .foreach { + case ((dir, volume), index) => + assert(volume.getEmptyDir != null) + val fileName = Paths.get(dir).getFileName.toString + assert(volume.getName === s"spark-local-dir-$index-$fileName") + } + + expectedLocalDirs + .zip(resolvedDriverSpec.driverContainer.getVolumeMounts.asScala) + .zipWithIndex + .foreach { + case ((dir, volumeMount), index) => + val fileName = Paths.get(dir).getFileName.toString + assert(volumeMount.getName === s"spark-local-dir-$index-$fileName") + assert(volumeMount.getMountPath === dir) + } + } +}
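
Reviewer note (not part of the patch series): the emptyDir wiring that patches 2 and 6 apply to the driver, and that patch 1 applies to executors, can be exercised on its own. Below is a minimal, self-contained Scala sketch of that volume/mount derivation, using only the fabric8 builder calls that already appear in `LocalDirectoryMountConfigurationStep` and `ExecutorPodFactory`. The object name `LocalDirVolumesSketch`, its helper names, and the `main` driver are invented for illustration.

```scala
import java.nio.file.Paths
import java.util.UUID

import io.fabric8.kubernetes.api.model.{Volume, VolumeBuilder, VolumeMount, VolumeMountBuilder}

object LocalDirVolumesSketch {

  // Mirrors GENERATED_LOCAL_DIR_MOUNT_ROOT from constants.scala.
  private val GeneratedLocalDirMountRoot = "/mnt/tmp/spark-local"

  // Resolve local dirs the way the submission step does: take the configured
  // spark.local.dir value if present, else fall back to a randomized directory
  // under the generated mount root so the mount cannot collide with a path
  // that already exists in the container image (such as /tmp).
  def resolveLocalDirs(configured: Option[String]): Seq[String] =
    configured
      .getOrElse(s"$GeneratedLocalDirMountRoot/spark-${UUID.randomUUID()}")
      .split(",")
      .toSeq

  // One emptyDir volume per local directory, named by index plus the leaf
  // directory name, exactly as in the patch.
  def localDirVolumes(dirs: Seq[String]): Seq[Volume] =
    dirs.zipWithIndex.map { case (dir, index) =>
      new VolumeBuilder()
        .withName(s"spark-local-dir-$index-${Paths.get(dir).getFileName.toString}")
        .withNewEmptyDir().endEmptyDir()
        .build()
    }

  // Pair each volume with a mount at the corresponding directory path.
  def localDirVolumeMounts(volumes: Seq[Volume], dirs: Seq[String]): Seq[VolumeMount] =
    volumes.zip(dirs).map { case (volume, path) =>
      new VolumeMountBuilder()
        .withName(volume.getName)
        .withMountPath(path)
        .build()
    }

  def main(args: Array[String]): Unit = {
    // Same two-directory value the unit test above feeds through the step.
    val dirs = resolveLocalDirs(Some("/mnt/tmp/spark-local,/var/tmp/spark-local"))
    val volumes = localDirVolumes(dirs)
    val mounts = localDirVolumeMounts(volumes, dirs)
    volumes.zip(mounts).foreach { case (v, m) =>
      println(s"volume=${v.getName} mountPath=${m.getMountPath}")
    }
  }
}
```

Note how the index in the volume name keeps the names unique even when two configured directories share the same leaf name, as in the `/mnt/tmp/spark-local,/var/tmp/spark-local` pair the unit test above feeds through the step.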