[SPARK-20001] Conda Runner & full Python conda support #115
Conversation
Force-pushed from 09bcb29 to 530ec4a
* Add -DskipTests to dev docs
* Remove extraneous skipTests
Might be worth updating the PR message.
…Also install the conda environment on python executors
…d being final for some reason
Force-pushed from a2b6204 to f6460e1
Force-pushed from 0dc941c to b395ed1
circle.yml
Outdated
Let's remove always_yes, unless we want to force all boxes with conda to have it set as well. Otherwise, our test env might be different from production.
I agree, already removed in the next commit.
circle.yml
Outdated
Any reason why we update conda? It makes it dynamic and we aren't sure what is running anymore.
Yeah, we probably shouldn't... but it's going to be harder to avoid now that, in the next commit, I've changed the whole pyenv setup to install a miniconda3 and use that to create two envs (for python 2.7 and 3.4, with numpy installed in both), rather than pip-installing numpy, which takes about 2:30 min each.
I noticed that when doing it that way it always calls update anyway, so even if I pyenv install miniconda3-4.1.11 it will still automatically upgrade to 4.3.14 post-install, and I couldn't figure out how to disable that.
def create(baseDir: String, bootstrapDeps: Seq[String]): CondaEnvironment = {
  require(bootstrapDeps.nonEmpty, "Expected at least one bootstrap dependency.")
  // Attempt to create environment
this comment doesn't seem to describe the next line
yep, will move it
yup, will move it down
val command = Process(
  List(condaBinaryPath, "create", "-n", name, "-y", "--override-channels", "-vv")
    ++: condaChannelUrls.flatMap(Iterator("--channel", _))
maybe let's add a "--" so someone can't have a bootstrap dep that is "--verbose" or something?
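To illustrate the suggestion, here is a minimal sketch of how the command could be assembled with a "--" separator; the names mirror the hunk above, but this is illustrative code, not the PR's implementation.

```scala
// Sketch only: insert "--" before the user-supplied deps so an entry such as "--verbose"
// is treated as a package spec rather than a conda option (argparse-style "--" separator).
object CondaCreateCommandSketch {
  def buildCommand(condaBinaryPath: String,
                   name: String,
                   condaChannelUrls: Seq[String],
                   bootstrapDeps: Seq[String]): Seq[String] =
    List(condaBinaryPath, "create", "-n", name, "-y", "--override-channels", "-vv") ++
      condaChannelUrls.flatMap(url => Seq("--channel", url)) ++
      Seq("--") ++
      bootstrapDeps

  def main(args: Array[String]): Unit =
    println(buildCommand("/opt/conda/bin/conda", "spark-env",
      Seq("https://repo.example.org/conda"), Seq("--verbose", "numpy=1.11.1")).mkString(" "))
}
```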
Files.createSymbolicLink(linkedBaseDir, Paths get baseDir)

// Expose the symlinked path, through /tmp
val condaEnvDir = (linkedBaseDir resolve name).toString
In general, any reason why we don't do linkedBaseDir.resolve(name).toString instead? Which is more idiomatic in Scala?
I just prefer this style, and it passes scalacheck
var serverSocket: ServerSocket = null
try {
  serverSocket = new ServerSocket(0, 1, InetAddress.getByAddress(Array(127, 0, 0, 1)))
  serverSocket = new ServerSocket(0, 1, InetAddress.getByAddress(Array[Byte](127, 0, 0, 1)))
Are you sure this is correct? http://stackoverflow.com/a/24010893
This is not changing any behaviour - you'd still have to cast them if you supply values >127. But it prevents the line from being red in IntelliJ.
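For readers unfamiliar with the distinction, a small self-contained sketch of the point being made (not code from the PR): with an explicit Array[Byte](...), integer literals that fit in a Byte are narrowed automatically, while larger values still need an explicit .toByte.

```scala
import java.net.InetAddress

// Sketch only: Array[Byte](127, 0, 0, 1) compiles without casts because the literals fit in a
// signed Byte; values above 127 are outside that range and still require .toByte.
object ByteArraySketch {
  def main(args: Array[String]): Unit = {
    val loopback = InetAddress.getByAddress(Array[Byte](127, 0, 0, 1))
    println(loopback.getHostAddress) // 127.0.0.1

    val broadcast = InetAddress.getByAddress(
      Array[Byte](255.toByte, 255.toByte, 255.toByte, 255.toByte))
    println(broadcast.getHostAddress) // 255.255.255.255
  }
}
```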
if (useDaemon) {
  synchronized {
    if (idleWorkers.size > 0) {
    if (idleWorkers.nonEmpty) {
Let's try not to do unnecessary refactors if they don't have big benefits; it will make it harder to merge upstream changes.
I don't think we plan to merge this upstream, at least not in its current form
More worried about upstream changes, which we need to merge into our fork.
builder.redirectOutput(Redirect.INHERIT)
builder.redirectError(Redirect.INHERIT)
sys.env.get("PYTHONHASHSEED").foreach(env.put("PYTHONHASHSEED", _))
builder.redirectErrorStream(true) // Ugly but needed for stdout and stderr to synchronize
We don't need this anymore?
I changed the behaviour so that stderr & stdout are inherited, rather than both going to the parent's stdout, which makes more sense. I could probably change this back.
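For context, a minimal sketch of the two behaviours being compared, using plain ProcessBuilder; this is illustrative and not the PR's actual worker-launch code (it assumes a Unix shell is available).

```scala
import java.lang.ProcessBuilder.Redirect

// Sketch only: "merged" reproduces the old behaviour (stderr folded into stdout),
// "inherited" reproduces the new behaviour (stdout and stderr passed through separately).
object RedirectSketch {
  def main(args: Array[String]): Unit = {
    val merged = new ProcessBuilder("sh", "-c", "echo out; echo err 1>&2")
    merged.redirectErrorStream(true)          // child's stderr is merged into its stdout
    merged.redirectOutput(Redirect.INHERIT)   // the merged stream goes to the parent's stdout
    merged.start().waitFor()

    val inherited = new ProcessBuilder("sh", "-c", "echo out; echo err 1>&2")
    inherited.redirectOutput(Redirect.INHERIT) // child's stdout -> parent's stdout
    inherited.redirectError(Redirect.INHERIT)  // child's stderr -> parent's stderr
    inherited.start().waitFor()
  }
}
```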
// Time to wait for SparkR backend to initialize in seconds
val backendTimeout = sys.env.getOrElse("SPARKR_BACKEND_TIMEOUT", "120").toInt
val rCommand = {
val rCommand = maybeConda.map(_.condaEnvDir + "/bin/r").getOrElse {
Unless we want to test and support this, maybe we should just throw an exception if maybeConda is not None.
python/pyspark/context.py
Outdated
self._conda_env_manager = self._jvm.CondaEnvironmentManager.fromConf(self._conf._jconf)

self._conda_packages = list()
internal_config = getattr(getattr(self._jvm.org.apache.spark.internal.config, 'package$'),
this seems wrong. Is there not another way to get the spark conf, perhaps off the jvm SparkContext?
This is the typed way to do it.
We already have the SparkConf (both the jvm one and the python SparkConf that wraps it), but the problem is I want to use the SparkConf.get(ConfigEntry) accessor, and the only way to get the ConfigEntry is this way.
Alternatively I could get it as a string and split on , like they do just below, I just don't like duplicated logic like that.
Force-pushed from 23ef54a to a167c76
…, will do that later
Force-pushed from a167c76 to aabe5fc
My first time looking at this thoroughly -- pretty cool!
Have you guys opened an issue upstream for this? Josh Rosen might be interested in this activity
circle.yml
Outdated
| - "build_classes" | ||
| - "build/zinc-0.3.11" | ||
| - "build/apache-maven-3.3.9" | ||
| - "build/scala-2.11.8" |
any way to make these globs so we don't get version mismatches in future upgrades?
No :( I might just cache the whole build directory.
// must link in /tmp to reduce path length in case baseDir is very long...
// If baseDir path is too long, this breaks conda's 220 character limit for binary replacement.
// Don't even try to use java.io.tmpdir - yarn sets this to a very long path
val linkedBaseDir = Utils.createTempDir("/tmp", "conda").toPath.resolve("real")
what happens if /tmp is full? We've seen it get full in many places. Is it reasonable to make this configurable via Spark property?
We could. The important thing is this must be really short.
However, note that the risk is not high at all since all I'm doing is creating a directory with a symlink to the real stuff. I'm not actually creating any files directly under that /tmp directory.
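A hedged sketch of the symlink trick being discussed: only a short directory is created under /tmp, and a link inside it points at the (possibly very long) real base dir. Paths and names here are illustrative, not the PR's exact values.

```scala
import java.nio.file.{Files, Paths}

// Sketch only: conda is pointed at the short /tmp path, so its ~220-character limit for
// binary prefix replacement is never hit, while the actual files live under the long baseDir.
object SymlinkSketch {
  def main(args: Array[String]): Unit = {
    val longBaseDir = "/data/yarn/usercache/alice/appcache/application_0001/container_0002/conda"
    val shortDir = Files.createTempDirectory(Paths.get("/tmp"), "conda") // short, unique prefix
    val link = shortDir.resolve("real")
    Files.createSymbolicLink(link, Paths.get(longBaseDir)) // a dangling link is fine for the demo
    println(s"conda would be pointed at: $link")
  }
}
```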
val exitCode = command.run(collectErrOutToBuffer).exitValue()
if (exitCode != 0) {
  throw new SparkException(s"Attempt to $description exited with code: "
    + f"$exitCode%nCommand was: $command%nOutput was:%n${buffer.toString}")
is it?
def activatedEnvironment(startEnv: Map[String, String] = Map.empty): Map[String, String] = {
  require(!startEnv.contains("PATH"),
    s"It's not allowed to define PATH in the startEnv, but found: ${startEnv("PATH")}")
Reword to: "Defining PATH in a CondaEnvironment's startEnv is prohibited; found PATH=${startEnv("PATH")}"
ok
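For context on what this function guards, here is a hedged sketch of what an "activated environment" could look like: take the caller-supplied variables (PATH excluded by the require above) and prepend the env's bin directory to PATH. This illustrates the idea only; the real CondaEnvironment likely sets more variables.

```scala
// Sketch only: illustrative names, not the PR's actual implementation.
object ActivatedEnvSketch {
  def activatedEnvironment(condaEnvDir: String,
                           startEnv: Map[String, String] = Map.empty): Map[String, String] = {
    require(!startEnv.contains("PATH"),
      s"It's not allowed to define PATH in the startEnv, but found: ${startEnv("PATH")}")
    val basePath = sys.env.getOrElse("PATH", "/usr/bin:/bin")
    startEnv + ("PATH" -> s"$condaEnvDir/bin:$basePath") // conda env's binaries win lookup
  }

  def main(args: Array[String]): Unit =
    println(activatedEnvironment("/tmp/conda/real/myenv", Map("PYTHONHASHSEED" -> "0")))
}
```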
def fromConf(sparkConf: SparkConf): CondaEnvironmentManager = {
  val condaBinaryPath = sparkConf.get(CONDA_BINARY_PATH).getOrElse(
    sys.error(s"Expected $CONDA_BINARY_PATH to be set"))
do Config items provide the toString you expect? I'd expect this to evaluate to spark.conda.binaryPath
It evaluates to
s"ConfigEntry(key=$key, defaultValue=$defaultValueString, doc=$doc, public=$isPublic)"
Would you rather I just print the key?
I suspect just the key is printed in other places so yes would prefer printing just the key. Feel free to check me on that assumption though if you'd like
Don't feel strongly but sure
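A toy illustration of the agreed change (printing only the key instead of the full ConfigEntry.toString); ConfigEntryish is a stand-in for Spark's internal ConfigEntry, whose toString is quoted above.

```scala
// Sketch only: ConfigEntryish mimics the relevant parts of Spark's private ConfigEntry.
final case class ConfigEntryish(key: String, defaultValue: Option[String], doc: String) {
  override def toString: String =
    s"ConfigEntry(key=$key, defaultValue=${defaultValue.orNull}, doc=$doc, public=true)"
}

object KeyOnlyErrorSketch {
  private val CONDA_BINARY_PATH = ConfigEntryish("spark.conda.binaryPath", None, "Path to conda")

  def main(args: Array[String]): Unit = {
    println(s"Expected $CONDA_BINARY_PATH to be set")        // prints the whole ConfigEntry(...)
    println(s"Expected ${CONDA_BINARY_PATH.key} to be set")  // prints just spark.conda.binaryPath
  }
}
```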
sys.env.get("PYTHONHASHSEED").foreach(env.put("PYTHONHASHSEED", _))
builder.redirectErrorStream(true) // Ugly but needed for stdout and stderr to synchronize
builder.redirectErrorStream(true)
needless diff? just looks like merge conflict potential
yep true
if (sys.props.getOrElse("spark.submit.deployMode", "client") == "client") {
  cmd = sys.props.getOrElse("spark.r.driver.command", cmd)
  val rCommand = maybeConda
    .map(_ => sys.error("RRunner doesn't support running from a Conda environment yet."))
didn't know conda supported R -- pretty cool!
https://www.continuum.io/content/preliminary-support-r-conda
Do you plan on making this work anytime soon?
If there is a need, yes. The changes should be pretty similar to these ones for Python.
 */
object RRunner {
  def main(args: Array[String]): Unit = {
object RRunner extends CondaRunner {
Given this doesn't actually support Conda yet, part of me is inclined not to touch anything in RRunner at all, to reduce merge conflict potential. Maybe we only touch this class when we're actually adding conda support for R, rather than just adding the message now that SparkR doesn't support running in Conda?
makes sense
project/MimaExcludes.scala
Outdated
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.ml.classification.OneVsRestModel.this"),

// CondaRunner is meant to own the main() method then delegate to another method
ProblemFilters.exclude[FinalMethodProblem]("org.apache.spark.deploy.CondaRunner.main")
adding to the bottom is prone to merge conflicts when upstream also adds to the bottom -- add to the middle of the list instead?
makes sense
python/pyspark/java_gateway.py
Outdated
java_import(gateway.jvm, "org.apache.spark.SparkConf")
java_import(gateway.jvm, "org.apache.spark.api.java.*")
java_import(gateway.jvm, "org.apache.spark.api.python.*")
java_import(gateway.jvm, "org.apache.spark.api.conda.*")
move up a couple lines to maintain alphabetization
 *
 * This hack is necessary otherwise conda tries to use the homedir for pkgs cache.
 */
private[this] def generateCondarc(baseRoot: Path): Path = {
Do we still need this if we are faking home?
Yes - the motivation is that we need to ensure our fresh cache dir comes first in pkgs_dirs. Conda will try to write to the first cache (by default anaconda_dir/pkgs) if it's writable, and we don't want it writing to the Anaconda dir.
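A hedged sketch of the CONDARC trick described here: generate a condarc whose pkgs_dirs puts a fresh, writable cache directory first, so conda never falls back to anaconda_dir/pkgs. The exact keys and layout the PR writes may differ.

```scala
import java.nio.charset.StandardCharsets
import java.nio.file.{Files, Path}

// Sketch only: write a minimal condarc; the conda process would then be launched with the
// CONDARC environment variable pointing at this file.
object CondarcSketch {
  def generateCondarc(baseRoot: Path): Path = {
    val condarc = baseRoot.resolve("condarc")
    val content =
      s"""pkgs_dirs:
         | - $baseRoot/pkgs
         |""".stripMargin
    Files.write(condarc, content.getBytes(StandardCharsets.UTF_8))
    condarc
  }

  def main(args: Array[String]): Unit = {
    val root = Files.createTempDirectory("conda-root")
    println(s"CONDARC=${generateCondarc(root)}")
  }
}
```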
Other than whether we still need the CONDARC and removing unnecessary diffs to minimize conflicts in the future, looks good!
.stringConf
.createOptional

private[spark] val CONDA_BOOTSTRAP_PACKAGES = ConfigBuilder("spark.conda.bootstrapPackages")
These will almost definitely conflict with upstream changes, right? Perhaps we should revert to getting strings from SparkConf and then splitting on commas.
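A minimal sketch of the fallback being suggested: read the raw, comma-separated property from SparkConf and split it, instead of going through a typed ConfigEntry. The key name is the one introduced by this PR; the values are illustrative.

```scala
import org.apache.spark.SparkConf

// Sketch only: untyped access to the bootstrap package list.
object BootstrapPackagesFromString {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf(loadDefaults = false)
      .set("spark.conda.bootstrapPackages", "python=3.5,numpy=1.11.1")
    val packages = conf.get("spark.conda.bootstrapPackages", "").split(",").filter(_.nonEmpty).toList
    println(packages) // List(python=3.5, numpy=1.11.1)
  }
}
```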
Why would this conflict?
+1 after removing unnecessary diffs. no need for a re-review from me
alright, addressed changes
private[this] val localdirs =
  SparkEnv.get.blockManager.diskBlockManager.localDirs.map(f => f.getPath).mkString(",")

private[this] val firstFunc = funcs.head.funcs.head
yep
val localDirs = env.blockManager.diskBlockManager.localDirs
val hash = Utils.nonNegativeHash(condaPackages)
val dirId = hash % localDirs.length
Utils.createTempDir(localDirs(dirId).getAbsolutePath, "conda").getAbsolutePath
I don't understand. Do you want this file to maintain a separate list of dirs that it should clean up itself?
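For readers following along, a small illustration of the directory-selection logic in the snippet above: hash the package list to pick one of the executor's local dirs and create a "conda" temp dir inside it. Spark's Utils helpers are replaced with plain JDK calls here, so this is only a sketch.

```scala
import java.nio.file.{Files, Paths}

// Sketch only: (hashCode & Int.MaxValue) stands in for Utils.nonNegativeHash, and
// Files.createTempDirectory stands in for Utils.createTempDir.
object CondaLocalDirSketch {
  def chooseCondaDir(localDirs: Seq[String], condaPackages: Seq[String]): String = {
    val hash = condaPackages.hashCode & Int.MaxValue
    val dirId = hash % localDirs.length
    Files.createTempDirectory(Paths.get(localDirs(dirId)), "conda").toString
  }

  def main(args: Array[String]): Unit = {
    val dirs = Seq(Files.createTempDirectory("local1").toString,
                   Files.createTempDirectory("local2").toString)
    println(chooseCondaDir(dirs, Seq("numpy=1.11.1")))
  }
}
```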
if (CondaEnvironmentManager.isConfigured(sparkConf)) {
  val condaBootstrapDeps = sparkConf.get(CONDA_BOOTSTRAP_PACKAGES)
  val condaBaseDir = Utils.createTempDir(Utils.getLocalDir(sparkConf), "conda").getAbsolutePath
VM shutdown
val pythonExec = maybeConda.map { conda =>
  presetPythonExec.foreach { exec =>
    sys.error(
      s"It's forbidden to set the PYSPARK python path when using conda, but found: $exec")
sure - I've promoted presetPythonExec to Option[Provenance] which includes where the setting was retrieved from
  }
  conda.condaEnvDir + "/bin/python"
}.orElse(presetPythonExec)
  .getOrElse("python")
That breaks the logic - I want to check that no preset python exec has been set; if I fall back to setting it to python, how will I know whether the user has set it?
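To make the resolution order concrete, a minimal sketch of the logic in the hunk above (illustrative only): fail if a PYSPARK python path is preset while conda is in use, otherwise fall back to the preset value, and only then to plain "python".

```scala
// Sketch only: mirrors the maybeConda.map { ... }.orElse(...).getOrElse("python") chain above.
object PythonExecResolutionSketch {
  def resolve(maybeCondaEnvDir: Option[String], presetPythonExec: Option[String]): String =
    maybeCondaEnvDir.map { envDir =>
      presetPythonExec.foreach { exec =>
        sys.error(s"It's forbidden to set the PYSPARK python path when using conda, but found: $exec")
      }
      envDir + "/bin/python"
    }.orElse(presetPythonExec)
      .getOrElse("python")

  def main(args: Array[String]): Unit = {
    println(resolve(Some("/tmp/conda/real/env"), None)) // /tmp/conda/real/env/bin/python
    println(resolve(None, Some("/usr/bin/python3")))    // /usr/bin/python3
    println(resolve(None, None))                        // python
  }
}
```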
dev/run-tests.py
Outdated
def exec_sbt(sbt_args=()):
def exec_sbt(sbt_args=(), exit_on_failure=True):
NOTE: all these changes in this file can be reverted once #129 merges, because I've moved the copyTestReportsToCircle logic inside a bespoke test task for circle
So tests can't run anymore for now because Anaconda is throttling us and we keep getting timeouts... we are looking for a work-around.

@robert3005 @ash211 can one of you please merge this and cut a release =)

@justinuang does middle of next week work for you for a release? We're hoping to have the next release include a batch of k8s stuff that's almost finished with review: #133

@dansanduleac can you guys please file a ticket upstream and send in this PR? We're trying really hard to only merge stuff into this repo that has a path to getting merged upstream.

Yea, middle of the week works. Just found out we can depend on dev builds =)
* Add -DskipTests to dev docs
* Remove extraneous skipTests
What changes were proposed in this pull request?

Add a CondaRunner abstract class that implements main(String[]), where it sets up a conda environment when certain options are set, then delegates its arguments to an abstract method, along with the optional CondaEnvironment.

Then, changed PythonRunner and RRunner to implement CondaRunner and run their respective subprocess from within the conda env, if one is set up.

NOTE: actually R needs more changes, so for now RRunner will just fail early if conda settings are found.

As for the executor-side logic:
- context.py, with an addCondaPackage method, which will add the package to the locally set up conda environment (if one is defined), and for distribution on the executors
- PythonRDD, to set up the desired conda env (that should match the one on the driver, which includes any additional packages supplied to addCondaPackage) before shelling out to python inside that env

How was this patch tested?

End-to-end python test that:
- … python=3.5)
- addCondaPackage('numpy=1.11.1')

End-to-end yarn test that …