Skip to content

Commit 6b489c2

Browse files
foxish authored and mccheah committed
Enable testing against GCE clusters (#243)
* Part 1: making test code cluster-agnostic
* Final checked
* Move all test code into KubernetesTestComponents
* Addressed comments
* Fixed doc
* Restructure the test backends (#248)
* Restructured the test backends
* Address comments
* var -> val
* Comments
* removed deadcode
1 parent 04afcf8 commit 6b489c2

File tree

12 files changed

+299
-92
lines changed

12 files changed

+299
-92
lines changed

resource-managers/kubernetes/README.md

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,14 @@ build/mvn integration-test \
6161
-pl resource-managers/kubernetes/integration-tests -am
6262
```
6363

64+
# Running against an arbitrary cluster
65+
66+
In order to run against any cluster, use the following:
67+
build/mvn integration-test \
68+
-Pkubernetes -Pkubernetes-integration-tests \
69+
-pl resource-managers/kubernetes/integration-tests -am
70+
-DextraScalaTestArgs="-Dspark.kubernetes.test.master=k8s://https://<master> -Dspark.docker.test.driverImage=<driver-image> -Dspark.docker.test.executorImage=<executor-image>"
71+
6472
# Preserve the Minikube VM
6573

6674
The integration tests make use of [Minikube](https://github.com/kubernetes/minikube), which fires up a virtual machine

resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/ExternalUriProviderWatch.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ import io.fabric8.kubernetes.client.Watcher.Action
2424
import scala.collection.JavaConverters._
2525

2626
import org.apache.spark.deploy.kubernetes.constants._
27-
import org.apache.spark.deploy.kubernetes.integrationtest.minikube.Minikube
27+
import org.apache.spark.deploy.kubernetes.integrationtest.backend.minikube.Minikube
2828
import org.apache.spark.internal.Logging
2929

3030
/**

resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/KubernetesSuite.scala

Lines changed: 6 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -25,26 +25,23 @@ import org.scalatest.concurrent.PatienceConfiguration
2525
import org.scalatest.time.{Minutes, Seconds, Span}
2626

2727
import org.apache.spark.SparkFunSuite
28-
import org.apache.spark.deploy.kubernetes.integrationtest.docker.SparkDockerImageBuilder
29-
import org.apache.spark.deploy.kubernetes.integrationtest.minikube.Minikube
28+
import org.apache.spark.deploy.kubernetes.integrationtest.backend.{IntegrationTestBackend, IntegrationTestBackendFactory}
3029

3130
private[spark] class KubernetesSuite extends SparkFunSuite {
31+
private val testBackend: IntegrationTestBackend = IntegrationTestBackendFactory.getTestBackend()
3232

3333
override def beforeAll(): Unit = {
34-
Minikube.startMinikube()
35-
new SparkDockerImageBuilder(Minikube.getDockerEnv).buildSparkDockerImages()
34+
testBackend.initialize()
3635
}
3736

3837
override def afterAll(): Unit = {
39-
if (!System.getProperty("spark.docker.test.persistMinikube", "false").toBoolean) {
40-
Minikube.deleteMinikube()
41-
}
38+
testBackend.cleanUp()
4239
}
4340

4441
override def nestedSuites: scala.collection.immutable.IndexedSeq[Suite] = {
4542
Vector(
46-
new KubernetesV1Suite,
47-
new KubernetesV2Suite)
43+
new KubernetesV1Suite(testBackend),
44+
new KubernetesV2Suite(testBackend))
4845
}
4946
}
5047

resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/KubernetesTestComponents.scala

Lines changed: 40 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -17,32 +17,37 @@
1717
package org.apache.spark.deploy.kubernetes.integrationtest
1818

1919
import java.util.UUID
20+
import javax.net.ssl.X509TrustManager
2021

21-
import org.scalatest.concurrent.Eventually
2222
import scala.collection.JavaConverters._
23+
import scala.reflect.ClassTag
24+
25+
import io.fabric8.kubernetes.client.DefaultKubernetesClient
26+
import io.fabric8.kubernetes.client.internal.SSLUtils
27+
import org.scalatest.concurrent.Eventually
2328

2429
import org.apache.spark.SparkConf
2530
import org.apache.spark.deploy.kubernetes.config._
26-
import org.apache.spark.deploy.kubernetes.integrationtest.minikube.Minikube
31+
import org.apache.spark.deploy.rest.kubernetes.v1.HttpClientUtil
2732

28-
private[spark] class KubernetesTestComponents {
33+
private[spark] class KubernetesTestComponents(defaultClient: DefaultKubernetesClient) {
2934

3035
val namespace = UUID.randomUUID().toString.replaceAll("-", "")
31-
val kubernetesClient = Minikube.getKubernetesClient.inNamespace(namespace)
36+
val kubernetesClient = defaultClient.inNamespace(namespace)
3237
val clientConfig = kubernetesClient.getConfiguration
3338

3439
def createNamespace(): Unit = {
35-
Minikube.getKubernetesClient.namespaces.createNew()
40+
defaultClient.namespaces.createNew()
3641
.withNewMetadata()
3742
.withName(namespace)
3843
.endMetadata()
3944
.done()
4045
}
4146

4247
def deleteNamespace(): Unit = {
43-
Minikube.getKubernetesClient.namespaces.withName(namespace).delete()
48+
defaultClient.namespaces.withName(namespace).delete()
4449
Eventually.eventually(KubernetesSuite.TIMEOUT, KubernetesSuite.INTERVAL) {
45-
val namespaceList = Minikube.getKubernetesClient
50+
val namespaceList = defaultClient
4651
.namespaces()
4752
.list()
4853
.getItems()
@@ -53,13 +58,12 @@ private[spark] class KubernetesTestComponents {
5358

5459
def newSparkConf(): SparkConf = {
5560
new SparkConf(true)
56-
.setMaster(s"k8s://https://${Minikube.getMinikubeIp}:8443")
57-
.set(KUBERNETES_SUBMIT_CA_CERT_FILE, clientConfig.getCaCertFile)
58-
.set(KUBERNETES_SUBMIT_CLIENT_KEY_FILE, clientConfig.getClientKeyFile)
59-
.set(KUBERNETES_SUBMIT_CLIENT_CERT_FILE, clientConfig.getClientCertFile)
61+
.setMaster(s"k8s://${kubernetesClient.getMasterUrl}")
6062
.set(KUBERNETES_NAMESPACE, namespace)
61-
.set(DRIVER_DOCKER_IMAGE, "spark-driver:latest")
62-
.set(EXECUTOR_DOCKER_IMAGE, "spark-executor:latest")
63+
.set(DRIVER_DOCKER_IMAGE,
64+
System.getProperty("spark.docker.test.driverImage", "spark-driver:latest"))
65+
.set(EXECUTOR_DOCKER_IMAGE,
66+
System.getProperty("spark.docker.test.executorImage", "spark-executor:latest"))
6367
.setJars(Seq(KubernetesSuite.HELPER_JAR_FILE.getAbsolutePath))
6468
.set("spark.executor.memory", "500m")
6569
.set("spark.executor.cores", "1")
@@ -69,4 +73,26 @@ private[spark] class KubernetesTestComponents {
6973
.set("spark.testing", "false")
7074
.set(WAIT_FOR_APP_COMPLETION, false)
7175
}
72-
}
76+
77+
def getService[T: ClassTag](
78+
serviceName: String,
79+
namespace: String,
80+
servicePortName: String,
81+
servicePath: String = ""): T = synchronized {
82+
val kubernetesMaster = s"${defaultClient.getMasterUrl}"
83+
84+
val url = s"${
85+
Array[String](
86+
s"${kubernetesClient.getMasterUrl}",
87+
"api", "v1", "proxy",
88+
"namespaces", namespace,
89+
"services", serviceName).mkString("/")
90+
}" +
91+
s":$servicePortName$servicePath"
92+
val userHome = System.getProperty("user.home")
93+
val kubernetesConf = kubernetesClient.getConfiguration
94+
val sslContext = SSLUtils.sslContext(kubernetesConf)
95+
val trustManager = SSLUtils.trustManagers(kubernetesConf)(0).asInstanceOf[X509TrustManager]
96+
HttpClientUtil.createClient[T](Set(url), 5, sslContext.getSocketFactory, trustManager)
97+
}
98+
}

resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/KubernetesV1Suite.scala

Lines changed: 19 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -18,33 +18,37 @@ package org.apache.spark.deploy.kubernetes.integrationtest
1818

1919
import java.util.concurrent.TimeUnit
2020

21+
import scala.collection.JavaConverters._
22+
2123
import com.google.common.collect.ImmutableList
2224
import com.google.common.util.concurrent.SettableFuture
2325
import io.fabric8.kubernetes.api.model.Pod
2426
import io.fabric8.kubernetes.client.{KubernetesClientException, Watcher}
2527
import io.fabric8.kubernetes.client.Watcher.Action
2628
import org.scalatest.{BeforeAndAfter, DoNotDiscover}
2729
import org.scalatest.concurrent.Eventually
28-
import scala.collection.JavaConverters._
2930

3031
import org.apache.spark.{SparkConf, SparkException, SparkFunSuite}
3132
import org.apache.spark.deploy.kubernetes.SSLUtils
3233
import org.apache.spark.deploy.kubernetes.config._
3334
import org.apache.spark.deploy.kubernetes.constants._
34-
import org.apache.spark.deploy.kubernetes.integrationtest.minikube.Minikube
35+
import org.apache.spark.deploy.kubernetes.integrationtest.backend.IntegrationTestBackend
36+
import org.apache.spark.deploy.kubernetes.integrationtest.backend.minikube.Minikube
37+
import org.apache.spark.deploy.kubernetes.integrationtest.constants.{GCE_TEST_BACKEND, MINIKUBE_TEST_BACKEND}
3538
import org.apache.spark.deploy.kubernetes.integrationtest.restapis.SparkRestApiV1
3639
import org.apache.spark.deploy.kubernetes.submit.v1.{Client, ExternalSuppliedUrisDriverServiceManager}
3740
import org.apache.spark.status.api.v1.{ApplicationStatus, StageStatus}
3841
import org.apache.spark.util.Utils
3942

4043
@DoNotDiscover
41-
private[spark] class KubernetesV1Suite extends SparkFunSuite with BeforeAndAfter {
44+
private[spark] class KubernetesV1Suite(testBackend: IntegrationTestBackend)
45+
extends SparkFunSuite with BeforeAndAfter {
4246

4347
private var kubernetesTestComponents: KubernetesTestComponents = _
4448
private var sparkConf: SparkConf = _
4549

4650
override def beforeAll(): Unit = {
47-
kubernetesTestComponents = new KubernetesTestComponents()
51+
kubernetesTestComponents = new KubernetesTestComponents(testBackend.getKubernetesClient)
4852
kubernetesTestComponents.createNamespace()
4953
}
5054

@@ -85,7 +89,7 @@ private[spark] class KubernetesV1Suite extends SparkFunSuite with BeforeAndAfter
8589
.get(0)
8690
.getMetadata
8791
.getName
88-
Minikube.getService[SparkRestApiV1](serviceName,
92+
kubernetesTestComponents.getService[SparkRestApiV1](serviceName,
8993
kubernetesTestComponents.namespace, "spark-ui-port")
9094
}
9195

@@ -168,6 +172,8 @@ private[spark] class KubernetesV1Suite extends SparkFunSuite with BeforeAndAfter
168172
}
169173

170174
test("Enable SSL on the driver submit server") {
175+
assume(testBackend.name == MINIKUBE_TEST_BACKEND)
176+
171177
val (keyStoreFile, trustStoreFile) = SSLUtils.generateKeyStoreTrustStorePair(
172178
Minikube.getMinikubeIp,
173179
"changeit",
@@ -188,6 +194,8 @@ private[spark] class KubernetesV1Suite extends SparkFunSuite with BeforeAndAfter
188194
}
189195

190196
test("Enable SSL on the driver submit server using PEM files") {
197+
assume(testBackend.name == MINIKUBE_TEST_BACKEND)
198+
191199
val (keyPem, certPem) = SSLUtils.generateKeyCertPemPair(Minikube.getMinikubeIp)
192200
sparkConf.set(DRIVER_SUBMIT_SSL_KEY_PEM, s"file://${keyPem.getAbsolutePath}")
193201
sparkConf.set(DRIVER_SUBMIT_SSL_CLIENT_CERT_PEM, s"file://${certPem.getAbsolutePath}")
@@ -201,6 +209,8 @@ private[spark] class KubernetesV1Suite extends SparkFunSuite with BeforeAndAfter
201209
}
202210

203211
test("Added files should exist on the driver.") {
212+
assume(testBackend.name == MINIKUBE_TEST_BACKEND)
213+
204214
sparkConf.set("spark.files", KubernetesSuite.TEST_EXISTENCE_FILE.getAbsolutePath)
205215
sparkConf.setAppName("spark-file-existence-test")
206216
val podCompletedFuture = SettableFuture.create[Boolean]
@@ -257,6 +267,8 @@ private[spark] class KubernetesV1Suite extends SparkFunSuite with BeforeAndAfter
257267
}
258268

259269
test("Use external URI provider") {
270+
assume(testBackend.name == MINIKUBE_TEST_BACKEND)
271+
260272
val externalUriProviderWatch =
261273
new ExternalUriProviderWatch(kubernetesTestComponents.kubernetesClient)
262274
Utils.tryWithResource(kubernetesTestComponents.kubernetesClient.services()
@@ -288,6 +300,8 @@ private[spark] class KubernetesV1Suite extends SparkFunSuite with BeforeAndAfter
288300
}
289301

290302
test("Mount the Kubernetes credentials onto the driver pod") {
303+
assume(testBackend.name == MINIKUBE_TEST_BACKEND)
304+
291305
sparkConf.set(KUBERNETES_DRIVER_CA_CERT_FILE,
292306
kubernetesTestComponents.clientConfig.getCaCertFile)
293307
sparkConf.set(KUBERNETES_DRIVER_CLIENT_KEY_FILE,

resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/KubernetesV2Suite.scala

Lines changed: 17 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -21,22 +21,25 @@ import java.util.UUID
2121
import org.scalatest.{BeforeAndAfter, DoNotDiscover}
2222
import org.scalatest.concurrent.Eventually
2323

24-
import org.apache.spark.{SparkConf, SparkFunSuite, SSLOptions}
24+
import org.apache.spark._
2525
import org.apache.spark.deploy.kubernetes.SSLUtils
2626
import org.apache.spark.deploy.kubernetes.config._
27-
import org.apache.spark.deploy.kubernetes.integrationtest.minikube.Minikube
27+
import org.apache.spark.deploy.kubernetes.integrationtest.backend.IntegrationTestBackend
28+
import org.apache.spark.deploy.kubernetes.integrationtest.backend.minikube.Minikube
29+
import org.apache.spark.deploy.kubernetes.integrationtest.constants.MINIKUBE_TEST_BACKEND
2830
import org.apache.spark.deploy.kubernetes.submit.v2.{MountedDependencyManagerProviderImpl, SubmissionKubernetesClientProviderImpl}
2931

3032
@DoNotDiscover
31-
private[spark] class KubernetesV2Suite extends SparkFunSuite with BeforeAndAfter {
33+
private[spark] class KubernetesV2Suite(testBackend: IntegrationTestBackend)
34+
extends SparkFunSuite with BeforeAndAfter {
3235

3336
private val APP_LOCATOR_LABEL = UUID.randomUUID().toString.replaceAll("-", "")
3437
private var kubernetesTestComponents: KubernetesTestComponents = _
3538
private var sparkConf: SparkConf = _
3639
private var resourceStagingServerLauncher: ResourceStagingServerLauncher = _
3740

3841
override def beforeAll(): Unit = {
39-
kubernetesTestComponents = new KubernetesTestComponents
42+
kubernetesTestComponents = new KubernetesTestComponents(testBackend.getKubernetesClient)
4043
resourceStagingServerLauncher = new ResourceStagingServerLauncher(
4144
kubernetesTestComponents.kubernetesClient.inNamespace(kubernetesTestComponents.namespace))
4245
}
@@ -54,11 +57,15 @@ private[spark] class KubernetesV2Suite extends SparkFunSuite with BeforeAndAfter
5457
}
5558

5659
test("Use submission v2.") {
60+
assume(testBackend.name == MINIKUBE_TEST_BACKEND)
61+
5762
launchStagingServer(SSLOptions())
5863
runSparkAppAndVerifyCompletion(KubernetesSuite.SUBMITTER_LOCAL_MAIN_APP_RESOURCE)
5964
}
6065

6166
test("Enable SSL on the submission server") {
67+
assume(testBackend.name == MINIKUBE_TEST_BACKEND)
68+
6269
val (keyStore, trustStore) = SSLUtils.generateKeyStoreTrustStorePair(
6370
ipAddress = Minikube.getMinikubeIp,
6471
keyStorePassword = "keyStore",
@@ -81,13 +88,17 @@ private[spark] class KubernetesV2Suite extends SparkFunSuite with BeforeAndAfter
8188
}
8289

8390
test("Use container-local resources without the resource staging server") {
91+
assume(testBackend.name == MINIKUBE_TEST_BACKEND)
92+
8493
sparkConf.setJars(Seq(
8594
KubernetesSuite.CONTAINER_LOCAL_MAIN_APP_RESOURCE,
8695
KubernetesSuite.CONTAINER_LOCAL_HELPER_JAR_PATH))
8796
runSparkAppAndVerifyCompletion(KubernetesSuite.CONTAINER_LOCAL_MAIN_APP_RESOURCE)
8897
}
8998

9099
private def launchStagingServer(resourceStagingServerSslOptions: SSLOptions): Unit = {
100+
assume(testBackend.name == MINIKUBE_TEST_BACKEND)
101+
91102
val resourceStagingServerPort = resourceStagingServerLauncher.launchStagingServer(
92103
resourceStagingServerSslOptions)
93104
val resourceStagingServerUriScheme = if (resourceStagingServerSslOptions.enabled) {
@@ -96,7 +107,8 @@ private[spark] class KubernetesV2Suite extends SparkFunSuite with BeforeAndAfter
96107
"http"
97108
}
98109
sparkConf.set(RESOURCE_STAGING_SERVER_URI,
99-
s"$resourceStagingServerUriScheme://${Minikube.getMinikubeIp}:$resourceStagingServerPort")
110+
s"$resourceStagingServerUriScheme://" +
111+
s"${Minikube.getMinikubeIp}:$resourceStagingServerPort")
100112
}
101113

102114
private def runSparkAppAndVerifyCompletion(appResource: String): Unit = {
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.spark.deploy.kubernetes.integrationtest
18+
19+
import java.io.{BufferedReader, InputStreamReader}
20+
import java.util.concurrent.TimeUnit
21+
22+
import scala.collection.mutable.ArrayBuffer
23+
24+
import org.apache.spark.internal.Logging
25+
import org.apache.spark.util.Utils
26+
27+
object ProcessUtils extends Logging {
28+
/**
29+
* executeProcess is used to run a command and return the output if it
30+
* completes within timeout seconds.
31+
*/
32+
def executeProcess(fullCommand: Array[String], timeout: Long): Seq[String] = {
33+
val pb = new ProcessBuilder().command(fullCommand: _*)
34+
pb.redirectErrorStream(true)
35+
val proc = pb.start()
36+
val outputLines = new ArrayBuffer[String]
37+
38+
Utils.tryWithResource(new InputStreamReader(proc.getInputStream)) { procOutput =>
39+
Utils.tryWithResource(new BufferedReader(procOutput)) { (bufferedOutput: BufferedReader) =>
40+
var line: String = null
41+
do {
42+
line = bufferedOutput.readLine()
43+
if (line != null) {
44+
logInfo(line)
45+
outputLines += line
46+
}
47+
} while (line != null)
48+
}
49+
}
50+
assert(proc.waitFor(timeout, TimeUnit.SECONDS),
51+
s"Timed out while executing ${fullCommand.mkString(" ")}")
52+
assert(proc.exitValue == 0, s"Failed to execute ${fullCommand.mkString(" ")}")
53+
outputLines.toSeq
54+
}
55+
}

0 commit comments

Comments (0)