Merged

Commits (45)
97a9c6d
Conda options
dansanduleac Mar 3, 2017
225eeef
Add addCondaPackage method to python SparkContext
dansanduleac Mar 2, 2017
63d5d87
Implement CondaRunner, and rewrite python and R runner to extend it. …
dansanduleac Mar 4, 2017
64de33b
Fix PythonRunner unnecessary RedirectThread, redirect both as inherit
dansanduleac Mar 7, 2017
bf2aec7
Exclude CondaRunner.main from mima, it's complaining about main metho…
dansanduleac Mar 8, 2017
cc23dfb
Add conda test
dansanduleac Mar 7, 2017
528d12c
Set up miniconda on circle
dansanduleac Mar 7, 2017
c0440dc
PythonRunner should just say no to setting PYSPARK_*_PYTHON
dansanduleac Mar 10, 2017
78b7fea
Also on the executor, crash if there is a conda env and PYSPARK_PYTHO…
dansanduleac Mar 10, 2017
a4f2c72
Cache the stuff that build/mvn always downloads
dansanduleac Mar 10, 2017
b395ed1
Fix weirdness re: how scala SparkConf.get(x) gets called from python …
dansanduleac Mar 10, 2017
3094034
Added yarn conda test
dansanduleac Mar 11, 2017
1b79cde
Hack CondaEnvironmentManager to use a condarc, and symlink real envdi…
dansanduleac Mar 11, 2017
5c7ebe5
Longer timeout for yarn conda tests
dansanduleac Mar 11, 2017
36cef20
Fix run-tests.py prematurely exiting before copying the reports to ci…
dansanduleac Mar 11, 2017
9633678
Download miniconda through pyenv, and create envs for the tests rathe…
dansanduleac Mar 12, 2017
20d1227
Undo RedirectThread change
dansanduleac Mar 13, 2017
7b12736
Disable RRunner conda for now, it needs more changes and tests anyway…
dansanduleac Mar 13, 2017
aabe5fc
Use CONDA_BIN for tests rather than hard-coding where it is
dansanduleac Mar 13, 2017
c51eebd
fixup! Implement CondaRunner, and rewrite python and R runner to exte…
dansanduleac Mar 13, 2017
ce9020a
fixup! Added yarn conda test
dansanduleac Mar 13, 2017
8843ed6
CondaEnvironment shouldn't call activate anymore. Just replicate what…
dansanduleac Mar 13, 2017
55ce811
Improve comments and set HOME to a bespoke dir too in CondaEnvironmen…
dansanduleac Mar 13, 2017
0006321
Temp changes
dansanduleac Mar 13, 2017
c377c62
Move all the dynamic addCondaPackages / addCondaChannel functionality…
dansanduleac Mar 13, 2017
8d2123a
Undo unnecessary refactoring
dansanduleac Mar 14, 2017
ab66b63
Get rid of the withDeps complication, always install packages with de…
dansanduleac Mar 14, 2017
8a14bc1
Merge remote-tracking branch 'origin/master' into ds/conda-runner
dansanduleac Mar 14, 2017
26f3870
Log that it succeeded
dansanduleac Mar 15, 2017
d27ffcf
Just expose SparkContext.condaEnvironment externally as an Option
dansanduleac Mar 15, 2017
441f1d5
Style, and get rid of potentially unsafe logging
dansanduleac Mar 15, 2017
573d8e8
But log what python binary will be used
dansanduleac Mar 15, 2017
3f8f700
Make conda verbosity configurable
dansanduleac Mar 15, 2017
84c8da4
Undo sensible changes for fewer merge conflicts
dansanduleac Mar 16, 2017
8faea04
Cache the whole (root) build directory rather than individual zinc/ma…
dansanduleac Mar 16, 2017
075e65a
Improve logging with provenance (where a forbidden python exec settin…
dansanduleac Mar 16, 2017
f9ee56b
get rid of needless changes
dansanduleac Mar 16, 2017
f03a35c
Revert changes to RRunner altogether
dansanduleac Mar 16, 2017
ea05895
move CondaRunner mima exclude to the middle so there is less chance o…
dansanduleac Mar 16, 2017
9b9b579
get rid of some more unnecessary refactor
dansanduleac Mar 16, 2017
930186f
Attempt to cache the miniconda install again
dansanduleac Mar 16, 2017
7570365
scalastyle
dansanduleac Mar 16, 2017
4de6f96
Don't show channel urls, might divulge token info
dansanduleac Mar 16, 2017
7832976
Merge remote-tracking branch 'origin/master' into ds/conda-runner
dansanduleac Mar 16, 2017
2430478
Address @aash's comments
dansanduleac Mar 17, 2017
29 changes: 28 additions & 1 deletion core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -42,8 +42,10 @@ import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat, Job => NewHad
import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat => NewFileInputFormat}

import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.api.conda.CondaEnvironment
import org.apache.spark.api.conda.CondaEnvironment.CondaSetupInstructions

Reviewer: Can you please revert all the changes to this file that aren't for this PR?

Author (dansanduleac): Yep, done; scalafmt got let loose in this file somehow. No wonder scalastyle was going crazy.

import org.apache.spark.broadcast.Broadcast
import org.apache.spark.deploy.{LocalSparkCluster, SparkHadoopUtil}
import org.apache.spark.deploy.{CondaRunner, LocalSparkCluster, SparkHadoopUtil}
import org.apache.spark.input.{FixedLengthBinaryInputFormat, PortableDataStream, StreamInputFormat, WholeTextFileInputFormat}
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config._
@@ -336,6 +338,9 @@ class SparkContext(config: SparkConf) extends Logging {
override protected def initialValue(): Properties = new Properties()
}

// Retrieve the Conda Environment from CondaRunner if it has set one up for us
val condaEnvironment: Option[CondaEnvironment] = CondaRunner.condaEnvironment

/* ------------------------------------------------------------------------------------- *
| Initialization. This code initializes the context in a manner that is exception-safe. |
| All internal fields holding state are initialized here, and any error prompts the |
@@ -1851,6 +1856,28 @@
*/
def listJars(): Seq[String] = addedJars.keySet.toSeq

private[this] def condaEnvironmentOrFail(): CondaEnvironment = {
condaEnvironment.getOrElse(sys.error("A conda environment was not set up."))
}

/**
* Add a set of conda packages (identified by <a href="
* https://conda.io/docs/spec.html#build-version-spec">package match specification</a>)
* for all tasks to be executed on this SparkContext in the future.
*/
def addCondaPackages(packages: Seq[String]): Unit = {
condaEnvironmentOrFail().installPackages(packages)
}

def addCondaChannel(url: String): Unit = {
condaEnvironmentOrFail().addChannel(url)
}

private[spark] def buildCondaInstructions(): Option[CondaSetupInstructions] = {
condaEnvironment.map(_.buildSetupInstructions)
}


/**
* When stopping SparkContext inside Spark components, it's easy to cause dead-lock since Spark
* may wait for some internal threads to finish. It's better to use this method to stop
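For context, a minimal sketch of how the new driver-side API added above might be used, assuming the application was launched through CondaRunner so that SparkContext.condaEnvironment is defined (otherwise condaEnvironmentOrFail throws); the app name, channel URL, and package specs are illustrative:

```scala
// Illustrative driver-side usage of the new conda API (hypothetical application code).
import org.apache.spark.{SparkConf, SparkContext}

object CondaApiExample {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("conda-example"))
    // Register an extra channel before requesting packages from it.
    sc.addCondaChannel("https://repo.example.org/conda/main")  // hypothetical URL
    // Package match specifications, as accepted by `conda install`.
    sc.addCondaPackages(Seq("numpy=1.12.0", "pandas"))
    sc.stop()
  }
}
```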
24 changes: 16 additions & 8 deletions core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -26,6 +26,7 @@ import scala.util.Properties
import com.google.common.collect.MapMaker

import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.api.conda.CondaEnvironment.CondaSetupInstructions
import org.apache.spark.api.python.PythonWorkerFactory
import org.apache.spark.broadcast.BroadcastManager
import org.apache.spark.internal.Logging
@@ -70,7 +71,10 @@
val conf: SparkConf) extends Logging {

private[spark] var isStopped = false
private val pythonWorkers = mutable.HashMap[(String, Map[String, String]), PythonWorkerFactory]()

case class PythonWorkerKey(pythonExec: Option[String], envVars: Map[String, String],
condaInstructions: Option[CondaSetupInstructions])
private val pythonWorkers = mutable.HashMap[PythonWorkerKey, PythonWorkerFactory]()

// A general, soft-reference map for metadata needed during HadoopRDD split computation
// (e.g., HadoopFileRDD uses this to cache JobConfs and InputFormats).
@@ -110,25 +114,29 @@
}

private[spark]
def createPythonWorker(pythonExec: String, envVars: Map[String, String]): java.net.Socket = {
def createPythonWorker(pythonExec: Option[String], envVars: Map[String, String],
condaInstructions: Option[CondaSetupInstructions]): java.net.Socket = {
synchronized {
val key = (pythonExec, envVars)
pythonWorkers.getOrElseUpdate(key, new PythonWorkerFactory(pythonExec, envVars)).create()
val key = PythonWorkerKey(pythonExec, envVars, condaInstructions)
pythonWorkers.getOrElseUpdate(key,
new PythonWorkerFactory(pythonExec, envVars, condaInstructions)).create()
}
}

private[spark]
def destroyPythonWorker(pythonExec: String, envVars: Map[String, String], worker: Socket) {
def destroyPythonWorker(pythonExec: Option[String], envVars: Map[String, String],
condaInstructions: Option[CondaSetupInstructions], worker: Socket) {
synchronized {
val key = (pythonExec, envVars)
val key = PythonWorkerKey(pythonExec, envVars, condaInstructions)
pythonWorkers.get(key).foreach(_.stopWorker(worker))
}
}

private[spark]
def releasePythonWorker(pythonExec: String, envVars: Map[String, String], worker: Socket) {
def releasePythonWorker(pythonExec: Option[String], envVars: Map[String, String],
condaInstructions: Option[CondaSetupInstructions], worker: Socket) {
synchronized {
val key = (pythonExec, envVars)
val key = PythonWorkerKey(pythonExec, envVars, condaInstructions)
pythonWorkers.get(key).foreach(_.releaseWorker(worker))
}
}
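A hypothetical call-site sketch (not part of the diff) to make the keying change concrete: because condaInstructions is now part of PythonWorkerKey, a worker is only reused when the python executable, env vars, and conda package/channel set all match. These members are private[spark], the env var value is illustrative, and the meaning of pythonExec = None (python resolved from the conda environment) is an assumption here:

```scala
// Hypothetical call site; only compiles for code under the org.apache.spark package.
import org.apache.spark.{SparkContext, SparkEnv}

def obtainWorker(sc: SparkContext): java.net.Socket = {
  val instructions = sc.buildCondaInstructions()      // Option[CondaSetupInstructions]
  val envVars = Map("PYTHONPATH" -> "/opt/app/deps")  // illustrative only
  // Workers created for a different package/channel set get their own PythonWorkerFactory.
  SparkEnv.get.createPythonWorker(
    pythonExec = None,  // assumption: None means "derive python from the conda env"
    envVars = envVars,
    condaInstructions = instructions)
}
```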
99 changes: 99 additions & 0 deletions core/src/main/scala/org/apache/spark/api/conda/CondaEnvironment.scala
@@ -0,0 +1,99 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.api.conda

import java.io.File
import java.nio.file.Path
import java.util.{Map => JMap}

import scala.collection.mutable

import org.apache.spark.internal.Logging

/**
* A stateful class that describes a Conda environment and also keeps track of packages that have
* been added, as well as additional channels.
*
* @param rootPath The root path under which envs/ and pkgs/ are located.
* @param envName The name of the environment.
*/
final class CondaEnvironment(val manager: CondaEnvironmentManager,
val rootPath: Path,
val envName: String,
bootstrapPackages: Seq[String],
bootstrapChannels: Seq[String]) extends Logging {

import CondaEnvironment._

private[this] val packages = mutable.Buffer(bootstrapPackages: _*)
private[this] val channels = bootstrapChannels.toBuffer

val condaEnvDir: Path = rootPath.resolve("envs").resolve(envName)

def activatedEnvironment(startEnv: Map[String, String] = Map.empty): Map[String, String] = {
require(!startEnv.contains("PATH"), "Defining PATH in a CondaEnvironment's startEnv is " +
s"prohibited; found PATH=${startEnv("PATH")}")
import collection.JavaConverters._
val newVars = System.getenv().asScala.toIterator ++ startEnv ++ List(
"CONDA_PREFIX" -> condaEnvDir.toString,
"CONDA_DEFAULT_ENV" -> condaEnvDir.toString,
"PATH" -> (condaEnvDir.resolve("bin").toString +
sys.env.get("PATH").map(File.pathSeparator + _).getOrElse(""))
)
newVars.toMap
}

def addChannel(url: String): Unit = {
channels += url
}

def installPackages(packages: Seq[String]): Unit = {
manager.runCondaProcess(rootPath,
List("install", "-n", envName, "-y", "--override-channels")
::: channels.iterator.flatMap(Iterator("--channel", _)).toList
::: "--" :: packages.toList,
description = s"install dependencies in conda env $condaEnvDir"
)

this.packages ++= packages
}

/**
* Clears the given java environment and replaces all variables with the environment
* produced after calling `activate` inside this conda environment.
*/
def initializeJavaEnvironment(env: JMap[String, String]): Unit = {
env.clear()
val activatedEnv = activatedEnvironment()
activatedEnv.foreach { case (k, v) => env.put(k, v) }
logDebug(s"Initialised environment from conda: $activatedEnv")
}

/**
* This is for sending the instructions to the executors so they can replicate the same steps.
*/
def buildSetupInstructions: CondaSetupInstructions = {
CondaSetupInstructions(packages.toList, channels.toList)
}
}

object CondaEnvironment {
case class CondaSetupInstructions(packages: Seq[String], channels: Seq[String]) {
require(channels.nonEmpty)
require(packages.nonEmpty)
}
}
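As a minimal sketch (not part of this diff), this is roughly how a launcher could apply activatedEnvironment() to a child process, which is what initializeJavaEnvironment does for a ProcessBuilder's environment map; the helper name and command are hypothetical:

```scala
// Hypothetical launcher sketch: start a process with the variables produced by
// CondaEnvironment.activatedEnvironment(), so PATH begins with <env>/bin.
import scala.collection.JavaConverters._
import org.apache.spark.api.conda.CondaEnvironment

def launchInCondaEnv(condaEnv: CondaEnvironment, command: Seq[String]): Process = {
  val pb = new ProcessBuilder(command.asJava)
  val vars = pb.environment()
  vars.clear()
  // Equivalent to condaEnv.initializeJavaEnvironment(vars).
  condaEnv.activatedEnvironment().foreach { case (k, v) => vars.put(k, v) }
  pb.inheritIO().start()
}
```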
149 changes: 149 additions & 0 deletions core/src/main/scala/org/apache/spark/api/conda/CondaEnvironmentManager.scala
@@ -0,0 +1,149 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.api.conda

import java.nio.file.Files
import java.nio.file.Path
import java.nio.file.Paths

import scala.collection.JavaConverters._
import scala.sys.process.BasicIO
import scala.sys.process.Process
import scala.sys.process.ProcessBuilder
import scala.sys.process.ProcessIO

import org.apache.spark.SparkConf
import org.apache.spark.SparkException
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config.CONDA_BINARY_PATH
import org.apache.spark.internal.config.CONDA_CHANNEL_URLS
import org.apache.spark.internal.config.CONDA_VERBOSITY
import org.apache.spark.util.Utils

final class CondaEnvironmentManager(condaBinaryPath: String, condaChannelUrls: Seq[String],
verbosity: Int = 0)
extends Logging {

require(condaChannelUrls.nonEmpty, "Can't have an empty list of conda channel URLs")
require(verbosity >= 0 && verbosity <= 3, "Verbosity must be between 0 and 3 inclusively")

def create(
baseDir: String,
bootstrapPackages: Seq[String]): CondaEnvironment = {
require(bootstrapPackages.nonEmpty, "Expected at least one bootstrap package.")
val name = "conda-env"

// must link in /tmp to reduce path length in case baseDir is very long...
// If baseDir path is too long, this breaks conda's 220 character limit for binary replacement.
// Don't even try to use java.io.tmpdir - yarn sets this to a very long path
val linkedBaseDir = Utils.createTempDir("/tmp", "conda").toPath.resolve("real")
Reviewer: What happens if /tmp is full? We've seen it get full in many places. Is it reasonable to make this configurable via a Spark property?

Author (dansanduleac): We could. The important thing is this must be really short. However, note that the risk is not high at all, since all I'm doing is creating a directory with a symlink to the real stuff; I'm not actually creating any files directly under that /tmp directory.
logInfo(s"Creating symlink $linkedBaseDir -> $baseDir")
Files.createSymbolicLink(linkedBaseDir, Paths.get(baseDir))

val verbosityFlags = 0.until(verbosity).map(_ => "-v").toList

// Attempt to create environment
runCondaProcess(
linkedBaseDir,
List("create", "-n", name, "-y", "--override-channels", "--no-default-packages")
::: verbosityFlags
::: condaChannelUrls.flatMap(Iterator("--channel", _)).toList
::: "--" :: bootstrapPackages.toList,
description = "create conda env"
)

new CondaEnvironment(this, linkedBaseDir, name, bootstrapPackages, condaChannelUrls)
}

/**
* Create a condarc that only exposes package and env directories under the given baseRoot,
* on top of the default pkgs directory inferred from condaBinaryPath.
*
* The file will be placed directly inside the given `baseRoot` dir, and link to `baseRoot/pkgs`
* as the first package cache.
*
* This hack is necessary otherwise conda tries to use the homedir for pkgs cache.
*/
private[this] def generateCondarc(baseRoot: Path): Path = {

Reviewer: Do we still need this if we are faking home?

Author (dansanduleac): Yes. The motivation is that we need to ensure our fresh cache dir comes first in pkgs_dirs. Conda will try to write to the first cache (by default anaconda_dir/pkgs) if it's writable, and we don't want it writing to the Anaconda dir.

val condaPkgsPath = Paths.get(condaBinaryPath).getParent.getParent.resolve("pkgs")
val condarc = baseRoot.resolve("condarc")
val condarcContents =
s"""pkgs_dirs:
| - $baseRoot/pkgs
| - $condaPkgsPath
|envs_dirs:
| - $baseRoot/envs
|show_channel_urls: false
""".stripMargin
Files.write(condarc, List(condarcContents).asJava)
logInfo(f"Using condarc at $condarc:%n$condarcContents")
condarc
}

private[conda] def runCondaProcess(baseRoot: Path,
args: List[String],
description: String): Unit = {
val condarc = generateCondarc(baseRoot)
val fakeHomeDir = baseRoot.resolve("home")
// Attempt to create fake home dir
Files.createDirectories(fakeHomeDir)

val extraEnv = List(
"CONDARC" -> condarc.toString,
"HOME" -> fakeHomeDir.toString
)

val command = Process(
condaBinaryPath :: args,
None,
extraEnv: _*
)

logInfo(s"About to execute $command with environment $extraEnv")
runOrFail(command, description)
logInfo(s"Successfully executed $command with environment $extraEnv")
}

private[this] def runOrFail(command: ProcessBuilder, description: String): Unit = {
val buffer = new StringBuffer
val collectErrOutToBuffer = new ProcessIO(
BasicIO.input(false),

Reviewer: indentation

BasicIO.processFully(buffer),
BasicIO.processFully(buffer))
val exitCode = command.run(collectErrOutToBuffer).exitValue()

Reviewer: Log that it finished?

Author (dansanduleac): Done.

if (exitCode != 0) {
throw new SparkException(s"Attempt to $description exited with code: "
+ f"$exitCode%nCommand was: $command%nOutput was:%n${buffer.toString}")

Reviewer: Is the output of this safe to log?

Author (dansanduleac): Are we worried about tokens being in the log?

Reviewer: Is it?

Author (dansanduleac, Mar 15, 2017): I guess they would be if verbose was on. I made an additional change to control the verbosity, so it shouldn't be a problem anymore (the default verbosity is 0, which means no -v flags).

Reviewer: What is %n, is that a newline like \n?

Author (dansanduleac): Yep, but platform-specific.
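A tiny standalone illustration of that point, assuming the standard java.util.Formatter semantics that Scala's f-interpolator follows:

```scala
// %n in an f-interpolator expands to the platform line separator, unlike a literal \n.
object PercentNDemo extends App {
  val msg = f"first line%nsecond line"
  assert(msg == "first line" + System.lineSeparator() + "second line")
  println(msg)
}
```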

}
}
}

object CondaEnvironmentManager {
def isConfigured(sparkConf: SparkConf): Boolean = {
sparkConf.contains(CONDA_BINARY_PATH)
}

def fromConf(sparkConf: SparkConf): CondaEnvironmentManager = {
val condaBinaryPath = sparkConf.get(CONDA_BINARY_PATH).getOrElse(
sys.error(s"Expected config ${CONDA_BINARY_PATH.key} to be set"))
val condaChannelUrls = sparkConf.get(CONDA_CHANNEL_URLS)
require(condaChannelUrls.nonEmpty,
s"Must define at least one conda channel in config ${CONDA_CHANNEL_URLS.key}")
val verbosity = sparkConf.get(CONDA_VERBOSITY)
new CondaEnvironmentManager(condaBinaryPath, condaChannelUrls, verbosity)
}
}
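A hypothetical end-to-end wiring sketch (the real call site is CondaRunner, which is not among the files shown here), assuming CONDA_BINARY_PATH and CONDA_CHANNEL_URLS are set in the SparkConf; the base directory and package names are illustrative:

```scala
// Hypothetical bootstrap flow: build a manager from SparkConf and create an environment.
import org.apache.spark.SparkConf
import org.apache.spark.api.conda.CondaEnvironmentManager

object CondaBootstrapSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    if (CondaEnvironmentManager.isConfigured(conf)) {
      val manager = CondaEnvironmentManager.fromConf(conf)
      // create() bootstraps the env under baseDir/envs via `conda create`.
      val env = manager.create(baseDir = "/tmp/conda-demo-base",
        bootstrapPackages = Seq("python=3.6"))
      env.installPackages(Seq("numpy"))
    }
  }
}
```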
9 changes: 9 additions & 0 deletions core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
@@ -695,6 +695,15 @@ class JavaSparkContext(val sc: SparkContext)
sc.addJar(path)
}

/**
* Add a set of conda packages (identified by <a href="
* https://conda.io/docs/spec.html#build-version-spec">package match specification</a>)
* for all tasks to be executed on this SparkContext in the future.
*/
def addCondaPackages(packages: java.util.List[String]): Unit = {
sc.addCondaPackages(packages.asScala)
}

/**
* Returns the Hadoop configuration used for the Hadoop code (e.g. file systems) we reuse.
*
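A short sketch of calling the Java-facing wrapper (written in Scala to keep the examples in one language); the helper name is hypothetical, and Arrays.asList simply builds the java.util.List the method expects:

```scala
// Illustrative use of JavaSparkContext.addCondaPackages.
import java.util.Arrays
import org.apache.spark.api.java.JavaSparkContext

def addPackagesViaJavaApi(jsc: JavaSparkContext): Unit = {
  jsc.addCondaPackages(Arrays.asList("numpy=1.12.0", "requests"))
}
```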