Skip to content

Commit 2b04496

Browse files
committed
[SPARK-37497][K8S] Promote ExecutorPods[PollingSnapshot|WatchSnapshot]Source to DeveloperApi
### What changes were proposed in this pull request? This PR aims to promote `ExecutorPodsWatchSnapshotSource` and `ExecutorPodsPollingSnapshotSource` as **stable** `DeveloperApi` in order to maintain it officially in a backward compatible way at Apache Spark 3.3.0. ### Why are the changes needed? - Since SPARK-24248 at Apache Spark 2.4.0, `ExecutorPodsWatchSnapshotSource` and `ExecutorPodsPollingSnapshotSource` have been used to monitor executor pods without any interface changes for over 3 years. - Apache Spark 3.1.1 makes `Kubernetes` module GA and provides an extensible external cluster manager framework. New `ExternalClusterManager` for K8s environment need to depend on this to monitor pods. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Manual review. Closes apache#34751 from dongjoon-hyun/SPARK-37497. Authored-by: Dongjoon Hyun <[email protected]> Signed-off-by: Dongjoon Hyun <[email protected]>
1 parent ca25534 commit 2b04496

File tree

2 files changed

+25
-2
lines changed

2 files changed

+25
-2
lines changed

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsPollingSnapshotSource.scala

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,12 +24,21 @@ import io.fabric8.kubernetes.client.KubernetesClient
2424
import scala.collection.JavaConverters._
2525

2626
import org.apache.spark.SparkConf
27+
import org.apache.spark.annotation.{DeveloperApi, Since, Stable}
2728
import org.apache.spark.deploy.k8s.Config._
2829
import org.apache.spark.deploy.k8s.Constants._
2930
import org.apache.spark.internal.Logging
3031
import org.apache.spark.util.{ThreadUtils, Utils}
3132

32-
private[spark] class ExecutorPodsPollingSnapshotSource(
33+
/**
34+
* :: DeveloperApi ::
35+
*
36+
* A class used for polling K8s executor pods by ExternalClusterManagers.
37+
* @since 3.1.3
38+
*/
39+
@Stable
40+
@DeveloperApi
41+
class ExecutorPodsPollingSnapshotSource(
3342
conf: SparkConf,
3443
kubernetesClient: KubernetesClient,
3544
snapshotsStore: ExecutorPodsSnapshotsStore,
@@ -39,13 +48,15 @@ private[spark] class ExecutorPodsPollingSnapshotSource(
3948

4049
private var pollingFuture: Future[_] = _
4150

51+
@Since("3.1.3")
4252
def start(applicationId: String): Unit = {
4353
require(pollingFuture == null, "Cannot start polling more than once.")
4454
logDebug(s"Starting to check for executor pod state every $pollingInterval ms.")
4555
pollingFuture = pollingExecutor.scheduleWithFixedDelay(
4656
new PollRunnable(applicationId), pollingInterval, pollingInterval, TimeUnit.MILLISECONDS)
4757
}
4858

59+
@Since("3.1.3")
4960
def stop(): Unit = {
5061
if (pollingFuture != null) {
5162
pollingFuture.cancel(true)

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsWatchSnapshotSource.scala

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,16 +22,27 @@ import io.fabric8.kubernetes.api.model.Pod
2222
import io.fabric8.kubernetes.client.{KubernetesClient, Watcher, WatcherException}
2323
import io.fabric8.kubernetes.client.Watcher.Action
2424

25+
import org.apache.spark.annotation.{DeveloperApi, Since, Stable}
2526
import org.apache.spark.deploy.k8s.Constants._
2627
import org.apache.spark.internal.Logging
2728
import org.apache.spark.util.Utils
2829

29-
private[spark] class ExecutorPodsWatchSnapshotSource(
30+
/**
31+
* :: DeveloperApi ::
32+
*
33+
* A class used for watching K8s executor pods by ExternalClusterManagers.
34+
*
35+
* @since 3.1.3
36+
*/
37+
@Stable
38+
@DeveloperApi
39+
class ExecutorPodsWatchSnapshotSource(
3040
snapshotsStore: ExecutorPodsSnapshotsStore,
3141
kubernetesClient: KubernetesClient) extends Logging {
3242

3343
private var watchConnection: Closeable = _
3444

45+
@Since("3.1.3")
3546
def start(applicationId: String): Unit = {
3647
require(watchConnection == null, "Cannot start the watcher twice.")
3748
logDebug(s"Starting watch for pods with labels $SPARK_APP_ID_LABEL=$applicationId," +
@@ -42,6 +53,7 @@ private[spark] class ExecutorPodsWatchSnapshotSource(
4253
.watch(new ExecutorPodsWatcher())
4354
}
4455

56+
@Since("3.1.3")
4557
def stop(): Unit = {
4658
if (watchConnection != null) {
4759
Utils.tryLogNonFatalError {

0 commit comments

Comments
 (0)