1818package org .apache .spark .deploy .yarn
1919
2020import java .io .File
21+ import java .lang .reflect .UndeclaredThrowableException
2122import java .nio .charset .StandardCharsets .UTF_8
23+ import java .security .PrivilegedExceptionAction
2224import java .util .regex .Matcher
2325import java .util .regex .Pattern
2426
@@ -156,7 +158,7 @@ class YarnSparkHadoopUtil extends SparkHadoopUtil {
156158 */
157159 def obtainTokenForHiveMetastore (conf : Configuration ): Option [Token [DelegationTokenIdentifier ]] = {
158160 try {
159- obtainTokenForHiveMetastoreInner(conf, UserGroupInformation .getCurrentUser().getUserName )
161+ obtainTokenForHiveMetastoreInner(conf)
160162 } catch {
161163 case e : ClassNotFoundException =>
162164 logInfo(s " Hive class not found $e" )
@@ -171,8 +173,8 @@ class YarnSparkHadoopUtil extends SparkHadoopUtil {
171173 * @param username the username of the principal requesting the delegation token.
172174 * @return a delegation token
173175 */
174- private [yarn] def obtainTokenForHiveMetastoreInner (conf : Configuration ,
175- username : String ) : Option [Token [DelegationTokenIdentifier ]] = {
176+ private [yarn] def obtainTokenForHiveMetastoreInner (conf : Configuration ) :
177+ Option [Token [DelegationTokenIdentifier ]] = {
176178 val mirror = universe.runtimeMirror(Utils .getContextOrSparkClassLoader)
177179
178180 // the hive configuration class is a subclass of Hadoop Configuration, so can be cast down
@@ -187,11 +189,12 @@ class YarnSparkHadoopUtil extends SparkHadoopUtil {
187189
188190 // Check for local metastore
189191 if (metastoreUri.nonEmpty) {
190- require(username.nonEmpty, " Username undefined" )
191192 val principalKey = " hive.metastore.kerberos.principal"
192193 val principal = hiveConf.getTrimmed(principalKey, " " )
193194 require(principal.nonEmpty, " Hive principal $principalKey undefined" )
194- logDebug(s " Getting Hive delegation token for $username against $principal at $metastoreUri" )
195+ val currentUser = UserGroupInformation .getCurrentUser()
196+ logDebug(s " Getting Hive delegation token for ${currentUser.getUserName()} against " +
197+ s " $principal at $metastoreUri" )
195198 val hiveClass = mirror.classLoader.loadClass(" org.apache.hadoop.hive.ql.metadata.Hive" )
196199 val closeCurrent = hiveClass.getMethod(" closeCurrent" )
197200 try {
@@ -200,12 +203,14 @@ class YarnSparkHadoopUtil extends SparkHadoopUtil {
200203 classOf [String ], classOf [String ])
201204 val getHive = hiveClass.getMethod(" get" , hiveConfClass)
202205
203- // invoke
204- val hive = getHive.invoke(null , hiveConf)
205- val tokenStr = getDelegationToken.invoke(hive, username, principal).asInstanceOf [String ]
206- val hive2Token = new Token [DelegationTokenIdentifier ]()
207- hive2Token.decodeFromUrlString(tokenStr)
208- Some (hive2Token)
206+ doAsRealUser {
207+ val hive = getHive.invoke(null , hiveConf)
208+ val tokenStr = getDelegationToken.invoke(hive, currentUser.getUserName(), principal)
209+ .asInstanceOf [String ]
210+ val hive2Token = new Token [DelegationTokenIdentifier ]()
211+ hive2Token.decodeFromUrlString(tokenStr)
212+ Some (hive2Token)
213+ }
209214 } finally {
210215 Utils .tryLogNonFatalError {
211216 closeCurrent.invoke(null )
@@ -216,6 +221,26 @@ class YarnSparkHadoopUtil extends SparkHadoopUtil {
216221 None
217222 }
218223 }
224+
/**
 * Evaluates `fn` inside a `doAs` call on the real logged-in user. The real user is taken
 * from the current user's `getRealUser()`; when that is null (the original comment cites
 * proxying as the case where the two differ), the current user itself is used.
 *
 * @param fn by-name block to evaluate inside the `doAs` call
 * @tparam T result type of the block
 * @return whatever `fn` evaluates to
 */
private def doAsRealUser[T](fn: => T): T = {
  val ugi = UserGroupInformation.getCurrentUser()
  val effectiveUser = Option(ugi.getRealUser()) match {
    case Some(real) => real
    case None => ugi
  }

  val action = new PrivilegedExceptionAction[T]() {
    override def run(): T = fn
  }

  // The Scala-generated anonymous class ends up surfacing failures as
  // UndeclaredThrowableException (even with @throws on the method), so unwrap it
  // and rethrow the underlying cause when one is present.
  try {
    effectiveUser.doAs(action)
  } catch {
    case wrapped: UndeclaredThrowableException =>
      val cause = wrapped.getCause()
      throw (if (cause != null) cause else wrapped)
  }
}
243+
219244}
220245
221246object YarnSparkHadoopUtil {
0 commit comments