This repository was archived by the owner on Jan 9, 2020. It is now read-only.

Commit 940bc9e
Re-implement driver pod kubernetes credentials mounting for V2.
1 parent 5dddbd3

18 files changed: +490 additions, -112 deletions
Lines changed: 23 additions & 0 deletions
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.kubernetes
+
+case class KubernetesCredentials(
+    oauthTokenBase64: Option[String],
+    caCertDataBase64: Option[String],
+    clientKeyDataBase64: Option[String],
+    clientCertDataBase64: Option[String])
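
All four fields of the new KubernetesCredentials case class carry base64-encoded values rather than file paths or raw bytes. A minimal, hypothetical sketch (not part of this commit) of how a consumer might decode one of the fields, using the same Guava utilities the submission code relies on:

import com.google.common.base.Charsets
import com.google.common.io.BaseEncoding

import org.apache.spark.deploy.kubernetes.KubernetesCredentials

// Hypothetical helper: recover the raw OAuth token from its base64-encoded form.
object KubernetesCredentialsDecoding {
  def decodedOAuthToken(credentials: KubernetesCredentials): Option[String] = {
    credentials.oauthTokenBase64.map { encoded =>
      new String(BaseEncoding.base64().decode(encoded), Charsets.UTF_8)
    }
  }
}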

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/config.scala

Lines changed: 16 additions & 4 deletions
@@ -120,30 +120,42 @@ package object config extends Logging {
   private[spark] val KUBERNETES_DRIVER_MOUNTED_CA_CERT_FILE =
     ConfigBuilder(s"$APISERVER_DRIVER_CONF_PREFIX.mounted.caCertFile")
       .doc("Path on the driver pod's disk containing the CA cert file to use when authenticating" +
-        " against Kubernetes.")
+        " against Kubernetes. Typically this is configured by spark-submit from mounting a" +
+        " secret from the submitting machine into the pod, and hence this configuration is marked" +
+        " as internal, but this can also be set manually to use a certificate that is mounted" +
+        " into the driver pod via other means.")
       .stringConf
       .createOptional
 
   private[spark] val KUBERNETES_DRIVER_MOUNTED_CLIENT_KEY_FILE =
     ConfigBuilder(s"$APISERVER_DRIVER_CONF_PREFIX.mounted.clientKeyFile")
       .doc("Path on the driver pod's disk containing the client key file to use when" +
-        " authenticating against Kubernetes.")
+        " authenticating against Kubernetes. Typically this is configured by spark-submit from" +
+        " mounting a secret from the submitting machine into the pod, and hence this" +
+        " configuration is marked as internal, but this can also be set manually to" +
+        " use a certificate that is mounted into the driver pod via other means.")
       .internal()
       .stringConf
       .createOptional
 
   private[spark] val KUBERNETES_DRIVER_MOUNTED_CLIENT_CERT_FILE =
     ConfigBuilder(s"$APISERVER_DRIVER_CONF_PREFIX.mounted.clientCertFile")
       .doc("Path on the driver pod's disk containing the client cert file to use when" +
-        " authenticating against Kubernetes.")
+        " authenticating against Kubernetes. Typically this is configured by spark-submit from" +
+        " mounting a secret from the submitting machine into the pod, and hence this" +
+        " configuration is marked as internal, but this can also be set manually to" +
+        " use a certificate that is mounted into the driver pod via other means.")
      .internal()
       .stringConf
       .createOptional
 
   private[spark] val KUBERNETES_DRIVER_MOUNTED_OAUTH_TOKEN =
     ConfigBuilder(s"$APISERVER_DRIVER_CONF_PREFIX.mounted.oauthTokenFile")
       .doc("Path on the driver pod's disk containing the OAuth token file to use when" +
-        " authenticating against Kubernetes.")
+        " authenticating against Kubernetes. Typically this is configured by spark-submit from" +
+        " mounting a secret from the submitting machine into the pod, and hence this" +
+        " configuration is marked as internal, but this can also be set manually to" +
+        " use a certificate that is mounted into the driver pod via other means.")
       .internal()
       .stringConf
       .createOptional
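
These mounted.* entries are normally populated by spark-submit itself, but as the new doc text notes they can also be set by hand when credentials are mounted into the driver pod by other means. A hedged usage sketch (the file paths below are made up for illustration, and the config entries are private[spark], so this only compiles inside Spark's own packages):

import org.apache.spark.SparkConf
import org.apache.spark.deploy.kubernetes.config._

// Point the driver at credential files that something else already mounted into the pod.
val conf = new SparkConf()
  .set(KUBERNETES_DRIVER_MOUNTED_CA_CERT_FILE, "/var/run/secrets/custom/ca.pem")
  .set(KUBERNETES_DRIVER_MOUNTED_OAUTH_TOKEN, "/var/run/secrets/custom/oauth-token")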

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/constants.scala

Lines changed: 16 additions & 0 deletions
@@ -38,6 +38,22 @@ package object constants {
   private[spark] val SUBMISSION_SSL_SECRETS_VOLUME_NAME = "spark-submission-server-ssl-secrets"
   private[spark] val SUBMISSION_SSL_KEY_PEM_SECRET_NAME = "spark-submission-server-key-pem"
   private[spark] val SUBMISSION_SSL_CERT_PEM_SECRET_NAME = "spark-submission-server-cert-pem"
+  private[spark] val DRIVER_CREDENTIALS_SECRETS_BASE_DIR =
+    "/mnt/secrets/spark-kubernetes-credentials"
+  private[spark] val DRIVER_CREDENTIALS_CA_CERT_SECRET_NAME = "ca-cert"
+  private[spark] val DRIVER_CREDENTIALS_CA_CERT_PATH =
+    s"$DRIVER_CREDENTIALS_SECRETS_BASE_DIR/$DRIVER_CREDENTIALS_CA_CERT_SECRET_NAME"
+  private[spark] val DRIVER_CREDENTIALS_CLIENT_KEY_SECRET_NAME = "client-key"
+  private[spark] val DRIVER_CREDENTIALS_CLIENT_KEY_PATH =
+    s"$DRIVER_CREDENTIALS_SECRETS_BASE_DIR/$DRIVER_CREDENTIALS_CLIENT_KEY_SECRET_NAME"
+  private[spark] val DRIVER_CREDENTIALS_CLIENT_CERT_SECRET_NAME = "client-cert"
+  private[spark] val DRIVER_CREDENTIALS_CLIENT_CERT_PATH =
+    s"$DRIVER_CREDENTIALS_SECRETS_BASE_DIR/$DRIVER_CREDENTIALS_CLIENT_CERT_SECRET_NAME"
+  private[spark] val DRIVER_CREDENTIALS_OAUTH_TOKEN_SECRET_NAME = "oauth-token"
+  private[spark] val DRIVER_CREDENTIALS_OAUTH_TOKEN_PATH =
+    s"$DRIVER_CREDENTIALS_SECRETS_BASE_DIR/$DRIVER_CREDENTIALS_OAUTH_TOKEN_SECRET_NAME"
+  private[spark] val DRIVER_CREDENTIALS_SECRET_VOLUME_NAME = "kubernetes-credentials"
+
 
   // Default and fixed ports
   private[spark] val SUBMISSION_SERVER_PORT = 7077
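
For reference, each *_PATH constant is simply the secret volume's mount directory joined with the corresponding secret data key, so inside the driver container the CA cert, for example, surfaces at /mnt/secrets/spark-kubernetes-credentials/ca-cert. A trivial check that holds by construction:

import org.apache.spark.deploy.kubernetes.constants._

// The mounted CA cert path is the base directory plus the "ca-cert" data key.
assert(DRIVER_CREDENTIALS_CA_CERT_PATH ==
  s"$DRIVER_CREDENTIALS_SECRETS_BASE_DIR/$DRIVER_CREDENTIALS_CA_CERT_SECRET_NAME")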
Lines changed: 7 additions & 4 deletions
@@ -14,15 +14,16 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.spark.deploy.kubernetes.submit.v1
+package org.apache.spark.deploy.kubernetes.submit
 
 import java.io.File
 
+import com.google.common.base.Charsets
 import com.google.common.io.{BaseEncoding, Files}
 
 import org.apache.spark.SparkConf
+import org.apache.spark.deploy.kubernetes.KubernetesCredentials
 import org.apache.spark.deploy.kubernetes.config._
-import org.apache.spark.deploy.rest.kubernetes.v1.KubernetesCredentials
 import org.apache.spark.internal.config.OptionalConfigEntry
 
 private[spark] class DriverPodKubernetesCredentialsProvider(sparkConf: SparkConf) {
@@ -38,15 +39,17 @@ private[spark] class DriverPodKubernetesCredentialsProvider(sparkConf: SparkConf
       require(sparkConf.get(KUBERNETES_DRIVER_CLIENT_CERT_FILE).isEmpty,
         "Cannot specify both a service account and a driver pod client cert file.")
     }
-    val oauthToken = sparkConf.get(KUBERNETES_DRIVER_OAUTH_TOKEN)
+    val oauthTokenBase64 = sparkConf.get(KUBERNETES_DRIVER_OAUTH_TOKEN).map { token =>
+      BaseEncoding.base64().encode(token.getBytes(Charsets.UTF_8))
+    }
     val caCertDataBase64 = safeFileConfToBase64(KUBERNETES_DRIVER_CA_CERT_FILE,
       s"Driver CA cert file provided at %s does not exist or is not a file.")
     val clientKeyDataBase64 = safeFileConfToBase64(KUBERNETES_DRIVER_CLIENT_KEY_FILE,
       s"Driver client key file provided at %s does not exist or is not a file.")
     val clientCertDataBase64 = safeFileConfToBase64(KUBERNETES_DRIVER_CLIENT_CERT_FILE,
      s"Driver client cert file provided at %s does not exist or is not a file.")
     KubernetesCredentials(
-      oauthToken = oauthToken,
+      oauthTokenBase64 = oauthTokenBase64,
       caCertDataBase64 = caCertDataBase64,
       clientKeyDataBase64 = clientKeyDataBase64,
       clientCertDataBase64 = clientCertDataBase64)
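
The OAuth token is now base64-encoded directly from the raw config value, while the file-based credentials go through safeFileConfToBase64, whose body is outside this hunk. A plausible sketch of what such a helper does, assuming it reads the configured file and base64-encodes its contents (an assumption, not the committed implementation):

import java.io.File

import com.google.common.io.{BaseEncoding, Files}

object FileCredentialEncoding {
  // Assumed shape: read the file at the given path and return its bytes as base64,
  // failing with the provided message if the path is not a regular file.
  def fileToBase64(path: String, errorMessageFormat: String): String = {
    val file = new File(path)
    require(file.isFile, String.format(errorMessageFormat, path))
    BaseEncoding.base64().encode(Files.toByteArray(file))
  }
}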

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/v1/Client.scala

Lines changed: 3 additions & 3 deletions
@@ -30,11 +30,11 @@ import org.apache.commons.codec.binary.Base64
 import scala.collection.JavaConverters._
 
 import org.apache.spark.{SparkConf, SparkException}
-import org.apache.spark.deploy.kubernetes.CompressionUtils
+import org.apache.spark.deploy.kubernetes.{CompressionUtils, KubernetesCredentials}
 import org.apache.spark.deploy.kubernetes.config._
 import org.apache.spark.deploy.kubernetes.constants._
-import org.apache.spark.deploy.kubernetes.submit.KubernetesFileUtils
-import org.apache.spark.deploy.rest.kubernetes.v1.{AppResource, ContainerAppResource, HttpClientUtil, KubernetesCreateSubmissionRequest, KubernetesCredentials, KubernetesSparkRestApi, RemoteAppResource, UploadedAppResource}
+import org.apache.spark.deploy.kubernetes.submit.{DriverPodKubernetesCredentialsProvider, KubernetesFileUtils}
+import org.apache.spark.deploy.rest.kubernetes.v1.{AppResource, ContainerAppResource, HttpClientUtil, KubernetesCreateSubmissionRequest, KubernetesSparkRestApi, RemoteAppResource, UploadedAppResource}
 import org.apache.spark.internal.Logging
 import org.apache.spark.util.{ShutdownHookManager, Utils}

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/v2/Client.scala

Lines changed: 16 additions & 4 deletions
@@ -49,7 +49,9 @@ private[spark] class Client(
     sparkJars: Seq[String],
     sparkFiles: Seq[String],
     kubernetesClientProvider: SubmissionKubernetesClientProvider,
-    initContainerComponentsProvider: DriverInitContainerComponents) extends Logging {
+    initContainerComponentsProvider: DriverInitContainerComponents,
+    kubernetesCredentialsMounterProvider: DriverPodKubernetesCredentialsMounterProvider)
+    extends Logging {
 
   private val driverDockerImage = sparkConf.get(DRIVER_DOCKER_IMAGE)
   private val driverMemoryMb = sparkConf.get(org.apache.spark.internal.config.DRIVER_MEMORY)
@@ -143,8 +145,15 @@ private[spark] class Client(
 
     val executorInitContainerConfiguration = initContainerComponentsProvider
       .provideExecutorInitContainerConfiguration()
-    val resolvedSparkConf = executorInitContainerConfiguration
+    val sparkConfWithExecutorInit = executorInitContainerConfiguration
       .configureSparkConfForExecutorInitContainer(sparkConf)
+    val credentialsMounter = kubernetesCredentialsMounterProvider
+      .getDriverPodKubernetesCredentialsMounter()
+    val credentialsSecret = credentialsMounter.createCredentialsSecret()
+    val podWithInitContainerAndMountedCreds = credentialsMounter.mountDriverKubernetesCredentials(
+      podWithInitContainer, driverContainer.getName, credentialsSecret)
+    val resolvedSparkConf = credentialsMounter.setDriverPodKubernetesCredentialLocations(
+      sparkConfWithExecutorInit)
     if (resolvedSparkJars.nonEmpty) {
       resolvedSparkConf.set("spark.jars", resolvedSparkJars.mkString(","))
     }
@@ -166,7 +175,7 @@ private[spark] class Client(
     val resolvedDriverJavaOpts = resolvedSparkConf.getAll.map {
       case (confKey, confValue) => s"-D$confKey=$confValue"
     }.mkString(" ") + driverJavaOptions.map(" " + _).getOrElse("")
-    val resolvedDriverPod = podWithInitContainer.editSpec()
+    val resolvedDriverPod = podWithInitContainerAndMountedCreds.editSpec()
       .editMatchingContainer(new ContainerNameEqualityPredicate(driverContainer.getName))
       .addNewEnv()
         .withName(ENV_MOUNTED_CLASSPATH)
@@ -261,6 +270,8 @@ private[spark] object Client {
     val initContainerComponentsProvider = new DriverInitContainerComponentsProviderImpl(
       sparkConf, kubernetesAppId, sparkJars, sparkFiles)
     val kubernetesClientProvider = new SubmissionKubernetesClientProviderImpl(sparkConf)
+    val kubernetesCredentialsMounterProvider =
+      new DriverPodKubernetesCredentialsMounterProviderImpl(sparkConf, kubernetesAppId)
     new Client(
       appName,
       kubernetesAppId,
@@ -271,6 +282,7 @@ private[spark] object Client {
       sparkJars,
       sparkFiles,
       kubernetesClientProvider,
-      initContainerComponentsProvider).run()
+      initContainerComponentsProvider,
+      kubernetesCredentialsMounterProvider).run()
   }
 }
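
Client now takes a DriverPodKubernetesCredentialsMounterProvider and asks it for the mounter during run(). The provider's own file lives elsewhere in this commit; based on how it is used here, its interface is presumably along these lines (a sketch, not the committed source):

package org.apache.spark.deploy.kubernetes.submit.v2

// Presumed shape of the provider Client depends on: one factory method returning the mounter.
private[spark] trait DriverPodKubernetesCredentialsMounterProvider {
  def getDriverPodKubernetesCredentialsMounter(): DriverPodKubernetesCredentialsMounter
}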
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.kubernetes.submit.v2
+
+import io.fabric8.kubernetes.api.model.{PodBuilder, Secret, SecretBuilder}
+import scala.collection.JavaConverters._
+
+import org.apache.spark.SparkConf
+import org.apache.spark.deploy.kubernetes.KubernetesCredentials
+import org.apache.spark.deploy.kubernetes.config._
+import org.apache.spark.deploy.kubernetes.constants._
+import org.apache.spark.internal.config.OptionalConfigEntry
+
+private[spark] trait DriverPodKubernetesCredentialsMounter {
+
+  /**
+   * Set fields on the Spark configuration that indicate where the driver pod is
+   * to find its Kubernetes credentials for requesting executors.
+   */
+  def setDriverPodKubernetesCredentialLocations(sparkConf: SparkConf): SparkConf
+
+  /**
+   * Create the Kubernetes secret object that correspond to the driver's credentials
+   * that have to be created and mounted into the driver pod. The single Secret
+   * object contains all of the data entries for the driver pod's Kubernetes
+   * credentials. Returns empty if no secrets are to be mounted.
+   */
+  def createCredentialsSecret(): Option[Secret]
+
+  /**
+   * Mount any Kubernetes credentials from the submitting machine's disk into the
+   * driver pod.
+   */
+  def mountDriverKubernetesCredentials(
+    originalPodSpec: PodBuilder,
+    driverContainerName: String,
+    credentialsSecret: Option[Secret]): PodBuilder
+}
+
+private[spark] class DriverPodKubernetesCredentialsMounterImpl(
+    kubernetesAppId: String,
+    submitterLocalDriverPodKubernetesCredentials: KubernetesCredentials,
+    maybeUserSpecifiedMountedClientKeyFile: Option[String],
+    maybeUserSpecifiedMountedClientCertFile: Option[String],
+    maybeUserSpecifiedMountedOAuthTokenFile: Option[String],
+    maybeUserSpecifiedMountedCaCertFile: Option[String])
+    extends DriverPodKubernetesCredentialsMounter {
+
+  override def setDriverPodKubernetesCredentialLocations(sparkConf: SparkConf): SparkConf = {
+    val resolvedMountedClientKeyFile = resolveSecretLocation(
+      maybeUserSpecifiedMountedClientKeyFile,
+      submitterLocalDriverPodKubernetesCredentials.clientKeyDataBase64,
+      DRIVER_CREDENTIALS_CLIENT_KEY_PATH)
+    val resolvedMountedClientCertFile = resolveSecretLocation(
+      maybeUserSpecifiedMountedClientCertFile,
+      submitterLocalDriverPodKubernetesCredentials.clientCertDataBase64,
+      DRIVER_CREDENTIALS_CLIENT_CERT_PATH)
+    val resolvedMountedCaCertFile = resolveSecretLocation(
+      maybeUserSpecifiedMountedCaCertFile,
+      submitterLocalDriverPodKubernetesCredentials.caCertDataBase64,
+      DRIVER_CREDENTIALS_CA_CERT_PATH)
+    val resolvedMountedOAuthTokenFile = resolveSecretLocation(
+      maybeUserSpecifiedMountedOAuthTokenFile,
+      submitterLocalDriverPodKubernetesCredentials.oauthTokenBase64,
+      DRIVER_CREDENTIALS_OAUTH_TOKEN_PATH)
+    val sparkConfWithCredentialLocations = sparkConf.clone()
+      .setOption(KUBERNETES_DRIVER_MOUNTED_CA_CERT_FILE, resolvedMountedCaCertFile)
+      .setOption(KUBERNETES_DRIVER_MOUNTED_CLIENT_KEY_FILE, resolvedMountedClientKeyFile)
+      .setOption(KUBERNETES_DRIVER_MOUNTED_CLIENT_CERT_FILE, resolvedMountedClientCertFile)
+      .setOption(KUBERNETES_DRIVER_MOUNTED_OAUTH_TOKEN, resolvedMountedOAuthTokenFile)
+    sparkConfWithCredentialLocations.get(KUBERNETES_DRIVER_OAUTH_TOKEN).foreach { _ =>
+      sparkConfWithCredentialLocations.set(KUBERNETES_DRIVER_OAUTH_TOKEN, "<present_but_redacted>")
+    }
+    sparkConfWithCredentialLocations.get(KUBERNETES_SUBMIT_OAUTH_TOKEN).foreach { _ =>
+      sparkConfWithCredentialLocations.set(KUBERNETES_SUBMIT_OAUTH_TOKEN, "<present_but_redacted>")
+    }
+    sparkConfWithCredentialLocations
+  }
+
+  override def createCredentialsSecret(): Option[Secret] = {
+    val allSecretData = resolveSecretData(
+      maybeUserSpecifiedMountedClientKeyFile,
+      submitterLocalDriverPodKubernetesCredentials.clientKeyDataBase64,
+      DRIVER_CREDENTIALS_CLIENT_KEY_SECRET_NAME) ++
+      resolveSecretData(
+        maybeUserSpecifiedMountedClientCertFile,
+        submitterLocalDriverPodKubernetesCredentials.clientCertDataBase64,
+        DRIVER_CREDENTIALS_CLIENT_CERT_SECRET_NAME) ++
+      resolveSecretData(
+        maybeUserSpecifiedMountedCaCertFile,
+        submitterLocalDriverPodKubernetesCredentials.caCertDataBase64,
+        DRIVER_CREDENTIALS_CA_CERT_SECRET_NAME) ++
+      resolveSecretData(
+        maybeUserSpecifiedMountedOAuthTokenFile,
+        submitterLocalDriverPodKubernetesCredentials.oauthTokenBase64,
+        DRIVER_CREDENTIALS_OAUTH_TOKEN_SECRET_NAME)
+    if (allSecretData.isEmpty) {
+      None
+    } else {
+      Some(new SecretBuilder()
+        .withNewMetadata().withName(s"$kubernetesAppId-kubernetes-credentials").endMetadata()
+        .withData(allSecretData.asJava)
+        .build())
+    }
+  }
+
+  override def mountDriverKubernetesCredentials(
+      originalPodSpec: PodBuilder,
+      driverContainerName: String,
+      credentialsSecret: Option[Secret]): PodBuilder = {
+    credentialsSecret.map { secret =>
+      originalPodSpec.editSpec()
+        .addNewVolume()
+          .withName(DRIVER_CREDENTIALS_SECRET_VOLUME_NAME)
+          .withNewSecret().withSecretName(secret.getMetadata.getName).endSecret()
+          .endVolume()
+        .editMatchingContainer(new ContainerNameEqualityPredicate(driverContainerName))
+          .addNewVolumeMount()
+            .withName(DRIVER_CREDENTIALS_SECRET_VOLUME_NAME)
+            .withMountPath(DRIVER_CREDENTIALS_SECRETS_BASE_DIR)
+            .endVolumeMount()
+          .endContainer()
+        .endSpec()
+    }.getOrElse(originalPodSpec)
+  }
+
+  private def resolveSecretLocation(
+      mountedUserSpecified: Option[String],
+      valueMountedFromSubmitter: Option[String],
+      mountedCanonicalLocation: String): Option[String] = {
+    mountedUserSpecified.orElse(valueMountedFromSubmitter.map( _ => {
+      mountedCanonicalLocation
+    }))
+  }
+
+  private def resolveSecretData(
+      mountedUserSpecified: Option[String],
+      valueMountedFromSubmitter: Option[String],
+      secretName: String): Map[String, String] = {
+    mountedUserSpecified.map { _ => Map.empty[String, String]}
+      .getOrElse {
+        valueMountedFromSubmitter.map { valueBase64 =>
+          Map(secretName -> valueBase64)
+        }.getOrElse(Map.empty[String, String])
+      }
+  }
+
+  private implicit def augmentSparkConf(sparkConf: SparkConf): OptionSettableSparkConf = {
+    new OptionSettableSparkConf(sparkConf)
+  }
+}
+
+private class OptionSettableSparkConf(sparkConf: SparkConf) {
+  def setOption[T](configEntry: OptionalConfigEntry[T], option: Option[T]): SparkConf = {
+    option.map( opt => {
+      sparkConf.set(configEntry, opt)
+    }).getOrElse(sparkConf)
+  }
+}
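
Taken together, the mounter's behavior for the common case (credentials read from the submitter's machine, nothing pre-mounted) is to build one Secret whose data keys are the constant names above and to rewrite the Spark conf so the driver reads each credential from its canonical in-pod path. A worked example of that flow (illustrative only; the app id and token value are made up):

import org.apache.spark.SparkConf
import org.apache.spark.deploy.kubernetes.KubernetesCredentials
import org.apache.spark.deploy.kubernetes.config._
import org.apache.spark.deploy.kubernetes.constants._

// Submitter-local OAuth token only; no user-specified pre-mounted credential files.
val mounter = new DriverPodKubernetesCredentialsMounterImpl(
  kubernetesAppId = "spark-pi-1234",
  submitterLocalDriverPodKubernetesCredentials = KubernetesCredentials(
    oauthTokenBase64 = Some("dG9rZW4="),  // base64 of the raw token "token"
    caCertDataBase64 = None,
    clientKeyDataBase64 = None,
    clientCertDataBase64 = None),
  maybeUserSpecifiedMountedClientKeyFile = None,
  maybeUserSpecifiedMountedClientCertFile = None,
  maybeUserSpecifiedMountedOAuthTokenFile = None,
  maybeUserSpecifiedMountedCaCertFile = None)

// One Secret named "spark-pi-1234-kubernetes-credentials" with a single "oauth-token" entry.
val credentialsSecret = mounter.createCredentialsSecret()

// The resolved conf tells the driver to read the token from the canonical mounted path.
val resolvedConf = mounter.setDriverPodKubernetesCredentialLocations(new SparkConf(false))
assert(resolvedConf.get(KUBERNETES_DRIVER_MOUNTED_OAUTH_TOKEN) ==
  Some(DRIVER_CREDENTIALS_OAUTH_TOKEN_PATH))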
