
Conversation

@dansanduleac commented Mar 6, 2017

What changes were proposed in this pull request?

Add a CondaRunner abstract class that implements main(String[]): when the relevant options are set, it sets up a conda environment, then delegates its arguments, along with the optional CondaEnvironment, to an abstract method.

Then, changed PythonRunner and RRunner to extend CondaRunner and run their respective subprocesses from within the conda env, if one is set up.
NOTE: R actually needs more changes, so for now RRunner will just fail early if conda settings are found.
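For reviewers skimming the diff, here is a rough sketch of the shape being described, stitched together from hunks quoted later in this review (CondaEnvironmentManager.isConfigured/fromConf, create(baseDir, bootstrapDeps), CONDA_BOOTSTRAP_PACKAGES). It is illustrative, not the PR's exact code, and the config-entry import and its Seq[String] type are assumptions:

```scala
import org.apache.spark.SparkConf
import org.apache.spark.api.conda.{CondaEnvironment, CondaEnvironmentManager}
import org.apache.spark.internal.config._
import org.apache.spark.util.Utils

// Sketch only: main() optionally bootstraps a conda environment from SparkConf,
// then hands the args and the optional environment to the concrete runner.
abstract class CondaRunner {
  def run(args: Array[String], maybeConda: Option[CondaEnvironment]): Unit

  final def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf()
    val maybeConda =
      if (CondaEnvironmentManager.isConfigured(sparkConf)) {
        val condaBootstrapDeps = sparkConf.get(CONDA_BOOTSTRAP_PACKAGES)
        val condaBaseDir =
          Utils.createTempDir(Utils.getLocalDir(sparkConf), "conda").getAbsolutePath
        Some(CondaEnvironmentManager.fromConf(sparkConf).create(condaBaseDir, condaBootstrapDeps))
      } else {
        None
      }
    run(args, maybeConda)
  }
}
```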

As for the executor-side logic:

  • Updated context.py with an addCondaPackage method, which adds the package to the locally set up conda environment (if one is defined) and registers it for distribution to the executors
  • Updated PythonRDD to set up the desired conda env (which should match the one on the driver, including any additional packages supplied to addCondaPackage) before shelling out to python inside that env - see the sketch after this list
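Roughly, the worker side then picks the interpreter like this (condaEnvDir and activatedEnvironment match hunks quoted later in this review; the helper itself is a hypothetical illustration, not the PR's code):

```scala
import org.apache.spark.api.conda.CondaEnvironment

// Illustrative helper: run the python that lives inside the conda env, with that
// env's activated variables, when an env has been set up; otherwise fall back.
def pythonCommandFor(maybeConda: Option[CondaEnvironment]): (String, Map[String, String]) =
  maybeConda match {
    case Some(conda) => (conda.condaEnvDir + "/bin/python", conda.activatedEnvironment())
    case None        => ("python", sys.env)
  }
```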

How was this patch tested?

End-to-end Python test that:

  • bootstraps an env with only python installed (python=3.5)
  • calls addCondaPackage('numpy=1.11.1')
  • attempts to import numpy on the driver which should succeed
  • maps on the executor with a function that attempts to use the numpy import, and asserts that the version is the expected one

End-to-end YARN test that:

  • runs in both client and cluster mode
  • essentially similar to the above: tests that an added package is available on both the driver and the executors

@dansanduleac force-pushed the ds/conda-runner branch 3 times, most recently from 09bcb29 to 530ec4a on March 7, 2017 18:04
robert3005 pushed a commit that referenced this pull request Mar 7, 2017
* Add -DskipTests to dev docs

* Remove extraneous skipTests
@justinuang

Might be worth updating the PR message

circle.yml Outdated

Let's remove always_yes, unless we want to force all boxes with conda to have it set as well. Otherwise, our test env might be different from production.

Author:
I agree, already removed in the next commit

circle.yml Outdated

Any reason why we update conda? It makes it dynamic and we're not sure what is running anymore.

Author:
Yeah, we probably shouldn't... but it's going to be harder to avoid now that, in the next commit, I've changed the whole pyenv setup to install a miniconda3 for me, and use that to create two envs (for Python 2.7 and 3.4, with numpy installed on both), rather than pip-installing numpy, which takes about 2:30 min each.
I noticed that when doing it that way it always calls update anyway, so even if I pyenv install miniconda3-4.1.11 it will still automatically upgrade it to 4.3.14 post-install, and I couldn't figure out how to disable that.


def create(baseDir: String, bootstrapDeps: Seq[String]): CondaEnvironment = {
require(bootstrapDeps.nonEmpty, "Expected at least one bootstrap dependency.")
// Attempt to create environment

this comment doesn't seem to describe the next line

Author:
yep, will move it

Author:
yup, will move it down


val command = Process(
List(condaBinaryPath, "create", "-n", name, "-y", "--override-channels", "-vv")
++: condaChannelUrls.flatMap(Iterator("--channel", _))

maybe let's add a "--" so someone can't have a bootstrap dep that is "--verbose" or something?
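For illustration, the separator would slot in roughly like this (a sketch reusing the names from the hunk above, and assuming conda's argparse-based CLI honours the conventional "--" end-of-options marker):

```scala
// Sketch only: everything after "--" is treated as a positional argument, so a
// bootstrap dep spelled "--verbose" cannot be picked up by conda as a flag.
val createArgs: Seq[String] =
  List(condaBinaryPath, "create", "-n", name, "-y", "--override-channels", "-vv") ++
    condaChannelUrls.flatMap(url => Seq("--channel", url)) ++
    ("--" +: bootstrapDeps)
```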

Files.createSymbolicLink(linkedBaseDir, Paths get baseDir)

// Expose the symlinked path, through /tmp
val condaEnvDir = (linkedBaseDir resolve name).toString

In general, any reason why we don't do linkedBaseDir.resolve(name).toString instead? What is more idiomatic in Scala?

Author:
I just prefer this style, and it passes scalacheck

var serverSocket: ServerSocket = null
try {
- serverSocket = new ServerSocket(0, 1, InetAddress.getByAddress(Array(127, 0, 0, 1)))
+ serverSocket = new ServerSocket(0, 1, InetAddress.getByAddress(Array[Byte](127, 0, 0, 1)))

Are you sure this is correct? http://stackoverflow.com/a/24010893

Author:
This doesn't change any behaviour - you'd still have to cast them if you supply values > 127. But it prevents the line from being red in IntelliJ.
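For context, a minimal illustration of the signed-byte point (the 192.168.0.1 address is just a hypothetical example):

```scala
import java.net.InetAddress

// Int literals up to 127 fit into Byte as-is; anything above still needs an explicit
// cast, so the Array[Byte] annotation doesn't change behaviour for the loopback case.
val loopback = InetAddress.getByAddress(Array[Byte](127, 0, 0, 1))
val lan = InetAddress.getByAddress(Array[Byte](192.toByte, 168.toByte, 0, 1))
```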

if (useDaemon) {
synchronized {
- if (idleWorkers.size > 0) {
+ if (idleWorkers.nonEmpty) {

Let's try not to do unnecessary refactors unless they have big benefits; it will make it harder to merge upstream changes.

Author:
I don't think we plan to merge this upstream, at least not in its current form

More worried about upstream changes, which we need to merge into our fork.

builder.redirectOutput(Redirect.INHERIT)
builder.redirectError(Redirect.INHERIT)
sys.env.get("PYTHONHASHSEED").foreach(env.put("PYTHONHASHSEED", _))
builder.redirectErrorStream(true) // Ugly but needed for stdout and stderr to synchronize

We don't need this anymore?

Author:
I changed the behaviour so that stderr & stdout are inherited, rather than both going to the parent's stdout, which makes more sense. I could probably change this back.

// Time to wait for SparkR backend to initialize in seconds
val backendTimeout = sys.env.getOrElse("SPARKR_BACKEND_TIMEOUT", "120").toInt
- val rCommand = {
+ val rCommand = maybeConda.map(_.condaEnvDir + "/bin/r").getOrElse {

Unless we want to test and support this, maybe we should just throw an exception if maybeConda is not None.

self._conda_env_manager = self._jvm.CondaEnvironmentManager.fromConf(self._conf._jconf)

self._conda_packages = list()
internal_config = getattr(getattr(self._jvm.org.apache.spark.internal.config, 'package$'),

this seems wrong. Is there not another way to get the spark conf, perhaps off the jvm SparkContext?

@dansanduleac (Author) commented Mar 13, 2017

This is the typed way to do it.
We already have the SparkConf (both the jvm one and the python SparkConf that wraps it), but the problem is I want to use the SparkConf.get(ConfigEntry) accessor, and this is the only way to get the ConfigEntry.
Alternatively I could get it as a string and split on "," like they do just below; I just don't like duplicating logic like that.

@dansanduleac force-pushed the ds/conda-runner branch 2 times, most recently from 23ef54a to a167c76 on March 12, 2017 19:03
@ash211 left a comment

My first time looking at this thoroughly -- pretty cool!

Have you guys opened an issue upstream for this? Josh Rosen might be interested in this activity

circle.yml Outdated
- "build_classes"
- "build/zinc-0.3.11"
- "build/apache-maven-3.3.9"
- "build/scala-2.11.8"

any way to make these globs so we don't get version mismatches in future upgrades?

Author:
No :( I might just cache the whole build directory

// must link in /tmp to reduce path length in case baseDir is very long...
// If baseDir path is too long, this breaks conda's 220 character limit for binary replacement.
// Don't even try to use java.io.tmpdir - yarn sets this to a very long path
val linkedBaseDir = Utils.createTempDir("/tmp", "conda").toPath.resolve("real")

what happens if /tmp is full? We've seen it get full in many places. Is it reasonable to make this configurable via Spark property?

Author:
We could. The important thing is this must be really short.

However, note that the risk is not high at all, since all I'm doing is creating a directory with a symlink to the real stuff. I'm not actually creating any files directly under that /tmp directory.

val exitCode = command.run(collectErrOutToBuffer).exitValue()
if (exitCode != 0) {
throw new SparkException(s"Attempt to $description exited with code: "
+ f"$exitCode%nCommand was: $command%nOutput was:%n${buffer.toString}")

is it?


def activatedEnvironment(startEnv: Map[String, String] = Map.empty): Map[String, String] = {
require(!startEnv.contains("PATH"),
s"It's not allowed to define PATH in the startEnv, but found: ${startEnv("PATH")}")

reword to Defining PATH in a CondaEnvironment's startEnv is prohibited; found PATH=${startEnv("PATH")}

Author:
ok


def fromConf(sparkConf: SparkConf): CondaEnvironmentManager = {
val condaBinaryPath = sparkConf.get(CONDA_BINARY_PATH).getOrElse(
sys.error(s"Expected $CONDA_BINARY_PATH to be set"))

do Config items provide the toString you expect? I'd expect this to evaluate to spark.conda.binaryPath

Author:
It evaluates to

s"ConfigEntry(key=$key, defaultValue=$defaultValueString, doc=$doc, public=$isPublic)"

would you rather I just print the key?


I suspect just the key is printed in other places, so yes, I'd prefer printing just the key. Feel free to check me on that assumption though if you'd like.

Author:
Don't feel strongly but sure
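For reference, printing just the key would look roughly like this (ConfigEntry does expose a key field; the surrounding names come from the hunk quoted above):

```scala
// Sketch of the suggestion: interpolate only the entry's key, not its full toString.
val condaBinaryPath = sparkConf.get(CONDA_BINARY_PATH).getOrElse(
  sys.error(s"Expected ${CONDA_BINARY_PATH.key} to be set"))
```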


sys.env.get("PYTHONHASHSEED").foreach(env.put("PYTHONHASHSEED", _))
- builder.redirectErrorStream(true) // Ugly but needed for stdout and stderr to synchronize
+ builder.redirectErrorStream(true)

needless diff? just looks like merge conflict potential

Author:
yep true

if (sys.props.getOrElse("spark.submit.deployMode", "client") == "client") {
cmd = sys.props.getOrElse("spark.r.driver.command", cmd)
val rCommand = maybeConda
.map(_ => sys.error("RRunner doesn't support running from a Conda environment yet."))

didn't know conda supported R -- pretty cool!
https://www.continuum.io/content/preliminary-support-r-conda

Do you plan on making this work anytime soon?

Author:
If there is a need, yes. The changes should be pretty similar to these ones for Python.

*/
- object RRunner {
-   def main(args: Array[String]): Unit = {
+ object RRunner extends CondaRunner {

Given this doesn't actually support Conda yet, part of me is inclined to not touch anything in RRunner at all, to reduce merge conflict potential. Maybe we should only touch this class when we're actually adding conda support for R, rather than just adding the message now that SparkR doesn't support running in Conda?

Author:
makes sense

ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.ml.classification.OneVsRestModel.this"),

// CondaRunner is meant to own the main() method then delegate to another method
ProblemFilters.exclude[FinalMethodProblem]("org.apache.spark.deploy.CondaRunner.main")

adding to the bottom is prone to merge conflicts when upstream also adds to the bottom -- add to the middle of the list instead?

Author:
makes sense

java_import(gateway.jvm, "org.apache.spark.SparkConf")
java_import(gateway.jvm, "org.apache.spark.api.java.*")
java_import(gateway.jvm, "org.apache.spark.api.python.*")
java_import(gateway.jvm, "org.apache.spark.api.conda.*")

move up a couple lines to maintain alphabetization

*
* This hack is necessary otherwise conda tries to use the homedir for pkgs cache.
*/
private[this] def generateCondarc(baseRoot: Path): Path = {

Do we still need this if we are faking home?

Author:
Yes - the motivation is that we need to ensure our fresh cache dir comes first in pkgs_dirs. Conda will try to write to the first cache (by default anaconda_dir/pkgs) if it's writable, and we don't want it writing to the Anaconda dir.
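To make the pkgs_dirs point concrete, here is an illustrative sketch of writing such a condarc; the helper name and exact keys are assumptions, not the PR's actual generateCondarc:

```scala
import java.nio.charset.StandardCharsets
import java.nio.file.{Files, Path}

// Illustrative only: a condarc whose first (and only) package cache is a fresh,
// writable directory under our base dir, so conda never falls back to writing
// into the shared <anaconda>/pkgs directory.
def writeCondarc(baseRoot: Path): Path = {
  val condarc = baseRoot.resolve("condarc")
  val contents =
    s"""pkgs_dirs:
       | - $baseRoot/pkgs
       |""".stripMargin
  Files.write(condarc, contents.getBytes(StandardCharsets.UTF_8))
  condarc
}
```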

@justinuang

Other than whether we still need the CONDARC and removing unnecessary diffs to minimize conflicts in the future, looks good!

.stringConf
.createOptional

private[spark] val CONDA_BOOTSTRAP_PACKAGES = ConfigBuilder("spark.conda.bootstrapPackages")

These will almost definitely conflict with upstream changes, right? Perhaps we should revert to getting strings from SparkConf and then splitting on commas.

Why would this conflict?

@justinuang left a comment

+1 after removing unnecessary diffs. no need for a re-review from me

@dansanduleac (Author) left a comment

alright, addressed changes

private[this] val localdirs =
SparkEnv.get.blockManager.diskBlockManager.localDirs.map(f => f.getPath).mkString(",")

private[this] val firstFunc = funcs.head.funcs.head
Author:
yep

val localDirs = env.blockManager.diskBlockManager.localDirs
val hash = Utils.nonNegativeHash(condaPackages)
val dirId = hash % localDirs.length
Utils.createTempDir(localDirs(dirId).getAbsolutePath, "conda").getAbsolutePath
Author:
I don't understand. Do you want this file to maintain a separate list of dirs that it should clean up itself?


if (CondaEnvironmentManager.isConfigured(sparkConf)) {
val condaBootstrapDeps = sparkConf.get(CONDA_BOOTSTRAP_PACKAGES)
val condaBaseDir = Utils.createTempDir(Utils.getLocalDir(sparkConf), "conda").getAbsolutePath
Author:
VM shutdown

val pythonExec = maybeConda.map { conda =>
presetPythonExec.foreach { exec =>
sys.error(
s"It's forbidden to set the PYSPARK python path when using conda, but found: $exec")
Author:
sure - I've promoted presetPythonExec to Option[Provenance] which includes where the setting was retrieved from

}
conda.condaEnvDir + "/bin/python"
}.orElse(presetPythonExec)
.getOrElse("python")
Author:
That breaks the logic - I want to check that no preset python exec is being set; if I fall back to setting it to python, how will I know whether the user has set it?


sys.env.get("PYTHONHASHSEED").foreach(env.put("PYTHONHASHSEED", _))
builder.redirectErrorStream(true) // Ugly but needed for stdout and stderr to synchronize
builder.redirectErrorStream(true)
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yep true

*/
object RRunner {
def main(args: Array[String]): Unit = {
object RRunner extends CondaRunner {
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

makes sense

if (sys.props.getOrElse("spark.submit.deployMode", "client") == "client") {
cmd = sys.props.getOrElse("spark.r.driver.command", cmd)
val rCommand = maybeConda
.map(_ => sys.error("RRunner doesn't support running from a Conda environment yet."))
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If there is a need, yes. The total changes should be pretty similar with these ones for python

dev/run-tests.py Outdated


- def exec_sbt(sbt_args=()):
+ def exec_sbt(sbt_args=(), exit_on_failure=True):
Author:
NOTE: all these changes in this file can be reverted once #129 merges, because I've moved the copyTestReportsToCircle logic inside a bespoke test task for circle


@dansanduleac

So tests can't run for now because Anaconda is throttling us and we keep getting timeouts... we are looking for a workaround.

@justinuang

@robert3005 @ash211 can one of you please merge this and cut a release =)

@ash211 commented Mar 17, 2017

@justinuang does middle of next week work for you for a release? We're hoping to have the next release include a batch of k8s stuff that's almost finished with review: #133

@ash211 commented Mar 17, 2017

@dansanduleac can you guys please file a ticket upstream and send in this PR? We're trying really hard to only merge stuff into this repo that has a path to getting merged upstream

@justinuang

yea, middle of week works. Just found out we can depend on dev builds =)

@ash211 changed the title from Conda Runner & full Python conda support to [SPARK-20001] Conda Runner & full Python conda support on Mar 17, 2017
@ash211 merged commit ab03652 into master Mar 17, 2017
@ash211 deleted the ds/conda-runner branch March 17, 2017 20:22
mccheah pushed a commit that referenced this pull request Apr 27, 2017
* Add -DskipTests to dev docs

* Remove extraneous skipTests