Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
The table of contents is too big for display.
Diff view
Diff view
  •  
  •  
  •  
4 changes: 4 additions & 0 deletions .rat-excludes
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ RELEASE
control
docs
fairscheduler.xml.template
spark-defaults.conf.template
log4j.properties
log4j.properties.template
metrics.properties.template
Expand Down Expand Up @@ -39,3 +40,6 @@ work
.*\.q
golden
test.out/*
.*iml
service.properties
db.lck
35 changes: 24 additions & 11 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,20 +10,33 @@ guide, on the project webpage at <http://spark.apache.org/documentation.html>.
This README file only contains basic setup instructions.


## Building
## Building Spark

Spark requires Scala 2.10. The project is built using Simple Build Tool (SBT),
which can be obtained [here](http://www.scala-sbt.org). If SBT is installed we
will use the system version of sbt otherwise we will attempt to download it
automatically. To build Spark and its example programs, run:
Spark is built on Scala 2.10. To build Spark and its example programs, run:

./sbt/sbt assembly

Once you've built Spark, the easiest way to start using it is the shell:
## Interactive Scala Shell

The easiest way to start using Spark is through the Scala shell:

./bin/spark-shell

Or, for the Python API, the Python shell (`./bin/pyspark`).
Try the following command, which should return 1000:

scala> sc.parallelize(1 to 1000).count()

## Interactive Python Shell

Alternatively, if you prefer Python, you can use the Python shell:

./bin/pyspark

And run the following command, which should also return 1000:

>>> sc.parallelize(range(1000)).count()

## Example Programs

Spark also comes with several sample programs in the `examples` directory.
To run one of them, use `./bin/run-example <class> <params>`. For example:
Expand All @@ -38,13 +51,13 @@ All of the Spark samples take a `<master>` parameter that is the cluster URL
to connect to. This can be a mesos:// or spark:// URL, or "local" to run
locally with one thread, or "local[N]" to run locally with N threads.

## Running tests
## Running Tests

Testing first requires [Building](#building) Spark. Once Spark is built, tests
Testing first requires [building Spark](#building-spark). Once Spark is built, tests
can be run using:

`./sbt/sbt test`
./sbt/sbt test

## A Note About Hadoop Versions

Spark uses the Hadoop core library to talk to HDFS and other Hadoop-supported
Expand Down
12 changes: 11 additions & 1 deletion assembly/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,16 @@
</dependency>
</dependencies>
</profile>
<profile>
<id>hive</id>
<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-hive_${scala.binary.version}</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
</profile>
<profile>
<id>spark-ganglia-lgpl</id>
<dependencies>
Expand Down Expand Up @@ -208,7 +218,7 @@
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>buildnumber-maven-plugin</artifactId>
<version>1.1</version>
<version>1.2</version>
<executions>
<execution>
<phase>validate</phase>
Expand Down
20 changes: 12 additions & 8 deletions bagel/src/main/scala/org/apache/spark/bagel/Bagel.scala
Original file line number Diff line number Diff line change
Expand Up @@ -220,27 +220,31 @@ object Bagel extends Logging {
*/
private def comp[K: Manifest, V <: Vertex, M <: Message[K], C](
sc: SparkContext,
grouped: RDD[(K, (Seq[C], Seq[V]))],
grouped: RDD[(K, (Iterable[C], Iterable[V]))],
compute: (V, Option[C]) => (V, Array[M]),
storageLevel: StorageLevel
): (RDD[(K, (V, Array[M]))], Int, Int) = {
var numMsgs = sc.accumulator(0)
var numActiveVerts = sc.accumulator(0)
val processed = grouped.flatMapValues {
case (_, vs) if vs.size == 0 => None
case (c, vs) =>
val processed = grouped.mapValues(x => (x._1.iterator, x._2.iterator))
.flatMapValues {
case (_, vs) if !vs.hasNext => None
case (c, vs) => {
val (newVert, newMsgs) =
compute(vs(0), c match {
case Seq(comb) => Some(comb)
case Seq() => None
})
compute(vs.next,
c.hasNext match {
case true => Some(c.next)
case false => None
}
)

numMsgs += newMsgs.size
if (newVert.active) {
numActiveVerts += 1
}

Some((newVert, newMsgs))
}
}.persist(storageLevel)

// Force evaluation of processed RDD for accurate performance measurements
Expand Down
6 changes: 4 additions & 2 deletions bagel/src/test/scala/org/apache/spark/bagel/BagelSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,15 @@ import org.scalatest.time.SpanSugar._
import org.apache.spark._
import org.apache.spark.storage.StorageLevel

import scala.language.postfixOps

class TestVertex(val active: Boolean, val age: Int) extends Vertex with Serializable
class TestMessage(val targetId: String) extends Message[String] with Serializable

class BagelSuite extends FunSuite with Assertions with BeforeAndAfter with Timeouts {

var sc: SparkContext = _

after {
if (sc != null) {
sc.stop()
Expand Down
35 changes: 19 additions & 16 deletions bin/compute-classpath.sh
Original file line number Diff line number Diff line change
Expand Up @@ -30,21 +30,7 @@ FWDIR="$(cd `dirname $0`/..; pwd)"
# Build up classpath
CLASSPATH="$SPARK_CLASSPATH:$FWDIR/conf"

# Support for interacting with Hive. Since hive pulls in a lot of dependencies that might break
# existing Spark applications, it is not included in the standard spark assembly. Instead, we only
# include it in the classpath if the user has explicitly requested it by running "sbt hive/assembly"
# Hopefully we will find a way to avoid uber-jars entirely and deploy only the needed packages in
# the future.
if [ -f "$FWDIR"/sql/hive/target/scala-$SCALA_VERSION/spark-hive-assembly-*.jar ]; then

# Datanucleus jars do not work if only included in the uberjar as plugin.xml metadata is lost.
DATANUCLEUSJARS=$(JARS=("$FWDIR/lib_managed/jars"/datanucleus-*.jar); IFS=:; echo "${JARS[*]}")
CLASSPATH=$CLASSPATH:$DATANUCLEUSJARS

ASSEMBLY_DIR="$FWDIR/sql/hive/target/scala-$SCALA_VERSION/"
else
ASSEMBLY_DIR="$FWDIR/assembly/target/scala-$SCALA_VERSION/"
fi
ASSEMBLY_DIR="$FWDIR/assembly/target/scala-$SCALA_VERSION"

# First check if we have a dependencies jar. If so, include binary classes with the deps jar
if [ -f "$ASSEMBLY_DIR"/spark-assembly*hadoop*-deps.jar ]; then
Expand All @@ -59,7 +45,7 @@ if [ -f "$ASSEMBLY_DIR"/spark-assembly*hadoop*-deps.jar ]; then
CLASSPATH="$CLASSPATH:$FWDIR/sql/core/target/scala-$SCALA_VERSION/classes"
CLASSPATH="$CLASSPATH:$FWDIR/sql/hive/target/scala-$SCALA_VERSION/classes"

DEPS_ASSEMBLY_JAR=`ls "$ASSEMBLY_DIR"/spark*-assembly*hadoop*-deps.jar`
DEPS_ASSEMBLY_JAR=`ls "$ASSEMBLY_DIR"/spark-assembly*hadoop*-deps.jar`
CLASSPATH="$CLASSPATH:$DEPS_ASSEMBLY_JAR"
else
# Else use spark-assembly jar from either RELEASE or assembly directory
Expand All @@ -71,6 +57,23 @@ else
CLASSPATH="$CLASSPATH:$ASSEMBLY_JAR"
fi

# When Hive support is needed, Datanucleus jars must be included on the classpath.
# Datanucleus jars do not work if only included in the uber jar as plugin.xml metadata is lost.
# Both sbt and maven will populate "lib_managed/jars/" with the datanucleus jars when Spark is
# built with Hive, so first check if the datanucleus jars exist, and then ensure the current Spark
# assembly is built for Hive, before actually populating the CLASSPATH with the jars.
# Note that this check order is faster (by up to half a second) in the case where Hive is not used.
num_datanucleus_jars=$(ls "$FWDIR"/lib_managed/jars/ 2>/dev/null | grep "datanucleus-.*\\.jar" | wc -l)
if [ $num_datanucleus_jars -gt 0 ]; then
AN_ASSEMBLY_JAR=${ASSEMBLY_JAR:-$DEPS_ASSEMBLY_JAR}
num_hive_files=$(jar tvf "$AN_ASSEMBLY_JAR" org/apache/hadoop/hive/ql/exec 2>/dev/null | wc -l)
if [ $num_hive_files -gt 0 ]; then
echo "Spark assembly has been built with Hive, including Datanucleus jars on classpath" 1>&2
DATANUCLEUSJARS=$(echo "$FWDIR/lib_managed/jars"/datanucleus-*.jar | tr " " :)
CLASSPATH=$CLASSPATH:$DATANUCLEUSJARS
fi
fi

# Add test classes if we're running from SBT or Maven with SPARK_TESTING set to 1
if [[ $SPARK_TESTING == 1 ]]; then
CLASSPATH="$CLASSPATH:$FWDIR/core/target/scala-$SCALA_VERSION/test-classes"
Expand Down
3 changes: 3 additions & 0 deletions bin/load-spark-env.sh
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@ if [ -z "$SPARK_ENV_LOADED" ]; then
use_conf_dir=${SPARK_CONF_DIR:-"$parent_dir/conf"}

if [ -f "${use_conf_dir}/spark-env.sh" ]; then
# Promote all variable declarations to environment (exported) variables
set -a
. "${use_conf_dir}/spark-env.sh"
set +a
fi
fi
3 changes: 2 additions & 1 deletion bin/pyspark
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,8 @@ if [ -n "$IPYTHON_OPTS" ]; then
IPYTHON=1
fi

if [[ "$IPYTHON" = "1" ]] ; then
# Only use ipython if no command line arguments were provided [SPARK-1134]
if [[ "$IPYTHON" = "1" && $# = 0 ]] ; then
exec ipython $IPYTHON_OPTS
else
exec "$PYSPARK_PYTHON" "$@"
Expand Down
1 change: 0 additions & 1 deletion bin/run-example
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,6 @@ fi

# Set JAVA_OPTS to be able to load native libraries and to set heap size
JAVA_OPTS="$SPARK_JAVA_OPTS"
JAVA_OPTS="$JAVA_OPTS -Djava.library.path=$SPARK_LIBRARY_PATH"
# Load extra JAVA_OPTS from conf/java-opts, if it exists
if [ -e "$FWDIR/conf/java-opts" ] ; then
JAVA_OPTS="$JAVA_OPTS `cat $FWDIR/conf/java-opts`"
Expand Down
12 changes: 7 additions & 5 deletions bin/spark-class
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,9 @@ DEFAULT_MEM=${SPARK_MEM:-512m}

SPARK_DAEMON_JAVA_OPTS="$SPARK_DAEMON_JAVA_OPTS -Dspark.akka.logLifecycleEvents=true"

# Add java opts and memory settings for master, worker, executors, and repl.
# Add java opts and memory settings for master, worker, history server, executors, and repl.
case "$1" in
# Master and Worker use SPARK_DAEMON_JAVA_OPTS (and specific opts) + SPARK_DAEMON_MEMORY.
# Master, Worker, and HistoryServer use SPARK_DAEMON_JAVA_OPTS (and specific opts) + SPARK_DAEMON_MEMORY.
'org.apache.spark.deploy.master.Master')
OUR_JAVA_OPTS="$SPARK_DAEMON_JAVA_OPTS $SPARK_MASTER_OPTS"
OUR_JAVA_MEM=${SPARK_DAEMON_MEMORY:-$DEFAULT_MEM}
Expand All @@ -58,6 +58,10 @@ case "$1" in
OUR_JAVA_OPTS="$SPARK_DAEMON_JAVA_OPTS $SPARK_WORKER_OPTS"
OUR_JAVA_MEM=${SPARK_DAEMON_MEMORY:-$DEFAULT_MEM}
;;
'org.apache.spark.deploy.history.HistoryServer')
OUR_JAVA_OPTS="$SPARK_DAEMON_JAVA_OPTS $SPARK_HISTORY_OPTS"
OUR_JAVA_MEM=${SPARK_DAEMON_MEMORY:-$DEFAULT_MEM}
;;

# Executors use SPARK_JAVA_OPTS + SPARK_EXECUTOR_MEMORY.
'org.apache.spark.executor.CoarseGrainedExecutorBackend')
Expand Down Expand Up @@ -94,7 +98,7 @@ fi

# Set JAVA_OPTS to be able to load native libraries and to set heap size
JAVA_OPTS="$OUR_JAVA_OPTS"
JAVA_OPTS="$JAVA_OPTS -Djava.library.path=$SPARK_LIBRARY_PATH"
JAVA_OPTS="$JAVA_OPTS -Djava.library.path=$_SPARK_LIBRARY_PATH"
JAVA_OPTS="$JAVA_OPTS -Xms$OUR_JAVA_MEM -Xmx$OUR_JAVA_MEM"
# Load extra JAVA_OPTS from conf/java-opts, if it exists
if [ -e "$FWDIR/conf/java-opts" ] ; then
Expand Down Expand Up @@ -154,5 +158,3 @@ if [ "$SPARK_PRINT_LAUNCH_COMMAND" == "1" ]; then
fi

exec "$RUNNER" -cp "$CLASSPATH" $JAVA_OPTS "$@"


7 changes: 5 additions & 2 deletions bin/spark-class2.cmd
Original file line number Diff line number Diff line change
Expand Up @@ -45,14 +45,17 @@ if "x%OUR_JAVA_MEM%"=="x" set OUR_JAVA_MEM=512m

set SPARK_DAEMON_JAVA_OPTS=%SPARK_DAEMON_JAVA_OPTS% -Dspark.akka.logLifecycleEvents=true

rem Add java opts and memory settings for master, worker, executors, and repl.
rem Master and Worker use SPARK_DAEMON_JAVA_OPTS (and specific opts) + SPARK_DAEMON_MEMORY.
rem Add java opts and memory settings for master, worker, history server, executors, and repl.
rem Master, Worker and HistoryServer use SPARK_DAEMON_JAVA_OPTS (and specific opts) + SPARK_DAEMON_MEMORY.
if "%1"=="org.apache.spark.deploy.master.Master" (
set OUR_JAVA_OPTS=%SPARK_DAEMON_JAVA_OPTS% %SPARK_MASTER_OPTS%
if not "x%SPARK_DAEMON_MEMORY%"=="x" set OUR_JAVA_MEM=%SPARK_DAEMON_MEMORY%
) else if "%1"=="org.apache.spark.deploy.worker.Worker" (
set OUR_JAVA_OPTS=%SPARK_DAEMON_JAVA_OPTS% %SPARK_WORKER_OPTS%
if not "x%SPARK_DAEMON_MEMORY%"=="x" set OUR_JAVA_MEM=%SPARK_DAEMON_MEMORY%
) else if "%1"=="org.apache.spark.deploy.history.HistoryServer" (
set OUR_JAVA_OPTS=%SPARK_DAEMON_JAVA_OPTS% %SPARK_HISTORY_OPTS%
if not "x%SPARK_DAEMON_MEMORY%"=="x" set OUR_JAVA_MEM=%SPARK_DAEMON_MEMORY%

rem Executors use SPARK_JAVA_OPTS + SPARK_EXECUTOR_MEMORY.
) else if "%1"=="org.apache.spark.executor.CoarseGrainedExecutorBackend" (
Expand Down
8 changes: 4 additions & 4 deletions bin/spark-shell
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ set -o posix
FWDIR="$(cd `dirname $0`/..; pwd)"

SPARK_REPL_OPTS="${SPARK_REPL_OPTS:-""}"
DEFAULT_MASTER="local"
DEFAULT_MASTER="local[*]"
MASTER=${MASTER:-""}

info_log=0
Expand Down Expand Up @@ -64,7 +64,7 @@ ${txtbld}OPTIONS${txtrst}:
is followed by m for megabytes or g for gigabytes, e.g. "1g".
-dm --driver-memory : The memory used by the Spark Shell, the number is followed
by m for megabytes or g for gigabytes, e.g. "1g".
-m --master : A full string that describes the Spark Master, defaults to "local"
-m --master : A full string that describes the Spark Master, defaults to "local[*]"
e.g. "spark://localhost:7077".
--log-conf : Enables logging of the supplied SparkConf as INFO at start of the
Spark Context.
Expand Down Expand Up @@ -127,7 +127,7 @@ function set_spark_log_conf(){

function set_spark_master(){
if ! [[ "$1" =~ $ARG_FLAG_PATTERN ]]; then
MASTER="$1"
export MASTER="$1"
else
out_error "wrong format for $2"
fi
Expand All @@ -145,7 +145,7 @@ function resolve_spark_master(){
fi

if [ -z "$MASTER" ]; then
MASTER="$DEFAULT_MASTER"
export MASTER="$DEFAULT_MASTER"
fi

}
Expand Down
7 changes: 6 additions & 1 deletion bin/spark-submit
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,13 @@ while (($#)); do
DEPLOY_MODE=$2
elif [ $1 = "--driver-memory" ]; then
DRIVER_MEMORY=$2
elif [ $1 = "--driver-library-path" ]; then
export _SPARK_LIBRARY_PATH=$2
elif [ $1 = "--driver-class-path" ]; then
export SPARK_CLASSPATH="$SPARK_CLASSPATH:$2"
elif [ $1 = "--driver-java-options" ]; then
export SPARK_JAVA_OPTS="$SPARK_JAVA_OPTS $2"
fi

shift
done

Expand Down
7 changes: 7 additions & 0 deletions conf/spark-defaults.conf.template
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
# Default system properties included when running spark-submit.
# This is useful for setting default environmental settings.

# Example:
# spark.master spark://master:7077
# spark.eventLog.enabled true
# spark.eventLog.dir hdfs://namenode:8021/directory
Loading