
Commit bfe40f7

Revert "In memory shuffle (cherry-picked from amplab/graphx#135)"

This reverts commit 5ec645d.

Conflicts:
    graphx/src/main/scala/org/apache/spark/graphx/Pregel.scala

1 parent ad6590c

6 files changed: +20 -47 lines

core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala
Lines changed: 1 addition & 6 deletions

@@ -29,7 +29,6 @@ import org.apache.spark.rdd.RDDCheckpointData
 import org.apache.spark.serializer.Serializer
 import org.apache.spark.storage._
 import org.apache.spark.util.{MetadataCleaner, MetadataCleanerType, TimeStampedHashMap}
-import java.nio.ByteBuffer
 
 private[spark] object ShuffleMapTask {
 

@@ -169,11 +168,7 @@ private[spark] class ShuffleMapTask(
       var totalBytes = 0L
       var totalTime = 0L
       val compressedSizes: Array[Byte] = shuffle.writers.map { writer: BlockObjectWriter =>
-        // writer.commit()
-        val bytes = writer.commit()
-        if (bytes != null) {
-          blockManager.putBytes(writer.blockId, ByteBuffer.wrap(bytes), StorageLevel.MEMORY_ONLY_SER, tellMaster = false)
-        }
+        writer.commit()
         writer.close()
         val size = writer.fileSegment().length
         totalBytes += size
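
With the revert, commit() hands back only the number of bytes written; the map task no longer wraps the committed data in a ByteBuffer and pushes it into the memory store. A minimal, self-contained sketch of the restored call-site shape (ToyWriter and its members are hypothetical stand-ins, not Spark classes):

// Hypothetical stand-in for BlockObjectWriter, illustrating the restored contract:
// commit() returns the number of bytes written rather than the bytes themselves.
final class ToyWriter(payload: Array[Byte]) {
  def commit(): Long = payload.length.toLong
  def close(): Unit = ()
  def segmentLength: Long = payload.length.toLong
}

val writers = Seq(new ToyWriter(Array[Byte](1, 2, 3)), new ToyWriter(Array[Byte](4, 5)))

var totalBytes = 0L
val sizes: Seq[Long] = writers.map { writer =>
  writer.commit()            // data is already in the shuffle file; nothing to stash in memory
  writer.close()
  val size = writer.segmentLength
  totalBytes += size
  size
}
// totalBytes == 5L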

core/src/main/scala/org/apache/spark/storage/BlockManager.scala
Lines changed: 4 additions & 4 deletions

@@ -57,7 +57,7 @@ private[spark] class BlockManager(
 
   private val blockInfo = new TimeStampedHashMap[BlockId, BlockInfo]
 
-  private[storage] val memoryStore = new MemoryStore(this, maxMemory)
+  private[storage] val memoryStore: BlockStore = new MemoryStore(this, maxMemory)
   private[storage] val diskStore = new DiskStore(this, diskBlockManager)
 
   // If we use Netty for shuffle, start a new Netty-based shuffle sender service.

@@ -293,7 +293,7 @@ private[spark] class BlockManager(
    * never deletes (recent) items.
    */
  def getLocalFromDisk(blockId: BlockId, serializer: Serializer): Option[Iterator[Any]] = {
-    memoryStore.getValues(blockId, serializer).orElse(
+    diskStore.getValues(blockId, serializer).orElse(
      sys.error("Block " + blockId + " not found on disk, though it should be"))
  }
 

@@ -313,7 +313,7 @@ private[spark] class BlockManager(
     // As an optimization for map output fetches, if the block is for a shuffle, return it
     // without acquiring a lock; the disk store never deletes (recent) items so this should work
     if (blockId.isShuffle) {
-      memoryStore.getBytes(blockId) match {
+      diskStore.getBytes(blockId) match {
        case Some(bytes) =>
          Some(bytes)
        case None =>

@@ -831,7 +831,7 @@ private[spark] class BlockManager(
     if (info != null) info.synchronized {
       // Removals are idempotent in disk store and memory store. At worst, we get a warning.
       val removedFromMemory = memoryStore.remove(blockId)
-      val removedFromDisk = false //diskStore.remove(blockId)
+      val removedFromDisk = diskStore.remove(blockId)
      if (!removedFromMemory && !removedFromDisk) {
        logWarning("Block " + blockId + " could not be removed as it was not found in either " +
          "the disk or memory store")

core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala
Lines changed: 14 additions & 10 deletions

@@ -17,7 +17,7 @@
 
 package org.apache.spark.storage
 
-import java.io.{ByteArrayOutputStream, FileOutputStream, File, OutputStream}
+import java.io.{FileOutputStream, File, OutputStream}
 import java.nio.channels.FileChannel
 
 import it.unimi.dsi.fastutil.io.FastBufferedOutputStream

@@ -44,7 +44,7 @@ private[spark] abstract class BlockObjectWriter(val blockId: BlockId) {
    * Flush the partial writes and commit them as a single atomic block. Return the
    * number of bytes written for this commit.
    */
-  def commit(): Array[Byte]
+  def commit(): Long
 
   /**
    * Reverts writes that haven't been flushed yet. Callers should invoke this function

@@ -106,7 +106,7 @@ private[spark] class DiskBlockObjectWriter(
   /** The file channel, used for repositioning / truncating the file. */
   private var channel: FileChannel = null
   private var bs: OutputStream = null
-  private var fos: ByteArrayOutputStream = null
+  private var fos: FileOutputStream = null
   private var ts: TimeTrackingOutputStream = null
   private var objOut: SerializationStream = null
   private val initialPosition = file.length()

@@ -115,8 +115,9 @@ private[spark] class DiskBlockObjectWriter(
   private var _timeWriting = 0L
 
   override def open(): BlockObjectWriter = {
-    fos = new ByteArrayOutputStream()
+    fos = new FileOutputStream(file, true)
     ts = new TimeTrackingOutputStream(fos)
+    channel = fos.getChannel()
     lastValidPosition = initialPosition
     bs = compressStream(new FastBufferedOutputStream(ts, bufferSize))
     objOut = serializer.newInstance().serializeStream(bs)

@@ -129,6 +130,9 @@ private[spark] class DiskBlockObjectWriter(
       if (syncWrites) {
         // Force outstanding writes to disk and track how long it takes
         objOut.flush()
+        val start = System.nanoTime()
+        fos.getFD.sync()
+        _timeWriting += System.nanoTime() - start
       }
       objOut.close()
 

@@ -145,18 +149,18 @@ private[spark] class DiskBlockObjectWriter(
 
   override def isOpen: Boolean = objOut != null
 
-  override def commit(): Array[Byte] = {
+  override def commit(): Long = {
     if (initialized) {
       // NOTE: Because Kryo doesn't flush the underlying stream we explicitly flush both the
       // serializer stream and the lower level stream.
       objOut.flush()
       bs.flush()
       val prevPos = lastValidPosition
-      lastValidPosition = fos.size()
-      fos.toByteArray
+      lastValidPosition = channel.position()
+      lastValidPosition - prevPos
     } else {
       // lastValidPosition is zero if stream is uninitialized
-      null
+      lastValidPosition
     }
   }
 

@@ -166,7 +170,7 @@ private[spark] class DiskBlockObjectWriter(
       // truncate the file to the last valid position.
       objOut.flush()
       bs.flush()
-      throw new UnsupportedOperationException("Revert temporarily broken due to in memory shuffle code changes.")
+      channel.truncate(lastValidPosition)
     }
   }
 

@@ -178,7 +182,7 @@ private[spark] class DiskBlockObjectWriter(
   }
 
   override def fileSegment(): FileSegment = {
-    new FileSegment(null, initialPosition, bytesWritten)
+    new FileSegment(file, initialPosition, bytesWritten)
   }
 
   // Only valid if called after close()
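
The reverted DiskBlockObjectWriter goes back to writing straight to the shuffle file: each commit is measured as a position delta on the underlying FileChannel, and revertPartialWrites() simply truncates back to the last committed position. A minimal, self-contained sketch of that pattern (class and file names are illustrative, and the compression/serialization layers are omitted):

import java.io.{BufferedOutputStream, File, FileOutputStream}

// Minimal disk-backed writer: commit() reports bytes written since the previous commit,
// revertPartialWrites() truncates anything written after the last commit.
class SketchWriter(file: File) {
  private val fos = new FileOutputStream(file, true)   // append, as in the reverted open()
  private val channel = fos.getChannel()
  private val out = new BufferedOutputStream(fos)
  private var lastValidPosition = file.length()        // this writer's starting offset

  def write(bytes: Array[Byte]): Unit = out.write(bytes)

  def commit(): Long = {
    out.flush()                                        // push buffered data into the file
    val prevPos = lastValidPosition
    lastValidPosition = channel.position()
    lastValidPosition - prevPos                        // bytes written by this commit
  }

  def revertPartialWrites(): Unit = {
    out.flush()
    channel.truncate(lastValidPosition)                // drop uncommitted bytes
  }

  def close(): Unit = out.close()
}

val w = new SketchWriter(new File("/tmp/sketch_shuffle_block"))
w.write("hello".getBytes("UTF-8"))
val written = w.commit()                               // 5 bytes for this commit
w.write("scratch".getBytes("UTF-8"))
w.revertPartialWrites()                                // file still ends after "hello"
w.close()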

core/src/main/scala/org/apache/spark/storage/MemoryStore.scala
Lines changed: 0 additions & 9 deletions

@@ -23,7 +23,6 @@ import java.util.LinkedHashMap
 import scala.collection.mutable.ArrayBuffer
 
 import org.apache.spark.util.{SizeEstimator, Utils}
-import org.apache.spark.serializer.Serializer
 
 /**
  * Stores blocks in memory, either as ArrayBuffers of deserialized Java objects or as

@@ -120,14 +119,6 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long)
     }
   }
 
-  /**
-   * A version of getValues that allows a custom serializer. This is used as part of the
-   * shuffle short-circuit code.
-   */
-  def getValues(blockId: BlockId, serializer: Serializer): Option[Iterator[Any]] = {
-    getBytes(blockId).map(bytes => blockManager.dataDeserialize(blockId, bytes, serializer))
-  }
-
   override def remove(blockId: BlockId): Boolean = {
     entries.synchronized {
       val entry = entries.remove(blockId)

core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala
Lines changed: 1 addition & 12 deletions

@@ -187,17 +187,6 @@ class ShuffleBlockManager(blockManager: BlockManager) extends Logging {
       }
     })
   }
-
-  def removeAllShuffleStuff() {
-    for (state <- shuffleStates.values;
-         group <- state.allFileGroups;
-         (mapId, _) <- group.mapIdToIndex.iterator;
-         reducerId <- 0 until group.files.length) {
-      val blockId = new ShuffleBlockId(group.shuffleId, mapId, reducerId)
-      blockManager.removeBlock(blockId, tellMaster = false)
-    }
-    shuffleStates.clear()
-  }
 }
 
 private[spark]

@@ -211,7 +200,7 @@ object ShuffleBlockManager {
     * Stores the absolute index of each mapId in the files of this group. For instance,
     * if mapId 5 is the first block in each file, mapIdToIndex(5) = 0.
     */
-    val mapIdToIndex = new PrimitiveKeyOpenHashMap[Int, Int]()
+    private val mapIdToIndex = new PrimitiveKeyOpenHashMap[Int, Int]()
 
    /**
     * Stores consecutive offsets of blocks into each reducer file, ordered by position in the file.
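
The deleted removeAllShuffleStuff() walked every (shuffle, map, reducer) combination, removed the corresponding in-memory shuffle block, and then cleared the tracked state. A self-contained illustration of that enumeration shape (toy ids only; the shuffle_<shuffle>_<map>_<reduce> naming mirrors ShuffleBlockId, but the data here is made up):

// Toy enumeration with the same shape as the removed cleanup loop:
// one block id per (shuffleId, mapId, reducerId) combination.
val shuffleIds  = Seq(0, 1)
val mapIds      = Seq(0, 1, 2)
val numReducers = 4

val blockIds =
  for {
    shuffleId <- shuffleIds
    mapId     <- mapIds
    reducerId <- 0 until numReducers
  } yield s"shuffle_${shuffleId}_${mapId}_${reducerId}"

// In the removed helper, each id was passed to blockManager.removeBlock(_, tellMaster = false).
blockIds.foreach(println)   // 2 * 3 * 4 = 24 ids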

graphx/src/main/scala/org/apache/spark/graphx/Pregel.scala
Lines changed: 0 additions & 6 deletions

@@ -19,7 +19,6 @@ package org.apache.spark.graphx
 
 import scala.reflect.ClassTag
 import org.apache.spark.Logging
-import org.apache.spark.SparkEnv
 
 
 /**

@@ -147,11 +146,6 @@ object Pregel extends Logging {
       // vertices of prevG (depended on by newVerts, oldMessages, and the vertices of g).
       activeMessages = messages.count()
 
-      // Very ugly code to clear the in-memory shuffle data
-      messages.foreachPartition { iter =>
-        SparkEnv.get.blockManager.shuffleBlockManager.removeAllShuffleStuff()
-      }
-
       logWarning("Pregel finished iteration " + i)
 
       // Unpersist the RDDs hidden by newly-materialized RDDs
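
The block removed from Pregel used foreachPartition purely for its side effect: run a cleanup hook once per partition on the executors while ignoring the data. A minimal, self-contained sketch of that side-effect-only pattern with a local SparkContext (the println body is a placeholder for the block-manager call in the reverted code):

import org.apache.spark.{SparkConf, SparkContext}

object ForeachPartitionSideEffect {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("sketch"))
    val messages = sc.parallelize(1 to 8, numSlices = 4)

    // Side-effect-only action: the iterator is ignored and the body runs once per partition.
    messages.foreachPartition { _ =>
      println("per-partition cleanup hook ran")
    }

    sc.stop()
  }
}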
