Skip to content

Commit a63d747

Browse files
author
Marcelo Vanzin
committed
[SPARK-13478] [yarn] Use real user when fetching delegation tokens.
The Hive client library is not smart enough to notice that the current user is a proxy user; so when using a proxy user, it fails to fetch delegation tokens from the metastore because of a missing kerberos TGT for the current user. To fix it, just run the code that fetches the delegation token as the real logged in user. Tested on a kerberos cluster both submitting normally and with a proxy user; Hive and HBase tokens are retrieved correctly in both cases.
1 parent 230bbea commit a63d747

File tree

2 files changed

+38
-12
lines changed

2 files changed

+38
-12
lines changed

yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala

Lines changed: 37 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,9 @@
1818
package org.apache.spark.deploy.yarn
1919

2020
import java.io.File
21+
import java.lang.reflect.UndeclaredThrowableException
2122
import java.nio.charset.StandardCharsets.UTF_8
23+
import java.security.PrivilegedExceptionAction
2224
import java.util.regex.Matcher
2325
import java.util.regex.Pattern
2426

@@ -156,7 +158,7 @@ class YarnSparkHadoopUtil extends SparkHadoopUtil {
156158
*/
157159
def obtainTokenForHiveMetastore(conf: Configuration): Option[Token[DelegationTokenIdentifier]] = {
158160
try {
159-
obtainTokenForHiveMetastoreInner(conf, UserGroupInformation.getCurrentUser().getUserName)
161+
obtainTokenForHiveMetastoreInner(conf)
160162
} catch {
161163
case e: ClassNotFoundException =>
162164
logInfo(s"Hive class not found $e")
@@ -171,8 +173,8 @@ class YarnSparkHadoopUtil extends SparkHadoopUtil {
171173
* @param username the username of the principal requesting the delegating token.
172174
* @return a delegation token
173175
*/
174-
private[yarn] def obtainTokenForHiveMetastoreInner(conf: Configuration,
175-
username: String): Option[Token[DelegationTokenIdentifier]] = {
176+
private[yarn] def obtainTokenForHiveMetastoreInner(conf: Configuration):
177+
Option[Token[DelegationTokenIdentifier]] = {
176178
val mirror = universe.runtimeMirror(Utils.getContextOrSparkClassLoader)
177179

178180
// the hive configuration class is a subclass of Hadoop Configuration, so can be cast down
@@ -187,11 +189,12 @@ class YarnSparkHadoopUtil extends SparkHadoopUtil {
187189

188190
// Check for local metastore
189191
if (metastoreUri.nonEmpty) {
190-
require(username.nonEmpty, "Username undefined")
191192
val principalKey = "hive.metastore.kerberos.principal"
192193
val principal = hiveConf.getTrimmed(principalKey, "")
193194
require(principal.nonEmpty, "Hive principal $principalKey undefined")
194-
logDebug(s"Getting Hive delegation token for $username against $principal at $metastoreUri")
195+
val currentUser = UserGroupInformation.getCurrentUser()
196+
logDebug(s"Getting Hive delegation token for ${currentUser.getUserName()} against " +
197+
s"$principal at $metastoreUri")
195198
val hiveClass = mirror.classLoader.loadClass("org.apache.hadoop.hive.ql.metadata.Hive")
196199
val closeCurrent = hiveClass.getMethod("closeCurrent")
197200
try {
@@ -200,12 +203,14 @@ class YarnSparkHadoopUtil extends SparkHadoopUtil {
200203
classOf[String], classOf[String])
201204
val getHive = hiveClass.getMethod("get", hiveConfClass)
202205

203-
// invoke
204-
val hive = getHive.invoke(null, hiveConf)
205-
val tokenStr = getDelegationToken.invoke(hive, username, principal).asInstanceOf[String]
206-
val hive2Token = new Token[DelegationTokenIdentifier]()
207-
hive2Token.decodeFromUrlString(tokenStr)
208-
Some(hive2Token)
206+
doAsRealUser {
207+
val hive = getHive.invoke(null, hiveConf)
208+
val tokenStr = getDelegationToken.invoke(hive, currentUser.getUserName(), principal)
209+
.asInstanceOf[String]
210+
val hive2Token = new Token[DelegationTokenIdentifier]()
211+
hive2Token.decodeFromUrlString(tokenStr)
212+
Some(hive2Token)
213+
}
209214
} finally {
210215
Utils.tryLogNonFatalError {
211216
closeCurrent.invoke(null)
@@ -253,9 +258,11 @@ class YarnSparkHadoopUtil extends SparkHadoopUtil {
253258
val confCreate = mirror.classLoader.
254259
loadClass("org.apache.hadoop.hbase.HBaseConfiguration").
255260
getMethod("create", classOf[Configuration])
261+
256262
val obtainToken = mirror.classLoader.
257263
loadClass("org.apache.hadoop.hbase.security.token.TokenUtil").
258264
getMethod("obtainToken", classOf[Configuration])
265+
259266
val hbaseConf = confCreate.invoke(null, conf).asInstanceOf[Configuration]
260267
if ("kerberos" == hbaseConf.get("hbase.security.authentication")) {
261268
logDebug("Attempting to fetch HBase security token.")
@@ -265,6 +272,25 @@ class YarnSparkHadoopUtil extends SparkHadoopUtil {
265272
}
266273
}
267274

275+
/**
276+
* Run some code as the real logged in user (which may differ from the current user, for
277+
* example, when using proxying).
278+
*/
279+
private def doAsRealUser[T](fn: => T): T = {
280+
val currentUser = UserGroupInformation.getCurrentUser()
281+
val realUser = Option(currentUser.getRealUser()).getOrElse(currentUser)
282+
283+
// For some reason the Scala-generated anonymous class ends up causing an
284+
// UndeclaredThrowableException, even if you annotate the method with @throws.
285+
try {
286+
realUser.doAs(new PrivilegedExceptionAction[T]() {
287+
override def run(): T = fn
288+
})
289+
} catch {
290+
case e: UndeclaredThrowableException => throw Option(e.getCause()).getOrElse(e)
291+
}
292+
}
293+
268294
}
269295

270296
object YarnSparkHadoopUtil {

yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtilSuite.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -255,7 +255,7 @@ class YarnSparkHadoopUtilSuite extends SparkFunSuite with Matchers with Logging
255255
hadoopConf.set("hive.metastore.uris", "http://localhost:0")
256256
val util = new YarnSparkHadoopUtil
257257
assertNestedHiveException(intercept[InvocationTargetException] {
258-
util.obtainTokenForHiveMetastoreInner(hadoopConf, "alice")
258+
util.obtainTokenForHiveMetastoreInner(hadoopConf)
259259
})
260260
assertNestedHiveException(intercept[InvocationTargetException] {
261261
util.obtainTokenForHiveMetastore(hadoopConf)

0 commit comments

Comments
 (0)