
Commit 3ccc313

Merge branch 'master' into eval
2 parents: 1a47e10 + 87d0928

File tree: 22 files changed, +256 / -91 lines


assembly/pom.xml

Lines changed: 11 additions & 1 deletion
@@ -163,6 +163,16 @@
         </dependency>
       </dependencies>
     </profile>
+    <profile>
+      <id>hive</id>
+      <dependencies>
+        <dependency>
+          <groupId>org.apache.spark</groupId>
+          <artifactId>spark-hive_${scala.binary.version}</artifactId>
+          <version>${project.version}</version>
+        </dependency>
+      </dependencies>
+    </profile>
     <profile>
       <id>spark-ganglia-lgpl</id>
       <dependencies>
@@ -208,7 +218,7 @@
       <plugin>
         <groupId>org.codehaus.mojo</groupId>
         <artifactId>buildnumber-maven-plugin</artifactId>
-        <version>1.1</version>
+        <version>1.2</version>
         <executions>
           <execution>
             <phase>validate</phase>

bin/compute-classpath.sh

Lines changed: 19 additions & 16 deletions
@@ -30,21 +30,7 @@ FWDIR="$(cd `dirname $0`/..; pwd)"
 # Build up classpath
 CLASSPATH="$SPARK_CLASSPATH:$FWDIR/conf"
 
-# Support for interacting with Hive. Since hive pulls in a lot of dependencies that might break
-# existing Spark applications, it is not included in the standard spark assembly. Instead, we only
-# include it in the classpath if the user has explicitly requested it by running "sbt hive/assembly"
-# Hopefully we will find a way to avoid uber-jars entirely and deploy only the needed packages in
-# the future.
-if [ -f "$FWDIR"/sql/hive/target/scala-$SCALA_VERSION/spark-hive-assembly-*.jar ]; then
-
-  # Datanucleus jars do not work if only included in the uberjar as plugin.xml metadata is lost.
-  DATANUCLEUSJARS=$(JARS=("$FWDIR/lib_managed/jars"/datanucleus-*.jar); IFS=:; echo "${JARS[*]}")
-  CLASSPATH=$CLASSPATH:$DATANUCLEUSJARS
-
-  ASSEMBLY_DIR="$FWDIR/sql/hive/target/scala-$SCALA_VERSION/"
-else
-  ASSEMBLY_DIR="$FWDIR/assembly/target/scala-$SCALA_VERSION/"
-fi
+ASSEMBLY_DIR="$FWDIR/assembly/target/scala-$SCALA_VERSION"
 
 # First check if we have a dependencies jar. If so, include binary classes with the deps jar
 if [ -f "$ASSEMBLY_DIR"/spark-assembly*hadoop*-deps.jar ]; then
@@ -59,7 +45,7 @@ if [ -f "$ASSEMBLY_DIR"/spark-assembly*hadoop*-deps.jar ]; then
   CLASSPATH="$CLASSPATH:$FWDIR/sql/core/target/scala-$SCALA_VERSION/classes"
   CLASSPATH="$CLASSPATH:$FWDIR/sql/hive/target/scala-$SCALA_VERSION/classes"
 
-  DEPS_ASSEMBLY_JAR=`ls "$ASSEMBLY_DIR"/spark*-assembly*hadoop*-deps.jar`
+  DEPS_ASSEMBLY_JAR=`ls "$ASSEMBLY_DIR"/spark-assembly*hadoop*-deps.jar`
   CLASSPATH="$CLASSPATH:$DEPS_ASSEMBLY_JAR"
 else
   # Else use spark-assembly jar from either RELEASE or assembly directory
@@ -71,6 +57,23 @@ else
   CLASSPATH="$CLASSPATH:$ASSEMBLY_JAR"
 fi
 
+# When Hive support is needed, Datanucleus jars must be included on the classpath.
+# Datanucleus jars do not work if only included in the uber jar as plugin.xml metadata is lost.
+# Both sbt and maven will populate "lib_managed/jars/" with the datanucleus jars when Spark is
+# built with Hive, so first check if the datanucleus jars exist, and then ensure the current Spark
+# assembly is built for Hive, before actually populating the CLASSPATH with the jars.
+# Note that this check order is faster (by up to half a second) in the case where Hive is not used.
+num_datanucleus_jars=$(ls "$FWDIR"/lib_managed/jars/ | grep "datanucleus-.*\\.jar" | wc -l)
+if [ $num_datanucleus_jars -gt 0 ]; then
+  AN_ASSEMBLY_JAR=${ASSEMBLY_JAR:-$DEPS_ASSEMBLY_JAR}
+  num_hive_files=$(jar tvf "$AN_ASSEMBLY_JAR" org/apache/hadoop/hive/ql/exec 2>/dev/null | wc -l)
+  if [ $num_hive_files -gt 0 ]; then
+    echo "Spark assembly has been built with Hive, including Datanucleus jars on classpath" 1>&2
+    DATANUCLEUSJARS=$(echo "$FWDIR/lib_managed/jars"/datanucleus-*.jar | tr " " :)
+    CLASSPATH=$CLASSPATH:$DATANUCLEUSJARS
+  fi
+fi
+
 # Add test classes if we're running from SBT or Maven with SPARK_TESTING set to 1
 if [[ $SPARK_TESTING == 1 ]]; then
   CLASSPATH="$CLASSPATH:$FWDIR/core/target/scala-$SCALA_VERSION/test-classes"

bin/spark-class

Lines changed: 0 additions & 2 deletions
@@ -154,5 +154,3 @@ if [ "$SPARK_PRINT_LAUNCH_COMMAND" == "1" ]; then
 fi
 
 exec "$RUNNER" -cp "$CLASSPATH" $JAVA_OPTS "$@"
-
-

core/pom.xml

Lines changed: 0 additions & 2 deletions
@@ -117,12 +117,10 @@
     <dependency>
       <groupId>com.twitter</groupId>
       <artifactId>chill_${scala.binary.version}</artifactId>
-      <version>0.3.1</version>
     </dependency>
     <dependency>
       <groupId>com.twitter</groupId>
       <artifactId>chill-java</artifactId>
-      <version>0.3.1</version>
     </dependency>
     <dependency>
       <groupId>commons-net</groupId>

core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala

Lines changed: 4 additions & 0 deletions
@@ -86,6 +86,10 @@ private[deploy] object DeployMessages {
 
   case class KillDriver(driverId: String) extends DeployMessage
 
+  // Worker internal
+
+  case object WorkDirCleanup // Sent to Worker actor periodically for cleaning up app folders
+
   // AppClient to Master
 
   case class RegisterApplication(appDescription: ApplicationDescription)

core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala

Lines changed: 22 additions & 1 deletion
@@ -64,6 +64,12 @@ private[spark] class Worker(
   val REGISTRATION_TIMEOUT = 20.seconds
   val REGISTRATION_RETRIES = 3
 
+  val CLEANUP_ENABLED = conf.getBoolean("spark.worker.cleanup.enabled", true)
+  // How often worker will clean up old app folders
+  val CLEANUP_INTERVAL_MILLIS = conf.getLong("spark.worker.cleanup.interval", 60 * 30) * 1000
+  // TTL for app folders/data; after TTL expires it will be cleaned up
+  val APP_DATA_RETENTION_SECS = conf.getLong("spark.worker.cleanup.appDataTtl", 7 * 24 * 3600)
+
   // Index into masterUrls that we're currently trying to register with.
   var masterIndex = 0
 
@@ -179,12 +185,28 @@
       registered = true
       changeMaster(masterUrl, masterWebUiUrl)
       context.system.scheduler.schedule(0 millis, HEARTBEAT_MILLIS millis, self, SendHeartbeat)
+      if (CLEANUP_ENABLED) {
+        context.system.scheduler.schedule(CLEANUP_INTERVAL_MILLIS millis,
+          CLEANUP_INTERVAL_MILLIS millis, self, WorkDirCleanup)
+      }
 
     case SendHeartbeat =>
       masterLock.synchronized {
        if (connected) { master ! Heartbeat(workerId) }
      }
 
+    case WorkDirCleanup =>
+      // Spin up a separate thread (in a future) to do the dir cleanup; don't tie up worker actor
+      val cleanupFuture = concurrent.future {
+        logInfo("Cleaning up oldest application directories in " + workDir + " ...")
+        Utils.findOldFiles(workDir, APP_DATA_RETENTION_SECS)
+          .foreach(Utils.deleteRecursively)
+      }
+      cleanupFuture onFailure {
+        case e: Throwable =>
+          logError("App dir cleanup failed: " + e.getMessage, e)
+      }
+
     case MasterChanged(masterUrl, masterWebUiUrl) =>
       logInfo("Master has changed, new master is at " + masterUrl)
       changeMaster(masterUrl, masterWebUiUrl)
@@ -331,7 +353,6 @@
 }
 
 private[spark] object Worker {
-
   def main(argStrings: Array[String]) {
     val args = new WorkerArguments(argStrings)
     val (actorSystem, _) = startSystemAndActor(args.host, args.port, args.webUiPort, args.cores,
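The cleanup behaviour added here is driven by three new configuration keys. As a hedged sketch only, the defaults implied by the code above can be written out explicitly on a SparkConf (keys, default values, and units are taken from the diff; whether they are set this way or via a properties file is a deployment choice outside this commit):

import org.apache.spark.SparkConf

// Sketch only: the three worker-cleanup keys from the diff, set to their defaults.
// spark.worker.cleanup.interval and spark.worker.cleanup.appDataTtl are in seconds.
val conf = new SparkConf()
  .set("spark.worker.cleanup.enabled", "true")                       // periodic cleanup on
  .set("spark.worker.cleanup.interval", (60 * 30).toString)          // sweep every 30 minutes
  .set("spark.worker.cleanup.appDataTtl", (7 * 24 * 3600).toString)  // keep app dirs for 7 days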

core/src/main/scala/org/apache/spark/util/Utils.scala

Lines changed: 17 additions & 2 deletions
@@ -597,9 +597,24 @@ private[spark] object Utils extends Logging {
     }
 
     if (fileInCanonicalDir.getCanonicalFile().equals(fileInCanonicalDir.getAbsoluteFile())) {
-      return false;
+      return false
     } else {
-      return true;
+      return true
+    }
+  }
+
+  /**
+   * Finds all the files in a directory whose last modified time is older than cutoff seconds.
+   * @param dir must be the path to a directory, or IllegalArgumentException is thrown
+   * @param cutoff measured in seconds. Files older than this are returned.
+   */
+  def findOldFiles(dir: File, cutoff: Long): Seq[File] = {
+    val currentTimeMillis = System.currentTimeMillis
+    if (dir.isDirectory) {
+      val files = listFilesSafely(dir)
+      files.filter { file => file.lastModified < (currentTimeMillis - cutoff * 1000) }
+    } else {
+      throw new IllegalArgumentException(dir + " is not a directory!")
     }
   }
 
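Since Utils is private[spark], findOldFiles is not callable from user code. The following is a minimal, self-contained sketch of the same idea using only java.io; the object and parameter names here are illustrative, not Spark API.

import java.io.File

// Standalone sketch of the findOldFiles logic above: list a directory and keep
// entries whose last-modified time is more than cutoffSeconds in the past.
object FindOldFilesSketch {
  def findOldFiles(dir: File, cutoffSeconds: Long): Seq[File] = {
    require(dir.isDirectory, dir + " is not a directory!")
    val now = System.currentTimeMillis
    Option(dir.listFiles).map(_.toSeq).getOrElse(Seq.empty)
      .filter(_.lastModified < now - cutoffSeconds * 1000)
  }

  def main(args: Array[String]): Unit = {
    // Print entries of the given directory (default /tmp) older than one hour.
    val dir = new File(if (args.nonEmpty) args(0) else "/tmp")
    findOldFiles(dir, 3600).foreach(f => println(f.getPath))
  }
}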

core/src/test/scala/org/apache/spark/util/UtilsSuite.scala

Lines changed: 14 additions & 1 deletion
@@ -19,7 +19,7 @@ package org.apache.spark.util
 
 import scala.util.Random
 
-import java.io.{ByteArrayOutputStream, ByteArrayInputStream, FileOutputStream}
+import java.io.{File, ByteArrayOutputStream, ByteArrayInputStream, FileOutputStream}
 import java.nio.{ByteBuffer, ByteOrder}
 
 import com.google.common.base.Charsets
@@ -154,5 +154,18 @@ class UtilsSuite extends FunSuite {
     val iterator = Iterator.range(0, 5)
     assert(Utils.getIteratorSize(iterator) === 5L)
   }
+
+  test("findOldFiles") {
+    // create some temporary directories and files
+    val parent: File = Utils.createTempDir()
+    val child1: File = Utils.createTempDir(parent.getCanonicalPath) // The parent directory has two child directories
+    val child2: File = Utils.createTempDir(parent.getCanonicalPath)
+    // set the last modified time of child1 to 10 secs old
+    child1.setLastModified(System.currentTimeMillis() - (1000 * 10))
+
+    val result = Utils.findOldFiles(parent, 5) // find files older than 5 secs
+    assert(result.size.equals(1))
+    assert(result(0).getCanonicalPath.equals(child1.getCanonicalPath))
+  }
 }

dev/audit-release/maven_app_core/pom.xml

Lines changed: 1 addition & 1 deletion
@@ -49,7 +49,7 @@
   <plugins>
     <plugin>
       <artifactId>maven-compiler-plugin</artifactId>
-      <version>2.3.2</version>
+      <version>3.1</version>
     </plugin>
   </plugins>
 </build>

dev/create-release/create-release.sh

Lines changed: 2 additions & 2 deletions
@@ -49,14 +49,14 @@ mvn -DskipTests \
   -Darguments="-DskipTests=true -Dhadoop.version=2.2.0 -Dyarn.version=2.2.0 -Dgpg.passphrase=${GPG_PASSPHRASE}" \
   -Dusername=$GIT_USERNAME -Dpassword=$GIT_PASSWORD \
   -Dhadoop.version=2.2.0 -Dyarn.version=2.2.0 \
-  -Pyarn -Pspark-ganglia-lgpl \
+  -Pyarn -Phive -Pspark-ganglia-lgpl\
   -Dtag=$GIT_TAG -DautoVersionSubmodules=true \
   --batch-mode release:prepare
 
 mvn -DskipTests \
   -Darguments="-DskipTests=true -Dhadoop.version=2.2.0 -Dyarn.version=2.2.0 -Dgpg.passphrase=${GPG_PASSPHRASE}" \
   -Dhadoop.version=2.2.0 -Dyarn.version=2.2.0 \
-  -Pyarn -Pspark-ganglia-lgpl\
+  -Pyarn -Phive -Pspark-ganglia-lgpl\
   release:perform
 
 rm -rf spark
