Mount emptyDir volumes for temporary directories on executors in static allocation mode. #486
Changes from all commits: 0e0ddad, 7af34c2, 9bc5261, cff05ec, fb74418, c01bfd1
@@ -0,0 +1,98 @@

```scala
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.spark.deploy.kubernetes.submit.submitsteps

import java.nio.file.Paths
import java.util.UUID

import io.fabric8.kubernetes.api.model.{ContainerBuilder, PodBuilder, VolumeBuilder, VolumeMountBuilder}

import org.apache.spark.SparkConf
import org.apache.spark.deploy.kubernetes.constants._

/**
 * Configures local directories that the driver and executors should use for temporary storage.
 *
 * Note that we have different semantics for scratch space in Kubernetes versus the other cluster
 * managers. In Kubernetes, we cannot allow the local directories to resolve to the Java temporary
 * directory. This is because we will mount either emptyDir volumes for both the driver and
 * executors, or hostPath volumes for the executors and an emptyDir for the driver. In either
 * case, the mount paths need to be directories that do not exist in the base container images.
 * But the Java temporary directory is typically a directory like /tmp which exists in most
 * container images.
 *
 * The solution is twofold:
 * - When not using an external shuffle service, a reasonable default is to create a new directory
 *   with a UUID name and set that to be the value of `spark.local.dir`.
 * - When using the external shuffle service, it is risky to assume that the user intends to mount
 *   the JVM temporary directory into the pod as a hostPath volume. We therefore enforce that
 *   spark.local.dir must be set in dynamic allocation mode so that the user explicitly sets the
 *   paths that have to be mounted.
 */
private[spark] class LocalDirectoryMountConfigurationStep(
    submissionSparkConf: SparkConf,
    randomDirProvider: () => String = () => s"spark-${UUID.randomUUID()}")
  extends DriverConfigurationStep {

  override def configureDriver(driverSpec: KubernetesDriverSpec): KubernetesDriverSpec = {
    val configuredLocalDirs = submissionSparkConf.getOption("spark.local.dir")
    val isUsingExternalShuffle = submissionSparkConf.get(
      org.apache.spark.internal.config.SHUFFLE_SERVICE_ENABLED)
    val resolvedLocalDirsSingleString = if (isUsingExternalShuffle) {
      require(configuredLocalDirs.isDefined, "spark.local.dir must be provided explicitly when" +
        " using the external shuffle service in Kubernetes. These directories should map to" +
        " the paths that are mounted into the external shuffle service pods.")
      configuredLocalDirs.get
    } else {
      // If we don't use the external shuffle service, local directories should be randomized if
      // not provided.
      configuredLocalDirs.getOrElse(s"$GENERATED_LOCAL_DIR_MOUNT_ROOT/${randomDirProvider()}")
    }
    val resolvedLocalDirs = resolvedLocalDirsSingleString.split(",")
    // It's worth noting that we always use an emptyDir volume for the directories on the driver,
    // because the driver does not need a hostPath to share its scratch space with any other pod.
    // The driver itself will decide on whether to use a hostPath volume or an emptyDir volume for
    // these directories on the executors. (see ExecutorPodFactory and
    // KubernetesExternalClusterManager)
    val localDirVolumes = resolvedLocalDirs.zipWithIndex.map { case (dir, index) =>
      new VolumeBuilder()
        .withName(s"spark-local-dir-$index-${Paths.get(dir).getFileName.toString}")
        .withNewEmptyDir().endEmptyDir()
        .build()
    }
    val localDirVolumeMounts = localDirVolumes.zip(resolvedLocalDirs).map {
      case (volume, path) =>
        new VolumeMountBuilder()
          .withName(volume.getName)
          .withMountPath(path)
          .build()
    }
    val resolvedDriverSparkConf = driverSpec.driverSparkConf.clone().set(
      "spark.local.dir", resolvedLocalDirsSingleString)
    driverSpec.copy(
      driverPod = new PodBuilder(driverSpec.driverPod)
        .editSpec()
          .addToVolumes(localDirVolumes: _*)
          .endSpec()
        .build(),
      driverContainer = new ContainerBuilder(driverSpec.driverContainer)
        .addToVolumeMounts(localDirVolumeMounts: _*)
        .build(),
      driverSparkConf = resolvedDriverSparkConf
    )
  }
}
```
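To make the behavior of this step concrete, here is a rough usage sketch for the static-allocation case (no external shuffle service). `initialDriverSpec` stands in for a `KubernetesDriverSpec` built by the earlier submission steps, and the exact value of `GENERATED_LOCAL_DIR_MOUNT_ROOT` comes from the project's constants; both are assumptions of this sketch, not part of the diff.

```scala
import org.apache.spark.SparkConf

// Sketch: with no spark.local.dir set and the external shuffle service disabled,
// the step generates a local directory name and rewrites the driver's SparkConf.
val submissionConf = new SparkConf(false)
  .set("spark.shuffle.service.enabled", "false")

val step = new LocalDirectoryMountConfigurationStep(
  submissionConf,
  randomDirProvider = () => "spark-example-dir") // deterministic name for the example

val resolvedSpec = step.configureDriver(initialDriverSpec) // initialDriverSpec: assumed input spec

// The resolved conf points spark.local.dir at the generated directory under
// GENERATED_LOCAL_DIR_MOUNT_ROOT, and the driver pod gains a matching emptyDir volume
// named "spark-local-dir-0-spark-example-dir" mounted at that path.
assert(resolvedSpec.driverSparkConf.get("spark.local.dir").endsWith("/spark-example-dir"))
```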
@@ -16,10 +16,13 @@

```scala
 */
package org.apache.spark.scheduler.cluster.kubernetes

import scala.collection.JavaConverters._
import java.io.File
import java.nio.file.Paths
import java.util.UUID

import io.fabric8.kubernetes.api.model.{ContainerBuilder, ContainerPortBuilder, EnvVar, EnvVarBuilder, EnvVarSourceBuilder, Pod, PodBuilder, QuantityBuilder}
import io.fabric8.kubernetes.api.model.{ContainerBuilder, ContainerPortBuilder, EnvVar, EnvVarBuilder, EnvVarSourceBuilder, Pod, PodBuilder, QuantityBuilder, VolumeBuilder, VolumeMountBuilder}
import org.apache.commons.io.FilenameUtils
import scala.collection.JavaConverters._

import org.apache.spark.{SparkConf, SparkException}
import org.apache.spark.deploy.kubernetes.{ConfigurationUtils, InitContainerResourceStagingServerSecretPlugin, PodWithDetachedInitContainer, SparkPodInitContainerBootstrap}
```

@@ -265,9 +268,47 @@ private[spark] class ExecutorPodFactoryImpl(

```scala
    val executorPodWithNodeAffinity =
      nodeAffinityExecutorPodModifier.addNodeAffinityAnnotationIfUseful(
        executorPodWithInitContainer, nodeToLocalTaskCount)
    new PodBuilder(executorPodWithNodeAffinity)

    val (executorPodWithTempLocalDirs, executorContainerWithTempLocalDirs) =
      if (shuffleManager.isEmpty) {
```
**Member:** I wonder if we do not want this check.

**Author:** The check needs to be a little more elaborate than that, I think. The problem is that, given these two blocks [code snippets elided in this view], sometimes the local dirs will be mounted as hostPath volumes. However, if the shuffle dirs are specified in the … @foxish for thoughts - what's the purpose of having a separate configuration for shuffle directories specific to Kubernetes mode? That is, why do we expose KUBERNETES_SHUFFLE_DIR at all instead of just relying on spark.local.dir?

**Member:** I see. Yes, I guess we could be better off using spark.local.dir instead of KUBERNETES_SHUFFLE_DIR. But if the user specifies spark.local.dir, should we interpret it as …?

**Member:** There might be one benefit of having KUBERNETES_SHUFFLE_DIR as a separate option. If we use spark.local.dir, then the executor block managers may end up storing RDD caches in the same hostPath that the external shuffle service is overseeing. That could make the block managers and the external shuffle service step on each other's toes - for example, deleting files in the wrong way during clean-up.

**Author:** I took a closer look at how things work here. These are the factors we need to consider: [list elided]. I thus propose the following: [proposal elided]. The downside here is that the directories used for the cache data will also be known and written to the same location as those for the shuffle data. However, I argue that this is already equal to the status quo, in that the shuffle service used in other cluster managers runs into the same issues with e.g. disk space.

**Member:** I like the proposal. It's not as clean as we would want eventually, but it's progress nonetheless from where we are. E.g. I can't find how KUBERNETES_SHUFFLE_DIR != spark.local.dir would work, and maybe the proposal fixes a bug in that edge case.

I chatted with Matt offline and I'm +1 on doing the following in this PR: [list elided].

**Member:** I gave some more thought to this. Logically there are two different work dirs: one for shuffle data and the other for spill data. So it'd be ideal to have two separate config keys. If the internal shuffle service is used, it's fine for the two to be the same; for the external shuffle service, the two are better kept separate. Spark core does not make this distinction, which is causing us confusion. For instance, it's still not clear to me how to tell the shuffle code to use a separate dir different from spark.local.dir. I'm personally fine with a TODO addressing this need. @foxish, I wonder what you think?

The TODO reminds us to revisit this when moving into upstream -- as we do that upstreaming, I'd like to revisit this issue and potentially propose a change to core Spark to split shuffle data from spill data. In k8s it's advantageous to have these on different disk types, since shuffle data should persist for the lifetime of the job (shuffle service), whereas spill data is not currently reusable across executor failures. Shuffle should be on something more lasting (like a hostPath) and spill should be on something more ephemeral (like an emptyDir).
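As a concrete illustration of the proposal above and of the require() in LocalDirectoryMountConfigurationStep, a submission that enables the external shuffle service has to spell out its local directories. The paths below are invented for the example and must match whatever directories the shuffle-service pods actually mount (for instance, hostPath volumes in their DaemonSet spec).

```scala
import org.apache.spark.SparkConf

// Hypothetical configuration for the external-shuffle-service case: spark.local.dir is
// mandatory, and it is what both the executors and KubernetesExternalShuffleManagerImpl
// (via Utils.getConfiguredLocalDirs) resolve as the shuffle directories.
val conf = new SparkConf()
  .set("spark.shuffle.service.enabled", "true")
  .set("spark.local.dir", "/mnt/disks/ssd0/spark-local,/mnt/disks/ssd1/spark-local")
```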
```scala
        // If we're not using the external shuffle manager, we should use emptyDir volumes for
        // shuffle directories since it's important for disk I/O for these directories to be
        // performant. If the user has not provided a local directory, instead of using the
        // Java temporary directory, we create one instead, because we want to avoid
        // mounting an emptyDir which overlaps with an existing path in the Docker image.
        // Java's temporary directory path is typically /tmp or a similar path, which is
        // likely to exist in most images.
        val resolvedLocalDirs = Utils.getConfiguredLocalDirs(sparkConf)
        val localDirVolumes = resolvedLocalDirs.zipWithIndex.map { case (dir, index) =>
          new VolumeBuilder()
            .withName(s"spark-local-dir-$index-${Paths.get(dir).getFileName.toString}")
            .withNewEmptyDir().endEmptyDir()
            .build()
        }
        val localDirVolumeMounts = localDirVolumes.zip(resolvedLocalDirs).map {
          case (volume, path) =>
            new VolumeMountBuilder()
              .withName(volume.getName)
              .withMountPath(path)
              .build()
        }
        // Setting the SPARK_LOCAL_DIRS environment variable will force the executor to use the
        // generated directory if the user did not provide one, as opposed to using the Java
        // temporary directory. This also overrides the value of spark.local.dir in SparkConf,
        // which is intended. See Utils#getConfiguredLocalDirs().
```
why set SPARK_LOCAL_DIRS …?

**Member:** +1. Hope we can stick to one mechanism.

**Author:** We don't set …

**Author:** I think the question boils down to whether or not we want the driver to also have emptyDir volume mounts for its local directories. This didn't seem necessary, since the driver should not be doing any write-heavy operations.

**Member:** Good point. I wonder what the K8s overhead of creating an unused emptyDir for the driver is. @foxish?

I suspect the driver does have some operations where its block manager writes to local disk, like large … If it's a good idea to put the executors' local dirs onto an emptyDir, it seems like it would be a good idea to put the driver's local dirs onto an emptyDir too, for consistency.

**Author:** There's a potential mismatch here, though. In the case of using the external shuffle service, we won't be mounting the local directories as …

**Member:** SGTM. It's already a bit messy and we're not making it worse.

**Author:** On the code level - we probably don't want to use a bootstrap module that's shared between the submission client and the driver, since the logic is different enough between the two.
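The env-var wiring itself is not part of this hunk. The following is only a sketch, using the fabric8 builders already imported above, of how SPARK_LOCAL_DIRS could be attached to the executor container so that Utils.getConfiguredLocalDirs prefers the generated directories over the JVM temporary directory; `executorContainer` and `resolvedLocalDirs` here stand in for the values in scope in the factory.

```scala
import io.fabric8.kubernetes.api.model.{Container, ContainerBuilder, EnvVarBuilder}

// Sketch only: export the resolved local dirs to the executor process via SPARK_LOCAL_DIRS.
def withLocalDirEnv(executorContainer: Container, resolvedLocalDirs: Seq[String]): Container =
  new ContainerBuilder(executorContainer)
    .addToEnv(new EnvVarBuilder()
      .withName("SPARK_LOCAL_DIRS")
      .withValue(resolvedLocalDirs.mkString(","))
      .build())
    .build()
```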
```scala
        (new PodBuilder(executorPodWithNodeAffinity)
          .editSpec()
            .addToVolumes(localDirVolumes: _*)
            .endSpec()
          .build(),
        new ContainerBuilder(initBootstrappedExecutorContainer)
          .addToVolumeMounts(localDirVolumeMounts: _*)
          .build())
      } else (executorPodWithNodeAffinity, initBootstrappedExecutorContainer)

    new PodBuilder(executorPodWithTempLocalDirs)
      .editSpec()
        .addToContainers(initBootstrappedExecutorContainer)
        .addToContainers(executorContainerWithTempLocalDirs)
        .endSpec()
      .build()
  }
```
@@ -113,15 +113,16 @@ private[spark] class KubernetesClusterManager extends ExternalClusterManager wit

```scala
        Some(new File(Config.KUBERNETES_SERVICE_ACCOUNT_TOKEN_PATH)),
        Some(new File(Config.KUBERNETES_SERVICE_ACCOUNT_CA_CRT_PATH)))

    val kubernetesShuffleManager = if (Utils.isDynamicAllocationEnabled(sparkConf)) {
    val kubernetesShuffleManager = if (sparkConf.get(
        org.apache.spark.internal.config.SHUFFLE_SERVICE_ENABLED)) {
```
**Author:** This is technically the correct thing to do, because the shuffle service is allowed to be used independently of dynamic allocation.

+1
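A small sketch of why the distinction matters: with static allocation but the shuffle service turned on, the old dynamic-allocation check would have skipped the Kubernetes shuffle manager, while the new check still creates it. Only public SparkConf keys are used here, as a simplification of the actual checks.

```scala
import org.apache.spark.SparkConf

val conf = new SparkConf(false)
  .set("spark.dynamicAllocation.enabled", "false") // static allocation
  .set("spark.shuffle.service.enabled", "true")    // external shuffle service still requested

val oldCheck = conf.getBoolean("spark.dynamicAllocation.enabled", false) // false: manager skipped
val newCheck = conf.getBoolean("spark.shuffle.service.enabled", false)   // true: manager created
```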
```scala
      val kubernetesExternalShuffleClient = new KubernetesExternalShuffleClientImpl(
        SparkTransportConf.fromSparkConf(sparkConf, "shuffle"),
        sc.env.securityManager,
        sc.env.securityManager.isAuthenticationEnabled())
        SparkTransportConf.fromSparkConf(sparkConf, "shuffle"),
        sc.env.securityManager,
        sc.env.securityManager.isAuthenticationEnabled())
      Some(new KubernetesExternalShuffleManagerImpl(
        sparkConf,
        kubernetesClient,
        kubernetesExternalShuffleClient))
        sparkConf,
        kubernetesClient,
        kubernetesExternalShuffleClient))
    } else None

    val executorPodFactory = new ExecutorPodFactoryImpl(
```
@@ -67,9 +67,7 @@ private[spark] class KubernetesExternalShuffleManagerImpl(

```scala
      s"but no ${KUBERNETES_SHUFFLE_LABELS.key} specified")
  }
  private val externalShufflePort = sparkConf.getInt("spark.shuffle.service.port", 7337)
  private val shuffleDirs = sparkConf.get(KUBERNETES_SHUFFLE_DIR).map {
    _.split(",")
  }.getOrElse(Utils.getConfiguredLocalDirs(sparkConf))
  private val shuffleDirs = Utils.getConfiguredLocalDirs(sparkConf)
  private var shufflePodCache = scala.collection.mutable.Map[String, String]()
  private var watcher: Watch = _
```

@@ -140,6 +138,12 @@ private[spark] class KubernetesExternalShuffleManagerImpl(

```scala
  }

  override def getExecutorShuffleDirVolumesWithMounts(): Seq[(Volume, VolumeMount)] = {
    // TODO: Using hostPath for the local directory will also make it such that the
```
**Member:** +1. Thanks for the TODO.
```scala
    // other uses of the local directory - broadcasting and caching - will also write
    // to the directory that the shuffle service is aware of. It would be better for
    // these directories to be separate so that the lifetime of the non-shuffle scratch
    // space is tied to an emptyDir instead of the hostPath. This requires a change in
    // core Spark as well.
    shuffleDirs.zipWithIndex.map {
      case (shuffleDir, shuffleDirIndex) =>
        val volumeName = s"$shuffleDirIndex-${FilenameUtils.getBaseName(shuffleDir)}"
```
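The remainder of getExecutorShuffleDirVolumesWithMounts is cut off in this view. The sketch below shows, as a self-contained helper, the shape it presumably takes: one hostPath-backed volume plus a matching mount per configured shuffle directory, so executors and shuffle-service pods share the same node-local files. The builder method names follow the fabric8 model API and should be double-checked against the version in use.

```scala
import io.fabric8.kubernetes.api.model.{Volume, VolumeBuilder, VolumeMount, VolumeMountBuilder}
import org.apache.commons.io.FilenameUtils

// Sketch of the expected result: hostPath volumes keyed by index and directory base name.
def shuffleDirVolumesWithMounts(shuffleDirs: Seq[String]): Seq[(Volume, VolumeMount)] =
  shuffleDirs.zipWithIndex.map { case (shuffleDir, shuffleDirIndex) =>
    val volumeName = s"$shuffleDirIndex-${FilenameUtils.getBaseName(shuffleDir)}"
    val volume = new VolumeBuilder()
      .withName(volumeName)
      .withNewHostPath()
        .withPath(shuffleDir)
        .endHostPath()
      .build()
    val volumeMount = new VolumeMountBuilder()
      .withName(volumeName)
      .withMountPath(shuffleDir)
      .build()
    (volume, volumeMount)
  }
```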
+1. The documentation looks good. Thanks!