From f845a10f24f23a18ec230b3b8581e3bce866cc61 Mon Sep 17 00:00:00 2001 From: Dongjoon Hyun Date: Mon, 29 Nov 2021 22:19:43 -0800 Subject: [PATCH 1/4] [SPARK-37497][K8S] Promote ExecutorPods[PollingSnapshot|WatchSnapshot]Source to DeveloperApi --- .../k8s/ExecutorPodsPollingSnapshotSource.scala | 12 +++++++++++- .../k8s/ExecutorPodsWatchSnapshotSource.scala | 12 +++++++++++- 2 files changed, 22 insertions(+), 2 deletions(-) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsPollingSnapshotSource.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsPollingSnapshotSource.scala index 9e3d23c0063cc..dd1a472ce25b0 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsPollingSnapshotSource.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsPollingSnapshotSource.scala @@ -29,7 +29,15 @@ import org.apache.spark.deploy.k8s.Constants._ import org.apache.spark.internal.Logging import org.apache.spark.util.{ThreadUtils, Utils} -private[spark] class ExecutorPodsPollingSnapshotSource( +/** + * :: DeveloperApi :: + * + * A class used for monitoring executor pods in ExternalClusterManagers. + * @since 2.4.0 + */ +@Stable +@DeveloperApi +class ExecutorPodsPollingSnapshotSource( conf: SparkConf, kubernetesClient: KubernetesClient, snapshotsStore: ExecutorPodsSnapshotsStore, @@ -39,6 +47,7 @@ private[spark] class ExecutorPodsPollingSnapshotSource( private var pollingFuture: Future[_] = _ + @Since("2.4.0") def start(applicationId: String): Unit = { require(pollingFuture == null, "Cannot start polling more than once.") logDebug(s"Starting to check for executor pod state every $pollingInterval ms.") @@ -46,6 +55,7 @@ private[spark] class ExecutorPodsPollingSnapshotSource( new PollRunnable(applicationId), pollingInterval, pollingInterval, TimeUnit.MILLISECONDS) } + @Since("2.4.0") def stop(): Unit = { if (pollingFuture != null) { pollingFuture.cancel(true) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsWatchSnapshotSource.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsWatchSnapshotSource.scala index 762878cbacac6..0c5cc9ca87fed 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsWatchSnapshotSource.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsWatchSnapshotSource.scala @@ -26,12 +26,21 @@ import org.apache.spark.deploy.k8s.Constants._ import org.apache.spark.internal.Logging import org.apache.spark.util.Utils -private[spark] class ExecutorPodsWatchSnapshotSource( +/** + * :: DeveloperApi :: + * + * A class used for K8s executor pod monitoring in ExternalClusterManagers. + * @since 2.4.0 + */ +@Stable +@DeveloperApi +class ExecutorPodsWatchSnapshotSource( snapshotsStore: ExecutorPodsSnapshotsStore, kubernetesClient: KubernetesClient) extends Logging { private var watchConnection: Closeable = _ + @Since("2.4.0") def start(applicationId: String): Unit = { require(watchConnection == null, "Cannot start the watcher twice.") logDebug(s"Starting watch for pods with labels $SPARK_APP_ID_LABEL=$applicationId," + @@ -42,6 +51,7 @@ private[spark] class ExecutorPodsWatchSnapshotSource( .watch(new ExecutorPodsWatcher()) } + @Since("2.4.0") def stop(): Unit = { if (watchConnection != null) { Utils.tryLogNonFatalError { From 8129ab1dd1e2e384caf5851a616e3b255953e9b0 Mon Sep 17 00:00:00 2001 From: Dongjoon Hyun Date: Mon, 29 Nov 2021 22:31:10 -0800 Subject: [PATCH 2/4] Update docs --- .../cluster/k8s/ExecutorPodsPollingSnapshotSource.scala | 2 +- .../cluster/k8s/ExecutorPodsWatchSnapshotSource.scala | 3 ++- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsPollingSnapshotSource.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsPollingSnapshotSource.scala index dd1a472ce25b0..045c6288c1948 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsPollingSnapshotSource.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsPollingSnapshotSource.scala @@ -32,7 +32,7 @@ import org.apache.spark.util.{ThreadUtils, Utils} /** * :: DeveloperApi :: * - * A class used for monitoring executor pods in ExternalClusterManagers. + * A class used for polling K8s executor pods by ExternalClusterManagers. * @since 2.4.0 */ @Stable diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsWatchSnapshotSource.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsWatchSnapshotSource.scala index 0c5cc9ca87fed..7f2f6029d6dc2 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsWatchSnapshotSource.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsWatchSnapshotSource.scala @@ -29,7 +29,8 @@ import org.apache.spark.util.Utils /** * :: DeveloperApi :: * - * A class used for K8s executor pod monitoring in ExternalClusterManagers. + * A class used for watching K8s executor pods by ExternalClusterManagers. + * * @since 2.4.0 */ @Stable From fb1d62af938107a43c6140c06c1681117ba965dd Mon Sep 17 00:00:00 2001 From: Dongjoon Hyun Date: Mon, 29 Nov 2021 22:54:31 -0800 Subject: [PATCH 3/4] Add imports --- .../cluster/k8s/ExecutorPodsPollingSnapshotSource.scala | 1 + .../scheduler/cluster/k8s/ExecutorPodsWatchSnapshotSource.scala | 1 + 2 files changed, 2 insertions(+) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsPollingSnapshotSource.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsPollingSnapshotSource.scala index 045c6288c1948..8cb5fd00c2888 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsPollingSnapshotSource.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsPollingSnapshotSource.scala @@ -24,6 +24,7 @@ import io.fabric8.kubernetes.client.KubernetesClient import scala.collection.JavaConverters._ import org.apache.spark.SparkConf +import org.apache.spark.annotation.{DeveloperApi, Since, Stable} import org.apache.spark.deploy.k8s.Config._ import org.apache.spark.deploy.k8s.Constants._ import org.apache.spark.internal.Logging diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsWatchSnapshotSource.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsWatchSnapshotSource.scala index 7f2f6029d6dc2..1cec0ef90c9d0 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsWatchSnapshotSource.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsWatchSnapshotSource.scala @@ -22,6 +22,7 @@ import io.fabric8.kubernetes.api.model.Pod import io.fabric8.kubernetes.client.{KubernetesClient, Watcher, WatcherException} import io.fabric8.kubernetes.client.Watcher.Action +import org.apache.spark.annotation.{DeveloperApi, Since, Stable} import org.apache.spark.deploy.k8s.Constants._ import org.apache.spark.internal.Logging import org.apache.spark.util.Utils From 3622fa8bdca401ced7a00a5778a9a33d22601a9b Mon Sep 17 00:00:00 2001 From: Dongjoon Hyun Date: Tue, 30 Nov 2021 18:03:46 -0800 Subject: [PATCH 4/4] Address comments --- .../cluster/k8s/ExecutorPodsPollingSnapshotSource.scala | 6 +++--- .../cluster/k8s/ExecutorPodsWatchSnapshotSource.scala | 6 +++--- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsPollingSnapshotSource.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsPollingSnapshotSource.scala index 8cb5fd00c2888..192b5993efe07 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsPollingSnapshotSource.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsPollingSnapshotSource.scala @@ -34,7 +34,7 @@ import org.apache.spark.util.{ThreadUtils, Utils} * :: DeveloperApi :: * * A class used for polling K8s executor pods by ExternalClusterManagers. - * @since 2.4.0 + * @since 3.1.3 */ @Stable @DeveloperApi @@ -48,7 +48,7 @@ class ExecutorPodsPollingSnapshotSource( private var pollingFuture: Future[_] = _ - @Since("2.4.0") + @Since("3.1.3") def start(applicationId: String): Unit = { require(pollingFuture == null, "Cannot start polling more than once.") logDebug(s"Starting to check for executor pod state every $pollingInterval ms.") @@ -56,7 +56,7 @@ class ExecutorPodsPollingSnapshotSource( new PollRunnable(applicationId), pollingInterval, pollingInterval, TimeUnit.MILLISECONDS) } - @Since("2.4.0") + @Since("3.1.3") def stop(): Unit = { if (pollingFuture != null) { pollingFuture.cancel(true) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsWatchSnapshotSource.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsWatchSnapshotSource.scala index 1cec0ef90c9d0..06d942eb5b36f 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsWatchSnapshotSource.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsWatchSnapshotSource.scala @@ -32,7 +32,7 @@ import org.apache.spark.util.Utils * * A class used for watching K8s executor pods by ExternalClusterManagers. * - * @since 2.4.0 + * @since 3.1.3 */ @Stable @DeveloperApi @@ -42,7 +42,7 @@ class ExecutorPodsWatchSnapshotSource( private var watchConnection: Closeable = _ - @Since("2.4.0") + @Since("3.1.3") def start(applicationId: String): Unit = { require(watchConnection == null, "Cannot start the watcher twice.") logDebug(s"Starting watch for pods with labels $SPARK_APP_ID_LABEL=$applicationId," + @@ -53,7 +53,7 @@ class ExecutorPodsWatchSnapshotSource( .watch(new ExecutorPodsWatcher()) } - @Since("2.4.0") + @Since("3.1.3") def stop(): Unit = { if (watchConnection != null) { Utils.tryLogNonFatalError {