@@ -14,46 +14,50 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.deploy.kubernetes.submit.v1
package org.apache.spark.deploy.kubernetes.submit

import java.util.concurrent.{CountDownLatch, Executors, TimeUnit}
import java.util.concurrent.{CountDownLatch, TimeUnit}

import io.fabric8.kubernetes.api.model.Pod
import io.fabric8.kubernetes.api.model.{ContainerStateRunning, ContainerStateTerminated, ContainerStateWaiting, ContainerStatus, Pod}
Reviewer: This exceeds 100 characters and would fail the Scala style check.

Author: Scalastyle doesn't check imports for line length.

import io.fabric8.kubernetes.client.{KubernetesClientException, Watcher}
import io.fabric8.kubernetes.client.Watcher.Action
import scala.collection.JavaConverters._

import org.apache.spark.SparkException
import org.apache.spark.internal.Logging
import org.apache.spark.util.ThreadUtils

private[kubernetes] trait LoggingPodStatusWatcher extends Watcher[Pod] {
def awaitCompletion(): Unit
}

/**
* A monitor for the running Kubernetes pod of a Spark application. Status logging occurs on
* every state change and also at an interval for liveness.
*
* @param podCompletedFuture a CountDownLatch that is set to true when the watched pod finishes
* @param appId
* @param interval ms between each state request. If set to 0 or a negative number, the periodic
* logging will be disabled.
* @param maybeLoggingInterval ms between each state request. If provided, must be a positive
* number.
*/
private[kubernetes] class LoggingPodStatusWatcher(podCompletedFuture: CountDownLatch,
appId: String,
interval: Long)
extends Watcher[Pod] with Logging {
private[kubernetes] class LoggingPodStatusWatcherImpl(
appId: String, maybeLoggingInterval: Option[Long])
extends LoggingPodStatusWatcher with Logging {

private val podCompletedFuture = new CountDownLatch(1)
// start timer for periodic logging
private val scheduler =
ThreadUtils.newDaemonSingleThreadScheduledExecutor("logging-pod-status-watcher")
private val logRunnable: Runnable = new Runnable {
override def run() = logShortStatus()
}

private var pod: Option[Pod] = Option.empty
private def phase: String = pod.map(_.getStatus().getPhase()).getOrElse("unknown")
private def status: String = pod.map(_.getStatus().getContainerStatuses().toString())
.getOrElse("unknown")
private var pod = Option.empty[Pod]

private def phase: String = pod.map(_.getStatus.getPhase).getOrElse("unknown")

def start(): Unit = {
if (interval > 0) {
maybeLoggingInterval.foreach { interval =>
require(interval > 0, s"Logging interval must be a positive time value, got: $interval ms.")
scheduler.scheduleAtFixedRate(logRunnable, 0, interval, TimeUnit.MILLISECONDS)
}
}
@@ -98,7 +102,7 @@ private[kubernetes] class LoggingPodStatusWatcher(podCompletedFuture: CountDownL
}

private def formatPodState(pod: Pod): String = {

// TODO include specific container state
val details = Seq[(String, String)](
// pod metadata
("pod name", pod.getMetadata.getName()),
@@ -116,17 +120,59 @@ private[kubernetes] class LoggingPodStatusWatcher(podCompletedFuture: CountDownL
("start time", pod.getStatus.getStartTime),
("container images",
pod.getStatus.getContainerStatuses()
.asScala
.map(_.getImage)
.mkString(", ")),
.asScala
.map(_.getImage)
.mkString(", ")),
("phase", pod.getStatus.getPhase()),
("status", pod.getStatus.getContainerStatuses().toString)
)
formatPairsBundle(details)
}

private def formatPairsBundle(pairs: Seq[(String, String)]) = {
// Use more loggable format if value is null or empty
details.map { case (k, v) =>
val newValue = Option(v).filter(_.nonEmpty).getOrElse("N/A")
s"\n\t $k: $newValue"
pairs.map {
case (k, v) => s"\n\t $k: ${Option(v).filter(_.nonEmpty).getOrElse("N/A")}"
}.mkString("")
}

override def awaitCompletion(): Unit = {
Reviewer: +1 for encapsulating the latch here.

podCompletedFuture.countDown()
Member: Shouldn't this be podCompletedFuture.await? My client exits right away without waiting for the driver pod to complete. I wonder if it's because of this line?

2017-06-02 14:48:36 INFO Client:54 - Waiting for application spark-hdfstest-1496440110123 to finish...
2017-06-02 14:48:36 INFO LoggingPodStatusWatcherImpl:54 - Container final statuses:
Container name: spark-kubernetes-driver
Container image: docker:5000/spark-driver:kimoon-0602-1
Container state: Waiting
Pending reason: PodInitializing
2017-06-02 14:48:36 INFO Client:54 - Application spark-hdfstest-1496440110123 finished.

Author: Good catch, missed this. Can you send a fix?

Author: Never mind, I see the fix now.

logInfo(pod.map { p =>
s"Container final statuses:\n\n${containersDescription(p)}"
}.getOrElse("No containers were found in the driver pod."))
}
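
As the thread above converges on, awaitCompletion() is meant to block on the latch, with the watcher counting the latch down once it sees the pod finish. A minimal sketch of that pairing, assuming the podCompletedFuture and scheduler fields shown earlier in this diff (the closeWatch helper name is illustrative, not part of the patch shown here):

```scala
// Sketch only: the shape of the fix discussed above, not the literal patch.
// awaitCompletion() blocks until the latch is released; the latch is released
// (and periodic logging stopped) when the watch observes the pod finishing.
override def awaitCompletion(): Unit = {
  podCompletedFuture.await()
  logInfo(pod.map { p =>
    s"Container final statuses:\n\n${containersDescription(p)}"
  }.getOrElse("No containers were found in the driver pod."))
}

private def closeWatch(): Unit = {
  podCompletedFuture.countDown()
  scheduler.shutdown()
}
```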

private def containersDescription(p: Pod): String = {
p.getStatus.getContainerStatuses.asScala.map { status =>
Seq(
("Container name", status.getName),
("Container image", status.getImage)) ++
containerStatusDescription(status)
}.map(formatPairsBundle).mkString("\n\n")
}

private def containerStatusDescription(
containerStatus: ContainerStatus): Seq[(String, String)] = {
val state = containerStatus.getState
Option(state.getRunning)
.orElse(Option(state.getTerminated))
.orElse(Option(state.getWaiting))
.map {
case running: ContainerStateRunning =>
Seq(
("Container state", "Running"),
("Container started at", running.getStartedAt))
case waiting: ContainerStateWaiting =>
Seq(
("Container state", "Waiting"),
("Pending reason", waiting.getReason))
case terminated: ContainerStateTerminated =>
Seq(
("Container state", "Terminated"),
("Exit code", terminated.getExitCode.toString))
case unknown =>
throw new SparkException(s"Unexpected container status type ${unknown.getClass}.")
Reviewer: Where would this exception get caught? I don't think we want to throw an exception that could block application flow just for logging purposes -- maybe instead log the container state as "Unknown" so as not to break code flow.

Author: This should be happening in the logging thread; I don't think this will crash the application.

}.getOrElse(Seq(("Container state", "N/A")))
}
}
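
For comparison, the alternative the reviewer suggests above, logging an "Unknown" container state instead of throwing, would look roughly like this sketch (the method mirrors containerStatusDescription from this diff; only the fallback branch changes):

```scala
// Sketch of the suggested alternative: never throw from the logging path;
// report an unknown container state for unexpected status types instead.
private def containerStatusDescription(
    containerStatus: ContainerStatus): Seq[(String, String)] = {
  val state = containerStatus.getState
  Option(state.getRunning)
    .map(r => Seq(("Container state", "Running"), ("Container started at", r.getStartedAt)))
    .orElse(Option(state.getTerminated)
      .map(t => Seq(("Container state", "Terminated"), ("Exit code", t.getExitCode.toString))))
    .orElse(Option(state.getWaiting)
      .map(w => Seq(("Container state", "Waiting"), ("Pending reason", w.getReason))))
    .getOrElse(Seq(("Container state", "Unknown"))) // rather than SparkException
}
```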
@@ -33,7 +33,7 @@ import org.apache.spark.{SparkConf, SparkException}
import org.apache.spark.deploy.kubernetes.{CompressionUtils, KubernetesCredentials}
import org.apache.spark.deploy.kubernetes.config._
import org.apache.spark.deploy.kubernetes.constants._
import org.apache.spark.deploy.kubernetes.submit.{DriverPodKubernetesCredentialsProvider, KubernetesFileUtils}
import org.apache.spark.deploy.kubernetes.submit.{DriverPodKubernetesCredentialsProvider, KubernetesFileUtils, LoggingPodStatusWatcherImpl}
import org.apache.spark.deploy.rest.kubernetes.v1.{AppResource, ContainerAppResource, HttpClientUtil, KubernetesCreateSubmissionRequest, KubernetesSparkRestApi, RemoteAppResource, UploadedAppResource}
import org.apache.spark.internal.Logging
import org.apache.spark.util.{ShutdownHookManager, Utils}
@@ -83,7 +83,9 @@ private[spark] class Client(
MEMORY_OVERHEAD_MIN))
private val driverContainerMemoryWithOverhead = driverContainerMemoryMb + memoryOverheadMb

private val waitForAppCompletion: Boolean = sparkConf.get(WAIT_FOR_APP_COMPLETION)
private val waitForAppCompletion = sparkConf.get(WAIT_FOR_APP_COMPLETION)
private val loggingInterval = Some(sparkConf.get(REPORT_INTERVAL))
.filter( _ => waitForAppCompletion)

private val secretBase64String = {
val secretBytes = new Array[Byte](128)
@@ -147,10 +149,8 @@ private[spark] class Client(
driverServiceManager.start(kubernetesClient, kubernetesAppId, sparkConf)
// start outer watch for status logging of driver pod
// only enable interval logging if in waitForAppCompletion mode
val loggingInterval = if (waitForAppCompletion) sparkConf.get(REPORT_INTERVAL) else 0
val driverPodCompletedLatch = new CountDownLatch(1)
val loggingWatch = new LoggingPodStatusWatcher(driverPodCompletedLatch, kubernetesAppId,
loggingInterval)
val loggingWatch = new LoggingPodStatusWatcherImpl(
kubernetesAppId, loggingInterval)
Utils.tryWithResource(kubernetesClient
.pods()
.withName(kubernetesDriverPodName)
@@ -230,7 +230,7 @@
// wait if configured to do so
if (waitForAppCompletion) {
logInfo(s"Waiting for application $kubernetesAppId to finish...")
driverPodCompletedLatch.await()
loggingWatch.awaitCompletion()
logInfo(s"Application $kubernetesAppId finished.")
} else {
logInfo(s"Application $kubernetesAppId successfully launched.")
@@ -25,6 +25,7 @@ import scala.collection.JavaConverters._
import org.apache.spark.{SparkConf, SparkException}
import org.apache.spark.deploy.kubernetes.config._
import org.apache.spark.deploy.kubernetes.constants._
import org.apache.spark.deploy.kubernetes.submit.{LoggingPodStatusWatcher, LoggingPodStatusWatcherImpl}
import org.apache.spark.deploy.rest.kubernetes.v2.ResourceStagingServerSslOptionsProviderImpl
import org.apache.spark.internal.Logging
import org.apache.spark.launcher.SparkLauncher
@@ -48,9 +49,11 @@ private[spark] class Client(
appArgs: Array[String],
sparkJars: Seq[String],
sparkFiles: Seq[String],
waitForAppCompletion: Boolean,
kubernetesClientProvider: SubmissionKubernetesClientProvider,
initContainerComponentsProvider: DriverInitContainerComponentsProvider,
kubernetesCredentialsMounterProvider: DriverPodKubernetesCredentialsMounterProvider)
kubernetesCredentialsMounterProvider: DriverPodKubernetesCredentialsMounterProvider,
loggingPodStatusWatcher: LoggingPodStatusWatcher)
extends Logging {

private val kubernetesDriverPodName = sparkConf.get(KUBERNETES_DRIVER_POD_NAME)
@@ -186,27 +189,40 @@ private[spark] class Client(
.endContainer()
.endSpec()
.build()
val createdDriverPod = kubernetesClient.pods().create(resolvedDriverPod)
try {
val driverOwnedResources = Seq(initContainerConfigMap) ++
maybeSubmittedDependenciesSecret.toSeq ++
credentialsSecret.toSeq
val driverPodOwnerReference = new OwnerReferenceBuilder()
.withName(createdDriverPod.getMetadata.getName)
.withApiVersion(createdDriverPod.getApiVersion)
.withUid(createdDriverPod.getMetadata.getUid)
.withKind(createdDriverPod.getKind)
.withController(true)
.build()
driverOwnedResources.foreach { resource =>
val originalMetadata = resource.getMetadata
originalMetadata.setOwnerReferences(Collections.singletonList(driverPodOwnerReference))
Utils.tryWithResource(
kubernetesClient
.pods()
.withName(resolvedDriverPod.getMetadata.getName)
.watch(loggingPodStatusWatcher)) { _ =>
val createdDriverPod = kubernetesClient.pods().create(resolvedDriverPod)
try {
val driverOwnedResources = Seq(initContainerConfigMap) ++
maybeSubmittedDependenciesSecret.toSeq ++
credentialsSecret.toSeq
val driverPodOwnerReference = new OwnerReferenceBuilder()
.withName(createdDriverPod.getMetadata.getName)
.withApiVersion(createdDriverPod.getApiVersion)
.withUid(createdDriverPod.getMetadata.getUid)
.withKind(createdDriverPod.getKind)
.withController(true)
.build()
driverOwnedResources.foreach { resource =>
val originalMetadata = resource.getMetadata
originalMetadata.setOwnerReferences(Collections.singletonList(driverPodOwnerReference))
}
kubernetesClient.resourceList(driverOwnedResources: _*).createOrReplace()
} catch {
case e: Throwable =>
kubernetesClient.pods().delete(createdDriverPod)
throw e
}
if (waitForAppCompletion) {
logInfo(s"Waiting for application $kubernetesAppId to finish...")
loggingPodStatusWatcher.awaitCompletion()
logInfo(s"Application $kubernetesAppId finished.")
} else {
logInfo(s"Deployed Spark application $kubernetesAppId into Kubernetes.")
}
kubernetesClient.resourceList(driverOwnedResources: _*).createOrReplace()
} catch {
case e: Throwable =>
kubernetesClient.pods().delete(createdDriverPod)
throw e
}
}
}
@@ -274,6 +290,9 @@ private[spark] object Client {
val kubernetesClientProvider = new SubmissionKubernetesClientProviderImpl(sparkConf)
val kubernetesCredentialsMounterProvider =
new DriverPodKubernetesCredentialsMounterProviderImpl(sparkConf, kubernetesAppId)
val waitForAppCompletion = sparkConf.get(WAIT_FOR_APP_COMPLETION)
val loggingInterval = Option(sparkConf.get(REPORT_INTERVAL)).filter( _ => waitForAppCompletion)
val loggingPodStatusWatcher = new LoggingPodStatusWatcherImpl(kubernetesAppId, loggingInterval)
new Client(
appName,
kubernetesAppId,
@@ -282,8 +301,10 @@
appArgs,
sparkJars,
sparkFiles,
waitForAppCompletion,
kubernetesClientProvider,
initContainerComponentsProvider,
kubernetesCredentialsMounterProvider).run()
kubernetesCredentialsMounterProvider,
loggingPodStatusWatcher).run()
}
}
@@ -19,7 +19,7 @@ package org.apache.spark.deploy.kubernetes.submit.v2
import java.io.File

import io.fabric8.kubernetes.api.model.{ConfigMap, ConfigMapBuilder, DoneablePod, HasMetadata, Pod, PodBuilder, PodList, Secret, SecretBuilder}
import io.fabric8.kubernetes.client.KubernetesClient
import io.fabric8.kubernetes.client.{KubernetesClient, Watch}
import io.fabric8.kubernetes.client.dsl.{MixedOperation, NamespaceListVisitFromServerGetDeleteRecreateWaitApplicable, PodResource}
import org.hamcrest.{BaseMatcher, Description}
import org.mockito.{AdditionalAnswers, ArgumentCaptor, Mock, MockitoAnnotations}
@@ -35,6 +35,7 @@ import org.apache.spark.{SparkConf, SparkFunSuite}
import org.apache.spark.deploy.kubernetes.SparkPodInitContainerBootstrap
import org.apache.spark.deploy.kubernetes.config._
import org.apache.spark.deploy.kubernetes.constants._
import org.apache.spark.deploy.kubernetes.submit.LoggingPodStatusWatcher

class ClientV2Suite extends SparkFunSuite with BeforeAndAfter {
private val JARS_RESOURCE = SubmittedResourceIdAndSecret("jarsId", "jarsSecret")
@@ -59,13 +60,13 @@ class ClientV2Suite extends SparkFunSuite with BeforeAndAfter {
private val SPARK_JARS = Seq(
"hdfs://localhost:9000/app/jars/jar1.jar", "file:///app/jars/jar2.jar")
private val RESOLVED_SPARK_JARS = Seq(
"hdfs://localhost:9000/app/jars/jar1.jar", "file:///var/data/spark-jars/jar2.jar")
"hdfs://localhost:9000/app/jars/jar1.jar", "file:///var/data/spark-jars/jar2.jar")
private val RESOLVED_SPARK_REMOTE_AND_LOCAL_JARS = Seq(
"/var/data/spark-jars/jar1.jar", "/var/data/spark-jars/jar2.jar")
"/var/data/spark-jars/jar1.jar", "/var/data/spark-jars/jar2.jar")
private val SPARK_FILES = Seq(
"hdfs://localhost:9000/app/files/file1.txt", "file:///app/files/file2.txt")
"hdfs://localhost:9000/app/files/file1.txt", "file:///app/files/file2.txt")
private val RESOLVED_SPARK_FILES = Seq(
"hdfs://localhost:9000/app/files/file1.txt", "file:///var/data/spark-files/file2.txt")
"hdfs://localhost:9000/app/files/file1.txt", "file:///var/data/spark-files/file2.txt")
private val INIT_CONTAINER_SECRET = new SecretBuilder()
.withNewMetadata()
.withName(INIT_CONTAINER_SECRET_NAME)
@@ -140,6 +141,12 @@ class ClientV2Suite extends SparkFunSuite with BeforeAndAfter {
private var credentialsMounterProvider: DriverPodKubernetesCredentialsMounterProvider = _
@Mock
private var credentialsMounter: DriverPodKubernetesCredentialsMounter = _
@Mock
private var loggingPodStatusWatcher: LoggingPodStatusWatcher = _
@Mock
private var namedPodResource: PodResource[Pod, DoneablePod] = _
@Mock
private var watch: Watch = _

before {
MockitoAnnotations.initMocks(this)
@@ -177,6 +184,8 @@ class ClientV2Suite extends SparkFunSuite with BeforeAndAfter {
.build()
}
})
when(podOps.withName(APP_ID)).thenReturn(namedPodResource)
when(namedPodResource.watch(loggingPodStatusWatcher)).thenReturn(watch)
when(containerLocalizedFilesResolver.resolveSubmittedAndRemoteSparkJars())
.thenReturn(RESOLVED_SPARK_REMOTE_AND_LOCAL_JARS)
when(containerLocalizedFilesResolver.resolveSubmittedSparkJars())
@@ -278,6 +287,25 @@ class ClientV2Suite extends SparkFunSuite with BeforeAndAfter {
})
}

test("Waiting for completion should await completion on the status watcher.") {
expectationsForNoMountedCredentials()
expectationsForNoDependencyUploader()
new Client(
APP_NAME,
APP_ID,
MAIN_CLASS,
SPARK_CONF,
APP_ARGS,
SPARK_JARS,
SPARK_FILES,
true,
kubernetesClientProvider,
initContainerComponentsProvider,
credentialsMounterProvider,
loggingPodStatusWatcher).run()
verify(loggingPodStatusWatcher).awaitCompletion()
}

private def expectationsForNoDependencyUploader(): Unit = {
when(initContainerComponentsProvider
.provideInitContainerSubmittedDependencyUploader(ALL_EXPECTED_LABELS))
Expand Down Expand Up @@ -353,9 +381,11 @@ class ClientV2Suite extends SparkFunSuite with BeforeAndAfter {
APP_ARGS,
SPARK_JARS,
SPARK_FILES,
false,
kubernetesClientProvider,
initContainerComponentsProvider,
credentialsMounterProvider).run()
credentialsMounterProvider,
loggingPodStatusWatcher).run()
val podMatcher = new BaseMatcher[Pod] {
override def matches(o: scala.Any): Boolean = {
o match {