@@ -0,0 +1,66 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.network.shuffle;

import java.io.File;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import com.google.common.annotations.VisibleForTesting;

import org.apache.spark.network.util.JavaUtils;

public class ExecutorDiskUtils {

private static final Pattern MULTIPLE_SEPARATORS = Pattern.compile(File.separator + "{2,}");

/**
* Hashes a filename into the corresponding local directory, in a manner consistent with
* Spark's DiskBlockManager.getFile().
*/
public static File getFile(String[] localDirs, int subDirsPerLocalDir, String filename) {
int hash = JavaUtils.nonNegativeHash(filename);
String localDir = localDirs[hash % localDirs.length];
int subDirId = (hash / localDirs.length) % subDirsPerLocalDir;
return new File(createNormalizedInternedPathname(
localDir, String.format("%02x", subDirId), filename));
}

/**
* This method is needed to avoid creating multiple File instances for the same pathname
* "foo/bar", each holding a separate copy of the "foo/bar" String. According to measurements,
* in some scenarios such duplicate strings may waste a lot of memory (~ 10% of the heap).
* To avoid that, we intern the pathname, and before interning we make sure it is in a
* normalized form (contains no "//", "///", etc.). Otherwise, the internal code in
* java.io.File would normalize it later, creating a new "foo/bar" String copy. Unfortunately,
* we cannot just reuse the normalization code that java.io.File uses, since it lives in the
* package-private class java.io.FileSystem.
*/
@VisibleForTesting
static String createNormalizedInternedPathname(String dir1, String dir2, String fname) {
String pathname = dir1 + File.separator + dir2 + File.separator + fname;
Matcher m = MULTIPLE_SEPARATORS.matcher(pathname);
pathname = m.replaceAll("/");
// A single trailing slash needs to be taken care of separately
if (pathname.length() > 1 && pathname.endsWith("/")) {
pathname = pathname.substring(0, pathname.length() - 1);
}
return pathname.intern();
}

}
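
For illustration only (not part of this diff): a minimal, standalone Scala sketch of the directory layout produced by getFile and createNormalizedInternedPathname above. It assumes Unix-style "/" separators and approximates JavaUtils.nonNegativeHash with a non-negative hashCode; production code should always go through ExecutorDiskUtils itself.

import java.io.File

object DiskLayoutSketch {

  // Assumed approximation of JavaUtils.nonNegativeHash: a non-negative hashCode.
  private def nonNegativeHash(s: String): Int = {
    val h = s.hashCode
    if (h != Int.MinValue) math.abs(h) else 0
  }

  // Mirrors ExecutorDiskUtils.getFile: pick a local dir by hash, then a two-hex-digit
  // sub-directory, then normalize duplicate separators before handing the path to java.io.File.
  def resolve(localDirs: Array[String], subDirsPerLocalDir: Int, filename: String): File = {
    val hash = nonNegativeHash(filename)
    val localDir = localDirs(hash % localDirs.length)
    val subDirId = (hash / localDirs.length) % subDirsPerLocalDir
    var pathname = localDir + "/" + f"$subDirId%02x" + "/" + filename
    pathname = pathname.replaceAll("/{2,}", "/")            // collapse "//", "///", ...
    if (pathname.length > 1 && pathname.endsWith("/")) {    // drop a single trailing "/"
      pathname = pathname.dropRight(1)
    }
    new File(pathname.intern())
  }

  def main(args: Array[String]): Unit = {
    val dirs = Array("/tmp/spark-local-0/", "/tmp/spark-local-1")
    // Prints something like /tmp/spark-local-0/2c/shuffle_0_1_0.index;
    // the exact local dir and sub-dir depend on the file name's hash.
    println(resolve(dirs, 64, "shuffle_0_1_0.index"))
  }
}

Normalizing before interning matters because the shuffle service can hold a very large number of these paths; a normalized, interned string prevents java.io.File from allocating a second, non-interned copy later.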
@@ -24,7 +24,6 @@
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import com.fasterxml.jackson.annotation.JsonCreator;
@@ -298,15 +297,15 @@ private void deleteNonShuffleServiceServedFiles(String[] dirs) {
*/
private ManagedBuffer getSortBasedShuffleBlockData(
ExecutorShuffleInfo executor, int shuffleId, int mapId, int reduceId) {
File indexFile = getFile(executor.localDirs, executor.subDirsPerLocalDir,
File indexFile = ExecutorDiskUtils.getFile(executor.localDirs, executor.subDirsPerLocalDir,
"shuffle_" + shuffleId + "_" + mapId + "_0.index");

try {
ShuffleIndexInformation shuffleIndexInformation = shuffleIndexCache.get(indexFile);
ShuffleIndexRecord shuffleIndexRecord = shuffleIndexInformation.getIndex(reduceId);
return new FileSegmentManagedBuffer(
conf,
getFile(executor.localDirs, executor.subDirsPerLocalDir,
ExecutorDiskUtils.getFile(executor.localDirs, executor.subDirsPerLocalDir,
"shuffle_" + shuffleId + "_" + mapId + "_0.data"),
shuffleIndexRecord.getOffset(),
shuffleIndexRecord.getLength());
@@ -317,7 +316,7 @@ private ManagedBuffer getSortBasedShuffleBlockData(

public ManagedBuffer getDiskPersistedRddBlockData(
ExecutorShuffleInfo executor, int rddId, int splitIndex) {
File file = getFile(executor.localDirs, executor.subDirsPerLocalDir,
File file = ExecutorDiskUtils.getFile(executor.localDirs, executor.subDirsPerLocalDir,
"rdd_" + rddId + "_" + splitIndex);
long fileLength = file.length();
ManagedBuffer res = null;
@@ -327,19 +326,6 @@ public ManagedBuffer getDiskPersistedRddBlockData(
return res;
}

/**
* Hashes a filename into the corresponding local directory, in a manner consistent with
* Spark's DiskBlockManager.getFile().
*/
@VisibleForTesting
static File getFile(String[] localDirs, int subDirsPerLocalDir, String filename) {
int hash = JavaUtils.nonNegativeHash(filename);
String localDir = localDirs[hash % localDirs.length];
int subDirId = (hash / localDirs.length) % subDirsPerLocalDir;
return new File(createNormalizedInternedPathname(
localDir, String.format("%02x", subDirId), filename));
}

void close() {
if (db != null) {
try {
@@ -350,28 +336,6 @@ void close() {
}
}

/**
* This method is needed to avoid the situation when multiple File instances for the
* same pathname "foo/bar" are created, each with a separate copy of the "foo/bar" String.
* According to measurements, in some scenarios such duplicate strings may waste a lot
* of memory (~ 10% of the heap). To avoid that, we intern the pathname, and before that
* we make sure that it's in a normalized form (contains no "//", "///" etc.) Otherwise,
* the internal code in java.io.File would normalize it later, creating a new "foo/bar"
* String copy. Unfortunately, we cannot just reuse the normalization code that java.io.File
* uses, since it is in the package-private class java.io.FileSystem.
*/
@VisibleForTesting
static String createNormalizedInternedPathname(String dir1, String dir2, String fname) {
String pathname = dir1 + File.separator + dir2 + File.separator + fname;
Matcher m = MULTIPLE_SEPARATORS.matcher(pathname);
pathname = m.replaceAll("/");
// A single trailing slash needs to be taken care of separately
if (pathname.length() > 1 && pathname.endsWith("/")) {
pathname = pathname.substring(0, pathname.length() - 1);
}
return pathname.intern();
}

public int removeBlocks(String appId, String execId, String[] blockIds) {
ExecutorShuffleInfo executor = executors.get(new AppExecId(appId, execId));
if (executor == null) {
@@ -380,7 +344,8 @@ public int removeBlocks(String appId, String execId, String[] blockIds) {
}
int numRemovedBlocks = 0;
for (String blockId : blockIds) {
File file = getFile(executor.localDirs, executor.subDirsPerLocalDir, blockId);
File file =
ExecutorDiskUtils.getFile(executor.localDirs, executor.subDirsPerLocalDir, blockId);
if (file.delete()) {
numRemovedBlocks++;
} else {
@@ -149,7 +149,7 @@ public void testNormalizeAndInternPathname() {

private void assertPathsMatch(String p1, String p2, String p3, String expectedPathname) {
String normPathname =
ExternalShuffleBlockResolver.createNormalizedInternedPathname(p1, p2, p3);
ExecutorDiskUtils.createNormalizedInternedPathname(p1, p2, p3);
assertEquals(expectedPathname, normPathname);
File file = new File(normPathname);
String returnedPath = file.getPath();
@@ -76,9 +76,9 @@ public void insertSortShuffleData(int shuffleId, int mapId, byte[][] blocks) thr

try {
dataStream = new FileOutputStream(
ExternalShuffleBlockResolver.getFile(localDirs, subDirsPerLocalDir, blockId + ".data"));
ExecutorDiskUtils.getFile(localDirs, subDirsPerLocalDir, blockId + ".data"));
indexStream = new DataOutputStream(new FileOutputStream(
ExternalShuffleBlockResolver.getFile(localDirs, subDirsPerLocalDir, blockId + ".index")));
ExecutorDiskUtils.getFile(localDirs, subDirsPerLocalDir, blockId + ".index")));

long offset = 0;
indexStream.writeLong(offset);
@@ -121,10 +121,11 @@ private void insertFile(String filename) throws IOException {

private void insertFile(String filename, byte[] block) throws IOException {
OutputStream dataStream = null;
File file = ExternalShuffleBlockResolver.getFile(localDirs, subDirsPerLocalDir, filename);
File file = ExecutorDiskUtils.getFile(localDirs, subDirsPerLocalDir, filename);
assert(!file.exists()) : "this test file has already been generated";
try {
dataStream = new FileOutputStream(file);
dataStream = new FileOutputStream(
ExecutorDiskUtils.getFile(localDirs, subDirsPerLocalDir, filename));
dataStream.write(block);
} finally {
Closeables.close(dataStream, false);
121 changes: 98 additions & 23 deletions core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -43,8 +43,9 @@ import org.apache.spark.internal.config.Network
import org.apache.spark.memory.{MemoryManager, MemoryMode}
import org.apache.spark.metrics.source.Source
import org.apache.spark.network._
import org.apache.spark.network.buffer.ManagedBuffer
import org.apache.spark.network.buffer.{FileSegmentManagedBuffer, ManagedBuffer}
import org.apache.spark.network.client.StreamCallbackWithID
import org.apache.spark.network.netty.SparkTransportConf
import org.apache.spark.network.shuffle._
import org.apache.spark.network.shuffle.protocol.ExecutorShuffleInfo
import org.apache.spark.network.util.TransportConf
@@ -138,6 +139,8 @@ private[spark] class BlockManager(
private val remoteReadNioBufferConversion =
conf.get(Network.NETWORK_REMOTE_READ_NIO_BUFFER_CONVERSION)

private[spark] val subDirsPerLocalDir = conf.get(config.DISKSTORE_SUB_DIRECTORIES)

val diskBlockManager = {
// Only perform cleanup if an external service is not serving our shuffle files.
val deleteFilesOnStop =
@@ -411,6 +414,7 @@

val idFromMaster = master.registerBlockManager(
id,
diskBlockManager.localDirsString,
maxOnHeapMemory,
maxOffHeapMemory,
slaveEndpoint)
@@ -445,7 +449,7 @@
private def registerWithExternalShuffleServer() {
logInfo("Registering executor with local external shuffle service.")
val shuffleConfig = new ExecutorShuffleInfo(
diskBlockManager.localDirs.map(_.toString),
diskBlockManager.localDirsString,
diskBlockManager.subDirsPerLocalDir,
shuffleManager.getClass.getName)

@@ -500,7 +504,8 @@
def reregister(): Unit = {
// TODO: We might need to rate limit re-registering.
logInfo(s"BlockManager $blockManagerId re-registering with master")
master.registerBlockManager(blockManagerId, maxOnHeapMemory, maxOffHeapMemory, slaveEndpoint)
master.registerBlockManager(blockManagerId, diskBlockManager.localDirsString, maxOnHeapMemory,
maxOffHeapMemory, slaveEndpoint)
reportAllBlocks()
}

@@ -827,10 +832,63 @@
*/
private[spark] def getRemoteValues[T: ClassTag](blockId: BlockId): Option[BlockResult] = {
val ct = implicitly[ClassTag[T]]
getRemoteManagedBuffer(blockId).map { data =>
getRemoteBlock(blockId, (data: ManagedBuffer) => {
val values =
serializerManager.dataDeserializeStream(blockId, data.createInputStream())(ct)
new BlockResult(values, DataReadMethod.Network, data.size)
})
}

/**
* Get the remote block and transform it to the provided data type.
*
* If the block is persisted to disk by an executor running on the same host, we first try to
* access it directly via that executor's local directories. If the file is found, it is handed
* to the provided transformation function, which is expected to open it. If the transformation
* throws, block access falls back to fetching the block from the remote executor over the
* network.
*
* @param blockId identifies the block to get
* @param bufferTransformer a function that is expected to open the file when the block is
*                          backed by one, which guarantees the whole content can be loaded
* @tparam T result type
* @return the transformed block, or None if it could not be fetched
*/
private[spark] def getRemoteBlock[T](
blockId: BlockId,
bufferTransformer: ManagedBuffer => T): Option[T] = {
logDebug(s"Getting remote block $blockId")
require(blockId != null, "BlockId is null")

// Because all the remote blocks are registered in driver, it is not necessary to ask
// all the slave executors to get block status.
val locationsAndStatusOption = master.getLocationsAndStatus(blockId, blockManagerId.host)
if (locationsAndStatusOption.isEmpty) {
logDebug(s"Block $blockId is unknown by block manager master")
None
} else {
val locationsAndStatus = locationsAndStatusOption.get
val blockSize = locationsAndStatus.status.diskSize.max(locationsAndStatus.status.memSize)

locationsAndStatus.localDirs.flatMap { localDirs =>
val blockDataOption =
readDiskBlockFromSameHostExecutor(blockId, localDirs, locationsAndStatus.status.diskSize)
val res = blockDataOption.flatMap { blockData =>
try {
Some(bufferTransformer(blockData))
} catch {
case NonFatal(e) =>
logDebug("Block from the same host executor cannot be opened: ", e)
None
}
}
logInfo(s"Read $blockId from the disk of a same host executor is " +
(if (res.isDefined) "successful." else "failed."))
res
}.orElse {
fetchRemoteManagedBuffer(blockId, blockSize, locationsAndStatus).map(bufferTransformer)
}
}
}
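
For illustration only (not part of this diff): a hypothetical caller of getRemoteBlock, assuming only the signature above and ManagedBuffer.createInputStream(). The same transformer runs whether the buffer was opened from a same-host executor's disk or fetched over the network; if it throws on the disk path, getRemoteBlock falls back to the network fetch.

package org.apache.spark.storage

import org.apache.spark.network.buffer.ManagedBuffer

// Hypothetical helper (getRemoteBlock is private[spark], so this sketch lives in the
// org.apache.spark.storage package). It counts the bytes of a remote block by streaming it.
object RemoteBlockSizeSketch {

  def remoteBlockSize(bm: BlockManager, blockId: BlockId): Option[Long] = {
    bm.getRemoteBlock(blockId, (data: ManagedBuffer) => {
      val in = data.createInputStream()
      try {
        val buf = new Array[Byte](8192)
        var total = 0L
        var n = in.read(buf)
        while (n != -1) {
          total += n
          n = in.read(buf)
        }
        total
      } finally {
        in.close()
      }
    })
  }
}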

@@ -861,22 +919,12 @@
}

/**
* Get block from remote block managers as a ManagedBuffer.
* Fetch the block from remote block managers as a ManagedBuffer.
*/
private def getRemoteManagedBuffer(blockId: BlockId): Option[ManagedBuffer] = {
logDebug(s"Getting remote block $blockId")
require(blockId != null, "BlockId is null")
var runningFailureCount = 0
var totalFailureCount = 0

// Because all the remote blocks are registered in driver, it is not necessary to ask
// all the slave executors to get block status.
val locationsAndStatus = master.getLocationsAndStatus(blockId)
val blockSize = locationsAndStatus.map { b =>
b.status.diskSize.max(b.status.memSize)
}.getOrElse(0L)
val blockLocations = locationsAndStatus.map(_.locations).getOrElse(Seq.empty)

private def fetchRemoteManagedBuffer(
blockId: BlockId,
blockSize: Long,
locationsAndStatus: BlockManagerMessages.BlockLocationsAndStatus): Option[ManagedBuffer] = {
// If the block size is above the threshold, we should pass our FileManager to
// BlockTransferService, which will leverage it to spill the block; if not, then passed-in
// null value means the block will be persisted in memory.
@@ -885,8 +933,9 @@
} else {
null
}

val locations = sortLocations(blockLocations)
var runningFailureCount = 0
var totalFailureCount = 0
val locations = sortLocations(locationsAndStatus.locations)
val maxFetchFailures = locations.size
var locationIterator = locations.iterator
while (locationIterator.hasNext) {
@@ -946,11 +995,37 @@
None
}

/**
* Reads the block from the local directories of another executor which runs on the same host.
*/
private[spark] def readDiskBlockFromSameHostExecutor(
blockId: BlockId,
localDirs: Array[String],
blockSize: Long): Option[ManagedBuffer] = {
val file = ExecutorDiskUtils.getFile(localDirs, subDirsPerLocalDir, blockId.name)
if (file.exists()) {
val managedBuffer = securityManager.getIOEncryptionKey() match {
case Some(key) =>
// Encrypted blocks cannot be memory mapped; return a special object that does decryption
// and provides InputStream / FileRegion implementations for reading the data.
new EncryptedManagedBuffer(
new EncryptedBlockData(file, blockSize, conf, key))

case _ =>
val transportConf = SparkTransportConf.fromSparkConf(conf, "shuffle")
new FileSegmentManagedBuffer(transportConf, file, 0, file.length)
}
Some(managedBuffer)
} else {
None
}
}

/**
* Get block from remote block managers as serialized bytes.
*/
def getRemoteBytes(blockId: BlockId): Option[ChunkedByteBuffer] = {
getRemoteManagedBuffer(blockId).map { data =>
getRemoteBlock(blockId, (data: ManagedBuffer) => {
// SPARK-24307 undocumented "escape-hatch" in case there are any issues in converting to
// ChunkedByteBuffer, to go back to old code-path. Can be removed post Spark 2.4 if
// new path is stable.
@@ -959,7 +1034,7 @@
} else {
ChunkedByteBuffer.fromManagedBuffer(data)
}
}
})
}

/**