# [SPARK-1808] Route bin/pyspark through Spark submit #799
Changes shown are from 4 of the 15 commits in this pull request.
#### `bin/pyspark`

```diff
@@ -25,6 +25,12 @@ export SPARK_HOME="$FWDIR"
 
 SCALA_VERSION=2.10
 
+if [[ "$@" == *--help* ]]; then
+  echo "Usage: ./bin/pyspark [python file] [options]"
```
> **Contributor** (on the usage message): If we want to deprecate submitting a Python file with this, drop the …
```diff
+  ./bin/spark-submit --help 2>&1 | grep -v Usage 1>&2
+  exit 0
+fi
+
 # Exit if the user hasn't compiled Spark
 if [ ! -f "$FWDIR/RELEASE" ]; then
```
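The `--help` branch relies on a small stream-swapping idiom: `spark-submit` printed its usage text on stderr at the time, so `2>&1` folds that output into stdout where `grep` can filter it, and `1>&2` routes the filtered text back out on stderr. An annotated sketch of the same pipeline, assuming it is run from the root of a Spark checkout:

```bash
#!/usr/bin/env bash
# Annotated equivalent of the help pipeline above:
#   2>&1           fold spark-submit's stderr into stdout so grep can see it
#   grep -v Usage  drop spark-submit's own "Usage:" line (pyspark already
#                  printed its own usage line)
#   1>&2           send the filtered help text back out on stderr
./bin/spark-submit --help 2>&1 | grep -v Usage 1>&2
```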
```diff
@@ -51,14 +57,20 @@ export PYTHONPATH=$SPARK_HOME/python/lib/py4j-0.8.1-src.zip:$PYTHONPATH
 # Load the PySpark shell.py script when ./pyspark is used interactively:
 export OLD_PYTHONSTARTUP=$PYTHONSTARTUP
 export PYTHONSTARTUP=$FWDIR/python/pyspark/shell.py
+export PYSPARK_SUBMIT_ARGS="$@"
 
 if [ -n "$IPYTHON_OPTS" ]; then
   IPYTHON=1
 fi
 
-# Only use ipython if no command line arguments were provided [SPARK-1134]
-if [[ "$IPYTHON" = "1" && $# = 0 ]] ; then
-  exec ipython $IPYTHON_OPTS
+# If a python file is provided, directly run spark-submit
+if [[ "$1" =~ \.py$ ]]; then
+  exec $FWDIR/bin/spark-submit $PYSPARK_SUBMIT_ARGS
```
> **Contributor** (on the `spark-submit` exec line): This won't work with quoted arguments. The problem is that when you convert …
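The flattening the reviewer describes is easy to reproduce in isolation: assigning `"$@"` to a plain variable joins the arguments with spaces, and a later unquoted expansion re-splits on every space, so an argument that contained a space arrives as two words. A minimal sketch, with made-up argument values:

```bash
#!/usr/bin/env bash
# Simulate: ./bin/pyspark --name "my app"
set -- --name "my app"

# Assigning "$@" to a scalar flattens the two arguments into the single
# string '--name my app'; the original quoting is gone at this point.
PYSPARK_SUBMIT_ARGS="$@"

# Re-splitting on whitespace later yields three words, not two:
#   --name
#   my
#   app
printf '%s\n' $PYSPARK_SUBMIT_ARGS
```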
```diff
 else
-  exec "$PYSPARK_PYTHON" "$@"
+  # Only use ipython if no command line arguments were provided [SPARK-1134]
+  if [[ "$IPYTHON" = "1" ]]; then
+    exec ipython $IPYTHON_OPTS
+  else
+    exec "$PYSPARK_PYTHON"
+  fi
 fi
```
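The net effect is that `bin/pyspark` now dispatches on its first argument; for example (hypothetical invocations):

```bash
# First argument ends in .py: hand everything to spark-submit.
./bin/pyspark my_script.py

# No python file: start an interactive shell, preferring IPython when
# IPYTHON=1 (or when IPYTHON_OPTS is set, which now implies IPYTHON=1).
./bin/pyspark
IPYTHON=1 ./bin/pyspark
```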
#### `core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala`
```diff
@@ -41,10 +41,10 @@ object SparkSubmit {
   private var clusterManager: Int = LOCAL
 
   /**
-   * A special jar name that indicates the class being run is inside of Spark itself,
-   * and therefore no user jar is needed.
+   * Special primary resource names that represent shells rather than application jars.
    */
-  private val RESERVED_JAR_NAME = "spark-internal"
+  private val SPARK_SHELL = "spark-shell"
+  private val PYSPARK_SHELL = "pyspark-shell"
 
   def main(args: Array[String]) {
     val appArgs = new SparkSubmitArguments(args)
```
```diff
@@ -71,8 +71,8 @@ object SparkSubmit {
    * entries for the child, a list of system properties, a list of env vars
    * and the main class for the child
    */
-  private[spark] def createLaunchEnv(args: SparkSubmitArguments): (ArrayBuffer[String],
-      ArrayBuffer[String], Map[String, String], String) = {
+  private[spark] def createLaunchEnv(args: SparkSubmitArguments)
+      : (ArrayBuffer[String], ArrayBuffer[String], Map[String, String], String) = {
     if (args.master.startsWith("local")) {
       clusterManager = LOCAL
     } else if (args.master.startsWith("yarn")) {
```
```diff
@@ -121,24 +121,30 @@ object SparkSubmit {
       printErrorAndExit("Cannot currently run driver on the cluster in Mesos")
     }
 
-    // If we're running a Python app, set the Java class to run to be our PythonRunner, add
-    // Python files to deployment list, and pass the main file and Python path to PythonRunner
+    // If we're running a python app, set the main class to our specific python runner
     if (isPython) {
       if (deployOnCluster) {
         printErrorAndExit("Cannot currently run Python driver programs on cluster")
       }
-      args.mainClass = "org.apache.spark.deploy.PythonRunner"
-      args.files = mergeFileLists(args.files, args.pyFiles, args.primaryResource)
+      if (args.primaryResource == PYSPARK_SHELL) {
+        args.mainClass = "py4j.GatewayServer"
+        args.childArgs ++= ArrayBuffer("--die-on-broken-pipe", "0")
+      } else {
+        // If a python file is provided, add it to the child arguments and list of files to deploy.
+        // Usage: PythonAppRunner <main python file> <extra python files> [app arguments]
+        args.mainClass = "org.apache.spark.deploy.PythonRunner"
+        args.childArgs = ArrayBuffer(args.primaryResource, args.pyFiles) ++ args.childArgs
+        args.files = Utils.mergeFileLists(args.files, args.primaryResource)
+      }
       val pyFiles = Option(args.pyFiles).getOrElse("")
-      args.childArgs = ArrayBuffer(args.primaryResource, pyFiles) ++ args.childArgs
-      args.primaryResource = RESERVED_JAR_NAME
+      args.files = Utils.mergeFileLists(args.files, pyFiles)
       sysProps("spark.submit.pyFiles") = pyFiles
     }
 
     // If we're deploying into YARN, use yarn.Client as a wrapper around the user class
     if (!deployOnCluster) {
       childMainClass = args.mainClass
-      if (args.primaryResource != RESERVED_JAR_NAME) {
+      if (isUserJar(args.primaryResource)) {
         childClasspath += args.primaryResource
       }
     } else if (clusterManager == YARN) {
```
```diff
@@ -219,7 +225,7 @@ object SparkSubmit {
     // For python files, the primary resource is already distributed as a regular file
     if (!isYarnCluster && !isPython) {
       var jars = sysProps.get("spark.jars").map(x => x.split(",").toSeq).getOrElse(Seq())
-      if (args.primaryResource != RESERVED_JAR_NAME) {
+      if (isUserJar(args.primaryResource)) {
         jars = jars ++ Seq(args.primaryResource)
       }
       sysProps.put("spark.jars", jars.mkString(","))
```
```diff
@@ -293,8 +299,8 @@ object SparkSubmit {
   }
 
   private def addJarToClasspath(localJar: String, loader: ExecutorURLClassLoader) {
-    val localJarFile = new File(new URI(localJar).getPath())
-    if (!localJarFile.exists()) {
+    val localJarFile = new File(new URI(localJar).getPath)
+    if (!localJarFile.exists) {
```
> **Contributor:** Should use parens on …
```diff
       printWarning(s"Jar $localJar does not exist, skipping.")
     }
```
```diff
@@ -303,14 +309,24 @@ object SparkSubmit {
   }
 
   /**
-   * Merge a sequence of comma-separated file lists, some of which may be null to indicate
-   * no files, into a single comma-separated string.
+   * Return whether the given primary resource represents a user jar.
    */
+  private def isUserJar(primaryResource: String): Boolean = {
+    !isShell(primaryResource) && !isPython(primaryResource)
+  }
+
+  /**
+   * Return whether the given primary resource represents a shell.
+   */
+  private def isShell(primaryResource: String): Boolean = {
+    primaryResource == SPARK_SHELL || primaryResource == PYSPARK_SHELL
+  }
+
+  /**
+   * Return whether the given primary resource requires running python.
+   */
-  private[spark] def mergeFileLists(lists: String*): String = {
-    val merged = lists.filter(_ != null)
-      .flatMap(_.split(","))
-      .mkString(",")
-    if (merged == "") null else merged
+  private[spark] def isPython(primaryResource: String): Boolean = {
+    primaryResource.endsWith(".py") || primaryResource == PYSPARK_SHELL
   }
 }
```
#### `core/src/main/scala/org/apache/spark/util/Utils.scala`
```diff
@@ -1166,4 +1166,16 @@ private[spark] object Utils extends Logging {
       true
     }
   }
+
+  /**
+   * Merge a sequence of comma-separated file lists into a single comma-separated string.
+   * The provided strings may be null or empty to indicate no files.
+   */
+  def mergeFileLists(lists: String*): String = {
+    lists
+      .filter(_ != null)
+      .filter(_ != "")
+      .flatMap(_.split(","))
+      .mkString(",")
+  }
 }
```
#### `python/pyspark/java_gateway.py`
```diff
@@ -34,9 +34,13 @@ def launch_gateway():
     # Launch the Py4j gateway using Spark's run command so that we pick up the
     # proper classpath and settings from spark-env.sh
     on_windows = platform.system() == "Windows"
-    script = "./bin/spark-class.cmd" if on_windows else "./bin/spark-class"
-    command = [os.path.join(SPARK_HOME, script), "py4j.GatewayServer",
-               "--die-on-broken-pipe", "0"]
+    script = "./bin/spark-submit.cmd" if on_windows else "./bin/spark-submit"
+    submit_args = os.environ.get("PYSPARK_SUBMIT_ARGS")
+    if submit_args is not None:
+        submit_args = submit_args.split(" ")
```
> **Contributor:** This probably won't work if there are quoted strings in the args, e.g. …

> **Contributor (author):** Good point. We can't just pass the args to python because …
```diff
+    else:
+        submit_args = []
+    command = [os.path.join(SPARK_HOME, script), "pyspark-shell"] + submit_args
     if not on_windows:
         # Don't send ctrl-c / SIGINT to the Java gateway:
         def preexec_func():
```
> **Review comment** (on the `--help` check in `bin/pyspark`): Make this recognize `-h` also.
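One hypothetical way to follow that suggestion, sketched here rather than taken from the PR, is to scan the argument list instead of substring-matching `"$@"`, which also avoids accidentally matching `--help` embedded inside another argument:

```bash
# Hypothetical sketch, not part of this PR: treat -h like --help.
for arg in "$@"; do
  if [[ "$arg" == "--help" || "$arg" == "-h" ]]; then
    echo "Usage: ./bin/pyspark [python file] [options]"
    ./bin/spark-submit --help 2>&1 | grep -v Usage 1>&2
    exit 0
  fi
done
```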