
Commit 3acb74e

yifhua, yujli and manuzhang
authored and committed
[HADP-52545][HADP-43018] Disable rack resolve when registering executor on driver (apache#227)
* [HADP-43018] Disable rack resolve when registering executor on driver (apache#388) (apache#74)

Make `YarnClusterScheduler` extend `TaskSchedulerImpl` rather than `YarnScheduler` so that rack resolve is disabled. We've seen the driver stuck in the following thread when a large number of executors register. Since we don't need rack info for locality, add a config to disable rack resolve by default, which could eliminate this bottleneck in the driver.

```
"dispatcher-event-loop-15" #50 daemon prio=5 os_prio=0 tid=0x00007f751a394000 nid=0x11953 runnable [0x00007f74c6290000]
   java.lang.Thread.State: RUNNABLE
	at java.net.Inet6AddressImpl.lookupAllHostAddr(Native Method)
	at java.net.InetAddress$2.lookupAllHostAddr(InetAddress.java:929)
	at java.net.InetAddress.getAddressesFromNameService(InetAddress.java:1324)
	at java.net.InetAddress.getAllByName0(InetAddress.java:1277)
	at java.net.InetAddress.getAllByName(InetAddress.java:1193)
	at java.net.InetAddress.getAllByName(InetAddress.java:1127)
	at java.net.InetAddress.getByName(InetAddress.java:1077)
	at org.apache.hadoop.net.NetUtils.normalizeHostName(NetUtils.java:563)
	at org.apache.hadoop.net.NetUtils.normalizeHostNames(NetUtils.java:580)
	at org.apache.hadoop.net.CachedDNSToSwitchMapping.resolve(CachedDNSToSwitchMapping.java:109)
	at org.apache.hadoop.yarn.util.RackResolver.coreResolve(RackResolver.java:101)
	at org.apache.hadoop.yarn.util.RackResolver.resolve(RackResolver.java:81)
	at org.apache.spark.scheduler.cluster.YarnScheduler.getRackForHost(YarnScheduler.scala:37)
	at org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$resourceOffers$1.apply(TaskSchedulerImpl.scala:329)
	at org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$resourceOffers$1.apply(TaskSchedulerImpl.scala:318)
```

No user-facing change. Added a UT. I've also run a test (https://bdp.vip.ebay.com/job/detail/?cluster=apollorno&jobType=SPARK&jobId=application_1635906065713_321559&tab=0) on apollorno. The test succeeded with 16612 executors, though many executors failed to register.
This patch could improve driver performance, but the driver can still become a bottleneck when too many executors register at the same time.

```
21/11/08 07:40:19 INFO CoarseGrainedExecutorBackend: Connecting to driver: spark://CoarseGrainedScheduler@hdc42-mcc10-01-0910-2704-050-tess0028.stratus.rno.ebay.com:30201
21/11/08 07:42:19 ERROR TransportChannelHandler: Connection to hdc42-mcc10-01-0910-2704-050-tess0028.stratus.rno.ebay.com/10.78.173.174:30201 has been quiet for 120000 ms while there are outstanding requests. Assuming connection is dead; please adjust spark.network.timeout if this is wrong.
21/11/08 07:42:19 ERROR TransportResponseHandler: Still have 1 requests outstanding when connection from hdc42-mcc10-01-0910-2704-050-tess0028.stratus.rno.ebay.com/10.78.173.174:30201 is closed
21/11/08 07:42:19 WARN NettyRpcEnv: Ignored failure: java.io.IOException: Connection from hdc42-mcc10-01-0910-2704-050-tess0028.stratus.rno.ebay.com/10.78.173.174:30201 closed
21/11/08 07:42:19 ERROR CoarseGrainedExecutorBackend: Executor self-exiting due to : Driver hdc42-mcc10-01-0910-2704-050-tess0028.stratus.rno.ebay.com:30201 disassociated! Shutting down.
21/11/08 07:42:19 ERROR CoarseGrainedExecutorBackend: Executor self-exiting due to : Cannot register with driver: spark://CoarseGrainedScheduler@hdc42-mcc10-01-0910-2704-050-tess0028.stratus.rno.ebay.com:30201
org.apache.spark.rpc.RpcTimeoutException: Cannot receive any reply from hdc42-mcc10-01-0910-2704-050-tess0028.stratus.rno.ebay.com:30201 in 120 seconds. This timeout is controlled by spark.network.timeout
```

Co-authored-by: tianlzhang <[email protected]>
Co-authored-by: yujli <[email protected]>
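As a sketch of how an operator could flip this toggle at submit time: the `spark.yarn.resolveRack` key comes from this patch's `config.scala`; the master, deploy mode, application class, and jar name below are placeholders, not part of the commit.

```shell
# Sketch only: spark.yarn.resolveRack is the key this patch adds.
# The application class and jar are hypothetical placeholders.
spark-submit \
  --master yarn \
  --deploy-mode cluster \
  --conf spark.yarn.resolveRack=false \
  --class com.example.MyApp \
  my-app.jar
```

With the key set to `false`, `getRacksForHosts`/`getRackForHost` return `None` without touching DNS, which is the behavior the thread dump above motivates.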
1 parent 5e2558b commit 3acb74e

File tree

3 files changed: +77 −7 lines changed


resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala

Lines changed: 4 additions & 0 deletions

```diff
@@ -155,6 +155,10 @@ package object config extends Logging {
     .version("3.1.0")
     .stringConf
     .createWithDefault("SPARK")
+  private[spark] val RESOLVE_RACK_ENABLED = ConfigBuilder("spark.yarn.resolveRack")
+    .doc("Whether to resolve rack from host in YarnScheduler.")
+    .booleanConf
+    .createWithDefault(true)
 
   /* File distribution. */
```
resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnScheduler.scala

Lines changed: 22 additions & 7 deletions

```diff
@@ -20,9 +20,11 @@ package org.apache.spark.scheduler.cluster
 import java.util.concurrent.ConcurrentHashMap
 
 import org.apache.hadoop.net.NetworkTopology
+import org.apache.hadoop.yarn.util.RackResolver
 
 import org.apache.spark._
 import org.apache.spark.deploy.yarn.SparkRackResolver
+import org.apache.spark.deploy.yarn.config.RESOLVE_RACK_ENABLED
 import org.apache.spark.scheduler.TaskSchedulerImpl
 import org.apache.spark.util.Utils
 
@@ -36,15 +38,28 @@ private[spark] class YarnScheduler(sc: SparkContext) extends TaskSchedulerImpl(s
   private[spark] val resolver = SparkRackResolver.get(sc.hadoopConfiguration)
 
   override def getRacksForHosts(hostPorts: Seq[String]): Seq[Option[String]] = {
-    val hosts = hostPorts.map(Utils.parseHostPort(_)._1)
-    val uncachedHosts = hosts.filterNot(hostToRackCache.containsKey(_))
-    if (uncachedHosts.nonEmpty) {
-      val resolvedHosts = resolver.resolve(uncachedHosts).map { node =>
-        Option(node.getNetworkLocation)
+    if (sc.conf.get(RESOLVE_RACK_ENABLED)) {
+      val hosts = hostPorts.map(Utils.parseHostPort(_)._1)
+      val uncachedHosts = hosts.filterNot(hostToRackCache.containsKey(_))
+      if (uncachedHosts.nonEmpty) {
+        val resolvedHosts = resolver.resolve(uncachedHosts).map { node =>
+          Option(node.getNetworkLocation)
+        }
+        uncachedHosts.zip(resolvedHosts).map(h => hostToRackCache.put(h._1, h._2))
       }
-      uncachedHosts.zip(resolvedHosts).map(h => hostToRackCache.put(h._1, h._2))
+
+      hosts.map(hostToRackCache.get(_))
+    } else {
+      hostPorts.map(_ => None)
     }
+  }
 
-    hosts.map(hostToRackCache.get(_))
+  override def getRackForHost(hostPort: String): Option[String] = {
+    if (sc.conf.get(RESOLVE_RACK_ENABLED)) {
+      val host = Utils.parseHostPort(hostPort)._1
+      Option(RackResolver.resolve(sc.hadoopConfiguration, host).getNetworkLocation)
+    } else {
+      None
+    }
   }
 }
```
Lines changed: 51 additions & 0 deletions (new file)

```diff
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.scheduler.cluster
+
+import java.util.concurrent.atomic.AtomicInteger
+
+import org.apache.hadoop.net.NetworkTopology
+import org.mockito.Mockito.doNothing
+import org.scalatestplus.mockito.MockitoSugar
+
+import org.apache.spark.{SparkConf, SparkContext, SparkFunSuite}
+import org.apache.spark.deploy.yarn.config.RESOLVE_RACK_ENABLED
+import org.apache.spark.scheduler.{DAGScheduler, WorkerOffer}
+
+class YarnClusterSchedulerSuite extends SparkFunSuite with MockitoSugar {
+
+  test("rack resolve is disabled by default on resourceOffers") {
+    Seq(None, Some(false), Some(true)).foreach { resolveRack =>
+      val conf = new SparkConf().setMaster("local").setAppName("YarnClusterSchedulerSuite")
+      resolveRack.foreach(enabled => conf.set(RESOLVE_RACK_ENABLED, enabled))
+      val sc = new SparkContext(conf)
+      val clusterScheduler = new YarnClusterScheduler(sc)
+      val executor = "executor0"
+      val host = "host0"
+      val dagScheduler = mock[DAGScheduler]
+      clusterScheduler.setDAGScheduler(dagScheduler)
+      doNothing().when(dagScheduler).executorAdded(executor, host)
+      clusterScheduler.resourceOffers(IndexedSeq(WorkerOffer(executor, host, new AtomicInteger(1))))
+      if (conf.get(RESOLVE_RACK_ENABLED)) {
+        assert(clusterScheduler.hasHostAliveOnRack(NetworkTopology.DEFAULT_RACK))
+      } else {
+        assert(!clusterScheduler.hasHostAliveOnRack(NetworkTopology.DEFAULT_RACK))
+      }
+      sc.stop()
+    }
+  }
+}
```
