@@ -29,6 +29,7 @@ import org.apache.spark.annotation.{Evolving, Since}
 import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config._
 import org.apache.spark.internal.config.Python.PYSPARK_EXECUTOR_MEMORY
+import org.apache.spark.util.Utils
 
 /**
  * Resource profile to associate with an RDD. A ResourceProfile allows the user to
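
For context (not part of this diff): a ResourceProfile is normally assembled through the public builder API. A minimal, hypothetical sketch; the resource names and amounts are illustrative, not taken from this change:

    import org.apache.spark.resource.{ExecutorResourceRequests, ResourceProfileBuilder, TaskResourceRequests}

    // 4-core executors with 8 GiB heap and one GPU each; each task claims one GPU.
    val ereqs = new ExecutorResourceRequests().cores(4).memory("8g").resource("gpu", 1)
    val treqs = new TaskResourceRequests().resource("gpu", 1)
    val profile = new ResourceProfileBuilder().require(ereqs).require(treqs).build()
    // rdd.withResources(profile) would then run that RDD's stages under this profile.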
@@ -256,13 +257,16 @@ object ResourceProfile extends Logging {
   val UNKNOWN_RESOURCE_PROFILE_ID = -1
   val DEFAULT_RESOURCE_PROFILE_ID = 0
 
+  private[spark] val MEMORY_OVERHEAD_MIN_MIB = 384L
+
   private lazy val nextProfileId = new AtomicInteger(0)
   private val DEFAULT_PROFILE_LOCK = new Object()
 
   // The default resource profile uses the application level configs.
   // var so that it can be reset for testing purposes.
   @GuardedBy("DEFAULT_PROFILE_LOCK")
   private var defaultProfile: Option[ResourceProfile] = None
+  private var defaultProfileExecutorResources: Option[DefaultProfileExecutorResources] = None
 
   private[spark] def getNextProfileId: Int = nextProfileId.getAndIncrement()
 
@@ -284,6 +288,14 @@ object ResourceProfile extends Logging {
     }
   }
 
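+  // Returns the executor resources of the default profile, building the default
+  // profile from the application configs if it has not been created yet.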
+  private[spark] def getDefaultProfileExecutorResources(
+      conf: SparkConf): DefaultProfileExecutorResources = {
+    defaultProfileExecutorResources.getOrElse {
+      getOrCreateDefaultProfile(conf)
+      defaultProfileExecutorResources.get
+    }
+  }
+
   private def getDefaultTaskResources(conf: SparkConf): Map[String, TaskResourceRequest] = {
     val cpusPerTask = conf.get(CPUS_PER_TASK)
     val treqs = new TaskResourceRequests().cpus(cpusPerTask)
@@ -293,20 +305,26 @@ object ResourceProfile extends Logging {
 
   private def getDefaultExecutorResources(conf: SparkConf): Map[String, ExecutorResourceRequest] = {
     val ereqs = new ExecutorResourceRequests()
-    ereqs.cores(conf.get(EXECUTOR_CORES))
-    ereqs.memory(conf.get(EXECUTOR_MEMORY).toString)
-    conf.get(EXECUTOR_MEMORY_OVERHEAD).map(mem => ereqs.memoryOverhead(mem.toString))
-    conf.get(PYSPARK_EXECUTOR_MEMORY).map(mem => ereqs.pysparkMemory(mem.toString))
-    if (conf.get(MEMORY_OFFHEAP_ENABLED)) {
-      // Explicitly add suffix b as default unit of offHeapMemory is Mib
-      ereqs.offHeapMemory(conf.get(MEMORY_OFFHEAP_SIZE).toString + "b")
-    }
+    val cores = conf.get(EXECUTOR_CORES)
+    ereqs.cores(cores)
+    val memory = conf.get(EXECUTOR_MEMORY)
+    ereqs.memory(memory.toString)
+    val overheadMem = conf.get(EXECUTOR_MEMORY_OVERHEAD)
+    overheadMem.map(mem => ereqs.memoryOverhead(mem.toString))
+    val pysparkMem = conf.get(PYSPARK_EXECUTOR_MEMORY)
+    pysparkMem.map(mem => ereqs.pysparkMemory(mem.toString))
+    val offheapMem = Utils.executorOffHeapMemorySizeAsMb(conf)
+    ereqs.offHeapMemory(offheapMem.toString)
     val execReq = ResourceUtils.parseAllResourceRequests(conf, SPARK_EXECUTOR_PREFIX)
     execReq.foreach { req =>
-      val name = req.id.resourceName
-      ereqs.resource(name, req.amount, req.discoveryScript.orElse(""),
+      ereqs.resource(req.id.resourceName, req.amount, req.discoveryScript.orElse(""),
         req.vendor.orElse(""))
     }
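+    // Record the fully resolved default resources so getDefaultProfileExecutorResources
+    // can serve them back without re-reading the configs.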
+    val customResourceNames = execReq.map(_.id.resourceName).toSet
+    val customResources = ereqs.requests.filter(v => customResourceNames.contains(v._1))
+    defaultProfileExecutorResources =
+      Some(DefaultProfileExecutorResources(cores, memory, offheapMem, pysparkMem,
+        overheadMem, customResources))
     ereqs.requests
   }
 
@@ -320,6 +338,7 @@ object ResourceProfile extends Logging {
   private[spark] def clearDefaultProfile(): Unit = {
     DEFAULT_PROFILE_LOCK.synchronized {
       defaultProfile = None
+      defaultProfileExecutorResources = None
     }
   }
 
@@ -342,6 +361,100 @@ object ResourceProfile extends Logging {
     rp.getTaskCpus.getOrElse(conf.get(CPUS_PER_TASK))
   }
 
+  /**
+   * Gets the offHeap memory size from an [[ExecutorResourceRequest]].
+   * Returns 0 if MEMORY_OFFHEAP_ENABLED is false.
+   */
+  private[spark] def executorOffHeapMemorySizeAsMb(sparkConf: SparkConf,
+      execRequest: ExecutorResourceRequest): Long = {
+    Utils.checkOffHeapEnabled(sparkConf, execRequest.amount)
+  }
+
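+  // Fully resolved resources for one executor container: every memory field holds a
+  // concrete MiB value after defaults have been applied.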
+  private[spark] case class ExecutorResourcesOrDefaults(
+      cores: Int,
+      executorMemoryMiB: Long,
+      memoryOffHeapMiB: Long,
+      pysparkMemoryMiB: Long,
+      memoryOverheadMiB: Long,
+      totalMemMiB: Long,
+      customResources: Map[String, ExecutorResourceRequest])
+
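+  // Default-profile resources exactly as read from the application configs; the
+  // Option fields stay None when the user did not set the corresponding config.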
+  private[spark] case class DefaultProfileExecutorResources(
+      cores: Int,
+      executorMemoryMiB: Long,
+      memoryOffHeapMiB: Long,
+      pysparkMemoryMiB: Option[Long],
+      memoryOverheadMiB: Option[Long],
+      customResources: Map[String, ExecutorResourceRequest])
+
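+  // An explicitly configured overhead wins; otherwise the overhead is
+  // overheadFactor * executor memory, floored at MEMORY_OVERHEAD_MIN_MIB (384 MiB).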
+  private[spark] def calculateOverHeadMemory(
+      overHeadMemFromConf: Option[Long],
+      executorMemoryMiB: Long,
+      overheadFactor: Double): Long = {
+    overHeadMemFromConf.getOrElse(math.max((overheadFactor * executorMemoryMiB).toInt,
+      ResourceProfile.MEMORY_OVERHEAD_MIN_MIB))
+  }
+
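+  // Worked example with illustrative numbers: a 4096 MiB executor heap and a 0.10
+  // factor give math.max((0.10 * 4096).toInt, 384) = 409 MiB of overhead, while a
+  // 1024 MiB executor is clamped up to the 384 MiB minimum.
+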
+  /**
+   * Gets the full list of resources to allow a cluster manager to request the appropriate
+   * container. If the resource profile is not the default one we either get the resources
+   * specified in the profile or fall back to the default profile resource size for everything
+   * except for custom resources.
+   */
+  private[spark] def getResourcesForClusterManager(
+      rpId: Int,
+      execResources: Map[String, ExecutorResourceRequest],
+      overheadFactor: Double,
+      conf: SparkConf,
+      isPythonApp: Boolean,
+      resourceMappings: Map[String, String]): ExecutorResourcesOrDefaults = {
+    val defaultResources = getDefaultProfileExecutorResources(conf)
+    // set all the default values, which may change for custom ResourceProfiles
+    var cores = defaultResources.cores
+    var executorMemoryMiB = defaultResources.executorMemoryMiB
+    var memoryOffHeapMiB = defaultResources.memoryOffHeapMiB
+    var pysparkMemoryMiB = defaultResources.pysparkMemoryMiB.getOrElse(0L)
+    var memoryOverheadMiB = calculateOverHeadMemory(defaultResources.memoryOverheadMiB,
+      executorMemoryMiB, overheadFactor)
+
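+    // For a custom profile, overlay whatever it specifies on top of those defaults;
+    // any resource the profile does not mention keeps the default-profile value.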
+    val finalCustomResources = if (rpId != DEFAULT_RESOURCE_PROFILE_ID) {
+      val customResources = new mutable.HashMap[String, ExecutorResourceRequest]
+      execResources.foreach { case (r, execReq) =>
+        r match {
+          case ResourceProfile.MEMORY =>
+            executorMemoryMiB = execReq.amount
+          case ResourceProfile.OVERHEAD_MEM =>
+            memoryOverheadMiB = execReq.amount
+          case ResourceProfile.PYSPARK_MEM =>
+            pysparkMemoryMiB = execReq.amount
+          case ResourceProfile.OFFHEAP_MEM =>
+            memoryOffHeapMiB = executorOffHeapMemorySizeAsMb(conf, execReq)
+          case ResourceProfile.CORES =>
+            cores = execReq.amount.toInt
+          case rName =>
+            val nameToUse = resourceMappings.get(rName).getOrElse(rName)
+            customResources(nameToUse) = execReq
+        }
+      }
+      customResources.toMap
+    } else {
+      defaultResources.customResources.map { case (rName, execReq) =>
+        val nameToUse = resourceMappings.get(rName).getOrElse(rName)
+        (nameToUse, execReq)
+      }
+    }
+    // only add in pyspark memory if actually a python application
+    val pysparkMemToUseMiB = if (isPythonApp) {
+      pysparkMemoryMiB
+    } else {
+      0L
+    }
+    val totalMemMiB =
+      (executorMemoryMiB + memoryOverheadMiB + memoryOffHeapMiB + pysparkMemToUseMiB)
+    ExecutorResourcesOrDefaults(cores, executorMemoryMiB, memoryOffHeapMiB,
+      pysparkMemToUseMiB, memoryOverheadMiB, totalMemMiB, finalCustomResources)
+  }
+
   private[spark] val PYSPARK_MEMORY_LOCAL_PROPERTY = "resource.pyspark.memory"
   private[spark] val EXECUTOR_CORES_LOCAL_PROPERTY = "resource.executor.cores"
 }
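
For orientation (not part of this diff): a hedged sketch of how a cluster-manager backend might call the new private[spark] helper. The overhead factor and resource-name mapping below are illustrative assumptions, not values from this change:

    import org.apache.spark.SparkConf
    import org.apache.spark.resource.ResourceProfile

    val conf = new SparkConf()
      .set("spark.executor.cores", "4")
      .set("spark.executor.memory", "4g")
    // Default profile, so every field falls back to the application-level configs.
    val rp = ResourceProfile.getOrCreateDefaultProfile(conf)

    val resolved = ResourceProfile.getResourcesForClusterManager(
      rp.id,
      rp.executorResources,
      0.10,                         // hypothetical memory overhead factor
      conf,
      isPythonApp = false,
      Map("gpu" -> "yarn.io/gpu"))  // hypothetical Spark-to-manager resource rename
    // resolved.totalMemMiB = heap + overhead + off-heap (+ pyspark memory when Python)

Reading the doc comment, the apparent intent is to keep this container-sizing math in one place so each cluster-manager backend does not have to duplicate the overhead and minimum-memory rules.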