[SPARK-37618][CORE] Remove shuffle blocks using the shuffle service for released executors #35085
```diff
@@ -235,8 +235,10 @@ private[spark] class ContextCleaner(
     try {
       if (mapOutputTrackerMaster.containsShuffle(shuffleId)) {
         logDebug("Cleaning shuffle " + shuffleId)
-        mapOutputTrackerMaster.unregisterShuffle(shuffleId)
+        // Shuffle must be removed before it's unregistered from the output tracker
+        // to find blocks served by the shuffle service on deallocated executors
+        shuffleDriverComponents.removeShuffle(shuffleId, blocking)
+        mapOutputTrackerMaster.unregisterShuffle(shuffleId)
         listeners.asScala.foreach(_.shuffleCleaned(shuffleId))
         logDebug("Cleaned shuffle " + shuffleId)
       } else {
```

Review discussion:

> Is this new pattern safe in all cases? Any change of

> It hasn't failed any tests or caused problems in heavy usage for the past couple months. Not sure how else to know.

> 3.2 with my own PRs backported (most of which are already merged and will be in 3.3). I do know I need this change for this PR to work, and I know why. I don't know how to create a test case for a scenario I don't even know about. If you can think of one, knowing these code paths better, please let me know.

> Looking at

> So I agree that this would be better the other way: from a logical viewpoint it makes more sense to unregister before you remove, in case something tried to use those shuffle locations and, mid-removal, some were actually there and some were not. But in this context it would normally only get cleaned if nothing is referencing it anymore, so nothing could try to use it and the order shouldn't matter. There is also RDD.cleanShuffleDependencies, though, as well. The problem here, I think, is that he needs the map output tracker to figure out what removeShuffle removes. I haven't looked in detail, but it might be better if unregisterShuffle could return something that could then be passed to removeShuffle, for instance.

> Yeah, anything trying to use either of these things after cleanup is called sounds like a bug that should be fixed. I'm not sure what the expected behavior of

> Also the

> Yeah, I'm not sure it really matters, other than possibly the way it fails; either way, if someone tries to use the shuffle midway through the cleanup, things are going to fail. I would think this is fine.
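Besides GC-driven cleaning through the ContextCleaner, the same removeShuffle path can be triggered eagerly from user code via RDD.cleanShuffleDependencies (the DeveloperApi mentioned in the thread above). A minimal sketch, assuming an active SparkContext `sc`:

```scala
// Build a shuffle, materialize it, then eagerly clean its shuffle dependencies.
// With spark.shuffle.service.removeShuffle=true, this also deletes blocks left
// on deallocated executors via the external shuffle service.
val grouped = sc.parallelize(1 to 1000).map(i => (i % 10, i)).reduceByKey(_ + _)
grouped.count()
grouped.cleanShuffleDependencies(blocking = true)
```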
```diff
@@ -654,6 +654,16 @@ package object config {
       .booleanConf
       .createWithDefault(false)

+  private[spark] val SHUFFLE_SERVICE_REMOVE_SHUFFLE_ENABLED =
+    ConfigBuilder("spark.shuffle.service.removeShuffle")
+      .doc("Whether to use the ExternalShuffleService for deleting shuffle blocks for " +
+        "deallocated executors when the shuffle is no longer needed. Without this enabled, " +
+        "shuffle data on executors that are deallocated will remain on disk until the " +
+        "application ends.")
+      .version("3.3.0")
+      .booleanConf
+      .createWithDefault(false)
+
   private[spark] val SHUFFLE_SERVICE_FETCH_RDD_ENABLED =
     ConfigBuilder(Constants.SHUFFLE_SERVICE_FETCH_RDD_ENABLED)
       .doc("Whether to use the ExternalShuffleService for fetching disk persisted RDD blocks. " +
```

Review discussion:

> We need to discuss this, @mridulm.

> We had to disable the external shuffle service for basically all of our large shuffling jobs until I got this working, and based on others' interest and all the JIRA tickets filed over the years, I'd consider this a critically missing feature. I'm fine with disabling it by default, especially since the more we try to limit the permission changes, the more likely this is to cause issues when using the external shuffle service.

> If there are concerns with stability, I am fine with pushing this to master only - in which case we can change all the versions to

> I don't have a strong opinion. I'm fine with whatever others think. It might be best to at least turn it off by default if it goes into 3.3.
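The new flag only takes effect together with the external shuffle service. A minimal sketch of enabling both (the app name is a placeholder):

```scala
import org.apache.spark.SparkConf

// An external shuffle service must be running on the cluster nodes;
// spark.shuffle.service.removeShuffle defaults to false.
val conf = new SparkConf()
  .setAppName("remove-shuffle-demo") // placeholder
  .set("spark.shuffle.service.enabled", "true")
  .set("spark.shuffle.service.removeShuffle", "true")
```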
```diff
@@ -84,6 +84,11 @@ private[spark] class IndexShuffleBlockResolver(
     shuffleFiles.map(_.length()).sum
   }

+  /** Create a temporary file that will be renamed to the final resulting file */
+  def createTempFile(file: File): File = {
+    blockManager.diskBlockManager.createTempFileWith(file)
+  }
+
   /**
    * Get the shuffle data file.
    *
@@ -234,7 +239,7 @@
       throw new IllegalStateException(s"Unexpected shuffle block transfer ${blockId} as " +
         s"${blockId.getClass().getSimpleName()}")
     }
-    val fileTmp = Utils.tempFileWith(file)
+    val fileTmp = createTempFile(file)
     val channel = Channels.newChannel(
       serializerManager.wrapStream(blockId,
         new FileOutputStream(fileTmp)))
@@ -335,15 +340,15 @@
       checksums: Array[Long],
       dataTmp: File): Unit = {
     val indexFile = getIndexFile(shuffleId, mapId)
-    val indexTmp = Utils.tempFileWith(indexFile)
+    val indexTmp = createTempFile(indexFile)

     val checksumEnabled = checksums.nonEmpty
     val (checksumFileOpt, checksumTmpOpt) = if (checksumEnabled) {
       assert(lengths.length == checksums.length,
         "The size of partition lengths and checksums should be equal")
       val checksumFile =
         getChecksumFile(shuffleId, mapId, conf.get(config.SHUFFLE_CHECKSUM_ALGORITHM))
-      (Some(checksumFile), Some(Utils.tempFileWith(checksumFile)))
+      (Some(checksumFile), Some(createTempFile(checksumFile)))
     } else {
       (None, None)
     }
@@ -597,6 +602,13 @@
     }
   }

+  override def getBlocksForShuffle(shuffleId: Int, mapId: Long): Seq[BlockId] = {
+    Seq(
+      ShuffleIndexBlockId(shuffleId, mapId, NOOP_REDUCE_ID),
+      ShuffleDataBlockId(shuffleId, mapId, NOOP_REDUCE_ID)
+    )
+  }
+
   override def stop(): Unit = {}
 }
```

Review discussion:

> In real data, there is a case where a shuffle index exists without its data file. Did we consider those cases in the upper layers where this method is invoked?

> In that case the external shuffle service will just log that it was unable to delete the file and continue. Is there anywhere to actually know that information?

> Historically, WorkerDecommission failed with a FileNotFound exception because it didn't know about that reality. So I want to make sure the upper layer of this method is safe.

> Do we have test coverage for the case you mentioned?

> I'll check if the original RDD cache serving and deletion had a test case for that.

> I believe that's covered here: https://github.com/apache/spark/blob/master/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleIntegrationSuite.java#L270
> Also curious what the case is for no data file. Is that just when a shuffle map has no output rows?
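For context on what these IDs resolve to, a small sketch with hypothetical shuffle and map IDs; a BlockId's name is what gets sent to the external shuffle service and maps directly to the on-disk file names (NOOP_REDUCE_ID is 0, since sort-based shuffle writes a single index/data pair per map task):

```scala
import org.apache.spark.storage.{ShuffleDataBlockId, ShuffleIndexBlockId}

val shuffleId = 7 // hypothetical
val mapId = 42L   // hypothetical
// NOOP_REDUCE_ID == 0: the reduce id is unused in per-map file names.
println(ShuffleIndexBlockId(shuffleId, mapId, 0).name) // shuffle_7_42_0.index
println(ShuffleDataBlockId(shuffleId, mapId, 0).name)  // shuffle_7_42_0.data
```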
```diff
@@ -41,6 +41,14 @@ trait ShuffleBlockResolver {
    */
   def getBlockData(blockId: BlockId, dirs: Option[Array[String]] = None): ManagedBuffer

+  /**
+   * Retrieve a list of BlockIds for a given shuffle map. Used to delete shuffle files
+   * from the external shuffle service after the associated executor has been removed.
+   */
+  def getBlocksForShuffle(shuffleId: Int, mapId: Long): Seq[BlockId] = {
+    Seq.empty
+  }
+
   /**
    * Retrieve the data for the specified merged shuffle block as multiple chunks.
    */
```
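The default-empty implementation keeps other resolvers source-compatible: deletion through the shuffle service simply becomes a no-op unless a resolver knows its on-disk layout. A simplified stand-in (not the real trait) illustrating the pattern:

```scala
// Simplified stand-in for ShuffleBlockResolver: the base trait reports no
// blocks, so the shuffle-service deletion path is a no-op by default.
trait Resolver {
  def getBlocksForShuffle(shuffleId: Int, mapId: Long): Seq[String] = Seq.empty
}

// An index-file-based resolver knows its layout and can enumerate its files.
object IndexLikeResolver extends Resolver {
  override def getBlocksForShuffle(shuffleId: Int, mapId: Long): Seq[String] =
    Seq(s"shuffle_${shuffleId}_${mapId}_0.index", s"shuffle_${shuffleId}_${mapId}_0.data")
}
```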
```diff
@@ -36,6 +36,7 @@ import org.apache.spark.network.shuffle.ExternalBlockStoreClient
 import org.apache.spark.rpc.{IsolatedRpcEndpoint, RpcCallContext, RpcEndpointRef, RpcEnv}
 import org.apache.spark.scheduler._
 import org.apache.spark.scheduler.cluster.{CoarseGrainedClusterMessages, CoarseGrainedSchedulerBackend}
+import org.apache.spark.shuffle.ShuffleManager
 import org.apache.spark.storage.BlockManagerMessages._
 import org.apache.spark.util.{RpcUtils, ThreadUtils, Utils}

@@ -52,6 +53,7 @@ class BlockManagerMasterEndpoint(
     externalBlockStoreClient: Option[ExternalBlockStoreClient],
     blockManagerInfo: mutable.Map[BlockManagerId, BlockManagerInfo],
     mapOutputTracker: MapOutputTrackerMaster,
+    shuffleManager: ShuffleManager,
     isDriver: Boolean)
   extends IsolatedRpcEndpoint with Logging {

@@ -104,9 +106,11 @@ class BlockManagerMasterEndpoint(
   private val pushBasedShuffleEnabled = Utils.isPushBasedShuffleEnabled(conf, isDriver)

   logInfo("BlockManagerMasterEndpoint up")
-  // same as `conf.get(config.SHUFFLE_SERVICE_ENABLED)
-  // && conf.get(config.SHUFFLE_SERVICE_FETCH_RDD_ENABLED)`
-  private val externalShuffleServiceRddFetchEnabled: Boolean = externalBlockStoreClient.isDefined
+  private val externalShuffleServiceRemoveShuffleEnabled: Boolean =
+    externalBlockStoreClient.isDefined && conf.get(config.SHUFFLE_SERVICE_REMOVE_SHUFFLE_ENABLED)
+  private val externalShuffleServiceRddFetchEnabled: Boolean =
+    externalBlockStoreClient.isDefined && conf.get(config.SHUFFLE_SERVICE_FETCH_RDD_ENABLED)
   private val externalShuffleServicePort: Int = StorageUtils.externalShuffleServicePort(conf)

   private lazy val driverEndpoint =

@@ -294,33 +298,74 @@ class BlockManagerMasterEndpoint(
       }
     }.toSeq

-    val removeRddBlockViaExtShuffleServiceFutures = externalBlockStoreClient.map { shuffleClient =>
-      blocksToDeleteByShuffleService.map { case (bmId, blockIds) =>
-        Future[Int] {
-          val numRemovedBlocks = shuffleClient.removeBlocks(
-            bmId.host,
-            bmId.port,
-            bmId.executorId,
-            blockIds.map(_.toString).toArray)
-          numRemovedBlocks.get(defaultRpcTimeout.duration.toSeconds, TimeUnit.SECONDS)
-        }
-      }
-    }.getOrElse(Seq.empty)
+    val removeRddBlockViaExtShuffleServiceFutures = if (externalShuffleServiceRddFetchEnabled) {
+      externalBlockStoreClient.map { shuffleClient =>
+        blocksToDeleteByShuffleService.map { case (bmId, blockIds) =>
+          Future[Int] {
+            val numRemovedBlocks = shuffleClient.removeBlocks(
+              bmId.host,
+              bmId.port,
+              bmId.executorId,
+              blockIds.map(_.toString).toArray)
+            numRemovedBlocks.get(defaultRpcTimeout.duration.toSeconds, TimeUnit.SECONDS)
+          }
+        }
+      }.getOrElse(Seq.empty)
+    } else {
+      Seq.empty
+    }

     Future.sequence(removeRddFromExecutorsFutures ++ removeRddBlockViaExtShuffleServiceFutures)
   }

   private def removeShuffle(shuffleId: Int): Future[Seq[Boolean]] = {
     // Nothing to do in the BlockManagerMasterEndpoint data structures
     val removeMsg = RemoveShuffle(shuffleId)
-    Future.sequence(
-      blockManagerInfo.values.map { bm =>
-        bm.storageEndpoint.ask[Boolean](removeMsg).recover {
-          // use false as default value means no shuffle data were removed
-          handleBlockRemovalFailure("shuffle", shuffleId.toString, bm.blockManagerId, false)
-        }
-      }.toSeq
-    )
+    val removeShuffleFromExecutorsFutures = blockManagerInfo.values.map { bm =>
+      bm.storageEndpoint.ask[Boolean](removeMsg).recover {
+        // use false as default value means no shuffle data were removed
+        handleBlockRemovalFailure("shuffle", shuffleId.toString, bm.blockManagerId, false)
+      }
+    }.toSeq
+
+    // Find all shuffle blocks on executors that are no longer running
+    val blocksToDeleteByShuffleService =
+      new mutable.HashMap[BlockManagerId, mutable.HashSet[BlockId]]
+    if (externalShuffleServiceRemoveShuffleEnabled) {
+      mapOutputTracker.shuffleStatuses.get(shuffleId).foreach { shuffleStatus =>
+        shuffleStatus.withMapStatuses { mapStatuses =>
+          mapStatuses.foreach { mapStatus =>
+            // Check if the executor has been deallocated
+            if (!blockManagerIdByExecutor.contains(mapStatus.location.executorId)) {
+              val blocksToDel =
+                shuffleManager.shuffleBlockResolver.getBlocksForShuffle(shuffleId, mapStatus.mapId)
+              if (blocksToDel.nonEmpty) {
+                val blocks = blocksToDeleteByShuffleService.getOrElseUpdate(mapStatus.location,
+                  new mutable.HashSet[BlockId])
+                blocks ++= blocksToDel
+              }
+            }
+          }
+        }
+      }
+    }
+
+    val removeShuffleFromShuffleServicesFutures =
+      externalBlockStoreClient.map { shuffleClient =>
+        blocksToDeleteByShuffleService.map { case (bmId, blockIds) =>
+          Future[Boolean] {
+            val numRemovedBlocks = shuffleClient.removeBlocks(
+              bmId.host,
+              bmId.port,
+              bmId.executorId,
+              blockIds.map(_.toString).toArray)
+            numRemovedBlocks.get(defaultRpcTimeout.duration.toSeconds,
+              TimeUnit.SECONDS) == blockIds.size
+          }
+        }
+      }.getOrElse(Seq.empty)
+
+    Future.sequence(removeShuffleFromExecutorsFutures ++
+      removeShuffleFromShuffleServicesFutures)
   }

   /**
```

Review discussion:

> Shall we unify these two cases together? e.g.,
>
> ```scala
> mapOutputTracker.shuffleStatuses.get(shuffleId).map { shuffleStatus =>
>   val (mapStatusAtExecutor, mapStatusAtShuffleService) = {
>     shuffleStatus.mapStatuses.partition { mapStatus =>
>       blockManagerIdByExecutor.contains(mapStatus.location.executorId)
>     } match {
>       case (mapStatusAtExecutor, _) if !externalShuffleServiceRemoveShuffleEnabled =>
>         (mapStatusAtExecutor, Array.empty)
>       case (se, ss) =>
>         (se, ss)
>     }
>   }
>   mapStatusAtExecutor.map(_.location).toSet.map {
>     ...
>   } ++ mapStatusAtShuffleService.map {
>     ...
>   }
> }
> ```
>
> This way we only send the shuffle remove request to valid executors (not all active executors) in the first case.

> We could, but there's already a lot going on in this PR and I'd prefer not to change (or accidentally break) the current shuffle deletion. And I'm not sure if all block managers still track shuffles for some reason, even if they don't have any of the shuffle files?

> Fine. We could do it in a follow-up PR.
> If so, you should also send the request to all the external shuffle services.

> The shuffle service doesn't track anything about shuffles, which is why we need to send it each individual block to delete instead of just a
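The combined future returned by removeShuffle settles once every target has answered; because recover() maps RPC failures to false, one unreachable target does not fail the aggregate. A tiny standalone sketch of that aggregation semantics (toy values, no Spark APIs):

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

// Stand-ins for per-executor and per-shuffle-service removal outcomes.
// In the real code, recover() converts an RPC failure into `false` instead of
// failing the combined future, so one bad target doesn't abort the rest.
val fromExecutors: Seq[Future[Boolean]] = Seq(Future(true), Future(true))
val fromShuffleServices: Seq[Future[Boolean]] = Seq(Future(false)) // a recovered failure

val all: Future[Seq[Boolean]] = Future.sequence(fromExecutors ++ fromShuffleServices)
println(Await.result(all, 10.seconds)) // List(true, true, false)
```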
```diff
@@ -19,6 +19,7 @@ package org.apache.spark.storage

 import java.io.{File, IOException}
 import java.nio.file.Files
+import java.nio.file.attribute.PosixFilePermission
 import java.util.UUID

 import scala.collection.mutable.HashMap

@@ -77,6 +78,15 @@
   private val shutdownHook = addShutdownHook()

+  // If either of these features are enabled, we must change permissions on block manager
+  // directories and files to accommodate the shuffle service deleting files in a secure
+  // environment. Parent directories are assumed to be restrictive to prevent unauthorized
+  // users from accessing or modifying world readable files.
+  private val permissionChangingRequired = conf.get(config.SHUFFLE_SERVICE_ENABLED) && (
+    conf.get(config.SHUFFLE_SERVICE_REMOVE_SHUFFLE_ENABLED) ||
+    conf.get(config.SHUFFLE_SERVICE_FETCH_RDD_ENABLED)
+  )
+
   /** Looks up a file by hashing it into one of our local subdirectories. */
   // This method should be kept in sync with
   // org.apache.spark.network.shuffle.ExecutorDiskUtils#getFilePath().

@@ -94,7 +104,16 @@
     } else {
       val newDir = new File(localDirs(dirId), "%02x".format(subDirId))
       if (!newDir.exists()) {
-        Files.createDirectory(newDir.toPath)
+        val path = newDir.toPath
+        Files.createDirectory(path)
+        if (permissionChangingRequired) {
+          // SPARK-37618: Create dir as group writable so files within can be deleted by the
+          // shuffle service in a secure setup. This will remove the setgid bit so files created
+          // within won't be created with the parent folder group.
+          val currentPerms = Files.getPosixFilePermissions(path)
+          currentPerms.add(PosixFilePermission.GROUP_WRITE)
+          Files.setPosixFilePermissions(path, currentPerms)
+        }
       }
       subDirs(dirId)(subDirId) = newDir
       newDir

@@ -166,6 +185,37 @@
     }
   }

+  /**
+   * SPARK-37618: Makes sure that the file is created as world readable. This is to get
+   * around the fact that making the block manager sub dirs group writable removes
+   * the setgid bit in secure Yarn environments, which prevents the shuffle service
+   * from being able to read shuffle files. The outer directories will still not be
+   * world executable, so this doesn't allow access to these files except for the
+   * running user and shuffle service.
+   */
+  def createWorldReadableFile(file: File): Unit = {
+    val path = file.toPath
+    Files.createFile(path)
+    val currentPerms = Files.getPosixFilePermissions(path)
+    currentPerms.add(PosixFilePermission.OTHERS_READ)
+    Files.setPosixFilePermissions(path, currentPerms)
+  }
+
+  /**
+   * Creates a temporary version of the given file with world readable permissions (if required).
+   * Used to create block files that will be renamed to the final version of the file.
+   */
+  def createTempFileWith(file: File): File = {
+    val tmpFile = Utils.tempFileWith(file)
+    if (permissionChangingRequired) {
+      // SPARK-37618: we need to make the file world readable because the parent will
+      // lose the setgid bit when making it group writable. Without this the shuffle
+      // service can't read the shuffle files in a secure setup.
+      createWorldReadableFile(tmpFile)
+    }
+    tmpFile
+  }
+
   /** Produces a unique block id and File suitable for storing local intermediate results. */
   def createTempLocalBlock(): (TempLocalBlockId, File) = {
     var blockId = new TempLocalBlockId(UUID.randomUUID())

@@ -181,7 +231,14 @@
     while (getFile(blockId).exists()) {
       blockId = new TempShuffleBlockId(UUID.randomUUID())
     }
-    (blockId, getFile(blockId))
+    val tmpFile = getFile(blockId)
+    if (permissionChangingRequired) {
+      // SPARK-37618: we need to make the file world readable because the parent will
+      // lose the setgid bit when making it group writable. Without this the shuffle
+      // service can't read the shuffle files in a secure setup.
+      createWorldReadableFile(tmpFile)
+    }
+    (blockId, tmpFile)
   }

   /**
```

Review discussion:

> I wonder why we need this for shuffle files but not RDD files previously? Or did I miss it somewhere? cc @attilapiros

> If you're talking about cache files served from the shuffle service, I expect they have the same issue and would just fail to delete when uncached in a secure environment.

> Yes, I meant the cache file; basically, it's generated by an RDD. @attilapiros What do you think about the removal issue?

> Confirmed that RDD removal via the shuffle service does not work in a secure YARN setup (unrelated to this PR).

> Thanks for the confirmation. I think we need a test for the RDD removal. cc @attilapiros

> It has a test, but I'm not sure how to actually test the permissions environment of a secure YARN setup without manually running a task with a different umask.

> @Ngone51 If the permissions of dirs needed to be corrected for shuffle files, I expect the same correction is needed for disk persisted RDDs too.
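To see the permission mechanics in isolation, a minimal standalone sketch (the paths are placeholders) mirroring what the PR does to block manager subdirectories and the files inside them:

```scala
import java.nio.file.{Files, Paths}
import java.nio.file.attribute.PosixFilePermission

// Placeholder location; the real code operates under the block manager's local dirs.
val dir = Files.createDirectories(Paths.get("/tmp/blockmgr-demo/0a"))

// Group-writable subdir so a shuffle service sharing the group can delete files in it.
// On a setgid parent, rewriting only the rwx bits clears the setgid bit, so new files
// stop inheriting the parent's group -- hence the world-readable file below.
val dirPerms = Files.getPosixFilePermissions(dir)
dirPerms.add(PosixFilePermission.GROUP_WRITE)
Files.setPosixFilePermissions(dir, dirPerms)

// World-readable file so the shuffle service can still read it without group access.
val file = Files.createFile(dir.resolve("shuffle_0_0_0.data"))
val filePerms = Files.getPosixFilePermissions(file)
filePerms.add(PosixFilePermission.OTHERS_READ)
Files.setPosixFilePermissions(file, filePerms)
```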