Skip to content

Commit 0af9e22

Browse files
committed
Merge branch 'master' of git://git.apache.org/spark into SPARK-5231
2 parents da8bd14 + 96c2c71 commit 0af9e22

39 files changed

+1241
-458
lines changed
Lines changed: 97 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,97 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.spark;
19+
20+
import org.apache.spark.scheduler.SparkListener;
21+
import org.apache.spark.scheduler.SparkListenerApplicationEnd;
22+
import org.apache.spark.scheduler.SparkListenerApplicationStart;
23+
import org.apache.spark.scheduler.SparkListenerBlockManagerAdded;
24+
import org.apache.spark.scheduler.SparkListenerBlockManagerRemoved;
25+
import org.apache.spark.scheduler.SparkListenerEnvironmentUpdate;
26+
import org.apache.spark.scheduler.SparkListenerExecutorAdded;
27+
import org.apache.spark.scheduler.SparkListenerExecutorMetricsUpdate;
28+
import org.apache.spark.scheduler.SparkListenerExecutorRemoved;
29+
import org.apache.spark.scheduler.SparkListenerJobEnd;
30+
import org.apache.spark.scheduler.SparkListenerJobStart;
31+
import org.apache.spark.scheduler.SparkListenerStageCompleted;
32+
import org.apache.spark.scheduler.SparkListenerStageSubmitted;
33+
import org.apache.spark.scheduler.SparkListenerTaskEnd;
34+
import org.apache.spark.scheduler.SparkListenerTaskGettingResult;
35+
import org.apache.spark.scheduler.SparkListenerTaskStart;
36+
import org.apache.spark.scheduler.SparkListenerUnpersistRDD;
37+
38+
/**
39+
* Java clients should extend this class instead of implementing
40+
* SparkListener directly. This is to prevent java clients
41+
* from breaking when new events are added to the SparkListener
42+
* trait.
43+
*
44+
* This is a concrete class instead of abstract to enforce
45+
* new events get added to both the SparkListener and this adapter
46+
* in lockstep.
47+
*/
48+
public class JavaSparkListener implements SparkListener {
49+
50+
@Override
51+
public void onStageCompleted(SparkListenerStageCompleted stageCompleted) { }
52+
53+
@Override
54+
public void onStageSubmitted(SparkListenerStageSubmitted stageSubmitted) { }
55+
56+
@Override
57+
public void onTaskStart(SparkListenerTaskStart taskStart) { }
58+
59+
@Override
60+
public void onTaskGettingResult(SparkListenerTaskGettingResult taskGettingResult) { }
61+
62+
@Override
63+
public void onTaskEnd(SparkListenerTaskEnd taskEnd) { }
64+
65+
@Override
66+
public void onJobStart(SparkListenerJobStart jobStart) { }
67+
68+
@Override
69+
public void onJobEnd(SparkListenerJobEnd jobEnd) { }
70+
71+
@Override
72+
public void onEnvironmentUpdate(SparkListenerEnvironmentUpdate environmentUpdate) { }
73+
74+
@Override
75+
public void onBlockManagerAdded(SparkListenerBlockManagerAdded blockManagerAdded) { }
76+
77+
@Override
78+
public void onBlockManagerRemoved(SparkListenerBlockManagerRemoved blockManagerRemoved) { }
79+
80+
@Override
81+
public void onUnpersistRDD(SparkListenerUnpersistRDD unpersistRDD) { }
82+
83+
@Override
84+
public void onApplicationStart(SparkListenerApplicationStart applicationStart) { }
85+
86+
@Override
87+
public void onApplicationEnd(SparkListenerApplicationEnd applicationEnd) { }
88+
89+
@Override
90+
public void onExecutorMetricsUpdate(SparkListenerExecutorMetricsUpdate executorMetricsUpdate) { }
91+
92+
@Override
93+
public void onExecutorAdded(SparkListenerExecutorAdded executorAdded) { }
94+
95+
@Override
96+
public void onExecutorRemoved(SparkListenerExecutorRemoved executorRemoved) { }
97+
}

core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -38,8 +38,8 @@ private[spark] class ApplicationInfo(
3838
extends Serializable {
3939

4040
@transient var state: ApplicationState.Value = _
41-
@transient var executors: mutable.HashMap[Int, ExecutorInfo] = _
42-
@transient var removedExecutors: ArrayBuffer[ExecutorInfo] = _
41+
@transient var executors: mutable.HashMap[Int, ExecutorDesc] = _
42+
@transient var removedExecutors: ArrayBuffer[ExecutorDesc] = _
4343
@transient var coresGranted: Int = _
4444
@transient var endTime: Long = _
4545
@transient var appSource: ApplicationSource = _
@@ -55,12 +55,12 @@ private[spark] class ApplicationInfo(
5555

5656
private def init() {
5757
state = ApplicationState.WAITING
58-
executors = new mutable.HashMap[Int, ExecutorInfo]
58+
executors = new mutable.HashMap[Int, ExecutorDesc]
5959
coresGranted = 0
6060
endTime = -1L
6161
appSource = new ApplicationSource(this)
6262
nextExecutorId = 0
63-
removedExecutors = new ArrayBuffer[ExecutorInfo]
63+
removedExecutors = new ArrayBuffer[ExecutorDesc]
6464
}
6565

6666
private def newExecutorId(useID: Option[Int] = None): Int = {
@@ -75,14 +75,14 @@ private[spark] class ApplicationInfo(
7575
}
7676
}
7777

78-
def addExecutor(worker: WorkerInfo, cores: Int, useID: Option[Int] = None): ExecutorInfo = {
79-
val exec = new ExecutorInfo(newExecutorId(useID), this, worker, cores, desc.memoryPerSlave)
78+
def addExecutor(worker: WorkerInfo, cores: Int, useID: Option[Int] = None): ExecutorDesc = {
79+
val exec = new ExecutorDesc(newExecutorId(useID), this, worker, cores, desc.memoryPerSlave)
8080
executors(exec.id) = exec
8181
coresGranted += cores
8282
exec
8383
}
8484

85-
def removeExecutor(exec: ExecutorInfo) {
85+
def removeExecutor(exec: ExecutorDesc) {
8686
if (executors.contains(exec.id)) {
8787
removedExecutors += executors(exec.id)
8888
executors -= exec.id

core/src/main/scala/org/apache/spark/deploy/master/ExecutorInfo.scala renamed to core/src/main/scala/org/apache/spark/deploy/master/ExecutorDesc.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ package org.apache.spark.deploy.master
1919

2020
import org.apache.spark.deploy.{ExecutorDescription, ExecutorState}
2121

22-
private[spark] class ExecutorInfo(
22+
private[spark] class ExecutorDesc(
2323
val id: Int,
2424
val application: ApplicationInfo,
2525
val worker: WorkerInfo,
@@ -37,7 +37,7 @@ private[spark] class ExecutorInfo(
3737

3838
override def equals(other: Any): Boolean = {
3939
other match {
40-
case info: ExecutorInfo =>
40+
case info: ExecutorDesc =>
4141
fullId == info.fullId &&
4242
worker.id == info.worker.id &&
4343
cores == info.cores &&

core/src/main/scala/org/apache/spark/deploy/master/Master.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -581,7 +581,7 @@ private[spark] class Master(
581581
}
582582
}
583583

584-
def launchExecutor(worker: WorkerInfo, exec: ExecutorInfo) {
584+
def launchExecutor(worker: WorkerInfo, exec: ExecutorDesc) {
585585
logInfo("Launching executor " + exec.fullId + " on worker " + worker.id)
586586
worker.addExecutor(exec)
587587
worker.actor ! LaunchExecutor(masterUrl,

core/src/main/scala/org/apache/spark/deploy/master/WorkerInfo.scala

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ private[spark] class WorkerInfo(
3838
Utils.checkHost(host, "Expected hostname")
3939
assert (port > 0)
4040

41-
@transient var executors: mutable.HashMap[String, ExecutorInfo] = _ // executorId => info
41+
@transient var executors: mutable.HashMap[String, ExecutorDesc] = _ // executorId => info
4242
@transient var drivers: mutable.HashMap[String, DriverInfo] = _ // driverId => info
4343
@transient var state: WorkerState.Value = _
4444
@transient var coresUsed: Int = _
@@ -70,13 +70,13 @@ private[spark] class WorkerInfo(
7070
host + ":" + port
7171
}
7272

73-
def addExecutor(exec: ExecutorInfo) {
73+
def addExecutor(exec: ExecutorDesc) {
7474
executors(exec.fullId) = exec
7575
coresUsed += exec.cores
7676
memoryUsed += exec.memory
7777
}
7878

79-
def removeExecutor(exec: ExecutorInfo) {
79+
def removeExecutor(exec: ExecutorDesc) {
8080
if (executors.contains(exec.fullId)) {
8181
executors -= exec.fullId
8282
coresUsed -= exec.cores

core/src/main/scala/org/apache/spark/deploy/master/ui/ApplicationPage.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ import org.json4s.JValue
2727

2828
import org.apache.spark.deploy.{ExecutorState, JsonProtocol}
2929
import org.apache.spark.deploy.DeployMessages.{MasterStateResponse, RequestMasterState}
30-
import org.apache.spark.deploy.master.ExecutorInfo
30+
import org.apache.spark.deploy.master.ExecutorDesc
3131
import org.apache.spark.ui.{UIUtils, WebUIPage}
3232
import org.apache.spark.util.Utils
3333

@@ -109,7 +109,7 @@ private[spark] class ApplicationPage(parent: MasterWebUI) extends WebUIPage("app
109109
UIUtils.basicSparkPage(content, "Application: " + app.desc.name)
110110
}
111111

112-
private def executorRow(executor: ExecutorInfo): Seq[Node] = {
112+
private def executorRow(executor: ExecutorDesc): Seq[Node] = {
113113
<tr>
114114
<td>{executor.id}</td>
115115
<td>

core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -168,6 +168,10 @@ private[spark] class EventLoggingListener(
168168
logEvent(event, flushLogger = true)
169169
override def onApplicationEnd(event: SparkListenerApplicationEnd) =
170170
logEvent(event, flushLogger = true)
171+
override def onExecutorAdded(event: SparkListenerExecutorAdded) =
172+
logEvent(event, flushLogger = true)
173+
override def onExecutorRemoved(event: SparkListenerExecutorRemoved) =
174+
logEvent(event, flushLogger = true)
171175

172176
// No-op because logging every update would be overkill
173177
override def onExecutorMetricsUpdate(event: SparkListenerExecutorMetricsUpdate) { }

core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ import scala.collection.mutable
2525
import org.apache.spark.{Logging, TaskEndReason}
2626
import org.apache.spark.annotation.DeveloperApi
2727
import org.apache.spark.executor.TaskMetrics
28+
import org.apache.spark.scheduler.cluster.ExecutorInfo
2829
import org.apache.spark.storage.BlockManagerId
2930
import org.apache.spark.util.{Distribution, Utils}
3031

@@ -89,6 +90,14 @@ case class SparkListenerBlockManagerRemoved(time: Long, blockManagerId: BlockMan
8990
@DeveloperApi
9091
case class SparkListenerUnpersistRDD(rddId: Int) extends SparkListenerEvent
9192

93+
@DeveloperApi
94+
case class SparkListenerExecutorAdded(executorId: String, executorInfo: ExecutorInfo)
95+
extends SparkListenerEvent
96+
97+
@DeveloperApi
98+
case class SparkListenerExecutorRemoved(executorId: String)
99+
extends SparkListenerEvent
100+
92101
/**
93102
* Periodic updates from executors.
94103
* @param execId executor id
@@ -114,7 +123,8 @@ private[spark] case object SparkListenerShutdown extends SparkListenerEvent
114123
/**
115124
* :: DeveloperApi ::
116125
* Interface for listening to events from the Spark scheduler. Note that this is an internal
117-
* interface which might change in different Spark releases.
126+
* interface which might change in different Spark releases. Java clients should extend
127+
* {@link JavaSparkListener}
118128
*/
119129
@DeveloperApi
120130
trait SparkListener {
@@ -188,6 +198,16 @@ trait SparkListener {
188198
* Called when the driver receives task metrics from an executor in a heartbeat.
189199
*/
190200
def onExecutorMetricsUpdate(executorMetricsUpdate: SparkListenerExecutorMetricsUpdate) { }
201+
202+
/**
203+
* Called when the driver registers a new executor.
204+
*/
205+
def onExecutorAdded(executorAdded: SparkListenerExecutorAdded) { }
206+
207+
/**
208+
* Called when the driver removes an executor.
209+
*/
210+
def onExecutorRemoved(executorRemoved: SparkListenerExecutorRemoved) { }
191211
}
192212

193213
/**

core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,10 @@ private[spark] trait SparkListenerBus extends Logging {
7070
foreachListener(_.onApplicationEnd(applicationEnd))
7171
case metricsUpdate: SparkListenerExecutorMetricsUpdate =>
7272
foreachListener(_.onExecutorMetricsUpdate(metricsUpdate))
73+
case executorAdded: SparkListenerExecutorAdded =>
74+
foreachListener(_.onExecutorAdded(executorAdded))
75+
case executorRemoved: SparkListenerExecutorRemoved =>
76+
foreachListener(_.onExecutorRemoved(executorRemoved))
7377
case SparkListenerShutdown =>
7478
}
7579
}

core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ import akka.pattern.ask
2828
import akka.remote.{DisassociatedEvent, RemotingLifecycleEvent}
2929

3030
import org.apache.spark.{ExecutorAllocationClient, Logging, SparkEnv, SparkException, TaskState}
31-
import org.apache.spark.scheduler.{SchedulerBackend, SlaveLost, TaskDescription, TaskSchedulerImpl, WorkerOffer}
31+
import org.apache.spark.scheduler._
3232
import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._
3333
import org.apache.spark.util.{ActorLogReceive, SerializableBuffer, AkkaUtils, Utils}
3434

@@ -66,6 +66,8 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val actorSyste
6666
// Number of executors requested from the cluster manager that have not registered yet
6767
private var numPendingExecutors = 0
6868

69+
private val listenerBus = scheduler.sc.listenerBus
70+
6971
// Executors we have requested the cluster manager to kill that have not died yet
7072
private val executorsPendingToRemove = new HashSet[String]
7173

@@ -106,6 +108,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val actorSyste
106108
logDebug(s"Decremented number of pending executors ($numPendingExecutors left)")
107109
}
108110
}
111+
listenerBus.post(SparkListenerExecutorAdded(executorId, data))
109112
makeOffers()
110113
}
111114

@@ -213,6 +216,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val actorSyste
213216
totalCoreCount.addAndGet(-executorInfo.totalCores)
214217
totalRegisteredExecutors.addAndGet(-1)
215218
scheduler.executorLost(executorId, SlaveLost(reason))
219+
listenerBus.post(SparkListenerExecutorRemoved(executorId))
216220
case None => logError(s"Asked to remove non-existent executor $executorId")
217221
}
218222
}

0 commit comments

Comments
 (0)