Skip to content

Commit c5b8d96

Browse files
committed
[FLINK-21700][security] Add an option to disable credential retrieval on a secure cluster
1 parent bef98cf commit c5b8d96

File tree

5 files changed

+62
-18
lines changed

5 files changed

+62
-18
lines changed

docs/layouts/shortcodes/generated/security_auth_kerberos_section.html

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,12 @@
88
</tr>
99
</thead>
1010
<tbody>
11+
<tr>
12+
<td><h5>security.kerberos.fetch.delegation-token</h5></td>
13+
<td style="word-wrap: break-word;">true</td>
14+
<td>Boolean</td>
15+
<td>Indicates whether to fetch the delegation tokens for external services the Flink job needs to contact. Only HDFS and HBase are supported. It is used in Yarn deployments. If true, Flink will fetch HDFS and HBase delegation tokens and inject them into Yarn AM containers. If false, Flink will assume that the delegation tokens are managed outside of Flink. As a consequence, it will not fetch delegation tokens for HDFS and HBase. You may need to disable this option, if you rely on submission mechanisms, e.g. Apache Oozie, to handle delegation tokens. </td>
16+
</tr>
1117
<tr>
1218
<td><h5>security.kerberos.login.contexts</h5></td>
1319
<td style="word-wrap: break-word;">(none)</td>

docs/layouts/shortcodes/generated/security_configuration.html

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,12 @@
1414
<td>List&lt;String&gt;</td>
1515
<td>List of factories that should be used to instantiate a security context. If multiple are configured, Flink will use the first compatible factory. You should have a NoOpSecurityContextFactory in this list as a fallback.</td>
1616
</tr>
17+
<tr>
18+
<td><h5>security.kerberos.fetch.delegation-token</h5></td>
19+
<td style="word-wrap: break-word;">true</td>
20+
<td>Boolean</td>
21+
<td>Indicates whether to fetch the delegation tokens for external services the Flink job needs to contact. Only HDFS and HBase are supported. It is used in Yarn deployments. If true, Flink will fetch HDFS and HBase delegation tokens and inject them into Yarn AM containers. If false, Flink will assume that the delegation tokens are managed outside of Flink. As a consequence, it will not fetch delegation tokens for HDFS and HBase. You may need to disable this option, if you rely on submission mechanisms, e.g. Apache Oozie, to handle delegation tokens. </td>
22+
</tr>
1723
<tr>
1824
<td><h5>security.kerberos.krb5-conf.path</h5></td>
1925
<td style="word-wrap: break-word;">(none)</td>

flink-core/src/main/java/org/apache/flink/configuration/SecurityOptions.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,20 @@ public class SecurityOptions {
108108
+ " (for example, `Client,KafkaClient` to use the credentials for ZooKeeper authentication and for"
109109
+ " Kafka authentication)");
110110

111+
@Documentation.Section(Documentation.Sections.SECURITY_AUTH_KERBEROS)
112+
public static final ConfigOption<Boolean> KERBEROS_FETCH_DELEGATION_TOKEN =
113+
key("security.kerberos.fetch.delegation-token")
114+
.booleanType()
115+
.defaultValue(true)
116+
.withDescription(
117+
"Indicates whether to fetch the delegation tokens for external services the Flink job needs to contact. "
118+
+ "Only HDFS and HBase are supported. It is used in Yarn deployments. "
119+
+ "If true, Flink will fetch HDFS and HBase delegation tokens and inject them into Yarn AM containers. "
120+
+ "If false, Flink will assume that the delegation tokens are managed outside of Flink. "
121+
+ "As a consequence, it will not fetch delegation tokens for HDFS and HBase. "
122+
+ "You may need to disable this option, if you rely on submission mechanisms, e.g. Apache Oozie, "
123+
+ "to handle delegation tokens. ");
124+
111125
// ------------------------------------------------------------------------
112126
// ZooKeeper Security Options
113127
// ------------------------------------------------------------------------

flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java

Lines changed: 16 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,6 @@
3434
import org.apache.hadoop.fs.FileSystem;
3535
import org.apache.hadoop.fs.Path;
3636
import org.apache.hadoop.io.DataOutputBuffer;
37-
import org.apache.hadoop.io.Text;
3837
import org.apache.hadoop.mapreduce.security.TokenCache;
3938
import org.apache.hadoop.security.Credentials;
4039
import org.apache.hadoop.security.UserGroupInformation;
@@ -197,21 +196,28 @@ private static LocalResource registerLocalResource(
197196
}
198197

199198
public static void setTokensFor(
200-
ContainerLaunchContext amContainer, List<Path> paths, Configuration conf)
199+
ContainerLaunchContext amContainer,
200+
List<Path> paths,
201+
Configuration conf,
202+
boolean obtainingDelegationTokens)
201203
throws IOException {
202204
Credentials credentials = new Credentials();
203-
// for HDFS
204-
TokenCache.obtainTokensForNamenodes(credentials, paths.toArray(new Path[0]), conf);
205-
// for HBase
206-
obtainTokenForHBase(credentials, conf);
205+
206+
if (obtainingDelegationTokens) {
207+
LOG.info("Obtaining delegation tokens for HDFS and HBase.");
208+
// for HDFS
209+
TokenCache.obtainTokensForNamenodes(credentials, paths.toArray(new Path[0]), conf);
210+
// for HBase
211+
obtainTokenForHBase(credentials, conf);
212+
}
213+
207214
// for user
208215
UserGroupInformation currUsr = UserGroupInformation.getCurrentUser();
209216

210217
Collection<Token<? extends TokenIdentifier>> usrTok = currUsr.getTokens();
211218
for (Token<? extends TokenIdentifier> token : usrTok) {
212-
final Text id = new Text(token.getIdentifier());
213-
LOG.info("Adding user token " + id + " with " + token);
214-
credentials.addToken(id, token);
219+
LOG.info("Adding user token " + token.getService() + " with " + token);
220+
credentials.addToken(token.getService(), token);
215221
}
216222
try (DataOutputBuffer dob = new DataOutputBuffer()) {
217223
credentials.writeTokenStorageToStream(dob);
@@ -560,8 +566,7 @@ static ContainerLaunchContext createTaskExecutorContext(
560566
Collection<Token<? extends TokenIdentifier>> userTokens = cred.getAllTokens();
561567
for (Token<? extends TokenIdentifier> token : userTokens) {
562568
if (!token.getKind().equals(AMRMTokenIdentifier.KIND_NAME)) {
563-
final Text id = new Text(token.getIdentifier());
564-
taskManagerCred.addToken(id, token);
569+
taskManagerCred.addToken(token.getService(), token);
565570
}
566571
}
567572

flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java

Lines changed: 20 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -529,6 +529,16 @@ private ClusterClientProvider<ApplicationId> deployInternal(
529529
"Hadoop security with Kerberos is enabled but the login user "
530530
+ "does not have Kerberos credentials or delegation tokens!");
531531
}
532+
533+
boolean fetchToken =
534+
flinkConfiguration.getBoolean(SecurityOptions.KERBEROS_FETCH_DELEGATION_TOKEN);
535+
boolean yarnAccessFSEnabled =
536+
flinkConfiguration.get(YarnConfigOptions.YARN_ACCESS) != null;
537+
if (!fetchToken && yarnAccessFSEnabled) {
538+
throw new IllegalConfigurationException(
539+
"When security.kerberos.fetch.delegation-token is set, "
540+
+ "yarn.security.kerberos.additionalFileSystems must be unset.");
541+
}
532542
}
533543

534544
isReadyForDeployment(clusterSpecification);
@@ -1081,13 +1091,16 @@ private ApplicationReport startAppMaster(
10811091
if (UserGroupInformation.isSecurityEnabled()) {
10821092
// set HDFS delegation tokens when security is enabled
10831093
LOG.info("Adding delegation token to the AM container.");
1084-
List<Path> yarnAccessList =
1085-
ConfigUtils.decodeListFromConfig(
1086-
configuration, YarnConfigOptions.YARN_ACCESS, Path::new);
1087-
Utils.setTokensFor(
1088-
amContainer,
1089-
ListUtils.union(yarnAccessList, fileUploader.getRemotePaths()),
1090-
yarnConfiguration);
1094+
List<Path> pathsToObtainToken = null;
1095+
boolean fetchToken =
1096+
configuration.getBoolean(SecurityOptions.KERBEROS_FETCH_DELEGATION_TOKEN);
1097+
if (fetchToken) {
1098+
List<Path> yarnAccessList =
1099+
ConfigUtils.decodeListFromConfig(
1100+
configuration, YarnConfigOptions.YARN_ACCESS, Path::new);
1101+
pathsToObtainToken = ListUtils.union(yarnAccessList, fileUploader.getRemotePaths());
1102+
}
1103+
Utils.setTokensFor(amContainer, pathsToObtainToken, yarnConfiguration, fetchToken);
10911104
}
10921105

10931106
amContainer.setLocalResources(fileUploader.getRegisteredLocalResources());

0 commit comments

Comments
 (0)