Commit 9922be0

Merge branch 'master' into stage_distributions
2 parents: f5a5196 + ee11be2

238 files changed: +7332 −2744 lines


bin/spark-class

Lines changed: 36 additions & 25 deletions
@@ -40,35 +40,46 @@ else
   fi
 fi
 
-# Look for the launcher. In non-release mode, add the compiled classes directly to the classpath
-# instead of looking for a jar file.
-SPARK_LAUNCHER_CP=
-if [ -f $SPARK_HOME/RELEASE ]; then
-  LAUNCHER_DIR="$SPARK_HOME/lib"
-  num_jars="$(ls -1 "$LAUNCHER_DIR" | grep "^spark-launcher.*\.jar$" | wc -l)"
-  if [ "$num_jars" -eq "0" -a -z "$SPARK_LAUNCHER_CP" ]; then
-    echo "Failed to find Spark launcher in $LAUNCHER_DIR." 1>&2
-    echo "You need to build Spark before running this program." 1>&2
-    exit 1
-  fi
+# Find assembly jar
+SPARK_ASSEMBLY_JAR=
+if [ -f "$SPARK_HOME/RELEASE" ]; then
+  ASSEMBLY_DIR="$SPARK_HOME/lib"
+else
+  ASSEMBLY_DIR="$SPARK_HOME/assembly/target/scala-$SPARK_SCALA_VERSION"
+fi
 
-  LAUNCHER_JARS="$(ls -1 "$LAUNCHER_DIR" | grep "^spark-launcher.*\.jar$" || true)"
-  if [ "$num_jars" -gt "1" ]; then
-    echo "Found multiple Spark launcher jars in $LAUNCHER_DIR:" 1>&2
-    echo "$LAUNCHER_JARS" 1>&2
-    echo "Please remove all but one jar." 1>&2
-    exit 1
-  fi
+num_jars="$(ls -1 "$ASSEMBLY_DIR" | grep "^spark-assembly.*hadoop.*\.jar$" | wc -l)"
+if [ "$num_jars" -eq "0" -a -z "$SPARK_ASSEMBLY_JAR" ]; then
+  echo "Failed to find Spark assembly in $ASSEMBLY_DIR." 1>&2
+  echo "You need to build Spark before running this program." 1>&2
+  exit 1
+fi
+ASSEMBLY_JARS="$(ls -1 "$ASSEMBLY_DIR" | grep "^spark-assembly.*hadoop.*\.jar$" || true)"
+if [ "$num_jars" -gt "1" ]; then
+  echo "Found multiple Spark assembly jars in $ASSEMBLY_DIR:" 1>&2
+  echo "$ASSEMBLY_JARS" 1>&2
+  echo "Please remove all but one jar." 1>&2
+  exit 1
+fi
 
-  SPARK_LAUNCHER_CP="${LAUNCHER_DIR}/${LAUNCHER_JARS}"
+SPARK_ASSEMBLY_JAR="${ASSEMBLY_DIR}/${ASSEMBLY_JARS}"
+
+# Verify that versions of java used to build the jars and run Spark are compatible
+if [ -n "$JAVA_HOME" ]; then
+  JAR_CMD="$JAVA_HOME/bin/jar"
 else
-  LAUNCHER_DIR="$SPARK_HOME/launcher/target/scala-$SPARK_SCALA_VERSION"
-  if [ ! -d "$LAUNCHER_DIR/classes" ]; then
-    echo "Failed to find Spark launcher classes in $LAUNCHER_DIR." 1>&2
-    echo "You need to build Spark before running this program." 1>&2
+  JAR_CMD="jar"
+fi
+
+if [ $(command -v "$JAR_CMD") ] ; then
+  jar_error_check=$("$JAR_CMD" -tf "$SPARK_ASSEMBLY_JAR" nonexistent/class/path 2>&1)
+  if [[ "$jar_error_check" =~ "invalid CEN header" ]]; then
+    echo "Loading Spark jar with '$JAR_CMD' failed. " 1>&2
+    echo "This is likely because Spark was compiled with Java 7 and run " 1>&2
+    echo "with Java 6. (see SPARK-1703). Please use Java 7 to run Spark " 1>&2
+    echo "or build Spark with Java 6." 1>&2
     exit 1
   fi
-  SPARK_LAUNCHER_CP="$LAUNCHER_DIR/classes"
 fi
 
 # The launcher library will print arguments separated by a NULL character, to allow arguments with
@@ -77,7 +88,7 @@ fi
 CMD=()
 while IFS= read -d '' -r ARG; do
   CMD+=("$ARG")
-done < <("$RUNNER" -cp "$SPARK_LAUNCHER_CP" org.apache.spark.launcher.Main "$@")
+done < <("$RUNNER" -cp "$SPARK_ASSEMBLY_JAR" org.apache.spark.launcher.Main "$@")
 
 if [ "${CMD[0]}" = "usage" ]; then
   "${CMD[@]}"

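The shell change above boils down to one rule: resolve exactly one spark-assembly jar under the assembly directory, fail fast with a build hint otherwise, and hand that jar to the launcher as the classpath. A minimal Scala sketch of just that resolution rule (the object name and the directory layout in main are illustrative, not part of the commit):

import java.io.File

object FindAssemblyJar {
  // Return the single file in `dir` whose name matches `pattern`; error on zero or multiple matches.
  def findSingleJar(dir: File, pattern: String): File = {
    val candidates = Option(dir.listFiles()).getOrElse(Array.empty[File])
      .filter(f => f.getName.matches(pattern))
    candidates.length match {
      case 0 =>
        sys.error(s"Failed to find Spark assembly in $dir. You need to build Spark before running this program.")
      case 1 => candidates.head
      case _ =>
        sys.error(s"Found multiple Spark assembly jars in $dir: ${candidates.map(_.getName).mkString(", ")}. Please remove all but one jar.")
    }
  }

  def main(args: Array[String]): Unit = {
    // Example usage against a hypothetical release layout: $SPARK_HOME/lib.
    val assemblyDir = new File(sys.env.getOrElse("SPARK_HOME", "."), "lib")
    println(findSingleJar(assemblyDir, "^spark-assembly.*hadoop.*\\.jar$"))
  }
}

The Windows script below enforces the same rule, minus the Java 6/7 "invalid CEN header" check.
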
bin/spark-class2.cmd

Lines changed: 11 additions & 22 deletions
@@ -29,31 +29,20 @@ if "x%1"=="x" (
   exit /b 1
 )
 
-set LAUNCHER_CP=0
-if exist %SPARK_HOME%\RELEASE goto find_release_launcher
+rem Find assembly jar
+set SPARK_ASSEMBLY_JAR=0
 
-rem Look for the Spark launcher in both Scala build directories. The launcher doesn't use Scala so
-rem it doesn't really matter which one is picked up. Add the compiled classes directly to the
-rem classpath instead of looking for a jar file, since it's very common for people using sbt to use
-rem the "assembly" target instead of "package".
-set LAUNCHER_CLASSES=%SPARK_HOME%\launcher\target\scala-2.10\classes
-if exist %LAUNCHER_CLASSES% (
-  set LAUNCHER_CP=%LAUNCHER_CLASSES%
+if exist "%SPARK_HOME%\RELEASE" (
+  set ASSEMBLY_DIR=%SPARK_HOME%\lib
+) else (
+  set ASSEMBLY_DIR=%SPARK_HOME%\assembly\target\scala-%SPARK_SCALA_VERSION%
 )
-set LAUNCHER_CLASSES=%SPARK_HOME%\launcher\target\scala-2.11\classes
-if exist %LAUNCHER_CLASSES% (
-  set LAUNCHER_CP=%LAUNCHER_CLASSES%
-)
-goto check_launcher
 
-:find_release_launcher
-for %%d in (%SPARK_HOME%\lib\spark-launcher*.jar) do (
-  set LAUNCHER_CP=%%d
+for %%d in (%ASSEMBLY_DIR%\spark-assembly*hadoop*.jar) do (
+  set SPARK_ASSEMBLY_JAR=%%d
 )
-
-:check_launcher
-if "%LAUNCHER_CP%"=="0" (
-  echo Failed to find Spark launcher JAR.
+if "%SPARK_ASSEMBLY_JAR%"=="0" (
+  echo Failed to find Spark assembly JAR.
   echo You need to build Spark before running this program.
   exit /b 1
 )
@@ -64,7 +53,7 @@ if not "x%JAVA_HOME%"=="x" set RUNNER=%JAVA_HOME%\bin\java
 
 rem The launcher library prints the command to be executed in a single line suitable for being
 rem executed by the batch interpreter. So read all the output of the launcher into a variable.
-for /f "tokens=*" %%i in ('cmd /C ""%RUNNER%" -cp %LAUNCHER_CP% org.apache.spark.launcher.Main %*"') do (
+for /f "tokens=*" %%i in ('cmd /C ""%RUNNER%" -cp %SPARK_ASSEMBLY_JAR% org.apache.spark.launcher.Main %*"') do (
   set SPARK_CMD=%%i
 )
 %SPARK_CMD%

core/src/main/resources/org/apache/spark/ui/static/additional-metrics.js

Lines changed: 4 additions & 4 deletions
@@ -30,7 +30,7 @@ $(function() {
 
     stripeSummaryTable();
 
-    $("input:checkbox").click(function() {
+    $('input[type="checkbox"]').click(function() {
         var column = "table ." + $(this).attr("name");
         $(column).toggle();
         stripeSummaryTable();
@@ -39,15 +39,15 @@ $(function() {
     $("#select-all-metrics").click(function() {
         if (this.checked) {
            // Toggle all un-checked options.
-           $('input:checkbox:not(:checked)').trigger('click');
+           $('input[type="checkbox"]:not(:checked)').trigger('click');
        } else {
            // Toggle all checked options.
-           $('input:checkbox:checked').trigger('click');
+           $('input[type="checkbox"]:checked').trigger('click');
        }
    });
 
    // Trigger a click on the checkbox if a user clicks the label next to it.
    $("span.additional-metric-title").click(function() {
-       $(this).parent().find('input:checkbox').trigger('click');
+       $(this).parent().find('input[type="checkbox"]').trigger('click');
    });
 });

core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala

Lines changed: 12 additions & 7 deletions
@@ -49,12 +49,17 @@ private[spark] class HeartbeatReceiver(sc: SparkContext, scheduler: TaskSchedule
 
   // executor ID -> timestamp of when the last heartbeat from this executor was received
   private val executorLastSeen = new mutable.HashMap[String, Long]
-
-  private val executorTimeoutMs = sc.conf.getLong("spark.network.timeout",
-    sc.conf.getLong("spark.storage.blockManagerSlaveTimeoutMs", 120)) * 1000
-
-  private val checkTimeoutIntervalMs = sc.conf.getLong("spark.network.timeoutInterval",
-    sc.conf.getLong("spark.storage.blockManagerTimeoutIntervalMs", 60)) * 1000
+
+  // "spark.network.timeout" uses "seconds", while `spark.storage.blockManagerSlaveTimeoutMs` uses
+  // "milliseconds"
+  private val executorTimeoutMs = sc.conf.getOption("spark.network.timeout").map(_.toLong * 1000).
+    getOrElse(sc.conf.getLong("spark.storage.blockManagerSlaveTimeoutMs", 120000))
+
+  // "spark.network.timeoutInterval" uses "seconds", while
+  // "spark.storage.blockManagerTimeoutIntervalMs" uses "milliseconds"
+  private val checkTimeoutIntervalMs =
+    sc.conf.getOption("spark.network.timeoutInterval").map(_.toLong * 1000).
+      getOrElse(sc.conf.getLong("spark.storage.blockManagerTimeoutIntervalMs", 60000))
 
   private var timeoutCheckingTask: Cancellable = null
 
@@ -84,7 +89,7 @@ private[spark] class HeartbeatReceiver(sc: SparkContext, scheduler: TaskSchedule
         logWarning(s"Removing executor $executorId with no recent heartbeats: " +
           s"${now - lastSeenMs} ms exceeds timeout $executorTimeoutMs ms")
         scheduler.executorLost(executorId, SlaveLost("Executor heartbeat " +
-          "timed out after ${now - lastSeenMs} ms"))
+          s"timed out after ${now - lastSeenMs} ms"))
         if (sc.supportDynamicAllocation) {
           sc.killExecutor(executorId)
         }

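The first HeartbeatReceiver hunk replaces two getLong calls whose defaults silently mixed units with an explicit fallback chain: if the seconds-based key is set it wins and is scaled to milliseconds, otherwise the legacy milliseconds-based key's default applies. A small self-contained sketch of that pattern, with a plain Map standing in for SparkConf:

object TimeoutResolution {
  // Seconds-based key takes precedence (scaled to ms); otherwise fall back to the
  // milliseconds-based key, and finally to a milliseconds default.
  def timeoutMs(conf: Map[String, String], secondsKey: String, millisKey: String,
      defaultMillis: Long): Long = {
    conf.get(secondsKey).map(_.toLong * 1000)
      .getOrElse(conf.get(millisKey).map(_.toLong).getOrElse(defaultMillis))
  }

  def main(args: Array[String]): Unit = {
    // Mirrors the executor timeout above: 120000 ms default, seconds key wins when present.
    println(timeoutMs(Map.empty, "spark.network.timeout",
      "spark.storage.blockManagerSlaveTimeoutMs", 120000L))             // 120000
    println(timeoutMs(Map("spark.network.timeout" -> "240"), "spark.network.timeout",
      "spark.storage.blockManagerSlaveTimeoutMs", 120000L))             // 240000
  }
}

The second hunk is a separate fix: the interpolator prefix `s` was missing, so the executor-lost message previously logged the literal text "${now - lastSeenMs}" instead of the elapsed time.
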
core/src/main/scala/org/apache/spark/SparkEnv.scala

Lines changed: 28 additions & 14 deletions
@@ -34,12 +34,14 @@ import org.apache.spark.metrics.MetricsSystem
 import org.apache.spark.network.BlockTransferService
 import org.apache.spark.network.netty.NettyBlockTransferService
 import org.apache.spark.network.nio.NioBlockTransferService
+import org.apache.spark.rpc.{RpcEndpointRef, RpcEndpoint, RpcEnv}
+import org.apache.spark.rpc.akka.AkkaRpcEnv
 import org.apache.spark.scheduler.{OutputCommitCoordinator, LiveListenerBus}
-import org.apache.spark.scheduler.OutputCommitCoordinator.OutputCommitCoordinatorActor
+import org.apache.spark.scheduler.OutputCommitCoordinator.OutputCommitCoordinatorEndpoint
 import org.apache.spark.serializer.Serializer
 import org.apache.spark.shuffle.{ShuffleMemoryManager, ShuffleManager}
 import org.apache.spark.storage._
-import org.apache.spark.util.{AkkaUtils, Utils}
+import org.apache.spark.util.{AkkaUtils, RpcUtils, Utils}
 
 /**
  * :: DeveloperApi ::
@@ -54,7 +56,7 @@ import org.apache.spark.util.{AkkaUtils, Utils}
 @DeveloperApi
 class SparkEnv (
     val executorId: String,
-    val actorSystem: ActorSystem,
+    private[spark] val rpcEnv: RpcEnv,
     val serializer: Serializer,
     val closureSerializer: Serializer,
     val cacheManager: CacheManager,
@@ -71,6 +73,9 @@ class SparkEnv (
     val outputCommitCoordinator: OutputCommitCoordinator,
     val conf: SparkConf) extends Logging {
 
+  // TODO Remove actorSystem
+  val actorSystem = rpcEnv.asInstanceOf[AkkaRpcEnv].actorSystem
+
   private[spark] var isStopped = false
   private val pythonWorkers = mutable.HashMap[(String, Map[String, String]), PythonWorkerFactory]()
 
@@ -91,7 +96,8 @@ class SparkEnv (
     blockManager.master.stop()
     metricsSystem.stop()
    outputCommitCoordinator.stop()
-    actorSystem.shutdown()
+    rpcEnv.shutdown()
+
     // Unfortunately Akka's awaitTermination doesn't actually wait for the Netty server to shut
     // down, but let's call it anyway in case it gets fixed in a later release
     // UPDATE: In Akka 2.1.x, this hangs if there are remote actors, so we can't call it.
@@ -236,16 +242,15 @@ object SparkEnv extends Logging {
     val securityManager = new SecurityManager(conf)
 
     // Create the ActorSystem for Akka and get the port it binds to.
-    val (actorSystem, boundPort) = {
-      val actorSystemName = if (isDriver) driverActorSystemName else executorActorSystemName
-      AkkaUtils.createActorSystem(actorSystemName, hostname, port, conf, securityManager)
-    }
+    val actorSystemName = if (isDriver) driverActorSystemName else executorActorSystemName
+    val rpcEnv = RpcEnv.create(actorSystemName, hostname, port, conf, securityManager)
+    val actorSystem = rpcEnv.asInstanceOf[AkkaRpcEnv].actorSystem
 
     // Figure out which port Akka actually bound to in case the original port is 0 or occupied.
     if (isDriver) {
-      conf.set("spark.driver.port", boundPort.toString)
+      conf.set("spark.driver.port", rpcEnv.address.port.toString)
     } else {
-      conf.set("spark.executor.port", boundPort.toString)
+      conf.set("spark.executor.port", rpcEnv.address.port.toString)
     }
 
     // Create an instance of the class with the given name, possibly initializing it with our conf
@@ -290,6 +295,15 @@ object SparkEnv extends Logging {
       }
     }
 
+    def registerOrLookupEndpoint(name: String, endpointCreator: => RpcEndpoint): RpcEndpointRef = {
+      if (isDriver) {
+        logInfo("Registering " + name)
+        rpcEnv.setupEndpoint(name, endpointCreator)
+      } else {
+        RpcUtils.makeDriverRef(name, conf, rpcEnv)
+      }
+    }
+
     val mapOutputTracker = if (isDriver) {
       new MapOutputTrackerMaster(conf)
     } else {
@@ -377,13 +391,13 @@ object SparkEnv extends Logging {
     val outputCommitCoordinator = mockOutputCommitCoordinator.getOrElse {
      new OutputCommitCoordinator(conf)
     }
-    val outputCommitCoordinatorActor = registerOrLookup("OutputCommitCoordinator",
-      new OutputCommitCoordinatorActor(outputCommitCoordinator))
-    outputCommitCoordinator.coordinatorActor = Some(outputCommitCoordinatorActor)
+    val outputCommitCoordinatorRef = registerOrLookupEndpoint("OutputCommitCoordinator",
+      new OutputCommitCoordinatorEndpoint(rpcEnv, outputCommitCoordinator))
+    outputCommitCoordinator.coordinatorRef = Some(outputCommitCoordinatorRef)
 
     val envInstance = new SparkEnv(
       executorId,
-      actorSystem,
+      rpcEnv,
       serializer,
       closureSerializer,
      cacheManager,

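The new registerOrLookupEndpoint helper in SparkEnv encodes a simple asymmetry: on the driver the endpoint is created and registered under a well-known name, while executors only resolve a reference to the driver's endpoint by that name, so the by-name endpointCreator is never evaluated off the driver. A schematic sketch of that control flow with hypothetical stand-in types, not Spark's actual RpcEnv/RpcEndpointRef API:

object RegisterOrLookupSketch {
  // Hypothetical stand-ins for RpcEndpointRef / RpcEnv, just to show the shape of the pattern.
  final case class Ref(name: String)

  class Registry {
    private val endpoints = scala.collection.mutable.Map.empty[String, Any]
    def register(name: String, endpoint: Any): Ref = { endpoints(name) = endpoint; Ref(name) }
    def lookupOnDriver(name: String): Ref = Ref(name) // in Spark this resolves a remote driver ref
  }

  // The by-name parameter means the endpoint object is only constructed on the driver.
  def registerOrLookup(isDriver: Boolean, registry: Registry, name: String)
      (endpointCreator: => Any): Ref = {
    if (isDriver) registry.register(name, endpointCreator)
    else registry.lookupOnDriver(name)
  }

  def main(args: Array[String]): Unit = {
    val registry = new Registry
    println(registerOrLookup(isDriver = true, registry, "OutputCommitCoordinator")(new Object))
    println(registerOrLookup(isDriver = false, registry, "OutputCommitCoordinator") {
      sys.error("never evaluated on an executor")
    })
  }
}
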
core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala

Lines changed: 2 additions & 50 deletions
@@ -26,7 +26,6 @@ import org.apache.hadoop.mapred._
 import org.apache.hadoop.fs.FileSystem
 import org.apache.hadoop.fs.Path
 
-import org.apache.spark.executor.CommitDeniedException
 import org.apache.spark.mapred.SparkHadoopMapRedUtil
 import org.apache.spark.rdd.HadoopRDD
 
@@ -104,55 +103,8 @@ class SparkHadoopWriter(@transient jobConf: JobConf)
   }
 
   def commit() {
-    val taCtxt = getTaskContext()
-    val cmtr = getOutputCommitter()
-
-    // Called after we have decided to commit
-    def performCommit(): Unit = {
-      try {
-        cmtr.commitTask(taCtxt)
-        logInfo (s"$taID: Committed")
-      } catch {
-        case e: IOException =>
-          logError("Error committing the output of task: " + taID.value, e)
-          cmtr.abortTask(taCtxt)
-          throw e
-      }
-    }
-
-    // First, check whether the task's output has already been committed by some other attempt
-    if (cmtr.needsTaskCommit(taCtxt)) {
-      // The task output needs to be committed, but we don't know whether some other task attempt
-      // might be racing to commit the same output partition. Therefore, coordinate with the driver
-      // in order to determine whether this attempt can commit (see SPARK-4879).
-      val shouldCoordinateWithDriver: Boolean = {
-        val sparkConf = SparkEnv.get.conf
-        // We only need to coordinate with the driver if there are multiple concurrent task
-        // attempts, which should only occur if speculation is enabled
-        val speculationEnabled = sparkConf.getBoolean("spark.speculation", false)
-        // This (undocumented) setting is an escape-hatch in case the commit code introduces bugs
-        sparkConf.getBoolean("spark.hadoop.outputCommitCoordination.enabled", speculationEnabled)
-      }
-      if (shouldCoordinateWithDriver) {
-        val outputCommitCoordinator = SparkEnv.get.outputCommitCoordinator
-        val canCommit = outputCommitCoordinator.canCommit(jobID, splitID, attemptID)
-        if (canCommit) {
-          performCommit()
-        } else {
-          val msg = s"$taID: Not committed because the driver did not authorize commit"
-          logInfo(msg)
-          // We need to abort the task so that the driver can reschedule new attempts, if necessary
-          cmtr.abortTask(taCtxt)
-          throw new CommitDeniedException(msg, jobID, splitID, attemptID)
-        }
-      } else {
-        // Speculation is disabled or a user has chosen to manually bypass the commit coordination
-        performCommit()
-      }
-    } else {
-      // Some other attempt committed the output, so we do nothing and signal success
-      logInfo(s"No need to commit output of task because needsTaskCommit=false: ${taID.value}")
-    }
+    SparkHadoopMapRedUtil.commitTask(
+      getOutputCommitter(), getTaskContext(), jobID, splitID, attemptID)
   }
 
   def commitJob() {

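The ~50 lines deleted from SparkHadoopWriter.commit() now live behind SparkHadoopMapRedUtil.commitTask, but the decision they encoded is worth keeping in view: coordinate with the driver's OutputCommitCoordinator only when concurrent attempts are possible (speculation enabled) or when the undocumented override flag forces it. A minimal sketch of just that decision, with a Map standing in for SparkConf:

object CommitCoordinationSketch {
  // Mirrors the deleted shouldCoordinateWithDriver block above.
  def shouldCoordinateWithDriver(conf: Map[String, String]): Boolean = {
    val speculationEnabled = conf.get("spark.speculation").exists(_.toBoolean)
    conf.get("spark.hadoop.outputCommitCoordination.enabled")
      .map(_.toBoolean)
      .getOrElse(speculationEnabled)
  }

  def main(args: Array[String]): Unit = {
    println(shouldCoordinateWithDriver(Map.empty))                          // false
    println(shouldCoordinateWithDriver(Map("spark.speculation" -> "true"))) // true
  }
}
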
core/src/main/scala/org/apache/spark/SparkStatusTracker.scala

Lines changed: 1 addition & 2 deletions
@@ -45,8 +45,7 @@ class SparkStatusTracker private[spark] (sc: SparkContext) {
    */
   def getJobIdsForGroup(jobGroup: String): Array[Int] = {
     jobProgressListener.synchronized {
-      val jobData = jobProgressListener.jobIdToData.valuesIterator
-      jobData.filter(_.jobGroup.orNull == jobGroup).map(_.jobId).toArray
+      jobProgressListener.jobGroupToJobIds.getOrElse(jobGroup, Seq.empty).toArray
     }
   }
 
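The one-line SparkStatusTracker change swaps a linear scan over every job's data for a lookup in a group-to-job-ids index maintained by the listener. A simplified sketch of the two shapes (the types are stand-ins, not the real JobProgressListener):

object JobGroupLookupSketch {
  final case class JobData(jobId: Int, jobGroup: Option[String])

  // Old shape: scan every job and filter on its group.
  def byScan(jobIdToData: Map[Int, JobData], group: String): Array[Int] =
    jobIdToData.valuesIterator.filter(_.jobGroup.contains(group)).map(_.jobId).toArray

  // New shape: a single access into a precomputed index.
  def byIndex(jobGroupToJobIds: Map[String, Seq[Int]], group: String): Array[Int] =
    jobGroupToJobIds.getOrElse(group, Seq.empty).toArray

  def main(args: Array[String]): Unit = {
    val jobs = Map(1 -> JobData(1, Some("etl")), 2 -> JobData(2, Some("ml")), 3 -> JobData(3, Some("etl")))
    val index = Map("etl" -> Seq(1, 3), "ml" -> Seq(2))
    println(byScan(jobs, "etl").toSeq)   // List(1, 3)
    println(byIndex(index, "etl").toSeq) // List(1, 3)
  }
}
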
core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala

Lines changed: 2 additions & 0 deletions
@@ -101,6 +101,8 @@ private[deploy] object DeployMessages {
   case class RegisterApplication(appDescription: ApplicationDescription)
     extends DeployMessage
 
+  case class UnregisterApplication(appId: String)
+
   case class MasterChangeAcknowledged(appId: String)
 
   // Master to AppClient

core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala

Lines changed: 1 addition & 0 deletions
@@ -157,6 +157,7 @@ private[spark] class AppClient(
 
       case StopAppClient =>
         markDead("Application has been stopped.")
+        master ! UnregisterApplication(appId)
        sender ! true
        context.stop(self)
     }

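Taken together, the last two hunks add a small piece of protocol: DeployMessages gains an UnregisterApplication(appId) message, and AppClient now sends it to the master when told to stop, so the master is told explicitly that the application is going away rather than only the client marking itself dead. A toy sketch of that message/handler shape (the sealed trait and handler are simplified stand-ins, not the real Master actor):

object UnregisterApplicationSketch {
  sealed trait Msg
  final case class RegisterApplication(appDescription: String) extends Msg
  final case class UnregisterApplication(appId: String) extends Msg

  // Stand-in for the master's message handling.
  def handle(msg: Msg): String = msg match {
    case RegisterApplication(desc)    => s"registered application: $desc"
    case UnregisterApplication(appId) => s"removed finished application $appId"
  }

  def main(args: Array[String]): Unit =
    println(handle(UnregisterApplication("app-20150401000000-0001")))
}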