@@ -19,6 +19,7 @@ package org.apache.spark.deploy
1919
2020import java .io .File
2121import java .net .{InetAddress , URI }
22+ import java .nio .file .Files
2223
2324import scala .collection .JavaConverters ._
2425import scala .collection .mutable .ArrayBuffer
@@ -48,7 +49,7 @@ object PythonRunner {
4849
4950 // Format python file paths before adding them to the PYTHONPATH
5051 val formattedPythonFile = formatPath(pythonFile)
51- val formattedPyFiles = formatPaths(pyFiles)
52+ val formattedPyFiles = resolvePyFiles( formatPaths(pyFiles) )
5253
5354 // Launch a Py4J gateway server for the process to connect to; this will let it see our
5455 // Java system properties and such
@@ -153,4 +154,30 @@ object PythonRunner {
153154 .map { p => formatPath(p, testWindows) }
154155 }
155156
157+ /**
158+ * Resolves the ".py" files. ".py" file should not be added as is because PYTHONPATH does
159+ * not expect a file. This method creates a temporary directory and puts the ".py" files
160+ * if exist in the given paths.
161+ */
162+ private def resolvePyFiles (pyFiles : Array [String ]): Array [String ] = {
163+ lazy val dest = Utils .createTempDir(namePrefix = " localPyFiles" )
164+ pyFiles.flatMap { pyFile =>
165+ // In case of client with submit, the python paths should be set before context
166+ // initialization because the context initialization can be done later.
167+ // We will copy the local ".py" files because ".py" file shouldn't be added
168+ // alone but its parent directory in PYTHONPATH. See SPARK-24384.
169+ if (pyFile.endsWith(" .py" )) {
170+ val source = new File (pyFile)
171+ if (source.exists() && source.canRead) {
172+ Files .copy(source.toPath, new File (dest, source.getName).toPath)
173+ Some (dest.getAbsolutePath)
174+ } else {
175+ // Don't have to add it if it doesn't exist or isn't readable.
176+ None
177+ }
178+ } else {
179+ Some (pyFile)
180+ }
181+ }.distinct
182+ }
156183}
0 commit comments