Merged

Changes from all commits (22 commits)
2669f57
[maven-release-plugin] prepare for next development iteration
tdas May 26, 2014
7a83163
[SPARK-1914] [SQL] Simplify CountFunction not to traverse to evaluate…
ueshin May 26, 2014
f09cb85
SPARK-1925: Replace '&' with '&&'
zsxwing May 26, 2014
f268548
[SPARK-1931] Reconstruct routing tables in Graph.partitionBy
ankurdave May 26, 2014
9bcd999
Updated dev Python scripts to make them PEP8 compliant.
rxin May 27, 2014
fcb3750
SPARK-1933: Throw a more meaningful exception when a directory is pas…
rxin May 27, 2014
214f90e
SPARK-1932: Fix race conditions in onReceiveCallback and cachedPeers
zsxwing May 27, 2014
30be37c
bugfix worker DriverStateChanged state should match DriverState.FAILED
lianhuiwang May 27, 2014
f539963
[SPARK-1926] [SQL] Nullability of Max/Min/First should be true.
ueshin May 27, 2014
50e234b
[SPARK-1915] [SQL] AverageFunction should not count if the evaluated …
ueshin May 27, 2014
5d63825
[SQL] SPARK-1922
May 27, 2014
24a1cac
[SPARK-1938] [SQL] ApproxCountDistinctMergeFunction should return Int…
ueshin May 28, 2014
3669bb8
Fix doc about NetworkWordCount/JavaNetworkWordCount usage of spark st…
jmu May 28, 2014
032493e
Organize configuration docs
pwendell May 28, 2014
0b769b7
Spark 1916
May 28, 2014
386fd83
[SPARK-1712]: TaskDescription instance is too big causes Spark to hang
witgo May 28, 2014
7179180
Added doctest and method description in context.py
jyotiska May 29, 2014
8bb9390
SPARK-1935: Explicitly add commons-codec 1.5 as a dependency.
yhuai May 29, 2014
0f56aad
[SPARK-1368][SQL] Optimized HiveTableScan
liancheng May 29, 2014
80721fb
[SPARK-1566] consolidate programming guide, and general doc updates
mateiz May 30, 2014
1696a44
[SPARK-1901] worker should make sure executor has exited before updat…
zhpengg May 30, 2014
c5803f0
Merge branch 'branch-1.0' of github.com:apache/spark into csd-1.0
markhamstra May 30, 2014
assembly/pom.xml — 2 changes: 1 addition & 1 deletion
@@ -21,7 +21,7 @@
   <parent>
     <groupId>org.apache.spark</groupId>
     <artifactId>spark-parent</artifactId>
-    <version>1.0.0-candidate-csd-1-SNAPSHOT</version>
+    <version>1.0.1-candidate-csd-1-SNAPSHOT</version>
     <relativePath>../pom.xml</relativePath>
   </parent>
bagel/pom.xml — 2 changes: 1 addition & 1 deletion
@@ -21,7 +21,7 @@
   <parent>
     <groupId>org.apache.spark</groupId>
     <artifactId>spark-parent</artifactId>
-    <version>1.0.0-candidate-csd-1-SNAPSHOT</version>
+    <version>1.0.1-candidate-csd-1-SNAPSHOT</version>
     <relativePath>../pom.xml</relativePath>
   </parent>
core/pom.xml — 2 changes: 1 addition & 1 deletion
@@ -21,7 +21,7 @@
   <parent>
     <groupId>org.apache.spark</groupId>
     <artifactId>spark-parent</artifactId>
-    <version>1.0.0-candidate-csd-1-SNAPSHOT</version>
+    <version>1.0.1-candidate-csd-1-SNAPSHOT</version>
     <relativePath>../pom.xml</relativePath>
   </parent>
core/src/main/scala/org/apache/spark/HttpFileServer.scala — 7 changes: 7 additions & 0 deletions
@@ -59,6 +59,13 @@ private[spark] class HttpFileServer(securityManager: SecurityManager) extends Lo
   }

   def addFileToDir(file: File, dir: File) : String = {
+    // Check whether the file is a directory. If it is, throw a more meaningful exception.
+    // If we don't catch this, Guava throws a very confusing error message:
+    //   java.io.FileNotFoundException: [file] (No such file or directory)
+    // even though the directory ([file]) exists.
+    if (file.isDirectory) {
+      throw new IllegalArgumentException(s"$file cannot be a directory.")
+    }
     Files.copy(file, new File(dir, file.getName))
     dir + "/" + file.getName
   }
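Note on the change above: SPARK-1933's symptom surfaces through SparkContext.addFile, which routes local files through HttpFileServer.addFileToDir. A minimal sketch of how the new guard looks to a caller — the path and local master are invented for illustration, and this snippet is not part of the patch:

// Illustrative sketch only, assuming a local master and an existing directory path.
import org.apache.spark.{SparkConf, SparkContext}

object AddFileDemo {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local").setAppName("add-file-demo"))
    try {
      // Before this patch: Guava's misleading FileNotFoundException.
      // After: IllegalArgumentException("/tmp/some-directory cannot be a directory.")
      sc.addFile("/tmp/some-directory")
    } finally {
      sc.stop()
    }
  }
}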
core/src/main/scala/org/apache/spark/SparkContext.scala — 5 changes: 2 additions & 3 deletions
@@ -794,7 +794,7 @@ class SparkContext(config: SparkConf) extends Logging {
     addedFiles(key) = System.currentTimeMillis

     // Fetch the file locally in case a job is executed using DAGScheduler.runLocally().
-    Utils.fetchFile(path, new File(SparkFiles.getRootDirectory), conf, env.securityManager)
+    Utils.fetchFile(path, new File(SparkFiles.getRootDirectory()), conf, env.securityManager)

     logInfo("Added file " + path + " at " + key + " with timestamp " + addedFiles(key))
     postEnvironmentUpdate()
@@ -932,13 +932,12 @@ class SparkContext(config: SparkConf) extends Logging {
       try {
         env.httpFileServer.addJar(new File(fileName))
       } catch {
-        case e: Exception => {
+        case e: Exception =>
           // For now just log an error but allow to go through so spark examples work.
           // The spark examples don't really need the jar distributed since its also
           // the app jar.
           logError("Error adding jar (" + e + "), was the --addJars option used?")
           null
-        }
       }
     } else {
       env.httpFileServer.addJar(new File(uri.getPath))
core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
@@ -61,17 +61,23 @@ private[spark] class ExecutorRunner(
     // Shutdown hook that kills actors on shutdown.
     shutdownHook = new Thread() {
       override def run() {
-        killProcess()
+        killProcess(Some("Worker shutting down"))
       }
     }
     Runtime.getRuntime.addShutdownHook(shutdownHook)
   }

-  private def killProcess() {
+  /**
+   * kill executor process, wait for exit and notify worker to update resource status
+   *
+   * @param message the exception message which caused the executor's death
+   */
+  private def killProcess(message: Option[String]) {
     if (process != null) {
       logInfo("Killing process!")
       process.destroy()
-      process.waitFor()
+      val exitCode = process.waitFor()
+      worker ! ExecutorStateChanged(appId, execId, state, message, Some(exitCode))
     }
   }

@@ -82,7 +88,6 @@ private[spark] class ExecutorRunner(
       workerThread.interrupt()
       workerThread = null
       state = ExecutorState.KILLED
-      worker ! ExecutorStateChanged(appId, execId, state, None, None)
       Runtime.getRuntime.removeShutdownHook(shutdownHook)
     }
   }
@@ -148,14 +153,13 @@
     } catch {
       case interrupted: InterruptedException => {
         logInfo("Runner thread for executor " + fullId + " interrupted")
-        killProcess()
+        state = ExecutorState.KILLED
+        killProcess(None)
       }
       case e: Exception => {
         logError("Error running executor", e)
-        killProcess()
         state = ExecutorState.FAILED
-        val message = e.getClass + ": " + e.getMessage
-        worker ! ExecutorStateChanged(appId, execId, state, Some(message), None)
+        killProcess(Some(e.toString))
       }
     }
   }
core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
@@ -317,10 +317,14 @@ private[spark] class Worker(
       state match {
         case DriverState.ERROR =>
           logWarning(s"Driver $driverId failed with unrecoverable exception: ${exception.get}")
+        case DriverState.FAILED =>
+          logWarning(s"Driver $driverId exited with failure")
         case DriverState.FINISHED =>
           logInfo(s"Driver $driverId exited successfully")
         case DriverState.KILLED =>
           logInfo(s"Driver $driverId was killed by user")
+        case _ =>
+          logDebug(s"Driver $driverId changed state to $state")
       }
       masterLock.synchronized {
         master ! DriverStateChanged(driverId, state, exception)
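For context, DriverState is a Scala Enumeration (its definition is not shown in this PR), and before this change only ERROR, FINISHED, and KILLED were matched, so a DriverStateChanged carrying DriverState.FAILED killed the actor with a MatchError. A standalone sketch of that failure mode, using a trimmed-down enumeration invented for illustration:

// Sketch only: a non-exhaustive match on an Enumeration value throws
// scala.MatchError at runtime, which the added FAILED and wildcard cases prevent.
object DriverState extends Enumeration {
  val SUBMITTED, RUNNING, FINISHED, KILLED, FAILED, ERROR = Value
}

def describe(state: DriverState.Value): String = state match {
  case DriverState.ERROR    => "unrecoverable exception"
  case DriverState.FINISHED => "exited successfully"
  case DriverState.KILLED   => "killed by user"
}

describe(DriverState.FAILED)  // throws scala.MatchError: FAILED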
core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
@@ -22,11 +22,12 @@ import java.nio.ByteBuffer
 import akka.actor._
 import akka.remote._

-import org.apache.spark.{Logging, SecurityManager, SparkConf}
+import org.apache.spark.{SparkEnv, Logging, SecurityManager, SparkConf}
 import org.apache.spark.TaskState.TaskState
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.deploy.worker.WorkerWatcher
 import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._
+import org.apache.spark.scheduler.TaskDescription
 import org.apache.spark.util.{AkkaUtils, Utils}

 private[spark] class CoarseGrainedExecutorBackend(
@@ -61,12 +62,14 @@ private[spark] class CoarseGrainedExecutorBackend(
       logError("Slave registration failed: " + message)
       System.exit(1)

-    case LaunchTask(taskDesc) =>
-      logInfo("Got assigned task " + taskDesc.taskId)
+    case LaunchTask(data) =>
       if (executor == null) {
         logError("Received LaunchTask command but executor was null")
         System.exit(1)
       } else {
+        val ser = SparkEnv.get.closureSerializer.newInstance()
+        val taskDesc = ser.deserialize[TaskDescription](data.value)
+        logInfo("Got assigned task " + taskDesc.taskId)
         executor.launchTask(this, taskDesc.taskId, taskDesc.serializedTask)
       }

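The two halves of SPARK-1712 meet here: the scheduler backend (later in this diff) serializes each TaskDescription into a SerializableBuffer before sending LaunchTask, and this executor backend deserializes it on receipt. SerializableBuffer itself is not shown in this PR; a rough sketch of what such a wrapper does — the details differ from the real org.apache.spark.util.SerializableBuffer:

// Sketch: ByteBuffer is not java.io.Serializable, so the wrapper hand-rolls
// its serialization by copying the buffer's bytes in writeObject/readObject.
import java.io.{ObjectInputStream, ObjectOutputStream}
import java.nio.ByteBuffer

class SerializableBufferSketch(@transient private var buf: ByteBuffer) extends Serializable {
  def value: ByteBuffer = buf

  private def writeObject(out: ObjectOutputStream): Unit = {
    val bytes = new Array[Byte](buf.remaining())
    buf.duplicate().get(bytes)  // duplicate() leaves the original position untouched
    out.writeInt(bytes.length)
    out.write(bytes)
  }

  private def readObject(in: ObjectInputStream): Unit = {
    val length = in.readInt()
    val bytes = new Array[Byte](length)
    in.readFully(bytes)
    buf = ByteBuffer.wrap(bytes)
  }
}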
core/src/main/scala/org/apache/spark/network/ConnectionManager.scala
@@ -93,7 +93,8 @@ private[spark] class ConnectionManager(port: Int, conf: SparkConf,
   implicit val futureExecContext = ExecutionContext.fromExecutor(
     Utils.newDaemonCachedThreadPool("Connection manager future execution context"))

-  private var onReceiveCallback: (BufferMessage, ConnectionManagerId) => Option[Message]= null
+  @volatile
+  private var onReceiveCallback: (BufferMessage, ConnectionManagerId) => Option[Message] = null

   private val authEnabled = securityManager.isAuthenticationEnabled()
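Why @volatile fixes SPARK-1932 here: onReceiveCallback is written by the thread that registers the handler but read by the selector thread that dispatches incoming messages, and without a memory barrier the reader may never observe the write. A minimal sketch of the pattern, with invented names:

// Sketch with invented names: @volatile guarantees the registering thread's
// write is visible to the dispatching thread; copying the field into a local
// before the null check also avoids a check-then-act race.
class Dispatcher {
  @volatile private var onReceive: String => Unit = null

  def register(callback: String => Unit): Unit = {
    onReceive = callback           // called from the registering thread
  }

  def dispatch(message: String): Unit = {
    val cb = onReceive             // called from the selector thread
    if (cb != null) cb(message)
  }
}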
core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessages.scala
@@ -28,7 +28,7 @@ private[spark] sealed trait CoarseGrainedClusterMessage extends Serializable
 private[spark] object CoarseGrainedClusterMessages {

   // Driver to executors
-  case class LaunchTask(task: TaskDescription) extends CoarseGrainedClusterMessage
+  case class LaunchTask(data: SerializableBuffer) extends CoarseGrainedClusterMessage

   case class KillTask(taskId: Long, executor: String, interruptThread: Boolean)
     extends CoarseGrainedClusterMessage
core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -27,10 +27,10 @@ import akka.actor._
 import akka.pattern.ask
 import akka.remote.{DisassociatedEvent, RemotingLifecycleEvent}

-import org.apache.spark.{Logging, SparkException, TaskState}
+import org.apache.spark.{SparkEnv, Logging, SparkException, TaskState}
 import org.apache.spark.scheduler.{SchedulerBackend, SlaveLost, TaskDescription, TaskSchedulerImpl, WorkerOffer}
 import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._
-import org.apache.spark.util.{AkkaUtils, Utils}
+import org.apache.spark.util.{SerializableBuffer, AkkaUtils, Utils}

 /**
  * A scheduler backend that waits for coarse grained executors to connect to it through Akka.
@@ -48,6 +48,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A
   var totalCoreCount = new AtomicInteger(0)
   val conf = scheduler.sc.conf
   private val timeout = AkkaUtils.askTimeout(conf)
+  private val akkaFrameSize = AkkaUtils.maxFrameSizeBytes(conf)

   class DriverActor(sparkProperties: Seq[(String, String)]) extends Actor {
     private val executorActor = new HashMap[String, ActorRef]
@@ -140,8 +141,26 @@
     // Launch tasks returned by a set of resource offers
     def launchTasks(tasks: Seq[Seq[TaskDescription]]) {
       for (task <- tasks.flatten) {
-        freeCores(task.executorId) -= scheduler.CPUS_PER_TASK
-        executorActor(task.executorId) ! LaunchTask(task)
+        val ser = SparkEnv.get.closureSerializer.newInstance()
+        val serializedTask = ser.serialize(task)
+        if (serializedTask.limit >= akkaFrameSize - 1024) {
+          val taskSetId = scheduler.taskIdToTaskSetId(task.taskId)
+          scheduler.activeTaskSets.get(taskSetId).foreach { taskSet =>
+            try {
+              var msg = "Serialized task %s:%d was %d bytes which " +
+                "exceeds spark.akka.frameSize (%d bytes). " +
+                "Consider using broadcast variables for large values."
+              msg = msg.format(task.taskId, task.index, serializedTask.limit, akkaFrameSize)
+              taskSet.abort(msg)
+            } catch {
+              case e: Exception => logError("Exception in error callback", e)
+            }
+          }
+        }
+        else {
+          freeCores(task.executorId) -= scheduler.CPUS_PER_TASK
+          executorActor(task.executorId) ! LaunchTask(new SerializableBuffer(serializedTask))
+        }
       }
     }

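The abort message above points at the standard workaround: a task that captures a large value in its closure serializes past spark.akka.frameSize, while a broadcast variable ships the value to each executor once and keeps the closure small. A hedged sketch — loadBigTable and the input path are hypothetical:

// Sketch of the recommended fix: broadcast the large value rather than
// capturing it in the task closure, so each serialized task stays small.
val bigTable: Map[String, Int] = loadBigTable()        // hypothetical large driver-side value
val bcTable = sc.broadcast(bigTable)                   // sent to each executor once

val counts = sc.textFile("hdfs:///input/words.txt")
  .map(word => bcTable.value.getOrElse(word, 0))       // closure holds only the broadcast handle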
core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -772,7 +772,7 @@ private[spark] class BlockManager(
   /**
    * Replicate block to another node.
    */
-  var cachedPeers: Seq[BlockManagerId] = null
+  @volatile var cachedPeers: Seq[BlockManagerId] = null
   private def replicate(blockId: BlockId, data: ByteBuffer, level: StorageLevel) {
     val tLevel = StorageLevel(
       level.useDisk, level.useMemory, level.useOffHeap, level.deserialized, 1)
core/src/test/scala/org/apache/spark/scheduler/CoarseGrainedSchedulerBackendSuite.scala — 43 additions & 0 deletions (new file)
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+import org.apache.spark.{LocalSparkContext, SparkConf, SparkException, SparkContext}
+import org.apache.spark.util.{SerializableBuffer, AkkaUtils}
+
+import org.scalatest.FunSuite
+
+class CoarseGrainedSchedulerBackendSuite extends FunSuite with LocalSparkContext {
+
+  test("serialized task larger than akka frame size") {
+    val conf = new SparkConf
+    conf.set("spark.akka.frameSize","1")
+    conf.set("spark.default.parallelism","1")
+    sc = new SparkContext("local-cluster[2 , 1 , 512]", "test", conf)
+    val frameSize = AkkaUtils.maxFrameSizeBytes(sc.conf)
+    val buffer = new SerializableBuffer(java.nio.ByteBuffer.allocate(2 * frameSize))
+    val larger = sc.parallelize(Seq(buffer))
+    val thrown = intercept[SparkException] {
+      larger.collect()
+    }
+    assert(thrown.getMessage.contains("Consider using broadcast variables for large values"))
+    val smaller = sc.parallelize(1 to 4).collect()
+    assert(smaller.size === 4)
+  }
+
+}