@@ -193,7 +193,7 @@ private[spark] object FallbackStorage extends Logging {
val array = new Array[Byte](size.toInt)
val startTimeNs = System.nanoTime()
f.seek(offset)
- f.read(array)
+ f.readFully(array)
logDebug(s"Took ${(System.nanoTime() - startTimeNs) / (1000 * 1000)}ms")
f.close()
new NioManagedBuffer(ByteBuffer.wrap(array))
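
Background for this one-line change: a plain read(array) on the FSDataInputStream may return after filling only part of the array, leaving the tail of the freshly allocated buffer zero-filled, whereas readFully(array) (inherited from java.io.DataInputStream) keeps reading until the whole array is filled or throws EOFException. Below is a minimal sketch of the loop readFully effectively performs, for illustration only; in and array are placeholders, not identifiers from FallbackStorage.

def readFullySketch(in: java.io.InputStream, array: Array[Byte]): Unit = {
  var pos = 0
  while (pos < array.length) {
    // A single read may return fewer bytes than requested.
    val n = in.read(array, pos, array.length - pos)
    if (n < 0) {
      throw new java.io.EOFException(s"stream ended after $pos of ${array.length} bytes")
    }
    pos += n
  }
}
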
@@ -16,12 +16,14 @@
*/
package org.apache.spark.storage

- import java.io.{DataOutputStream, File, FileOutputStream, IOException}
+ import java.io.{DataOutputStream, File, FileOutputStream, InputStream, IOException}
import java.nio.file.Files

import scala.concurrent.duration._
import scala.util.Random

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FSDataInputStream, LocalFileSystem, Path, PositionedReadable, Seekable}
import org.mockito.{ArgumentMatchers => mc}
import org.mockito.Mockito.{mock, never, verify, when}
import org.scalatest.concurrent.Eventually.{eventually, interval, timeout}
@@ -107,6 +109,49 @@ class FallbackStorageSuite extends SparkFunSuite with LocalSparkContext {
FallbackStorage.read(conf, ShuffleBlockId(1, 2L, 0))
}

test("SPARK-39200: fallback storage APIs - readFully") {
val conf = new SparkConf(false)
.set("spark.app.id", "testId")
.set("spark.hadoop.fs.file.impl", classOf[ReadPartialFileSystem].getName)
.set(SHUFFLE_COMPRESS, false)
.set(STORAGE_DECOMMISSION_SHUFFLE_BLOCKS_ENABLED, true)
.set(STORAGE_DECOMMISSION_FALLBACK_STORAGE_PATH,
"file://" + Files.createTempDirectory("tmp").toFile.getAbsolutePath + "/")
val fallbackStorage = new FallbackStorage(conf)
val bmm = new BlockManagerMaster(new NoopRpcEndpointRef(conf), null, conf, false)

val bm = mock(classOf[BlockManager])
val dbm = new DiskBlockManager(conf, deleteFilesOnStop = false, isDriver = false)
when(bm.diskBlockManager).thenReturn(dbm)
when(bm.master).thenReturn(bmm)
val resolver = new IndexShuffleBlockResolver(conf, bm)
when(bm.migratableResolver).thenReturn(resolver)

val length = 100000
val content = new Array[Byte](length)
Random.nextBytes(content)

val indexFile = resolver.getIndexFile(1, 2L)
tryWithResource(new FileOutputStream(indexFile)) { fos =>
val dos = new DataOutputStream(fos)
dos.writeLong(0)
dos.writeLong(length)
}

val dataFile = resolver.getDataFile(1, 2L)
tryWithResource(new FileOutputStream(dataFile)) { fos =>
fos.write(content)
}

fallbackStorage.copy(ShuffleBlockInfo(1, 2L), bm)

assert(fallbackStorage.exists(1, ShuffleIndexBlockId(1, 2L, NOOP_REDUCE_ID).name))
assert(fallbackStorage.exists(1, ShuffleDataBlockId(1, 2L, NOOP_REDUCE_ID).name))

val readResult = FallbackStorage.read(conf, ShuffleBlockId(1, 2L, 0))
assert(readResult.nioByteBuffer().array().sameElements(content))

Review comment (Contributor):

This test is not checking for readFully and would work even for read, depending on whether the read ends up satisfying the request or not (We are relying on what the buffer size might be internally, which is subject to change).
As in, the test could work even without the fix.

Reply (PR author, Contributor):

Added a mock filesystem that performs partial reads on read calls so we can test this behavior (see the sketch after this test).

}
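
To see why forcing partial reads exposes the bug, here is a tiny self-contained sketch (names made up here, not part of the PR): a stream that caps every read at one byte leaves most of a buffer untouched after a single read call, while DataInputStream.readFully still recovers every byte. The ReadPartialInputStream added at the bottom of this file applies the same idea to the fallback storage path.

import java.io.{ByteArrayInputStream, DataInputStream, InputStream}

// Toy stream: never returns more than one byte per read call.
class OneBytePerReadStream(data: Array[Byte]) extends InputStream {
  private val in = new ByteArrayInputStream(data)
  override def read(): Int = in.read()
  override def read(b: Array[Byte], off: Int, len: Int): Int = in.read(b, off, math.min(len, 1))
}

val content = Array.tabulate[Byte](16)(_.toByte)
val buf = new Array[Byte](content.length)
// A single read fills only one byte of the 16-byte buffer.
val n = new OneBytePerReadStream(content).read(buf, 0, buf.length)
// readFully loops over the short reads and fills the whole buffer.
new DataInputStream(new OneBytePerReadStream(content)).readFully(buf)
assert(buf.sameElements(content))
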

test("SPARK-34142: fallback storage API - cleanUp") {
withTempDir { dir =>
Seq(true, false).foreach { cleanUp =>
@@ -289,3 +334,46 @@ class FallbackStorageSuite extends SparkFunSuite with LocalSparkContext {
}
}
}

// Test-only stream that delegates to an FSDataInputStream but, whenever a plain read asks
// for more than one byte, deliberately returns one byte fewer than requested. Callers that
// issue a single read() therefore never receive the full data, while readFully still does.
class ReadPartialInputStream(val in: FSDataInputStream) extends InputStream
  with Seekable with PositionedReadable {
  override def read: Int = in.read

  // Short read: for len > 1, hand back at most len - 1 bytes.
  override def read(b: Array[Byte], off: Int, len: Int): Int = {
    if (len > 1) {
      in.read(b, off, len - 1)
    } else {
      in.read(b, off, len)
    }
  }

  override def seek(pos: Long): Unit = {
    in.seek(pos)
  }

  override def getPos: Long = in.getPos

  override def seekToNewSource(targetPos: Long): Boolean = in.seekToNewSource(targetPos)

  // Positioned read: same short-read behavior as above.
  override def read(position: Long, buffer: Array[Byte], offset: Int, length: Int): Int = {
    if (length > 1) {
      in.read(position, buffer, offset, length - 1)
    } else {
      in.read(position, buffer, offset, length)
    }
  }

  // The readFully variants delegate unchanged, so they always deliver every requested byte.
  override def readFully(position: Long, buffer: Array[Byte], offset: Int, length: Int): Unit = {
    in.readFully(position, buffer, offset, length)
  }

  override def readFully(position: Long, buffer: Array[Byte]): Unit = {
    in.readFully(position, buffer)
  }
}

// LocalFileSystem whose open() wraps every stream in ReadPartialInputStream, so reads made
// through this file system can return fewer bytes than requested.
class ReadPartialFileSystem extends LocalFileSystem {
  override def open(f: Path): FSDataInputStream = {
    val stream = super.open(f)
    new FSDataInputStream(new ReadPartialInputStream(stream))
  }
}