-
Couldn't load subscription status.
- Fork 117
Download remotely-located resources on driver startup. #240
Download remotely-located resources on driver startup. #240
Conversation
|
Will work on fixing the unit tests. |
7216b86 to
7bfb342
Compare
Augments the init-container so that we don't need to use a separate image, but on submission two containers are bootstrapped instead for a cleaner architecture.
1849ff3 to
e84737d
Compare
|
This change is ready for review. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Some minor suggestions, nothing major. Looks good!
| initContainerImage: String) extends DownloadRemoteDependencyManager { | ||
|
|
||
| private val jarsToDownload = KubernetesFileUtils.getOnlyRemoteFiles(sparkJars) | ||
| private val filesToDownload = KubernetesFileUtils.getOnlyRemoteFiles(sparkFiles) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
move these into buildInitContainerConfigMap since that's the only place they're used. Otherwise these run in the object constructor which I don't think we want (constructors should be cheap)
| import io.fabric8.kubernetes.api.model.{ConfigMap, ConfigMapBuilder} | ||
|
|
||
| /** | ||
| * Creates a config map from a map object, with a single given key |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Creates a Kubernetes ConfigMap
| } | ||
|
|
||
| /** | ||
| * Process that fetches files from a resource staging server and/or arbi trary remote locations. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: arbitrary
| downloadJarsSecretLocation, | ||
| stagingServerJarsDownloadDir, | ||
| "Starting to download jars from resource staging server...", | ||
| "Finished downloading jars from resource staging server.", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
change this two starting/finished message params to just "jars" or "files" and do the logging in the method. This also lets us drop the last two failure message parameters too right?
| val remoteJarsDownload = Future[Unit] { | ||
| downloadFiles(remoteJars, | ||
| remoteJarsDownloadDir, | ||
| s"Remote jars download directory specified at $remoteJarsDownloadDir does not exist" + |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
change this to just "jars" or "files" too
| remoteDependencyManagerProvider = mock[DownloadRemoteDependencyManagerProvider] | ||
| remoteDependencyManager = mock[DownloadRemoteDependencyManager] | ||
| when(remoteDependencyManagerProvider.getDownloadRemoteDependencyManager(any(), any(), any())) | ||
| .thenAnswer(new Answer[DownloadRemoteDependencyManager] { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can you use ArgumentCaptor.forClass instead of any() to eliminate this custom Answer implementation?
| s"$assetServerUri/${KubernetesSuite.EXAMPLES_JAR_FILE.getName}", | ||
| s"$assetServerUri/${KubernetesSuite.HELPER_JAR_FILE.getName}" | ||
| )) | ||
| runSparkAppAndVerifyCompletion(SparkLauncher.NO_RESOURCE) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
does this PR fix #213 as a side effect?
| import io.fabric8.kubernetes.client.Watcher.Action | ||
| import io.fabric8.kubernetes.client.internal.readiness.Readiness | ||
|
|
||
| private[spark] class SparkReadinessWatcher[T <: HasMetadata] extends Watcher[T] { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this doesn't seem Spark specific, can we call ResourceReadinessWatcher instead?
| val podIP = kubernetesClient.pods().withName(pod.getMetadata.getName).get() | ||
| .getStatus | ||
| .getPodIP | ||
| s"http://$podIP:8080" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
pull port 8080 out to a val
|
|
||
| private[spark] val INIT_CONTAINER_REMOTE_JARS = | ||
| ConfigBuilder("spark.kubernetes.driver.initcontainer.remoteJars") | ||
| .doc("Comma-separated list of jar URIs to download in the init-container. This is inferred" + |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: inferred -> calculated
| private[spark] val DRIVER_LOCAL_JARS_DOWNLOAD_LOCATION = | ||
| ConfigBuilder("spark.kubernetes.driver.mountdependencies.jarsDownloadDir") | ||
| private[spark] val DRIVER_SUBMITTED_JARS_DOWNLOAD_LOCATION = | ||
| ConfigBuilder("spark.kubernetes.driver.mountdependencies.submittedJars.downloadDir") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The config string is getting long and unwieldy. spark.kubernetes.driver.jars and spark.kubernetes.driver.files?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We probably want to indicate that this is where the jars are being downloaded to. spark.kubernetes.driver.jars is ambiguous in the sense that it could be jars that need to be uploaded or downloaded or added to its classpath, etc.
| "download-submitted-files" | ||
| private[spark] val INIT_CONTAINER_SUBMITTED_FILES_PROPERTIES_FILE_VOLUME = | ||
| "download-submitted-files-properties" | ||
| private[spark] val INIT_CONTAINER_SUBMITTED_FILES_PROPERTIES_FILE_MOUNT_PATH = |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There's a lot of growth in complexity here in terms of mount paths and other parameters. Is it possible for us to use fewer init-containers, or group these better?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We could download all of the jars from all locations to the same directory (resource staging server and remote) as well as the files, but I'm concerned about file name conflicts and how to deduplicate those. Perhaps we should explicitly forbid that multiple URIs end with the same file name. @aash for thoughts.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Having identically named jars coming from different places sounds like an anti-pattern. I don't think prohibiting that is all that bad.
There's already the contract that file names must be unique as they're all downloaded into the cwd
Why do we need two separate init containers? If we group these in terms of intent, we would have one init-container to download dependencies and supply them to the main container. I think it would help if you could explain what the separate init containers do and what problem this solves. |
It's primarily a code-level decision, but it's arguable either way. With the two init-containers we only use an instance of |
|
Thanks good catch @foxish -- I didn't see that we were using two init containers. In my experience init containers add several seconds before pod readiness, something like 3-10 for a Go-based container we've been using internally. So if we want to make job startup time a priority (this is valuable to us at least) I think I'd want to try pretty hard to have only one init container to download resources from both sources (submission server + remote resources). |
|
In my experience, idiomatic applications I've seen so far have a single init container, and it gets harder to debug when there are multiple init containers. |
|
The resultant PR will be rather large, so I apologize for that. But it's the easiest way to make sure we don't create large merge conflicts for ourselves. |
|
Yeah sorry for not jumping on this right away -- we might've been able to catch the 1 vs 2 container design choice earlier. Adding the init container to driver and executor seems semantically close enough that it should go well in one PR (though a large one). Let's do that I think. |
|
There's also #246 but I just did some rebasing locally and found that the merge conflicts resolved there by rearranging the commits wasn't too bad. |
|
Hold off on reviewing anything until I've pushed everything fresh. |
|
Superceded by #251 |
Augments the init-container so that we don't need to use a separate image, but on submission two containers are bootstrapped for a cleaner architecture.