This repository was archived by the owner on Jan 9, 2020. It is now read-only.

Conversation

@mccheah commented Apr 3, 2017

No description provided.

@mccheah (Author) commented Apr 3, 2017

@foxish @ash211 @erikerlandson @kimoonkim This isn't quite done yet, but I wanted to get the proof of concept out. There's still some work left to do here on the unit testing side.

*
* @param driverPodName Name of the driver pod.
* @param driverPodNamespace Namespace for the driver pod.
* @param jars Application jars to upload, compacted together in tar + gzip format. The tarball
@mccheah (Author) commented Apr 3, 2017

The APIs here are worded in a Spark-opinionated way, but there could also be a world where we just take one tarball as input "files" and just have spark-submit call uploadFiles() twice - once for jars and once for local files. If we were to do this, we would need a different unique key for the application other than name and namespace indicating the pod(s) to watch - probably a label(s).
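As a hedged illustration of that alternative, the more general interface might look something like the sketch below; the trait name, parameters, and label-based key are hypothetical, not code from this PR.

import java.io.InputStream

// Hypothetical generalized API: one endpoint that accepts any tarball, keyed by
// pod labels rather than by Spark-specific "jars"/"files" semantics.
private[spark] trait GenericFileStagingService {

  /**
   * Uploads a single tar.gz stream and returns the secret needed to download it later.
   * The pod labels identify which pods' lifecycles govern cleanup of the data.
   */
  def uploadFiles(podLabels: Map[String, String], files: InputStream): String
}

// spark-submit would then call the same endpoint twice:
//   val jarsSecret  = stagingService.uploadFiles(driverLabels, jarsTarballStream)
//   val filesSecret = stagingService.uploadFiles(driverLabels, filesTarballStream)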

@mccheah (Author) commented Apr 3, 2017

rerun unit tests please

@mccheah (Author) commented Apr 6, 2017

This is pretty much done now as a workable unit. Of course, there is still the rest of the submission process to build around this, but this is the first step.

@mccheah (Author) commented Apr 6, 2017

@ssuchter @erikerlandson @foxish @ash211

Can you guys please take a look at this PR? Essentially I'm looking to prototype what a second version of submission would look like end to end, but I will be building it out in increments. We could generalize this service later on, but I would like to get a proof of concept of how Spark could benefit from such a file-staging API. It would therefore be good to implement and review a quick and naive version of the staging-server-based submission process, and, after completing a prototype, think about how to generalize the concept for Kubernetes applications more broadly.

The desired end result is this: #167 (comment)

@Consumes(Array(MediaType.APPLICATION_JSON))
@Produces(Array(MediaType.APPLICATION_JSON))
@Path("/dependencies/credentials")
def getKubernetesCredentials(
@mccheah (Author) commented:

The more I think about this, the more I would prefer this not to be part of the API. The submitter can mount the credentials themselves as secrets, or else expect the pod to have them on local disk.

mccheah and others added 2 commits April 11, 2017 15:33
We still want to post them because in the future we can use these
credentials to monitor the API server and handle cleaning up the data
accordingly.
@QueryParam("driverPodName") driverPodName: String,
@QueryParam("driverPodNamespace") driverPodNamespace: String,
@FormDataParam("jars") jars: InputStream,
@FormDataParam("files") files: InputStream,
Review comment:

love that these are InputStreams now -- no more OOMs!

private val SPARK_APPLICATION_DEPENDENCIES_LOCK = new Object
private val SECURE_RANDOM = new SecureRandom()
// TODO clean up these resources based on the driver's lifecycle
private val registeredDrivers = mutable.Set.empty[PodNameAndNamespace]
Review comment:

for now, using an LRU cache with maximum size and expireAfterWrite of about 7 days and expireAfterRead of about 24 hours at least prevents OOMs in this service

We'll need to rethink this for streaming and other long-running applications though
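For illustration, a bounded cache along those lines could be built with Guava's CacheBuilder roughly as below; the maximum size and the value type are assumptions, and Guava's name for read-based expiry is expireAfterAccess.

import java.util.concurrent.TimeUnit

import com.google.common.cache.CacheBuilder

// Placeholder for whatever per-upload metadata the staging server tracks.
case class StagedResources(resourcesFile: java.io.File)

// Illustrative bounds only; the 7-day / 24-hour figures come from the comment
// above, the maximum size is an assumption.
val stagedResourcesCache = CacheBuilder.newBuilder()
  .maximumSize(1000)
  .expireAfterWrite(7, TimeUnit.DAYS)
  .expireAfterAccess(24, TimeUnit.HOURS)   // Guava's equivalent of "expireAfterRead"
  .build[String, StagedResources]()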

val namespaceDir = new File(dependenciesRootDir, podNameAndNamespace.namespace)
val applicationDir = new File(namespaceDir, podNameAndNamespace.name)
DIRECTORIES_LOCK.synchronized {
if (!applicationDir.exists()) {
Review comment:

do we want to throw here if the directory already exists? that would mean we're about to overwrite files from another upload

@mccheah (Author) commented:

In a second iteration of this I'm making the directory have a unique identifier.

}
}
val jarsTgz = new File(applicationDir, "jars.tgz")
// TODO encrypt the written data with the secret.
Review comment:

move this TODO up a line

* when it runs.
*
* @param driverPodName Name of the driver pod.
* @param driverPodNamespace Namespace for the driver pod.
Review comment:

why does this take a driver pod name and a driver pod namespace? I'd imagine there are times when you want to upload resources to this service before you know what the name of your driver pod will be, or even what namespace it will run in.

@mccheah (Author) commented Apr 12, 2017

Later on the service should watch the API server for the pod with the given name and namespace, and will clean up the resources being used by that pod once the pod either exits cleanly or fails without a retry for a specified amount of time. This would be more robust than an LRU cache for long-running jobs.

We probably want to be more general than just a pod or a pod namespace, however. Names are dynamic if a replication controller were to be used in the general case, for example. Perhaps monitoring pods with a given label would be better.
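A rough sketch of that label-based monitoring with the fabric8 Kubernetes client is shown below; the label key, namespace, and cleanup hook are all hypothetical.

import io.fabric8.kubernetes.api.model.Pod
import io.fabric8.kubernetes.client.{DefaultKubernetesClient, KubernetesClientException, Watcher}

val kubernetesClient = new DefaultKubernetesClient()

// Watch pods carrying a hypothetical label that ties them to a set of staged
// resources. When the watched pods are deleted, the server could schedule
// cleanup of the corresponding uploaded data after a grace period.
kubernetesClient.pods()
  .inNamespace("default")
  .withLabel("spark-staged-resources-id", "example-resource-id")
  .watch(new Watcher[Pod] {
    override def eventReceived(action: Watcher.Action, pod: Pod): Unit = {
      if (action == Watcher.Action.DELETED) {
        // A placeholder for the eventual cleanup logic, not a method in this PR.
      }
    }
    override def onClose(cause: KubernetesClientException): Unit = {}
  })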

* Retrofit-compatible variant of {@link KubernetesSparkDependencyService}. For documentation on
* how to use this service, see the aforementioned JAX-RS based interface.
*/
private[spark] trait KubernetesSparkDependencyServiceRetrofit {
Review comment:

Does it make sense to move these annotations onto the other trait definition? I'm worried this could get out of sync with the other trait if they were changing

@mccheah (Author) commented:

We can't have both Retrofit and JAX-RS annotations on the same trait because they take different types of parameters. Retrofit expects Retrofit-specific types everywhere, while JAX-RS expects POJOs.

@ash211 commented Apr 12, 2017

What do you think of the terminology ResourceStagingService instead of KubernetesSparkDependencyService?

// TODO encrypt the written data with the secret.
val resourcesTgz = new File(resourcesDir, "resources.tgz")
Utils.tryWithResource(new FileOutputStream(resourcesTgz)) { ByteStreams.copy(resources, _) }
SPARK_APPLICATION_DEPENDENCIES_LOCK.synchronized {
Review comment (Member):

Does this lock serialize all dependency accesses (read/write) using the staging server, or just for a particular dependency?

@mccheah (Author) commented:

We can allow for both - provide it as an optional field in the API and default to the server's config otherwise.

@mccheah (Author) commented:

Edit: The above comment is for the wrong feedback...

Regarding the lock - it serializes for all uploads, but only blocks on making the directories. It would be great to have something that doesn't have to serialize on file system operations, however.

Review comment:

not sure we need this lock here -- isn't it highly highly unlikely that we'd have an applicationSecret conflict from multiple uploads, given it contains a UUID as well as an additional 1024 bytes of entropy? This is the only writer
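For reference, secret generation along the lines described (a UUID plus 1024 extra random bytes) could look like the sketch below; the exact concatenation and encoding are assumptions, not the PR's code.

import java.security.SecureRandom
import java.util.UUID

import com.google.common.io.BaseEncoding

private val SECURE_RANDOM = new SecureRandom()

// Hypothetical secret format: a random UUID plus 1024 bytes of additional
// entropy, base64-encoded so it can travel safely in a request or response body.
def generateApplicationSecret(): String = {
  val entropy = new Array[Byte](1024)
  SECURE_RANDOM.nextBytes(entropy)
  UUID.randomUUID().toString + "-" + BaseEncoding.base64().encode(entropy)
}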

@mccheah (Author) commented:

If one thread runs mkdir -p a/b and another thread runs mkdir -p a/c, do the two threads attempting to make the parent directory collide?
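For what it's worth, one way to avoid relying on a lock just for directory creation, assuming java.nio is acceptable here, is sketched below; the helper name is hypothetical.

import java.io.File
import java.nio.file.Files

// Files.createDirectories creates any missing parents and treats an
// already-existing directory as success, so two threads racing to create
// a shared parent do not need external synchronization.
def ensureApplicationDir(dependenciesRootDir: File, namespace: String, name: String): File = {
  val applicationDir = new File(new File(dependenciesRootDir, namespace), name)
  Files.createDirectories(applicationDir.toPath)
  applicationDir
}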

* any directories. We take a stream here to avoid holding these entirely in
* memory.
* @param podLabels Labels of pods to monitor. When no more pods are running with the given label,
* after some period of time, these dependencies will be cleared.
Review comment (Member):

after some period of time

We should define how that period of time is determined. Do you see it being a per-file decision, or part of the configuration when the server is started?

@mccheah (Author) commented:

We can allow for both - provide it as an optional field in the API and default to the server's config otherwise.

Review comment:

let's add the cleanup in a followup rather than block this PR on it -- filed #237 to track

@mccheah (Author) commented Apr 14, 2017

rerun unit tests please

@ash211 left a comment

@mccheah couple minor changes, but this looks good. I'm likely good to merge after your response commit


}
}
// TODO encrypt the written data with the secret.
val resourcesTgz = new File(resourcesDir, "resources.tgz")
Review comment:

we plan to upload tgz files, but from the staging server's perspective it's just a byte stream. Perhaps just resources.data would be better here (though certain CLI tools used in debugging may work better with a .tgz extension)

}

override def downloadResources(applicationSecret: String): StreamingOutput = {
val applicationDependencies = SPARK_APPLICATION_DEPENDENCIES_LOCK.synchronized {
Review comment:

I don't think we want this lock here -- it looks like it would prevent concurrent download of the resources

@mccheah (Author) commented:

It only locks on accessing the map, not the underlying streams.

@mccheah (Author) commented:

We need to do this because we don't use a ConcurrentMap here. I don't think we can use a ConcurrentMap to get the atomicity we need here either.

@mccheah (Author) commented:

Never mind, a concurrent map would probably work. We can use scala.collection.concurrent.TrieMap.
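A minimal sketch of that lock-free alternative, keyed by the application secret (the value type here is a placeholder):

import scala.collection.concurrent.TrieMap

// Placeholder value type standing in for the server's per-upload metadata.
case class StagedResources(resourcesFile: java.io.File)

private val stagedResources = TrieMap.empty[String, StagedResources]

// putIfAbsent gives the atomic "register exactly once" semantics that the
// explicit lock was providing, without serializing readers.
def register(applicationSecret: String, resources: StagedResources): Unit = {
  stagedResources.putIfAbsent(applicationSecret, resources)
}

def lookup(applicationSecret: String): Option[StagedResources] = {
  stagedResources.get(applicationSecret)
}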


server.addConnector(connector)
server.setHandler(contextHandler)
server.start()
jettyServer = Some(server)
Review comment:

need ssl in here, though I see that's coming in a followup so doesn't block this PR

#221
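For reference, a hedged sketch of what wiring TLS into the embedded Jetty server could look like; the keystore path, password source, and port below are placeholders, and the real configuration lands in the follow-up PR referenced above.

import org.eclipse.jetty.server.{Server, ServerConnector}
import org.eclipse.jetty.util.ssl.SslContextFactory

// Illustrative only: the keystore location, password source, and port are
// placeholders, not values from this PR.
val server = new Server()
val sslContextFactory = new SslContextFactory()
sslContextFactory.setKeyStorePath("/path/to/keystore.jks")
sslContextFactory.setKeyStorePassword(sys.env.getOrElse("KEYSTORE_PASSWORD", ""))

val sslConnector = new ServerConnector(server, sslContextFactory)
sslConnector.setPort(10443)
server.addConnector(sslConnector)
server.start()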

import org.apache.spark.util.Utils

/**
* Tests for KubernetesSparkDependencyServer and its APIs. Note that this is not an end-to-end
Review comment:

KubernetesSparkDependencyServer has been renamed

* implementation methods directly as opposed to over HTTP, as well as check the
* data written to the underlying disk.
*/
class ResourceStagingServiceImplSuite extends SparkFunSuite with BeforeAndAfter {
Review comment:

do you still need with BeforeAndAfter if you don't override before or after?

@mccheah (Author) commented Apr 20, 2017

@foxish @ash211 @erikerlandson Are there any more comments that I should address for this?

@ash211 left a comment

LGTM -- will merge today unless I hear otherwise from anyone.

@ash211 commented Apr 20, 2017

Will do around ~5pm Pacific

private[spark] trait ResourceStagingServiceRetrofit {

@Multipart
@retrofit2.http.PUT("/api/resources/upload")
Review comment (Member):

Is there a reason why the API has /upload and /download here?
I think it is breaking some rules associated with RESTful APIs. I'd have thought more along the lines of:

  • PUT /api/resources/:resourceID -> (as PUT needs to be idempotent)
  • GET /api/resources -> returns a list of resources (later to be used by some metrics/dashboard/...)
  • GET /api/resources/:resourceID -> same as /download here

@mccheah (Author) commented:

The problem is that the resource ID is sensitive information in this context. So we have to pass the identifier which is a secret through a request body. We can return an identifier token as well as the secret - would that be better?

Review comment (Member):

Partly, the endpoints /upload and /download are expressing things that the HTTP verbs GET/PUT also express. I'm not sure if we want to also have versioning somewhere in that string, to make /api/v2/resources/.... My first thought is that having versioning is probably a bit much for this use case.

@mccheah (Author) commented:

Can we make both of these paths the same and just with different HTTP actions and method signatures in the Java interface? I also think having versioning is a little much for now.

Review comment (Member):

Maybe then we should use a POST instead, which isn't expected to be idempotent. POST /api/resources would return the secret that needs to be used when fetching it.

@mccheah (Author) commented:

If/when we want to later list things, it would help to identify each set of files submitted by some token other than the secret.

We can cross that bridge if/when we get there, but I anticipate this service to be very lightweight with only put/get operations on individual bundles.

@mccheah (Author) commented:

At least in the context of Spark - a more generic version of this in upstream Kubernetes could have more features.

Review comment (Member):

It seems like it would be hard to change this later, after we've established the API. Unless there is a strong reason not to separate the token and the secret, I feel that we should do it. @ash211, do you have thoughts on this?

@ash211 commented Apr 20, 2017

this is just an early API, so I'd expect it to change somewhat going forward. But we should also try at least somewhat to not break it unnecessarily.

In that spirit, I'd support:

  1. putting v0/ in the URLs in the (probable) case this changes down the road
  2. splitting the overloaded purpose of the secret into a separate resource ID and resource secret. That way in the future if we need an ID for a highly-available version of this service, or want to log in various places the resource that's being downloaded/uploaded, or want to collect metrics on this service for upload/download counts on a per-resource basis, we don't have to use the sensitive secret as the resource identifier everywhere.

It's going to take some work to plumb the ID+secret everywhere vs just the secret that we use now, but I'm optimistic that it will be ultimately worth it for future flexibility.

Review comment:

as for ID generation, I think it should be the responsibility of the service to create the ID rather than the client. Otherwise clients need to coordinate so that they don't have ID conflicts. The server is much better able to do this.
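Pulling those threads together, the reworked endpoints could look roughly like the sketch below. The paths, the response case class, and the parameter handling are illustrative only; per the eventual commits, the final change used POST and a separate resource identifier and secret, but the exact signatures may differ.

import java.io.InputStream
import javax.ws.rs.{Consumes, GET, POST, Path, PathParam, Produces}
import javax.ws.rs.core.{MediaType, StreamingOutput}

// Hypothetical response body: the server-generated identifier plus the secret
// required to download the staged resources later.
case class StagedResourceIdentifier(resourceId: String, resourceSecret: String)

@Path("/api/v0/resources")
private[spark] trait ResourceStagingServiceSketch {

  // POST (not PUT): the server creates the resource ID, so the operation is
  // not idempotent and the client never has to invent identifiers.
  // The multipart field annotations (@FormDataParam) are omitted for brevity.
  @POST
  @Consumes(Array(MediaType.MULTIPART_FORM_DATA))
  @Produces(Array(MediaType.APPLICATION_JSON))
  def uploadResources(resources: InputStream): StagedResourceIdentifier

  // Download by the non-sensitive ID; the secret would travel in a header or
  // the request body rather than in the URL.
  @GET
  @Path("/{resourceId}")
  @Produces(Array(MediaType.APPLICATION_OCTET_STREAM))
  def downloadResources(@PathParam("resourceId") resourceId: String): StreamingOutput
}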

@mccheah (Author) commented Apr 20, 2017

@foxish @ash211 made the change to use a resource identifier, please take a look. I also updated everything downstream.

@foxish (Member) commented Apr 20, 2017

Thanks for making the change @mccheah. Sorry about the last minute flurry of review comments. LGTM after tests pass.

@ash211 merged commit 3f6e5ea into branch-2.1-kubernetes Apr 21, 2017
@ash211 deleted the submission-v2-file-server branch April 21, 2017 06:34
foxish pushed a commit that referenced this pull request Jul 24, 2017
* Staging server for receiving application dependencies.

* Add unit test for file writing

* Minor fixes

* Remove getting credentials from the API

We still want to post them because in the future we can use these
credentials to monitor the API server and handle cleaning up the data
accordingly.

* Generalize to resource staging server outside of Spark

* Update code documentation

* Val instead of var

* Fix naming, remove unused import

* Move suites from integration test package to core

* Use TrieMap instead of locks

* Address comments

* Fix imports

* Change paths, use POST instead of PUT

* Use a resource identifier as well as a resource secret
ifilonenko pushed a commit to ifilonenko/spark that referenced this pull request Feb 26, 2019
puneetloya pushed a commit to puneetloya/spark that referenced this pull request Mar 11, 2019