
Commit 03180a4

Merge branch 'master' of github.com:nchammas/spark
2 parents d4c5f43 + 0d1cc4a commit 03180a4

81 files changed: +1455 −317 lines


assembly/pom.xml

Lines changed: 1 addition & 1 deletion
@@ -21,7 +21,7 @@
   <parent>
     <groupId>org.apache.spark</groupId>
     <artifactId>spark-parent</artifactId>
-    <version>1.1.0-SNAPSHOT</version>
+    <version>1.2.0-SNAPSHOT</version>
     <relativePath>../pom.xml</relativePath>
   </parent>

bagel/pom.xml

Lines changed: 1 addition & 1 deletion
@@ -21,7 +21,7 @@
   <parent>
     <groupId>org.apache.spark</groupId>
     <artifactId>spark-parent</artifactId>
-    <version>1.1.0-SNAPSHOT</version>
+    <version>1.2.0-SNAPSHOT</version>
     <relativePath>../pom.xml</relativePath>
   </parent>

core/pom.xml

Lines changed: 1 addition & 1 deletion
@@ -21,7 +21,7 @@
   <parent>
     <groupId>org.apache.spark</groupId>
     <artifactId>spark-parent</artifactId>
-    <version>1.1.0-SNAPSHOT</version>
+    <version>1.2.0-SNAPSHOT</version>
     <relativePath>../pom.xml</relativePath>
   </parent>

core/src/main/scala/org/apache/spark/SparkContext.scala

Lines changed: 2 additions & 3 deletions
@@ -49,6 +49,7 @@ import org.apache.spark.scheduler.cluster.{CoarseGrainedSchedulerBackend, SparkD
 import org.apache.spark.scheduler.cluster.mesos.{CoarseMesosSchedulerBackend, MesosSchedulerBackend}
 import org.apache.spark.scheduler.local.LocalBackend
 import org.apache.spark.storage._
+import org.apache.spark.SPARK_VERSION
 import org.apache.spark.ui.SparkUI
 import org.apache.spark.util.{CallSite, ClosureCleaner, MetadataCleaner, MetadataCleanerType, TimeStampedWeakValueHashMap, Utils}

@@ -825,7 +826,7 @@ class SparkContext(config: SparkConf) extends Logging {
   }

   /** The version of Spark on which this application is running. */
-  def version = SparkContext.SPARK_VERSION
+  def version = SPARK_VERSION

   /**
    * Return a map from the slave to the max memory available for caching and the remaining
@@ -1297,8 +1298,6 @@
  */
 object SparkContext extends Logging {

-  private[spark] val SPARK_VERSION = "1.2.0-SNAPSHOT"
-
   private[spark] val SPARK_JOB_DESCRIPTION = "spark.job.description"

   private[spark] val SPARK_JOB_GROUP_ID = "spark.jobGroup.id"
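
With SPARK_VERSION now defined on the org.apache.spark package object rather than on the SparkContext companion, application code keeps reading the running version through SparkContext.version. A minimal usage sketch, assuming a local master and an illustrative app name:

  import org.apache.spark.{SparkConf, SparkContext}

  val sc = new SparkContext(new SparkConf().setAppName("version-check").setMaster("local"))
  println(sc.version)   // "1.2.0-SNAPSHOT" when built from this branch
  sc.stop()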

core/src/main/scala/org/apache/spark/SparkEnv.scala

Lines changed: 1 addition & 1 deletion
@@ -217,7 +217,7 @@ object SparkEnv extends Logging {
     val shortShuffleMgrNames = Map(
       "hash" -> "org.apache.spark.shuffle.hash.HashShuffleManager",
       "sort" -> "org.apache.spark.shuffle.sort.SortShuffleManager")
-    val shuffleMgrName = conf.get("spark.shuffle.manager", "hash")
+    val shuffleMgrName = conf.get("spark.shuffle.manager", "sort")
     val shuffleMgrClass = shortShuffleMgrNames.getOrElse(shuffleMgrName.toLowerCase, shuffleMgrName)
     val shuffleManager = instantiateClass[ShuffleManager](shuffleMgrClass)
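
The sort-based manager is now the default, so jobs that relied on the previous hash-based behavior have to opt back in explicitly. A minimal sketch of that override in application code, using the configuration key and short name from the diff above (app name is illustrative):

  import org.apache.spark.{SparkConf, SparkContext}

  // Explicitly request the previous default; the short name "hash" resolves to HashShuffleManager.
  val conf = new SparkConf()
    .setAppName("hash-shuffle-job")
    .set("spark.shuffle.manager", "hash")
  val sc = new SparkContext(conf)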

core/src/main/scala/org/apache/spark/package.scala

Lines changed: 1 addition & 0 deletions
@@ -44,4 +44,5 @@ package org.apache

 package object spark {
   // For package docs only
+  val SPARK_VERSION = "1.2.0-SNAPSHOT"
 }

core/src/main/scala/org/apache/spark/rdd/RDD.scala

Lines changed: 13 additions & 9 deletions
@@ -1127,15 +1127,19 @@ abstract class RDD[T: ClassTag](
    * @return an array of top elements
    */
   def takeOrdered(num: Int)(implicit ord: Ordering[T]): Array[T] = {
-    mapPartitions { items =>
-      // Priority keeps the largest elements, so let's reverse the ordering.
-      val queue = new BoundedPriorityQueue[T](num)(ord.reverse)
-      queue ++= util.collection.Utils.takeOrdered(items, num)(ord)
-      Iterator.single(queue)
-    }.reduce { (queue1, queue2) =>
-      queue1 ++= queue2
-      queue1
-    }.toArray.sorted(ord)
+    if (num == 0) {
+      Array.empty
+    } else {
+      mapPartitions { items =>
+        // Priority keeps the largest elements, so let's reverse the ordering.
+        val queue = new BoundedPriorityQueue[T](num)(ord.reverse)
+        queue ++= util.collection.Utils.takeOrdered(items, num)(ord)
+        Iterator.single(queue)
+      }.reduce { (queue1, queue2) =>
+        queue1 ++= queue2
+        queue1
+      }.toArray.sorted(ord)
+    }
   }

   /**
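
With the new guard, takeOrdered(0) simply returns an empty array instead of attempting to build a zero-capacity priority queue per partition. A short usage sketch, assuming an existing SparkContext named sc:

  val rdd = sc.parallelize(Seq(10, 4, 2, 12, 3))
  rdd.takeOrdered(3)   // Array(2, 3, 4) — the three smallest elements, ascending
  rdd.takeOrdered(0)   // Array() — hits the new early-return branch
  rdd.top(2)           // Array(12, 10); top delegates to takeOrdered with the reversed ordering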

core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala

Lines changed: 2 additions & 2 deletions
@@ -241,9 +241,9 @@ class DAGScheduler(
       callSite: CallSite)
     : Stage =
   {
+    val parentStages = getParentStages(rdd, jobId)
     val id = nextStageId.getAndIncrement()
-    val stage =
-      new Stage(id, rdd, numTasks, shuffleDep, getParentStages(rdd, jobId), jobId, callSite)
+    val stage = new Stage(id, rdd, numTasks, shuffleDep, parentStages, jobId, callSite)
     stageIdToStage(id) = stage
     updateJobIdStageIdMaps(jobId, stage)
     stage

core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala

Lines changed: 2 additions & 1 deletion
@@ -29,6 +29,7 @@ import org.json4s.jackson.JsonMethods._
 import org.apache.spark.{Logging, SparkConf, SparkContext}
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.io.CompressionCodec
+import org.apache.spark.SPARK_VERSION
 import org.apache.spark.util.{FileLogger, JsonProtocol, Utils}

 /**
@@ -86,7 +87,7 @@ private[spark] class EventLoggingListener(
       sparkConf.get("spark.io.compression.codec", CompressionCodec.DEFAULT_COMPRESSION_CODEC)
     logger.newFile(COMPRESSION_CODEC_PREFIX + codec)
   }
-  logger.newFile(SPARK_VERSION_PREFIX + SparkContext.SPARK_VERSION)
+  logger.newFile(SPARK_VERSION_PREFIX + SPARK_VERSION)
   logger.newFile(LOG_PREFIX + logger.fileIndex)
 }

core/src/test/scala/org/apache/spark/HashShuffleSuite.scala (new file)

Lines changed: 33 additions & 0 deletions

@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark
+
+import org.scalatest.BeforeAndAfterAll
+
+class HashShuffleSuite extends ShuffleSuite with BeforeAndAfterAll {
+
+  // This test suite should run all tests in ShuffleSuite with hash-based shuffle.
+
+  override def beforeAll() {
+    System.setProperty("spark.shuffle.manager", "hash")
+  }
+
+  override def afterAll() {
+    System.clearProperty("spark.shuffle.manager")
+  }
+}
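
The suite reuses every test in ShuffleSuite and only pins the shuffle manager back to "hash" around them. The same pattern would cover the new default as well; a hypothetical companion suite (not shown in this excerpt) could look like:

  package org.apache.spark

  import org.scalatest.BeforeAndAfterAll

  // Hypothetical sketch: run every ShuffleSuite test against the sort-based manager.
  class SortShuffleSuite extends ShuffleSuite with BeforeAndAfterAll {
    override def beforeAll() {
      System.setProperty("spark.shuffle.manager", "sort")
    }

    override def afterAll() {
      System.clearProperty("spark.shuffle.manager")
    }
  }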
