Mount emptyDir volumes for temporary directories on executors in static allocation mode. #486
Conversation
I will be running some benchmarks with this on an internal application. @aash and I have a suspicion that this is a bottleneck for some of our workflows.
```diff
-    val kubernetesShuffleManager = if (Utils.isDynamicAllocationEnabled(sparkConf)) {
+    val kubernetesShuffleManager = if (sparkConf.get(
+      org.apache.spark.internal.config.SHUFFLE_SERVICE_ENABLED)) {
```
This is technically the correct thing to do, because the shuffle service is allowed to be used independently of dynamic allocation.
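To illustrate (a hypothetical configuration; `Utils.isDynamicAllocationEnabled` is Spark-internal):

```scala
import org.apache.spark.SparkConf
import org.apache.spark.util.Utils

// Static allocation combined with an external shuffle service is a legal setup.
val conf = new SparkConf()
  .set("spark.shuffle.service.enabled", "true")
  .set("spark.dynamicAllocation.enabled", "false")

// The old check keyed off dynamic allocation and would miss this case:
assert(!Utils.isDynamicAllocationEnabled(conf))
// The new check reads the shuffle service flag directly:
assert(conf.getBoolean("spark.shuffle.service.enabled", false))
```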
+1
```scala
// the Docker image, which is very likely what would happen if we tried to mount the
// volume at Java's temporary directory path, which is /tmp in many JDKs.
val resolvedLocalDirs = sparkConf.get(
  "spark.local.dir", s"$GENERATED_LOCAL_DIR_MOUNT_ROOT/${UUID.randomUUID()}")
```
If a user has set spark.local.dir they may have a reason for placing the local directory in a particular mount path. Maybe we should only mount the dirs from spark.local.dir as EmptyDir volumes if it's the default value, rather than user-specified?
One use case for mounting user-specified dirs is when the user specifies multiple local directories so that the backend can delegate the volumes to mount to different disks. This is something we talked about in #260. A sketch of that configuration is below.
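A hedged sketch of that multi-disk configuration (the paths are made up for illustration):

```scala
import org.apache.spark.SparkConf

// Two scratch directories backed by different physical disks; the backend
// could map each entry to its own volume mount.
val conf = new SparkConf()
  .set("spark.local.dir", "/mnt/disk1/spark-local,/mnt/disk2/spark-local")

// Spark treats the value as a comma-separated list of scratch directories.
val localDirs: Array[String] = conf.get("spark.local.dir").split(",")
```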
```scala
// Setting the SPARK_LOCAL_DIRS environment variable will force the executor to use the
// generated directory if the user did not provide one, as opposed to using the Java
// temporary directory. This also overrides the value of spark.local.dir in SparkConf,
// which is intended. See Utils#getConfiguredLocalDirs().
```
Why set SPARK_LOCAL_DIRS instead of SPARK_EXECUTOR_DIRS, or let that method fall through to read from spark.local.dir? It seems like we could set no env var at all and get the same result in the executor.
+1. Hope we can stick to one mechanism.
We don't set spark.local.dir for the user on the driver side, which means that when the executor picks up the configuration, it will get whatever the driver has set for spark.local.dir. So we want to override that for the executors.
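For reference, the precedence in Utils#getConfiguredLocalDirs looks roughly like this (a simplified sketch of the Spark 2.x logic; the YARN and Mesos branches are omitted):

```scala
import java.io.File
import org.apache.spark.SparkConf

// Env vars win over spark.local.dir in SparkConf, which wins over java.io.tmpdir.
def configuredLocalDirs(conf: SparkConf): Array[String] = {
  if (conf.getenv("SPARK_EXECUTOR_DIRS") != null) {
    conf.getenv("SPARK_EXECUTOR_DIRS").split(File.pathSeparator)
  } else if (conf.getenv("SPARK_LOCAL_DIRS") != null) {
    conf.getenv("SPARK_LOCAL_DIRS").split(",")
  } else {
    conf.get("spark.local.dir", System.getProperty("java.io.tmpdir")).split(",")
  }
}
```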
I think the question boils down to whether or not we want the driver to also have emptyDir volume mounts for its local directories. This didn't seem necessary since the driver should not be doing any write-heavy operations.
Good point. I wonder what the K8s overhead of creating an unused emptyDir for the driver would be. @foxish?
I suspect the driver does have some operations where its block manager writes to local disk, like large take or collect calls.
If it's a good idea to put the executors' local dirs onto an EmptyDir, it seems like it would be a good idea to put the driver's local dirs onto an EmptyDir too, for consistency.
There's a potential mismatch here though. In the case of using the external shuffle service, we won't be mounting the local directories as emptyDir volumes on the executors, but rather as hostPath volumes. But we would still mount them as emptyDir volumes for the driver. I suppose if it's reasonable to have inconsistent volume mounts between the driver and the executor then that's fine - this seems like the right semantics anyways.
SGTM. It's already a bit messy and we're not making it worse.
On the code level - we probably don't want to use a bootstrap module that's shared between the submission client and the driver, since the logic is different enough between the two.
@mccheah build failed with …
+1. Thanks for doing this, @mccheah. Would this also apply to block manager dirs that also do local disk I/O? (I didn't look at the change in detail. I'd love to review the code, actually.)
kimoonkim left a comment
Overall, it looks good to me. Thanks for addressing this need!
I left one minor suggestion below. PTAL.
```scala
new PodBuilder(executorPodWithNodeAffinity)
// ...
val (executorPodWithTempLocalDirs, executorContainerWithTempLocalDirs) =
  if (shuffleManager.isEmpty) {
```
I wonder if we want this check at all. spark.local.dir is also used by the block manager, which can cache RDD output on local disks whether or not the external shuffle manager is used. From the documentation:
> Directory to use for "scratch" space in Spark, including map output files and RDDs that get stored on disk.
The check needs to be a little more elaborate than that, I think.
The problem is that in KubernetesExternalShuffleManager, we have this block here:
```scala
private val shuffleDirs = sparkConf.get(KUBERNETES_SHUFFLE_DIR).map {
  _.split(",")
}.getOrElse(Utils.getConfiguredLocalDirs(sparkConf))
```
and then this:
```scala
override def getExecutorShuffleDirVolumesWithMounts(): Seq[(Volume, VolumeMount)] = {
  shuffleDirs.zipWithIndex.map {
    case (shuffleDir, shuffleDirIndex) =>
      val volumeName = s"$shuffleDirIndex-${FilenameUtils.getBaseName(shuffleDir)}"
      val volume = new VolumeBuilder()
        .withName(volumeName)
        .withNewHostPath(shuffleDir)
        .build()
      val volumeMount = new VolumeMountBuilder()
        .withName(volumeName)
        .withMountPath(shuffleDir)
        .build()
      (volume, volumeMount)
  }
}
```
Given these two blocks, sometimes the local dirs will be mounted as hostPath volumes. However, if the shuffle dirs are specified in the KUBERNETES_SHUFFLE_DIR Spark configuration, and these directories are not equal to what's set in spark.local.dir, then spark.local.dir will default to the Docker disk.
@foxish for thoughts - what's the purpose of having a separate configuration for shuffle directories specific to Kubernetes mode? That is, why do we expose KUBERNETES_SHUFFLE_DIR as a separate configuration option instead of falling back to mounting spark.local.dir as hostPath volumes?
I see. Yes, I guess we'd be better off using spark.local.dir instead of KUBERNETES_SHUFFLE_DIR.
But if the user specifies spark.local.dir, should we interpret it as hostPath dirs on the cluster node, as opposed to dirs in the container? I think that's a sensible interpretation for k8s, but I wonder how other people think about it.
There might be one benefit of having KUBERNETES_SHUFFLE_DIR as a separate option. If we use spark.local.dir, then the executor block managers may end up storing RDD caches in the same hostPath that the external shuffle service is overseeing. That could make the block managers and the external shuffle service step on each other's toes, e.g. by deleting each other's files during clean-up.
I took a closer look at how things work here. These are the factors we need to consider:

- When the external shuffle service is used, the executor advertises its `spark.local.dir` value (or the equivalent, e.g. `SPARK_LOCAL_DIRS`) to the external shuffle service that is colocated on its node.
- The executor uses `spark.local.dir` for both shuffle data and cache data, regardless of whether the external shuffle service is being used.
- The shuffle service must share the folders that the executor is using.

I thus propose the following (see the sketch after this comment):

- Remove `KUBERNETES_SHUFFLE_DIR` and always use `spark.local.dir` instead.
- Mount paths from `spark.local.dir` as `hostPath` volumes if using the external shuffle service.
- Mount paths from `spark.local.dir` as `emptyDir` volumes if not using the external shuffle service.

The downside here is that the directories used for the cache data will also be known to the shuffle service and written to the same location as those for the shuffle data. However, I'd argue this is already equal to the status quo, in that the shuffle service used in other cluster managers runs into the same issues with e.g. disk space.
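A hedged sketch of the proposal (the helper name `localDirVolumesWithMounts` is made up for illustration; the fabric8 builder calls mirror the ones already used in this PR):

```scala
import io.fabric8.kubernetes.api.model.{Volume, VolumeBuilder, VolumeMount, VolumeMountBuilder}

// Hypothetical helper: map each spark.local.dir path to a (volume, mount) pair,
// using hostPath when the external shuffle service must share the files and
// emptyDir (pod-scoped scratch space) otherwise.
def localDirVolumesWithMounts(
    localDirs: Seq[String],
    useExternalShuffleService: Boolean): Seq[(Volume, VolumeMount)] = {
  localDirs.zipWithIndex.map { case (dir, index) =>
    val volumeName = s"spark-local-dir-$index"
    val volume = if (useExternalShuffleService) {
      // Shared with the shuffle service daemon colocated on the node.
      new VolumeBuilder()
        .withName(volumeName)
        .withNewHostPath(dir)
        .build()
    } else {
      // Lives and dies with the executor pod; nothing else needs to read it.
      new VolumeBuilder()
        .withName(volumeName)
        .withNewEmptyDir().endEmptyDir()
        .build()
    }
    val mount = new VolumeMountBuilder()
      .withName(volumeName)
      .withMountPath(dir)
      .build()
    (volume, mount)
  }
}
```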
I like the proposal. It's not as clean as we would want eventually, but it's progress nonetheless from where we are. E.g. I can't see how `KUBERNETES_SHUFFLE_DIR != spark.local.dir` would work today, and maybe the proposal fixes a bug in that edge case.
I chatted with Matt offline and I'm +1 on doing the following in this PR:

- removing `KUBERNETES_SHUFFLE_DIR`
- putting spill / shuffle data onto a HostPath when there's an external shuffle service
- putting spill / shuffle data onto an EmptyDir when there's no external shuffle service
- leaving a TODO in the code to later move spill onto an EmptyDir volume while leaving shuffle on the HostPath (I think this is better for perf; doing some direct disk benchmarks in k8s now)
I gave some more thought to this. Logically there are two different work dirs, one for shuffle data and the other for spill data, so it'd be ideal to have two separate config keys.
When the internal shuffle service is used, it's fine for the two to be the same. For the external shuffle service, the two are better kept separate. Spark core does not distinguish them, which is causing us confusion. For instance, it's still not clear to me how to tell the shuffle code to use a separate dir different from spark.local.dir.
I'm personally fine with a TODO addressing this need. @foxish, I wonder what you think?
The TODO reminds us to revisit this when moving into upstream -- as part of that upstreaming I'd like to revisit this issue and potentially propose a change to core Spark to split shuffle data from spill data.
In k8s it's advantageous to have these on different disk types, since shuffle data should persist with the lifetime of the job (shuffle service), whereas spill data is not currently reusable through executor failures. Shuffle should be on something more lasting (like a HostPath) and spill should be on something more ephemeral (like an EmptyDir).
Missing a unit test for the new configuration step. Flow has changed; now we set …
kimoonkim left a comment
LGTM. Thanks for writing this change!
```scala
 *
 * Note that we have different semantics for scratch space in Kubernetes versus the other cluster
 * managers. In Kubernetes, we cannot allow the local directories to resolve to the Java temporary
 * directory. This is because we will mount either emptyDir volumes for both the driver and
```
Did you mean "Because of this, we will mount ..."? This sentence and the previous one don't parse well together.
```scala
}
// ...
override def getExecutorShuffleDirVolumesWithMounts(): Seq[(Volume, VolumeMount)] = {
  // TODO: Using hostPath for the local directory will also make it such that the
```
+1. Thanks for the TODO.
```scala
val isUsingExternalShuffle = submissionSparkConf.get(
  org.apache.spark.internal.config.SHUFFLE_SERVICE_ENABLED)
val resolvedLocalDirsSingleString = if (isUsingExternalShuffle) {
  require(configuredLocalDirs.isDefined, "spark.local.dir must be provided explicitly when" +
```
Maybe change the doc (link) for using dynamic allocation as well to make this requirement clear?
@kimoonkim good call - I added some documentation to clarify how …
```
    --conf spark.kubernetes.shuffle.labels="app=spark-shuffle-service,spark-version=2.2.0" \
    local:///opt/spark/examples/jars/spark-examples_2.11-2.2.0-k8s-0.3.0.jar 10 400000 2
```

The external shuffle service has to mount directories that can be shared with the executor pods. The provided example
+1. The documentation looks good. Thanks!
We did some benchmarks in EC2 and found that disk performance inside a k8s … (430 vs 960 mbps write throughput). Given this, I'm fully convinced that placing shuffle and spill data on a k8s …
Need to rebase this onto …
Replaced by #522
This avoids annoying issues with IDE integration and building where you have to remember the incantation to run the correct combination. This entails the following changes:
* Hadoop default changed from 2.7.4 to 2.9.1
* yarn, kubernetes, hadoop-cloud and kinesis modules are on by default

SparkR is left out since it requires a bit more invasive changes to enable by default.
Closes #439.
This might prove to be important for performance, especially in shuffle-heavy computations where the executors perform a large amount of disk I/O. We only provision these volumes in static allocation mode without the shuffle service, because using a shuffle service requires mounting hostPath volumes instead.
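For a concrete picture, here is a hedged sketch of the pod shape this produces in the static allocation case (the pod name, image, and mount path are illustrative, not the PR's actual values):

```scala
import io.fabric8.kubernetes.api.model.{Pod, PodBuilder}

// An executor pod with one pod-scoped emptyDir scratch volume mounted at the
// resolved local directory path.
val executorPod: Pod = new PodBuilder()
  .withNewMetadata()
    .withName("spark-executor-1")
  .endMetadata()
  .withNewSpec()
    .addNewVolume()
      .withName("spark-local-dir-0")
      .withNewEmptyDir().endEmptyDir()
    .endVolume()
    .addNewContainer()
      .withName("executor")
      .withImage("spark-executor:latest")
      .addNewVolumeMount()
        .withName("spark-local-dir-0")
        .withMountPath("/mnt/spark-local-dir-0")
      .endVolumeMount()
    .endContainer()
  .endSpec()
  .build()
```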