
Commit ebff732

lianhuiwang authored and tgravescs committed
[SPARK-6869] [PYSPARK] Add pyspark archives path to PYTHONPATH
Based on #5478, which provides a PYSPARK_ARCHIVES_PATH environment variable. With this PR, when PySpark is not installed on every YARN node, it is enough to export PYSPARK_ARCHIVES_PATH=/user/spark/pyspark.zip,/user/spark/python/lib/py4j-0.8.2.1-src.zip in conf/spark-env.sh (see the sketch below). Python applications ran successfully in both yarn-client and yarn-cluster mode with this PR. andrewor14 sryza Sephiroth-Lin, can you take a look at this? Thanks.

Author: Lianhui Wang <[email protected]>

Closes #5580 from lianhuiwang/SPARK-6869 and squashes the following commits:

66ffa43 [Lianhui Wang] Update Client.scala
c2ad0f9 [Lianhui Wang] Update Client.scala
1c8f664 [Lianhui Wang] Merge remote-tracking branch 'remotes/apache/master' into SPARK-6869
008850a [Lianhui Wang] Merge remote-tracking branch 'remotes/apache/master' into SPARK-6869
f0b4ed8 [Lianhui Wang] Merge remote-tracking branch 'remotes/apache/master' into SPARK-6869
150907b [Lianhui Wang] Merge remote-tracking branch 'remotes/apache/master' into SPARK-6869
20402cd [Lianhui Wang] use ZipEntry
9d87c3f [Lianhui Wang] update scala style
e7bd971 [Lianhui Wang] address vanzin's comments
4b8a3ed [Lianhui Wang] use pyArchivesEnvOpt
e6b573b [Lianhui Wang] address vanzin's comments
f11f84a [Lianhui Wang] zip pyspark archives
5192cca [Lianhui Wang] update import path
3b1e4c8 [Lianhui Wang] address tgravescs's comments
9396346 [Lianhui Wang] put zip to make-distribution.sh
0d2baf7 [Lianhui Wang] update import paths
e0179be [Lianhui Wang] add zip pyspark archives in build or sparksubmit
31e8e06 [Lianhui Wang] update code style
9f31dac [Lianhui Wang] update code and add comments
f72987c [Lianhui Wang] add archives path to PYTHONPATH
1 parent: c2f0821
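For clusters where PySpark is not installed on every YARN node, the setup described in the commit message boils down to exporting the two archives and then submitting the Python application in YARN mode. A minimal sketch, using the paths quoted above (my_app.py is a placeholder, not part of the patch):

    # conf/spark-env.sh -- archive locations taken from the commit message; adjust to your cluster
    export PYSPARK_ARCHIVES_PATH=/user/spark/pyspark.zip,/user/spark/python/lib/py4j-0.8.2.1-src.zip

    # Submit a Python application in YARN mode (my_app.py is a placeholder)
    ./bin/spark-submit --master yarn-cluster my_app.py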

File tree

4 files changed: +114 −8 lines


assembly/pom.xml

Lines changed: 21 additions & 0 deletions
@@ -92,6 +92,27 @@
           <skip>true</skip>
         </configuration>
       </plugin>
+      <!-- zip pyspark archives to run python application on yarn mode -->
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-antrun-plugin</artifactId>
+        <executions>
+          <execution>
+            <phase>package</phase>
+            <goals>
+              <goal>run</goal>
+            </goals>
+          </execution>
+        </executions>
+        <configuration>
+          <target>
+            <delete dir="${basedir}/../python/lib/pyspark.zip"/>
+            <zip destfile="${basedir}/../python/lib/pyspark.zip">
+              <fileset dir="${basedir}/../python/" includes="pyspark/**/*"/>
+            </zip>
+          </target>
+        </configuration>
+      </plugin>
       <!-- Use the shade plugin to create a big JAR with all the dependencies -->
       <plugin>
         <groupId>org.apache.maven.plugins</groupId>
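Outside of the Maven build, a roughly equivalent archive can be produced by hand; this is only a sketch, assuming it is run from the root of a Spark source checkout:

    # Not part of the patch: manual equivalent of the antrun target above
    rm -f python/lib/pyspark.zip
    (cd python && zip -r lib/pyspark.zip pyspark)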

core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala

Lines changed: 41 additions & 0 deletions
@@ -332,6 +332,47 @@ object SparkSubmit {
       }
     }
 
+    // In yarn mode for a python app, add pyspark archives to files
+    // that can be distributed with the job
+    if (args.isPython && clusterManager == YARN) {
+      var pyArchives: String = null
+      val pyArchivesEnvOpt = sys.env.get("PYSPARK_ARCHIVES_PATH")
+      if (pyArchivesEnvOpt.isDefined) {
+        pyArchives = pyArchivesEnvOpt.get
+      } else {
+        if (!sys.env.contains("SPARK_HOME")) {
+          printErrorAndExit("SPARK_HOME does not exist for python application in yarn mode.")
+        }
+        val pythonPath = new ArrayBuffer[String]
+        for (sparkHome <- sys.env.get("SPARK_HOME")) {
+          val pyLibPath = Seq(sparkHome, "python", "lib").mkString(File.separator)
+          val pyArchivesFile = new File(pyLibPath, "pyspark.zip")
+          if (!pyArchivesFile.exists()) {
+            printErrorAndExit("pyspark.zip does not exist for python application in yarn mode.")
+          }
+          val py4jFile = new File(pyLibPath, "py4j-0.8.2.1-src.zip")
+          if (!py4jFile.exists()) {
+            printErrorAndExit("py4j-0.8.2.1-src.zip does not exist for python application " +
+              "in yarn mode.")
+          }
+          pythonPath += pyArchivesFile.getAbsolutePath()
+          pythonPath += py4jFile.getAbsolutePath()
+        }
+        pyArchives = pythonPath.mkString(",")
+      }
+
+      pyArchives = pyArchives.split(",").map { localPath =>
+        val localURI = Utils.resolveURI(localPath)
+        if (localURI.getScheme != "local") {
+          args.files = mergeFileLists(args.files, localURI.toString)
+          new Path(localPath).getName
+        } else {
+          localURI.getPath
+        }
+      }.mkString(File.pathSeparator)
+      sysProps("spark.submit.pyArchives") = pyArchives
+    }
+
     // If we're running a R app, set the main class to our specific R runner
     if (args.isR && deployMode == CLIENT) {
       if (args.primaryResource == SPARKR_SHELL) {
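The map over pyArchives above decides how each archive reaches the cluster: paths without the local: scheme are appended to args.files for distribution and then referenced by file name, while local: paths are used in place. A hedged usage sketch (the paths below are illustrative, not from the patch):

    # Archives already present on every node can use the local: scheme,
    # which skips shipping them through the distributed files (paths are placeholders).
    export PYSPARK_ARCHIVES_PATH=local:/opt/spark/python/lib/pyspark.zip,local:/opt/spark/python/lib/py4j-0.8.2.1-src.zip
    ./bin/spark-submit --master yarn-client my_app.py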

project/SparkBuild.scala

Lines changed: 35 additions & 2 deletions
@@ -370,23 +370,56 @@ object Assembly {
 object PySparkAssembly {
   import sbtassembly.Plugin._
   import AssemblyKeys._
+  import java.util.zip.{ZipOutputStream, ZipEntry}
 
   lazy val settings = Seq(
     unmanagedJars in Compile += { BuildCommons.sparkHome / "python/lib/py4j-0.8.2.1-src.zip" },
     // Use a resource generator to copy all .py files from python/pyspark into a managed directory
     // to be included in the assembly. We can't just add "python/" to the assembly's resource dir
     // list since that will copy unneeded / unwanted files.
     resourceGenerators in Compile <+= resourceManaged in Compile map { outDir: File =>
+      val src = new File(BuildCommons.sparkHome, "python/pyspark")
+
+      val zipFile = new File(BuildCommons.sparkHome, "python/lib/pyspark.zip")
+      zipFile.delete()
+      zipRecursive(src, zipFile)
+
       val dst = new File(outDir, "pyspark")
       if (!dst.isDirectory()) {
         require(dst.mkdirs())
       }
-
-      val src = new File(BuildCommons.sparkHome, "python/pyspark")
       copy(src, dst)
     }
   )
 
+  private def zipRecursive(source: File, destZipFile: File) = {
+    val destOutput = new ZipOutputStream(new FileOutputStream(destZipFile))
+    addFilesToZipStream("", source, destOutput)
+    destOutput.flush()
+    destOutput.close()
+  }
+
+  private def addFilesToZipStream(parent: String, source: File, output: ZipOutputStream): Unit = {
+    if (source.isDirectory()) {
+      output.putNextEntry(new ZipEntry(parent + source.getName()))
+      for (file <- source.listFiles()) {
+        addFilesToZipStream(parent + source.getName() + File.separator, file, output)
+      }
+    } else {
+      val in = new FileInputStream(source)
+      output.putNextEntry(new ZipEntry(parent + source.getName()))
+      val buf = new Array[Byte](8192)
+      var n = 0
+      while (n != -1) {
+        n = in.read(buf)
+        if (n != -1) {
+          output.write(buf, 0, n)
+        }
+      }
+      in.close()
+    }
+  }
+
   private def copy(src: File, dst: File): Seq[File] = {
     src.listFiles().flatMap { f =>
       val child = new File(dst, f.getName())
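When the sbt build is used instead of Maven, the resource generator above produces python/lib/pyspark.zip as a side effect of building the assembly; a hedged invocation sketch (the launcher script name is an assumption):

    # Assumed launcher; regenerates python/lib/pyspark.zip via the resource generator above
    build/sbt assembly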

yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala

Lines changed: 17 additions & 6 deletions
@@ -468,6 +468,17 @@ private[spark] class Client(
       env("SPARK_YARN_USER_ENV") = userEnvs
     }
 
+    // if spark.submit.pyArchives is in sparkConf, append pyArchives to PYTHONPATH
+    // that can be passed on to the ApplicationMaster and the executors.
+    if (sparkConf.contains("spark.submit.pyArchives")) {
+      var pythonPath = sparkConf.get("spark.submit.pyArchives")
+      if (env.contains("PYTHONPATH")) {
+        pythonPath = Seq(env.get("PYTHONPATH"), pythonPath).mkString(File.pathSeparator)
+      }
+      env("PYTHONPATH") = pythonPath
+      sparkConf.setExecutorEnv("PYTHONPATH", pythonPath)
+    }
+
     // In cluster mode, if the deprecated SPARK_JAVA_OPTS is set, we need to propagate it to
     // executors. But we can't just set spark.executor.extraJavaOptions, because the driver's
     // SparkContext will not let that set spark* system properties, which is expected behavior for
@@ -1074,7 +1085,7 @@
       val hiveConf = hiveClass.getMethod("getConf").invoke(hive)
       val hiveConfClass = mirror.classLoader.loadClass("org.apache.hadoop.hive.conf.HiveConf")
 
-      val hiveConfGet = (param:String) => Option(hiveConfClass
+      val hiveConfGet = (param: String) => Option(hiveConfClass
         .getMethod("get", classOf[java.lang.String])
         .invoke(hiveConf, param))
 
@@ -1096,7 +1107,7 @@
 
         val hive2Token = new Token[DelegationTokenIdentifier]()
         hive2Token.decodeFromUrlString(tokenStr)
-        credentials.addToken(new Text("hive.server2.delegation.token"),hive2Token)
+        credentials.addToken(new Text("hive.server2.delegation.token"), hive2Token)
         logDebug("Added hive.Server2.delegation.token to conf.")
         hiveClass.getMethod("closeCurrent").invoke(null)
       } else {
@@ -1141,13 +1152,13 @@
 
       logInfo("Added HBase security token to credentials.")
     } catch {
-      case e:java.lang.NoSuchMethodException =>
+      case e: java.lang.NoSuchMethodException =>
        logInfo("HBase Method not found: " + e)
-      case e:java.lang.ClassNotFoundException =>
+      case e: java.lang.ClassNotFoundException =>
        logDebug("HBase Class not found: " + e)
-      case e:java.lang.NoClassDefFoundError =>
+      case e: java.lang.NoClassDefFoundError =>
        logDebug("HBase Class not found: " + e)
-      case e:Exception =>
+      case e: Exception =>
        logError("Exception when obtaining HBase security token: " + e)
     }
   }
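For context, the intended result of the Client change is that the ApplicationMaster and executor environments see any pre-existing PYTHONPATH plus the entries from spark.submit.pyArchives, joined with the platform path separator. A rough illustration of the shape of the resulting variable (values are placeholders, not output from the patch):

    # Illustrative only -- the actual entries depend on how the archives were distributed
    PYTHONPATH=<existing entries>:pyspark.zip:py4j-0.8.2.1-src.zip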
