diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/v1/LoggingPodStatusWatcher.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/LoggingPodStatusWatcher.scala
similarity index 54%
rename from resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/v1/LoggingPodStatusWatcher.scala
rename to resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/LoggingPodStatusWatcher.scala
index 537bcccaa1458..1633a084e463c 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/v1/LoggingPodStatusWatcher.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/LoggingPodStatusWatcher.scala
@@ -14,32 +14,36 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.spark.deploy.kubernetes.submit.v1
+package org.apache.spark.deploy.kubernetes.submit
 
-import java.util.concurrent.{CountDownLatch, Executors, TimeUnit}
+import java.util.concurrent.{CountDownLatch, TimeUnit}
 
-import io.fabric8.kubernetes.api.model.Pod
+import io.fabric8.kubernetes.api.model.{ContainerStateRunning, ContainerStateTerminated, ContainerStateWaiting, ContainerStatus, Pod}
 import io.fabric8.kubernetes.client.{KubernetesClientException, Watcher}
 import io.fabric8.kubernetes.client.Watcher.Action
 import scala.collection.JavaConverters._
 
+import org.apache.spark.SparkException
 import org.apache.spark.internal.Logging
 import org.apache.spark.util.ThreadUtils
 
+private[kubernetes] trait LoggingPodStatusWatcher extends Watcher[Pod] {
+  def awaitCompletion(): Unit
+}
+
 /**
  * A monitor for the running Kubernetes pod of a Spark application. Status logging occurs on
  * every state change and also at an interval for liveness.
  *
- * @param podCompletedFuture a CountDownLatch that is set to true when the watched pod finishes
  * @param appId
- * @param interval ms between each state request. If set to 0 or a negative number, the periodic
- *                 logging will be disabled.
+ * @param maybeLoggingInterval ms between each state request. If provided, must be a positive
+ *                             number.
  */
-private[kubernetes] class LoggingPodStatusWatcher(podCompletedFuture: CountDownLatch,
-    appId: String,
-    interval: Long)
-    extends Watcher[Pod] with Logging {
+private[kubernetes] class LoggingPodStatusWatcherImpl(
+    appId: String, maybeLoggingInterval: Option[Long])
+    extends LoggingPodStatusWatcher with Logging {
 
+  private val podCompletedFuture = new CountDownLatch(1)
   // start timer for periodic logging
   private val scheduler =
     ThreadUtils.newDaemonSingleThreadScheduledExecutor("logging-pod-status-watcher")
@@ -47,13 +51,13 @@ private[kubernetes] class LoggingPodStatusWatcher(podCompletedFuture: CountDownL
     override def run() = logShortStatus()
   }
 
-  private var pod: Option[Pod] = Option.empty
-  private def phase: String = pod.map(_.getStatus().getPhase()).getOrElse("unknown")
-  private def status: String = pod.map(_.getStatus().getContainerStatuses().toString())
-    .getOrElse("unknown")
+  private var pod = Option.empty[Pod]
+
+  private def phase: String = pod.map(_.getStatus.getPhase).getOrElse("unknown")
 
   def start(): Unit = {
-    if (interval > 0) {
+    maybeLoggingInterval.foreach { interval =>
+      require(interval > 0, s"Logging interval must be a positive time value, got: $interval ms.")
       scheduler.scheduleAtFixedRate(logRunnable, 0, interval, TimeUnit.MILLISECONDS)
     }
   }
@@ -98,7 +102,7 @@ private[kubernetes] class LoggingPodStatusWatcher(podCompletedFuture: CountDownL
   }
 
   private def formatPodState(pod: Pod): String = {
-
+    // TODO include specific container state
     val details = Seq[(String, String)](
       // pod metadata
       ("pod name", pod.getMetadata.getName()),
@@ -116,17 +120,59 @@ private[kubernetes] class LoggingPodStatusWatcher(podCompletedFuture: CountDownL
       ("start time", pod.getStatus.getStartTime),
       ("container images",
         pod.getStatus.getContainerStatuses()
-          .asScala
-          .map(_.getImage)
-          .mkString(", ")),
+        .asScala
+        .map(_.getImage)
+        .mkString(", ")),
       ("phase", pod.getStatus.getPhase()),
       ("status", pod.getStatus.getContainerStatuses().toString)
     )
+    formatPairsBundle(details)
+  }
 
+  private def formatPairsBundle(pairs: Seq[(String, String)]) = {
     // Use more loggable format if value is null or empty
-    details.map { case (k, v) =>
-      val newValue = Option(v).filter(_.nonEmpty).getOrElse("N/A")
-      s"\n\t $k: $newValue"
+    pairs.map {
+      case (k, v) => s"\n\t $k: ${Option(v).filter(_.nonEmpty).getOrElse("N/A")}"
     }.mkString("")
   }
+
+  override def awaitCompletion(): Unit = {
+    podCompletedFuture.await()
+    logInfo(pod.map { p =>
+      s"Container final statuses:\n\n${containersDescription(p)}"
+    }.getOrElse("No containers were found in the driver pod."))
+  }
+
+  private def containersDescription(p: Pod): String = {
+    p.getStatus.getContainerStatuses.asScala.map { status =>
+      Seq(
+        ("Container name", status.getName),
+        ("Container image", status.getImage)) ++
+        containerStatusDescription(status)
+    }.map(formatPairsBundle).mkString("\n\n")
+  }
+
+  private def containerStatusDescription(
+      containerStatus: ContainerStatus): Seq[(String, String)] = {
+    val state = containerStatus.getState
+    Option(state.getRunning)
+      .orElse(Option(state.getTerminated))
+      .orElse(Option(state.getWaiting))
+      .map {
+        case running: ContainerStateRunning =>
+          Seq(
+            ("Container state", "Running"),
+            ("Container started at", running.getStartedAt))
+        case waiting: ContainerStateWaiting =>
+          Seq(
+            ("Container state", "Waiting"),
+            ("Pending reason", waiting.getReason))
+        case terminated: ContainerStateTerminated =>
+          Seq(
+            ("Container state", "Terminated"),
+            ("Exit code", terminated.getExitCode.toString))
+        case unknown =>
+          throw new SparkException(s"Unexpected container status type ${unknown.getClass}.")
+      }.getOrElse(Seq(("Container state", "N/A")))
+  }
 }
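
A note on the trait introduced above: the submission clients register the watcher on the driver pod before creating the pod, so no state transition is missed, and awaitCompletion() blocks on the internal CountDownLatch that the watcher counts down once it sees the pod reach a terminal state. A minimal usage sketch follows; the client construction, interval value, and pod name are illustrative, not taken from this patch:

    import io.fabric8.kubernetes.client.DefaultKubernetesClient

    val appId = "spark-app-1234"  // hypothetical application ID
    // Some(...) enables periodic liveness logging; None disables it entirely.
    val watcher = new LoggingPodStatusWatcherImpl(appId, maybeLoggingInterval = Some(1000L))
    watcher.start()
    val client = new DefaultKubernetesClient()
    val watch = client.pods().withName(s"$appId-driver").watch(watcher)
    try {
      // ... create the driver pod here ...
      watcher.awaitCompletion()  // blocks until the pod succeeds, fails, or is deleted
    } finally {
      watch.close()   // the Watch handle returned by fabric8 is Closeable
      client.close()
    }
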
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/v1/Client.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/v1/Client.scala
index fa3c97c6957b5..32fc434cb693a 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/v1/Client.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/v1/Client.scala
@@ -33,7 +33,7 @@ import org.apache.spark.{SparkConf, SparkException}
 import org.apache.spark.deploy.kubernetes.{CompressionUtils, KubernetesCredentials}
 import org.apache.spark.deploy.kubernetes.config._
 import org.apache.spark.deploy.kubernetes.constants._
-import org.apache.spark.deploy.kubernetes.submit.{DriverPodKubernetesCredentialsProvider, KubernetesFileUtils}
+import org.apache.spark.deploy.kubernetes.submit.{DriverPodKubernetesCredentialsProvider, KubernetesFileUtils, LoggingPodStatusWatcherImpl}
 import org.apache.spark.deploy.rest.kubernetes.v1.{AppResource, ContainerAppResource, HttpClientUtil, KubernetesCreateSubmissionRequest, KubernetesSparkRestApi, RemoteAppResource, UploadedAppResource}
 import org.apache.spark.internal.Logging
 import org.apache.spark.util.{ShutdownHookManager, Utils}
@@ -83,7 +83,9 @@ private[spark] class Client(
       MEMORY_OVERHEAD_MIN))
   private val driverContainerMemoryWithOverhead = driverContainerMemoryMb + memoryOverheadMb
 
-  private val waitForAppCompletion: Boolean = sparkConf.get(WAIT_FOR_APP_COMPLETION)
+  private val waitForAppCompletion = sparkConf.get(WAIT_FOR_APP_COMPLETION)
+  private val loggingInterval = Some(sparkConf.get(REPORT_INTERVAL))
+    .filter(_ => waitForAppCompletion)
 
   private val secretBase64String = {
     val secretBytes = new Array[Byte](128)
@@ -147,10 +149,8 @@ private[spark] class Client(
     driverServiceManager.start(kubernetesClient, kubernetesAppId, sparkConf)
     // start outer watch for status logging of driver pod
     // only enable interval logging if in waitForAppCompletion mode
-    val loggingInterval = if (waitForAppCompletion) sparkConf.get(REPORT_INTERVAL) else 0
-    val driverPodCompletedLatch = new CountDownLatch(1)
-    val loggingWatch = new LoggingPodStatusWatcher(driverPodCompletedLatch, kubernetesAppId,
-      loggingInterval)
+    val loggingWatch = new LoggingPodStatusWatcherImpl(
+      kubernetesAppId, loggingInterval)
     Utils.tryWithResource(kubernetesClient
       .pods()
       .withName(kubernetesDriverPodName)
@@ -230,7 +230,7 @@ private[spark] class Client(
     // wait if configured to do so
     if (waitForAppCompletion) {
       logInfo(s"Waiting for application $kubernetesAppId to finish...")
-      driverPodCompletedLatch.await()
+      loggingWatch.awaitCompletion()
       logInfo(s"Application $kubernetesAppId finished.")
     } else {
       logInfo(s"Application $kubernetesAppId successfully launched.")
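
Both clients now derive the logging interval the same way: the old sentinel convention, where 0 or a negative value disabled periodic logging, is replaced by an Option that is only populated when the submitter waits for completion. A worked sketch of the idiom, with an illustrative literal standing in for REPORT_INTERVAL:

    val reportInterval = 1000L                  // stand-in for sparkConf.get(REPORT_INTERVAL)
    Some(reportInterval).filter(_ => true)      // Some(1000L): wait mode, log periodically
    Some(reportInterval).filter(_ => false)     // None: fire-and-forget, periodic logging off
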
 import org.apache.spark.{SparkConf, SparkException}
 import org.apache.spark.deploy.kubernetes.config._
 import org.apache.spark.deploy.kubernetes.constants._
+import org.apache.spark.deploy.kubernetes.submit.{LoggingPodStatusWatcher, LoggingPodStatusWatcherImpl}
 import org.apache.spark.deploy.rest.kubernetes.v2.ResourceStagingServerSslOptionsProviderImpl
 import org.apache.spark.internal.Logging
 import org.apache.spark.launcher.SparkLauncher
@@ -48,9 +49,11 @@ private[spark] class Client(
     appArgs: Array[String],
     sparkJars: Seq[String],
     sparkFiles: Seq[String],
+    waitForAppCompletion: Boolean,
     kubernetesClientProvider: SubmissionKubernetesClientProvider,
     initContainerComponentsProvider: DriverInitContainerComponentsProvider,
-    kubernetesCredentialsMounterProvider: DriverPodKubernetesCredentialsMounterProvider)
+    kubernetesCredentialsMounterProvider: DriverPodKubernetesCredentialsMounterProvider,
+    loggingPodStatusWatcher: LoggingPodStatusWatcher)
   extends Logging {
 
   private val kubernetesDriverPodName = sparkConf.get(KUBERNETES_DRIVER_POD_NAME)
@@ -186,27 +189,40 @@ private[spark] class Client(
         .endContainer()
       .endSpec()
       .build()
-    val createdDriverPod = kubernetesClient.pods().create(resolvedDriverPod)
-    try {
-      val driverOwnedResources = Seq(initContainerConfigMap) ++
-        maybeSubmittedDependenciesSecret.toSeq ++
-        credentialsSecret.toSeq
-      val driverPodOwnerReference = new OwnerReferenceBuilder()
-        .withName(createdDriverPod.getMetadata.getName)
-        .withApiVersion(createdDriverPod.getApiVersion)
-        .withUid(createdDriverPod.getMetadata.getUid)
-        .withKind(createdDriverPod.getKind)
-        .withController(true)
-        .build()
-      driverOwnedResources.foreach { resource =>
-        val originalMetadata = resource.getMetadata
-        originalMetadata.setOwnerReferences(Collections.singletonList(driverPodOwnerReference))
+    Utils.tryWithResource(
+      kubernetesClient
+        .pods()
+        .withName(resolvedDriverPod.getMetadata.getName)
+        .watch(loggingPodStatusWatcher)) { _ =>
+      val createdDriverPod = kubernetesClient.pods().create(resolvedDriverPod)
+      try {
+        val driverOwnedResources = Seq(initContainerConfigMap) ++
+          maybeSubmittedDependenciesSecret.toSeq ++
+          credentialsSecret.toSeq
+        val driverPodOwnerReference = new OwnerReferenceBuilder()
+          .withName(createdDriverPod.getMetadata.getName)
+          .withApiVersion(createdDriverPod.getApiVersion)
+          .withUid(createdDriverPod.getMetadata.getUid)
+          .withKind(createdDriverPod.getKind)
+          .withController(true)
+          .build()
+        driverOwnedResources.foreach { resource =>
+          val originalMetadata = resource.getMetadata
+          originalMetadata.setOwnerReferences(Collections.singletonList(driverPodOwnerReference))
+        }
+        kubernetesClient.resourceList(driverOwnedResources: _*).createOrReplace()
+      } catch {
+        case e: Throwable =>
+          kubernetesClient.pods().delete(createdDriverPod)
+          throw e
+      }
+      if (waitForAppCompletion) {
+        logInfo(s"Waiting for application $kubernetesAppId to finish...")
+        loggingPodStatusWatcher.awaitCompletion()
+        logInfo(s"Application $kubernetesAppId finished.")
+      } else {
+        logInfo(s"Deployed Spark application $kubernetesAppId into Kubernetes.")
       }
-      kubernetesClient.resourceList(driverOwnedResources: _*).createOrReplace()
-    } catch {
-      case e: Throwable =>
-        kubernetesClient.pods().delete(createdDriverPod)
-        throw e
     }
   }
 }
@@ -274,6 +290,9 @@ private[spark] object Client {
     val kubernetesClientProvider = new SubmissionKubernetesClientProviderImpl(sparkConf)
     val kubernetesCredentialsMounterProvider =
       new DriverPodKubernetesCredentialsMounterProviderImpl(sparkConf, kubernetesAppId)
+    val waitForAppCompletion = sparkConf.get(WAIT_FOR_APP_COMPLETION)
+    val loggingInterval = Option(sparkConf.get(REPORT_INTERVAL)).filter(_ => waitForAppCompletion)
+    val loggingPodStatusWatcher = new LoggingPodStatusWatcherImpl(kubernetesAppId, loggingInterval)
     new Client(
       appName,
       kubernetesAppId,
@@ -282,8 +301,10 @@ private[spark] object Client {
       appArgs,
       sparkJars,
       sparkFiles,
+      waitForAppCompletion,
       kubernetesClientProvider,
       initContainerComponentsProvider,
-      kubernetesCredentialsMounterProvider).run()
+      kubernetesCredentialsMounterProvider,
+      loggingPodStatusWatcher).run()
   }
 }
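
The v2 client wraps the whole submission in Utils.tryWithResource so the Watch is closed whether the body completes normally or the inner try/catch rethrows after deleting the half-created driver pod. Spark's helper is essentially the loan pattern over java.io.Closeable; a simplified sketch of its shape, not the exact implementation in org.apache.spark.util.Utils:

    def tryWithResource[R <: java.io.Closeable, T](createResource: => R)(f: R => T): T = {
      val resource = createResource
      try f(resource) finally resource.close()  // runs even when f throws
    }
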
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/kubernetes/submit/v2/ClientV2Suite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/kubernetes/submit/v2/ClientV2Suite.scala
index f0282dbb6d31a..9ad46e52747fd 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/kubernetes/submit/v2/ClientV2Suite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/kubernetes/submit/v2/ClientV2Suite.scala
@@ -19,7 +19,7 @@ package org.apache.spark.deploy.kubernetes.submit.v2
 import java.io.File
 
 import io.fabric8.kubernetes.api.model.{ConfigMap, ConfigMapBuilder, DoneablePod, HasMetadata, Pod, PodBuilder, PodList, Secret, SecretBuilder}
-import io.fabric8.kubernetes.client.KubernetesClient
+import io.fabric8.kubernetes.client.{KubernetesClient, Watch}
 import io.fabric8.kubernetes.client.dsl.{MixedOperation, NamespaceListVisitFromServerGetDeleteRecreateWaitApplicable, PodResource}
 import org.hamcrest.{BaseMatcher, Description}
 import org.mockito.{AdditionalAnswers, ArgumentCaptor, Mock, MockitoAnnotations}
@@ -35,6 +35,7 @@ import org.apache.spark.{SparkConf, SparkFunSuite}
 import org.apache.spark.deploy.kubernetes.SparkPodInitContainerBootstrap
 import org.apache.spark.deploy.kubernetes.config._
 import org.apache.spark.deploy.kubernetes.constants._
+import org.apache.spark.deploy.kubernetes.submit.LoggingPodStatusWatcher
 
 class ClientV2Suite extends SparkFunSuite with BeforeAndAfter {
   private val JARS_RESOURCE = SubmittedResourceIdAndSecret("jarsId", "jarsSecret")
@@ -59,13 +60,13 @@ class ClientV2Suite extends SparkFunSuite with BeforeAndAfter {
   private val SPARK_JARS = Seq(
     "hdfs://localhost:9000/app/jars/jar1.jar", "file:///app/jars/jar2.jar")
   private val RESOLVED_SPARK_JARS = Seq(
-      "hdfs://localhost:9000/app/jars/jar1.jar", "file:///var/data/spark-jars/jar2.jar")
+    "hdfs://localhost:9000/app/jars/jar1.jar", "file:///var/data/spark-jars/jar2.jar")
   private val RESOLVED_SPARK_REMOTE_AND_LOCAL_JARS = Seq(
-      "/var/data/spark-jars/jar1.jar", "/var/data/spark-jars/jar2.jar")
+    "/var/data/spark-jars/jar1.jar", "/var/data/spark-jars/jar2.jar")
   private val SPARK_FILES = Seq(
-      "hdfs://localhost:9000/app/files/file1.txt", "file:///app/files/file2.txt")
+    "hdfs://localhost:9000/app/files/file1.txt", "file:///app/files/file2.txt")
   private val RESOLVED_SPARK_FILES = Seq(
-      "hdfs://localhost:9000/app/files/file1.txt", "file:///var/data/spark-files/file2.txt")
+    "hdfs://localhost:9000/app/files/file1.txt", "file:///var/data/spark-files/file2.txt")
   private val INIT_CONTAINER_SECRET = new SecretBuilder()
     .withNewMetadata()
       .withName(INIT_CONTAINER_SECRET_NAME)
@@ -140,6 +141,12 @@ class ClientV2Suite extends SparkFunSuite with BeforeAndAfter {
   private var credentialsMounterProvider: DriverPodKubernetesCredentialsMounterProvider = _
   @Mock
   private var credentialsMounter: DriverPodKubernetesCredentialsMounter = _
+  @Mock
+  private var loggingPodStatusWatcher: LoggingPodStatusWatcher = _
+  @Mock
+  private var namedPodResource: PodResource[Pod, DoneablePod] = _
+  @Mock
+  private var watch: Watch = _
 
   before {
     MockitoAnnotations.initMocks(this)
@@ -177,6 +184,8 @@ class ClientV2Suite extends SparkFunSuite with BeforeAndAfter {
           .build()
       }
     })
+    when(podOps.withName(APP_ID)).thenReturn(namedPodResource)
+    when(namedPodResource.watch(loggingPodStatusWatcher)).thenReturn(watch)
     when(containerLocalizedFilesResolver.resolveSubmittedAndRemoteSparkJars())
       .thenReturn(RESOLVED_SPARK_REMOTE_AND_LOCAL_JARS)
     when(containerLocalizedFilesResolver.resolveSubmittedSparkJars())
@@ -278,6 +287,25 @@ class ClientV2Suite extends SparkFunSuite with BeforeAndAfter {
     })
   }
 
+  test("Waiting for completion should await completion on the status watcher.") {
+    expectationsForNoMountedCredentials()
+    expectationsForNoDependencyUploader()
+    new Client(
+      APP_NAME,
+      APP_ID,
+      MAIN_CLASS,
+      SPARK_CONF,
+      APP_ARGS,
+      SPARK_JARS,
+      SPARK_FILES,
+      true,
+      kubernetesClientProvider,
+      initContainerComponentsProvider,
+      credentialsMounterProvider,
+      loggingPodStatusWatcher).run()
+    verify(loggingPodStatusWatcher).awaitCompletion()
+  }
+
   private def expectationsForNoDependencyUploader(): Unit = {
     when(initContainerComponentsProvider
       .provideInitContainerSubmittedDependencyUploader(ALL_EXPECTED_LABELS))
@@ -353,9 +381,11 @@ class ClientV2Suite extends SparkFunSuite with BeforeAndAfter {
       APP_ARGS,
       SPARK_JARS,
       SPARK_FILES,
+      false,
      kubernetesClientProvider,
       initContainerComponentsProvider,
-      credentialsMounterProvider).run()
+      credentialsMounterProvider,
+      loggingPodStatusWatcher).run()
     val podMatcher = new BaseMatcher[Pod] {
       override def matches(o: scala.Any): Boolean = {
         o match {
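
The new test stubs just enough of the fabric8 DSL for Client.run() to register the watch, then verifies the blocking call. The same Mockito pattern in standalone form, using mock(...) instead of the suite's @Mock fields; the value names here are hypothetical:

    import io.fabric8.kubernetes.api.model.{DoneablePod, Pod}
    import io.fabric8.kubernetes.client.Watch
    import io.fabric8.kubernetes.client.dsl.PodResource
    import org.mockito.Mockito.{mock, verify, when}

    val namedPod = mock(classOf[PodResource[Pod, DoneablePod]])
    val statusWatcher = mock(classOf[LoggingPodStatusWatcher])
    val watchHandle = mock(classOf[Watch])
    when(namedPod.watch(statusWatcher)).thenReturn(watchHandle)  // stub the registration
    namedPod.watch(statusWatcher)           // in the suite, Client.run() triggers this call
    verify(namedPod).watch(statusWatcher)   // assert the interaction was recorded
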