diff --git a/resource-managers/kubernetes/core/pom.xml b/resource-managers/kubernetes/core/pom.xml index 70c252009c9b4..aa429f73a5627 100644 --- a/resource-managers/kubernetes/core/pom.xml +++ b/resource-managers/kubernetes/core/pom.xml @@ -133,4 +133,3 @@ - diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/v1/Client.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/v1/Client.scala index 0f1e7886a1ba2..8f1e356bec8ca 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/v1/Client.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/v1/Client.scala @@ -155,6 +155,7 @@ private[spark] class Client( .pods() .withName(kubernetesDriverPodName) .watch(loggingWatch)) { _ => + loggingWatch.start() val resourceCleanShutdownHook = ShutdownHookManager.addShutdownHook(() => kubernetesResourceCleaner.deleteAllRegisteredResourcesFromKubernetes(kubernetesClient)) val cleanupServiceManagerHook = ShutdownHookManager.addShutdownHook( diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/v1/LoggingPodStatusWatcher.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/v1/LoggingPodStatusWatcher.scala index 7be334194d9d7..537bcccaa1458 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/v1/LoggingPodStatusWatcher.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/v1/LoggingPodStatusWatcher.scala @@ -24,6 +24,7 @@ import io.fabric8.kubernetes.client.Watcher.Action import scala.collection.JavaConverters._ import org.apache.spark.internal.Logging +import org.apache.spark.util.ThreadUtils /** * A monitor for the running Kubernetes pod of a Spark application. Status logging occurs on @@ -40,19 +41,23 @@ private[kubernetes] class LoggingPodStatusWatcher(podCompletedFuture: CountDownL extends Watcher[Pod] with Logging { // start timer for periodic logging - private val scheduler = Executors.newScheduledThreadPool(1) + private val scheduler = + ThreadUtils.newDaemonSingleThreadScheduledExecutor("logging-pod-status-watcher") private val logRunnable: Runnable = new Runnable { override def run() = logShortStatus() } - if (interval > 0) { - scheduler.scheduleWithFixedDelay(logRunnable, 0, interval, TimeUnit.MILLISECONDS) - } private var pod: Option[Pod] = Option.empty private def phase: String = pod.map(_.getStatus().getPhase()).getOrElse("unknown") private def status: String = pod.map(_.getStatus().getContainerStatuses().toString()) .getOrElse("unknown") + def start(): Unit = { + if (interval > 0) { + scheduler.scheduleAtFixedRate(logRunnable, 0, interval, TimeUnit.MILLISECONDS) + } + } + override def eventReceived(action: Action, pod: Pod): Unit = { this.pod = Option(pod) action match {