3 changes: 2 additions & 1 deletion core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -112,7 +112,8 @@ class SparkEnv (
def createPythonWorker(pythonExec: String, envVars: Map[String, String]): java.net.Socket = {
synchronized {
val key = (pythonExec, envVars)
pythonWorkers.getOrElseUpdate(key, new PythonWorkerFactory(pythonExec, envVars)).create()
pythonWorkers.getOrElseUpdate(key,
new PythonWorkerFactory(pythonExec, envVars, conf)).create()
}
}

core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala
@@ -18,18 +18,26 @@
package org.apache.spark.api.python

import java.io.{DataInputStream, DataOutputStream, InputStream, OutputStreamWriter}
import java.io.{File, FileInputStream, FileOutputStream, IOException}
import java.net.{InetAddress, ServerSocket, Socket, SocketException}
import java.nio.charset.StandardCharsets
import java.nio.file.{Files, Paths}
import java.util.Arrays
import java.util.concurrent.atomic.AtomicInteger
import java.util.zip.{ZipEntry, ZipInputStream}

import scala.collection.mutable
import scala.collection.JavaConverters._

import org.apache.commons.io.IOUtils
import org.apache.spark._
import org.apache.spark.internal.Logging
import org.apache.spark.util.{RedirectThread, Utils}

private[spark] class PythonWorkerFactory(pythonExec: String, envVars: Map[String, String])

private[spark] class PythonWorkerFactory(pythonExec: String,
envVars: Map[String, String],
conf: SparkConf)
extends Logging {

import PythonWorkerFactory._
@@ -46,6 +54,32 @@ private[spark] class PythonWorkerFactory(pythonExec: String, envVars: Map[String
val daemonWorkers = new mutable.WeakHashMap[Socket, Int]()
val idleWorkers = new mutable.Queue[Socket]()
var lastActivity = 0L
val sparkFiles = conf.getOption("spark.files")
val virtualEnvEnabled = conf.getBoolean("spark.pyspark.virtualenv.enabled", false)
val virtualEnvType = conf.get("spark.pyspark.virtualenv.type", "native")
val virtualEnvPath = conf.get("spark.pyspark.virtualenv.bin.path", "virtualenv")
val virtualEnvSystemSitePackages = conf.getBoolean(
"spark.pyspark.virtualenv.system_site_packages", false)
val virtualWheelhouse = conf.get("spark.pyspark.virtualenv.wheelhouse", "wheelhouse.zip")
// virtualRequirements defaults to the empty string (no requirements file)
val virtualRequirements = conf.get("spark.pyspark.virtualenv.requirements", "")
val virtualIndexUrl = conf.get("spark.pyspark.virtualenv.index_url", null)
val virtualTrustedHost = conf.get("spark.pyspark.virtualenv.trusted_host", null)
val virtualInstallPackage = conf.get("spark.pyspark.virtualenv.install_package", null)
val upgradePip = conf.getBoolean("spark.pyspark.virtualenv.upgrade_pip", false)
val virtualUseIndex = conf.getBoolean("spark.pyspark.virtualenv.use_index", true)
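// These spark.pyspark.virtualenv.* properties are expected to be supplied by the user at
// submit time. A hypothetical invocation (values and file names are illustrative only,
// not taken from this patch) might look like:
//   spark-submit --conf spark.pyspark.virtualenv.enabled=true \
//     --conf spark.pyspark.virtualenv.type=native \
//     --conf spark.pyspark.virtualenv.requirements=requirements.txt \
//     --conf spark.pyspark.virtualenv.wheelhouse=wheelhouse.zip \
//     --files requirements.txt,wheelhouse.zip app.py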
var virtualEnvName: String = _
var virtualPythonExec: String = _

// search for "wheelhouse.zip" to trigger unzipping and installation of wheelhouse
// also search for "requirements.txt if provided"
for (filename <- sparkFiles.map(_.split(",")).map(_.filter(_.nonEmpty)).toSeq.flatten) {
logDebug("Looking inside" + filename)
val file = new File(filename)
val prefixes = Iterator.iterate(file)(_.getParentFile).takeWhile(_ != null).toList.reverse
logDebug("=> prefixes" + prefixes)
}

new MonitorThread().start()

var simpleWorkers = new mutable.WeakHashMap[Socket, Process]()
@@ -55,6 +89,10 @@ private[spark] class PythonWorkerFactory(pythonExec: String, envVars: Map[String
envVars.getOrElse("PYTHONPATH", ""),
sys.env.getOrElse("PYTHONPATH", ""))

if (virtualEnvEnabled) {
setupVirtualEnv()
}
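// When virtualenv support is enabled, the environment is created eagerly here in the
// constructor, i.e. once per PythonWorkerFactory and therefore once per (pythonExec, envVars)
// pair on each executor (see SparkEnv.createPythonWorker above).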

def create(): Socket = {
if (useDaemon) {
synchronized {
@@ -68,6 +106,193 @@ private[spark] class PythonWorkerFactory(pythonExec: String, envVars: Map[String
}
}


/** Extract a zip archive (e.g. a distributed wheelhouse) into outputFolder. */
def unzipWheelhouse(zipFile: String, outputFolder: String): Unit = {
val buffer = new Array[Byte](1024)
try {
// create the output directory if it does not already exist
val folder = new File(outputFolder)
if (!folder.exists()) {
folder.mkdirs()
}

// iterate over the entries of the zip file
val zis: ZipInputStream = new ZipInputStream(new FileInputStream(zipFile))
var ze: ZipEntry = zis.getNextEntry()

while (ze != null) {
if (!ze.isDirectory()) {
val fileName = ze.getName()
val newFile = new File(outputFolder + File.separator + fileName)
logDebug("Unzipping file " + newFile.getAbsoluteFile())

// create parent folders as needed
new File(newFile.getParent()).mkdirs()
val fos = new FileOutputStream(newFile)
var len: Int = zis.read(buffer)

while (len > 0) {
fos.write(buffer, 0, len)
len = zis.read(buffer)
}
fos.close()
}
ze = zis.getNextEntry()
}
zis.closeEntry()
zis.close()
} catch {
case e: IOException =>
logError("Exception caught while unzipping " + zipFile + ": " + e.getMessage)
}
}

/**
* Create a virtualenv using either the native virtualenv tool or conda.
*
* Native Virtualenv:
* - Create the virtualenv:
* virtualenv -p pythonExec [--system-site-packages] virtualenvName
* - if wheelhouse specified:
* - unzip wheelhouse
* - upgrade pip if set by conf (default: no)
* - install using pip:
*
* pip install -r requirement_file.txt \
* --find-links=wheelhouse \
* [--no-index] \
* [--index-url http://pypi.mirror/simple] [--trusted-host pypi.mirror] \
* [package.whl]
*
* else, if no wheelhouse is set:
*
* pip install -r requirement_file.txt \
* [--no-index] \
* [--index-url http://pypi.mirror/simple] [--trusted-host pypi.mirror] \
* [package.whl]
*
* Conda
* - Execute command: conda create --prefix <workingDir>/virtualenvName --file requirement_file.txt -y
*
*/
def setupVirtualEnv(): Unit = {
logDebug("Starting to set up the virtualenv...")
virtualEnvName = "virtualenv_" + conf.getAppId + "_" + WORKER_ID.getAndIncrement()
// Use the absolute path in local mode; otherwise use only the file name, since the file
// will have been fetched from the FileServer into the working directory.
val pysparkRequirements =
if (Utils.isLocalMaster(conf)) {
virtualRequirements
} else {
virtualRequirements.split("/").last
}

logDebug("wheelhouse: " + virtualWheelhouse)
if (virtualWheelhouse != null &&
!virtualWheelhouse.isEmpty &&
Files.exists(Paths.get(virtualWheelhouse))) {
logDebug("Unzipping wheelhouse archive " + virtualWheelhouse)
unzipWheelhouse(virtualWheelhouse, "wheelhouse")
}

val createEnvCommand =
if (virtualEnvType == "native") {
if (virtualEnvSystemSitePackages) {
Arrays.asList(virtualEnvPath, "-p", pythonExec, "--system-site-packages", virtualEnvName)
} else {
Arrays.asList(virtualEnvPath, "-p", pythonExec, virtualEnvName)
}
} else {
// Conda creates the environment and installs the packages in a single command.
val condaArgs = mutable.ListBuffer[String]()
condaArgs += (virtualEnvPath,
"create",
"--prefix",
System.getProperty("user.dir") + "/" + virtualEnvName)
if (pysparkRequirements != null && !pysparkRequirements.isEmpty) {
condaArgs += ("--file", pysparkRequirements)
}
condaArgs += "-y"
condaArgs.toList.asJava
}
execCommand(createEnvCommand)
virtualPythonExec = virtualEnvName + "/bin/python"

// The virtualenv is created in the working directory of the executor.
if (virtualEnvType == "native") {
val virtualenvPipExec = virtualEnvName + "/bin/pip"
val pipUpgradeArgs = mutable.ListBuffer[String]()
if (upgradePip) {
pipUpgradeArgs += (virtualenvPipExec, "install", "--upgrade", "pip")
}
val basePipArgs = mutable.ListBuffer[String]()
basePipArgs += (virtualenvPipExec, "install")
if (pysparkRequirements != null && !pysparkRequirements.isEmpty) {
basePipArgs += ("-r", pysparkRequirements)
}
if (virtualWheelhouse != null &&
!virtualWheelhouse.isEmpty &&
Files.exists(Paths.get(virtualWheelhouse))) {
basePipArgs += "--find-links=wheelhouse"
pipUpgradeArgs += "--find-links=wheelhouse"
}
if (virtualIndexUrl != null && !virtualIndexUrl.isEmpty) {
basePipArgs += ("--index-url", virtualIndexUrl)
pipUpgradeArgs += ("--index-url", virtualIndexUrl)
} else if (!virtualUseIndex) {
basePipArgs += "--no-index"
pipUpgradeArgs += "--no-index"
}
if (virtualTrustedHost != null && !virtualTrustedHost.isEmpty) {
basePipArgs += ("--trusted-host", virtualTrustedHost)
pipUpgradeArgs += ("--trusted-host", virtualTrustedHost)
}
if (upgradePip) {
// upgrade pip inside the virtualenv before installing the other packages
execCommand(pipUpgradeArgs.toList.asJava)
}
if (virtualInstallPackage != null && !virtualInstallPackage.isEmpty) {
basePipArgs += virtualInstallPackage
}
execCommand(basePipArgs.toList.asJava)
}
// In "conda" mode no second command is needed: the packages are installed when the
// environment is created.
}
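
// For illustration only (not produced by this patch): with virtualEnvType = "native", a
// requirements file, a wheelhouse archive present, upgrade_pip = true and use_index = false,
// setupVirtualEnv would issue roughly the following commands:
//   virtualenv -p <pythonExec> virtualenv_<appId>_<n>
//   virtualenv_<appId>_<n>/bin/pip install --upgrade pip --find-links=wheelhouse --no-index
//   virtualenv_<appId>_<n>/bin/pip install -r requirements.txt --find-links=wheelhouse --no-index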

def execCommand(commands: java.util.List[String]): Unit = {
logDebug("Running command: " + commands.asScala.mkString(" "))

val pb = new ProcessBuilder(commands)
pb.environment().putAll(envVars.asJava)
pb.environment().putAll(System.getenv())
pb.environment().put("HOME", System.getProperty("user.home"))

val proc = pb.start()

val exitCode = proc.waitFor()
if (exitCode != 0) {
val errString = try {
val err = Option(proc.getErrorStream())
err.map(IOUtils.toString)
} catch {
case io: IOException => None
}

val outString = try {
val out = Option(proc.getInputStream())
out.map(IOUtils.toString)
} catch {
case io: IOException => None
}

throw new RuntimeException("Fail to run command: " + commands.asScala.mkString(" ") +
"\nOutput: " + outString +
"\nStderr: " + errString
)
}
}
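
// Illustrative call (not from this patch): execCommand(Arrays.asList(virtualEnvPath, "-p",
// pythonExec, virtualEnvName)) blocks until the child process exits and throws a
// RuntimeException carrying the captured stdout/stderr if the exit code is non-zero.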

/**
* Connect to a worker launched through pyspark/daemon.py, which forks python processes itself
* to avoid the high cost of forking from Java. This currently only works on UNIX-based systems.
@@ -111,7 +336,9 @@ private[spark] class PythonWorkerFactory(pythonExec: String, envVars: Map[String
serverSocket = new ServerSocket(0, 1, InetAddress.getByAddress(Array(127, 0, 0, 1)))

// Create and start the worker
val pb = new ProcessBuilder(Arrays.asList(pythonExec, "-m", "pyspark.worker"))
val realPythonExec = if (virtualEnvEnabled) virtualPythonExec else pythonExec
logDebug(s"Starting worker with pythonExec: ${realPythonExec}")
val pb = new ProcessBuilder(Arrays.asList(realPythonExec, "-m", "pyspark.worker"))
val workerEnv = pb.environment()
workerEnv.putAll(envVars.asJava)
workerEnv.put("PYTHONPATH", pythonPath)
@@ -154,7 +381,9 @@ private[spark] class PythonWorkerFactory(pythonExec: String, envVars: Map[String

try {
// Create and start the daemon
val pb = new ProcessBuilder(Arrays.asList(pythonExec, "-m", "pyspark.daemon"))
val realPythonExec = if (virtualEnvEnabled) virtualPythonExec else pythonExec
logDebug(s"Starting daemon with pythonExec: ${realPythonExec}")
val pb = new ProcessBuilder(Arrays.asList(realPythonExec, "-m", "pyspark.daemon"))
val workerEnv = pb.environment()
workerEnv.putAll(envVars.asJava)
workerEnv.put("PYTHONPATH", pythonPath)
@@ -307,6 +536,7 @@ private[spark] class PythonWorkerFactory(pythonExec: String, envVars: Map[String
}

private object PythonWorkerFactory {
val WORKER_ID = new AtomicInteger()
val PROCESS_WAIT_TIMEOUT_MS = 10000
val IDLE_WORKER_TIMEOUT_MS = 60000 // kill idle workers after 1 minute
}
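
As a sketch only (not part of this diff), the configuration keys introduced above could also be
set programmatically on the driver. The key names come from this patch; the values, the app name
and the "requirements.txt"/"wheelhouse.zip" file names are placeholders:

import org.apache.spark.{SparkConf, SparkContext}

// Enable per-executor virtualenv creation for PySpark workers (values are examples only).
val conf = new SparkConf()
  .setAppName("pyspark-virtualenv-example")
  .set("spark.pyspark.virtualenv.enabled", "true")
  .set("spark.pyspark.virtualenv.type", "native")           // or "conda"
  .set("spark.pyspark.virtualenv.bin.path", "virtualenv")   // executable used to create the env
  .set("spark.pyspark.virtualenv.requirements", "requirements.txt")
  .set("spark.pyspark.virtualenv.wheelhouse", "wheelhouse.zip")
  .set("spark.pyspark.virtualenv.use_index", "false")       // rely on the wheelhouse only
  .set("spark.files", "requirements.txt,wheelhouse.zip")    // ship both files to the executors
val sc = new SparkContext(conf)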
core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
@@ -505,8 +505,8 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
| dependency conflicts.
| --repositories Comma-separated list of additional remote repositories to
| search for the maven coordinates given with --packages.
| --py-files PY_FILES Comma-separated list of .zip, .egg, or .py files to place
| on the PYTHONPATH for Python apps.
| --py-files PY_FILES Comma-separated list of .zip, .egg, .whl, or .py files to
| place on the PYTHONPATH for Python apps.
| --files FILES Comma-separated list of files to be placed in the working
| directory of each executor.
|
15 changes: 12 additions & 3 deletions docs/cluster-overview.md
@@ -87,9 +87,18 @@ The following table summarizes terms you'll see used to refer to cluster concept
<tr>
<td>Application jar</td>
<td>
A jar containing the user's Spark application. In some cases users will want to create
an "uber jar" containing their application along with its dependencies. The user's jar
should never include Hadoop or Spark libraries, however, these will be added at runtime.
A jar containing the user's Spark application (for Java and Scala drivers). In some cases
users will want to create an "uber jar" containing their application along with its
dependencies. The user's jar should never include Hadoop or Spark libraries; however, these
will be added at runtime.
</td>
</tr>
<tr>
<td>Application Wheelhouse</td>
<td>
An archive containing precompiled wheels of the user's PySpark application and its
dependencies (for Python drivers). The user's wheelhouse should not include jars, only
Python wheel (.whl) files, possibly built for more than one architecture.
</td>
</tr>
<tr>
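The wheelhouse archive is extracted by PythonWorkerFactory into a local "wheelhouse" directory
and handed to pip via --find-links, so the .whl files should sit at the top level of the archive.
As a rough sketch only (assuming pip and zip are available; file names are placeholders), such an
archive could be assembled with:

  pip wheel -r requirements.txt -w wheelhouse/
  zip -j wheelhouse.zip wheelhouse/*.whl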
2 changes: 1 addition & 1 deletion docs/configuration.md
@@ -392,7 +392,7 @@ Apart from these, the following properties are also available, and may be useful
<td><code>spark.submit.pyFiles</code></td>
<td></td>
<td>
Comma-separated list of .zip, .egg, or .py files to place on the PYTHONPATH for Python apps.
Comma-separated list of .zip, .egg, .whl, or .py files to place on the PYTHONPATH for Python apps.
</td>
</tr>
<tr>