Commit a6460d4

Merge github.com:apache/spark into cleanup
Conflicts:
    core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
    core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
2 parents: 762a4d8 + 33e6361

262 files changed (+7292 lines, −1680 lines)


bin/pyspark

Lines changed: 2 additions & 1 deletion
@@ -55,7 +55,8 @@ if [ -n "$IPYTHON_OPTS" ]; then
   IPYTHON=1
 fi
 
-if [[ "$IPYTHON" = "1" ]] ; then
+# Only use ipython if no command line arguments were provided [SPARK-1134]
+if [[ "$IPYTHON" = "1" && $# = 0 ]] ; then
   exec ipython $IPYTHON_OPTS
 else
   exec "$PYSPARK_PYTHON" "$@"

core/pom.xml

Lines changed: 1 addition & 1 deletion
@@ -150,7 +150,7 @@
       <artifactId>json4s-jackson_${scala.binary.version}</artifactId>
       <version>3.2.6</version>
       <!-- see also exclusion for lift-json; this is necessary since it depends on
-        scala-library and scalap 2.10.0, but we use 2.10.3, and only override
+        scala-library and scalap 2.10.0, but we use 2.10.4, and only override
         scala-library -->
       <exclusions>
         <exclusion>

core/src/main/scala/org/apache/spark/deploy/Client.scala

Lines changed: 3 additions & 0 deletions
@@ -128,6 +128,9 @@ private class ClientActor(driverArgs: ClientArguments, conf: SparkConf) extends
  */
 object Client {
   def main(args: Array[String]) {
+    println("WARNING: This client is deprecated and will be removed in a future version of Spark.")
+    println("Use ./bin/spark-submit with \"--master spark://host:port\"")
+
     val conf = new SparkConf()
     val driverArgs = new ClientArguments(args)

core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala

Lines changed: 43 additions & 24 deletions
@@ -17,7 +17,7 @@
 
 package org.apache.spark.deploy
 
-import java.io.File
+import java.io.{PrintStream, File}
 import java.net.URL
 
 import org.apache.spark.executor.ExecutorURLClassLoader
@@ -32,38 +32,51 @@ import scala.collection.mutable.Map
  * modes that Spark supports.
  */
 object SparkSubmit {
-  val YARN = 1
-  val STANDALONE = 2
-  val MESOS = 4
-  val LOCAL = 8
-  val ALL_CLUSTER_MGRS = YARN | STANDALONE | MESOS | LOCAL
+  private val YARN = 1
+  private val STANDALONE = 2
+  private val MESOS = 4
+  private val LOCAL = 8
+  private val ALL_CLUSTER_MGRS = YARN | STANDALONE | MESOS | LOCAL
 
-  var clusterManager: Int = LOCAL
+  private var clusterManager: Int = LOCAL
 
   def main(args: Array[String]) {
     val appArgs = new SparkSubmitArguments(args)
+    if (appArgs.verbose) {
+      printStream.println(appArgs)
+    }
     val (childArgs, classpath, sysProps, mainClass) = createLaunchEnv(appArgs)
-    launch(childArgs, classpath, sysProps, mainClass)
+    launch(childArgs, classpath, sysProps, mainClass, appArgs.verbose)
   }
 
+  // Exposed for testing
+  private[spark] var printStream: PrintStream = System.err
+  private[spark] var exitFn: () => Unit = () => System.exit(-1)
+
+  private[spark] def printErrorAndExit(str: String) = {
+    printStream.println("error: " + str)
+    printStream.println("run with --help for more information or --verbose for debugging output")
+    exitFn()
+  }
+  private[spark] def printWarning(str: String) = printStream.println("warning: " + str)
+
   /**
    * @return
    *   a tuple containing the arguments for the child, a list of classpath
    *   entries for the child, and the main class for the child
   */
-  def createLaunchEnv(appArgs: SparkSubmitArguments): (ArrayBuffer[String],
+  private[spark] def createLaunchEnv(appArgs: SparkSubmitArguments): (ArrayBuffer[String],
       ArrayBuffer[String], Map[String, String], String) = {
-    if (appArgs.master.startsWith("yarn")) {
+    if (appArgs.master.startsWith("local")) {
+      clusterManager = LOCAL
+    } else if (appArgs.master.startsWith("yarn")) {
      clusterManager = YARN
    } else if (appArgs.master.startsWith("spark")) {
      clusterManager = STANDALONE
    } else if (appArgs.master.startsWith("mesos")) {
      clusterManager = MESOS
-    } else if (appArgs.master.startsWith("local")) {
-      clusterManager = LOCAL
    } else {
-      System.err.println("master must start with yarn, mesos, spark, or local")
-      System.exit(1)
+      printErrorAndExit("master must start with yarn, mesos, spark, or local")
    }
 
    // Because "yarn-standalone" and "yarn-client" encapsulate both the master
@@ -73,12 +86,10 @@
       appArgs.deployMode = "cluster"
     }
     if (appArgs.deployMode == "cluster" && appArgs.master == "yarn-client") {
-      System.err.println("Deploy mode \"cluster\" and master \"yarn-client\" are at odds")
-      System.exit(1)
+      printErrorAndExit("Deploy mode \"cluster\" and master \"yarn-client\" are not compatible")
     }
     if (appArgs.deployMode == "client" && appArgs.master == "yarn-standalone") {
-      System.err.println("Deploy mode \"client\" and master \"yarn-standalone\" are at odds")
-      System.exit(1)
+      printErrorAndExit("Deploy mode \"client\" and master \"yarn-standalone\" are not compatible")
     }
     if (appArgs.deployMode == "cluster" && appArgs.master.startsWith("yarn")) {
       appArgs.master = "yarn-standalone"
@@ -95,8 +106,7 @@
     var childMainClass = ""
 
     if (clusterManager == MESOS && deployOnCluster) {
-      System.err.println("Mesos does not support running the driver on the cluster")
-      System.exit(1)
+      printErrorAndExit("Mesos does not support running the driver on the cluster")
     }
 
     if (!deployOnCluster) {
@@ -174,8 +184,17 @@
     (childArgs, childClasspath, sysProps, childMainClass)
   }
 
-  def launch(childArgs: ArrayBuffer[String], childClasspath: ArrayBuffer[String],
-      sysProps: Map[String, String], childMainClass: String) {
+  private def launch(childArgs: ArrayBuffer[String], childClasspath: ArrayBuffer[String],
+      sysProps: Map[String, String], childMainClass: String, verbose: Boolean = false) {
+
+    if (verbose) {
+      System.err.println(s"Main class:\n$childMainClass")
+      System.err.println(s"Arguments:\n${childArgs.mkString("\n")}")
+      System.err.println(s"System properties:\n${sysProps.mkString("\n")}")
+      System.err.println(s"Classpath elements:\n${childClasspath.mkString("\n")}")
+      System.err.println("\n")
+    }
+
     val loader = new ExecutorURLClassLoader(new Array[URL](0),
       Thread.currentThread.getContextClassLoader)
     Thread.currentThread.setContextClassLoader(loader)
@@ -193,10 +212,10 @@
     mainMethod.invoke(null, childArgs.toArray)
   }
 
-  def addJarToClasspath(localJar: String, loader: ExecutorURLClassLoader) {
+  private def addJarToClasspath(localJar: String, loader: ExecutorURLClassLoader) {
     val localJarFile = new File(localJar)
     if (!localJarFile.exists()) {
-      System.err.println("Jar does not exist: " + localJar + ". Skipping.")
+      printWarning(s"Jar $localJar does not exist, skipping.")
     }
 
     val url = localJarFile.getAbsoluteFile.toURI.toURL
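
The printStream and exitFn hooks above are marked "Exposed for testing". A minimal sketch of how a suite might use them — hypothetical test code, not part of this commit, and it assumes the snippet lives under the org.apache.spark package so the private[spark] members are visible:

    package org.apache.spark.deploy

    import java.io.{ByteArrayOutputStream, PrintStream}

    object SparkSubmitHooksSketch {
      def main(args: Array[String]) {
        val out = new ByteArrayOutputStream()
        SparkSubmit.printStream = new PrintStream(out)                 // capture output instead of stderr
        SparkSubmit.exitFn = () => throw new RuntimeException("exit")  // surface the exit instead of killing the JVM
        try {
          SparkSubmit.printErrorAndExit("must specify a primary resource")
        } catch {
          case _: RuntimeException => // expected: exitFn was invoked
        }
        assert(out.toString.contains("must specify a primary resource"))
      }
    }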

core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala

Lines changed: 52 additions & 22 deletions
@@ -40,25 +40,45 @@ private[spark] class SparkSubmitArguments(args: Array[String]) {
   var name: String = null
   var childArgs: ArrayBuffer[String] = new ArrayBuffer[String]()
   var jars: String = null
+  var verbose: Boolean = false
 
   loadEnvVars()
-  parseArgs(args.toList)
-
-  def loadEnvVars() {
-    master = System.getenv("MASTER")
-    deployMode = System.getenv("DEPLOY_MODE")
+  parseOpts(args.toList)
+
+  // Sanity checks
+  if (args.length == 0) printUsageAndExit(-1)
+  if (primaryResource == null) SparkSubmit.printErrorAndExit("Must specify a primary resource")
+  if (mainClass == null) SparkSubmit.printErrorAndExit("Must specify a main class with --class")
+
+  override def toString = {
+    s"""Parsed arguments:
+    |  master $master
+    |  deployMode $deployMode
+    |  executorMemory $executorMemory
+    |  executorCores $executorCores
+    |  totalExecutorCores $totalExecutorCores
+    |  driverMemory $driverMemory
+    |  drivercores $driverCores
+    |  supervise $supervise
+    |  queue $queue
+    |  numExecutors $numExecutors
+    |  files $files
+    |  archives $archives
+    |  mainClass $mainClass
+    |  primaryResource $primaryResource
+    |  name $name
+    |  childArgs [${childArgs.mkString(" ")}]
+    |  jars $jars
+    |  verbose $verbose
+    """.stripMargin
   }
 
-  def parseArgs(args: List[String]) {
-    if (args.size == 0) {
-      printUsageAndExit(1)
-      System.exit(1)
-    }
-    primaryResource = args(0)
-    parseOpts(args.tail)
+  private def loadEnvVars() {
+    Option(System.getenv("MASTER")).map(master = _)
+    Option(System.getenv("DEPLOY_MODE")).map(deployMode = _)
   }
 
-  def parseOpts(opts: List[String]): Unit = opts match {
+  private def parseOpts(opts: List[String]): Unit = opts match {
    case ("--name") :: value :: tail =>
      name = value
      parseOpts(tail)
@@ -73,8 +93,7 @@ private[spark] class SparkSubmitArguments(args: Array[String]) {
 
    case ("--deploy-mode") :: value :: tail =>
      if (value != "client" && value != "cluster") {
-        System.err.println("--deploy-mode must be either \"client\" or \"cluster\"")
-        System.exit(1)
+        SparkSubmit.printErrorAndExit("--deploy-mode must be either \"client\" or \"cluster\"")
      }
      deployMode = value
      parseOpts(tail)
@@ -130,17 +149,28 @@ private[spark] class SparkSubmitArguments(args: Array[String]) {
    case ("--help" | "-h") :: tail =>
      printUsageAndExit(0)
 
-    case Nil =>
+    case ("--verbose" | "-v") :: tail =>
+      verbose = true
+      parseOpts(tail)
 
-    case _ =>
-      printUsageAndExit(1, opts)
+    case value :: tail =>
+      if (primaryResource != null) {
+        val error = s"Found two conflicting resources, $value and $primaryResource." +
+          " Expecting only one resource."
+        SparkSubmit.printErrorAndExit(error)
+      }
+      primaryResource = value
+      parseOpts(tail)
+
+    case Nil =>
  }
 
-  def printUsageAndExit(exitCode: Int, unknownParam: Any = null) {
+  private def printUsageAndExit(exitCode: Int, unknownParam: Any = null) {
+    val outStream = SparkSubmit.printStream
    if (unknownParam != null) {
-      System.err.println("Unknown/unsupported param " + unknownParam)
+      outStream.println("Unknown/unsupported param " + unknownParam)
    }
-    System.err.println(
+    outStream.println(
      """Usage: spark-submit <primary binary> [options]
        |Options:
        |  --master MASTER_URL spark://host:port, mesos://host:port, yarn, or local.
@@ -171,6 +201,6 @@ private[spark] class SparkSubmitArguments(args: Array[String]) {
        |  --archives ARCHIVES Comma separated list of archives to be extracted into the
        |                      working dir of each executor.""".stripMargin
    )
-    System.exit(exitCode)
+    SparkSubmit.exitFn()
  }
}
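
parseOpts replaces the old parseArgs/parseOpts split with a single recursive match over the argument list. A self-contained sketch of the idiom, illustrative only and not Spark code: flags that take a value consume two list elements, bare tokens become the primary resource, and the Nil case ends the recursion.

    object OptParseSketch {
      var name: String = null
      var verbose: Boolean = false
      var primaryResource: String = null

      private def parse(opts: List[String]): Unit = opts match {
        case "--name" :: value :: tail =>        // flag with a value: consume two elements
          name = value
          parse(tail)
        case ("--verbose" | "-v") :: tail =>     // boolean flag: consume one element
          verbose = true
          parse(tail)
        case value :: tail =>                    // bare token: the primary resource
          require(primaryResource == null, s"conflicting resources: $primaryResource and $value")
          primaryResource = value
          parse(tail)
        case Nil =>                              // end of arguments
      }

      def main(args: Array[String]) {
        parse(List("--name", "demo", "-v", "app.jar"))
        println(s"name=$name verbose=$verbose resource=$primaryResource")
      }
    }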

core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala

Lines changed: 10 additions & 11 deletions
@@ -84,7 +84,7 @@ class DAGScheduler(
   private[scheduler] val stageIdToJobIds = new HashMap[Int, HashSet[Int]]
   private[scheduler] val stageIdToStage = new HashMap[Int, Stage]
   private[scheduler] val shuffleToMapStage = new HashMap[Int, Stage]
-  private[scheduler] val stageIdToActiveJob = new HashMap[Int, ActiveJob]
+  private[scheduler] val jobIdToActiveJob = new HashMap[Int, ActiveJob]
   private[scheduler] val resultStageToJob = new HashMap[Stage, ActiveJob]
   private[spark] val stageToInfos = new HashMap[Stage, StageInfo]
@@ -536,7 +536,7 @@
       listenerBus.post(SparkListenerJobStart(job.jobId, Array[Int](), properties))
       runLocally(job)
     } else {
-      stageIdToActiveJob(jobId) = job
+      jobIdToActiveJob(jobId) = job
       activeJobs += job
       resultStageToJob(finalStage) = job
       listenerBus.post(
@@ -559,7 +559,7 @@
       // Cancel all running jobs.
       runningStages.map(_.jobId).foreach(handleJobCancellation)
       activeJobs.clear()          // These should already be empty by this point,
-      stageIdToActiveJob.clear()  // but just in case we lost track of some jobs...
+      jobIdToActiveJob.clear()    // but just in case we lost track of some jobs...
 
     case ExecutorAdded(execId, host) =>
       handleExecutorAdded(execId, host)
@@ -569,7 +569,6 @@
 
     case BeginEvent(task, taskInfo) =>
       for (
-        job <- stageIdToActiveJob.get(task.stageId);
         stage <- stageIdToStage.get(task.stageId);
         stageInfo <- stageToInfos.get(stage)
       ) {
@@ -697,7 +696,7 @@
   private def activeJobForStage(stage: Stage): Option[Int] = {
     if (stageIdToJobIds.contains(stage.id)) {
       val jobsThatUseStage: Array[Int] = stageIdToJobIds(stage.id).toArray.sorted
-      jobsThatUseStage.find(stageIdToActiveJob.contains)
+      jobsThatUseStage.find(jobIdToActiveJob.contains)
     } else {
       None
     }
@@ -750,8 +749,8 @@
       }
     }
 
-    val properties = if (stageIdToActiveJob.contains(jobId)) {
-      stageIdToActiveJob(stage.jobId).properties
+    val properties = if (jobIdToActiveJob.contains(jobId)) {
+      jobIdToActiveJob(stage.jobId).properties
     } else {
       // this stage will be assigned to "default" pool
       null
@@ -827,7 +826,7 @@
               job.numFinished += 1
               // If the whole job has finished, remove it
               if (job.numFinished == job.numPartitions) {
-                stageIdToActiveJob -= stage.jobId
+                jobIdToActiveJob -= stage.jobId
                 activeJobs -= job
                 resultStageToJob -= stage
                 markStageAsFinished(stage)
@@ -986,11 +985,11 @@
       val independentStages = removeJobAndIndependentStages(jobId)
       independentStages.foreach(taskScheduler.cancelTasks)
       val error = new SparkException("Job %d cancelled".format(jobId))
-      val job = stageIdToActiveJob(jobId)
+      val job = jobIdToActiveJob(jobId)
       job.listener.jobFailed(error)
       jobIdToStageIds -= jobId
       activeJobs -= job
-      stageIdToActiveJob -= jobId
+      jobIdToActiveJob -= jobId
       listenerBus.post(SparkListenerJobEnd(job.jobId, JobFailed(error, job.finalStage.id)))
     }
   }
@@ -1011,7 +1010,7 @@
         val error = new SparkException("Job aborted: " + reason)
         job.listener.jobFailed(error)
         jobIdToStageIdsRemove(job.jobId)
-        stageIdToActiveJob -= resultStage.jobId
+        jobIdToActiveJob -= resultStage.jobId
         activeJobs -= job
         resultStageToJob -= resultStage
         listenerBus.post(SparkListenerJobEnd(job.jobId, JobFailed(error, failedStage.id)))
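
The map was always keyed by job ID, so the rename from stageIdToActiveJob to jobIdToActiveJob corrects the name rather than the behavior; the activeJobForStage lookup it participates in works roughly as in this standalone sketch (illustrative only, with simplified types):

    import scala.collection.mutable.{HashMap, HashSet}

    object ActiveJobLookupSketch {
      case class ActiveJob(jobId: Int)

      def main(args: Array[String]) {
        val stageIdToJobIds = HashMap(7 -> HashSet(3, 5))  // stage 7 is used by jobs 3 and 5
        val jobIdToActiveJob = HashMap(5 -> ActiveJob(5))  // job 3 has already finished
        // earliest still-active job that uses the stage, by ascending job id
        val earliest = stageIdToJobIds(7).toArray.sorted.find(jobIdToActiveJob.contains)
        println(earliest)                                  // Some(5)
      }
    }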

core/src/main/scala/org/apache/spark/ui/JettyUtils.scala

Lines changed: 2 additions & 1 deletion
@@ -18,13 +18,14 @@
 package org.apache.spark.ui
 
 import java.net.{InetSocketAddress, URL}
+import javax.servlet.DispatcherType
 import javax.servlet.http.{HttpServlet, HttpServletRequest, HttpServletResponse}
 
 import scala.annotation.tailrec
 import scala.util.{Failure, Success, Try}
 import scala.xml.Node
 
-import org.eclipse.jetty.server.{DispatcherType, Server}
+import org.eclipse.jetty.server.Server
 import org.eclipse.jetty.server.handler._
 import org.eclipse.jetty.servlet._
 import org.eclipse.jetty.util.thread.QueuedThreadPool
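
DispatcherType now comes from javax.servlet rather than org.eclipse.jetty.server, reflecting Jetty's move to the Servlet 3.0 API. A hedged sketch of where the relocated enum is typically consumed when registering a filter on a Jetty handler (illustrative only; not code from this commit):

    import java.util.EnumSet
    import javax.servlet.{DispatcherType, Filter}
    import org.eclipse.jetty.servlet.{FilterHolder, ServletContextHandler}

    object FilterRegistrationSketch {
      def addFilterToHandler(handler: ServletContextHandler, filter: Filter) {
        // EnumSet of javax.servlet.DispatcherType selects which dispatches the filter sees
        handler.addFilter(new FilterHolder(filter), "/*", EnumSet.allOf(classOf[DispatcherType]))
      }
    }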

core/src/main/scala/org/apache/spark/ui/storage/BlockManagerUI.scala

Lines changed: 1 addition & 1 deletion
@@ -84,7 +84,7 @@ private[ui] class BlockManagerListener(storageStatusListener: StorageStatusListe
 
   override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted) = synchronized {
     val rddInfo = stageSubmitted.stageInfo.rddInfo
-    _rddInfoMap(rddInfo.id) = rddInfo
+    _rddInfoMap.getOrElseUpdate(rddInfo.id, rddInfo)
   }
 
   override def onStageCompleted(stageCompleted: SparkListenerStageCompleted) = synchronized {
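
getOrElseUpdate only inserts when the key is absent, so an RDDInfo already being tracked is no longer overwritten when another stage using the same RDD is submitted. A minimal sketch of the difference (illustrative only, not Spark code):

    import scala.collection.mutable

    object GetOrElseUpdateSketch {
      def main(args: Array[String]) {
        val rddInfos = mutable.Map[Int, String]()
        rddInfos.getOrElseUpdate(1, "RDDInfo from first submission")  // key absent: inserted
        rddInfos.getOrElseUpdate(1, "RDDInfo from resubmission")      // key present: existing value kept
        println(rddInfos(1))                                          // prints "RDDInfo from first submission"
      }
    }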
