Skip to content

Commit 65d7cc7

Browse files
committed
Add the ability to selectively disable watching or polling for pods on Kubernetes for environments where etcd may be under a high load or otherwise not support polling/watching.
1 parent 1af7072 commit 65d7cc7

File tree

6 files changed

+80
-26
lines changed

6 files changed

+80
-26
lines changed

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -395,6 +395,23 @@ private[spark] object Config extends Logging {
395395
.checkValue(interval => interval > 0, s"Logging interval must be a positive time value.")
396396
.createWithDefaultString("1s")
397397

398+
val KUBERNETES_EXECUTOR_ENABLE_API_POLLING =
399+
ConfigBuilder("spark.kubernetes.executor.enableApiPolling")
400+
.doc("If Spark should poll Kubernetes for executor pod status. " +
401+
"You should leave this enabled unless you're encountering performance issues with your etcd.")
402+
.version("3.3.0")
403+
.booleanConf
404+
.createWithDefault(true)
405+
406+
val KUBERNETES_EXECUTOR_ENABLE_API_WATCHER =
407+
ConfigBuilder("spark.kubernetes.executor.enableApiWatcher")
408+
.doc("If Spark should create watchers for executor pod status. " +
409+
"You should leave this enabled unless you're encountering performance issues with your etcd.")
410+
.version("3.3.0")
411+
.booleanConf
412+
.createWithDefault(true)
413+
414+
398415
val KUBERNETES_EXECUTOR_API_POLLING_INTERVAL =
399416
ConfigBuilder("spark.kubernetes.executor.apiPollingInterval")
400417
.doc("Interval between polls against the Kubernetes API server to inspect the " +

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsPollingSnapshotSource.scala

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -36,14 +36,17 @@ private[spark] class ExecutorPodsPollingSnapshotSource(
3636
pollingExecutor: ScheduledExecutorService) extends Logging {
3737

3838
private val pollingInterval = conf.get(KUBERNETES_EXECUTOR_API_POLLING_INTERVAL)
39+
private val pollingEnabled = conf.get(KUBERNETES_EXECUTOR_ENABLE_API_POLLING)
3940

4041
private var pollingFuture: Future[_] = _
4142

4243
def start(applicationId: String): Unit = {
43-
require(pollingFuture == null, "Cannot start polling more than once.")
44-
logDebug(s"Starting to check for executor pod state every $pollingInterval ms.")
45-
pollingFuture = pollingExecutor.scheduleWithFixedDelay(
46-
new PollRunnable(applicationId), pollingInterval, pollingInterval, TimeUnit.MILLISECONDS)
44+
if (pollingEnabled) {
45+
require(pollingFuture == null, "Cannot start polling more than once.")
46+
logDebug(s"Starting to check for executor pod state every $pollingInterval ms.")
47+
pollingFuture = pollingExecutor.scheduleWithFixedDelay(
48+
new PollRunnable(applicationId), pollingInterval, pollingInterval, TimeUnit.MILLISECONDS)
49+
}
4750
}
4851

4952
def stop(): Unit = {

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsWatchSnapshotSource.scala

Lines changed: 14 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -22,24 +22,30 @@ import io.fabric8.kubernetes.api.model.Pod
2222
import io.fabric8.kubernetes.client.{KubernetesClient, Watcher, WatcherException}
2323
import io.fabric8.kubernetes.client.Watcher.Action
2424

25+
import org.apache.spark.SparkConf
26+
import org.apache.spark.deploy.k8s.Config.KUBERNETES_EXECUTOR_ENABLE_API_WATCHER
2527
import org.apache.spark.deploy.k8s.Constants._
2628
import org.apache.spark.internal.Logging
2729
import org.apache.spark.util.Utils
2830

2931
private[spark] class ExecutorPodsWatchSnapshotSource(
3032
snapshotsStore: ExecutorPodsSnapshotsStore,
31-
kubernetesClient: KubernetesClient) extends Logging {
33+
kubernetesClient: KubernetesClient,
34+
conf: SparkConf) extends Logging {
3235

3336
private var watchConnection: Closeable = _
37+
private val enablePolling = conf.get(KUBERNETES_EXECUTOR_ENABLE_API_WATCHER)
3438

3539
def start(applicationId: String): Unit = {
36-
require(watchConnection == null, "Cannot start the watcher twice.")
37-
logDebug(s"Starting watch for pods with labels $SPARK_APP_ID_LABEL=$applicationId," +
38-
s" $SPARK_ROLE_LABEL=$SPARK_POD_EXECUTOR_ROLE.")
39-
watchConnection = kubernetesClient.pods()
40-
.withLabel(SPARK_APP_ID_LABEL, applicationId)
41-
.withLabel(SPARK_ROLE_LABEL, SPARK_POD_EXECUTOR_ROLE)
42-
.watch(new ExecutorPodsWatcher())
40+
if (enablePolling) {
41+
require(watchConnection == null, "Cannot start the watcher twice.")
42+
logDebug(s"Starting watch for pods with labels $SPARK_APP_ID_LABEL=$applicationId," +
43+
s" $SPARK_ROLE_LABEL=$SPARK_POD_EXECUTOR_ROLE.")
44+
watchConnection = kubernetesClient.pods()
45+
.withLabel(SPARK_APP_ID_LABEL, applicationId)
46+
.withLabel(SPARK_ROLE_LABEL, SPARK_POD_EXECUTOR_ROLE)
47+
.watch(new ExecutorPodsWatcher())
48+
}
4349
}
4450

4551
def stop(): Unit = {

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -114,7 +114,8 @@ private[spark] class KubernetesClusterManager extends ExternalClusterManager wit
114114

115115
val podsWatchEventSource = new ExecutorPodsWatchSnapshotSource(
116116
snapshotsStore,
117-
kubernetesClient)
117+
kubernetesClient,
118+
sc.conf)
118119

119120
val eventsPollingExecutor = ThreadUtils.newDaemonSingleThreadScheduledExecutor(
120121
"kubernetes-executor-pod-polling-sync")

resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsPollingSnapshotSourceSuite.scala

Lines changed: 24 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ import io.fabric8.kubernetes.api.model.{ListOptionsBuilder, PodListBuilder}
2222
import io.fabric8.kubernetes.client.KubernetesClient
2323
import org.jmock.lib.concurrent.DeterministicScheduler
2424
import org.mockito.{Mock, MockitoAnnotations}
25-
import org.mockito.Mockito.{verify, when}
25+
import org.mockito.Mockito.{never, verify, when}
2626
import org.scalatest.BeforeAndAfter
2727

2828
import org.apache.spark.{SparkConf, SparkFunSuite}
@@ -33,9 +33,9 @@ import org.apache.spark.scheduler.cluster.k8s.ExecutorLifecycleTestUtils._
3333

3434
class ExecutorPodsPollingSnapshotSourceSuite extends SparkFunSuite with BeforeAndAfter {
3535

36-
private val sparkConf = new SparkConf
36+
private val defaultConf = new SparkConf()
3737

38-
private val pollingInterval = sparkConf.get(KUBERNETES_EXECUTOR_API_POLLING_INTERVAL)
38+
private val pollingInterval = defaultConf.get(KUBERNETES_EXECUTOR_API_POLLING_INTERVAL)
3939

4040
@Mock
4141
private var kubernetesClient: KubernetesClient = _
@@ -61,12 +61,6 @@ class ExecutorPodsPollingSnapshotSourceSuite extends SparkFunSuite with BeforeAn
6161
before {
6262
MockitoAnnotations.openMocks(this).close()
6363
pollingExecutor = new DeterministicScheduler()
64-
pollingSourceUnderTest = new ExecutorPodsPollingSnapshotSource(
65-
sparkConf,
66-
kubernetesClient,
67-
eventQueue,
68-
pollingExecutor)
69-
pollingSourceUnderTest.start(TEST_SPARK_APP_ID)
7064
when(kubernetesClient.pods()).thenReturn(podOperations)
7165
when(podOperations.withLabel(SPARK_APP_ID_LABEL, TEST_SPARK_APP_ID))
7266
.thenReturn(appIdLabeledPods)
@@ -77,6 +71,13 @@ class ExecutorPodsPollingSnapshotSourceSuite extends SparkFunSuite with BeforeAn
7771
}
7872

7973
test("Items returned by the API should be pushed to the event queue") {
74+
val sparkConf = new SparkConf()
75+
pollingSourceUnderTest = new ExecutorPodsPollingSnapshotSource(
76+
sparkConf,
77+
kubernetesClient,
78+
eventQueue,
79+
pollingExecutor)
80+
pollingSourceUnderTest.start(TEST_SPARK_APP_ID)
8081
val exec1 = runningExecutor(1)
8182
val exec2 = runningExecutor(2)
8283
when(activeExecutorPods.list())
@@ -89,13 +90,27 @@ class ExecutorPodsPollingSnapshotSourceSuite extends SparkFunSuite with BeforeAn
8990
verify(eventQueue).replaceSnapshot(Seq(exec1, exec2))
9091
}
9192

93+
test("If polling is disabled we don't call pods() on the client") {
94+
val sparkConf = new SparkConf()
95+
val source = new ExecutorPodsPollingSnapshotSource(
96+
sparkConf.set(KUBERNETES_EXECUTOR_ENABLE_API_POLLING, false),
97+
kubernetesClient,
98+
eventQueue,
99+
pollingExecutor)
100+
source.start(TEST_SPARK_APP_ID)
101+
pollingExecutor.tick(pollingInterval, TimeUnit.MILLISECONDS)
102+
verify(kubernetesClient, never()).pods()
103+
}
104+
92105
test("SPARK-36334: Support pod listing with resource version") {
93106
Seq(true, false).foreach { value =>
107+
val sparkConf = new SparkConf()
94108
val source = new ExecutorPodsPollingSnapshotSource(
95109
sparkConf.set(KUBERNETES_EXECUTOR_API_POLLING_WITH_RESOURCE_VERSION, value),
96110
kubernetesClient,
97111
eventQueue,
98112
pollingExecutor)
113+
source.start(TEST_SPARK_APP_ID)
99114
pollingExecutor.tick(pollingInterval, TimeUnit.MILLISECONDS)
100115
if (value) {
101116
verify(activeExecutorPods).list(new ListOptionsBuilder().withResourceVersion("0").build())

resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsWatchSnapshotSourceSuite.scala

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -20,10 +20,12 @@ import io.fabric8.kubernetes.api.model.Pod
2020
import io.fabric8.kubernetes.client.{KubernetesClient, Watch, Watcher}
2121
import io.fabric8.kubernetes.client.Watcher.Action
2222
import org.mockito.{ArgumentCaptor, Mock, MockitoAnnotations}
23-
import org.mockito.Mockito.{verify, when}
23+
import org.mockito.Mockito.{never, verify, when}
2424
import org.scalatest.BeforeAndAfter
2525

26+
import org.apache.spark.SparkConf
2627
import org.apache.spark.SparkFunSuite
28+
import org.apache.spark.deploy.k8s.Config.KUBERNETES_EXECUTOR_ENABLE_API_WATCHER
2729
import org.apache.spark.deploy.k8s.Constants._
2830
import org.apache.spark.deploy.k8s.Fabric8Aliases._
2931
import org.apache.spark.scheduler.cluster.k8s.ExecutorLifecycleTestUtils._
@@ -61,17 +63,27 @@ class ExecutorPodsWatchSnapshotSourceSuite extends SparkFunSuite with BeforeAndA
6163
when(appIdLabeledPods.withLabel(SPARK_ROLE_LABEL, SPARK_POD_EXECUTOR_ROLE))
6264
.thenReturn(executorRoleLabeledPods)
6365
when(executorRoleLabeledPods.watch(watch.capture())).thenReturn(watchConnection)
64-
watchSourceUnderTest = new ExecutorPodsWatchSnapshotSource(
65-
eventQueue, kubernetesClient)
66-
watchSourceUnderTest.start(TEST_SPARK_APP_ID)
6766
}
6867

6968
test("Watch events should be pushed to the snapshots store as snapshot updates.") {
69+
val conf = new SparkConf()
70+
watchSourceUnderTest = new ExecutorPodsWatchSnapshotSource(
71+
eventQueue, kubernetesClient, conf)
72+
watchSourceUnderTest.start(TEST_SPARK_APP_ID)
7073
val exec1 = runningExecutor(1)
7174
val exec2 = runningExecutor(2)
7275
watch.getValue.eventReceived(Action.ADDED, exec1)
7376
watch.getValue.eventReceived(Action.MODIFIED, exec2)
7477
verify(eventQueue).updatePod(exec1)
7578
verify(eventQueue).updatePod(exec2)
7679
}
80+
81+
test("Verify if watchers are disabled we don't call pods() on the client") {
82+
val conf = new SparkConf()
83+
conf.set(KUBERNETES_EXECUTOR_ENABLE_API_WATCHER, false)
84+
watchSourceUnderTest = new ExecutorPodsWatchSnapshotSource(
85+
eventQueue, kubernetesClient, conf)
86+
watchSourceUnderTest.start(TEST_SPARK_APP_ID)
87+
verify(kubernetesClient, never()).pods()
88+
}
7789
}

0 commit comments

Comments
 (0)