Skip to content

Commit 60812bb

Browse files
committed
[FLINK-21700][security] Add an option to disable credential retrieval on a secure cluster
1 parent 334dae6 commit 60812bb

File tree

5 files changed

+63
-12
lines changed

5 files changed

+63
-12
lines changed

docs/layouts/shortcodes/generated/security_auth_kerberos_section.html

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,12 @@
88
</tr>
99
</thead>
1010
<tbody>
11+
<tr>
12+
<td><h5>security.kerberos.fetch.delegation-token</h5></td>
13+
<td style="word-wrap: break-word;">true</td>
14+
<td>Boolean</td>
15+
<td>Indicates whether to fetch the delegation tokens for external services the Flink job needs to contact. Only HDFS and HBase are supported. It is used in Yarn deployments. If true, Flink will fetch HDFS and HBase delegation tokens and inject them into Yarn AM containers. If false, Flink will assume that the delegation tokens are managed outside of Flink. As a consequence, it will not fetch delegation tokens for HDFS and HBase. You may need to disable this option, if you rely on submission mechanisms, e.g. Apache Oozie, to handle delegation tokens.</td>
16+
</tr>
1117
<tr>
1218
<td><h5>security.kerberos.login.contexts</h5></td>
1319
<td style="word-wrap: break-word;">(none)</td>

docs/layouts/shortcodes/generated/security_configuration.html

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,12 @@
1414
<td>List&lt;String&gt;</td>
1515
<td>List of factories that should be used to instantiate a security context. If multiple are configured, Flink will use the first compatible factory. You should have a NoOpSecurityContextFactory in this list as a fallback.</td>
1616
</tr>
17+
<tr>
18+
<td><h5>security.kerberos.fetch.delegation-token</h5></td>
19+
<td style="word-wrap: break-word;">true</td>
20+
<td>Boolean</td>
21+
<td>Indicates whether to fetch the delegation tokens for external services the Flink job needs to contact. Only HDFS and HBase are supported. It is used in Yarn deployments. If true, Flink will fetch HDFS and HBase delegation tokens and inject them into Yarn AM containers. If false, Flink will assume that the delegation tokens are managed outside of Flink. As a consequence, it will not fetch delegation tokens for HDFS and HBase. You may need to disable this option, if you rely on submission mechanisms, e.g. Apache Oozie, to handle delegation tokens.</td>
22+
</tr>
1723
<tr>
1824
<td><h5>security.kerberos.krb5-conf.path</h5></td>
1925
<td style="word-wrap: break-word;">(none)</td>

flink-core/src/main/java/org/apache/flink/configuration/SecurityOptions.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,20 @@ public class SecurityOptions {
108108
+ " (for example, `Client,KafkaClient` to use the credentials for ZooKeeper authentication and for"
109109
+ " Kafka authentication)");
110110

111+
@Documentation.Section(Documentation.Sections.SECURITY_AUTH_KERBEROS)
112+
public static final ConfigOption<Boolean> KERBEROS_FETCH_DELEGATION_TOKEN =
113+
key("security.kerberos.fetch.delegation-token")
114+
.booleanType()
115+
.defaultValue(true)
116+
.withDescription(
117+
"Indicates whether to fetch the delegation tokens for external services the Flink job needs to contact. "
118+
+ "Only HDFS and HBase are supported. It is used in Yarn deployments. "
119+
+ "If true, Flink will fetch HDFS and HBase delegation tokens and inject them into Yarn AM containers. "
120+
+ "If false, Flink will assume that the delegation tokens are managed outside of Flink. "
121+
+ "As a consequence, it will not fetch delegation tokens for HDFS and HBase. "
122+
+ "You may need to disable this option, if you rely on submission mechanisms, e.g. Apache Oozie, "
123+
+ "to handle delegation tokens.");
124+
111125
// ------------------------------------------------------------------------
112126
// ZooKeeper Security Options
113127
// ------------------------------------------------------------------------

flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java

Lines changed: 15 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -197,13 +197,23 @@ private static LocalResource registerLocalResource(
197197
}
198198

199199
public static void setTokensFor(
200-
ContainerLaunchContext amContainer, List<Path> paths, Configuration conf)
200+
ContainerLaunchContext amContainer,
201+
List<Path> paths,
202+
Configuration conf,
203+
boolean obtainingDelegationTokens)
201204
throws IOException {
202205
Credentials credentials = new Credentials();
203-
// for HDFS
204-
TokenCache.obtainTokensForNamenodes(credentials, paths.toArray(new Path[0]), conf);
205-
// for HBase
206-
obtainTokenForHBase(credentials, conf);
206+
207+
if (obtainingDelegationTokens) {
208+
LOG.info("Obtaining delegation tokens for HDFS and HBase.");
209+
// for HDFS
210+
TokenCache.obtainTokensForNamenodes(credentials, paths.toArray(new Path[0]), conf);
211+
// for HBase
212+
obtainTokenForHBase(credentials, conf);
213+
} else {
214+
LOG.info("Delegation token retrieval for HDFS and HBase is disabled.");
215+
}
216+
207217
// for user
208218
UserGroupInformation currUsr = UserGroupInformation.getCurrentUser();
209219

flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java

Lines changed: 22 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -529,6 +529,18 @@ private ClusterClientProvider<ApplicationId> deployInternal(
529529
"Hadoop security with Kerberos is enabled but the login user "
530530
+ "does not have Kerberos credentials or delegation tokens!");
531531
}
532+
533+
boolean fetchToken =
534+
flinkConfiguration.getBoolean(SecurityOptions.KERBEROS_FETCH_DELEGATION_TOKEN);
535+
boolean yarnAccessFSEnabled =
536+
flinkConfiguration.get(YarnConfigOptions.YARN_ACCESS) != null;
537+
if (!fetchToken && yarnAccessFSEnabled) {
538+
throw new IllegalConfigurationException(
539+
String.format(
540+
"When %s is disabled, %s must be disabled as well.",
541+
SecurityOptions.KERBEROS_FETCH_DELEGATION_TOKEN.key(),
542+
YarnConfigOptions.YARN_ACCESS.key()));
543+
}
532544
}
533545

534546
isReadyForDeployment(clusterSpecification);
@@ -1081,13 +1093,16 @@ private ApplicationReport startAppMaster(
10811093
if (UserGroupInformation.isSecurityEnabled()) {
10821094
// set HDFS delegation tokens when security is enabled
10831095
LOG.info("Adding delegation token to the AM container.");
1084-
List<Path> yarnAccessList =
1085-
ConfigUtils.decodeListFromConfig(
1086-
configuration, YarnConfigOptions.YARN_ACCESS, Path::new);
1087-
Utils.setTokensFor(
1088-
amContainer,
1089-
ListUtils.union(yarnAccessList, fileUploader.getRemotePaths()),
1090-
yarnConfiguration);
1096+
List<Path> pathsToObtainToken = Collections.emptyList();
1097+
boolean fetchToken =
1098+
configuration.getBoolean(SecurityOptions.KERBEROS_FETCH_DELEGATION_TOKEN);
1099+
if (fetchToken) {
1100+
List<Path> yarnAccessList =
1101+
ConfigUtils.decodeListFromConfig(
1102+
configuration, YarnConfigOptions.YARN_ACCESS, Path::new);
1103+
pathsToObtainToken = ListUtils.union(yarnAccessList, fileUploader.getRemotePaths());
1104+
}
1105+
Utils.setTokensFor(amContainer, pathsToObtainToken, yarnConfiguration, fetchToken);
10911106
}
10921107

10931108
amContainer.setLocalResources(fileUploader.getRegisteredLocalResources());

0 commit comments

Comments
 (0)