diff --git a/assembly/pom.xml b/assembly/pom.xml
index 4b617050ca4d5..de14086bbf65f 100644
--- a/assembly/pom.xml
+++ b/assembly/pom.xml
@@ -21,7 +21,7 @@
org.apache.spark
spark-parent
- 1.0.0-candidate-csd-1-SNAPSHOT
+ 1.0.1-candidate-csd-1-SNAPSHOT
../pom.xml
diff --git a/bagel/pom.xml b/bagel/pom.xml
index eb66f1daf9736..783e2102c981a 100644
--- a/bagel/pom.xml
+++ b/bagel/pom.xml
@@ -21,7 +21,7 @@
org.apache.spark
spark-parent
- 1.0.0-candidate-csd-1-SNAPSHOT
+ 1.0.1-candidate-csd-1-SNAPSHOT
../pom.xml
diff --git a/core/pom.xml b/core/pom.xml
index b30b8b15ae9de..a69b86822d27c 100644
--- a/core/pom.xml
+++ b/core/pom.xml
@@ -21,7 +21,7 @@
org.apache.spark
spark-parent
- 1.0.0-candidate-csd-1-SNAPSHOT
+ 1.0.1-candidate-csd-1-SNAPSHOT
../pom.xml
diff --git a/core/src/main/scala/org/apache/spark/HttpFileServer.scala b/core/src/main/scala/org/apache/spark/HttpFileServer.scala
index a6e300d345786..0e3750fdde415 100644
--- a/core/src/main/scala/org/apache/spark/HttpFileServer.scala
+++ b/core/src/main/scala/org/apache/spark/HttpFileServer.scala
@@ -59,6 +59,13 @@ private[spark] class HttpFileServer(securityManager: SecurityManager) extends Lo
}
def addFileToDir(file: File, dir: File) : String = {
+ // Check whether the file is a directory. If it is, throw a more meaningful exception.
+ // If we don't catch this, Guava throws a very confusing error message:
+ // java.io.FileNotFoundException: [file] (No such file or directory)
+ // even though the directory ([file]) exists.
+ if (file.isDirectory) {
+ throw new IllegalArgumentException(s"$file cannot be a directory.")
+ }
Files.copy(file, new File(dir, file.getName))
dir + "/" + file.getName
}
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 49737fa4be56b..03ceff8bf1fb0 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -794,7 +794,7 @@ class SparkContext(config: SparkConf) extends Logging {
addedFiles(key) = System.currentTimeMillis
// Fetch the file locally in case a job is executed using DAGScheduler.runLocally().
- Utils.fetchFile(path, new File(SparkFiles.getRootDirectory), conf, env.securityManager)
+ Utils.fetchFile(path, new File(SparkFiles.getRootDirectory()), conf, env.securityManager)
logInfo("Added file " + path + " at " + key + " with timestamp " + addedFiles(key))
postEnvironmentUpdate()
@@ -932,13 +932,12 @@ class SparkContext(config: SparkConf) extends Logging {
try {
env.httpFileServer.addJar(new File(fileName))
} catch {
- case e: Exception => {
+ case e: Exception =>
// For now just log an error but allow to go through so spark examples work.
// The spark examples don't really need the jar distributed since its also
// the app jar.
logError("Error adding jar (" + e + "), was the --addJars option used?")
null
- }
}
} else {
env.httpFileServer.addJar(new File(uri.getPath))
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala b/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
index 2051403682737..d27e0e1f15c65 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
@@ -61,17 +61,23 @@ private[spark] class ExecutorRunner(
// Shutdown hook that kills actors on shutdown.
shutdownHook = new Thread() {
override def run() {
- killProcess()
+ killProcess(Some("Worker shutting down"))
}
}
Runtime.getRuntime.addShutdownHook(shutdownHook)
}
- private def killProcess() {
+ /**
+ * kill executor process, wait for exit and notify worker to update resource status
+ *
+ * @param message the exception message which caused the executor's death
+ */
+ private def killProcess(message: Option[String]) {
if (process != null) {
logInfo("Killing process!")
process.destroy()
- process.waitFor()
+ val exitCode = process.waitFor()
+ worker ! ExecutorStateChanged(appId, execId, state, message, Some(exitCode))
}
}
@@ -82,7 +88,6 @@ private[spark] class ExecutorRunner(
workerThread.interrupt()
workerThread = null
state = ExecutorState.KILLED
- worker ! ExecutorStateChanged(appId, execId, state, None, None)
Runtime.getRuntime.removeShutdownHook(shutdownHook)
}
}
@@ -148,14 +153,13 @@ private[spark] class ExecutorRunner(
} catch {
case interrupted: InterruptedException => {
logInfo("Runner thread for executor " + fullId + " interrupted")
- killProcess()
+ state = ExecutorState.KILLED
+ killProcess(None)
}
case e: Exception => {
logError("Error running executor", e)
- killProcess()
state = ExecutorState.FAILED
- val message = e.getClass + ": " + e.getMessage
- worker ! ExecutorStateChanged(appId, execId, state, Some(message), None)
+ killProcess(Some(e.toString))
}
}
}
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
index 8b6747977eb87..100de26170a50 100755
--- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
@@ -317,10 +317,14 @@ private[spark] class Worker(
state match {
case DriverState.ERROR =>
logWarning(s"Driver $driverId failed with unrecoverable exception: ${exception.get}")
+ case DriverState.FAILED =>
+ logWarning(s"Driver $driverId exited with failure")
case DriverState.FINISHED =>
logInfo(s"Driver $driverId exited successfully")
case DriverState.KILLED =>
logInfo(s"Driver $driverId was killed by user")
+ case _ =>
+ logDebug(s"Driver $driverId changed state to $state")
}
masterLock.synchronized {
master ! DriverStateChanged(driverId, state, exception)
diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
index 84aec65b7765d..2279d77c91c89 100644
--- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
+++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
@@ -22,11 +22,12 @@ import java.nio.ByteBuffer
import akka.actor._
import akka.remote._
-import org.apache.spark.{Logging, SecurityManager, SparkConf}
+import org.apache.spark.{SparkEnv, Logging, SecurityManager, SparkConf}
import org.apache.spark.TaskState.TaskState
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.deploy.worker.WorkerWatcher
import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._
+import org.apache.spark.scheduler.TaskDescription
import org.apache.spark.util.{AkkaUtils, Utils}
private[spark] class CoarseGrainedExecutorBackend(
@@ -61,12 +62,14 @@ private[spark] class CoarseGrainedExecutorBackend(
logError("Slave registration failed: " + message)
System.exit(1)
- case LaunchTask(taskDesc) =>
- logInfo("Got assigned task " + taskDesc.taskId)
+ case LaunchTask(data) =>
if (executor == null) {
logError("Received LaunchTask command but executor was null")
System.exit(1)
} else {
+ val ser = SparkEnv.get.closureSerializer.newInstance()
+ val taskDesc = ser.deserialize[TaskDescription](data.value)
+ logInfo("Got assigned task " + taskDesc.taskId)
executor.launchTask(this, taskDesc.taskId, taskDesc.serializedTask)
}
diff --git a/core/src/main/scala/org/apache/spark/network/ConnectionManager.scala b/core/src/main/scala/org/apache/spark/network/ConnectionManager.scala
index dcbbc1853186b..5dd5fd0047c0d 100644
--- a/core/src/main/scala/org/apache/spark/network/ConnectionManager.scala
+++ b/core/src/main/scala/org/apache/spark/network/ConnectionManager.scala
@@ -93,7 +93,8 @@ private[spark] class ConnectionManager(port: Int, conf: SparkConf,
implicit val futureExecContext = ExecutionContext.fromExecutor(
Utils.newDaemonCachedThreadPool("Connection manager future execution context"))
- private var onReceiveCallback: (BufferMessage, ConnectionManagerId) => Option[Message]= null
+ @volatile
+ private var onReceiveCallback: (BufferMessage, ConnectionManagerId) => Option[Message] = null
private val authEnabled = securityManager.isAuthenticationEnabled()
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
index ddbc74e82ac49..ca74069ef885c 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
@@ -28,7 +28,7 @@ private[spark] sealed trait CoarseGrainedClusterMessage extends Serializable
private[spark] object CoarseGrainedClusterMessages {
// Driver to executors
- case class LaunchTask(task: TaskDescription) extends CoarseGrainedClusterMessage
+ case class LaunchTask(data: SerializableBuffer) extends CoarseGrainedClusterMessage
case class KillTask(taskId: Long, executor: String, interruptThread: Boolean)
extends CoarseGrainedClusterMessage
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index a6d6b3d26a3c6..e47a060683a2d 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -27,10 +27,10 @@ import akka.actor._
import akka.pattern.ask
import akka.remote.{DisassociatedEvent, RemotingLifecycleEvent}
-import org.apache.spark.{Logging, SparkException, TaskState}
+import org.apache.spark.{SparkEnv, Logging, SparkException, TaskState}
import org.apache.spark.scheduler.{SchedulerBackend, SlaveLost, TaskDescription, TaskSchedulerImpl, WorkerOffer}
import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._
-import org.apache.spark.util.{AkkaUtils, Utils}
+import org.apache.spark.util.{SerializableBuffer, AkkaUtils, Utils}
/**
* A scheduler backend that waits for coarse grained executors to connect to it through Akka.
@@ -48,6 +48,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A
var totalCoreCount = new AtomicInteger(0)
val conf = scheduler.sc.conf
private val timeout = AkkaUtils.askTimeout(conf)
+ private val akkaFrameSize = AkkaUtils.maxFrameSizeBytes(conf)
class DriverActor(sparkProperties: Seq[(String, String)]) extends Actor {
private val executorActor = new HashMap[String, ActorRef]
@@ -140,8 +141,26 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A
// Launch tasks returned by a set of resource offers
def launchTasks(tasks: Seq[Seq[TaskDescription]]) {
for (task <- tasks.flatten) {
- freeCores(task.executorId) -= scheduler.CPUS_PER_TASK
- executorActor(task.executorId) ! LaunchTask(task)
+ val ser = SparkEnv.get.closureSerializer.newInstance()
+ val serializedTask = ser.serialize(task)
+ if (serializedTask.limit >= akkaFrameSize - 1024) {
+ val taskSetId = scheduler.taskIdToTaskSetId(task.taskId)
+ scheduler.activeTaskSets.get(taskSetId).foreach { taskSet =>
+ try {
+ var msg = "Serialized task %s:%d was %d bytes which " +
+ "exceeds spark.akka.frameSize (%d bytes). " +
+ "Consider using broadcast variables for large values."
+ msg = msg.format(task.taskId, task.index, serializedTask.limit, akkaFrameSize)
+ taskSet.abort(msg)
+ } catch {
+ case e: Exception => logError("Exception in error callback", e)
+ }
+ }
+ }
+ else {
+ freeCores(task.executorId) -= scheduler.CPUS_PER_TASK
+ executorActor(task.executorId) ! LaunchTask(new SerializableBuffer(serializedTask))
+ }
}
}
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index 6534095811907..6e450081dcb11 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -772,7 +772,7 @@ private[spark] class BlockManager(
/**
* Replicate block to another node.
*/
- var cachedPeers: Seq[BlockManagerId] = null
+ @volatile var cachedPeers: Seq[BlockManagerId] = null
private def replicate(blockId: BlockId, data: ByteBuffer, level: StorageLevel) {
val tLevel = StorageLevel(
level.useDisk, level.useMemory, level.useOffHeap, level.deserialized, 1)
diff --git a/core/src/test/scala/org/apache/spark/scheduler/CoarseGrainedSchedulerBackendSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/CoarseGrainedSchedulerBackendSuite.scala
new file mode 100644
index 0000000000000..efef9d26dadca
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/scheduler/CoarseGrainedSchedulerBackendSuite.scala
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+import org.apache.spark.{LocalSparkContext, SparkConf, SparkException, SparkContext}
+import org.apache.spark.util.{SerializableBuffer, AkkaUtils}
+
+import org.scalatest.FunSuite
+
+class CoarseGrainedSchedulerBackendSuite extends FunSuite with LocalSparkContext {
+
+ test("serialized task larger than akka frame size") {
+ val conf = new SparkConf
+ conf.set("spark.akka.frameSize","1")
+ conf.set("spark.default.parallelism","1")
+ sc = new SparkContext("local-cluster[2 , 1 , 512]", "test", conf)
+ val frameSize = AkkaUtils.maxFrameSizeBytes(sc.conf)
+ val buffer = new SerializableBuffer(java.nio.ByteBuffer.allocate(2 * frameSize))
+ val larger = sc.parallelize(Seq(buffer))
+ val thrown = intercept[SparkException] {
+ larger.collect()
+ }
+ assert(thrown.getMessage.contains("Consider using broadcast variables for large values"))
+ val smaller = sc.parallelize(1 to 4).collect()
+ assert(smaller.size === 4)
+ }
+
+}
diff --git a/dev/audit-release/audit_release.py b/dev/audit-release/audit_release.py
index 8c7573b91f688..230e900ecd4de 100755
--- a/dev/audit-release/audit_release.py
+++ b/dev/audit-release/audit_release.py
@@ -30,18 +30,18 @@
import time
import urllib2
-## Fill in release details here:
+# Fill in release details here:
RELEASE_URL = "http://people.apache.org/~pwendell/spark-1.0.0-rc1/"
RELEASE_KEY = "9E4FE3AF"
RELEASE_REPOSITORY = "https://repository.apache.org/content/repositories/orgapachespark-1006/"
RELEASE_VERSION = "1.0.0"
SCALA_VERSION = "2.10.4"
SCALA_BINARY_VERSION = "2.10"
-##
+#
LOG_FILE_NAME = "spark_audit_%s" % time.strftime("%h_%m_%Y_%I_%M_%S")
LOG_FILE = open(LOG_FILE_NAME, 'w')
-WORK_DIR = "/tmp/audit_%s" % int(time.time())
+WORK_DIR = "/tmp/audit_%s" % int(time.time())
MAVEN_CMD = "mvn"
GPG_CMD = "gpg"
@@ -50,54 +50,62 @@
# Track failures
failures = []
+
def clean_work_files():
- print "OK to delete scratch directory '%s'? (y/N): " % WORK_DIR
- response = raw_input()
- if response == "y":
- shutil.rmtree(WORK_DIR)
- print "Should I delete the log output file '%s'? (y/N): " % LOG_FILE_NAME
- response = raw_input()
- if response == "y":
- os.unlink(LOG_FILE_NAME)
+ print "OK to delete scratch directory '%s'? (y/N): " % WORK_DIR
+ response = raw_input()
+ if response == "y":
+ shutil.rmtree(WORK_DIR)
+ print "Should I delete the log output file '%s'? (y/N): " % LOG_FILE_NAME
+ response = raw_input()
+ if response == "y":
+ os.unlink(LOG_FILE_NAME)
+
def run_cmd(cmd, exit_on_failure=True):
- print >> LOG_FILE, "Running command: %s" % cmd
- ret = subprocess.call(cmd, shell=True, stdout=LOG_FILE, stderr=LOG_FILE)
- if ret != 0 and exit_on_failure:
- print "Command failed: %s" % cmd
- clean_work_files()
- sys.exit(-1)
- return ret
+ print >> LOG_FILE, "Running command: %s" % cmd
+ ret = subprocess.call(cmd, shell=True, stdout=LOG_FILE, stderr=LOG_FILE)
+ if ret != 0 and exit_on_failure:
+ print "Command failed: %s" % cmd
+ clean_work_files()
+ sys.exit(-1)
+ return ret
+
def run_cmd_with_output(cmd):
- print >> sys.stderr, "Running command: %s" % cmd
- return subprocess.check_output(cmd, shell=True, stderr=LOG_FILE)
+ print >> sys.stderr, "Running command: %s" % cmd
+ return subprocess.check_output(cmd, shell=True, stderr=LOG_FILE)
+
def test(bool, str):
- if bool:
- return passed(str)
- failed(str)
+ if bool:
+ return passed(str)
+ failed(str)
+
def passed(str):
- print "[PASSED] %s" % str
+ print "[PASSED] %s" % str
+
def failed(str):
- failures.append(str)
- print "[**FAILED**] %s" % str
+ failures.append(str)
+ print "[**FAILED**] %s" % str
+
def get_url(url):
- return urllib2.urlopen(url).read()
+ return urllib2.urlopen(url).read()
+
original_dir = os.getcwd()
-# For each of these modules, we'll test an 'empty' application in sbt and
+# For each of these modules, we'll test an 'empty' application in sbt and
# maven that links against them. This will catch issues with messed up
# dependencies within those projects.
modules = [
- "spark-core", "spark-bagel", "spark-mllib", "spark-streaming", "spark-repl",
- "spark-graphx", "spark-streaming-flume", "spark-streaming-kafka",
- "spark-streaming-mqtt", "spark-streaming-twitter", "spark-streaming-zeromq",
- "spark-catalyst", "spark-sql", "spark-hive"
+ "spark-core", "spark-bagel", "spark-mllib", "spark-streaming", "spark-repl",
+ "spark-graphx", "spark-streaming-flume", "spark-streaming-kafka",
+ "spark-streaming-mqtt", "spark-streaming-twitter", "spark-streaming-zeromq",
+ "spark-catalyst", "spark-sql", "spark-hive"
]
modules = map(lambda m: "%s_%s" % (m, SCALA_BINARY_VERSION), modules)
@@ -106,54 +114,57 @@ def get_url(url):
cache_ivy_spark = "~/.ivy2/cache/org.apache.spark"
local_maven_kafka = "~/.m2/repository/org/apache/kafka"
local_maven_kafka = "~/.m2/repository/org/apache/spark"
+
+
def ensure_path_not_present(x):
- if os.path.exists(os.path.expanduser(x)):
- print "Please remove %s, it can interfere with testing published artifacts." % x
- sys.exit(-1)
+ if os.path.exists(os.path.expanduser(x)):
+ print "Please remove %s, it can interfere with testing published artifacts." % x
+ sys.exit(-1)
+
map(ensure_path_not_present, [local_ivy_spark, cache_ivy_spark, local_maven_kafka])
-# SBT build tests
+# SBT build tests
os.chdir("blank_sbt_build")
os.environ["SPARK_VERSION"] = RELEASE_VERSION
os.environ["SCALA_VERSION"] = SCALA_VERSION
os.environ["SPARK_RELEASE_REPOSITORY"] = RELEASE_REPOSITORY
os.environ["SPARK_AUDIT_MASTER"] = "local"
for module in modules:
- os.environ["SPARK_MODULE"] = module
- ret = run_cmd("sbt clean update", exit_on_failure=False)
- test(ret == 0, "sbt build against '%s' module" % module)
+ os.environ["SPARK_MODULE"] = module
+ ret = run_cmd("sbt clean update", exit_on_failure=False)
+ test(ret == 0, "sbt build against '%s' module" % module)
os.chdir(original_dir)
# SBT application tests
for app in ["sbt_app_core", "sbt_app_graphx", "sbt_app_streaming", "sbt_app_sql", "sbt_app_hive"]:
- os.chdir(app)
- ret = run_cmd("sbt clean run", exit_on_failure=False)
- test(ret == 0, "sbt application (%s)" % app)
- os.chdir(original_dir)
+ os.chdir(app)
+ ret = run_cmd("sbt clean run", exit_on_failure=False)
+ test(ret == 0, "sbt application (%s)" % app)
+ os.chdir(original_dir)
# Maven build tests
os.chdir("blank_maven_build")
for module in modules:
- cmd = ('%s --update-snapshots -Dspark.release.repository="%s" -Dspark.version="%s" '
- '-Dspark.module="%s" clean compile' %
- (MAVEN_CMD, RELEASE_REPOSITORY, RELEASE_VERSION, module))
- ret = run_cmd(cmd, exit_on_failure=False)
- test(ret == 0, "maven build against '%s' module" % module)
+ cmd = ('%s --update-snapshots -Dspark.release.repository="%s" -Dspark.version="%s" '
+ '-Dspark.module="%s" clean compile' %
+ (MAVEN_CMD, RELEASE_REPOSITORY, RELEASE_VERSION, module))
+ ret = run_cmd(cmd, exit_on_failure=False)
+ test(ret == 0, "maven build against '%s' module" % module)
os.chdir(original_dir)
os.chdir("maven_app_core")
mvn_exec_cmd = ('%s --update-snapshots -Dspark.release.repository="%s" -Dspark.version="%s" '
'-Dscala.binary.version="%s" clean compile '
- 'exec:java -Dexec.mainClass="SimpleApp"' %
- (MAVEN_CMD, RELEASE_REPOSITORY, RELEASE_VERSION, SCALA_BINARY_VERSION))
+ 'exec:java -Dexec.mainClass="SimpleApp"' %
+ (MAVEN_CMD, RELEASE_REPOSITORY, RELEASE_VERSION, SCALA_BINARY_VERSION))
ret = run_cmd(mvn_exec_cmd, exit_on_failure=False)
test(ret == 0, "maven application (core)")
os.chdir(original_dir)
# Binary artifact tests
if os.path.exists(WORK_DIR):
- print "Working directory '%s' already exists" % WORK_DIR
- sys.exit(-1)
+ print "Working directory '%s' already exists" % WORK_DIR
+ sys.exit(-1)
os.mkdir(WORK_DIR)
os.chdir(WORK_DIR)
@@ -162,66 +173,66 @@ def ensure_path_not_present(x):
artifacts = r.findall(index_page)
for artifact in artifacts:
- print "==== Verifying download integrity for artifact: %s ====" % artifact
-
- artifact_url = "%s/%s" % (RELEASE_URL, artifact)
- run_cmd("wget %s" % artifact_url)
-
- key_file = "%s.asc" % artifact
- run_cmd("wget %s/%s" % (RELEASE_URL, key_file))
-
- run_cmd("wget %s%s" % (artifact_url, ".sha"))
-
- # Verify signature
- run_cmd("%s --keyserver pgp.mit.edu --recv-key %s" % (GPG_CMD, RELEASE_KEY))
- run_cmd("%s %s" % (GPG_CMD, key_file))
- passed("Artifact signature verified.")
-
- # Verify md5
- my_md5 = run_cmd_with_output("%s --print-md MD5 %s" % (GPG_CMD, artifact)).strip()
- release_md5 = get_url("%s.md5" % artifact_url).strip()
- test(my_md5 == release_md5, "Artifact MD5 verified.")
-
- # Verify sha
- my_sha = run_cmd_with_output("%s --print-md SHA512 %s" % (GPG_CMD, artifact)).strip()
- release_sha = get_url("%s.sha" % artifact_url).strip()
- test(my_sha == release_sha, "Artifact SHA verified.")
-
- # Verify Apache required files
- dir_name = artifact.replace(".tgz", "")
- run_cmd("tar xvzf %s" % artifact)
- base_files = os.listdir(dir_name)
- test("CHANGES.txt" in base_files, "Tarball contains CHANGES.txt file")
- test("NOTICE" in base_files, "Tarball contains NOTICE file")
- test("LICENSE" in base_files, "Tarball contains LICENSE file")
-
- os.chdir(WORK_DIR)
-
+ print "==== Verifying download integrity for artifact: %s ====" % artifact
+
+ artifact_url = "%s/%s" % (RELEASE_URL, artifact)
+ run_cmd("wget %s" % artifact_url)
+
+ key_file = "%s.asc" % artifact
+ run_cmd("wget %s/%s" % (RELEASE_URL, key_file))
+
+ run_cmd("wget %s%s" % (artifact_url, ".sha"))
+
+ # Verify signature
+ run_cmd("%s --keyserver pgp.mit.edu --recv-key %s" % (GPG_CMD, RELEASE_KEY))
+ run_cmd("%s %s" % (GPG_CMD, key_file))
+ passed("Artifact signature verified.")
+
+ # Verify md5
+ my_md5 = run_cmd_with_output("%s --print-md MD5 %s" % (GPG_CMD, artifact)).strip()
+ release_md5 = get_url("%s.md5" % artifact_url).strip()
+ test(my_md5 == release_md5, "Artifact MD5 verified.")
+
+ # Verify sha
+ my_sha = run_cmd_with_output("%s --print-md SHA512 %s" % (GPG_CMD, artifact)).strip()
+ release_sha = get_url("%s.sha" % artifact_url).strip()
+ test(my_sha == release_sha, "Artifact SHA verified.")
+
+ # Verify Apache required files
+ dir_name = artifact.replace(".tgz", "")
+ run_cmd("tar xvzf %s" % artifact)
+ base_files = os.listdir(dir_name)
+ test("CHANGES.txt" in base_files, "Tarball contains CHANGES.txt file")
+ test("NOTICE" in base_files, "Tarball contains NOTICE file")
+ test("LICENSE" in base_files, "Tarball contains LICENSE file")
+
+ os.chdir(WORK_DIR)
+
for artifact in artifacts:
- print "==== Verifying build and tests for artifact: %s ====" % artifact
- os.chdir(os.path.join(WORK_DIR, dir_name))
-
- os.environ["MAVEN_OPTS"] = "-Xmx3g -XX:MaxPermSize=1g -XX:ReservedCodeCacheSize=1g"
- # Verify build
- print "==> Running build"
- run_cmd("sbt assembly")
- passed("sbt build successful")
- run_cmd("%s package -DskipTests" % MAVEN_CMD)
- passed("Maven build successful")
-
- # Verify tests
- print "==> Performing unit tests"
- run_cmd("%s test" % MAVEN_CMD)
- passed("Tests successful")
- os.chdir(WORK_DIR)
+ print "==== Verifying build and tests for artifact: %s ====" % artifact
+ os.chdir(os.path.join(WORK_DIR, dir_name))
+
+ os.environ["MAVEN_OPTS"] = "-Xmx3g -XX:MaxPermSize=1g -XX:ReservedCodeCacheSize=1g"
+ # Verify build
+ print "==> Running build"
+ run_cmd("sbt assembly")
+ passed("sbt build successful")
+ run_cmd("%s package -DskipTests" % MAVEN_CMD)
+ passed("Maven build successful")
+
+ # Verify tests
+ print "==> Performing unit tests"
+ run_cmd("%s test" % MAVEN_CMD)
+ passed("Tests successful")
+ os.chdir(WORK_DIR)
clean_work_files()
if len(failures) == 0:
- print "ALL TESTS PASSED"
+ print "ALL TESTS PASSED"
else:
- print "SOME TESTS DID NOT PASS"
- for f in failures:
- print f
+ print "SOME TESTS DID NOT PASS"
+ for f in failures:
+ print f
os.chdir(original_dir)
diff --git a/dev/create-release/generate-changelist.py b/dev/create-release/generate-changelist.py
index 13b744ec1b37e..de1b5d4ae1314 100755
--- a/dev/create-release/generate-changelist.py
+++ b/dev/create-release/generate-changelist.py
@@ -29,16 +29,16 @@
import subprocess
import time
import traceback
-
+
SPARK_HOME = os.environ["SPARK_HOME"]
NEW_RELEASE_VERSION = "1.0.0"
PREV_RELEASE_GIT_TAG = "v0.9.1"
-
-CHANGELIST = "CHANGES.txt"
+
+CHANGELIST = "CHANGES.txt"
OLD_CHANGELIST = "%s.old" % (CHANGELIST)
NEW_CHANGELIST = "%s.new" % (CHANGELIST)
TMP_CHANGELIST = "%s.tmp" % (CHANGELIST)
-
+
# date before first PR in TLP Spark repo
SPARK_REPO_CHANGE_DATE1 = time.strptime("2014-02-26", "%Y-%m-%d")
# date after last PR in incubator Spark repo
@@ -46,99 +46,103 @@
# Threshold PR number that differentiates PRs to TLP
# and incubator repos
SPARK_REPO_PR_NUM_THRESH = 200
-
+
LOG_FILE_NAME = "changes_%s" % time.strftime("%h_%m_%Y_%I_%M_%S")
LOG_FILE = open(LOG_FILE_NAME, 'w')
-
+
+
def run_cmd(cmd):
- try:
- print >> LOG_FILE, "Running command: %s" % cmd
- output = subprocess.check_output(cmd, shell=True, stderr=LOG_FILE)
- print >> LOG_FILE, "Output: %s" % output
- return output
- except:
- traceback.print_exc()
- cleanup()
- sys.exit(1)
-
+ try:
+ print >> LOG_FILE, "Running command: %s" % cmd
+ output = subprocess.check_output(cmd, shell=True, stderr=LOG_FILE)
+ print >> LOG_FILE, "Output: %s" % output
+ return output
+ except:
+ traceback.print_exc()
+ cleanup()
+ sys.exit(1)
+
+
def append_to_changelist(string):
- with open(TMP_CHANGELIST, "a") as f:
- print >> f, string
-
-def cleanup(ask = True):
- if ask == True:
- print "OK to delete temporary and log files? (y/N): "
- response = raw_input()
- if ask == False or (ask == True and response == "y"):
- if os.path.isfile(TMP_CHANGELIST):
- os.remove(TMP_CHANGELIST)
- if os.path.isfile(OLD_CHANGELIST):
- os.remove(OLD_CHANGELIST)
- LOG_FILE.close()
- os.remove(LOG_FILE_NAME)
-
+ with open(TMP_CHANGELIST, "a") as f:
+ print >> f, string
+
+
+def cleanup(ask=True):
+ if ask is True:
+ print "OK to delete temporary and log files? (y/N): "
+ response = raw_input()
+ if ask is False or (ask is True and response == "y"):
+ if os.path.isfile(TMP_CHANGELIST):
+ os.remove(TMP_CHANGELIST)
+ if os.path.isfile(OLD_CHANGELIST):
+ os.remove(OLD_CHANGELIST)
+ LOG_FILE.close()
+ os.remove(LOG_FILE_NAME)
+
+
print "Generating new %s for Spark release %s" % (CHANGELIST, NEW_RELEASE_VERSION)
os.chdir(SPARK_HOME)
if os.path.isfile(TMP_CHANGELIST):
- os.remove(TMP_CHANGELIST)
+ os.remove(TMP_CHANGELIST)
if os.path.isfile(OLD_CHANGELIST):
- os.remove(OLD_CHANGELIST)
-
+ os.remove(OLD_CHANGELIST)
+
append_to_changelist("Spark Change Log")
append_to_changelist("----------------")
append_to_changelist("")
append_to_changelist("Release %s" % NEW_RELEASE_VERSION)
append_to_changelist("")
-
+
print "Getting commits between tag %s and HEAD" % PREV_RELEASE_GIT_TAG
hashes = run_cmd("git log %s..HEAD --pretty='%%h'" % PREV_RELEASE_GIT_TAG).split()
-
+
print "Getting details of %s commits" % len(hashes)
for h in hashes:
- date = run_cmd("git log %s -1 --pretty='%%ad' --date=iso | head -1" % h).strip()
- subject = run_cmd("git log %s -1 --pretty='%%s' | head -1" % h).strip()
- body = run_cmd("git log %s -1 --pretty='%%b'" % h)
- committer = run_cmd("git log %s -1 --pretty='%%cn <%%ce>' | head -1" % h).strip()
- body_lines = body.split("\n")
-
- if "Merge pull" in subject:
- ## Parse old format commit message
- append_to_changelist(" %s %s" % (h, date))
- append_to_changelist(" %s" % subject)
- append_to_changelist(" [%s]" % body_lines[0])
- append_to_changelist("")
-
- elif "maven-release" not in subject:
- ## Parse new format commit message
- # Get authors from commit message, committer otherwise
- authors = [committer]
- if "Author:" in body:
- authors = [line.split(":")[1].strip() for line in body_lines if "Author:" in line]
-
- # Generate GitHub PR URL for easy access if possible
- github_url = ""
- if "Closes #" in body:
- pr_num = [line.split()[1].lstrip("#") for line in body_lines if "Closes #" in line][0]
- github_url = "github.com/apache/spark/pull/%s" % pr_num
- day = time.strptime(date.split()[0], "%Y-%m-%d")
- if day < SPARK_REPO_CHANGE_DATE1 or (day < SPARK_REPO_CHANGE_DATE2 and pr_num < SPARK_REPO_PR_NUM_THRESH):
- github_url = "github.com/apache/incubator-spark/pull/%s" % pr_num
-
- append_to_changelist(" %s" % subject)
- append_to_changelist(" %s" % ', '.join(authors))
- # for author in authors:
- # append_to_changelist(" %s" % author)
- append_to_changelist(" %s" % date)
- if len(github_url) > 0:
- append_to_changelist(" Commit: %s, %s" % (h, github_url))
- else:
- append_to_changelist(" Commit: %s" % h)
- append_to_changelist("")
-
+ date = run_cmd("git log %s -1 --pretty='%%ad' --date=iso | head -1" % h).strip()
+ subject = run_cmd("git log %s -1 --pretty='%%s' | head -1" % h).strip()
+ body = run_cmd("git log %s -1 --pretty='%%b'" % h)
+ committer = run_cmd("git log %s -1 --pretty='%%cn <%%ce>' | head -1" % h).strip()
+ body_lines = body.split("\n")
+
+ if "Merge pull" in subject:
+ # Parse old format commit message
+ append_to_changelist(" %s %s" % (h, date))
+ append_to_changelist(" %s" % subject)
+ append_to_changelist(" [%s]" % body_lines[0])
+ append_to_changelist("")
+
+ elif "maven-release" not in subject:
+ # Parse new format commit message
+ # Get authors from commit message, committer otherwise
+ authors = [committer]
+ if "Author:" in body:
+ authors = [line.split(":")[1].strip() for line in body_lines if "Author:" in line]
+
+ # Generate GitHub PR URL for easy access if possible
+ github_url = ""
+ if "Closes #" in body:
+ pr_num = [line.split()[1].lstrip("#") for line in body_lines if "Closes #" in line][0]
+ github_url = "github.com/apache/spark/pull/%s" % pr_num
+ day = time.strptime(date.split()[0], "%Y-%m-%d")
+ if day < SPARK_REPO_CHANGE_DATE1 or
+ (day < SPARK_REPO_CHANGE_DATE2 and pr_num < SPARK_REPO_PR_NUM_THRESH):
+ github_url = "github.com/apache/incubator-spark/pull/%s" % pr_num
+
+ append_to_changelist(" %s" % subject)
+ append_to_changelist(" %s" % ', '.join(authors))
+ # for author in authors:
+ # append_to_changelist(" %s" % author)
+ append_to_changelist(" %s" % date)
+ if len(github_url) > 0:
+ append_to_changelist(" Commit: %s, %s" % (h, github_url))
+ else:
+ append_to_changelist(" Commit: %s" % h)
+ append_to_changelist("")
+
# Append old change list
-print "Appending changelist from tag %s" % PREV_RELEASE_GIT_TAG
+print "Appending changelist from tag %s" % PREV_RELEASE_GIT_TAG
run_cmd("git show %s:%s | tail -n +3 >> %s" % (PREV_RELEASE_GIT_TAG, CHANGELIST, TMP_CHANGELIST))
run_cmd("cp %s %s" % (TMP_CHANGELIST, NEW_CHANGELIST))
print "New change list generated as %s" % NEW_CHANGELIST
cleanup(False)
-
diff --git a/dev/merge_spark_pr.py b/dev/merge_spark_pr.py
index 83618c8068d35..7f744d5589ef7 100755
--- a/dev/merge_spark_pr.py
+++ b/dev/merge_spark_pr.py
@@ -21,7 +21,7 @@
# usage: ./apache-pr-merge.py (see config env vars below)
#
# This utility assumes you already have local a Spark git folder and that you
-# have added remotes corresponding to both (i) the github apache Spark
+# have added remotes corresponding to both (i) the github apache Spark
# mirror and (ii) the apache git repo.
import json
@@ -33,10 +33,10 @@
import urllib2
try:
- import jira.client
- JIRA_IMPORTED=True
+ import jira.client
+ JIRA_IMPORTED = True
except ImportError:
- JIRA_IMPORTED=False
+ JIRA_IMPORTED = False
# Location of your Spark git development area
SPARK_HOME = os.environ.get("SPARK_HOME", "/home/patrick/Documents/spark")
@@ -58,204 +58,217 @@
os.chdir(SPARK_HOME)
+
def get_json(url):
- try:
- return json.load(urllib2.urlopen(url))
- except urllib2.HTTPError as e:
- print "Unable to fetch URL, exiting: %s" % url
- sys.exit(-1)
+ try:
+ return json.load(urllib2.urlopen(url))
+ except urllib2.HTTPError as e:
+ print "Unable to fetch URL, exiting: %s" % url
+ sys.exit(-1)
+
def fail(msg):
- print msg
- clean_up()
- sys.exit(-1)
+ print msg
+ clean_up()
+ sys.exit(-1)
+
def run_cmd(cmd):
- if isinstance(cmd, list):
- return subprocess.check_output(cmd)
- else:
- return subprocess.check_output(cmd.split(" "))
+ if isinstance(cmd, list):
+ return subprocess.check_output(cmd)
+ else:
+ return subprocess.check_output(cmd.split(" "))
+
def continue_maybe(prompt):
- result = raw_input("\n%s (y/n): " % prompt)
- if result.lower() != "y":
- fail("Okay, exiting")
+ result = raw_input("\n%s (y/n): " % prompt)
+ if result.lower() != "y":
+ fail("Okay, exiting")
+
original_head = run_cmd("git rev-parse HEAD")[:8]
+
def clean_up():
- print "Restoring head pointer to %s" % original_head
- run_cmd("git checkout %s" % original_head)
+ print "Restoring head pointer to %s" % original_head
+ run_cmd("git checkout %s" % original_head)
+
+ branches = run_cmd("git branch").replace(" ", "").split("\n")
- branches = run_cmd("git branch").replace(" ", "").split("\n")
+ for branch in filter(lambda x: x.startswith(BRANCH_PREFIX), branches):
+ print "Deleting local branch %s" % branch
+ run_cmd("git branch -D %s" % branch)
- for branch in filter(lambda x: x.startswith(BRANCH_PREFIX), branches):
- print "Deleting local branch %s" % branch
- run_cmd("git branch -D %s" % branch)
# merge the requested PR and return the merge hash
def merge_pr(pr_num, target_ref):
- pr_branch_name = "%s_MERGE_PR_%s" % (BRANCH_PREFIX, pr_num)
- target_branch_name = "%s_MERGE_PR_%s_%s" % (BRANCH_PREFIX, pr_num, target_ref.upper())
- run_cmd("git fetch %s pull/%s/head:%s" % (PR_REMOTE_NAME, pr_num, pr_branch_name))
- run_cmd("git fetch %s %s:%s" % (PUSH_REMOTE_NAME, target_ref, target_branch_name))
- run_cmd("git checkout %s" % target_branch_name)
-
- had_conflicts = False
- try:
- run_cmd(['git', 'merge', pr_branch_name, '--squash'])
- except Exception as e:
- msg = "Error merging: %s\nWould you like to manually fix-up this merge?" % e
- continue_maybe(msg)
- msg = "Okay, please fix any conflicts and 'git add' conflicting files... Finished?"
- continue_maybe(msg)
- had_conflicts = True
-
- commit_authors = run_cmd(['git', 'log', 'HEAD..%s' % pr_branch_name,
- '--pretty=format:%an <%ae>']).split("\n")
- distinct_authors = sorted(set(commit_authors), key=lambda x: commit_authors.count(x),
- reverse=True)
- primary_author = distinct_authors[0]
- commits = run_cmd(['git', 'log', 'HEAD..%s' % pr_branch_name,
- '--pretty=format:%h [%an] %s']).split("\n\n")
-
- merge_message_flags = []
-
- for p in [title, body]:
- merge_message_flags += ["-m", p]
-
- authors = "\n".join(["Author: %s" % a for a in distinct_authors])
-
- merge_message_flags += ["-m", authors]
+ pr_branch_name = "%s_MERGE_PR_%s" % (BRANCH_PREFIX, pr_num)
+ target_branch_name = "%s_MERGE_PR_%s_%s" % (BRANCH_PREFIX, pr_num, target_ref.upper())
+ run_cmd("git fetch %s pull/%s/head:%s" % (PR_REMOTE_NAME, pr_num, pr_branch_name))
+ run_cmd("git fetch %s %s:%s" % (PUSH_REMOTE_NAME, target_ref, target_branch_name))
+ run_cmd("git checkout %s" % target_branch_name)
+
+ had_conflicts = False
+ try:
+ run_cmd(['git', 'merge', pr_branch_name, '--squash'])
+ except Exception as e:
+ msg = "Error merging: %s\nWould you like to manually fix-up this merge?" % e
+ continue_maybe(msg)
+ msg = "Okay, please fix any conflicts and 'git add' conflicting files... Finished?"
+ continue_maybe(msg)
+ had_conflicts = True
+
+ commit_authors = run_cmd(['git', 'log', 'HEAD..%s' % pr_branch_name,
+ '--pretty=format:%an <%ae>']).split("\n")
+ distinct_authors = sorted(set(commit_authors),
+ key=lambda x: commit_authors.count(x), reverse=True)
+ primary_author = distinct_authors[0]
+ commits = run_cmd(['git', 'log', 'HEAD..%s' % pr_branch_name,
+ '--pretty=format:%h [%an] %s']).split("\n\n")
+
+ merge_message_flags = []
+
+ for p in [title, body]:
+ merge_message_flags += ["-m", p]
+
+ authors = "\n".join(["Author: %s" % a for a in distinct_authors])
+
+ merge_message_flags += ["-m", authors]
+
+ if had_conflicts:
+ committer_name = run_cmd("git config --get user.name").strip()
+ committer_email = run_cmd("git config --get user.email").strip()
+ message = "This patch had conflicts when merged, resolved by\nCommitter: %s <%s>" % (
+ committer_name, committer_email)
+ merge_message_flags += ["-m", message]
+
+ # The string "Closes #%s" string is required for GitHub to correctly close the PR
+ merge_message_flags += [
+ "-m",
+ "Closes #%s from %s and squashes the following commits:" % (pr_num, pr_repo_desc)]
+ for c in commits:
+ merge_message_flags += ["-m", c]
+
+ run_cmd(['git', 'commit', '--author="%s"' % primary_author] + merge_message_flags)
+
+ continue_maybe("Merge complete (local ref %s). Push to %s?" % (
+ target_branch_name, PUSH_REMOTE_NAME))
+
+ try:
+ run_cmd('git push %s %s:%s' % (PUSH_REMOTE_NAME, target_branch_name, target_ref))
+ except Exception as e:
+ clean_up()
+ fail("Exception while pushing: %s" % e)
+
+ merge_hash = run_cmd("git rev-parse %s" % target_branch_name)[:8]
+ clean_up()
+ print("Pull request #%s merged!" % pr_num)
+ print("Merge hash: %s" % merge_hash)
+ return merge_hash
- if had_conflicts:
- committer_name = run_cmd("git config --get user.name").strip()
- committer_email = run_cmd("git config --get user.email").strip()
- message = "This patch had conflicts when merged, resolved by\nCommitter: %s <%s>" % (
- committer_name, committer_email)
- merge_message_flags += ["-m", message]
- # The string "Closes #%s" string is required for GitHub to correctly close the PR
- merge_message_flags += ["-m",
- "Closes #%s from %s and squashes the following commits:" % (pr_num, pr_repo_desc)]
- for c in commits:
- merge_message_flags += ["-m", c]
+def cherry_pick(pr_num, merge_hash, default_branch):
+ pick_ref = raw_input("Enter a branch name [%s]: " % default_branch)
+ if pick_ref == "":
+ pick_ref = default_branch
- run_cmd(['git', 'commit', '--author="%s"' % primary_author] + merge_message_flags)
+ pick_branch_name = "%s_PICK_PR_%s_%s" % (BRANCH_PREFIX, pr_num, pick_ref.upper())
- continue_maybe("Merge complete (local ref %s). Push to %s?" % (
- target_branch_name, PUSH_REMOTE_NAME))
+ run_cmd("git fetch %s %s:%s" % (PUSH_REMOTE_NAME, pick_ref, pick_branch_name))
+ run_cmd("git checkout %s" % pick_branch_name)
+ run_cmd("git cherry-pick -sx %s" % merge_hash)
- try:
- run_cmd('git push %s %s:%s' % (PUSH_REMOTE_NAME, target_branch_name, target_ref))
- except Exception as e:
- clean_up()
- fail("Exception while pushing: %s" % e)
-
- merge_hash = run_cmd("git rev-parse %s" % target_branch_name)[:8]
- clean_up()
- print("Pull request #%s merged!" % pr_num)
- print("Merge hash: %s" % merge_hash)
- return merge_hash
+ continue_maybe("Pick complete (local ref %s). Push to %s?" % (
+ pick_branch_name, PUSH_REMOTE_NAME))
+ try:
+ run_cmd('git push %s %s:%s' % (PUSH_REMOTE_NAME, pick_branch_name, pick_ref))
+ except Exception as e:
+ clean_up()
+ fail("Exception while pushing: %s" % e)
-def cherry_pick(pr_num, merge_hash, default_branch):
- pick_ref = raw_input("Enter a branch name [%s]: " % default_branch)
- if pick_ref == "":
- pick_ref = default_branch
-
- pick_branch_name = "%s_PICK_PR_%s_%s" % (BRANCH_PREFIX, pr_num, pick_ref.upper())
-
- run_cmd("git fetch %s %s:%s" % (PUSH_REMOTE_NAME, pick_ref, pick_branch_name))
- run_cmd("git checkout %s" % pick_branch_name)
- run_cmd("git cherry-pick -sx %s" % merge_hash)
-
- continue_maybe("Pick complete (local ref %s). Push to %s?" % (
- pick_branch_name, PUSH_REMOTE_NAME))
-
- try:
- run_cmd('git push %s %s:%s' % (PUSH_REMOTE_NAME, pick_branch_name, pick_ref))
- except Exception as e:
+ pick_hash = run_cmd("git rev-parse %s" % pick_branch_name)[:8]
clean_up()
- fail("Exception while pushing: %s" % e)
- pick_hash = run_cmd("git rev-parse %s" % pick_branch_name)[:8]
- clean_up()
+ print("Pull request #%s picked into %s!" % (pr_num, pick_ref))
+ print("Pick hash: %s" % pick_hash)
+ return pick_ref
- print("Pull request #%s picked into %s!" % (pr_num, pick_ref))
- print("Pick hash: %s" % pick_hash)
- return pick_ref
def fix_version_from_branch(branch, versions):
- # Note: Assumes this is a sorted (newest->oldest) list of un-released versions
- if branch == "master":
- return versions[0]
- else:
- branch_ver = branch.replace("branch-", "")
- return filter(lambda x: x.name.startswith(branch_ver), versions)[-1]
+ # Note: Assumes this is a sorted (newest->oldest) list of un-released versions
+ if branch == "master":
+ return versions[0]
+ else:
+ branch_ver = branch.replace("branch-", "")
+ return filter(lambda x: x.name.startswith(branch_ver), versions)[-1]
+
def resolve_jira(title, merge_branches, comment):
- asf_jira = jira.client.JIRA({'server': JIRA_API_BASE},
- basic_auth=(JIRA_USERNAME, JIRA_PASSWORD))
-
- default_jira_id = ""
- search = re.findall("SPARK-[0-9]{4,5}", title)
- if len(search) > 0:
- default_jira_id = search[0]
-
- jira_id = raw_input("Enter a JIRA id [%s]: " % default_jira_id)
- if jira_id == "":
- jira_id = default_jira_id
-
- try:
- issue = asf_jira.issue(jira_id)
- except Exception as e:
- fail("ASF JIRA could not find %s\n%s" % (jira_id, e))
-
- cur_status = issue.fields.status.name
- cur_summary = issue.fields.summary
- cur_assignee = issue.fields.assignee
- if cur_assignee == None:
- cur_assignee = "NOT ASSIGNED!!!"
- else:
- cur_assignee = cur_assignee.displayName
-
- if cur_status == "Resolved" or cur_status == "Closed":
- fail("JIRA issue %s already has status '%s'" % (jira_id, cur_status))
- print ("=== JIRA %s ===" % jira_id)
- print ("summary\t\t%s\nassignee\t%s\nstatus\t\t%s\nurl\t\t%s/%s\n" % (
- cur_summary, cur_assignee, cur_status, JIRA_BASE, jira_id))
-
- versions = asf_jira.project_versions("SPARK")
- versions = sorted(versions, key = lambda x: x.name, reverse=True)
- versions = filter(lambda x: x.raw['released'] == False, versions)
-
- default_fix_versions = map(lambda x: fix_version_from_branch(x, versions).name, merge_branches)
- for v in default_fix_versions:
- # Handles the case where we have forked a release branch but not yet made the release.
- # In this case, if the PR is committed to the master branch and the release branch, we
- # only consider the release branch to be the fix version. E.g. it is not valid to have
- # both 1.1.0 and 1.0.0 as fix versions.
- (major, minor, patch) = v.split(".")
- if patch == "0":
- previous = "%s.%s.%s" % (major, int(minor) - 1, 0)
- if previous in default_fix_versions:
- default_fix_versions = filter(lambda x: x != v, default_fix_versions)
- default_fix_versions = ",".join(default_fix_versions)
-
- fix_versions = raw_input("Enter comma-separated fix version(s) [%s]: " % default_fix_versions)
- if fix_versions == "":
- fix_versions = default_fix_versions
- fix_versions = fix_versions.replace(" ", "").split(",")
-
- def get_version_json(version_str):
- return filter(lambda v: v.name == version_str, versions)[0].raw
- jira_fix_versions = map(lambda v: get_version_json(v), fix_versions)
-
- resolve = filter(lambda a: a['name'] == "Resolve Issue", asf_jira.transitions(jira_id))[0]
- asf_jira.transition_issue(jira_id, resolve["id"], fixVersions=jira_fix_versions, comment=comment)
-
- print "Succesfully resolved %s with fixVersions=%s!" % (jira_id, fix_versions)
+ asf_jira = jira.client.JIRA({'server': JIRA_API_BASE},
+ basic_auth=(JIRA_USERNAME, JIRA_PASSWORD))
+
+ default_jira_id = ""
+ search = re.findall("SPARK-[0-9]{4,5}", title)
+ if len(search) > 0:
+ default_jira_id = search[0]
+
+ jira_id = raw_input("Enter a JIRA id [%s]: " % default_jira_id)
+ if jira_id == "":
+ jira_id = default_jira_id
+
+ try:
+ issue = asf_jira.issue(jira_id)
+ except Exception as e:
+ fail("ASF JIRA could not find %s\n%s" % (jira_id, e))
+
+ cur_status = issue.fields.status.name
+ cur_summary = issue.fields.summary
+ cur_assignee = issue.fields.assignee
+ if cur_assignee is None:
+ cur_assignee = "NOT ASSIGNED!!!"
+ else:
+ cur_assignee = cur_assignee.displayName
+
+ if cur_status == "Resolved" or cur_status == "Closed":
+ fail("JIRA issue %s already has status '%s'" % (jira_id, cur_status))
+ print ("=== JIRA %s ===" % jira_id)
+ print ("summary\t\t%s\nassignee\t%s\nstatus\t\t%s\nurl\t\t%s/%s\n" % (
+ cur_summary, cur_assignee, cur_status, JIRA_BASE, jira_id))
+
+ versions = asf_jira.project_versions("SPARK")
+ versions = sorted(versions, key=lambda x: x.name, reverse=True)
+ versions = filter(lambda x: x.raw['released'] is False, versions)
+
+ default_fix_versions = map(lambda x: fix_version_from_branch(x, versions).name, merge_branches)
+ for v in default_fix_versions:
+ # Handles the case where we have forked a release branch but not yet made the release.
+ # In this case, if the PR is committed to the master branch and the release branch, we
+ # only consider the release branch to be the fix version. E.g. it is not valid to have
+ # both 1.1.0 and 1.0.0 as fix versions.
+ (major, minor, patch) = v.split(".")
+ if patch == "0":
+ previous = "%s.%s.%s" % (major, int(minor) - 1, 0)
+ if previous in default_fix_versions:
+ default_fix_versions = filter(lambda x: x != v, default_fix_versions)
+ default_fix_versions = ",".join(default_fix_versions)
+
+ fix_versions = raw_input("Enter comma-separated fix version(s) [%s]: " % default_fix_versions)
+ if fix_versions == "":
+ fix_versions = default_fix_versions
+ fix_versions = fix_versions.replace(" ", "").split(",")
+
+ def get_version_json(version_str):
+ return filter(lambda v: v.name == version_str, versions)[0].raw
+
+ jira_fix_versions = map(lambda v: get_version_json(v), fix_versions)
+
+ resolve = filter(lambda a: a['name'] == "Resolve Issue", asf_jira.transitions(jira_id))[0]
+ asf_jira.transition_issue(
+ jira_id, resolve["id"], fixVersions=jira_fix_versions, comment=comment)
+
+ print "Succesfully resolved %s with fixVersions=%s!" % (jira_id, fix_versions)
+
branches = get_json("%s/branches" % GITHUB_API_BASE)
branch_names = filter(lambda x: x.startswith("branch-"), [x['name'] for x in branches])
@@ -273,28 +286,29 @@ def get_version_json(version_str):
base_ref = pr["head"]["ref"]
pr_repo_desc = "%s/%s" % (user_login, base_ref)
-if pr["merged"] == True:
- print "Pull request %s has already been merged, assuming you want to backport" % pr_num
- merge_commit_desc = run_cmd(['git', 'log', '--merges', '--first-parent',
- '--grep=pull request #%s' % pr_num, '--oneline']).split("\n")[0]
- if merge_commit_desc == "":
- fail("Couldn't find any merge commit for #%s, you may need to update HEAD." % pr_num)
+if pr["merged"] is True:
+ print "Pull request %s has already been merged, assuming you want to backport" % pr_num
+ merge_commit_desc = run_cmd([
+ 'git', 'log', '--merges', '--first-parent',
+ '--grep=pull request #%s' % pr_num, '--oneline']).split("\n")[0]
+ if merge_commit_desc == "":
+ fail("Couldn't find any merge commit for #%s, you may need to update HEAD." % pr_num)
+
+ merge_hash = merge_commit_desc[:7]
+ message = merge_commit_desc[8:]
- merge_hash = merge_commit_desc[:7]
- message = merge_commit_desc[8:]
-
- print "Found: %s" % message
- maybe_cherry_pick(pr_num, merge_hash, latest_branch)
- sys.exit(0)
+ print "Found: %s" % message
+ maybe_cherry_pick(pr_num, merge_hash, latest_branch)
+ sys.exit(0)
if not bool(pr["mergeable"]):
- msg = "Pull request %s is not mergeable in its current form.\n" % pr_num + \
- "Continue? (experts only!)"
- continue_maybe(msg)
+ msg = "Pull request %s is not mergeable in its current form.\n" % pr_num + \
+ "Continue? (experts only!)"
+ continue_maybe(msg)
print ("\n=== Pull Request #%s ===" % pr_num)
print ("title\t%s\nsource\t%s\ntarget\t%s\nurl\t%s" % (
- title, pr_repo_desc, target_ref, url))
+ title, pr_repo_desc, target_ref, url))
continue_maybe("Proceed with merging pull request #%s?" % pr_num)
merged_refs = [target_ref]
@@ -303,12 +317,12 @@ def get_version_json(version_str):
pick_prompt = "Would you like to pick %s into another branch?" % merge_hash
while raw_input("\n%s (y/n): " % pick_prompt).lower() == "y":
- merged_refs = merged_refs + [cherry_pick(pr_num, merge_hash, latest_branch)]
+ merged_refs = merged_refs + [cherry_pick(pr_num, merge_hash, latest_branch)]
if JIRA_IMPORTED:
- continue_maybe("Would you like to update an associated JIRA?")
- jira_comment = "Issue resolved by pull request %s\n[%s/%s]" % (pr_num, GITHUB_BASE, pr_num)
- resolve_jira(title, merged_refs, jira_comment)
+ continue_maybe("Would you like to update an associated JIRA?")
+ jira_comment = "Issue resolved by pull request %s\n[%s/%s]" % (pr_num, GITHUB_BASE, pr_num)
+ resolve_jira(title, merged_refs, jira_comment)
else:
- print "Could not find jira-python library. Run 'sudo pip install jira-python' to install."
- print "Exiting without trying to close the associated JIRA."
+ print "Could not find jira-python library. Run 'sudo pip install jira-python' to install."
+ print "Exiting without trying to close the associated JIRA."
diff --git a/docs/_layouts/global.html b/docs/_layouts/global.html
index fb808129bb65d..4ba20e590f2c2 100755
--- a/docs/_layouts/global.html
+++ b/docs/_layouts/global.html
@@ -9,6 +9,11 @@
{{ page.title }} - Spark {{site.SPARK_VERSION_SHORT}} Documentation
+ {% if page.redirect %}
+
+
+ {% endif %}
+