Skip to content
This repository was archived by the owner on Jan 9, 2020. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.deploy.kubernetes

/**
 * Bundle of credentials for authenticating with a Kubernetes API server.
 *
 * Every field is optional; absent entries mean that credential is not used.
 * All values are carried base64-encoded (see the field names) so they can be
 * placed directly into Kubernetes Secret data entries without re-encoding.
 *
 * @param oauthTokenBase64 base64-encoded OAuth bearer token, if any.
 * @param caCertDataBase64 base64-encoded CA certificate data, if any.
 * @param clientKeyDataBase64 base64-encoded TLS client key data, if any.
 * @param clientCertDataBase64 base64-encoded TLS client certificate data, if any.
 */
case class KubernetesCredentials(
    oauthTokenBase64: Option[String],
    caCertDataBase64: Option[String],
    clientKeyDataBase64: Option[String],
    clientCertDataBase64: Option[String])
Original file line number Diff line number Diff line change
Expand Up @@ -120,30 +120,42 @@ package object config extends Logging {
private[spark] val KUBERNETES_DRIVER_MOUNTED_CA_CERT_FILE =
ConfigBuilder(s"$APISERVER_DRIVER_CONF_PREFIX.mounted.caCertFile")
.doc("Path on the driver pod's disk containing the CA cert file to use when authenticating" +
" against Kubernetes.")
" against Kubernetes. Typically this is configured by spark-submit from mounting a" +
" secret from the submitting machine into the pod, and hence this configuration is marked" +
" as internal, but this can also be set manually to use a certificate that is mounted" +
" into the driver pod via other means.")
.stringConf
.createOptional

private[spark] val KUBERNETES_DRIVER_MOUNTED_CLIENT_KEY_FILE =
ConfigBuilder(s"$APISERVER_DRIVER_CONF_PREFIX.mounted.clientKeyFile")
.doc("Path on the driver pod's disk containing the client key file to use when" +
" authenticating against Kubernetes.")
" authenticating against Kubernetes. Typically this is configured by spark-submit from" +
" mounting a secret from the submitting machine into the pod, and hence this" +
" configuration is marked as internal, but this can also be set manually to" +
" use a key file that is mounted into the driver pod via other means.")
.internal()
.stringConf
.createOptional

private[spark] val KUBERNETES_DRIVER_MOUNTED_CLIENT_CERT_FILE =
ConfigBuilder(s"$APISERVER_DRIVER_CONF_PREFIX.mounted.clientCertFile")
.doc("Path on the driver pod's disk containing the client cert file to use when" +
" authenticating against Kubernetes.")
" authenticating against Kubernetes. Typically this is configured by spark-submit from" +
" mounting a secret from the submitting machine into the pod, and hence this" +
" configuration is marked as internal, but this can also be set manually to" +
" use a certificate that is mounted into the driver pod via other means.")
.internal()
.stringConf
.createOptional

private[spark] val KUBERNETES_DRIVER_MOUNTED_OAUTH_TOKEN =
ConfigBuilder(s"$APISERVER_DRIVER_CONF_PREFIX.mounted.oauthTokenFile")
.doc("Path on the driver pod's disk containing the OAuth token file to use when" +
" authenticating against Kubernetes.")
" authenticating against Kubernetes. Typically this is configured by spark-submit from" +
" mounting a secret from the submitting machine into the pod, and hence this" +
" configuration is marked as internal, but this can also be set manually to" +
" use a token that is mounted into the driver pod via other means.")
.internal()
.stringConf
.createOptional
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,22 @@ package object constants {
private[spark] val SUBMISSION_SSL_SECRETS_VOLUME_NAME = "spark-submission-server-ssl-secrets"
private[spark] val SUBMISSION_SSL_KEY_PEM_SECRET_NAME = "spark-submission-server-key-pem"
private[spark] val SUBMISSION_SSL_CERT_PEM_SECRET_NAME = "spark-submission-server-cert-pem"
  // Driver Kubernetes API credentials, delivered to the driver pod by mounting
  // a single secret volume. Each *_SECRET_NAME constant is the data key inside
  // that secret, and the matching *_PATH constant is where the entry surfaces
  // on the driver pod's disk (base dir + key).
  private[spark] val DRIVER_CREDENTIALS_SECRETS_BASE_DIR =
    "/mnt/secrets/spark-kubernetes-credentials"
  private[spark] val DRIVER_CREDENTIALS_CA_CERT_SECRET_NAME = "ca-cert"
  private[spark] val DRIVER_CREDENTIALS_CA_CERT_PATH =
    s"$DRIVER_CREDENTIALS_SECRETS_BASE_DIR/$DRIVER_CREDENTIALS_CA_CERT_SECRET_NAME"
  private[spark] val DRIVER_CREDENTIALS_CLIENT_KEY_SECRET_NAME = "client-key"
  private[spark] val DRIVER_CREDENTIALS_CLIENT_KEY_PATH =
    s"$DRIVER_CREDENTIALS_SECRETS_BASE_DIR/$DRIVER_CREDENTIALS_CLIENT_KEY_SECRET_NAME"
  private[spark] val DRIVER_CREDENTIALS_CLIENT_CERT_SECRET_NAME = "client-cert"
  private[spark] val DRIVER_CREDENTIALS_CLIENT_CERT_PATH =
    s"$DRIVER_CREDENTIALS_SECRETS_BASE_DIR/$DRIVER_CREDENTIALS_CLIENT_CERT_SECRET_NAME"
  private[spark] val DRIVER_CREDENTIALS_OAUTH_TOKEN_SECRET_NAME = "oauth-token"
  private[spark] val DRIVER_CREDENTIALS_OAUTH_TOKEN_PATH =
    s"$DRIVER_CREDENTIALS_SECRETS_BASE_DIR/$DRIVER_CREDENTIALS_OAUTH_TOKEN_SECRET_NAME"
  // Name of the secret volume mounted into the driver pod at the base dir above.
  private[spark] val DRIVER_CREDENTIALS_SECRET_VOLUME_NAME = "kubernetes-credentials"


// Default and fixed ports
private[spark] val SUBMISSION_SERVER_PORT = 7077
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,16 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.deploy.kubernetes.submit.v1
package org.apache.spark.deploy.kubernetes.submit

import java.io.File

import com.google.common.base.Charsets
import com.google.common.io.{BaseEncoding, Files}

import org.apache.spark.SparkConf
import org.apache.spark.deploy.kubernetes.KubernetesCredentials
import org.apache.spark.deploy.kubernetes.config._
import org.apache.spark.deploy.rest.kubernetes.v1.KubernetesCredentials
import org.apache.spark.internal.config.OptionalConfigEntry

private[spark] class DriverPodKubernetesCredentialsProvider(sparkConf: SparkConf) {
Expand All @@ -38,15 +39,17 @@ private[spark] class DriverPodKubernetesCredentialsProvider(sparkConf: SparkConf
require(sparkConf.get(KUBERNETES_DRIVER_CLIENT_CERT_FILE).isEmpty,
"Cannot specify both a service account and a driver pod client cert file.")
}
val oauthToken = sparkConf.get(KUBERNETES_DRIVER_OAUTH_TOKEN)
val oauthTokenBase64 = sparkConf.get(KUBERNETES_DRIVER_OAUTH_TOKEN).map { token =>
BaseEncoding.base64().encode(token.getBytes(Charsets.UTF_8))
}
val caCertDataBase64 = safeFileConfToBase64(KUBERNETES_DRIVER_CA_CERT_FILE,
s"Driver CA cert file provided at %s does not exist or is not a file.")
val clientKeyDataBase64 = safeFileConfToBase64(KUBERNETES_DRIVER_CLIENT_KEY_FILE,
s"Driver client key file provided at %s does not exist or is not a file.")
val clientCertDataBase64 = safeFileConfToBase64(KUBERNETES_DRIVER_CLIENT_CERT_FILE,
s"Driver client cert file provided at %s does not exist or is not a file.")
KubernetesCredentials(
oauthToken = oauthToken,
oauthTokenBase64 = oauthTokenBase64,
caCertDataBase64 = caCertDataBase64,
clientKeyDataBase64 = clientKeyDataBase64,
clientCertDataBase64 = clientCertDataBase64)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,11 @@ import org.apache.commons.codec.binary.Base64
import scala.collection.JavaConverters._

import org.apache.spark.{SparkConf, SparkException}
import org.apache.spark.deploy.kubernetes.CompressionUtils
import org.apache.spark.deploy.kubernetes.{CompressionUtils, KubernetesCredentials}
import org.apache.spark.deploy.kubernetes.config._
import org.apache.spark.deploy.kubernetes.constants._
import org.apache.spark.deploy.kubernetes.submit.KubernetesFileUtils
import org.apache.spark.deploy.rest.kubernetes.v1.{AppResource, ContainerAppResource, HttpClientUtil, KubernetesCreateSubmissionRequest, KubernetesCredentials, KubernetesSparkRestApi, RemoteAppResource, UploadedAppResource}
import org.apache.spark.deploy.kubernetes.submit.{DriverPodKubernetesCredentialsProvider, KubernetesFileUtils}
import org.apache.spark.deploy.rest.kubernetes.v1.{AppResource, ContainerAppResource, HttpClientUtil, KubernetesCreateSubmissionRequest, KubernetesSparkRestApi, RemoteAppResource, UploadedAppResource}
import org.apache.spark.internal.Logging
import org.apache.spark.util.{ShutdownHookManager, Utils}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,9 @@ private[spark] class Client(
sparkJars: Seq[String],
sparkFiles: Seq[String],
kubernetesClientProvider: SubmissionKubernetesClientProvider,
initContainerComponentsProvider: DriverInitContainerComponentsProvider) extends Logging {
initContainerComponentsProvider: DriverInitContainerComponentsProvider,
kubernetesCredentialsMounterProvider: DriverPodKubernetesCredentialsMounterProvider)
extends Logging {

private val kubernetesDriverPodName = sparkConf.get(KUBERNETES_DRIVER_POD_NAME)
.getOrElse(kubernetesAppId)
Expand Down Expand Up @@ -133,18 +135,22 @@ private[spark] class Client(
.provideInitContainerBootstrap()
.bootstrapInitContainerAndVolumes(driverContainer.getName, basePod)

val driverOwnedResources = Seq(initContainerConfigMap) ++
maybeSubmittedDependenciesSecret.toSeq

val containerLocalizedFilesResolver = initContainerComponentsProvider
.provideContainerLocalizedFilesResolver()
val resolvedSparkJars = containerLocalizedFilesResolver.resolveSubmittedSparkJars()
val resolvedSparkFiles = containerLocalizedFilesResolver.resolveSubmittedSparkFiles()

val executorInitContainerConfiguration = initContainerComponentsProvider
.provideExecutorInitContainerConfiguration()
val resolvedSparkConf = executorInitContainerConfiguration
val sparkConfWithExecutorInit = executorInitContainerConfiguration
.configureSparkConfForExecutorInitContainer(sparkConf)
val credentialsMounter = kubernetesCredentialsMounterProvider
.getDriverPodKubernetesCredentialsMounter()
val credentialsSecret = credentialsMounter.createCredentialsSecret()
val podWithInitContainerAndMountedCreds = credentialsMounter.mountDriverKubernetesCredentials(
podWithInitContainer, driverContainer.getName, credentialsSecret)
val resolvedSparkConf = credentialsMounter.setDriverPodKubernetesCredentialLocations(
sparkConfWithExecutorInit)
if (resolvedSparkJars.nonEmpty) {
resolvedSparkConf.set("spark.jars", resolvedSparkJars.mkString(","))
}
Expand All @@ -166,7 +172,7 @@ private[spark] class Client(
val resolvedDriverJavaOpts = resolvedSparkConf.getAll.map {
case (confKey, confValue) => s"-D$confKey=$confValue"
}.mkString(" ") + driverJavaOptions.map(" " + _).getOrElse("")
val resolvedDriverPod = podWithInitContainer.editSpec()
val resolvedDriverPod = podWithInitContainerAndMountedCreds.editSpec()
.editMatchingContainer(new ContainerNameEqualityPredicate(driverContainer.getName))
.addNewEnv()
.withName(ENV_MOUNTED_CLASSPATH)
Expand All @@ -181,6 +187,9 @@ private[spark] class Client(
.build()
val createdDriverPod = kubernetesClient.pods().create(resolvedDriverPod)
try {
val driverOwnedResources = Seq(initContainerConfigMap) ++
maybeSubmittedDependenciesSecret.toSeq ++
credentialsSecret.toSeq
val driverPodOwnerReference = new OwnerReferenceBuilder()
.withName(createdDriverPod.getMetadata.getName)
.withApiVersion(createdDriverPod.getApiVersion)
Expand Down Expand Up @@ -261,6 +270,8 @@ private[spark] object Client {
val initContainerComponentsProvider = new DriverInitContainerComponentsProviderImpl(
sparkConf, kubernetesAppId, sparkJars, sparkFiles)
val kubernetesClientProvider = new SubmissionKubernetesClientProviderImpl(sparkConf)
val kubernetesCredentialsMounterProvider =
new DriverPodKubernetesCredentialsMounterProviderImpl(sparkConf, kubernetesAppId)
new Client(
appName,
kubernetesAppId,
Expand All @@ -270,6 +281,7 @@ private[spark] object Client {
sparkJars,
sparkFiles,
kubernetesClientProvider,
initContainerComponentsProvider).run()
initContainerComponentsProvider,
kubernetesCredentialsMounterProvider).run()
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,175 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.deploy.kubernetes.submit.v2

import io.fabric8.kubernetes.api.model.{PodBuilder, Secret, SecretBuilder}
import scala.collection.JavaConverters._

import org.apache.spark.SparkConf
import org.apache.spark.deploy.kubernetes.KubernetesCredentials
import org.apache.spark.deploy.kubernetes.config._
import org.apache.spark.deploy.kubernetes.constants._
import org.apache.spark.internal.config.OptionalConfigEntry

/**
 * Handles delivering the driver pod's Kubernetes API credentials: building the
 * secret that carries them, mounting it into the pod, and pointing the Spark
 * configuration at the mounted locations.
 */
private[spark] trait DriverPodKubernetesCredentialsMounter {

  /**
   * Set fields on the Spark configuration that indicate where the driver pod is
   * to find its Kubernetes credentials for requesting executors.
   *
   * @param sparkConf configuration to derive from; implementations should not
   *                  mutate the caller's instance.
   * @return a configuration with the mounted-credential locations filled in.
   */
  def setDriverPodKubernetesCredentialLocations(sparkConf: SparkConf): SparkConf

  /**
   * Create the Kubernetes secret object that correspond to the driver's credentials
   * that have to be created and mounted into the driver pod. The single Secret
   * object contains all of the data entries for the driver pod's Kubernetes
   * credentials. Returns empty if no secrets are to be mounted.
   */
  def createCredentialsSecret(): Option[Secret]

  /**
   * Mount any Kubernetes credentials from the submitting machine's disk into the driver pod. The
   * secret that is passed in here should have been created from createCredentialsSecret so that
   * the implementation does not need to hold its state.
   *
   * @param originalPodSpec pod builder to augment.
   * @param driverContainerName container within the pod to receive the volume mount.
   * @param credentialsSecret secret to mount; when None the pod spec is returned unchanged.
   */
  def mountDriverKubernetesCredentials(
    originalPodSpec: PodBuilder,
    driverContainerName: String,
    credentialsSecret: Option[Secret]): PodBuilder
}

private[spark] class DriverPodKubernetesCredentialsMounterImpl(
kubernetesAppId: String,
submitterLocalDriverPodKubernetesCredentials: KubernetesCredentials,
maybeUserSpecifiedMountedClientKeyFile: Option[String],
maybeUserSpecifiedMountedClientCertFile: Option[String],
maybeUserSpecifiedMountedOAuthTokenFile: Option[String],
maybeUserSpecifiedMountedCaCertFile: Option[String])
extends DriverPodKubernetesCredentialsMounter {

override def setDriverPodKubernetesCredentialLocations(sparkConf: SparkConf): SparkConf = {
val resolvedMountedClientKeyFile = resolveSecretLocation(
maybeUserSpecifiedMountedClientKeyFile,
submitterLocalDriverPodKubernetesCredentials.clientKeyDataBase64,
DRIVER_CREDENTIALS_CLIENT_KEY_PATH)
val resolvedMountedClientCertFile = resolveSecretLocation(
maybeUserSpecifiedMountedClientCertFile,
submitterLocalDriverPodKubernetesCredentials.clientCertDataBase64,
DRIVER_CREDENTIALS_CLIENT_CERT_PATH)
val resolvedMountedCaCertFile = resolveSecretLocation(
maybeUserSpecifiedMountedCaCertFile,
submitterLocalDriverPodKubernetesCredentials.caCertDataBase64,
DRIVER_CREDENTIALS_CA_CERT_PATH)
val resolvedMountedOAuthTokenFile = resolveSecretLocation(
maybeUserSpecifiedMountedOAuthTokenFile,
submitterLocalDriverPodKubernetesCredentials.oauthTokenBase64,
DRIVER_CREDENTIALS_OAUTH_TOKEN_PATH)
val sparkConfWithCredentialLocations = sparkConf.clone()
.setOption(KUBERNETES_DRIVER_MOUNTED_CA_CERT_FILE, resolvedMountedCaCertFile)
.setOption(KUBERNETES_DRIVER_MOUNTED_CLIENT_KEY_FILE, resolvedMountedClientKeyFile)
.setOption(KUBERNETES_DRIVER_MOUNTED_CLIENT_CERT_FILE, resolvedMountedClientCertFile)
.setOption(KUBERNETES_DRIVER_MOUNTED_OAUTH_TOKEN, resolvedMountedOAuthTokenFile)
sparkConfWithCredentialLocations.get(KUBERNETES_DRIVER_OAUTH_TOKEN).foreach { _ =>
sparkConfWithCredentialLocations.set(KUBERNETES_DRIVER_OAUTH_TOKEN, "<present_but_redacted>")
}
sparkConfWithCredentialLocations.get(KUBERNETES_SUBMIT_OAUTH_TOKEN).foreach { _ =>
sparkConfWithCredentialLocations.set(KUBERNETES_SUBMIT_OAUTH_TOKEN, "<present_but_redacted>")
}
sparkConfWithCredentialLocations
}

override def createCredentialsSecret(): Option[Secret] = {
val allSecretData =
resolveSecretData(
maybeUserSpecifiedMountedClientKeyFile,
submitterLocalDriverPodKubernetesCredentials.clientKeyDataBase64,
DRIVER_CREDENTIALS_CLIENT_KEY_SECRET_NAME) ++
resolveSecretData(
maybeUserSpecifiedMountedClientCertFile,
submitterLocalDriverPodKubernetesCredentials.clientCertDataBase64,
DRIVER_CREDENTIALS_CLIENT_CERT_SECRET_NAME) ++
resolveSecretData(
maybeUserSpecifiedMountedCaCertFile,
submitterLocalDriverPodKubernetesCredentials.caCertDataBase64,
DRIVER_CREDENTIALS_CA_CERT_SECRET_NAME) ++
resolveSecretData(
maybeUserSpecifiedMountedOAuthTokenFile,
submitterLocalDriverPodKubernetesCredentials.oauthTokenBase64,
DRIVER_CREDENTIALS_OAUTH_TOKEN_SECRET_NAME)
if (allSecretData.isEmpty) {
None
} else {
Some(new SecretBuilder()
.withNewMetadata().withName(s"$kubernetesAppId-kubernetes-credentials").endMetadata()
.withData(allSecretData.asJava)
.build())
}
}

override def mountDriverKubernetesCredentials(
originalPodSpec: PodBuilder,
driverContainerName: String,
credentialsSecret: Option[Secret]): PodBuilder = {
credentialsSecret.map { secret =>
originalPodSpec.editSpec()
.addNewVolume()
.withName(DRIVER_CREDENTIALS_SECRET_VOLUME_NAME)
.withNewSecret().withSecretName(secret.getMetadata.getName).endSecret()
.endVolume()
.editMatchingContainer(new ContainerNameEqualityPredicate(driverContainerName))
.addNewVolumeMount()
.withName(DRIVER_CREDENTIALS_SECRET_VOLUME_NAME)
.withMountPath(DRIVER_CREDENTIALS_SECRETS_BASE_DIR)
.endVolumeMount()
.endContainer()
.endSpec()
}.getOrElse(originalPodSpec)
}

private def resolveSecretLocation(
mountedUserSpecified: Option[String],
valueMountedFromSubmitter: Option[String],
mountedCanonicalLocation: String): Option[String] = {
mountedUserSpecified.orElse(valueMountedFromSubmitter.map( _ => {
mountedCanonicalLocation
}))
}

private def resolveSecretData(
mountedUserSpecified: Option[String],
valueMountedFromSubmitter: Option[String],
secretName: String): Map[String, String] = {
mountedUserSpecified.map { _ => Map.empty[String, String]}
.getOrElse {
valueMountedFromSubmitter.map { valueBase64 =>
Map(secretName -> valueBase64)
}.getOrElse(Map.empty[String, String])
}
}

private implicit def augmentSparkConf(sparkConf: SparkConf): OptionSettableSparkConf = {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

scala magic! I'm less familiar with it though -- this doesn't do anything outside this class does it?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The private modifier ensures that the implicit scope only applies to this class.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Might be worthwhile to have a setOption API in the core SparkConf though.

new OptionSettableSparkConf(sparkConf)
}
}

/**
 * Thin wrapper adding an Option-aware setter to SparkConf.
 */
private class OptionSettableSparkConf(sparkConf: SparkConf) {
  /**
   * Sets the config entry when the option holds a value; otherwise returns the
   * conf unchanged.
   */
  def setOption[T](configEntry: OptionalConfigEntry[T], option: Option[T]): SparkConf = {
    option match {
      case Some(value) => sparkConf.set(configEntry, value)
      case None => sparkConf
    }
  }
}
Loading