Skip to content

Commit 5bffb98

Browse files
holdenkdongjoon-hyun
authored andcommitted
[SPARK-36462][K8S] Add the ability to selectively disable watching or polling
### What changes were proposed in this pull request? Add the ability to selectively disable watching or polling Updated version of #34264 ### Why are the changes needed? Watching or polling for pod status on Kubernetes can place additional load on etcd, with a large number of executors and large number of jobs this can have negative impacts and executors register themselves with the driver under normal operations anyways. ### Does this PR introduce _any_ user-facing change? Two new config flags. ### How was this patch tested? New unit tests + manually tested a forked version of this on an internal cluster with both watching and polling disabled. Closes #36433 from holdenk/SPARK-36462-allow-spark-on-kube-to-operate-without-watchers. Lead-authored-by: Holden Karau <[email protected]> Co-authored-by: Holden Karau <[email protected]> Signed-off-by: Dongjoon Hyun <[email protected]>
1 parent 6cd9d88 commit 5bffb98

File tree

6 files changed

+87
-26
lines changed

6 files changed

+87
-26
lines changed

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -483,6 +483,25 @@ private[spark] object Config extends Logging {
483483
.checkValue(interval => interval > 0, s"Logging interval must be a positive time value.")
484484
.createWithDefaultString("1s")
485485

486+
val KUBERNETES_EXECUTOR_ENABLE_API_POLLING =
487+
ConfigBuilder("spark.kubernetes.executor.enableApiPolling")
488+
.doc("If Spark should poll Kubernetes for executor pod status. " +
489+
"You should leave this enabled unless you're encountering issues with your etcd.")
490+
.version("3.4.0")
491+
.internal()
492+
.booleanConf
493+
.createWithDefault(true)
494+
495+
val KUBERNETES_EXECUTOR_ENABLE_API_WATCHER =
496+
ConfigBuilder("spark.kubernetes.executor.enableApiWatcher")
497+
.doc("If Spark should create watchers for executor pod status. " +
498+
"You should leave this enabled unless you're encountering issues with your etcd.")
499+
.version("3.4.0")
500+
.internal()
501+
.booleanConf
502+
.createWithDefault(true)
503+
504+
486505
val KUBERNETES_EXECUTOR_API_POLLING_INTERVAL =
487506
ConfigBuilder("spark.kubernetes.executor.apiPollingInterval")
488507
.doc("Interval between polls against the Kubernetes API server to inspect the " +

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsPollingSnapshotSource.scala

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -45,15 +45,18 @@ class ExecutorPodsPollingSnapshotSource(
4545
pollingExecutor: ScheduledExecutorService) extends Logging {
4646

4747
private val pollingInterval = conf.get(KUBERNETES_EXECUTOR_API_POLLING_INTERVAL)
48+
private val pollingEnabled = conf.get(KUBERNETES_EXECUTOR_ENABLE_API_POLLING)
4849

4950
private var pollingFuture: Future[_] = _
5051

5152
@Since("3.1.3")
5253
def start(applicationId: String): Unit = {
53-
require(pollingFuture == null, "Cannot start polling more than once.")
54-
logDebug(s"Starting to check for executor pod state every $pollingInterval ms.")
55-
pollingFuture = pollingExecutor.scheduleWithFixedDelay(
56-
new PollRunnable(applicationId), pollingInterval, pollingInterval, TimeUnit.MILLISECONDS)
54+
if (pollingEnabled) {
55+
require(pollingFuture == null, "Cannot start polling more than once.")
56+
logDebug(s"Starting to check for executor pod state every $pollingInterval ms.")
57+
pollingFuture = pollingExecutor.scheduleWithFixedDelay(
58+
new PollRunnable(applicationId), pollingInterval, pollingInterval, TimeUnit.MILLISECONDS)
59+
}
5760
}
5861

5962
@Since("3.1.3")

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsWatchSnapshotSource.scala

Lines changed: 19 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,9 @@ import io.fabric8.kubernetes.api.model.Pod
2222
import io.fabric8.kubernetes.client.{KubernetesClient, Watcher, WatcherException}
2323
import io.fabric8.kubernetes.client.Watcher.Action
2424

25+
import org.apache.spark.{SparkConf, SparkContext}
2526
import org.apache.spark.annotation.{DeveloperApi, Since, Stable}
27+
import org.apache.spark.deploy.k8s.Config.KUBERNETES_EXECUTOR_ENABLE_API_WATCHER
2628
import org.apache.spark.deploy.k8s.Constants._
2729
import org.apache.spark.internal.Logging
2830
import org.apache.spark.util.Utils
@@ -38,19 +40,28 @@ import org.apache.spark.util.Utils
3840
@DeveloperApi
3941
class ExecutorPodsWatchSnapshotSource(
4042
snapshotsStore: ExecutorPodsSnapshotsStore,
41-
kubernetesClient: KubernetesClient) extends Logging {
43+
kubernetesClient: KubernetesClient,
44+
conf: SparkConf) extends Logging {
4245

4346
private var watchConnection: Closeable = _
47+
private val enableWatching = conf.get(KUBERNETES_EXECUTOR_ENABLE_API_WATCHER)
48+
49+
// If we're constructed with the old API get the SparkConf from the running SparkContext.
50+
def this(snapshotsStore: ExecutorPodsSnapshotsStore, kubernetesClient: KubernetesClient) = {
51+
this(snapshotsStore, kubernetesClient, SparkContext.getOrCreate().conf)
52+
}
4453

4554
@Since("3.1.3")
4655
def start(applicationId: String): Unit = {
47-
require(watchConnection == null, "Cannot start the watcher twice.")
48-
logDebug(s"Starting watch for pods with labels $SPARK_APP_ID_LABEL=$applicationId," +
49-
s" $SPARK_ROLE_LABEL=$SPARK_POD_EXECUTOR_ROLE.")
50-
watchConnection = kubernetesClient.pods()
51-
.withLabel(SPARK_APP_ID_LABEL, applicationId)
52-
.withLabel(SPARK_ROLE_LABEL, SPARK_POD_EXECUTOR_ROLE)
53-
.watch(new ExecutorPodsWatcher())
56+
if (enableWatching) {
57+
require(watchConnection == null, "Cannot start the watcher twice.")
58+
logDebug(s"Starting to watch for pods with labels $SPARK_APP_ID_LABEL=$applicationId," +
59+
s" $SPARK_ROLE_LABEL=$SPARK_POD_EXECUTOR_ROLE.")
60+
watchConnection = kubernetesClient.pods()
61+
.withLabel(SPARK_APP_ID_LABEL, applicationId)
62+
.withLabel(SPARK_ROLE_LABEL, SPARK_POD_EXECUTOR_ROLE)
63+
.watch(new ExecutorPodsWatcher())
64+
}
5465
}
5566

5667
@Since("3.1.3")

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -114,7 +114,8 @@ private[spark] class KubernetesClusterManager extends ExternalClusterManager wit
114114

115115
val podsWatchEventSource = new ExecutorPodsWatchSnapshotSource(
116116
snapshotsStore,
117-
kubernetesClient)
117+
kubernetesClient,
118+
sc.conf)
118119

119120
val eventsPollingExecutor = ThreadUtils.newDaemonSingleThreadScheduledExecutor(
120121
"kubernetes-executor-pod-polling-sync")

resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsPollingSnapshotSourceSuite.scala

Lines changed: 24 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ import io.fabric8.kubernetes.api.model.{ListOptionsBuilder, PodListBuilder}
2222
import io.fabric8.kubernetes.client.KubernetesClient
2323
import org.jmock.lib.concurrent.DeterministicScheduler
2424
import org.mockito.{Mock, MockitoAnnotations}
25-
import org.mockito.Mockito.{verify, when}
25+
import org.mockito.Mockito.{never, verify, when}
2626
import org.scalatest.BeforeAndAfter
2727

2828
import org.apache.spark.{SparkConf, SparkFunSuite}
@@ -33,9 +33,9 @@ import org.apache.spark.scheduler.cluster.k8s.ExecutorLifecycleTestUtils._
3333

3434
class ExecutorPodsPollingSnapshotSourceSuite extends SparkFunSuite with BeforeAndAfter {
3535

36-
private val sparkConf = new SparkConf
36+
private val defaultConf = new SparkConf()
3737

38-
private val pollingInterval = sparkConf.get(KUBERNETES_EXECUTOR_API_POLLING_INTERVAL)
38+
private val pollingInterval = defaultConf.get(KUBERNETES_EXECUTOR_API_POLLING_INTERVAL)
3939

4040
@Mock
4141
private var kubernetesClient: KubernetesClient = _
@@ -61,12 +61,6 @@ class ExecutorPodsPollingSnapshotSourceSuite extends SparkFunSuite with BeforeAn
6161
before {
6262
MockitoAnnotations.openMocks(this).close()
6363
pollingExecutor = new DeterministicScheduler()
64-
pollingSourceUnderTest = new ExecutorPodsPollingSnapshotSource(
65-
sparkConf,
66-
kubernetesClient,
67-
eventQueue,
68-
pollingExecutor)
69-
pollingSourceUnderTest.start(TEST_SPARK_APP_ID)
7064
when(kubernetesClient.pods()).thenReturn(podOperations)
7165
when(podOperations.withLabel(SPARK_APP_ID_LABEL, TEST_SPARK_APP_ID))
7266
.thenReturn(appIdLabeledPods)
@@ -77,6 +71,13 @@ class ExecutorPodsPollingSnapshotSourceSuite extends SparkFunSuite with BeforeAn
7771
}
7872

7973
test("Items returned by the API should be pushed to the event queue") {
74+
val sparkConf = new SparkConf()
75+
pollingSourceUnderTest = new ExecutorPodsPollingSnapshotSource(
76+
sparkConf,
77+
kubernetesClient,
78+
eventQueue,
79+
pollingExecutor)
80+
pollingSourceUnderTest.start(TEST_SPARK_APP_ID)
8081
val exec1 = runningExecutor(1)
8182
val exec2 = runningExecutor(2)
8283
when(activeExecutorPods.list())
@@ -89,13 +90,27 @@ class ExecutorPodsPollingSnapshotSourceSuite extends SparkFunSuite with BeforeAn
8990
verify(eventQueue).replaceSnapshot(Seq(exec1, exec2))
9091
}
9192

93+
test("SPARK-36462: If polling is disabled we don't call pods() on the client") {
94+
val sparkConf = new SparkConf()
95+
val source = new ExecutorPodsPollingSnapshotSource(
96+
sparkConf.set(KUBERNETES_EXECUTOR_ENABLE_API_POLLING, false),
97+
kubernetesClient,
98+
eventQueue,
99+
pollingExecutor)
100+
source.start(TEST_SPARK_APP_ID)
101+
pollingExecutor.tick(pollingInterval, TimeUnit.MILLISECONDS)
102+
verify(kubernetesClient, never()).pods()
103+
}
104+
92105
test("SPARK-36334: Support pod listing with resource version") {
93106
Seq(true, false).foreach { value =>
107+
val sparkConf = new SparkConf()
94108
val source = new ExecutorPodsPollingSnapshotSource(
95109
sparkConf.set(KUBERNETES_EXECUTOR_API_POLLING_WITH_RESOURCE_VERSION, value),
96110
kubernetesClient,
97111
eventQueue,
98112
pollingExecutor)
113+
source.start(TEST_SPARK_APP_ID)
99114
pollingExecutor.tick(pollingInterval, TimeUnit.MILLISECONDS)
100115
if (value) {
101116
verify(activeExecutorPods).list(new ListOptionsBuilder().withResourceVersion("0").build())

resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsWatchSnapshotSourceSuite.scala

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -20,10 +20,12 @@ import io.fabric8.kubernetes.api.model.Pod
2020
import io.fabric8.kubernetes.client.{KubernetesClient, Watch, Watcher}
2121
import io.fabric8.kubernetes.client.Watcher.Action
2222
import org.mockito.{ArgumentCaptor, Mock, MockitoAnnotations}
23-
import org.mockito.Mockito.{verify, when}
23+
import org.mockito.Mockito.{never, verify, when}
2424
import org.scalatest.BeforeAndAfter
2525

26+
import org.apache.spark.SparkConf
2627
import org.apache.spark.SparkFunSuite
28+
import org.apache.spark.deploy.k8s.Config.KUBERNETES_EXECUTOR_ENABLE_API_WATCHER
2729
import org.apache.spark.deploy.k8s.Constants._
2830
import org.apache.spark.deploy.k8s.Fabric8Aliases._
2931
import org.apache.spark.scheduler.cluster.k8s.ExecutorLifecycleTestUtils._
@@ -61,17 +63,27 @@ class ExecutorPodsWatchSnapshotSourceSuite extends SparkFunSuite with BeforeAndA
6163
when(appIdLabeledPods.withLabel(SPARK_ROLE_LABEL, SPARK_POD_EXECUTOR_ROLE))
6264
.thenReturn(executorRoleLabeledPods)
6365
when(executorRoleLabeledPods.watch(watch.capture())).thenReturn(watchConnection)
64-
watchSourceUnderTest = new ExecutorPodsWatchSnapshotSource(
65-
eventQueue, kubernetesClient)
66-
watchSourceUnderTest.start(TEST_SPARK_APP_ID)
6766
}
6867

6968
test("Watch events should be pushed to the snapshots store as snapshot updates.") {
69+
val conf = new SparkConf()
70+
watchSourceUnderTest = new ExecutorPodsWatchSnapshotSource(
71+
eventQueue, kubernetesClient, conf)
72+
watchSourceUnderTest.start(TEST_SPARK_APP_ID)
7073
val exec1 = runningExecutor(1)
7174
val exec2 = runningExecutor(2)
7275
watch.getValue.eventReceived(Action.ADDED, exec1)
7376
watch.getValue.eventReceived(Action.MODIFIED, exec2)
7477
verify(eventQueue).updatePod(exec1)
7578
verify(eventQueue).updatePod(exec2)
7679
}
80+
81+
test("SPARK-36462: Verify if watchers are disabled we don't call pods() on the client") {
82+
val conf = new SparkConf()
83+
conf.set(KUBERNETES_EXECUTOR_ENABLE_API_WATCHER, false)
84+
watchSourceUnderTest = new ExecutorPodsWatchSnapshotSource(
85+
eventQueue, kubernetesClient, conf)
86+
watchSourceUnderTest.start(TEST_SPARK_APP_ID)
87+
verify(kubernetesClient, never()).pods()
88+
}
7789
}

0 commit comments

Comments
 (0)