Skip to content

Commit 43717de

Browse files
committed
[SPARK-25704][CORE] Allocate a bit less than Int.MaxValue
JVMs don't let you allocate arrays of length exactly Int.MaxValue, so leave a little extra room. This is necessary when reading blocks >2GB off the network (for remote reads or for cache replication). Tested with unit tests via Jenkins, and ran a test with blocks over 2GB on a cluster. Closes #22705 from squito/SPARK-25704. Authored-by: Imran Rashid <[email protected]> Signed-off-by: Imran Rashid <[email protected]>
1 parent 1301217 commit 43717de

File tree

2 files changed

+11
-11
lines changed

2 files changed

+11
-11
lines changed

core/src/main/scala/org/apache/spark/storage/BlockManager.scala

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -133,8 +133,6 @@ private[spark] class BlockManager(
133133

134134
private[spark] val externalShuffleServiceEnabled =
135135
conf.get(config.SHUFFLE_SERVICE_ENABLED)
136-
private val chunkSize =
137-
conf.getSizeAsBytes("spark.storage.memoryMapLimitForTests", Int.MaxValue.toString).toInt
138136
private val remoteReadNioBufferConversion =
139137
conf.getBoolean("spark.network.remoteReadNioBufferConversion", false)
140138

@@ -451,7 +449,7 @@ private[spark] class BlockManager(
451449
new EncryptedBlockData(tmpFile, blockSize, conf, key).toChunkedByteBuffer(allocator)
452450

453451
case None =>
454-
ChunkedByteBuffer.fromFile(tmpFile, conf.get(config.MEMORY_MAP_LIMIT_FOR_TESTS).toInt)
452+
ChunkedByteBuffer.fromFile(tmpFile)
455453
}
456454
putBytes(blockId, buffer, level)(classTag)
457455
tmpFile.delete()
@@ -797,7 +795,7 @@ private[spark] class BlockManager(
797795
if (remoteReadNioBufferConversion) {
798796
return Some(new ChunkedByteBuffer(data.nioByteBuffer()))
799797
} else {
800-
return Some(ChunkedByteBuffer.fromManagedBuffer(data, chunkSize))
798+
return Some(ChunkedByteBuffer.fromManagedBuffer(data))
801799
}
802800
}
803801
logDebug(s"The value of block $blockId is null")

core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ import org.apache.spark.internal.config
3030
import org.apache.spark.network.buffer.{FileSegmentManagedBuffer, ManagedBuffer}
3131
import org.apache.spark.network.util.{ByteArrayWritableChannel, LimitedInputStream}
3232
import org.apache.spark.storage.StorageUtils
33+
import org.apache.spark.unsafe.array.ByteArrayMethods
3334
import org.apache.spark.util.Utils
3435

3536
/**
@@ -169,24 +170,25 @@ private[spark] class ChunkedByteBuffer(var chunks: Array[ByteBuffer]) {
169170

170171
}
171172

172-
object ChunkedByteBuffer {
173+
private[spark] object ChunkedByteBuffer {
174+
175+
173176
// TODO eliminate this method if we switch BlockManager to getting InputStreams
174-
def fromManagedBuffer(data: ManagedBuffer, maxChunkSize: Int): ChunkedByteBuffer = {
177+
def fromManagedBuffer(data: ManagedBuffer): ChunkedByteBuffer = {
175178
data match {
176179
case f: FileSegmentManagedBuffer =>
177-
fromFile(f.getFile, maxChunkSize, f.getOffset, f.getLength)
180+
fromFile(f.getFile, f.getOffset, f.getLength)
178181
case other =>
179182
new ChunkedByteBuffer(other.nioByteBuffer())
180183
}
181184
}
182185

183-
def fromFile(file: File, maxChunkSize: Int): ChunkedByteBuffer = {
184-
fromFile(file, maxChunkSize, 0, file.length())
186+
def fromFile(file: File): ChunkedByteBuffer = {
187+
fromFile(file, 0, file.length())
185188
}
186189

187190
private def fromFile(
188191
file: File,
189-
maxChunkSize: Int,
190192
offset: Long,
191193
length: Long): ChunkedByteBuffer = {
192194
// We do *not* memory map the file, because we may end up putting this into the memory store,
@@ -195,7 +197,7 @@ object ChunkedByteBuffer {
195197
val is = new FileInputStream(file)
196198
ByteStreams.skipFully(is, offset)
197199
val in = new LimitedInputStream(is, length)
198-
val chunkSize = math.min(maxChunkSize, length).toInt
200+
val chunkSize = math.min(ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH, length).toInt
199201
val out = new ChunkedByteBufferOutputStream(chunkSize, ByteBuffer.allocate _)
200202
Utils.tryWithSafeFinally {
201203
IOUtils.copy(in, out)

0 commit comments

Comments
 (0)