Staging server for receiving application dependencies. #212
Conversation
@foxish @ash211 @erikerlandson @kimoonkim This isn't quite done yet, but I wanted to get the proof of concept out. There's still some work left to do here on the unit testing side.
 *
 * @param driverPodName Name of the driver pod.
 * @param driverPodNamespace Namespace for the driver pod.
 * @param jars Application jars to upload, compacted together in tar + gzip format. The tarball
The APIs here are worded in a Spark-opinionated way, but there could also be a world where we just take one tarball as input "files" and just have spark-submit call uploadFiles() twice - once for jars and once for local files. If we were to do this, we would need a different unique key for the application other than name and namespace indicating the pod(s) to watch - probably a label(s).
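A minimal sketch of that generalized shape, with hypothetical names (`GenericResourceStagingService`, `uploadResources`) that are not part of this PR: the server accepts one opaque tarball per call plus the labels of the pods to watch, and spark-submit would simply call it twice.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, InputStream}
import scala.collection.concurrent.TrieMap

// Hypothetical generalized interface: one opaque "resources" tarball per call,
// keyed for cleanup by pod labels rather than a driver pod name + namespace.
trait GenericResourceStagingService {
  // Returns a token the driver later presents to download the bundle.
  def uploadResources(podLabels: Map[String, String], resources: InputStream): String
}

// Toy in-memory implementation, for illustration only.
class InMemoryStagingService extends GenericResourceStagingService {
  private val stored = TrieMap.empty[String, Array[Byte]]
  override def uploadResources(
      podLabels: Map[String, String], resources: InputStream): String = {
    val token = java.util.UUID.randomUUID().toString
    val out = new ByteArrayOutputStream()
    val chunk = new Array[Byte](4096)
    var n = resources.read(chunk)
    while (n != -1) { out.write(chunk, 0, n); n = resources.read(chunk) }
    stored.put(token, out.toByteArray)
    token
  }
}

val service = new InMemoryStagingService
// spark-submit would call this twice: once for jars, once for local files.
val jarsToken = service.uploadResources(
  Map("spark-app-id" -> "app-1"), new ByteArrayInputStream(Array[Byte](1, 2, 3)))
val filesToken = service.uploadResources(
  Map("spark-app-id" -> "app-1"), new ByteArrayInputStream(Array[Byte](4, 5)))
```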
rerun unit tests please

This is pretty much done now as a workable unit. Of course there is still the rest of the submission process to build around this, but this is the first step.
@ssuchter @erikerlandson @foxish @ash211 Can you guys please take a look at this PR? Essentially I'm looking to prototype what a second version of submission would look like end to end, but will be trying to build it out in increments. We could generalize this service later on, but I would like to get a proof of concept of how Spark could benefit from such a file-staging API. Thus it would be good to implement and review a quick and naive version of the staging-server based submission process, and after completing a prototype think about how we can generalize the concept for Kubernetes applications in general. The desired end result is this: #167 (comment)
@Consumes(Array(MediaType.APPLICATION_JSON))
@Produces(Array(MediaType.APPLICATION_JSON))
@Path("/dependencies/credentials")
def getKubernetesCredentials(
The more I think about this the more I would prefer this to not be part of the API. The submitter can mount the credentials themselves as secrets or else expect the pod to have them in the local disk.
We still want to post them because in the future we can use these credentials to monitor the API server and handle cleaning up the data accordingly.
@QueryParam("driverPodName") driverPodName: String,
@QueryParam("driverPodNamespace") driverPodNamespace: String,
@FormDataParam("jars") jars: InputStream,
@FormDataParam("files") files: InputStream,
love that these are InputStreams now -- no more OOMs!
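As a sketch of why the InputStreams help: the upload can be spooled straight to disk in fixed-size chunks, so memory use stays constant regardless of upload size. This example uses the JDK's `Files.copy` for brevity (the PR itself uses Guava's `ByteStreams.copy`):

```scala
import java.io.{ByteArrayInputStream, InputStream}
import java.nio.file.{Files, Path, StandardCopyOption}

// Spool an upload straight to disk: only a small internal buffer is ever
// held in memory, unlike reading the whole request body into a byte array.
def writeStreamToFile(in: InputStream, target: Path): Long =
  Files.copy(in, target, StandardCopyOption.REPLACE_EXISTING)

val target = Files.createTempFile("jars", ".tgz")
val bytesWritten = writeStreamToFile(
  new ByteArrayInputStream(Array.fill[Byte](10000)(1)), target)
```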
private val SPARK_APPLICATION_DEPENDENCIES_LOCK = new Object
private val SECURE_RANDOM = new SecureRandom()
// TODO clean up these resources based on the driver's lifecycle
private val registeredDrivers = mutable.Set.empty[PodNameAndNamespace]
for now, using an LRU cache with maximum size and expireAfterWrite of about 7 days and expireAfterRead of about 24 hours at least prevents OOMs in this service
We'll need to rethink this for streaming and other long-running applications though
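The cache described above presumably refers to something like Guava's `CacheBuilder` (with `maximumSize` and `expireAfterWrite`/`expireAfterAccess`); as a pure-JDK illustration of the size-bounding half, here is a minimal LRU registry sketch. `LruRegistry` and its parameters are illustrative, not part of the PR:

```scala
import java.util.{LinkedHashMap => JLinkedHashMap, Map => JMap}

// Minimal LRU sketch: a LinkedHashMap constructed with accessOrder = true
// evicts the least-recently-accessed entry once maxEntries is exceeded,
// which bounds the staging server's memory even if cleanup lags behind.
class LruRegistry[K, V](maxEntries: Int) {
  private val underlying = new JLinkedHashMap[K, V](16, 0.75f, true) {
    override def removeEldestEntry(eldest: JMap.Entry[K, V]): Boolean =
      size() > maxEntries
  }
  def put(k: K, v: V): Unit = synchronized { underlying.put(k, v) }
  def get(k: K): Option[V] = synchronized {
    if (underlying.containsKey(k)) Some(underlying.get(k)) else None
  }
  def size: Int = synchronized { underlying.size() }
}

val registry = new LruRegistry[String, Int](2)
registry.put("a", 1)
registry.put("b", 2)
registry.put("c", 3) // evicts "a", the least recently used entry
```

Time-based expiry would still need timestamps per entry (or a real cache library), which is part of why streaming and other long-running applications need a different story.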
val namespaceDir = new File(dependenciesRootDir, podNameAndNamespace.namespace)
val applicationDir = new File(namespaceDir, podNameAndNamespace.name)
DIRECTORIES_LOCK.synchronized {
  if (!applicationDir.exists()) {
do we want to throw here if the directory already exists? that would mean we're about to overwrite files from another upload
In a second iteration of this I'm making the directory have a unique identifier.
  }
}
val jarsTgz = new File(applicationDir, "jars.tgz")
// TODO encrypt the written data with the secret.
move this TODO up a line
 * when it runs.
 *
 * @param driverPodName Name of the driver pod.
 * @param driverPodNamespace Namespace for the driver pod.
why does this take a driver pod name and a driver pod namespace? I'd imagine there would be times when you want to upload resources to this service before you know what the name of your driver pod will be, or even what namespace it will run in.
Later on the service should watch the API server for the pod with the given name and namespace, and will clean up the resources being used by that pod once the pod either exits cleanly or fails without a retry for a specified amount of time. This would be more robust than an LRU cache for long-running jobs.
We probably want to be more general than just a pod or a pod namespace, however. Names are dynamic if a replication controller were to be used in the general case, for example. Perhaps monitoring pods with a given label would be better.
 * Retrofit-compatible variant of {@link KubernetesSparkDependencyService}. For documentation on
 * how to use this service, see the aforementioned JAX-RS based interface.
 */
private[spark] trait KubernetesSparkDependencyServiceRetrofit {
Does it make sense to move these annotations onto the other trait definition? I'm worried this could get out of sync with the other trait if either changes.
We can't have both Retrofit and JAX-RS on the same trait because they take different types of parameters. Retrofit expects Retrofit-specific types everywhere while JAX-RS expects POJOs.
What do you think of the terminology
// TODO encrypt the written data with the secret.
val resourcesTgz = new File(resourcesDir, "resources.tgz")
Utils.tryWithResource(new FileOutputStream(resourcesTgz)) { ByteStreams.copy(resources, _) }
SPARK_APPLICATION_DEPENDENCIES_LOCK.synchronized {
Does this lock serialize all dependency accesses (read/write) using the staging server, or just for a particular dependency?
We can allow for both - provide it as an optional field in the API and default to the server's config otherwise.
Edit: The above comment is for the wrong feedback...
Regarding the lock - it serializes for all but only blocks on making the directories. Would be great to have something that doesn't have to serialize on file system operations, however.
not sure we need this lock here -- isn't it highly highly unlikely that we'd have an applicationSecret conflict from multiple uploads, given it contains a UUID as well as an additional 1024 bytes of entropy? This is the only writer
If one thread runs mkdir -p a/b and another thread runs mkdir -p a/c, do the two threads attempting to make the parent directory collide?
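For what it's worth, `java.io.File.mkdirs()` can report a spurious failure when two threads race to create the same parent, while `java.nio.file.Files.createDirectories` is documented not to fail when an intermediate directory already exists. A small sketch of the race in question, under those assumptions:

```scala
import java.nio.file.{Files, Paths}
import java.util.concurrent.{CountDownLatch, Executors, TimeUnit}

// Two threads concurrently create sibling directories "a/b" and "a/c" under
// a parent "a" that does not yet exist. Files.createDirectories tolerates
// another thread winning the race to create the shared parent.
val root = Files.createTempDirectory("staging").resolve("a")
val pool = Executors.newFixedThreadPool(2)
val start = new CountDownLatch(1)
val done = new CountDownLatch(2)
for (child <- Seq("b", "c")) {
  pool.execute(new Runnable {
    override def run(): Unit = {
      start.await() // line both threads up to maximize the chance of racing
      Files.createDirectories(root.resolve(child))
      done.countDown()
    }
  })
}
start.countDown()
done.await(10, TimeUnit.SECONDS)
pool.shutdown()
```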
 * any directories. We take a stream here to avoid holding these entirely in
 * memory.
 * @param podLabels Labels of pods to monitor. When no more pods are running with the given label,
 * after some period of time, these dependencies will be cleared.
> after some period of time

How is this period of time defined? Do you see it being a per-file decision, or part of the configuration when the server is started?
We can allow for both - provide it as an optional field in the API and default to the server's config otherwise.
let's add the cleanup in a followup rather than block this PR on it -- filed #237 to track
rerun unit tests please
@mccheah a couple of minor changes, but this looks good. I'm likely good to merge after your response commit
  }
}
// TODO encrypt the written data with the secret.
val resourcesTgz = new File(resourcesDir, "resources.tgz")
we plan to upload tgz files, but from the staging server's perspective it's just a byte stream. Perhaps just `resources.data` would be better here (though certain CLI tools used in debugging may work better with a `.tgz` extension)
}

override def downloadResources(applicationSecret: String): StreamingOutput = {
val applicationDependencies = SPARK_APPLICATION_DEPENDENCIES_LOCK.synchronized {
I don't think we want this lock here -- it looks like it would prevent concurrent download of the resources
It only locks on accessing the map, not the underlying streams.
We need to do this because we don't use a ConcurrentMap here. I don't think we can use a ConcurrentMap to get the atomicity we need here either.
Never mind, a concurrent map would probably work. We can use scala.collection.concurrent.TrieMap.
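A quick sketch of how `TrieMap` gives the needed atomicity without the coarse lock (`PodNameAndNamespace` here is a stand-in for the key type in the PR, and the secret values are illustrative):

```scala
import scala.collection.concurrent.TrieMap

// putIfAbsent gives the atomic register-if-new check without an external
// lock, and lookups never block behind writers.
case class PodNameAndNamespace(name: String, namespace: String)
val registeredDrivers = TrieMap.empty[PodNameAndNamespace, String]

val key = PodNameAndNamespace("driver-1", "default")
// Returns None if we won the race and inserted, or Some(existing) otherwise.
val firstInsert = registeredDrivers.putIfAbsent(key, "secret-1")
val secondInsert = registeredDrivers.putIfAbsent(key, "secret-2")
```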
server.addConnector(connector)
server.setHandler(contextHandler)
server.start()
jettyServer = Some(server)
need ssl in here, though I see that's coming in a followup so doesn't block this PR
import org.apache.spark.util.Utils

/**
 * Tests for KubernetesSparkDependencyServer and its APIs. Note that this is not an end-to-end
KubernetesSparkDependencyServer has been renamed
 * implementation methods directly as opposed to over HTTP, as well as check the
 * data written to the underlying disk.
 */
class ResourceStagingServiceImplSuite extends SparkFunSuite with BeforeAndAfter {
do you still need `with BeforeAndAfter` if you don't override `before` or `after`?
@foxish @ash211 @erikerlandson Are there any more comments that I should address for this?
LGTM -- will merge today unless I hear otherwise from anyone.
Will do around ~5pm Pacific
private[spark] trait ResourceStagingServiceRetrofit {

  @Multipart
  @retrofit2.http.PUT("/api/resources/upload")
Is there a reason why the API has /upload and /download here?
I think it is breaking some rules associated with RESTful APIs. I'd have thought more along the lines of:
- `PUT /api/resources/:resourceID` -> (as PUT needs to be idempotent)
- `GET /api/resources` -> returns a list of resources (later to be used by some metrics/dashboard/...)
- `GET /api/resources/:resourceID` -> same as `/download` here
The problem is that the resource ID is sensitive information in this context. So we have to pass the identifier which is a secret through a request body. We can return an identifier token as well as the secret - would that be better?
Partly, the endpoints /upload and /download are expressing things that the HTTP verbs GET/PUT also express. I'm not sure if we want to also have versioning somewhere in that string, to make /api/v2/resources/.... My first thought is that having versioning is probably a bit much for this use case.
Can we make both of these paths the same and just with different HTTP actions and method signatures in the Java interface? I also think having versioning is a little much for now.
Maybe then we should use a POST instead, which isn't expected to be idempotent. POST /api/resources would return the secret that needs to be used when fetching it.
If/when we want to later list things, it would help to identify each set of files submitted by some token other than the secret.
We can cross that bridge if/when we get there, but I anticipate this service to be very lightweight with only put/get operations on individual bundles.
At least in the context of Spark - a more generic version of this in upstream Kubernetes could have more features.
It seems like it would be hard to change this later, after we've established the API. Unless there is a strong reason not to separate the token and the secret, I feel that we should do it. @ash211, do you have thoughts on this?
this is just an early API, so I'd expect it to change somewhat going forward. But we should also try at least somewhat to not break it unnecessarily.

In that spirit, I'd support:

- putting `v0/` in the URLs in the (probable) case this changes down the road
- splitting the overloaded purpose of the secret into a separate resource ID and resource secret. That way in the future if we need an ID for a highly-available version of this service, or want to log in various places the resource that's being downloaded/uploaded, or want to collect metrics on this service for upload/download counts on a per-resource basis, we don't have to use the sensitive secret as the resource identifier everywhere.
It's going to take some work to plumb the ID+secret everywhere vs just the secret that we use now, but I'm optimistic that it will be ultimately worth it for future flexibility.
as for ID generation, I think it should be the responsibility of the service to create the ID rather than the client. Otherwise clients need to coordinate so that they don't have ID conflicts. The server is much better able to do this.
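A sketch of what a server-minted ID + secret pair could look like. Names and sizes are illustrative, not the merged API; the 1024 bytes of entropy echoes the figure mentioned earlier in the review:

```scala
import java.security.SecureRandom
import java.util.{Base64, UUID}

// The server (not the client) mints both values: a non-sensitive resource ID
// safe to appear in URLs, logs, and metrics, plus a high-entropy secret that
// must be presented to download the bundle.
case class StagedResourceIdentifier(resourceId: String, resourceSecret: String)

def mintIdentifier(random: SecureRandom = new SecureRandom()): StagedResourceIdentifier = {
  val id = UUID.randomUUID().toString
  val secretBytes = new Array[Byte](1024)
  random.nextBytes(secretBytes)
  StagedResourceIdentifier(
    id, Base64.getUrlEncoder.withoutPadding().encodeToString(secretBytes))
}

val minted = mintIdentifier()
// e.g. POST /api/v0/resources could return this pair, and a later
// GET /api/v0/resources/:resourceId would require the secret in the request.
```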
Thanks for making the change @mccheah. Sorry about the last minute flurry of review comments. LGTM after tests pass.
* Staging server for receiving application dependencies.
* Add unit test for file writing
* Minor fixes
* Remove getting credentials from the API

  We still want to post them because in the future we can use these credentials to monitor the API server and handle cleaning up the data accordingly.

* Generalize to resource staging server outside of Spark
* Update code documentation
* Val instead of var
* Fix naming, remove unused import
* Move suites from integration test package to core
* Use TrieMap instead of locks
* Address comments
* Fix imports
* Change paths, use POST instead of PUT
* Use a resource identifier as well as a resource secret