20 changes: 16 additions & 4 deletions bin/pyspark
@@ -25,6 +25,12 @@ export SPARK_HOME="$FWDIR"

SCALA_VERSION=2.10

if [[ "$@" == *--help* ]]; then
Contributor: Make this recognize -h also.

echo "Usage: ./bin/pyspark [python file] [options]"
Contributor: If we want to deprecate submitting a Python file with this, drop the [python file].

./bin/spark-submit --help 2>&1 | grep -v Usage 1>&2
exit 0
fi

# Exit if the user hasn't compiled Spark
if [ ! -f "$FWDIR/RELEASE" ]; then
# Exit if the user hasn't compiled Spark
@@ -51,14 +57,20 @@ export PYTHONPATH=$SPARK_HOME/python/lib/py4j-0.8.1-src.zip:$PYTHONPATH
# Load the PySpark shell.py script when ./pyspark is used interactively:
export OLD_PYTHONSTARTUP=$PYTHONSTARTUP
export PYTHONSTARTUP=$FWDIR/python/pyspark/shell.py
export PYSPARK_SUBMIT_ARGS="$@"

if [ -n "$IPYTHON_OPTS" ]; then
IPYTHON=1
fi

# Only use ipython if no command line arguments were provided [SPARK-1134]
if [[ "$IPYTHON" = "1" && $# = 0 ]] ; then
exec ipython $IPYTHON_OPTS
# If a python file is provided, directly run spark-submit
if [[ "$1" =~ \.py$ ]]; then
exec $FWDIR/bin/spark-submit $PYSPARK_SUBMIT_ARGS
Contributor: This won't work with quoted arguments. The problem is that when you assign "$@" to a variable, its type changes from an array to a string. Check out the way ORIG_ARGS is handled inside of spark-submit.

else
exec "$PYSPARK_PYTHON" "$@"
# Only use ipython if no command line arguments were provided [SPARK-1134]
if [[ "$IPYTHON" = "1" ]]; then
exec ipython $IPYTHON_OPTS
else
exec "$PYSPARK_PYTHON"
fi
fi
4 changes: 2 additions & 2 deletions bin/spark-shell
@@ -46,11 +46,11 @@ function main(){
# (see https://github.com/sbt/sbt/issues/562).
stty -icanon min 1 -echo > /dev/null 2>&1
export SPARK_SUBMIT_OPTS="$SPARK_SUBMIT_OPTS -Djline.terminal=unix"
$FWDIR/bin/spark-submit spark-internal "$@" --class org.apache.spark.repl.Main
$FWDIR/bin/spark-submit spark-shell "$@" --class org.apache.spark.repl.Main
stty icanon echo > /dev/null 2>&1
else
export SPARK_SUBMIT_OPTS
$FWDIR/bin/spark-submit spark-internal "$@" --class org.apache.spark.repl.Main
$FWDIR/bin/spark-submit spark-shell "$@" --class org.apache.spark.repl.Main
fi
}

2 changes: 1 addition & 1 deletion bin/spark-shell.cmd
@@ -19,4 +19,4 @@ rem

set SPARK_HOME=%~dp0..

cmd /V /E /C %SPARK_HOME%\bin\spark-submit.cmd spark-internal %* --class org.apache.spark.repl.Main
cmd /V /E /C %SPARK_HOME%\bin\spark-submit.cmd spark-shell %* --class org.apache.spark.repl.Main
core/src/main/scala/org/apache/spark/deploy/PythonRunner.scala
@@ -42,7 +42,7 @@ object PythonRunner {
// Build up a PYTHONPATH that includes the Spark assembly JAR (where this class is), the
// python directories in SPARK_HOME (if set), and any files in the pyFiles argument
val pathElements = new ArrayBuffer[String]
pathElements ++= pyFiles.split(",")
pathElements ++= Option(pyFiles).getOrElse("").split(",")
pathElements += PythonUtils.sparkPythonPath
pathElements += sys.env.getOrElse("PYTHONPATH", "")
val pythonPath = PythonUtils.mergePythonPaths(pathElements: _*)
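Note on the Option(pyFiles) change above: spark.submit.pyFiles may be unset, in which case pyFiles arrives as null and calling split on it would throw a NullPointerException. A minimal standalone sketch of the behaviour (the helper name and wrapper object are illustrative, not part of the patch):

```scala
object PyFilesNullSketch extends App {
  // Mirrors the guarded expression in PythonRunner: fall back to "" when pyFiles
  // is null so that split never runs on a null reference.
  def pathElements(pyFiles: String): List[String] =
    Option(pyFiles).getOrElse("").split(",").toList

  println(pathElements("a.py,b.py"))   // List(a.py, b.py)
  println(pathElements(null).length)   // 1 -- a single empty element, no exception
  // By contrast, (null: String).split(",") throws a NullPointerException.
}
```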
60 changes: 38 additions & 22 deletions core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -41,10 +41,10 @@ object SparkSubmit {
private var clusterManager: Int = LOCAL

/**
* A special jar name that indicates the class being run is inside of Spark itself,
* and therefore no user jar is needed.
* Special primary resource names that represent shells rather than application jars.
*/
private val RESERVED_JAR_NAME = "spark-internal"
private val SPARK_SHELL = "spark-shell"
private val PYSPARK_SHELL = "pyspark-shell"

def main(args: Array[String]) {
val appArgs = new SparkSubmitArguments(args)
Expand All @@ -71,8 +71,8 @@ object SparkSubmit {
* entries for the child, a list of system properties, a list of env vars
* and the main class for the child
*/
private[spark] def createLaunchEnv(args: SparkSubmitArguments): (ArrayBuffer[String],
ArrayBuffer[String], Map[String, String], String) = {
private[spark] def createLaunchEnv(args: SparkSubmitArguments)
: (ArrayBuffer[String], ArrayBuffer[String], Map[String, String], String) = {
if (args.master.startsWith("local")) {
clusterManager = LOCAL
} else if (args.master.startsWith("yarn")) {
@@ -121,24 +121,30 @@ object SparkSubmit {
printErrorAndExit("Cannot currently run driver on the cluster in Mesos")
}

// If we're running a Python app, set the Java class to run to be our PythonRunner, add
// Python files to deployment list, and pass the main file and Python path to PythonRunner
// If we're running a python app, set the main class to our specific python runner
if (isPython) {
if (deployOnCluster) {
printErrorAndExit("Cannot currently run Python driver programs on cluster")
}
args.mainClass = "org.apache.spark.deploy.PythonRunner"
args.files = mergeFileLists(args.files, args.pyFiles, args.primaryResource)
if (args.primaryResource == PYSPARK_SHELL) {
args.mainClass = "py4j.GatewayServer"
args.childArgs ++= ArrayBuffer("--die-on-broken-pipe", "0")
} else {
// If a python file is provided, add it to the child arguments and list of files to deploy.
// Usage: PythonAppRunner <main python file> <extra python files> [app arguments]
args.mainClass = "org.apache.spark.deploy.PythonRunner"
args.childArgs = ArrayBuffer(args.primaryResource, args.pyFiles) ++ args.childArgs
args.files = Utils.mergeFileLists(args.files, args.primaryResource)
}
val pyFiles = Option(args.pyFiles).getOrElse("")
args.childArgs = ArrayBuffer(args.primaryResource, pyFiles) ++ args.childArgs
args.primaryResource = RESERVED_JAR_NAME
args.files = Utils.mergeFileLists(args.files, pyFiles)
sysProps("spark.submit.pyFiles") = pyFiles
}

// If we're deploying into YARN, use yarn.Client as a wrapper around the user class
if (!deployOnCluster) {
childMainClass = args.mainClass
if (args.primaryResource != RESERVED_JAR_NAME) {
if (isUserJar(args.primaryResource)) {
childClasspath += args.primaryResource
}
} else if (clusterManager == YARN) {
@@ -219,7 +225,7 @@ object SparkSubmit {
// For python files, the primary resource is already distributed as a regular file
if (!isYarnCluster && !isPython) {
var jars = sysProps.get("spark.jars").map(x => x.split(",").toSeq).getOrElse(Seq())
if (args.primaryResource != RESERVED_JAR_NAME) {
if (isUserJar(args.primaryResource)) {
jars = jars ++ Seq(args.primaryResource)
}
sysProps.put("spark.jars", jars.mkString(","))
@@ -293,8 +299,8 @@ object SparkSubmit {
}

private def addJarToClasspath(localJar: String, loader: ExecutorURLClassLoader) {
val localJarFile = new File(new URI(localJar).getPath())
if (!localJarFile.exists()) {
val localJarFile = new File(new URI(localJar).getPath)
if (!localJarFile.exists) {
Contributor: Should use parens on exists() because it's not just a field access or getter.

printWarning(s"Jar $localJar does not exist, skipping.")
}

@@ -303,14 +309,24 @@
}

/**
* Merge a sequence of comma-separated file lists, some of which may be null to indicate
* no files, into a single comma-separated string.
* Return whether the given primary resource represents a user jar.
*/
private def isUserJar(primaryResource: String): Boolean = {
!isShell(primaryResource) && !isPython(primaryResource)
}

/**
* Return whether the given primary resource represents a shell.
*/
private def isShell(primaryResource: String): Boolean = {
primaryResource == SPARK_SHELL || primaryResource == PYSPARK_SHELL
}

/**
* Return whether the given primary resource requires running python.
*/
private[spark] def mergeFileLists(lists: String*): String = {
val merged = lists.filter(_ != null)
.flatMap(_.split(","))
.mkString(",")
if (merged == "") null else merged
private[spark] def isPython(primaryResource: String): Boolean = {
primaryResource.endsWith(".py") || primaryResource == PYSPARK_SHELL
}
}

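The three predicates above replace the single RESERVED_JAR_NAME check. A self-contained sketch of how they classify primary resources (the wrapper object and the sample inputs are illustrative only):

```scala
object ResourceKindSketch extends App {
  private val SPARK_SHELL = "spark-shell"
  private val PYSPARK_SHELL = "pyspark-shell"

  // Same predicates as in SparkSubmit, reproduced here so the sketch compiles on its own.
  def isShell(r: String): Boolean = r == SPARK_SHELL || r == PYSPARK_SHELL
  def isPython(r: String): Boolean = r.endsWith(".py") || r == PYSPARK_SHELL
  def isUserJar(r: String): Boolean = !isShell(r) && !isPython(r)

  Seq("app.jar", "job.py", "spark-shell", "pyspark-shell").foreach { r =>
    println(s"$r -> userJar=${isUserJar(r)}, shell=${isShell(r)}, python=${isPython(r)}")
  }
}
```

Only app.jar is classified as a user jar here, which is the condition the isUserJar(args.primaryResource) call sites above rely on when deciding whether to add the primary resource to the child classpath and to spark.jars.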
core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
@@ -298,7 +298,7 @@ private[spark] class SparkSubmitArguments(args: Seq[String]) {
case v =>
primaryResource = v
inSparkOpts = false
isPython = v.endsWith(".py")
isPython = SparkSubmit.isPython(v)
parse(tail)
}
} else {
12 changes: 12 additions & 0 deletions core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -1166,4 +1166,16 @@ private[spark] object Utils extends Logging {
true
}
}

/**
* Merge a sequence of comma-separated file lists into a single comma-separated string.
* The provided strings may be null or empty to indicate no files.
*/
def mergeFileLists(lists: String*): String = {
Contributor: Why was this moved to Utils? The code to deal with nulls and empty strings and such is pretty specific to spark-submit; we don't need other parts of Spark to use it.

lists
.filter(_ != null)
.filter(_ != "")
.flatMap(_.split(","))
.mkString(",")
}
}
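A short usage sketch of the relocated helper; the method body is repeated here only so the example compiles standalone (the real one lives in org.apache.spark.util.Utils). Note that, unlike the old SparkSubmit.mergeFileLists, this version returns an empty string rather than null when nothing is left after filtering:

```scala
object MergeFileListsSketch extends App {
  // Same logic as Utils.mergeFileLists in the diff above.
  def mergeFileLists(lists: String*): String =
    lists.filter(_ != null).filter(_ != "").flatMap(_.split(",")).mkString(",")

  println(mergeFileLists("a.txt,b.txt", null, "", "deps.py"))  // a.txt,b.txt,deps.py
  println(mergeFileLists(null, "").isEmpty)                    // true -- "" rather than null
}
```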
10 changes: 7 additions & 3 deletions python/pyspark/java_gateway.py
@@ -34,9 +34,13 @@ def launch_gateway():
# Launch the Py4j gateway using Spark's run command so that we pick up the
# proper classpath and settings from spark-env.sh
on_windows = platform.system() == "Windows"
script = "./bin/spark-class.cmd" if on_windows else "./bin/spark-class"
command = [os.path.join(SPARK_HOME, script), "py4j.GatewayServer",
"--die-on-broken-pipe", "0"]
script = "./bin/spark-submit.cmd" if on_windows else "./bin/spark-submit"
submit_args = os.environ.get("PYSPARK_SUBMIT_ARGS")
if submit_args is not None:
submit_args = submit_args.split(" ")
Contributor: This probably won't work if there are quoted strings in the args, e.g. pyspark --name "my app". Do we really need to pass it this way? Can't we use sys.argv or something like that and just pass the args to Python?

Contributor (Author): Good point. We can't just pass the args to Python because shell.py is run on PYTHONSTARTUP. In the case of IPython, for example, each notebook runs shell.py without arguments.

else:
submit_args = []
command = [os.path.join(SPARK_HOME, script), "pyspark-shell"] + submit_args
if not on_windows:
# Don't send ctrl-c / SIGINT to the Java gateway:
def preexec_func():
2 changes: 1 addition & 1 deletion python/pyspark/shell.py
@@ -40,7 +40,7 @@
if os.environ.get("SPARK_EXECUTOR_URI"):
SparkContext.setSystemProperty("spark.executor.uri", os.environ["SPARK_EXECUTOR_URI"])

sc = SparkContext(os.environ.get("MASTER", "local[*]"), "PySparkShell", pyFiles=add_files)
sc = SparkContext(appName="PySparkShell", pyFiles=add_files)

print("""Welcome to
____ __