-
Notifications
You must be signed in to change notification settings - Fork 28.9k
[SPARK-1522] : YARN ClientBase throws a NPE if there is no YARN Application CP #433
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -23,6 +23,7 @@ import java.nio.ByteBuffer | |
|
|
||
| import scala.collection.JavaConversions._ | ||
| import scala.collection.mutable.{HashMap, ListBuffer, Map} | ||
| import scala.util.{Try, Success, Failure} | ||
|
|
||
| import org.apache.hadoop.conf.Configuration | ||
| import org.apache.hadoop.fs._ | ||
|
|
@@ -378,7 +379,7 @@ trait ClientBase extends Logging { | |
| } | ||
| } | ||
|
|
||
| object ClientBase { | ||
| object ClientBase extends Logging { | ||
| val SPARK_JAR: String = "__spark__.jar" | ||
| val APP_JAR: String = "__app__.jar" | ||
| val LOG4J_PROP: String = "log4j.properties" | ||
|
|
@@ -388,58 +389,78 @@ object ClientBase { | |
|
|
||
| def getSparkJar = sys.env.get("SPARK_JAR").getOrElse(SparkContext.jarOfClass(this.getClass).head) | ||
|
|
||
| // Based on code from org.apache.hadoop.mapreduce.v2.util.MRApps | ||
| def populateHadoopClasspath(conf: Configuration, env: HashMap[String, String]) { | ||
| val classpathEntries = Option(conf.getStrings( | ||
| YarnConfiguration.YARN_APPLICATION_CLASSPATH)).getOrElse( | ||
| getDefaultYarnApplicationClasspath()) | ||
| if (classpathEntries != null) { | ||
| for (c <- classpathEntries) { | ||
| YarnSparkHadoopUtil.addToEnvironment(env, Environment.CLASSPATH.name, c.trim, | ||
| File.pathSeparator) | ||
| } | ||
| def populateHadoopClasspath(conf: Configuration, env: HashMap[String, String]) = { | ||
| val classPathElementsToAdd = getYarnAppClasspath(conf) ++ getMRAppClasspath(conf) | ||
| for (c <- classPathElementsToAdd.flatten) { | ||
| YarnSparkHadoopUtil.addToEnvironment( | ||
| env, | ||
| Environment.CLASSPATH.name, | ||
| c.trim, | ||
| File.pathSeparator) | ||
| } | ||
| classPathElementsToAdd | ||
| } | ||
|
|
||
| val mrClasspathEntries = Option(conf.getStrings( | ||
| "mapreduce.application.classpath")).getOrElse( | ||
| getDefaultMRApplicationClasspath()) | ||
| if (mrClasspathEntries != null) { | ||
| for (c <- mrClasspathEntries) { | ||
| YarnSparkHadoopUtil.addToEnvironment(env, Environment.CLASSPATH.name, c.trim, | ||
| File.pathSeparator) | ||
| } | ||
| } | ||
| private def getYarnAppClasspath(conf: Configuration): Option[Seq[String]] = | ||
| Option(conf.getStrings(YarnConfiguration.YARN_APPLICATION_CLASSPATH)) match { | ||
| case Some(s) => Some(s.toSeq) | ||
| case None => getDefaultYarnApplicationClasspath | ||
| } | ||
|
|
||
| def getDefaultYarnApplicationClasspath(): Array[String] = { | ||
| try { | ||
| val field = classOf[MRJobConfig].getField("DEFAULT_YARN_APPLICATION_CLASSPATH") | ||
| field.get(null).asInstanceOf[Array[String]] | ||
| } catch { | ||
| case err: NoSuchFieldError => null | ||
| case err: NoSuchFieldException => null | ||
| private def getMRAppClasspath(conf: Configuration): Option[Seq[String]] = | ||
| Option(conf.getStrings("mapreduce.application.classpath")) match { | ||
| case Some(s) => Some(s.toSeq) | ||
| case None => getDefaultMRApplicationClasspath | ||
| } | ||
|
|
||
| def getDefaultYarnApplicationClasspath: Option[Seq[String]] = { | ||
| val triedDefault = Try[Seq[String]] { | ||
| val field = classOf[YarnConfiguration].getField("DEFAULT_YARN_APPLICATION_CLASSPATH") | ||
| val value = field.get(null).asInstanceOf[Array[String]] | ||
| value.toSeq | ||
| } recoverWith { | ||
| case e: NoSuchFieldException => Success(Seq.empty[String]) | ||
| } | ||
|
|
||
| triedDefault match { | ||
| case f: Failure[_] => | ||
| logError("Unable to obtain the default YARN Application classpath.", f.exception) | ||
| case s: Success[_] => | ||
| logDebug(s"Using the default YARN application classpath: ${s.get.mkString(",")}") | ||
| } | ||
|
|
||
| triedDefault.toOption | ||
| } | ||
|
|
||
| /** | ||
| * In Hadoop 0.23, the MR application classpath comes with the YARN application | ||
| * classpath. In Hadoop 2.0, it's an array of Strings, and in 2.2+ it's a String. | ||
| * So we need to use reflection to retrieve it. | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Do we not need to handle NoSuchFieldError? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @sryza I removed the A linkage error will occur when lets say you have a dependency flagged as provided and your code points to one if its Class fields, for example There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Makes sense, thanks for clarifying. |
||
| */ | ||
| def getDefaultMRApplicationClasspath(): Array[String] = { | ||
| try { | ||
| def getDefaultMRApplicationClasspath: Option[Seq[String]] = { | ||
| val triedDefault = Try[Seq[String]] { | ||
| val field = classOf[MRJobConfig].getField("DEFAULT_MAPREDUCE_APPLICATION_CLASSPATH") | ||
| if (field.getType == classOf[String]) { | ||
| StringUtils.getStrings(field.get(null).asInstanceOf[String]) | ||
| val value = if (field.getType == classOf[String]) { | ||
| StringUtils.getStrings(field.get(null).asInstanceOf[String]).toArray | ||
| } else { | ||
| field.get(null).asInstanceOf[Array[String]] | ||
| } | ||
| } catch { | ||
| case err: NoSuchFieldError => null | ||
| case err: NoSuchFieldException => null | ||
| value.toSeq | ||
| } recoverWith { | ||
| case e: NoSuchFieldException => Success(Seq.empty[String]) | ||
| } | ||
|
|
||
| triedDefault match { | ||
| case f: Failure[_] => | ||
| logError("Unable to obtain the default MR Application classpath.", f.exception) | ||
| case s: Success[_] => | ||
| logDebug(s"Using the default MR application classpath: ${s.get.mkString(",")}") | ||
| } | ||
|
|
||
| triedDefault.toOption | ||
| } | ||
|
|
||
|
|
||
| /** | ||
| * Returns the java command line argument for setting up log4j. If there is a log4j.properties | ||
| * in the given local resources, it is used, otherwise the SPARK_LOG4J_CONF environment variable | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,112 @@ | ||
| /* | ||
| * Licensed to the Apache Software Foundation (ASF) under one or more | ||
| * contributor license agreements. See the NOTICE file distributed with | ||
| * this work for additional information regarding copyright ownership. | ||
| * The ASF licenses this file to You under the Apache License, Version 2.0 | ||
| * (the "License"); you may not use this file except in compliance with | ||
| * the License. You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, software | ||
| * distributed under the License is distributed on an "AS IS" BASIS, | ||
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| * See the License for the specific language governing permissions and | ||
| * limitations under the License. | ||
| */ | ||
|
|
||
| package org.apache.spark.deploy.yarn | ||
|
|
||
| import java.net.URI | ||
|
|
||
| import org.apache.hadoop.conf.Configuration | ||
| import org.apache.hadoop.mapreduce.MRJobConfig | ||
| import org.apache.hadoop.yarn.conf.YarnConfiguration | ||
| import org.apache.hadoop.yarn.api.ApplicationConstants.Environment | ||
|
|
||
| import org.scalatest.FunSuite | ||
| import org.scalatest.matchers.ShouldMatchers._ | ||
|
|
||
| import scala.collection.JavaConversions._ | ||
| import scala.collection.mutable.{ HashMap => MutableHashMap } | ||
| import scala.util.Try | ||
|
|
||
|
|
||
| class ClientBaseSuite extends FunSuite { | ||
|
|
||
| test("default Yarn application classpath") { | ||
| ClientBase.getDefaultYarnApplicationClasspath should be(Some(Fixtures.knownDefYarnAppCP)) | ||
| } | ||
|
|
||
| test("default MR application classpath") { | ||
| ClientBase.getDefaultMRApplicationClasspath should be(Some(Fixtures.knownDefMRAppCP)) | ||
| } | ||
|
|
||
| test("resultant classpath for an application that defines a classpath for YARN") { | ||
| withAppConf(Fixtures.mapYARNAppConf) { conf => | ||
| val env = newEnv | ||
| ClientBase.populateHadoopClasspath(conf, env) | ||
| classpath(env) should be( | ||
| flatten(Fixtures.knownYARNAppCP, ClientBase.getDefaultMRApplicationClasspath)) | ||
| } | ||
| } | ||
|
|
||
| test("resultant classpath for an application that defines a classpath for MR") { | ||
| withAppConf(Fixtures.mapMRAppConf) { conf => | ||
| val env = newEnv | ||
| ClientBase.populateHadoopClasspath(conf, env) | ||
| classpath(env) should be( | ||
| flatten(ClientBase.getDefaultYarnApplicationClasspath, Fixtures.knownMRAppCP)) | ||
| } | ||
| } | ||
|
|
||
| test("resultant classpath for an application that defines both classpaths, YARN and MR") { | ||
| withAppConf(Fixtures.mapAppConf) { conf => | ||
| val env = newEnv | ||
| ClientBase.populateHadoopClasspath(conf, env) | ||
| classpath(env) should be(flatten(Fixtures.knownYARNAppCP, Fixtures.knownMRAppCP)) | ||
| } | ||
| } | ||
|
|
||
| object Fixtures { | ||
|
|
||
| val knownDefYarnAppCP: Seq[String] = | ||
| getFieldValue[Array[String], Seq[String]](classOf[YarnConfiguration], | ||
| "DEFAULT_YARN_APPLICATION_CLASSPATH", | ||
| Seq[String]())(a => a.toSeq) | ||
|
|
||
|
|
||
| val knownDefMRAppCP: Seq[String] = | ||
| getFieldValue[String, Seq[String]](classOf[MRJobConfig], | ||
| "DEFAULT_MAPREDUCE_APPLICATION_CLASSPATH", | ||
| Seq[String]())(a => a.split(",")) | ||
|
|
||
| val knownYARNAppCP = Some(Seq("/known/yarn/path")) | ||
|
|
||
| val knownMRAppCP = Some(Seq("/known/mr/path")) | ||
|
|
||
| val mapMRAppConf = | ||
| Map("mapreduce.application.classpath" -> knownMRAppCP.map(_.mkString(":")).get) | ||
|
|
||
| val mapYARNAppConf = | ||
| Map(YarnConfiguration.YARN_APPLICATION_CLASSPATH -> knownYARNAppCP.map(_.mkString(":")).get) | ||
|
|
||
| val mapAppConf = mapYARNAppConf ++ mapMRAppConf | ||
| } | ||
|
|
||
| def withAppConf(m: Map[String, String] = Map())(testCode: (Configuration) => Any) { | ||
| val conf = new Configuration | ||
| m.foreach { case (k, v) => conf.set(k, v, "ClientBaseSpec") } | ||
| testCode(conf) | ||
| } | ||
|
|
||
| def newEnv = MutableHashMap[String, String]() | ||
|
|
||
| def classpath(env: MutableHashMap[String, String]) = env(Environment.CLASSPATH.name).split(":|;") | ||
|
|
||
| def flatten(a: Option[Seq[String]], b: Option[Seq[String]]) = (a ++ b).flatten.toArray | ||
|
|
||
| def getFieldValue[A, B](clazz: Class[_], field: String, defaults: => B)(mapTo: A => B): B = | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @tgravescs tested for Hadoop 0.23 and 2.4.0. A bit silly that I made such mistake when the main issue was actually accessing through reflection such fields within the |
||
| Try(clazz.getField(field)).map(_.get(null).asInstanceOf[A]).toOption.map(mapTo).getOrElse(defaults) | ||
|
|
||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe I have misunderstood something but do we really need to use reflection to get DEFAULT_YARN_APPLICATION_CLASSPATH? Can't we simply get it as YarnConfiguration.DEFAULT_YARN_APPLICATION_CLASSPATH (similar to YarnConfiguration.YARN_APPLICATION_CLASSPATH)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Excellent question @rahulsinghaliitd is all related with the "instability" of the YARN API and the different versions people are using. See https://issues.apache.org/jira/browse/SPARK-1233
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Got it!
I now see that the DEFAULT version is not present in hadoop 0.23. Then I wondered that if we truly want to be YARN API agnostic then shouldn't we get YARN_APPLICATION_CLASSPATH also via reflection. But I guess it is safe to assume that YARN_APPLICATION_CLASSPATH is here to stay.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@rahulsinghaliitd that I hope.