diff --git a/docs/layouts/shortcodes/generated/security_auth_kerberos_section.html b/docs/layouts/shortcodes/generated/security_auth_kerberos_section.html
index bd5d2748a44e0..580b51dae7397 100644
--- a/docs/layouts/shortcodes/generated/security_auth_kerberos_section.html
+++ b/docs/layouts/shortcodes/generated/security_auth_kerberos_section.html
@@ -8,6 +8,12 @@
+        <tr>
+            <td><h5>security.kerberos.fetch.delegation-token</h5></td>
+            <td style="word-wrap: break-word;">true</td>
+            <td>Boolean</td>
+            <td>Indicates whether to fetch the delegation tokens for external services the Flink job needs to contact. Only HDFS and HBase are supported. It is used in Yarn deployments. If true, Flink will fetch HDFS and HBase delegation tokens and inject them into Yarn AM containers. If false, Flink will assume that the delegation tokens are managed outside of Flink. As a consequence, it will not fetch delegation tokens for HDFS and HBase. You may need to disable this option, if you rely on submission mechanisms, e.g. Apache Oozie, to handle delegation tokens.</td>
+        </tr>
security.kerberos.login.contexts |
(none) |
diff --git a/docs/layouts/shortcodes/generated/security_configuration.html b/docs/layouts/shortcodes/generated/security_configuration.html
index 14b4de4313183..74fdee6d2eea0 100644
--- a/docs/layouts/shortcodes/generated/security_configuration.html
+++ b/docs/layouts/shortcodes/generated/security_configuration.html
@@ -14,6 +14,12 @@
List<String> |
List of factories that should be used to instantiate a security context. If multiple are configured, Flink will use the first compatible factory. You should have a NoOpSecurityContextFactory in this list as a fallback. |
+        <tr>
+            <td><h5>security.kerberos.fetch.delegation-token</h5></td>
+            <td style="word-wrap: break-word;">true</td>
+            <td>Boolean</td>
+            <td>Indicates whether to fetch the delegation tokens for external services the Flink job needs to contact. Only HDFS and HBase are supported. It is used in Yarn deployments. If true, Flink will fetch HDFS and HBase delegation tokens and inject them into Yarn AM containers. If false, Flink will assume that the delegation tokens are managed outside of Flink. As a consequence, it will not fetch delegation tokens for HDFS and HBase. You may need to disable this option, if you rely on submission mechanisms, e.g. Apache Oozie, to handle delegation tokens.</td>
+        </tr>
security.kerberos.krb5-conf.path |
(none) |
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/SecurityOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/SecurityOptions.java
index d774f1768e2ea..56c99ccc83f51 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/SecurityOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/SecurityOptions.java
@@ -108,6 +108,20 @@ public class SecurityOptions {
+ " (for example, `Client,KafkaClient` to use the credentials for ZooKeeper authentication and for"
+ " Kafka authentication)");
+ @Documentation.Section(Documentation.Sections.SECURITY_AUTH_KERBEROS)
+ public static final ConfigOption<Boolean> KERBEROS_FETCH_DELEGATION_TOKEN =
+ key("security.kerberos.fetch.delegation-token")
+ .booleanType()
+ .defaultValue(true)
+ .withDescription(
+ "Indicates whether to fetch the delegation tokens for external services the Flink job needs to contact. "
+ + "Only HDFS and HBase are supported. It is used in Yarn deployments. "
+ + "If true, Flink will fetch HDFS and HBase delegation tokens and inject them into Yarn AM containers. "
+ + "If false, Flink will assume that the delegation tokens are managed outside of Flink. "
+ + "As a consequence, it will not fetch delegation tokens for HDFS and HBase. "
+ + "You may need to disable this option, if you rely on submission mechanisms, e.g. Apache Oozie, "
+ + "to handle delegation tokens.");
+
// ------------------------------------------------------------------------
// ZooKeeper Security Options
// ------------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java b/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java
index 361b8e58692c8..6309af76031f5 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java
@@ -197,13 +197,23 @@ private static LocalResource registerLocalResource(
}
public static void setTokensFor(
- ContainerLaunchContext amContainer, List<Path> paths, Configuration conf)
+ ContainerLaunchContext amContainer,
+ List<Path> paths,
+ Configuration conf,
+ boolean obtainingDelegationTokens)
throws IOException {
Credentials credentials = new Credentials();
- // for HDFS
- TokenCache.obtainTokensForNamenodes(credentials, paths.toArray(new Path[0]), conf);
- // for HBase
- obtainTokenForHBase(credentials, conf);
+
+ if (obtainingDelegationTokens) {
+ LOG.info("Obtaining delegation tokens for HDFS and HBase.");
+ // for HDFS
+ TokenCache.obtainTokensForNamenodes(credentials, paths.toArray(new Path[0]), conf);
+ // for HBase
+ obtainTokenForHBase(credentials, conf);
+ } else {
+ LOG.info("Delegation token retrieval for HDFS and HBase is disabled.");
+ }
+
// for user
UserGroupInformation currUsr = UserGroupInformation.getCurrentUser();
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java
index 2b071b4789782..7bc07c5974aa8 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java
@@ -50,6 +50,7 @@
import org.apache.flink.runtime.jobmanager.JobManagerProcessSpec;
import org.apache.flink.runtime.jobmanager.JobManagerProcessUtils;
import org.apache.flink.runtime.util.HadoopUtils;
+import org.apache.flink.util.CollectionUtil;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.ShutdownHookUtil;
@@ -62,7 +63,6 @@
import org.apache.flink.yarn.entrypoint.YarnJobClusterEntrypoint;
import org.apache.flink.yarn.entrypoint.YarnSessionClusterEntrypoint;
-import org.apache.commons.collections.ListUtils;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DFSConfigKeys;
@@ -105,6 +105,7 @@
import java.net.URI;
import java.net.URLDecoder;
import java.nio.charset.Charset;
+import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
@@ -529,6 +530,19 @@ private ClusterClientProvider<ApplicationId> deployInternal(
"Hadoop security with Kerberos is enabled but the login user "
+ "does not have Kerberos credentials or delegation tokens!");
}
+
+ final boolean fetchToken =
+ flinkConfiguration.getBoolean(SecurityOptions.KERBEROS_FETCH_DELEGATION_TOKEN);
+ final boolean yarnAccessFSEnabled =
+ !CollectionUtil.isNullOrEmpty(
+ flinkConfiguration.get(YarnConfigOptions.YARN_ACCESS));
+ if (!fetchToken && yarnAccessFSEnabled) {
+ throw new IllegalConfigurationException(
+ String.format(
+ "When %s is disabled, %s must be disabled as well.",
+ SecurityOptions.KERBEROS_FETCH_DELEGATION_TOKEN.key(),
+ YarnConfigOptions.YARN_ACCESS.key()));
+ }
}
isReadyForDeployment(clusterSpecification);
@@ -1081,13 +1095,17 @@ private ApplicationReport startAppMaster(
if (UserGroupInformation.isSecurityEnabled()) {
// set HDFS delegation tokens when security is enabled
LOG.info("Adding delegation token to the AM container.");
- List<Path> yarnAccessList =
- ConfigUtils.decodeListFromConfig(
- configuration, YarnConfigOptions.YARN_ACCESS, Path::new);
- Utils.setTokensFor(
- amContainer,
- ListUtils.union(yarnAccessList, fileUploader.getRemotePaths()),
- yarnConfiguration);
+ final List<Path> pathsToObtainToken = new ArrayList<>();
+ boolean fetchToken =
+ configuration.getBoolean(SecurityOptions.KERBEROS_FETCH_DELEGATION_TOKEN);
+ if (fetchToken) {
+ List<Path> yarnAccessList =
+ ConfigUtils.decodeListFromConfig(
+ configuration, YarnConfigOptions.YARN_ACCESS, Path::new);
+ pathsToObtainToken.addAll(yarnAccessList);
+ pathsToObtainToken.addAll(fileUploader.getRemotePaths());
+ }
+ Utils.setTokensFor(amContainer, pathsToObtainToken, yarnConfiguration, fetchToken);
}
amContainer.setLocalResources(fileUploader.getRegisteredLocalResources());