From ff8e8bb5f7f78718a22425f2c84dbebe493c4ff6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E2=80=9Cattilapiros=E2=80=9D?= Date: Sat, 27 Apr 2019 23:01:02 +0200 Subject: [PATCH 01/11] initial version --- .../network/shuffle/ExecutorDiskReader.java | 65 ++++++++++++++++++ .../shuffle/ExternalShuffleBlockResolver.java | 44 ++---------- .../ExternalShuffleBlockResolverSuite.java | 2 +- .../shuffle/TestShuffleDataContext.java | 7 +- .../apache/spark/storage/BlockManager.scala | 67 +++++++++++++++---- .../spark/storage/BlockManagerMaster.scala | 12 ++-- .../storage/BlockManagerMasterEndpoint.scala | 24 ++++--- .../spark/storage/BlockManagerMessages.scala | 9 ++- .../spark/storage/DiskBlockManager.scala | 5 +- .../spark/storage/BlockManagerSuite.scala | 39 +++++++++-- 10 files changed, 196 insertions(+), 78 deletions(-) create mode 100644 common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExecutorDiskReader.java diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExecutorDiskReader.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExecutorDiskReader.java new file mode 100644 index 0000000000000..0d3363a531f62 --- /dev/null +++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExecutorDiskReader.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.network.shuffle; + +import com.google.common.annotations.VisibleForTesting; +import org.apache.spark.network.util.JavaUtils; + +import java.io.File; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +public class ExecutorDiskReader { + + private static final Pattern MULTIPLE_SEPARATORS = Pattern.compile(File.separator + "{2,}"); + + /** + * Hashes a filename into the corresponding local directory, in a manner consistent with + * Spark's DiskBlockManager.getFile(). + */ + public static File getFile(String[] localDirs, int subDirsPerLocalDir, String filename) { + int hash = JavaUtils.nonNegativeHash(filename); + String localDir = localDirs[hash % localDirs.length]; + int subDirId = (hash / localDirs.length) % subDirsPerLocalDir; + return new File(createNormalizedInternedPathname( + localDir, String.format("%02x", subDirId), filename)); + } + + /** + * This method is needed to avoid the situation when multiple File instances for the + * same pathname "foo/bar" are created, each with a separate copy of the "foo/bar" String. + * According to measurements, in some scenarios such duplicate strings may waste a lot + * of memory (~ 10% of the heap). To avoid that, we intern the pathname, and before that + * we make sure that it's in a normalized form (contains no "//", "///" etc.) 
Otherwise, + * the internal code in java.io.File would normalize it later, creating a new "foo/bar" + * String copy. Unfortunately, we cannot just reuse the normalization code that java.io.File + * uses, since it is in the package-private class java.io.FileSystem. + */ + @VisibleForTesting + static String createNormalizedInternedPathname(String dir1, String dir2, String fname) { + String pathname = dir1 + File.separator + dir2 + File.separator + fname; + Matcher m = MULTIPLE_SEPARATORS.matcher(pathname); + pathname = m.replaceAll("/"); + // A single trailing slash needs to be taken care of separately + if (pathname.length() > 1 && pathname.endsWith("/")) { + pathname = pathname.substring(0, pathname.length() - 1); + } + return pathname.intern(); + } + +} diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java index 87e6fe12b59fa..8804250093ace 100644 --- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java +++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java @@ -298,7 +298,7 @@ private void deleteNonShuffleServiceServedFiles(String[] dirs) { */ private ManagedBuffer getSortBasedShuffleBlockData( ExecutorShuffleInfo executor, int shuffleId, int mapId, int reduceId) { - File indexFile = getFile(executor.localDirs, executor.subDirsPerLocalDir, + File indexFile = ExecutorDiskReader.getFile(executor.localDirs, executor.subDirsPerLocalDir, "shuffle_" + shuffleId + "_" + mapId + "_0.index"); try { @@ -306,7 +306,7 @@ private ManagedBuffer getSortBasedShuffleBlockData( ShuffleIndexRecord shuffleIndexRecord = shuffleIndexInformation.getIndex(reduceId); return new FileSegmentManagedBuffer( conf, - getFile(executor.localDirs, executor.subDirsPerLocalDir, + ExecutorDiskReader.getFile(executor.localDirs, executor.subDirsPerLocalDir, "shuffle_" + shuffleId + "_" + mapId + "_0.data"), shuffleIndexRecord.getOffset(), shuffleIndexRecord.getLength()); @@ -317,7 +317,7 @@ private ManagedBuffer getSortBasedShuffleBlockData( public ManagedBuffer getDiskPersistedRddBlockData( ExecutorShuffleInfo executor, int rddId, int splitIndex) { - File file = getFile(executor.localDirs, executor.subDirsPerLocalDir, + File file = ExecutorDiskReader.getFile(executor.localDirs, executor.subDirsPerLocalDir, "rdd_" + rddId + "_" + splitIndex); long fileLength = file.length(); ManagedBuffer res = null; @@ -327,19 +327,6 @@ public ManagedBuffer getDiskPersistedRddBlockData( return res; } - /** - * Hashes a filename into the corresponding local directory, in a manner consistent with - * Spark's DiskBlockManager.getFile(). - */ - @VisibleForTesting - static File getFile(String[] localDirs, int subDirsPerLocalDir, String filename) { - int hash = JavaUtils.nonNegativeHash(filename); - String localDir = localDirs[hash % localDirs.length]; - int subDirId = (hash / localDirs.length) % subDirsPerLocalDir; - return new File(createNormalizedInternedPathname( - localDir, String.format("%02x", subDirId), filename)); - } - void close() { if (db != null) { try { @@ -350,28 +337,6 @@ void close() { } } - /** - * This method is needed to avoid the situation when multiple File instances for the - * same pathname "foo/bar" are created, each with a separate copy of the "foo/bar" String. 
- * According to measurements, in some scenarios such duplicate strings may waste a lot - * of memory (~ 10% of the heap). To avoid that, we intern the pathname, and before that - * we make sure that it's in a normalized form (contains no "//", "///" etc.) Otherwise, - * the internal code in java.io.File would normalize it later, creating a new "foo/bar" - * String copy. Unfortunately, we cannot just reuse the normalization code that java.io.File - * uses, since it is in the package-private class java.io.FileSystem. - */ - @VisibleForTesting - static String createNormalizedInternedPathname(String dir1, String dir2, String fname) { - String pathname = dir1 + File.separator + dir2 + File.separator + fname; - Matcher m = MULTIPLE_SEPARATORS.matcher(pathname); - pathname = m.replaceAll("/"); - // A single trailing slash needs to be taken care of separately - if (pathname.length() > 1 && pathname.endsWith("/")) { - pathname = pathname.substring(0, pathname.length() - 1); - } - return pathname.intern(); - } - public int removeBlocks(String appId, String execId, String[] blockIds) { ExecutorShuffleInfo executor = executors.get(new AppExecId(appId, execId)); if (executor == null) { @@ -380,7 +345,8 @@ public int removeBlocks(String appId, String execId, String[] blockIds) { } int numRemovedBlocks = 0; for (String blockId : blockIds) { - File file = getFile(executor.localDirs, executor.subDirsPerLocalDir, blockId); + File file = + ExecutorDiskReader.getFile(executor.localDirs, executor.subDirsPerLocalDir, blockId); if (file.delete()) { numRemovedBlocks++; } else { diff --git a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolverSuite.java b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolverSuite.java index 459629c5f05fe..4994c4eb5bcae 100644 --- a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolverSuite.java +++ b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolverSuite.java @@ -149,7 +149,7 @@ public void testNormalizeAndInternPathname() { private void assertPathsMatch(String p1, String p2, String p3, String expectedPathname) { String normPathname = - ExternalShuffleBlockResolver.createNormalizedInternedPathname(p1, p2, p3); + ExecutorDiskReader.createNormalizedInternedPathname(p1, p2, p3); assertEquals(expectedPathname, normPathname); File file = new File(normPathname); String returnedPath = file.getPath(); diff --git a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/TestShuffleDataContext.java b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/TestShuffleDataContext.java index 10be95ec50c38..914d9edfef0f3 100644 --- a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/TestShuffleDataContext.java +++ b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/TestShuffleDataContext.java @@ -76,9 +76,9 @@ public void insertSortShuffleData(int shuffleId, int mapId, byte[][] blocks) thr try { dataStream = new FileOutputStream( - ExternalShuffleBlockResolver.getFile(localDirs, subDirsPerLocalDir, blockId + ".data")); + ExecutorDiskReader.getFile(localDirs, subDirsPerLocalDir, blockId + ".data")); indexStream = new DataOutputStream(new FileOutputStream( - ExternalShuffleBlockResolver.getFile(localDirs, subDirsPerLocalDir, blockId + ".index"))); + ExecutorDiskReader.getFile(localDirs, subDirsPerLocalDir, blockId + ".index"))); long offset = 0; 
indexStream.writeLong(offset); @@ -124,7 +124,8 @@ private void insertFile(String filename, byte[] block) throws IOException { File file = ExternalShuffleBlockResolver.getFile(localDirs, subDirsPerLocalDir, filename); assert(!file.exists()) : "this test file has been already generated"; try { - dataStream = new FileOutputStream(file); + dataStream = new FileOutputStream( + ExecutorDiskReader.getFile(localDirs, subDirsPerLocalDir, filename)); dataStream.write(block); } finally { Closeables.close(dataStream, false); diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala index 6dbef784fe0c0..f7ce984905bd6 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala @@ -43,7 +43,7 @@ import org.apache.spark.internal.config.Network import org.apache.spark.memory.{MemoryManager, MemoryMode} import org.apache.spark.metrics.source.Source import org.apache.spark.network._ -import org.apache.spark.network.buffer.ManagedBuffer +import org.apache.spark.network.buffer.{FileSegmentManagedBuffer, ManagedBuffer} import org.apache.spark.network.client.StreamCallbackWithID import org.apache.spark.network.shuffle._ import org.apache.spark.network.shuffle.protocol.ExecutorShuffleInfo @@ -138,6 +138,8 @@ private[spark] class BlockManager( private val remoteReadNioBufferConversion = conf.get(Network.NETWORK_REMOTE_READ_NIO_BUFFER_CONVERSION) + private[spark] val subDirsPerLocalDir = conf.get(config.DISKSTORE_SUB_DIRECTORIES) + val diskBlockManager = { // Only perform cleanup if an external service is not serving our shuffle files. val deleteFilesOnStop = @@ -411,6 +413,7 @@ private[spark] class BlockManager( val idFromMaster = master.registerBlockManager( id, + diskBlockManager.localDirsString, maxOnHeapMemory, maxOffHeapMemory, slaveEndpoint) @@ -445,7 +448,7 @@ private[spark] class BlockManager( private def registerWithExternalShuffleServer() { logInfo("Registering executor with local external shuffle service.") val shuffleConfig = new ExecutorShuffleInfo( - diskBlockManager.localDirs.map(_.toString), + diskBlockManager.localDirsString, diskBlockManager.subDirsPerLocalDir, shuffleManager.getClass.getName) @@ -500,7 +503,8 @@ private[spark] class BlockManager( def reregister(): Unit = { // TODO: We might need to rate limit re-registering. logInfo(s"BlockManager $blockManagerId re-registering with master") - master.registerBlockManager(blockManagerId, maxOnHeapMemory, maxOffHeapMemory, slaveEndpoint) + master.registerBlockManager(blockManagerId, diskBlockManager.localDirsString, maxOnHeapMemory, + maxOffHeapMemory, slaveEndpoint) reportAllBlocks() } @@ -866,17 +870,24 @@ private[spark] class BlockManager( private def getRemoteManagedBuffer(blockId: BlockId): Option[ManagedBuffer] = { logDebug(s"Getting remote block $blockId") require(blockId != null, "BlockId is null") - var runningFailureCount = 0 - var totalFailureCount = 0 - // Because all the remote blocks are registered in driver, it is not necessary to ask // all the slave executors to get block status. 
- val locationsAndStatus = master.getLocationsAndStatus(blockId) - val blockSize = locationsAndStatus.map { b => - b.status.diskSize.max(b.status.memSize) - }.getOrElse(0L) - val blockLocations = locationsAndStatus.map(_.locations).getOrElse(Seq.empty) - + val locationsAndStatusOption = master.getLocationsAndStatus(blockId, blockManagerId.host) + if (locationsAndStatusOption.isEmpty) { + logDebug(s"Block $blockId is unknown by block manager master") + return None + } + val locationsAndStatus = locationsAndStatusOption.get + + val blockSize = locationsAndStatus.status.diskSize.max(locationsAndStatus.status.memSize) + if (locationsAndStatus.localDirs.isDefined) { + val blockDataOption = + readDiskBlockFromSameHostExecutor(blockId, locationsAndStatus.localDirs.get, blockSize) + if (blockDataOption.isDefined) { + logDebug(s"Read $blockId from the disk of a same host executor") + return blockDataOption + } + } // If the block size is above the threshold, we should pass our FileManger to // BlockTransferService, which will leverage it to spill the block; if not, then passed-in // null value means the block will be persisted in memory. @@ -885,8 +896,9 @@ private[spark] class BlockManager( } else { null } - - val locations = sortLocations(blockLocations) + var runningFailureCount = 0 + var totalFailureCount = 0 + val locations = sortLocations(locationsAndStatus.locations) val maxFetchFailures = locations.size var locationIterator = locations.iterator while (locationIterator.hasNext) { @@ -946,6 +958,33 @@ private[spark] class BlockManager( None } + private def readDiskBlockFromSameHostExecutor( + blockId: BlockId, + localDirs: Array[String], + blockSize: Long): Option[ManagedBuffer] = { + val file = ExecutorDiskReader.getFile(localDirs, subDirsPerLocalDir, blockId.name) + if (file.exists()) { + val mangedBuffer = securityManager.getIOEncryptionKey() match { + case Some(key) => + // Encrypted blocks cannot be memory mapped; return a special object that does decryption + // and provides InputStream / FileRegion implementations for reading the data. + new EncryptedManagedBuffer( + new EncryptedBlockData(file, blockSize, conf, key)) + + case _ => + val transportConf = SparkTransportConf.fromSparkConf(conf, "files") + new FileSegmentManagedBuffer(transportConf, file, 0, file.length) + } + if (blockSize > 0 && mangedBuffer.size() == 0) { + None + } else { + Some(mangedBuffer) + } + } else { + None + } + } + /** * Get block from remote block managers as serialized bytes. */ diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala index b18d38fe52533..160a8d4babb69 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala @@ -56,13 +56,14 @@ class BlockManagerMaster( * updated BlockManagerId fleshed out with this information. 
*/ def registerBlockManager( - blockManagerId: BlockManagerId, + id: BlockManagerId, + localDirs: Array[String], maxOnHeapMemSize: Long, maxOffHeapMemSize: Long, slaveEndpoint: RpcEndpointRef): BlockManagerId = { - logInfo(s"Registering BlockManager $blockManagerId") + logInfo(s"Registering BlockManager $id") val updatedId = driverEndpoint.askSync[BlockManagerId]( - RegisterBlockManager(blockManagerId, maxOnHeapMemSize, maxOffHeapMemSize, slaveEndpoint)) + RegisterBlockManager(id, localDirs, maxOnHeapMemSize, maxOffHeapMemSize, slaveEndpoint)) logInfo(s"Registered BlockManager $updatedId") updatedId } @@ -85,9 +86,10 @@ class BlockManagerMaster( } /** Get locations as well as status of the blockId from the driver */ - def getLocationsAndStatus(blockId: BlockId): Option[BlockLocationsAndStatus] = { + def getLocationsAndStatus( + blockId: BlockId, requesterHost: String): Option[BlockLocationsAndStatus] = { driverEndpoint.askSync[Option[BlockLocationsAndStatus]]( - GetLocationsAndStatus(blockId)) + GetLocationsAndStatus(blockId, requesterHost)) } /** Get locations of multiple blockIds from the driver */ diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala index 65ec1c3f0dc61..7fe380aadbe9c 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala @@ -86,8 +86,8 @@ class BlockManagerMasterEndpoint( private val externalShuffleServicePort: Int = StorageUtils.externalShuffleServicePort(conf) override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = { - case RegisterBlockManager(blockManagerId, maxOnHeapMemSize, maxOffHeapMemSize, slaveEndpoint) => - context.reply(register(blockManagerId, maxOnHeapMemSize, maxOffHeapMemSize, slaveEndpoint)) + case RegisterBlockManager(id, localDirs, maxOnHeapMemSize, maxOffHeapMemSize, slaveEndpoint) => + context.reply(register(id, localDirs, maxOnHeapMemSize, maxOffHeapMemSize, slaveEndpoint)) case _updateBlockInfo @ UpdateBlockInfo(blockManagerId, blockId, storageLevel, deserializedSize, size) => @@ -97,8 +97,8 @@ class BlockManagerMasterEndpoint( case GetLocations(blockId) => context.reply(getLocations(blockId)) - case GetLocationsAndStatus(blockId) => - context.reply(getLocationsAndStatus(blockId)) + case GetLocationsAndStatus(blockId, requesterHost) => + context.reply(getLocationsAndStatus(blockId, requesterHost)) case GetLocationsMultipleBlockIds(blockIds) => context.reply(getLocationsMultipleBlockIds(blockIds)) @@ -410,6 +410,7 @@ class BlockManagerMasterEndpoint( */ private def register( idWithoutTopologyInfo: BlockManagerId, + localDirs: Array[String], maxOnHeapMemSize: Long, maxOffHeapMemSize: Long, slaveEndpoint: RpcEndpointRef): BlockManagerId = { @@ -445,8 +446,8 @@ class BlockManagerMasterEndpoint( None } - blockManagerInfo(id) = new BlockManagerInfo(id, System.currentTimeMillis(), maxOnHeapMemSize, - maxOffHeapMemSize, slaveEndpoint, externalShuffleServiceBlockStatus) + blockManagerInfo(id) = new BlockManagerInfo(id, System.currentTimeMillis(), localDirs, + maxOnHeapMemSize, maxOffHeapMemSize, slaveEndpoint, externalShuffleServiceBlockStatus) } listenerBus.post(SparkListenerBlockManagerAdded(time, id, maxOnHeapMemSize + maxOffHeapMemSize, Some(maxOnHeapMemSize), Some(maxOffHeapMemSize))) @@ -511,7 +512,8 @@ class BlockManagerMasterEndpoint( if (blockLocations.containsKey(blockId)) 
blockLocations.get(blockId).toSeq else Seq.empty } - private def getLocationsAndStatus(blockId: BlockId): Option[BlockLocationsAndStatus] = { + private def getLocationsAndStatus( + blockId: BlockId, requesterHost: String): Option[BlockLocationsAndStatus] = { val locations = Option(blockLocations.get(blockId)).map(_.toSeq).getOrElse(Seq.empty) val status = locations.headOption.flatMap { bmId => if (externalShuffleServiceRddFetchEnabled && bmId.port == externalShuffleServicePort) { @@ -522,7 +524,12 @@ class BlockManagerMasterEndpoint( } if (locations.nonEmpty && status.isDefined) { - Some(BlockLocationsAndStatus(locations, status.get)) + val localDirs = if (status.get.storageLevel.useDisk) { + locations.find(_.host == requesterHost).map(blockManagerInfo(_).localDirs) + } else { + None + } + Some(BlockLocationsAndStatus(locations, status.get, localDirs)) } else { None } @@ -573,6 +580,7 @@ object BlockStatus { private[spark] class BlockManagerInfo( val blockManagerId: BlockManagerId, timeMs: Long, + val localDirs: Array[String], val maxOnHeapMem: Long, val maxOffHeapMem: Long, val slaveEndpoint: RpcEndpointRef, diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala index 3dbac694cf818..3b33050d1399c 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala @@ -58,6 +58,7 @@ private[spark] object BlockManagerMessages { case class RegisterBlockManager( blockManagerId: BlockManagerId, + localDirs: Array[String], maxOnHeapMemSize: Long, maxOffHeapMemSize: Long, sender: RpcEndpointRef) @@ -93,10 +94,14 @@ private[spark] object BlockManagerMessages { case class GetLocations(blockId: BlockId) extends ToBlockManagerMaster - case class GetLocationsAndStatus(blockId: BlockId) extends ToBlockManagerMaster + case class GetLocationsAndStatus(blockId: BlockId, requesterHost: String) + extends ToBlockManagerMaster // The response message of `GetLocationsAndStatus` request. - case class BlockLocationsAndStatus(locations: Seq[BlockManagerId], status: BlockStatus) { + case class BlockLocationsAndStatus( + locations: Seq[BlockManagerId], + status: BlockStatus, + localDirs: Option[Array[String]]) { assert(locations.nonEmpty) } diff --git a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala index 95ce4b0f09f5f..6a30758042b4f 100644 --- a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala @@ -44,6 +44,9 @@ private[spark] class DiskBlockManager(conf: SparkConf, deleteFilesOnStop: Boolea logError("Failed to create any local dir.") System.exit(ExecutorExitCode.DISK_STORE_FAILED_TO_CREATE_DIR) } + + private[spark] val localDirsString: Array[String] = localDirs.map(_.toString) + // The content of subDirs is immutable but the content of subDirs(i) is mutable. And the content // of subDirs(i) is protected by the lock of subDirs(i) private val subDirs = Array.fill(localDirs.length)(new Array[File](subDirsPerLocalDir)) @@ -52,7 +55,7 @@ private[spark] class DiskBlockManager(conf: SparkConf, deleteFilesOnStop: Boolea /** Looks up a file by hashing it into one of our local subdirectories. */ // This method should be kept in sync with - // org.apache.spark.network.shuffle.ExternalShuffleBlockResolver#getFile(). 
+ // org.apache.spark.network.shuffle.ExecutorDiskReader#getFile(). def getFile(filename: String): File = { // Figure out which local directory it hashes to, and which subdirectory in that val hash = Utils.nonNegativeHash(filename) diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala index ab4693e86b3a2..c81500951291a 100644 --- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala @@ -81,6 +81,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE // Implicitly convert strings to BlockIds for test clarity. implicit def StringToBlockId(value: String): BlockId = new TestBlockId(value) + def rdd(rddId: Int, splitId: Int): RDDBlockId = RDDBlockId(rddId, splitId) private def init(sparkConf: SparkConf): Unit = { @@ -567,6 +568,33 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE assert(store.getRemoteBytes("list1").isEmpty) } + Seq( + StorageLevel(useDisk = true, useMemory = false, deserialized = false), + StorageLevel(useDisk = true, useMemory = false, deserialized = true) + ).foreach { storageLevel => + test(s"SPARK-27622: avoid the network when block requested from same host, $storageLevel") { + conf.set("spark.shuffle.io.maxRetries", "0") + val noFetcher = new MockBlockTransferService(0) { + override def fetchBlockSync( + host: String, + port: Int, + execId: String, + blockId: String, + tempFileManager: DownloadFileManager): ManagedBuffer = { + fail("Fetching over network is not expected when the block is requested from same host") + } + } + val store1 = makeBlockManager(8000, "executor1", this.master, Some(noFetcher)) + val store2 = makeBlockManager(8000, "executor2", this.master, Some(noFetcher)) + val list = List(new Array[Byte](4000)) + store2.putIterator("list", list.iterator, storageLevel, tellMaster = true) + val bytesViaStore1 = store1.getRemoteBytes("list") + assert(bytesViaStore1.isDefined, "list expected to be accessed") + val expectedContent = store2.getBlockData("list").nioByteBuffer().array() + assert(bytesViaStore1.get.toArray === expectedContent) + } + } + test("SPARK-14252: getOrElseUpdate should still read from remote storage") { val store = makeBlockManager(8000, "executor1") val store2 = makeBlockManager(8000, "executor2") @@ -1315,8 +1343,8 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE // so that we have a chance to do location refresh val blockManagerIds = (0 to maxFailuresBeforeLocationRefresh) .map { i => BlockManagerId(s"id-$i", s"host-$i", i + 1) } - when(mockBlockManagerMaster.getLocationsAndStatus(mc.any[BlockId])).thenReturn( - Option(BlockLocationsAndStatus(blockManagerIds, BlockStatus.empty))) + when(mockBlockManagerMaster.getLocationsAndStatus(mc.any[BlockId], mc.any[String])).thenReturn( + Option(BlockLocationsAndStatus(blockManagerIds, BlockStatus.empty, None))) when(mockBlockManagerMaster.getLocations(mc.any[BlockId])).thenReturn( blockManagerIds) @@ -1325,7 +1353,8 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE val block = store.getRemoteBytes("item") .asInstanceOf[Option[ByteBuffer]] assert(block.isDefined) - verify(mockBlockManagerMaster, times(1)).getLocationsAndStatus("item") + verify(mockBlockManagerMaster, times(1)) + .getLocationsAndStatus("item", "MockBlockTransferServiceHost") verify(mockBlockManagerMaster, 
times(1)).getLocations("item") } @@ -1502,8 +1531,8 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE val blockLocations = Seq(BlockManagerId("id-0", "host-0", 1)) val blockStatus = BlockStatus(StorageLevel.DISK_ONLY, 0L, 2000L) - when(mockBlockManagerMaster.getLocationsAndStatus(mc.any[BlockId])).thenReturn( - Option(BlockLocationsAndStatus(blockLocations, blockStatus))) + when(mockBlockManagerMaster.getLocationsAndStatus(mc.any[BlockId], mc.any[String])).thenReturn( + Option(BlockLocationsAndStatus(blockLocations, blockStatus, None))) when(mockBlockManagerMaster.getLocations(mc.any[BlockId])).thenReturn(blockLocations) val store = makeBlockManager(8000, "executor1", mockBlockManagerMaster, From e4b8ca29584b5dbc066da2bea5cc8a83c0efeae9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E2=80=9Cattilapiros=E2=80=9D?= Date: Thu, 9 May 2019 12:30:49 +0200 Subject: [PATCH 02/11] remove the remote location of the block manager to which localDirs belongs to (if defined) --- .../storage/BlockManagerMasterEndpoint.scala | 15 ++++++++++++--- .../spark/storage/BlockManagerMessages.scala | 2 +- 2 files changed, 13 insertions(+), 4 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala index 7fe380aadbe9c..81f8042589fff 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala @@ -524,12 +524,21 @@ class BlockManagerMasterEndpoint( } if (locations.nonEmpty && status.isDefined) { - val localDirs = if (status.get.storageLevel.useDisk) { - locations.find(_.host == requesterHost).map(blockManagerInfo(_).localDirs) + val bmIdToLocalDirs = if (status.get.storageLevel.useDisk) { + locations + .find(_.host == requesterHost) + .map { sameHostBlockId => + val bmInfo = blockManagerInfo(sameHostBlockId) + bmInfo.blockManagerId -> bmInfo.localDirs + } } else { None } - Some(BlockLocationsAndStatus(locations, status.get, localDirs)) + bmIdToLocalDirs.map { case (bmId, localDirs) => + Some(BlockLocationsAndStatus(locations.filter(_ != bmId), status.get, Some(localDirs))) + }.getOrElse( + Some(BlockLocationsAndStatus(locations, status.get, None)) + ) } else { None } diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala index 3b33050d1399c..c0ae26a15cd22 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala @@ -102,7 +102,7 @@ private[spark] object BlockManagerMessages { locations: Seq[BlockManagerId], status: BlockStatus, localDirs: Option[Array[String]]) { - assert(locations.nonEmpty) + assert(locations.nonEmpty || localDirs.isDefined) } case class GetLocationsMultipleBlockIds(blockIds: Array[BlockId]) extends ToBlockManagerMaster From 1b9c52303fb168003c1febf9e4557ef5a95fe68d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E2=80=9Cattilapiros=E2=80=9D?= Date: Fri, 10 May 2019 14:04:17 +0200 Subject: [PATCH 03/11] applying review comments 1.0 --- .../scala/org/apache/spark/storage/BlockManager.scala | 2 ++ .../spark/storage/BlockManagerMasterEndpoint.scala | 4 ++-- .../apache/spark/storage/BlockManagerMessages.scala | 8 +++++++- .../org/apache/spark/storage/BlockManagerSuite.scala | 10 +++++----- 4 files changed, 16 insertions(+), 8 deletions(-) 
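For reference, the directory layout that both DiskBlockManager.getFile and the new ExecutorDiskReader.getFile rely on can be summarized with a small, self-contained sketch (not part of this patch series). The nonNegativeHash below is a stand-in assumed to mirror Spark's Utils.nonNegativeHash / JavaUtils.nonNegativeHash, and the real helper additionally normalizes and interns the resulting pathname; keeping these two lookups in sync is exactly why the first patch extracts the logic into a shared class.

import java.io.File

object BlockFileLayoutSketch {

  // Stand-in for Utils.nonNegativeHash / JavaUtils.nonNegativeHash (assumed behaviour).
  private def nonNegativeHash(s: String): Int = {
    val h = s.hashCode
    if (h != Int.MinValue) math.abs(h) else 0
  }

  // Pick a local dir by hash, then a two-hex-digit sub-directory, then the block file name.
  def getFile(localDirs: Array[String], subDirsPerLocalDir: Int, filename: String): File = {
    val hash = nonNegativeHash(filename)
    val localDir = localDirs(hash % localDirs.length)
    val subDirId = (hash / localDirs.length) % subDirsPerLocalDir
    new File(new File(localDir, "%02x".format(subDirId)), filename)
  }
}

// Example: resolving the on-disk file of a disk-persisted RDD block owned by another
// executor on the same host, given only that executor's localDirs and subDirsPerLocalDir:
//   BlockFileLayoutSketch.getFile(Array("/tmp/spark-local-1"), 64, "rdd_3_7")
//   // => new File("/tmp/spark-local-1/<2-hex-digit subdir>/rdd_3_7")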
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala index f7ce984905bd6..0a7b50b435d0d 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala @@ -881,6 +881,8 @@ private[spark] class BlockManager( val blockSize = locationsAndStatus.status.diskSize.max(locationsAndStatus.status.memSize) if (locationsAndStatus.localDirs.isDefined) { + // as the block content can be found on the same host using the network can be avoided by + // accessing the file directly and using the local directories of the other executor val blockDataOption = readDiskBlockFromSameHostExecutor(blockId, locationsAndStatus.localDirs.get, blockSize) if (blockDataOption.isDefined) { diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala index 81f8042589fff..2195d238c2379 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala @@ -535,8 +535,8 @@ class BlockManagerMasterEndpoint( None } bmIdToLocalDirs.map { case (bmId, localDirs) => - Some(BlockLocationsAndStatus(locations.filter(_ != bmId), status.get, Some(localDirs))) - }.getOrElse( + BlockLocationsAndStatus(locations.filter(_ != bmId), status.get, Some(localDirs)) + }.orElse( Some(BlockLocationsAndStatus(locations, status.get, None)) ) } else { diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala index c0ae26a15cd22..4306e27e40f60 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala @@ -97,7 +97,13 @@ private[spark] object BlockManagerMessages { case class GetLocationsAndStatus(blockId: BlockId, requesterHost: String) extends ToBlockManagerMaster - // The response message of `GetLocationsAndStatus` request. + /** + * The response message of `GetLocationsAndStatus` request. + * + * @param localDirs if it is persisted-to-disk on the same host as the requester executor is + * running on then localDirs will be Some and the cached data will be in a file + * in one of those dirs, otherwise it is None. 
+ */ case class BlockLocationsAndStatus( locations: Seq[BlockManagerId], status: BlockStatus, diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala index c81500951291a..b4e21bd808c6b 100644 --- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala @@ -576,11 +576,11 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE conf.set("spark.shuffle.io.maxRetries", "0") val noFetcher = new MockBlockTransferService(0) { override def fetchBlockSync( - host: String, - port: Int, - execId: String, - blockId: String, - tempFileManager: DownloadFileManager): ManagedBuffer = { + host: String, + port: Int, + execId: String, + blockId: String, + tempFileManager: DownloadFileManager): ManagedBuffer = { fail("Fetching over network is not expected when the block is requested from same host") } } From b2cff9e92ffc3ebe8c7f267f85ab0391ef67eac2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E2=80=9Cattilapiros=E2=80=9D?= Date: Fri, 10 May 2019 14:13:02 +0200 Subject: [PATCH 04/11] applying review comments 1.1 --- .../src/main/scala/org/apache/spark/storage/BlockManager.scala | 3 +++ 1 file changed, 3 insertions(+) diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala index 0a7b50b435d0d..77560094d3b4f 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala @@ -960,6 +960,9 @@ private[spark] class BlockManager( None } + /** + * Reads the block from the local directories of another executor which runs on the same host. 
+ */ private def readDiskBlockFromSameHostExecutor( blockId: BlockId, localDirs: Array[String], From 8543b938d94416c882b3a415ea870baccd458f01 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E2=80=9Cattilapiros=E2=80=9D?= Date: Mon, 20 May 2019 22:47:09 +0200 Subject: [PATCH 05/11] introduce getAndMapRemoteManagedBuf and open the block file early --- .../network/shuffle/ExecutorDiskReader.java | 7 +- .../shuffle/ExternalShuffleBlockResolver.java | 1 - .../shuffle/TestShuffleDataContext.java | 2 +- .../apache/spark/storage/BlockManager.scala | 93 ++++++---- .../storage/BlockManagerMasterEndpoint.scala | 18 +- core/src/test/resources/log4j.properties | 2 +- .../spark/storage/BlockManagerInfoSuite.scala | 1 + .../spark/storage/BlockManagerSuite.scala | 160 +++++++++++++++++- .../org/apache/spark/util/UtilsSuite.scala | 23 ++- 9 files changed, 244 insertions(+), 63 deletions(-) diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExecutorDiskReader.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExecutorDiskReader.java index 0d3363a531f62..00e20185e46d1 100644 --- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExecutorDiskReader.java +++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExecutorDiskReader.java @@ -17,13 +17,14 @@ package org.apache.spark.network.shuffle; -import com.google.common.annotations.VisibleForTesting; -import org.apache.spark.network.util.JavaUtils; - import java.io.File; import java.util.regex.Matcher; import java.util.regex.Pattern; +import com.google.common.annotations.VisibleForTesting; + +import org.apache.spark.network.util.JavaUtils; + public class ExecutorDiskReader { private static final Pattern MULTIPLE_SEPARATORS = Pattern.compile(File.separator + "{2,}"); diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java index 8804250093ace..31b6369520295 100644 --- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java +++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java @@ -24,7 +24,6 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.Executor; import java.util.concurrent.Executors; -import java.util.regex.Matcher; import java.util.regex.Pattern; import com.fasterxml.jackson.annotation.JsonCreator; diff --git a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/TestShuffleDataContext.java b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/TestShuffleDataContext.java index 914d9edfef0f3..817767e907e7c 100644 --- a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/TestShuffleDataContext.java +++ b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/TestShuffleDataContext.java @@ -121,7 +121,7 @@ private void insertFile(String filename) throws IOException { private void insertFile(String filename, byte[] block) throws IOException { OutputStream dataStream = null; - File file = ExternalShuffleBlockResolver.getFile(localDirs, subDirsPerLocalDir, filename); + File file = ExecutorDiskReader.getFile(localDirs, subDirsPerLocalDir, filename); assert(!file.exists()) : "this test file has been already generated"; try { dataStream = new FileOutputStream( diff --git 
a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala index 77560094d3b4f..2fafbbf2ac6c2 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala @@ -29,7 +29,7 @@ import scala.collection.mutable.HashMap import scala.concurrent.{ExecutionContext, Future} import scala.concurrent.duration._ import scala.reflect.ClassTag -import scala.util.Random +import scala.util.{Random, Try} import scala.util.control.NonFatal import com.codahale.metrics.{MetricRegistry, MetricSet} @@ -45,6 +45,7 @@ import org.apache.spark.metrics.source.Source import org.apache.spark.network._ import org.apache.spark.network.buffer.{FileSegmentManagedBuffer, ManagedBuffer} import org.apache.spark.network.client.StreamCallbackWithID +import org.apache.spark.network.netty.SparkTransportConf import org.apache.spark.network.shuffle._ import org.apache.spark.network.shuffle.protocol.ExecutorShuffleInfo import org.apache.spark.network.util.TransportConf @@ -831,10 +832,57 @@ private[spark] class BlockManager( */ private[spark] def getRemoteValues[T: ClassTag](blockId: BlockId): Option[BlockResult] = { val ct = implicitly[ClassTag[T]] - getRemoteManagedBuffer(blockId).map { data => + getRemoteBlock(blockId, (data: ManagedBuffer) => { val values = serializerManager.dataDeserializeStream(blockId, data.createInputStream())(ct) new BlockResult(values, DataReadMethod.Network, data.size) + }) + } + + /** + * Get the remote block and transform it to the provided data type. + * + * If the block is persisted to the disk and stored at an executor running on the same host then + * first it is tried to be accessed using the local directories of the other executor directly. + * If the file is successfully identified then tried to be transformed by the provided + * transformation function which expected to open the file. If there is any exception during this + * transformation then block access falls back to fetching it from the remote executor via the + * network. + * + * @param blockId identifies the block to get + * @param bufferTransformer this transformer expected to open the file if the block is backed by a + * file by this it is guaranteed the whole content can be loaded + * @tparam T result type + * @return + */ + private[spark] def getRemoteBlock[T]( + blockId: BlockId, + bufferTransformer: ManagedBuffer => T): Option[T] = { + logDebug(s"Getting remote block $blockId") + require(blockId != null, "BlockId is null") + + // Because all the remote blocks are registered in driver, it is not necessary to ask + // all the slave executors to get block status. + val locationsAndStatusOption = master.getLocationsAndStatus(blockId, blockManagerId.host) + if (locationsAndStatusOption.isEmpty) { + logDebug(s"Block $blockId is unknown by block manager master") + None + } else { + val locationsAndStatus = locationsAndStatusOption.get + val blockSize = locationsAndStatus.status.diskSize.max(locationsAndStatus.status.memSize) + + locationsAndStatus.localDirs.flatMap { localDirs => + val blockDataOption = + readDiskBlockFromSameHostExecutor(blockId, localDirs, locationsAndStatus.status.diskSize) + val res = blockDataOption.flatMap { blockData => + Try(bufferTransformer(blockData)).toOption + } + logDebug(s"Read $blockId from the disk of a same host executor is " + + (if (res.isDefined) "successful." 
else "failed.")) + res + }.orElse { + fetchRemoteManagedBuffer(blockId, blockSize, locationsAndStatus).map(bufferTransformer) + } } } @@ -865,31 +913,12 @@ private[spark] class BlockManager( } /** - * Get block from remote block managers as a ManagedBuffer. + * Fetch the block from remote block managers as a ManagedBuffer. */ - private def getRemoteManagedBuffer(blockId: BlockId): Option[ManagedBuffer] = { - logDebug(s"Getting remote block $blockId") - require(blockId != null, "BlockId is null") - // Because all the remote blocks are registered in driver, it is not necessary to ask - // all the slave executors to get block status. - val locationsAndStatusOption = master.getLocationsAndStatus(blockId, blockManagerId.host) - if (locationsAndStatusOption.isEmpty) { - logDebug(s"Block $blockId is unknown by block manager master") - return None - } - val locationsAndStatus = locationsAndStatusOption.get - - val blockSize = locationsAndStatus.status.diskSize.max(locationsAndStatus.status.memSize) - if (locationsAndStatus.localDirs.isDefined) { - // as the block content can be found on the same host using the network can be avoided by - // accessing the file directly and using the local directories of the other executor - val blockDataOption = - readDiskBlockFromSameHostExecutor(blockId, locationsAndStatus.localDirs.get, blockSize) - if (blockDataOption.isDefined) { - logDebug(s"Read $blockId from the disk of a same host executor") - return blockDataOption - } - } + def fetchRemoteManagedBuffer( + blockId: BlockId, + blockSize: Long, + locationsAndStatus: BlockManagerMessages.BlockLocationsAndStatus): Option[ManagedBuffer] = { // If the block size is above the threshold, we should pass our FileManger to // BlockTransferService, which will leverage it to spill the block; if not, then passed-in // null value means the block will be persisted in memory. @@ -963,7 +992,7 @@ private[spark] class BlockManager( /** * Reads the block from the local directories of another executor which runs on the same host. */ - private def readDiskBlockFromSameHostExecutor( + private[spark] def readDiskBlockFromSameHostExecutor( blockId: BlockId, localDirs: Array[String], blockSize: Long): Option[ManagedBuffer] = { @@ -980,11 +1009,7 @@ private[spark] class BlockManager( val transportConf = SparkTransportConf.fromSparkConf(conf, "files") new FileSegmentManagedBuffer(transportConf, file, 0, file.length) } - if (blockSize > 0 && mangedBuffer.size() == 0) { - None - } else { - Some(mangedBuffer) - } + Some(mangedBuffer) } else { None } @@ -994,7 +1019,7 @@ private[spark] class BlockManager( * Get block from remote block managers as serialized bytes. */ def getRemoteBytes(blockId: BlockId): Option[ChunkedByteBuffer] = { - getRemoteManagedBuffer(blockId).map { data => + getRemoteBlock(blockId, (data: ManagedBuffer) => { // SPARK-24307 undocumented "escape-hatch" in case there are any issues in converting to // ChunkedByteBuffer, to go back to old code-path. Can be removed post Spark 2.4 if // new path is stable. 
@@ -1003,7 +1028,7 @@ private[spark] class BlockManager( } else { ChunkedByteBuffer.fromManagedBuffer(data) } - } + }) } /** diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala index 2195d238c2379..7b82a43868a66 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala @@ -513,7 +513,8 @@ class BlockManagerMasterEndpoint( } private def getLocationsAndStatus( - blockId: BlockId, requesterHost: String): Option[BlockLocationsAndStatus] = { + blockId: BlockId, + requesterHost: String): Option[BlockLocationsAndStatus] = { val locations = Option(blockLocations.get(blockId)).map(_.toSeq).getOrElse(Seq.empty) val status = locations.headOption.flatMap { bmId => if (externalShuffleServiceRddFetchEnabled && bmId.port == externalShuffleServicePort) { @@ -524,21 +525,12 @@ class BlockManagerMasterEndpoint( } if (locations.nonEmpty && status.isDefined) { - val bmIdToLocalDirs = if (status.get.storageLevel.useDisk) { - locations - .find(_.host == requesterHost) - .map { sameHostBlockId => - val bmInfo = blockManagerInfo(sameHostBlockId) - bmInfo.blockManagerId -> bmInfo.localDirs - } + val localDirs = if (status.get.storageLevel.useDisk) { + locations.find(_.host == requesterHost).map(blockManagerInfo(_).localDirs) } else { None } - bmIdToLocalDirs.map { case (bmId, localDirs) => - BlockLocationsAndStatus(locations.filter(_ != bmId), status.get, Some(localDirs)) - }.orElse( - Some(BlockLocationsAndStatus(locations, status.get, None)) - ) + Some(BlockLocationsAndStatus(locations, status.get, localDirs)) } else { None } diff --git a/core/src/test/resources/log4j.properties b/core/src/test/resources/log4j.properties index 2f46ce1553ee6..b288ea9cf6f53 100644 --- a/core/src/test/resources/log4j.properties +++ b/core/src/test/resources/log4j.properties @@ -17,7 +17,7 @@ # Set everything to be logged to the file target/unit-tests.log test.appender=file -log4j.rootCategory=INFO, ${test.appender} +log4j.rootCategory=DEBUG, ${test.appender} log4j.appender.file=org.apache.log4j.FileAppender log4j.appender.file.append=true log4j.appender.file.file=target/unit-tests.log diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerInfoSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerInfoSuite.scala index 8df1232503037..7dcbfa0103a9d 100644 --- a/core/src/test/scala/org/apache/spark/storage/BlockManagerInfoSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerInfoSuite.scala @@ -31,6 +31,7 @@ class BlockManagerInfoSuite extends SparkFunSuite { val bmInfo = new BlockManagerInfo( BlockManagerId("executor0", "host", 1234, None), timeMs = 300, + Array(), maxOnHeapMem = 10000, maxOffHeapMem = 20000, slaveEndpoint = null, diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala index b4e21bd808c6b..93416bb024b2a 100644 --- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala @@ -40,18 +40,19 @@ import org.apache.spark.executor.DataReadMethod import org.apache.spark.internal.config import org.apache.spark.internal.config._ import org.apache.spark.internal.config.Tests._ -import org.apache.spark.memory.UnifiedMemoryManager +import 
org.apache.spark.memory.{MemoryManager, UnifiedMemoryManager} import org.apache.spark.network.{BlockDataManager, BlockTransferService, TransportContext} import org.apache.spark.network.buffer.{FileSegmentManagedBuffer, ManagedBuffer, NioManagedBuffer} import org.apache.spark.network.client.{RpcResponseCallback, TransportClient} import org.apache.spark.network.netty.{NettyBlockTransferService, SparkTransportConf} import org.apache.spark.network.server.{NoOpRpcHandler, TransportServer, TransportServerBootstrap} -import org.apache.spark.network.shuffle.{BlockFetchingListener, DownloadFileManager, ExternalShuffleClient} +import org.apache.spark.network.shuffle.{BlockFetchingListener, DownloadFileManager, ExecutorDiskReader, ExternalShuffleClient} import org.apache.spark.network.shuffle.protocol.{BlockTransferMessage, RegisterExecutor} import org.apache.spark.rpc.RpcEnv import org.apache.spark.scheduler.LiveListenerBus import org.apache.spark.security.{CryptoStreamUtils, EncryptionFunSuite} import org.apache.spark.serializer.{JavaSerializer, KryoSerializer, SerializerManager} +import org.apache.spark.shuffle.ShuffleManager import org.apache.spark.shuffle.sort.SortShuffleManager import org.apache.spark.storage.BlockManagerMessages._ import org.apache.spark.util._ @@ -94,12 +95,51 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE .set(STORAGE_UNROLL_MEMORY_THRESHOLD, 512L) } + type BlockManagerFactory = ( + String, + RpcEnv, + BlockManagerMaster, + SerializerManager, + SparkConf, + MemoryManager, + MapOutputTracker, + ShuffleManager, + BlockTransferService, + SecurityManager, + Option[ExternalShuffleClient]) => BlockManager + + private val defaultBlockManagerFactory: BlockManagerFactory = ( + executorId: String, + rpcEnv: RpcEnv, + master: BlockManagerMaster, + serializerManager: SerializerManager, + conf: SparkConf, + memoryManager: MemoryManager, + mapOutputTracker: MapOutputTracker, + shuffleManager: ShuffleManager, + blockTransferService: BlockTransferService, + securityManager: SecurityManager, + externalShuffleClient: Option[ExternalShuffleClient]) => + new BlockManager( + executorId, + rpcEnv, + master, + serializerManager, + conf, + memoryManager, + mapOutputTracker, + shuffleManager, + blockTransferService, + securityManager, + externalShuffleClient) + private def makeBlockManager( maxMem: Long, name: String = SparkContext.DRIVER_IDENTIFIER, master: BlockManagerMaster = this.master, transferService: Option[BlockTransferService] = Option.empty, - testConf: Option[SparkConf] = None): BlockManager = { + testConf: Option[SparkConf] = None, + blockManagerFactory: BlockManagerFactory = defaultBlockManagerFactory): BlockManager = { val bmConf = testConf.map(_.setAll(conf.getAll)).getOrElse(conf) bmConf.set(TEST_MEMORY, maxMem) bmConf.set(MEMORY_OFFHEAP_SIZE, maxMem) @@ -121,7 +161,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE } else { None } - val blockManager = new BlockManager(name, rpcEnv, master, serializerManager, bmConf, + val blockManager = blockManagerFactory(name, rpcEnv, master, serializerManager, bmConf, memManager, mapOutputTracker, shuffleManager, transfer, bmSecurityMgr, externalShuffleClient) memManager.setMemoryStore(blockManager.memoryStore) allStores += blockManager @@ -586,12 +626,116 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE } val store1 = makeBlockManager(8000, "executor1", this.master, Some(noFetcher)) val store2 = makeBlockManager(8000, "executor2", 
this.master, Some(noFetcher)) - val list = List(new Array[Byte](4000)) - store2.putIterator("list", list.iterator, storageLevel, tellMaster = true) - val bytesViaStore1 = store1.getRemoteBytes("list") + val blockId = "list" + val array = new Array[Byte](4000) + store2.putIterator(blockId, List(array).iterator, storageLevel, true) + + // check getRemoteBytes + val bytesViaStore1 = store1.getRemoteBytes(blockId) assert(bytesViaStore1.isDefined, "list expected to be accessed") - val expectedContent = store2.getBlockData("list").nioByteBuffer().array() + val expectedContent = store2.getBlockData(blockId).nioByteBuffer().array() assert(bytesViaStore1.get.toArray === expectedContent) + + // check getRemoteValues + val valueViaStore1 = store1.getRemoteValues[List.type](blockId) + assert(valueViaStore1.isDefined, "list expected to be accessed") + assert(valueViaStore1.get.data.toList.head === array) + } + } + + private def testWithFileDelAfterLocalDiskRead(level: StorageLevel, getValueOrBytes: Boolean) = { + val testedFunc = if (getValueOrBytes) "getRemoteValue()" else "getRemoteBytes()" + val testNameSuffix = s"$level, $testedFunc" + test(s"SPARK-27622: as file is removed fall back to network fetch, $testNameSuffix") { + conf.set("spark.shuffle.io.maxRetries", "0") + // variable to check the usage of the local disk of the remote executor on the same host + var sameHostExecutorTried: Boolean = false + val store2 = makeBlockManager(8000, "executor2", this.master, + Some(new MockBlockTransferService(0))) + val blockId = "list" + val array = new Array[Byte](4000) + store2.putIterator(blockId, List(array).iterator, level, true) + val expectedBlockData = store2.getLocalBytes(blockId) + assert(expectedBlockData.isDefined) + val expectedByteBuffer = expectedBlockData.get.toByteBuffer() + + val transferServiceAfterLocalAccess = new MockBlockTransferService(0) { + override def fetchBlockSync( + host: String, + port: Int, + execId: String, + blockId: String, + tempFileManager: DownloadFileManager): ManagedBuffer = { + assert(sameHostExecutorTried, "before using the network local disk of the remote " + + "executor (running on the same host) is expected to be tried") + new NioManagedBuffer(expectedByteBuffer) + } + } + + val blockManagerWithDeleteFactory: BlockManagerFactory = ( + executorId: String, + rpcEnv: RpcEnv, + master: BlockManagerMaster, + serializerManager: SerializerManager, + conf: SparkConf, + memoryManager: MemoryManager, + mapOutputTracker: MapOutputTracker, + shuffleManager: ShuffleManager, + blockTransferService: BlockTransferService, + securityManager: SecurityManager, + externalShuffleClient: Option[ExternalShuffleClient]) => { + new BlockManager( + executorId, + rpcEnv, + master, + serializerManager, + conf, + memoryManager, + mapOutputTracker, + shuffleManager, + blockTransferService, + securityManager, + externalShuffleClient) { + + override def readDiskBlockFromSameHostExecutor( + blockId: BlockId, + localDirs: Array[String], + blockSize: Long): Option[ManagedBuffer] = { + val res = super.readDiskBlockFromSameHostExecutor(blockId, localDirs, blockSize) + assert(res.isDefined) + // delete the file behind the blockId + ExecutorDiskReader.getFile(localDirs, subDirsPerLocalDir, blockId.name).delete() + sameHostExecutorTried = true + res + } + } + } + + val store1 = makeBlockManager( + 8000, + "executor1", + this.master, + Some(transferServiceAfterLocalAccess), + blockManagerFactory = blockManagerWithDeleteFactory) + + if (getValueOrBytes) { + val valuesViaStore1 = 
store1.getRemoteValues(blockId) + assert(valuesViaStore1.isDefined, "list expected to be accessed") + assert(valuesViaStore1.get.data.toList.head === array) + } else { + val bytesViaStore1 = store1.getRemoteBytes(blockId) + assert(bytesViaStore1.isDefined, "list expected to be accessed") + assert(bytesViaStore1.get.toByteBuffer === expectedByteBuffer) + } + } + } + + Seq( + StorageLevel(useDisk = true, useMemory = false, deserialized = false), + StorageLevel(useDisk = true, useMemory = false, deserialized = true) + ).foreach { storageLevel => + Seq(true, false).foreach { valueOrBytes => + testWithFileDelAfterLocalDiskRead(storageLevel, valueOrBytes) } } diff --git a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala index d2d9eb06339cb..7e5f7299b31c4 100644 --- a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala @@ -17,8 +17,7 @@ package org.apache.spark.util -import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataOutput, DataOutputStream, File, - FileOutputStream, InputStream, PrintStream, SequenceInputStream} +import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataOutput, DataOutputStream, File, FileInputStream, FileOutputStream, InputStream, PrintStream, SequenceInputStream} import java.lang.{Double => JDouble, Float => JFloat} import java.lang.reflect.Field import java.net.{BindException, ServerSocket, URI} @@ -1310,6 +1309,26 @@ class UtilsSuite extends SparkFunSuite with ResetSystemProperties with Logging { assert(Utils.trimExceptCRLF(s"b${s}b") === s"b${s}b") } } + + test("deleting an already opened file does not interrupt the reading process") { + withTempDir { dir => + val testFile = Utils.tempFileWith(dir) + Utils.tryWithResource(new FileOutputStream(testFile)) { outputStream => + (1 to 1000).foreach { index => + outputStream.write((42 + index) % 256) + } + } + Utils.tryWithResource(new FileInputStream(testFile)) { inputStream => + // deleting the file + // the success return value indicates the entry from its directory is removed + assert(testFile.delete() === true) + assert(testFile.exists() === false) + (1 to 1000).foreach { index => + assert(inputStream.read() === (index + 42) % 256) + } + } + } + } } private class SimpleExtension From 847afc220e7e2711083f1add50ace0458d30cb76 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E2=80=9Cattilapiros=E2=80=9D?= Date: Tue, 28 May 2019 14:42:51 +0200 Subject: [PATCH 06/11] make test windows compatible --- core/src/test/scala/org/apache/spark/util/UtilsSuite.scala | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala index 7e5f7299b31c4..df14d2cf5cb4e 100644 --- a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala @@ -1320,9 +1320,7 @@ class UtilsSuite extends SparkFunSuite with ResetSystemProperties with Logging { } Utils.tryWithResource(new FileInputStream(testFile)) { inputStream => // deleting the file - // the success return value indicates the entry from its directory is removed - assert(testFile.delete() === true) - assert(testFile.exists() === false) + testFile.delete() (1 to 1000).foreach { index => assert(inputStream.read() === (index + 42) % 256) } From c048ecaaee29ac70242ada67c224db98425680e8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E2=80=9Cattilapiros=E2=80=9D?= 
Date: Sun, 2 Jun 2019 19:16:34 +0200 Subject: [PATCH 07/11] applying review comments --- ...DiskReader.java => ExecutorDiskUtils.java} | 2 +- .../shuffle/ExternalShuffleBlockResolver.java | 8 +- .../ExternalShuffleBlockResolverSuite.java | 2 +- .../shuffle/TestShuffleDataContext.java | 8 +- .../apache/spark/storage/BlockManager.scala | 18 ++- .../spark/storage/BlockManagerMaster.scala | 3 +- .../storage/BlockManagerMasterEndpoint.scala | 10 +- .../spark/storage/BlockManagerMessages.scala | 2 +- .../spark/storage/DiskBlockManager.scala | 2 +- core/src/test/resources/log4j.properties | 2 +- .../spark/storage/BlockManagerSuite.scala | 124 ++++-------------- 11 files changed, 58 insertions(+), 123 deletions(-) rename common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/{ExecutorDiskReader.java => ExecutorDiskUtils.java} (98%) diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExecutorDiskReader.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExecutorDiskUtils.java similarity index 98% rename from common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExecutorDiskReader.java rename to common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExecutorDiskUtils.java index 00e20185e46d1..13f6046dd856b 100644 --- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExecutorDiskReader.java +++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExecutorDiskUtils.java @@ -25,7 +25,7 @@ import org.apache.spark.network.util.JavaUtils; -public class ExecutorDiskReader { +public class ExecutorDiskUtils { private static final Pattern MULTIPLE_SEPARATORS = Pattern.compile(File.separator + "{2,}"); diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java index 31b6369520295..50f16fc700f12 100644 --- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java +++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java @@ -297,7 +297,7 @@ private void deleteNonShuffleServiceServedFiles(String[] dirs) { */ private ManagedBuffer getSortBasedShuffleBlockData( ExecutorShuffleInfo executor, int shuffleId, int mapId, int reduceId) { - File indexFile = ExecutorDiskReader.getFile(executor.localDirs, executor.subDirsPerLocalDir, + File indexFile = ExecutorDiskUtils.getFile(executor.localDirs, executor.subDirsPerLocalDir, "shuffle_" + shuffleId + "_" + mapId + "_0.index"); try { @@ -305,7 +305,7 @@ private ManagedBuffer getSortBasedShuffleBlockData( ShuffleIndexRecord shuffleIndexRecord = shuffleIndexInformation.getIndex(reduceId); return new FileSegmentManagedBuffer( conf, - ExecutorDiskReader.getFile(executor.localDirs, executor.subDirsPerLocalDir, + ExecutorDiskUtils.getFile(executor.localDirs, executor.subDirsPerLocalDir, "shuffle_" + shuffleId + "_" + mapId + "_0.data"), shuffleIndexRecord.getOffset(), shuffleIndexRecord.getLength()); @@ -316,7 +316,7 @@ private ManagedBuffer getSortBasedShuffleBlockData( public ManagedBuffer getDiskPersistedRddBlockData( ExecutorShuffleInfo executor, int rddId, int splitIndex) { - File file = ExecutorDiskReader.getFile(executor.localDirs, executor.subDirsPerLocalDir, + File file = ExecutorDiskUtils.getFile(executor.localDirs, executor.subDirsPerLocalDir, "rdd_" + rddId + "_" + 
splitIndex); long fileLength = file.length(); ManagedBuffer res = null; @@ -345,7 +345,7 @@ public int removeBlocks(String appId, String execId, String[] blockIds) { int numRemovedBlocks = 0; for (String blockId : blockIds) { File file = - ExecutorDiskReader.getFile(executor.localDirs, executor.subDirsPerLocalDir, blockId); + ExecutorDiskUtils.getFile(executor.localDirs, executor.subDirsPerLocalDir, blockId); if (file.delete()) { numRemovedBlocks++; } else { diff --git a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolverSuite.java b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolverSuite.java index 4994c4eb5bcae..09eb699be305a 100644 --- a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolverSuite.java +++ b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolverSuite.java @@ -149,7 +149,7 @@ public void testNormalizeAndInternPathname() { private void assertPathsMatch(String p1, String p2, String p3, String expectedPathname) { String normPathname = - ExecutorDiskReader.createNormalizedInternedPathname(p1, p2, p3); + ExecutorDiskUtils.createNormalizedInternedPathname(p1, p2, p3); assertEquals(expectedPathname, normPathname); File file = new File(normPathname); String returnedPath = file.getPath(); diff --git a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/TestShuffleDataContext.java b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/TestShuffleDataContext.java index 817767e907e7c..457805feeac45 100644 --- a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/TestShuffleDataContext.java +++ b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/TestShuffleDataContext.java @@ -76,9 +76,9 @@ public void insertSortShuffleData(int shuffleId, int mapId, byte[][] blocks) thr try { dataStream = new FileOutputStream( - ExecutorDiskReader.getFile(localDirs, subDirsPerLocalDir, blockId + ".data")); + ExecutorDiskUtils.getFile(localDirs, subDirsPerLocalDir, blockId + ".data")); indexStream = new DataOutputStream(new FileOutputStream( - ExecutorDiskReader.getFile(localDirs, subDirsPerLocalDir, blockId + ".index"))); + ExecutorDiskUtils.getFile(localDirs, subDirsPerLocalDir, blockId + ".index"))); long offset = 0; indexStream.writeLong(offset); @@ -121,11 +121,11 @@ private void insertFile(String filename) throws IOException { private void insertFile(String filename, byte[] block) throws IOException { OutputStream dataStream = null; - File file = ExecutorDiskReader.getFile(localDirs, subDirsPerLocalDir, filename); + File file = ExecutorDiskUtils.getFile(localDirs, subDirsPerLocalDir, filename); assert(!file.exists()) : "this test file has been already generated"; try { dataStream = new FileOutputStream( - ExecutorDiskReader.getFile(localDirs, subDirsPerLocalDir, filename)); + ExecutorDiskUtils.getFile(localDirs, subDirsPerLocalDir, filename)); dataStream.write(block); } finally { Closeables.close(dataStream, false); diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala index 2fafbbf2ac6c2..adf456c2bcfd7 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala @@ -855,9 +855,9 @@ private[spark] class BlockManager( * @tparam T result type * @return */ - private[spark] def 
getRemoteBlock[T]( - blockId: BlockId, - bufferTransformer: ManagedBuffer => T): Option[T] = { + private[spark] def getRemoteBlock[T]( + blockId: BlockId, + bufferTransformer: ManagedBuffer => T): Option[T] = { logDebug(s"Getting remote block $blockId") require(blockId != null, "BlockId is null") @@ -875,7 +875,13 @@ private[spark] class BlockManager( val blockDataOption = readDiskBlockFromSameHostExecutor(blockId, localDirs, locationsAndStatus.status.diskSize) val res = blockDataOption.flatMap { blockData => - Try(bufferTransformer(blockData)).toOption + Try(bufferTransformer(blockData)) + .fold( { throwable => + logDebug("Block from the same host executor cannot be opened: ", throwable) + None + }, { block => + Some(block) + }) } logDebug(s"Read $blockId from the disk of a same host executor is " + (if (res.isDefined) "successful." else "failed.")) @@ -915,7 +921,7 @@ private[spark] class BlockManager( /** * Fetch the block from remote block managers as a ManagedBuffer. */ - def fetchRemoteManagedBuffer( + private def fetchRemoteManagedBuffer( blockId: BlockId, blockSize: Long, locationsAndStatus: BlockManagerMessages.BlockLocationsAndStatus): Option[ManagedBuffer] = { @@ -996,7 +1002,7 @@ private[spark] class BlockManager( blockId: BlockId, localDirs: Array[String], blockSize: Long): Option[ManagedBuffer] = { - val file = ExecutorDiskReader.getFile(localDirs, subDirsPerLocalDir, blockId.name) + val file = ExecutorDiskUtils.getFile(localDirs, subDirsPerLocalDir, blockId.name) if (file.exists()) { val mangedBuffer = securityManager.getIOEncryptionKey() match { case Some(key) => diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala index 160a8d4babb69..96bd2e3a5e0ed 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala @@ -87,7 +87,8 @@ class BlockManagerMaster( /** Get locations as well as status of the blockId from the driver */ def getLocationsAndStatus( - blockId: BlockId, requesterHost: String): Option[BlockLocationsAndStatus] = { + blockId: BlockId, + requesterHost: String): Option[BlockLocationsAndStatus] = { driverEndpoint.askSync[Option[BlockLocationsAndStatus]]( GetLocationsAndStatus(blockId, requesterHost)) } diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala index 7b82a43868a66..b749b33bc93b4 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala @@ -525,11 +525,11 @@ class BlockManagerMasterEndpoint( } if (locations.nonEmpty && status.isDefined) { - val localDirs = if (status.get.storageLevel.useDisk) { - locations.find(_.host == requesterHost).map(blockManagerInfo(_).localDirs) - } else { - None - } + val localDirs = locations.find { loc => + loc.host == requesterHost && loc.port != externalShuffleServicePort + val status = blockManagerInfo(loc).getStatus(blockId) + status.isDefined && status.get.storageLevel.useDisk + }.map(blockManagerInfo(_).localDirs) Some(BlockLocationsAndStatus(locations, status.get, localDirs)) } else { None diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala index 4306e27e40f60..219afaf0792f4 100644 --- 
a/core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala @@ -108,7 +108,7 @@ private[spark] object BlockManagerMessages { locations: Seq[BlockManagerId], status: BlockStatus, localDirs: Option[Array[String]]) { - assert(locations.nonEmpty || localDirs.isDefined) + assert(locations.nonEmpty) } case class GetLocationsMultipleBlockIds(blockIds: Array[BlockId]) extends ToBlockManagerMaster diff --git a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala index 6a30758042b4f..c3990bf71e604 100644 --- a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala @@ -55,7 +55,7 @@ private[spark] class DiskBlockManager(conf: SparkConf, deleteFilesOnStop: Boolea /** Looks up a file by hashing it into one of our local subdirectories. */ // This method should be kept in sync with - // org.apache.spark.network.shuffle.ExecutorDiskReader#getFile(). + // org.apache.spark.network.shuffle.ExecutorDiskUtils#getFile(). def getFile(filename: String): File = { // Figure out which local directory it hashes to, and which subdirectory in that val hash = Utils.nonNegativeHash(filename) diff --git a/core/src/test/resources/log4j.properties b/core/src/test/resources/log4j.properties index b288ea9cf6f53..2f46ce1553ee6 100644 --- a/core/src/test/resources/log4j.properties +++ b/core/src/test/resources/log4j.properties @@ -17,7 +17,7 @@ # Set everything to be logged to the file target/unit-tests.log test.appender=file -log4j.rootCategory=DEBUG, ${test.appender} +log4j.rootCategory=INFO, ${test.appender} log4j.appender.file=org.apache.log4j.FileAppender log4j.appender.file.append=true log4j.appender.file.file=target/unit-tests.log diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala index 93416bb024b2a..0c328d527d66b 100644 --- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala @@ -29,7 +29,8 @@ import scala.reflect.ClassTag import org.apache.commons.lang3.RandomUtils import org.mockito.{ArgumentMatchers => mc} -import org.mockito.Mockito.{mock, times, verify, when} +import org.mockito.Mockito.{doAnswer, mock, spy, times, verify, when} +import org.mockito.invocation.InvocationOnMock import org.scalatest._ import org.scalatest.concurrent.{Signaler, ThreadSignaler, TimeLimits} import org.scalatest.concurrent.Eventually._ @@ -40,19 +41,18 @@ import org.apache.spark.executor.DataReadMethod import org.apache.spark.internal.config import org.apache.spark.internal.config._ import org.apache.spark.internal.config.Tests._ -import org.apache.spark.memory.{MemoryManager, UnifiedMemoryManager} +import org.apache.spark.memory.UnifiedMemoryManager import org.apache.spark.network.{BlockDataManager, BlockTransferService, TransportContext} import org.apache.spark.network.buffer.{FileSegmentManagedBuffer, ManagedBuffer, NioManagedBuffer} import org.apache.spark.network.client.{RpcResponseCallback, TransportClient} import org.apache.spark.network.netty.{NettyBlockTransferService, SparkTransportConf} import org.apache.spark.network.server.{NoOpRpcHandler, TransportServer, TransportServerBootstrap} -import org.apache.spark.network.shuffle.{BlockFetchingListener, DownloadFileManager, 
ExecutorDiskReader, ExternalShuffleClient} +import org.apache.spark.network.shuffle.{BlockFetchingListener, DownloadFileManager, ExecutorDiskUtils, ExternalShuffleClient} import org.apache.spark.network.shuffle.protocol.{BlockTransferMessage, RegisterExecutor} import org.apache.spark.rpc.RpcEnv import org.apache.spark.scheduler.LiveListenerBus import org.apache.spark.security.{CryptoStreamUtils, EncryptionFunSuite} import org.apache.spark.serializer.{JavaSerializer, KryoSerializer, SerializerManager} -import org.apache.spark.shuffle.ShuffleManager import org.apache.spark.shuffle.sort.SortShuffleManager import org.apache.spark.storage.BlockManagerMessages._ import org.apache.spark.util._ @@ -82,7 +82,6 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE // Implicitly convert strings to BlockIds for test clarity. implicit def StringToBlockId(value: String): BlockId = new TestBlockId(value) - def rdd(rddId: Int, splitId: Int): RDDBlockId = RDDBlockId(rddId, splitId) private def init(sparkConf: SparkConf): Unit = { @@ -95,51 +94,12 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE .set(STORAGE_UNROLL_MEMORY_THRESHOLD, 512L) } - type BlockManagerFactory = ( - String, - RpcEnv, - BlockManagerMaster, - SerializerManager, - SparkConf, - MemoryManager, - MapOutputTracker, - ShuffleManager, - BlockTransferService, - SecurityManager, - Option[ExternalShuffleClient]) => BlockManager - - private val defaultBlockManagerFactory: BlockManagerFactory = ( - executorId: String, - rpcEnv: RpcEnv, - master: BlockManagerMaster, - serializerManager: SerializerManager, - conf: SparkConf, - memoryManager: MemoryManager, - mapOutputTracker: MapOutputTracker, - shuffleManager: ShuffleManager, - blockTransferService: BlockTransferService, - securityManager: SecurityManager, - externalShuffleClient: Option[ExternalShuffleClient]) => - new BlockManager( - executorId, - rpcEnv, - master, - serializerManager, - conf, - memoryManager, - mapOutputTracker, - shuffleManager, - blockTransferService, - securityManager, - externalShuffleClient) - private def makeBlockManager( maxMem: Long, name: String = SparkContext.DRIVER_IDENTIFIER, master: BlockManagerMaster = this.master, transferService: Option[BlockTransferService] = Option.empty, - testConf: Option[SparkConf] = None, - blockManagerFactory: BlockManagerFactory = defaultBlockManagerFactory): BlockManager = { + testConf: Option[SparkConf] = None): BlockManager = { val bmConf = testConf.map(_.setAll(conf.getAll)).getOrElse(conf) bmConf.set(TEST_MEMORY, maxMem) bmConf.set(MEMORY_OFFHEAP_SIZE, maxMem) @@ -161,7 +121,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE } else { None } - val blockManager = blockManagerFactory(name, rpcEnv, master, serializerManager, bmConf, + val blockManager = new BlockManager(name, rpcEnv, master, serializerManager, bmConf, memManager, mapOutputTracker, shuffleManager, transfer, bmSecurityMgr, externalShuffleClient) memManager.setMemoryStore(blockManager.memoryStore) allStores += blockManager @@ -658,8 +618,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE val expectedBlockData = store2.getLocalBytes(blockId) assert(expectedBlockData.isDefined) val expectedByteBuffer = expectedBlockData.get.toByteBuffer() - - val transferServiceAfterLocalAccess = new MockBlockTransferService(0) { + val mockTransferService = new MockBlockTransferService(0) { override def fetchBlockSync( host: String, port: Int, @@ -671,60 
+630,29 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE new NioManagedBuffer(expectedByteBuffer) } } - - val blockManagerWithDeleteFactory: BlockManagerFactory = ( - executorId: String, - rpcEnv: RpcEnv, - master: BlockManagerMaster, - serializerManager: SerializerManager, - conf: SparkConf, - memoryManager: MemoryManager, - mapOutputTracker: MapOutputTracker, - shuffleManager: ShuffleManager, - blockTransferService: BlockTransferService, - securityManager: SecurityManager, - externalShuffleClient: Option[ExternalShuffleClient]) => { - new BlockManager( - executorId, - rpcEnv, - master, - serializerManager, - conf, - memoryManager, - mapOutputTracker, - shuffleManager, - blockTransferService, - securityManager, - externalShuffleClient) { - - override def readDiskBlockFromSameHostExecutor( - blockId: BlockId, - localDirs: Array[String], - blockSize: Long): Option[ManagedBuffer] = { - val res = super.readDiskBlockFromSameHostExecutor(blockId, localDirs, blockSize) - assert(res.isDefined) - // delete the file behind the blockId - ExecutorDiskReader.getFile(localDirs, subDirsPerLocalDir, blockId.name).delete() - sameHostExecutorTried = true - res - } - } - } - - val store1 = makeBlockManager( - 8000, - "executor1", - this.master, - Some(transferServiceAfterLocalAccess), - blockManagerFactory = blockManagerWithDeleteFactory) + val store1 = makeBlockManager(8000, "executor1", this.master, Some(mockTransferService)) + val spiedStore1 = spy(store1) + doAnswer { inv => + val blockId = inv.getArguments()(0).asInstanceOf[BlockId] + val localDirs = inv.getArguments()(1).asInstanceOf[Array[String]] + val blockSize = inv.getArguments()(2).asInstanceOf[Long] + val res = store1.readDiskBlockFromSameHostExecutor(blockId, localDirs, blockSize) + assert(res.isDefined) + // delete the file behind the blockId + ExecutorDiskUtils.getFile(localDirs, store1.subDirsPerLocalDir, blockId.name).delete() + sameHostExecutorTried = true + res + }.when(spiedStore1).readDiskBlockFromSameHostExecutor(mc.any(), mc.any(), mc.any()) if (getValueOrBytes) { - val valuesViaStore1 = store1.getRemoteValues(blockId) - assert(valuesViaStore1.isDefined, "list expected to be accessed") + val valuesViaStore1 = spiedStore1.getRemoteValues(blockId) + assert(sameHostExecutorTried) + assert(valuesViaStore1.isDefined) assert(valuesViaStore1.get.data.toList.head === array) } else { - val bytesViaStore1 = store1.getRemoteBytes(blockId) - assert(bytesViaStore1.isDefined, "list expected to be accessed") + val bytesViaStore1 = spiedStore1.getRemoteBytes(blockId) + assert(sameHostExecutorTried) + assert(bytesViaStore1.isDefined) assert(bytesViaStore1.get.toByteBuffer === expectedByteBuffer) } } From 02c213e7fb717a2f4f8fa19c22a9418ab5203dce Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E2=80=9Cattilapiros=E2=80=9D?= Date: Sun, 2 Jun 2019 19:25:05 +0200 Subject: [PATCH 08/11] store different values in test data --- .../test/scala/org/apache/spark/storage/BlockManagerSuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala index 0c328d527d66b..4b97c6f2ac8b6 100644 --- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala @@ -613,7 +613,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE val store2 = makeBlockManager(8000, "executor2", 
this.master, Some(new MockBlockTransferService(0))) val blockId = "list" - val array = new Array[Byte](4000) + val array = (0 to 4000).map(_ % Byte.MaxValue).toArray store2.putIterator(blockId, List(array).iterator, level, true) val expectedBlockData = store2.getLocalBytes(blockId) assert(expectedBlockData.isDefined) From 78e360c558bee9932131d06c4a1bf2fcfd67535d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E2=80=9Cattilapiros=E2=80=9D?= Date: Sun, 2 Jun 2019 23:47:49 +0200 Subject: [PATCH 09/11] fix tests --- .../org/apache/spark/storage/BlockManager.scala | 2 +- .../spark/storage/BlockManagerMasterEndpoint.scala | 13 +++++++++---- 2 files changed, 10 insertions(+), 5 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala index adf456c2bcfd7..114133c64f501 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala @@ -883,7 +883,7 @@ private[spark] class BlockManager( Some(block) }) } - logDebug(s"Read $blockId from the disk of a same host executor is " + + logInfo(s"Read $blockId from the disk of a same host executor is " + (if (res.isDefined) "successful." else "failed.")) res }.orElse { diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala index b749b33bc93b4..edbf1551ae97c 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala @@ -520,15 +520,20 @@ class BlockManagerMasterEndpoint( if (externalShuffleServiceRddFetchEnabled && bmId.port == externalShuffleServicePort) { Option(blockStatusByShuffleService(bmId).get(blockId)) } else { - blockManagerInfo(bmId).getStatus(blockId) + blockManagerInfo.get(bmId).flatMap(_.getStatus(blockId)) } } if (locations.nonEmpty && status.isDefined) { val localDirs = locations.find { loc => - loc.host == requesterHost && loc.port != externalShuffleServicePort - val status = blockManagerInfo(loc).getStatus(blockId) - status.isDefined && status.get.storageLevel.useDisk + if (loc.port != externalShuffleServicePort && loc.host == requesterHost) { + blockManagerInfo + .get(loc) + .flatMap(_.getStatus(blockId).map(_.storageLevel.useDisk)) + .getOrElse(false) + } else { + false + } }.map(blockManagerInfo(_).localDirs) Some(BlockLocationsAndStatus(locations, status.get, localDirs)) } else { From 49f950893aa2d41bfddb7218613f070eefb9ba9e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E2=80=9Cattilapiros=E2=80=9D?= Date: Tue, 4 Jun 2019 22:33:05 +0200 Subject: [PATCH 10/11] applying fixes for Vanzin's comment --- .../spark/storage/BlockManagerSuite.scala | 50 +++++++++++-------- .../org/apache/spark/util/UtilsSuite.scala | 21 +------- 2 files changed, 31 insertions(+), 40 deletions(-) diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala index 4b97c6f2ac8b6..894581d22d4b5 100644 --- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala @@ -570,35 +570,43 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE Seq( StorageLevel(useDisk = true, useMemory = false, deserialized = false), - StorageLevel(useDisk = true, useMemory = false, 
deserialized = true) + StorageLevel(useDisk = true, useMemory = false, deserialized = true), + StorageLevel(useDisk = true, useMemory = false, deserialized = true, replication = 2) ).foreach { storageLevel => test(s"SPARK-27622: avoid the network when block requested from same host, $storageLevel") { conf.set("spark.shuffle.io.maxRetries", "0") - val noFetcher = new MockBlockTransferService(0) { - override def fetchBlockSync( - host: String, - port: Int, - execId: String, - blockId: String, - tempFileManager: DownloadFileManager): ManagedBuffer = { - fail("Fetching over network is not expected when the block is requested from same host") - } - } - val store1 = makeBlockManager(8000, "executor1", this.master, Some(noFetcher)) - val store2 = makeBlockManager(8000, "executor2", this.master, Some(noFetcher)) + val sameHostBm = makeBlockManager(8000, "sameHost", master) + + val otherHostTransferSrv = spy(sameHostBm.blockTransferService) + doAnswer { _ => + "otherHost" + }.when(otherHostTransferSrv).hostName + val otherHostBm = makeBlockManager(8000, "otherHost", master, Some(otherHostTransferSrv)) + + // This test always uses the cleanBm to get the block. In case of replication + // the block can be added to the otherHostBm as direct disk read will use + // the local disk of sameHostBm where the block is replicated to. + // When there is no replication then block must be added via sameHostBm directly. + val bmToPutBlock = if (storageLevel.replication > 1) otherHostBm else sameHostBm + val array = Array.fill(16)(Byte.MinValue to Byte.MaxValue).flatten val blockId = "list" - val array = new Array[Byte](4000) - store2.putIterator(blockId, List(array).iterator, storageLevel, true) + bmToPutBlock.putIterator(blockId, List(array).iterator, storageLevel, tellMaster = true) + + val sameHostTransferSrv = spy(sameHostBm.blockTransferService) + doAnswer { _ => + fail("Fetching over network is not expected when the block is requested from same host") + }.when(sameHostTransferSrv).fetchBlockSync(mc.any(), mc.any(), mc.any(), mc.any(), mc.any()) + val cleanBm = makeBlockManager(8000, "clean", master, Some(sameHostTransferSrv)) // check getRemoteBytes - val bytesViaStore1 = store1.getRemoteBytes(blockId) - assert(bytesViaStore1.isDefined, "list expected to be accessed") - val expectedContent = store2.getBlockData(blockId).nioByteBuffer().array() + val bytesViaStore1 = cleanBm.getRemoteBytes(blockId) + assert(bytesViaStore1.isDefined) + val expectedContent = sameHostBm.getBlockData(blockId).nioByteBuffer().array() assert(bytesViaStore1.get.toArray === expectedContent) // check getRemoteValues - val valueViaStore1 = store1.getRemoteValues[List.type](blockId) - assert(valueViaStore1.isDefined, "list expected to be accessed") + val valueViaStore1 = cleanBm.getRemoteValues[List.type](blockId) + assert(valueViaStore1.isDefined) assert(valueViaStore1.get.data.toList.head === array) } } @@ -613,7 +621,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE val store2 = makeBlockManager(8000, "executor2", this.master, Some(new MockBlockTransferService(0))) val blockId = "list" - val array = (0 to 4000).map(_ % Byte.MaxValue).toArray + val array = Array.fill(16)(Byte.MinValue to Byte.MaxValue).flatten store2.putIterator(blockId, List(array).iterator, level, true) val expectedBlockData = store2.getLocalBytes(blockId) assert(expectedBlockData.isDefined) diff --git a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala index 
df14d2cf5cb4e..d2d9eb06339cb 100644 --- a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala @@ -17,7 +17,8 @@ package org.apache.spark.util -import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataOutput, DataOutputStream, File, FileInputStream, FileOutputStream, InputStream, PrintStream, SequenceInputStream} +import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataOutput, DataOutputStream, File, + FileOutputStream, InputStream, PrintStream, SequenceInputStream} import java.lang.{Double => JDouble, Float => JFloat} import java.lang.reflect.Field import java.net.{BindException, ServerSocket, URI} @@ -1309,24 +1310,6 @@ class UtilsSuite extends SparkFunSuite with ResetSystemProperties with Logging { assert(Utils.trimExceptCRLF(s"b${s}b") === s"b${s}b") } } - - test("deleting an already opened file does not interrupt the reading process") { - withTempDir { dir => - val testFile = Utils.tempFileWith(dir) - Utils.tryWithResource(new FileOutputStream(testFile)) { outputStream => - (1 to 1000).foreach { index => - outputStream.write((42 + index) % 256) - } - } - Utils.tryWithResource(new FileInputStream(testFile)) { inputStream => - // deleting the file - testFile.delete() - (1 to 1000).foreach { index => - assert(inputStream.read() === (index + 42) % 256) - } - } - } - } } private class SimpleExtension From 261fc767a23061844e32b1a7ec0e8fd58c42e12c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E2=80=9Cattilapiros=E2=80=9D?= Date: Fri, 7 Jun 2019 19:58:02 +0200 Subject: [PATCH 11/11] fixes --- .../org/apache/spark/storage/BlockManager.scala | 16 ++++++++-------- .../apache/spark/storage/BlockManagerSuite.scala | 3 ++- 2 files changed, 10 insertions(+), 9 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala index 114133c64f501..990f92aa860a7 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala @@ -29,7 +29,7 @@ import scala.collection.mutable.HashMap import scala.concurrent.{ExecutionContext, Future} import scala.concurrent.duration._ import scala.reflect.ClassTag -import scala.util.{Random, Try} +import scala.util.Random import scala.util.control.NonFatal import com.codahale.metrics.{MetricRegistry, MetricSet} @@ -875,13 +875,13 @@ private[spark] class BlockManager( val blockDataOption = readDiskBlockFromSameHostExecutor(blockId, localDirs, locationsAndStatus.status.diskSize) val res = blockDataOption.flatMap { blockData => - Try(bufferTransformer(blockData)) - .fold( { throwable => - logDebug("Block from the same host executor cannot be opened: ", throwable) + try { + Some(bufferTransformer(blockData)) + } catch { + case NonFatal(e) => + logDebug("Block from the same host executor cannot be opened: ", e) None - }, { block => - Some(block) - }) + } } logInfo(s"Read $blockId from the disk of a same host executor is " + (if (res.isDefined) "successful." 
else "failed.")) @@ -1012,7 +1012,7 @@ private[spark] class BlockManager( new EncryptedBlockData(file, blockSize, conf, key)) case _ => - val transportConf = SparkTransportConf.fromSparkConf(conf, "files") + val transportConf = SparkTransportConf.fromSparkConf(conf, "shuffle") new FileSegmentManagedBuffer(transportConf, file, 0, file.length) } Some(mangedBuffer) diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala index 894581d22d4b5..2d6e151f81150 100644 --- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala @@ -646,8 +646,9 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE val blockSize = inv.getArguments()(2).asInstanceOf[Long] val res = store1.readDiskBlockFromSameHostExecutor(blockId, localDirs, blockSize) assert(res.isDefined) + val file = ExecutorDiskUtils.getFile(localDirs, store1.subDirsPerLocalDir, blockId.name) // delete the file behind the blockId - ExecutorDiskUtils.getFile(localDirs, store1.subDirsPerLocalDir, blockId.name).delete() + assert(file.delete()) sameHostExecutorTried = true res }.when(spiedStore1).readDiskBlockFromSameHostExecutor(mc.any(), mc.any(), mc.any())