Skip to content

Commit e273447

Browse files
berngptgravescs
authored andcommitted
[SPARK-1522] : YARN ClientBase throws a NPE if there is no YARN Application CP
The current implementation of ClientBase.getDefaultYarnApplicationClasspath inspects the MRJobConfig class for the field DEFAULT_YARN_APPLICATION_CLASSPATH when it should be really looking into YarnConfiguration. If the Application Configuration has no yarn.application.classpath defined a NPE exception will be thrown. Additional Changes include: * Test Suite for ClientBase added [ticket: SPARK-1522] : https://issues.apache.org/jira/browse/SPARK-1522 Author : [email protected] Testing : SPARK_HADOOP_VERSION=2.3.0 SPARK_YARN=true ./sbt/sbt test Author: Bernardo Gomez Palacio <[email protected]> Closes #433 from berngp/feature/SPARK-1522 and squashes the following commits: 2c2e118 [Bernardo Gomez Palacio] [SPARK-1522]: YARN ClientBase throws a NPE if there is no YARN Application specific CP
1 parent 6cf335d commit e273447

File tree

2 files changed

+167
-34
lines changed

2 files changed

+167
-34
lines changed

yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala

Lines changed: 55 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ import java.nio.ByteBuffer
2323

2424
import scala.collection.JavaConversions._
2525
import scala.collection.mutable.{HashMap, ListBuffer, Map}
26+
import scala.util.{Try, Success, Failure}
2627

2728
import org.apache.hadoop.conf.Configuration
2829
import org.apache.hadoop.fs._
@@ -378,7 +379,7 @@ trait ClientBase extends Logging {
378379
}
379380
}
380381

381-
object ClientBase {
382+
object ClientBase extends Logging {
382383
val SPARK_JAR: String = "__spark__.jar"
383384
val APP_JAR: String = "__app__.jar"
384385
val LOG4J_PROP: String = "log4j.properties"
@@ -388,58 +389,78 @@ object ClientBase {
388389

389390
def getSparkJar = sys.env.get("SPARK_JAR").getOrElse(SparkContext.jarOfClass(this.getClass).head)
390391

391-
// Based on code from org.apache.hadoop.mapreduce.v2.util.MRApps
392-
def populateHadoopClasspath(conf: Configuration, env: HashMap[String, String]) {
393-
val classpathEntries = Option(conf.getStrings(
394-
YarnConfiguration.YARN_APPLICATION_CLASSPATH)).getOrElse(
395-
getDefaultYarnApplicationClasspath())
396-
if (classpathEntries != null) {
397-
for (c <- classpathEntries) {
398-
YarnSparkHadoopUtil.addToEnvironment(env, Environment.CLASSPATH.name, c.trim,
399-
File.pathSeparator)
400-
}
392+
def populateHadoopClasspath(conf: Configuration, env: HashMap[String, String]) = {
393+
val classPathElementsToAdd = getYarnAppClasspath(conf) ++ getMRAppClasspath(conf)
394+
for (c <- classPathElementsToAdd.flatten) {
395+
YarnSparkHadoopUtil.addToEnvironment(
396+
env,
397+
Environment.CLASSPATH.name,
398+
c.trim,
399+
File.pathSeparator)
401400
}
401+
classPathElementsToAdd
402+
}
402403

403-
val mrClasspathEntries = Option(conf.getStrings(
404-
"mapreduce.application.classpath")).getOrElse(
405-
getDefaultMRApplicationClasspath())
406-
if (mrClasspathEntries != null) {
407-
for (c <- mrClasspathEntries) {
408-
YarnSparkHadoopUtil.addToEnvironment(env, Environment.CLASSPATH.name, c.trim,
409-
File.pathSeparator)
410-
}
411-
}
404+
private def getYarnAppClasspath(conf: Configuration): Option[Seq[String]] =
405+
Option(conf.getStrings(YarnConfiguration.YARN_APPLICATION_CLASSPATH)) match {
406+
case Some(s) => Some(s.toSeq)
407+
case None => getDefaultYarnApplicationClasspath
412408
}
413409

414-
def getDefaultYarnApplicationClasspath(): Array[String] = {
415-
try {
416-
val field = classOf[MRJobConfig].getField("DEFAULT_YARN_APPLICATION_CLASSPATH")
417-
field.get(null).asInstanceOf[Array[String]]
418-
} catch {
419-
case err: NoSuchFieldError => null
420-
case err: NoSuchFieldException => null
410+
private def getMRAppClasspath(conf: Configuration): Option[Seq[String]] =
411+
Option(conf.getStrings("mapreduce.application.classpath")) match {
412+
case Some(s) => Some(s.toSeq)
413+
case None => getDefaultMRApplicationClasspath
414+
}
415+
416+
def getDefaultYarnApplicationClasspath: Option[Seq[String]] = {
417+
val triedDefault = Try[Seq[String]] {
418+
val field = classOf[YarnConfiguration].getField("DEFAULT_YARN_APPLICATION_CLASSPATH")
419+
val value = field.get(null).asInstanceOf[Array[String]]
420+
value.toSeq
421+
} recoverWith {
422+
case e: NoSuchFieldException => Success(Seq.empty[String])
421423
}
424+
425+
triedDefault match {
426+
case f: Failure[_] =>
427+
logError("Unable to obtain the default YARN Application classpath.", f.exception)
428+
case s: Success[_] =>
429+
logDebug(s"Using the default YARN application classpath: ${s.get.mkString(",")}")
430+
}
431+
432+
triedDefault.toOption
422433
}
423434

424435
/**
425436
* In Hadoop 0.23, the MR application classpath comes with the YARN application
426437
* classpath. In Hadoop 2.0, it's an array of Strings, and in 2.2+ it's a String.
427438
* So we need to use reflection to retrieve it.
428439
*/
429-
def getDefaultMRApplicationClasspath(): Array[String] = {
430-
try {
440+
def getDefaultMRApplicationClasspath: Option[Seq[String]] = {
441+
val triedDefault = Try[Seq[String]] {
431442
val field = classOf[MRJobConfig].getField("DEFAULT_MAPREDUCE_APPLICATION_CLASSPATH")
432-
if (field.getType == classOf[String]) {
433-
StringUtils.getStrings(field.get(null).asInstanceOf[String])
443+
val value = if (field.getType == classOf[String]) {
444+
StringUtils.getStrings(field.get(null).asInstanceOf[String]).toArray
434445
} else {
435446
field.get(null).asInstanceOf[Array[String]]
436447
}
437-
} catch {
438-
case err: NoSuchFieldError => null
439-
case err: NoSuchFieldException => null
448+
value.toSeq
449+
} recoverWith {
450+
case e: NoSuchFieldException => Success(Seq.empty[String])
440451
}
452+
453+
triedDefault match {
454+
case f: Failure[_] =>
455+
logError("Unable to obtain the default MR Application classpath.", f.exception)
456+
case s: Success[_] =>
457+
logDebug(s"Using the default MR application classpath: ${s.get.mkString(",")}")
458+
}
459+
460+
triedDefault.toOption
441461
}
442462

463+
443464
/**
444465
* Returns the java command line argument for setting up log4j. If there is a log4j.properties
445466
* in the given local resources, it is used, otherwise the SPARK_LOG4J_CONF environment variable
Lines changed: 112 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,112 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.spark.deploy.yarn
19+
20+
import java.net.URI
21+
22+
import org.apache.hadoop.conf.Configuration
23+
import org.apache.hadoop.mapreduce.MRJobConfig
24+
import org.apache.hadoop.yarn.conf.YarnConfiguration
25+
import org.apache.hadoop.yarn.api.ApplicationConstants.Environment
26+
27+
import org.scalatest.FunSuite
28+
import org.scalatest.matchers.ShouldMatchers._
29+
30+
import scala.collection.JavaConversions._
31+
import scala.collection.mutable.{ HashMap => MutableHashMap }
32+
import scala.util.Try
33+
34+
35+
class ClientBaseSuite extends FunSuite {
36+
37+
test("default Yarn application classpath") {
38+
ClientBase.getDefaultYarnApplicationClasspath should be(Some(Fixtures.knownDefYarnAppCP))
39+
}
40+
41+
test("default MR application classpath") {
42+
ClientBase.getDefaultMRApplicationClasspath should be(Some(Fixtures.knownDefMRAppCP))
43+
}
44+
45+
test("resultant classpath for an application that defines a classpath for YARN") {
46+
withAppConf(Fixtures.mapYARNAppConf) { conf =>
47+
val env = newEnv
48+
ClientBase.populateHadoopClasspath(conf, env)
49+
classpath(env) should be(
50+
flatten(Fixtures.knownYARNAppCP, ClientBase.getDefaultMRApplicationClasspath))
51+
}
52+
}
53+
54+
test("resultant classpath for an application that defines a classpath for MR") {
55+
withAppConf(Fixtures.mapMRAppConf) { conf =>
56+
val env = newEnv
57+
ClientBase.populateHadoopClasspath(conf, env)
58+
classpath(env) should be(
59+
flatten(ClientBase.getDefaultYarnApplicationClasspath, Fixtures.knownMRAppCP))
60+
}
61+
}
62+
63+
test("resultant classpath for an application that defines both classpaths, YARN and MR") {
64+
withAppConf(Fixtures.mapAppConf) { conf =>
65+
val env = newEnv
66+
ClientBase.populateHadoopClasspath(conf, env)
67+
classpath(env) should be(flatten(Fixtures.knownYARNAppCP, Fixtures.knownMRAppCP))
68+
}
69+
}
70+
71+
object Fixtures {
72+
73+
val knownDefYarnAppCP: Seq[String] =
74+
getFieldValue[Array[String], Seq[String]](classOf[YarnConfiguration],
75+
"DEFAULT_YARN_APPLICATION_CLASSPATH",
76+
Seq[String]())(a => a.toSeq)
77+
78+
79+
val knownDefMRAppCP: Seq[String] =
80+
getFieldValue[String, Seq[String]](classOf[MRJobConfig],
81+
"DEFAULT_MAPREDUCE_APPLICATION_CLASSPATH",
82+
Seq[String]())(a => a.split(","))
83+
84+
val knownYARNAppCP = Some(Seq("/known/yarn/path"))
85+
86+
val knownMRAppCP = Some(Seq("/known/mr/path"))
87+
88+
val mapMRAppConf =
89+
Map("mapreduce.application.classpath" -> knownMRAppCP.map(_.mkString(":")).get)
90+
91+
val mapYARNAppConf =
92+
Map(YarnConfiguration.YARN_APPLICATION_CLASSPATH -> knownYARNAppCP.map(_.mkString(":")).get)
93+
94+
val mapAppConf = mapYARNAppConf ++ mapMRAppConf
95+
}
96+
97+
def withAppConf(m: Map[String, String] = Map())(testCode: (Configuration) => Any) {
98+
val conf = new Configuration
99+
m.foreach { case (k, v) => conf.set(k, v, "ClientBaseSpec") }
100+
testCode(conf)
101+
}
102+
103+
def newEnv = MutableHashMap[String, String]()
104+
105+
def classpath(env: MutableHashMap[String, String]) = env(Environment.CLASSPATH.name).split(":|;")
106+
107+
def flatten(a: Option[Seq[String]], b: Option[Seq[String]]) = (a ++ b).flatten.toArray
108+
109+
def getFieldValue[A, B](clazz: Class[_], field: String, defaults: => B)(mapTo: A => B): B =
110+
Try(clazz.getField(field)).map(_.get(null).asInstanceOf[A]).toOption.map(mapTo).getOrElse(defaults)
111+
112+
}

0 commit comments

Comments
 (0)