Skip to content

Commit 28f1491

Browse files
committed
[SPARK-13232][YARN] Fix executor node label
1 parent 140ddef commit 28f1491

File tree

2 files changed

+29
-2
lines changed

2 files changed

+29
-2
lines changed

yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -307,8 +307,14 @@ private[yarn] class YarnAllocator(
307307
nodes: Array[String],
308308
racks: Array[String]): ContainerRequest = {
309309
nodeLabelConstructor.map { constructor =>
310+
val labelExp = if ((racks != null && (!racks.isEmpty))
311+
|| (nodes != null && (!nodes.isEmpty))) {
312+
null
313+
} else {
314+
labelExpression.orNull
315+
}
310316
constructor.newInstance(resource, nodes, racks, RM_REQUEST_PRIORITY, true: java.lang.Boolean,
311-
labelExpression.orNull)
317+
labelExp)
312318
}.getOrElse(new ContainerRequest(resource, nodes, racks, RM_REQUEST_PRIORITY))
313319
}
314320

yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -87,14 +87,17 @@ class YarnAllocatorSuite extends SparkFunSuite with Matchers with BeforeAndAfter
8787
override def equals(other: Any): Boolean = false
8888
}
8989

90-
def createAllocator(maxExecutors: Int = 5): YarnAllocator = {
90+
def createAllocator(
91+
maxExecutors: Int = 5,
92+
executorNodeLabel: Option[String] = None): YarnAllocator = {
9193
val args = Array(
9294
"--executor-cores", "5",
9395
"--executor-memory", "2048",
9496
"--jar", "somejar.jar",
9597
"--class", "SomeClass")
9698
val sparkConfClone = sparkConf.clone()
9799
sparkConfClone.set("spark.executor.instances", maxExecutors.toString)
100+
executorNodeLabel.foreach(sparkConfClone.set("spark.yarn.executor.nodeLabelExpression", _))
98101
new YarnAllocator(
99102
"not used",
100103
mock(classOf[RpcEndpointRef]),
@@ -272,4 +275,22 @@ class YarnAllocatorSuite extends SparkFunSuite with Matchers with BeforeAndAfter
272275
assert(vmemMsg.contains("5.8 GB of 4.2 GB virtual memory used."))
273276
assert(pmemMsg.contains("2.1 MB of 2 GB physical memory used."))
274277
}
278+
279+
test("request executors with locality") {
280+
val handler = createAllocator(1, Some("label"))
281+
handler.updateResourceRequests()
282+
handler.getNumExecutorsRunning should be (0)
283+
handler.getPendingAllocate.size should be (1)
284+
285+
handler.requestTotalExecutorsWithPreferredLocalities(3, 20, Map(("host1", 10), ("host2", 20)))
286+
handler.updateResourceRequests()
287+
handler.getPendingAllocate.size should be (3)
288+
289+
val container = createContainer("host1")
290+
handler.handleAllocatedContainers(Array(container))
291+
292+
handler.getNumExecutorsRunning should be (1)
293+
handler.allocatedContainerToHostMap.get(container.getId).get should be ("host1")
294+
handler.allocatedHostToContainersMap.get("host1").get should contain (container.getId)
295+
}
275296
}

0 commit comments

Comments
 (0)