Commits (25)
0a2d0d7  Remove shuffle blocks using the shuffle service for released executors (Kimahriman, Dec 22, 2021)
92af8ae  Check for existing executor (Kimahriman, Jan 4, 2022)
2a0dfac  Fix to work through the context cleaner (Kimahriman, Jan 12, 2022)
1190470  Create shuffle files as group writable (Kimahriman, Jan 14, 2022)
d12b6b2  Make sure external shuffle is used and clean some things up (Kimahriman, Jan 16, 2022)
b764141  Create disk block dirs as group writable rather than files (Kimahriman, Jan 16, 2022)
b837362  Add test for block manager sub dir being group writable (Kimahriman, Jan 16, 2022)
f6b7560  Use the create dir with 770 helper (Kimahriman, Jan 19, 2022)
3689b9e  World readable block file approach (Kimahriman, Jan 23, 2022)
7be49a0  Create final shuffle files correctly with world readable (Kimahriman, Jan 25, 2022)
4e62191  Update LocalDiskSingleSpillMapOutputWriter.java (Kimahriman, Jan 25, 2022)
940a934  Fix tests and lint (Kimahriman, Jan 26, 2022)
e4a71af  Rework some conditional checks (Kimahriman, Jan 29, 2022)
2b4373f  Add feature flag (Kimahriman, Feb 3, 2022)
9698b09  Change config name (Kimahriman, Feb 27, 2022)
126955b  Only change permissions if removing shuffle through external shuffle … (Kimahriman, Mar 17, 2022)
cc79384  Update description, add to markdown, and reorder logic (Kimahriman, Mar 17, 2022)
02057b8  Enable settings for test (Kimahriman, Mar 17, 2022)
b55eb20  Add permission changing back to temp shuffle block (Kimahriman, Mar 17, 2022)
84f2929  Apply suggestions for comment change (Kimahriman, Mar 18, 2022)
61aa9f0  Fix typo, add negative test, and remove duplicate check (Kimahriman, Mar 18, 2022)
d34be20  Default config to false (Kimahriman, Mar 18, 2022)
3d2fd7b  Merge branch 'shuffle-service-remove-shuffle-blocks' of github.com:Ki… (Kimahriman, Mar 18, 2022)
ad0f9eb  Update permissions for RDD blocks if shuffle service fetching is enab… (Kimahriman, Mar 22, 2022)
c39cdf3  Update comments and use locking withMapStatuses (Kimahriman, Mar 24, 2022)
@@ -299,15 +299,15 @@ public void onSuccess(ByteBuffer response) {
        BlockTransferMessage msgObj = BlockTransferMessage.Decoder.fromByteBuffer(response);
        numRemovedBlocksFuture.complete(((BlocksRemoved) msgObj).numRemovedBlocks);
      } catch (Throwable t) {
-       logger.warn("Error trying to remove RDD blocks " + Arrays.toString(blockIds) +
+       logger.warn("Error trying to remove blocks " + Arrays.toString(blockIds) +
          " via external shuffle service from executor: " + execId, t);
        numRemovedBlocksFuture.complete(0);
      }
    }

    @Override
    public void onFailure(Throwable e) {
-     logger.warn("Error trying to remove RDD blocks " + Arrays.toString(blockIds) +
+     logger.warn("Error trying to remove blocks " + Arrays.toString(blockIds) +
        " via external shuffle service from executor: " + execId, e);
      numRemovedBlocksFuture.complete(0);
    }
@@ -36,7 +36,6 @@
 import org.apache.spark.internal.config.package$;
 import org.apache.spark.shuffle.IndexShuffleBlockResolver;
 import org.apache.spark.shuffle.api.metadata.MapOutputCommitMessage;
-import org.apache.spark.util.Utils;

 /**
  * Implementation of {@link ShuffleMapOutputWriter} that replicates the functionality of shuffle
@@ -87,7 +86,7 @@ public ShufflePartitionWriter getPartitionWriter(int reducePartitionId) throws IOException
    }
    lastPartitionId = reducePartitionId;
    if (outputTempFile == null) {
-     outputTempFile = Utils.tempFileWith(outputFile);
+     outputTempFile = blockResolver.createTempFile(outputFile);
    }
    if (outputFileChannel != null) {
      currChannelPosition = outputFileChannel.position();
4 changes: 3 additions & 1 deletion core/src/main/scala/org/apache/spark/ContextCleaner.scala
@@ -235,8 +235,10 @@ private[spark] class ContextCleaner(
      try {
        if (mapOutputTrackerMaster.containsShuffle(shuffleId)) {
          logDebug("Cleaning shuffle " + shuffleId)
-         mapOutputTrackerMaster.unregisterShuffle(shuffleId)
+         // Shuffle must be removed before it's unregistered from the output tracker
+         // to find blocks served by the shuffle service on deallocated executors
          shuffleDriverComponents.removeShuffle(shuffleId, blocking)
+         mapOutputTrackerMaster.unregisterShuffle(shuffleId)
Member: Is this new pattern safe in all cases? Any chance of FileNotFound?

Contributor Author: It hasn't failed any tests or caused problems in heavy usage for the past couple of months. Not sure how else to know.

Member: @Kimahriman

  • Logically, this is counterintuitive, isn't it? The new code is weaker than before.
  • I guess you are describing experience from your own environment, which is presumably not Apache Spark master or 3.3. Which forked version are you using now with this patch?

Contributor Author: 3.2 with my own PRs backported (most of which are already merged and will be in 3.3).

I do know I need this change for this PR to work, and I know why. I don't know how to create a test case for a scenario I don't even know about. If you can think of one, knowing these code paths better, please let me know.

Contributor Author: Looking at unregisterShuffle, I don't see anything that would depend on the ShuffleDriverComponents.

Contributor: I agree that, from a logical viewpoint, this would be better the other way: it makes more sense to unregister before you remove, in case something tried to use those shuffle locations mid-removal and some were actually there while some were not. But in this context it normally only gets cleaned when nothing references it anymore, so nothing could try to use it, and the order shouldn't matter. There is also RDD.cleanShuffleDependencies as well, though. The problem here, I think, is that he needs the map output tracker to figure out what removeShuffle removes. I haven't looked in detail, but it might be better if unregisterShuffle could return something that could then be passed to removeShuffle, for instance.

Contributor Author: Yeah, anything trying to use either of these things after cleanup is called sounds like a bug that should be fixed. I'm not sure what the expected behavior of cleanShuffleDependencies is while something is actively trying to use the shuffle.

Contributor Author: Also, the removeShuffle call goes through an RPC call from BlockManagerMaster to BlockManagerMasterEndpoint, so I'm not sure how all that info could be passed.

Contributor: Yeah, I'm not sure it really matters, other than possibly the way it fails. Either way, if someone tries to use the shuffle midway through the cleanup, things are going to fail. I would think this is fine.

listeners.asScala.foreach(_.shuffleCleaned(shuffleId))
logDebug("Cleaned shuffle " + shuffleId)
} else {
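As an aside, here is a minimal sketch of the alternative raised in the discussion above (purely hypothetical; it is not what this PR implements, and all names are invented for illustration): unregistering returns the locations being forgotten, so the removal step would no longer have to read tracker state afterwards.

import scala.collection.mutable

// Hypothetical sketch, not Spark code: unregisterShuffle hands back the
// executor IDs that hosted the shuffle so a removal step could consume
// them directly instead of querying the tracker before unregistration.
class ShuffleLocationTracker {
  private val locations = mutable.Map.empty[Int, Set[String]]

  def register(shuffleId: Int, executorId: String): Unit =
    locations(shuffleId) = locations.getOrElse(shuffleId, Set.empty) + executorId

  // Returns the locations that were just forgotten.
  def unregisterShuffle(shuffleId: Int): Set[String] =
    locations.remove(shuffleId).getOrElse(Set.empty)
}

As noted in the thread, the actual removal goes through an RPC from BlockManagerMaster to BlockManagerMasterEndpoint, which is why threading a return value through like this was not pursued here.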
6 changes: 4 additions & 2 deletions core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -343,12 +343,14 @@ object SparkEnv extends Logging {
        isLocal,
        conf,
        listenerBus,
-       if (conf.get(config.SHUFFLE_SERVICE_FETCH_RDD_ENABLED)) {
+       if (conf.get(config.SHUFFLE_SERVICE_ENABLED)) {
          externalShuffleClient
        } else {
          None
        }, blockManagerInfo,
-       mapOutputTracker.asInstanceOf[MapOutputTrackerMaster], isDriver)),
+       mapOutputTracker.asInstanceOf[MapOutputTrackerMaster],
+       shuffleManager,
+       isDriver)),
      registerOrLookupEndpoint(
        BlockManagerMaster.DRIVER_HEARTBEAT_ENDPOINT_NAME,
        new BlockManagerMasterHeartbeatEndpoint(rpcEnv, isLocal, blockManagerInfo)),
10 changes: 10 additions & 0 deletions core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -654,6 +654,16 @@ package object config {
      .booleanConf
      .createWithDefault(false)

+   private[spark] val SHUFFLE_SERVICE_REMOVE_SHUFFLE_ENABLED =
+     ConfigBuilder("spark.shuffle.service.removeShuffle")
+       .doc("Whether to use the ExternalShuffleService for deleting shuffle blocks for " +
+         "deallocated executors when the shuffle is no longer needed. Without this enabled, " +
+         "shuffle data on executors that are deallocated will remain on disk until the " +
+         "application ends.")
+       .version("3.3.0")
Member: We need to discuss this, @mridulm.
If this feature is not urgent, I'd recommend having this in master only.

Contributor Author: We had to disable the external shuffle service for basically all of our large shuffling jobs until I got this working, and based on others' interest and all the Jira tickets filed over the years, I'd consider this a critically missing feature. I'm fine with it being disabled by default; the more permission changes we make, the more likely this is to cause issues when using the external shuffle service.

Contributor: If there are concerns with stability, I am fine with pushing this to master only - in which case we can change all the versions to 3.4.0.
Thoughts, @tgravescs?

Contributor: I don't have a strong opinion; I'm fine with whatever others think. It might be best to at least turn it off by default if it goes into 3.3.

+       .booleanConf
+       .createWithDefault(false)
+
private[spark] val SHUFFLE_SERVICE_FETCH_RDD_ENABLED =
ConfigBuilder(Constants.SHUFFLE_SERVICE_FETCH_RDD_ENABLED)
.doc("Whether to use the ExternalShuffleService for fetching disk persisted RDD blocks. " +
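For orientation, a minimal sketch of enabling the new behavior (assuming an external shuffle service is already deployed; the two config keys come from this patch, everything else is illustrative):

import org.apache.spark.SparkConf

// removeShuffle defaults to false and only takes effect when the external
// shuffle service itself is enabled.
val conf = new SparkConf()
  .setAppName("shuffle-removal-example") // illustrative name
  .set("spark.shuffle.service.enabled", "true")
  .set("spark.shuffle.service.removeShuffle", "true")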
@@ -84,6 +84,11 @@ private[spark] class IndexShuffleBlockResolver(
      shuffleFiles.map(_.length()).sum
    }

+   /** Create a temporary file that will be renamed to the final resulting file */
+   def createTempFile(file: File): File = {
+     blockManager.diskBlockManager.createTempFileWith(file)
+   }
+
    /**
     * Get the shuffle data file.
     *
@@ -234,7 +239,7 @@
        throw new IllegalStateException(s"Unexpected shuffle block transfer ${blockId} as " +
          s"${blockId.getClass().getSimpleName()}")
      }
-     val fileTmp = Utils.tempFileWith(file)
+     val fileTmp = createTempFile(file)
      val channel = Channels.newChannel(
        serializerManager.wrapStream(blockId,
          new FileOutputStream(fileTmp)))
@@ -335,15 +340,15 @@
        checksums: Array[Long],
        dataTmp: File): Unit = {
      val indexFile = getIndexFile(shuffleId, mapId)
-     val indexTmp = Utils.tempFileWith(indexFile)
+     val indexTmp = createTempFile(indexFile)

      val checksumEnabled = checksums.nonEmpty
      val (checksumFileOpt, checksumTmpOpt) = if (checksumEnabled) {
        assert(lengths.length == checksums.length,
          "The size of partition lengths and checksums should be equal")
        val checksumFile =
          getChecksumFile(shuffleId, mapId, conf.get(config.SHUFFLE_CHECKSUM_ALGORITHM))
-       (Some(checksumFile), Some(Utils.tempFileWith(checksumFile)))
+       (Some(checksumFile), Some(createTempFile(checksumFile)))
      } else {
        (None, None)
      }
@@ -597,6 +602,13 @@
      }
    }

+   override def getBlocksForShuffle(shuffleId: Int, mapId: Long): Seq[BlockId] = {
Member: In real data, there are cases where a shuffle index exists without its data file. So, did we consider those cases in the upper layers where this method is invoked?

Contributor Author: In that case the external shuffle service will just log that it was unable to delete the file and continue. Is there anywhere to actually get that information?

Member: Historically, WorkerDecommission failed with a FileNotFound exception because it didn't know about the reality on disk. So I want to make sure the upper layers of this method are safe.

Member: Do we have test coverage for the case you mentioned?

> In that case the external shuffle service will just log that it was unable to delete the file and continue. Is there anywhere to actually get that information?

Contributor Author: I'll check if the original RDD cache serving and deletion had a test case for that.

Contributor Author: I believe that's covered here: https://github.com/apache/spark/blob/master/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleIntegrationSuite.java#L270

Also curious what the case is for no data file. Is that just when a shuffle map has no output rows?

+     Seq(
+       ShuffleIndexBlockId(shuffleId, mapId, NOOP_REDUCE_ID),
+       ShuffleDataBlockId(shuffleId, mapId, NOOP_REDUCE_ID)
+     )
+   }

override def stop(): Unit = {}
}

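As a concrete illustration of the block IDs returned by getBlocksForShuffle above (a sketch; the IDs are illustrative, and NOOP_REDUCE_ID is the 0 used in sort-based shuffle file names):

import org.apache.spark.storage.{ShuffleDataBlockId, ShuffleIndexBlockId}

// For shuffleId = 5 and mapId = 42, the resolver names exactly the two
// per-map-output files the external shuffle service is asked to delete.
println(ShuffleIndexBlockId(5, 42L, 0).name) // shuffle_5_42_0.index
println(ShuffleDataBlockId(5, 42L, 0).name)  // shuffle_5_42_0.data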
@@ -41,6 +41,14 @@ trait ShuffleBlockResolver {
     */
    def getBlockData(blockId: BlockId, dirs: Option[Array[String]] = None): ManagedBuffer

+   /**
+    * Retrieve a list of BlockIds for a given shuffle map. Used to delete shuffle files
+    * from the external shuffle service after the associated executor has been removed.
+    */
+   def getBlocksForShuffle(shuffleId: Int, mapId: Long): Seq[BlockId] = {
+     Seq.empty
+   }
Contributor Author (on lines +48 to +50): I wasn't sure whether I should make this backward compatible or not, so I added a default implementation. I can remove it if it doesn't need to be backward compatible (it's marked private, so it probably doesn't need to be?).

Contributor: I agree with your assessment @Kimahriman and would prefer to avoid this method, though @Ngone51 had expressed concern in the past that some of the private classes might be getting used for some custom implementations.
Thoughts, @Ngone51?

Member: I'd be +1 for keeping it backward compatible if you want to keep this method.


/**
* Retrieve the data for the specified merged shuffle block as multiple chunks.
*/
@@ -36,6 +36,7 @@
 import org.apache.spark.network.shuffle.ExternalBlockStoreClient
 import org.apache.spark.rpc.{IsolatedRpcEndpoint, RpcCallContext, RpcEndpointRef, RpcEnv}
 import org.apache.spark.scheduler._
 import org.apache.spark.scheduler.cluster.{CoarseGrainedClusterMessages, CoarseGrainedSchedulerBackend}
+import org.apache.spark.shuffle.ShuffleManager
 import org.apache.spark.storage.BlockManagerMessages._
 import org.apache.spark.util.{RpcUtils, ThreadUtils, Utils}

@@ -52,6 +53,7 @@ class BlockManagerMasterEndpoint(
      externalBlockStoreClient: Option[ExternalBlockStoreClient],
      blockManagerInfo: mutable.Map[BlockManagerId, BlockManagerInfo],
      mapOutputTracker: MapOutputTrackerMaster,
+     shuffleManager: ShuffleManager,
      isDriver: Boolean)
    extends IsolatedRpcEndpoint with Logging {

@@ -104,9 +106,11 @@
    private val pushBasedShuffleEnabled = Utils.isPushBasedShuffleEnabled(conf, isDriver)

    logInfo("BlockManagerMasterEndpoint up")
-   // same as `conf.get(config.SHUFFLE_SERVICE_ENABLED)
-   // && conf.get(config.SHUFFLE_SERVICE_FETCH_RDD_ENABLED)`
-   private val externalShuffleServiceRddFetchEnabled: Boolean = externalBlockStoreClient.isDefined
+   private val externalShuffleServiceRemoveShuffleEnabled: Boolean =
+     externalBlockStoreClient.isDefined && conf.get(config.SHUFFLE_SERVICE_REMOVE_SHUFFLE_ENABLED)
+   private val externalShuffleServiceRddFetchEnabled: Boolean =
+     externalBlockStoreClient.isDefined && conf.get(config.SHUFFLE_SERVICE_FETCH_RDD_ENABLED)
    private val externalShuffleServicePort: Int = StorageUtils.externalShuffleServicePort(conf)

    private lazy val driverEndpoint =
@@ -294,33 +298,74 @@
        }
      }.toSeq

-     val removeRddBlockViaExtShuffleServiceFutures = externalBlockStoreClient.map { shuffleClient =>
-       blocksToDeleteByShuffleService.map { case (bmId, blockIds) =>
-         Future[Int] {
-           val numRemovedBlocks = shuffleClient.removeBlocks(
-             bmId.host,
-             bmId.port,
-             bmId.executorId,
-             blockIds.map(_.toString).toArray)
-           numRemovedBlocks.get(defaultRpcTimeout.duration.toSeconds, TimeUnit.SECONDS)
-         }
-       }
-     }.getOrElse(Seq.empty)
+     val removeRddBlockViaExtShuffleServiceFutures = if (externalShuffleServiceRddFetchEnabled) {
+       externalBlockStoreClient.map { shuffleClient =>
+         blocksToDeleteByShuffleService.map { case (bmId, blockIds) =>
+           Future[Int] {
+             val numRemovedBlocks = shuffleClient.removeBlocks(
+               bmId.host,
+               bmId.port,
+               bmId.executorId,
+               blockIds.map(_.toString).toArray)
+             numRemovedBlocks.get(defaultRpcTimeout.duration.toSeconds, TimeUnit.SECONDS)
+           }
+         }
+       }.getOrElse(Seq.empty)
+     } else {
+       Seq.empty
+     }

      Future.sequence(removeRddFromExecutorsFutures ++ removeRddBlockViaExtShuffleServiceFutures)
    }

    private def removeShuffle(shuffleId: Int): Future[Seq[Boolean]] = {
      // Nothing to do in the BlockManagerMasterEndpoint data structures
      val removeMsg = RemoveShuffle(shuffleId)
-     Future.sequence(
-       blockManagerInfo.values.map { bm =>
-         bm.storageEndpoint.ask[Boolean](removeMsg).recover {
-           // use false as default value means no shuffle data were removed
-           handleBlockRemovalFailure("shuffle", shuffleId.toString, bm.blockManagerId, false)
+     val removeShuffleFromExecutorsFutures = blockManagerInfo.values.map { bm =>
+       bm.storageEndpoint.ask[Boolean](removeMsg).recover {
+         // use false as default value means no shuffle data were removed
+         handleBlockRemovalFailure("shuffle", shuffleId.toString, bm.blockManagerId, false)
+       }
+     }.toSeq

+     // Find all shuffle blocks on executors that are no longer running
Member: Shall we unify these two cases together? E.g.,

mapOutputTracker.shuffleStatuses.get(shuffleId).map { shuffleStatus =>
  val (mapStatusAtExecutor, mapStatusAtShuffleService) = {
    shuffleStatus.mapStatuses.partition { mapStatus =>
      blockManagerIdByExecutor.contains(mapStatus.location.executorId)
    } match {
      case (mapStatusAtExecutor, _) if !externalShuffleServiceRemoveShuffleEnabled =>
        (mapStatusAtExecutor, Array.empty)
      case (se, ss) =>
        (se, ss)
    }
  }
  mapStatusAtExecutor.map(_.location).toSet.map {
   ...
  } ++ mapStatusAtShuffleService.map {
   ...
  }
}

This way, we only send the shuffle remove request to valid executors (not all active executors) in the first case.

Contributor Author: We could, but there's already a lot going on in this PR, and I'd prefer not to change (or accidentally break) the current shuffle deletion. And I'm not sure if all block managers still track shuffles for some reason, even if they don't have any of the shuffle files?

Member: Fine. We could do it in a follow-up PR.

> And I'm not sure if all block managers still track shuffles for some reason, even if they don't have any of the shuffle files?

If so, you should also send the request to all the external shuffle services.

Contributor Author: The shuffle service doesn't track anything about shuffles, which is why we need to send it each individual block to delete instead of just a RemoveShuffle message.

+     val blocksToDeleteByShuffleService =
+       new mutable.HashMap[BlockManagerId, mutable.HashSet[BlockId]]
+     if (externalShuffleServiceRemoveShuffleEnabled) {
Member: Can we keep the existing externalBlockStoreClient.isDefined condition behavior independent of this new configuration?

Contributor Author: I'm not really sure what you mean. That variable is externalBlockStoreClient.isDefined && conf.get(config.SHUFFLE_SERVICE_REMOVE_SHUFFLE_ENABLED).

+       mapOutputTracker.shuffleStatuses.get(shuffleId).foreach { shuffleStatus =>
+         shuffleStatus.withMapStatuses { mapStatuses =>
+           mapStatuses.foreach { mapStatus =>
+             // Check if the executor has been deallocated
+             if (!blockManagerIdByExecutor.contains(mapStatus.location.executorId)) {
+               val blocksToDel =
+                 shuffleManager.shuffleBlockResolver.getBlocksForShuffle(shuffleId, mapStatus.mapId)
+               if (blocksToDel.nonEmpty) {
+                 val blocks = blocksToDeleteByShuffleService.getOrElseUpdate(mapStatus.location,
+                   new mutable.HashSet[BlockId])
+                 blocks ++= blocksToDel
+               }
+             }
+           }
+         }
+       }
-       }.toSeq
-     )
+     }

+     val removeShuffleFromShuffleServicesFutures =
+       externalBlockStoreClient.map { shuffleClient =>
+         blocksToDeleteByShuffleService.map { case (bmId, blockIds) =>
+           Future[Boolean] {
+             val numRemovedBlocks = shuffleClient.removeBlocks(
+               bmId.host,
+               bmId.port,
+               bmId.executorId,
+               blockIds.map(_.toString).toArray)
+             numRemovedBlocks.get(defaultRpcTimeout.duration.toSeconds,
+               TimeUnit.SECONDS) == blockIds.size
+           }
+         }
+       }.getOrElse(Seq.empty)
+
+     Future.sequence(removeShuffleFromExecutorsFutures ++
+       removeShuffleFromShuffleServicesFutures)
    }

/**
@@ -19,6 +19,7 @@ package org.apache.spark.storage

 import java.io.{File, IOException}
 import java.nio.file.Files
+import java.nio.file.attribute.PosixFilePermission
 import java.util.UUID

 import scala.collection.mutable.HashMap
@@ -77,6 +78,15 @@

private val shutdownHook = addShutdownHook()

+   // If either of these features is enabled, we must change permissions on block manager
+   // directories and files to accommodate the shuffle service deleting files in a secure
+   // environment. Parent directories are assumed to be restrictive to prevent unauthorized
+   // users from accessing or modifying world-readable files.
+   private val permissionChangingRequired = conf.get(config.SHUFFLE_SERVICE_ENABLED) && (
+     conf.get(config.SHUFFLE_SERVICE_REMOVE_SHUFFLE_ENABLED) ||
+     conf.get(config.SHUFFLE_SERVICE_FETCH_RDD_ENABLED)
+   )

    /** Looks up a file by hashing it into one of our local subdirectories. */
    // This method should be kept in sync with
    // org.apache.spark.network.shuffle.ExecutorDiskUtils#getFilePath().
@@ -94,7 +104,16 @@
      } else {
        val newDir = new File(localDirs(dirId), "%02x".format(subDirId))
        if (!newDir.exists()) {
-         Files.createDirectory(newDir.toPath)
+         val path = newDir.toPath
+         Files.createDirectory(path)
+         if (permissionChangingRequired) {
+           // SPARK-37618: Create dir as group writable so files within can be deleted by the
+           // shuffle service in a secure setup. This will remove the setgid bit so files created
+           // within won't be created with the parent folder group.
+           val currentPerms = Files.getPosixFilePermissions(path)
+           currentPerms.add(PosixFilePermission.GROUP_WRITE)
+           Files.setPosixFilePermissions(path, currentPerms)
Member: I wonder why we need this for shuffle files but not RDD files previously? Or did I miss something? cc @attilapiros

Contributor Author: If you're talking about cache files served from the shuffle service, I expect they have the same issue and would just fail to be deleted when uncached in a secure environment.

Member: Yes, I meant the cache files; basically, those generated by an RDD.

@attilapiros What do you think about the removal issue?

Contributor Author: Confirmed that RDD removal via the shuffle service does not work in a secure YARN setup (unrelated to this PR).

Member: Thanks for the confirmation.

I think we need a test for the RDD removal. cc @attilapiros

Contributor Author: It has a test, but I'm not sure how to actually test the permissions environment of a secure YARN setup without manually running a task with a different umask.

Contributor: @Ngone51 If the permissions of dirs needed to be corrected for shuffle files, I expect the same correction is needed for disk-persisted RDDs too.
As exactly the same code runs when the directory is created for disk-persisted RDDs, I suggest extending the conditions of this if to cover that case too.

+         }
+       }
        subDirs(dirId)(subDirId) = newDir
        newDir
@@ -166,6 +185,37 @@
      }
    }

+   /**
+    * SPARK-37618: Makes sure that the file is created as world readable. This is to get
+    * around the fact that making the block manager sub dirs group writable removes
+    * the setgid bit in secure Yarn environments, which prevents the shuffle service
+    * from being able to read shuffle files. The outer directories will still not be
+    * world executable, so this doesn't allow access to these files except for the
+    * running user and shuffle service.
+    */
+   def createWorldReadableFile(file: File): Unit = {
+     val path = file.toPath
+     Files.createFile(path)
+     val currentPerms = Files.getPosixFilePermissions(path)
+     currentPerms.add(PosixFilePermission.OTHERS_READ)
+     Files.setPosixFilePermissions(path, currentPerms)
+   }
+
+   /**
+    * Creates a temporary version of the given file with world readable permissions (if required).
+    * Used to create block files that will be renamed to the final version of the file.
+    */
+   def createTempFileWith(file: File): File = {
+     val tmpFile = Utils.tempFileWith(file)
+     if (permissionChangingRequired) {
+       // SPARK-37618: we need to make the file world readable because the parent will
+       // lose the setgid bit when making it group writable. Without this the shuffle
+       // service can't read the shuffle files in a secure setup.
+       createWorldReadableFile(tmpFile)
+     }
+     tmpFile
+   }

/** Produces a unique block id and File suitable for storing local intermediate results. */
def createTempLocalBlock(): (TempLocalBlockId, File) = {
var blockId = new TempLocalBlockId(UUID.randomUUID())
@@ -181,7 +231,14 @@ private[spark] class DiskBlockManager(
      while (getFile(blockId).exists()) {
        blockId = new TempShuffleBlockId(UUID.randomUUID())
      }
-     (blockId, getFile(blockId))
+     val tmpFile = getFile(blockId)
+     if (permissionChangingRequired) {
+       // SPARK-37618: we need to make the file world readable because the parent will
+       // lose the setgid bit when making it group writable. Without this the shuffle
+       // service can't read the shuffle files in a secure setup.
+       createWorldReadableFile(tmpFile)
+     }
+     (blockId, tmpFile)
}

/**
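To visualize the layout these helpers produce, here is a hypothetical permissions check (a sketch; the paths are invented, and the expected bits assume the secure setup described above: sub-directories gain group write while block files gain world read):

import java.nio.file.{Files, Paths}
import java.nio.file.attribute.PosixFilePermissions

// Illustrative paths; under this patch a block manager directory tree
// would look roughly like:
//   blockmgr-.../0a                     rwxrwx---  (group can delete files inside)
//   blockmgr-.../0a/shuffle_0_0_0.data  rw-r--r--  (shuffle service can read it)
val dir = Paths.get("/tmp/blockmgr-example/0a")
val file = dir.resolve("shuffle_0_0_0.data")
println(PosixFilePermissions.toString(Files.getPosixFilePermissions(dir)))
println(PosixFilePermissions.toString(Files.getPosixFilePermissions(file)))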
10 changes: 10 additions & 0 deletions core/src/main/scala/org/apache/spark/storage/DiskStore.scala
@@ -50,6 +50,9 @@
    private val maxMemoryMapBytes = conf.get(config.MEMORY_MAP_LIMIT_FOR_TESTS)
    private val blockSizes = new ConcurrentHashMap[BlockId, Long]()

+   private val shuffleServiceFetchRddEnabled = conf.get(config.SHUFFLE_SERVICE_ENABLED) &&
+     conf.get(config.SHUFFLE_SERVICE_FETCH_RDD_ENABLED)
+
def getSize(blockId: BlockId): Long = blockSizes.get(blockId)

/**
@@ -71,6 +74,13 @@
      logDebug(s"Attempting to put block $blockId")
      val startTimeNs = System.nanoTime()
      val file = diskManager.getFile(blockId)
+
+     // SPARK-37618: If fetching cached RDDs from the shuffle service is enabled, we must make
+     // the file world readable, as it will not be owned by the group running the shuffle service
+     // in a secure environment. This is due to changing directory permissions to allow deletion.
+     if (shuffleServiceFetchRddEnabled) {
+       diskManager.createWorldReadableFile(file)
+     }
val out = new CountingWritableChannel(openForWrite(file))
var threwException: Boolean = true
try {
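Similarly, a hypothetical sketch of the companion setting that triggers the DiskStore path above (assuming Constants.SHUFFLE_SERVICE_FETCH_RDD_ENABLED resolves to the key below):

import org.apache.spark.SparkConf

// World-readable RDD cache files are only created when both flags are set.
val conf = new SparkConf()
  .set("spark.shuffle.service.enabled", "true")
  .set("spark.shuffle.service.fetch.rdd.enabled", "true")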