@@ -533,6 +533,7 @@ private[master] class Master(
533533
534534 /**
535535 * Schedule executors to be launched on the workers.
536+ * Returns an array containing the number of cores assigned to each worker.
536537 *
537538 * There are two modes of launching executors. The first attempts to spread out an application's
538539 * executors on as many workers as possible, while the second does the opposite (i.e. launch them
@@ -543,10 +544,18 @@ private[master] class Master(
543544 * multiple executors from the same application may be launched on the same worker if the worker
544545 * has enough cores and memory. Otherwise, each executor grabs all the cores available on the
545546 * worker by default, in which case only one executor may be launched on each worker.
547+ *
548+ * It is important to allocate coresPerExecutor on each worker at a time (instead of 1 core
549+ * at a time). Consider the following example: a cluster has 4 workers with 16 cores each.
550+ * The user requests 3 executors (spark.cores.max = 48, spark.executor.cores = 16). If 1 core
551+ * is allocated at a time, the 48 cores end up as 12 cores assigned on each of the 4 workers.
552+ * Since 12 < 16, no executors would launch [SPARK-8881].
546553 */
547-
548- private [master] def scheduleExecutorsOnWorkers (app : ApplicationInfo ,
549- usableWorkers : Array [WorkerInfo ], spreadOutApps : Boolean ): Array [Int ] = {
554+ private [master] def coresToAssign (
555+ app : ApplicationInfo ,
556+ usableWorkers : Array [WorkerInfo ],
557+ spreadOutApps : Boolean ): Array [Int ] = {
558+ // Default value for number of cores per executor is 1
550559 val coresPerExecutor = app.desc.coresPerExecutor.getOrElse(1 )
551560 val memoryPerExecutor = app.desc.memoryPerExecutorMB
552561 val numUsable = usableWorkers.length
@@ -588,24 +597,25 @@ private[master] class Master(
588597 // Right now this is a very simple FIFO scheduler. We keep trying to fit in the first app
589598 // in the queue, then the second app, etc.
590599 for (app <- waitingApps if app.coresLeft > 0 ) {
600+ // Default value for number of cores per executor is 1
591601 val coresPerExecutor = app.desc.coresPerExecutor.getOrElse(1 )
592602 val usableWorkers = workers.toArray.filter(_.state == WorkerState .ALIVE )
593603 .filter(worker => worker.memoryFree >= app.desc.memoryPerExecutorMB &&
594604 worker.coresFree >= coresPerExecutor)
595605 .sortBy(_.coresFree).reverse
596- val assignedCores = scheduleExecutorsOnWorkers (app, usableWorkers, spreadOutApps)
606+ val assignedCores = coresToAssign (app, usableWorkers, spreadOutApps)
597607
598608 // Now that we've decided how many cores to allocate on each worker, let's allocate them
599609 var pos = 0
600610 for (pos <- 0 until usableWorkers.length if assignedCores(pos) > 0 ) {
601- allocateWorkerResourceToExecutors(app, assignedCores(pos), coresPerExecutor,
602- usableWorkers(pos))
611+ allocateWorkerResourceToExecutors(
612+ app, assignedCores(pos), coresPerExecutor, usableWorkers(pos))
603613 }
604614 }
605615 }
606616
607617 /**
608- * Allocate a worker's resources to one or more executors
618+ * Allocate a worker's resources to one or more executors.
609619 * @param app the info of the application which the executors belong to
610620 * @param assignedCores number of cores on this worker for this application
611621 * @param coresPerExecutor number of cores per executor
@@ -616,8 +626,8 @@ private[master] class Master(
616626 assignedCores : Int ,
617627 coresPerExecutor : Int ,
618628 worker : WorkerInfo ): Unit = {
619-
620- var numExecutors = assignedCores/ coresPerExecutor
629+ // If cores per executor is specified, then this division should have a remainder of zero
630+ val numExecutors = assignedCores / coresPerExecutor
621631 for (i <- 1 to numExecutors) {
622632 val exec = app.addExecutor(worker, coresPerExecutor)
623633 launchExecutor(worker, exec)
0 commit comments