diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsPollingSnapshotSource.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsPollingSnapshotSource.scala index 9e3d23c0063cc..192b5993efe07 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsPollingSnapshotSource.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsPollingSnapshotSource.scala @@ -24,12 +24,21 @@ import io.fabric8.kubernetes.client.KubernetesClient import scala.collection.JavaConverters._ import org.apache.spark.SparkConf +import org.apache.spark.annotation.{DeveloperApi, Since, Stable} import org.apache.spark.deploy.k8s.Config._ import org.apache.spark.deploy.k8s.Constants._ import org.apache.spark.internal.Logging import org.apache.spark.util.{ThreadUtils, Utils} -private[spark] class ExecutorPodsPollingSnapshotSource( +/** + * :: DeveloperApi :: + * + * A class used for polling K8s executor pods by ExternalClusterManagers. + * @since 3.1.3 + */ +@Stable +@DeveloperApi +class ExecutorPodsPollingSnapshotSource( conf: SparkConf, kubernetesClient: KubernetesClient, snapshotsStore: ExecutorPodsSnapshotsStore, @@ -39,6 +48,7 @@ private[spark] class ExecutorPodsPollingSnapshotSource( private var pollingFuture: Future[_] = _ + @Since("3.1.3") def start(applicationId: String): Unit = { require(pollingFuture == null, "Cannot start polling more than once.") logDebug(s"Starting to check for executor pod state every $pollingInterval ms.") @@ -46,6 +56,7 @@ private[spark] class ExecutorPodsPollingSnapshotSource( new PollRunnable(applicationId), pollingInterval, pollingInterval, TimeUnit.MILLISECONDS) } + @Since("3.1.3") def stop(): Unit = { if (pollingFuture != null) { pollingFuture.cancel(true) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsWatchSnapshotSource.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsWatchSnapshotSource.scala index 762878cbacac6..06d942eb5b36f 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsWatchSnapshotSource.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsWatchSnapshotSource.scala @@ -22,16 +22,27 @@ import io.fabric8.kubernetes.api.model.Pod import io.fabric8.kubernetes.client.{KubernetesClient, Watcher, WatcherException} import io.fabric8.kubernetes.client.Watcher.Action +import org.apache.spark.annotation.{DeveloperApi, Since, Stable} import org.apache.spark.deploy.k8s.Constants._ import org.apache.spark.internal.Logging import org.apache.spark.util.Utils -private[spark] class ExecutorPodsWatchSnapshotSource( +/** + * :: DeveloperApi :: + * + * A class used for watching K8s executor pods by ExternalClusterManagers. + * + * @since 3.1.3 + */ +@Stable +@DeveloperApi +class ExecutorPodsWatchSnapshotSource( snapshotsStore: ExecutorPodsSnapshotsStore, kubernetesClient: KubernetesClient) extends Logging { private var watchConnection: Closeable = _ + @Since("3.1.3") def start(applicationId: String): Unit = { require(watchConnection == null, "Cannot start the watcher twice.") logDebug(s"Starting watch for pods with labels $SPARK_APP_ID_LABEL=$applicationId," + @@ -42,6 +53,7 @@ private[spark] class ExecutorPodsWatchSnapshotSource( .watch(new ExecutorPodsWatcher()) } + @Since("3.1.3") def stop(): Unit = { if (watchConnection != null) { Utils.tryLogNonFatalError {