Commit e3417aa

committed: rebase with upstream
2 parents: 402971c + d0b5633

File tree: 47 files changed (+1527 / -1400 lines)


LICENSE

Lines changed: 5 additions & 5 deletions
@@ -250,11 +250,11 @@ The text of each license is also included at licenses/LICENSE-[project].txt.
 (Interpreter classes (all .scala files in repl/src/main/scala
 except for Main.Scala, SparkHelper.scala and ExecutorClassLoader.scala),
 and for SerializableMapWrapper in JavaUtils.scala)
-(BSD-like) Scala Actors library (org.scala-lang:scala-actors:2.10.4 - http://www.scala-lang.org/)
-(BSD-like) Scala Compiler (org.scala-lang:scala-compiler:2.10.4 - http://www.scala-lang.org/)
-(BSD-like) Scala Compiler (org.scala-lang:scala-reflect:2.10.4 - http://www.scala-lang.org/)
-(BSD-like) Scala Library (org.scala-lang:scala-library:2.10.4 - http://www.scala-lang.org/)
-(BSD-like) Scalap (org.scala-lang:scalap:2.10.4 - http://www.scala-lang.org/)
+(BSD-like) Scala Actors library (org.scala-lang:scala-actors:2.10.5 - http://www.scala-lang.org/)
+(BSD-like) Scala Compiler (org.scala-lang:scala-compiler:2.10.5 - http://www.scala-lang.org/)
+(BSD-like) Scala Compiler (org.scala-lang:scala-reflect:2.10.5 - http://www.scala-lang.org/)
+(BSD-like) Scala Library (org.scala-lang:scala-library:2.10.5 - http://www.scala-lang.org/)
+(BSD-like) Scalap (org.scala-lang:scalap:2.10.5 - http://www.scala-lang.org/)
 (BSD-style) scalacheck (org.scalacheck:scalacheck_2.10:1.10.0 - http://www.scalacheck.org)
 (BSD-style) spire (org.spire-math:spire_2.10:0.7.1 - http://spire-math.org)
 (BSD-style) spire-macros (org.spire-math:spire-macros_2.10:0.7.1 - http://spire-math.org)

core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java

Lines changed: 2 additions & 2 deletions
@@ -254,8 +254,8 @@ private long[] mergeSpills(SpillInfo[] spills) throws IOException {
     final CompressionCodec compressionCodec = CompressionCodec$.MODULE$.createCodec(sparkConf);
     final boolean fastMergeEnabled =
       sparkConf.getBoolean("spark.shuffle.unsafe.fastMergeEnabled", true);
-    final boolean fastMergeIsSupported =
-      !compressionEnabled || compressionCodec instanceof LZFCompressionCodec;
+    final boolean fastMergeIsSupported = !compressionEnabled ||
+      CompressionCodec$.MODULE$.supportsConcatenationOfSerializedStreams(compressionCodec);
     try {
       if (spills.length == 0) {
         new FileOutputStream(outputFile).close(); // Create an empty file

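Note (illustration, not part of the commit): the fast merge path splices per-partition compressed spill segments together at the byte level, so it is only safe when compression is off or the codec can decode concatenated streams; otherwise the writer has to fall back to a decompress-and-recompress merge. A simplified, in-memory sketch of that decision and fallback follows. The object name FastMergeSketch and the byte-array plumbing are made up for illustration; only the CompressionCodec trait and the private[spark] helper added by this commit are taken from Spark, which is why the sketch assumes it lives under org.apache.spark.

// Illustrative sketch only -- not the UnsafeShuffleWriter code.
package org.apache.spark.shuffle

import java.io.{ByteArrayInputStream, ByteArrayOutputStream}

import org.apache.spark.io.CompressionCodec

object FastMergeSketch {
  /**
   * Merge already-compressed segments of one partition into a single compressed stream.
   * Fast path: splice the compressed bytes as-is. Slow path: decompress and re-compress.
   */
  def merge(
      segments: Seq[Array[Byte]],
      codec: CompressionCodec,
      compressionEnabled: Boolean): Array[Byte] = {
    val fastMergeIsSupported = !compressionEnabled ||
      CompressionCodec.supportsConcatenationOfSerializedStreams(codec)
    if (fastMergeIsSupported) {
      segments.reduce(_ ++ _)  // byte-level concatenation, no codec round trip
    } else {
      val result = new ByteArrayOutputStream()
      val out = codec.compressedOutputStream(result)
      val buffer = new Array[Byte](4096)
      segments.foreach { segment =>
        val in = codec.compressedInputStream(new ByteArrayInputStream(segment))
        var n = in.read(buffer)
        while (n != -1) {
          out.write(buffer, 0, n)
          n = in.read(buffer)
        }
        in.close()
      }
      out.close()
      result.toByteArray
    }
  }
}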
core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java

Lines changed: 15 additions & 43 deletions
@@ -35,7 +35,6 @@
 import org.apache.spark.unsafe.Platform;
 import org.apache.spark.unsafe.array.ByteArrayMethods;
 import org.apache.spark.unsafe.array.LongArray;
-import org.apache.spark.unsafe.bitset.BitSet;
 import org.apache.spark.unsafe.hash.Murmur3_x86_32;
 import org.apache.spark.unsafe.memory.MemoryBlock;
 import org.apache.spark.unsafe.memory.MemoryLocation;
@@ -123,12 +122,6 @@ public final class BytesToBytesMap extends MemoryConsumer {
    */
   private boolean canGrowArray = true;

-  /**
-   * A {@link BitSet} used to track location of the map where the key is set.
-   * Size of the bitset should be half of the size of the long array.
-   */
-  @Nullable private BitSet bitset;
-
   private final double loadFactor;

   /**
@@ -427,7 +420,6 @@ public Location lookup(Object keyBase, long keyOffset, int keyLength) {
    * This is a thread-safe version of `lookup`, could be used by multiple threads.
    */
   public void safeLookup(Object keyBase, long keyOffset, int keyLength, Location loc) {
-    assert(bitset != null);
     assert(longArray != null);

     if (enablePerfMetrics) {
@@ -440,7 +432,7 @@ public void safeLookup(Object keyBase, long keyOffset, int keyLength, Location l
       if (enablePerfMetrics) {
         numProbes++;
       }
-      if (!bitset.isSet(pos)) {
+      if (longArray.get(pos * 2) == 0) {
         // This is a new key.
         loc.with(pos, hashcode, false);
         return;
@@ -644,7 +636,6 @@ public boolean putNewKey(Object keyBase, long keyOffset, int keyLength,
     assert (!isDefined) : "Can only set value once for a key";
     assert (keyLength % 8 == 0);
     assert (valueLength % 8 == 0);
-    assert(bitset != null);
     assert(longArray != null);

     if (numElements == MAX_CAPACITY || !canGrowArray) {
@@ -678,7 +669,6 @@ public boolean putNewKey(Object keyBase, long keyOffset, int keyLength,
       Platform.putInt(base, offset, Platform.getInt(base, offset) + 1);
       pageCursor += recordLength;
       numElements++;
-      bitset.set(pos);
       final long storedKeyAddress = taskMemoryManager.encodePageNumberAndOffset(
         currentPage, recordOffset);
       longArray.set(pos * 2, storedKeyAddress);
@@ -734,7 +724,6 @@ private void allocate(int capacity) {
     assert (capacity <= MAX_CAPACITY);
     acquireMemory(capacity * 16);
     longArray = new LongArray(MemoryBlock.fromLongArray(new long[capacity * 2]));
-    bitset = new BitSet(MemoryBlock.fromLongArray(new long[capacity / 64]));

     this.growthThreshold = (int) (capacity * loadFactor);
     this.mask = capacity - 1;
@@ -749,7 +738,6 @@ public void freeArray() {
       long used = longArray.memoryBlock().size();
       longArray = null;
       releaseMemory(used);
-      bitset = null;
     }
   }

@@ -795,9 +783,7 @@ public long getTotalMemoryConsumption() {
     for (MemoryBlock dataPage : dataPages) {
       totalDataPagesSize += dataPage.size();
     }
-    return totalDataPagesSize +
-      ((bitset != null) ? bitset.memoryBlock().size() : 0L) +
-      ((longArray != null) ? longArray.memoryBlock().size() : 0L);
+    return totalDataPagesSize + ((longArray != null) ? longArray.memoryBlock().size() : 0L);
   }

   private void updatePeakMemoryUsed() {
@@ -852,7 +838,6 @@ public int getNumDataPages() {
    */
   @VisibleForTesting
   void growAndRehash() {
-    assert(bitset != null);
     assert(longArray != null);

     long resizeStartTime = -1;
@@ -861,39 +846,26 @@ void growAndRehash() {
     }
     // Store references to the old data structures to be used when we re-hash
     final LongArray oldLongArray = longArray;
-    final BitSet oldBitSet = bitset;
-    final int oldCapacity = (int) oldBitSet.capacity();
+    final int oldCapacity = (int) oldLongArray.size() / 2;

     // Allocate the new data structures
-    try {
-      allocate(Math.min(growthStrategy.nextCapacity(oldCapacity), MAX_CAPACITY));
-    } catch (OutOfMemoryError oom) {
-      longArray = oldLongArray;
-      bitset = oldBitSet;
-      throw oom;
-    }
+    allocate(Math.min(growthStrategy.nextCapacity(oldCapacity), MAX_CAPACITY));

     // Re-mask (we don't recompute the hashcode because we stored all 32 bits of it)
-    for (int pos = oldBitSet.nextSetBit(0); pos >= 0; pos = oldBitSet.nextSetBit(pos + 1)) {
-      final long keyPointer = oldLongArray.get(pos * 2);
-      final int hashcode = (int) oldLongArray.get(pos * 2 + 1);
+    for (int i = 0; i < oldLongArray.size(); i += 2) {
+      final long keyPointer = oldLongArray.get(i);
+      if (keyPointer == 0) {
+        continue;
+      }
+      final int hashcode = (int) oldLongArray.get(i + 1);
       int newPos = hashcode & mask;
       int step = 1;
-      boolean keepGoing = true;
-
-      // No need to check for equality here when we insert so this has one less if branch than
-      // the similar code path in addWithoutResize.
-      while (keepGoing) {
-        if (!bitset.isSet(newPos)) {
-          bitset.set(newPos);
-          longArray.set(newPos * 2, keyPointer);
-          longArray.set(newPos * 2 + 1, hashcode);
-          keepGoing = false;
-        } else {
-          newPos = (newPos + step) & mask;
-          step++;
-        }
+      while (longArray.get(newPos * 2) != 0) {
+        newPos = (newPos + step) & mask;
+        step++;
       }
+      longArray.set(newPos * 2, keyPointer);
+      longArray.set(newPos * 2 + 1, hashcode);
     }
     releaseMemory(oldLongArray.memoryBlock().size());
core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java

Lines changed: 4 additions & 1 deletion
@@ -157,11 +157,14 @@ public void closeCurrentPage() {
    */
   @Override
   public long spill(long size, MemoryConsumer trigger) throws IOException {
+    assert(inMemSorter != null);
     if (trigger != this) {
       if (readingIterator != null) {
         return readingIterator.spill();
+      } else {
+
       }
-      return 0L;
+      return 0L; // this should throw exception
     }

     if (inMemSorter == null || inMemSorter.numRecords() <= 0) {

core/src/main/scala/org/apache/spark/io/CompressionCodec.scala

Lines changed: 5 additions & 0 deletions
@@ -47,6 +47,11 @@ trait CompressionCodec {
 private[spark] object CompressionCodec {

   private val configKey = "spark.io.compression.codec"
+
+  private[spark] def supportsConcatenationOfSerializedStreams(codec: CompressionCodec): Boolean = {
+    codec.isInstanceOf[SnappyCompressionCodec] || codec.isInstanceOf[LZFCompressionCodec]
+  }
+
   private val shortCompressionCodecNames = Map(
     "lz4" -> classOf[LZ4CompressionCodec].getName,
     "lzf" -> classOf[LZFCompressionCodec].getName,

core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala

Lines changed: 7 additions & 1 deletion
@@ -947,7 +947,13 @@ class DAGScheduler(
     // serializable. If tasks are not serializable, a SparkListenerStageCompleted event
     // will be posted, which should always come after a corresponding SparkListenerStageSubmitted
     // event.
-    outputCommitCoordinator.stageStart(stage.id)
+    stage match {
+      case s: ShuffleMapStage =>
+        outputCommitCoordinator.stageStart(stage = s.id, maxPartitionId = s.numPartitions - 1)
+      case s: ResultStage =>
+        outputCommitCoordinator.stageStart(
+          stage = s.id, maxPartitionId = s.rdd.partitions.length - 1)
+    }
     val taskIdToLocations: Map[Int, Seq[TaskLocation]] = try {
       stage match {
         case s: ShuffleMapStage =>

core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala

Lines changed: 26 additions & 14 deletions
@@ -47,6 +47,8 @@ private[spark] class OutputCommitCoordinator(conf: SparkConf, isDriver: Boolean)
   private type PartitionId = Int
   private type TaskAttemptNumber = Int

+  private val NO_AUTHORIZED_COMMITTER: TaskAttemptNumber = -1
+
   /**
    * Map from active stages's id => partition id => task attempt with exclusive lock on committing
    * output for that partition.
@@ -56,9 +58,7 @@ private[spark] class OutputCommitCoordinator(conf: SparkConf, isDriver: Boolean)
    *
    * Access to this map should be guarded by synchronizing on the OutputCommitCoordinator instance.
    */
-  private val authorizedCommittersByStage: CommittersByStageMap = mutable.Map()
-  private type CommittersByStageMap =
-    mutable.Map[StageId, mutable.Map[PartitionId, TaskAttemptNumber]]
+  private val authorizedCommittersByStage = mutable.Map[StageId, Array[TaskAttemptNumber]]()

   /**
    * Returns whether the OutputCommitCoordinator's internal data structures are all empty.
@@ -95,9 +95,21 @@ private[spark] class OutputCommitCoordinator(conf: SparkConf, isDriver: Boolean)
     }
   }

-  // Called by DAGScheduler
-  private[scheduler] def stageStart(stage: StageId): Unit = synchronized {
-    authorizedCommittersByStage(stage) = mutable.HashMap[PartitionId, TaskAttemptNumber]()
+  /**
+   * Called by the DAGScheduler when a stage starts.
+   *
+   * @param stage the stage id.
+   * @param maxPartitionId the maximum partition id that could appear in this stage's tasks (i.e.
+   *                       the maximum possible value of `context.partitionId`).
+   */
+  private[scheduler] def stageStart(
+      stage: StageId,
+      maxPartitionId: Int): Unit = {
+    val arr = new Array[TaskAttemptNumber](maxPartitionId + 1)
+    java.util.Arrays.fill(arr, NO_AUTHORIZED_COMMITTER)
+    synchronized {
+      authorizedCommittersByStage(stage) = arr
+    }
   }

   // Called by DAGScheduler
@@ -122,10 +134,10 @@ private[spark] class OutputCommitCoordinator(conf: SparkConf, isDriver: Boolean)
         logInfo(s"Task was denied committing, stage: $stage, partition: $partition, " +
           s"attempt: $attemptNumber")
       case otherReason =>
-        if (authorizedCommitters.get(partition).exists(_ == attemptNumber)) {
+        if (authorizedCommitters(partition) == attemptNumber) {
           logDebug(s"Authorized committer (attemptNumber=$attemptNumber, stage=$stage, " +
             s"partition=$partition) failed; clearing lock")
-          authorizedCommitters.remove(partition)
+          authorizedCommitters(partition) = NO_AUTHORIZED_COMMITTER
         }
     }
   }
@@ -145,16 +157,16 @@ private[spark] class OutputCommitCoordinator(conf: SparkConf, isDriver: Boolean)
       attemptNumber: TaskAttemptNumber): Boolean = synchronized {
     authorizedCommittersByStage.get(stage) match {
       case Some(authorizedCommitters) =>
-        authorizedCommitters.get(partition) match {
-          case Some(existingCommitter) =>
-            logDebug(s"Denying attemptNumber=$attemptNumber to commit for stage=$stage, " +
-              s"partition=$partition; existingCommitter = $existingCommitter")
-            false
-          case None =>
+        authorizedCommitters(partition) match {
+          case NO_AUTHORIZED_COMMITTER =>
             logDebug(s"Authorizing attemptNumber=$attemptNumber to commit for stage=$stage, " +
               s"partition=$partition")
             authorizedCommitters(partition) = attemptNumber
             true
+          case existingCommitter =>
+            logDebug(s"Denying attemptNumber=$attemptNumber to commit for stage=$stage, " +
+              s"partition=$partition; existingCommitter = $existingCommitter")
+            false
         }
       case None =>
         logDebug(s"Stage $stage has completed, so not allowing attempt number $attemptNumber of" +

core/src/test/scala/org/apache/spark/io/CompressionCodecSuite.scala

Lines changed: 2 additions & 4 deletions
@@ -100,12 +100,10 @@ class CompressionCodecSuite extends SparkFunSuite {
     testCodec(codec)
   }

-  test("snappy does not support concatenation of serialized streams") {
+  test("snappy supports concatenation of serialized streams") {
     val codec = CompressionCodec.createCodec(conf, classOf[SnappyCompressionCodec].getName)
     assert(codec.getClass === classOf[SnappyCompressionCodec])
-    intercept[Exception] {
-      testConcatenationOfSerializedStreams(codec)
-    }
+    testConcatenationOfSerializedStreams(codec)
   }

   test("bad compression codec") {

core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala

Lines changed: 1 addition & 1 deletion
@@ -171,7 +171,7 @@ class OutputCommitCoordinatorSuite extends SparkFunSuite with BeforeAndAfter {
     val partition: Int = 2
     val authorizedCommitter: Int = 3
     val nonAuthorizedCommitter: Int = 100
-    outputCommitCoordinator.stageStart(stage)
+    outputCommitCoordinator.stageStart(stage, maxPartitionId = 2)

     assert(outputCommitCoordinator.canCommit(stage, partition, authorizedCommitter))
     assert(!outputCommitCoordinator.canCommit(stage, partition, nonAuthorizedCommitter))

dev/audit-release/README.md

Lines changed: 1 addition & 1 deletion
@@ -4,7 +4,7 @@ run them locally by setting appropriate environment variables.

 ```
 $ cd sbt_app_core
-$ SCALA_VERSION=2.10.4 \
+$ SCALA_VERSION=2.10.5 \
   SPARK_VERSION=1.0.0-SNAPSHOT \
   SPARK_RELEASE_REPOSITORY=file:///home/patrick/.ivy2/local \
   sbt run
