-
Notifications
You must be signed in to change notification settings - Fork 13.8k
[FLINK-21700][security] Add an option to disable credential retrieval on a secure cluster #15131
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -197,13 +197,23 @@ private static LocalResource registerLocalResource( | |
| } | ||
|
|
||
| public static void setTokensFor( | ||
| ContainerLaunchContext amContainer, List<Path> paths, Configuration conf) | ||
| ContainerLaunchContext amContainer, | ||
| List<Path> paths, | ||
| Configuration conf, | ||
| boolean obtainingDelegationTokens) | ||
| throws IOException { | ||
| Credentials credentials = new Credentials(); | ||
| // for HDFS | ||
| TokenCache.obtainTokensForNamenodes(credentials, paths.toArray(new Path[0]), conf); | ||
| // for HBase | ||
| obtainTokenForHBase(credentials, conf); | ||
|
|
||
| if (obtainingDelegationTokens) { | ||
| LOG.info("Obtaining delegation tokens for HDFS and HBase."); | ||
| // for HDFS | ||
| TokenCache.obtainTokensForNamenodes(credentials, paths.toArray(new Path[0]), conf); | ||
| // for HBase | ||
| obtainTokenForHBase(credentials, conf); | ||
| } else { | ||
| LOG.info("Delegation token retrieval for HDFS and HBase is disabled."); | ||
| } | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Have we discussed adding an
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Sorry, i missed. |
||
|
|
||
| // for user | ||
| UserGroupInformation currUsr = UserGroupInformation.getCurrentUser(); | ||
|
|
||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -50,6 +50,7 @@ | |
| import org.apache.flink.runtime.jobmanager.JobManagerProcessSpec; | ||
| import org.apache.flink.runtime.jobmanager.JobManagerProcessUtils; | ||
| import org.apache.flink.runtime.util.HadoopUtils; | ||
| import org.apache.flink.util.CollectionUtil; | ||
| import org.apache.flink.util.FlinkException; | ||
| import org.apache.flink.util.Preconditions; | ||
| import org.apache.flink.util.ShutdownHookUtil; | ||
|
|
@@ -62,7 +63,6 @@ | |
| import org.apache.flink.yarn.entrypoint.YarnJobClusterEntrypoint; | ||
| import org.apache.flink.yarn.entrypoint.YarnSessionClusterEntrypoint; | ||
|
|
||
| import org.apache.commons.collections.ListUtils; | ||
| import org.apache.hadoop.fs.FileSystem; | ||
| import org.apache.hadoop.fs.Path; | ||
| import org.apache.hadoop.hdfs.DFSConfigKeys; | ||
|
|
@@ -105,6 +105,7 @@ | |
| import java.net.URI; | ||
| import java.net.URLDecoder; | ||
| import java.nio.charset.Charset; | ||
| import java.util.ArrayList; | ||
| import java.util.Collection; | ||
| import java.util.Collections; | ||
| import java.util.HashMap; | ||
|
|
@@ -529,6 +530,19 @@ private ClusterClientProvider<ApplicationId> deployInternal( | |
| "Hadoop security with Kerberos is enabled but the login user " | ||
| + "does not have Kerberos credentials or delegation tokens!"); | ||
| } | ||
|
|
||
| final boolean fetchToken = | ||
| flinkConfiguration.getBoolean(SecurityOptions.KERBEROS_FETCH_DELEGATION_TOKEN); | ||
| final boolean yarnAccessFSEnabled = | ||
| !CollectionUtil.isNullOrEmpty( | ||
| flinkConfiguration.get(YarnConfigOptions.YARN_ACCESS)); | ||
| if (!fetchToken && yarnAccessFSEnabled) { | ||
| throw new IllegalConfigurationException( | ||
| String.format( | ||
| "When %s is disabled, %s must be disabled as well.", | ||
| SecurityOptions.KERBEROS_FETCH_DELEGATION_TOKEN.key(), | ||
| YarnConfigOptions.YARN_ACCESS.key())); | ||
| } | ||
| } | ||
|
|
||
| isReadyForDeployment(clusterSpecification); | ||
|
|
@@ -1081,13 +1095,17 @@ private ApplicationReport startAppMaster( | |
| if (UserGroupInformation.isSecurityEnabled()) { | ||
| // set HDFS delegation tokens when security is enabled | ||
| LOG.info("Adding delegation token to the AM container."); | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Got it.
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I verified it: The log message is still valid since |
||
| List<Path> yarnAccessList = | ||
| ConfigUtils.decodeListFromConfig( | ||
| configuration, YarnConfigOptions.YARN_ACCESS, Path::new); | ||
| Utils.setTokensFor( | ||
| amContainer, | ||
| ListUtils.union(yarnAccessList, fileUploader.getRemotePaths()), | ||
| yarnConfiguration); | ||
| final List<Path> pathsToObtainToken = new ArrayList<>(); | ||
| boolean fetchToken = | ||
| configuration.getBoolean(SecurityOptions.KERBEROS_FETCH_DELEGATION_TOKEN); | ||
| if (fetchToken) { | ||
| List<Path> yarnAccessList = | ||
| ConfigUtils.decodeListFromConfig( | ||
| configuration, YarnConfigOptions.YARN_ACCESS, Path::new); | ||
| pathsToObtainToken.addAll(yarnAccessList); | ||
| pathsToObtainToken.addAll(fileUploader.getRemotePaths()); | ||
| } | ||
| Utils.setTokensFor(amContainer, pathsToObtainToken, yarnConfiguration, fetchToken); | ||
| } | ||
|
|
||
| amContainer.setLocalResources(fileUploader.getRegisteredLocalResources()); | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Would it make sense to have separate parameters for each of the services similar to how Spark is implementing this feature? That's just an idea I came up with after browsing the Spark sources where it's separated like that.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe it should submit another issue to separate it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I guess it's not necessary for now considering that we don't have a use-case where we want to disable them individually. So, I'm fine with keeping it like that considering that it makes configuration easier.