Skip to content

Commit 808fe7e

Browse files
committed
[FLINK-21700][yarn]Allow to disable fetching Hadoop delegation token on Yarn
Change-Id: Id2468edf652ee5c2c4b76aab6b703c6bb18ec30c
1 parent d97c68b commit 808fe7e

File tree

4 files changed

+46
-16
lines changed

4 files changed

+46
-16
lines changed

docs/layouts/shortcodes/generated/yarn_config_configuration.html

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -140,6 +140,12 @@
140140
<td>List&lt;String&gt;</td>
141141
<td>A comma-separated list of additional Kerberos-secured Hadoop filesystems Flink is going to access. For example, yarn.security.kerberos.additionalFileSystems=hdfs://namenode2:9002,hdfs://namenode3:9003. The client submitting to YARN needs to have access to these file systems to retrieve the security tokens.</td>
142142
</tr>
143+
<tr>
144+
<td><h5>yarn.security.kerberos.fetch.delegationToken.enabled</h5></td>
145+
<td style="word-wrap: break-word;">true</td>
146+
<td>Boolean</td>
147+
<td>When this is true Flink will fetch HDFS/HBase delegation token injected into AM container.</td>
148+
</tr>
143149
<tr>
144150
<td><h5>yarn.security.kerberos.localized-keytab-path</h5></td>
145151
<td style="word-wrap: break-word;">"krb5.keytab"</td>

flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java

Lines changed: 17 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -197,21 +197,28 @@ private static LocalResource registerLocalResource(
197197
}
198198

199199
public static void setTokensFor(
200-
ContainerLaunchContext amContainer, List<Path> paths, Configuration conf)
200+
ContainerLaunchContext amContainer,
201+
List<Path> paths,
202+
Configuration conf,
203+
boolean yarnFetchDelegationEnabled)
201204
throws IOException {
202205
Credentials credentials = new Credentials();
203-
// for HDFS
204-
TokenCache.obtainTokensForNamenodes(credentials, paths.toArray(new Path[0]), conf);
205-
// for HBase
206-
obtainTokenForHBase(credentials, conf);
206+
207+
if (yarnFetchDelegationEnabled) {
208+
// for HDFS
209+
TokenCache.obtainTokensForNamenodes(credentials, paths.toArray(new Path[0]), conf);
210+
// for HBase
211+
obtainTokenForHBase(credentials, conf);
212+
}
213+
207214
// for user
208215
UserGroupInformation currUsr = UserGroupInformation.getCurrentUser();
209216

210217
Collection<Token<? extends TokenIdentifier>> usrTok = currUsr.getTokens();
211218
for (Token<? extends TokenIdentifier> token : usrTok) {
212-
final Text id = new Text(token.getIdentifier());
213-
LOG.info("Adding user token " + id + " with " + token);
214-
credentials.addToken(id, token);
219+
final Text alias = new Text(token.getService());
220+
LOG.info("Adding user token " + alias + " with " + token);
221+
credentials.addToken(alias, token);
215222
}
216223
try (DataOutputBuffer dob = new DataOutputBuffer()) {
217224
credentials.writeTokenStorageToStream(dob);
@@ -560,8 +567,8 @@ static ContainerLaunchContext createTaskExecutorContext(
560567
Collection<Token<? extends TokenIdentifier>> userTokens = cred.getAllTokens();
561568
for (Token<? extends TokenIdentifier> token : userTokens) {
562569
if (!token.getKind().equals(AMRMTokenIdentifier.KIND_NAME)) {
563-
final Text id = new Text(token.getIdentifier());
564-
taskManagerCred.addToken(id, token);
570+
final Text alias = new Text(token.getService());
571+
taskManagerCred.addToken(alias, token);
565572
}
566573
}
567574

flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java

Lines changed: 16 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,7 @@
105105
import java.net.URI;
106106
import java.net.URLDecoder;
107107
import java.nio.charset.Charset;
108+
import java.util.ArrayList;
108109
import java.util.Collection;
109110
import java.util.Collections;
110111
import java.util.HashMap;
@@ -1079,15 +1080,24 @@ private ApplicationReport startAppMaster(
10791080

10801081
// setup security tokens
10811082
if (UserGroupInformation.isSecurityEnabled()) {
1082-
// set HDFS delegation tokens when security is enabled
1083-
LOG.info("Adding delegation token to the AM container.");
1084-
List<Path> yarnAccessList =
1085-
ConfigUtils.decodeListFromConfig(
1086-
configuration, YarnConfigOptions.YARN_ACCESS, Path::new);
1083+
List<Path> yarnAccessList = new ArrayList<>();
1084+
1085+
Boolean yarnFetchDelegationTokenEnabled =
1086+
configuration.getBoolean(YarnConfigOptions.YARN_SECURITY_ENABLED);
1087+
1088+
if (yarnFetchDelegationTokenEnabled) {
1089+
// set HDFS delegation tokens when security is enabled
1090+
LOG.info("Adding delegation token to the AM container.");
1091+
yarnAccessList =
1092+
ConfigUtils.decodeListFromConfig(
1093+
configuration, YarnConfigOptions.YARN_ACCESS, Path::new);
1094+
}
1095+
10871096
Utils.setTokensFor(
10881097
amContainer,
10891098
ListUtils.union(yarnAccessList, fileUploader.getRemotePaths()),
1090-
yarnConfiguration);
1099+
yarnConfiguration,
1100+
yarnFetchDelegationTokenEnabled);
10911101
}
10921102

10931103
amContainer.setLocalResources(fileUploader.getRegisteredLocalResources());

flink-yarn/src/main/java/org/apache/flink/yarn/configuration/YarnConfigOptions.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -344,6 +344,13 @@ public class YarnConfigOptions {
344344
.withDescription(
345345
"A comma-separated list of additional Kerberos-secured Hadoop filesystems Flink is going to access. For example, yarn.security.kerberos.additionalFileSystems=hdfs://namenode2:9002,hdfs://namenode3:9003. The client submitting to YARN needs to have access to these file systems to retrieve the security tokens.");
346346

347+
public static final ConfigOption<Boolean> YARN_SECURITY_ENABLED =
348+
key("yarn.security.kerberos.fetch.delegationToken.enabled")
349+
.booleanType()
350+
.defaultValue(true)
351+
.withDescription(
352+
"When this is true Flink will fetch HDFS/HBase delegation token injected into AM container.");
353+
347354
@SuppressWarnings("unused")
348355
public static final ConfigOption<String> HADOOP_CONFIG_KEY =
349356
key("flink.hadoop.<key>")

0 commit comments

Comments
 (0)