-
Notifications
You must be signed in to change notification settings - Fork 51
[SPARK-20001] Conda Runner & full Python conda support #115
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
97a9c6d
225eeef
63d5d87
64de33b
bf2aec7
cc23dfb
528d12c
c0440dc
78b7fea
a4f2c72
b395ed1
3094034
1b79cde
5c7ebe5
36cef20
9633678
20d1227
7b12736
aabe5fc
c51eebd
ce9020a
8843ed6
55ce811
0006321
c377c62
8d2123a
ab66b63
8a14bc1
26f3870
d27ffcf
441f1d5
573d8e8
3f8f700
84c8da4
8faea04
075e65a
f9ee56b
f03a35c
ea05895
9b9b579
930186f
7570365
4de6f96
7832976
2430478
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,99 @@ | ||
| /* | ||
| * Licensed to the Apache Software Foundation (ASF) under one or more | ||
| * contributor license agreements. See the NOTICE file distributed with | ||
| * this work for additional information regarding copyright ownership. | ||
| * The ASF licenses this file to You under the Apache License, Version 2.0 | ||
| * (the "License"); you may not use this file except in compliance with | ||
| * the License. You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, software | ||
| * distributed under the License is distributed on an "AS IS" BASIS, | ||
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| * See the License for the specific language governing permissions and | ||
| * limitations under the License. | ||
| */ | ||
| package org.apache.spark.api.conda | ||
|
|
||
| import java.io.File | ||
| import java.nio.file.Path | ||
| import java.util.{Map => JMap} | ||
|
|
||
| import scala.collection.mutable | ||
|
|
||
| import org.apache.spark.internal.Logging | ||
|
|
||
/**
 * Mutable handle on one Conda environment. Besides locating the environment on disk, it records
 * every package installed through it and every channel added to it, so that the same setup can be
 * described to another process later on.
 *
 * @param manager           The manager used to run conda commands for this environment.
 * @param rootPath          The root path under which envs/ and pkgs/ are located.
 * @param envName           The name of the environment.
 * @param bootstrapPackages The packages the environment starts out with.
 * @param bootstrapChannels The channels the environment starts out with.
 */
final class CondaEnvironment(val manager: CondaEnvironmentManager,
                             val rootPath: Path,
                             val envName: String,
                             bootstrapPackages: Seq[String],
                             bootstrapChannels: Seq[String]) extends Logging {

  import CondaEnvironment._

  private[this] val packages: mutable.Buffer[String] = bootstrapPackages.toBuffer
  private[this] val channels: mutable.Buffer[String] = mutable.Buffer(bootstrapChannels: _*)

  /** Directory of this environment: `rootPath/envs/envName`. */
  val condaEnvDir: Path = rootPath.resolve("envs").resolve(envName)

  /**
   * Builds the set of environment variables for a process that should run inside this conda
   * environment. Variables are layered in this order, later layers winning: the current process
   * environment, then `startEnv`, then `CONDA_PREFIX`, `CONDA_DEFAULT_ENV` and `PATH`.
   *
   * @param startEnv Extra variables to include; must not define `PATH`.
   * @return The combined environment, with the environment's `bin` directory first on `PATH`.
   */
  def activatedEnvironment(startEnv: Map[String, String] = Map.empty): Map[String, String] = {
    require(
      !startEnv.contains("PATH"),
      "Defining PATH in a CondaEnvironment's startEnv is " +
        s"prohibited; found PATH=${startEnv("PATH")}")
    import collection.JavaConverters._

    val envDir = condaEnvDir.toString
    // Keep whatever PATH this JVM was started with, but behind the environment's own bin dir.
    val inheritedPathSuffix = sys.env.get("PATH") match {
      case Some(existingPath) => File.pathSeparator + existingPath
      case None => ""
    }
    val condaOverrides = List(
      "CONDA_PREFIX" -> envDir,
      "CONDA_DEFAULT_ENV" -> envDir,
      "PATH" -> (condaEnvDir.resolve("bin").toString + inheritedPathSuffix)
    )

    (System.getenv().asScala.iterator ++ startEnv ++ condaOverrides).toMap
  }

  /** Registers an additional channel; it is used by subsequent package installations. */
  def addChannel(url: String): Unit = {
    channels += url
  }

  /**
   * Runs `conda install` for the given packages against this environment, using only the channels
   * known to this object, and records the packages once the command has returned.
   */
  def installPackages(packages: Seq[String]): Unit = {
    val channelArgs = channels.toList.flatMap(url => List("--channel", url))
    val condaArgs =
      List("install", "-n", envName, "-y", "--override-channels") ++
        channelArgs ++
        ("--" +: packages.toList)

    manager.runCondaProcess(
      rootPath,
      condaArgs,
      description = s"install dependencies in conda env $condaEnvDir")

    // Only reached when the conda invocation did not throw.
    this.packages ++= packages
  }

  /**
   * Empties the given java environment map and refills it with the variables returned by
   * [[activatedEnvironment]] (called without any extra start variables).
   */
  def initializeJavaEnvironment(env: JMap[String, String]): Unit = {
    env.clear()
    val activatedEnv = activatedEnvironment()
    for ((key, value) <- activatedEnv) {
      env.put(key, value)
    }
    logDebug(s"Initialised environment from conda: $activatedEnv")
  }

  /**
   * Snapshot of the packages and channels accumulated so far, meant to be sent to the executors
   * so they can replicate the same steps.
   */
  def buildSetupInstructions: CondaSetupInstructions =
    CondaSetupInstructions(packages = packages.toList, channels = channels.toList)
}
|
|
||
object CondaEnvironment {
  /**
   * The package and channel lists describing how a conda environment was set up.
   *
   * Both lists must be non-empty; construction fails with an `IllegalArgumentException`
   * otherwise.
   *
   * @param packages The packages to install.
   * @param channels The channels to install the packages from.
   */
  case class CondaSetupInstructions(packages: Seq[String], channels: Seq[String]) {
    require(channels.nonEmpty, "Expected at least one conda channel in the setup instructions.")
    require(packages.nonEmpty, "Expected at least one conda package in the setup instructions.")
  }
}
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,149 @@ | ||
| /* | ||
| * Licensed to the Apache Software Foundation (ASF) under one or more | ||
| * contributor license agreements. See the NOTICE file distributed with | ||
| * this work for additional information regarding copyright ownership. | ||
| * The ASF licenses this file to You under the Apache License, Version 2.0 | ||
| * (the "License"); you may not use this file except in compliance with | ||
| * the License. You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, software | ||
| * distributed under the License is distributed on an "AS IS" BASIS, | ||
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| * See the License for the specific language governing permissions and | ||
| * limitations under the License. | ||
| */ | ||
| package org.apache.spark.api.conda | ||
|
|
||
| import java.nio.file.Files | ||
| import java.nio.file.Path | ||
| import java.nio.file.Paths | ||
|
|
||
| import scala.collection.JavaConverters._ | ||
| import scala.sys.process.BasicIO | ||
| import scala.sys.process.Process | ||
| import scala.sys.process.ProcessBuilder | ||
| import scala.sys.process.ProcessIO | ||
|
|
||
| import org.apache.spark.SparkConf | ||
| import org.apache.spark.SparkException | ||
| import org.apache.spark.internal.Logging | ||
| import org.apache.spark.internal.config.CONDA_BINARY_PATH | ||
| import org.apache.spark.internal.config.CONDA_CHANNEL_URLS | ||
| import org.apache.spark.internal.config.CONDA_VERBOSITY | ||
| import org.apache.spark.util.Utils | ||
|
|
||
/**
 * Runs the conda binary to create environments and to execute further conda commands against
 * them.
 *
 * @param condaBinaryPath  Path of the conda executable to invoke.
 * @param condaChannelUrls Channels passed as `--channel` when creating an environment; must not be
 *                         empty.
 * @param verbosity        Number of `-v` flags passed to `conda create`; must be within 0 to 3.
 */
final class CondaEnvironmentManager(condaBinaryPath: String, condaChannelUrls: Seq[String],
                                    verbosity: Int = 0)
  extends Logging {

  require(condaChannelUrls.nonEmpty, "Can't have an empty list of conda channel URLs")
  require(verbosity >= 0 && verbosity <= 3, "Verbosity must be between 0 and 3 inclusively")

  /**
   * Creates a new conda environment named `conda-env` containing the given packages.
   *
   * @param baseDir           Directory that will hold the environment; conda reaches it through a
   *                          short symlink created under `/tmp`.
   * @param bootstrapPackages Packages to install at creation time; must not be empty.
   * @return A [[CondaEnvironment]] rooted at the symlinked directory.
   */
  def create(
      baseDir: String,
      bootstrapPackages: Seq[String]): CondaEnvironment = {
    require(bootstrapPackages.nonEmpty, "Expected at least one bootstrap package.")
    val name = "conda-env"

    // must link in /tmp to reduce path length in case baseDir is very long...
    // If baseDir path is too long, this breaks conda's 220 character limit for binary replacement.
    // Don't even try to use java.io.tmpdir - yarn sets this to a very long path
    // Only a directory holding a symlink to the real location is created under /tmp; no files are
    // written directly there.
    val linkedBaseDir = Utils.createTempDir("/tmp", "conda").toPath.resolve("real")
    logInfo(s"Creating symlink $linkedBaseDir -> $baseDir")
    Files.createSymbolicLink(linkedBaseDir, Paths.get(baseDir))

    // One "-v" per verbosity level.
    val verbosityFlags = List.fill(verbosity)("-v")

    // Attempt to create environment
    runCondaProcess(
      linkedBaseDir,
      List("create", "-n", name, "-y", "--override-channels", "--no-default-packages")
        ::: verbosityFlags
        ::: condaChannelUrls.flatMap(Iterator("--channel", _)).toList
        ::: "--" :: bootstrapPackages.toList,
      description = "create conda env"
    )

    new CondaEnvironment(this, linkedBaseDir, name, bootstrapPackages, condaChannelUrls)
  }

  /**
   * Create a condarc that only exposes package and env directories under the given baseRoot,
   * in addition to the default pkgs directory inferred from condaBinaryPath.
   *
   * The file will be placed directly inside the given `baseRoot` dir, and link to `baseRoot/pkgs`
   * as the first package cache.
   *
   * This hack is necessary otherwise conda tries to use the homedir for pkgs cache.
   *
   * @throws SparkException if condaBinaryPath has no grandparent directory from which the default
   *                        pkgs directory could be inferred.
   */
  private[this] def generateCondarc(baseRoot: Path): Path = {
    // Path.getParent returns null when a path has no parent, so walk up through Option and fail
    // with a readable message instead of a NullPointerException.
    val condaPkgsPath = Option(Paths.get(condaBinaryPath).getParent)
      .flatMap(binaryDir => Option(binaryDir.getParent))
      .map(_.resolve("pkgs"))
      .getOrElse(throw new SparkException(
        s"Cannot infer the default conda pkgs directory from conda binary path " +
          s"'$condaBinaryPath': the binary must be located two directory levels below the " +
          "directory that contains pkgs"))
    val condarc = baseRoot.resolve("condarc")
    val condarcContents =
      s"""pkgs_dirs:
         | - $baseRoot/pkgs
         | - $condaPkgsPath
         |envs_dirs:
         | - $baseRoot/envs
         |show_channel_urls: false
      """.stripMargin
    Files.write(condarc, List(condarcContents).asJava)
    logInfo(f"Using condarc at $condarc:%n$condarcContents")
    condarc
  }

  /**
   * Runs the conda binary with the given arguments, pointing it at a condarc generated inside
   * `baseRoot` and at a fake home directory `baseRoot/home`.
   *
   * @param baseRoot    Directory that receives the condarc and the fake home directory.
   * @param args        Arguments passed to the conda binary.
   * @param description Short description of the action, used in the failure message.
   */
  private[conda] def runCondaProcess(baseRoot: Path,
                                     args: List[String],
                                     description: String): Unit = {
    val condarc = generateCondarc(baseRoot)
    val fakeHomeDir = baseRoot.resolve("home")
    // Attempt to create fake home dir
    Files.createDirectories(fakeHomeDir)

    val extraEnv = List(
      "CONDARC" -> condarc.toString,
      "HOME" -> fakeHomeDir.toString
    )

    val command = Process(
      condaBinaryPath :: args,
      None,
      extraEnv: _*
    )

    logInfo(s"About to execute $command with environment $extraEnv")
    runOrFail(command, description)
    logInfo(s"Successfully executed $command with environment $extraEnv")
  }

  /**
   * Runs the command to completion, collecting both stdout and stderr into one buffer, and throws
   * a [[SparkException]] carrying that output when the exit code is non-zero.
   */
  private[this] def runOrFail(command: ProcessBuilder, description: String): Unit = {
    val buffer = new StringBuffer
    val collectErrOutToBuffer = new ProcessIO(
      BasicIO.input(false),
      BasicIO.processFully(buffer),
      BasicIO.processFully(buffer))
    val exitCode = command.run(collectErrOutToBuffer).exitValue()
    if (exitCode != 0) {
      // %n is the platform-specific line separator.
      // NOTE(review): the captured conda output ends up in the exception message; with verbosity
      // above 0 it may contain more detail than is safe to log - confirm before raising the
      // default verbosity.
      throw new SparkException(s"Attempt to $description exited with code: "
        + f"$exitCode%nCommand was: $command%nOutput was:%n${buffer.toString}")
    }
  }
}
|
|
||
object CondaEnvironmentManager {
  /** Whether the given configuration defines the conda binary path setting. */
  def isConfigured(sparkConf: SparkConf): Boolean = sparkConf.contains(CONDA_BINARY_PATH)

  /**
   * Builds a manager from the conda binary path, channel URLs and verbosity found in the given
   * configuration. Fails when the binary path is not set or when no channel is defined.
   */
  def fromConf(sparkConf: SparkConf): CondaEnvironmentManager = {
    val binaryPath = sparkConf.get(CONDA_BINARY_PATH) match {
      case Some(path) => path
      case None => sys.error(s"Expected config ${CONDA_BINARY_PATH.key} to be set")
    }

    val channelUrls = sparkConf.get(CONDA_CHANNEL_URLS)
    require(
      channelUrls.nonEmpty,
      s"Must define at least one conda channel in config ${CONDA_CHANNEL_URLS.key}")

    new CondaEnvironmentManager(
      binaryPath,
      channelUrls,
      verbosity = sparkConf.get(CONDA_VERBOSITY))
  }
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you please revert all the changes to this file that aren't for this PR?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yep, done, scalafmt got let loose in this file somehow. No wonder scalastyle was going crazy.