diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExecutorDiskUtils.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExecutorDiskUtils.java new file mode 100644 index 000000000000..13f6046dd856 --- /dev/null +++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExecutorDiskUtils.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.network.shuffle; + +import java.io.File; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +import com.google.common.annotations.VisibleForTesting; + +import org.apache.spark.network.util.JavaUtils; + +public class ExecutorDiskUtils { + + private static final Pattern MULTIPLE_SEPARATORS = Pattern.compile(File.separator + "{2,}"); + + /** + * Hashes a filename into the corresponding local directory, in a manner consistent with + * Spark's DiskBlockManager.getFile(). + */ + public static File getFile(String[] localDirs, int subDirsPerLocalDir, String filename) { + int hash = JavaUtils.nonNegativeHash(filename); + String localDir = localDirs[hash % localDirs.length]; + int subDirId = (hash / localDirs.length) % subDirsPerLocalDir; + return new File(createNormalizedInternedPathname( + localDir, String.format("%02x", subDirId), filename)); + } + + /** + * This method is needed to avoid the situation when multiple File instances for the + * same pathname "foo/bar" are created, each with a separate copy of the "foo/bar" String. + * According to measurements, in some scenarios such duplicate strings may waste a lot + * of memory (~ 10% of the heap). To avoid that, we intern the pathname, and before that + * we make sure that it's in a normalized form (contains no "//", "///" etc.) Otherwise, + * the internal code in java.io.File would normalize it later, creating a new "foo/bar" + * String copy. Unfortunately, we cannot just reuse the normalization code that java.io.File + * uses, since it is in the package-private class java.io.FileSystem. 
+ */ + @VisibleForTesting + static String createNormalizedInternedPathname(String dir1, String dir2, String fname) { + String pathname = dir1 + File.separator + dir2 + File.separator + fname; + Matcher m = MULTIPLE_SEPARATORS.matcher(pathname); + pathname = m.replaceAll("/"); + // A single trailing slash needs to be taken care of separately + if (pathname.length() > 1 && pathname.endsWith("/")) { + pathname = pathname.substring(0, pathname.length() - 1); + } + return pathname.intern(); + } + +} diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java index 87e6fe12b59f..50f16fc700f1 100644 --- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java +++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java @@ -24,7 +24,6 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.Executor; import java.util.concurrent.Executors; -import java.util.regex.Matcher; import java.util.regex.Pattern; import com.fasterxml.jackson.annotation.JsonCreator; @@ -298,7 +297,7 @@ private void deleteNonShuffleServiceServedFiles(String[] dirs) { */ private ManagedBuffer getSortBasedShuffleBlockData( ExecutorShuffleInfo executor, int shuffleId, int mapId, int reduceId) { - File indexFile = getFile(executor.localDirs, executor.subDirsPerLocalDir, + File indexFile = ExecutorDiskUtils.getFile(executor.localDirs, executor.subDirsPerLocalDir, "shuffle_" + shuffleId + "_" + mapId + "_0.index"); try { @@ -306,7 +305,7 @@ private ManagedBuffer getSortBasedShuffleBlockData( ShuffleIndexRecord shuffleIndexRecord = shuffleIndexInformation.getIndex(reduceId); return new FileSegmentManagedBuffer( conf, - getFile(executor.localDirs, executor.subDirsPerLocalDir, + ExecutorDiskUtils.getFile(executor.localDirs, executor.subDirsPerLocalDir, "shuffle_" + shuffleId + "_" + mapId + "_0.data"), shuffleIndexRecord.getOffset(), shuffleIndexRecord.getLength()); @@ -317,7 +316,7 @@ private ManagedBuffer getSortBasedShuffleBlockData( public ManagedBuffer getDiskPersistedRddBlockData( ExecutorShuffleInfo executor, int rddId, int splitIndex) { - File file = getFile(executor.localDirs, executor.subDirsPerLocalDir, + File file = ExecutorDiskUtils.getFile(executor.localDirs, executor.subDirsPerLocalDir, "rdd_" + rddId + "_" + splitIndex); long fileLength = file.length(); ManagedBuffer res = null; @@ -327,19 +326,6 @@ public ManagedBuffer getDiskPersistedRddBlockData( return res; } - /** - * Hashes a filename into the corresponding local directory, in a manner consistent with - * Spark's DiskBlockManager.getFile(). - */ - @VisibleForTesting - static File getFile(String[] localDirs, int subDirsPerLocalDir, String filename) { - int hash = JavaUtils.nonNegativeHash(filename); - String localDir = localDirs[hash % localDirs.length]; - int subDirId = (hash / localDirs.length) % subDirsPerLocalDir; - return new File(createNormalizedInternedPathname( - localDir, String.format("%02x", subDirId), filename)); - } - void close() { if (db != null) { try { @@ -350,28 +336,6 @@ void close() { } } - /** - * This method is needed to avoid the situation when multiple File instances for the - * same pathname "foo/bar" are created, each with a separate copy of the "foo/bar" String. 
- * According to measurements, in some scenarios such duplicate strings may waste a lot - * of memory (~ 10% of the heap). To avoid that, we intern the pathname, and before that - * we make sure that it's in a normalized form (contains no "//", "///" etc.) Otherwise, - * the internal code in java.io.File would normalize it later, creating a new "foo/bar" - * String copy. Unfortunately, we cannot just reuse the normalization code that java.io.File - * uses, since it is in the package-private class java.io.FileSystem. - */ - @VisibleForTesting - static String createNormalizedInternedPathname(String dir1, String dir2, String fname) { - String pathname = dir1 + File.separator + dir2 + File.separator + fname; - Matcher m = MULTIPLE_SEPARATORS.matcher(pathname); - pathname = m.replaceAll("/"); - // A single trailing slash needs to be taken care of separately - if (pathname.length() > 1 && pathname.endsWith("/")) { - pathname = pathname.substring(0, pathname.length() - 1); - } - return pathname.intern(); - } - public int removeBlocks(String appId, String execId, String[] blockIds) { ExecutorShuffleInfo executor = executors.get(new AppExecId(appId, execId)); if (executor == null) { @@ -380,7 +344,8 @@ public int removeBlocks(String appId, String execId, String[] blockIds) { } int numRemovedBlocks = 0; for (String blockId : blockIds) { - File file = getFile(executor.localDirs, executor.subDirsPerLocalDir, blockId); + File file = + ExecutorDiskUtils.getFile(executor.localDirs, executor.subDirsPerLocalDir, blockId); if (file.delete()) { numRemovedBlocks++; } else { diff --git a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolverSuite.java b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolverSuite.java index 459629c5f05f..09eb699be305 100644 --- a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolverSuite.java +++ b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolverSuite.java @@ -149,7 +149,7 @@ public void testNormalizeAndInternPathname() { private void assertPathsMatch(String p1, String p2, String p3, String expectedPathname) { String normPathname = - ExternalShuffleBlockResolver.createNormalizedInternedPathname(p1, p2, p3); + ExecutorDiskUtils.createNormalizedInternedPathname(p1, p2, p3); assertEquals(expectedPathname, normPathname); File file = new File(normPathname); String returnedPath = file.getPath(); diff --git a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/TestShuffleDataContext.java b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/TestShuffleDataContext.java index 10be95ec50c3..457805feeac4 100644 --- a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/TestShuffleDataContext.java +++ b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/TestShuffleDataContext.java @@ -76,9 +76,9 @@ public void insertSortShuffleData(int shuffleId, int mapId, byte[][] blocks) thr try { dataStream = new FileOutputStream( - ExternalShuffleBlockResolver.getFile(localDirs, subDirsPerLocalDir, blockId + ".data")); + ExecutorDiskUtils.getFile(localDirs, subDirsPerLocalDir, blockId + ".data")); indexStream = new DataOutputStream(new FileOutputStream( - ExternalShuffleBlockResolver.getFile(localDirs, subDirsPerLocalDir, blockId + ".index"))); + ExecutorDiskUtils.getFile(localDirs, subDirsPerLocalDir, blockId + ".index"))); long offset = 0; 
indexStream.writeLong(offset); @@ -121,10 +121,11 @@ private void insertFile(String filename) throws IOException { private void insertFile(String filename, byte[] block) throws IOException { OutputStream dataStream = null; - File file = ExternalShuffleBlockResolver.getFile(localDirs, subDirsPerLocalDir, filename); + File file = ExecutorDiskUtils.getFile(localDirs, subDirsPerLocalDir, filename); assert(!file.exists()) : "this test file has been already generated"; try { - dataStream = new FileOutputStream(file); + dataStream = new FileOutputStream( + ExecutorDiskUtils.getFile(localDirs, subDirsPerLocalDir, filename)); dataStream.write(block); } finally { Closeables.close(dataStream, false); diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala index 6dbef784fe0c..990f92aa860a 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala @@ -43,8 +43,9 @@ import org.apache.spark.internal.config.Network import org.apache.spark.memory.{MemoryManager, MemoryMode} import org.apache.spark.metrics.source.Source import org.apache.spark.network._ -import org.apache.spark.network.buffer.ManagedBuffer +import org.apache.spark.network.buffer.{FileSegmentManagedBuffer, ManagedBuffer} import org.apache.spark.network.client.StreamCallbackWithID +import org.apache.spark.network.netty.SparkTransportConf import org.apache.spark.network.shuffle._ import org.apache.spark.network.shuffle.protocol.ExecutorShuffleInfo import org.apache.spark.network.util.TransportConf @@ -138,6 +139,8 @@ private[spark] class BlockManager( private val remoteReadNioBufferConversion = conf.get(Network.NETWORK_REMOTE_READ_NIO_BUFFER_CONVERSION) + private[spark] val subDirsPerLocalDir = conf.get(config.DISKSTORE_SUB_DIRECTORIES) + val diskBlockManager = { // Only perform cleanup if an external service is not serving our shuffle files. val deleteFilesOnStop = @@ -411,6 +414,7 @@ private[spark] class BlockManager( val idFromMaster = master.registerBlockManager( id, + diskBlockManager.localDirsString, maxOnHeapMemory, maxOffHeapMemory, slaveEndpoint) @@ -445,7 +449,7 @@ private[spark] class BlockManager( private def registerWithExternalShuffleServer() { logInfo("Registering executor with local external shuffle service.") val shuffleConfig = new ExecutorShuffleInfo( - diskBlockManager.localDirs.map(_.toString), + diskBlockManager.localDirsString, diskBlockManager.subDirsPerLocalDir, shuffleManager.getClass.getName) @@ -500,7 +504,8 @@ private[spark] class BlockManager( def reregister(): Unit = { // TODO: We might need to rate limit re-registering. 
logInfo(s"BlockManager $blockManagerId re-registering with master") - master.registerBlockManager(blockManagerId, maxOnHeapMemory, maxOffHeapMemory, slaveEndpoint) + master.registerBlockManager(blockManagerId, diskBlockManager.localDirsString, maxOnHeapMemory, + maxOffHeapMemory, slaveEndpoint) reportAllBlocks() } @@ -827,10 +832,63 @@ private[spark] class BlockManager( */ private[spark] def getRemoteValues[T: ClassTag](blockId: BlockId): Option[BlockResult] = { val ct = implicitly[ClassTag[T]] - getRemoteManagedBuffer(blockId).map { data => + getRemoteBlock(blockId, (data: ManagedBuffer) => { val values = serializerManager.dataDeserializeStream(blockId, data.createInputStream())(ct) new BlockResult(values, DataReadMethod.Network, data.size) + }) + } + + /** + * Get the remote block and transform it to the provided data type. + * + * If the block is persisted to the disk and stored at an executor running on the same host then + * first it is tried to be accessed using the local directories of the other executor directly. + * If the file is successfully identified then tried to be transformed by the provided + * transformation function which expected to open the file. If there is any exception during this + * transformation then block access falls back to fetching it from the remote executor via the + * network. + * + * @param blockId identifies the block to get + * @param bufferTransformer this transformer expected to open the file if the block is backed by a + * file by this it is guaranteed the whole content can be loaded + * @tparam T result type + * @return + */ + private[spark] def getRemoteBlock[T]( + blockId: BlockId, + bufferTransformer: ManagedBuffer => T): Option[T] = { + logDebug(s"Getting remote block $blockId") + require(blockId != null, "BlockId is null") + + // Because all the remote blocks are registered in driver, it is not necessary to ask + // all the slave executors to get block status. + val locationsAndStatusOption = master.getLocationsAndStatus(blockId, blockManagerId.host) + if (locationsAndStatusOption.isEmpty) { + logDebug(s"Block $blockId is unknown by block manager master") + None + } else { + val locationsAndStatus = locationsAndStatusOption.get + val blockSize = locationsAndStatus.status.diskSize.max(locationsAndStatus.status.memSize) + + locationsAndStatus.localDirs.flatMap { localDirs => + val blockDataOption = + readDiskBlockFromSameHostExecutor(blockId, localDirs, locationsAndStatus.status.diskSize) + val res = blockDataOption.flatMap { blockData => + try { + Some(bufferTransformer(blockData)) + } catch { + case NonFatal(e) => + logDebug("Block from the same host executor cannot be opened: ", e) + None + } + } + logInfo(s"Read $blockId from the disk of a same host executor is " + + (if (res.isDefined) "successful." else "failed.")) + res + }.orElse { + fetchRemoteManagedBuffer(blockId, blockSize, locationsAndStatus).map(bufferTransformer) + } } } @@ -861,22 +919,12 @@ private[spark] class BlockManager( } /** - * Get block from remote block managers as a ManagedBuffer. + * Fetch the block from remote block managers as a ManagedBuffer. */ - private def getRemoteManagedBuffer(blockId: BlockId): Option[ManagedBuffer] = { - logDebug(s"Getting remote block $blockId") - require(blockId != null, "BlockId is null") - var runningFailureCount = 0 - var totalFailureCount = 0 - - // Because all the remote blocks are registered in driver, it is not necessary to ask - // all the slave executors to get block status. 
-    val locationsAndStatus = master.getLocationsAndStatus(blockId)
-    val blockSize = locationsAndStatus.map { b =>
-      b.status.diskSize.max(b.status.memSize)
-    }.getOrElse(0L)
-    val blockLocations = locationsAndStatus.map(_.locations).getOrElse(Seq.empty)
-
+  private def fetchRemoteManagedBuffer(
+      blockId: BlockId,
+      blockSize: Long,
+      locationsAndStatus: BlockManagerMessages.BlockLocationsAndStatus): Option[ManagedBuffer] = {
     // If the block size is above the threshold, we should pass our FileManger to
     // BlockTransferService, which will leverage it to spill the block; if not, then passed-in
     // null value means the block will be persisted in memory.
@@ -885,8 +933,9 @@ private[spark] class BlockManager(
     } else {
       null
     }
-
-    val locations = sortLocations(blockLocations)
+    var runningFailureCount = 0
+    var totalFailureCount = 0
+    val locations = sortLocations(locationsAndStatus.locations)
     val maxFetchFailures = locations.size
     var locationIterator = locations.iterator
     while (locationIterator.hasNext) {
@@ -946,11 +995,37 @@ private[spark] class BlockManager(
     None
   }
 
+  /**
+   * Reads the block from the local directories of another executor which runs on the same host.
+   */
+  private[spark] def readDiskBlockFromSameHostExecutor(
+      blockId: BlockId,
+      localDirs: Array[String],
+      blockSize: Long): Option[ManagedBuffer] = {
+    val file = ExecutorDiskUtils.getFile(localDirs, subDirsPerLocalDir, blockId.name)
+    if (file.exists()) {
+      val managedBuffer = securityManager.getIOEncryptionKey() match {
+        case Some(key) =>
+          // Encrypted blocks cannot be memory mapped; return a special object that does decryption
+          // and provides InputStream / FileRegion implementations for reading the data.
+          new EncryptedManagedBuffer(
+            new EncryptedBlockData(file, blockSize, conf, key))
+
+        case _ =>
+          val transportConf = SparkTransportConf.fromSparkConf(conf, "shuffle")
+          new FileSegmentManagedBuffer(transportConf, file, 0, file.length)
+      }
+      Some(managedBuffer)
+    } else {
+      None
+    }
+  }
+
   /**
    * Get block from remote block managers as serialized bytes.
    */
   def getRemoteBytes(blockId: BlockId): Option[ChunkedByteBuffer] = {
-    getRemoteManagedBuffer(blockId).map { data =>
+    getRemoteBlock(blockId, (data: ManagedBuffer) => {
       // SPARK-24307 undocumented "escape-hatch" in case there are any issues in converting to
       // ChunkedByteBuffer, to go back to old code-path. Can be removed post Spark 2.4 if
       // new path is stable.
@@ -959,7 +1034,7 @@ private[spark] class BlockManager(
       } else {
         ChunkedByteBuffer.fromManagedBuffer(data)
       }
-    }
+    })
   }
 
   /**
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
index b18d38fe5253..96bd2e3a5e0e 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
@@ -56,13 +56,14 @@ class BlockManagerMaster(
    * updated BlockManagerId fleshed out with this information.
*/ def registerBlockManager( - blockManagerId: BlockManagerId, + id: BlockManagerId, + localDirs: Array[String], maxOnHeapMemSize: Long, maxOffHeapMemSize: Long, slaveEndpoint: RpcEndpointRef): BlockManagerId = { - logInfo(s"Registering BlockManager $blockManagerId") + logInfo(s"Registering BlockManager $id") val updatedId = driverEndpoint.askSync[BlockManagerId]( - RegisterBlockManager(blockManagerId, maxOnHeapMemSize, maxOffHeapMemSize, slaveEndpoint)) + RegisterBlockManager(id, localDirs, maxOnHeapMemSize, maxOffHeapMemSize, slaveEndpoint)) logInfo(s"Registered BlockManager $updatedId") updatedId } @@ -85,9 +86,11 @@ class BlockManagerMaster( } /** Get locations as well as status of the blockId from the driver */ - def getLocationsAndStatus(blockId: BlockId): Option[BlockLocationsAndStatus] = { + def getLocationsAndStatus( + blockId: BlockId, + requesterHost: String): Option[BlockLocationsAndStatus] = { driverEndpoint.askSync[Option[BlockLocationsAndStatus]]( - GetLocationsAndStatus(blockId)) + GetLocationsAndStatus(blockId, requesterHost)) } /** Get locations of multiple blockIds from the driver */ diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala index 65ec1c3f0dc6..edbf1551ae97 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala @@ -86,8 +86,8 @@ class BlockManagerMasterEndpoint( private val externalShuffleServicePort: Int = StorageUtils.externalShuffleServicePort(conf) override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = { - case RegisterBlockManager(blockManagerId, maxOnHeapMemSize, maxOffHeapMemSize, slaveEndpoint) => - context.reply(register(blockManagerId, maxOnHeapMemSize, maxOffHeapMemSize, slaveEndpoint)) + case RegisterBlockManager(id, localDirs, maxOnHeapMemSize, maxOffHeapMemSize, slaveEndpoint) => + context.reply(register(id, localDirs, maxOnHeapMemSize, maxOffHeapMemSize, slaveEndpoint)) case _updateBlockInfo @ UpdateBlockInfo(blockManagerId, blockId, storageLevel, deserializedSize, size) => @@ -97,8 +97,8 @@ class BlockManagerMasterEndpoint( case GetLocations(blockId) => context.reply(getLocations(blockId)) - case GetLocationsAndStatus(blockId) => - context.reply(getLocationsAndStatus(blockId)) + case GetLocationsAndStatus(blockId, requesterHost) => + context.reply(getLocationsAndStatus(blockId, requesterHost)) case GetLocationsMultipleBlockIds(blockIds) => context.reply(getLocationsMultipleBlockIds(blockIds)) @@ -410,6 +410,7 @@ class BlockManagerMasterEndpoint( */ private def register( idWithoutTopologyInfo: BlockManagerId, + localDirs: Array[String], maxOnHeapMemSize: Long, maxOffHeapMemSize: Long, slaveEndpoint: RpcEndpointRef): BlockManagerId = { @@ -445,8 +446,8 @@ class BlockManagerMasterEndpoint( None } - blockManagerInfo(id) = new BlockManagerInfo(id, System.currentTimeMillis(), maxOnHeapMemSize, - maxOffHeapMemSize, slaveEndpoint, externalShuffleServiceBlockStatus) + blockManagerInfo(id) = new BlockManagerInfo(id, System.currentTimeMillis(), localDirs, + maxOnHeapMemSize, maxOffHeapMemSize, slaveEndpoint, externalShuffleServiceBlockStatus) } listenerBus.post(SparkListenerBlockManagerAdded(time, id, maxOnHeapMemSize + maxOffHeapMemSize, Some(maxOnHeapMemSize), Some(maxOffHeapMemSize))) @@ -511,18 +512,30 @@ class BlockManagerMasterEndpoint( if (blockLocations.containsKey(blockId)) 
blockLocations.get(blockId).toSeq else Seq.empty
   }
 
-  private def getLocationsAndStatus(blockId: BlockId): Option[BlockLocationsAndStatus] = {
+  private def getLocationsAndStatus(
+      blockId: BlockId,
+      requesterHost: String): Option[BlockLocationsAndStatus] = {
     val locations = Option(blockLocations.get(blockId)).map(_.toSeq).getOrElse(Seq.empty)
     val status = locations.headOption.flatMap { bmId =>
       if (externalShuffleServiceRddFetchEnabled && bmId.port == externalShuffleServicePort) {
         Option(blockStatusByShuffleService(bmId).get(blockId))
       } else {
-        blockManagerInfo(bmId).getStatus(blockId)
+        blockManagerInfo.get(bmId).flatMap(_.getStatus(blockId))
       }
     }
 
     if (locations.nonEmpty && status.isDefined) {
-      Some(BlockLocationsAndStatus(locations, status.get))
+      val localDirs = locations.find { loc =>
+        if (loc.port != externalShuffleServicePort && loc.host == requesterHost) {
+          blockManagerInfo
+            .get(loc)
+            .flatMap(_.getStatus(blockId).map(_.storageLevel.useDisk))
+            .getOrElse(false)
+        } else {
+          false
+        }
+      }.map(blockManagerInfo(_).localDirs)
+      Some(BlockLocationsAndStatus(locations, status.get, localDirs))
     } else {
       None
     }
@@ -573,6 +586,7 @@ object BlockStatus {
 private[spark] class BlockManagerInfo(
     val blockManagerId: BlockManagerId,
     timeMs: Long,
+    val localDirs: Array[String],
     val maxOnHeapMem: Long,
     val maxOffHeapMem: Long,
     val slaveEndpoint: RpcEndpointRef,
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala
index 3dbac694cf81..219afaf0792f 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala
@@ -58,6 +58,7 @@ private[spark] object BlockManagerMessages {
 
   case class RegisterBlockManager(
       blockManagerId: BlockManagerId,
+      localDirs: Array[String],
       maxOnHeapMemSize: Long,
       maxOffHeapMemSize: Long,
       sender: RpcEndpointRef)
@@ -93,10 +94,20 @@ private[spark] object BlockManagerMessages {
 
   case class GetLocations(blockId: BlockId) extends ToBlockManagerMaster
 
-  case class GetLocationsAndStatus(blockId: BlockId) extends ToBlockManagerMaster
+  case class GetLocationsAndStatus(blockId: BlockId, requesterHost: String)
+    extends ToBlockManagerMaster
 
-  // The response message of `GetLocationsAndStatus` request.
-  case class BlockLocationsAndStatus(locations: Seq[BlockManagerId], status: BlockStatus) {
+  /**
+   * The response message of `GetLocationsAndStatus` request.
+   *
+   * @param localDirs if the block is persisted to disk on the same host where the requesting
+   *                  executor runs, this is Some with the local directories of the executor
+   *                  holding the block's file, otherwise None.
+ */ + case class BlockLocationsAndStatus( + locations: Seq[BlockManagerId], + status: BlockStatus, + localDirs: Option[Array[String]]) { assert(locations.nonEmpty) } diff --git a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala index 95ce4b0f09f5..c3990bf71e60 100644 --- a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala @@ -44,6 +44,9 @@ private[spark] class DiskBlockManager(conf: SparkConf, deleteFilesOnStop: Boolea logError("Failed to create any local dir.") System.exit(ExecutorExitCode.DISK_STORE_FAILED_TO_CREATE_DIR) } + + private[spark] val localDirsString: Array[String] = localDirs.map(_.toString) + // The content of subDirs is immutable but the content of subDirs(i) is mutable. And the content // of subDirs(i) is protected by the lock of subDirs(i) private val subDirs = Array.fill(localDirs.length)(new Array[File](subDirsPerLocalDir)) @@ -52,7 +55,7 @@ private[spark] class DiskBlockManager(conf: SparkConf, deleteFilesOnStop: Boolea /** Looks up a file by hashing it into one of our local subdirectories. */ // This method should be kept in sync with - // org.apache.spark.network.shuffle.ExternalShuffleBlockResolver#getFile(). + // org.apache.spark.network.shuffle.ExecutorDiskUtils#getFile(). def getFile(filename: String): File = { // Figure out which local directory it hashes to, and which subdirectory in that val hash = Utils.nonNegativeHash(filename) diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerInfoSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerInfoSuite.scala index 8df123250303..7dcbfa0103a9 100644 --- a/core/src/test/scala/org/apache/spark/storage/BlockManagerInfoSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerInfoSuite.scala @@ -31,6 +31,7 @@ class BlockManagerInfoSuite extends SparkFunSuite { val bmInfo = new BlockManagerInfo( BlockManagerId("executor0", "host", 1234, None), timeMs = 300, + Array(), maxOnHeapMem = 10000, maxOffHeapMem = 20000, slaveEndpoint = null, diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala index ab4693e86b3a..2d6e151f8115 100644 --- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala @@ -29,7 +29,8 @@ import scala.reflect.ClassTag import org.apache.commons.lang3.RandomUtils import org.mockito.{ArgumentMatchers => mc} -import org.mockito.Mockito.{mock, times, verify, when} +import org.mockito.Mockito.{doAnswer, mock, spy, times, verify, when} +import org.mockito.invocation.InvocationOnMock import org.scalatest._ import org.scalatest.concurrent.{Signaler, ThreadSignaler, TimeLimits} import org.scalatest.concurrent.Eventually._ @@ -46,7 +47,7 @@ import org.apache.spark.network.buffer.{FileSegmentManagedBuffer, ManagedBuffer, import org.apache.spark.network.client.{RpcResponseCallback, TransportClient} import org.apache.spark.network.netty.{NettyBlockTransferService, SparkTransportConf} import org.apache.spark.network.server.{NoOpRpcHandler, TransportServer, TransportServerBootstrap} -import org.apache.spark.network.shuffle.{BlockFetchingListener, DownloadFileManager, ExternalShuffleClient} +import org.apache.spark.network.shuffle.{BlockFetchingListener, DownloadFileManager, ExecutorDiskUtils, 
ExternalShuffleClient} import org.apache.spark.network.shuffle.protocol.{BlockTransferMessage, RegisterExecutor} import org.apache.spark.rpc.RpcEnv import org.apache.spark.scheduler.LiveListenerBus @@ -567,6 +568,114 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE assert(store.getRemoteBytes("list1").isEmpty) } + Seq( + StorageLevel(useDisk = true, useMemory = false, deserialized = false), + StorageLevel(useDisk = true, useMemory = false, deserialized = true), + StorageLevel(useDisk = true, useMemory = false, deserialized = true, replication = 2) + ).foreach { storageLevel => + test(s"SPARK-27622: avoid the network when block requested from same host, $storageLevel") { + conf.set("spark.shuffle.io.maxRetries", "0") + val sameHostBm = makeBlockManager(8000, "sameHost", master) + + val otherHostTransferSrv = spy(sameHostBm.blockTransferService) + doAnswer { _ => + "otherHost" + }.when(otherHostTransferSrv).hostName + val otherHostBm = makeBlockManager(8000, "otherHost", master, Some(otherHostTransferSrv)) + + // This test always uses the cleanBm to get the block. In case of replication + // the block can be added to the otherHostBm as direct disk read will use + // the local disk of sameHostBm where the block is replicated to. + // When there is no replication then block must be added via sameHostBm directly. + val bmToPutBlock = if (storageLevel.replication > 1) otherHostBm else sameHostBm + val array = Array.fill(16)(Byte.MinValue to Byte.MaxValue).flatten + val blockId = "list" + bmToPutBlock.putIterator(blockId, List(array).iterator, storageLevel, tellMaster = true) + + val sameHostTransferSrv = spy(sameHostBm.blockTransferService) + doAnswer { _ => + fail("Fetching over network is not expected when the block is requested from same host") + }.when(sameHostTransferSrv).fetchBlockSync(mc.any(), mc.any(), mc.any(), mc.any(), mc.any()) + val cleanBm = makeBlockManager(8000, "clean", master, Some(sameHostTransferSrv)) + + // check getRemoteBytes + val bytesViaStore1 = cleanBm.getRemoteBytes(blockId) + assert(bytesViaStore1.isDefined) + val expectedContent = sameHostBm.getBlockData(blockId).nioByteBuffer().array() + assert(bytesViaStore1.get.toArray === expectedContent) + + // check getRemoteValues + val valueViaStore1 = cleanBm.getRemoteValues[List.type](blockId) + assert(valueViaStore1.isDefined) + assert(valueViaStore1.get.data.toList.head === array) + } + } + + private def testWithFileDelAfterLocalDiskRead(level: StorageLevel, getValueOrBytes: Boolean) = { + val testedFunc = if (getValueOrBytes) "getRemoteValue()" else "getRemoteBytes()" + val testNameSuffix = s"$level, $testedFunc" + test(s"SPARK-27622: as file is removed fall back to network fetch, $testNameSuffix") { + conf.set("spark.shuffle.io.maxRetries", "0") + // variable to check the usage of the local disk of the remote executor on the same host + var sameHostExecutorTried: Boolean = false + val store2 = makeBlockManager(8000, "executor2", this.master, + Some(new MockBlockTransferService(0))) + val blockId = "list" + val array = Array.fill(16)(Byte.MinValue to Byte.MaxValue).flatten + store2.putIterator(blockId, List(array).iterator, level, true) + val expectedBlockData = store2.getLocalBytes(blockId) + assert(expectedBlockData.isDefined) + val expectedByteBuffer = expectedBlockData.get.toByteBuffer() + val mockTransferService = new MockBlockTransferService(0) { + override def fetchBlockSync( + host: String, + port: Int, + execId: String, + blockId: String, + tempFileManager: 
DownloadFileManager): ManagedBuffer = { + assert(sameHostExecutorTried, "before using the network local disk of the remote " + + "executor (running on the same host) is expected to be tried") + new NioManagedBuffer(expectedByteBuffer) + } + } + val store1 = makeBlockManager(8000, "executor1", this.master, Some(mockTransferService)) + val spiedStore1 = spy(store1) + doAnswer { inv => + val blockId = inv.getArguments()(0).asInstanceOf[BlockId] + val localDirs = inv.getArguments()(1).asInstanceOf[Array[String]] + val blockSize = inv.getArguments()(2).asInstanceOf[Long] + val res = store1.readDiskBlockFromSameHostExecutor(blockId, localDirs, blockSize) + assert(res.isDefined) + val file = ExecutorDiskUtils.getFile(localDirs, store1.subDirsPerLocalDir, blockId.name) + // delete the file behind the blockId + assert(file.delete()) + sameHostExecutorTried = true + res + }.when(spiedStore1).readDiskBlockFromSameHostExecutor(mc.any(), mc.any(), mc.any()) + + if (getValueOrBytes) { + val valuesViaStore1 = spiedStore1.getRemoteValues(blockId) + assert(sameHostExecutorTried) + assert(valuesViaStore1.isDefined) + assert(valuesViaStore1.get.data.toList.head === array) + } else { + val bytesViaStore1 = spiedStore1.getRemoteBytes(blockId) + assert(sameHostExecutorTried) + assert(bytesViaStore1.isDefined) + assert(bytesViaStore1.get.toByteBuffer === expectedByteBuffer) + } + } + } + + Seq( + StorageLevel(useDisk = true, useMemory = false, deserialized = false), + StorageLevel(useDisk = true, useMemory = false, deserialized = true) + ).foreach { storageLevel => + Seq(true, false).foreach { valueOrBytes => + testWithFileDelAfterLocalDiskRead(storageLevel, valueOrBytes) + } + } + test("SPARK-14252: getOrElseUpdate should still read from remote storage") { val store = makeBlockManager(8000, "executor1") val store2 = makeBlockManager(8000, "executor2") @@ -1315,8 +1424,8 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE // so that we have a chance to do location refresh val blockManagerIds = (0 to maxFailuresBeforeLocationRefresh) .map { i => BlockManagerId(s"id-$i", s"host-$i", i + 1) } - when(mockBlockManagerMaster.getLocationsAndStatus(mc.any[BlockId])).thenReturn( - Option(BlockLocationsAndStatus(blockManagerIds, BlockStatus.empty))) + when(mockBlockManagerMaster.getLocationsAndStatus(mc.any[BlockId], mc.any[String])).thenReturn( + Option(BlockLocationsAndStatus(blockManagerIds, BlockStatus.empty, None))) when(mockBlockManagerMaster.getLocations(mc.any[BlockId])).thenReturn( blockManagerIds) @@ -1325,7 +1434,8 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE val block = store.getRemoteBytes("item") .asInstanceOf[Option[ByteBuffer]] assert(block.isDefined) - verify(mockBlockManagerMaster, times(1)).getLocationsAndStatus("item") + verify(mockBlockManagerMaster, times(1)) + .getLocationsAndStatus("item", "MockBlockTransferServiceHost") verify(mockBlockManagerMaster, times(1)).getLocations("item") } @@ -1502,8 +1612,8 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE val blockLocations = Seq(BlockManagerId("id-0", "host-0", 1)) val blockStatus = BlockStatus(StorageLevel.DISK_ONLY, 0L, 2000L) - when(mockBlockManagerMaster.getLocationsAndStatus(mc.any[BlockId])).thenReturn( - Option(BlockLocationsAndStatus(blockLocations, blockStatus))) + when(mockBlockManagerMaster.getLocationsAndStatus(mc.any[BlockId], mc.any[String])).thenReturn( + Option(BlockLocationsAndStatus(blockLocations, blockStatus, None))) 
when(mockBlockManagerMaster.getLocations(mc.any[BlockId])).thenReturn(blockLocations) val store = makeBlockManager(8000, "executor1", mockBlockManagerMaster,
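
Note for reviewers (not part of the patch): the change relies on the fact that DiskBlockManager.getFile() and the new ExecutorDiskUtils.getFile() hash a block name to the same localDir/subDir/filename path, so an executor can resolve a same-host executor's disk-persisted block from the local dirs registered with the master, without a network round trip. The sketch below is a minimal, self-contained illustration of that layout; the class name SameHostBlockLookup and the example directories are hypothetical, and only JDK APIs are used.

import java.io.File;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;

// Hypothetical, standalone illustration of the hashing used by
// ExecutorDiskUtils.getFile() / DiskBlockManager.getFile(); not part of the patch.
public class SameHostBlockLookup {

  // Mirrors JavaUtils.nonNegativeHash(): a non-negative hash of the filename.
  static int nonNegativeHash(String s) {
    int hash = s.hashCode();
    return hash != Integer.MIN_VALUE ? Math.abs(hash) : 0;
  }

  // Same layout as the patch: localDirs[hash % n]/<subDirId as two hex digits>/<filename>.
  static File locate(String[] localDirs, int subDirsPerLocalDir, String filename) {
    int hash = nonNegativeHash(filename);
    String localDir = localDirs[hash % localDirs.length];
    int subDirId = (hash / localDirs.length) % subDirsPerLocalDir;
    return new File(new File(localDir, String.format("%02x", subDirId)), filename);
  }

  public static void main(String[] args) throws IOException {
    // Pretend these are the local dirs another executor registered with the master.
    String[] localDirs = {
      Files.createTempDirectory("blockmgr-a").toString(),
      Files.createTempDirectory("blockmgr-b").toString()
    };
    int subDirsPerLocalDir = 64;  // default of spark.diskStore.subDirectories
    String blockId = "rdd_2_3";   // a disk-persisted RDD block

    // "Writer" side: the executor that cached the block stores it at the hashed location.
    File written = locate(localDirs, subDirsPerLocalDir, blockId);
    written.getParentFile().mkdirs();
    Files.write(written.toPath(), "block bytes".getBytes(StandardCharsets.UTF_8));

    // "Reader" side: a different executor on the same host recomputes the identical path
    // from (localDirs, subDirsPerLocalDir, blockId) alone.
    File read = locate(localDirs, subDirsPerLocalDir, blockId);
    System.out.println(read + " exists = " + read.exists());
  }
}

Running it prints the resolved path with "exists = true", which mirrors the existence check readDiskBlockFromSameHostExecutor performs before wrapping the file in a FileSegmentManagedBuffer (or an EncryptedManagedBuffer when I/O encryption is enabled).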