Skip to content

Commit 1f6f583

Browse files
committed
[FLINK-21700][yarn]Allow to disable fetching Hadoop delegation token on Yarn
1 parent 7e49b1a commit 1f6f583

File tree

5 files changed

+66
-11
lines changed

5 files changed

+66
-11
lines changed

docs/layouts/shortcodes/generated/security_auth_kerberos_section.html

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,12 @@
88
</tr>
99
</thead>
1010
<tbody>
11+
<tr>
12+
<td><h5>security.kerberos.fetch.delegation-token</h5></td>
13+
<td style="word-wrap: break-word;">true</td>
14+
<td>Boolean</td>
15+
<td>Indicates whether to fetch delegation token. If true, Flink will fetch HDFS/HBase delegation tokens and inject into AM container.If false, Flink will assume that the job has delegation tokens and will not fetch them.This applies to submission mechanisms like Oozie, which will obtain delegation tokens before submitting Flink Job.</td>
16+
</tr>
1117
<tr>
1218
<td><h5>security.kerberos.login.contexts</h5></td>
1319
<td style="word-wrap: break-word;">(none)</td>

docs/layouts/shortcodes/generated/security_configuration.html

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,12 @@
1414
<td>List&lt;String&gt;</td>
1515
<td>List of factories that should be used to instantiate a security context. If multiple are configured, Flink will use the first compatible factory. You should have a NoOpSecurityContextFactory in this list as a fallback.</td>
1616
</tr>
17+
<tr>
18+
<td><h5>security.kerberos.fetch.delegation-token</h5></td>
19+
<td style="word-wrap: break-word;">true</td>
20+
<td>Boolean</td>
21+
<td>Indicates whether to fetch delegation token. If true, Flink will fetch HDFS/HBase delegation tokens and inject into AM container.If false, Flink will assume that the job has delegation tokens and will not fetch them.This applies to submission mechanisms like Oozie, which will obtain delegation tokens before submitting Flink Job.</td>
22+
</tr>
1723
<tr>
1824
<td><h5>security.kerberos.krb5-conf.path</h5></td>
1925
<td style="word-wrap: break-word;">(none)</td>

flink-core/src/main/java/org/apache/flink/configuration/SecurityOptions.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,18 @@ public class SecurityOptions {
108108
+ " (for example, `Client,KafkaClient` to use the credentials for ZooKeeper authentication and for"
109109
+ " Kafka authentication)");
110110

111+
@Documentation.Section(Documentation.Sections.SECURITY_AUTH_KERBEROS)
112+
public static final ConfigOption<Boolean> KERBEROS_FETCH_DELEGATION_TOKEN =
113+
key("security.kerberos.fetch.delegation-token")
114+
.booleanType()
115+
.defaultValue(true)
116+
.withDescription(
117+
"Indicates whether to fetch delegation token. If true, Flink will fetch "
118+
+ "HDFS/HBase delegation tokens and inject into AM container."
119+
+ "If false, Flink will assume that the job has delegation tokens and will not fetch them."
120+
+ "This applies to submission mechanisms like Oozie, which will obtain delegation tokens"
121+
+ " before submitting Flink Job.");
122+
111123
// ------------------------------------------------------------------------
112124
// ZooKeeper Security Options
113125
// ------------------------------------------------------------------------

flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java

Lines changed: 15 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -197,13 +197,23 @@ private static LocalResource registerLocalResource(
197197
}
198198

199199
public static void setTokensFor(
200-
ContainerLaunchContext amContainer, List<Path> paths, Configuration conf)
200+
ContainerLaunchContext amContainer,
201+
List<Path> paths,
202+
Configuration conf,
203+
boolean obtainingDelegationTokens)
201204
throws IOException {
202205
Credentials credentials = new Credentials();
203-
// for HDFS
204-
TokenCache.obtainTokensForNamenodes(credentials, paths.toArray(new Path[0]), conf);
205-
// for HBase
206-
obtainTokenForHBase(credentials, conf);
206+
207+
if (obtainingDelegationTokens) {
208+
LOG.info("Obtaining delegation tokens for HDFS and HBase.");
209+
// for HDFS
210+
TokenCache.obtainTokensForNamenodes(credentials, paths.toArray(new Path[0]), conf);
211+
// for HBase
212+
obtainTokenForHBase(credentials, conf);
213+
} else {
214+
LOG.info("Disable obtaining delegation tokens.");
215+
}
216+
207217
// for user
208218
UserGroupInformation currUsr = UserGroupInformation.getCurrentUser();
209219

flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java

Lines changed: 27 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,7 @@
105105
import java.net.URI;
106106
import java.net.URLDecoder;
107107
import java.nio.charset.Charset;
108+
import java.util.ArrayList;
108109
import java.util.Collection;
109110
import java.util.Collections;
110111
import java.util.HashMap;
@@ -529,6 +530,17 @@ private ClusterClientProvider<ApplicationId> deployInternal(
529530
"Hadoop security with Kerberos is enabled but the login user "
530531
+ "does not have Kerberos credentials or delegation tokens!");
531532
}
533+
534+
boolean kerberosFetchDTEnabled =
535+
flinkConfiguration.getBoolean(SecurityOptions.KERBEROS_FETCH_DELEGATION_TOKEN);
536+
boolean yarnAccessFSEnabled =
537+
flinkConfiguration.get(YarnConfigOptions.YARN_ACCESS) != null;
538+
if (!kerberosFetchDTEnabled && yarnAccessFSEnabled) {
539+
throw new RuntimeException(
540+
"Disable fetching kerberos delegation tokens but setting "
541+
+ "yarn.security.kerberos.additionalFileSystems option!"
542+
+ "These two options are mutually exclusive.");
543+
}
532544
}
533545

534546
isReadyForDeployment(clusterSpecification);
@@ -1079,15 +1091,24 @@ private ApplicationReport startAppMaster(
10791091

10801092
// setup security tokens
10811093
if (UserGroupInformation.isSecurityEnabled()) {
1082-
// set HDFS delegation tokens when security is enabled
1083-
LOG.info("Adding delegation token to the AM container.");
1084-
List<Path> yarnAccessList =
1085-
ConfigUtils.decodeListFromConfig(
1086-
configuration, YarnConfigOptions.YARN_ACCESS, Path::new);
1094+
List<Path> yarnAccessList = new ArrayList<>();
1095+
1096+
Boolean kerberosFetchDelegationTokenEnabled =
1097+
configuration.getBoolean(SecurityOptions.KERBEROS_FETCH_DELEGATION_TOKEN);
1098+
1099+
if (kerberosFetchDelegationTokenEnabled) {
1100+
// set HDFS delegation tokens when security is enabled
1101+
LOG.info("Adding delegation token to the AM container.");
1102+
yarnAccessList =
1103+
ConfigUtils.decodeListFromConfig(
1104+
configuration, YarnConfigOptions.YARN_ACCESS, Path::new);
1105+
}
1106+
10871107
Utils.setTokensFor(
10881108
amContainer,
10891109
ListUtils.union(yarnAccessList, fileUploader.getRemotePaths()),
1090-
yarnConfiguration);
1110+
yarnConfiguration,
1111+
kerberosFetchDelegationTokenEnabled);
10911112
}
10921113

10931114
amContainer.setLocalResources(fileUploader.getRegisteredLocalResources());

0 commit comments

Comments
 (0)