Skip to content
This repository was archived by the owner on Jan 9, 2020. It is now read-only.
1 change: 0 additions & 1 deletion resource-managers/kubernetes/core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -133,4 +133,3 @@
</build>

</project>

Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,7 @@ private[spark] class Client(
.pods()
.withName(kubernetesDriverPodName)
.watch(loggingWatch)) { _ =>
loggingWatch.start()
val resourceCleanShutdownHook = ShutdownHookManager.addShutdownHook(() =>
kubernetesResourceCleaner.deleteAllRegisteredResourcesFromKubernetes(kubernetesClient))
val cleanupServiceManagerHook = ShutdownHookManager.addShutdownHook(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import io.fabric8.kubernetes.client.Watcher.Action
import scala.collection.JavaConverters._

import org.apache.spark.internal.Logging
import org.apache.spark.util.ThreadUtils

/**
* A monitor for the running Kubernetes pod of a Spark application. Status logging occurs on
Expand All @@ -40,19 +41,23 @@ private[kubernetes] class LoggingPodStatusWatcher(podCompletedFuture: CountDownL
extends Watcher[Pod] with Logging {

// start timer for periodic logging
private val scheduler = Executors.newScheduledThreadPool(1)
private val scheduler =
ThreadUtils.newDaemonSingleThreadScheduledExecutor("logging-pod-status-watcher")
private val logRunnable: Runnable = new Runnable {
override def run() = logShortStatus()
}
if (interval > 0) {
scheduler.scheduleWithFixedDelay(logRunnable, 0, interval, TimeUnit.MILLISECONDS)
}

private var pod: Option[Pod] = Option.empty
private def phase: String = pod.map(_.getStatus().getPhase()).getOrElse("unknown")
private def status: String = pod.map(_.getStatus().getContainerStatuses().toString())
.getOrElse("unknown")

def start(): Unit = {
if (interval > 0) {
scheduler.scheduleAtFixedRate(logRunnable, 0, interval, TimeUnit.MILLISECONDS)
}
}

override def eventReceived(action: Action, pod: Pod): Unit = {
this.pod = Option(pod)
action match {
Expand Down