bin/pyspark (2 additions & 0 deletions)
@@ -102,6 +102,8 @@ if [[ "$1" =~ \.py$ ]]; then
   gatherSparkSubmitOpts "$@"
   exec $FWDIR/bin/spark-submit "${SUBMISSION_OPTS[@]}" $primary "${APPLICATION_OPTS[@]}"
 else
+  # PySpark shell requires special handling downstream
+  export PYSPARK_SHELL=1
   # Only use ipython if no command line arguments were provided [SPARK-1134]
   if [[ "$IPYTHON" = "1" ]]; then
     exec ipython $IPYTHON_OPTS
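For reference, bin/pyspark does not consume this flag itself; the launched JVM observes it through its process environment, which is exactly what the bootstrapper change below keys on. A minimal standalone sketch of that read (the EnvFlagCheck object is hypothetical, not part of the patch):

    // Hypothetical sketch, not Spark code: how a variable exported by the shell
    // (export PYSPARK_SHELL=1) becomes visible to a JVM that the shell launches.
    object EnvFlagCheck {
      def main(args: Array[String]): Unit = {
        // sys.env is an immutable Map[String, String] of the process environment.
        // Presence alone matters here; the value "1" is never inspected.
        val isPySparkShell = sys.env.contains("PYSPARK_SHELL")
        println(s"PYSPARK_SHELL set: $isPySparkShell")
      }
    }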
SparkSubmitDriverBootstrapper.scala
@@ -132,25 +132,29 @@ private[spark] object SparkSubmitDriverBootstrapper {
     val builder = new ProcessBuilder(filteredCommand)
     val process = builder.start()
 
-    // Redirect stdin, stdout, and stderr to/from the child JVM
+    // Redirect stdout and stderr from the child JVM
     val stdoutThread = new RedirectThread(process.getInputStream, System.out, "redirect stdout")
     val stderrThread = new RedirectThread(process.getErrorStream, System.err, "redirect stderr")
     stdoutThread.start()
     stderrThread.start()
 
-    // In Windows, the subprocess reads directly from our stdin, so we should avoid spawning
-    // a thread that contends with the subprocess in reading from System.in.
-    if (Utils.isWindows) {
-      // For the PySpark shell, the termination of this process is handled in java_gateway.py
-      process.waitFor()
-    } else {
-      // Terminate on broken pipe, which signals that the parent process has exited. This is
-      // important for the PySpark shell, where Spark submit itself is a python subprocess.
+    // Redirect stdin to child JVM only if we're not running Windows. This is because the
+    // subprocess there already reads directly from our stdin, so we should avoid spawning a
+    // thread that contends with the subprocess in reading from System.in.
+    val isWindows = Utils.isWindows
+    val isPySparkShell = sys.env.contains("PYSPARK_SHELL")
+    if (!isWindows) {
       val stdinThread = new RedirectThread(System.in, process.getOutputStream, "redirect stdin")
       stdinThread.start()
-      stdinThread.join()
-      process.destroy()
+      // For the PySpark shell, Spark submit itself runs as a python subprocess, and so this JVM
+      // should terminate on broken pipe, which signals that the parent process has exited. In
+      // Windows, the termination logic for the PySpark shell is handled in java_gateway.py
+      if (isPySparkShell) {
+        stdinThread.join()
+        process.destroy()
+      }
     }
+    process.waitFor()
   }
 
 }
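The core pattern in this hunk, mirroring the parent's stdin into the child and treating EOF on stdin as the cue to tear the child down, can be shown without Spark's RedirectThread helper. A self-contained sketch under stated assumptions (the copyStream helper and the cat child process are illustrative stand-ins, not Spark code; assumes a Unix-like OS where cat exists):

    import java.io.{InputStream, OutputStream}

    // Standalone sketch of the broken-pipe termination pattern. Not Spark code.
    object BrokenPipeSketch {
      // Copy src to dst until EOF, roughly what RedirectThread does internally.
      private def copyStream(src: InputStream, dst: OutputStream): Unit = {
        val buf = new Array[Byte](1024)
        var n = src.read(buf)
        while (n != -1) {
          dst.write(buf, 0, n)
          dst.flush()
          n = src.read(buf)
        }
      }

      def main(args: Array[String]): Unit = {
        val process = new ProcessBuilder("cat").start()

        // Mirror the child's stdout back to ours on a daemon thread.
        val stdoutThread = new Thread(() => copyStream(process.getInputStream, System.out))
        stdoutThread.setDaemon(true)
        stdoutThread.start()

        // Feed our stdin to the child. When our stdin reaches EOF (for the
        // PySpark shell, because the parent python process exited and broke
        // the pipe), copyStream returns and the thread finishes.
        val stdinThread = new Thread(() => copyStream(System.in, process.getOutputStream))
        stdinThread.start()

        // join() returns once stdin is exhausted; destroy the child at that
        // point, as the isPySparkShell branch above does, instead of waiting.
        stdinThread.join()
        process.destroy()
      }
    }

Joining on the stdin thread rather than calling waitFor() unconditionally is what lets the bootstrapper JVM exit together with its Python parent instead of lingering as an orphan.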