Merged
@@ -8,6 +8,12 @@
</tr>
</thead>
<tbody>
<tr>
<td><h5>security.kerberos.fetch.delegation-token</h5></td>
<td style="word-wrap: break-word;">true</td>
<td>Boolean</td>
<td>Indicates whether to fetch the delegation tokens for external services the Flink job needs to contact. Only HDFS and HBase are supported. It is used in Yarn deployments. If true, Flink will fetch HDFS and HBase delegation tokens and inject them into Yarn AM containers. If false, Flink will assume that the delegation tokens are managed outside of Flink. As a consequence, it will not fetch delegation tokens for HDFS and HBase. You may need to disable this option, if you rely on submission mechanisms, e.g. Apache Oozie, to handle delegation tokens.</td>
</tr>
<tr>
<td><h5>security.kerberos.login.contexts</h5></td>
<td style="word-wrap: break-word;">(none)</td>
6 changes: 6 additions & 0 deletions docs/layouts/shortcodes/generated/security_configuration.html
@@ -14,6 +14,12 @@
<td>List&lt;String&gt;</td>
<td>List of factories that should be used to instantiate a security context. If multiple are configured, Flink will use the first compatible factory. You should have a NoOpSecurityContextFactory in this list as a fallback.</td>
</tr>
<tr>
<td><h5>security.kerberos.fetch.delegation-token</h5></td>
<td style="word-wrap: break-word;">true</td>
<td>Boolean</td>
<td>Indicates whether to fetch the delegation tokens for external services the Flink job needs to contact. Only HDFS and HBase are supported. It is used in Yarn deployments. If true, Flink will fetch HDFS and HBase delegation tokens and inject them into Yarn AM containers. If false, Flink will assume that the delegation tokens are managed outside of Flink. As a consequence, it will not fetch delegation tokens for HDFS and HBase. You may need to disable this option, if you rely on submission mechanisms, e.g. Apache Oozie, to handle delegation tokens.</td>
</tr>
<tr>
<td><h5>security.kerberos.krb5-conf.path</h5></td>
<td style="word-wrap: break-word;">(none)</td>
@@ -108,6 +108,12 @@ public class SecurityOptions {
+ " (for example, `Client,KafkaClient` to use the credentials for ZooKeeper authentication and for"
+ " Kafka authentication)");

@Documentation.Section(Documentation.Sections.SECURITY_AUTH_KERBEROS)
public static final ConfigOption<Boolean> KERBEROS_FETCH_DELEGATION_TOKEN =
key("security.kerberos.fetch.delegation-token")
.booleanType()
.defaultValue(true)
.withDescription(
"Indicates whether to fetch the delegation tokens for external services the Flink job needs to contact. "
+ "Only HDFS and HBase are supported. It is used in Yarn deployments. "
+ "If true, Flink will fetch HDFS and HBase delegation tokens and inject them into Yarn AM containers. "
+ "If false, Flink will assume that the delegation tokens are managed outside of Flink. "
+ "As a consequence, it will not fetch delegation tokens for HDFS and HBase. "
+ "You may need to disable this option, if you rely on submission mechanisms, e.g. Apache Oozie, "
+ "to handle delegation tokens.");

// ------------------------------------------------------------------------
// ZooKeeper Security Options
// ------------------------------------------------------------------------
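For context, here is a minimal sketch of how a deployment that relies on an external submitter (for example Apache Oozie) to provision delegation tokens might turn the new option off programmatically. The class name DisableTokenFetchExample is made up for illustration; only Configuration, SecurityOptions, and the option added above come from Flink.

```java
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.SecurityOptions;

// Hypothetical example class; not part of this PR.
public class DisableTokenFetchExample {
    public static void main(String[] args) {
        Configuration flinkConfig = new Configuration();
        // Tokens are provisioned by the submitter (e.g. Oozie), so Flink should not
        // fetch HDFS/HBase delegation tokens itself.
        flinkConfig.setBoolean(SecurityOptions.KERBEROS_FETCH_DELEGATION_TOKEN, false);
        // The same effect can be achieved in flink-conf.yaml:
        //   security.kerberos.fetch.delegation-token: false
        System.out.println(
                "fetch delegation tokens: "
                        + flinkConfig.getBoolean(SecurityOptions.KERBEROS_FETCH_DELEGATION_TOKEN));
    }
}
```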
20 changes: 15 additions & 5 deletions flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java
@@ -197,13 +197,23 @@ private static LocalResource registerLocalResource(
}

public static void setTokensFor(
ContainerLaunchContext amContainer, List<Path> paths, Configuration conf)
ContainerLaunchContext amContainer,
List<Path> paths,
Configuration conf,
boolean obtainingDelegationTokens)
throws IOException {
Credentials credentials = new Credentials();
// for HDFS
TokenCache.obtainTokensForNamenodes(credentials, paths.toArray(new Path[0]), conf);
// for HBase
obtainTokenForHBase(credentials, conf);

if (obtainingDelegationTokens) {
Contributor: Would it make sense to have separate parameters for each of the services, similar to how Spark implements this feature? That's just an idea I came up with after browsing the Spark sources, where it's separated like that.

Member (author): Maybe we should open another issue to separate it.

Contributor: I guess it's not necessary for now, since we don't have a use case where we want to disable them individually. So I'm fine with keeping it like this, considering that it makes configuration easier.

LOG.info("Obtaining delegation tokens for HDFS and HBase.");
// for HDFS
TokenCache.obtainTokensForNamenodes(credentials, paths.toArray(new Path[0]), conf);
// for HBase
obtainTokenForHBase(credentials, conf);
} else {
LOG.info("Delegation token retrieval for HDFS and HBase is disabled.");
}
Contributor: Have we discussed adding an else branch containing an info log message pointing out that delegation token retrieval for HDFS and HBase is disabled?

Member (author): Sorry, I missed that. So a message like: LOG.info("Delegation token retrieval for HDFS and HBase is disabled.");


// for user
UserGroupInformation currUsr = UserGroupInformation.getCurrentUser();

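The hunk above ends before showing how the collected Credentials actually reach the AM container. As a hedged sketch using standard Hadoop/YARN APIs (not necessarily Flink's exact code; the class and method names TokenInjectionSketch and injectTokens are made up), the usual pattern looks like this:

```java
import java.io.IOException;
import java.nio.ByteBuffer;

import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;

// Hypothetical sketch of the standard Hadoop/YARN token-injection pattern.
final class TokenInjectionSketch {
    static void injectTokens(ContainerLaunchContext amContainer, Credentials credentials)
            throws IOException {
        // Carry over any tokens already attached to the current user
        // (the "// for user" part of the hunk above).
        for (Token<? extends TokenIdentifier> token :
                UserGroupInformation.getCurrentUser().getTokens()) {
            credentials.addToken(token.getService(), token);
        }
        // Serialize the credentials and attach them to the container launch context.
        DataOutputBuffer dob = new DataOutputBuffer();
        credentials.writeTokenStorageToStream(dob);
        amContainer.setTokens(ByteBuffer.wrap(dob.getData(), 0, dob.getLength()));
    }
}
```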
@@ -50,6 +50,7 @@
import org.apache.flink.runtime.jobmanager.JobManagerProcessSpec;
import org.apache.flink.runtime.jobmanager.JobManagerProcessUtils;
import org.apache.flink.runtime.util.HadoopUtils;
import org.apache.flink.util.CollectionUtil;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.ShutdownHookUtil;
@@ -62,7 +63,6 @@
import org.apache.flink.yarn.entrypoint.YarnJobClusterEntrypoint;
import org.apache.flink.yarn.entrypoint.YarnSessionClusterEntrypoint;

import org.apache.commons.collections.ListUtils;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DFSConfigKeys;
@@ -105,6 +105,7 @@
import java.net.URI;
import java.net.URLDecoder;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
@@ -529,6 +530,19 @@ private ClusterClientProvider<ApplicationId> deployInternal(
"Hadoop security with Kerberos is enabled but the login user "
+ "does not have Kerberos credentials or delegation tokens!");
}

final boolean fetchToken =
flinkConfiguration.getBoolean(SecurityOptions.KERBEROS_FETCH_DELEGATION_TOKEN);
final boolean yarnAccessFSEnabled =
!CollectionUtil.isNullOrEmpty(
flinkConfiguration.get(YarnConfigOptions.YARN_ACCESS));
if (!fetchToken && yarnAccessFSEnabled) {
throw new IllegalConfigurationException(
String.format(
"When %s is disabled, %s must be disabled as well.",
SecurityOptions.KERBEROS_FETCH_DELEGATION_TOKEN.key(),
YarnConfigOptions.YARN_ACCESS.key()));
}
}

isReadyForDeployment(clusterSpecification);
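To illustrate the precondition added to deployInternal in the hunk above, here is a hedged, self-contained sketch of a configuration that the check would reject: token fetching disabled while extra filesystems are listed under YarnConfigOptions.YARN_ACCESS. The class name FetchTokenPreconditionExample and the example path are made up; the check itself mirrors the one in the diff.

```java
import java.util.Arrays;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.IllegalConfigurationException;
import org.apache.flink.configuration.SecurityOptions;
import org.apache.flink.util.CollectionUtil;
import org.apache.flink.yarn.configuration.YarnConfigOptions;

// Hypothetical illustration; not part of this PR.
public class FetchTokenPreconditionExample {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Conflicting setup: fetching disabled, but additional filesystems configured.
        conf.setBoolean(SecurityOptions.KERBEROS_FETCH_DELEGATION_TOKEN, false);
        conf.set(YarnConfigOptions.YARN_ACCESS, Arrays.asList("hdfs://other-cluster/tmp"));

        boolean fetchToken = conf.getBoolean(SecurityOptions.KERBEROS_FETCH_DELEGATION_TOKEN);
        boolean yarnAccessConfigured =
                !CollectionUtil.isNullOrEmpty(conf.get(YarnConfigOptions.YARN_ACCESS));
        if (!fetchToken && yarnAccessConfigured) {
            // Same failure the new code in deployInternal would raise.
            throw new IllegalConfigurationException(
                    String.format(
                            "When %s is disabled, %s must be disabled as well.",
                            SecurityOptions.KERBEROS_FETCH_DELEGATION_TOKEN.key(),
                            YarnConfigOptions.YARN_ACCESS.key()));
        }
    }
}
```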
@@ -1081,6 +1095,17 @@ private ApplicationReport startAppMaster(
if (UserGroupInformation.isSecurityEnabled()) {
// set HDFS delegation tokens when security is enabled
LOG.info("Adding delegation token to the AM container.");
Contributor: If yarnFetchDelegationTokenEnabled == false, we should not print this log.

Member (author): Got it.

Contributor: I verified it: the log message is still valid, since Utils.setTokensFor also sets user-related tokens. We only disable HDFS and HBase when SecurityOptions.KERBEROS_FETCH_DELEGATION_TOKEN is disabled.

List<Path> yarnAccessList =
ConfigUtils.decodeListFromConfig(
configuration, YarnConfigOptions.YARN_ACCESS, Path::new);
Utils.setTokensFor(
amContainer,
ListUtils.union(yarnAccessList, fileUploader.getRemotePaths()),
yarnConfiguration);
final List<Path> pathsToObtainToken = new ArrayList<>();
boolean fetchToken =
configuration.getBoolean(SecurityOptions.KERBEROS_FETCH_DELEGATION_TOKEN);
if (fetchToken) {
List<Path> yarnAccessList =
ConfigUtils.decodeListFromConfig(
configuration, YarnConfigOptions.YARN_ACCESS, Path::new);
pathsToObtainToken.addAll(yarnAccessList);
pathsToObtainToken.addAll(fileUploader.getRemotePaths());
}
Utils.setTokensFor(amContainer, pathsToObtainToken, yarnConfiguration, fetchToken);
}

amContainer.setLocalResources(fileUploader.getRegisteredLocalResources());