ClientBase.scala
@@ -23,6 +23,7 @@ import java.nio.ByteBuffer
 
 import scala.collection.JavaConversions._
 import scala.collection.mutable.{HashMap, ListBuffer, Map}
+import scala.util.{Try, Success, Failure}
 
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs._
@@ -378,7 +379,7 @@ trait ClientBase extends Logging {
   }
 }
 
-object ClientBase {
+object ClientBase extends Logging {
   val SPARK_JAR: String = "__spark__.jar"
   val APP_JAR: String = "__app__.jar"
   val LOG4J_PROP: String = "log4j.properties"
@@ -388,58 +389,78 @@ object ClientBase {
 
   def getSparkJar = sys.env.get("SPARK_JAR").getOrElse(SparkContext.jarOfClass(this.getClass).head)
 
   // Based on code from org.apache.hadoop.mapreduce.v2.util.MRApps
-  def populateHadoopClasspath(conf: Configuration, env: HashMap[String, String]) {
-    val classpathEntries = Option(conf.getStrings(
-      YarnConfiguration.YARN_APPLICATION_CLASSPATH)).getOrElse(
-        getDefaultYarnApplicationClasspath())
-    if (classpathEntries != null) {
-      for (c <- classpathEntries) {
-        YarnSparkHadoopUtil.addToEnvironment(env, Environment.CLASSPATH.name, c.trim,
-          File.pathSeparator)
-      }
-    }
+  def populateHadoopClasspath(conf: Configuration, env: HashMap[String, String]) = {
+    val classPathElementsToAdd = getYarnAppClasspath(conf) ++ getMRAppClasspath(conf)
+    for (c <- classPathElementsToAdd.flatten) {
+      YarnSparkHadoopUtil.addToEnvironment(
+        env,
+        Environment.CLASSPATH.name,
+        c.trim,
+        File.pathSeparator)
+    }
+    classPathElementsToAdd
+  }
 
-    val mrClasspathEntries = Option(conf.getStrings(
-      "mapreduce.application.classpath")).getOrElse(
-        getDefaultMRApplicationClasspath())
-    if (mrClasspathEntries != null) {
-      for (c <- mrClasspathEntries) {
-        YarnSparkHadoopUtil.addToEnvironment(env, Environment.CLASSPATH.name, c.trim,
-          File.pathSeparator)
-      }
-    }
-  }
+  private def getYarnAppClasspath(conf: Configuration): Option[Seq[String]] =
+    Option(conf.getStrings(YarnConfiguration.YARN_APPLICATION_CLASSPATH)) match {
+      case Some(s) => Some(s.toSeq)
+      case None => getDefaultYarnApplicationClasspath
+    }
 
-  def getDefaultYarnApplicationClasspath(): Array[String] = {
-    try {
-      val field = classOf[MRJobConfig].getField("DEFAULT_YARN_APPLICATION_CLASSPATH")
-      field.get(null).asInstanceOf[Array[String]]
-    } catch {
-      case err: NoSuchFieldError => null
-      case err: NoSuchFieldException => null
-    }
-  }
+  private def getMRAppClasspath(conf: Configuration): Option[Seq[String]] =
+    Option(conf.getStrings("mapreduce.application.classpath")) match {
+      case Some(s) => Some(s.toSeq)
+      case None => getDefaultMRApplicationClasspath
+    }
+
+  def getDefaultYarnApplicationClasspath: Option[Seq[String]] = {
+    val triedDefault = Try[Seq[String]] {
+      val field = classOf[YarnConfiguration].getField("DEFAULT_YARN_APPLICATION_CLASSPATH")
[Review thread on the reflective field lookup above]

rahulsinghaliitd (Contributor): Maybe I have misunderstood something, but do we really need to use reflection to get DEFAULT_YARN_APPLICATION_CLASSPATH? Can't we simply get it as YarnConfiguration.DEFAULT_YARN_APPLICATION_CLASSPATH (similar to YarnConfiguration.YARN_APPLICATION_CLASSPATH)?

Author (Contributor): Excellent question @rahulsinghaliitd, it is all related to the "instability" of the YARN API and the different versions people are using. See https://issues.apache.org/jira/browse/SPARK-1233

rahulsinghaliitd (Contributor): Got it! I now see that the DEFAULT version is not present in Hadoop 0.23. Then I wondered: if we truly want to be YARN API agnostic, shouldn't we also get YARN_APPLICATION_CLASSPATH via reflection? But I guess it is safe to assume that YARN_APPLICATION_CLASSPATH is here to stay.

Author (Contributor): @rahulsinghaliitd that I hope.
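To make the trade-off discussed above concrete, here is a minimal, self-contained sketch of the failure mode the reflective lookup avoids. It is illustrative only and not part of the patch; the object name is made up.

import org.apache.hadoop.yarn.conf.YarnConfiguration

// Hypothetical demo object, not in the PR.
object ReflectionRationaleDemo extends App {
  // A direct reference such as
  //   val cp = YarnConfiguration.DEFAULT_YARN_APPLICATION_CLASSPATH
  // compiles against Hadoop 2.x, but running against a Hadoop 0.23 yarn jar
  // (where the field does not exist) fails with java.lang.NoSuchFieldError.
  // A reflective lookup resolves the field name at runtime instead, so the
  // missing field surfaces as a catchable NoSuchFieldException:
  val cp: Seq[String] =
    try {
      classOf[YarnConfiguration]
        .getField("DEFAULT_YARN_APPLICATION_CLASSPATH")
        .get(null).asInstanceOf[Array[String]].toSeq
    } catch {
      case _: NoSuchFieldException => Seq.empty // the Hadoop 0.23 path
    }
  println(cp.mkString(java.io.File.pathSeparator))
}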

+      val value = field.get(null).asInstanceOf[Array[String]]
+      value.toSeq
+    } recoverWith {
+      case e: NoSuchFieldException => Success(Seq.empty[String])
+    }
+
+    triedDefault match {
+      case f: Failure[_] =>
+        logError("Unable to obtain the default YARN Application classpath.", f.exception)
+      case s: Success[_] =>
+        logDebug(s"Using the default YARN application classpath: ${s.get.mkString(",")}")
+    }
+
+    triedDefault.toOption
+  }

+  /**
+   * In Hadoop 0.23, the MR application classpath comes with the YARN application
+   * classpath. In Hadoop 2.0, it's an array of Strings, and in 2.2+ it's a String.
+   * So we need to use reflection to retrieve it.
[Review thread on the comment above]

sryza (Contributor): Do we not need to handle NoSuchFieldError?

Author (Contributor): @sryza I removed the NoSuchFieldError since in this context it can't happen: both the class and the field are accessed through reflection.

A linkage error occurs when, let's say, you have a dependency flagged as provided and your code refers directly to one of its class fields, for example MRJobConfig.DEFAULT_MAPREDUCE_APPLICATION_CLASSPATH. Since your version of that dependency has the class and the field, the code compiles; the error is thrown when, at runtime, the provided MRJobConfig is missing that field, ergo a linkage error.

sryza (Contributor): Makes sense, thanks for clarifying.
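A self-contained illustration of that Exception/Error distinction, using only JDK classes (the object and field names here are made up for the example):

import scala.util.{Try, Success}

object FieldLookupDemo extends App {
  // Reflective lookup of a field that exists: plain success.
  println(Try(classOf[java.lang.Integer].getField("MAX_VALUE").get(null)))
  // Success(2147483647)

  // Reflective lookup of a field that does not exist: getField throws the
  // checked java.lang.NoSuchFieldException, which Try captures and
  // recoverWith can map to a default, mirroring the patch:
  val recovered = Try(classOf[java.lang.Integer].getField("NO_SUCH_FIELD").get(null))
    .recoverWith { case _: NoSuchFieldException => Success(null) }
  println(recovered) // Success(null)

  // NoSuchFieldError, by contrast, is a LinkageError raised when a directly
  // referenced field is missing at run time; it never comes out of a
  // Class.getField call, which is why the patch does not handle it here.
}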

+   */
-  def getDefaultMRApplicationClasspath(): Array[String] = {
-    try {
-      val field = classOf[MRJobConfig].getField("DEFAULT_MAPREDUCE_APPLICATION_CLASSPATH")
-      if (field.getType == classOf[String]) {
-        StringUtils.getStrings(field.get(null).asInstanceOf[String])
-      } else {
-        field.get(null).asInstanceOf[Array[String]]
-      }
-    } catch {
-      case err: NoSuchFieldError => null
-      case err: NoSuchFieldException => null
-    }
-  }
+  def getDefaultMRApplicationClasspath: Option[Seq[String]] = {
+    val triedDefault = Try[Seq[String]] {
+      val field = classOf[MRJobConfig].getField("DEFAULT_MAPREDUCE_APPLICATION_CLASSPATH")
+      val value = if (field.getType == classOf[String]) {
+        StringUtils.getStrings(field.get(null).asInstanceOf[String]).toArray
+      } else {
+        field.get(null).asInstanceOf[Array[String]]
+      }
+      value.toSeq
+    } recoverWith {
+      case e: NoSuchFieldException => Success(Seq.empty[String])
+    }
+
+    triedDefault match {
+      case f: Failure[_] =>
+        logError("Unable to obtain the default MR Application classpath.", f.exception)
+      case s: Success[_] =>
+        logDebug(s"Using the default MR application classpath: ${s.get.mkString(",")}")
+    }
+
+    triedDefault.toOption
+  }
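For context, a sketch of the two shapes the MR default can take across Hadoop versions, which is what the field.getType check above distinguishes. The classpath values below are representative placeholders, not authoritative:

import org.apache.hadoop.util.StringUtils

object MRDefaultShapesDemo extends App {
  // Hadoop 2.2+: the constant is a single comma-separated String.
  val asString = "$HADOOP_MAPRED_HOME/share/hadoop/mapreduce/*,$HADOOP_MAPRED_HOME/share/hadoop/mapreduce/lib/*"
  val fromString: Seq[String] = StringUtils.getStrings(asString).toSeq

  // Hadoop 2.0: the constant is already an Array[String].
  val asArray = Array("$HADOOP_MAPRED_HOME/share/hadoop/mapreduce/*")
  val fromArray: Seq[String] = asArray.toSeq

  // Either way the patch normalizes the result to Seq[String].
  println(fromString.size + " / " + fromArray.size) // 2 / 1
}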


   /**
    * Returns the java command line argument for setting up log4j. If there is a log4j.properties
    * in the given local resources, it is used, otherwise the SPARK_LOG4J_CONF environment variable
   ...
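One subtlety worth calling out in the new populateHadoopClasspath above: concatenating two Options with ++ relies on Scala's Option-to-Iterable conversion. A quick stand-alone sketch (the names and paths are illustrative, not from the patch):

import java.io.File

object OptionConcatDemo extends App {
  // Mirrors getYarnAppClasspath / getMRAppClasspath: Some(entries) when a
  // classpath is available, None when even the default could not be obtained.
  val yarnCP: Option[Seq[String]] = Some(Seq("/etc/hadoop/conf", "/usr/lib/hadoop/*"))
  val mrCP: Option[Seq[String]] = None

  // Option ++ Option yields an Iterable containing only the defined values;
  // flatten then produces the individual entries to append to CLASSPATH.
  val entries = (yarnCP ++ mrCP).flatten
  println(entries.mkString(File.pathSeparator)) // "/etc/hadoop/conf:/usr/lib/hadoop/*"
}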
ClientBaseSuite.scala (new file)
@@ -0,0 +1,112 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.deploy.yarn

import java.net.URI

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.mapreduce.MRJobConfig
import org.apache.hadoop.yarn.conf.YarnConfiguration
import org.apache.hadoop.yarn.api.ApplicationConstants.Environment

import org.scalatest.FunSuite
import org.scalatest.matchers.ShouldMatchers._

import scala.collection.JavaConversions._
import scala.collection.mutable.{ HashMap => MutableHashMap }
import scala.util.Try


class ClientBaseSuite extends FunSuite {

  test("default Yarn application classpath") {
    ClientBase.getDefaultYarnApplicationClasspath should be(Some(Fixtures.knownDefYarnAppCP))
  }

  test("default MR application classpath") {
    ClientBase.getDefaultMRApplicationClasspath should be(Some(Fixtures.knownDefMRAppCP))
  }

  test("resultant classpath for an application that defines a classpath for YARN") {
    withAppConf(Fixtures.mapYARNAppConf) { conf =>
      val env = newEnv
      ClientBase.populateHadoopClasspath(conf, env)
      classpath(env) should be(
        flatten(Fixtures.knownYARNAppCP, ClientBase.getDefaultMRApplicationClasspath))
    }
  }

  test("resultant classpath for an application that defines a classpath for MR") {
    withAppConf(Fixtures.mapMRAppConf) { conf =>
      val env = newEnv
      ClientBase.populateHadoopClasspath(conf, env)
      classpath(env) should be(
        flatten(ClientBase.getDefaultYarnApplicationClasspath, Fixtures.knownMRAppCP))
    }
  }

  test("resultant classpath for an application that defines both classpaths, YARN and MR") {
    withAppConf(Fixtures.mapAppConf) { conf =>
      val env = newEnv
      ClientBase.populateHadoopClasspath(conf, env)
      classpath(env) should be(flatten(Fixtures.knownYARNAppCP, Fixtures.knownMRAppCP))
    }
  }

  object Fixtures {

    val knownDefYarnAppCP: Seq[String] =
      getFieldValue[Array[String], Seq[String]](classOf[YarnConfiguration],
                                                "DEFAULT_YARN_APPLICATION_CLASSPATH",
                                                Seq[String]())(a => a.toSeq)

    val knownDefMRAppCP: Seq[String] =
      getFieldValue[String, Seq[String]](classOf[MRJobConfig],
                                         "DEFAULT_MAPREDUCE_APPLICATION_CLASSPATH",
                                         Seq[String]())(a => a.split(","))

    val knownYARNAppCP = Some(Seq("/known/yarn/path"))

    val knownMRAppCP = Some(Seq("/known/mr/path"))

    val mapMRAppConf =
      Map("mapreduce.application.classpath" -> knownMRAppCP.map(_.mkString(":")).get)

    val mapYARNAppConf =
      Map(YarnConfiguration.YARN_APPLICATION_CLASSPATH -> knownYARNAppCP.map(_.mkString(":")).get)

    val mapAppConf = mapYARNAppConf ++ mapMRAppConf
  }

  def withAppConf(m: Map[String, String] = Map())(testCode: (Configuration) => Any) {
    val conf = new Configuration
    m.foreach { case (k, v) => conf.set(k, v, "ClientBaseSpec") }
    testCode(conf)
  }

  def newEnv = MutableHashMap[String, String]()

  def classpath(env: MutableHashMap[String, String]) = env(Environment.CLASSPATH.name).split(":|;")

  def flatten(a: Option[Seq[String]], b: Option[Seq[String]]) = (a ++ b).flatten.toArray

  def getFieldValue[A, B](clazz: Class[_], field: String, defaults: => B)(mapTo: A => B): B =
    Try(clazz.getField(field)).map(_.get(null).asInstanceOf[A]).toOption.map(mapTo).getOrElse(defaults)
[Review comment from the author on getFieldValue]

Author (Contributor): @tgravescs tested for Hadoop 0.23 and 2.4.0. A bit silly that I made such a mistake when the main issue was actually accessing such fields through reflection within the ClientBase class.

}
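To make the helper's behavior concrete, a usage sketch of getFieldValue against JDK fields; these calls are illustrative and not part of the suite:

// Field exists: the reflective read succeeds and mapTo converts the value.
val maxInt = getFieldValue[Integer, Int](classOf[java.lang.Integer], "MAX_VALUE", -1)(_.intValue)
// maxInt == Int.MaxValue

// Field is missing: Try fails, toOption yields None, and the default is used.
val fallback = getFieldValue[Integer, Int](classOf[java.lang.Integer], "NO_SUCH_FIELD", -1)(_.intValue)
// fallback == -1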