
Commit e230ef9

Merge remote-tracking branch 'apache/master' into config-cleanup
2 parents: a374369 + 82eadc3

20 files changed: +544 −409 lines


core/src/main/scala/org/apache/spark/HttpFileServer.scala

Lines changed: 7 additions & 0 deletions

@@ -59,6 +59,13 @@ private[spark] class HttpFileServer(securityManager: SecurityManager) extends Lo
   }
 
   def addFileToDir(file: File, dir: File) : String = {
+    // Check whether the file is a directory. If it is, throw a more meaningful exception.
+    // If we don't catch this, Guava throws a very confusing error message:
+    //   java.io.FileNotFoundException: [file] (No such file or directory)
+    // even though the directory ([file]) exists.
+    if (file.isDirectory) {
+      throw new IllegalArgumentException(s"$file cannot be a directory.")
+    }
     Files.copy(file, new File(dir, file.getName))
     dir + "/" + file.getName
   }
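
For context, here is a small, self-contained sketch (hypothetical demo code, not part of this commit) of the Guava behavior the new comment describes: Files.copy expects a regular file, so passing a directory fails with a misleading low-level I/O error rather than a clear argument error, which is exactly what the added isDirectory guard now reports up front.

import java.io.{File, IOException}
import com.google.common.io.Files

object GuavaCopyDemo {
  def main(args: Array[String]): Unit = {
    val srcDir  = Files.createTempDir()   // a directory, not a regular file
    val destDir = Files.createTempDir()
    try {
      // Guava tries to read srcDir as a regular file; per the commit comment this
      // surfaces as java.io.FileNotFoundException: <srcDir> (No such file or directory)
      // even though the directory exists.
      Files.copy(srcDir, new File(destDir, srcDir.getName))
    } catch {
      case e: IOException => println(s"Confusing low-level error: ${e.getMessage}")
    }
  }
}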

core/src/main/scala/org/apache/spark/SparkContext.scala

Lines changed: 2 additions & 3 deletions

@@ -794,7 +794,7 @@ class SparkContext(config: SparkConf) extends Logging {
     addedFiles(key) = System.currentTimeMillis
 
     // Fetch the file locally in case a job is executed using DAGScheduler.runLocally().
-    Utils.fetchFile(path, new File(SparkFiles.getRootDirectory), conf, env.securityManager)
+    Utils.fetchFile(path, new File(SparkFiles.getRootDirectory()), conf, env.securityManager)
 
     logInfo("Added file " + path + " at " + key + " with timestamp " + addedFiles(key))
     postEnvironmentUpdate()
@@ -932,13 +932,12 @@ class SparkContext(config: SparkConf) extends Logging {
         try {
           env.httpFileServer.addJar(new File(fileName))
         } catch {
-          case e: Exception => {
+          case e: Exception =>
             // For now just log an error but allow to go through so spark examples work.
             // The spark examples don't really need the jar distributed since its also
             // the app jar.
             logError("Error adding jar (" + e + "), was the --addJars option used?")
             null
-          }
         }
       } else {
         env.httpFileServer.addJar(new File(uri.getPath))

core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala

Lines changed: 4 additions & 0 deletions

@@ -317,10 +317,14 @@ private[spark] class Worker(
     state match {
       case DriverState.ERROR =>
        logWarning(s"Driver $driverId failed with unrecoverable exception: ${exception.get}")
+      case DriverState.FAILED =>
+        logWarning(s"Driver $driverId exited with failure")
       case DriverState.FINISHED =>
        logInfo(s"Driver $driverId exited successfully")
       case DriverState.KILLED =>
        logInfo(s"Driver $driverId was killed by user")
+      case _ =>
+        logDebug(s"Driver $driverId changed state to $state")
     }
     masterLock.synchronized {
       master ! DriverStateChanged(driverId, state, exception)
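
As a side note, a minimal sketch (using a hypothetical State enumeration, not Spark's DriverState) of the failure mode the new wildcard case avoids: a non-exhaustive match hit with an unexpected state value throws scala.MatchError at runtime, whereas the catch-all simply logs it.

object DriverStateDemo {
  // Hypothetical stand-in for a driver-state enumeration.
  object State extends Enumeration {
    val ERROR, FAILED, FINISHED, KILLED, RUNNING = Value
  }

  def describe(state: State.Value): String = state match {
    case State.ERROR    => "failed with unrecoverable exception"
    case State.FAILED   => "exited with failure"
    case State.FINISHED => "exited successfully"
    case State.KILLED   => "was killed by user"
    case _              => s"changed state to $state"  // without this, RUNNING would throw scala.MatchError
  }

  def main(args: Array[String]): Unit = {
    println(describe(State.RUNNING))
  }
}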

core/src/main/scala/org/apache/spark/network/ConnectionManager.scala

Lines changed: 2 additions & 1 deletion

@@ -93,7 +93,8 @@ private[spark] class ConnectionManager(port: Int, conf: SparkConf,
   implicit val futureExecContext = ExecutionContext.fromExecutor(
     Utils.newDaemonCachedThreadPool("Connection manager future execution context"))
 
-  private var onReceiveCallback: (BufferMessage, ConnectionManagerId) => Option[Message]= null
+  @volatile
+  private var onReceiveCallback: (BufferMessage, ConnectionManagerId) => Option[Message] = null
 
   private val authEnabled = securityManager.isAuthenticationEnabled()
 
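
For readers unfamiliar with the annotation, a brief sketch (hypothetical Registry class, not Spark code) of what @volatile buys here: it compiles to a JVM volatile field, so a callback registered from one thread is guaranteed to become visible to the threads that later read it, instead of a reader possibly observing a stale null.

object VolatileCallbackDemo {
  class Registry {
    // Volatile write/read pair gives cross-thread visibility for the callback reference.
    @volatile private var callback: String => Unit = null

    def register(cb: String => Unit): Unit = { callback = cb }

    def dispatch(msg: String): Unit = {
      val cb = callback                 // read once into a local to avoid a racy double-read
      if (cb != null) cb(msg) else println(s"dropped: $msg")
    }
  }

  def main(args: Array[String]): Unit = {
    val registry = new Registry
    val writer = new Thread(new Runnable {
      def run(): Unit = registry.register(msg => println(s"handled: $msg"))
    })
    writer.start()
    writer.join()
    registry.dispatch("hello")          // the callback registered on the other thread is visible here
  }
}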

core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala

Lines changed: 5 additions & 1 deletion

@@ -17,7 +17,7 @@
 
 package org.apache.spark.scheduler
 
-import java.io.NotSerializableException
+import java.io.{NotSerializableException, PrintWriter, StringWriter}
 import java.util.Properties
 import java.util.concurrent.atomic.AtomicInteger
 
@@ -580,6 +580,10 @@ class DAGScheduler(
       case e: Exception =>
        jobResult = JobFailed(e)
        job.listener.jobFailed(e)
+      case oom: OutOfMemoryError =>
+        val exception = new SparkException("Local job aborted due to out of memory error", oom)
+        jobResult = JobFailed(exception)
+        job.listener.jobFailed(exception)
     } finally {
       val s = job.finalStage
       stageIdToJobIds -= s.id    // clean up data structures that were populated for a local job,
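
To make the intent of the new case explicit, here is a compact sketch (a hypothetical local-job runner, not the DAGScheduler itself) of the pattern it implements: OutOfMemoryError extends Error rather than Exception, so it slips past a `case e: Exception` handler; catching it separately and wrapping it lets the normal failure path, listener notification and cleanup, still run.

object LocalJobOomDemo {
  final case class JobFailed(cause: Throwable)

  // Runs `body`, reporting failures to `listener`; mirrors the structure of the diff,
  // with RuntimeException standing in for SparkException.
  def runLocally(body: => Unit)(listener: Throwable => Unit): Option[JobFailed] =
    try {
      body
      None
    } catch {
      case e: Exception =>
        listener(e)
        Some(JobFailed(e))
      case oom: OutOfMemoryError =>
        val exception = new RuntimeException("Local job aborted due to out of memory error", oom)
        listener(exception)
        Some(JobFailed(exception))
    }

  def main(args: Array[String]): Unit = {
    val result = runLocally(throw new OutOfMemoryError("test"))(t => println(s"listener notified: ${t.getMessage}"))
    println(result)   // the OOM is wrapped and reported instead of escaping the runner
  }
}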

core/src/main/scala/org/apache/spark/storage/BlockManager.scala

Lines changed: 1 addition & 1 deletion

@@ -772,7 +772,7 @@ private[spark] class BlockManager(
   /**
    * Replicate block to another node.
    */
-  var cachedPeers: Seq[BlockManagerId] = null
+  @volatile var cachedPeers: Seq[BlockManagerId] = null
   private def replicate(blockId: BlockId, data: ByteBuffer, level: StorageLevel) {
     val tLevel = StorageLevel(
       level.useDisk, level.useMemory, level.useOffHeap, level.deserialized, 1)

core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala

Lines changed: 14 additions & 0 deletions

@@ -256,6 +256,20 @@ class DAGSchedulerSuite extends TestKit(ActorSystem("DAGSchedulerSuite")) with F
     assertDataStructuresEmpty
   }
 
+  test("local job oom") {
+    val rdd = new MyRDD(sc, Nil) {
+      override def compute(split: Partition, context: TaskContext): Iterator[(Int, Int)] =
+        throw new java.lang.OutOfMemoryError("test local job oom")
+      override def getPartitions = Array( new Partition { override def index = 0 } )
+      override def getPreferredLocations(split: Partition) = Nil
+      override def toString = "DAGSchedulerSuite Local RDD"
+    }
+    val jobId = scheduler.nextJobId.getAndIncrement()
+    runEvent(JobSubmitted(jobId, rdd, jobComputeFunc, Array(0), true, null, jobListener))
+    assert(results.size == 0)
+    assertDataStructuresEmpty
+  }
+
   test("run trivial job w/ dependency") {
     val baseRdd = makeRdd(1, Nil)
     val finalRdd = makeRdd(1, List(new OneToOneDependency(baseRdd)))
